Documentation Index
Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt
Use this file to discover all available pages before exploring further.
Supported models
Supported models
Streaming Diarization
Streaming Diarization lets you identify and label individual speakers in real time directly from the Streaming API. Each Turn event includes a speaker_label
field (e.g. A, B) indicating the dominant speaker for that turn. Each final
word in the words array also carries a speaker field, enabling mid-turn speaker
change detection. Speaker accuracy improves over the course of a session as the model
accumulates embedding context — so the longer the conversation, the better the labels.
Already using AssemblyAI streaming? You can enable Streaming Diarization by adding
speaker_labels: true to your
connection parameters. No other changes are required — the speaker_label field
will appear on every Turn event, and each final word in the words array will
include a speaker field automatically.
Quickstart
Get started with Streaming Diarization using the code below. This example streams audio from your microphone and prints each turn with its speaker label.
- Python
- Python SDK
- JavaScript
- JavaScript SDK
import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode
# Replace with your AssemblyAI API key.
YOUR_API_KEY = "<YOUR_API_KEY>"
# Query parameters sent when opening the streaming WebSocket.
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
    "speaker_labels": "true",  # turns on streaming diarization
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"
# Microphone capture settings: 800 frames per read at 16 kHz is 50 ms of audio.
FRAMES_PER_BUFFER = 800
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16
# Shared state: audio, stream and ws_app are assigned in run(); audio_thread
# is assigned in on_open(); stop_event tells the capture thread to exit.
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
def on_open(ws):
    """Start forwarding microphone audio once the WebSocket is open.

    A daemon thread reads FRAMES_PER_BUFFER frames at a time from the global
    microphone stream and sends each buffer as a binary WebSocket frame. The
    thread stops when stop_event is set or when reading/sending fails.
    """
    global audio_thread

    print("WebSocket connection opened.")

    def _pump_microphone():
        while True:
            if stop_event.is_set():
                return
            try:
                pcm = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(pcm, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                print(f"Error streaming audio: {exc}")
                return

    audio_thread = threading.Thread(target=_pump_microphone)
    audio_thread.daemon = True
    audio_thread.start()
def _format_speaker_segments(words, turn_speaker):
    """Collapse consecutive same-speaker words into "[speaker] text" segments.

    Args:
        words: Iterable of word dicts with a "text" key and an optional
            "speaker" key.
        turn_speaker: Label used for words that carry no "speaker" value.

    Returns:
        The segments joined by single spaces, or "" when there are no words.
    """
    segments = []
    current_speaker, current_words = None, []
    for word in words:
        speaker = word.get("speaker")
        if speaker is None:
            # Word-level label missing: fall back to the turn-level label.
            speaker = turn_speaker
        if current_words and speaker == current_speaker:
            current_words.append(word["text"])
            continue
        if current_words:
            segments.append(f"[{current_speaker}] {' '.join(current_words)}")
        current_speaker, current_words = speaker, [word["text"]]
    if current_words:
        segments.append(f"[{current_speaker}] {' '.join(current_words)}")
    return " ".join(segments)


def on_message(ws, message):
    """Handle one JSON message from the streaming API and print it.

    "Begin" and "Termination" messages are reported on their own line. "Turn"
    messages are printed in place while the turn is in progress and finalised
    with per-speaker segments once end_of_turn is true.

    Args:
        ws: The WebSocketApp delivering the message (unused).
        message: Raw JSON text received from the server.
    """
    # Carriage return + blanks + carriage return wipes the in-progress line so
    # a shorter update never leaves characters from a longer previous one.
    clear_line = f"\r{' ' * 80}\r"
    try:
        data = json.loads(message)
        msg_type = data.get("type")
        if msg_type == "Begin":
            print(f"Session began: ID={data.get('id')}")
        elif msg_type == "Turn":
            turn_speaker = data.get("speaker_label") or "UNKNOWN"
            transcript_line = f"[{turn_speaker}] {data.get('transcript', '')}"
            if data.get("end_of_turn", False):
                # `or []` also covers an explicit null "words" value.
                segments = _format_speaker_segments(
                    data.get("words") or [], turn_speaker
                )
                print(f"{clear_line}{segments or transcript_line}")
            else:
                # No newline is written here, so flush explicitly: a
                # line-buffered terminal would otherwise hold the partial
                # text back until the turn ends.
                print(f"{clear_line}{transcript_line}", end="", flush=True)
        elif msg_type == "Termination":
            print(
                f"\nSession terminated: "
                f"{data.get('audio_duration_seconds', 0)}s of audio"
            )
    except Exception as e:
        # Best-effort display: a malformed message must not kill the callback.
        print(f"Error handling message: {e}")
def on_error(ws, error):
    """Report a WebSocket error and tell the audio thread to stop."""
    report = f"\nWebSocket Error: {error}"
    print(report)
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Release microphone resources after the WebSocket has closed.

    Args:
        ws: The WebSocketApp that closed (unused).
        close_status_code: Close status reported by the WebSocket client.
        close_msg: Close reason reported by the WebSocket client (unused).
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}")
    stop_event.set()

    # Give the capture thread a moment to leave stream.read() before the
    # stream is torn down underneath it. The timeout keeps shutdown from
    # hanging if the audio device is stuck.
    worker = audio_thread
    if (
        worker is not None
        and worker is not threading.current_thread()
        and worker.is_alive()
    ):
        worker.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        # Drop the reference so a later call cannot close the stream twice.
        stream = None
    if audio:
        audio.terminate()
        audio = None
def run():
    """Open the microphone, connect to the streaming API and block until done.

    The WebSocket client runs on a daemon thread while this function polls it.
    On Ctrl+C a Terminate message is sent (if still connected), the socket is
    closed and the client thread is given a short time to finish.
    """
    global audio, stream, ws_app

    audio = pyaudio.PyAudio()
    mic_settings = {
        "input": True,
        "frames_per_buffer": FRAMES_PER_BUFFER,
        "channels": CHANNELS,
        "format": FORMAT,
        "rate": SAMPLE_RATE,
    }
    stream = audio.open(**mic_settings)
    print("Speak into your microphone. Press Ctrl+C to stop.")

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    socket_thread = threading.Thread(target=ws_app.run_forever)
    socket_thread.daemon = True
    socket_thread.start()

    try:
        # Poll rather than join() so Ctrl+C is delivered promptly.
        while socket_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nStopping...")
        stop_event.set()
        still_connected = ws_app and ws_app.sock and ws_app.sock.connected
        if still_connected:
            ws_app.send(json.dumps({"type": "Terminate"}))
            # Leave time for the server's Termination message to arrive.
            time.sleep(2)
        if ws_app:
            ws_app.close()
        socket_thread.join(timeout=2.0)


if __name__ == "__main__":
    run()
import logging
from typing import Type
import assemblyai as aai
from assemblyai.streaming.v3 import (
BeginEvent,
StreamingClient,
StreamingClientOptions,
StreamingError,
StreamingEvents,
StreamingParameters,
TurnEvent,
TerminationEvent,
)
# Replace with your AssemblyAI API key.
api_key = "<YOUR_API_KEY>"
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def on_begin(self: Type[StreamingClient], event: BeginEvent):
    """Print the ID of the streaming session that just started."""
    session_id = event.id
    print(f"Session started: {session_id}")
def on_turn(self: Type[StreamingClient], event: TurnEvent):
    """Print a turn with its speaker label, updating in place until it ends.

    Args:
        self: The streaming client that emitted the event (unused).
        event: Turn event; speaker_label, transcript and end_of_turn are read.
    """
    turn_speaker = event.speaker_label or "UNKNOWN"
    # Wipe the in-progress line first so a shorter update never leaves
    # characters from a longer previous one.
    clear_line = f"\r{' ' * 80}\r"
    line = f"{clear_line}[{turn_speaker}] {event.transcript}"
    if event.end_of_turn:
        print(line)
    else:
        # No newline is written, so flush explicitly; otherwise a
        # line-buffered terminal shows nothing until the turn ends.
        print(line, end="", flush=True)
def on_terminated(self: Type[StreamingClient], event: TerminationEvent):
    """Print how many seconds of audio the finished session processed."""
    seconds = event.audio_duration_seconds
    print(f"Session terminated: {seconds} seconds of audio processed")
def on_error(self: Type[StreamingClient], error: StreamingError):
    """Print an error reported by the streaming client."""
    report = f"Error occurred: {error}"
    print(report)
def main():
    """Stream microphone audio with diarization enabled until interrupted."""
    options = StreamingClientOptions(
        api_key=api_key,
        api_host="streaming.assemblyai.com",
    )
    client = StreamingClient(options)

    # Wire each streaming event to its module-level handler.
    handlers = (
        (StreamingEvents.Begin, on_begin),
        (StreamingEvents.Turn, on_turn),
        (StreamingEvents.Termination, on_terminated),
        (StreamingEvents.Error, on_error),
    )
    for event_type, handler in handlers:
        client.on(event_type, handler)

    parameters = StreamingParameters(
        sample_rate=16000,
        speech_model="u3-rt-pro",
        speaker_labels=True,
    )
    client.connect(parameters)

    try:
        client.stream(aai.extras.MicrophoneStream(sample_rate=16000))
    finally:
        # Always end the session, even when streaming is interrupted.
        client.disconnect(terminate=True)


if __name__ == "__main__":
    main()
const WebSocket = require("ws");
const mic = require("mic");
const querystring = require("querystring");
// Replace with your AssemblyAI API key.
const YOUR_API_KEY = "<YOUR_API_KEY>";
// Query parameters sent when opening the streaming WebSocket.
const CONNECTION_PARAMS = {
  sample_rate: 16000,
  speech_model: "u3-rt-pro",
  speaker_labels: true, // turns on streaming diarization
};
const API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws";
const API_ENDPOINT = `${API_ENDPOINT_BASE_URL}?${querystring.stringify(CONNECTION_PARAMS)}`;
const SAMPLE_RATE = CONNECTION_PARAMS.sample_rate;
// Shared handles, assigned inside run().
let micInstance = null;
let ws = null;
// Collapse consecutive words from the same speaker into "[speaker] text"
// segments. Words without a `speaker` value use the turn-level label.
// Returns the segments joined by spaces, or "" when there are no words.
function formatSpeakerSegments(words, turnSpeaker) {
  const segments = [];
  let currentSpeaker = null;
  let currentWords = [];
  for (const word of words) {
    const spk = word.speaker != null ? word.speaker : turnSpeaker;
    if (spk !== currentSpeaker) {
      if (currentWords.length) {
        segments.push(`[${currentSpeaker}] ${currentWords.join(" ")}`);
      }
      currentSpeaker = spk;
      currentWords = [word.text];
    } else {
      currentWords.push(word.text);
    }
  }
  if (currentWords.length) {
    segments.push(`[${currentSpeaker}] ${currentWords.join(" ")}`);
  }
  return segments.join(" ");
}

// Connect to the streaming API, forward microphone audio, and print each
// turn with its speaker label until the user presses Ctrl+C.
function run() {
  // Wipes the in-progress console line so a shorter update never leaves
  // characters from a longer previous one.
  const CLEAR_LINE = "\r" + " ".repeat(80) + "\r";

  console.log("Starting AssemblyAI streaming diarization...");
  ws = new WebSocket(API_ENDPOINT, {
    headers: { Authorization: YOUR_API_KEY },
  });

  ws.on("open", () => {
    console.log("WebSocket connection opened.");
    micInstance = mic({
      rate: String(SAMPLE_RATE),
      channels: "1",
      bitwidth: "16",
      encoding: "signed-integer",
      endian: "little",
    });
    const micInputStream = micInstance.getAudioStream();
    micInputStream.on("data", (data) => {
      // Drop audio that arrives after the socket has started closing.
      if (ws.readyState === WebSocket.OPEN) {
        ws.send(data);
      }
    });
    micInstance.start();
    console.log("Speak into your microphone. Press Ctrl+C to stop.");
  });

  ws.on("message", (data) => {
    try {
      const msg = JSON.parse(data);
      if (msg.type === "Begin") {
        console.log(`Session began: ID=${msg.id}`);
      } else if (msg.type === "Turn") {
        const turnSpeaker = msg.speaker_label || "UNKNOWN";
        const transcriptLine = `[${turnSpeaker}] ${msg.transcript || ""}`;
        if (msg.end_of_turn) {
          const segments = formatSpeakerSegments(msg.words || [], turnSpeaker);
          process.stdout.write(CLEAR_LINE);
          console.log(segments || transcriptLine);
        } else {
          process.stdout.write(CLEAR_LINE + transcriptLine);
        }
      } else if (msg.type === "Termination") {
        console.log(
          `\nSession terminated: ${msg.audio_duration_seconds}s of audio`
        );
      }
    } catch (e) {
      console.error("Error parsing message:", e);
    }
  });

  ws.on("error", (error) => {
    console.error("WebSocket error:", error);
  });

  ws.on("close", (code, reason) => {
    console.log(`WebSocket closed: ${code}`);
    if (micInstance) micInstance.stop();
  });

  process.on("SIGINT", () => {
    console.log("\nStopping...");
    if (micInstance) micInstance.stop();
    if (ws && ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ type: "Terminate" }));
      // Leave time for the server's Termination message before closing.
      setTimeout(() => ws.close(), 2000);
    } else {
      // Socket is not open (still connecting, or already closed): there is
      // nothing to terminate. Exit now so the pending "open" handler cannot
      // start the microphone after the user asked to stop.
      process.exit(0);
    }
  });
}
run();
Install the required libraries
npm install assemblyai node-record-lpcm16
The module node-record-lpcm16 requires SoX and it must be available in your $PATH.
For Mac OS:
brew install sox
For most linux distros:
sudo apt-get install sox libsox-fmt-all
For Windows:
download the binaries
import { Readable } from "stream";
import { AssemblyAI } from "assemblyai";
import recorder from "node-record-lpcm16";
// Connect a streaming transcriber with speaker labels enabled, pipe
// microphone audio into it, and print each turn with its speaker label
// until the user presses Ctrl+C.
const run = async () => {
  const client = new AssemblyAI({
    apiKey: "<YOUR_API_KEY>",
  });
  const transcriber = client.streaming.transcriber({
    sampleRate: 16_000,
    speechModel: "u3-rt-pro",
    speakerLabels: true,
  });

  transcriber.on("open", ({ id }) => {
    console.log(`Session opened with ID: ${id}`);
  });
  transcriber.on("error", (error) => {
    console.error("Error:", error);
  });
  transcriber.on("close", (code, reason) =>
    console.log("Session closed:", code, reason)
  );
  transcriber.on("turn", (turn) => {
    // Skip turns that carry no text.
    if (!turn.transcript) return;
    const speaker = turn.speakerLabel ?? "UNKNOWN";
    console.log(`[${speaker}] ${turn.transcript}`);
  });

  try {
    console.log("Connecting to streaming transcript service");
    await transcriber.connect();

    console.log("Starting recording");
    const recording = recorder.record({
      channels: 1,
      sampleRate: 16_000,
      audioType: "wav",
    });

    // pipeTo() returns a promise that rejects if either side fails or closes
    // mid-stream. Handle it here so it does not surface as an unhandled
    // rejection.
    Readable.toWeb(recording.stream())
      .pipeTo(transcriber.stream())
      .catch((error) => {
        console.error("Audio stream error:", error);
      });

    process.on("SIGINT", async function () {
      console.log();
      console.log("Stopping recording");
      recording.stop();
      console.log("Closing streaming transcript connection");
      await transcriber.close();
      process.exit();
    });
  } catch (error) {
    console.error(error);
  }
};
run();
Configuration
Enable Streaming Diarization by adding speaker_labels: true to your connection
parameters. You can optionally cap the number of speakers with max_speakers.
| Parameter | Type | Default | Description |
|---|---|---|---|
| speaker_labels | boolean | false | Set to true to enable real-time speaker diarization. |
| max_speakers | integer | — | Optional. Hint the maximum number of speakers expected (1–10). Setting this accurately can improve assignment accuracy when you know the speaker count in advance. |
{
"speech_model": "u3-rt-pro",
"speaker_labels": true,
"max_speakers": 2
}
Diarization is supported on all streaming models:
u3-rt-pro,
universal-streaming-english, universal-streaming-multilingual, and
whisper-rt. You do not need to change your speech model to use it — just
add speaker_labels: true.
Reading speaker labels
When diarization is enabled, every Turn event includes a speaker_label field
reflecting the dominant speaker for that turn.
{
"type": "Turn",
"transcript": "Good morning, thanks for joining the call.",
"speaker_label": "A",
"end_of_turn": true,
"turn_is_formatted": true
}
Word-level speaker labels
Each final word in the words array also carries a speaker field. This allows
you to detect speaker changes within a single turn — for example, a turn where one
speaker finishes another’s sentence, or where a brief interjection appears mid-turn.
{
"type": "Turn",
"transcript": "Yeah. Different things the way they are said here than in contrast to the way they'd be said in other countries. Yeah. Your Colombian Spanish won't work. Uh, no. And she said she could— when I—",
"speaker_label": "A",
"end_of_turn": true,
"words": [
{ "text": "Yeah.", "speaker": "UNKNOWN", "word_is_final": true, "start": 0, "end": 96, "confidence": 0.204160 },
{ "text": "Different", "speaker": "A", "word_is_final": true, "start": 145, "end": 290, "confidence": 0.844642 },
{ "text": "things", "speaker": "A", "word_is_final": true, "start": 306, "end": 516, "confidence": 0.998971 },
{ "text": "the", "speaker": "A", "word_is_final": true, "start": 581, "end": 613, "confidence": 0.807043 },
{ "text": "way", "speaker": "A", "word_is_final": true, "start": 662, "end": 694, "confidence": 0.999722 },
{ "text": "they", "speaker": "A", "word_is_final": true, "start": 840, "end": 920, "confidence": 0.995030 },
{ "text": "are", "speaker": "A", "word_is_final": true, "start": 985, "end": 1017, "confidence": 0.867126 },
{ "text": "said", "speaker": "A", "word_is_final": true, "start": 1082, "end": 1260, "confidence": 0.990436 },
{ "text": "here", "speaker": "A", "word_is_final": true, "start": 1308, "end": 1502, "confidence": 0.999566 },
{ "text": "than", "speaker": "A", "word_is_final": true, "start": 1873, "end": 2003, "confidence": 0.618784 },
{ "text": "in", "speaker": "A", "word_is_final": true, "start": 2116, "end": 2229, "confidence": 0.955239 },
{ "text": "contrast", "speaker": "A", "word_is_final": true, "start": 2277, "end": 2568, "confidence": 0.998716 },
{ "text": "to", "speaker": "A", "word_is_final": true, "start": 2617, "end": 2714, "confidence": 0.992622 },
{ "text": "the", "speaker": "A", "word_is_final": true, "start": 2778, "end": 2810, "confidence": 0.996170 },
{ "text": "way", "speaker": "A", "word_is_final": true, "start": 2859, "end": 2956, "confidence": 0.999566 },
{ "text": "they'd", "speaker": "A", "word_is_final": true, "start": 3020, "end": 3214, "confidence": 0.844162 },
{ "text": "be", "speaker": "A", "word_is_final": true, "start": 3263, "end": 3295, "confidence": 0.998969 },
{ "text": "said", "speaker": "A", "word_is_final": true, "start": 3424, "end": 3602, "confidence": 0.994370 },
{ "text": "in", "speaker": "A", "word_is_final": true, "start": 3650, "end": 3683, "confidence": 0.999225 },
{ "text": "other", "speaker": "A", "word_is_final": true, "start": 3747, "end": 3861, "confidence": 0.999323 },
{ "text": "countries.", "speaker": "A", "word_is_final": true, "start": 3974, "end": 4281, "confidence": 0.868172 },
{ "text": "Yeah.", "speaker": "UNKNOWN", "word_is_final": true, "start": 4458, "end": 4636, "confidence": 0.656062 },
{ "text": "Your", "speaker": "B", "word_is_final": true, "start": 5040, "end": 5088, "confidence": 0.856220 },
{ "text": "Colombian", "speaker": "B", "word_is_final": true, "start": 5121, "end": 5638, "confidence": 0.962598 },
{ "text": "Spanish", "speaker": "B", "word_is_final": true, "start": 5638, "end": 6090, "confidence": 0.999557 },
{ "text": "won't", "speaker": "B", "word_is_final": true, "start": 6154, "end": 6284, "confidence": 0.999431 },
{ "text": "work.", "speaker": "B", "word_is_final": true, "start": 6332, "end": 6445, "confidence": 0.589761 },
{ "text": "Uh,", "speaker": "A", "word_is_final": true, "start": 6736, "end": 6752, "confidence": 0.343677 },
{ "text": "no.", "speaker": "A", "word_is_final": true, "start": 7673, "end": 7867, "confidence": 0.728975 },
{ "text": "And", "speaker": "A", "word_is_final": true, "start": 8820, "end": 8869, "confidence": 0.891464 },
{ "text": "she", "speaker": "A", "word_is_final": true, "start": 8901, "end": 9014, "confidence": 0.992945 },
{ "text": "said", "speaker": "A", "word_is_final": true, "start": 9079, "end": 9240, "confidence": 0.999052 },
{ "text": "she", "speaker": "A", "word_is_final": true, "start": 9305, "end": 9402, "confidence": 0.902266 },
{ "text": "could—", "speaker": "A", "word_is_final": true, "start": 9466, "end": 9579, "confidence": 0.605757 },
{ "text": "when", "speaker": "A", "word_is_final": true, "start": 9644, "end": 9757, "confidence": 0.827706 },
{ "text": "I—", "speaker": "A", "word_is_final": true, "start": 9870, "end": 9886, "confidence": 0.437470 }
]
}
Notes on the word-level speaker field:
- Final words only. The speaker field only appears on words where word_is_final: true. Non-final (in-progress) words never carry it.
- speaker can be absent on individual words. If the field is missing from a word entirely, treat that word as unattributed and fall back to the turn-level speaker_label if you need a label. Absent means the field is omitted from the JSON — it will never be null.
- UNKNOWN at word level means the model couldn’t confidently attribute that word to any specific speaker — common for short backchannels (“uh huh”, “yeah”) or brief low-quality audio segments. It is not an ambiguity flag between two known speakers; words in a confidently-attributed stretch carry the speaker’s letter, not UNKNOWN.
For turns with less than ~1 second of audio, speaker_label
will be set to "UNKNOWN". This is because the model needs at least ~1 second of
audio to generate a reliable diarization embedding — without enough audio, embeddings
may be inaccurate and could lead to a single speaker being labeled as multiple
speakers. Labeling short turns as "UNKNOWN" ensures that speaker labels remain
as accurate as possible.
{
"type": "Turn",
"transcript": "Hello?",
"speaker_label": "UNKNOWN",
"end_of_turn": true,
"turn_is_formatted": true
}
[A] Good morning, thanks for joining the call.
[B] Good morning. Happy to be here.
[A] So let's start with a quick overview of the project timeline.
[B] Sure. We're currently on track for the March deadline.
[A] Great. And how's the team handling the workload?
[C] It's been busy, but manageable. We brought on two new engineers last week.
How speaker accuracy improves over time
Streaming Diarization builds a speaker profile incrementally as audio flows in. In practice this means:
- Early in a session, speaker assignments may be less stable, especially if the first few turns are short.
- As the session progresses, the model accumulates richer speaker embeddings and assignments become more consistent.
Known limitations
Real-time diarization is an inherently harder problem than diarization for async transcription on pre-recorded audio. The following limitations apply to the current beta:
- Short utterances — Turns with less than ~1 second of audio are labeled as "UNKNOWN" because there is insufficient audio to generate a reliable speaker embedding. This prevents inaccurate embeddings from causing a single speaker to be split across multiple labels.
- Overlapping speech — When two speakers talk simultaneously, the model cannot split the audio and will assign the turn to a single speaker. Performance degrades with frequent cross-talk.
- Session start accuracy — The first 1–2 turns of a session may be misassigned because the model has not yet built up speaker profiles. This self-corrects quickly in practice.
- Noisy environments — Background noise and microphone bleed between speakers can reduce embedding quality and lead to more frequent misassignments.
For the best results, use a microphone setup that minimizes cross-talk and
background noise, and ensure each speaker produces at least a few complete
sentences before you rely on per-turn labels for downstream processing.
Supported models
| Model | speech_model value | Diarization supported |
|---|---|---|
| Universal-3 Pro Streaming | u3-rt-pro | ✓ |
| Universal Streaming (English) | universal-streaming-english | ✓ |
| Universal Streaming (Multilingual) | universal-streaming-multilingual | ✓ |
| Whisper Streaming | whisper-rt | ✓ |
Multichannel streaming audio
To transcribe multichannel streaming audio, we recommend creating a separate session for each channel. This approach allows you to maintain clear speaker separation and get accurate diarized transcriptions for conversations, phone calls, or interviews where speakers are recorded on two different channels. The following code example demonstrates how to transcribe a dual-channel audio file with diarized, speaker-separated transcripts. This same approach can be applied to any multi-channel audio stream, including those with more than two channels.
- Python
- Python SDK
- JavaScript
- JavaScript SDK
Use this complete script to transcribe dual-channel audio with speaker separation:
import websocket
import json
import threading
import numpy as np
import wave
import time
import pyaudio
from urllib.parse import urlencode
# Configuration
# Replace with your AssemblyAI API key and the path of the WAV file to stream.
YOUR_API_KEY = "<YOUR_API_KEY>"
AUDIO_FILE_PATH = "<DUAL_CHANNEL_AUDIO_FILE_PATH>"
API_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
# Query parameters sent when opening each channel's streaming WebSocket.
API_PARAMS = {
    "sample_rate": 8000,
    "format_turns": "true",
    "end_of_turn_confidence_threshold": 0.4,
    "min_turn_silence": 160,
    "max_turn_silence": 400,
}
# Build API endpoint with URL encoding
API_ENDPOINT = f"{API_BASE_URL}?{urlencode(API_PARAMS)}"
class ChannelTranscriber:
    """Transcribe one channel of a multichannel WAV file in its own session.

    The channel is extracted from AUDIO_FILE_PATH, cut into fixed-duration
    chunks and sent over a dedicated WebSocket at roughly real-time pace.
    Partial turns are printed in place and final turns on their own line,
    each prefixed with the channel name.
    """

    # Seconds of audio per WebSocket message, and the pause between sends.
    CHUNK_SECONDS = 0.05

    def __init__(self, channel_id, channel_name):
        """Create a transcriber for one channel.

        Args:
            channel_id: Zero-based index of the channel inside the WAV file.
            channel_name: Label printed in front of this channel's transcripts.
        """
        self.channel_id = channel_id
        self.channel_name = channel_name
        self.ws_app = None
        self.audio_data = []
        # Length of the partial line currently on screen, or None if none.
        self.current_turn_line = None
        self.line_count = 0
        # Sample rate of the WAV file in Hz; set by load_audio_channel().
        self.sample_rate = None

    def load_audio_channel(self):
        """Extract this transcriber's channel from the WAV file into chunks.

        Works for any channel count, not only stereo. The chunk size follows
        the file's own sample rate so each chunk holds CHUNK_SECONDS of audio.

        Raises:
            ValueError: If the file is not 16-bit PCM or has no channel with
                index ``channel_id``.
        """
        with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
            n_channels = wf.getnchannels()
            sample_width = wf.getsampwidth()
            self.sample_rate = wf.getframerate()
            frames = wf.readframes(wf.getnframes())

        if sample_width != 2:
            raise ValueError(
                f"Expected 16-bit PCM audio, got {sample_width * 8}-bit samples"
            )
        if not 0 <= self.channel_id < n_channels:
            raise ValueError(
                f"Channel {self.channel_id} does not exist in a "
                f"{n_channels}-channel file"
            )

        # WAV samples are interleaved: one row per frame, one column per
        # channel. "<i2" pins little-endian 16-bit regardless of host order.
        samples = np.frombuffer(frames, dtype="<i2").reshape(-1, n_channels)
        channel_audio = samples[:, self.channel_id]

        frames_per_buffer = max(1, round(self.sample_rate * self.CHUNK_SECONDS))
        self.audio_data = []
        for start in range(0, len(channel_audio), frames_per_buffer):
            chunk = channel_audio[start:start + frames_per_buffer]
            if len(chunk) < frames_per_buffer:
                # Pad the last chunk with silence so every chunk has equal length.
                chunk = np.pad(
                    chunk, (0, frames_per_buffer - len(chunk)), 'constant'
                )
            self.audio_data.append(chunk.astype("<i2").tobytes())

    def on_open(self, ws):
        """Stream audio data when connection opens."""
        def stream_audio():
            for chunk in self.audio_data:
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
                time.sleep(self.CHUNK_SECONDS)  # pace sends at real-time speed
            # Send termination message
            terminate_message = {"type": "Terminate"}
            ws.send(json.dumps(terminate_message))

        threading.Thread(target=stream_audio, daemon=True).start()

    def clear_current_line(self):
        """Blank out the partial transcript currently shown, if any."""
        if self.current_turn_line is not None:
            print("\r" + " " * 100 + "\r", end="", flush=True)

    def print_partial_transcript(self, words):
        """Show an in-progress turn on the current line, without a newline."""
        self.clear_current_line()
        # Build transcript from individual words
        word_texts = [word.get('text', '') for word in words]
        transcript = ' '.join(word_texts)
        partial_text = f"{self.channel_name}: {transcript}"
        print(partial_text, end="", flush=True)
        self.current_turn_line = len(partial_text)

    def print_final_transcript(self, transcript):
        """Replace any partial line with the final transcript for the turn."""
        self.clear_current_line()
        final_text = f"{self.channel_name}: {transcript}"
        print(final_text, flush=True)
        self.current_turn_line = None
        self.line_count += 1

    def on_message(self, ws, message):
        """Handle transcription results."""
        data = json.loads(message)
        msg_type = data.get('type')
        if msg_type == "Turn":
            transcript = data.get('transcript', '').strip()
            words = data.get('words', [])
            if transcript or words:
                if data.get('end_of_turn'):
                    self.print_final_transcript(transcript)
                else:
                    self.print_partial_transcript(words)

    def start_transcription(self):
        """Load the channel audio and start the WebSocket client thread.

        Returns:
            The started daemon thread running the WebSocket client.
        """
        self.load_audio_channel()
        # Declare the file's real sample rate to the API; audio sent at a rate
        # other than the declared one would be decoded incorrectly.
        params = {**API_PARAMS, "sample_rate": self.sample_rate}
        endpoint = f"{API_BASE_URL}?{urlencode(params)}"
        self.ws_app = websocket.WebSocketApp(
            endpoint,
            header={"Authorization": YOUR_API_KEY},
            on_open=self.on_open,
            on_message=self.on_message,
        )
        thread = threading.Thread(target=self.ws_app.run_forever, daemon=True)
        thread.start()
        return thread
def play_audio_file():
    """Play AUDIO_FILE_PATH through the default output device.

    Playback errors are printed rather than raised so a missing or busy
    audio device does not stop transcription. The output stream and the
    PyAudio instance are released even when playback fails part-way.
    """
    try:
        with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
            p = pyaudio.PyAudio()
            try:
                stream = p.open(
                    format=p.get_format_from_width(wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    output=True
                )
                try:
                    print(f"Playing audio: {AUDIO_FILE_PATH}")
                    # Play audio in chunks
                    chunk_size = 1024
                    data = wf.readframes(chunk_size)
                    while data:
                        stream.write(data)
                        data = wf.readframes(chunk_size)
                    stream.stop_stream()
                finally:
                    # Close the stream even if a write failed mid-playback.
                    stream.close()
            finally:
                # Release PortAudio even if opening the stream failed.
                p.terminate()
        print("Audio playback finished")
    except Exception as e:
        print(f"Error playing audio: {e}")
def transcribe_multichannel():
    """Play the file and transcribe both channels at once, then wait."""
    # One transcriber per channel, each labelled as a separate speaker.
    channel_specs = ((0, "Speaker 1"), (1, "Speaker 2"))
    transcribers = [ChannelTranscriber(index, label) for index, label in channel_specs]

    # Playback runs alongside transcription.
    playback = threading.Thread(target=play_audio_file, daemon=True)
    playback.start()

    workers = [transcriber.start_transcription() for transcriber in transcribers]

    # Wait for both sessions, then for playback.
    for worker in workers:
        worker.join()
    playback.join()


if __name__ == "__main__":
    transcribe_multichannel()
Use this complete script to transcribe dual-channel audio with speaker separation:
import logging
from typing import Type
import threading
import time
import wave
import numpy as np
import pyaudio
import assemblyai as aai
from assemblyai.streaming.v3 import (
BeginEvent,
StreamingClient,
StreamingClientOptions,
StreamingError,
StreamingEvents,
StreamingParameters,
TerminationEvent,
TurnEvent,
)
# Configuration
# Replace with your AssemblyAI API key and the path of the WAV file to stream.
API_KEY = "<YOUR_API_KEY>"
AUDIO_FILE_PATH = "<DUAL_CHANNEL_AUDIO_FILE_PATH>"
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ChannelTranscriber:
    """Transcribe one channel of a multichannel WAV file in its own session.

    The channel is extracted from AUDIO_FILE_PATH, cut into fixed-duration
    chunks and streamed through a dedicated StreamingClient at roughly
    real-time pace. Partial turns are printed in place and final turns on
    their own line, each prefixed with the channel name.
    """

    # Seconds of audio per streamed chunk, and the pause between chunks.
    CHUNK_SECONDS = 0.05

    def __init__(self, channel_id, channel_name, sample_rate):
        """Create a transcriber for one channel.

        Args:
            channel_id: Zero-based index of the channel inside the WAV file.
            channel_name: Label printed in front of this channel's transcripts.
            sample_rate: Sample rate of the audio in Hz.
        """
        self.channel_id = channel_id
        self.channel_name = channel_name
        self.sample_rate = sample_rate
        self.client = None
        self.audio_data = []
        # Length of the partial line currently on screen, or None if none.
        self.current_turn_line = None
        self.line_count = 0
        # Set once the session has terminated, errored or been disconnected.
        self.streaming_done = threading.Event()

    def load_audio_channel(self):
        """Extract this transcriber's channel from the WAV file into chunks.

        Works for any channel count, not only stereo. The chunk size follows
        ``sample_rate`` so each chunk holds CHUNK_SECONDS of audio.

        Raises:
            ValueError: If the file is not 16-bit PCM or has no channel with
                index ``channel_id``.
        """
        with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
            n_channels = wf.getnchannels()
            sample_width = wf.getsampwidth()
            frames = wf.readframes(wf.getnframes())

        if sample_width != 2:
            raise ValueError(
                f"Expected 16-bit PCM audio, got {sample_width * 8}-bit samples"
            )
        if not 0 <= self.channel_id < n_channels:
            raise ValueError(
                f"Channel {self.channel_id} does not exist in a "
                f"{n_channels}-channel file"
            )

        # WAV samples are interleaved: one row per frame, one column per
        # channel. "<i2" pins little-endian 16-bit regardless of host order.
        samples = np.frombuffer(frames, dtype="<i2").reshape(-1, n_channels)
        channel_audio = samples[:, self.channel_id]

        frames_per_buffer = max(1, round(self.sample_rate * self.CHUNK_SECONDS))
        self.audio_data = []
        for start in range(0, len(channel_audio), frames_per_buffer):
            chunk = channel_audio[start:start + frames_per_buffer]
            if len(chunk) < frames_per_buffer:
                # Pad the last chunk with silence so every chunk has equal length.
                chunk = np.pad(
                    chunk, (0, frames_per_buffer - len(chunk)), 'constant'
                )
            self.audio_data.append(chunk.astype("<i2").tobytes())

    def clear_current_line(self):
        """Blank out the partial transcript currently shown, if any."""
        if self.current_turn_line is not None:
            print("\r" + " " * 100 + "\r", end="", flush=True)

    def print_partial_transcript(self, words):
        """Show an in-progress turn on the current line, without a newline."""
        self.clear_current_line()
        # Build transcript from individual words
        word_texts = [word.text for word in words]
        transcript = ' '.join(word_texts)
        partial_text = f"{self.channel_name}: {transcript}"
        print(partial_text, end="", flush=True)
        self.current_turn_line = len(partial_text)

    def print_final_transcript(self, transcript):
        """Replace any partial line with the final transcript for the turn."""
        self.clear_current_line()
        final_text = f"{self.channel_name}: {transcript}"
        print(final_text, flush=True)
        self.current_turn_line = None
        self.line_count += 1

    def on_begin(self, client: Type[StreamingClient], event: BeginEvent):
        """Called when the streaming session begins."""
        pass  # Session started

    def on_turn(self, client: Type[StreamingClient], event: TurnEvent):
        """Called when a turn is received."""
        transcript = event.transcript.strip() if event.transcript else ''
        words = event.words if event.words else []
        if transcript or words:
            if event.end_of_turn:
                self.print_final_transcript(transcript)
            else:
                self.print_partial_transcript(words)

    def on_terminated(self, client: Type[StreamingClient], event: TerminationEvent):
        """Called when the session is terminated."""
        self.clear_current_line()
        self.streaming_done.set()

    def on_error(self, client: Type[StreamingClient], error: StreamingError):
        """Called when an error occurs."""
        print(f"\n{self.channel_name}: Error: {error}")
        self.streaming_done.set()

    def start_transcription(self):
        """Start the transcription for this channel."""
        self.load_audio_channel()

        # Create streaming client
        self.client = StreamingClient(
            StreamingClientOptions(
                api_key=API_KEY,
                api_host="streaming.assemblyai.com",
            )
        )

        # Register event handlers
        self.client.on(StreamingEvents.Begin, self.on_begin)
        self.client.on(StreamingEvents.Turn, self.on_turn)
        self.client.on(StreamingEvents.Termination, self.on_terminated)
        self.client.on(StreamingEvents.Error, self.on_error)

        # Connect to streaming service with turn detection configuration
        self.client.connect(
            StreamingParameters(
                sample_rate=self.sample_rate,
                format_turns=True,
                end_of_turn_confidence_threshold=0.4,
                min_turn_silence=160,
                max_turn_silence=400,
            )
        )

        # Create audio generator
        def audio_generator():
            for chunk in self.audio_data:
                yield chunk
                time.sleep(self.CHUNK_SECONDS)  # pace chunks at real-time speed

        try:
            # Stream audio
            self.client.stream(audio_generator())
        finally:
            # Disconnect
            self.client.disconnect(terminate=True)
            self.streaming_done.set()

    def start_transcription_thread(self):
        """Start transcription in a separate thread."""
        thread = threading.Thread(target=self.start_transcription, daemon=True)
        thread.start()
        return thread
def play_audio_file():
    """Play AUDIO_FILE_PATH through the default output device.

    Playback errors are printed rather than raised so a missing or busy
    audio device does not stop transcription. The output stream and the
    PyAudio instance are released even when playback fails part-way.
    """
    try:
        with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
            p = pyaudio.PyAudio()
            try:
                stream = p.open(
                    format=p.get_format_from_width(wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    output=True
                )
                try:
                    print(f"Playing audio: {AUDIO_FILE_PATH}")
                    # Play audio in chunks
                    chunk_size = 1024
                    data = wf.readframes(chunk_size)
                    while data:
                        stream.write(data)
                        data = wf.readframes(chunk_size)
                    stream.stop_stream()
                finally:
                    # Close the stream even if a write failed mid-playback.
                    stream.close()
            finally:
                # Release PortAudio even if opening the stream failed.
                p.terminate()
        print("Audio playback finished")
    except Exception as e:
        print(f"Error playing audio: {e}")
def transcribe_multichannel():
    """Play the file and transcribe both channels at once, then wait."""
    # The file's own sample rate is handed to every channel's session.
    with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
        sample_rate = wf.getframerate()

    # One transcriber per channel, each labelled as a separate speaker.
    channel_specs = ((0, "Speaker 1"), (1, "Speaker 2"))
    transcribers = [
        ChannelTranscriber(index, label, sample_rate)
        for index, label in channel_specs
    ]

    # Playback runs alongside transcription.
    playback = threading.Thread(target=play_audio_file, daemon=True)
    playback.start()

    workers = [
        transcriber.start_transcription_thread() for transcriber in transcribers
    ]

    # Wait for both sessions, then for playback.
    for worker in workers:
        worker.join()
    playback.join()


if __name__ == "__main__":
    transcribe_multichannel()
Use this complete script to transcribe dual-channel audio with speaker separation:
const WebSocket = require("ws");
const fs = require("fs");
const { spawn } = require("child_process");
// Configuration
// Replace with your AssemblyAI API key and the path of the WAV file to stream.
const YOUR_API_KEY = "<YOUR_API_KEY>";
const AUDIO_FILE_PATH = "<DUAL_CHANNEL_AUDIO_FILE_PATH>";
const API_BASE_URL = "wss://streaming.assemblyai.com/v3/ws";
// Query parameters sent when opening each channel's streaming WebSocket.
const API_PARAMS = {
  sample_rate: 8000,
  format_turns: "true",
  end_of_turn_confidence_threshold: 0.4,
  min_turn_silence: 160,
  max_turn_silence: 400,
};
// Build API endpoint with URL encoding
const queryString = new URLSearchParams(API_PARAMS).toString();
const API_ENDPOINT = `${API_BASE_URL}?${queryString}`;
// Simple WAV file parser.
// Reads a PCM WAV file into memory, locates its "data" chunk and extracts the
// samples of a single channel as 16-bit-range integers.
class SimpleWavParser {
  // Loads the file at `filePath` and parses its header.
  // Throws if the file cannot be read or contains no "data" chunk.
  constructor(filePath) {
    this.buffer = fs.readFileSync(filePath);
    this.parseHeader();
  }

  // Reads the format fields and finds where the sample data starts.
  parseHeader() {
    // Read WAV header (fixed offsets; assumes "fmt " is the first chunk)
    this.channels = this.buffer.readUInt16LE(22);
    this.sampleRate = this.buffer.readUInt32LE(24);
    this.bitsPerSample = this.buffer.readUInt16LE(34);

    // Find data chunk
    let dataOffset = 12;
    while (dataOffset <= this.buffer.length - 8) {
      const chunkId = this.buffer.toString("ascii", dataOffset, dataOffset + 4);
      const chunkSize = this.buffer.readUInt32LE(dataOffset + 4);
      if (chunkId === "data") {
        this.dataStart = dataOffset + 8;
        // Never trust a declared size that runs past the end of the file.
        this.dataSize = Math.min(chunkSize, this.buffer.length - this.dataStart);
        return;
      }
      // RIFF chunks are word-aligned: an odd-sized chunk is followed by one
      // pad byte that is not counted in its size field.
      dataOffset += 8 + chunkSize + (chunkSize % 2);
    }
    throw new Error("WAV file has no data chunk");
  }

  // Returns the samples of channel `channelIndex` (zero-based) as an array of
  // integers in 16-bit range. 8-bit audio is rescaled to that range.
  // Throws if the channel does not exist or the bit depth is not 8 or 16.
  getChannelData(channelIndex) {
    if (
      !Number.isInteger(channelIndex) ||
      channelIndex < 0 ||
      channelIndex >= this.channels
    ) {
      throw new Error(
        `Channel ${channelIndex} does not exist in a ${this.channels}-channel file`
      );
    }
    if (this.bitsPerSample !== 16 && this.bitsPerSample !== 8) {
      throw new Error(`Unsupported bit depth: ${this.bitsPerSample}`);
    }

    const bytesPerSample = this.bitsPerSample / 8;
    // Floor so a truncated trailing frame is ignored instead of read past.
    const samplesPerChannel = Math.floor(
      this.dataSize / (bytesPerSample * this.channels)
    );
    const channelData = [];

    // Extract samples for the specified channel (samples are interleaved)
    for (let i = 0; i < samplesPerChannel; i++) {
      const sampleOffset =
        this.dataStart + (i * this.channels + channelIndex) * bytesPerSample;
      if (this.bitsPerSample === 16) {
        channelData.push(this.buffer.readInt16LE(sampleOffset));
      } else {
        // 8-bit WAV is unsigned; centre it and convert to 16-bit range.
        channelData.push((this.buffer.readUInt8(sampleOffset) - 128) * 256);
      }
    }
    return channelData;
  }
}
/**
 * Streams one channel of the stereo file at AUDIO_FILE_PATH over its own
 * WebSocket session and prints the turns it receives, prefixed with the
 * channel's display name.
 */
class ChannelTranscriber {
  /**
   * @param {number} channelId Zero-based channel index inside the WAV file.
   * @param {string} channelName Label printed in front of each transcript.
   */
  constructor(channelId, channelName) {
    this.channelId = channelId;
    this.channelName = channelName;
    this.ws = null;
    this.audioData = []; // one Buffer per audio chunk, filled by loadAudioChannel()
    this.currentTurnLine = null; // length of the partial line on screen, or null
    this.lineCount = 0; // number of final transcripts printed
    this.isConnected = false;
  }

  /**
   * Read this transcriber's channel from AUDIO_FILE_PATH and split it into
   * 50 ms buffers of 16-bit samples, appended to `this.audioData`.
   *
   * @throws {Error} Whatever SimpleWavParser throws (unreadable file,
   *   non-stereo audio, ...).
   */
  loadAudioChannel() {
    const wavParser = new SimpleWavParser(AUDIO_FILE_PATH);
    if (wavParser.sampleRate !== API_PARAMS.sample_rate) {
      // The session is opened with API_PARAMS.sample_rate; audio recorded at
      // another rate would be sent mislabeled, so make that visible.
      console.warn(
        `${this.channelName}: file sample rate (${wavParser.sampleRate} Hz) ` +
          `differs from configured sample_rate (${API_PARAMS.sample_rate} Hz)`
      );
    }
    const channelSamples = wavParser.getChannelData(this.channelId);

    // 50 ms of audio per chunk (400 samples at 8000 Hz).
    const FRAMES_PER_BUFFER = Math.floor(API_PARAMS.sample_rate * 0.05);
    for (let i = 0; i < channelSamples.length; i += FRAMES_PER_BUFFER) {
      // A new Int16Array is zero-filled, so a short final chunk is padded
      // with silence automatically.
      const chunkArray = new Int16Array(FRAMES_PER_BUFFER);
      chunkArray.set(channelSamples.slice(i, i + FRAMES_PER_BUFFER));
      // Wrap the raw sample bytes (host byte order) for sending.
      this.audioData.push(Buffer.from(chunkArray.buffer));
    }
  }

  /** Blank out the partial transcript currently shown on the terminal line. */
  clearCurrentLine() {
    if (this.currentTurnLine !== null) {
      // Cover at least 100 columns, and more when the partial line is longer.
      const width = Math.max(100, this.currentTurnLine);
      process.stdout.write("\r" + " ".repeat(width) + "\r");
    }
  }

  /**
   * Overwrite the current terminal line with an in-progress transcript.
   *
   * @param {Array<{text?: string}>} words Words of the unfinished turn.
   */
  printPartialTranscript(words) {
    this.clearCurrentLine();
    const transcript = words.map((word) => word.text || "").join(" ");
    const partialText = `${this.channelName}: ${transcript}`;
    process.stdout.write(partialText);
    this.currentTurnLine = partialText.length;
  }

  /**
   * Print a finished turn on its own line.
   *
   * @param {string} transcript Final text of the turn.
   */
  printFinalTranscript(transcript) {
    this.clearCurrentLine();
    console.log(`${this.channelName}: ${transcript}`);
    this.currentTurnLine = null;
    this.lineCount++;
  }

  /**
   * Send the prepared chunks as binary frames, one every 50 ms, then send a
   * Terminate message. Stops early if the socket is no longer open.
   *
   * @returns {Promise<void>}
   */
  async streamAudio() {
    // Wait a bit for connection to stabilize
    await new Promise((resolve) => setTimeout(resolve, 100));
    for (const chunk of this.audioData) {
      if (this.ws.readyState !== WebSocket.OPEN) {
        break;
      }
      this.ws.send(chunk, { binary: true });
      await new Promise((resolve) => setTimeout(resolve, 50)); // 50ms intervals
    }
    if (this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify({ type: "Terminate" }));
    }
  }

  /**
   * Load the channel audio, open the WebSocket session and stream the audio.
   *
   * @returns {Promise<void>} Resolves when the socket closes; rejects if the
   *   audio cannot be loaded or the socket emits an error.
   */
  startTranscription() {
    return new Promise((resolve, reject) => {
      try {
        this.loadAudioChannel();
      } catch (error) {
        reject(error);
        return;
      }

      this.ws = new WebSocket(API_ENDPOINT, {
        headers: {
          Authorization: YOUR_API_KEY,
        },
      });

      this.ws.on("open", () => {
        this.isConnected = true;
        this.streamAudio().catch((error) => {
          // Report the failure and close the socket so the "close" handler
          // settles the promise instead of leaving it pending forever.
          console.error(
            `\n${this.channelName}: Failed to stream audio:`,
            error.message
          );
          this.close();
        });
      });

      this.ws.on("message", (data) => {
        try {
          const message = JSON.parse(data.toString());
          const msgType = message.type;
          if (msgType === "Turn") {
            const transcript = (message.transcript || "").trim();
            const words = message.words || [];
            if (transcript || words.length > 0) {
              if (message.end_of_turn) {
                this.printFinalTranscript(transcript);
              } else {
                this.printPartialTranscript(words);
              }
            }
          } else if (msgType === "error") {
            console.error(`\n${this.channelName}: API Error:`, message.error);
          }
        } catch (error) {
          // Deliberately best-effort: a malformed message is skipped.
        }
      });

      this.ws.on("close", (code) => {
        this.clearCurrentLine();
        // 1000 (normal) and 1001 (going away) are treated as expected.
        if (code !== 1000 && code !== 1001) {
          console.log(`\n${this.channelName}: Connection closed unexpectedly`);
        }
        this.isConnected = false;
        resolve();
      });

      this.ws.on("error", (error) => {
        console.error(`\n${this.channelName} WebSocket error:`, error.message);
        this.isConnected = false;
        reject(error);
      });
    });
  }

  /** Close the socket if a session is currently open. */
  close() {
    if (this.ws && this.isConnected) {
      this.ws.close();
    }
  }
}
/**
 * Play AUDIO_FILE_PATH through a platform-specific command-line player
 * (afplay on macOS, PowerShell's SoundPlayer on Windows, aplay elsewhere).
 *
 * Playback is best-effort: the returned promise always resolves, whether
 * the player finished, exited with an error, or could not be started.
 *
 * @returns {Promise<void>}
 */
function playAudioFile() {
  return new Promise((resolve) => {
    console.log(`Playing audio: ${AUDIO_FILE_PATH}`);

    let command;
    let args;
    if (process.platform === "darwin") {
      command = "afplay";
      args = [AUDIO_FILE_PATH];
    } else if (process.platform === "win32") {
      // The path is embedded in a single-quoted PowerShell string, where a
      // literal apostrophe must be written as two apostrophes.
      const quotedPath = AUDIO_FILE_PATH.replace(/'/g, "''");
      command = "powershell";
      args = [
        "-c",
        `(New-Object Media.SoundPlayer '${quotedPath}').PlaySync()`,
      ];
    } else {
      // Linux - try aplay
      command = "aplay";
      args = [AUDIO_FILE_PATH];
    }

    try {
      const player = spawn(command, args, {
        stdio: ["ignore", "ignore", "ignore"], // Suppress all output from player
      });
      player.on("close", (code) => {
        if (code === 0) {
          console.log("Audio playback finished");
        }
        resolve();
      });
      // E.g. the player binary is not installed: continue without audio.
      player.on("error", () => {
        resolve();
      });
    } catch (error) {
      resolve();
    }
  });
}
/**
 * Script entry point: checks the configuration, then plays the audio file
 * while transcribing both channels, each in its own streaming session.
 * Exits the process with status 1 on a configuration or runtime error.
 *
 * @returns {Promise<void>}
 */
async function transcribeMultichannel() {
  const channels = [
    new ChannelTranscriber(0, "Speaker 1"),
    new ChannelTranscriber(1, "Speaker 2"),
  ];

  try {
    const apiKeyMissing = YOUR_API_KEY === "<YOUR_API_KEY>";
    if (apiKeyMissing) {
      console.error("ERROR: Please set YOUR_API_KEY before running");
      process.exit(1);
    }

    const audioFileMissing = !fs.existsSync(AUDIO_FILE_PATH);
    if (audioFileMissing) {
      console.error(`ERROR: Audio file not found: ${AUDIO_FILE_PATH}`);
      process.exit(1);
    }

    // Kick off playback first, then both sessions; none of these block.
    const playback = playAudioFile();
    const sessions = channels.map((channel) => channel.startTranscription());

    // Done once both sockets have closed and playback has ended.
    await Promise.all([...sessions, playback]);
  } catch (error) {
    console.error("\nError during transcription:", error.message);
    // Release any session that is still open before exiting.
    for (const channel of channels) {
      channel.close();
    }
    process.exit(1);
  }
}
// Ctrl+C: emit a line break so the shell prompt starts cleanly, then exit 0.
process.on("SIGINT", function onInterrupt() {
  console.log("\n");
  process.exit(0);
});

// Start transcribing only when this file is run directly (not require()d).
if (module === require.main) {
  transcribeMultichannel();
}
Use this complete script to transcribe dual-channel audio with speaker separation:
import { AssemblyAI } from "assemblyai";
import fs from "fs";
import { spawn } from "child_process";
import { Readable } from "stream";
// Settings: replace both placeholders before running the script.
// AUDIO_FILE_PATH must point to a two-channel (stereo) WAV file.
const AUDIO_FILE_PATH = "<DUAL_CHANNEL_AUDIO_FILE_PATH>";
const YOUR_API_KEY = "<YOUR_API_KEY>";
/**
 * Minimal in-memory reader for uncompressed PCM WAV files.
 *
 * Parses only what the streaming example needs: channel count, sample rate
 * and bit depth from the header, plus the location and size of the "data"
 * chunk. The header fields are read at fixed offsets, i.e. the "fmt " chunk
 * is assumed to directly follow the 12-byte RIFF/WAVE preamble.
 */
class SimpleWavParser {
  /**
   * @param {string} filePath Path of the WAV file; it is read synchronously
   *   and kept entirely in memory.
   * @throws {Error} If the file cannot be read or contains no "data" chunk.
   */
  constructor(filePath) {
    this.buffer = fs.readFileSync(filePath);
    this.parseHeader();
  }

  /**
   * Populate `channels`, `sampleRate`, `bitsPerSample`, `dataStart` and
   * `dataSize` from `this.buffer`.
   *
   * @throws {Error} If no "data" chunk is found.
   */
  parseHeader() {
    this.channels = this.buffer.readUInt16LE(22);
    this.sampleRate = this.buffer.readUInt32LE(24);
    this.bitsPerSample = this.buffer.readUInt16LE(34);

    // Walk the chunk list that starts right after "RIFF<size>WAVE".
    let dataOffset = 12;
    let found = false;
    while (dataOffset + 8 <= this.buffer.length) {
      const chunkId = this.buffer.toString("ascii", dataOffset, dataOffset + 4);
      const chunkSize = this.buffer.readUInt32LE(dataOffset + 4);
      if (chunkId === "data") {
        this.dataStart = dataOffset + 8;
        // Never trust a declared size that runs past the end of the file
        // (e.g. a placeholder size left behind by a streaming writer).
        this.dataSize = Math.min(chunkSize, this.buffer.length - this.dataStart);
        found = true;
        break;
      }
      // RIFF chunks are word-aligned: an odd-sized payload is followed by
      // one pad byte that is not counted in chunkSize.
      dataOffset += 8 + chunkSize + (chunkSize % 2);
    }
    if (!found) {
      throw new Error('WAV file has no "data" chunk');
    }
  }

  /**
   * Extract one channel of a stereo file as signed 16-bit sample values.
   *
   * @param {number} channelIndex 0 for the first channel, 1 for the second.
   * @returns {number[]} One value per frame; 8-bit input is re-centred
   *   around zero and scaled by 256 into the 16-bit range.
   * @throws {Error} If the file is not stereo or is not 8/16-bit.
   * @throws {RangeError} If `channelIndex` is not 0 or 1.
   */
  getChannelData(channelIndex) {
    if (this.channels !== 2) {
      throw new Error("Audio file is not stereo");
    }
    if (channelIndex !== 0 && channelIndex !== 1) {
      throw new RangeError(`Invalid channel index: ${channelIndex}`);
    }
    if (this.bitsPerSample !== 16 && this.bitsPerSample !== 8) {
      throw new Error(`Unsupported bit depth: ${this.bitsPerSample}`);
    }

    const bytesPerSample = this.bitsPerSample / 8;
    const frameSize = bytesPerSample * this.channels;
    // Ignore a trailing partial frame instead of reading past the data.
    const samplesPerChannel = Math.floor(this.dataSize / frameSize);
    const firstSample = this.dataStart + channelIndex * bytesPerSample;

    const channelData = new Array(samplesPerChannel);
    for (let i = 0; i < samplesPerChannel; i++) {
      const sampleOffset = firstSample + i * frameSize;
      channelData[i] =
        bytesPerSample === 2
          ? this.buffer.readInt16LE(sampleOffset)
          : (this.buffer.readUInt8(sampleOffset) - 128) * 256;
    }
    return channelData;
  }
}
/**
 * Streams one channel of the stereo file at AUDIO_FILE_PATH through its own
 * SDK streaming transcriber and prints the turns it receives, prefixed with
 * the channel's display name.
 */
class ChannelTranscriber {
  /**
   * @param {object} client SDK client; `client.streaming.transcriber()` is
   *   used to create the session.
   * @param {number} channelId Zero-based channel index inside the WAV file.
   * @param {string} channelName Label printed in front of each transcript.
   * @param {number} sampleRate Sample rate in Hz passed to the transcriber
   *   and used to size the audio chunks.
   */
  constructor(client, channelId, channelName, sampleRate) {
    this.client = client;
    this.channelId = channelId;
    this.channelName = channelName;
    this.sampleRate = sampleRate;
    this.transcriber = null;
    this.audioData = []; // one Buffer per audio chunk, filled by loadAudioChannel()
    this.currentTurnLine = null; // length of the partial line on screen, or null
    this.lineCount = 0; // number of final transcripts printed
  }

  /**
   * Read this transcriber's channel from AUDIO_FILE_PATH and split it into
   * 50 ms buffers of 16-bit samples, appended to `this.audioData`.
   *
   * @throws {Error} Whatever SimpleWavParser throws (unreadable file,
   *   non-stereo audio, ...).
   */
  loadAudioChannel() {
    const wavParser = new SimpleWavParser(AUDIO_FILE_PATH);
    const channelSamples = wavParser.getChannelData(this.channelId);

    const FRAMES_PER_BUFFER = Math.floor(this.sampleRate * 0.05); // 50ms
    for (let i = 0; i < channelSamples.length; i += FRAMES_PER_BUFFER) {
      // A new Int16Array is zero-filled, so a short final chunk is padded
      // with silence automatically.
      const chunkArray = new Int16Array(FRAMES_PER_BUFFER);
      chunkArray.set(channelSamples.slice(i, i + FRAMES_PER_BUFFER));
      // Wrap the raw sample bytes (host byte order) for sending.
      this.audioData.push(Buffer.from(chunkArray.buffer));
    }
  }

  /** Blank out the partial transcript currently shown on the terminal line. */
  clearCurrentLine() {
    if (this.currentTurnLine !== null) {
      // Cover at least 100 columns, and more when the partial line is longer.
      const width = Math.max(100, this.currentTurnLine);
      process.stdout.write("\r" + " ".repeat(width) + "\r");
    }
  }

  /**
   * Overwrite the current terminal line with an in-progress transcript.
   *
   * @param {Array<{text?: string}>} words Words of the unfinished turn.
   */
  printPartialTranscript(words) {
    this.clearCurrentLine();
    const transcript = words.map((word) => word.text || "").join(" ");
    const partialText = `${this.channelName}: ${transcript}`;
    process.stdout.write(partialText);
    this.currentTurnLine = partialText.length;
  }

  /**
   * Print a finished turn on its own line.
   *
   * @param {string} transcript Final text of the turn.
   */
  printFinalTranscript(transcript) {
    this.clearCurrentLine();
    console.log(`${this.channelName}: ${transcript}`);
    this.currentTurnLine = null;
    this.lineCount++;
  }

  /**
   * Load the channel audio, connect a streaming transcriber, feed it the
   * audio one chunk every 50 ms, then close the session.
   *
   * @returns {Promise<void>} Resolves after the transcriber has been closed.
   * @throws {Error} If the audio cannot be loaded, the connection fails, or
   *   writing audio to the transcriber fails.
   */
  async startTranscription() {
    this.loadAudioChannel();

    const turnDetectionConfig = {
      endOfTurnConfidenceThreshold: 0.4,
      minEndOfTurnSilenceWhenConfident: 160,
      maxTurnSilence: 400,
    };

    // Create transcriber with SDK
    this.transcriber = this.client.streaming.transcriber({
      sampleRate: this.sampleRate,
      formatTurns: true,
      ...turnDetectionConfig,
    });

    this.transcriber.on("open", () => {
      // Session opened
    });

    this.transcriber.on("error", (error) => {
      console.error(`\n${this.channelName}: Error:`, error);
    });

    this.transcriber.on("close", (code) => {
      this.clearCurrentLine();
      // 1000 (normal) and 1001 (going away) are treated as expected.
      if (code !== 1000 && code !== 1001) {
        console.log(`\n${this.channelName}: Connection closed unexpectedly`);
      }
    });

    this.transcriber.on("turn", (turn) => {
      const transcript = (turn.transcript || "").trim();
      const words = turn.words || [];
      if (transcript || words.length > 0) {
        if (turn.end_of_turn) {
          this.printFinalTranscript(transcript);
        } else {
          this.printPartialTranscript(words);
        }
      }
    });

    // Connect to the streaming service
    await this.transcriber.connect();

    // Chunks are pushed manually below, so read() has nothing to do.
    const audioStream = new Readable({
      read() {},
    });

    // Attach the rejection handler immediately: a pipe failure while we are
    // sleeping between chunks must not surface as an unhandled rejection.
    let pipeError = null;
    const pipeDone = Readable.toWeb(audioStream)
      .pipeTo(this.transcriber.stream())
      .catch((error) => {
        pipeError = error;
      });

    for (const chunk of this.audioData) {
      if (pipeError) {
        break; // the destination is gone; stop feeding it
      }
      audioStream.push(chunk);
      await new Promise((resolve) => setTimeout(resolve, 50)); // 50ms intervals
    }

    // Signal end of stream and wait until every chunk has been written.
    audioStream.push(null);
    await pipeDone;
    if (pipeError) {
      throw pipeError;
    }

    // Wait a bit for final transcripts
    await new Promise((resolve) => setTimeout(resolve, 1000));

    await this.transcriber.close();
  }

  /** Close the transcriber if one has been created. */
  async close() {
    if (this.transcriber) {
      await this.transcriber.close();
    }
  }
}
/**
 * Play AUDIO_FILE_PATH through a platform-specific command-line player
 * (afplay on macOS, PowerShell's SoundPlayer on Windows, aplay elsewhere).
 *
 * Playback is best-effort: the returned promise always resolves, whether
 * the player finished, exited with an error, or could not be started.
 *
 * @returns {Promise<void>}
 */
function playAudioFile() {
  return new Promise((resolve) => {
    console.log(`Playing audio: ${AUDIO_FILE_PATH}`);

    let command;
    let args;
    if (process.platform === "darwin") {
      command = "afplay";
      args = [AUDIO_FILE_PATH];
    } else if (process.platform === "win32") {
      // The path is embedded in a single-quoted PowerShell string, where a
      // literal apostrophe must be written as two apostrophes.
      const quotedPath = AUDIO_FILE_PATH.replace(/'/g, "''");
      command = "powershell";
      args = [
        "-c",
        `(New-Object Media.SoundPlayer '${quotedPath}').PlaySync()`,
      ];
    } else {
      // Linux - try aplay
      command = "aplay";
      args = [AUDIO_FILE_PATH];
    }

    try {
      const player = spawn(command, args, {
        stdio: ["ignore", "ignore", "ignore"], // Suppress all output from player
      });
      player.on("close", (code) => {
        if (code === 0) {
          console.log("Audio playback finished");
        }
        resolve();
      });
      // E.g. the player binary is not installed: continue without audio.
      player.on("error", () => {
        resolve();
      });
    } catch (error) {
      resolve();
    }
  });
}
/**
 * Script entry point: checks the configuration, reads the sample rate from
 * the WAV header, then plays the audio file while transcribing both
 * channels, each through its own SDK streaming transcriber.
 * Exits the process with status 1 on a configuration or runtime error.
 *
 * @returns {Promise<void>}
 */
async function transcribeMultichannel() {
  // Print a message to stderr and terminate with a failure status.
  const abort = (message) => {
    console.error(message);
    process.exit(1);
  };

  if (YOUR_API_KEY === "<YOUR_API_KEY>") {
    abort("ERROR: Please set YOUR_API_KEY before running");
  }
  if (!fs.existsSync(AUDIO_FILE_PATH)) {
    abort(`ERROR: Audio file not found: ${AUDIO_FILE_PATH}`);
  }

  // The sessions are opened with the sample rate declared in the file.
  const { sampleRate } = new SimpleWavParser(AUDIO_FILE_PATH);

  const client = new AssemblyAI({ apiKey: YOUR_API_KEY });
  const transcriber1 = new ChannelTranscriber(client, 0, "Speaker 1", sampleRate);
  const transcriber2 = new ChannelTranscriber(client, 1, "Speaker 2", sampleRate);

  try {
    // Kick off playback first, then both sessions; none of these block.
    const playback = playAudioFile();
    const sessions = [transcriber1, transcriber2].map((transcriber) =>
      transcriber.startTranscription()
    );

    // Done once both transcriptions and the playback have finished.
    await Promise.all([...sessions, playback]);
  } catch (error) {
    console.error("\nError during transcription:", error.message);
    // Close whatever sessions exist before exiting.
    await transcriber1.close();
    await transcriber2.close();
    process.exit(1);
  }
}
// Ctrl+C: emit a line break so the shell prompt starts cleanly, then exit 0.
process.on("SIGINT", function onInterrupt() {
  console.log("\n");
  process.exit(0);
});

// Start the script.
transcribeMultichannel();
Configure turn detection for your use case
The examples above use turn detection settings optimized for short responses and rapid back-and-forth conversations. To optimize for your specific audio scenario, you can adjust the turn detection parameters. For configuration examples tailored to different use cases, refer to our Configuration examples.
- Python
- Python SDK
- JavaScript
- JavaScript SDK
Modify the turn detection parameters in API_PARAMS:
API_PARAMS = {
"sample_rate": 8000,
"format_turns": "true",
"end_of_turn_confidence_threshold": 0.4,
"min_turn_silence": 160,
"max_turn_silence": 400,
}
Modify the StreamingParameters in the start_transcription method:
# Connect to streaming service with turn detection configuration
self.client.connect(
StreamingParameters(
sample_rate=self.sample_rate,
format_turns=True,
end_of_turn_confidence_threshold=0.4,
min_turn_silence=160,
max_turn_silence=400,
)
)
Modify the turn detection parameters in API_PARAMS:
const API_PARAMS = {
sample_rate: 8000,
format_turns: 'true',
end_of_turn_confidence_threshold: 0.4,
min_turn_silence: 160,
max_turn_silence: 400,
};
Modify the turn detection configuration object:
const turnDetectionConfig = {
endOfTurnConfidenceThreshold: 0.4,
minEndOfTurnSilenceWhenConfident: 160,
maxTurnSilence: 400
};
// Create transcriber with SDK
this.transcriber = this.client.streaming.transcriber({
sampleRate: this.sampleRate,
formatTurns: true,
...turnDetectionConfig
});