File size: 4,006 Bytes
ab176c3
 
 
 
 
 
 
 
 
434a82b
 
 
 
 
ab176c3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
295cea1
 
ab176c3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import numpy as np
import time
import whisperx
from scipy.signal import resample
import time
import os
import time

class WhisperAutomaticSpeechRecognizer:
    """Speech-to-text with speaker labels, built on WhisperX.

    The ASR model and the diarization pipeline are created once, when this
    class body is executed, and are shared by every call. All methods are
    static; the class acts as a namespace plus a small piece of shared state
    (``existing_speaker``).
    """

    # Inference settings. For GPU inference use device = "cuda" and
    # compute_type = "int8" (or a wider type if more GPU memory is available).
    device = "cpu"
    compute_type = "float32"

    batch_size = 4
    model = whisperx.load_model(
        "medium",
        device,
        language="en",
        compute_type=compute_type,
        # These options are supplied through asr_options (they were previously
        # noted as missing parameters); adjust the values as needed.
        asr_options={
            "max_new_tokens": 448,
            "clip_timestamps": True,
            "hallucination_silence_threshold": 0.2,
        },
    )
    diarize_model = whisperx.DiarizationPipeline(
        use_auth_token=os.environ.get("HF_TOKEN"), device=device
    )
    # Label of the speaker whose turn is currently being written to the
    # transcript; None until the first segment has been emitted. Shared
    # across calls so consecutive streaming chunks continue the same turn.
    existing_speaker = None

    @staticmethod
    def downsample_audio_scipy(audio: np.ndarray, original_rate, target_rate=16000):
        """Return ``audio`` as a 1-D signal resampled to ``target_rate``.

        Args:
            audio: Audio samples. Arrays with more than one dimension are
                averaged over axis 1 (assumes a (samples, channels) layout --
                TODO confirm against the caller).
            original_rate: Sampling rate of ``audio``.
            target_rate: Desired sampling rate.

        Returns:
            The 1-D audio; unchanged in length when the rates already match.

        Raises:
            ValueError: If the audio is not 1-D after averaging over axis 1.
        """
        # Down-mix before the rate check so multi-channel input that is
        # already at the target rate is not returned as a 2-D array.
        if audio.ndim > 1:
            audio = np.mean(audio, axis=1)

        if audio.ndim != 1:
            raise ValueError("Input audio must have only one channel.")

        if original_rate == target_rate:
            return audio

        # Number of samples the signal has at the target rate.
        num_samples = int(len(audio) * target_rate / original_rate)
        return resample(audio, num_samples)

    @staticmethod
    def transcribe_with_diarization_file(filepath: str):
        """Transcribe and diarize a complete audio file.

        Args:
            filepath: Path of the audio file; it is loaded at 16 kHz.

        Returns:
            The same 3-tuple as ``transcribe_with_diarization``.
        """
        audio = whisperx.load_audio(filepath, 16000)
        # A file is a fresh recording: forget the speaker left over from any
        # earlier call so the transcript starts with a speaker header.
        WhisperAutomaticSpeechRecognizer.existing_speaker = None
        return WhisperAutomaticSpeechRecognizer.transcribe_with_diarization(
            (16000, audio), None, "", False
        )

    @staticmethod
    def transcribe_with_diarization(
        stream, full_stream, full_transcript, streaming=True
    ):
        """Transcribe one audio chunk and append it, speaker-labelled, to a transcript.

        Args:
            stream: ``(sample_rate, samples)`` pair for the chunk.
            full_stream: Not used by this method.
            full_transcript: Transcript accumulated so far; ``None`` is
                treated as an empty string.
            streaming: When true, the samples are resampled to 16 kHz and
                divided by 32768 (assumes 16-bit PCM input -- TODO confirm),
                and the call is padded to last at least 5 seconds.

        Returns:
            ``(full_transcript, stream, full_transcript)`` where
            ``full_transcript`` is the updated transcript and ``stream`` is
            the argument passed in.
        """
        recognizer = WhisperAutomaticSpeechRecognizer
        start_time = time.time()
        sr, y = stream
        if streaming:
            y = recognizer.downsample_audio_scipy(y, sr)
            # astype() copies, so the in-place scaling below never touches
            # the caller's array.
            y = y.astype(np.float32)
            y /= 32768.0

        if full_transcript is None:
            full_transcript = ""

        transcribe_result = recognizer.model.transcribe(
            y, batch_size=recognizer.batch_size
        )
        diarize_segments = recognizer.diarize_model(y)
        diarize_result = whisperx.assign_word_speakers(
            diarize_segments, transcribe_result
        )

        default_first_speaker = "SPEAKER_00"
        pieces = []
        for segment in diarize_result["segments"]:
            # Segments without a speaker label fall back to the default name
            # but must not, by themselves, start a new speaker turn.
            has_label = "speaker" in segment
            current_speaker = (
                segment["speaker"] if has_label else default_first_speaker
            )

            if recognizer.existing_speaker is None:
                # Very first segment: open the transcript with a header.
                recognizer.existing_speaker = current_speaker
                pieces.append(f"\n {current_speaker}  - ")
            elif has_label and current_speaker != recognizer.existing_speaker:
                # A labelled segment from a different speaker starts a new turn.
                recognizer.existing_speaker = current_speaker
                pieces.append(f"\n {current_speaker}  - ")

            pieces.append(segment["text"])

        full_transcript = full_transcript + "".join(pieces)

        if streaming:
            # Pace streaming calls to roughly one per 5 seconds. Clamp at
            # zero: time.sleep() raises ValueError for a negative duration,
            # which happened whenever processing took longer than 5 seconds.
            elapsed = time.time() - start_time
            time.sleep(max(0.0, 5 - elapsed))
        return full_transcript, stream, full_transcript