This guide shows how to analyze utterance gaps from multiple pre-recorded audio files to automatically determine optimal turn detection settings for real-time streaming transcription. It processes an entire folder, aggregates gap statistics across all recordings, and configures the WebSocket with parameters tailored to your specific conversation patterns.

Documentation Index
Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt
Use this file to discover all available pages before exploring further.
Quickstart
import requests
import time
import json
import pyaudio
import websocket
import threading
from urllib.parse import urlencode
from datetime import datetime
import os
from pathlib import Path
# Credentials and input location. Environment variables take precedence when
# set, so the API key does not have to be hard-coded into the script.
YOUR_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "<YOUR_API_KEY>")  # Replace with your API key
AUDIO_FOLDER_PATH = os.environ.get("AUDIO_FOLDER_PATH", "<YOUR_AUDIO_FILE_FOLDER>")  # Folder containing audio files

# Audio Configuration
SAMPLE_RATE = 16000
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)

# Global variables for audio stream and websocket. They are shared between the
# main thread, the WebSocket thread and the microphone-streaming thread.
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # set to ask the microphone thread to stop
recorded_frames = []  # raw microphone chunks; append under recording_lock
recording_lock = threading.Lock()

# Store the optimized configuration
OPTIMIZED_CONFIG = {}
def get_audio_files(folder_path):
    """
    Gets all audio files from the specified folder.

    Supports all formats accepted by AssemblyAI's API. Extensions are matched
    case-insensitively (``CALL.MTS`` and ``call.mts`` are both found). Only
    the top level of the folder is scanned; sub-directories are ignored.

    Args:
        folder_path: Path (str or Path) of the folder to scan.

    Returns:
        A sorted list of the matching file paths as strings.

    Raises:
        FileNotFoundError: If ``folder_path`` does not exist.
        ValueError: If the folder contains no supported audio files.
    """
    # Every entry must be lower-case: it is compared against suffix.lower()
    # below, so an upper-case entry could never match.
    audio_extensions = {'.aac', '.ac3', '.aif', '.aiff', '.alac', '.amr', '.ape',
                        '.au', '.dss', '.flac', '.m4a', '.m4b', '.m4p', '.mp3',
                        '.mpga', '.ogg', '.oga', '.mogg', '.opus', '.qcp', '.tta',
                        '.voc', '.wav', '.wv', '.webm', '.mts', '.m2ts', '.ts',
                        '.mov', '.mp4', '.m4v'}

    folder = Path(folder_path)
    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")

    audio_files = [
        str(f) for f in folder.iterdir()
        if f.is_file() and f.suffix.lower() in audio_extensions
    ]
    if not audio_files:
        raise ValueError(f"No audio files found in {folder_path}")

    return sorted(audio_files)
def analyze_single_file(audio_file, api_key, file_index, total_files):
    """
    Analyzes a single audio file and returns gap statistics.

    The file is uploaded, or used directly as the audio URL when
    ``audio_file`` starts with ``http``. It is then transcribed with speaker
    labels, and the job is polled every 3 seconds until it finishes. A gap is
    ``next_utterance.start - current_utterance.end``. Only positive gaps are
    kept, so touching or overlapping speech is ignored. The full transcript
    JSON is saved to ``transcript_<name>_<timestamp>.json`` in the current
    directory.

    Args:
        audio_file: Local file path or ``http(s)`` URL of the audio.
        api_key: AssemblyAI API key used for every request.
        file_index: 1-based position of this file, for progress output only.
        total_files: Number of files in the batch, for progress output only.

    Returns:
        A dict with ``filename``, ``average_gap_ms``, ``min_gap_ms``,
        ``max_gap_ms``, ``median_gap_ms``, ``total_utterances``,
        ``total_gaps`` and ``all_gaps``. Returns ``None`` when transcription
        fails or there are not enough utterances/gaps to measure.

    Raises:
        requests.HTTPError: If the upload, submit or polling request returns
            an HTTP error status.
    """
    print("\n" + "=" * 70)
    print(f"ANALYZING FILE {file_index}/{total_files}: {Path(audio_file).name}")
    print("=" * 70)

    base_url = "https://api.assemblyai.com"
    headers = {"authorization": api_key}

    # Upload audio file
    print("\nUploading audio file...")
    if audio_file.startswith("http"):
        upload_url = audio_file
        print("Using provided URL")
    else:
        with open(audio_file, "rb") as f:
            response = requests.post(
                base_url + "/v2/upload",
                headers=headers,
                data=f
            )
        # Fail with a clear HTTP error (bad key, quota, ...) instead of a
        # confusing KeyError when "upload_url" is absent from an error body.
        response.raise_for_status()
        upload_url = response.json()["upload_url"]
        print("Upload complete")

    # Enable Speaker Labels
    data = {
        "audio_url": upload_url,
        "speaker_labels": True,
        # "language_detection": True  # Enable automatic language detection if your files are in different languages
    }
    response = requests.post(
        base_url + "/v2/transcript",
        json=data,
        headers=headers
    )
    response.raise_for_status()
    transcript_id = response.json()['id']
    print(f"Transcript ID: {transcript_id}")

    # Poll for completion
    print("\nWaiting for transcription to complete...")
    polling_endpoint = base_url + "/v2/transcript/" + transcript_id
    while True:
        poll_response = requests.get(polling_endpoint, headers=headers)
        poll_response.raise_for_status()
        transcription_result = poll_response.json()
        status = transcription_result['status']
        if status == 'completed':
            print("Transcription completed!")
            break
        if status == 'error':
            print(f"Transcription failed: {transcription_result['error']}")
            return None
        time.sleep(3)

    # Calculate gaps. NOTE(review): "utterances" is assumed to be possibly
    # missing or null in some responses; treat that as "no utterances".
    utterances = transcription_result.get('utterances') or []
    if len(utterances) < 2:
        print("⚠ Not enough utterances to analyze gaps (need at least 2)")
        return None

    gaps = []
    for current, following in zip(utterances, utterances[1:]):
        gap = following['start'] - current['end']
        # Zero means no silence; negative means the utterances overlap.
        if gap > 0:
            gaps.append(gap)

    if not gaps:
        print("⚠ No gaps found between utterances (all speech overlaps)")
        return None

    # True median: average the two middle values when the count is even.
    sorted_gaps = sorted(gaps)
    mid = len(sorted_gaps) // 2
    if len(sorted_gaps) % 2:
        median_gap = sorted_gaps[mid]
    else:
        median_gap = (sorted_gaps[mid - 1] + sorted_gaps[mid]) / 2

    # Calculate statistics
    stats = {
        'filename': Path(audio_file).name,
        'average_gap_ms': sum(gaps) / len(gaps),
        'min_gap_ms': min(gaps),
        'max_gap_ms': max(gaps),
        'median_gap_ms': median_gap,
        'total_utterances': len(utterances),
        'total_gaps': len(gaps),
        'all_gaps': gaps
    }

    print(f"\nResults for {stats['filename']}:")
    print(f" Total utterances: {stats['total_utterances']}")
    print(f" Total gaps: {stats['total_gaps']}")
    print(f" Average gap: {stats['average_gap_ms']:.0f} ms")
    print(f" Median gap: {stats['median_gap_ms']:.0f} ms")

    # Save transcript JSON to file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_filename = Path(audio_file).stem.replace(' ', '_')
    json_filename = f"transcript_{safe_filename}_{timestamp}.json"
    try:
        with open(json_filename, 'w', encoding='utf-8') as f:
            json.dump(transcription_result, f, indent=2, ensure_ascii=False)
        print(f" Transcript saved: {json_filename}")
    except Exception as e:
        # Saving is best-effort; the statistics are still returned.
        print(f" Error saving transcript: {e}")

    return stats
def analyze_multiple_files(folder_path, api_key):
    """
    Analyzes all audio files in a folder and returns aggregated statistics.

    Every supported file found by ``get_audio_files`` is passed to
    ``analyze_single_file``. Files that raise or return ``None`` are reported
    and skipped. Gaps from all successful files are pooled to compute the
    overall statistics, which are printed and saved to
    ``aggregated_analysis_<timestamp>.json`` in the current directory.

    Args:
        folder_path: Folder containing the audio files.
        api_key: AssemblyAI API key passed through to each file's analysis.

    Returns:
        A dict of aggregated statistics (gap values in milliseconds, plus
        ``file_averages`` and the per-file ``file_stats``). Returns ``None``
        if no file could be analyzed.

    Raises:
        FileNotFoundError: If ``folder_path`` does not exist.
        ValueError: If the folder contains no supported audio files.
    """
    print("=" * 70)
    print("MULTI-FILE UTTERANCE GAP ANALYSIS")
    print("=" * 70)

    audio_files = get_audio_files(folder_path)
    total_files = len(audio_files)
    print(f"\nFound {total_files} audio file(s) in: {folder_path}")
    for i, file in enumerate(audio_files, 1):
        print(f" {i}. {Path(file).name}")

    # Analyze each file; one failure must not abort the whole batch.
    all_file_stats = []
    all_gaps = []
    for i, audio_file in enumerate(audio_files, 1):
        try:
            stats = analyze_single_file(audio_file, api_key, i, total_files)
            if stats:
                all_file_stats.append(stats)
                all_gaps.extend(stats['all_gaps'])
        except Exception as e:
            print(f"\n✗ Error analyzing {Path(audio_file).name}: {str(e)}")
            continue

    if not all_file_stats:
        print("\n✗ No files were successfully analyzed")
        return None

    # Calculate aggregated statistics
    print("\n" + "=" * 70)
    print("AGGREGATED GAP ANALYSIS RESULTS")
    print("=" * 70)

    # True median of the pooled gaps (mean of the two middle values when the
    # count is even).
    sorted_gaps = sorted(all_gaps)
    mid = len(sorted_gaps) // 2
    if len(sorted_gaps) % 2:
        overall_median = sorted_gaps[mid]
    else:
        overall_median = (sorted_gaps[mid - 1] + sorted_gaps[mid]) / 2

    aggregated_stats = {
        'total_files_analyzed': len(all_file_stats),
        'total_utterances': sum(s['total_utterances'] for s in all_file_stats),
        'total_gaps': sum(s['total_gaps'] for s in all_file_stats),
        'overall_average_gap_ms': sum(all_gaps) / len(all_gaps),
        'overall_median_gap_ms': overall_median,
        'overall_min_gap_ms': min(all_gaps),
        'overall_max_gap_ms': max(all_gaps),
        'file_averages': [s['average_gap_ms'] for s in all_file_stats],
        'file_stats': all_file_stats
    }

    print(f"\nFiles successfully analyzed: {aggregated_stats['total_files_analyzed']}/{total_files}")
    print(f"Total utterances (all files): {aggregated_stats['total_utterances']}")
    print(f"Total gaps analyzed: {aggregated_stats['total_gaps']}")
    print(f"\nOverall average gap: {aggregated_stats['overall_average_gap_ms']:.0f} ms ({aggregated_stats['overall_average_gap_ms']/1000:.2f} seconds)")
    print(f"Overall median gap: {aggregated_stats['overall_median_gap_ms']:.0f} ms")
    print(f"Overall minimum gap: {aggregated_stats['overall_min_gap_ms']:.0f} ms")
    print(f"Overall maximum gap: {aggregated_stats['overall_max_gap_ms']:.0f} ms")

    # Show per-file breakdown
    print("\nPer-file average gaps:")
    for stat in all_file_stats:
        print(f" • {stat['filename']:<40} {stat['average_gap_ms']:>6.0f} ms")

    # Variability: longest pooled gap divided by the mean pooled gap. Gaps are
    # strictly positive, so the mean is never zero.
    avg_of_file_averages = sum(aggregated_stats['file_averages']) / len(aggregated_stats['file_averages'])
    variability_ratio = aggregated_stats['overall_max_gap_ms'] / aggregated_stats['overall_average_gap_ms']
    print(f"\nAverage of file averages: {avg_of_file_averages:.0f} ms")
    print(f"Variability ratio: {variability_ratio:.2f}x")
    if variability_ratio > 3:
        print("└─> HIGH variability - mixed conversation patterns across files")
    elif variability_ratio > 2:
        print("└─> MODERATE variability - some pattern variation")
    else:
        print("└─> LOW variability - consistent conversation rhythm")

    # Save aggregated results (best-effort; the stats are returned regardless)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    summary_filename = f"aggregated_analysis_{timestamp}.json"
    try:
        summary_data = {
            'analysis_date': datetime.now().isoformat(),
            'folder_path': folder_path,
            'aggregated_statistics': {
                'total_files_analyzed': aggregated_stats['total_files_analyzed'],
                'total_utterances': aggregated_stats['total_utterances'],
                'total_gaps': aggregated_stats['total_gaps'],
                'overall_average_gap_ms': aggregated_stats['overall_average_gap_ms'],
                'overall_median_gap_ms': aggregated_stats['overall_median_gap_ms'],
                'overall_min_gap_ms': aggregated_stats['overall_min_gap_ms'],
                'overall_max_gap_ms': aggregated_stats['overall_max_gap_ms'],
                'variability_ratio': variability_ratio
            },
            'per_file_results': [
                {
                    'filename': s['filename'],
                    'average_gap_ms': s['average_gap_ms'],
                    'median_gap_ms': s['median_gap_ms'],
                    'total_utterances': s['total_utterances'],
                    'total_gaps': s['total_gaps']
                }
                for s in all_file_stats
            ]
        }
        with open(summary_filename, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2, ensure_ascii=False)
        print(f"\nAggregated analysis saved to: {summary_filename}")
    except Exception as e:
        print(f"\nError saving aggregated analysis: {e}")

    return aggregated_stats
def determine_streaming_config(aggregated_stats):
    """
    Determines optimal Universal-Streaming configuration based on aggregated gap analysis.
    Returns WebSocket connection parameters.

    The overall average gap picks one of three presets:
    below 500 ms -> Aggressive, below 1000 ms -> Balanced, otherwise
    Conservative. When ``aggregated_stats`` is ``None`` (nothing could be
    analyzed), the default Balanced preset is returned.

    Returns:
        A dict with ``name``, ``min_turn_silence``, ``max_turn_silence`` (ms)
        and ``description``.
    """
    if aggregated_stats is None:
        print("\nUsing default balanced configuration (no gap data available)")
        return {
            'name': 'Balanced (Default)',
            'min_turn_silence': 400,
            'max_turn_silence': 1280,
            'description': 'Standard configuration for general use'
        }

    banner = "=" * 70
    print("\n" + banner)
    print("DETERMINING OPTIMAL STREAMING CONFIGURATION")
    print(banner)

    average_gap = aggregated_stats['overall_average_gap_ms']
    file_count = aggregated_stats['total_files_analyzed']
    print(f"\nBased on analysis of {file_count} file(s)")
    print(f"Overall average gap: {average_gap:.0f} ms")

    # (exclusive upper bound in ms, name, min silence, max silence,
    #  description, recommended use cases) -- checked in order.
    tiers = (
        (500, 'Aggressive', 160, 400,
         'Fast-paced conversation with quick turn-taking',
         "IVR systems, order confirmations, yes/no queries, retail support"),
        (1000, 'Balanced', 400, 1280,
         'Natural conversation pacing',
         "General customer support, consultations, standard voice agents"),
    )
    for upper_bound, name, min_silence, max_silence, description, use_cases in tiers:
        if average_gap < upper_bound:
            break
    else:
        # No tier matched: long pauses call for the most patient settings.
        name, min_silence, max_silence = 'Conservative', 800, 3600
        description = 'Thoughtful, complex speech with longer pauses'
        use_cases = "Technical support, healthcare, legal consultations, troubleshooting"

    config = {
        'name': name,
        'min_turn_silence': min_silence,
        'max_turn_silence': max_silence,
        'description': description
    }

    print(f"\nSelected Configuration: {config['name']}")
    print(f" Reasoning: Average gap of {average_gap:.0f}ms indicates {config['description']}")
    print("\nConfiguration Parameters:")
    print(f" • min_turn_silence: {config['min_turn_silence']} ms")
    print(f" • max_turn_silence: {config['max_turn_silence']} ms")
    print(f"\nRecommended use cases: {use_cases}")

    return config
# WEBSOCKET HANDLERS WITH OPTIMIZED SETTINGS
def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread (stored in the global ``audio_thread``). The
    thread repeatedly reads ``FRAMES_PER_BUFFER`` frames from the global
    microphone ``stream``, appends each chunk to ``recorded_frames`` under
    ``recording_lock``, and sends it to the server as a binary frame. It
    stops when ``stop_event`` is set or any read/send error occurs.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Using optimized {OPTIMIZED_CONFIG['name']} configuration")

    def forward_microphone():
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                with recording_lock:
                    recorded_frames.append(chunk)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                # Any failure (closed stream, closed socket) ends streaming.
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    audio_thread = threading.Thread(target=forward_microphone, daemon=True)
    audio_thread.start()
def on_message(ws, message):
    """Handle one JSON message from the streaming server.

    - ``Begin``: print the session id, the expiry time (when provided) and the
      active configuration, then prompt the user to speak.
    - ``Turn``: print partial transcripts in place on one line, and the final
      transcript on its own line once ``end_of_turn`` is true.
    - ``Termination``: print the audio and session durations.

    Other message types are ignored. Decode and handling errors are printed
    rather than raised, so one bad message does not end the WebSocket thread.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}")
            # A missing expires_at must not abort the rest of the Begin
            # handling (fromtimestamp(None) raises TypeError).
            if expires_at is not None:
                print(f" Expires at: {datetime.fromtimestamp(expires_at)}")
            print(f" Configuration: {OPTIMIZED_CONFIG['name']}")
            print("\nSpeak now... (Press Ctrl+C to stop)\n")

        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Blank out the in-place partial line before the final text.
                print('\r' + ' ' * 80 + '\r', end='')
                print(f"FINAL: {transcript}")
            else:
                print(f"\r partial: {transcript}", end='')

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio={audio_duration}s, Session={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
def on_error(ws, error):
    """Called when a WebSocket error occurs.

    Reports the error, then sets ``stop_event`` so the microphone-streaming
    loop stops sending audio.
    """
    report = f"\nWebSocket Error: {error}"
    print(report)
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed.

    Signals the microphone thread to stop and waits up to 1 s for it to exit.
    Only then does it stop and close the PyAudio stream and terminate
    PyAudio. The ``stream``/``audio`` globals are reset to ``None`` so later
    cleanup code does not touch closed objects.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    stop_event.set()

    # Join the reader thread BEFORE closing the stream. Closing a PyAudio
    # stream while another thread is blocked in stream.read() is unsafe.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
# RUN STREAMING WITH OPTIMIZED CONFIGURATION
def run_streaming(config):
    """
    Runs the streaming transcription with optimized turn detection settings.

    Opens the default microphone and connects to the Universal-Streaming v3
    WebSocket, passing ``config``'s turn-silence values as query parameters.
    It then blocks until the connection ends or the user presses Ctrl+C. On
    Ctrl+C a ``{"type": "Terminate"}`` message is sent before closing.
    Microphone and PyAudio resources are always released before returning.

    Args:
        config: Dict with ``name``, ``min_turn_silence`` and
            ``max_turn_silence`` keys (as returned by
            ``determine_streaming_config``).
    """
    global audio, stream, ws_app, OPTIMIZED_CONFIG
    OPTIMIZED_CONFIG = config
    # Reset the shared stop flag. If an earlier session left it set, the new
    # audio thread would exit before sending anything.
    stop_event.clear()

    print("\n" + "=" * 70)
    print("STARTING REAL-TIME STREAMING")
    print("=" * 70)

    # Build connection parameters with optimized settings
    connection_params = {
        "sample_rate": SAMPLE_RATE,
        "format_turns": True,
        "min_turn_silence": str(config['min_turn_silence']),
        "max_turn_silence": str(config['max_turn_silence'])
    }
    endpoint_base_url = "wss://streaming.assemblyai.com/v3/ws"
    endpoint = f"{endpoint_base_url}?{urlencode(connection_params)}"

    print(f"\nWebSocket Endpoint: {endpoint_base_url}")
    print("\nApplied Configuration:")
    for key, value in connection_params.items():
        print(f" • {key}: {value}")

    # Initialize PyAudio and open the microphone stream
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("\nMicrophone stream opened successfully.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        audio.terminate()
        audio = None  # don't leave a terminated instance in the global
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        endpoint,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread so Ctrl+C is handled here
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()
        if ws_app.sock and ws_app.sock.connected:
            try:
                print("Sending termination message...")
                ws_app.send(json.dumps({"type": "Terminate"}))
                time.sleep(1)  # pause so the server's Termination reply can arrive
            except Exception as e:
                print(f"Error sending termination message: {e}")
        ws_app.close()
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        stop_event.set()
        # Let the microphone thread finish its current read before the
        # stream is closed underneath it.
        if audio_thread and audio_thread.is_alive():
            audio_thread.join(timeout=1.0)
        if stream:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        print("Cleanup complete. Exiting.")
# MAIN WORKFLOW
def main():
    """
    Run the end-to-end workflow.

    1. Analyze every audio file in ``AUDIO_FOLDER_PATH``.
    2. Turn the aggregated gap statistics into a streaming configuration.
    3. Start live microphone streaming with that configuration.

    Any exception is reported and then re-raised unchanged.
    """
    try:
        gap_summary = analyze_multiple_files(AUDIO_FOLDER_PATH, YOUR_API_KEY)
        chosen_config = determine_streaming_config(gap_summary)
        run_streaming(chosen_config)
    except Exception as exc:
        print(f"\nError in workflow: {exc}")
        raise


# EXECUTION
if __name__ == "__main__":
    main()
Step-by-Step Guide
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.
- Install All Required Packages
pip install requests pyaudio websocket-client
- Configuration and Global Variables
import requests
import time
import json
import pyaudio
import websocket
import threading
from urllib.parse import urlencode
from datetime import datetime
import os
from pathlib import Path
# Credentials and input location. Environment variables take precedence when
# set, so the API key does not have to be hard-coded into the script.
YOUR_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "<YOUR_API_KEY>")  # Replace with your API key
AUDIO_FOLDER_PATH = os.environ.get("AUDIO_FOLDER_PATH", "<YOUR_AUDIO_FILE_FOLDER>")  # Folder containing audio files

# Audio Configuration
SAMPLE_RATE = 16000
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)

# Global variables for audio stream and websocket. They are shared between the
# main thread, the WebSocket thread and the microphone-streaming thread.
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # set to ask the microphone thread to stop
recorded_frames = []  # raw microphone chunks; append under recording_lock
recording_lock = threading.Lock()

# Store the optimized configuration
OPTIMIZED_CONFIG = {}
- Define `get_audio_files()` Function
def get_audio_files(folder_path):
    """
    Gets all audio files from the specified folder.

    Supports all formats accepted by AssemblyAI's API. Extensions are matched
    case-insensitively (``CALL.MTS`` and ``call.mts`` are both found). Only
    the top level of the folder is scanned; sub-directories are ignored.

    Args:
        folder_path: Path (str or Path) of the folder to scan.

    Returns:
        A sorted list of the matching file paths as strings.

    Raises:
        FileNotFoundError: If ``folder_path`` does not exist.
        ValueError: If the folder contains no supported audio files.
    """
    # Every entry must be lower-case: it is compared against suffix.lower()
    # below, so an upper-case entry could never match.
    audio_extensions = {'.aac', '.ac3', '.aif', '.aiff', '.alac', '.amr', '.ape',
                        '.au', '.dss', '.flac', '.m4a', '.m4b', '.m4p', '.mp3',
                        '.mpga', '.ogg', '.oga', '.mogg', '.opus', '.qcp', '.tta',
                        '.voc', '.wav', '.wv', '.webm', '.mts', '.m2ts', '.ts',
                        '.mov', '.mp4', '.m4v'}

    folder = Path(folder_path)
    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")

    audio_files = [
        str(f) for f in folder.iterdir()
        if f.is_file() and f.suffix.lower() in audio_extensions
    ]
    if not audio_files:
        raise ValueError(f"No audio files found in {folder_path}")

    return sorted(audio_files)
- Define `analyze_single_file()` Function
def analyze_single_file(audio_file, api_key, file_index, total_files):
    """
    Analyzes a single audio file and returns gap statistics.

    The file is uploaded, or used directly as the audio URL when
    ``audio_file`` starts with ``http``. It is then transcribed with speaker
    labels, and the job is polled every 3 seconds until it finishes. A gap is
    ``next_utterance.start - current_utterance.end``. Only positive gaps are
    kept, so touching or overlapping speech is ignored. The full transcript
    JSON is saved to ``transcript_<name>_<timestamp>.json`` in the current
    directory.

    Args:
        audio_file: Local file path or ``http(s)`` URL of the audio.
        api_key: AssemblyAI API key used for every request.
        file_index: 1-based position of this file, for progress output only.
        total_files: Number of files in the batch, for progress output only.

    Returns:
        A dict with ``filename``, ``average_gap_ms``, ``min_gap_ms``,
        ``max_gap_ms``, ``median_gap_ms``, ``total_utterances``,
        ``total_gaps`` and ``all_gaps``. Returns ``None`` when transcription
        fails or there are not enough utterances/gaps to measure.

    Raises:
        requests.HTTPError: If the upload, submit or polling request returns
            an HTTP error status.
    """
    print("\n" + "=" * 70)
    print(f"ANALYZING FILE {file_index}/{total_files}: {Path(audio_file).name}")
    print("=" * 70)

    base_url = "https://api.assemblyai.com"
    headers = {"authorization": api_key}

    # Upload audio file
    print("\nUploading audio file...")
    if audio_file.startswith("http"):
        upload_url = audio_file
        print("Using provided URL")
    else:
        with open(audio_file, "rb") as f:
            response = requests.post(
                base_url + "/v2/upload",
                headers=headers,
                data=f
            )
        # Fail with a clear HTTP error (bad key, quota, ...) instead of a
        # confusing KeyError when "upload_url" is absent from an error body.
        response.raise_for_status()
        upload_url = response.json()["upload_url"]
        print("Upload complete")

    # Enable Speaker Labels
    data = {
        "audio_url": upload_url,
        "speaker_labels": True,
        # "language_detection": True  # Enable automatic language detection if your files are in different languages
    }
    response = requests.post(
        base_url + "/v2/transcript",
        json=data,
        headers=headers
    )
    response.raise_for_status()
    transcript_id = response.json()['id']
    print(f"Transcript ID: {transcript_id}")

    # Poll for completion
    print("\nWaiting for transcription to complete...")
    polling_endpoint = base_url + "/v2/transcript/" + transcript_id
    while True:
        poll_response = requests.get(polling_endpoint, headers=headers)
        poll_response.raise_for_status()
        transcription_result = poll_response.json()
        status = transcription_result['status']
        if status == 'completed':
            print("Transcription completed!")
            break
        if status == 'error':
            print(f"Transcription failed: {transcription_result['error']}")
            return None
        time.sleep(3)

    # Calculate gaps. NOTE(review): "utterances" is assumed to be possibly
    # missing or null in some responses; treat that as "no utterances".
    utterances = transcription_result.get('utterances') or []
    if len(utterances) < 2:
        print("⚠ Not enough utterances to analyze gaps (need at least 2)")
        return None

    gaps = []
    for current, following in zip(utterances, utterances[1:]):
        gap = following['start'] - current['end']
        # Zero means no silence; negative means the utterances overlap.
        if gap > 0:
            gaps.append(gap)

    if not gaps:
        print("⚠ No gaps found between utterances (all speech overlaps)")
        return None

    # True median: average the two middle values when the count is even.
    sorted_gaps = sorted(gaps)
    mid = len(sorted_gaps) // 2
    if len(sorted_gaps) % 2:
        median_gap = sorted_gaps[mid]
    else:
        median_gap = (sorted_gaps[mid - 1] + sorted_gaps[mid]) / 2

    # Calculate statistics
    stats = {
        'filename': Path(audio_file).name,
        'average_gap_ms': sum(gaps) / len(gaps),
        'min_gap_ms': min(gaps),
        'max_gap_ms': max(gaps),
        'median_gap_ms': median_gap,
        'total_utterances': len(utterances),
        'total_gaps': len(gaps),
        'all_gaps': gaps
    }

    print(f"\nResults for {stats['filename']}:")
    print(f" Total utterances: {stats['total_utterances']}")
    print(f" Total gaps: {stats['total_gaps']}")
    print(f" Average gap: {stats['average_gap_ms']:.0f} ms")
    print(f" Median gap: {stats['median_gap_ms']:.0f} ms")

    # Save transcript JSON to file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_filename = Path(audio_file).stem.replace(' ', '_')
    json_filename = f"transcript_{safe_filename}_{timestamp}.json"
    try:
        with open(json_filename, 'w', encoding='utf-8') as f:
            json.dump(transcription_result, f, indent=2, ensure_ascii=False)
        print(f" Transcript saved: {json_filename}")
    except Exception as e:
        # Saving is best-effort; the statistics are still returned.
        print(f" Error saving transcript: {e}")

    return stats
- Define `analyze_multiple_files()` Function
This function collects every supported audio file in the folder, runs `analyze_single_file()` for each, aggregates all gap data across files, calculates overall statistics, displays per-file breakdowns, and saves a comprehensive summary JSON.
def analyze_multiple_files(folder_path, api_key):
    """
    Analyzes all audio files in a folder and returns aggregated statistics.

    Every supported file found by ``get_audio_files`` is passed to
    ``analyze_single_file``. Files that raise or return ``None`` are reported
    and skipped. Gaps from all successful files are pooled to compute the
    overall statistics, which are printed and saved to
    ``aggregated_analysis_<timestamp>.json`` in the current directory.

    Args:
        folder_path: Folder containing the audio files.
        api_key: AssemblyAI API key passed through to each file's analysis.

    Returns:
        A dict of aggregated statistics (gap values in milliseconds, plus
        ``file_averages`` and the per-file ``file_stats``). Returns ``None``
        if no file could be analyzed.

    Raises:
        FileNotFoundError: If ``folder_path`` does not exist.
        ValueError: If the folder contains no supported audio files.
    """
    print("=" * 70)
    print("MULTI-FILE UTTERANCE GAP ANALYSIS")
    print("=" * 70)

    audio_files = get_audio_files(folder_path)
    total_files = len(audio_files)
    print(f"\nFound {total_files} audio file(s) in: {folder_path}")
    for i, file in enumerate(audio_files, 1):
        print(f" {i}. {Path(file).name}")

    # Analyze each file; one failure must not abort the whole batch.
    all_file_stats = []
    all_gaps = []
    for i, audio_file in enumerate(audio_files, 1):
        try:
            stats = analyze_single_file(audio_file, api_key, i, total_files)
            if stats:
                all_file_stats.append(stats)
                all_gaps.extend(stats['all_gaps'])
        except Exception as e:
            print(f"\n✗ Error analyzing {Path(audio_file).name}: {str(e)}")
            continue

    if not all_file_stats:
        print("\n✗ No files were successfully analyzed")
        return None

    # Calculate aggregated statistics
    print("\n" + "=" * 70)
    print("AGGREGATED GAP ANALYSIS RESULTS")
    print("=" * 70)

    # True median of the pooled gaps (mean of the two middle values when the
    # count is even).
    sorted_gaps = sorted(all_gaps)
    mid = len(sorted_gaps) // 2
    if len(sorted_gaps) % 2:
        overall_median = sorted_gaps[mid]
    else:
        overall_median = (sorted_gaps[mid - 1] + sorted_gaps[mid]) / 2

    aggregated_stats = {
        'total_files_analyzed': len(all_file_stats),
        'total_utterances': sum(s['total_utterances'] for s in all_file_stats),
        'total_gaps': sum(s['total_gaps'] for s in all_file_stats),
        'overall_average_gap_ms': sum(all_gaps) / len(all_gaps),
        'overall_median_gap_ms': overall_median,
        'overall_min_gap_ms': min(all_gaps),
        'overall_max_gap_ms': max(all_gaps),
        'file_averages': [s['average_gap_ms'] for s in all_file_stats],
        'file_stats': all_file_stats
    }

    print(f"\nFiles successfully analyzed: {aggregated_stats['total_files_analyzed']}/{total_files}")
    print(f"Total utterances (all files): {aggregated_stats['total_utterances']}")
    print(f"Total gaps analyzed: {aggregated_stats['total_gaps']}")
    print(f"\nOverall average gap: {aggregated_stats['overall_average_gap_ms']:.0f} ms ({aggregated_stats['overall_average_gap_ms']/1000:.2f} seconds)")
    print(f"Overall median gap: {aggregated_stats['overall_median_gap_ms']:.0f} ms")
    print(f"Overall minimum gap: {aggregated_stats['overall_min_gap_ms']:.0f} ms")
    print(f"Overall maximum gap: {aggregated_stats['overall_max_gap_ms']:.0f} ms")

    # Show per-file breakdown
    print("\nPer-file average gaps:")
    for stat in all_file_stats:
        print(f" • {stat['filename']:<40} {stat['average_gap_ms']:>6.0f} ms")

    # Variability: longest pooled gap divided by the mean pooled gap. Gaps are
    # strictly positive, so the mean is never zero.
    avg_of_file_averages = sum(aggregated_stats['file_averages']) / len(aggregated_stats['file_averages'])
    variability_ratio = aggregated_stats['overall_max_gap_ms'] / aggregated_stats['overall_average_gap_ms']
    print(f"\nAverage of file averages: {avg_of_file_averages:.0f} ms")
    print(f"Variability ratio: {variability_ratio:.2f}x")
    if variability_ratio > 3:
        print("└─> HIGH variability - mixed conversation patterns across files")
    elif variability_ratio > 2:
        print("└─> MODERATE variability - some pattern variation")
    else:
        print("└─> LOW variability - consistent conversation rhythm")

    # Save aggregated results (best-effort; the stats are returned regardless)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    summary_filename = f"aggregated_analysis_{timestamp}.json"
    try:
        summary_data = {
            'analysis_date': datetime.now().isoformat(),
            'folder_path': folder_path,
            'aggregated_statistics': {
                'total_files_analyzed': aggregated_stats['total_files_analyzed'],
                'total_utterances': aggregated_stats['total_utterances'],
                'total_gaps': aggregated_stats['total_gaps'],
                'overall_average_gap_ms': aggregated_stats['overall_average_gap_ms'],
                'overall_median_gap_ms': aggregated_stats['overall_median_gap_ms'],
                'overall_min_gap_ms': aggregated_stats['overall_min_gap_ms'],
                'overall_max_gap_ms': aggregated_stats['overall_max_gap_ms'],
                'variability_ratio': variability_ratio
            },
            'per_file_results': [
                {
                    'filename': s['filename'],
                    'average_gap_ms': s['average_gap_ms'],
                    'median_gap_ms': s['median_gap_ms'],
                    'total_utterances': s['total_utterances'],
                    'total_gaps': s['total_gaps']
                }
                for s in all_file_stats
            ]
        }
        with open(summary_filename, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2, ensure_ascii=False)
        print(f"\nAggregated analysis saved to: {summary_filename}")
    except Exception as e:
        print(f"\nError saving aggregated analysis: {e}")

    return aggregated_stats
- Define `determine_streaming_config()` Function
def determine_streaming_config(aggregated_stats):
    """
    Determines optimal Universal-Streaming configuration based on aggregated gap analysis.
    Returns WebSocket connection parameters.

    The overall average gap picks one of three presets:
    below 500 ms -> Aggressive, below 1000 ms -> Balanced, otherwise
    Conservative. When ``aggregated_stats`` is ``None`` (nothing could be
    analyzed), the default Balanced preset is returned.

    Returns:
        A dict with ``name``, ``min_turn_silence``, ``max_turn_silence`` (ms)
        and ``description``.
    """
    if aggregated_stats is None:
        print("\nUsing default balanced configuration (no gap data available)")
        return {
            'name': 'Balanced (Default)',
            'min_turn_silence': 400,
            'max_turn_silence': 1280,
            'description': 'Standard configuration for general use'
        }

    banner = "=" * 70
    print("\n" + banner)
    print("DETERMINING OPTIMAL STREAMING CONFIGURATION")
    print(banner)

    average_gap = aggregated_stats['overall_average_gap_ms']
    file_count = aggregated_stats['total_files_analyzed']
    print(f"\nBased on analysis of {file_count} file(s)")
    print(f"Overall average gap: {average_gap:.0f} ms")

    # (exclusive upper bound in ms, name, min silence, max silence,
    #  description, recommended use cases) -- checked in order.
    tiers = (
        (500, 'Aggressive', 160, 400,
         'Fast-paced conversation with quick turn-taking',
         "IVR systems, order confirmations, yes/no queries, retail support"),
        (1000, 'Balanced', 400, 1280,
         'Natural conversation pacing',
         "General customer support, consultations, standard voice agents"),
    )
    for upper_bound, name, min_silence, max_silence, description, use_cases in tiers:
        if average_gap < upper_bound:
            break
    else:
        # No tier matched: long pauses call for the most patient settings.
        name, min_silence, max_silence = 'Conservative', 800, 3600
        description = 'Thoughtful, complex speech with longer pauses'
        use_cases = "Technical support, healthcare, legal consultations, troubleshooting"

    config = {
        'name': name,
        'min_turn_silence': min_silence,
        'max_turn_silence': max_silence,
        'description': description
    }

    print(f"\nSelected Configuration: {config['name']}")
    print(f" Reasoning: Average gap of {average_gap:.0f}ms indicates {config['description']}")
    print("\nConfiguration Parameters:")
    print(f" • min_turn_silence: {config['min_turn_silence']} ms")
    print(f" • max_turn_silence: {config['max_turn_silence']} ms")
    print(f"\nRecommended use cases: {use_cases}")

    return config
- Create WebSocket Event Handlers (`on_open`, `on_message`, `on_error`, `on_close`)
`on_open` starts the audio streaming thread, `on_message` processes transcription results (partial and final turns), `on_error` signals the streaming thread to stop, and `on_close` stops the thread and releases the audio resources.
def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread (stored in the global ``audio_thread``). The
    thread repeatedly reads ``FRAMES_PER_BUFFER`` frames from the global
    microphone ``stream``, appends each chunk to ``recorded_frames`` under
    ``recording_lock``, and sends it to the server as a binary frame. It
    stops when ``stop_event`` is set or any read/send error occurs.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Using optimized {OPTIMIZED_CONFIG['name']} configuration")

    def forward_microphone():
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                with recording_lock:
                    recorded_frames.append(chunk)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                # Any failure (closed stream, closed socket) ends streaming.
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    audio_thread = threading.Thread(target=forward_microphone, daemon=True)
    audio_thread.start()
def on_message(ws, message):
    """Handle one JSON message from the streaming server.

    - ``Begin``: print the session id, the expiry time (when provided) and the
      active configuration, then prompt the user to speak.
    - ``Turn``: print partial transcripts in place on one line, and the final
      transcript on its own line once ``end_of_turn`` is true.
    - ``Termination``: print the audio and session durations.

    Other message types are ignored. Decode and handling errors are printed
    rather than raised, so one bad message does not end the WebSocket thread.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}")
            # A missing expires_at must not abort the rest of the Begin
            # handling (fromtimestamp(None) raises TypeError).
            if expires_at is not None:
                print(f" Expires at: {datetime.fromtimestamp(expires_at)}")
            print(f" Configuration: {OPTIMIZED_CONFIG['name']}")
            print("\nSpeak now... (Press Ctrl+C to stop)\n")

        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Blank out the in-place partial line before the final text.
                print('\r' + ' ' * 80 + '\r', end='')
                print(f"FINAL: {transcript}")
            else:
                print(f"\r partial: {transcript}", end='')

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio={audio_duration}s, Session={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
def on_error(ws, error):
    """Called when a WebSocket error occurs.

    Reports the error, then sets ``stop_event`` so the microphone-streaming
    loop stops sending audio.
    """
    report = f"\nWebSocket Error: {error}"
    print(report)
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed.

    Signals the microphone thread to stop and waits up to 1 s for it to exit.
    Only then does it stop and close the PyAudio stream and terminate
    PyAudio. The ``stream``/``audio`` globals are reset to ``None`` so later
    cleanup code does not touch closed objects.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    stop_event.set()

    # Join the reader thread BEFORE closing the stream. Closing a PyAudio
    # stream while another thread is blocked in stream.read() is unsafe.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
- Define `run_streaming()` Function
def run_streaming(config):
    """
    Runs the streaming transcription with optimized turn detection settings.

    Opens the default microphone and connects to the Universal-Streaming v3
    WebSocket, passing ``config``'s turn-silence values as query parameters.
    It then blocks until the connection ends or the user presses Ctrl+C. On
    Ctrl+C a ``{"type": "Terminate"}`` message is sent before closing.
    Microphone and PyAudio resources are always released before returning.

    Args:
        config: Dict with ``name``, ``min_turn_silence`` and
            ``max_turn_silence`` keys (as returned by
            ``determine_streaming_config``).
    """
    global audio, stream, ws_app, OPTIMIZED_CONFIG
    OPTIMIZED_CONFIG = config
    # Reset the shared stop flag. If an earlier session left it set, the new
    # audio thread would exit before sending anything.
    stop_event.clear()

    print("\n" + "=" * 70)
    print("STARTING REAL-TIME STREAMING")
    print("=" * 70)

    # Build connection parameters with optimized settings
    connection_params = {
        "sample_rate": SAMPLE_RATE,
        "format_turns": True,
        "min_turn_silence": str(config['min_turn_silence']),
        "max_turn_silence": str(config['max_turn_silence'])
    }
    endpoint_base_url = "wss://streaming.assemblyai.com/v3/ws"
    endpoint = f"{endpoint_base_url}?{urlencode(connection_params)}"

    print(f"\nWebSocket Endpoint: {endpoint_base_url}")
    print("\nApplied Configuration:")
    for key, value in connection_params.items():
        print(f" • {key}: {value}")

    # Initialize PyAudio and open the microphone stream
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("\nMicrophone stream opened successfully.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        audio.terminate()
        audio = None  # don't leave a terminated instance in the global
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        endpoint,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread so Ctrl+C is handled here
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()
        if ws_app.sock and ws_app.sock.connected:
            try:
                print("Sending termination message...")
                ws_app.send(json.dumps({"type": "Terminate"}))
                time.sleep(1)  # pause so the server's Termination reply can arrive
            except Exception as e:
                print(f"Error sending termination message: {e}")
        ws_app.close()
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        stop_event.set()
        # Let the microphone thread finish its current read before the
        # stream is closed underneath it.
        if audio_thread and audio_thread.is_alive():
            audio_thread.join(timeout=1.0)
        if stream:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        print("Cleanup complete. Exiting.")
- Define `main()` Workflow
def main():
    """
    Run the end-to-end workflow.

    1. Analyze every audio file in ``AUDIO_FOLDER_PATH``.
    2. Turn the aggregated gap statistics into a streaming configuration.
    3. Start live microphone streaming with that configuration.

    Any exception is reported and then re-raised unchanged.
    """
    try:
        gap_summary = analyze_multiple_files(AUDIO_FOLDER_PATH, YOUR_API_KEY)
        chosen_config = determine_streaming_config(gap_summary)
        run_streaming(chosen_config)
    except Exception as exc:
        print(f"\nError in workflow: {exc}")
        raise


if __name__ == "__main__":
    main()