Use this file to discover all available pages before exploring further.
This guide solves the challenge of transcribing system audio, which can be used for transcribing media content or online calls. By using virtual audio devices, you’ll learn how to easily pipe system audio to AssemblyAI’s transcription API on both Mac and Windows. The key to success lies in creating a virtual input device that captures your speaker output and converts it into an input stream. This approach allows you to bypass the limitations of direct system audio access. For Mac Users: We recommend using BlackHole, a free open-source tool available through Homebrew. BlackHole creates a virtual audio device that can route your system audio to AssemblyAI’s API seamlessly. For Windows Users: Virtual Audio Cable (VAC) is a popular option. While we don’t provide specific Windows instructions in this guide, VAC offers similar functionality to BlackHole for the Windows environment.
"""Stream system audio captured through BlackHole to AssemblyAI's streaming API.

The script opens the BlackHole virtual input device with PyAudio, forwards
raw 16-bit mono PCM chunks over a WebSocket, and prints the transcripts the
server sends back. Press Ctrl+C to end the session.
"""

import json
import threading
import time
from datetime import datetime
from urllib.parse import urlencode

import pyaudio
import websocket

# --- Configuration ---
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
# 50 ms of audio per chunk (800 frames at 16 kHz). Derived from SAMPLE_RATE so
# the chunk duration stays the same if the sample rate is changed.
FRAMES_PER_BUFFER = SAMPLE_RATE // 20
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
# NOTE: frames are appended for the whole session and nothing in this script
# writes them out, so memory use grows with session length. Remove the append
# in stream_audio() if you do not intend to save a recording.
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames


# --- BlackHole Device Detection ---


def get_blackhole_device_index():
    """Find the PyAudio index of the BlackHole virtual input device.

    Every device PyAudio reports is printed, which helps when troubleshooting
    audio routing.

    Returns:
        The index of the device whose name starts with "BlackHole" and that
        exposes at least one input channel, or None when no device matches.
        If several devices match, the last one listed is returned.
    """
    p = pyaudio.PyAudio()
    blackhole_index = None
    try:
        print("Available audio devices:")
        for i in range(p.get_device_count()):
            dev_info = p.get_device_info_by_index(i)
            print(f" {i}: {dev_info['name']} (inputs: {dev_info['maxInputChannels']})")
            if str(dev_info['name']).startswith('BlackHole') and dev_info['maxInputChannels'] > 0:
                blackhole_index = i
                print(f" -> Found BlackHole device at index {i}")
    finally:
        # Release the probing PyAudio instance even if a device query raises.
        p.terminate()
    return blackhole_index


# --- WebSocket Event Handlers ---


def on_open(ws):
    """Start the audio-forwarding thread once the WebSocket is connected.

    Args:
        ws: The connected WebSocket; audio chunks are sent through it.
    """
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        """Read chunks from the input stream and send them until stopped."""
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(chunk)
                # Send audio data as binary message
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                print(f"Error streaming audio: {exc}")
                # A failed read or send most likely means the stream or the
                # socket is closed, so stop the loop.
                break
        print("Audio streaming stopped.")

    global audio_thread
    # Daemon thread: the main thread may exit even if this one is still running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()


def on_message(ws, message):
    """Handle one JSON message from the server and print its content.

    "Begin" prints the session id and expiry, "Turn" prints the transcript
    (partial turns overwrite the current line, finished turns end it), and
    "Termination" prints the audio and session durations.

    Args:
        ws: The WebSocket the message arrived on (unused).
        message: The raw JSON text received from the server.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Blank out the partial transcript before printing the final one.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                print(f"\r{transcript}", end='')
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Report a WebSocket error and signal the audio thread to stop.

    Args:
        ws: The WebSocket that reported the error (unused).
        error: The error object or message supplied by the WebSocket client.
    """
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Release audio resources after the WebSocket connection is closed.

    Args:
        ws: The WebSocket that was closed (unused).
        close_status_code: Close status code reported by the client, if any.
        close_msg: Close message reported by the client, if any.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    stop_event.set()  # Signal audio thread just in case it's still running

    # Let the audio thread leave its read loop BEFORE the stream is closed, so
    # the stream is not torn down while another thread is inside stream.read().
    # The timeout keeps shutdown bounded if the read never returns.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None


# --- Main Execution ---


def run():
    """Capture audio from BlackHole and stream it for transcription.

    Locates the BlackHole device, opens an input stream on it, connects the
    WebSocket in a background thread, and waits until the connection ends or
    the user presses Ctrl+C. Returns early (after printing an error) if the
    device is missing or the stream cannot be opened.
    """
    global audio, stream, ws_app

    # Find BlackHole device
    blackhole_index = get_blackhole_device_index()
    if blackhole_index is None:
        print("Error: BlackHole audio device not found!")
        print("Please install BlackHole from https://existential.audio/blackhole/")
        return

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open Blackhole audio stream
    try:
        stream = audio.open(
            input=True,
            input_device_index=blackhole_index,  # Use BlackHole device
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print(f"BlackHole audio stream opened successfully (device index: {blackhole_index}).")
        print("Now capturing system audio through BlackHole. Press Ctrl+C to stop.")
        print("Make sure audio is routed through BlackHole for transcription.")
    except Exception as e:
        print(f"Error opening Blackhole audio stream: {e}")
        if audio:
            audio.terminate()
            audio = None
        return  # Exit if blackhole cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Wait for the final messages, but at most 5 seconds: the join
                # returns as soon as the WebSocket thread ends, so shutdown is
                # not delayed when the connection closes earlier.
                ws_thread.join(timeout=5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Final cleanup (already handled in on_close, but good as a fallback).
        # on_close resets the globals to None, so this only acts on whatever
        # is still open.
        stop_event.set()
        if stream:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
Define a Function to Find the BlackHole Audio Device Index
Define a function called get_blackhole_device_index, which retrieves the device index for your BlackHole virtual input device.
def get_blackhole_device_index():
    """Find the PyAudio index of the BlackHole virtual input device.

    Every device PyAudio reports is printed, which helps when troubleshooting
    audio routing.

    Returns:
        The index of the device whose name starts with "BlackHole" and that
        exposes at least one input channel, or None when no device matches.
        If several devices match, the last one listed is returned.
    """
    p = pyaudio.PyAudio()
    blackhole_index = None
    try:
        print("Available audio devices:")
        for i in range(p.get_device_count()):
            dev_info = p.get_device_info_by_index(i)
            print(f" {i}: {dev_info['name']} (inputs: {dev_info['maxInputChannels']})")
            if str(dev_info['name']).startswith('BlackHole') and dev_info['maxInputChannels'] > 0:
                blackhole_index = i
                print(f" -> Found BlackHole device at index {i}")
    finally:
        # Release the probing PyAudio instance even if a device query raises.
        p.terminate()
    return blackhole_index
def on_open(ws):
    """Start the audio-forwarding thread once the WebSocket is connected.

    Args:
        ws: The connected WebSocket; audio chunks are sent through it.
    """
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        """Read chunks from the input stream and send them until stopped."""
        print("Starting audio streaming...")
        while True:
            if stop_event.is_set():
                break
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                # Keep a copy of the chunk for WAV recording.
                with recording_lock:
                    recorded_frames.append(chunk)
                # Forward the chunk to the server as a binary frame.
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                print(f"Error streaming audio: {exc}")
                # A failed read or send most likely means the stream or the
                # socket is closed, so leave the loop.
                break
        print("Audio streaming stopped.")

    global audio_thread
    # Daemon thread: the main thread may exit even if this one is still running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
def on_close(ws, close_status_code, close_msg):
    """Release audio resources after the WebSocket connection is closed.

    Args:
        ws: The WebSocket that was closed (unused).
        close_status_code: Close status code reported by the client, if any.
        close_msg: Close message reported by the client, if any.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    stop_event.set()  # Signal audio thread just in case it's still running

    # Let the audio thread leave its read loop BEFORE the stream is closed, so
    # the stream is not torn down while another thread is inside stream.read().
    # The timeout keeps shutdown bounded if the read never returns.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
def on_error(ws, error):
    """Report a WebSocket error and signal the audio thread to stop.

    Args:
        ws: The WebSocket that reported the error (unused).
        error: The error object or message supplied by the WebSocket client.
    """
    print(f"\nWebSocket Error: {error}")

    # Ask the audio streaming loop to stop after an error.
    stop_event.set()
Make sure to find the BlackHole device index and pass it as the input_device_index when opening the audio stream.
def run():
    """Capture audio from BlackHole and stream it for transcription.

    Locates the BlackHole device, opens an input stream on it, connects the
    WebSocket in a background thread, and waits until the connection ends or
    the user presses Ctrl+C. Returns early (after printing an error) if the
    device is missing or the stream cannot be opened.
    """
    global audio, stream, ws_app

    # Find BlackHole device index
    blackhole_index = get_blackhole_device_index()
    if blackhole_index is None:
        print("Error: BlackHole audio device not found!")
        print("Please install BlackHole from https://existential.audio/blackhole/")
        return

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open Blackhole audio stream
    try:
        stream = audio.open(
            input=True,
            input_device_index=blackhole_index,  # Use BlackHole device
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print(f"BlackHole audio stream opened successfully (device index: {blackhole_index}).")
        print("Now capturing system audio through BlackHole. Press Ctrl+C to stop.")
        print("Make sure audio is routed through BlackHole for transcription.")
    except Exception as e:
        print(f"Error opening Blackhole audio stream: {e}")
        if audio:
            audio.terminate()
            audio = None
        return  # Exit if blackhole cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Wait for the final messages, but at most 5 seconds: the join
                # returns as soon as the WebSocket thread ends, so shutdown is
                # not delayed when the connection closes earlier.
                ws_thread.join(timeout=5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Final cleanup (already handled in on_close, but good as a fallback).
        # Only acts on whatever is still open, and clears the globals so the
        # resources cannot be released twice.
        stop_event.set()
        if stream:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
You need to select BlackHole as your system output device for the audio to be piped correctly.
If you still need to hear the audio, you can create a multi-output device on Mac that sends audio to both BlackHole and your speakers/headphones.
Here’s how to set it up:
Open “Audio MIDI Setup” (you can find this by searching in Spotlight).
Click the “+” button in the bottom left corner and choose “Create Multi-Output Device”.
In the list on the right, check both your regular output (e.g., “MacBook Pro Speakers”) and “BlackHole 2ch”.
Optionally, rename this new device to something like “BlackHole + Speakers”. You may need to modify your script to search for this new device.