Skip to main content

Documentation Index

Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt

Use this file to discover all available pages before exploring further.

This guide shows how to analyze utterance gaps from multiple pre-recorded audio files to automatically determine optimal turn detection settings for real-time streaming transcription. It processes an entire folder, aggregates gap statistics across all recordings, and configures the WebSocket with parameters tailored to your specific conversation patterns.

Quickstart

import requests
import time
import json
import pyaudio
import websocket
import threading
from urllib.parse import urlencode
from datetime import datetime
import os
from pathlib import Path


YOUR_API_KEY = "<YOUR_API_KEY>"  # Replace with your API key
AUDIO_FOLDER_PATH = "<YOUR_AUDIO_FILE_FOLDER>"  # Folder containing audio files

# Audio Configuration
SAMPLE_RATE = 16000
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
recorded_frames = []
recording_lock = threading.Lock()

# Store the optimized configuration
OPTIMIZED_CONFIG = {}


def get_audio_files(folder_path):
    """
    Gets all audio files from the specified folder.
    Supports all formats accepted by AssemblyAI's API
    """
    audio_extensions = {'.aac', '.ac3', '.aif', '.aiff', '.alac', '.amr', '.ape',
    '.au', '.dss', '.flac', '.m4a', '.m4b', '.m4p', '.mp3',
    '.mpga', '.ogg', '.oga', '.mogg', '.opus', '.qcp', '.tta',
    '.voc', '.wav', '.wv', '.webm', '.MTS', '.M2TS', '.TS',
    '.mov', '.mp4', '.m4v'}
    folder = Path(folder_path)

    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")

    audio_files = [
        str(f) for f in folder.iterdir()
        if f.is_file() and f.suffix.lower() in audio_extensions
    ]

    if not audio_files:
        raise ValueError(f"No audio files found in {folder_path}")

    return sorted(audio_files)


def analyze_single_file(audio_file, api_key, file_index, total_files):
    """
    Analyzes a single audio file and returns gap statistics.
    """
    print("\n" + "=" * 70)
    print(f"ANALYZING FILE {file_index}/{total_files}: {Path(audio_file).name}")
    print("=" * 70)

    base_url = "https://api.assemblyai.com"
    headers = {"authorization": api_key}

    # Upload audio file
    print(f"\nUploading audio file...")

    if audio_file.startswith("http"):
        upload_url = audio_file
        print("Using provided URL")
    else:
        with open(audio_file, "rb") as f:
            response = requests.post(
                base_url + "/v2/upload",
                headers=headers,
                data=f
            )
        upload_url = response.json()["upload_url"]
        print(f"Upload complete")

    # Enable Speaker Labels
    data = {
        "audio_url": upload_url,
        "speaker_labels": True,
        # "language_detection": True # Enable automatic language detection if your files are in different languages
    }

    response = requests.post(
        base_url + "/v2/transcript",
        json=data,
        headers=headers
    )
    transcript_id = response.json()['id']
    print(f"Transcript ID: {transcript_id}")

    # Poll for completion
    print("\nWaiting for transcription to complete...")
    polling_endpoint = base_url + "/v2/transcript/" + transcript_id

    while True:
        transcription_result = requests.get(polling_endpoint, headers=headers).json()

        if transcription_result['status'] == 'completed':
            print("Transcription completed!")
            break
        elif transcription_result['status'] == 'error':
            print(f"Transcription failed: {transcription_result['error']}")
            return None
        else:
            time.sleep(3)

    # Calculate gaps
    utterances = transcription_result['utterances']

    if len(utterances) < 2:
        print("⚠ Not enough utterances to analyze gaps (need at least 2)")
        return None

    gaps = []
    for i in range(len(utterances) - 1):
        current_end = utterances[i]['end']
        next_start = utterances[i + 1]['start']
        gap = next_start - current_end

        if gap > 0:
            gaps.append(gap)

    if not gaps:
        print("⚠ No gaps found between utterances (all speech overlaps)")
        return None

    # Calculate statistics
    stats = {
        'filename': Path(audio_file).name,
        'average_gap_ms': sum(gaps) / len(gaps),
        'min_gap_ms': min(gaps),
        'max_gap_ms': max(gaps),
        'median_gap_ms': sorted(gaps)[len(gaps) // 2],
        'total_utterances': len(utterances),
        'total_gaps': len(gaps),
        'all_gaps': gaps
    }

    print(f"\nResults for {stats['filename']}:")
    print(f"   Total utterances:  {stats['total_utterances']}")
    print(f"   Total gaps:        {stats['total_gaps']}")
    print(f"   Average gap:       {stats['average_gap_ms']:.0f} ms")
    print(f"   Median gap:        {stats['median_gap_ms']:.0f} ms")

    # Save transcript JSON to file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_filename = Path(audio_file).stem.replace(' ', '_')
    json_filename = f"transcript_{safe_filename}_{timestamp}.json"

    try:
        with open(json_filename, 'w', encoding='utf-8') as f:
            json.dump(transcription_result, f, indent=2, ensure_ascii=False)
        print(f"   Transcript saved:  {json_filename}")
    except Exception as e:
        print(f"   Error saving transcript: {e}")

    return stats


def analyze_multiple_files(folder_path, api_key):
    """
    Analyzes all audio files in a folder and returns aggregated statistics.
    """
    print("=" * 70)
    print("MULTI-FILE UTTERANCE GAP ANALYSIS")
    print("=" * 70)

    audio_files = get_audio_files(folder_path)
    total_files = len(audio_files)

    print(f"\nFound {total_files} audio file(s) in: {folder_path}")
    for i, file in enumerate(audio_files, 1):
        print(f"   {i}. {Path(file).name}")

    # Analyze each file
    all_file_stats = []
    all_gaps = []

    for i, audio_file in enumerate(audio_files, 1):
        try:
            stats = analyze_single_file(audio_file, api_key, i, total_files)
            if stats:
                all_file_stats.append(stats)
                all_gaps.extend(stats['all_gaps'])
        except Exception as e:
            print(f"\n✗ Error analyzing {Path(audio_file).name}: {str(e)}")
            continue

    if not all_file_stats:
        print("\n✗ No files were successfully analyzed")
        return None

    # Calculate aggregated statistics
    print("\n" + "=" * 70)
    print("AGGREGATED GAP ANALYSIS RESULTS")
    print("=" * 70)

    aggregated_stats = {
        'total_files_analyzed': len(all_file_stats),
        'total_utterances': sum(s['total_utterances'] for s in all_file_stats),
        'total_gaps': sum(s['total_gaps'] for s in all_file_stats),
        'overall_average_gap_ms': sum(all_gaps) / len(all_gaps),
        'overall_median_gap_ms': sorted(all_gaps)[len(all_gaps) // 2],
        'overall_min_gap_ms': min(all_gaps),
        'overall_max_gap_ms': max(all_gaps),
        'file_averages': [s['average_gap_ms'] for s in all_file_stats],
        'file_stats': all_file_stats
    }

    print(f"\nFiles successfully analyzed:  {aggregated_stats['total_files_analyzed']}/{total_files}")
    print(f"Total utterances (all files): {aggregated_stats['total_utterances']}")
    print(f"Total gaps analyzed:          {aggregated_stats['total_gaps']}")
    print(f"\nOverall average gap:          {aggregated_stats['overall_average_gap_ms']:.0f} ms ({aggregated_stats['overall_average_gap_ms']/1000:.2f} seconds)")
    print(f"Overall median gap:           {aggregated_stats['overall_median_gap_ms']:.0f} ms")
    print(f"Overall minimum gap:          {aggregated_stats['overall_min_gap_ms']:.0f} ms")
    print(f"Overall maximum gap:          {aggregated_stats['overall_max_gap_ms']:.0f} ms")

    # Show per-file breakdown
    print(f"\nPer-file average gaps:")
    for stat in all_file_stats:
        print(f"   • {stat['filename']:<40} {stat['average_gap_ms']:>6.0f} ms")

    # Calculate variability
    avg_of_file_averages = sum(aggregated_stats['file_averages']) / len(aggregated_stats['file_averages'])
    variability_ratio = aggregated_stats['overall_max_gap_ms'] / aggregated_stats['overall_average_gap_ms']

    print(f"\nAverage of file averages:     {avg_of_file_averages:.0f} ms")
    print(f"Variability ratio:            {variability_ratio:.2f}x")

    if variability_ratio > 3:
        print("└─> HIGH variability - mixed conversation patterns across files")
    elif variability_ratio > 2:
        print("└─> MODERATE variability - some pattern variation")
    else:
        print("└─> LOW variability - consistent conversation rhythm")

    # Save aggregated results
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    summary_filename = f"aggregated_analysis_{timestamp}.json"

    try:
        summary_data = {
            'analysis_date': datetime.now().isoformat(),
            'folder_path': folder_path,
            'aggregated_statistics': {
                'total_files_analyzed': aggregated_stats['total_files_analyzed'],
                'total_utterances': aggregated_stats['total_utterances'],
                'total_gaps': aggregated_stats['total_gaps'],
                'overall_average_gap_ms': aggregated_stats['overall_average_gap_ms'],
                'overall_median_gap_ms': aggregated_stats['overall_median_gap_ms'],
                'overall_min_gap_ms': aggregated_stats['overall_min_gap_ms'],
                'overall_max_gap_ms': aggregated_stats['overall_max_gap_ms'],
                'variability_ratio': variability_ratio
            },
            'per_file_results': [
                {
                    'filename': s['filename'],
                    'average_gap_ms': s['average_gap_ms'],
                    'median_gap_ms': s['median_gap_ms'],
                    'total_utterances': s['total_utterances'],
                    'total_gaps': s['total_gaps']
                }
                for s in all_file_stats
            ]
        }

        with open(summary_filename, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2, ensure_ascii=False)
        print(f"\nAggregated analysis saved to: {summary_filename}")
    except Exception as e:
        print(f"\nError saving aggregated analysis: {e}")

    return aggregated_stats


def determine_streaming_config(aggregated_stats):
    """
    Determines optimal Universal-Streaming configuration based on aggregated gap analysis.
    Returns WebSocket connection parameters.
    """
    if aggregated_stats is None:
        print("\nUsing default balanced configuration (no gap data available)")
        return {
            'name': 'Balanced (Default)',
            'min_turn_silence': 400,
            'max_turn_silence': 1280,
            'description': 'Standard configuration for general use'
        }

    print("\n" + "=" * 70)
    print("DETERMINING OPTIMAL STREAMING CONFIGURATION")
    print("=" * 70)

    avg_gap = aggregated_stats['overall_average_gap_ms']
    num_files = aggregated_stats['total_files_analyzed']

    print(f"\nBased on analysis of {num_files} file(s)")
    print(f"Overall average gap: {avg_gap:.0f} ms")

    # Determine configuration based on average gap
    if avg_gap < 500:
        config = {
            'name': 'Aggressive',
            'min_turn_silence': 160,
            'max_turn_silence': 400,
            'description': 'Fast-paced conversation with quick turn-taking'
        }
        use_cases = "IVR systems, order confirmations, yes/no queries, retail support"
    elif avg_gap < 1000:
        config = {
            'name': 'Balanced',
            'min_turn_silence': 400,
            'max_turn_silence': 1280,
            'description': 'Natural conversation pacing'
        }
        use_cases = "General customer support, consultations, standard voice agents"
    else:
        config = {
            'name': 'Conservative',
            'min_turn_silence': 800,
            'max_turn_silence': 3600,
            'description': 'Thoughtful, complex speech with longer pauses'
        }
        use_cases = "Technical support, healthcare, legal consultations, troubleshooting"

    print(f"\nSelected Configuration: {config['name']}")
    print(f"   Reasoning: Average gap of {avg_gap:.0f}ms indicates {config['description']}")
    print(f"\nConfiguration Parameters:")
    print(f"   • min_turn_silence: {config['min_turn_silence']} ms")
    print(f"   • max_turn_silence: {config['max_turn_silence']} ms")
    print(f"\nRecommended use cases: {use_cases}")

    return config


#  WEBSOCKET HANDLERS WITH OPTIMIZED SETTINGS

def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Using optimized {OPTIMIZED_CONFIG['name']} configuration")

    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                with recording_lock:
                    recorded_frames.append(audio_data)

                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()

def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}")
            print(f"   Expires at: {datetime.fromtimestamp(expires_at)}")
            print(f"   Configuration: {OPTIMIZED_CONFIG['name']}")
            print("\nSpeak now... (Press Ctrl+C to stop)\n")

        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                print('\r' + ' ' * 80 + '\r', end='')
                print(f"FINAL: {transcript}")
            else:
                print(f"\r  partial: {transcript}", end='')

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio={audio_duration}s, Session={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()

def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)


# RUN STREAMING WITH OPTIMIZED CONFIGURATION

def run_streaming(config):
    """
    Runs the streaming transcription with optimized turn detection settings.
    """
    global audio, stream, ws_app, OPTIMIZED_CONFIG

    OPTIMIZED_CONFIG = config

    print("\n" + "=" * 70)
    print("STARTING REAL-TIME STREAMING")
    print("=" * 70)

    # Build connection parameters with optimized settings
    CONNECTION_PARAMS = {
        "sample_rate": SAMPLE_RATE,
        "format_turns": True,
        "min_turn_silence": str(config['min_turn_silence']),
        "max_turn_silence": str(config['max_turn_silence'])
    }

    API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
    API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

    print(f"\nWebSocket Endpoint: {API_ENDPOINT_BASE_URL}")
    print(f"\nApplied Configuration:")
    for key, value in CONNECTION_PARAMS.items():
        print(f"   • {key}: {value}")

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("\nMicrophone stream opened successfully.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message...")
                ws_app.send(json.dumps(terminate_message))
                time.sleep(1)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


# MAIN WORKFLOW

def main():
    """
    Main workflow: Analyze multiple files -> Configure -> Run Streaming
    """

    try:
        # Step 1: Analyze all audio files in folder
        aggregated_stats = analyze_multiple_files(AUDIO_FOLDER_PATH, YOUR_API_KEY)

        # Step 2: Determine optimal configuration based on aggregated data
        streaming_config = determine_streaming_config(aggregated_stats)

        # Step 3: Run streaming with optimized settings
        run_streaming(streaming_config)

    except Exception as e:
        print(f"\nError in workflow: {str(e)}")
        raise


# EXECUTION

if __name__ == "__main__":
    main()

Step-By-Step Guide

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.
  1. Install All Required Packages
pip install requests pyaudio websocket-client
  1. Configuration and Global Variables
Set up API credentials, file paths, audio parameters (16kHz sample rate, mono channel), and initialize global variables for managing WebSocket connections and audio streaming threads.
import requests
import time
import json
import pyaudio
import websocket
import threading
from urllib.parse import urlencode
from datetime import datetime
import os
from pathlib import Path


YOUR_API_KEY = "<YOUR_API_KEY>"  # Replace with your API key
AUDIO_FOLDER_PATH = "<YOUR_AUDIO_FILE_FOLDER>"  # Folder containing audio files

# Audio Configuration
SAMPLE_RATE = 16000
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
recorded_frames = []
recording_lock = threading.Lock()

# Store the optimized configuration
OPTIMIZED_CONFIG = {}
  1. Define get_audio_files() Function
This function scans a specified folder for audio/video files with supported extensions and returns a sorted list of file paths for batch processing.
def get_audio_files(folder_path):
    audio_extensions = {'.aac', '.ac3', '.aif', '.aiff', '.alac', '.amr', '.ape',
    '.au', '.dss', '.flac', '.m4a', '.m4b', '.m4p', '.mp3',
    '.mpga', '.ogg', '.oga', '.mogg', '.opus', '.qcp', '.tta',
    '.voc', '.wav', '.wv', '.webm', '.MTS', '.M2TS', '.TS',
    '.mov', '.mp4', '.m4v'}
    folder = Path(folder_path)

    if not folder.exists():
        raise FileNotFoundError(f"Folder not found: {folder_path}")

    audio_files = [
        str(f) for f in folder.iterdir()
        if f.is_file() and f.suffix.lower() in audio_extensions
    ]

    if not audio_files:
        raise ValueError(f"No audio files found in {folder_path}")

    return sorted(audio_files)
  1. Define analyze_single_file() Function
This function uploads an audio file to AssemblyAI, requests transcription with speaker labels enabled, polls until completion, then calculates gap statistics between utterances (average, median, min, max) and saves the transcript JSON.
def analyze_single_file(audio_file, api_key, file_index, total_files):
    print("\n" + "=" * 70)
    print(f"ANALYZING FILE {file_index}/{total_files}: {Path(audio_file).name}")
    print("=" * 70)

    base_url = "https://api.assemblyai.com"
    headers = {"authorization": api_key}

    # Upload audio file
    print(f"\nUploading audio file...")

    if audio_file.startswith("http"):
        upload_url = audio_file
        print("Using provided URL")
    else:
        with open(audio_file, "rb") as f:
            response = requests.post(
                base_url + "/v2/upload",
                headers=headers,
                data=f
            )
        upload_url = response.json()["upload_url"]
        print(f"Upload complete")

    # Enable Speaker Labels
    data = {
        "audio_url": upload_url,
        "speaker_labels": True,
        # "language_detection": True # Enable automatic language detection if your files are in different languages
    }

    response = requests.post(
        base_url + "/v2/transcript",
        json=data,
        headers=headers
    )
    transcript_id = response.json()['id']
    print(f"Transcript ID: {transcript_id}")

    # Poll for completion
    print("\nWaiting for transcription to complete...")
    polling_endpoint = base_url + "/v2/transcript/" + transcript_id

    while True:
        transcription_result = requests.get(polling_endpoint, headers=headers).json()

        if transcription_result['status'] == 'completed':
            print("Transcription completed!")
            break
        elif transcription_result['status'] == 'error':
            print(f"Transcription failed: {transcription_result['error']}")
            return None
        else:
            time.sleep(3)

    # Calculate gaps
    utterances = transcription_result['utterances']

    if len(utterances) < 2:
        print("⚠ Not enough utterances to analyze gaps (need at least 2)")
        return None

    gaps = []
    for i in range(len(utterances) - 1):
        current_end = utterances[i]['end']
        next_start = utterances[i + 1]['start']
        gap = next_start - current_end

        if gap > 0:
            gaps.append(gap)

    if not gaps:
        print("⚠ No gaps found between utterances (all speech overlaps)")
        return None

    # Calculate statistics
    stats = {
        'filename': Path(audio_file).name,
        'average_gap_ms': sum(gaps) / len(gaps),
        'min_gap_ms': min(gaps),
        'max_gap_ms': max(gaps),
        'median_gap_ms': sorted(gaps)[len(gaps) // 2],
        'total_utterances': len(utterances),
        'total_gaps': len(gaps),
        'all_gaps': gaps
    }

    print(f"\nResults for {stats['filename']}:")
    print(f"   Total utterances:  {stats['total_utterances']}")
    print(f"   Total gaps:        {stats['total_gaps']}")
    print(f"   Average gap:       {stats['average_gap_ms']:.0f} ms")
    print(f"   Median gap:        {stats['median_gap_ms']:.0f} ms")

    # Save transcript JSON to file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_filename = Path(audio_file).stem.replace(' ', '_')
    json_filename = f"transcript_{safe_filename}_{timestamp}.json"

    try:
        with open(json_filename, 'w', encoding='utf-8') as f:
            json.dump(transcription_result, f, indent=2, ensure_ascii=False)
        print(f"   Transcript saved:  {json_filename}")
    except Exception as e:
        print(f"   Error saving transcript: {e}")

    return stats
  1. Define analyze_multiple_files() Function
This function orchestrates the analysis of all files in a folder by calling analyze_single_file() for each, aggregates all gap data across files, calculates overall statistics, displays per-file breakdowns, and saves a comprehensive summary JSON.
def analyze_multiple_files(folder_path, api_key):
    print("=" * 70)
    print("MULTI-FILE UTTERANCE GAP ANALYSIS")
    print("=" * 70)

    audio_files = get_audio_files(folder_path)
    total_files = len(audio_files)

    print(f"\nFound {total_files} audio file(s) in: {folder_path}")
    for i, file in enumerate(audio_files, 1):
        print(f"   {i}. {Path(file).name}")

    # Analyze each file
    all_file_stats = []
    all_gaps = []

    for i, audio_file in enumerate(audio_files, 1):
        try:
            stats = analyze_single_file(audio_file, api_key, i, total_files)
            if stats:
                all_file_stats.append(stats)
                all_gaps.extend(stats['all_gaps'])
        except Exception as e:
            print(f"\n✗ Error analyzing {Path(audio_file).name}: {str(e)}")
            continue

    if not all_file_stats:
        print("\n✗ No files were successfully analyzed")
        return None

    # Calculate aggregated statistics
    print("\n" + "=" * 70)
    print("AGGREGATED GAP ANALYSIS RESULTS")
    print("=" * 70)

    aggregated_stats = {
        'total_files_analyzed': len(all_file_stats),
        'total_utterances': sum(s['total_utterances'] for s in all_file_stats),
        'total_gaps': sum(s['total_gaps'] for s in all_file_stats),
        'overall_average_gap_ms': sum(all_gaps) / len(all_gaps),
        'overall_median_gap_ms': sorted(all_gaps)[len(all_gaps) // 2],
        'overall_min_gap_ms': min(all_gaps),
        'overall_max_gap_ms': max(all_gaps),
        'file_averages': [s['average_gap_ms'] for s in all_file_stats],
        'file_stats': all_file_stats
    }

    print(f"\nFiles successfully analyzed:  {aggregated_stats['total_files_analyzed']}/{total_files}")
    print(f"Total utterances (all files): {aggregated_stats['total_utterances']}")
    print(f"Total gaps analyzed:          {aggregated_stats['total_gaps']}")
    print(f"\nOverall average gap:          {aggregated_stats['overall_average_gap_ms']:.0f} ms ({aggregated_stats['overall_average_gap_ms']/1000:.2f} seconds)")
    print(f"Overall median gap:           {aggregated_stats['overall_median_gap_ms']:.0f} ms")
    print(f"Overall minimum gap:          {aggregated_stats['overall_min_gap_ms']:.0f} ms")
    print(f"Overall maximum gap:          {aggregated_stats['overall_max_gap_ms']:.0f} ms")

    # Show per-file breakdown
    print(f"\nPer-file average gaps:")
    for stat in all_file_stats:
        print(f"   • {stat['filename']:<40} {stat['average_gap_ms']:>6.0f} ms")

    # Calculate variability
    avg_of_file_averages = sum(aggregated_stats['file_averages']) / len(aggregated_stats['file_averages'])
    variability_ratio = aggregated_stats['overall_max_gap_ms'] / aggregated_stats['overall_average_gap_ms']

    print(f"\nAverage of file averages:     {avg_of_file_averages:.0f} ms")
    print(f"Variability ratio:            {variability_ratio:.2f}x")

    if variability_ratio > 3:
        print("└─> HIGH variability - mixed conversation patterns across files")
    elif variability_ratio > 2:
        print("└─> MODERATE variability - some pattern variation")
    else:
        print("└─> LOW variability - consistent conversation rhythm")

    # Save aggregated results
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    summary_filename = f"aggregated_analysis_{timestamp}.json"

    try:
        summary_data = {
            'analysis_date': datetime.now().isoformat(),
            'folder_path': folder_path,
            'aggregated_statistics': {
                'total_files_analyzed': aggregated_stats['total_files_analyzed'],
                'total_utterances': aggregated_stats['total_utterances'],
                'total_gaps': aggregated_stats['total_gaps'],
                'overall_average_gap_ms': aggregated_stats['overall_average_gap_ms'],
                'overall_median_gap_ms': aggregated_stats['overall_median_gap_ms'],
                'overall_min_gap_ms': aggregated_stats['overall_min_gap_ms'],
                'overall_max_gap_ms': aggregated_stats['overall_max_gap_ms'],
                'variability_ratio': variability_ratio
            },
            'per_file_results': [
                {
                    'filename': s['filename'],
                    'average_gap_ms': s['average_gap_ms'],
                    'median_gap_ms': s['median_gap_ms'],
                    'total_utterances': s['total_utterances'],
                    'total_gaps': s['total_gaps']
                }
                for s in all_file_stats
            ]
        }

        with open(summary_filename, 'w', encoding='utf-8') as f:
            json.dump(summary_data, f, indent=2, ensure_ascii=False)
        print(f"\nAggregated analysis saved to: {summary_filename}")
    except Exception as e:
        print(f"\nError saving aggregated analysis: {e}")

    return aggregated_stats
  1. Define determine_streaming_config() Function
This function takes aggregated gap statistics and selects one of three preset configurations with optimized turn detection parameters for different conversation styles.
def determine_streaming_config(aggregated_stats):
    if aggregated_stats is None:
        print("\nUsing default balanced configuration (no gap data available)")
        return {
            'name': 'Balanced (Default)',
            'min_turn_silence': 400,
            'max_turn_silence': 1280,
            'description': 'Standard configuration for general use'
        }

    print("\n" + "=" * 70)
    print("DETERMINING OPTIMAL STREAMING CONFIGURATION")
    print("=" * 70)

    avg_gap = aggregated_stats['overall_average_gap_ms']
    num_files = aggregated_stats['total_files_analyzed']

    print(f"\nBased on analysis of {num_files} file(s)")
    print(f"Overall average gap: {avg_gap:.0f} ms")

    # Determine configuration based on average gap
    if avg_gap < 500:
        config = {
            'name': 'Aggressive',
            'min_turn_silence': 160,
            'max_turn_silence': 400,
            'description': 'Fast-paced conversation with quick turn-taking'
        }
        use_cases = "IVR systems, order confirmations, yes/no queries, retail support"
    elif avg_gap < 1000:
        config = {
            'name': 'Balanced',
            'min_turn_silence': 400,
            'max_turn_silence': 1280,
            'description': 'Natural conversation pacing'
        }
        use_cases = "General customer support, consultations, standard voice agents"
    else:
        config = {
            'name': 'Conservative',
            'min_turn_silence': 800,
            'max_turn_silence': 3600,
            'description': 'Thoughtful, complex speech with longer pauses'
        }
        use_cases = "Technical support, healthcare, legal consultations, troubleshooting"

    print(f"\nSelected Configuration: {config['name']}")
    print(f"   Reasoning: Average gap of {avg_gap:.0f}ms indicates {config['description']}")
    print(f"\nConfiguration Parameters:")
    print(f"   • min_turn_silence: {config['min_turn_silence']} ms")
    print(f"   • max_turn_silence: {config['max_turn_silence']} ms")
    print(f"\nRecommended use cases: {use_cases}")

    return config
  1. Create WebSocket Event Handlers (on_open, on_message, on_error, on_close)
These functions manage the real-time streaming connection lifecycle: on_open starts the audio streaming thread, on_message processes transcription results (partial and final turns), and the close/error handlers clean up resources.
def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Using optimized {OPTIMIZED_CONFIG['name']} configuration")

    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                with recording_lock:
                    recorded_frames.append(audio_data)

                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()

def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}")
            print(f"   Expires at: {datetime.fromtimestamp(expires_at)}")
            print(f"   Configuration: {OPTIMIZED_CONFIG['name']}")
            print("\nSpeak now... (Press Ctrl+C to stop)\n")

        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                print('\r' + ' ' * 80 + '\r', end='')
                print(f"FINAL: {transcript}")
            else:
                print(f"\r  partial: {transcript}", end='')

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio={audio_duration}s, Session={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()

def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
  1. Define run_streaming() Function
This function initializes PyAudio to capture microphone input, establishes a WebSocket connection with the optimized configuration parameters, and streams audio in real-time while displaying transcription results until the user stops with Ctrl+C.
def run_streaming(config):
    global audio, stream, ws_app, OPTIMIZED_CONFIG

    OPTIMIZED_CONFIG = config

    print("\n" + "=" * 70)
    print("STARTING REAL-TIME STREAMING")
    print("=" * 70)

    # Build connection parameters with optimized settings
    CONNECTION_PARAMS = {
        "sample_rate": SAMPLE_RATE,
        "format_turns": True,
        "min_turn_silence": str(config['min_turn_silence']),
        "max_turn_silence": str(config['max_turn_silence'])
    }

    API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
    API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

    print(f"\nWebSocket Endpoint: {API_ENDPOINT_BASE_URL}")
    print(f"\nApplied Configuration:")
    for key, value in CONNECTION_PARAMS.items():
        print(f"   • {key}: {value}")

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("\nMicrophone stream opened successfully.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message...")
                ws_app.send(json.dumps(terminate_message))
                time.sleep(1)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")
  1. Define main() Workflow
Execute the three-step process: analyze all audio files in the folder, determine the best streaming configuration based on aggregated utterance gaps, then launch real-time streaming with the optimized settings.
def main():
    try:
        # Step 1: Analyze all audio files in folder
        aggregated_stats = analyze_multiple_files(AUDIO_FOLDER_PATH, YOUR_API_KEY)

        # Step 2: Determine optimal configuration based on aggregated data
        streaming_config = determine_streaming_config(aggregated_stats)

        # Step 3: Run streaming with optimized settings
        run_streaming(streaming_config)

    except Exception as e:
        print(f"\nError in workflow: {str(e)}")
        raise

if __name__ == "__main__":
    main()