Saiyaswanth007 commited on
Commit
3466e71
·
1 Parent(s): 7662a6a

Revert portg

Browse files
Files changed (1) hide show
  1. app.py +315 -76
app.py CHANGED
@@ -283,7 +283,7 @@ class RealtimeSpeakerDiarization:
283
  self.audio_processor = None
284
  self.speaker_detector = None
285
  self.recorder = None
286
- self.sentence_queue = queue.Queue()
287
  self.full_sentences = []
288
  self.sentence_speakers = []
289
  self.pending_sentences = []
@@ -294,6 +294,9 @@ class RealtimeSpeakerDiarization:
294
  self.max_speakers = DEFAULT_MAX_SPEAKERS
295
  self.current_conversation = ""
296
  self.audio_buffer = []
 
 
 
297
 
298
  def initialize_models(self):
299
  """Initialize the speaker encoder model"""
@@ -302,9 +305,25 @@ class RealtimeSpeakerDiarization:
302
  print(f"Using device: {device_str}")
303
 
304
  self.encoder = SpeechBrainEncoder(device=device_str)
305
- success = self.encoder.load_model()
306
 
307
- if success:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
308
  self.audio_processor = AudioProcessor(self.encoder)
309
  self.speaker_detector = SpeakerChangeDetector(
310
  embedding_dim=self.encoder.embedding_dim,
@@ -314,10 +333,52 @@ class RealtimeSpeakerDiarization:
314
  print("ECAPA-TDNN model loaded successfully!")
315
  return True
316
  else:
317
- print("Failed to load ECAPA-TDNN model")
318
- return False
 
319
  except Exception as e:
320
  print(f"Model initialization error: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
321
  return False
322
 
323
  def live_text_detected(self, text):
@@ -346,8 +407,9 @@ class RealtimeSpeakerDiarization:
346
  if text:
347
  try:
348
  bytes_data = self.recorder.last_transcription_bytes
349
- self.sentence_queue.put((text, bytes_data))
350
- self.pending_sentences.append(text)
 
351
  except Exception as e:
352
  print(f"Error processing final text: {e}")
353
 
@@ -363,28 +425,31 @@ class RealtimeSpeakerDiarization:
363
  # Extract speaker embedding
364
  speaker_embedding = self.audio_processor.extract_embedding(audio_int16)
365
 
366
- # Store sentence and embedding
367
- self.full_sentences.append((text, speaker_embedding))
368
-
369
- # Fill in missing speaker assignments
370
- while len(self.sentence_speakers) < len(self.full_sentences) - 1:
371
- self.sentence_speakers.append(0)
372
-
373
- # Detect speaker changes
374
- speaker_id, similarity = self.speaker_detector.add_embedding(speaker_embedding)
375
- self.sentence_speakers.append(speaker_id)
376
-
377
- # Remove from pending
378
- if text in self.pending_sentences:
379
- self.pending_sentences.remove(text)
380
-
381
- # Update conversation display
382
- self.current_conversation = self.get_formatted_conversation()
 
383
 
384
  except queue.Empty:
385
  continue
386
  except Exception as e:
387
  print(f"Error processing sentence: {e}")
 
 
388
 
389
  def start_recording(self):
390
  """Start the recording and transcription process"""
@@ -412,10 +477,22 @@ class RealtimeSpeakerDiarization:
412
  'beam_size_realtime': REALTIME_BEAM_SIZE,
413
  'buffer_size': BUFFER_SIZE,
414
  'sample_rate': SAMPLE_RATE,
 
415
  }
416
 
 
 
 
 
 
 
417
  self.recorder = AudioToTextRecorder(**recorder_config)
418
 
 
 
 
 
 
419
  # Start sentence processing thread
420
  self.is_running = True
421
  self.sentence_thread = threading.Thread(target=self.process_sentence_queue, daemon=True)
@@ -428,7 +505,10 @@ class RealtimeSpeakerDiarization:
428
  return "Recording started successfully! FastRTC audio input ready."
429
 
430
  except Exception as e:
431
- return f"Error starting recording: {e}"
 
 
 
432
 
433
  def run_transcription(self):
434
  """Run the transcription loop"""
@@ -443,8 +523,48 @@ class RealtimeSpeakerDiarization:
443
  self.is_running = False
444
  if self.recorder:
445
  self.recorder.stop()
 
 
 
 
446
  return "Recording stopped!"
447
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
448
  def clear_conversation(self):
449
  """Clear all conversation data"""
450
  self.full_sentences = []
@@ -553,11 +673,23 @@ class RealtimeSpeakerDiarization:
553
  else:
554
  audio_bytes = audio_data
555
 
556
- # Feed to recorder
557
- self.recorder.feed_audio(audio_bytes)
 
 
 
 
 
 
 
 
 
 
558
 
559
  except Exception as e:
560
- print(f"Error feeding audio data: {e}")
 
 
561
 
562
  def process_audio_chunk(self, audio_data, sample_rate=16000):
563
  """Process audio chunk from FastRTC input"""
@@ -565,34 +697,30 @@ class RealtimeSpeakerDiarization:
565
  return
566
 
567
  try:
568
- # Convert float audio to int16 for the recorder
569
- if audio_data.dtype == np.float32 or audio_data.dtype == np.float64:
570
- if np.max(np.abs(audio_data)) <= 1.0:
571
- # Float audio is normalized to [-1, 1], convert to int16
572
- audio_int16 = (audio_data * 32767).astype(np.int16)
573
- else:
574
- # Audio is already in higher range
575
- audio_int16 = audio_data.astype(np.int16)
576
- else:
577
- audio_int16 = audio_data
578
 
579
- # Ensure correct shape (1, N) for the recorder
580
- if len(audio_int16.shape) == 1:
581
- audio_int16 = np.expand_dims(audio_int16, 0)
 
582
 
583
- # Resample if needed
584
- if sample_rate != SAMPLE_RATE:
585
- audio_int16 = self._resample_audio(audio_int16, sample_rate, SAMPLE_RATE)
586
 
587
- # Convert to bytes for feeding to recorder
588
- audio_bytes = audio_int16.tobytes()
589
-
590
- # Feed to recorder
591
- self.feed_audio_data(audio_bytes)
592
 
593
  except Exception as e:
594
- print(f"Error processing audio chunk: {e}")
595
-
 
 
596
  def _resample_audio(self, audio, orig_sr, target_sr):
597
  """Resample audio to target sample rate"""
598
  try:
@@ -613,6 +741,60 @@ class RealtimeSpeakerDiarization:
613
  print(f"Error resampling audio: {e}")
614
  return audio
615
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
616
 
617
  # FastRTC Audio Handler for Real-time Diarization
618
 
@@ -620,9 +802,10 @@ class DiarizationHandler(AsyncStreamHandler):
620
  def __init__(self, diarization_system):
621
  super().__init__()
622
  self.diarization_system = diarization_system
623
- self.audio_queue = Queue()
624
  self.is_processing = False
625
  self.sample_rate = 16000 # Default sample rate
 
626
 
627
  def copy(self):
628
  """Return a fresh handler for each new stream connection"""
@@ -646,39 +829,75 @@ class DiarizationHandler(AsyncStreamHandler):
646
  else:
647
  audio_data = frame
648
 
649
- # Convert to numpy array if needed
650
- if isinstance(audio_data, bytes):
651
- # Convert bytes to numpy array (assuming 16-bit PCM)
652
- audio_array = np.frombuffer(audio_data, dtype=np.int16)
653
- # Normalize to float32 range [-1, 1]
654
- audio_array = audio_array.astype(np.float32) / 32768.0
655
- elif isinstance(audio_data, (list, tuple)):
656
- audio_array = np.array(audio_data, dtype=np.float32)
657
- elif isinstance(audio_data, np.ndarray):
658
- audio_array = audio_data.astype(np.float32)
659
- else:
660
- print(f"Unknown audio data type: {type(audio_data)}")
661
- return
662
-
663
- # Ensure mono audio
664
- if len(audio_array.shape) > 1 and audio_array.shape[1] > 1:
665
- audio_array = np.mean(audio_array, axis=1)
666
-
667
- # Ensure 1D array
668
- if len(audio_array.shape) > 1:
669
- audio_array = audio_array.flatten()
670
-
671
  # Get sample rate from frame if available
672
  sample_rate = getattr(frame, 'sample_rate', self.sample_rate)
673
 
674
- # Process audio asynchronously to avoid blocking
675
- await self.process_audio_async(audio_array, sample_rate)
 
 
 
 
 
 
 
 
 
676
 
677
  except Exception as e:
678
  print(f"Error in FastRTC audio receive: {e}")
679
  import traceback
680
  traceback.print_exc()
681
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
682
  async def process_audio_async(self, audio_data, sample_rate=16000):
683
  """Process audio data asynchronously"""
684
  try:
@@ -698,10 +917,30 @@ class DiarizationHandler(AsyncStreamHandler):
698
  print("FastRTC stream started")
699
  self.is_processing = True
700
 
 
 
 
701
  async def shutdown(self) -> None:
702
  """Clean up any resources when the stream ends"""
703
  print("FastRTC stream shutting down")
704
  self.is_processing = False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
705
 
706
 
707
  # Global instances
 
283
  self.audio_processor = None
284
  self.speaker_detector = None
285
  self.recorder = None
286
+ self.sentence_queue = queue.Queue(maxsize=100) # Add maxsize to prevent unlimited growth
287
  self.full_sentences = []
288
  self.sentence_speakers = []
289
  self.pending_sentences = []
 
294
  self.max_speakers = DEFAULT_MAX_SPEAKERS
295
  self.current_conversation = ""
296
  self.audio_buffer = []
297
+ # Add locks for thread safety
298
+ self._state_lock = threading.RLock() # Reentrant lock for shared state
299
+ self._audio_lock = threading.Lock() # Lock for audio processing
300
 
301
  def initialize_models(self):
302
  """Initialize the speaker encoder model"""
 
305
  print(f"Using device: {device_str}")
306
 
307
  self.encoder = SpeechBrainEncoder(device=device_str)
 
308
 
309
+ # Try to load model with timeout
310
+ import threading
311
+ load_success = [False]
312
+
313
+ def load_model_thread():
314
+ try:
315
+ success = self.encoder.load_model()
316
+ load_success[0] = success
317
+ except Exception as e:
318
+ print(f"Error in model loading thread: {e}")
319
+
320
+ # Start loading in a thread with timeout
321
+ load_thread = threading.Thread(target=load_model_thread)
322
+ load_thread.daemon = True
323
+ load_thread.start()
324
+ load_thread.join(timeout=60) # 60 second timeout for model loading
325
+
326
+ if load_success[0]:
327
  self.audio_processor = AudioProcessor(self.encoder)
328
  self.speaker_detector = SpeakerChangeDetector(
329
  embedding_dim=self.encoder.embedding_dim,
 
333
  print("ECAPA-TDNN model loaded successfully!")
334
  return True
335
  else:
336
+ print("Failed to load ECAPA-TDNN model or timeout occurred")
337
+ return self._initialize_fallback()
338
+
339
  except Exception as e:
340
  print(f"Model initialization error: {e}")
341
+ import traceback
342
+ traceback.print_exc()
343
+ return self._initialize_fallback()
344
+
345
+ def _initialize_fallback(self):
346
+ """Initialize fallback mode when model loading fails"""
347
+ try:
348
+ print("Initializing fallback mode with simple speaker detection...")
349
+ # Create a simple embedding dimension
350
+ embedding_dim = 64
351
+
352
+ # Create a dummy encoder that produces random embeddings
353
+ class DummyEncoder:
354
+ def __init__(self):
355
+ self.embedding_dim = embedding_dim
356
+ self.model_loaded = True
357
+
358
+ def embed_utterance(self, audio, sr=16000):
359
+ # Simple energy-based pseudo-embedding
360
+ if isinstance(audio, np.ndarray):
361
+ # Create a simple feature vector (not a real embedding)
362
+ energy = np.mean(np.abs(audio))
363
+ # Create a pseudo-random but consistent embedding based on audio energy
364
+ np.random.seed(int(energy * 1000))
365
+ return np.random.rand(embedding_dim)
366
+ return np.random.rand(embedding_dim)
367
+
368
+ # Set up system with fallback components
369
+ self.encoder = DummyEncoder()
370
+ self.audio_processor = AudioProcessor(self.encoder)
371
+ self.speaker_detector = SpeakerChangeDetector(
372
+ embedding_dim=embedding_dim,
373
+ change_threshold=self.change_threshold,
374
+ max_speakers=2 # Limit speakers in fallback mode
375
+ )
376
+
377
+ print("Fallback mode initialized - limited functionality!")
378
+ return True
379
+
380
+ except Exception as e:
381
+ print(f"Even fallback initialization failed: {e}")
382
  return False
383
 
384
  def live_text_detected(self, text):
 
407
  if text:
408
  try:
409
  bytes_data = self.recorder.last_transcription_bytes
410
+ self.sentence_queue.put((text, bytes_data), timeout=1.0) # Added timeout
411
+ with self._state_lock:
412
+ self.pending_sentences.append(text)
413
  except Exception as e:
414
  print(f"Error processing final text: {e}")
415
 
 
425
  # Extract speaker embedding
426
  speaker_embedding = self.audio_processor.extract_embedding(audio_int16)
427
 
428
+ with self._state_lock:
429
+ # Store sentence and embedding
430
+ self.full_sentences.append((text, speaker_embedding))
431
+
432
+ # Fill in missing speaker assignments
433
+ while len(self.sentence_speakers) < len(self.full_sentences) - 1:
434
+ self.sentence_speakers.append(0)
435
+
436
+ # Detect speaker changes
437
+ speaker_id, similarity = self.speaker_detector.add_embedding(speaker_embedding)
438
+ self.sentence_speakers.append(speaker_id)
439
+
440
+ # Remove from pending
441
+ if text in self.pending_sentences:
442
+ self.pending_sentences.remove(text)
443
+
444
+ # Update conversation display
445
+ self.current_conversation = self.get_formatted_conversation()
446
 
447
  except queue.Empty:
448
  continue
449
  except Exception as e:
450
  print(f"Error processing sentence: {e}")
451
+ import traceback
452
+ traceback.print_exc()
453
 
454
  def start_recording(self):
455
  """Start the recording and transcription process"""
 
477
  'beam_size_realtime': REALTIME_BEAM_SIZE,
478
  'buffer_size': BUFFER_SIZE,
479
  'sample_rate': SAMPLE_RATE,
480
+ 'external_audio': True, # Signal that we'll provide audio
481
  }
482
 
483
+ # Make sure we're not running already
484
+ if hasattr(self, 'is_running') and self.is_running:
485
+ self.stop_recording()
486
+ # Short pause to ensure cleanup completes
487
+ time.sleep(0.5)
488
+
489
  self.recorder = AudioToTextRecorder(**recorder_config)
490
 
491
+ # Reset state
492
+ with self._state_lock:
493
+ self.pending_sentences = []
494
+ self.last_realtime_text = ""
495
+
496
  # Start sentence processing thread
497
  self.is_running = True
498
  self.sentence_thread = threading.Thread(target=self.process_sentence_queue, daemon=True)
 
505
  return "Recording started successfully! FastRTC audio input ready."
506
 
507
  except Exception as e:
508
+ self.is_running = False
509
+ import traceback
510
+ traceback.print_exc()
511
+ return f"Error starting recording: {str(e)}"
512
 
513
  def run_transcription(self):
514
  """Run the transcription loop"""
 
523
  self.is_running = False
524
  if self.recorder:
525
  self.recorder.stop()
526
+
527
+ # Wait for threads to finish
528
+ self._cleanup_resources()
529
+
530
  return "Recording stopped!"
531
 
532
+ def _cleanup_resources(self):
533
+ """Clean up resources and threads"""
534
+ try:
535
+ # Wait for threads to stop gracefully
536
+ if hasattr(self, 'sentence_thread') and self.sentence_thread is not None:
537
+ if self.sentence_thread.is_alive():
538
+ self.sentence_thread.join(timeout=3.0)
539
+
540
+ if hasattr(self, 'transcription_thread') and self.transcription_thread is not None:
541
+ if self.transcription_thread.is_alive():
542
+ self.transcription_thread.join(timeout=3.0)
543
+
544
+ # Clean up memory
545
+ with self._state_lock:
546
+ # Limit history size to prevent memory leaks
547
+ if len(self.full_sentences) > 1000:
548
+ self.full_sentences = self.full_sentences[-1000:]
549
+ if len(self.sentence_speakers) > 1000:
550
+ self.sentence_speakers = self.sentence_speakers[-1000:]
551
+
552
+ # Clear audio buffer
553
+ with self._audio_lock:
554
+ self.audio_buffer = []
555
+
556
+ # Clear queue
557
+ while not self.sentence_queue.empty():
558
+ try:
559
+ self.sentence_queue.get_nowait()
560
+ except:
561
+ pass
562
+
563
+ except Exception as e:
564
+ print(f"Error during resource cleanup: {e}")
565
+ import traceback
566
+ traceback.print_exc()
567
+
568
  def clear_conversation(self):
569
  """Clear all conversation data"""
570
  self.full_sentences = []
 
673
  else:
674
  audio_bytes = audio_data
675
 
676
+ # Use the recorder's internal buffer mechanism
677
+ if hasattr(self.recorder, 'feed_audio') and callable(self.recorder.feed_audio):
678
+ self.recorder.feed_audio(audio_bytes)
679
+ else:
680
+ # Fallback: Direct access to the underlying buffer if the method doesn't exist
681
+ self.audio_buffer.append(audio_bytes)
682
+ # Process buffered audio when enough is accumulated
683
+ if len(self.audio_buffer) > 5: # Process in small batches
684
+ combined = b''.join(self.audio_buffer)
685
+ if hasattr(self.recorder, '_process_audio'):
686
+ self.recorder._process_audio(combined)
687
+ self.audio_buffer = []
688
 
689
  except Exception as e:
690
+ print(f"Error feeding audio data: {str(e)}")
691
+ import traceback
692
+ traceback.print_exc()
693
 
694
  def process_audio_chunk(self, audio_data, sample_rate=16000):
695
  """Process audio chunk from FastRTC input"""
 
697
  return
698
 
699
  try:
700
+ with self._audio_lock:
701
+ # Use the normalized audio function
702
+ audio_int16 = self._normalize_audio_format(audio_data, target_dtype=np.int16, target_sample_rate=SAMPLE_RATE)
 
 
 
 
 
 
 
703
 
704
+ # Check if we got valid audio
705
+ if audio_int16.size == 0:
706
+ print("Warning: Empty audio chunk received")
707
+ return
708
 
709
+ # Resample if needed
710
+ if sample_rate != SAMPLE_RATE:
711
+ audio_int16 = self._resample_audio(audio_int16, sample_rate, SAMPLE_RATE)
712
 
713
+ # Convert to bytes for feeding to recorder
714
+ audio_bytes = audio_int16.tobytes()
715
+
716
+ # Feed to recorder
717
+ self.feed_audio_data(audio_bytes)
718
 
719
  except Exception as e:
720
+ print(f"Error processing audio chunk: {str(e)}")
721
+ import traceback
722
+ traceback.print_exc()
723
+
724
  def _resample_audio(self, audio, orig_sr, target_sr):
725
  """Resample audio to target sample rate"""
726
  try:
 
741
  print(f"Error resampling audio: {e}")
742
  return audio
743
 
744
+ def _normalize_audio_format(self, audio_data, target_dtype=np.int16, target_sample_rate=SAMPLE_RATE):
745
+ """Normalize audio data to consistent format
746
+
747
+ Args:
748
+ audio_data: Input audio as numpy array or bytes
749
+ target_dtype: Target data type (np.int16 or np.float32)
750
+ target_sample_rate: Target sample rate
751
+
752
+ Returns:
753
+ Normalized audio as numpy array in requested format
754
+ """
755
+ try:
756
+ # Convert bytes to numpy if needed
757
+ if isinstance(audio_data, bytes):
758
+ audio_array = np.frombuffer(audio_data, dtype=np.int16)
759
+ elif isinstance(audio_data, (list, tuple)):
760
+ audio_array = np.array(audio_data)
761
+ else:
762
+ audio_array = audio_data
763
+
764
+ # Convert data type as needed
765
+ if target_dtype == np.int16 and audio_array.dtype != np.int16:
766
+ if audio_array.dtype == np.float32 or audio_array.dtype == np.float64:
767
+ # Check if normalized to [-1, 1] range
768
+ if np.max(np.abs(audio_array)) <= 1.0:
769
+ audio_array = (audio_array * 32767).astype(np.int16)
770
+ else:
771
+ audio_array = audio_array.astype(np.int16)
772
+ else:
773
+ audio_array = audio_array.astype(np.int16)
774
+ elif target_dtype == np.float32 and audio_array.dtype != np.float32:
775
+ if audio_array.dtype == np.int16:
776
+ audio_array = audio_array.astype(np.float32) / 32768.0
777
+ else:
778
+ audio_array = audio_array.astype(np.float32)
779
+
780
+ # Ensure mono audio
781
+ if len(audio_array.shape) > 1 and audio_array.shape[1] > 1:
782
+ audio_array = np.mean(audio_array, axis=1)
783
+
784
+ # Reshape if needed
785
+ if len(audio_array.shape) == 1:
786
+ if target_dtype == np.int16:
787
+ audio_array = np.expand_dims(audio_array, 0)
788
+
789
+ return audio_array
790
+
791
+ except Exception as e:
792
+ print(f"Error normalizing audio format: {e}")
793
+ import traceback
794
+ traceback.print_exc()
795
+ # Return empty array of correct type as fallback
796
+ return np.array([], dtype=target_dtype)
797
+
798
 
799
  # FastRTC Audio Handler for Real-time Diarization
800
 
 
802
  def __init__(self, diarization_system):
803
  super().__init__()
804
  self.diarization_system = diarization_system
805
+ self.audio_queue = asyncio.Queue(maxsize=100) # Use asyncio queue
806
  self.is_processing = False
807
  self.sample_rate = 16000 # Default sample rate
808
+ self.processing_task = None
809
 
810
  def copy(self):
811
  """Return a fresh handler for each new stream connection"""
 
829
  else:
830
  audio_data = frame
831
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
832
  # Get sample rate from frame if available
833
  sample_rate = getattr(frame, 'sample_rate', self.sample_rate)
834
 
835
+ # Add to queue - non-blocking with timeout
836
+ try:
837
+ # Use put_nowait with try/except to avoid blocking
838
+ await asyncio.wait_for(
839
+ self.audio_queue.put((audio_data, sample_rate)),
840
+ timeout=0.1
841
+ )
842
+ except asyncio.TimeoutError:
843
+ # Queue is full, drop this chunk
844
+ print("Warning: Audio queue full, dropping frame")
845
+ return
846
 
847
  except Exception as e:
848
  print(f"Error in FastRTC audio receive: {e}")
849
  import traceback
850
  traceback.print_exc()
851
 
852
+ async def _process_audio_loop(self):
853
+ """Background task to process audio from queue"""
854
+ while self.is_processing:
855
+ try:
856
+ # Get from queue with timeout to allow checking is_processing flag
857
+ try:
858
+ audio_data, sample_rate = await asyncio.wait_for(
859
+ self.audio_queue.get(),
860
+ timeout=0.5
861
+ )
862
+ except asyncio.TimeoutError:
863
+ # No audio available, check if we should keep running
864
+ continue
865
+
866
+ # Convert to numpy array if needed
867
+ if isinstance(audio_data, bytes):
868
+ # Convert bytes to numpy array (assuming 16-bit PCM)
869
+ audio_array = np.frombuffer(audio_data, dtype=np.int16)
870
+ # Normalize to float32 range [-1, 1]
871
+ audio_array = audio_array.astype(np.float32) / 32768.0
872
+ elif isinstance(audio_data, (list, tuple)):
873
+ audio_array = np.array(audio_data, dtype=np.float32)
874
+ elif isinstance(audio_data, np.ndarray):
875
+ audio_array = audio_array.astype(np.float32)
876
+ else:
877
+ print(f"Unknown audio data type: {type(audio_data)}")
878
+ continue
879
+
880
+ # Ensure mono audio
881
+ if len(audio_array.shape) > 1 and audio_array.shape[1] > 1:
882
+ audio_array = np.mean(audio_array, axis=1)
883
+
884
+ # Ensure 1D array
885
+ if len(audio_array.shape) > 1:
886
+ audio_array = audio_array.flatten()
887
+
888
+ # Process audio through thread pool to avoid blocking event loop
889
+ await self.process_audio_async(audio_array, sample_rate)
890
+
891
+ # Mark as done
892
+ self.audio_queue.task_done()
893
+
894
+ except Exception as e:
895
+ print(f"Error in audio processing loop: {e}")
896
+ import traceback
897
+ traceback.print_exc()
898
+ # Short sleep to avoid tight loop
899
+ await asyncio.sleep(0.1)
900
+
901
  async def process_audio_async(self, audio_data, sample_rate=16000):
902
  """Process audio data asynchronously"""
903
  try:
 
917
  print("FastRTC stream started")
918
  self.is_processing = True
919
 
920
+ # Start background processing task
921
+ self.processing_task = asyncio.create_task(self._process_audio_loop())
922
+
923
  async def shutdown(self) -> None:
924
  """Clean up any resources when the stream ends"""
925
  print("FastRTC stream shutting down")
926
  self.is_processing = False
927
+
928
+ # Wait for processing task to finish
929
+ if self.processing_task:
930
+ try:
931
+ # Cancel and wait for task
932
+ self.processing_task.cancel()
933
+ await asyncio.wait([self.processing_task], timeout=2.0)
934
+ except (asyncio.CancelledError, Exception) as e:
935
+ print(f"Error cancelling audio processing task: {e}")
936
+
937
+ # Clear queue
938
+ while not self.audio_queue.empty():
939
+ try:
940
+ self.audio_queue.get_nowait()
941
+ self.audio_queue.task_done()
942
+ except:
943
+ pass
944
 
945
 
946
  # Global instances