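"""Real-time speaker detection and transcription.

Captures system audio (loopback output where available, falling back to the
microphone) with `soundcard`, transcribes it with RealtimeSTT/Whisper, and
tags each finalized utterance with a speaker label derived from lightweight
MFCC embeddings. Speakers are color-coded in the terminal output.
"""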
import sys
import time
import queue
import threading
import signal
import atexit
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

import numpy as np
import torch
import torchaudio
from scipy.spatial.distance import cosine

try:
    import soundcard as sc
except ImportError:
    print("soundcard not found. Install with: pip install soundcard")
    sys.exit(1)

try:
    from RealtimeSTT import AudioToTextRecorder
except ImportError:
    print("RealtimeSTT not found. Install with: pip install RealtimeSTT")
    sys.exit(1)


class Config:
    """Tunable settings for audio capture, transcription, and diarization."""

    # Audio capture
    SAMPLE_RATE = 16000
    BUFFER_SIZE = 1024
    CHANNELS = 1

    # Whisper models used by RealtimeSTT
    FINAL_MODEL = "distil-large-v3"
    REALTIME_MODEL = "distil-small.en"
    LANGUAGE = "en"
    BEAM_SIZE = 5
    REALTIME_BEAM_SIZE = 3

    # Voice activity detection (durations in seconds)
    SILENCE_THRESHOLD = 0.4
    MIN_RECORDING_LENGTH = 0.5
    PRE_RECORDING_BUFFER = 0.2
    SILERO_SENSITIVITY = 0.4
    WEBRTC_SENSITIVITY = 3

    # Speaker change detection
    CHANGE_THRESHOLD = 0.65
    MAX_SPEAKERS = 4
    MIN_SEGMENT_DURATION = 1.0
    EMBEDDING_HISTORY_SIZE = 3
    SPEAKER_MEMORY_SIZE = 20

    # ANSI terminal colors, one per speaker
    COLORS = [
        '\033[93m',  # yellow
        '\033[91m',  # red
        '\033[92m',  # green
        '\033[96m',  # cyan
        '\033[95m',  # magenta
        '\033[94m',  # blue
        '\033[97m',  # white
        '\033[33m',  # dark yellow
    ]
    RESET = '\033[0m'
    LIVE_COLOR = '\033[90m'  # gray, used for in-progress (live) text
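
# Note: the duration-valued settings above are in seconds; at
# SAMPLE_RATE = 16000 Hz, one second corresponds to 16000 samples
# (e.g. MIN_RECORDING_LENGTH = 0.5 s is 8000 samples).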


class SpeakerEncoder:
    """Simplified speaker encoder built from torchaudio's MFCC transform."""

    def __init__(self, device="cpu"):
        self.device = device
        self.embedding_dim = 128
        self.model_loaded = False
        self._setup_model()

    def _setup_model(self):
        """Set up a simple MFCC-based feature extractor."""
        try:
            self.mfcc_transform = torchaudio.transforms.MFCC(
                sample_rate=Config.SAMPLE_RATE,
                n_mfcc=13,
                melkwargs={"n_fft": 400, "hop_length": 160, "n_mels": 23}
            ).to(self.device)
            self.model_loaded = True
            print("Simple MFCC-based encoder initialized")
        except Exception as e:
            print(f"Error setting up encoder: {e}")
            self.model_loaded = False

    def extract_embedding(self, audio):
        """Extract a fixed-size speaker embedding from raw audio."""
        if not self.model_loaded:
            return np.zeros(self.embedding_dim)

        try:
            if isinstance(audio, np.ndarray):
                audio = torch.from_numpy(audio).float()

            # Peak-normalize so the embedding is insensitive to volume
            if audio.abs().max() > 0:
                audio = audio / audio.abs().max()

            # The MFCC transform expects a (batch, samples) tensor
            if audio.dim() == 1:
                audio = audio.unsqueeze(0)

            with torch.no_grad():
                mfcc = self.mfcc_transform(audio.to(self.device))

            # Summarize each coefficient track over time with four statistics
            embedding = torch.cat([
                mfcc.mean(dim=2).flatten(),
                mfcc.std(dim=2).flatten(),
                mfcc.max(dim=2)[0].flatten(),
                mfcc.min(dim=2)[0].flatten()
            ])

            # Truncate or zero-pad to the fixed embedding dimension
            if embedding.size(0) > self.embedding_dim:
                embedding = embedding[:self.embedding_dim]
            elif embedding.size(0) < self.embedding_dim:
                padding = torch.zeros(
                    self.embedding_dim - embedding.size(0),
                    device=embedding.device
                )
                embedding = torch.cat([embedding, padding])

            return embedding.cpu().numpy()

        except Exception as e:
            print(f"Error extracting embedding: {e}")
            return np.zeros(self.embedding_dim)
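
# Illustrative usage (not part of the pipeline): with n_mfcc=13, the four
# statistics yield a 52-dim vector that is zero-padded to 128 dims. Shapes
# below assume Config.SAMPLE_RATE = 16000.
#
#     enc = SpeakerEncoder()
#     one_second = np.random.randn(16000).astype(np.float32)
#     emb = enc.extract_embedding(one_second)
#     assert emb.shape == (128,)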


class SpeakerDetector:
    """Online speaker-change detection over a stream of embeddings."""

    def __init__(self, threshold=Config.CHANGE_THRESHOLD, max_speakers=Config.MAX_SPEAKERS):
        self.threshold = threshold
        self.max_speakers = max_speakers
        self.current_speaker = 0
        self.speaker_embeddings = [[] for _ in range(max_speakers)]
        self.speaker_centroids = [None] * max_speakers
        self.last_change_time = time.time()
        self.active_speakers = {0}

    def detect_speaker(self, embedding):
        """Return (speaker_id, similarity) for the given embedding."""
        current_time = time.time()

        # First embedding ever seen: assign it to speaker 0
        if not self.speaker_embeddings[0]:
            self.speaker_embeddings[0].append(embedding)
            self.speaker_centroids[0] = embedding.copy()
            return 0, 1.0

        # Cosine similarity to the current speaker's centroid
        current_centroid = self.speaker_centroids[self.current_speaker]
        if current_centroid is not None:
            similarity = 1.0 - cosine(embedding, current_centroid)
        else:
            similarity = 0.0

        # Debounce: within the minimum segment duration, keep the speaker
        if current_time - self.last_change_time < Config.MIN_SEGMENT_DURATION:
            self._update_speaker_model(self.current_speaker, embedding)
            return self.current_speaker, similarity

        # Similarity dropped below threshold: look for a better match
        if similarity < self.threshold:
            best_speaker = self.current_speaker
            best_similarity = similarity

            for speaker_id in self.active_speakers:
                if speaker_id == self.current_speaker:
                    continue

                centroid = self.speaker_centroids[speaker_id]
                if centroid is not None:
                    sim = 1.0 - cosine(embedding, centroid)
                    if sim > best_similarity and sim > self.threshold:
                        best_similarity = sim
                        best_speaker = speaker_id

            # No known speaker matched: open a slot for a new one
            if (best_speaker == self.current_speaker and
                    len(self.active_speakers) < self.max_speakers):
                for new_id in range(self.max_speakers):
                    if new_id not in self.active_speakers:
                        best_speaker = new_id
                        best_similarity = 0.0
                        self.active_speakers.add(new_id)
                        break

            if best_speaker != self.current_speaker:
                self.current_speaker = best_speaker
                self.last_change_time = current_time
                similarity = best_similarity

        self._update_speaker_model(self.current_speaker, embedding)
        return self.current_speaker, similarity

    def _update_speaker_model(self, speaker_id, embedding):
        """Add an embedding to a speaker's history and refresh the centroid."""
        self.speaker_embeddings[speaker_id].append(embedding)

        # Keep only the most recent embeddings per speaker
        if len(self.speaker_embeddings[speaker_id]) > Config.SPEAKER_MEMORY_SIZE:
            self.speaker_embeddings[speaker_id] = \
                self.speaker_embeddings[speaker_id][-Config.SPEAKER_MEMORY_SIZE:]

        # Centroid is the mean of the retained embeddings
        if self.speaker_embeddings[speaker_id]:
            self.speaker_centroids[speaker_id] = np.mean(
                self.speaker_embeddings[speaker_id], axis=0
            )
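
# The similarity used above is cosine similarity: for embeddings a and b,
# sim(a, b) = (a . b) / (|a| |b|), i.e. 1 minus scipy's cosine *distance*.
# The same computation in plain NumPy, with illustrative values:
#
#     a = np.array([1.0, 0.0]); b = np.array([1.0, 1.0])
#     sim = a @ b / (np.linalg.norm(a) * np.linalg.norm(b))  # ~0.707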


class AudioRecorder:
    """Records system audio (loopback) into a queue, with microphone fallback."""

    def __init__(self, audio_queue):
        self.audio_queue = audio_queue
        self.running = False
        self.thread = None

    def start(self):
        """Start recording in a background thread."""
        self.running = True
        self.thread = threading.Thread(target=self._record_loop, daemon=True)
        self.thread.start()
        print("Audio recording started")

    def stop(self):
        """Stop recording."""
        self.running = False
        if self.thread and self.thread.is_alive():
            self.thread.join(timeout=2)

    def _record_loop(self):
        """Main recording loop."""
        try:
            try:
                # Loopback capture of the default output device. soundcard
                # exposes loopback endpoints as microphones; this works on
                # Windows (WASAPI) and raises on platforms without loopback
                # support, triggering the microphone fallback below.
                device = sc.get_microphone(
                    id=str(sc.default_speaker().name), include_loopback=True
                )
                with device.recorder(
                    samplerate=Config.SAMPLE_RATE,
                    blocksize=Config.BUFFER_SIZE,
                    channels=Config.CHANNELS
                ) as recorder:
                    print(f"Recording from: {device.name}")
                    while self.running:
                        data = recorder.record(numframes=Config.BUFFER_SIZE)
                        if data is not None and len(data) > 0:
                            # Keep only the first channel
                            if data.ndim > 1:
                                data = data[:, 0]
                            self.audio_queue.put(data.flatten())

            except Exception as e:
                print(f"Loopback recording failed: {e}")
                print("Falling back to microphone...")

                mic = sc.default_microphone()
                with mic.recorder(
                    samplerate=Config.SAMPLE_RATE,
                    blocksize=Config.BUFFER_SIZE,
                    channels=Config.CHANNELS
                ) as recorder:
                    print(f"Recording from microphone: {mic.name}")
                    while self.running:
                        data = recorder.record(numframes=Config.BUFFER_SIZE)
                        if data is not None and len(data) > 0:
                            if data.ndim > 1:
                                data = data[:, 0]
                            self.audio_queue.put(data.flatten())

        except Exception as e:
            print(f"Recording error: {e}")
            self.running = False
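
# If neither default device works, the available capture endpoints (including
# loopback monitors, where supported) can be listed with soundcard, e.g.:
#
#     for m in sc.all_microphones(include_loopback=True):
#         print(m.name)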


class TranscriptionProcessor:
    """Feeds captured audio to the transcriber and attributes text to speakers."""

    def __init__(self):
        self.encoder = SpeakerEncoder()
        self.detector = SpeakerDetector()
        self.recorder = None
        self.audio_queue = queue.Queue(maxsize=100)
        self.audio_recorder = AudioRecorder(self.audio_queue)
        self.processing_thread = None
        self.running = False

    def setup(self):
        """Set up the RealtimeSTT transcription recorder."""
        try:
            self.recorder = AudioToTextRecorder(
                spinner=False,
                use_microphone=False,
                model=Config.FINAL_MODEL,
                language=Config.LANGUAGE,
                silero_sensitivity=Config.SILERO_SENSITIVITY,
                webrtc_sensitivity=Config.WEBRTC_SENSITIVITY,
                post_speech_silence_duration=Config.SILENCE_THRESHOLD,
                min_length_of_recording=Config.MIN_RECORDING_LENGTH,
                pre_recording_buffer_duration=Config.PRE_RECORDING_BUFFER,
                enable_realtime_transcription=True,
                realtime_model_type=Config.REALTIME_MODEL,
                beam_size=Config.BEAM_SIZE,
                beam_size_realtime=Config.REALTIME_BEAM_SIZE,
                on_realtime_transcription_update=self._on_live_text,
            )
            print("Transcription recorder setup complete")
            return True
        except Exception as e:
            print(f"Transcription setup failed: {e}")
            return False

    def start(self):
        """Start audio capture, audio processing, and transcription."""
        if not self.setup():
            return False

        self.running = True

        # Capture audio into the queue
        self.audio_recorder.start()

        # Forward queued audio to the transcriber
        self.processing_thread = threading.Thread(target=self._process_audio, daemon=True)
        self.processing_thread.start()

        # Consume finalized transcriptions
        self._start_transcription()

        return True

    def stop(self):
        """Stop processing."""
        print("\nStopping transcription...")
        self.running = False

        if self.audio_recorder:
            self.audio_recorder.stop()

        if self.processing_thread and self.processing_thread.is_alive():
            self.processing_thread.join(timeout=2)

        if self.recorder:
            try:
                self.recorder.shutdown()
            except Exception:
                pass

    def _process_audio(self):
        """Accumulate queued audio and feed it to the transcriber."""
        audio_buffer = []

        while self.running:
            try:
                chunk = self.audio_queue.get(timeout=0.1)
                audio_buffer.extend(chunk)

                # Feed in one-second blocks; consume the full block so the
                # transcriber never receives the same samples twice
                if len(audio_buffer) >= Config.SAMPLE_RATE:
                    audio_array = np.array(audio_buffer[:Config.SAMPLE_RATE])
                    audio_buffer = audio_buffer[Config.SAMPLE_RATE:]

                    # Convert float32 in [-1, 1] to 16-bit PCM
                    audio_int16 = (np.clip(audio_array, -1.0, 1.0) * 32767).astype(np.int16)

                    if self.recorder:
                        self.recorder.feed_audio(audio_int16.tobytes())

            except queue.Empty:
                continue
            except Exception as e:
                if self.running:
                    print(f"Audio processing error: {e}")

    def _start_transcription(self):
        """Start the loop that collects finalized transcriptions."""
        def transcription_loop():
            while self.running:
                try:
                    # Blocks until a finalized utterance is available
                    text = self.recorder.text()
                    if text and text.strip():
                        self._process_final_text(text)
                except Exception as e:
                    if self.running:
                        print(f"Transcription error: {e}")
                    break

        transcription_thread = threading.Thread(target=transcription_loop, daemon=True)
        transcription_thread.start()

    def _on_live_text(self, text):
        """Handle live (in-progress) transcription updates."""
        if text and text.strip():
            print(f"\r{Config.LIVE_COLOR}[Live] {text}{Config.RESET}", end="", flush=True)

    def _process_final_text(self, text):
        """Attribute a finalized transcription to a speaker and print it."""
        # Clear the live-text line
        print("\r" + " " * 80 + "\r", end="")

        try:
            # Peek at recent audio to build the speaker embedding
            recent_audio = []
            temp_queue = []

            for _ in range(min(10, self.audio_queue.qsize())):
                try:
                    chunk = self.audio_queue.get_nowait()
                    recent_audio.extend(chunk)
                    temp_queue.append(chunk)
                except queue.Empty:
                    break

            # Return the chunks to the queue in their original order so the
            # transcription pipeline still sees them
            for chunk in temp_queue:
                try:
                    self.audio_queue.put_nowait(chunk)
                except queue.Full:
                    break

            # Embed up to the last second of audio and classify the speaker
            if recent_audio:
                audio_tensor = torch.FloatTensor(recent_audio[-Config.SAMPLE_RATE:])
                embedding = self.encoder.extract_embedding(audio_tensor)
                speaker_id, similarity = self.detector.detect_speaker(embedding)
            else:
                speaker_id, similarity = 0, 1.0

            color = Config.COLORS[speaker_id % len(Config.COLORS)]
            print(f"{color}Speaker {speaker_id + 1}: {text}{Config.RESET}")

        except Exception as e:
            print(f"Error processing text: {e}")
            print(f"Text: {text}")


class RealTimeSpeakerDetection:
    """Main application class."""

    def __init__(self):
        self.processor = None
        self.running = False

        # Shut down cleanly on Ctrl+C, termination, or interpreter exit
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)
        atexit.register(self.cleanup)

    def _signal_handler(self, signum, frame):
        """Handle shutdown signals."""
        print(f"\nReceived signal {signum}, shutting down...")
        self.stop()

    def start(self):
        """Start the application."""
        print("=== Real-time Speaker Detection and Transcription ===")
        print("Initializing...")

        self.processor = TranscriptionProcessor()

        if not self.processor.start():
            print("Failed to start. Check your audio setup and dependencies.")
            return False

        self.running = True

        print("=" * 60)
        print("System ready! Listening for audio...")
        print("Different speakers will be shown in different colors.")
        print("Press Ctrl+C to stop.")
        print("=" * 60)

        # Keep the main thread alive while the worker threads run
        try:
            while self.running:
                time.sleep(1)
        except KeyboardInterrupt:
            pass

        return True

    def stop(self):
        """Stop the application."""
        if not self.running:
            return

        self.running = False

        if self.processor:
            self.processor.stop()

        print("System stopped.")

    def cleanup(self):
        """Clean up resources."""
        self.stop()


def main():
    """Main entry point."""
    app = RealTimeSpeakerDetection()

    try:
        app.start()
    except Exception as e:
        print(f"Application error: {e}")
    finally:
        app.cleanup()


if __name__ == "__main__":
    main()
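
# Usage sketch (the module filename here is an assumption):
#   pip install soundcard RealtimeSTT torch torchaudio scipy numpy
#   python live_transcribe.py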