Use this file to discover all available pages before exploring further.
This cookbook guides you through migrating from AssemblyAI’s legacy Streaming STT model (v2) to our latest Universal Streaming STT model (v3), which provides ultra-low latency for faster transcription, intelligent endpointing for more natural speech detection, and improved accuracy across various audio conditions. Check out this blog post to learn more about this new model!
API Version: Upgrade from v2 (/v2/realtime/ws) to v3 (/v3/ws)
Enhanced Error Handling: Robust cleanup and resource management
Improved Threading: Better control over audio streaming threads
Modern Message Format: Updated message types and structure
Configuration Options: More flexible connection parameters
Graceful Shutdown: Proper termination handling
You can follow the step-by-step guide below to make changes to your existing code, but here is what your code should look like in the end:
import json
import threading
import time
from datetime import datetime
from urllib.parse import urlencode

import pyaudio
import websocket

# --- Configuration ---
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# Cleanup can be requested from the WebSocket thread (on_close) and from the
# main thread (run's finally block); the lock keeps the two from interleaving.
_cleanup_lock = threading.Lock()


def _release_audio_resources():
    """Stop the audio thread and release the microphone exactly once.

    Safe to call repeatedly and from any thread: each resource is released
    only if it is still set, and the module-level references are reset to
    ``None`` afterwards so a later call becomes a no-op.
    """
    global stream, audio
    with _cleanup_lock:
        stop_event.set()  # Signal audio thread just in case it's still running

        # Wait for the reader thread BEFORE closing the stream, so the stream
        # is not closed underneath a stream.read() call that is in progress.
        worker = audio_thread
        if (
            worker is not None
            and worker is not threading.current_thread()
            and worker.is_alive()
        ):
            worker.join(timeout=1.0)

        if stream is not None:
            try:
                if stream.is_active():
                    stream.stop_stream()
                stream.close()
            except Exception as e:
                print(f"Error closing microphone stream: {e}")
            finally:
                stream = None

        if audio is not None:
            try:
                audio.terminate()
            finally:
                audio = None


# --- WebSocket Event Handlers ---


def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(
                    FRAMES_PER_BUFFER, exception_on_overflow=False
                )
                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = (
        True  # Allow main thread to exit even if this thread is running
    )
    audio_thread.start()


def on_message(ws, message):
    """Called for every message received from the server.

    Handles the three message types this script reads: "Begin" (session
    started), "Turn" (partial or end-of-turn transcript) and "Termination"
    (session summary). Other message types are ignored.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # Guard against a missing timestamp so the session ID is still
            # reported instead of the whole message being dropped.
            expires = (
                datetime.fromtimestamp(expires_at)
                if expires_at is not None
                else "unknown"
            )
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Clear the in-progress partial line, then print the final turn
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                # Overwrite the current line with the latest partial transcript
                print(f"\r{transcript}", end='')
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(
                f"\nSession Terminated: Audio Duration={audio_duration}s, "
                f"Session Duration={session_duration}s"
            )
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    # Ensure audio resources are released
    _release_audio_resources()


# --- Main Execution ---


def run():
    """Open the microphone, stream audio to the v3 endpoint, and clean up.

    Blocks until the WebSocket thread ends or Ctrl+C is pressed. On Ctrl+C a
    "Terminate" message is sent so the session is ended explicitly before the
    socket is closed.
    """
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        _release_audio_resources()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch
    # KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give the server up to 5s to process the message; this
                # returns early as soon as the WebSocket thread finishes.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Final cleanup (normally already done in on_close; this is a no-op
        # then, and a fallback if on_close never ran)
        _release_audio_resources()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
For more information on our Universal Streaming feature, see this section of our official documentation.
WebSocket runs in separate thread for better control
Proper KeyboardInterrupt handling
Graceful termination message sending
Thread joining with timeouts
Comprehensive cleanup in finally block
Streaming is billed on the total duration that your WebSocket connection stays open (session duration), not on the amount of audio sent. Always terminate sessions explicitly to avoid being billed for idle time — unterminated sessions auto-close after 3 hours and are billed for the full duration. See Streaming Speech-to-Text billing for details.
This migration provides a more robust, maintainable, and feature-rich streaming transcription implementation. The enhanced error handling, resource management, and modern API features make it suitable for production use cases where reliability and performance are critical.