Documentation Index
Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt
Use this file to discover all available pages before exploring further.
Overview
By the end of this tutorial, you’ll be able to transcribe audio from your microphone.
speech_model is required: You must include the speech_model parameter in every streaming transcription request. There is no default model. If you omit speech_model, the request will fail. See Model selection to learn about available models.
Recommended model: We recommend Universal-3 Pro Streaming for streaming transcription. It provides the highest accuracy with sub-300ms latency, native multilingual code switching, and advanced prompting support — ideal for voice agents and real-time applications.
Streaming is billed per session: Streaming Speech-to-Text is billed on the total duration that your WebSocket connection stays open, not on the amount of audio you send. Always send a termination message when you’re done with a stream — sessions that aren’t closed auto-close after 3 hours and are billed for the full duration. See Billing and pricing for details.
Before you begin
To complete this tutorial, you need Python or Node.js installed (depending on which sample you follow) and an AssemblyAI API key.
Here’s the full sample code of what you’ll build in this tutorial:
- Python
- Python SDK
- JavaScript
- JavaScript SDK
import pyaudio
import websocket
import json
import threading
import time
import wave
from urllib.parse import urlencode
from datetime import datetime
# --- Configuration ---
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key

# Query parameters sent when opening the streaming WebSocket.
CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": 16000,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
# 50 ms of audio per chunk. Derived from the sample rate so the chunk
# duration stays at 50 ms if "sample_rate" is changed (800 frames at 16 kHz).
FRAMES_PER_BUFFER = SAMPLE_RATE // 20
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames
def save_wav_file():
    """Save the recorded audio frames to a timestamped WAV file.

    The frames are copied out of ``recorded_frames`` while holding
    ``recording_lock``; the file is then written without the lock so the
    capture thread is never blocked on disk I/O. Errors while writing are
    reported on stdout rather than raised.
    """
    # Snapshot under the lock: the capture thread may still be appending.
    with recording_lock:
        pcm_data = b"".join(recorded_frames)

    if not pcm_data:
        print("No audio data recorded.")
        return

    sample_width = 2  # 16-bit = 2 bytes

    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recorded_audio_{timestamp}.wav"

    try:
        with wave.open(filename, "wb") as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(sample_width)
            wf.setframerate(SAMPLE_RATE)
            wf.writeframes(pcm_data)

        # Compute the duration from the bytes actually written, so a short
        # final chunk does not inflate the reported length.
        duration = len(pcm_data) / (SAMPLE_RATE * CHANNELS * sample_width)
        print(f"Audio saved to: {filename}")
        print(f"Duration: {duration:.2f} seconds")
    except Exception as e:
        # Best effort: this runs during shutdown, so report and carry on.
        print(f"Error saving WAV file: {e}")
# --- WebSocket Event Handlers ---
def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads microphone chunks from ``stream``,
    stores them in ``recorded_frames`` and sends them to the server as
    binary messages until ``stop_event`` is set or an error occurs.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                # During shutdown the stream or socket is closed underneath
                # this loop; only report failures that happen while running.
                if not stop_event.is_set():
                    print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    # Daemon thread: allow the main thread to exit even if this is running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
def on_message(ws, message):
    """Handle a JSON message received from the streaming server.

    ``Begin`` prints the session ID and expiry, ``Turn`` prints the
    transcript (interim results overwrite the current line, final results
    end it), and ``Termination`` prints the audio and session durations.
    Decoding or handling errors are printed, not raised.
    """
    try:
        data = json.loads(message)
        msg_type = data.get("type")

        if msg_type == "Begin":
            session_id = data.get("id")
            expires_at = data.get("expires_at")
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get("transcript", "")
            if data.get("end_of_turn"):
                # Blank the interim line before printing the final transcript.
                print("\r" + " " * 80 + "\r", end="")
                print(transcript)
            else:
                # No trailing newline here, so flush explicitly; otherwise a
                # line-buffered stdout holds interim text until the next newline.
                print(f"\r{transcript}", end="", flush=True)
        elif msg_type == "Termination":
            audio_duration = data.get("audio_duration_seconds", 0)
            session_duration = data.get("session_duration_seconds", 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Signal the audio thread to stop sending on the failed connection.
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed.

    Stops the capture thread, saves the recording to a WAV file and then
    releases the microphone stream and the PyAudio instance.
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Stop the capture thread first, so that no frames are appended after
    # the WAV file is written and the stream is not closed while the thread
    # is still reading from it.
    stop_event.set()
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Save recorded audio to WAV file
    save_wav_file()

    # Ensure audio resources are released
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
# --- Main Execution ---
def run():
    """Open the microphone, stream audio to the server and print transcripts.

    The WebSocket runs in a daemon thread so that the main thread can catch
    Ctrl+C. On interrupt a ``Terminate`` message is sent and the function
    waits for the connection to close before releasing the audio resources.
    """
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
            audio = None
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Wait up to 5 seconds for the connection to close instead of
                # always sleeping the full 5 seconds: the session is billed
                # for as long as the connection stays open.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
import logging
from typing import Type
import assemblyai as aai
from assemblyai.streaming.v3 import (
BeginEvent,
StreamingClient,
StreamingClientOptions,
StreamingError,
StreamingEvents,
StreamingParameters,
StreamingSessionParameters,
TerminationEvent,
TurnEvent,
)
api_key = "<YOUR_API_KEY>"
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def on_begin(self: StreamingClient, event: BeginEvent):
    """Print the session ID when the streaming session begins."""
    print(f"Session started: {event.id}")
def on_turn(self: StreamingClient, event: TurnEvent):
    """Print each turn's transcript followed by its end-of-turn flag."""
    print(f"{event.transcript} ({event.end_of_turn})")
def on_terminated(self: StreamingClient, event: TerminationEvent):
    """Print how many seconds of audio were processed when the session ends."""
    print(
        f"Session terminated: {event.audio_duration_seconds} seconds of audio processed"
    )
def on_error(self: StreamingClient, error: StreamingError):
    """Print any error reported during streaming."""
    print(f"Error occurred: {error}")
def main():
    """Connect to the streaming service and transcribe microphone audio.

    Registers the event handlers, opens the session and streams from the
    microphone until the stream ends or is interrupted. The session is
    always terminated on the way out.
    """
    client = StreamingClient(
        StreamingClientOptions(
            api_key=api_key,
            api_host="streaming.assemblyai.com",
        )
    )

    client.on(StreamingEvents.Begin, on_begin)
    client.on(StreamingEvents.Turn, on_turn)
    client.on(StreamingEvents.Termination, on_terminated)
    client.on(StreamingEvents.Error, on_error)

    client.connect(
        StreamingParameters(
            speech_model="u3-rt-pro",
            sample_rate=16000,
        )
    )

    try:
        client.stream(
            aai.extras.MicrophoneStream(sample_rate=16000)
        )
    finally:
        # Send a termination message even on Ctrl+C or an error.
        client.disconnect(terminate=True)


if __name__ == "__main__":
    main()
const WebSocket = require("ws");
const mic = require("mic");
const querystring = require("querystring");
const fs = require("fs");
// --- Configuration ---
const YOUR_API_KEY = "YOUR-API-KEY"; // Replace with your actual API key
const CONNECTION_PARAMS = {
  speechModel: "u3-rt-pro",
  sampleRate: 16000,
};
const API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws";
// The raw WebSocket endpoint takes snake_case query parameters
// (speech_model, sample_rate), so map the camelCase config keys explicitly
// instead of serializing the object as-is.
const API_ENDPOINT = `${API_ENDPOINT_BASE_URL}?${querystring.stringify({
  speech_model: CONNECTION_PARAMS.speechModel,
  sample_rate: CONNECTION_PARAMS.sampleRate,
})}`;
// Audio Configuration
const SAMPLE_RATE = CONNECTION_PARAMS.sampleRate;
const CHANNELS = 1;
// Global variables
let micInstance = null;
let micInputStream = null;
let ws = null;
let stopRequested = false;
// WAV recording variables
let recordedFrames = []; // Store audio frames for WAV file
// --- Helper functions ---
/** Blank out the current terminal line and return the cursor to column 0. */
function clearLine() {
  const blanks = "".padEnd(80, " ");
  process.stdout.write(`\r${blanks}\r`);
}
/**
 * Convert a Unix timestamp in seconds to an ISO 8601 string.
 * @param {number} timestamp Seconds since the Unix epoch.
 * @returns {string} The ISO 8601 (UTC) representation.
 */
function formatTimestamp(timestamp) {
  const millis = timestamp * 1000;
  const date = new Date(millis);
  return date.toISOString();
}
/**
 * Build the 44-byte RIFF/WAVE header for 16-bit PCM audio.
 * @param {number} sampleRate Samples per second.
 * @param {number} channels Number of audio channels.
 * @param {number} dataLength Size of the PCM payload in bytes.
 * @returns {Buffer} The header, to be followed by the PCM data.
 */
function createWavHeader(sampleRate, channels, dataLength) {
  const header = Buffer.alloc(44);
  const blockAlign = channels * 2;
  const byteRate = sampleRate * channels * 2;

  // Each write returns the position just past what it wrote, so the cursor
  // walks through the fields in file order.
  let cursor = 0;

  // RIFF header
  cursor += header.write("RIFF", cursor);
  cursor = header.writeUInt32LE(36 + dataLength, cursor);
  cursor += header.write("WAVE", cursor);

  // fmt chunk
  cursor += header.write("fmt ", cursor);
  cursor = header.writeUInt32LE(16, cursor); // fmt chunk size
  cursor = header.writeUInt16LE(1, cursor); // PCM format
  cursor = header.writeUInt16LE(channels, cursor);
  cursor = header.writeUInt32LE(sampleRate, cursor);
  cursor = header.writeUInt32LE(byteRate, cursor);
  cursor = header.writeUInt16LE(blockAlign, cursor);
  cursor = header.writeUInt16LE(16, cursor); // bits per sample

  // data chunk
  cursor += header.write("data", cursor);
  header.writeUInt32LE(dataLength, cursor);

  return header;
}
/**
 * Write everything captured in recordedFrames to a timestamped WAV file.
 * Logs the file name and duration on success, or the error on failure.
 */
function saveWavFile() {
  if (!recordedFrames.length) {
    console.log("No audio data recorded.");
    return;
  }

  // File name carries the current time, e.g. recorded_audio_2024-01-01T12-00-00.wav
  const isoNow = new Date().toISOString();
  const timestamp = isoNow.replace(/[:.]/g, "-").slice(0, 19);
  const filename = `recorded_audio_${timestamp}.wav`;

  try {
    // Header first, then the concatenated PCM payload.
    const audioData = Buffer.concat(recordedFrames);
    const dataLength = audioData.length;
    const wavHeader = createWavHeader(SAMPLE_RATE, CHANNELS, dataLength);
    fs.writeFileSync(filename, Buffer.concat([wavHeader, audioData]));

    const bytesPerSecond = SAMPLE_RATE * CHANNELS * 2;
    console.log(`Audio saved to: ${filename}`);
    console.log(`Duration: ${(dataLength / bytesPerSecond).toFixed(2)} seconds`);
  } catch (error) {
    console.error(`Error saving WAV file: ${error}`);
  }
}
// --- Main function ---
/**
 * Open the streaming WebSocket, attach its event handlers and register the
 * process termination handlers. The microphone is started once the socket
 * reports that it is open.
 */
async function run() {
  console.log("Starting AssemblyAI streaming transcription...");
  console.log("Audio will be saved to a WAV file when the session ends.");

  // Print Begin / Turn / Termination messages as they arrive.
  const handleMessage = (message) => {
    try {
      const data = JSON.parse(message);
      switch (data.type) {
        case "Begin": {
          const { id: sessionId, expires_at: expiresAt } = data;
          console.log(
            `\nSession began: ID=${sessionId}, ExpiresAt=${formatTimestamp(expiresAt)}`
          );
          break;
        }
        case "Turn": {
          const transcript = data.transcript || "";
          if (!data.end_of_turn) {
            // Interim result: overwrite the current line.
            process.stdout.write(`\r${transcript}`);
            break;
          }
          clearLine();
          console.log(transcript);
          break;
        }
        case "Termination": {
          const {
            audio_duration_seconds: audioDuration,
            session_duration_seconds: sessionDuration,
          } = data;
          console.log(
            `\nSession Terminated: Audio Duration=${audioDuration}s, Session Duration=${sessionDuration}s`
          );
          break;
        }
        default:
          break;
      }
    } catch (error) {
      console.error(`\nError handling message: ${error}`);
      console.error(`Message data: ${message}`);
    }
  };

  // Initialize WebSocket connection
  ws = new WebSocket(API_ENDPOINT, {
    headers: { Authorization: YOUR_API_KEY },
  });

  ws.on("open", () => {
    console.log("WebSocket connection opened.");
    console.log(`Connected to: ${API_ENDPOINT}`);
    startMicrophone();
  });
  ws.on("message", handleMessage);
  ws.on("error", (error) => {
    console.error(`\nWebSocket Error: ${error}`);
    cleanup();
  });
  ws.on("close", (code, reason) => {
    console.log(`\nWebSocket Disconnected: Status=${code}, Msg=${reason}`);
    cleanup();
  });

  // Handle process termination
  setupTerminationHandlers();
}
/**
 * Start capturing from the microphone. Each chunk is recorded for the WAV
 * file and forwarded to the WebSocket while the socket is open and no stop
 * has been requested.
 */
function startMicrophone() {
  // Record and forward one chunk, unless the socket is not open or a stop
  // has been requested.
  const forwardChunk = (data) => {
    const socketOpen = ws && ws.readyState === WebSocket.OPEN;
    if (!socketOpen || stopRequested) {
      return;
    }
    recordedFrames.push(Buffer.from(data));
    ws.send(data);
  };

  try {
    const micOptions = {
      rate: SAMPLE_RATE.toString(),
      channels: CHANNELS.toString(),
      debug: false,
      exitOnSilence: 6, // This won't actually exit, just a parameter for mic
    };
    micInstance = mic(micOptions);
    micInputStream = micInstance.getAudioStream();

    micInputStream.on("data", forwardChunk);
    micInputStream.on("error", (err) => {
      console.error(`Microphone Error: ${err}`);
      cleanup();
    });

    micInstance.start();
    console.log("Microphone stream opened successfully.");
    console.log("Speak into your microphone. Press Ctrl+C to stop.");
  } catch (error) {
    console.error(`Error opening microphone stream: ${error}`);
    cleanup();
  }
}
/**
 * Stop streaming and release resources: save the recording, stop the
 * microphone, send a Terminate message and close the WebSocket.
 *
 * Safe to call more than once. The socket's "error" and "close" events and
 * the process signal handlers all call this function, so only the first
 * call does any work; otherwise the WAV file would be written repeatedly.
 */
function cleanup() {
  if (stopRequested) {
    return;
  }
  stopRequested = true;

  // Save recorded audio to WAV file
  saveWavFile();

  // Stop microphone if it's running
  if (micInstance) {
    try {
      micInstance.stop();
    } catch (error) {
      console.error(`Error stopping microphone: ${error}`);
    }
    micInstance = null;
  }

  // Close WebSocket connection if it's open
  if (ws && [WebSocket.OPEN, WebSocket.CONNECTING].includes(ws.readyState)) {
    try {
      // Send termination message if possible
      if (ws.readyState === WebSocket.OPEN) {
        const terminateMessage = { type: "Terminate" };
        console.log(
          `Sending termination message: ${JSON.stringify(terminateMessage)}`
        );
        ws.send(JSON.stringify(terminateMessage));
      }
      ws.close();
    } catch (error) {
      console.error(`Error closing WebSocket: ${error}`);
    }
    ws = null;
  }

  console.log("Cleanup complete.");
}
/**
 * Register handlers for SIGINT, SIGTERM and uncaught exceptions. Each one
 * runs cleanup() and exits one second later.
 */
function setupTerminationHandlers() {
  const EXIT_DELAY_MS = 1000;

  // Run cleanup, then exit after a short delay to give it time to finish.
  const shutdown = (exitCode) => {
    cleanup();
    setTimeout(() => process.exit(exitCode), EXIT_DELAY_MS);
  };

  // Handle Ctrl+C and other termination signals
  process.on("SIGINT", () => {
    console.log("\nCtrl+C received. Stopping...");
    shutdown(0);
  });
  process.on("SIGTERM", () => {
    console.log("\nTermination signal received. Stopping...");
    shutdown(0);
  });

  // Handle uncaught exceptions
  process.on("uncaughtException", (error) => {
    console.error(`\nUncaught exception: ${error}`);
    shutdown(1);
  });
}
// Start the application
run();
import { Readable } from "stream";
import { AssemblyAI } from "assemblyai";
import recorder from "node-record-lpcm16";
/**
 * Connect to the streaming service, pipe microphone audio to it and log
 * each turn's transcript. Ctrl-C stops the recording and closes the session.
 */
const run = async () => {
  const client = new AssemblyAI({
    apiKey: "<YOUR_API_KEY>",
  });

  const transcriber = client.streaming.transcriber({
    speechModel: "u3-rt-pro",
    sampleRate: 16_000,
  });

  transcriber.on("open", ({ id }) => {
    console.log(`Session opened with ID: ${id}`);
  });

  transcriber.on("error", (error) => {
    console.error("Error:", error);
  });

  transcriber.on("close", (code, reason) =>
    console.log("Session closed:", code, reason)
  );

  transcriber.on("turn", (turn) => {
    if (!turn.transcript) {
      return;
    }
    console.log("Turn:", turn.transcript);
  });

  try {
    console.log("Connecting to streaming transcript service");
    await transcriber.connect();

    console.log("Starting recording");
    const recording = recorder.record({
      channels: 1,
      sampleRate: 16_000,
      audioType: "wav", // Linear PCM
    });

    // pipeTo() returns a promise; handle its rejection so that a broken
    // stream is logged instead of surfacing as an unhandled rejection.
    Readable.toWeb(recording.stream())
      .pipeTo(transcriber.stream())
      .catch((error) => console.error("Error streaming audio:", error));

    // Stop recording and close connection using Ctrl-C.
    process.on("SIGINT", async function () {
      console.log();
      console.log("Stopping recording");
      recording.stop();

      console.log("Closing streaming transcript connection");
      try {
        await transcriber.close();
      } catch (error) {
        console.error(error);
      } finally {
        // Exit even if closing the session failed.
        process.exit();
      }
    });
  } catch (error) {
    console.error(error);
  }
};

run();
Step 1: Install and import dependencies
- Python
- Python SDK
- JavaScript
- JavaScript SDK
Create a file called
main.py and import the following packages at the top of your file:import logging
from typing import Type
import assemblyai as aai
from assemblyai.streaming.v3 import (
BeginEvent,
StreamingClient,
StreamingClientOptions,
StreamingError,
StreamingEvents,
StreamingParameters,
StreamingSessionParameters,
TerminationEvent,
TurnEvent,
)
Run
npm init to create an NPM package, and then install the following packages via NPM:npm install ws mic
Run
npm init to create an NPM package, and then install the necessary packages via NPM:npm install assemblyai node-record-lpcm16
The module node-record-lpcm16 requires SoX and it must be available in your $PATH.
For Mac OS: brew install sox
For most Linux distros: sudo apt-get install sox libsox-fmt-all
For Windows: download the binaries
Step 2: Configure the API key
In this step, you’ll configure your AssemblyAI API key to authenticate your application and enable access to the streaming transcription service.Browse to API Keys in your dashboard, and then copy your API key.
- Python
- Python SDK
- JavaScript
- JavaScript SDK
Store your API key in a variable. Replace
<YOUR_API_KEY> with your copied API key.YOUR_API_KEY = "<YOUR_API_KEY>"
Configure the SDK to use your API key. Replace
<YOUR_API_KEY> with your copied API key.
api_key = "<YOUR_API_KEY>"
Store your API key in a variable. Replace
<YOUR_API_KEY> with your copied API key.
const YOUR_API_KEY = "<YOUR_API_KEY>";
In your file, define an async function and create an SDK client within the function. Configure the client to use your API key by replacing
<YOUR_API_KEY> with your copied API key.const run = async () => {
const client = new AssemblyAI({
apiKey: "<YOUR_API_KEY>",
});
};
Authenticate with a temporary token: If you need to authenticate on the client, you can avoid exposing your API key
by using temporary authentication
tokens.
Step 3: Set up audio and websocket configuration
- Python
- Python SDK
- JavaScript
- JavaScript SDK
Set the parameters that control how your client connects to AssemblyAI’s streaming transcription API. These options determine things like audio sample rate and whether you want punctuation and formatting in your final transcripts.
CONNECTION_PARAMS = {
"speech_model": "u3-rt-pro",
"sample_rate": 16000,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"
See Streaming endpoints and data zones for more information on endpoints for Streaming STT.
Prepare your audio input settings and recording logic. This configuration controls how microphone data is streamed in real-time:
# Audio Configuration
FRAMES_PER_BUFFER = 800 # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16
# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event() # To signal the audio thread to stop
# WAV recording variables
recorded_frames = [] # Store audio frames for WAV file
recording_lock = threading.Lock() # Thread-safe access to recorded_frames
def save_wav_file():
    """Save the recorded audio frames to a timestamped WAV file.

    The frames are copied out of ``recorded_frames`` while holding
    ``recording_lock``; the file is then written without the lock so the
    capture thread is never blocked on disk I/O. Errors while writing are
    reported on stdout rather than raised.
    """
    # Snapshot under the lock: the capture thread may still be appending.
    with recording_lock:
        pcm_data = b"".join(recorded_frames)

    if not pcm_data:
        print("No audio data recorded.")
        return

    sample_width = 2  # 16-bit = 2 bytes

    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recorded_audio_{timestamp}.wav"

    try:
        with wave.open(filename, "wb") as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(sample_width)
            wf.setframerate(SAMPLE_RATE)
            wf.writeframes(pcm_data)

        # Compute the duration from the bytes actually written, so a short
        # final chunk does not inflate the reported length.
        duration = len(pcm_data) / (SAMPLE_RATE * CHANNELS * sample_width)
        print(f"Audio saved to: {filename}")
        print(f"Duration: {duration:.2f} seconds")
    except Exception as e:
        # Best effort: this runs during shutdown, so report and carry on.
        print(f"Error saving WAV file: {e}")
The Python SDK handles audio configuration automatically. You’ll specify the sample rate when connecting to the transcriber. If you don’t set a sample rate, it defaults to 16 kHz.
See Streaming endpoints and data zones for more information on endpoints for Streaming STT.
Set the parameters that control how your client connects to AssemblyAI’s streaming transcription API. These options determine things like audio sample rate and whether you want punctuation and formatting in your final transcripts.
const CONNECTION_PARAMS = {
  speechModel: "u3-rt-pro",
  sampleRate: 16000,
};
const API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws";
// The raw WebSocket endpoint takes snake_case query parameters
// (speech_model, sample_rate), so map the camelCase config keys explicitly
// instead of serializing the object as-is.
const API_ENDPOINT = `${API_ENDPOINT_BASE_URL}?${querystring.stringify({
  speech_model: CONNECTION_PARAMS.speechModel,
  sample_rate: CONNECTION_PARAMS.sampleRate,
})}`;
See Streaming endpoints and data zones for more information on endpoints for Streaming STT.
Prepare your audio input settings and recording logic. This configuration controls how microphone data is streamed in real-time:
// Audio Configuration
const SAMPLE_RATE = CONNECTION_PARAMS.sampleRate;
const CHANNELS = 1;
// Global variables
let micInstance = null;
let micInputStream = null;
let ws = null;
let stopRequested = false;
// WAV recording variables
let recordedFrames = []; // Store audio frames for WAV file
// --- Helper functions ---
function clearLine() {
process.stdout.write("\r" + " ".repeat(80) + "\r");
}
function formatTimestamp(timestamp) {
return new Date(timestamp * 1000).toISOString();
}
function createWavHeader(sampleRate, channels, dataLength) {
const buffer = Buffer.alloc(44);
// RIFF header
buffer.write("RIFF", 0);
buffer.writeUInt32LE(36 + dataLength, 4);
buffer.write("WAVE", 8);
// fmt chunk
buffer.write("fmt ", 12);
buffer.writeUInt32LE(16, 16); // fmt chunk size
buffer.writeUInt16LE(1, 20); // PCM format
buffer.writeUInt16LE(channels, 22);
buffer.writeUInt32LE(sampleRate, 24);
buffer.writeUInt32LE(sampleRate * channels * 2, 28); // byte rate
buffer.writeUInt16LE(channels * 2, 32); // block align
buffer.writeUInt16LE(16, 34); // bits per sample
// data chunk
buffer.write("data", 36);
buffer.writeUInt32LE(dataLength, 40);
return buffer;
}
function saveWavFile() {
if (recordedFrames.length === 0) {
console.log("No audio data recorded.");
return;
}
// Generate filename with timestamp
const timestamp = new Date().toISOString().replace(/[:.]/g, "-").slice(0, 19);
const filename = `recorded_audio_${timestamp}.wav`;
try {
// Combine all recorded frames
const audioData = Buffer.concat(recordedFrames);
const dataLength = audioData.length;
// Create WAV header
const wavHeader = createWavHeader(SAMPLE_RATE, CHANNELS, dataLength);
// Write WAV file
const wavFile = Buffer.concat([wavHeader, audioData]);
fs.writeFileSync(filename, wavFile);
console.log(`Audio saved to: ${filename}`);
console.log(
`Duration: ${(dataLength / (SAMPLE_RATE * CHANNELS * 2)).toFixed(2)} seconds`
);
} catch (error) {
console.error(`Error saving WAV file: ${error}`);
}
}
Create a new streaming service from the AssemblyAI client and add it to your async function. If you don’t set a sample rate, it defaults to 16 kHz.For the JavaScript SDK, we will set up the audio configuration in Step 5 of this guide.
const transcriber = client.streaming.transcriber({
speechModel: "u3-rt-pro",
sampleRate: 16_000,
});
See Streaming endpoints and data zones for more information on endpoints for Streaming STT.
Step 4: Create event handlers
In this step, you’ll define event handlers to manage the different types of events emitted during the streaming session. The handlers will respond to session lifecycle events, transcription turns, errors, and session termination.- Python
- Python SDK
- JavaScript
- JavaScript SDK
Implement basic event handlers. These handlers let your app respond to key streaming events:
on_open – Starts streaming microphone audio in a background thread.
on_message – Handles transcription events like Begin, Turn, and Termination.
on_error – Logs any connection or streaming errors and triggers cleanup.
on_close – Cleans up audio resources and saves a WAV recording when the session ends.
# --- WebSocket Event Handlers ---
def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads microphone chunks from ``stream``,
    stores them in ``recorded_frames`` and sends them to the server as
    binary messages until ``stop_event`` is set or an error occurs.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                # During shutdown the stream or socket is closed underneath
                # this loop; only report failures that happen while running.
                if not stop_event.is_set():
                    print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    # Daemon thread: allow the main thread to exit even if this is running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
def on_message(ws, message):
    """Handle a JSON message received from the streaming server.

    ``Begin`` prints the session ID and expiry, ``Turn`` prints the
    transcript (interim results overwrite the current line, final results
    end it), and ``Termination`` prints the audio and session durations.
    Decoding or handling errors are printed, not raised.
    """
    try:
        data = json.loads(message)
        msg_type = data.get("type")

        if msg_type == "Begin":
            session_id = data.get("id")
            expires_at = data.get("expires_at")
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get("transcript", "")
            if data.get("end_of_turn"):
                # Blank the interim line before printing the final transcript.
                print("\r" + " " * 80 + "\r", end="")
                print(transcript)
            else:
                # No trailing newline here, so flush explicitly; otherwise a
                # line-buffered stdout holds interim text until the next newline.
                print(f"\r{transcript}", end="", flush=True)
        elif msg_type == "Termination":
            audio_duration = data.get("audio_duration_seconds", 0)
            session_duration = data.get("session_duration_seconds", 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Signal the audio thread to stop sending on the failed connection.
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed.

    Stops the capture thread, saves the recording to a WAV file and then
    releases the microphone stream and the PyAudio instance.
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Stop the capture thread first, so that no frames are appended after
    # the WAV file is written and the stream is not closed while the thread
    # is still reading from it.
    stop_event.set()
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Save recorded audio to WAV file
    save_wav_file()

    # Ensure audio resources are released
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
Implement basic event handlers. These handlers let your app respond to key streaming events:
on_begin – Logs when the session starts.
on_turn – Handles each transcription turn and optionally enables formatted turns.
on_terminated – Logs when the session ends and how much audio was processed.
on_error – Captures and prints any errors during streaming.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def on_begin(self: StreamingClient, event: BeginEvent):
    """Print the session ID when the streaming session begins."""
    print(f"Session started: {event.id}")
def on_turn(self: StreamingClient, event: TurnEvent):
    """Print each turn's transcript followed by its end-of-turn flag."""
    print(f"{event.transcript} ({event.end_of_turn})")
def on_terminated(self: StreamingClient, event: TerminationEvent):
    """Print how many seconds of audio were processed when the session ends."""
    print(
        f"Session terminated: {event.audio_duration_seconds} seconds of audio processed"
    )
def on_error(self: StreamingClient, error: StreamingError):
    """Print any error reported during streaming."""
    print(f"Error occurred: {error}")
Create an async function. Within it you’ll initialize the WebSocket and set the event handlers:
open – Triggered when the WebSocket connection is established; starts the microphone stream.
message – Handles incoming messages like Begin, Turn, and Termination, and displays transcripts in real time.
error – Logs connection or message errors and triggers cleanup.
close – Called when the connection closes; logs status and reason, and cleans up resources.
async function run() {
console.log("Starting AssemblyAI streaming transcription...");
console.log("Audio will be saved to a WAV file when the session ends.");
// Initialize WebSocket connection
ws = new WebSocket(API_ENDPOINT, {
headers: {
Authorization: YOUR_API_KEY,
},
});
// Setup WebSocket event handlers
ws.on("open", () => {
console.log("WebSocket connection opened.");
console.log(`Connected to: ${API_ENDPOINT}`);
// Start the microphone
startMicrophone();
});
ws.on("message", (message) => {
try {
const data = JSON.parse(message);
const msgType = data.type;
if (msgType === "Begin") {
const sessionId = data.id;
const expiresAt = data.expires_at;
console.log(
`\nSession began: ID=${sessionId}, ExpiresAt=${formatTimestamp(expiresAt)}`
);
} else if (msgType === "Turn") {
const transcript = data.transcript || "";
if (data.end_of_turn) {
clearLine();
console.log(transcript);
} else {
process.stdout.write(`\r${transcript}`);
}
} else if (msgType === "Termination") {
const audioDuration = data.audio_duration_seconds;
const sessionDuration = data.session_duration_seconds;
console.log(
`\nSession Terminated: Audio Duration=${audioDuration}s, Session Duration=${sessionDuration}s`
);
}
} catch (error) {
console.error(`\nError handling message: ${error}`);
console.error(`Message data: ${message}`);
}
});
ws.on("error", (error) => {
console.error(`\nWebSocket Error: ${error}`);
cleanup();
});
ws.on("close", (code, reason) => {
console.log(`\nWebSocket Disconnected: Status=${code}, Msg=${reason}`);
cleanup();
});
// Handle process termination
setupTerminationHandlers();
}
Add basic event handlers to your async function. These handlers let your app respond to key streaming events:
open – Triggered when the session starts; logs the session ID.
error – Logs any errors that occur during the session.
close – Triggered when the session ends; logs the close code and reason.
turn – Handles each transcription turn and logs the transcript if available.
transcriber.on("open", ({ id }) => {
console.log(`Session opened with ID: ${id}`);
});
transcriber.on("error", (error) => {
console.error("Error:", error);
});
transcriber.on("close", (code, reason) =>
console.log("Session closed:", code, reason)
);
transcriber.on("turn", (turn) => {
if (!turn.transcript) {
return;
}
console.log("Turn:", turn.transcript);
});
Message sequence and turn eventsTo get a better understanding of the turn event and the message sequences,
check out our Message Sequence Breakdown page. This
object is how you’ll receive your transcripts.
Step 5: Connect and start transcription
Streaming Speech-to-Text uses WebSockets to stream audio to AssemblyAI. This requires first establishing a connection to the API.- Python
- Python SDK
- JavaScript
- JavaScript SDK
Create a main execution function and initialize the audio stream.
def run():
    """Run the microphone streaming example.

    Creates the PyAudio instance and opens the microphone input stream,
    storing both in the module-level ``audio`` and ``stream`` globals.
    Returns early if the microphone cannot be opened.
    """
    global audio, stream, ws_app

    # PyAudio owns the audio device handles used below
    audio = pyaudio.PyAudio()

    # Open the capture stream using the module-level audio configuration
    try:
        stream = audio.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SAMPLE_RATE,
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as exc:
        print(f"Error opening microphone stream: {exc}")
        # Release PyAudio before bailing out
        if audio:
            audio.terminate()
        return  # Nothing to stream without a microphone
Next, create a WebSocket connection to the streaming service:
# Build the WebSocket client; the API key is sent in the Authorization header
auth_header = {"Authorization": YOUR_API_KEY}
ws_app = websocket.WebSocketApp(
    API_ENDPOINT,
    header=auth_header,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close,
)

# Drive the socket from a daemon thread so the main thread stays free to
# catch KeyboardInterrupt
ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
ws_thread.start()
In the
main function, create a client and connect to the streaming service:
client = StreamingClient(
StreamingClientOptions(
api_key=api_key,
api_host="streaming.assemblyai.com",
)
)
client.on(StreamingEvents.Begin, on_begin)
client.on(StreamingEvents.Turn, on_turn)
client.on(StreamingEvents.Termination, on_terminated)
client.on(StreamingEvents.Error, on_error)
client.connect(
StreamingParameters(
speech_model="u3-rt-pro",
sample_rate=16000,
)
)
Initializing WebSocket: The WebSocket was initialized at the beginning of the
main function in
Step 4.
Create the function that initializes the audio stream. This function is called within the
main function from Step 4.
function startMicrophone() {
try {
micInstance = mic({
rate: SAMPLE_RATE.toString(),
channels: CHANNELS.toString(),
debug: false,
exitOnSilence: 6, // This won't actually exit, just a parameter for mic
});
micInputStream = micInstance.getAudioStream();
micInputStream.on("data", (data) => {
if (ws && ws.readyState === WebSocket.OPEN && !stopRequested) {
// Store audio data for WAV recording
recordedFrames.push(Buffer.from(data));
// Send audio data to WebSocket
ws.send(data);
}
});
micInputStream.on("error", (err) => {
console.error(`Microphone Error: ${err}`);
cleanup();
});
micInstance.start();
console.log("Microphone stream opened successfully.");
console.log("Speak into your microphone. Press Ctrl+C to stop.");
} catch (error) {
console.error(`Error opening microphone stream: ${error}`);
cleanup();
}
}
In your async function, connect to the transcriber. Create a new microphone stream and set up the audio configuration after the transcriber connects. The
sampleRate needs to be the same value used in the streaming service settings.
try {
console.log("Connecting to streaming transcript service");
await transcriber.connect();
console.log("Starting recording");
const recording = recorder.record({
channels: 1,
sampleRate: 16_000,
audioType: "wav", // Linear PCM
});
} catch (error) {
console.error(error);
}
Step 6: Close the connection
- Python
- Python SDK
- JavaScript
- JavaScript SDK
Close the WebSocket connection when you’re done: The connection will also close automatically when you press Ctrl+C. In both cases, the
try:
    # Keep the main thread alive until the WebSocket thread ends or the
    # user interrupts with Ctrl+C
    while ws_thread.is_alive():
        time.sleep(0.1)
except KeyboardInterrupt:
    print("\nCtrl+C received. Stopping...")
    stop_event.set()  # Signal audio thread to stop
    # Send termination message to the server
    if ws_app and ws_app.sock and ws_app.sock.connected:
        try:
            terminate_payload = json.dumps({"type": "Terminate"})
            print(f"Sending termination message: {terminate_payload}")
            ws_app.send(terminate_payload)
            # Give remaining messages time to be processed before the
            # forceful close. Unlike a fixed sleep, this waits at most
            # 5 seconds and returns as soon as the WebSocket thread exits.
            ws_thread.join(timeout=5)
        except Exception as e:
            print(f"Error sending termination message: {e}")
    # Close the WebSocket connection (will trigger on_close)
    if ws_app:
        ws_app.close()
    # Wait for WebSocket thread to finish
    ws_thread.join(timeout=2.0)
except Exception as e:
    print(f"\nAn unexpected error occurred: {e}")
    stop_event.set()
    if ws_app:
        ws_app.close()
    ws_thread.join(timeout=2.0)
finally:
    # Final cleanup (already handled in on_close, but good as a fallback)
    if stream and stream.is_active():
        stream.stop_stream()
    if stream:
        stream.close()
    if audio:
        audio.terminate()
    print("Cleanup complete. Exiting.")
.close() handler will clean up the audio resources. Disconnect the client when you’re done: The connection will also close automatically when you press Ctrl+C. In both cases, the
finally:
client.disconnect(terminate=True)
.disconnect() handler will clean up the audio resources. Terminate the session when you’re done: The connection will also close automatically when you press Ctrl+C. In both cases, the
/**
 * Release everything the streaming session holds.
 *
 * Marks the session as stopping, saves the recorded audio, stops the
 * microphone, and closes the WebSocket if it is open or still connecting,
 * sending a Terminate message first when the socket is open.
 *
 * Only the first call does any work. cleanup() is invoked from the
 * WebSocket "error" and "close" handlers, the microphone error handler, and
 * the process termination handlers, and closing the socket here fires the
 * "close" handler again. Without the guard the WAV file would be saved and
 * "Cleanup complete." logged more than once.
 */
function cleanup() {
  // Run-once guard, stored on the function itself so it needs no extra
  // module-level state.
  if (cleanup.hasRun) {
    return;
  }
  cleanup.hasRun = true;

  stopRequested = true;

  // Save recorded audio to WAV file
  saveWavFile();

  // Stop microphone if it's running
  if (micInstance) {
    try {
      micInstance.stop();
    } catch (error) {
      console.error(`Error stopping microphone: ${error}`);
    }
    micInstance = null;
  }

  // Close WebSocket connection if it's open
  if (ws && [WebSocket.OPEN, WebSocket.CONNECTING].includes(ws.readyState)) {
    try {
      // Send termination message if possible
      if (ws.readyState === WebSocket.OPEN) {
        const terminateMessage = { type: "Terminate" };
        console.log(
          `Sending termination message: ${JSON.stringify(terminateMessage)}`
        );
        ws.send(JSON.stringify(terminateMessage));
      }
      ws.close();
    } catch (error) {
      console.error(`Error closing WebSocket: ${error}`);
    }
    ws = null;
  }

  console.log("Cleanup complete.");
}
/**
 * Register process-level handlers for SIGINT, SIGTERM and uncaught
 * exceptions. Each handler logs a message, calls cleanup(), and exits after
 * a one-second delay: exit code 0 for signals, 1 for an uncaught exception.
 */
function setupTerminationHandlers() {
  const EXIT_DELAY_MS = 1000;

  // Run cleanup, then exit after the delay so cleanup has time to finish.
  const shutdown = (exitCode) => {
    cleanup();
    setTimeout(() => process.exit(exitCode), EXIT_DELAY_MS);
  };

  // Termination signals and the message logged for each.
  const signalNotices = {
    SIGINT: "\nCtrl+C received. Stopping...",
    SIGTERM: "\nTermination signal received. Stopping...",
  };
  for (const [signalName, notice] of Object.entries(signalNotices)) {
    process.on(signalName, () => {
      console.log(notice);
      shutdown(0);
    });
  }

  // Uncaught exceptions take the same path but exit with a failure code.
  process.on("uncaughtException", (error) => {
    console.error(`\nUncaught exception: ${error}`);
    shutdown(1);
  });
}
cleanup function will clean up the audio resources. Terminate the session when you’re done:
try {
  /* ....code for transcriber connection and audio configuration here... */

  // On Ctrl-C: stop the recording, close the streaming connection, then
  // exit the process.
  const handleInterrupt = async () => {
    console.log();
    console.log("Stopping recording");
    recording.stop();

    console.log("Closing streaming transcript connection");
    await transcriber.close();

    process.exit();
  };
  process.on("SIGINT", handleInterrupt);
} catch (err) {
  console.error(err);
}
Note: Pricing is based on session duration, so it is very important to close
sessions properly to avoid unexpected usage and cost.