This guide will demonstrate how to build an advanced speaker recognition and diarization system that you can use to identify speakers across multiple audio files. It will use:
Documentation Index
Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt
Use this file to discover all available pages before exploring further.
- AssemblyAI for transcription and initial diarization.
- Nvidia’s TitaNet model for speaker embedding generation.
- Pinecone for efficient similarity search of speaker embeddings.
Quickstart
from pinecone import Pinecone, ServerlessSpec
import assemblyai as aai
import requests
import os
from pydub import AudioSegment
import mimetypes
import wave
from nemo.collections.asr.models import EncDecSpeakerLabelModel
import torch
import numpy as np
import uuid
from sklearn.metrics.pairwise import cosine_similarity
# Obtain from your Pinecone dashboard.
# "PINECONE_KEY_HERE" is a placeholder: replace it with your own key before running.
pc = Pinecone(api_key="PINECONE_KEY_HERE")
# Obtain from your AssemblyAI dashboard.
# "AAI_KEY_HERE" is a placeholder: replace it with your own key before running.
aai.settings.api_key = "AAI_KEY_HERE"
def transcribe(file_url):
    """Transcribe an audio file with speaker labels enabled.

    Args:
        file_url: URL of the audio file to transcribe.

    Returns:
        The JSON response of the finished transcript
        (``transcript.json_response``).

    Raises:
        RuntimeError: If AssemblyAI reports that the transcription failed.
    """
    # Speaker labels must be enabled for this Cookbook.
    config = aai.TranscriptionConfig(
        speaker_labels=True,
        speech_models=["universal-3-pro", "universal-2"],
    )
    transcript = aai.Transcriber(config=config).transcribe(file_url)

    # A failed job still yields a transcript object. Surface the failure here
    # instead of letting callers trip over a response without utterances.
    if transcript.status == aai.TranscriptStatus.error:
        raise RuntimeError(f"Transcription failed: {transcript.error}")

    return transcript.json_response
def download_and_convert_to_wav(url, output_dir="./content/converted_audio"):
    """Download an audio file and save it as a mono WAV file.

    Args:
        url: Location of the audio file. The local file name is taken from
            the last path segment of the URL, ignoring any query string.
        output_dir: Directory that receives the converted file. It is
            created if it does not exist.

    Returns:
        The path of the mono WAV file, or None when the server did not
        respond with HTTP status 200.
    """
    # Create the output directory if it doesn't exist.
    os.makedirs(output_dir, exist_ok=True)

    # Extract filename from URL.
    filename = url.split("/")[-1].split("?")[0]
    base_filename = os.path.splitext(filename)[0]

    # Download the file. The timeout keeps a stalled connection from hanging forever.
    response = requests.get(url, timeout=60)
    if response.status_code != 200:
        print(f"Failed to download the file. Status code: {response.status_code}")
        return None

    # Save the downloaded file.
    downloaded_file = os.path.join(output_dir, filename)
    with open(downloaded_file, "wb") as f:
        f.write(response.content)

    # Generate the WAV file name.
    wav_file = os.path.join(output_dir, f"{base_filename}.wav")

    # Load the audio file (fully read into memory, so the source file may be
    # overwritten by the export below when it already has a .wav name).
    audio = AudioSegment.from_file(downloaded_file)

    # Convert to mono if it's stereo.
    if audio.channels > 1:
        print("Setting channels to 1.")
        audio = audio.set_channels(1)

    # Export as WAV. export() hands back the open output file; close it so the
    # handle is not leaked. The segment is mono at this point, so no second
    # pass over the written file is required.
    audio.export(wav_file, format="wav").close()
    print(f"File converted and saved as: {wav_file}")

    # Remove the original downloaded file if it's different from the WAV file.
    if downloaded_file != wav_file:
        os.remove(downloaded_file)

    return wav_file
def add_speaker_embedding_to_pinecone(speaker_name, speaker_embedding, unique_id=None):
    """Upsert a single speaker embedding into the Pinecone index.

    Args:
        speaker_name: Name stored in the vector metadata under "speaker_name".
        speaker_embedding: A torch.Tensor or numpy.ndarray that squeezes to
            shape (192,).
        unique_id: ID for the vector. When None, an ID is built from the
            speaker name plus a random 8-character hex suffix.

    Returns:
        The ID under which the vector was upserted.

    Raises:
        ValueError: If the embedding has an unsupported type or does not
            squeeze to shape (192,).
    """
    # Normalise the input to a squeezed numpy array.
    if isinstance(speaker_embedding, torch.Tensor):
        flat = speaker_embedding.squeeze().cpu().numpy()
    elif isinstance(speaker_embedding, np.ndarray):
        flat = speaker_embedding.squeeze()
    else:
        raise ValueError("Unsupported embedding type. Expected torch.Tensor or numpy.ndarray")

    # Reject anything that is not a 192-dimensional vector.
    if flat.shape != (192,):
        raise ValueError(f"Expected embedding of shape (192,), but got {flat.shape}")

    values = flat.tolist()

    # Fall back to a generated ID when the caller did not supply one.
    vector_id = unique_id
    if vector_id is None:
        vector_id = f"speaker_{speaker_name}_{uuid.uuid4().hex[:8]}"

    # Upsert as an (id, values, metadata) tuple.
    record = (vector_id, values, {"speaker_name": speaker_name})
    index.upsert(vectors=[record])

    print(f"Upserted embedding for speaker {speaker_name} with ID {vector_id}")
    return vector_id
def find_closest_speaker(utterance_embedding, local_embeddings=None, local_only=False, threshold=0.5):
    """Return the known speaker that best matches an utterance embedding.

    Local embeddings, when supplied, are compared using cosine similarity.
    The Pinecone index is queried only when ``local_only`` is False and no
    local embeddings were supplied (None or empty).

    Args:
        utterance_embedding: Embedding of the utterance. It must support
            ``reshape`` and ``tolist`` (as numpy arrays and torch tensors do).
        local_embeddings: Optional mapping of speaker name to embedding.
        local_only: When True, Pinecone is never queried.
        threshold: Minimum score a match needs in order to be reported.

    Returns:
        A ``(speaker_name, score)`` tuple. ``("No match found", 0)`` is
        returned when the best score is below ``threshold``.
    """

    def cosine_sim(a, b):
        return cosine_similarity(a.reshape(1, -1), b.reshape(1, -1))[0][0]

    best_match = {"speaker_name": "No match found", "score": 0}

    # Local embeddings processing.
    if local_embeddings is not None:
        for speaker_name, embedding in local_embeddings.items():
            score = cosine_sim(utterance_embedding, embedding)
            if score > best_match["score"]:
                print("Identified speaker " + speaker_name + " confidence " + str(score))
                best_match = {"speaker_name": speaker_name, "score": score}

    # Pinecone query (if not local_only and local_embeddings is empty or not provided)
    if not local_only and not local_embeddings:
        # Flatten first: the upsert path stores squeezed (192,) vectors, so the
        # query must send a flat list of floats as well, even when the
        # embedding carries a leading batch dimension.
        query_vector = utterance_embedding.reshape(-1).tolist()
        results = index.query(vector=query_vector, top_k=1, include_metadata=True)
        if results["matches"]:
            pinecone_match = results["matches"][0]
            pinecone_score = pinecone_match["score"]
            if pinecone_score > best_match["score"]:
                best_match = {
                    "speaker_name": pinecone_match["metadata"]["speaker_name"],
                    "score": pinecone_score,
                }

    # Check if the best match meets the threshold.
    if best_match["score"] < threshold:
        return "No match found", 0
    return best_match["speaker_name"], best_match["score"]
def identify_speakers_from_utterances(transcript, wav_file, min_utterance_length=5000, match_all_utterances=False):
    """Replace diarization labels in a transcript with identified speaker names.

    For each diarization label, one representative utterance is embedded and
    looked up with ``find_closest_speaker``. Labels that match are renamed to
    the matched speaker; the others are renamed "Unknown Speaker A", "B", ...
    and their representative audio is saved for later fingerprinting.

    Args:
        transcript: Mapping with an "utterances" list. Each utterance needs
            "speaker", "start" and "end" (milliseconds) and, when
            ``match_all_utterances`` is True, "text".
        wav_file: Path of the WAV file the transcript was produced from.
        min_utterance_length: Preferred minimum duration in milliseconds of
            the representative utterance.
        match_all_utterances: When True, every utterance is additionally
            matched on its own and relabeled if a different speaker scores
            above 0.5.

    Returns:
        A tuple ``(utterances, unknown_speakers)``. ``utterances`` is the
        transcript's list, modified in place. ``unknown_speakers`` maps each
        unmatched original label to a dict with "name", "wav_file" and
        "duration".
    """
    utterances = transcript["utterances"]
    speaker_model = EncDecSpeakerLabelModel.from_pretrained("nvidia/speakerverification_en_titanet_large")

    known_speakers = {}
    unknown_speakers = {}
    unknown_count = 0

    unknown_folder = "unknown_speaker_utterances"
    os.makedirs(unknown_folder, exist_ok=True)

    audio_file_name = os.path.basename(wav_file)
    full_audio = AudioSegment.from_wav(wav_file)

    def get_suitable_utterance(speaker, min_length):
        # Prefer the longest utterance of at least min_length ms; otherwise
        # fall back to the speaker's longest utterance overall.
        suitable_utterances = [
            u for u in utterances if u["speaker"] == speaker and (u["end"] - u["start"]) >= min_length
        ]
        if suitable_utterances:
            return max(suitable_utterances, key=lambda u: u["end"] - u["start"])
        return max((u for u in utterances if u["speaker"] == speaker), key=lambda u: u["end"] - u["start"])

    def embed_segment(segment):
        # The model reads from a file path, so export the slice to a scratch
        # WAV file and always remove it, even if embedding fails.
        temp_wav = "temp_utterance.wav"
        segment.export(temp_wav, format="wav").close()
        try:
            return speaker_model.get_embedding(temp_wav)
        finally:
            os.remove(temp_wav)

    # First pass: Identify speakers.
    # dict.fromkeys de-duplicates while keeping first-appearance order, so the
    # "Unknown Speaker A/B/..." lettering is the same on every run.
    for speaker in dict.fromkeys(u["speaker"] for u in utterances):
        suitable_utterance = get_suitable_utterance(speaker, min_utterance_length)
        start_ms = suitable_utterance["start"]
        end_ms = suitable_utterance["end"]
        utterance_audio = full_audio[start_ms:end_ms]

        embedding = embed_segment(utterance_audio)
        speaker_name, score = find_closest_speaker(embedding)
        print(f"Speaker: {speaker}, Closest match: {speaker_name}, Score: {score}")

        if score > 0.5:  # Adjust threshold as needed.
            known_speakers[speaker] = speaker_name
            print(f"Identified as known speaker: {speaker}")
        else:
            unknown_count += 1
            unknown_name = f"Unknown Speaker {chr(64 + unknown_count)}"
            unknown_wav = f"{unknown_folder}/unknown_speaker_{unknown_count}_from_{audio_file_name}"
            utterance_audio.export(unknown_wav, format="wav").close()
            unknown_speakers[speaker] = {
                "name": unknown_name,
                "wav_file": unknown_wav,
                "duration": end_ms - start_ms,
            }
            print(f"New unknown speaker detected: {unknown_name} (Duration: {(end_ms - start_ms)/1000:.2f}s)")

    # Second pass: Replace speaker names.
    for utterance in utterances:
        label = utterance["speaker"]
        if label in known_speakers:
            # Assign the matched name (the previous code only evaluated an
            # `in` expression here and never renamed known speakers).
            utterance["speaker"] = known_speakers[label]
        elif label in unknown_speakers:
            utterance["speaker"] = unknown_speakers[label]["name"]

    # Third pass: Match all utterances if requested.
    if match_all_utterances:
        print("Matching all utterances individually...")
        for utterance in utterances:
            utterance_audio = full_audio[utterance["start"]:utterance["end"]]
            embedding = embed_segment(utterance_audio)
            new_speaker_name, score = find_closest_speaker(embedding)

            if score > 0.5 and new_speaker_name != utterance["speaker"]:
                print(f"Speaker change detected: '{utterance['speaker']}' -> '{new_speaker_name}' (Score: {score})")
                print(f"Utterance: {utterance['text'][:50]}...")
                utterance["speaker"] = new_speaker_name

    return utterances, unknown_speakers
# Create the index only when it does not exist yet, so this script can be
# re-run without failing on an "index already exists" error.
index_name = "speaker-embeddings"
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=192,  # Replace with model-specific dimensions - 192 is for TitaNet-Large.
        metric="cosine",  # Replace with your model metric.
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1",
        ),
    )
# Connect to our new index.
index = pc.Index(index_name)
# Load the pretrained TitaNet-Large speaker verification model by name; it is
# used below to turn audio files into speaker embeddings.
speaker_model = EncDecSpeakerLabelModel.from_pretrained("nvidia/speakerverification_en_titanet_large")
# Download one reference recording ("fingerprint") per known speaker.
elon_fingerprint = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/musk_fingerprinting.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L211c2tfZmluZ2VycHJpbnRpbmcud2F2IiwiaWF0IjoxNzAwNDM4OTAwLCJleHAiOjE3MzE5NzQ5MDB9.O3QOJSBqFNb1sg4nurwSFA13xIPHyKuon3UfHcFYit0&t=2023-11-20T00%3A08%3A20.071Z"
)
altman_fingerprint = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/sam_altman_fingerprint.mp3?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L3NhbV9hbHRtYW5fZmluZ2VycHJpbnQubXAzIiwiaWF0IjoxNzAwNjY5NjI0LCJleHAiOjE3MzIyMDU2MjR9._1yuMGzBhFcHr7xv76160Hb_SC-mH_Wv3_qX-S7XsTU&t=2023-11-22T16%3A13%3A44.103Z"
)
lex_fingerprint = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/lex_fridman_fingerprint.mp3?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L2xleF9mcmlkbWFuX2ZpbmdlcnByaW50Lm1wMyIsImlhdCI6MTcwMDY2OTc4OSwiZXhwIjoxNzMyMjA1Nzg5fQ.PUWTOLIHl4dcrWjh2ZJ_2TBaxMXpcU-x6OcvUDe6ZXQ&t=2023-11-22T16%3A16%3A29.697Z"
)
known_speakers = {"Elon Musk": elon_fingerprint, "Sam Altman": altman_fingerprint, "Lex Fridman": lex_fingerprint}
# Upload the known speakers.
for speaker, audio_file in known_speakers.items():
    print("***")
    print(speaker)
    print(audio_file)
    # download_and_convert_to_wav returns None when the download fails; skip
    # that speaker instead of handing None to the embedding model.
    if audio_file is None:
        print(f"Skipping {speaker}: fingerprint audio could not be downloaded.")
        continue
    embedding = speaker_model.get_embedding(audio_file)
    add_speaker_embedding_to_pinecone(speaker, embedding)
# Sanity check: query the index with one of the enrolled recordings.
audio_file = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/musk_fingerprinting.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L211c2tfZmluZ2VycHJpbnRpbmcud2F2IiwiaWF0IjoxNzAwNDM4OTAwLCJleHAiOjE3MzE5NzQ5MDB9.O3QOJSBqFNb1sg4nurwSFA13xIPHyKuon3UfHcFYit0&t=2023-11-20T00%3A08%3A20.071Z"
)
# download_and_convert_to_wav returns None when the download fails.
if audio_file is None:
    print("Skipping the test query: the audio file could not be downloaded.")
else:
    utterance_embedding = speaker_model.get_embedding(audio_file)
    # Flatten before querying: stored vectors are squeezed to (192,), so the
    # query vector must be a flat list of floats too.
    results = index.query(vector=utterance_embedding.reshape(-1).tolist(), top_k=3, include_metadata=True)
    print(results)
# Example: Conversation Between Sam Altman and Elon Musk
# The same recording is both transcribed remotely and downloaded locally, so
# its URL is bound once and reused.
interview_url = "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/elon_altman_interview_clipped.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L2Vsb25fYWx0bWFuX2ludGVydmlld19jbGlwcGVkLndhdiIsImlhdCI6MTcwMDY5MDU3OSwiZXhwIjoxNzMyMjI2NTc5fQ.4qZHvVRGhNGttfcpcfXDcJkJe_tbkc_2Bvs4i51SNSE&t=2023-11-22T22%3A02%3A59.429Z"
transcript_obj = transcribe(interview_url)
wav_file = download_and_convert_to_wav(interview_url)
identified_utterances, unknown_speakers = identify_speakers_from_utterances(transcript_obj, wav_file)
# Print the conversation as "<speaker>: <text>" lines.
for utterance in identified_utterances:
    print(f"{utterance['speaker']}: {utterance['text']}")
Initial Setup
First, you’ll need to sign up for an AssemblyAI account and obtain your API key from your account dashboard. Then, sign up for a Pinecone account and obtain your API key from “API Keys” on the sidebar of your dashboard. Also note that any files you use for this Cookbook should be in WAV format. While not a requirement for AssemblyAI, TitaNet requires WAV format.
Installing Dependencies
Now we’ll need to install the necessary libraries and frameworks for this project. Please note that this process can take several minutes to complete.
pip install -U Cython torch nemo_toolkit ffmpeg pydub pinecone-client assemblyai hydra-core pytorch_lightning huggingface_hub==0.23.5 librosa transformers pandas inflect webdataset sentencepiece youtokentome pyannote-audio editdistance jiwer lhotse datasets
Pinecone Setup
In this section, we’ll import Pinecone, create a new index for our speaker embeddings, and connect to the index. Please enter your Pinecone API key in the placeholder below.
from pinecone import Pinecone, ServerlessSpec
# Obtain from your Pinecone dashboard.
pc = Pinecone(api_key="PINECONE_KEY_HERE")
# Create the index only when it does not exist yet, so this cell can be
# re-run without failing on an "index already exists" error.
index_name = "speaker-embeddings"
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=192,  # Replace with model-specific dimensions - 192 is for TitaNet-Large.
        metric="cosine",  # Replace with your model metric.
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1",
        ),
    )
# Connect to our new index.
index = pc.Index(index_name)
AssemblyAI Setup
Now we’ll set up AssemblyAI for transcription and diarization. We’ll import the necessary modules and create a function to transcribe our audio files with speaker labels enabled. Please enter your AssemblyAI API key in the cell below.
import assemblyai as aai
# Obtain from your AssemblyAI dashboard.
# "AAI_KEY_HERE" is a placeholder: replace it with your own key before running.
aai.settings.api_key = "AAI_KEY_HERE"
def transcribe(file_url):
    """Transcribe an audio file with speaker labels enabled.

    Args:
        file_url: URL of the audio file to transcribe.

    Returns:
        The JSON response of the finished transcript
        (``transcript.json_response``).

    Raises:
        RuntimeError: If AssemblyAI reports that the transcription failed.
    """
    # Speaker labels must be enabled for this Cookbook.
    config = aai.TranscriptionConfig(
        speaker_labels=True,
        speech_models=["universal-3-pro", "universal-2"],
    )
    transcript = aai.Transcriber(config=config).transcribe(file_url)

    # A failed job still yields a transcript object. Surface the failure here
    # instead of letting callers trip over a response without utterances.
    if transcript.status == aai.TranscriptStatus.error:
        raise RuntimeError(f"Transcription failed: {transcript.error}")

    return transcript.json_response
download_and_convert_to_wav helper function. This function allows us to take file URLs, download the files, and then convert them to WAV format. If the files are already in WAV format, then they’re just downloaded. The files must be in WAV format to work properly with TitaNet.
import requests
import os
from pydub import AudioSegment
import mimetypes
import wave
def download_and_convert_to_wav(url, output_dir="./content/converted_audio"):
    """Download an audio file and save it as a mono WAV file.

    Args:
        url: Location of the audio file. The local file name is taken from
            the last path segment of the URL, ignoring any query string.
        output_dir: Directory that receives the converted file. It is
            created if it does not exist.

    Returns:
        The path of the mono WAV file, or None when the server did not
        respond with HTTP status 200.
    """
    # Create the output directory if it doesn't exist.
    os.makedirs(output_dir, exist_ok=True)

    # Extract filename from URL.
    filename = url.split("/")[-1].split("?")[0]
    base_filename = os.path.splitext(filename)[0]

    # Download the file. The timeout keeps a stalled connection from hanging forever.
    response = requests.get(url, timeout=60)
    if response.status_code != 200:
        print(f"Failed to download the file. Status code: {response.status_code}")
        return None

    # Save the downloaded file.
    downloaded_file = os.path.join(output_dir, filename)
    with open(downloaded_file, "wb") as f:
        f.write(response.content)

    # Generate the WAV file name.
    wav_file = os.path.join(output_dir, f"{base_filename}.wav")

    # Load the audio file (fully read into memory, so the source file may be
    # overwritten by the export below when it already has a .wav name).
    audio = AudioSegment.from_file(downloaded_file)

    # Convert to mono if it's stereo.
    if audio.channels > 1:
        print("Setting channels to 1.")
        audio = audio.set_channels(1)

    # Export as WAV. export() hands back the open output file; close it so the
    # handle is not leaked. The segment is mono at this point, so no second
    # pass over the written file is required.
    audio.export(wav_file, format="wav").close()
    print(f"File converted and saved as: {wav_file}")

    # Remove the original downloaded file if it's different from the WAV file.
    if downloaded_file != wav_file:
        os.remove(downloaded_file)

    return wav_file
NVIDIA’s TitaNet Model Setup
Next we’ll import torch and nemo, then connect to and load NVIDIA’s TitaNet model. This model allows us to generate speaker embeddings to create speaker fingerprints. It also enables the conversion of utterances into embeddings for comparison with our fingerprints.
from nemo.collections.asr.models import EncDecSpeakerLabelModel
# Load the pretrained TitaNet-Large speaker verification model by name; it is
# used below to turn audio files into speaker embeddings.
speaker_model = EncDecSpeakerLabelModel.from_pretrained("nvidia/speakerverification_en_titanet_large")
Next, we’ll define the add_speaker_embedding_to_pinecone function, which adds our speaker embeddings to the Pinecone database.
import torch
import numpy as np
import uuid
def add_speaker_embedding_to_pinecone(speaker_name, speaker_embedding, unique_id=None):
    """Upsert a single speaker embedding into the Pinecone index.

    Args:
        speaker_name: Name stored in the vector metadata under "speaker_name".
        speaker_embedding: A torch.Tensor or numpy.ndarray that squeezes to
            shape (192,).
        unique_id: ID for the vector. When None, an ID is built from the
            speaker name plus a random 8-character hex suffix.

    Returns:
        The ID under which the vector was upserted.

    Raises:
        ValueError: If the embedding has an unsupported type or does not
            squeeze to shape (192,).
    """
    # Normalise the input to a squeezed numpy array.
    if isinstance(speaker_embedding, torch.Tensor):
        flat = speaker_embedding.squeeze().cpu().numpy()
    elif isinstance(speaker_embedding, np.ndarray):
        flat = speaker_embedding.squeeze()
    else:
        raise ValueError("Unsupported embedding type. Expected torch.Tensor or numpy.ndarray")

    # Reject anything that is not a 192-dimensional vector.
    if flat.shape != (192,):
        raise ValueError(f"Expected embedding of shape (192,), but got {flat.shape}")

    values = flat.tolist()

    # Fall back to a generated ID when the caller did not supply one.
    vector_id = unique_id
    if vector_id is None:
        vector_id = f"speaker_{speaker_name}_{uuid.uuid4().hex[:8]}"

    # Upsert as an (id, values, metadata) tuple.
    record = (vector_id, values, {"speaker_name": speaker_name})
    index.upsert(vectors=[record])

    print(f"Upserted embedding for speaker {speaker_name} with ID {vector_id}")
    return vector_id
Add Thumbprints to our Pinecone Database
Below we’ll use chunks of the speakers’ conversations to generate speaker embeddings and add them to our vector database. Later on, we’ll show how to take an audio file with speakers not in the vector database and obtain the data required to generate new speaker fingerprints to be uploaded to the Pinecone database.
elon_fingerprint = download_and_convert_to_wav(
"https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/musk_fingerprinting.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L211c2tfZmluZ2VycHJpbnRpbmcud2F2IiwiaWF0IjoxNzAwNDM4OTAwLCJleHAiOjE3MzE5NzQ5MDB9.O3QOJSBqFNb1sg4nurwSFA13xIPHyKuon3UfHcFYit0&t=2023-11-20T00%3A08%3A20.071Z"
)
altman_fingerprint = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/sam_altman_fingerprint.mp3?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L3NhbV9hbHRtYW5fZmluZ2VycHJpbnQubXAzIiwiaWF0IjoxNzAwNjY5NjI0LCJleHAiOjE3MzIyMDU2MjR9._1yuMGzBhFcHr7xv76160Hb_SC-mH_Wv3_qX-S7XsTU&t=2023-11-22T16%3A13%3A44.103Z"
)
lex_fingerprint = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/lex_fridman_fingerprint.mp3?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L2xleF9mcmlkbWFuX2ZpbmdlcnByaW50Lm1wMyIsImlhdCI6MTcwMDY2OTc4OSwiZXhwIjoxNzMyMjA1Nzg5fQ.PUWTOLIHl4dcrWjh2ZJ_2TBaxMXpcU-x6OcvUDe6ZXQ&t=2023-11-22T16%3A16%3A29.697Z"
)
known_speakers = {"Elon Musk": elon_fingerprint, "Sam Altman": altman_fingerprint, "Lex Fridman": lex_fingerprint}
# Upload the known speakers.
for speaker, audio_file in known_speakers.items():
    print("***")
    print(speaker)
    print(audio_file)
    # download_and_convert_to_wav returns None when the download fails; skip
    # that speaker instead of handing None to the embedding model.
    if audio_file is None:
        print(f"Skipping {speaker}: fingerprint audio could not be downloaded.")
        continue
    embedding = speaker_model.get_embedding(audio_file)
    add_speaker_embedding_to_pinecone(speaker, embedding)
# Sanity check: query the index with one of the enrolled recordings.
audio_file = download_and_convert_to_wav(
    "https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/musk_fingerprinting.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L211c2tfZmluZ2VycHJpbnRpbmcud2F2IiwiaWF0IjoxNzAwNDM4OTAwLCJleHAiOjE3MzE5NzQ5MDB9.O3QOJSBqFNb1sg4nurwSFA13xIPHyKuon3UfHcFYit0&t=2023-11-20T00%3A08%3A20.071Z"
)
# download_and_convert_to_wav returns None when the download fails.
if audio_file is None:
    print("Skipping the test query: the audio file could not be downloaded.")
else:
    utterance_embedding = speaker_model.get_embedding(audio_file)
    # Flatten before querying: stored vectors are squeezed to (192,), so the
    # query vector must be a flat list of floats too.
    results = index.query(vector=utterance_embedding.reshape(-1).tolist(), top_k=3, include_metadata=True)
    print(results)
Creating Functions to Find the Closest Speaker and Identify Speakers of Utterances
Speaker Identification Function
The find_closest_speaker function is a crucial component of our speaker identification system. It compares a given utterance embedding to known speaker embeddings and identifies the closest match.
from sklearn.metrics.pairwise import cosine_similarity
def find_closest_speaker(utterance_embedding, local_embeddings=None, local_only=False, threshold=0.5):
    """Return the known speaker that best matches an utterance embedding.

    Local embeddings, when supplied, are compared using cosine similarity.
    The Pinecone index is queried only when ``local_only`` is False and no
    local embeddings were supplied (None or empty).

    Args:
        utterance_embedding: Embedding of the utterance. It must support
            ``reshape`` and ``tolist`` (as numpy arrays and torch tensors do).
        local_embeddings: Optional mapping of speaker name to embedding.
        local_only: When True, Pinecone is never queried.
        threshold: Minimum score a match needs in order to be reported.

    Returns:
        A ``(speaker_name, score)`` tuple. ``("No match found", 0)`` is
        returned when the best score is below ``threshold``.
    """

    def cosine_sim(a, b):
        return cosine_similarity(a.reshape(1, -1), b.reshape(1, -1))[0][0]

    best_match = {"speaker_name": "No match found", "score": 0}

    # Local embeddings processing.
    if local_embeddings is not None:
        for speaker_name, embedding in local_embeddings.items():
            score = cosine_sim(utterance_embedding, embedding)
            if score > best_match["score"]:
                print("Identified speaker " + speaker_name + " confidence " + str(score))
                best_match = {"speaker_name": speaker_name, "score": score}

    # Pinecone query (if not local_only and local_embeddings is empty or not provided)
    if not local_only and not local_embeddings:
        # Flatten first: the upsert path stores squeezed (192,) vectors, so the
        # query must send a flat list of floats as well, even when the
        # embedding carries a leading batch dimension.
        query_vector = utterance_embedding.reshape(-1).tolist()
        results = index.query(vector=query_vector, top_k=1, include_metadata=True)
        if results["matches"]:
            pinecone_match = results["matches"][0]
            pinecone_score = pinecone_match["score"]
            if pinecone_score > best_match["score"]:
                best_match = {
                    "speaker_name": pinecone_match["metadata"]["speaker_name"],
                    "score": pinecone_score,
                }

    # Check if the best match meets the threshold.
    if best_match["score"] < threshold:
        return "No match found", 0
    return best_match["speaker_name"], best_match["score"]
Speaker Identification from Utterances
The identify_speakers_from_utterances function is the core of our speaker identification system. It processes a transcript with utterances and identifies speakers, handling both known and unknown voices.
def identify_speakers_from_utterances(transcript, wav_file, min_utterance_length=5000, match_all_utterances=False):
    """Replace diarization labels in a transcript with identified speaker names.

    For each diarization label, one representative utterance is embedded and
    looked up with ``find_closest_speaker``. Labels that match are renamed to
    the matched speaker; the others are renamed "Unknown Speaker A", "B", ...
    and their representative audio is saved for later fingerprinting.

    Args:
        transcript: Mapping with an "utterances" list. Each utterance needs
            "speaker", "start" and "end" (milliseconds) and, when
            ``match_all_utterances`` is True, "text".
        wav_file: Path of the WAV file the transcript was produced from.
        min_utterance_length: Preferred minimum duration in milliseconds of
            the representative utterance.
        match_all_utterances: When True, every utterance is additionally
            matched on its own and relabeled if a different speaker scores
            above 0.5.

    Returns:
        A tuple ``(utterances, unknown_speakers)``. ``utterances`` is the
        transcript's list, modified in place. ``unknown_speakers`` maps each
        unmatched original label to a dict with "name", "wav_file" and
        "duration".
    """
    utterances = transcript["utterances"]
    speaker_model = EncDecSpeakerLabelModel.from_pretrained("nvidia/speakerverification_en_titanet_large")

    known_speakers = {}
    unknown_speakers = {}
    unknown_count = 0

    unknown_folder = "unknown_speaker_utterances"
    os.makedirs(unknown_folder, exist_ok=True)

    audio_file_name = os.path.basename(wav_file)
    full_audio = AudioSegment.from_wav(wav_file)

    def get_suitable_utterance(speaker, min_length):
        # Prefer the longest utterance of at least min_length ms; otherwise
        # fall back to the speaker's longest utterance overall.
        suitable_utterances = [
            u for u in utterances if u["speaker"] == speaker and (u["end"] - u["start"]) >= min_length
        ]
        if suitable_utterances:
            return max(suitable_utterances, key=lambda u: u["end"] - u["start"])
        return max((u for u in utterances if u["speaker"] == speaker), key=lambda u: u["end"] - u["start"])

    def embed_segment(segment):
        # The model reads from a file path, so export the slice to a scratch
        # WAV file and always remove it, even if embedding fails.
        temp_wav = "temp_utterance.wav"
        segment.export(temp_wav, format="wav").close()
        try:
            return speaker_model.get_embedding(temp_wav)
        finally:
            os.remove(temp_wav)

    # First pass: Identify speakers.
    # dict.fromkeys de-duplicates while keeping first-appearance order, so the
    # "Unknown Speaker A/B/..." lettering is the same on every run.
    for speaker in dict.fromkeys(u["speaker"] for u in utterances):
        suitable_utterance = get_suitable_utterance(speaker, min_utterance_length)
        start_ms = suitable_utterance["start"]
        end_ms = suitable_utterance["end"]
        utterance_audio = full_audio[start_ms:end_ms]

        embedding = embed_segment(utterance_audio)
        speaker_name, score = find_closest_speaker(embedding)
        print(f"Speaker: {speaker}, Closest match: {speaker_name}, Score: {score}")

        if score > 0.5:  # Adjust threshold as needed.
            known_speakers[speaker] = speaker_name
            print(f"Identified as known speaker: {speaker}")
        else:
            unknown_count += 1
            unknown_name = f"Unknown Speaker {chr(64 + unknown_count)}"
            unknown_wav = f"{unknown_folder}/unknown_speaker_{unknown_count}_from_{audio_file_name}"
            utterance_audio.export(unknown_wav, format="wav").close()
            unknown_speakers[speaker] = {
                "name": unknown_name,
                "wav_file": unknown_wav,
                "duration": end_ms - start_ms,
            }
            print(f"New unknown speaker detected: {unknown_name} (Duration: {(end_ms - start_ms)/1000:.2f}s)")

    # Second pass: Replace speaker names.
    for utterance in utterances:
        label = utterance["speaker"]
        if label in known_speakers:
            # Assign the matched name (the previous code only evaluated an
            # `in` expression here and never renamed known speakers).
            utterance["speaker"] = known_speakers[label]
        elif label in unknown_speakers:
            utterance["speaker"] = unknown_speakers[label]["name"]

    # Third pass: Match all utterances if requested.
    if match_all_utterances:
        print("Matching all utterances individually...")
        for utterance in utterances:
            utterance_audio = full_audio[utterance["start"]:utterance["end"]]
            embedding = embed_segment(utterance_audio)
            new_speaker_name, score = find_closest_speaker(embedding)

            if score > 0.5 and new_speaker_name != utterance["speaker"]:
                print(f"Speaker change detected: '{utterance['speaker']}' -> '{new_speaker_name}' (Score: {score})")
                print(f"Utterance: {utterance['text'][:50]}...")
                utterance["speaker"] = new_speaker_name

    return utterances, unknown_speakers
Examples: Speaker Identification and Diarization
To demonstrate the capabilities of our speaker identification and diarization system, we’ll cover several examples. We’ll start with a straightforward case and progressively move to more complex scenarios.
Example 1: Conversation Between Sam Altman and Elon Musk
Our first example is a simple conversation between two well-known figures: Elon Musk and Sam Altman. This example will showcase how our system performs with clear, distinct voices in a controlled setting.
transcript_obj = transcribe("https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/elon_altman_interview_clipped.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L2Vsb25fYWx0bWFuX2ludGVydmlld19jbGlwcGVkLndhdiIsImlhdCI6MTcwMDY5MDU3OSwiZXhwIjoxNzMyMjI2NTc5fQ.4qZHvVRGhNGttfcpcfXDcJkJe_tbkc_2Bvs4i51SNSE&t=2023-11-22T22%3A02%3A59.429Z")
# Download the same recording locally as WAV so utterances can be sliced from it.
wav_file = download_and_convert_to_wav("https://api.assemblyai-solutions.com/storage/v1/object/sign/sam_training_bucket/elon_altman_interview_clipped.wav?token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1cmwiOiJzYW1fdHJhaW5pbmdfYnVja2V0L2Vsb25fYWx0bWFuX2ludGVydmlld19jbGlwcGVkLndhdiIsImlhdCI6MTcwMDY5MDU3OSwiZXhwIjoxNzMyMjI2NTc5fQ.4qZHvVRGhNGttfcpcfXDcJkJe_tbkc_2Bvs4i51SNSE&t=2023-11-22T22%3A02%3A59.429Z")
# Relabel the transcript's utterances with identified (or "Unknown Speaker") names.
identified_utterances, unknown_speakers = identify_speakers_from_utterances(transcript_obj, wav_file)
# Print the conversation as "<speaker>: <text>" lines.
for utterance in identified_utterances:
    print(f"{utterance['speaker']}: {utterance['text']}")