The keyterms prompting feature helps improve recognition accuracy for specific words and phrases that are important to your use case. Keyterms prompting is supported for Universal-3 Pro, Universal-Streaming English, and Universal-Streaming Multilingual.
Start with no keyterms: We strongly recommend starting with no keyterms_prompt and then adding terms as needed, based on important words for your use case that you consistently see the model struggle with. Including a large number of terms, or common terms that are well represented in the training data, could lead to overcorrections and hallucinations.
Keyterms Prompting costs an additional $0.04/hour.
"""Stream microphone audio to AssemblyAI over a raw WebSocket with keyterms prompting."""

import json
import threading
import time
from datetime import datetime
from urllib.parse import urlencode

import pyaudio
import websocket

# --- Configuration ---
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
    # The list is JSON-encoded so it travels as a single query-string value.
    "keyterms_prompt": json.dumps(["Keanu Reeves", "AssemblyAI", "Universal-2"]),
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop
# on_close (WebSocket thread) and run() (main thread) both release the audio
# resources; the lock keeps them from closing the same objects twice.
_cleanup_lock = threading.Lock()


def _release_audio():
    """Stop the reader thread, then close the microphone stream and PyAudio.

    Safe to call repeatedly and from any thread: resources that were already
    released are skipped.
    """
    global stream, audio
    stop_event.set()

    # Let the reader leave stream.read() before the stream is closed under it.
    worker = audio_thread
    if (
        worker is not None
        and worker.is_alive()
        and worker is not threading.current_thread()
    ):
        worker.join(timeout=1.0)

    with _cleanup_lock:
        if stream is not None:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            stream = None
        if audio is not None:
            audio.terminate()
            audio = None


# --- WebSocket Event Handlers ---


def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads microphone frames and sends them to the
    server as binary messages until ``stop_event`` is set.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(
                    FRAMES_PER_BUFFER, exception_on_overflow=False
                )
                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                # A failure after shutdown was requested is just the socket or
                # stream going away; only report errors that are unexpected.
                if not stop_event.is_set():
                    print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    audio_thread = threading.Thread(target=stream_audio)
    # Allow main thread to exit even if this thread is running
    audio_thread.daemon = True
    audio_thread.start()


def on_message(ws, message):
    """Handle one server message: Begin, Turn, or Termination.

    Partial turns overwrite the current console line; a finished turn clears
    that line and prints the transcript on a line of its own.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # fromtimestamp(None) raises, so fall back to a placeholder.
            expires_text = (
                datetime.fromtimestamp(expires_at)
                if expires_at is not None
                else "unknown"
            )
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires_text}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                # No trailing newline, so flush or the partial may sit in the
                # stdout buffer instead of appearing live.
                print(f"\r{transcript}", end='', flush=True)
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(
                f"\nSession Terminated: Audio Duration={audio_duration}s, "
                f"Session Duration={session_duration}s"
            )
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed; releases audio resources."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    _release_audio()


# --- Main Execution ---


def run():
    """Open the microphone, connect to the streaming endpoint, and run until Ctrl+C."""
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        audio.terminate()
        audio = None
        return  # Exit if microphone cannot be opened

    print("Microphone stream opened successfully.")
    print("Speak into your microphone. Press Ctrl+C to stop.")

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch
    # KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Fallback in case on_close never ran; a no-op when it already did.
        _release_audio()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
"""Stream microphone audio with keyterms prompting using the AssemblyAI Python SDK."""

import logging

import assemblyai as aai
from assemblyai.streaming.v3 import (
    BeginEvent,
    StreamingClient,
    StreamingClientOptions,
    StreamingError,
    StreamingEvents,
    StreamingParameters,
    TerminationEvent,
    TurnEvent,
)

api_key = "<YOUR_API_KEY>"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# The first handler argument is named ``self`` and is therefore annotated as
# the client itself, not as ``Type[StreamingClient]`` (which would mean the
# class object).


def on_begin(self: StreamingClient, event: BeginEvent) -> None:
    """Report the session id once the streaming session has started."""
    print(f"Session started: {event.id}")


def on_turn(self: StreamingClient, event: TurnEvent) -> None:
    """Print the transcript for a turn, overwriting partials in place."""
    if event.end_of_turn:
        # Clear the line and print formatted final transcript on new line
        print(f"\r{' ' * 100}\r{event.transcript}")
    else:
        # Overwrite current line with partial unformatted transcript
        print(f"\r{event.transcript}", end='', flush=True)


def on_terminated(self: StreamingClient, event: TerminationEvent) -> None:
    """Report how much audio was processed when the session ends."""
    print(
        f"Session terminated: {event.audio_duration_seconds} seconds of audio processed"
    )


def on_error(self: StreamingClient, error: StreamingError) -> None:
    """Print any error raised by the streaming client."""
    print(f"Error occurred: {error}")


def main() -> None:
    """Connect with a keyterms list and stream the microphone until stopped."""
    client = StreamingClient(
        StreamingClientOptions(
            api_key=api_key,
            api_host="streaming.assemblyai.com",
        )
    )

    client.on(StreamingEvents.Begin, on_begin)
    client.on(StreamingEvents.Turn, on_turn)
    client.on(StreamingEvents.Termination, on_terminated)
    client.on(StreamingEvents.Error, on_error)

    client.connect(
        StreamingParameters(
            sample_rate=16000,
            speech_model="u3-rt-pro",
            keyterms_prompt=["Keanu Reeves", "AssemblyAI", "Universal-2"],
        )
    )

    try:
        client.stream(
            aai.extras.MicrophoneStream(sample_rate=16000)
        )
    finally:
        # Always end the session, even when streaming is interrupted.
        client.disconnect(terminate=True)


if __name__ == "__main__":
    main()
// Streams microphone audio to AssemblyAI over a raw WebSocket with keyterms
// prompting, printing partial and final transcripts to the console.
const WebSocket = require("ws");
const mic = require("mic");
const querystring = require("querystring");

// --- Configuration ---
const YOUR_API_KEY = "YOUR-API-KEY"; // Replace with your actual API key
const CONNECTION_PARAMS = {
  sample_rate: 16000,
  speech_model: "u3-rt-pro",
  // The list is JSON-encoded so it travels as a single query-string value.
  keyterms_prompt: JSON.stringify([
    "Keanu Reeves",
    "AssemblyAI",
    "Universal-2",
  ]),
};
const API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws";
const API_ENDPOINT = `${API_ENDPOINT_BASE_URL}?${querystring.stringify(CONNECTION_PARAMS)}`;

// Audio Configuration
const SAMPLE_RATE = CONNECTION_PARAMS.sample_rate;
const CHANNELS = 1;

// Global variables
let micInstance = null;
let micInputStream = null;
let ws = null;
let stopRequested = false;

// --- Helper functions ---

// Converts a Unix timestamp in seconds to an ISO-8601 string.
function formatTimestamp(timestamp) {
  return new Date(timestamp * 1000).toISOString();
}

// --- Main function ---

// Opens the WebSocket, wires up its event handlers, and starts the microphone
// once the connection is open.
async function run() {
  console.log("Starting AssemblyAI real-time transcription...");

  // Initialize WebSocket connection
  ws = new WebSocket(API_ENDPOINT, {
    headers: {
      Authorization: YOUR_API_KEY,
    },
  });

  // Setup WebSocket event handlers
  ws.on("open", () => {
    console.log("WebSocket connection opened.");
    console.log(`Connected to: ${API_ENDPOINT}`);
    // Start the microphone
    startMicrophone();
  });

  ws.on("message", (message) => {
    try {
      // Stringify explicitly before parsing rather than relying on coercion.
      const data = JSON.parse(message.toString());
      const msgType = data.type;

      if (msgType === "Begin") {
        const sessionId = data.id;
        const expiresAt = data.expires_at;
        console.log(
          `\nSession began: ID=${sessionId}, ExpiresAt=${formatTimestamp(expiresAt)}`
        );
      } else if (msgType === "Turn") {
        const transcript = data.transcript || "";
        if (data.end_of_turn) {
          // Clear the line and print formatted final transcript on new line
          process.stdout.write("\r" + " ".repeat(100) + "\r");
          console.log(transcript);
        } else {
          // Overwrite current line with partial unformatted transcript
          process.stdout.write(`\r${transcript}`);
        }
      } else if (msgType === "Termination") {
        const audioDuration = data.audio_duration_seconds;
        const sessionDuration = data.session_duration_seconds;
        console.log(
          `\nSession Terminated: Audio Duration=${audioDuration}s, Session Duration=${sessionDuration}s`
        );
      }
    } catch (error) {
      console.error(`\nError handling message: ${error}`);
      console.error(`Message data: ${message}`);
    }
  });

  ws.on("error", (error) => {
    console.error(`\nWebSocket Error: ${error}`);
    cleanup();
  });

  ws.on("close", (code, reason) => {
    console.log(`\nWebSocket Disconnected: Status=${code}, Msg=${reason}`);
    cleanup();
  });

  // Handle process termination
  setupTerminationHandlers();
}

// Starts microphone capture and forwards every audio chunk to the WebSocket
// while it is open and no stop has been requested.
function startMicrophone() {
  try {
    micInstance = mic({
      rate: SAMPLE_RATE.toString(),
      channels: CHANNELS.toString(),
      debug: false,
      exitOnSilence: 6, // This won't actually exit, just a parameter for mic
    });

    micInputStream = micInstance.getAudioStream();

    micInputStream.on("data", (data) => {
      if (ws && ws.readyState === WebSocket.OPEN && !stopRequested) {
        // Send audio data to WebSocket
        ws.send(data);
      }
    });

    micInputStream.on("error", (err) => {
      console.error(`Microphone Error: ${err}`);
      cleanup();
    });

    micInstance.start();
    console.log("Microphone stream opened successfully.");
    console.log("Speak into your microphone. Press Ctrl+C to stop.");
  } catch (error) {
    console.error(`Error opening microphone stream: ${error}`);
    cleanup();
  }
}

// Stops the microphone and closes the WebSocket, sending a Terminate message
// first when the socket is still open.
function cleanup() {
  // "error", "close", microphone errors and the signal handlers all end up
  // here; only the first call has anything to tear down.
  if (stopRequested) {
    return;
  }
  stopRequested = true;

  // Stop microphone if it's running
  if (micInstance) {
    try {
      micInstance.stop();
    } catch (error) {
      console.error(`Error stopping microphone: ${error}`);
    }
    micInstance = null;
  }

  // Close WebSocket connection if it's open
  if (ws && [WebSocket.OPEN, WebSocket.CONNECTING].includes(ws.readyState)) {
    try {
      // Send termination message if possible
      if (ws.readyState === WebSocket.OPEN) {
        const terminateMessage = { type: "Terminate" };
        console.log(
          `Sending termination message: ${JSON.stringify(terminateMessage)}`
        );
        ws.send(JSON.stringify(terminateMessage));
      }
      ws.close();
    } catch (error) {
      console.error(`Error closing WebSocket: ${error}`);
    }
    ws = null;
  }

  console.log("Cleanup complete.");
}

// Registers handlers that clean up and exit on Ctrl+C, SIGTERM, and uncaught
// exceptions.
function setupTerminationHandlers() {
  // Handle Ctrl+C and other termination signals
  process.on("SIGINT", () => {
    console.log("\nCtrl+C received. Stopping...");
    cleanup();
    // Give time for cleanup before exiting
    setTimeout(() => process.exit(0), 1000);
  });

  process.on("SIGTERM", () => {
    console.log("\nTermination signal received. Stopping...");
    cleanup();
    // Give time for cleanup before exiting
    setTimeout(() => process.exit(0), 1000);
  });

  // Handle uncaught exceptions
  process.on("uncaughtException", (error) => {
    console.error(`\nUncaught exception: ${error}`);
    cleanup();
    // Give time for cleanup before exiting
    setTimeout(() => process.exit(1), 1000);
  });
}

// Start the application
run();
// Streams microphone audio with keyterms prompting using the AssemblyAI
// JavaScript SDK, printing partial and final transcripts to the console.
import { Readable } from "stream";
import { AssemblyAI } from "assemblyai";
import recorder from "node-record-lpcm16";

const run = async () => {
  const client = new AssemblyAI({
    apiKey: "<YOUR_API_KEY>",
  });

  const transcriber = client.streaming.transcriber({
    sampleRate: 16_000,
    speechModel: "u3-rt-pro",
    keytermsPrompt: ["Keanu Reeves", "AssemblyAI", "Universal-2"],
  });

  transcriber.on("open", ({ id }) => {
    console.log(`Session opened with ID: ${id}`);
  });

  transcriber.on("error", (error) => {
    console.error("Error:", error);
  });

  transcriber.on("close", (code, reason) =>
    console.log("Session closed:", code, reason)
  );

  transcriber.on("turn", (turn) => {
    if (turn.end_of_turn) {
      // Clear the line and print formatted final transcript on new line
      process.stdout.write("\r" + " ".repeat(100) + "\r");
      console.log(turn.transcript);
    } else {
      // Overwrite current line with partial unformatted transcript
      process.stdout.write(`\r${turn.transcript}`);
    }
  });

  try {
    console.log("Connecting to streaming transcript service");
    await transcriber.connect();

    console.log("Starting recording");
    const recording = recorder.record({
      channels: 1,
      sampleRate: 16_000,
      audioType: "wav", // Linear PCM
    });

    // pipeTo() returns a promise that is deliberately not awaited, so that the
    // Ctrl-C handler below is registered while audio is flowing. Attach a
    // handler so a stream failure is reported instead of surfacing as an
    // unhandled rejection.
    Readable.toWeb(recording.stream())
      .pipeTo(transcriber.stream())
      .catch((error) => {
        console.error("Error:", error);
      });

    // Stop recording and close connection using Ctrl-C.
    process.on("SIGINT", async function () {
      console.log();
      console.log("Stopping recording");
      recording.stop();

      console.log("Closing streaming transcript connection");
      await transcriber.close();

      process.exit();
    });
  } catch (error) {
    console.error(error);
  }
};

run();
Word-level boosting: The streaming model itself is biased during inference to be more accurate at identifying words from your keyterms list. This happens in real-time as words are emitted during the streaming process, providing immediate improvements to recognition accuracy. This component is enabled by default.
Turn-level boosting: After each turn is completed, an additional boosting pass analyzes the full transcript using your keyterms list. This post-processing step provides a second layer of accuracy improvement by examining the complete context of the turn. For Universal-Streaming English and Universal-Streaming Multilingual, turn-level boosting requires format_turns=true to be enabled.
For Universal-3 Pro (u3-rt-pro), turn-level boosting is always active. For Universal-Streaming English and Universal-Streaming Multilingual, turn-level boosting is only active when format_turns=true.
Both stages work together to maximize recognition accuracy for your keyterms throughout the streaming process.
Dynamic keyterms prompting allows you to update keyterms during an active streaming session using the UpdateConfiguration message. This enables you to adapt the recognition context in real-time based on conversation flow or changing requirements.
To update keyterms while streaming, send an UpdateConfiguration message with a new keyterms_prompt array:
Python
Python SDK
JavaScript
JavaScript SDK
# `ws` is the open WebSocket connection (the object the event handlers
# receive). The name `websocket` is avoided here because the Python example
# on this page imports a module under that name.

# Replace or establish new set of keyterms
ws.send('{"type": "UpdateConfiguration", "keyterms_prompt": ["Universal-3"]}')

# Remove keyterms and reset context biasing
ws.send('{"type": "UpdateConfiguration", "keyterms_prompt": []}')
# Send a new keyterms list; it takes the place of any list set earlier.
client.update_configuration(keyterms_prompt=["Universal-3"])

# Send an empty list to drop all keyterms and reset context biasing.
client.update_configuration(keyterms_prompt=[])
// Send a new keyterms list; it takes the place of any list set earlier.
websocket.send(
  '{"type": "UpdateConfiguration", "keyterms_prompt": ["Universal-3"]}'
);

// Send an empty list to drop all keyterms and reset context biasing.
websocket.send(
  '{"type": "UpdateConfiguration", "keyterms_prompt": []}'
);
// Send a new keyterms list; it takes the place of any list set earlier.
transcriber.updateConfiguration({
  keytermsPrompt: ["Universal-3"],
});

// Send an empty list to drop all keyterms and reset context biasing.
transcriber.updateConfiguration({
  keytermsPrompt: [],
});
Replacing keyterms: Providing a new array of keyterms completely replaces the existing set. The new keyterms take effect immediately for subsequent audio processing.
Clearing keyterms: Sending an empty array [] removes all keyterms and resets context biasing to the default state.
Both boosting stages: Dynamic keyterms work with both word-level boosting (native context biasing) and turn-level boosting (metaphone-based), just like initial keyterms.
To maximize the effectiveness of keyterms prompting:
Specify Unique Terminology: Include proper names, company names, technical terms, or vocabulary specific to your domain that might not be commonly recognized.
Exact Spelling and Capitalization: Provide keyterms with the precise spelling and capitalization you expect to see in the output transcript. This helps the system accurately identify the terms.
Avoid Common Words: Do not include single, common English words (e.g., “information”) as keyterms. The system is generally proficient with such words, and adding them as keyterms can be redundant.