Spaces:
Running
Running
Fix indentation in expert analysis display section of Streamlit app for improved layout
5235a31
unverified
import streamlit as st | |
import os | |
import yt_dlp | |
import subprocess | |
import librosa | |
import numpy as np | |
import torch | |
import sys | |
# Global flag for SpeechBrain availability | |
HAS_SPEECHBRAIN = False | |
# Handle SpeechBrain import with fallbacks for different versions | |
try: | |
# Try the new path first (SpeechBrain 1.0+) | |
from speechbrain.inference.classifiers import EncoderClassifier | |
HAS_SPEECHBRAIN = True | |
except ImportError: | |
try: | |
# Try the legacy path | |
from speechbrain.pretrained.interfaces import EncoderClassifier | |
HAS_SPEECHBRAIN = True | |
except ImportError: | |
try: | |
# Try the very old path | |
from speechbrain.pretrained import EncoderClassifier | |
HAS_SPEECHBRAIN = True | |
except ImportError: | |
# If all fail, we'll handle this later in the code | |
st.error("⚠️ Unable to import SpeechBrain. Limited functionality available.") | |
EncoderClassifier = None | |
# Handle potential compatibility issues with transformers | |
try: | |
from transformers import AutoProcessor, AutoModelForAudioClassification | |
HAS_AUTO_PROCESSOR = True | |
except ImportError: | |
from transformers import AutoModelForAudioClassification | |
HAS_AUTO_PROCESSOR = False | |
st.warning("Using a compatible but limited version of transformers. Some features may be limited.") | |
from dotenv import load_dotenv | |
import matplotlib.pyplot as plt | |
import tempfile | |
import time | |
# Deployment instructions: | |
# To deploy this app: | |
# 1. Make sure Docker is installed | |
# 2. Build the Docker image: docker build -t accent-detector . | |
# 3. Run the container: docker run -p 8501:8501 --volume /tmp/accent-detector:/app/uploads accent-detector | |
# For Windows: docker run -p 8501:8501 --volume C:\temp\accent-detector:/app/uploads accent-detector | |
# 4. Access the app at http://localhost:8501 | |
# | |
# For cloud deployment: | |
# - Streamlit Cloud: Connect your GitHub repository to Streamlit Cloud | |
# - Hugging Face Spaces: Use the Docker deployment option with proper volume mounts | |
# - Azure/AWS/GCP: Deploy the container using their container services with persistent storage | |
# | |
# Troubleshooting file uploads: | |
# - Set maxUploadSize in .streamlit/config.toml | |
# - Ensure write permissions on upload directories | |
# - For 403 errors, check file size and format compatibility | |
# Load environment variables (if .env file exists) | |
try: | |
load_dotenv() | |
except: | |
pass | |
# Check for OpenAI API access - optional for enhanced explanations | |
try: | |
import openai | |
openai.api_key = os.getenv("OPENAI_API_KEY") | |
have_openai = openai.api_key is not None | |
except (ImportError, AttributeError): | |
have_openai = False | |
# English accent categories | |
ENGLISH_ACCENTS = { | |
"en-us": "American English", | |
"en-gb": "British English", | |
"en-au": "Australian English", | |
"en-ca": "Canadian English", | |
"en-ie": "Irish English", | |
"en-scotland": "Scottish English", | |
"en-in": "Indian English", | |
"en-za": "South African English", | |
"en-ng": "Nigerian English", | |
"en-caribbean": "Caribbean English", | |
} | |
def download_video(url, video_path="video.mp4", cookies_file=None): | |
"""Download a video from a URL""" | |
# Determine if this is a YouTube URL | |
is_youtube = "youtube" in url.lower() or "youtu.be" in url.lower() | |
# Create a unique directory for each download to avoid permission issues | |
timestamp = str(int(time.time())) | |
# Use proper temp directory for Windows or Linux | |
if os.name == 'nt': # Windows | |
temp_dir = os.path.join(os.environ.get('TEMP', 'C:\\temp'), f"video_download_{timestamp}") | |
else: # Linux/Mac | |
temp_dir = f"/tmp/video_download_{timestamp}" | |
os.makedirs(temp_dir, exist_ok=True) | |
# Set correct permissions for the temp directory | |
try: | |
os.chmod(temp_dir, 0o777) # Full permissions for all users | |
except Exception as e: | |
st.warning(f"Could not set directory permissions: {str(e)}. Continuing anyway.") | |
# Use the temp directory for the video path | |
if not os.path.isabs(video_path): | |
video_path = os.path.join(temp_dir, video_path) | |
ydl_opts = { | |
"outtmpl": video_path, | |
"quiet": False, | |
"verbose": True, # More detailed output for debugging | |
"format": "bestaudio/best", # Prefer audio formats since we only need audio | |
"postprocessors": [{ | |
"key": "FFmpegExtractAudio", | |
"preferredcodec": "wav", | |
}] if is_youtube else [], # Extract audio directly for YouTube | |
"noplaylist": True, | |
"extractor_retries": 5, # Increased from 3 to 5 | |
"socket_timeout": 45, # Increased from 30 to 45 | |
"retry_sleep_functions": { | |
"http": lambda n: 5 * (n + 1), # 5, 10, 15, 20, 25 seconds | |
}, | |
"nocheckcertificate": True, # Skip HTTPS certificate validation | |
"ignoreerrors": False, # Don't ignore errors (we want to handle them) | |
} | |
# Add cookies if provided | |
if cookies_file and os.path.exists(cookies_file): | |
ydl_opts["cookiefile"] = cookies_file | |
st.info("Using provided cookies file for authentication") | |
# Set permissions on cookies file to make sure it's readable | |
try: | |
os.chmod(cookies_file, 0o644) # Read-write for owner, read-only for others | |
except Exception as e: | |
st.warning(f"Could not set permissions on cookies file: {str(e)}. Continuing anyway.") | |
# Setup environment variables for cache directories | |
os.environ['HOME'] = temp_dir # Set HOME to our temp dir for YouTube-DL cache | |
os.environ['XDG_CACHE_HOME'] = os.path.join(temp_dir, '.cache') # For Linux | |
os.environ['APPDATA'] = temp_dir # For Windows | |
try: | |
if is_youtube: | |
st.info("Attempting to download from YouTube. This might take longer...") | |
# List of alternative YouTube frontends to try | |
youtube_alternatives = [ | |
(url, "Standard YouTube"), | |
(url.replace("youtube.com", "yewtu.be"), "Invidious (yewtu.be)"), | |
(url.replace("youtube.com", "piped.video"), "Piped"), | |
(url.replace("youtube.com", "inv.riverside.rocks"), "Invidious (riverside)") | |
] | |
# If youtu.be is used, create proper alternatives | |
if "youtu.be" in url.lower(): | |
video_id = url.split("/")[-1].split("?")[0] | |
youtube_alternatives = [ | |
(url, "Standard YouTube"), | |
(f"https://yewtu.be/watch?v={video_id}", "Invidious (yewtu.be)"), | |
(f"https://piped.video/watch?v={video_id}", "Piped"), | |
(f"https://inv.riverside.rocks/watch?v={video_id}", "Invidious (riverside)") | |
] | |
success = False | |
for alt_url, alt_name in youtube_alternatives: | |
if alt_url == url and alt_name != "Standard YouTube": | |
continue # Skip redundant first entry | |
st.info(f"Trying {alt_name}... Please wait.") | |
try: | |
with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
ydl.download([alt_url]) | |
# If we get here without exception, it worked | |
st.success(f"Successfully downloaded using {alt_name}") | |
success = True | |
break | |
except Exception as download_error: | |
error_msg = str(download_error) | |
st.warning(f"{alt_name} download attempt failed: {error_msg}") | |
# Break early if it's a permission issue to avoid trying alternatives | |
if "permission" in error_msg.lower() or "access" in error_msg.lower(): | |
st.error("Permission error detected. Stopping download attempts.") | |
raise download_error | |
# If all attempts failed | |
if not success: | |
st.error("All YouTube download methods failed.") | |
return False | |
else: | |
# For non-YouTube URLs | |
with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
ydl.download([url]) | |
# Check if download was successful | |
if os.path.exists(video_path): | |
return True | |
else: | |
# Look for any downloaded files in the temp directory - more comprehensive search | |
downloaded_files = [] | |
for root, _, files in os.walk(temp_dir): | |
for file in files: | |
if file.endswith(('.mp4', '.mp3', '.wav', '.m4a')): | |
downloaded_files.append(os.path.join(root, file)) | |
if downloaded_files: | |
# Use the first media file found | |
first_file = downloaded_files[0] | |
try: | |
# Copy instead of move to avoid cross-device link issues | |
import shutil | |
shutil.copy(first_file, video_path) | |
return True | |
except Exception as copy_error: | |
st.error(f"Error copying downloaded file: {str(copy_error)}") | |
return False | |
st.error(f"Video downloaded but file not found: {video_path}") | |
return False | |
except Exception as e: | |
error_msg = str(e) | |
st.error(f"Download error: {error_msg}") | |
# Provide specific guidance based on error type | |
if is_youtube and ("bot" in error_msg.lower() or "sign in" in error_msg.lower() or "403" in error_msg): | |
st.warning("⚠️ YouTube requires authentication. Please try one of these solutions:") | |
st.markdown(""" | |
1. **Upload a cookies.txt file** using the file uploader above | |
2. **Try a different video source** like Loom, Vimeo or direct MP3/WAV files | |
3. **Use the Audio Upload tab** instead of YouTube URLs | |
""") | |
elif "not find" in error_msg.lower() and "cookies" in error_msg.lower(): | |
st.warning("Browser cookies could not be accessed. Please upload a cookies.txt file.") | |
elif "network" in error_msg.lower() or "timeout" in error_msg.lower(): | |
st.warning("Network error. Please check your internet connection and try again.") | |
elif "permission" in error_msg.lower(): | |
st.warning("Permission error. The application doesn't have access to create or write files in the temporary directory.") | |
st.info("Try running the Docker container with the proper volume mounts: `docker run -p 8501:8501 --volume /tmp/accent-detector:/app/uploads accent-detector`") | |
elif "not found" in error_msg.lower() and "ffmpeg" in error_msg.lower(): | |
st.error("FFmpeg is not installed or not found in PATH.") | |
st.info("If running locally, please install FFmpeg. If using Docker, the container may be misconfigured.") | |
return False | |
finally: | |
# Clean up temp directory if it still exists | |
try: | |
if os.path.exists(temp_dir) and ("tmp" in temp_dir or "temp" in temp_dir.lower()): | |
import shutil | |
shutil.rmtree(temp_dir) | |
except Exception as cleanup_error: | |
st.warning(f"Could not clean up temporary directory: {str(cleanup_error)}") | |
pass | |
def extract_audio(video_path="video.mp4", audio_path="audio.wav"): | |
"""Extract audio from video file using ffmpeg""" | |
try: | |
subprocess.run( | |
['ffmpeg', '-i', video_path, '-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', audio_path], | |
check=True, | |
capture_output=True | |
) | |
return os.path.exists(audio_path) | |
except subprocess.CalledProcessError as e: | |
st.error(f"Error extracting audio: {e}") | |
st.error(f"ffmpeg output: {e.stderr.decode('utf-8')}") | |
raise | |
class AccentDetector: | |
def __init__(self): | |
# Initialize language identification model | |
self.have_lang_id = False | |
try: | |
if EncoderClassifier is not None: | |
self.lang_id = EncoderClassifier.from_hparams( | |
source="speechbrain/lang-id-commonlanguage_ecapa", | |
savedir="tmp_model" | |
) | |
self.have_lang_id = True | |
else: | |
st.error("SpeechBrain not available. Language identification disabled.") | |
except Exception as e: | |
st.error(f"Error loading language ID model: {str(e)}") | |
# Initialize the accent classifier | |
self.have_accent_model = False | |
try: | |
self.model_name = "speechbrain/lang-id-voxlingua107-ecapa" | |
# Handle case where AutoProcessor is not available | |
if HAS_AUTO_PROCESSOR: | |
self.processor = AutoProcessor.from_pretrained(self.model_name) | |
else: | |
# Fall back to using feature_extractor | |
from transformers import AutoFeatureExtractor | |
self.processor = AutoFeatureExtractor.from_pretrained(self.model_name) | |
self.model = AutoModelForAudioClassification.from_pretrained(self.model_name) | |
self.have_accent_model = True | |
except Exception as e: | |
st.warning(f"Could not load accent model: {str(e)}") | |
self.have_accent_model = False | |
def is_english(self, audio_path, threshold=0.7): | |
""" | |
Determine if the speech is English and return confidence score | |
""" | |
if not hasattr(self, 'have_lang_id') or not self.have_lang_id: | |
# If language ID model is not available, assume English | |
st.warning("Language identification is not available. Assuming English speech.") | |
return True, "en", 1.0 | |
try: | |
out_prob, score, index, lang = self.lang_id.classify_file(audio_path) | |
score = float(score) | |
# Check if language is English (slightly fuzzy match) | |
is_english = "eng" in lang.lower() or "en-" in lang.lower() or lang.lower() == "en" | |
return is_english, lang, score | |
except Exception as e: | |
st.warning(f"Error identifying language: {str(e)}. Assuming English speech.") | |
return True, "en", 0.5 | |
def classify_accent(self, audio_path): | |
""" | |
Classify the specific English accent | |
""" | |
if not self.have_accent_model: | |
return "Unknown English Accent", 0.0 | |
try: | |
# Load and preprocess audio | |
audio, sr = librosa.load(audio_path, sr=16000) | |
inputs = self.processor(audio, sampling_rate=sr, return_tensors="pt") | |
# Get predictions | |
with torch.no_grad(): | |
outputs = self.model(**inputs) | |
# Get probabilities | |
probs = outputs.logits.softmax(dim=-1)[0] | |
prediction_id = probs.argmax().item() | |
confidence = probs[prediction_id].item() | |
# Get predicted label | |
id2label = self.model.config.id2label | |
accent_code = id2label[prediction_id] | |
# Map to English accent if possible | |
if accent_code.startswith('en-'): | |
accent = ENGLISH_ACCENTS.get(accent_code, f"English ({accent_code})") | |
confidence = confidence # Keep confidence as-is for English accents | |
else: | |
# If it's not an English accent code, use our pre-classification | |
is_english, _, _ = self.is_english(audio_path) | |
if is_english: | |
accent = "General English" | |
else: | |
accent = f"Non-English ({accent_code})" | |
confidence *= 0.7 # Reduce confidence for non-specific matches | |
return accent, confidence | |
except Exception as e: | |
st.error(f"Error in accent classification: {str(e)}") | |
return "Unknown English Accent", 0.0 | |
def generate_explanation(self, audio_path, accent, confidence, is_english, language): | |
""" | |
Generate an explanation of the accent detection results using OpenAI API (if available) | |
""" | |
if not have_openai: | |
if is_english: | |
return f"The speaker has a {accent} accent with {confidence*100:.1f}% confidence. The speech was identified as English." | |
else: | |
return f"The speech was identified as {language}, not English. English confidence is low." | |
try: | |
import openai | |
is_english, lang, lang_score = self.is_english(audio_path) | |
prompt = f""" | |
Audio analysis detected a speaker with the following characteristics: | |
- Primary accent/language: {accent} | |
- Confidence score: {confidence*100:.1f}% | |
- Detected language category: {lang} | |
- Is English: {is_english} | |
Based on this information, provide a 2-3 sentence summary about the speaker's accent. | |
Focus on how clear their English is and any notable accent characteristics. | |
This is for hiring purposes to evaluate English speaking abilities. | |
""" | |
response = openai.chat.completions.create( | |
model="gpt-3.5-turbo", | |
messages=[ | |
{"role": "system", "content": "You are an accent analysis specialist providing factual assessments."}, | |
{"role": "user", "content": prompt} | |
], | |
max_tokens=150 | |
) | |
return response.choices[0].message.content.strip() | |
except Exception as e: | |
st.error(f"Error generating explanation: {str(e)}") | |
if is_english: | |
return f"The speaker has a {accent} accent with {confidence*100:.1f}% confidence. The speech was identified as English." | |
else: | |
return f"The speech was identified as {language}, not English. English confidence is low." | |
def analyze_audio(self, audio_path): | |
""" | |
Complete analysis pipeline returning all needed results | |
""" | |
# Check if it's English | |
is_english, lang, lang_score = self.is_english(audio_path) | |
# Classify accent if it's English | |
if is_english: | |
accent, accent_confidence = self.classify_accent(audio_path) | |
english_confidence = lang_score * 100 # Scale to percentage | |
else: | |
accent = f"Non-English ({lang})" | |
accent_confidence = lang_score | |
english_confidence = max(0, min(30, lang_score * 50)) # Cap at 30% if non-English | |
# Generate explanation | |
explanation = self.generate_explanation(audio_path, accent, accent_confidence, is_english, lang) | |
# Create visualization of the audio waveform | |
try: | |
y, sr = librosa.load(audio_path, sr=None) | |
fig, ax = plt.subplots(figsize=(10, 2)) | |
ax.plot(y) | |
ax.set_xlabel('Sample') | |
ax.set_ylabel('Amplitude') | |
ax.set_title('Audio Waveform') | |
plt.tight_layout() | |
audio_viz = fig | |
# Make sure the figure can be saved | |
try: | |
# Test if the figure can be saved | |
import tempfile | |
with tempfile.NamedTemporaryFile(suffix='.png') as tmp: | |
plt.savefig(tmp.name) | |
except Exception as viz_save_error: | |
st.warning(f"Could not save visualization: {str(viz_save_error)}. Using simpler visualization.") | |
# Create a simple alternative visualization | |
import numpy as np | |
# Downsample for performance | |
sample_rate = max(1, len(y) // 1000) | |
y_downsampled = y[::sample_rate] | |
fig2, ax2 = plt.subplots(figsize=(8, 2)) | |
ax2.plot(np.arange(len(y_downsampled)), y_downsampled) | |
ax2.set_title("Audio Waveform (simplified)") | |
audio_viz = fig2 | |
except Exception as e: | |
st.warning(f"Could not generate audio visualization: {str(e)}") | |
audio_viz = None | |
return { | |
"is_english": is_english, | |
"accent": accent, | |
"accent_confidence": accent_confidence * 100, # Scale to percentage | |
"english_confidence": english_confidence, | |
"language_detected": lang, | |
"explanation": explanation, | |
"audio_viz": audio_viz | |
} | |
def process_uploaded_audio(file_input): | |
"""Process uploaded audio file | |
Args: | |
file_input: Either a StreamlitUploadedFile object or a string path to a file | |
""" | |
audio_path = None | |
temp_input_path = None | |
try: | |
# Create a unique filename based on timestamp | |
timestamp = str(int(time.time())) | |
# Create a deterministic uploads directory with full permissions | |
uploads_dir = os.path.join(os.getcwd(), "uploads") | |
os.makedirs(uploads_dir, exist_ok=True) | |
# Try Streamlit's own upload path first if available | |
streamlit_uploads_path = os.environ.get('STREAMLIT_UPLOADS_PATH') | |
if streamlit_uploads_path and os.path.isdir(streamlit_uploads_path): | |
uploads_dir = streamlit_uploads_path | |
st.info(f"Using Streamlit's upload directory: {uploads_dir}") | |
# Make sure uploads directory has proper permissions | |
try: | |
os.chmod(uploads_dir, 0o777) # Full permissions | |
except Exception as chmod_error: | |
st.warning(f"Could not set permissions on uploads directory: {str(chmod_error)}. Continuing anyway.") | |
# Log upload dir info for debugging | |
st.info(f"Upload directory: {uploads_dir} (exists: {os.path.exists(uploads_dir)}, writable: {os.access(uploads_dir, os.W_OK)})") | |
# Handle different input types | |
if isinstance(file_input, str): | |
# If it's already a file path | |
temp_input_path = file_input | |
file_extension = os.path.splitext(temp_input_path)[1].lower() | |
st.info(f"Processing from saved file: {os.path.basename(temp_input_path)}") | |
else: | |
# If it's a StreamlitUploadedFile | |
file_extension = os.path.splitext(file_input.name)[1].lower() | |
# Write the uploaded file to disk with proper extension in the uploads directory | |
# Use a unique filename to avoid conflicts | |
safe_filename = ''.join(c if c.isalnum() or c in '._- ' else '_' for c in file_input.name) | |
temp_input_path = os.path.join(uploads_dir, f"uploaded_{timestamp}_{safe_filename}") | |
st.info(f"Saving uploaded file to: {temp_input_path}") | |
try: | |
# Write in chunks to handle large files better | |
chunk_size = 1024 * 1024 # 1MB chunks | |
buffer = file_input.getbuffer() | |
with open(temp_input_path, "wb") as f: | |
for i in range(0, len(buffer), chunk_size): | |
f.write(buffer[i:i+chunk_size]) | |
# Verify file was written properly | |
if os.path.exists(temp_input_path): | |
file_size = os.path.getsize(temp_input_path) | |
st.success(f"File saved successfully: {file_size} bytes") | |
else: | |
st.error(f"Failed to save file - file doesn't exist after writing") | |
except Exception as write_error: | |
st.error(f"Error writing uploaded file: {str(write_error)}") | |
# Try alternative temp directory as fallback | |
try: | |
import tempfile | |
temp_dir = tempfile.gettempdir() | |
temp_input_path = os.path.join(temp_dir, f"uploaded_{timestamp}_{safe_filename}") | |
st.warning(f"Trying alternative location: {temp_input_path}") | |
with open(temp_input_path, "wb") as f: | |
f.write(file_input.getbuffer()) | |
except Exception as alt_write_error: | |
st.error(f"Alternative write also failed: {str(alt_write_error)}") | |
raise | |
# For MP4 files, extract the audio using ffmpeg | |
if file_extension == ".mp4": | |
st.info("Extracting audio from video file...") | |
audio_path = os.path.join(uploads_dir, f"extracted_audio_{timestamp}.wav") | |
try: | |
# Add -y flag to overwrite output file if it exists | |
subprocess.run( | |
['ffmpeg', '-y', '-i', temp_input_path, '-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', audio_path], | |
check=True, | |
capture_output=True | |
) | |
st.success(f"Audio extracted successfully to {audio_path}") | |
# Remove the original video file if extraction was successful | |
if os.path.exists(audio_path) and os.path.getsize(audio_path) > 0: | |
os.remove(temp_input_path) | |
except subprocess.CalledProcessError as e: | |
st.error(f"Error extracting audio: {e}") | |
if e.stderr: | |
st.error(f"FFmpeg output: {e.stderr.decode('utf-8')}") | |
raise | |
else: | |
# For audio files, process based on format | |
if file_extension in [".mp3", ".m4a", ".ogg", ".flac"]: | |
# Convert to WAV for better compatibility | |
audio_path = os.path.join(uploads_dir, f"converted_audio_{timestamp}.wav") | |
st.info(f"Converting {file_extension} to WAV format for analysis...") | |
try: | |
# Use a verbose ffmpeg command with more options for compatibility | |
process = subprocess.run( | |
[ | |
'ffmpeg', '-y', '-i', temp_input_path, | |
'-ar', '16000', '-ac', '1', '-c:a', 'pcm_s16le', | |
# Add error handling flags | |
'-err_detect', 'ignore_err', | |
# Add buffers for better handling | |
'-analyzeduration', '10000000', '-probesize', '10000000', | |
audio_path | |
], | |
check=True, | |
capture_output=True | |
) | |
# Verify the file was created successfully | |
if os.path.exists(audio_path) and os.path.getsize(audio_path) > 0: | |
st.success(f"Audio converted successfully: {os.path.getsize(audio_path)} bytes") | |
# If conversion was successful, remove the original file to save space | |
os.remove(temp_input_path) | |
else: | |
st.warning("Conversion produced an empty file. Trying fallback conversion method...") | |
# Try alternative conversion method - simpler command | |
fallback_cmd = ['ffmpeg', '-y', '-i', temp_input_path, audio_path] | |
subprocess.run(fallback_cmd, check=True, capture_output=True) | |
if not os.path.exists(audio_path) or os.path.getsize(audio_path) == 0: | |
st.warning("Fallback conversion also failed. Using original file.") | |
audio_path = temp_input_path | |
except subprocess.CalledProcessError as e: | |
st.warning(f"Conversion warning: {e}") | |
if e.stderr: | |
st.warning(f"FFmpeg error: {e.stderr.decode('utf-8')}") | |
st.info("Using original file instead.") | |
audio_path = temp_input_path | |
else: | |
# For already WAV files, use them directly | |
audio_path = temp_input_path | |
st.info(f"Using WAV file directly: {audio_path}") | |
detector = AccentDetector() | |
results = detector.analyze_audio(audio_path) | |
# Clean up | |
if audio_path and audio_path != temp_input_path and os.path.exists(audio_path): | |
os.remove(audio_path) | |
return results | |
except Exception as e: | |
error_msg = str(e) | |
st.error(f"Error processing audio: {error_msg}") | |
# Add detailed debugging info | |
import traceback | |
st.error(f"Error details: {traceback.format_exc()}") | |
# Show file info if available | |
if temp_input_path and os.path.exists(temp_input_path): | |
st.info(f"Input file exists: {temp_input_path}, size: {os.path.getsize(temp_input_path)} bytes") | |
os.remove(temp_input_path) | |
else: | |
if temp_input_path: | |
st.warning(f"Input file does not exist: {temp_input_path}") | |
if audio_path and os.path.exists(audio_path): | |
st.info(f"Audio file exists: {audio_path}, size: {os.path.getsize(audio_path)} bytes") | |
os.remove(audio_path) | |
else: | |
if audio_path: | |
st.warning(f"Audio file does not exist: {audio_path}") | |
# Check for common error types | |
if "ffmpeg" in error_msg.lower(): | |
st.warning("FFmpeg error detected. The audio conversion failed.") | |
st.info("Try a different audio format or check if FFmpeg is installed correctly.") | |
elif "permission" in error_msg.lower(): | |
st.warning("Permission error detected.") | |
st.info("Check that the uploads directory is writable.") | |
elif "no such file" in error_msg.lower(): | |
st.warning("File not found error detected.") | |
st.info("The file may have been moved, deleted, or not saved correctly.") | |
raise | |
return results | |
# --- Streamlit App --- | |
st.set_page_config( | |
page_title="🎤 English Accent Detector", | |
page_icon="🎤", | |
layout="wide" | |
) | |
st.title("🎤 English Accent Detection Tool") | |
st.markdown(""" | |
This application analyzes a speaker's English accent from video URLs or audio uploads, | |
providing detailed insights for hiring evaluation purposes. | |
""") | |
# Add container for tips | |
with st.container(): | |
st.info(""" | |
💡 **Tips for best results:** | |
- Use **Loom** or **Vimeo** videos (more reliable than YouTube) | |
- For YouTube videos, you may need to provide cookies | |
- Audio clips of 15-30 seconds work best | |
- Clear speech with minimal background noise is ideal | |
""") | |
st.markdown(""" | |
This app analyzes a speaker's English accent from a video or audio source. | |
It provides: | |
- Classification of the accent (British, American, etc.) | |
- Confidence score for English proficiency | |
- Explanation of accent characteristics | |
""") | |
# Create tabs for different input methods | |
tab1, tab2 = st.tabs(["Video URL", "Upload Audio"]) | |
with tab1: | |
st.markdown("### 🎬 Analyze video from URL") | |
url = st.text_input("Enter a public video URL", | |
placeholder="https://www.loom.com/..., https://vimeo.com/..., or direct MP4 link") | |
# Add alternative invidious frontend option for YouTube | |
use_alternative = st.checkbox("Try alternative YouTube source (for authentication issues)", | |
value=True, | |
help="Uses an alternative frontend (Invidious) that may bypass YouTube restrictions") | |
# Recommend alternative sources | |
st.caption("⚠️ **Note**: YouTube videos often require authentication. For best results, use Loom, Vimeo or direct video links.") | |
# Add file uploader for cookies.txt | |
cookies_file = None | |
uploaded_cookies = st.file_uploader("Upload cookies.txt file for YouTube (if needed)", | |
type="txt", | |
help="Only needed for YouTube videos that require authentication") | |
if uploaded_cookies is not None: | |
# Save the uploaded cookies file to a temporary file | |
cookies_file = f"cookies_{int(time.time())}.txt" | |
with open(cookies_file, "wb") as f: | |
f.write(uploaded_cookies.getbuffer()) | |
st.success("Cookies file uploaded successfully!") | |
with st.expander("Having trouble with YouTube videos?"): | |
st.markdown(""" | |
### YouTube Authentication Issues | |
YouTube's anti-bot measures often block automated video downloads. To solve this: | |
#### Option 1: Use Alternative Video Sources (Recommended) | |
These typically work without authentication issues: | |
- [Loom](https://www.loom.com/) - Great for screen recordings | |
- [Vimeo](https://vimeo.com/) - High-quality video hosting | |
- [Streamable](https://streamable.com/) - Simple video sharing | |
- Any direct MP4 link | |
#### Option 2: Upload Cookies for YouTube | |
1. Install a browser extension like [Get cookies.txt](https://chrome.google.com/webstore/detail/get-cookiestxt-locally/cclelndahbckbenkjhflpdbgdldlbecc) | |
2. Login to YouTube in your browser | |
3. Use the extension to export cookies to a .txt file | |
4. Upload the cookies.txt file using the uploader above | |
#### Option 3: Use Audio Upload Instead | |
The 'Upload Audio' tab allows direct analysis of audio files without URL issues. | |
""") | |
if st.button("Analyze Video"): | |
if not url: | |
st.warning("Please enter a valid URL") | |
else: | |
try: | |
# Create a placeholder for status updates | |
status = st.empty() | |
# Generate unique filenames using timestamp to avoid conflicts | |
timestamp = str(int(time.time())) | |
video_path = f"video_{timestamp}.mp4" | |
audio_path = f"audio_{timestamp}.wav" | |
# Download and process the video | |
status.text("Downloading video...") | |
download_success = download_video(url, video_path, cookies_file) | |
if not download_success: | |
st.error("Failed to download video") | |
else: | |
status.text("Extracting audio...") | |
extract_success = extract_audio(video_path, audio_path) | |
if not extract_success: | |
st.error("Failed to extract audio") | |
else: | |
status.text("Analyzing accent... (this may take a moment)") | |
detector = AccentDetector() | |
results = detector.analyze_audio(audio_path) | |
# Display results | |
st.success("✅ Analysis Complete!") | |
# Create columns for results | |
col1, col2 = st.columns([2, 1]) | |
with col1: | |
st.subheader("Accent Analysis Results") | |
st.markdown(f"**Detected Accent:** {results['accent']}") | |
st.markdown(f"**English Proficiency:** {results['english_confidence']:.1f}%") | |
st.markdown(f"**Accent Confidence:** {results['accent_confidence']:.1f}%") | |
# Show explanation in a box | |
st.markdown("### Expert Analysis") | |
st.info(results['explanation']) | |
with col2: | |
if results['audio_viz']: | |
try: | |
st.pyplot(results['audio_viz']) | |
except Exception as viz_error: | |
st.warning("Could not display visualization due to torchvision issue.") | |
st.info("Audio analysis was successful even though visualization failed.") | |
# Show audio playback | |
st.audio(audio_path) | |
# Clean up files | |
try: | |
if os.path.exists(video_path): | |
os.remove(video_path) | |
if os.path.exists(audio_path): | |
os.remove(audio_path) | |
if cookies_file and os.path.exists(cookies_file): | |
os.remove(cookies_file) | |
except Exception as e: | |
st.warning(f"Couldn't clean up temporary files: {str(e)}") | |
except Exception as e: | |
st.error(f"Error during analysis: {str(e)}") | |
with tab2: | |
st.markdown("### 🎵 Upload Audio File") | |
st.caption("**Recommended option!** Direct audio upload is more reliable than video URLs.") | |
# Add some information about file size limits | |
st.info("📝 **File Requirements**: \n" | |
"• Maximum file size: 200MB \n" | |
"• Supported formats: WAV, MP3, M4A, OGG, FLAC, MP4 \n" | |
"• Recommended length: 15-60 seconds of clear speech") | |
uploaded_file = st.file_uploader("Upload an audio file", | |
type=["wav", "mp3", "m4a", "ogg", "flac", "mp4"], | |
help="Support for WAV, MP3, M4A, OGG, FLAC and MP4 formats", | |
accept_multiple_files=False) | |
if uploaded_file is not None: # Show a preview of the audio | |
st.markdown("#### Audio Preview:") | |
try: | |
st.audio(uploaded_file) | |
st.markdown("#### Ready for Analysis") | |
col1, col2 = st.columns([1, 3]) | |
with col1: | |
analyze_button = st.button("Analyze Audio", type="primary", use_container_width=True) | |
with col2: | |
st.caption("Tip: 15-30 seconds of clear speech works best for accent detection") | |
except Exception as preview_error: | |
st.warning(f"Could not preview audio: {str(preview_error)}") | |
# If preview fails, still allow analysis | |
analyze_button = st.button("Analyze Audio (Preview Failed)", type="primary") | |
st.caption("Proceeding with analysis might still work even if preview failed") | |
if analyze_button: | |
with st.spinner("Analyzing audio... (this may take 15-30 seconds)"): | |
try: | |
# Check file size before processing | |
file_size_mb = len(uploaded_file.getvalue()) / (1024 * 1024) | |
if file_size_mb > 190: # Stay below the 200MB limit with some buffer | |
st.error(f"File size ({file_size_mb:.1f}MB) is too large. Maximum allowed is 190MB.") | |
st.info("Tip: Try trimming your audio to just the speech segment for better results.") | |
else: # Create a progress bar to show processing stages | |
progress_bar = st.progress(0) | |
# Check the file type and inform user about processing steps | |
file_extension = os.path.splitext(uploaded_file.name)[1].lower() | |
if file_extension == '.mp4': | |
st.info("Processing video file - extracting audio track...") | |
elif file_extension in ['.mp3', '.m4a', '.ogg', '.flac']: | |
st.info(f"Processing {file_extension} audio file...") | |
progress_bar.progress(25, text="Saving file...") | |
# First save the file to a known location to bypass 403 errors | |
# Create an uploads directory if it doesn't exist | |
uploads_dir = os.path.join(os.getcwd(), "uploads") | |
os.makedirs(uploads_dir, exist_ok=True) # Save the file first to avoid streaming it multiple times | |
temp_file_path = os.path.join(uploads_dir, f"temp_{int(time.time())}_{uploaded_file.name}") | |
with open(temp_file_path, "wb") as f: | |
f.write(uploaded_file.getbuffer()) | |
progress_bar.progress(50, text="Analyzing audio...") | |
# Process using the saved file path directly | |
results = process_uploaded_audio(temp_file_path) | |
progress_bar.progress(100, text="Analysis complete!") | |
# Display results | |
st.success("✅ Analysis Complete!") | |
# Create columns for results | |
col1, col2 = st.columns([2, 1]) | |
with col1: | |
st.subheader("Accent Analysis Results") | |
st.markdown(f"**Detected Accent:** {results['accent']}") | |
st.markdown(f"**English Proficiency:** {results['english_confidence']:.1f}%") | |
st.markdown(f"**Accent Confidence:** {results['accent_confidence']:.1f}%") | |
# Show explanation in a box | |
st.markdown("### Expert Analysis") | |
st.info(results['explanation']) | |
with col2: | |
if results['audio_viz']: | |
try: | |
st.pyplot(results['audio_viz']) | |
except Exception as viz_error: | |
st.warning("Could not display visualization due to torchvision issue.") | |
st.info("Audio analysis was successful even though visualization failed.") | |
except subprocess.CalledProcessError as e: | |
st.error("Error processing audio file") | |
st.error(f"FFmpeg error: {e.stderr.decode('utf-8') if e.stderr else str(e)}") | |
st.info("Troubleshooting tips:\n" | |
"• Try a different audio file format (WAV or MP3 recommended)\n" | |
"• Make sure the file is not corrupted\n" | |
"• Try a shorter audio clip") | |
except PermissionError as e: | |
st.error(f"Permission error: {str(e)}") | |
st.info("The app doesn't have permission to access or create temporary files. " | |
"This could be due to Docker container permissions. " | |
"Contact the administrator or try using a different file.") | |
except OSError as e: | |
st.error(f"System error: {str(e)}") | |
st.info("Check that the file isn't corrupted and try with a smaller audio clip.") | |
except Exception as e: | |
error_msg = str(e) | |
st.error(f"Error during analysis: {error_msg}") | |
if "403" in error_msg: | |
st.warning("Received a 403 Forbidden error. This may be due to: \n" | |
"• File size exceeding limits\n" | |
"• Temporary file permission issues\n" | |
"• Network restrictions") | |
st.info("Try a smaller audio file (less than 50MB) or a different format.") | |
elif "timeout" in error_msg.lower(): | |
st.warning("The request timed out. Try a shorter audio clip or check your internet connection.") | |
elif "memory" in error_msg.lower(): | |
st.warning("Out of memory error. Try a shorter audio clip.") | |
else: | |
st.info("If the problem persists, try a different audio file format such as MP3 or WAV.") | |
# Add footer with deployment info | |
st.markdown("---") | |
st.markdown("Deployed using Streamlit • Built with SpeechBrain and Transformers") | |
# Add a section for how it works | |
with st.expander("ℹ️ How It Works"): | |
st.markdown(""" | |
This app uses a multi-stage process to analyze a speaker's accent: | |
1. **Audio Extraction**: The audio track is extracted from the input video or directly processed from uploaded audio. | |
2. **Language Identification**: First, we determine if the speech is English using SpeechBrain's language identification model. | |
3. **Accent Classification**: For English speech, we analyze the specific accent using a transformer-based model trained on diverse accent data. | |
4. **English Proficiency Score**: A confidence score is calculated based on both language identification and accent clarity. | |
5. **Analysis Summary**: An explanation is generated describing accent characteristics relevant for hiring evaluations. | |
""") | |
# Add debug function for troubleshooting HTTP errors | |
def debug_http_errors(): | |
"""Print debug information for HTTP errors""" | |
st.warning("⚠️ HTTP 400 Error Debugging Mode") | |
st.markdown(""" | |
### Common HTTP 400 Error Causes: | |
1. **File size exceeds limits** (current limit: 150MB) | |
2. **File format incompatibility** | |
3. **Network interruption** during upload | |
4. **Server-side timeout** during processing | |
5. **Permissions issues** in container | |
""") | |
# Show environment info | |
st.subheader("Environment Information") | |
env_info = { | |
"STREAMLIT_UPLOADS_PATH": os.environ.get("STREAMLIT_UPLOADS_PATH", "Not set"), | |
"STREAMLIT_SERVER_MAX_UPLOAD_SIZE": os.environ.get("STREAMLIT_SERVER_MAX_UPLOAD_SIZE", "Not set"), | |
"Current directory": os.getcwd(), | |
"Python version": sys.version | |
} | |
for key, value in env_info.items(): | |
st.code(f"{key}: {value}") | |
# Check if uploads directory is writable | |
uploads_dir = os.environ.get("STREAMLIT_UPLOADS_PATH", os.path.join(os.getcwd(), "uploads")) | |
os.makedirs(uploads_dir, exist_ok=True) | |
try: | |
test_file = os.path.join(uploads_dir, "test_write.txt") | |
with open(test_file, "w") as f: | |
f.write("Test write permission") | |
os.remove(test_file) | |
st.success(f"✓ Upload directory is writable: {uploads_dir}") | |
except Exception as e: | |
st.error(f"✗ Cannot write to upload directory: {str(e)}") | |
# Test ffmpeg | |
try: | |
result = subprocess.run(["ffmpeg", "-version"], capture_output=True, text=True) | |
st.success(f"✓ FFmpeg is available") | |
except Exception as e: | |
st.error(f"✗ FFmpeg error: {str(e)}") | |
# Add debug mode flag to the app | |
debug_mode = False | |
with st.expander("🔧 Troubleshooting Tools"): | |
debug_mode = st.checkbox("Enable Debug Mode for HTTP 400 Errors") | |
if debug_mode: | |
debug_http_errors() | |
# Add option for user to try different upload method | |
alt_upload = st.checkbox("Use alternative upload method (for HTTP 400 errors)") | |
if alt_upload: | |
st.info("Using alternative upload method that may bypass some HTTP 400 errors") | |