An often-overlooked aspect of implementing AssemblyAI’s Streaming Speech-to-Text (STT) service is terminating transcription sessions efficiently. In this cookbook, you will learn how to automatically terminate a Streaming session after a fixed period of silence. For the full code, refer to this GitHub gist.
Documentation Index
Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt
Use this file to discover all available pages before exploring further.
Quickstart
import pyaudio
import websocket
import json
import os
import threading
import time
from urllib.parse import urlencode
from datetime import datetime
# --- Configuration ---
# Fail fast with a readable message instead of a bare KeyError (or an empty
# Authorization header) when the API key is not configured.
ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "")
if not ASSEMBLYAI_API_KEY:
    raise RuntimeError(
        "The ASSEMBLYAI_API_KEY environment variable is not set; "
        "export your AssemblyAI API key before running this script."
    )

# Query parameters for the streaming session, appended to the WebSocket URL.
CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": 16000,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
# Capture at the same rate advertised to the server in CONNECTION_PARAMS.
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
# Read/send 50 ms of audio per chunk. Derived from SAMPLE_RATE so the chunk
# duration stays 50 ms if the rate changes (800 frames at 16 kHz).
FRAMES_PER_BUFFER = SAMPLE_RATE * 50 // 1000
CHANNELS = 1
FORMAT = pyaudio.paInt16  # 16-bit signed integer samples

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # set to tell the audio-sending loop to stop

# Silence tracking
last_transcript_received = datetime.now()  # time of the last non-empty transcript
terminated = False  # True once a Terminate message has been requested
# --- WebSocket Event Handlers ---
def on_open(ws):
    """Announce the new connection and launch the microphone-streaming thread.

    The thread reads FRAMES_PER_BUFFER frames at a time from the global
    ``stream`` and forwards each chunk to ``ws`` as a binary frame until
    ``stop_event`` is set or a read/send raises.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        # Worker loop: forward raw audio chunks until shutdown is requested.
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                print(f"Error streaming audio: {exc}")
                break
        print("Audio streaming stopped.")

    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
def on_message(ws, message, silence_timeout=5):
    """Handle a server message and terminate the session after prolonged silence.

    Args:
        ws: The WebSocketApp that delivered the message; used to send ``Terminate``.
        message: Raw JSON text received from the server.
        silence_timeout: Seconds without a non-empty transcript after which the
            session is terminated. Defaults to 5.

    Updates the module globals ``last_transcript_received`` and ``terminated``
    and sets ``stop_event`` once termination has been requested. The silence
    check only runs when a ``Turn`` message arrives, so termination happens on
    the first Turn received after the timeout has elapsed.
    """
    global last_transcript_received, terminated
    try:
        data = json.loads(message)
        msg_type = data.get("type")

        # Once Terminate has been sent, only the final Termination message matters.
        if terminated and msg_type != "Termination":
            return

        if msg_type == "Begin":
            # Measure silence from the start of the session rather than from
            # module import, so connection setup time doesn't count toward it.
            last_transcript_received = datetime.now()
            session_id = data.get("id")
            expires_at = data.get("expires_at")
            expires = datetime.fromtimestamp(expires_at) if expires_at is not None else "unknown"
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires}")

        elif msg_type == "Turn":
            transcript = data.get("transcript") or ""
            if data.get("end_of_turn"):
                # Clear the in-progress line before printing the finalized turn.
                print("\r" + " " * 80 + "\r", end="")
                print(transcript)
            else:
                print(f"\r{transcript}", end="")

            # Only non-blank transcripts count as speech.
            if transcript.strip():
                last_transcript_received = datetime.now()

            silence_duration = (datetime.now() - last_transcript_received).total_seconds()
            if silence_duration > silence_timeout:
                print(f"No transcription received in {silence_timeout} seconds. Terminating session...")
                try:
                    ws.send(json.dumps({"type": "Terminate"}))
                except Exception as e:
                    # Best effort: still stop streaming locally if the send fails.
                    print(f"Error sending termination message: {e}")
                terminated = True
                stop_event.set()
                return

        elif msg_type == "Termination":
            audio_duration = data.get("audio_duration_seconds", 0)
            session_duration = data.get("session_duration_seconds", 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
def on_error(ws, error):
    """Log a WebSocket error and ask the audio-sending loop to stop."""
    report = f"\nWebSocket Error: {error}"
    print(report)
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Release audio resources once the WebSocket connection has closed.

    Signals the audio thread to stop and waits (up to 1 s) for it to exit
    before stopping/closing the PyAudio stream and terminating PyAudio. The
    globals ``stream`` and ``audio`` are reset to None so later cleanup code
    does not release them twice.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    stop_event.set()

    # Join first, so the stream is not closed while the audio thread may still
    # be blocked inside stream.read().
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
# --- Main Execution ---
def run():
    """Open the microphone, connect to AssemblyAI, and stream until the session ends.

    The WebSocket runs in a daemon thread so the main thread can catch Ctrl+C.
    On Ctrl+C a ``Terminate`` message is sent and we wait up to 5 s for the
    connection to close before forcing it closed. The session also ends when
    ``on_message`` requests termination after prolonged silence.
    """
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
            audio = None
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give the server up to 5 s to send Termination and close, but
                # return as soon as the connection actually ends.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Stop the audio thread before touching the stream it reads from.
        stop_event.set()
        if audio_thread and audio_thread.is_alive():
            audio_thread.join(timeout=1.0)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
Get Started
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up for an AssemblyAI account and get your API key from your dashboard.
Step-by-step instructions
First, install the required packages:
pip install websocket-client pyaudio
import pyaudio
import websocket
import json
import os
import threading
import time
from urllib.parse import urlencode
from datetime import datetime
# Read the API key from the environment. Fail fast with a readable message
# instead of a bare KeyError (or an empty Authorization header) when unset.
ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "")
if not ASSEMBLYAI_API_KEY:
    raise RuntimeError(
        "The ASSEMBLYAI_API_KEY environment variable is not set; "
        "export your AssemblyAI API key before running this script."
    )
Audio configuration and global variables
# Query parameters for the streaming session, appended to the WebSocket URL.
CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": 16000,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
# Capture at the same rate advertised to the server in CONNECTION_PARAMS.
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
# Read/send 50 ms of audio per chunk. Derived from SAMPLE_RATE so the chunk
# duration stays 50 ms if the rate changes (800 frames at 16 kHz).
FRAMES_PER_BUFFER = SAMPLE_RATE * 50 // 1000
CHANNELS = 1
FORMAT = pyaudio.paInt16  # 16-bit signed integer samples

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # set to tell the audio-sending loop to stop
Implementing Speech Activity Checks
Our Streaming API emits a Turn event each time speech is processed. You can use this behavior to detect inactivity and automatically terminate the session. We track the timestamp of the most recent non-empty transcript. On every Turn event, we:
- Update the timestamp if meaningful speech is received
- Check how many seconds have passed since the last valid transcript
- If that exceeds your timeout (e.g. 5 seconds), terminate the session
Because this check runs only when a Turn event arrives, the session ends on the first Turn event received after the timeout has elapsed.
Key Variables
# Silence-detector state read and written by on_message:
terminated = False  # flips to True once a Terminate message has been requested
last_transcript_received = datetime.now()  # when the last non-empty transcript arrived
WebSocket event handlers
Open WebSocket
def on_open(ws):
    """Announce the new connection and launch the microphone-streaming thread.

    The thread reads FRAMES_PER_BUFFER frames at a time from the global
    ``stream`` and forwards each chunk to ``ws`` as a binary frame until
    ``stop_event`` is set or a read/send raises.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        # Worker loop: forward raw audio chunks until shutdown is requested.
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                print(f"Error streaming audio: {exc}")
                break
        print("Audio streaming stopped.")

    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
Handle WebSocket messages with silence detection
def on_message(ws, message, silence_timeout=5):
    """Handle a server message and terminate the session after prolonged silence.

    Args:
        ws: The WebSocketApp that delivered the message; used to send ``Terminate``.
        message: Raw JSON text received from the server.
        silence_timeout: Seconds without a non-empty transcript after which the
            session is terminated. Defaults to 5.

    Updates the module globals ``last_transcript_received`` and ``terminated``
    and sets ``stop_event`` once termination has been requested. The silence
    check only runs when a ``Turn`` message arrives, so termination happens on
    the first Turn received after the timeout has elapsed.
    """
    global last_transcript_received, terminated
    try:
        data = json.loads(message)
        msg_type = data.get("type")

        # Once Terminate has been sent, only the final Termination message matters.
        if terminated and msg_type != "Termination":
            return

        if msg_type == "Begin":
            # Measure silence from the start of the session rather than from
            # module import, so connection setup time doesn't count toward it.
            last_transcript_received = datetime.now()
            session_id = data.get("id")
            expires_at = data.get("expires_at")
            expires = datetime.fromtimestamp(expires_at) if expires_at is not None else "unknown"
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires}")

        elif msg_type == "Turn":
            transcript = data.get("transcript") or ""
            if data.get("end_of_turn"):
                # Clear the in-progress line before printing the finalized turn.
                print("\r" + " " * 80 + "\r", end="")
                print(transcript)
            else:
                print(f"\r{transcript}", end="")

            # Only non-blank transcripts count as speech.
            if transcript.strip():
                last_transcript_received = datetime.now()

            silence_duration = (datetime.now() - last_transcript_received).total_seconds()
            if silence_duration > silence_timeout:
                print(f"No transcription received in {silence_timeout} seconds. Terminating session...")
                try:
                    ws.send(json.dumps({"type": "Terminate"}))
                except Exception as e:
                    # Best effort: still stop streaming locally if the send fails.
                    print(f"Error sending termination message: {e}")
                terminated = True
                stop_event.set()
                return

        elif msg_type == "Termination":
            audio_duration = data.get("audio_duration_seconds", 0)
            session_duration = data.get("session_duration_seconds", 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
WebSocket error and close handlers
def on_error(ws, error):
    """Log a WebSocket error and ask the audio-sending loop to stop."""
    report = f"\nWebSocket Error: {error}"
    print(report)
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Release audio resources once the WebSocket connection has closed.

    Signals the audio thread to stop and waits (up to 1 s) for it to exit
    before stopping/closing the PyAudio stream and terminating PyAudio. The
    globals ``stream`` and ``audio`` are reset to None so later cleanup code
    does not release them twice.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    stop_event.set()

    # Join first, so the stream is not closed while the audio thread may still
    # be blocked inside stream.read().
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
Begin streaming STT transcription
def run():
    """Open the microphone, connect to AssemblyAI, and stream until the session ends.

    The WebSocket runs in a daemon thread so the main thread can catch Ctrl+C.
    On Ctrl+C a ``Terminate`` message is sent and we wait up to 5 s for the
    connection to close before forcing it closed. The session also ends when
    ``on_message`` requests termination after prolonged silence.
    """
    global audio, stream, ws_app

    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
            audio = None
        return

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run the WebSocket in a daemon thread so the main thread can catch Ctrl+C.
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give the server up to 5 s to send Termination and close, but
                # return as soon as the connection actually ends.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Stop the audio thread before touching the stream it reads from.
        stop_event.set()
        if audio_thread and audio_thread.is_alive():
            audio_thread.join(timeout=1.0)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
What You’ll Observe
- Live transcription continues as long as there’s speech
- After 5 seconds of silence, the session ends automatically
To change the timeout, adjust the 5-second threshold that `silence_duration` is compared against in `on_message`.