|
import streamlit as st |
|
import torch |
|
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM |
|
import torchaudio |
|
import os |
|
import re |
|
import jieba |
|
|
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
|
|
|
MODEL_NAME = "alvanlii/whisper-small-cantonese" |
|
language = "zh" |
|
pipe = pipeline(task="automatic-speech-recognition", model=MODEL_NAME, chunk_length_s=60, device=device) |
|
pipe.model.config.forced_decoder_ids = pipe.tokenizer.get_decoder_prompt_ids(language=language, task="transcribe") |
|
|
|
def transcribe_audio(audio_path): |
|
""" |
|
对音频文件进行转录,支持大于60秒的音频分段处理 |
|
""" |
|
waveform, sample_rate = torchaudio.load(audio_path) |
|
duration = waveform.shape[1] / sample_rate |
|
if duration > 60: |
|
results = [] |
|
for start in range(0, int(duration), 50): |
|
end = min(start + 60, int(duration)) |
|
chunk = waveform[:, start * sample_rate:end * sample_rate] |
|
temp_filename = f"temp_chunk_{start}.wav" |
|
torchaudio.save(temp_filename, chunk, sample_rate) |
|
result = pipe(temp_filename)["text"] |
|
results.append(result) |
|
os.remove(temp_filename) |
|
return " ".join(results) |
|
return pipe(audio_path)["text"] |
|
|
|
|
|
tokenizer = AutoTokenizer.from_pretrained("botisan-ai/mt5-translate-yue-zh") |
|
model = AutoModelForSeq2SeqLM.from_pretrained("botisan-ai/mt5-translate-yue-zh").to(device) |
|
|
|
def split_sentences(text): |
|
"""根据中文标点分割句子""" |
|
return [s for s in re.split(r'(?<=[。!?])', text) if s] |
|
|
|
def translate(text): |
|
""" |
|
将转录文本翻译为中文,逐句翻译后拼接输出 |
|
""" |
|
sentences = split_sentences(text) |
|
translations = [] |
|
for sentence in sentences: |
|
inputs = tokenizer(sentence, return_tensors="pt").to(device) |
|
outputs = model.generate(inputs["input_ids"], max_length=1000, num_beams=5) |
|
translations.append(tokenizer.decode(outputs[0], skip_special_tokens=True)) |
|
return " ".join(translations) |
|
|
|
|
|
rating_pipe = pipeline("text-classification", model="Leo0129/CustomModel_dianping-chinese") |
|
|
|
def split_text(text, max_length=512): |
|
""" |
|
将文本按照最大长度拆分成多个片段,使用 jieba 分词 |
|
""" |
|
words = list(jieba.cut(text)) |
|
chunks, current_chunk = [], "" |
|
for word in words: |
|
if len(current_chunk) + len(word) < max_length: |
|
current_chunk += word |
|
else: |
|
chunks.append(current_chunk) |
|
current_chunk = word |
|
if current_chunk: |
|
chunks.append(current_chunk) |
|
return chunks |
|
|
|
def rate_quality(text): |
|
""" |
|
对翻译后的文本进行质量评价,返回最频繁的评分结果 |
|
""" |
|
chunks = split_text(text) |
|
results = [] |
|
for chunk in chunks: |
|
result = rating_pipe(chunk)[0] |
|
label_map = {"LABEL_0": "Poor", "LABEL_1": "Neutral", "LABEL_2": "Good"} |
|
results.append(label_map.get(result["label"], "Unknown")) |
|
return max(set(results), key=results.count) |
|
|
|
def main(): |
|
|
|
st.set_page_config(page_title="Customer Service Quality Analyzer", page_icon="🎙️") |
|
|
|
|
|
st.markdown(""" |
|
<style> |
|
@import url('https://fonts.googleapis.com/css2?family=Comic+Neue:wght@700&display=swap'); |
|
.header { |
|
background: linear-gradient(45deg, #FF9A6C, #FF6B6B); |
|
border-radius: 15px; |
|
padding: 2rem; |
|
text-align: center; |
|
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); |
|
margin-bottom: 2rem; |
|
} |
|
.subtitle { |
|
font-family: 'Comic Neue', cursive; |
|
color: #4B4B4B; |
|
font-size: 1.2rem; |
|
margin: 1rem 0; |
|
padding: 1rem; |
|
background: rgba(255,255,255,0.9); |
|
border-radius: 10px; |
|
border-left: 5px solid #FF6B6B; |
|
} |
|
</style> |
|
""", unsafe_allow_html=True) |
|
|
|
|
|
st.markdown(""" |
|
<div class="header"> |
|
<h1 style='margin:0;'>🎙️ Customer Service Quality Analyzer</h1> |
|
<p style='color: white; font-size: 1.2rem;'>Evaluate the service quality with simple-uploading!</p> |
|
</div> |
|
""", unsafe_allow_html=True) |
|
|
|
|
|
uploaded_file = st.file_uploader("👉🏻 Upload your Cantonese audio file here...", type=["wav", "mp3", "flac"]) |
|
|
|
if uploaded_file is not None: |
|
|
|
st.audio(uploaded_file, format="audio/wav") |
|
|
|
temp_audio_path = "uploaded_audio.wav" |
|
with open(temp_audio_path, "wb") as f: |
|
f.write(uploaded_file.getbuffer()) |
|
|
|
|
|
progress_bar = st.progress(0) |
|
status_container = st.empty() |
|
|
|
|
|
status_container.info("📝 **Step 1/3**: Transcribing audio...") |
|
transcript = transcribe_audio(temp_audio_path) |
|
progress_bar.progress(33) |
|
st.write("**Transcript:**", transcript) |
|
|
|
|
|
status_container.info("📚 **Step 2/3**: Translating transcript...") |
|
translated_text = translate(transcript) |
|
progress_bar.progress(66) |
|
st.write("**Translation:**", translated_text) |
|
|
|
|
|
status_container.info("🧑⚖️ **Step 3/3**: Evaluating audio quality...") |
|
quality_rating = rate_quality(translated_text) |
|
progress_bar.progress(100) |
|
st.write("**Quality Rating:**", quality_rating) |
|
|
|
|
|
os.remove(temp_audio_path) |
|
|
|
if __name__ == "__main__": |
|
main() |
|
|