Skip to main content

Documentation Index

Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt

Use this file to discover all available pages before exploring further.

This guide demonstrates how to implement a noise reduction system for real-time audio transcription using AssemblyAI’s Streaming STT and the noisereduce library. You’ll learn how to create a custom audio pipeline that preprocesses incoming audio to remove background noise before it reaches the transcription service. This solution is particularly valuable for:
  • Voice assistants operating in noisy environments
  • Customer service applications processing calls
  • Meeting transcription tools
  • Voice-enabled applications requiring high accuracy
The implementation uses Python and combines proven audio processing techniques with AssemblyAI’s powerful transcription capabilities. While our example focuses on microphone input, the principles can be applied to any real-time audio stream.

Quickstart

import pyaudio
import websocket
import json
import os
import threading
import time
import numpy as np
import noisereduce as nr
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
# The key is read from the environment; an unset variable raises KeyError here.
ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]

# Query parameters that are URL-encoded onto the streaming endpoint.
CONNECTION_PARAMS = dict(speech_model="u3-rt-pro", sample_rate=16000)
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = API_ENDPOINT_BASE_URL + "?" + urlencode(CONNECTION_PARAMS)

# Audio Configuration
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]  # microphone rate matches the session rate
FRAMES_PER_BUFFER = 800  # samples per microphone read: 50 ms at 16000 Hz
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Noise reduction configuration
NOISE_BUFFER_SECONDS = 0.5
# Number of samples accumulated before each noise-reduction pass.
NOISE_BUFFER_SIZE = int(NOISE_BUFFER_SECONDS * SAMPLE_RATE)

# Global variables for audio stream and websocket
# (populated by run() / on_open(), released by on_close())
audio = stream = ws_app = audio_thread = None
stop_event = threading.Event()  # set to ask the streaming thread to exit

# --- WebSocket Event Handlers ---

def on_open(ws):
    """Start the microphone streaming thread once the WebSocket is open.

    A daemon thread reads 16-bit PCM from the global ``stream``, accumulates
    it until at least ``NOISE_BUFFER_SIZE`` samples are buffered, runs
    ``nr.reduce_noise`` over the buffer and sends the denoised PCM to ``ws``
    as a binary frame. The thread object is stored in the global
    ``audio_thread`` and runs until ``stop_event`` is set or an error occurs.

    Args:
        ws: The connected WebSocket object supplied by ``WebSocketApp``.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        """Read, denoise and send audio until stopped or an error occurs."""
        print("Starting audio streaming with noise reduction...")
        buffer = np.array([], dtype=np.int16)
        overlap = 1024
        has_overlap = False

        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                audio_array = np.frombuffer(audio_data, dtype=np.int16)
                buffer = np.append(buffer, audio_array)

                # Keep accumulating until there is enough audio to denoise.
                if len(buffer) < NOISE_BUFFER_SIZE:
                    continue

                # Apply noise reduction on audio scaled to [-1.0, 1.0)
                float_audio = buffer.astype(np.float32) / 32768.0
                denoised = nr.reduce_noise(
                    y=float_audio,
                    sr=SAMPLE_RATE,
                    prop_decrease=0.75,
                    n_fft=1024,
                )

                # Clip before casting: a denoised sample outside the int16
                # range is not guaranteed to saturate when cast (it can wrap
                # around and produce a loud click).
                int_audio = np.clip(denoised * 32768.0, -32768, 32767).astype(np.int16)

                # After the first chunk, the leading `overlap` samples were
                # already sent with the previous chunk, so skip them to avoid
                # duplicate audio.
                payload = int_audio[overlap:] if has_overlap else int_audio
                ws.send(payload.tobytes(), websocket.ABNF.OPCODE_BINARY)
                has_overlap = True

                # Keep the tail of the raw audio as context for the next pass
                buffer = buffer[-overlap:]
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()

def on_message(ws, message):
    """Print session events and transcripts received over the WebSocket.

    ``message`` is parsed as JSON and handled according to its ``type``:
    ``Begin`` prints the session id and expiry time, ``Turn`` prints the
    transcript (rewriting the current line until ``end_of_turn`` is set) and
    ``Termination`` prints the audio and session durations. Any other type is
    ignored. Decoding and handling errors are printed rather than raised.
    """
    try:
        payload = json.loads(message)
        kind = payload.get('type')

        if kind == "Begin":
            began_id = payload.get('id')
            expiry = payload.get('expires_at')
            print(f"\nSession began: ID={began_id}, ExpiresAt={datetime.fromtimestamp(expiry)}")
            return

        if kind == "Turn":
            text = payload.get('transcript', '')
            if not payload.get('end_of_turn'):
                # Partial result: rewrite the current line in place.
                print(f"\r{text}", end='')
                return
            # Final result: blank the partial line, then print the transcript.
            print('\r' + ' ' * 80 + '\r', end='')
            print(text)
            return

        if kind == "Termination":
            audio_secs = payload.get('audio_duration_seconds', 0)
            session_secs = payload.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_secs}s, Session Duration={session_secs}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Report a WebSocket error and ask the audio streaming loop to stop."""
    report = f"\nWebSocket Error: {error}"
    print(report)
    # The streaming thread checks this event on every loop iteration.
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Release audio resources after the WebSocket connection is closed.

    Signals the streaming thread to stop, waits up to one second for it to
    exit, then stops and closes the microphone stream and terminates PyAudio.
    The ``stream`` and ``audio`` globals are reset to ``None`` so a later
    cleanup pass does not release them twice.

    Args:
        ws: The WebSocket object supplied by ``WebSocketApp`` (unused).
        close_status_code: Close status code reported for the connection.
        close_msg: Close message reported for the connection.
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    stop_event.set()

    # Wait for the reader thread BEFORE tearing the stream down, so the stream
    # is not closed while the thread may still be inside stream.read().
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None

# --- Main Execution ---
def run():
    """Open the microphone, connect to the streaming endpoint and run until stopped.

    The WebSocket client runs in a daemon thread so the main thread can catch
    Ctrl+C. On Ctrl+C a ``Terminate`` message is sent, the socket thread is
    given up to five seconds to finish, and the connection is closed. Audio
    resources are always released before returning.
    """
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be noise-reduced before transcription.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
            audio = None
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = json.dumps({"type": "Terminate"})
                print(f"Sending termination message: {terminate_message}")
                ws_app.send(terminate_message)
                # Give the session up to 5 s to wind down, but return as soon
                # as the socket thread exits instead of always sleeping 5 s.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # on_close may already have released these and reset them to None;
        # resetting here as well keeps the cleanup idempotent.
        if stream:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        print("Cleanup complete. Exiting.")


# Start the streaming session only when this file is executed as a script.
if __name__ == "__main__":
    run()

Step-by-step guide

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up for an AssemblyAI account and get your API key from your dashboard. Please note that Streaming Speech-to-text is available for upgraded accounts only. If you’re on the free plan, you’ll need to upgrade your account by adding a credit card.

Install and import packages

Install the required packages:
pip install websocket-client pyaudio noisereduce numpy
Import the packages and read your API key from the ASSEMBLYAI_API_KEY environment variable (set it before running the script).
import pyaudio
import websocket
import json
import os
import threading
import time
import numpy as np
import noisereduce as nr
from urllib.parse import urlencode
from datetime import datetime

# Read the API key from the environment; raises KeyError if ASSEMBLYAI_API_KEY is not set.
ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]
Make sure not to share this API key with anyone — it is a private key uniquely associated with your account.

Audio configuration and global variables

Set your audio configuration and global variables. NOISE_BUFFER_SECONDS (converted to a sample count in NOISE_BUFFER_SIZE) controls how much audio is buffered before noise reduction is applied — 0.5 seconds provides a good balance between latency and noise reduction quality.
# Query parameters that are URL-encoded onto the streaming endpoint.
CONNECTION_PARAMS = dict(speech_model="u3-rt-pro", sample_rate=16000)
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = API_ENDPOINT_BASE_URL + "?" + urlencode(CONNECTION_PARAMS)

# Audio Configuration
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]  # microphone rate matches the session rate
FRAMES_PER_BUFFER = 800  # samples per microphone read: 50 ms at 16000 Hz
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Noise reduction configuration
NOISE_BUFFER_SECONDS = 0.5
# Number of samples accumulated before each noise-reduction pass.
NOISE_BUFFER_SIZE = int(NOISE_BUFFER_SECONDS * SAMPLE_RATE)

# Global variables for audio stream and websocket
# (populated by run() / on_open(), released by on_close())
audio = stream = ws_app = audio_thread = None
stop_event = threading.Event()  # set to ask the streaming thread to exit

WebSocket event handlers

Open WebSocket

When the connection opens, we start a background thread that reads audio from the microphone, buffers it, applies noise reduction using noisereduce, and sends the denoised audio to AssemblyAI. The noise reduction works by:
  1. Accumulating raw audio samples into a buffer
  2. Once the buffer reaches 0.5 seconds, converting to float and applying nr.reduce_noise()
  3. Converting back to int16 and sending over the WebSocket
  4. Keeping the last 1024 samples as overlap for continuity, and only sending the non-overlapping portion to avoid duplicate audio
def on_open(ws):
    """Start the microphone streaming thread once the WebSocket is open.

    A daemon thread reads 16-bit PCM from the global ``stream``, accumulates
    it until at least ``NOISE_BUFFER_SIZE`` samples are buffered, runs
    ``nr.reduce_noise`` over the buffer and sends the denoised PCM to ``ws``
    as a binary frame. The thread object is stored in the global
    ``audio_thread`` and runs until ``stop_event`` is set or an error occurs.

    Args:
        ws: The connected WebSocket object supplied by ``WebSocketApp``.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        """Read, denoise and send audio until stopped or an error occurs."""
        print("Starting audio streaming with noise reduction...")
        buffer = np.array([], dtype=np.int16)
        overlap = 1024
        has_overlap = False

        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                audio_array = np.frombuffer(audio_data, dtype=np.int16)
                buffer = np.append(buffer, audio_array)

                # Keep accumulating until there is enough audio to denoise.
                if len(buffer) < NOISE_BUFFER_SIZE:
                    continue

                # Apply noise reduction on audio scaled to [-1.0, 1.0)
                float_audio = buffer.astype(np.float32) / 32768.0
                denoised = nr.reduce_noise(
                    y=float_audio,
                    sr=SAMPLE_RATE,
                    prop_decrease=0.75,
                    n_fft=1024,
                )

                # Clip before casting: a denoised sample outside the int16
                # range is not guaranteed to saturate when cast (it can wrap
                # around and produce a loud click).
                int_audio = np.clip(denoised * 32768.0, -32768, 32767).astype(np.int16)

                # After the first chunk, the leading `overlap` samples were
                # already sent with the previous chunk, so skip them to avoid
                # duplicate audio.
                payload = int_audio[overlap:] if has_overlap else int_audio
                ws.send(payload.tobytes(), websocket.ABNF.OPCODE_BINARY)
                has_overlap = True

                # Keep the tail of the raw audio as context for the next pass
                buffer = buffer[-overlap:]
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()

Handle WebSocket messages

def on_message(ws, message):
    """Print session events and transcripts received over the WebSocket.

    ``message`` is parsed as JSON and handled according to its ``type``:
    ``Begin`` prints the session id and expiry time, ``Turn`` prints the
    transcript (rewriting the current line until ``end_of_turn`` is set) and
    ``Termination`` prints the audio and session durations. Any other type is
    ignored. Decoding and handling errors are printed rather than raised.
    """
    try:
        payload = json.loads(message)
        kind = payload.get('type')

        if kind == "Begin":
            began_id = payload.get('id')
            expiry = payload.get('expires_at')
            print(f"\nSession began: ID={began_id}, ExpiresAt={datetime.fromtimestamp(expiry)}")
            return

        if kind == "Turn":
            text = payload.get('transcript', '')
            if not payload.get('end_of_turn'):
                # Partial result: rewrite the current line in place.
                print(f"\r{text}", end='')
                return
            # Final result: blank the partial line, then print the transcript.
            print('\r' + ' ' * 80 + '\r', end='')
            print(text)
            return

        if kind == "Termination":
            audio_secs = payload.get('audio_duration_seconds', 0)
            session_secs = payload.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_secs}s, Session Duration={session_secs}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

WebSocket error and close handlers

def on_error(ws, error):
    """Report a WebSocket error and ask the audio streaming loop to stop."""
    report = f"\nWebSocket Error: {error}"
    print(report)
    # The streaming thread checks this event on every loop iteration.
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Release audio resources after the WebSocket connection is closed.

    Signals the streaming thread to stop, waits up to one second for it to
    exit, then stops and closes the microphone stream and terminates PyAudio.
    The ``stream`` and ``audio`` globals are reset to ``None`` so a later
    cleanup pass does not release them twice.

    Args:
        ws: The WebSocket object supplied by ``WebSocketApp`` (unused).
        close_status_code: Close status code reported for the connection.
        close_msg: Close message reported for the connection.
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    stop_event.set()

    # Wait for the reader thread BEFORE tearing the stream down, so the stream
    # is not closed while the thread may still be inside stream.read().
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None

Begin streaming STT transcription

Open the microphone, connect the WebSocket, and start streaming noise-reduced audio.
def run():
    """Open the microphone, connect to the streaming endpoint and run until stopped.

    The WebSocket client runs in a daemon thread so the main thread can catch
    Ctrl+C. On Ctrl+C a ``Terminate`` message is sent, the socket thread is
    given up to five seconds to finish, and the connection is closed. Audio
    resources are always released before returning.
    """
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be noise-reduced before transcription.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
            audio = None
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = json.dumps({"type": "Terminate"})
                print(f"Sending termination message: {terminate_message}")
                ws_app.send(terminate_message)
                # Give the session up to 5 s to wind down, but return as soon
                # as the socket thread exits instead of always sleeping 5 s.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # on_close may already have released these and reset them to None;
        # resetting here as well keeps the cleanup idempotent.
        if stream:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        print("Cleanup complete. Exiting.")


# Start the streaming session only when this file is executed as a script.
if __name__ == "__main__":
    run()
You can press Ctrl+C to stop the transcription.