Determine Optimal Turn Detection Settings from Historical Audio Analysis

This guide shows how to analyze utterance gaps from multiple pre-recorded audio files to automatically determine optimal turn detection settings for real-time streaming transcription. It processes an entire folder, aggregates gap statistics across all recordings, and configures the WebSocket with parameters tailored to your specific conversation patterns.

Quickstart

1import requests
2import time
3import json
4import pyaudio
5import websocket
6import threading
7from urllib.parse import urlencode
8from datetime import datetime
9import os
10from pathlib import Path
11
12
YOUR_API_KEY = "<YOUR_API_KEY>"  # Replace with your API key
AUDIO_FOLDER_PATH = "<YOUR_AUDIO_FILE_FOLDER>"  # Folder containing audio files

# Audio capture configuration
SAMPLE_RATE = 16000
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 800  # 50 ms of audio (0.05 s * 16000 Hz)

# Shared state for the audio stream and WebSocket lifecycle
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
recorded_frames = []
recording_lock = threading.Lock()

# Turn-detection configuration selected by the analysis step
OPTIMIZED_CONFIG = {}
33
34
def get_audio_files(folder_path):
    """
    Return a sorted list of audio/video file paths found in *folder_path*.

    Supports all formats accepted by AssemblyAI's API.

    Args:
        folder_path: Directory to scan (non-recursive).

    Returns:
        Sorted list of file path strings.

    Raises:
        FileNotFoundError: If the folder does not exist.
        ValueError: If no supported audio files are found.
    """
    # All entries must be lowercase because the membership test below
    # lowercases each file suffix; the original mixed-case entries
    # ('.MTS', '.M2TS', '.TS') could never match.
    audio_extensions = {'.aac', '.ac3', '.aif', '.aiff', '.alac', '.amr', '.ape',
                        '.au', '.dss', '.flac', '.m4a', '.m4b', '.m4p', '.mp3',
                        '.mpga', '.ogg', '.oga', '.mogg', '.opus', '.qcp', '.tta',
                        '.voc', '.wav', '.wv', '.webm', '.mts', '.m2ts', '.ts',
                        '.mov', '.mp4', '.m4v'}
    folder = Path(folder_path)

    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")

    audio_files = [
        str(f) for f in folder.iterdir()
        if f.is_file() and f.suffix.lower() in audio_extensions
    ]

    if not audio_files:
        raise ValueError(f"No audio files found in {folder_path}")

    return sorted(audio_files)
59
60
def analyze_single_file(audio_file, api_key, file_index, total_files):
    """
    Transcribe one recording with speaker labels and compute the gaps
    between consecutive utterances.

    Args:
        audio_file: Local path or http(s) URL of the recording.
        api_key: AssemblyAI API key.
        file_index: 1-based position of this file in the batch (display only).
        total_files: Total number of files in the batch (display only).

    Returns:
        A dict of gap statistics for the file, or None when transcription
        fails or there are not enough gaps to analyze.
    """
    print("\n" + "=" * 70)
    print(f"ANALYZING FILE {file_index}/{total_files}: {Path(audio_file).name}")
    print("=" * 70)

    base_url = "https://api.assemblyai.com"
    headers = {"authorization": api_key}

    print(f"\nUploading audio file...")

    # Remote URLs pass straight through; local files are uploaded first.
    if audio_file.startswith("http"):
        upload_url = audio_file
        print("Using provided URL")
    else:
        with open(audio_file, "rb") as fh:
            upload_response = requests.post(
                base_url + "/v2/upload",
                headers=headers,
                data=fh,
            )
        upload_url = upload_response.json()["upload_url"]
        print(f"Upload complete")

    # Speaker labels are required so each utterance carries start/end
    # timestamps per speaker turn.
    request_body = {
        "audio_url": upload_url,
        "speaker_labels": True,
        # "language_detection": True # Enable automatic language detection if your files are in different languages
    }

    transcript_id = requests.post(
        base_url + "/v2/transcript",
        json=request_body,
        headers=headers,
    ).json()['id']
    print(f"Transcript ID: {transcript_id}")

    print("\nWaiting for transcription to complete...")
    polling_endpoint = base_url + "/v2/transcript/" + transcript_id

    # Poll every 3 seconds until the job completes or errors out.
    while True:
        transcription_result = requests.get(polling_endpoint, headers=headers).json()
        status = transcription_result['status']
        if status == 'completed':
            print("Transcription completed!")
            break
        if status == 'error':
            print(f"Transcription failed: {transcription_result['error']}")
            return None
        time.sleep(3)

    utterances = transcription_result['utterances']

    if len(utterances) < 2:
        print("⚠ Not enough utterances to analyze gaps (need at least 2)")
        return None

    # Positive gaps only: overlapping speech yields a non-positive delta.
    gaps = [
        nxt['start'] - cur['end']
        for cur, nxt in zip(utterances, utterances[1:])
        if nxt['start'] - cur['end'] > 0
    ]

    if not gaps:
        print("⚠ No gaps found between utterances (all speech overlaps)")
        return None

    stats = {
        'filename': Path(audio_file).name,
        'average_gap_ms': sum(gaps) / len(gaps),
        'min_gap_ms': min(gaps),
        'max_gap_ms': max(gaps),
        'median_gap_ms': sorted(gaps)[len(gaps) // 2],  # upper median for even counts
        'total_utterances': len(utterances),
        'total_gaps': len(gaps),
        'all_gaps': gaps
    }

    print(f"\nResults for {stats['filename']}:")
    print(f" Total utterances: {stats['total_utterances']}")
    print(f" Total gaps: {stats['total_gaps']}")
    print(f" Average gap: {stats['average_gap_ms']:.0f} ms")
    print(f" Median gap: {stats['median_gap_ms']:.0f} ms")

    # Persist the raw transcript JSON for offline inspection.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_filename = Path(audio_file).stem.replace(' ', '_')
    json_filename = f"transcript_{safe_filename}_{timestamp}.json"

    try:
        with open(json_filename, 'w', encoding='utf-8') as fh:
            json.dump(transcription_result, fh, indent=2, ensure_ascii=False)
        print(f" Transcript saved: {json_filename}")
    except Exception as e:
        print(f" Error saving transcript: {e}")

    return stats
170
171
def analyze_multiple_files(folder_path, api_key):
    """
    Run gap analysis over every audio file in *folder_path* and aggregate
    the statistics across files.

    Args:
        folder_path: Directory containing the recordings to analyze.
        api_key: AssemblyAI API key.

    Returns:
        A dict of aggregated gap statistics, or None when no file could be
        analyzed successfully.
    """
    print("=" * 70)
    print("MULTI-FILE UTTERANCE GAP ANALYSIS")
    print("=" * 70)

    audio_files = get_audio_files(folder_path)
    total_files = len(audio_files)

    print(f"\nFound {total_files} audio file(s) in: {folder_path}")
    for i, file in enumerate(audio_files, 1):
        print(f" {i}. {Path(file).name}")

    per_file = []
    pooled_gaps = []

    # A failure in one file must not abort the whole batch.
    for index, audio_file in enumerate(audio_files, 1):
        try:
            file_stats = analyze_single_file(audio_file, api_key, index, total_files)
        except Exception as e:
            print(f"\n✗ Error analyzing {Path(audio_file).name}: {str(e)}")
            continue
        if file_stats:
            per_file.append(file_stats)
            pooled_gaps.extend(file_stats['all_gaps'])

    if not per_file:
        print("\n✗ No files were successfully analyzed")
        return None

    print("\n" + "=" * 70)
    print("AGGREGATED GAP ANALYSIS RESULTS")
    print("=" * 70)

    aggregated_stats = {
        'total_files_analyzed': len(per_file),
        'total_utterances': sum(s['total_utterances'] for s in per_file),
        'total_gaps': sum(s['total_gaps'] for s in per_file),
        'overall_average_gap_ms': sum(pooled_gaps) / len(pooled_gaps),
        'overall_median_gap_ms': sorted(pooled_gaps)[len(pooled_gaps) // 2],
        'overall_min_gap_ms': min(pooled_gaps),
        'overall_max_gap_ms': max(pooled_gaps),
        'file_averages': [s['average_gap_ms'] for s in per_file],
        'file_stats': per_file
    }

    print(f"\nFiles successfully analyzed: {aggregated_stats['total_files_analyzed']}/{total_files}")
    print(f"Total utterances (all files): {aggregated_stats['total_utterances']}")
    print(f"Total gaps analyzed: {aggregated_stats['total_gaps']}")
    print(f"\nOverall average gap: {aggregated_stats['overall_average_gap_ms']:.0f} ms ({aggregated_stats['overall_average_gap_ms']/1000:.2f} seconds)")
    print(f"Overall median gap: {aggregated_stats['overall_median_gap_ms']:.0f} ms")
    print(f"Overall minimum gap: {aggregated_stats['overall_min_gap_ms']:.0f} ms")
    print(f"Overall maximum gap: {aggregated_stats['overall_max_gap_ms']:.0f} ms")

    print(f"\nPer-file average gaps:")
    for stat in per_file:
        print(f" • {stat['filename']:<40} {stat['average_gap_ms']:>6.0f} ms")

    # Max/average ratio is a rough measure of how uniform pause lengths are.
    avg_of_file_averages = sum(aggregated_stats['file_averages']) / len(aggregated_stats['file_averages'])
    variability_ratio = aggregated_stats['overall_max_gap_ms'] / aggregated_stats['overall_average_gap_ms']

    print(f"\nAverage of file averages: {avg_of_file_averages:.0f} ms")
    print(f"Variability ratio: {variability_ratio:.2f}x")

    if variability_ratio > 3:
        print("└─> HIGH variability - mixed conversation patterns across files")
    elif variability_ratio > 2:
        print("└─> MODERATE variability - some pattern variation")
    else:
        print("└─> LOW variability - consistent conversation rhythm")

    # Write a machine-readable summary alongside the per-file transcripts.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    summary_filename = f"aggregated_analysis_{timestamp}.json"

    try:
        overall_keys = ('total_files_analyzed', 'total_utterances', 'total_gaps',
                        'overall_average_gap_ms', 'overall_median_gap_ms',
                        'overall_min_gap_ms', 'overall_max_gap_ms')
        summary_data = {
            'analysis_date': datetime.now().isoformat(),
            'folder_path': folder_path,
            'aggregated_statistics': {k: aggregated_stats[k] for k in overall_keys},
            'per_file_results': [
                {
                    'filename': s['filename'],
                    'average_gap_ms': s['average_gap_ms'],
                    'median_gap_ms': s['median_gap_ms'],
                    'total_utterances': s['total_utterances'],
                    'total_gaps': s['total_gaps']
                }
                for s in per_file
            ]
        }
        summary_data['aggregated_statistics']['variability_ratio'] = variability_ratio

        with open(summary_filename, 'w', encoding='utf-8') as fh:
            json.dump(summary_data, fh, indent=2, ensure_ascii=False)
        print(f"\nAggregated analysis saved to: {summary_filename}")
    except Exception as e:
        print(f"\nError saving aggregated analysis: {e}")

    return aggregated_stats
286
287
def determine_streaming_config(aggregated_stats):
    """
    Pick Universal-Streaming turn-detection parameters from aggregated gap
    statistics.

    Args:
        aggregated_stats: Output of analyze_multiple_files(), or None.

    Returns:
        Dict with 'name', 'min_turn_silence', 'max_turn_silence' and
        'description' keys, suitable for building WebSocket query params.
    """
    if aggregated_stats is None:
        print("\nUsing default balanced configuration (no gap data available)")
        return {
            'name': 'Balanced (Default)',
            'min_turn_silence': 400,
            'max_turn_silence': 1280,
            'description': 'Standard configuration for general use'
        }

    print("\n" + "=" * 70)
    print("DETERMINING OPTIMAL STREAMING CONFIGURATION")
    print("=" * 70)

    avg_gap = aggregated_stats['overall_average_gap_ms']
    num_files = aggregated_stats['total_files_analyzed']

    print(f"\nBased on analysis of {num_files} file(s)")
    print(f"Overall average gap: {avg_gap:.0f} ms")

    # Presets ordered by the average-gap upper bound they apply below;
    # the last entry is the catch-all.
    presets = (
        (500, {'name': 'Aggressive',
               'min_turn_silence': 160,
               'max_turn_silence': 400,
               'description': 'Fast-paced conversation with quick turn-taking'},
         "IVR systems, order confirmations, yes/no queries, retail support"),
        (1000, {'name': 'Balanced',
                'min_turn_silence': 400,
                'max_turn_silence': 1280,
                'description': 'Natural conversation pacing'},
         "General customer support, consultations, standard voice agents"),
        (float('inf'), {'name': 'Conservative',
                        'min_turn_silence': 800,
                        'max_turn_silence': 3600,
                        'description': 'Thoughtful, complex speech with longer pauses'},
         "Technical support, healthcare, legal consultations, troubleshooting"),
    )
    for upper_bound, config, use_cases in presets:
        if avg_gap < upper_bound:
            break

    print(f"\nSelected Configuration: {config['name']}")
    print(f" Reasoning: Average gap of {avg_gap:.0f}ms indicates {config['description']}")
    print(f"\nConfiguration Parameters:")
    print(f" • min_turn_silence: {config['min_turn_silence']} ms")
    print(f" • max_turn_silence: {config['max_turn_silence']} ms")
    print(f"\nRecommended use cases: {use_cases}")

    return config
346
347
348# WEBSOCKET HANDLERS WITH OPTIMIZED SETTINGS
349
def on_open(ws):
    """Called when the WebSocket connection is established; starts the
    microphone streaming thread."""
    print("WebSocket connection opened.")
    print(f"Using optimized {OPTIMIZED_CONFIG['name']} configuration")

    def stream_audio():
        # Pump fixed-size chunks from the mic into the socket until the
        # stop event fires or a read/send error occurs.
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                with recording_lock:
                    recorded_frames.append(chunk)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
375
376def on_message(ws, message):
377 try:
378 data = json.loads(message)
379 msg_type = data.get('type')
380
381 if msg_type == "Begin":
382 session_id = data.get('id')
383 expires_at = data.get('expires_at')
384 print(f"\nSession began: ID={session_id}")
385 print(f" Expires at: {datetime.fromtimestamp(expires_at)}")
386 print(f" Configuration: {OPTIMIZED_CONFIG['name']}")
387 print("\nSpeak now... (Press Ctrl+C to stop)\n")
388
389 elif msg_type == "Turn":
390 transcript = data.get('transcript', '')
391 if data.get('end_of_turn'):
392 print('\r' + ' ' * 80 + '\r', end='')
393 print(f"FINAL: {transcript}")
394 else:
395 print(f"\r partial: {transcript}", end='')
396
397 elif msg_type == "Termination":
398 audio_duration = data.get('audio_duration_seconds', 0)
399 session_duration = data.get('session_duration_seconds', 0)
400 print(f"\nSession Terminated: Audio={audio_duration}s, Session={session_duration}s")
401
402 except json.JSONDecodeError as e:
403 print(f"Error decoding message: {e}")
404 except Exception as e:
405 print(f"Error handling message: {e}")
406
def on_error(ws, error):
    """Called on a WebSocket error; signals the streaming thread to stop."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()
411
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket closes; tears down audio resources."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    if stream is not None:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio is not None:
        audio.terminate()
        audio = None
    # Give the streaming thread a moment to exit cleanly.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
429
430
431# RUN STREAMING WITH OPTIMIZED CONFIGURATION
432
def run_streaming(config):
    """
    Open the microphone and stream audio to AssemblyAI Universal-Streaming
    using the supplied turn-detection configuration.

    Args:
        config: Dict with 'name', 'min_turn_silence' and 'max_turn_silence'
            keys, as produced by determine_streaming_config().
    """
    global audio, stream, ws_app, OPTIMIZED_CONFIG

    OPTIMIZED_CONFIG = config

    print("\n" + "=" * 70)
    print("STARTING REAL-TIME STREAMING")
    print("=" * 70)

    # Turn-detection settings ride along as query-string parameters.
    connection_params = {
        "sample_rate": SAMPLE_RATE,
        "format_turns": True,
        "min_turn_silence": str(config['min_turn_silence']),
        "max_turn_silence": str(config['max_turn_silence'])
    }

    endpoint_base = "wss://streaming.assemblyai.com/v3/ws"
    endpoint = f"{endpoint_base}?{urlencode(connection_params)}"

    print(f"\nWebSocket Endpoint: {endpoint_base}")
    print(f"\nApplied Configuration:")
    for key, value in connection_params.items():
        print(f" • {key}: {value}")

    audio = pyaudio.PyAudio()

    try:
        stream = audio.open(
            rate=SAMPLE_RATE,
            channels=CHANNELS,
            format=FORMAT,
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
        )
        print("\nMicrophone stream opened successfully.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    ws_app = websocket.WebSocketApp(
        endpoint,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # run_forever() blocks, so it lives on its own daemon thread while the
    # main thread waits for Ctrl+C.
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Ask the server to end the session gracefully before closing.
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                print(f"Sending termination message...")
                ws_app.send(json.dumps({"type": "Terminate"}))
                time.sleep(1)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")
531
532
533# MAIN WORKFLOW
534
def main():
    """
    End-to-end workflow: analyze historical audio, derive turn-detection
    settings from the gap statistics, then run live transcription with them.
    """
    try:
        # 1) Aggregate utterance-gap statistics across the folder.
        aggregated_stats = analyze_multiple_files(AUDIO_FOLDER_PATH, YOUR_API_KEY)

        # 2) Map the statistics onto a turn-detection preset.
        streaming_config = determine_streaming_config(aggregated_stats)

        # 3) Stream from the microphone using that preset.
        run_streaming(streaming_config)

    except Exception as e:
        print(f"\nError in workflow: {str(e)}")
        raise
553
554
555# EXECUTION
556
if __name__ == "__main__":
    main()

Step-By-Step Guide

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.

  1. Install All Required Packages
$ pip install requests pyaudio websocket-client
  2. Configuration and Global Variables

Set up API credentials, file paths, audio parameters (16kHz sample rate, mono channel), and initialize global variables for managing WebSocket connections and audio streaming threads.

1import requests
2import time
3import json
4import pyaudio
5import websocket
6import threading
7from urllib.parse import urlencode
8from datetime import datetime
9import os
10from pathlib import Path
11
12
YOUR_API_KEY = "<YOUR_API_KEY>"  # Replace with your API key
AUDIO_FOLDER_PATH = "<YOUR_AUDIO_FILE_FOLDER>"  # Folder containing audio files

# Audio capture configuration
SAMPLE_RATE = 16000
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 800  # 50 ms of audio (0.05 s * 16000 Hz)

# Shared state for the audio stream and WebSocket lifecycle
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
recorded_frames = []
recording_lock = threading.Lock()

# Turn-detection configuration selected by the analysis step
OPTIMIZED_CONFIG = {}
  3. Define get_audio_files() Function

This function scans a specified folder for audio/video files with supported extensions and returns a sorted list of file paths for batch processing.

def get_audio_files(folder_path):
    """
    Return a sorted list of audio/video file paths found in *folder_path*.

    Supports all formats accepted by AssemblyAI's API.

    Args:
        folder_path: Directory to scan (non-recursive).

    Returns:
        Sorted list of file path strings.

    Raises:
        FileNotFoundError: If the folder does not exist.
        ValueError: If no supported audio files are found.
    """
    # All entries must be lowercase because the membership test below
    # lowercases each file suffix; the original mixed-case entries
    # ('.MTS', '.M2TS', '.TS') could never match.
    audio_extensions = {'.aac', '.ac3', '.aif', '.aiff', '.alac', '.amr', '.ape',
                        '.au', '.dss', '.flac', '.m4a', '.m4b', '.m4p', '.mp3',
                        '.mpga', '.ogg', '.oga', '.mogg', '.opus', '.qcp', '.tta',
                        '.voc', '.wav', '.wv', '.webm', '.mts', '.m2ts', '.ts',
                        '.mov', '.mp4', '.m4v'}
    folder = Path(folder_path)

    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")

    audio_files = [
        str(f) for f in folder.iterdir()
        if f.is_file() and f.suffix.lower() in audio_extensions
    ]

    if not audio_files:
        raise ValueError(f"No audio files found in {folder_path}")

    return sorted(audio_files)
  4. Define analyze_single_file() Function

This function uploads an audio file to AssemblyAI, requests transcription with speaker labels enabled, polls until completion, then calculates gap statistics between utterances (average, median, min, max) and saves the transcript JSON.

def analyze_single_file(audio_file, api_key, file_index, total_files):
    """
    Transcribe one recording with speaker labels and compute the gaps
    between consecutive utterances.

    Args:
        audio_file: Local path or http(s) URL of the recording.
        api_key: AssemblyAI API key.
        file_index: 1-based position of this file in the batch (display only).
        total_files: Total number of files in the batch (display only).

    Returns:
        A dict of gap statistics for the file, or None when transcription
        fails or there are not enough gaps to analyze.
    """
    print("\n" + "=" * 70)
    print(f"ANALYZING FILE {file_index}/{total_files}: {Path(audio_file).name}")
    print("=" * 70)

    base_url = "https://api.assemblyai.com"
    headers = {"authorization": api_key}

    print(f"\nUploading audio file...")

    # Remote URLs pass straight through; local files are uploaded first.
    if audio_file.startswith("http"):
        upload_url = audio_file
        print("Using provided URL")
    else:
        with open(audio_file, "rb") as fh:
            upload_response = requests.post(
                base_url + "/v2/upload",
                headers=headers,
                data=fh,
            )
        upload_url = upload_response.json()["upload_url"]
        print(f"Upload complete")

    # Speaker labels are required so each utterance carries start/end
    # timestamps per speaker turn.
    request_body = {
        "audio_url": upload_url,
        "speaker_labels": True,
        # "language_detection": True # Enable automatic language detection if your files are in different languages
    }

    transcript_id = requests.post(
        base_url + "/v2/transcript",
        json=request_body,
        headers=headers,
    ).json()['id']
    print(f"Transcript ID: {transcript_id}")

    print("\nWaiting for transcription to complete...")
    polling_endpoint = base_url + "/v2/transcript/" + transcript_id

    # Poll every 3 seconds until the job completes or errors out.
    while True:
        transcription_result = requests.get(polling_endpoint, headers=headers).json()
        status = transcription_result['status']
        if status == 'completed':
            print("Transcription completed!")
            break
        if status == 'error':
            print(f"Transcription failed: {transcription_result['error']}")
            return None
        time.sleep(3)

    utterances = transcription_result['utterances']

    if len(utterances) < 2:
        print("⚠ Not enough utterances to analyze gaps (need at least 2)")
        return None

    # Positive gaps only: overlapping speech yields a non-positive delta.
    gaps = [
        nxt['start'] - cur['end']
        for cur, nxt in zip(utterances, utterances[1:])
        if nxt['start'] - cur['end'] > 0
    ]

    if not gaps:
        print("⚠ No gaps found between utterances (all speech overlaps)")
        return None

    stats = {
        'filename': Path(audio_file).name,
        'average_gap_ms': sum(gaps) / len(gaps),
        'min_gap_ms': min(gaps),
        'max_gap_ms': max(gaps),
        'median_gap_ms': sorted(gaps)[len(gaps) // 2],  # upper median for even counts
        'total_utterances': len(utterances),
        'total_gaps': len(gaps),
        'all_gaps': gaps
    }

    print(f"\nResults for {stats['filename']}:")
    print(f" Total utterances: {stats['total_utterances']}")
    print(f" Total gaps: {stats['total_gaps']}")
    print(f" Average gap: {stats['average_gap_ms']:.0f} ms")
    print(f" Median gap: {stats['median_gap_ms']:.0f} ms")

    # Persist the raw transcript JSON for offline inspection.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_filename = Path(audio_file).stem.replace(' ', '_')
    json_filename = f"transcript_{safe_filename}_{timestamp}.json"

    try:
        with open(json_filename, 'w', encoding='utf-8') as fh:
            json.dump(transcription_result, fh, indent=2, ensure_ascii=False)
        print(f" Transcript saved: {json_filename}")
    except Exception as e:
        print(f" Error saving transcript: {e}")

    return stats
  5. Define analyze_multiple_files() Function

This function orchestrates the analysis of all files in a folder by calling analyze_single_file() for each, aggregates all gap data across files, calculates overall statistics, displays per-file breakdowns, and saves a comprehensive summary JSON.

def analyze_multiple_files(folder_path, api_key):
    """
    Run gap analysis over every audio file in *folder_path* and aggregate
    the statistics across files.

    Args:
        folder_path: Directory containing the recordings to analyze.
        api_key: AssemblyAI API key.

    Returns:
        A dict of aggregated gap statistics, or None when no file could be
        analyzed successfully.
    """
    print("=" * 70)
    print("MULTI-FILE UTTERANCE GAP ANALYSIS")
    print("=" * 70)

    audio_files = get_audio_files(folder_path)
    total_files = len(audio_files)

    print(f"\nFound {total_files} audio file(s) in: {folder_path}")
    for i, file in enumerate(audio_files, 1):
        print(f" {i}. {Path(file).name}")

    per_file = []
    pooled_gaps = []

    # A failure in one file must not abort the whole batch.
    for index, audio_file in enumerate(audio_files, 1):
        try:
            file_stats = analyze_single_file(audio_file, api_key, index, total_files)
        except Exception as e:
            print(f"\n✗ Error analyzing {Path(audio_file).name}: {str(e)}")
            continue
        if file_stats:
            per_file.append(file_stats)
            pooled_gaps.extend(file_stats['all_gaps'])

    if not per_file:
        print("\n✗ No files were successfully analyzed")
        return None

    print("\n" + "=" * 70)
    print("AGGREGATED GAP ANALYSIS RESULTS")
    print("=" * 70)

    aggregated_stats = {
        'total_files_analyzed': len(per_file),
        'total_utterances': sum(s['total_utterances'] for s in per_file),
        'total_gaps': sum(s['total_gaps'] for s in per_file),
        'overall_average_gap_ms': sum(pooled_gaps) / len(pooled_gaps),
        'overall_median_gap_ms': sorted(pooled_gaps)[len(pooled_gaps) // 2],
        'overall_min_gap_ms': min(pooled_gaps),
        'overall_max_gap_ms': max(pooled_gaps),
        'file_averages': [s['average_gap_ms'] for s in per_file],
        'file_stats': per_file
    }

    print(f"\nFiles successfully analyzed: {aggregated_stats['total_files_analyzed']}/{total_files}")
    print(f"Total utterances (all files): {aggregated_stats['total_utterances']}")
    print(f"Total gaps analyzed: {aggregated_stats['total_gaps']}")
    print(f"\nOverall average gap: {aggregated_stats['overall_average_gap_ms']:.0f} ms ({aggregated_stats['overall_average_gap_ms']/1000:.2f} seconds)")
    print(f"Overall median gap: {aggregated_stats['overall_median_gap_ms']:.0f} ms")
    print(f"Overall minimum gap: {aggregated_stats['overall_min_gap_ms']:.0f} ms")
    print(f"Overall maximum gap: {aggregated_stats['overall_max_gap_ms']:.0f} ms")

    print(f"\nPer-file average gaps:")
    for stat in per_file:
        print(f" • {stat['filename']:<40} {stat['average_gap_ms']:>6.0f} ms")

    # Max/average ratio is a rough measure of how uniform pause lengths are.
    avg_of_file_averages = sum(aggregated_stats['file_averages']) / len(aggregated_stats['file_averages'])
    variability_ratio = aggregated_stats['overall_max_gap_ms'] / aggregated_stats['overall_average_gap_ms']

    print(f"\nAverage of file averages: {avg_of_file_averages:.0f} ms")
    print(f"Variability ratio: {variability_ratio:.2f}x")

    if variability_ratio > 3:
        print("└─> HIGH variability - mixed conversation patterns across files")
    elif variability_ratio > 2:
        print("└─> MODERATE variability - some pattern variation")
    else:
        print("└─> LOW variability - consistent conversation rhythm")

    # Write a machine-readable summary alongside the per-file transcripts.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    summary_filename = f"aggregated_analysis_{timestamp}.json"

    try:
        overall_keys = ('total_files_analyzed', 'total_utterances', 'total_gaps',
                        'overall_average_gap_ms', 'overall_median_gap_ms',
                        'overall_min_gap_ms', 'overall_max_gap_ms')
        summary_data = {
            'analysis_date': datetime.now().isoformat(),
            'folder_path': folder_path,
            'aggregated_statistics': {k: aggregated_stats[k] for k in overall_keys},
            'per_file_results': [
                {
                    'filename': s['filename'],
                    'average_gap_ms': s['average_gap_ms'],
                    'median_gap_ms': s['median_gap_ms'],
                    'total_utterances': s['total_utterances'],
                    'total_gaps': s['total_gaps']
                }
                for s in per_file
            ]
        }
        summary_data['aggregated_statistics']['variability_ratio'] = variability_ratio

        with open(summary_filename, 'w', encoding='utf-8') as fh:
            json.dump(summary_data, fh, indent=2, ensure_ascii=False)
        print(f"\nAggregated analysis saved to: {summary_filename}")
    except Exception as e:
        print(f"\nError saving aggregated analysis: {e}")

    return aggregated_stats
  6. Define determine_streaming_config() Function

This function takes aggregated gap statistics and selects one of three preset configurations with optimized turn detection parameters for different conversation styles.

def determine_streaming_config(aggregated_stats):
    """
    Pick Universal-Streaming turn-detection parameters from aggregated gap
    statistics.

    Args:
        aggregated_stats: Output of analyze_multiple_files(), or None.

    Returns:
        Dict with 'name', 'min_turn_silence', 'max_turn_silence' and
        'description' keys, suitable for building WebSocket query params.
    """
    if aggregated_stats is None:
        print("\nUsing default balanced configuration (no gap data available)")
        return {
            'name': 'Balanced (Default)',
            'min_turn_silence': 400,
            'max_turn_silence': 1280,
            'description': 'Standard configuration for general use'
        }

    print("\n" + "=" * 70)
    print("DETERMINING OPTIMAL STREAMING CONFIGURATION")
    print("=" * 70)

    avg_gap = aggregated_stats['overall_average_gap_ms']
    num_files = aggregated_stats['total_files_analyzed']

    print(f"\nBased on analysis of {num_files} file(s)")
    print(f"Overall average gap: {avg_gap:.0f} ms")

    # Presets ordered by the average-gap upper bound they apply below;
    # the last entry is the catch-all.
    presets = (
        (500, {'name': 'Aggressive',
               'min_turn_silence': 160,
               'max_turn_silence': 400,
               'description': 'Fast-paced conversation with quick turn-taking'},
         "IVR systems, order confirmations, yes/no queries, retail support"),
        (1000, {'name': 'Balanced',
                'min_turn_silence': 400,
                'max_turn_silence': 1280,
                'description': 'Natural conversation pacing'},
         "General customer support, consultations, standard voice agents"),
        (float('inf'), {'name': 'Conservative',
                        'min_turn_silence': 800,
                        'max_turn_silence': 3600,
                        'description': 'Thoughtful, complex speech with longer pauses'},
         "Technical support, healthcare, legal consultations, troubleshooting"),
    )
    for upper_bound, config, use_cases in presets:
        if avg_gap < upper_bound:
            break

    print(f"\nSelected Configuration: {config['name']}")
    print(f" Reasoning: Average gap of {avg_gap:.0f}ms indicates {config['description']}")
    print(f"\nConfiguration Parameters:")
    print(f" • min_turn_silence: {config['min_turn_silence']} ms")
    print(f" • max_turn_silence: {config['max_turn_silence']} ms")
    print(f"\nRecommended use cases: {use_cases}")

    return config
  7. Create WebSocket Event Handlers (on_open, on_message, on_error, on_close)

These functions manage the real-time streaming connection lifecycle: on_open starts the audio streaming thread, on_message processes transcription results (partial and final turns), and the close/error handlers clean up resources.

def on_open(ws):
    """Called when the WebSocket connection is established; starts the
    microphone streaming thread."""
    print("WebSocket connection opened.")
    print(f"Using optimized {OPTIMIZED_CONFIG['name']} configuration")

    def stream_audio():
        # Pump fixed-size chunks from the mic into the socket until the
        # stop event fires or a read/send error occurs.
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                with recording_lock:
                    recorded_frames.append(chunk)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
26
def on_message(ws, message):
    """Dispatch a server message: session begin, turn updates, or termination."""
    # Decode first so malformed payloads are reported distinctly.
    try:
        payload = json.loads(message)
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
        return
    except Exception as e:
        print(f"Error handling message: {e}")
        return

    try:
        kind = payload.get('type')

        if kind == "Begin":
            session_id = payload.get('id')
            expires_at = payload.get('expires_at')
            print(f"\nSession began: ID={session_id}")
            print(f" Expires at: {datetime.fromtimestamp(expires_at)}")
            print(f" Configuration: {OPTIMIZED_CONFIG['name']}")
            print("\nSpeak now... (Press Ctrl+C to stop)\n")
        elif kind == "Turn":
            text = payload.get('transcript', '')
            if payload.get('end_of_turn'):
                # Wipe the in-progress partial line before the final print.
                print('\r' + ' ' * 80 + '\r', end='')
                print(f"FINAL: {text}")
            else:
                print(f"\r partial: {text}", end='')
        elif kind == "Termination":
            audio_duration = payload.get('audio_duration_seconds', 0)
            session_duration = payload.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio={audio_duration}s, Session={session_duration}s")
    except Exception as e:
        print(f"Error handling message: {e}")
57
def on_error(ws, error):
    """Called when a WebSocket error occurs; halts the audio streaming loop."""
    stop_event.set()
    print(f"\nWebSocket Error: {error}")
62
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed; releases audio resources."""
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    stop_event.set()

    # Tear down the microphone stream first, then the PyAudio instance.
    if stream is not None:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio is not None:
        audio.terminate()
        audio = None

    # Give the streaming thread a moment to notice stop_event and exit.
    if audio_thread is not None and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
  1. Define run_streaming() Function

This function initializes PyAudio to capture microphone input, establishes a WebSocket connection with the optimized configuration parameters, and streams audio in real-time while displaying transcription results until the user stops with Ctrl+C.

def run_streaming(config):
    """Stream microphone audio to AssemblyAI using the optimized turn settings.

    Opens a PyAudio input stream, connects to the v3 streaming WebSocket with
    the turn-detection parameters carried in ``config``, and blocks until the
    user presses Ctrl+C (sending a graceful Terminate message) or the socket
    thread exits. All audio resources are released on exit.

    Args:
        config: dict produced by determine_streaming_config; must contain
            'name', 'min_turn_silence' and 'max_turn_silence'.
    """
    global audio, stream, ws_app, OPTIMIZED_CONFIG

    # Expose the chosen configuration to the WebSocket callbacks for logging.
    OPTIMIZED_CONFIG = config

    print("\n" + "=" * 70)
    print("STARTING REAL-TIME STREAMING")
    print("=" * 70)

    # Build connection parameters with the optimized turn-detection settings.
    connection_params = {
        "sample_rate": SAMPLE_RATE,
        "format_turns": True,
        "min_turn_silence": str(config['min_turn_silence']),
        "max_turn_silence": str(config['max_turn_silence']),
    }

    base_url = "wss://streaming.assemblyai.com/v3/ws"
    api_endpoint = f"{base_url}?{urlencode(connection_params)}"

    print(f"\nWebSocket Endpoint: {base_url}")
    print("\nApplied Configuration:")
    for key, value in connection_params.items():
        print(f" • {key}: {value}")

    # Initialize PyAudio and open the microphone stream.
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("\nMicrophone stream opened successfully.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    # Create the WebSocketApp with the lifecycle handlers defined above.
    ws_app = websocket.WebSocketApp(
        api_endpoint,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run the WebSocket loop in a daemon thread so Ctrl+C reaches this thread.
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Ask the server to end the session cleanly before closing the socket.
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                print("Sending termination message...")
                ws_app.send(json.dumps({"type": "Terminate"}))
                time.sleep(1)  # give the server a moment to acknowledge
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Best-effort cleanup: on_close may already have released the stream
        # (setting it to None), and PyAudio can raise if an object is torn
        # down twice, so tolerate errors here rather than mask the real exit.
        try:
            if stream and stream.is_active():
                stream.stop_stream()
            if stream:
                stream.close()
        except Exception as e:
            print(f"Error closing audio stream: {e}")
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")
  1. Define main() Workflow

Execute the three-step process: analyze all audio files in the folder, determine the best streaming configuration based on aggregated utterance gaps, then launch real-time streaming with the optimized settings.

def main():
    """End-to-end workflow: analyze historical audio, then stream with tuned settings."""
    try:
        # 1) Aggregate utterance-gap statistics across every file in the folder.
        gap_stats = analyze_multiple_files(AUDIO_FOLDER_PATH, YOUR_API_KEY)

        # 2) Translate those statistics into turn-detection parameters.
        chosen_config = determine_streaming_config(gap_stats)

        # 3) Go live with the tuned configuration.
        run_streaming(chosen_config)

    except Exception as e:
        print(f"\nError in workflow: {str(e)}")
        raise


if __name__ == "__main__":
    main()