Skip to main content

Documentation Index

Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt

Use this file to discover all available pages before exploring further.

This guide shows you how to transcribe WAV audio files with varying sample rates using our Streaming API.
If you’re streaming a pre-recorded file for benchmarking or testing, see “Stream a pre-recorded file in real time” for wall-clock pacing that more closely simulates live microphone input.

Quickstart

Here is the complete Python script to transcribe a WAV audio file using the Streaming API.
import websocket
import json
import threading
import time
import wave
import sys
import os
from urllib.parse import urlencode
from pathlib import Path

# --- Configuration ---
# The API key is read from the environment; a missing variable raises KeyError at import time.
ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]
AUDIO_FILE = "audio.wav"  # Path to your audio file
SAMPLE_RATE = 48000  # Change to match the sample rate of your audio file
SAVE_TRANSCRIPT_TO_FILE = True  # Set to False to disable saving transcript to file
PLAY_AUDIO = True  # Set to False to disable audio playback

# Query parameters that are URL-encoded onto the WebSocket endpoint below.
CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": SAMPLE_RATE,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Global variables
ws_app = None  # WebSocketApp instance, assigned in run()
audio_thread = None  # Background streaming thread, assigned in on_open()
stop_event = threading.Event()  # Set on error, close, or Ctrl+C to stop the streaming loop

# Track session data for output file
session_data = {
    "session_id": None,  # Filled from the "Begin" message
    "audio_file": AUDIO_FILE,
    "audio_duration_seconds": None,  # Filled from the "Termination" message
    "turns": []  # Transcripts of turns received with end_of_turn set
}

# --- Helper Functions ---

def validate_audio_file(filepath, sample_rate):
    """Validate audio file before streaming.

    Checks that ``filepath`` has a ``.wav`` extension, can be opened and
    parsed as a WAV file, is mono, and has a frame rate equal to
    ``sample_rate``. On any failure an error message (with an ffmpeg
    conversion hint where one applies) is printed to stderr and the process
    exits with status 1.

    Args:
        filepath: Path to the audio file to check.
        sample_rate: Sample rate in Hz that the file is expected to have.

    Raises:
        SystemExit: If the file fails any of the checks.
    """
    # Same conversion command is suggested for every fixable problem.
    ffmpeg_hint = f"ffmpeg -i {filepath} -ar {sample_rate} -ac 1 output.wav"

    file_ext = Path(filepath).suffix.lower()
    if file_ext != ".wav":
        print(f"Error: Only WAV files are supported. Got: {file_ext}", file=sys.stderr)
        print(f"Convert your file to WAV using: {ffmpeg_hint}", file=sys.stderr)
        sys.exit(1)

    # Report a missing or corrupt file the same way as the other validation
    # failures instead of letting a raw traceback escape.
    try:
        wav_file = wave.open(filepath, 'rb')
    except OSError as e:
        print(f"Error: Could not open audio file {filepath}: {e}", file=sys.stderr)
        sys.exit(1)
    except (wave.Error, EOFError) as e:
        print(f"Error: {filepath} is not a readable WAV file: {e}", file=sys.stderr)
        sys.exit(1)

    with wav_file:
        if wav_file.getnchannels() != 1:
            print("Error: Only mono audio is supported", file=sys.stderr)
            print(f"Convert your file to mono using: {ffmpeg_hint}", file=sys.stderr)
            sys.exit(1)

        file_sample_rate = wav_file.getframerate()
        if file_sample_rate != sample_rate:
            print(f"Error: File sample rate ({file_sample_rate}) doesn't match expected rate ({sample_rate})", file=sys.stderr)
            print(f"Either update SAMPLE_RATE to {file_sample_rate}, or convert your file using: {ffmpeg_hint}", file=sys.stderr)
            sys.exit(1)


def save_transcript():
    """Save the transcript to a text file in the current working directory.

    The file is named ``{audio file stem}_{session id}.txt``; "unknown" is
    used in the name when no session ID was recorded. The file contains the
    session ID, audio file path, audio duration, and one line per recorded
    turn from ``session_data``.
    """
    audio_name = Path(session_data["audio_file"]).stem
    session_id = session_data["session_id"] or "unknown"
    output_file = f"{audio_name}_{session_id}.txt"

    # Write UTF-8 explicitly: transcripts may contain characters that the
    # platform's default text encoding cannot represent.
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(f"AssemblyAI Session ID: {session_data['session_id']}\n")
        f.write(f"Audio file: {session_data['audio_file']}\n")
        f.write(f"Audio duration: {session_data['audio_duration_seconds']} seconds\n")
        f.write("See all available parameters and defaults at https://www.assemblyai.com/docs/api-reference/streaming-api/universal-3-pro-streaming#request.query\n\n")
        f.write("\nTranscription Output\n")
        for i, turn in enumerate(session_data["turns"], 1):
            f.write(f"[Turn #{i}]: {turn}\n")

    print(f"Transcript saved to {output_file}")


# --- WebSocket Event Handlers ---

def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads AUDIO_FILE in 50 ms chunks and sends
    each chunk as a binary WebSocket frame. When PLAY_AUDIO is enabled and an
    audio output can be opened, each chunk is also written to the audio
    player (which takes the place of the per-chunk sleep); otherwise the
    thread sleeps for one chunk duration per chunk. After the loop ends a
    "Terminate" message is sent on a best-effort basis.

    Args:
        ws: The WebSocket object used to send audio and control messages.
    """
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def open_audio_player(wav_file):
        """Return (pyaudio_instance, output_stream), or (None, None) if playback is unavailable."""
        try:
            import pyaudio
        except ImportError:
            print("Warning: pyaudio not installed. Audio playback disabled.", file=sys.stderr)
            print("Install with: pip install pyaudio", file=sys.stderr)
            return None, None

        p = pyaudio.PyAudio()
        try:
            stream = p.open(
                format=p.get_format_from_width(wav_file.getsampwidth()),
                channels=wav_file.getnchannels(),
                rate=wav_file.getframerate(),
                output=True
            )
        except (OSError, ValueError) as e:
            # No usable output device or unsupported sample width: keep
            # streaming without playback instead of killing the thread.
            p.terminate()
            print(f"Warning: could not open audio output ({e}). Audio playback disabled.", file=sys.stderr)
            return None, None
        return p, stream

    def stream_file():
        chunk_duration = 0.05  # 50ms chunks
        p = None
        audio_player = None

        try:
            # The file is opened once and shared by playback setup and streaming.
            with wave.open(AUDIO_FILE, 'rb') as wav_file:
                if PLAY_AUDIO:
                    p, audio_player = open_audio_player(wav_file)

                frames_per_chunk = int(SAMPLE_RATE * chunk_duration)

                while not stop_event.is_set():
                    frames = wav_file.readframes(frames_per_chunk)
                    if not frames:
                        break

                    if audio_player:
                        audio_player.write(frames)
                    else:
                        time.sleep(chunk_duration)

                    ws.send(frames, websocket.ABNF.OPCODE_BINARY)
        except (websocket.WebSocketException, OSError) as e:
            # Typically the connection was closed mid-stream; report it
            # rather than letting the thread die with a traceback.
            print(f"Error while streaming audio: {e}", file=sys.stderr)
            return
        finally:
            if audio_player:
                audio_player.stop_stream()
                audio_player.close()
            if p:
                p.terminate()

        # Only claim completion when the loop ended because the file ran out.
        if stop_event.is_set():
            print("File streaming interrupted.")
        else:
            print("File streaming complete. Waiting for final transcripts...")

        # Terminate the session (best effort: the socket may already be closed)
        try:
            ws.send(json.dumps({"type": "Terminate"}))
        except Exception as e:
            if not stop_event.is_set():
                print(f"Warning: could not send Terminate message: {e}", file=sys.stderr)

    global audio_thread
    audio_thread = threading.Thread(target=stream_file)
    audio_thread.daemon = True
    audio_thread.start()


def on_message(ws, message):
    """Handle one JSON message received from the streaming session.

    "Begin" records and prints the session ID, "Turn" prints partial or final
    transcripts (final ones are also stored in ``session_data``), and
    "Termination" records and prints the processed audio duration. Any other
    message type is ignored. Decoding and handling errors are printed, not
    raised.
    """
    try:
        payload = json.loads(message)
        kind = payload.get('type')

        if kind == "Begin":
            session_data["session_id"] = payload.get('id')
            print(f"Session ID: {payload.get('id')}\n")
            return

        if kind == "Termination":
            session_data["audio_duration_seconds"] = payload.get('audio_duration_seconds', 0)
            print(f"Session terminated: {payload.get('audio_duration_seconds', 0)} seconds of audio processed")
            return

        if kind != "Turn":
            return

        text = payload.get('transcript', '')
        if not text:
            # Nothing to show for an empty transcript
            return

        if not payload.get('end_of_turn'):
            print(f"[Partial]: {text}")
            return

        print(f"[Final]: {text}\n")
        session_data["turns"].append(text)
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Print a WebSocket error and signal the streaming loop to stop."""
    report = f"\nWebSocket Error: {error}"
    print(report)
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Handle WebSocket closure: stop streaming, save the transcript, wait for the audio thread."""
    summary = f"Status={close_status_code}, Msg={close_msg}"
    print(f"\nWebSocket Disconnected: {summary}")
    stop_event.set()

    if SAVE_TRANSCRIPT_TO_FILE:
        save_transcript()

    # Give the streaming thread a moment to notice stop_event and finish
    streamer = audio_thread
    if not streamer or not streamer.is_alive():
        return
    streamer.join(timeout=1.0)


# --- Main Execution ---
def run():
    """Validate the audio file, open the streaming WebSocket, and wait for it to finish.

    The WebSocketApp runs in a daemon thread while this function polls it.
    On Ctrl+C a "Terminate" message is sent (if still connected) before the
    connection is closed; on any other exception the connection is closed
    directly.
    """
    global ws_app

    # Fail fast on an unusable file before connecting
    validate_audio_file(AUDIO_FILE, SAMPLE_RATE)

    handlers = {
        "on_open": on_open,
        "on_message": on_message,
        "on_error": on_error,
        "on_close": on_close,
    }
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        **handlers,
    )

    # The WebSocketApp event loop lives in its own daemon thread
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    def close_and_join():
        """Close the socket (if any) and give the WebSocket thread time to exit."""
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        still_connected = ws_app and ws_app.sock and ws_app.sock.connected
        if still_connected:
            try:
                ws_app.send(json.dumps({"type": "Terminate"}))
                # Pause before closing so the server can respond to Terminate
                time.sleep(2)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        close_and_join()

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        close_and_join()

    finally:
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

Step-by-step guide

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.

Install and import packages

Install the required packages. PyAudio is optional — it is only needed for audio playback during streaming, so you can skip it if you set PLAY_AUDIO to False.
pip install websocket-client
pip install pyaudio
Import packages.
import websocket
import json
import os
import threading
import time
import wave
import sys
from urllib.parse import urlencode
from pathlib import Path

Configure settings

Set your ASSEMBLYAI_API_KEY environment variable. Set AUDIO_FILE to the relative or absolute path of your audio file, and set SAMPLE_RATE to match your file’s sample rate.
# The API key is read from the environment; a missing variable raises KeyError at import time.
ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]
AUDIO_FILE = "audio.wav"  # Path to your audio file
SAMPLE_RATE = 48000  # Change to match the sample rate of your audio file
SAVE_TRANSCRIPT_TO_FILE = True  # Set to False to disable saving transcript to file
PLAY_AUDIO = True  # Set to False to disable audio playback

# Query parameters that are URL-encoded onto the WebSocket endpoint below.
CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": SAMPLE_RATE,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Global variables
ws_app = None  # WebSocketApp instance, assigned in run()
audio_thread = None  # Background streaming thread, assigned in on_open()
stop_event = threading.Event()  # Set on error, close, or Ctrl+C to stop the streaming loop

# Track session data for output file
session_data = {
    "session_id": None,  # Filled from the "Begin" message
    "audio_file": AUDIO_FILE,
    "audio_duration_seconds": None,  # Filled from the "Termination" message
    "turns": []  # Transcripts of turns received with end_of_turn set
}

Helper functions

The following helper functions are used to validate audio files and save the transcript output:
  • validate_audio_file() - Validates that the audio file is a mono WAV file with the expected sample rate.
  • save_transcript() - Saves the transcript to a text file after the session ends.
def validate_audio_file(filepath, sample_rate):
    """Validate audio file before streaming.

    Checks that ``filepath`` has a ``.wav`` extension, can be opened and
    parsed as a WAV file, is mono, and has a frame rate equal to
    ``sample_rate``. On any failure an error message (with an ffmpeg
    conversion hint where one applies) is printed to stderr and the process
    exits with status 1.

    Args:
        filepath: Path to the audio file to check.
        sample_rate: Sample rate in Hz that the file is expected to have.

    Raises:
        SystemExit: If the file fails any of the checks.
    """
    # Same conversion command is suggested for every fixable problem.
    ffmpeg_hint = f"ffmpeg -i {filepath} -ar {sample_rate} -ac 1 output.wav"

    file_ext = Path(filepath).suffix.lower()
    if file_ext != ".wav":
        print(f"Error: Only WAV files are supported. Got: {file_ext}", file=sys.stderr)
        print(f"Convert your file to WAV using: {ffmpeg_hint}", file=sys.stderr)
        sys.exit(1)

    # Report a missing or corrupt file the same way as the other validation
    # failures instead of letting a raw traceback escape.
    try:
        wav_file = wave.open(filepath, 'rb')
    except OSError as e:
        print(f"Error: Could not open audio file {filepath}: {e}", file=sys.stderr)
        sys.exit(1)
    except (wave.Error, EOFError) as e:
        print(f"Error: {filepath} is not a readable WAV file: {e}", file=sys.stderr)
        sys.exit(1)

    with wav_file:
        if wav_file.getnchannels() != 1:
            print("Error: Only mono audio is supported", file=sys.stderr)
            print(f"Convert your file to mono using: {ffmpeg_hint}", file=sys.stderr)
            sys.exit(1)

        file_sample_rate = wav_file.getframerate()
        if file_sample_rate != sample_rate:
            print(f"Error: File sample rate ({file_sample_rate}) doesn't match expected rate ({sample_rate})", file=sys.stderr)
            print(f"Either update SAMPLE_RATE to {file_sample_rate}, or convert your file using: {ffmpeg_hint}", file=sys.stderr)
            sys.exit(1)


def save_transcript():
    """Save the transcript to a text file in the current working directory.

    The file is named ``{audio file stem}_{session id}.txt``; "unknown" is
    used in the name when no session ID was recorded. The file contains the
    session ID, audio file path, audio duration, and one line per recorded
    turn from ``session_data``.
    """
    audio_name = Path(session_data["audio_file"]).stem
    session_id = session_data["session_id"] or "unknown"
    output_file = f"{audio_name}_{session_id}.txt"

    # Write UTF-8 explicitly: transcripts may contain characters that the
    # platform's default text encoding cannot represent.
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(f"AssemblyAI Session ID: {session_data['session_id']}\n")
        f.write(f"Audio file: {session_data['audio_file']}\n")
        f.write(f"Audio duration: {session_data['audio_duration_seconds']} seconds\n")
        f.write("See all available parameters and defaults at https://www.assemblyai.com/docs/api-reference/streaming-api/universal-3-pro-streaming#request.query\n\n")
        f.write("\nTranscription Output\n")
        for i, turn in enumerate(session_data["turns"], 1):
            f.write(f"[Turn #{i}]: {turn}\n")

    print(f"Transcript saved to {output_file}")

WebSocket event handlers

Open WebSocket and stream audio file

When the connection opens, we start a background thread that reads the WAV file in 50ms chunks and sends them over the WebSocket. If PLAY_AUDIO is enabled, the audio is also played through your speakers.
def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads AUDIO_FILE in 50 ms chunks and sends
    each chunk as a binary WebSocket frame. When PLAY_AUDIO is enabled and an
    audio output can be opened, each chunk is also written to the audio
    player (which takes the place of the per-chunk sleep); otherwise the
    thread sleeps for one chunk duration per chunk. After the loop ends a
    "Terminate" message is sent on a best-effort basis.

    Args:
        ws: The WebSocket object used to send audio and control messages.
    """
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def open_audio_player(wav_file):
        """Return (pyaudio_instance, output_stream), or (None, None) if playback is unavailable."""
        try:
            import pyaudio
        except ImportError:
            print("Warning: pyaudio not installed. Audio playback disabled.", file=sys.stderr)
            print("Install with: pip install pyaudio", file=sys.stderr)
            return None, None

        p = pyaudio.PyAudio()
        try:
            stream = p.open(
                format=p.get_format_from_width(wav_file.getsampwidth()),
                channels=wav_file.getnchannels(),
                rate=wav_file.getframerate(),
                output=True
            )
        except (OSError, ValueError) as e:
            # No usable output device or unsupported sample width: keep
            # streaming without playback instead of killing the thread.
            p.terminate()
            print(f"Warning: could not open audio output ({e}). Audio playback disabled.", file=sys.stderr)
            return None, None
        return p, stream

    def stream_file():
        chunk_duration = 0.05  # 50ms chunks
        p = None
        audio_player = None

        try:
            # The file is opened once and shared by playback setup and streaming.
            with wave.open(AUDIO_FILE, 'rb') as wav_file:
                if PLAY_AUDIO:
                    p, audio_player = open_audio_player(wav_file)

                frames_per_chunk = int(SAMPLE_RATE * chunk_duration)

                while not stop_event.is_set():
                    frames = wav_file.readframes(frames_per_chunk)
                    if not frames:
                        break

                    if audio_player:
                        audio_player.write(frames)
                    else:
                        time.sleep(chunk_duration)

                    ws.send(frames, websocket.ABNF.OPCODE_BINARY)
        except (websocket.WebSocketException, OSError) as e:
            # Typically the connection was closed mid-stream; report it
            # rather than letting the thread die with a traceback.
            print(f"Error while streaming audio: {e}", file=sys.stderr)
            return
        finally:
            if audio_player:
                audio_player.stop_stream()
                audio_player.close()
            if p:
                p.terminate()

        # Only claim completion when the loop ended because the file ran out.
        if stop_event.is_set():
            print("File streaming interrupted.")
        else:
            print("File streaming complete. Waiting for final transcripts...")

        # Terminate the session (best effort: the socket may already be closed)
        try:
            ws.send(json.dumps({"type": "Terminate"}))
        except Exception as e:
            if not stop_event.is_set():
                print(f"Warning: could not send Terminate message: {e}", file=sys.stderr)

    global audio_thread
    audio_thread = threading.Thread(target=stream_file)
    audio_thread.daemon = True
    audio_thread.start()

Handle WebSocket messages

def on_message(ws, message):
    """Handle one JSON message received from the streaming session.

    "Begin" records and prints the session ID, "Turn" prints partial or final
    transcripts (final ones are also stored in ``session_data``), and
    "Termination" records and prints the processed audio duration. Any other
    message type is ignored. Decoding and handling errors are printed, not
    raised.
    """
    try:
        payload = json.loads(message)
        kind = payload.get('type')

        if kind == "Begin":
            session_data["session_id"] = payload.get('id')
            print(f"Session ID: {payload.get('id')}\n")
            return

        if kind == "Termination":
            session_data["audio_duration_seconds"] = payload.get('audio_duration_seconds', 0)
            print(f"Session terminated: {payload.get('audio_duration_seconds', 0)} seconds of audio processed")
            return

        if kind != "Turn":
            return

        text = payload.get('transcript', '')
        if not text:
            # Nothing to show for an empty transcript
            return

        if not payload.get('end_of_turn'):
            print(f"[Partial]: {text}")
            return

        print(f"[Final]: {text}\n")
        session_data["turns"].append(text)
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

WebSocket error and close handlers

def on_error(ws, error):
    """Print a WebSocket error and signal the streaming loop to stop."""
    report = f"\nWebSocket Error: {error}"
    print(report)
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Handle WebSocket closure: stop streaming, save the transcript, wait for the audio thread."""
    summary = f"Status={close_status_code}, Msg={close_msg}"
    print(f"\nWebSocket Disconnected: {summary}")
    stop_event.set()

    if SAVE_TRANSCRIPT_TO_FILE:
        save_transcript()

    # Give the streaming thread a moment to notice stop_event and finish
    streamer = audio_thread
    if not streamer or not streamer.is_alive():
        return
    streamer.join(timeout=1.0)

Connect and stream the file

def run():
    """Validate the audio file, open the streaming WebSocket, and wait for it to finish.

    The WebSocketApp runs in a daemon thread while this function polls it.
    On Ctrl+C a "Terminate" message is sent (if still connected) before the
    connection is closed; on any other exception the connection is closed
    directly.
    """
    global ws_app

    # Fail fast on an unusable file before connecting
    validate_audio_file(AUDIO_FILE, SAMPLE_RATE)

    handlers = {
        "on_open": on_open,
        "on_message": on_message,
        "on_error": on_error,
        "on_close": on_close,
    }
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        **handlers,
    )

    # The WebSocketApp event loop lives in its own daemon thread
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    def close_and_join():
        """Close the socket (if any) and give the WebSocket thread time to exit."""
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        still_connected = ws_app and ws_app.sock and ws_app.sock.connected
        if still_connected:
            try:
                ws_app.send(json.dumps({"type": "Terminate"}))
                # Pause before closing so the server can respond to Terminate
                time.sleep(2)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        close_and_join()

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        close_and_join()

    finally:
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
The session will terminate once the file is finished streaming. If SAVE_TRANSCRIPT_TO_FILE is enabled (default), the transcript will be saved to {audio_filename}_{session_id}.txt in the current working directory.
The AUDIO_FILE path can be either relative (e.g., audio.wav) or absolute (e.g., /path/to/audio.wav).

Example output

Here’s an example of what the console output looks like when streaming an audio file:
WebSocket connection opened.
Connected to: wss://streaming.assemblyai.com/v3/ws?speech_model=u3-rt-pro&sample_rate=48000

Session ID: f37d7c4e-6be9-47ed-b6fc-7600fc78e34d

[Partial]: the
[Partial]: the quick
[Partial]: the quick brown
[Partial]: the quick brown fox
[Partial]: the quick brown fox jumps
[Partial]: the quick brown fox jumps over
[Partial]: the quick brown fox jumps over the
[Partial]: the quick brown fox jumps over the lazy
[Partial]: The quick brown fox jumps over the lazy dog
[Final]: The quick brown fox jumps over the lazy dog.

[Partial]: It
[Partial]: It is
[Partial]: It is a
[Partial]: It is a common
[Partial]: It is a common typing
[Partial]: It is a common typing test
[Final]: It is a common typing test.

File streaming complete. Waiting for final transcripts...
Session terminated: 7.52 seconds of audio processed

WebSocket Disconnected: Status=1000, Msg=None
Transcript saved to audio_f37d7c4e-6be9-47ed-b6fc-7600fc78e34d.txt
Cleanup complete. Exiting.
The output shows:
  • Partial transcripts: Real-time updates as words are recognized
  • Final: The complete turn with proper capitalization and punctuation