Apply Noise Reduction to Audio for Streaming Speech-to-Text

This guide demonstrates how to implement a noise reduction system for real-time audio transcription using AssemblyAI’s Streaming STT and the noisereduce library. You’ll learn how to create a custom audio pipeline that preprocesses incoming audio to remove background noise before it reaches the transcription service.

This solution is particularly valuable for:

  • Voice assistants operating in noisy environments
  • Customer service applications processing calls
  • Meeting transcription tools
  • Voice-enabled applications requiring high accuracy

The implementation uses Python and combines proven audio processing techniques with AssemblyAI’s powerful transcription capabilities. While our example focuses on microphone input, the principles can be applied to any real-time audio stream.

Quickstart

1import pyaudio
2import websocket
3import json
4import os
5import threading
6import time
7import numpy as np
8import noisereduce as nr
9from urllib.parse import urlencode
10from datetime import datetime
11
# --- Configuration ---
ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]

CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": 16000,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio capture settings: 800 frames at 16 kHz = 50 ms per read.
FRAMES_PER_BUFFER = 800
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Noise reduction: accumulate 0.5 s of audio before each denoising pass.
NOISE_BUFFER_SECONDS = 0.5
NOISE_BUFFER_SIZE = int(SAMPLE_RATE * NOISE_BUFFER_SECONDS)

# Shared state for the audio stream, WebSocket app, and worker thread.
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
38
39# --- WebSocket Event Handlers ---
40
def on_open(ws):
    """Start a background thread that streams denoised microphone audio.

    Raw int16 samples are accumulated into a buffer; once at least
    NOISE_BUFFER_SIZE samples are available, the buffer is denoised with
    noisereduce and the new (non-overlapping) portion is sent to the
    server as a binary WebSocket frame.
    """
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        global stream
        print("Starting audio streaming with noise reduction...")
        buffer = np.array([], dtype=np.int16)
        overlap = 1024  # samples carried between passes for spectral continuity
        has_overlap = False

        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                audio_array = np.frombuffer(audio_data, dtype=np.int16)
                buffer = np.append(buffer, audio_array)

                if len(buffer) >= NOISE_BUFFER_SIZE:
                    # Normalize to float32 in [-1, 1) for noisereduce.
                    float_audio = buffer.astype(np.float32) / 32768.0
                    denoised = nr.reduce_noise(
                        y=float_audio,
                        sr=SAMPLE_RATE,
                        prop_decrease=0.75,
                        n_fft=1024,
                    )
                    # Clip before casting back to int16: a denoised sample at
                    # or above 1.0 would otherwise overflow and wrap around to
                    # -32768, producing an audible click.
                    int_audio = np.clip(denoised * 32768.0, -32768, 32767).astype(np.int16)

                    # Send only the non-overlapping portion to avoid
                    # transmitting duplicate audio.
                    if has_overlap:
                        ws.send(int_audio[overlap:].tobytes(), websocket.ABNF.OPCODE_BINARY)
                    else:
                        ws.send(int_audio.tobytes(), websocket.ABNF.OPCODE_BINARY)
                        has_overlap = True

                    # Keep the tail as overlap for the next pass.
                    buffer = buffer[-overlap:]
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()
88
def on_message(ws, message):
    """Handle a JSON message from the streaming API.

    "Begin" announces the session, "Turn" carries partial or final
    transcripts, and "Termination" reports session statistics. Malformed
    messages are logged rather than raised so the socket keeps running.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Wipe the in-progress line, then print the final transcript.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                # Overwrite the current line with the updated partial
                # transcript; flush explicitly because no newline is printed,
                # so block-buffered stdout would otherwise show nothing.
                print(f"\r{transcript}", end='', flush=True)
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
113
def on_error(ws, error):
    """Log the WebSocket error and signal the audio thread to stop."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()
119
def on_close(ws, close_status_code, close_msg):
    """Tear down audio resources once the WebSocket connection drops."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    # Stop the capture loop first so nothing reads from a closing stream.
    stop_event.set()

    if stream is not None:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio is not None:
        audio.terminate()
        audio = None
    # Give the streaming thread a moment to exit cleanly.
    if audio_thread is not None and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
137
138# --- Main Execution ---
def run():
    """Open the microphone, connect the WebSocket, and stream until stopped."""
    global audio, stream, ws_app

    # Initialize PyAudio and open the microphone input stream.
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be noise-reduced before transcription.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    # Create the WebSocketApp with our event handlers.
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run the client on a daemon thread so the main thread stays free to
    # catch KeyboardInterrupt.
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Keep the main thread alive until the socket thread exits.
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Ask the server to end the session gracefully before closing.
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                time.sleep(5)  # allow final messages to arrive
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # on_close may already have released these; the guards make this
        # cleanup idempotent.
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

Step-by-step guide

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up for an AssemblyAI account and get your API key from your dashboard. Please note that Streaming Speech-to-text is available for upgraded accounts only. If you’re on the free plan, you’ll need to upgrade your account by adding a credit card.

Install and import packages

Install the required packages:

$pip install websocket-client pyaudio noisereduce numpy

Import packages and set your API key.

1import pyaudio
2import websocket
3import json
4import os
5import threading
6import time
7import numpy as np
8import noisereduce as nr
9from urllib.parse import urlencode
10from datetime import datetime
11
12ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]

Make sure not to share this token with anyone — it is a private key uniquely associated with your account.

Audio configuration and global variables

Set all of your audio configurations and global variables. The NOISE_BUFFER_SIZE controls how much audio is buffered before applying noise reduction — 0.5 seconds provides a good balance between latency and noise reduction quality.

CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": 16000,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio capture settings: 800 frames at 16 kHz = 50 ms per read.
FRAMES_PER_BUFFER = 800
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Noise reduction: accumulate 0.5 s of audio before each denoising pass.
NOISE_BUFFER_SECONDS = 0.5
NOISE_BUFFER_SIZE = int(SAMPLE_RATE * NOISE_BUFFER_SECONDS)

# Shared state for the audio stream, WebSocket app, and worker thread.
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()

WebSocket event handlers

Open WebSocket

When the connection opens, we start a background thread that reads audio from the microphone, buffers it, applies noise reduction using noisereduce, and sends the denoised audio to AssemblyAI.

The noise reduction works by:

  1. Accumulating raw audio samples into a buffer
  2. Once the buffer reaches 0.5 seconds, converting to float and applying nr.reduce_noise()
  3. Converting back to int16 and sending over the WebSocket
  4. Keeping the last 1024 samples as overlap for continuity, and only sending the non-overlapping portion to avoid duplicate audio
def on_open(ws):
    """Start a background thread that streams denoised microphone audio.

    Raw int16 samples are accumulated into a buffer; once at least
    NOISE_BUFFER_SIZE samples are available, the buffer is denoised with
    noisereduce and the new (non-overlapping) portion is sent to the
    server as a binary WebSocket frame.
    """
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        global stream
        print("Starting audio streaming with noise reduction...")
        buffer = np.array([], dtype=np.int16)
        overlap = 1024  # samples carried between passes for spectral continuity
        has_overlap = False

        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                audio_array = np.frombuffer(audio_data, dtype=np.int16)
                buffer = np.append(buffer, audio_array)

                if len(buffer) >= NOISE_BUFFER_SIZE:
                    # Normalize to float32 in [-1, 1) for noisereduce.
                    float_audio = buffer.astype(np.float32) / 32768.0
                    denoised = nr.reduce_noise(
                        y=float_audio,
                        sr=SAMPLE_RATE,
                        prop_decrease=0.75,
                        n_fft=1024,
                    )
                    # Clip before casting back to int16: a denoised sample at
                    # or above 1.0 would otherwise overflow and wrap around to
                    # -32768, producing an audible click.
                    int_audio = np.clip(denoised * 32768.0, -32768, 32767).astype(np.int16)

                    # Send only the non-overlapping portion to avoid
                    # transmitting duplicate audio.
                    if has_overlap:
                        ws.send(int_audio[overlap:].tobytes(), websocket.ABNF.OPCODE_BINARY)
                    else:
                        ws.send(int_audio.tobytes(), websocket.ABNF.OPCODE_BINARY)
                        has_overlap = True

                    # Keep the tail as overlap for the next pass.
                    buffer = buffer[-overlap:]
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()

Handle WebSocket messages

def on_message(ws, message):
    """Handle a JSON message from the streaming API.

    "Begin" announces the session, "Turn" carries partial or final
    transcripts, and "Termination" reports session statistics. Malformed
    messages are logged rather than raised so the socket keeps running.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Wipe the in-progress line, then print the final transcript.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                # Overwrite the current line with the updated partial
                # transcript; flush explicitly because no newline is printed,
                # so block-buffered stdout would otherwise show nothing.
                print(f"\r{transcript}", end='', flush=True)
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

WebSocket error and close handlers

def on_error(ws, error):
    """Log the WebSocket error and signal the audio thread to stop."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Tear down audio resources once the WebSocket connection drops."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    # Stop the capture loop first so nothing reads from a closing stream.
    stop_event.set()

    if stream is not None:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio is not None:
        audio.terminate()
        audio = None
    # Give the streaming thread a moment to exit cleanly.
    if audio_thread is not None and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

Begin streaming STT transcription

Open the microphone, connect the WebSocket, and start streaming noise-reduced audio.

def run():
    """Open the microphone, connect the WebSocket, and stream until stopped."""
    global audio, stream, ws_app

    # Initialize PyAudio and open the microphone input stream.
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be noise-reduced before transcription.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    # Create the WebSocketApp with our event handlers.
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run the client on a daemon thread so the main thread stays free to
    # catch KeyboardInterrupt.
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Keep the main thread alive until the socket thread exits.
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Ask the server to end the session gracefully before closing.
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                time.sleep(5)  # allow final messages to arrive
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # on_close may already have released these; the guards make this
        # cleanup idempotent.
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

You can press Ctrl+C to stop the transcription.