Skip to main content

Documentation Index

Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt

Use this file to discover all available pages before exploring further.

This cookbook guides you through migrating from AssemblyAI’s legacy Streaming STT model (v2) to our latest Universal Streaming STT model (v3), which provides ultra-low latency for faster transcription, intelligent endpointing for more natural speech detection, and improved accuracy across various audio conditions. Check out this blog post to learn more about this new model!

Overview of changes

The migration involves several key improvements:
  • API Version: Upgrade from v2 (/v2/realtime/ws) to v3 (/v3/ws)
  • Enhanced Error Handling: Robust cleanup and resource management
  • Improved Threading: Better control over audio streaming threads
  • Modern Message Format: Updated message types and structure
  • Configuration Options: More flexible connection parameters
  • Graceful Shutdown: Proper termination handling
You can follow the step-by-step guide below to make changes to your existing code but here is what your code should look like in the end:
import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# --- WebSocket Event Handlers ---


def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = (
        True  # Allow main thread to exit even if this thread is running
    )
    audio_thread.start()

def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                print(f"\r{transcript}", end='')
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Try to join the audio thread to ensure clean exit
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)


# --- Main Execution ---
def run():
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
For more information on our Universal Streaming feature, see this section of our official documentation.

Step-by-step migration guide

1. Update API endpoint and configuration

Before (v2):
ws = websocket.WebSocketApp(
    f'wss://api.assemblyai.com/v2/realtime/ws?sample_rate={SAMPLE_RATE}',
    header={'Authorization': YOUR_API_KEY},
    on_message=on_message,
    on_open=on_open,
    on_error=on_error,
    on_close=on_close
)
After (v3):
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

ws_app = websocket.WebSocketApp(
    API_ENDPOINT,
    header={"Authorization": YOUR_API_KEY},
    # ...
)
Key Changes:
  • New base URL: streaming.assemblyai.com instead of api.assemblyai.com
  • Version upgrade: /v3/ws instead of /v2/realtime/ws
  • Configuration via URL parameters using urlencode()
  • Added format_turns option for better transcript formatting

2. Improve audio configuration

Before (v2):
FRAMES_PER_BUFFER = 3200  # 200ms of audio
SAMPLE_RATE = 16000
CHANNELS = 1
FORMAT = pyaudio.paInt16
After (v3):
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16
Key Changes:
  • Reduced buffer size from 200ms to 50ms for lower latency
  • Sample rate now references the configuration parameter
  • Added detailed comments explaining the calculations

3. Enhance thread management

Before (v2):
def on_open(ws):
    def stream_audio():
        while True:
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f'\nError streaming audio: {e}')
                break

    audio_thread = Thread(target=stream_audio, daemon=True)
    audio_thread.start()
After (v3):
# Global variables for better resource management
stop_event = threading.Event()
audio_thread = None

def on_open(ws):
    def stream_audio():
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()
Key Changes:
  • Added threading.Event() for controlled thread termination
  • Global audio_thread variable for better lifecycle management
  • Condition-based loop (while not stop_event.is_set()) instead of infinite loop
  • Improved error handling and logging

4. Update message handling

Before (v2):
def on_message(ws, message):
    try:
        msg = json.loads(message)
        msg_type = msg.get('message_type')

        if msg_type == 'SessionBegins':
            session_id = msg.get('session_id')
            print("Session ID:", session_id)
            return

        text = msg.get('text', '')
        if not text:
            return

        if msg_type == 'PartialTranscript':
            print(text, end='\r')
        elif msg_type == 'FinalTranscript':
            print(text, end='\r\n')
        elif msg_type == 'error':
            print(f'\nError: {msg.get("error", "Unknown error")}')
    except Exception as e:
        print(f'\nError handling message: {e}')
After (v3):
def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                print(f"\r{transcript}", end='')
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
Key Changes:
  • Message types renamed: SessionBeginsBegin, PartialTranscript/FinalTranscriptTurn
  • Field names updated: message_typetype, session_idid, texttranscript
  • Added session expiration timestamp handling
  • Improved transcript formatting with turn_is_formatted flag
  • Added Termination message handling with session statistics
  • Enhanced error handling with specific JSONDecodeError catch

5. Implement robust resource management

Before (v2):
def on_close(ws, status, msg):
    stream.stop_stream()
    stream.close()
    audio.terminate()
    print('\nDisconnected')

# Global audio resources (potential for memory leaks)
audio = pyaudio.PyAudio()
stream = audio.open(...)
After (v3):
def on_close(ws, close_status_code, close_msg):
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    global stream, audio
    stop_event.set()  # Signal audio thread to stop

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Ensure audio thread cleanup
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

def on_error(ws, error):
    print(f"\nWebSocket Error: {error}")
    stop_event.set()  # Signal threads to stop on error
Key Changes:
  • Added thread stop signaling via stop_event.set()
  • Conditional resource cleanup with null checks
  • Proper thread joining with timeout
  • Resource nullification to prevent reuse
  • Enhanced error handling in on_error

6. Add graceful shutdown handling

Before (v2):
try:
    ws.run_forever()
except Exception as e:
    print(f'\nError: {e}')
After (v3):
def run():
    # ... initialization code ...

    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Send termination message
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                ws_app.send(json.dumps(terminate_message))
                time.sleep(5)  # Allow message processing
            except Exception as e:
                print(f"Error sending termination message: {e}")

        ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Final cleanup
        # ... resource cleanup code ...
        pass
Key Changes:
  • WebSocket runs in separate thread for better control
  • Proper KeyboardInterrupt handling
  • Graceful termination message sending
  • Thread joining with timeouts
  • Comprehensive cleanup in finally block
Streaming is billed on the total duration that your WebSocket connection stays open (session duration), not on the amount of audio sent. Always terminate sessions explicitly to avoid being billed for idle time — unterminated sessions auto-close after 3 hours and are billed for the full duration. See Streaming Speech-to-Text billing for details.

7. Improve error handling and logging

Before (v2):
  • Basic error printing
  • Limited context in error messages
  • No resource cleanup on errors
After (v3):
  • Detailed error context and timestamps
  • Proper exception type handling
  • Resource cleanup on all error paths
  • Connection status checking before operations

Migration checklist

  • Update API endpoint from v2 to v3
  • Change base URL to streaming.assemblyai.com
  • Update message type handling (Begin, Turn, Termination)
  • Implement threading.Event() for thread control
  • Add proper resource cleanup in all code paths
  • Update field names in message parsing
  • Add graceful shutdown with termination messages
  • Implement timeout-based thread joining
  • Add detailed error logging with context
  • Test KeyboardInterrupt handling
  • Verify audio resource cleanup
  • Test connection failure scenarios

Testing your migration

  1. Basic Functionality: Verify transcription works with simple speech
  2. Error Handling: Test with invalid API keys or network issues
  3. Graceful Shutdown: Test Ctrl+C interruption
  4. Resource Cleanup: Monitor for memory leaks during extended use
  5. Thread Management: Verify proper thread termination
  6. Message Formatting: Test with format_turns enabled/disabled

Common migration issues

Issue: “WebSocket connection failed”

Solution: Verify you’re using the new v3 endpoint URL and proper authentication header format.

Issue: “Message type not recognized”

Solution: Update message type handling from old names (SessionBegins, PartialTranscript) to new ones (Begin, Turn).

Issue: “Audio thread won’t stop”

Solution: Ensure you’re using threading.Event() and calling stop_event.set() in error handlers.

Issue: “Resource leak warnings”

Solution: Verify all audio resources are properly cleaned up in on_close and finally blocks.

Benefits of migration

  • Improved Reliability: Better error handling and recovery
  • Lower Latency: Reduced buffer sizes for faster response
  • Enhanced Features: Formatted transcripts and session statistics
  • Better Resource Management: Proper cleanup prevents memory leaks
  • Graceful Shutdown: Clean termination with proper cleanup
  • Modern Architecture: Improved threading and event handling

Conclusion

This migration provides a more robust, maintainable, and feature-rich streaming transcription implementation. The enhanced error handling, resource management, and modern API features make it suitable for production use cases where reliability and performance are critical.