Skip to main content

Documentation Index

Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt

Use this file to discover all available pages before exploring further.

This guide will demonstrate how to build an advanced speaker recognition and diarization system that you can use to identify speakers across multiple audio files. It will use:
  • AssemblyAI for transcription and initial diarization.
  • Nvidia’s TitaNet model for speaker embedding generation.
  • Pinecone for efficient similarity search of speaker embeddings.

Quickstart

from pinecone import Pinecone, ServerlessSpec
import assemblyai as aai
import requests
import os
from pydub import AudioSegment
import mimetypes
import wave
from nemo.collections.asr.models import EncDecSpeakerLabelModel
import torch
import numpy as np
import uuid
from sklearn.metrics.pairwise import cosine_similarity

# Obtain from your Pinecone dashboard.
pc = Pinecone(api_key="PINECONE_KEY_HERE")

# Obtain from your AssemblyAI dashboard.
aai.settings.api_key = "AAI_KEY_HERE"

def transcribe(file_url):
    config = aai.TranscriptionConfig(speaker_labels=True, speech_models=["universal-3-pro", "universal-2"])  # Speaker labels must be enabled for this Cookbook.

    transcriber = aai.Transcriber(config=config)

    transcript = transcriber.transcribe(file_url)

    return transcript.json_response

def download_and_convert_to_wav(url, output_dir="./content/converted_audio"):
    # Create the output directory if it doesn't exist.
    os.makedirs(output_dir, exist_ok=True)

    # Extract filename from URL.
    filename = url.split("/")[-1].split("?")[0]
    base_filename, file_extension = os.path.splitext(filename)

    # Download the file.
    response = requests.get(url)
    if response.status_code == 200:
        # Determine the file type.
        content_type = response.headers.get("content-type")
        if content_type:
            guessed_extension = mimetypes.guess_extension(content_type)
            if guessed_extension:
                file_extension = guessed_extension

        # Save the downloaded file.
        downloaded_file = os.path.join(output_dir, filename)
        with open(downloaded_file, "wb") as f:
            f.write(response.content)

        # Generate the WAV file name.
        wav_filename = f"{base_filename}.wav"
        wav_file = os.path.join(output_dir, wav_filename)

        # Load the audio file.
        audio = AudioSegment.from_file(downloaded_file)

        # Convert to mono if it's stereo.
        if audio.channels > 1:
            print("Setting channels to 1.")
            audio = audio.set_channels(1)

        # Export as WAV.
        audio.export(wav_file, format="wav")
        print(f"File converted and saved as: {wav_file}")

        # Remove the original downloaded file if it's different from the WAV file.
        if downloaded_file != wav_file:
            os.remove(downloaded_file)

        # Ensure the WAV file is single channel.
        with wave.open(wav_file, "rb") as wf:
            n_channels = wf.getnchannels()
            if n_channels > 1:
                print(f"Converting {n_channels} channels to mono...")
                # Read the frames.
                frames = wf.readframes(wf.getnframes())
                # Get other parameters.
                params = wf.getparams()
                # Close the file.
                wf.close()
                # Convert to mono.
                mono_frames = b"".join([frames[i::n_channels] for i in range(n_channels)])
                # Write the mono WAV file.
                with wave.open(wav_file, "wb") as wf:
                    wf.setparams(
                        (1, params.sampwidth, params.framerate, params.nframes, params.comptype, params.compname)
                    )
                    wf.writeframes(mono_frames)
                print("Conversion to mono complete.")

        return wav_file
    else:
        print(f"Failed to download the file. Status code: {response.status_code}")
        return None

def add_speaker_embedding_to_pinecone(speaker_name, speaker_embedding, unique_id=None):
    # Ensure the embedding is a 1D numpy array.
    if isinstance(speaker_embedding, torch.Tensor):
        embedding_np = speaker_embedding.squeeze().cpu().numpy()
    elif isinstance(speaker_embedding, np.ndarray):
        embedding_np = speaker_embedding.squeeze()
    else:
        raise ValueError("Unsupported embedding type. Expected torch.Tensor or numpy.ndarray")

    # Ensure the embedding is the correct shape
    if embedding_np.shape != (192,):
        raise ValueError(f"Expected embedding of shape (192,), but got {embedding_np.shape}")

    # Convert to list for Pinecone
    embedding_list = embedding_np.tolist()

    # Generate a unique ID if not provided
    if unique_id is None:
        unique_id = f"speaker_{speaker_name}_{uuid.uuid4().hex[:8]}"

    # Create the metadata dictionary
    metadata = {"speaker_name": speaker_name}

    # Upsert the vector to Pinecone
    upsert_response = index.upsert(vectors=[(unique_id, embedding_list, metadata)])

    print(f"Upserted embedding for speaker {speaker_name} with ID {unique_id}")
    return unique_id

def find_closest_speaker(utterance_embedding, local_embeddings=None, local_only=False, threshold=0.5):
    def cosine_sim(a, b):
        return cosine_similarity(a.reshape(1, -1), b.reshape(1, -1))[0][0]

    best_match = {"speaker_name": "No match found", "score": 0}

    # Local embeddings processing.
    if local_embeddings is not None:
        for speaker_name, embedding in local_embeddings.items():
            score = cosine_sim(utterance_embedding, embedding)
            if score > best_match["score"]:
                print("Identified speaker " + speaker_name + " confidence " + str(score))
                best_match = {"speaker_name": speaker_name, "score": score}

    # Pinecone query (if not local_only and local_embeddings is empty or not provided)
    if not local_only and (local_embeddings is None or len(local_embeddings) == 0):
        results = index.query(vector=utterance_embedding.tolist(), top_k=1, include_metadata=True)
        if results["matches"]:
            pinecone_match = results["matches"][0]
            pinecone_score = pinecone_match["score"]
            if pinecone_score > best_match["score"]:
                best_match = {"speaker_name": pinecone_match["metadata"]["speaker_name"], "score": pinecone_score}

    # Check if the best match meets the threshold.
    if best_match["score"] < threshold:
        return "No match found", 0

    return best_match["speaker_name"], best_match["score"]

def identify_speakers_from_utterances(transcript, wav_file, min_utterance_length=5000, match_all_utterances=False):
    utterances = transcript["utterances"]
    speaker_model = EncDecSpeakerLabelModel.from_pretrained("nvidia/speakerverification_en_titanet_large")

    known_speakers = {}
    unknown_speakers = {}
    unknown_count = 0

    unknown_folder = "unknown_speaker_utterances"
    os.makedirs(unknown_folder, exist_ok=True)

    audio_file_name = os.path.basename(wav_file)
    full_audio = AudioSegment.from_wav(wav_file)

    def get_suitable_utterance(speaker, min_length):
        suitable_utterances = [
            u for u in utterances if u["speaker"] == speaker and (u["end"] - u["start"]) >= min_length
        ]
        if suitable_utterances:
            return max(suitable_utterances, key=lambda u: u["end"] - u["start"])
        return max((u for u in utterances if u["speaker"] == speaker), key=lambda u: u["end"] - u["start"])

    # First pass: Identify speakers.
    for speaker in set(u["speaker"] for u in utterances):
        if speaker not in known_speakers and speaker not in unknown_speakers:
            suitable_utterance = get_suitable_utterance(speaker, min_utterance_length)

            start_ms = suitable_utterance["start"]
            end_ms = suitable_utterance["end"]
            utterance_audio = full_audio[start_ms:end_ms]

            temp_wav = "temp_utterance.wav"
            utterance_audio.export(temp_wav, format="wav")
            embedding = speaker_model.get_embedding(temp_wav)
            os.remove(temp_wav)

            speaker_name, score = find_closest_speaker(embedding)
            print(f"Speaker: {speaker}, Closest match: {speaker_name}, Score: {score}")

            if score > 0.5:  # Adjust threshold as needed.
                known_speakers[speaker] = speaker_name
                print(f"Identified as known speaker: {speaker}")
            else:
                unknown_count += 1
                unknown_name = f"Unknown Speaker {chr(64 + unknown_count)}"
                unknown_wav = f"{unknown_folder}/unknown_speaker_{unknown_count}_from_{audio_file_name}"
                utterance_audio.export(unknown_wav, format="wav")
                unknown_speakers[speaker] = {
                    "name": unknown_name,
                    "wav_file": unknown_wav,
                    "duration": end_ms - start_ms,
                }
                print(f"New unknown speaker detected: {unknown_name} (Duration: {(end_ms - start_ms)/1000:.2f}s)")

    # Second pass: Replace speaker names.
    for utterance in utterances:
        if utterance["speaker"] in known_speakers:
            utterance["speaker"] in known_speakers[utterance["speaker"]]
        elif utterance["speaker"] in unknown_speakers:
            utterance["speaker"] = unknown_speakers[utterance["speaker"]]["name"]

    # Third pass: Match all utterances if requested.
    if match_all_utterances:
        print("Matching all utterances individually...")
        for utterance in utterances:
            start_ms = utterance["start"]
            end_ms = utterance["end"]
            utterance_audio = full_audio[start_ms:end_ms]

            temp_wav = "temp_utterance.wav"
            utterance_audio.export(temp_wav, format="wav")
            embedding = speaker_model.get_embedding(temp_wav)
            os.remove(temp_wav)

            new_speaker_name, score = find_closest_speaker(embedding)

            if score > 0.5 and new_speaker_name != utterance["speaker"]:
                print(f"Speaker change detected: '{utterance['speaker']}' -> '{new_speaker_name}' (Score: {score})")
                print(f"Utterance: {utterance['text'][:50]}...")
                utterance["speaker"] = new_speaker_name

    return utterances, unknown_speakers

pc.create_index(
    name="speaker-embeddings",
    dimension=192,  # Replace with model-specific dimensions - 192 is for TitaNet-Large.
    metric="cosine",  # Replace with your model metric.
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1",
    ),
)

# Connect to our new index.
index = pc.Index("speaker-embeddings")

speaker_model = EncDecSpeakerLabelModel.from_pretrained("nvidia/speakerverification_en_titanet_large")

elon_fingerprint = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/musk_fingerprinting.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L211c2tfZmluZ2VycHJpbnRpbmcud2F2IiwiaWF0IjoxNzAwNDM4OTAwLCJleHAiOjE3MzE5NzQ5MDB9.O3QOJSBqFNb1sg4nurwSFA13xIPHyKuon3UfHcFYit0&t=2023-11-20T00%3A08%3A20.071Z"
)
altman_fingerprint = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/sam_altman_fingerprint.mp3?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L3NhbV9hbHRtYW5fZmluZ2VycHJpbnQubXAzIiwiaWF0IjoxNzAwNjY5NjI0LCJleHAiOjE3MzIyMDU2MjR9._1yuMGzBhFcHr7xv76160Hb_SC-mH_Wv3_qX-S7XsTU&t=2023-11-22T16%3A13%3A44.103Z"
)
lex_fingerprint = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/lex_fridman_fingerprint.mp3?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L2xleF9mcmlkbWFuX2ZpbmdlcnByaW50Lm1wMyIsImlhdCI6MTcwMDY2OTc4OSwiZXhwIjoxNzMyMjA1Nzg5fQ.PUWTOLIHl4dcrWjh2ZJ_2TBaxMXpcU-x6OcvUDe6ZXQ&t=2023-11-22T16%3A16%3A29.697Z"
)

known_speakers = {"Elon Musk": elon_fingerprint, "Sam Altman": altman_fingerprint, "Lex Fridman": lex_fingerprint}

# Upload the known speakers.
for speaker, audio_file in known_speakers.items():
    print("***")
    print(speaker)
    print(audio_file)
    embedding = speaker_model.get_embedding(audio_file)
    add_speaker_embedding_to_pinecone(speaker, embedding)

audio_file = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/musk_fingerprinting.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L211c2tfZmluZ2VycHJpbnRpbmcud2F2IiwiaWF0IjoxNzAwNDM4OTAwLCJleHAiOjE3MzE5NzQ5MDB9.O3QOJSBqFNb1sg4nurwSFA13xIPHyKuon3UfHcFYit0&t=2023-11-20T00%3A08%3A20.071Z"
)
utterance_embedding = speaker_model.get_embedding(audio_file)

results = index.query(vector=utterance_embedding.tolist(), top_k=3, include_metadata=True)
print(results)

# Example: Conversation Between Sam Altman and Elon Musk
transcript_obj = transcribe("https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/elon_altman_interview_clipped.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L2Vsb25fYWx0bWFuX2ludGVydmlld19jbGlwcGVkLndhdiIsImlhdCI6MTcwMDY5MDU3OSwiZXhwIjoxNzMyMjI2NTc5fQ.4qZHvVRGhNGttfcpcfXDcJkJe_tbkc_2Bvs4i51SNSE&t=2023-11-22T22%3A02%3A59.429Z")
wav_file = download_and_convert_to_wav("https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/elon_altman_interview_clipped.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L2Vsb25fYWx0bWFuX2ludGVydmlld19jbGlwcGVkLndhdiIsImlhdCI6MTcwMDY5MDU3OSwiZXhwIjoxNzMyMjI2NTc5fQ.4qZHvVRGhNGttfcpcfXDcJkJe_tbkc_2Bvs4i51SNSE&t=2023-11-22T22%3A02%3A59.429Z")
identified_utterances, unknown_speakers = identify_speakers_from_utterances(transcript_obj, wav_file)

for utterance in identified_utterances:
    print(f"{utterance['speaker']}: {utterance['text']}")

Initial Setup

First, you’ll need to sign up for an AssemblyAI account and obtain your API key from your account dashboard. Then, sign up for a Pinecone account and obtain your API key from “API Keys” on the sidebar of your dashboard. Also note that any files you use for this Cookbook should be in WAV format. While not a requirement for AssemblyAI, TitaNet requires WAV format.

Installing Dependencies

Now we’ll need to install the necessary libraries and frameworks for this project. Please note that this process can take several minutes to complete.
pip install -U Cython torch nemo_toolkit ffmpeg pydub pinecone-client assemblyai hydra-core pytorch_lightning huggingface_hub==0.23.5 librosa transformers pandas inflect webdataset sentencepiece youtokentome pyannote-audio editdistance jiwer lhotse datasets

Pinecone Setup

In this section, we’ll import Pinecone, create a new index for our speaker embeddings, and connect to the index. Please enter your Pinecone API key in the placeholder below.
from pinecone import Pinecone, ServerlessSpec

# Obtain from your Pinecone dashboard.
pc = Pinecone(api_key="PINECONE_KEY_HERE")

pc.create_index(
    name="speaker-embeddings",
    dimension=192,  # Replace with model-specific dimensions - 192 is for TitaNet-Large.
    metric="cosine",  # Replace with your model metric.
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1",
    ),
)

# Connect to our new index.
index = pc.Index("speaker-embeddings")

AssemblyAI Setup

Now we’ll set up AssemblyAI for transcription and diarization. We’ll import the necessary modules and create a function to transcribe our audio files with speaker labels enabled. Please enter your AssemblyAI API key in the cell below.
import assemblyai as aai

aai.settings.api_key = "AAI_KEY_HERE"


def transcribe(file_url):
    config = aai.TranscriptionConfig(speaker_labels=True, speech_models=["universal-3-pro", "universal-2"])  # Speaker labels must be enabled for this Cookbook.

    transcriber = aai.Transcriber(config=config)

    transcript = transcriber.transcribe(file_url)

    return transcript.json_response
We’ll also need to create a download_and_convert_to_wav helper function. This function allows us to take file URLs, download them, then convert them to WAV format. If the URLs are already in WAV format, then they’re just downloaded. The files must be in WAV format to work properly with the TitaNet.
import requests
import os
from pydub import AudioSegment
import mimetypes
import wave


def download_and_convert_to_wav(url, output_dir="./content/converted_audio"):
    # Create the output directory if it doesn't exist.
    os.makedirs(output_dir, exist_ok=True)

    # Extract filename from URL.
    filename = url.split("/")[-1].split("?")[0]
    base_filename, file_extension = os.path.splitext(filename)

    # Download the file.
    response = requests.get(url)
    if response.status_code == 200:
        # Determine the file type.
        content_type = response.headers.get("content-type")
        if content_type:
            guessed_extension = mimetypes.guess_extension(content_type)
            if guessed_extension:
                file_extension = guessed_extension

        # Save the downloaded file.
        downloaded_file = os.path.join(output_dir, filename)
        with open(downloaded_file, "wb") as f:
            f.write(response.content)

        # Generate the WAV file name.
        wav_filename = f"{base_filename}.wav"
        wav_file = os.path.join(output_dir, wav_filename)

        # Load the audio file.
        audio = AudioSegment.from_file(downloaded_file)

        # Convert to mono if it's stereo.
        if audio.channels > 1:
            print("Setting channels to 1.")
            audio = audio.set_channels(1)

        # Export as WAV.
        audio.export(wav_file, format="wav")
        print(f"File converted and saved as: {wav_file}")

        # Remove the original downloaded file if it's different from the WAV file.
        if downloaded_file != wav_file:
            os.remove(downloaded_file)

        # Ensure the WAV file is single channel.
        with wave.open(wav_file, "rb") as wf:
            n_channels = wf.getnchannels()
            if n_channels > 1:
                print(f"Converting {n_channels} channels to mono...")
                # Read the frames.
                frames = wf.readframes(wf.getnframes())
                # Get other parameters.
                params = wf.getparams()
                # Close the file.
                wf.close()
                # Convert to mono.
                mono_frames = b"".join([frames[i::n_channels] for i in range(n_channels)])
                # Write the mono WAV file.
                with wave.open(wav_file, "wb") as wf:
                    wf.setparams(
                        (1, params.sampwidth, params.framerate, params.nframes, params.comptype, params.compname)
                    )
                    wf.writeframes(mono_frames)
                print("Conversion to mono complete.")

        return wav_file
    else:
        print(f"Failed to download the file. Status code: {response.status_code}")
        return None

NVIDIA’s TitaNet Model Setup

Next we’ll import torch and nemo, then connect to and load NVIDIA’s TitaNet model. This model allows us to generate speaker embeddings to create speaker fingerprints. It also enables the conversion of utterances into embeddings for comparison with our fingerprints.
from nemo.collections.asr.models import EncDecSpeakerLabelModel

speaker_model = EncDecSpeakerLabelModel.from_pretrained("nvidia/speakerverification_en_titanet_large")
We’ll now define an add_speaker_embedding_to_pinecone function to add our speaker embeddings to the Pinecone database.
import torch
import numpy as np
import uuid


def add_speaker_embedding_to_pinecone(speaker_name, speaker_embedding, unique_id=None):
    # Ensure the embedding is a 1D numpy array.
    if isinstance(speaker_embedding, torch.Tensor):
        embedding_np = speaker_embedding.squeeze().cpu().numpy()
    elif isinstance(speaker_embedding, np.ndarray):
        embedding_np = speaker_embedding.squeeze()
    else:
        raise ValueError("Unsupported embedding type. Expected torch.Tensor or numpy.ndarray")

    # Ensure the embedding is the correct shape
    if embedding_np.shape != (192,):
        raise ValueError(f"Expected embedding of shape (192,), but got {embedding_np.shape}")

    # Convert to list for Pinecone
    embedding_list = embedding_np.tolist()

    # Generate a unique ID if not provided
    if unique_id is None:
        unique_id = f"speaker_{speaker_name}_{uuid.uuid4().hex[:8]}"

    # Create the metadata dictionary
    metadata = {"speaker_name": speaker_name}

    # Upsert the vector to Pinecone
    upsert_response = index.upsert(vectors=[(unique_id, embedding_list, metadata)])

    print(f"Upserted embedding for speaker {speaker_name} with ID {unique_id}")
    return unique_id

Add Thumbprints to our Pinecone Database

Below we’ll use chunks of the speakers’ conversations to generate speaker embeddings and add them to our vector database. Later on, we’ll show how to take an audio file with speakers not in the vector database and obtain the data required to generate new speaker fingerprints to be uploaded to the Pinecone database.
elon_fingerprint = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/musk_fingerprinting.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L211c2tfZmluZ2VycHJpbnRpbmcud2F2IiwiaWF0IjoxNzAwNDM4OTAwLCJleHAiOjE3MzE5NzQ5MDB9.O3QOJSBqFNb1sg4nurwSFA13xIPHyKuon3UfHcFYit0&t=2023-11-20T00%3A08%3A20.071Z"
)
altman_fingerprint = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/sam_altman_fingerprint.mp3?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L3NhbV9hbHRtYW5fZmluZ2VycHJpbnQubXAzIiwiaWF0IjoxNzAwNjY5NjI0LCJleHAiOjE3MzIyMDU2MjR9._1yuMGzBhFcHr7xv76160Hb_SC-mH_Wv3_qX-S7XsTU&t=2023-11-22T16%3A13%3A44.103Z"
)
lex_fingerprint = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/lex_fridman_fingerprint.mp3?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L2xleF9mcmlkbWFuX2ZpbmdlcnByaW50Lm1wMyIsImlhdCI6MTcwMDY2OTc4OSwiZXhwIjoxNzMyMjA1Nzg5fQ.PUWTOLIHl4dcrWjh2ZJ_2TBaxMXpcU-x6OcvUDe6ZXQ&t=2023-11-22T16%3A16%3A29.697Z"
)

known_speakers = {"Elon Musk": elon_fingerprint, "Sam Altman": altman_fingerprint, "Lex Fridman": lex_fingerprint}

# Upload the known speakers.
for speaker, audio_file in known_speakers.items():
    print("***")
    print(speaker)
    print(audio_file)
    embedding = speaker_model.get_embedding(audio_file)
    add_speaker_embedding_to_pinecone(speaker, embedding)
Now we can query our Pinecone database to ensure that our embeddings were uploaded successfully.
audio_file = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/musk_fingerprinting.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L211c2tfZmluZ2VycHJpbnRpbmcud2F2IiwiaWF0IjoxNzAwNDM4OTAwLCJleHAiOjE3MzE5NzQ5MDB9.O3QOJSBqFNb1sg4nurwSFA13xIPHyKuon3UfHcFYit0&t=2023-11-20T00%3A08%3A20.071Z"
)
utterance_embedding = speaker_model.get_embedding(audio_file)

results = index.query(vector=utterance_embedding.tolist(), top_k=3, include_metadata=True)

print(results)

Creating Functions to Find the Closest Speaker and Identify Speakers of Utterances

Speaker Identification Function

The find_closest_speaker function is a crucial component of our speaker identification system. It compares a given utterance embedding to known speaker embeddings and identifies the closest match.
from sklearn.metrics.pairwise import cosine_similarity


def find_closest_speaker(utterance_embedding, local_embeddings=None, local_only=False, threshold=0.5):
    def cosine_sim(a, b):
        return cosine_similarity(a.reshape(1, -1), b.reshape(1, -1))[0][0]

    best_match = {"speaker_name": "No match found", "score": 0}

    # Local embeddings processing.
    if local_embeddings is not None:
        for speaker_name, embedding in local_embeddings.items():
            score = cosine_sim(utterance_embedding, embedding)
            if score > best_match["score"]:
                print("Identified speaker " + speaker_name + " confidence " + str(score))
                best_match = {"speaker_name": speaker_name, "score": score}

    # Pinecone query (if not local_only and local_embeddings is empty or not provided)
    if not local_only and (local_embeddings is None or len(local_embeddings) == 0):
        results = index.query(vector=utterance_embedding.tolist(), top_k=1, include_metadata=True)
        if results["matches"]:
            pinecone_match = results["matches"][0]
            pinecone_score = pinecone_match["score"]
            if pinecone_score > best_match["score"]:
                best_match = {"speaker_name": pinecone_match["metadata"]["speaker_name"], "score": pinecone_score}

    # Check if the best match meets the threshold.
    if best_match["score"] < threshold:
        return "No match found", 0

    return best_match["speaker_name"], best_match["score"]

Speaker Identification from Utterances

The identify_speakers_from_utterances function is the core of our speaker identification system. It processes a transcript with utterances and identifies speakers, handling both known and unknown voices.
def identify_speakers_from_utterances(transcript, wav_file, min_utterance_length=5000, match_all_utterances=False):
    utterances = transcript["utterances"]
    speaker_model = EncDecSpeakerLabelModel.from_pretrained("nvidia/speakerverification_en_titanet_large")

    known_speakers = {}
    unknown_speakers = {}
    unknown_count = 0

    unknown_folder = "unknown_speaker_utterances"
    os.makedirs(unknown_folder, exist_ok=True)

    audio_file_name = os.path.basename(wav_file)
    full_audio = AudioSegment.from_wav(wav_file)

    def get_suitable_utterance(speaker, min_length):
        suitable_utterances = [
            u for u in utterances if u["speaker"] == speaker and (u["end"] - u["start"]) >= min_length
        ]
        if suitable_utterances:
            return max(suitable_utterances, key=lambda u: u["end"] - u["start"])
        return max((u for u in utterances if u["speaker"] == speaker), key=lambda u: u["end"] - u["start"])

    # First pass: Identify speakers.
    for speaker in set(u["speaker"] for u in utterances):
        if speaker not in known_speakers and speaker not in unknown_speakers:
            suitable_utterance = get_suitable_utterance(speaker, min_utterance_length)

            start_ms = suitable_utterance["start"]
            end_ms = suitable_utterance["end"]
            utterance_audio = full_audio[start_ms:end_ms]

            temp_wav = "temp_utterance.wav"
            utterance_audio.export(temp_wav, format="wav")
            embedding = speaker_model.get_embedding(temp_wav)
            os.remove(temp_wav)

            speaker_name, score = find_closest_speaker(embedding)
            print(f"Speaker: {speaker}, Closest match: {speaker_name}, Score: {score}")

            if score > 0.5:  # Adjust threshold as needed.
                known_speakers[speaker] = speaker_name
                print(f"Identified as known speaker: {speaker}")
            else:
                unknown_count += 1
                unknown_name = f"Unknown Speaker {chr(64 + unknown_count)}"
                unknown_wav = f"{unknown_folder}/unknown_speaker_{unknown_count}_from_{audio_file_name}"
                utterance_audio.export(unknown_wav, format="wav")
                unknown_speakers[speaker] = {
                    "name": unknown_name,
                    "wav_file": unknown_wav,
                    "duration": end_ms - start_ms,
                }
                print(f"New unknown speaker detected: {unknown_name} (Duration: {(end_ms - start_ms)/1000:.2f}s)")

    # Second pass: Replace speaker names.
    for utterance in utterances:
        if utterance["speaker"] in known_speakers:
            utterance["speaker"] in known_speakers[utterance["speaker"]]
        elif utterance["speaker"] in unknown_speakers:
            utterance["speaker"] = unknown_speakers[utterance["speaker"]]["name"]

    # Third pass: Match all utterances if requested.
    if match_all_utterances:
        print("Matching all utterances individually...")
        for utterance in utterances:
            start_ms = utterance["start"]
            end_ms = utterance["end"]
            utterance_audio = full_audio[start_ms:end_ms]

            temp_wav = "temp_utterance.wav"
            utterance_audio.export(temp_wav, format="wav")
            embedding = speaker_model.get_embedding(temp_wav)
            os.remove(temp_wav)

            new_speaker_name, score = find_closest_speaker(embedding)

            if score > 0.5 and new_speaker_name != utterance["speaker"]:
                print(f"Speaker change detected: '{utterance['speaker']}' -> '{new_speaker_name}' (Score: {score})")
                print(f"Utterance: {utterance['text'][:50]}...")
                utterance["speaker"] = new_speaker_name

    return utterances, unknown_speakers

Examples: Speaker Identification and Diarization

To demonstrate the capabilities of our speaker identification and diarization system, we’ll cover several examples. We’ll start with a straightforward case and progressively move to more complex scenarios.

Example 1: Conversation Between Sam Altman and Elon Musk

Our first example is a simple conversation between two well-known figures: Elon Musk and Sam Altman. This example will showcase how our system performs with clear, distinct voices in a controlled setting.
transcript_obj = transcribe("https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/elon_altman_interview_clipped.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L2Vsb25fYWx0bWFuX2ludGVydmlld19jbGlwcGVkLndhdiIsImlhdCI6MTcwMDY5MDU3OSwiZXhwIjoxNzMyMjI2NTc5fQ.4qZHvVRGhNGttfcpcfXDcJkJe_tbkc_2Bvs4i51SNSE&t=2023-11-22T22%3A02%3A59.429Z")
wav_file = download_and_convert_to_wav("https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/elon_altman_interview_clipped.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L2Vsb25fYWx0bWFuX2ludGVydmlld19jbGlwcGVkLndhdiIsImlhdCI6MTcwMDY5MDU3OSwiZXhwIjoxNzMyMjI2NTc5fQ.4qZHvVRGhNGttfcpcfXDcJkJe_tbkc_2Bvs4i51SNSE&t=2023-11-22T22%3A02%3A59.429Z")
identified_utterances, unknown_speakers = identify_speakers_from_utterances(transcript_obj, wav_file)

for utterance in identified_utterances:
    print(f"{utterance['speaker']}: {utterance['text']}")