Determine Optimal Turn Detection Settings from Historical Audio Analysis
This guide shows how to analyze utterance gaps from multiple pre-recorded audio files to automatically determine optimal turn detection settings for real-time streaming transcription. It processes an entire folder, aggregates gap statistics across all recordings, and configures the WebSocket with parameters tailored to your specific conversation patterns.
Quickstart
import requests
import time
import json
import statistics
import pyaudio
import websocket
import threading
from urllib.parse import urlencode
from datetime import datetime
import os
from pathlib import Path


YOUR_API_KEY = "<YOUR_API_KEY>"  # Replace with your API key
AUDIO_FOLDER_PATH = "<YOUR_AUDIO_FILE_FOLDER>"  # Folder containing audio files

# Audio Configuration
SAMPLE_RATE = 16000  # Hz; must match the sample_rate sent to the streaming endpoint
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
recorded_frames = []
recording_lock = threading.Lock()

# Store the optimized configuration
OPTIMIZED_CONFIG = {}


def get_audio_files(folder_path):
    """
    Gets all audio files from the specified folder.
    Supports all formats accepted by AssemblyAI's API.

    Raises FileNotFoundError if the folder does not exist and
    ValueError if it contains no supported files.
    """
    # NOTE: suffixes are compared lowercased below, so every entry here must
    # be lowercase too ('.mts', not '.MTS') or those formats never match.
    audio_extensions = {'.aac', '.ac3', '.aif', '.aiff', '.alac', '.amr', '.ape',
                        '.au', '.dss', '.flac', '.m4a', '.m4b', '.m4p', '.mp3',
                        '.mpga', '.ogg', '.oga', '.mogg', '.opus', '.qcp', '.tta',
                        '.voc', '.wav', '.wv', '.webm', '.mts', '.m2ts', '.ts',
                        '.mov', '.mp4', '.m4v'}
    folder = Path(folder_path)

    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")

    audio_files = [
        str(f) for f in folder.iterdir()
        if f.is_file() and f.suffix.lower() in audio_extensions
    ]

    if not audio_files:
        raise ValueError(f"No audio files found in {folder_path}")

    return sorted(audio_files)


def analyze_single_file(audio_file, api_key, file_index, total_files):
    """
    Analyzes a single audio file and returns gap statistics.

    Uploads the file (or uses it directly when it is already a URL),
    requests a transcript with speaker labels, polls until completion,
    then measures silent gaps between consecutive utterances.
    Returns a stats dict, or None when transcription fails or there is
    not enough data to analyze.
    """
    print("\n" + "=" * 70)
    print(f"ANALYZING FILE {file_index}/{total_files}: {Path(audio_file).name}")
    print("=" * 70)

    base_url = "https://api.assemblyai.com"
    headers = {"authorization": api_key}

    # Upload audio file
    print(f"\nUploading audio file...")

    if audio_file.startswith("http"):
        upload_url = audio_file
        print("Using provided URL")
    else:
        with open(audio_file, "rb") as f:
            response = requests.post(
                base_url + "/v2/upload",
                headers=headers,
                data=f
            )
        response.raise_for_status()  # surface upload errors instead of a KeyError below
        upload_url = response.json()["upload_url"]
        print(f"Upload complete")

    # Enable Speaker Labels so utterances carry per-speaker timing
    data = {
        "audio_url": upload_url,
        "speaker_labels": True,
        # "language_detection": True # Enable automatic language detection if your files are in different languages
    }

    response = requests.post(
        base_url + "/v2/transcript",
        json=data,
        headers=headers
    )
    response.raise_for_status()
    transcript_id = response.json()['id']
    print(f"Transcript ID: {transcript_id}")

    # Poll for completion
    print("\nWaiting for transcription to complete...")
    polling_endpoint = base_url + "/v2/transcript/" + transcript_id

    while True:
        transcription_result = requests.get(polling_endpoint, headers=headers).json()

        if transcription_result['status'] == 'completed':
            print("Transcription completed!")
            break
        elif transcription_result['status'] == 'error':
            print(f"Transcription failed: {transcription_result['error']}")
            return None
        else:
            time.sleep(3)

    # Calculate gaps between consecutive utterances (timestamps are in ms)
    utterances = transcription_result['utterances']

    if len(utterances) < 2:
        print("⚠ Not enough utterances to analyze gaps (need at least 2)")
        return None

    gaps = []
    for i in range(len(utterances) - 1):
        gap = utterances[i + 1]['start'] - utterances[i]['end']
        if gap > 0:  # skip non-positive gaps (overlapping speech)
            gaps.append(gap)

    if not gaps:
        print("⚠ No gaps found between utterances (all speech overlaps)")
        return None

    # statistics.median gives the true median for even-length samples;
    # sorted(gaps)[len(gaps)//2] was the upper median (biased).
    stats = {
        'filename': Path(audio_file).name,
        'average_gap_ms': sum(gaps) / len(gaps),
        'min_gap_ms': min(gaps),
        'max_gap_ms': max(gaps),
        'median_gap_ms': statistics.median(gaps),
        'total_utterances': len(utterances),
        'total_gaps': len(gaps),
        'all_gaps': gaps
    }

    print(f"\nResults for {stats['filename']}:")
    print(f" Total utterances: {stats['total_utterances']}")
    print(f" Total gaps: {stats['total_gaps']}")
    print(f" Average gap: {stats['average_gap_ms']:.0f} ms")
    print(f" Median gap: {stats['median_gap_ms']:.0f} ms")

    # Save transcript JSON to file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_filename = Path(audio_file).stem.replace(' ', '_')
    json_filename = f"transcript_{safe_filename}_{timestamp}.json"

    try:
        with open(json_filename, 'w', encoding='utf-8') as f:
            json.dump(transcription_result, f, indent=2, ensure_ascii=False)
        print(f" Transcript saved: {json_filename}")
    except Exception as e:
        print(f" Error saving transcript: {e}")

    return stats


def analyze_multiple_files(folder_path, api_key):
    """
    Analyzes all audio files in a folder and returns aggregated statistics.
    Returns None when no file could be analyzed.
    """
    print("=" * 70)
    print("MULTI-FILE UTTERANCE GAP ANALYSIS")
    print("=" * 70)

    audio_files = get_audio_files(folder_path)
    total_files = len(audio_files)

    print(f"\nFound {total_files} audio file(s) in: {folder_path}")
    for i, file in enumerate(audio_files, 1):
        print(f" {i}. {Path(file).name}")

    # Analyze each file; a failure in one file must not abort the batch
    all_file_stats = []
    all_gaps = []

    for i, audio_file in enumerate(audio_files, 1):
        try:
            stats = analyze_single_file(audio_file, api_key, i, total_files)
            if stats:
                all_file_stats.append(stats)
                all_gaps.extend(stats['all_gaps'])
        except Exception as e:
            print(f"\n✗ Error analyzing {Path(audio_file).name}: {str(e)}")
            continue

    if not all_file_stats:
        print("\n✗ No files were successfully analyzed")
        return None

    # Calculate aggregated statistics
    print("\n" + "=" * 70)
    print("AGGREGATED GAP ANALYSIS RESULTS")
    print("=" * 70)

    aggregated_stats = {
        'total_files_analyzed': len(all_file_stats),
        'total_utterances': sum(s['total_utterances'] for s in all_file_stats),
        'total_gaps': sum(s['total_gaps'] for s in all_file_stats),
        'overall_average_gap_ms': sum(all_gaps) / len(all_gaps),
        # True median (not the upper median) across all files' gaps
        'overall_median_gap_ms': statistics.median(all_gaps),
        'overall_min_gap_ms': min(all_gaps),
        'overall_max_gap_ms': max(all_gaps),
        'file_averages': [s['average_gap_ms'] for s in all_file_stats],
        'file_stats': all_file_stats
    }

    print(f"\nFiles successfully analyzed: {aggregated_stats['total_files_analyzed']}/{total_files}")
    print(f"Total utterances (all files): {aggregated_stats['total_utterances']}")
    print(f"Total gaps analyzed: {aggregated_stats['total_gaps']}")
    print(f"\nOverall average gap: {aggregated_stats['overall_average_gap_ms']:.0f} ms ({aggregated_stats['overall_average_gap_ms']/1000:.2f} seconds)")
    print(f"Overall median gap: {aggregated_stats['overall_median_gap_ms']:.0f} ms")
    print(f"Overall minimum gap: {aggregated_stats['overall_min_gap_ms']:.0f} ms")
    print(f"Overall maximum gap: {aggregated_stats['overall_max_gap_ms']:.0f} ms")

    # Show per-file breakdown
    print(f"\nPer-file average gaps:")
    for stat in all_file_stats:
        print(f" • {stat['filename']:<40} {stat['average_gap_ms']:>6.0f} ms")

    # Calculate variability: how far the extreme gap sits above the mean
    avg_of_file_averages = sum(aggregated_stats['file_averages']) / len(aggregated_stats['file_averages'])
    variability_ratio = aggregated_stats['overall_max_gap_ms'] / aggregated_stats['overall_average_gap_ms']

    print(f"\nAverage of file averages: {avg_of_file_averages:.0f} ms")
    print(f"Variability ratio: {variability_ratio:.2f}x")

    if variability_ratio > 3:
        print("└─> HIGH variability - mixed conversation patterns across files")
    elif variability_ratio > 2:
        print("└─> MODERATE variability - some pattern variation")
    else:
        print("└─> LOW variability - consistent conversation rhythm")

    # Save aggregated results
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    summary_filename = f"aggregated_analysis_{timestamp}.json"

    try:
        summary_data = {
            'analysis_date': datetime.now().isoformat(),
            'folder_path': folder_path,
            'aggregated_statistics': {
                'total_files_analyzed': aggregated_stats['total_files_analyzed'],
                'total_utterances': aggregated_stats['total_utterances'],
                'total_gaps': aggregated_stats['total_gaps'],
                'overall_average_gap_ms': aggregated_stats['overall_average_gap_ms'],
                'overall_median_gap_ms': aggregated_stats['overall_median_gap_ms'],
                'overall_min_gap_ms': aggregated_stats['overall_min_gap_ms'],
                'overall_max_gap_ms': aggregated_stats['overall_max_gap_ms'],
                'variability_ratio': variability_ratio
            },
            'per_file_results': [
                {
                    'filename': s['filename'],
                    'average_gap_ms': s['average_gap_ms'],
                    'median_gap_ms': s['median_gap_ms'],
                    'total_utterances': s['total_utterances'],
                    'total_gaps': s['total_gaps']
                }
                for s in all_file_stats
            ]
        }

        with open(summary_filename, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2, ensure_ascii=False)
        print(f"\nAggregated analysis saved to: {summary_filename}")
    except Exception as e:
        print(f"\nError saving aggregated analysis: {e}")

    return aggregated_stats


def determine_streaming_config(aggregated_stats):
    """
    Determines optimal Universal-Streaming configuration based on aggregated gap analysis.
    Returns WebSocket connection parameters.
    """
    if aggregated_stats is None:
        print("\nUsing default balanced configuration (no gap data available)")
        return {
            'name': 'Balanced (Default)',
            'min_turn_silence': 400,
            'max_turn_silence': 1280,
            'description': 'Standard configuration for general use'
        }

    print("\n" + "=" * 70)
    print("DETERMINING OPTIMAL STREAMING CONFIGURATION")
    print("=" * 70)

    avg_gap = aggregated_stats['overall_average_gap_ms']
    num_files = aggregated_stats['total_files_analyzed']

    print(f"\nBased on analysis of {num_files} file(s)")
    print(f"Overall average gap: {avg_gap:.0f} ms")

    # Determine configuration based on average gap
    if avg_gap < 500:
        config = {
            'name': 'Aggressive',
            'min_turn_silence': 160,
            'max_turn_silence': 400,
            'description': 'Fast-paced conversation with quick turn-taking'
        }
        use_cases = "IVR systems, order confirmations, yes/no queries, retail support"
    elif avg_gap < 1000:
        config = {
            'name': 'Balanced',
            'min_turn_silence': 400,
            'max_turn_silence': 1280,
            'description': 'Natural conversation pacing'
        }
        use_cases = "General customer support, consultations, standard voice agents"
    else:
        config = {
            'name': 'Conservative',
            'min_turn_silence': 800,
            'max_turn_silence': 3600,
            'description': 'Thoughtful, complex speech with longer pauses'
        }
        use_cases = "Technical support, healthcare, legal consultations, troubleshooting"

    print(f"\nSelected Configuration: {config['name']}")
    print(f" Reasoning: Average gap of {avg_gap:.0f}ms indicates {config['description']}")
    print(f"\nConfiguration Parameters:")
    print(f" • min_turn_silence: {config['min_turn_silence']} ms")
    print(f" • max_turn_silence: {config['max_turn_silence']} ms")
    print(f"\nRecommended use cases: {use_cases}")

    return config


# WEBSOCKET HANDLERS WITH OPTIMIZED SETTINGS

def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Using optimized {OPTIMIZED_CONFIG['name']} configuration")

    def stream_audio():
        # Background thread: pump microphone chunks to the socket until
        # stop_event is set or a read/send fails.
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Keep a copy of everything sent (lock: on_close runs elsewhere)
                with recording_lock:
                    recorded_frames.append(audio_data)

                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()

def on_message(ws, message):
    """Handles Begin/Turn/Termination messages from the streaming API."""
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')  # unix timestamp
            print(f"\nSession began: ID={session_id}")
            print(f" Expires at: {datetime.fromtimestamp(expires_at)}")
            print(f" Configuration: {OPTIMIZED_CONFIG['name']}")
            print("\nSpeak now... (Press Ctrl+C to stop)\n")

        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Erase the partial line, then print the final transcript
                print('\r' + ' ' * 80 + '\r', end='')
                print(f"FINAL: {transcript}")
            else:
                print(f"\r partial: {transcript}", end='')

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio={audio_duration}s, Session={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()  # stop the audio thread

def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    # Release audio resources; run_streaming's finally block tolerates None
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)


# RUN STREAMING WITH OPTIMIZED CONFIGURATION

def run_streaming(config):
    """
    Runs the streaming transcription with optimized turn detection settings.
    """
    global audio, stream, ws_app, OPTIMIZED_CONFIG

    OPTIMIZED_CONFIG = config  # handlers read this global

    print("\n" + "=" * 70)
    print("STARTING REAL-TIME STREAMING")
    print("=" * 70)

    # Build connection parameters with optimized settings
    CONNECTION_PARAMS = {
        "sample_rate": SAMPLE_RATE,
        "format_turns": True,
        "min_turn_silence": str(config['min_turn_silence']),
        "max_turn_silence": str(config['max_turn_silence'])
    }

    API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
    API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

    print(f"\nWebSocket Endpoint: {API_ENDPOINT_BASE_URL}")
    print(f"\nApplied Configuration:")
    for key, value in CONNECTION_PARAMS.items():
        print(f" • {key}: {value}")

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("\nMicrophone stream opened successfully.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread so Ctrl+C is caught here
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Ask the server to end the session gracefully before closing
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message...")
                ws_app.send(json.dumps(terminate_message))
                time.sleep(1)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # on_close may have already released these; guards make it idempotent
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


# MAIN WORKFLOW

def main():
    """
    Main workflow: Analyze multiple files -> Configure -> Run Streaming
    """
    try:
        # Step 1: Analyze all audio files in folder
        aggregated_stats = analyze_multiple_files(AUDIO_FOLDER_PATH, YOUR_API_KEY)

        # Step 2: Determine optimal configuration based on aggregated data
        streaming_config = determine_streaming_config(aggregated_stats)

        # Step 3: Run streaming with optimized settings
        run_streaming(streaming_config)

    except Exception as e:
        print(f"\nError in workflow: {str(e)}")
        raise


# EXECUTION

if __name__ == "__main__":
    main()
Step-By-Step Guide
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.
- Install All Required Packages
$ pip install requests pyaudio websocket-client
- Configuration and Global Variables
Set up API credentials, file paths, audio parameters (16kHz sample rate, mono channel), and initialize global variables for managing WebSocket connections and audio streaming threads.
1 import requests 2 import time 3 import json 4 import pyaudio 5 import websocket 6 import threading 7 from urllib.parse import urlencode 8 from datetime import datetime 9 import os 10 from pathlib import Path 11 12 13 YOUR_API_KEY = "<YOUR_API_KEY>" # Replace with your API key 14 AUDIO_FOLDER_PATH = "<YOUR_AUDIO_FILE_FOLDER>" # Folder containing audio files 15 16 # Audio Configuration 17 SAMPLE_RATE = 16000 18 CHANNELS = 1 19 FORMAT = pyaudio.paInt16 20 FRAMES_PER_BUFFER = 800 # 50ms of audio (0.05s * 16000Hz) 21 22 # Global variables for audio stream and websocket 23 audio = None 24 stream = None 25 ws_app = None 26 audio_thread = None 27 stop_event = threading.Event() 28 recorded_frames = [] 29 recording_lock = threading.Lock() 30 31 # Store the optimized configuration 32 OPTIMIZED_CONFIG = {}
- Define get_audio_files() Function
This function scans a specified folder for audio/video files with supported extensions and returns a sorted list of file paths for batch processing.
1 def get_audio_files(folder_path): 2 audio_extensions = {'.aac', '.ac3', '.aif', '.aiff', '.alac', '.amr', '.ape', 3 '.au', '.dss', '.flac', '.m4a', '.m4b', '.m4p', '.mp3', 4 '.mpga', '.ogg', '.oga', '.mogg', '.opus', '.qcp', '.tta', 5 '.voc', '.wav', '.wv', '.webm', '.MTS', '.M2TS', '.TS', 6 '.mov', '.mp4', '.m4v'} 7 folder = Path(folder_path) 8 9 if not folder.exists(): 10 raise FileNotFoundError(f"Folder not found: {folder_path}") 11 12 audio_files = [ 13 str(f) for f in folder.iterdir() 14 if f.is_file() and f.suffix.lower() in audio_extensions 15 ] 16 17 if not audio_files: 18 raise ValueError(f"No audio files found in {folder_path}") 19 20 return sorted(audio_files)
- Define
analyze_single_file() Function
This function uploads an audio file to AssemblyAI, requests transcription with speaker labels enabled, polls until completion, then calculates gap statistics between utterances (average, median, min, max) and saves the transcript JSON.
def analyze_single_file(audio_file, api_key, file_index, total_files):
    """
    Analyzes a single audio file and returns gap statistics.

    Uploads the file (or uses it directly when it is already a URL),
    requests a transcript with speaker labels, polls until completion,
    then measures silent gaps between consecutive utterances.
    Returns a stats dict, or None when transcription fails or there is
    not enough data to analyze.
    """
    print("\n" + "=" * 70)
    print(f"ANALYZING FILE {file_index}/{total_files}: {Path(audio_file).name}")
    print("=" * 70)

    base_url = "https://api.assemblyai.com"
    headers = {"authorization": api_key}

    # Upload audio file
    print(f"\nUploading audio file...")

    if audio_file.startswith("http"):
        upload_url = audio_file
        print("Using provided URL")
    else:
        with open(audio_file, "rb") as f:
            response = requests.post(
                base_url + "/v2/upload",
                headers=headers,
                data=f
            )
        response.raise_for_status()  # surface upload errors instead of a KeyError below
        upload_url = response.json()["upload_url"]
        print(f"Upload complete")

    # Enable Speaker Labels so utterances carry per-speaker timing
    data = {
        "audio_url": upload_url,
        "speaker_labels": True,
        # "language_detection": True # Enable automatic language detection if your files are in different languages
    }

    response = requests.post(
        base_url + "/v2/transcript",
        json=data,
        headers=headers
    )
    response.raise_for_status()
    transcript_id = response.json()['id']
    print(f"Transcript ID: {transcript_id}")

    # Poll for completion
    print("\nWaiting for transcription to complete...")
    polling_endpoint = base_url + "/v2/transcript/" + transcript_id

    while True:
        transcription_result = requests.get(polling_endpoint, headers=headers).json()

        if transcription_result['status'] == 'completed':
            print("Transcription completed!")
            break
        elif transcription_result['status'] == 'error':
            print(f"Transcription failed: {transcription_result['error']}")
            return None
        else:
            time.sleep(3)

    # Calculate gaps between consecutive utterances (timestamps are in ms)
    utterances = transcription_result['utterances']

    if len(utterances) < 2:
        print("⚠ Not enough utterances to analyze gaps (need at least 2)")
        return None

    gaps = []
    for i in range(len(utterances) - 1):
        gap = utterances[i + 1]['start'] - utterances[i]['end']
        if gap > 0:  # skip non-positive gaps (overlapping speech)
            gaps.append(gap)

    if not gaps:
        print("⚠ No gaps found between utterances (all speech overlaps)")
        return None

    # True median: average the two middle values for even-length samples.
    # The previous sorted(gaps)[len(gaps) // 2] was the upper median (biased).
    sorted_gaps = sorted(gaps)
    mid = len(sorted_gaps) // 2
    if len(sorted_gaps) % 2:
        median_gap = sorted_gaps[mid]
    else:
        median_gap = (sorted_gaps[mid - 1] + sorted_gaps[mid]) / 2

    # Calculate statistics
    stats = {
        'filename': Path(audio_file).name,
        'average_gap_ms': sum(gaps) / len(gaps),
        'min_gap_ms': min(gaps),
        'max_gap_ms': max(gaps),
        'median_gap_ms': median_gap,
        'total_utterances': len(utterances),
        'total_gaps': len(gaps),
        'all_gaps': gaps
    }

    print(f"\nResults for {stats['filename']}:")
    print(f" Total utterances: {stats['total_utterances']}")
    print(f" Total gaps: {stats['total_gaps']}")
    print(f" Average gap: {stats['average_gap_ms']:.0f} ms")
    print(f" Median gap: {stats['median_gap_ms']:.0f} ms")

    # Save transcript JSON to file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_filename = Path(audio_file).stem.replace(' ', '_')
    json_filename = f"transcript_{safe_filename}_{timestamp}.json"

    try:
        with open(json_filename, 'w', encoding='utf-8') as f:
            json.dump(transcription_result, f, indent=2, ensure_ascii=False)
        print(f" Transcript saved: {json_filename}")
    except Exception as e:
        print(f" Error saving transcript: {e}")

    return stats
- Define
analyze_multiple_files() Function
This function orchestrates the analysis of all files in a folder by calling analyze_single_file() for each, aggregates all gap data across files, calculates overall statistics, displays per-file breakdowns, and saves a comprehensive summary JSON.
def analyze_multiple_files(folder_path, api_key):
    """
    Analyzes all audio files in a folder and returns aggregated statistics.
    Returns None when no file could be analyzed.
    """
    print("=" * 70)
    print("MULTI-FILE UTTERANCE GAP ANALYSIS")
    print("=" * 70)

    audio_files = get_audio_files(folder_path)
    total_files = len(audio_files)

    print(f"\nFound {total_files} audio file(s) in: {folder_path}")
    for i, file in enumerate(audio_files, 1):
        print(f" {i}. {Path(file).name}")

    # Analyze each file; a failure in one file must not abort the batch
    all_file_stats = []
    all_gaps = []

    for i, audio_file in enumerate(audio_files, 1):
        try:
            stats = analyze_single_file(audio_file, api_key, i, total_files)
            if stats:
                all_file_stats.append(stats)
                all_gaps.extend(stats['all_gaps'])
        except Exception as e:
            print(f"\n✗ Error analyzing {Path(audio_file).name}: {str(e)}")
            continue

    if not all_file_stats:
        print("\n✗ No files were successfully analyzed")
        return None

    # Calculate aggregated statistics
    print("\n" + "=" * 70)
    print("AGGREGATED GAP ANALYSIS RESULTS")
    print("=" * 70)

    # True median across all gaps: average the two middle values for
    # even-length samples. The previous sorted(...)[len(...) // 2] was the
    # upper median (biased for even counts).
    sorted_gaps = sorted(all_gaps)
    mid = len(sorted_gaps) // 2
    if len(sorted_gaps) % 2:
        overall_median = sorted_gaps[mid]
    else:
        overall_median = (sorted_gaps[mid - 1] + sorted_gaps[mid]) / 2

    aggregated_stats = {
        'total_files_analyzed': len(all_file_stats),
        'total_utterances': sum(s['total_utterances'] for s in all_file_stats),
        'total_gaps': sum(s['total_gaps'] for s in all_file_stats),
        'overall_average_gap_ms': sum(all_gaps) / len(all_gaps),
        'overall_median_gap_ms': overall_median,
        'overall_min_gap_ms': min(all_gaps),
        'overall_max_gap_ms': max(all_gaps),
        'file_averages': [s['average_gap_ms'] for s in all_file_stats],
        'file_stats': all_file_stats
    }

    print(f"\nFiles successfully analyzed: {aggregated_stats['total_files_analyzed']}/{total_files}")
    print(f"Total utterances (all files): {aggregated_stats['total_utterances']}")
    print(f"Total gaps analyzed: {aggregated_stats['total_gaps']}")
    print(f"\nOverall average gap: {aggregated_stats['overall_average_gap_ms']:.0f} ms ({aggregated_stats['overall_average_gap_ms']/1000:.2f} seconds)")
    print(f"Overall median gap: {aggregated_stats['overall_median_gap_ms']:.0f} ms")
    print(f"Overall minimum gap: {aggregated_stats['overall_min_gap_ms']:.0f} ms")
    print(f"Overall maximum gap: {aggregated_stats['overall_max_gap_ms']:.0f} ms")

    # Show per-file breakdown
    print(f"\nPer-file average gaps:")
    for stat in all_file_stats:
        print(f" • {stat['filename']:<40} {stat['average_gap_ms']:>6.0f} ms")

    # Calculate variability: how far the extreme gap sits above the mean
    avg_of_file_averages = sum(aggregated_stats['file_averages']) / len(aggregated_stats['file_averages'])
    variability_ratio = aggregated_stats['overall_max_gap_ms'] / aggregated_stats['overall_average_gap_ms']

    print(f"\nAverage of file averages: {avg_of_file_averages:.0f} ms")
    print(f"Variability ratio: {variability_ratio:.2f}x")

    if variability_ratio > 3:
        print("└─> HIGH variability - mixed conversation patterns across files")
    elif variability_ratio > 2:
        print("└─> MODERATE variability - some pattern variation")
    else:
        print("└─> LOW variability - consistent conversation rhythm")

    # Save aggregated results
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    summary_filename = f"aggregated_analysis_{timestamp}.json"

    try:
        summary_data = {
            'analysis_date': datetime.now().isoformat(),
            'folder_path': folder_path,
            'aggregated_statistics': {
                'total_files_analyzed': aggregated_stats['total_files_analyzed'],
                'total_utterances': aggregated_stats['total_utterances'],
                'total_gaps': aggregated_stats['total_gaps'],
                'overall_average_gap_ms': aggregated_stats['overall_average_gap_ms'],
                'overall_median_gap_ms': aggregated_stats['overall_median_gap_ms'],
                'overall_min_gap_ms': aggregated_stats['overall_min_gap_ms'],
                'overall_max_gap_ms': aggregated_stats['overall_max_gap_ms'],
                'variability_ratio': variability_ratio
            },
            'per_file_results': [
                {
                    'filename': s['filename'],
                    'average_gap_ms': s['average_gap_ms'],
                    'median_gap_ms': s['median_gap_ms'],
                    'total_utterances': s['total_utterances'],
                    'total_gaps': s['total_gaps']
                }
                for s in all_file_stats
            ]
        }

        with open(summary_filename, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2, ensure_ascii=False)
        print(f"\nAggregated analysis saved to: {summary_filename}")
    except Exception as e:
        print(f"\nError saving aggregated analysis: {e}")

    return aggregated_stats
- Define
determine_streaming_config() Function
This function takes aggregated gap statistics and selects one of three preset configurations with optimized turn detection parameters for different conversation styles.
1 def determine_streaming_config(aggregated_stats): 2 if aggregated_stats is None: 3 print("\nUsing default balanced configuration (no gap data available)") 4 return { 5 'name': 'Balanced (Default)', 6 'min_turn_silence': 400, 7 'max_turn_silence': 1280, 8 'description': 'Standard configuration for general use' 9 } 10 11 print("\n" + "=" * 70) 12 print("DETERMINING OPTIMAL STREAMING CONFIGURATION") 13 print("=" * 70) 14 15 avg_gap = aggregated_stats['overall_average_gap_ms'] 16 num_files = aggregated_stats['total_files_analyzed'] 17 18 print(f"\nBased on analysis of {num_files} file(s)") 19 print(f"Overall average gap: {avg_gap:.0f} ms") 20 21 # Determine configuration based on average gap 22 if avg_gap < 500: 23 config = { 24 'name': 'Aggressive', 25 'min_turn_silence': 160, 26 'max_turn_silence': 400, 27 'description': 'Fast-paced conversation with quick turn-taking' 28 } 29 use_cases = "IVR systems, order confirmations, yes/no queries, retail support" 30 elif avg_gap < 1000: 31 config = { 32 'name': 'Balanced', 33 'min_turn_silence': 400, 34 'max_turn_silence': 1280, 35 'description': 'Natural conversation pacing' 36 } 37 use_cases = "General customer support, consultations, standard voice agents" 38 else: 39 config = { 40 'name': 'Conservative', 41 'min_turn_silence': 800, 42 'max_turn_silence': 3600, 43 'description': 'Thoughtful, complex speech with longer pauses' 44 } 45 use_cases = "Technical support, healthcare, legal consultations, troubleshooting" 46 47 print(f"\nSelected Configuration: {config['name']}") 48 print(f" Reasoning: Average gap of {avg_gap:.0f}ms indicates {config['description']}") 49 print(f"\nConfiguration Parameters:") 50 print(f" • min_turn_silence: {config['min_turn_silence']} ms") 51 print(f" • max_turn_silence: {config['max_turn_silence']} ms") 52 print(f"\nRecommended use cases: {use_cases}") 53 54 return config
- Create WebSocket Event Handlers (
on_open, on_message, on_error, on_close)
These functions manage the real-time streaming connection lifecycle: on_open starts the audio streaming thread, on_message processes transcription results (partial and final turns), and the close/error handlers clean up resources.
def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Using optimized {OPTIMIZED_CONFIG['name']} configuration")

    def stream_audio():
        # Background thread: pump microphone chunks to the socket until
        # stop_event is set or a read/send fails.
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Keep a copy of everything sent; the lock guards against
                # concurrent access from other threads.
                with recording_lock:
                    recorded_frames.append(audio_data)

                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True  # don't block interpreter shutdown
    audio_thread.start()

def on_message(ws, message):
    """Handles Begin/Turn/Termination messages from the streaming API."""
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')  # unix timestamp
            print(f"\nSession began: ID={session_id}")
            print(f" Expires at: {datetime.fromtimestamp(expires_at)}")
            print(f" Configuration: {OPTIMIZED_CONFIG['name']}")
            print("\nSpeak now... (Press Ctrl+C to stop)\n")

        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Erase the partial line, then print the final transcript
                print('\r' + ' ' * 80 + '\r', end='')
                print(f"FINAL: {transcript}")
            else:
                print(f"\r partial: {transcript}", end='')

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio={audio_duration}s, Session={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()  # stop the audio thread

def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    # Release audio resources and null the globals so later cleanup
    # (e.g. run_streaming's finally block) sees them as already released.
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
- Define the `run_streaming()` Function
This function initializes PyAudio to capture microphone input, establishes a WebSocket connection with the optimized configuration parameters, and streams audio in real-time while displaying transcription results until the user stops with Ctrl+C.
def run_streaming(config):
    """Capture microphone audio and stream it to AssemblyAI in real time.

    Applies the optimized turn-detection parameters from ``config`` to the
    WebSocket connection, then prints partial/final transcripts until the
    user presses Ctrl+C.

    Args:
        config: dict with at least 'name', 'min_turn_silence' and
            'max_turn_silence' keys (as produced by the analysis step).
    """
    global audio, stream, ws_app, OPTIMIZED_CONFIG

    # Publish the chosen config so the WebSocket callbacks can report it.
    OPTIMIZED_CONFIG = config

    print("\n" + "=" * 70)
    print("STARTING REAL-TIME STREAMING")
    print("=" * 70)

    # Build connection parameters with the optimized turn-silence settings.
    CONNECTION_PARAMS = {
        "sample_rate": SAMPLE_RATE,
        "format_turns": True,
        "min_turn_silence": str(config['min_turn_silence']),
        "max_turn_silence": str(config['max_turn_silence'])
    }

    API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
    API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

    print(f"\nWebSocket Endpoint: {API_ENDPOINT_BASE_URL}")
    print("\nApplied Configuration:")  # fixed: was an f-string with no placeholders
    for key, value in CONNECTION_PARAMS.items():
        print(f" • {key}: {value}")

    # Initialize PyAudio.
    audio = pyaudio.PyAudio()

    # Open microphone stream; bail out (and release PyAudio) on failure.
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("\nMicrophone stream opened successfully.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    # Create WebSocketApp wired to the module-level callbacks.
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run the WebSocketApp in a separate daemon thread so the main thread
    # stays free to catch KeyboardInterrupt.
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Ask the server to finalize the session before closing the socket.
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print("Sending termination message...")  # fixed: was a needless f-string
                ws_app.send(json.dumps(terminate_message))
                time.sleep(1)  # give the server a moment to send "Termination"
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Belt-and-braces cleanup; on_close may already have released these.
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")
- Define the `main()` Workflow
Execute the three-step process: analyze all audio files in the folder, determine the best streaming configuration based on aggregated utterance gaps, then launch real-time streaming with the optimized settings.
1 def main(): 2 try: 3 # Step 1: Analyze all audio files in folder 4 aggregated_stats = analyze_multiple_files(AUDIO_FOLDER_PATH, YOUR_API_KEY) 5 6 # Step 2: Determine optimal configuration based on aggregated data 7 streaming_config = determine_streaming_config(aggregated_stats) 8 9 # Step 3: Run streaming with optimized settings 10 run_streaming(streaming_config) 11 12 except Exception as e: 13 print(f"\nError in workflow: {str(e)}") 14 raise 15 16 if __name__ == "__main__": 17 main()