"""Streamlit app that estimates a speaker's English accent from a video URL or audio upload."""

import importlib
import logging
import os
import subprocess
import sys
import tempfile
import time

import librosa
import matplotlib.pyplot as plt
import numpy as np
import streamlit as st
import torch
import yt_dlp
from dotenv import load_dotenv

logger = logging.getLogger(__name__)

# NOTE(review): the st.error/st.warning calls in this section run before the
# app's st.set_page_config(); some Streamlit releases require set_page_config
# to be the first Streamlit command -- confirm against the pinned version.

# SpeechBrain has moved EncoderClassifier between releases, so probe each known
# location, newest first, and stop at the first one that imports.
EncoderClassifier = None
for _module_name in (
    "speechbrain.inference.classifiers",  # SpeechBrain 1.0+
    "speechbrain.pretrained.interfaces",  # legacy path
    "speechbrain.pretrained",  # very old path
):
    try:
        EncoderClassifier = importlib.import_module(_module_name).EncoderClassifier
    except (ImportError, AttributeError):
        continue
    break

# Global flag for SpeechBrain availability
HAS_SPEECHBRAIN = EncoderClassifier is not None
if not HAS_SPEECHBRAIN:
    # Callers check `EncoderClassifier is None` and degrade gracefully.
    st.error("⚠️ Unable to import SpeechBrain. Limited functionality available.")

# Handle potential compatibility issues with transformers
try:
    from transformers import AutoProcessor, AutoModelForAudioClassification
    HAS_AUTO_PROCESSOR = True
except ImportError:
    from transformers import AutoModelForAudioClassification
    HAS_AUTO_PROCESSOR = False
    st.warning("Using a compatible but limited version of transformers. Some features may be limited.")

# Deployment instructions:
# To deploy this app:
# 1. Make sure Docker is installed
# 2. Build the Docker image: docker build -t accent-detector .
# 3. Run the container: docker run -p 8501:8501 --volume /tmp/accent-detector:/app/uploads accent-detector
#    For Windows: docker run -p 8501:8501 --volume C:\temp\accent-detector:/app/uploads accent-detector
# 4. Access the app at http://localhost:8501
#
# For cloud deployment:
# - Streamlit Cloud: Connect your GitHub repository to Streamlit Cloud
# - Hugging Face Spaces: Use the Docker deployment option with proper volume mounts
# - Azure/AWS/GCP: Deploy the container using their container services with persistent storage
#
# Troubleshooting file uploads:
# - Set maxUploadSize in .streamlit/config.toml
# - Ensure write permissions on upload directories
# - For 403 errors, check file size and format compatibility

# Load environment variables (if .env file exists). Best effort: a broken .env
# must not stop the app, but the failure is logged instead of silently dropped.
try:
    load_dotenv()
except Exception:
    logger.warning("Could not load .env file", exc_info=True)

# Check for OpenAI API access - optional for enhanced explanations
try:
    import openai
    openai.api_key = os.getenv("OPENAI_API_KEY")
    # An empty OPENAI_API_KEY is as unusable as a missing one.
    have_openai = bool(openai.api_key)
except (ImportError, AttributeError):
    have_openai = False

# English accent categories
ENGLISH_ACCENTS = {
    "en-us": "American English",
    "en-gb": "British English",
    "en-au": "Australian English",
    "en-ca": "Canadian English",
    "en-ie": "Irish English",
    "en-scotland": "Scottish English",
    "en-in": "Indian English",
    "en-za": "South African English",
    "en-ng": "Nigerian English",
    "en-caribbean": "Caribbean English",
}
# File extensions accepted as a finished download inside the staging directory.
_MEDIA_EXTENSIONS = (".mp4", ".mp3", ".wav", ".m4a", ".webm", ".opus", ".ogg", ".mkv")


def _youtube_candidates(url):
    """Return ``(url, label)`` pairs to try for a YouTube link, the original URL first."""
    import re

    mirrors = (
        ("yewtu.be", "Invidious (yewtu.be)"),
        ("piped.video", "Piped"),
        ("inv.riverside.rocks", "Invidious (riverside)"),
    )
    if "youtu.be" in url.lower():
        # Short links carry the video id as the last path segment.
        video_id = url.rstrip("/").split("/")[-1].split("?")[0]
        alternatives = [(f"https://{host}/watch?v={video_id}", label) for host, label in mirrors]
    else:
        # Swap the whole host (including a www./m. prefix) so the mirror URL is valid.
        alternatives = [
            (re.sub(r"(?:(?:www|m)\.)?youtube\.com", host, url, count=1, flags=re.IGNORECASE), label)
            for host, label in mirrors
        ]
    # Drop mirrors that could not be derived (URL unchanged) to avoid retrying the original.
    return [(url, "Standard YouTube")] + [alt for alt in alternatives if alt[0] != url]


def _locate_download(staging_dir, expected_path):
    """Return the downloaded media file in *staging_dir*, or ``None`` if there is none."""
    if os.path.exists(expected_path):
        return expected_path
    # Post-processors (e.g. audio extraction) change the extension, so search for any media file.
    for root, dirs, files in os.walk(staging_dir):
        dirs[:] = [d for d in dirs if d != ".cache"]
        for name in sorted(files):
            if name.lower().endswith(_MEDIA_EXTENSIONS):
                return os.path.join(root, name)
    return None


def _explain_download_error(error_msg, is_youtube):
    """Show guidance in the UI that matches the kind of download failure."""
    lowered = error_msg.lower()
    if is_youtube and ("bot" in lowered or "sign in" in lowered or "403" in error_msg):
        st.warning("⚠️ YouTube requires authentication. Please try one of these solutions:")
        st.markdown("""
        1. **Upload a cookies.txt file** using the file uploader above
        2. **Try a different video source** like Loom, Vimeo or direct MP3/WAV files
        3. **Use the Audio Upload tab** instead of YouTube URLs
        """)
    elif "not find" in lowered and "cookies" in lowered:
        st.warning("Browser cookies could not be accessed. Please upload a cookies.txt file.")
    elif "network" in lowered or "timeout" in lowered:
        st.warning("Network error. Please check your internet connection and try again.")
    elif "permission" in lowered:
        st.warning("Permission error. The application doesn't have access to create or write files in the temporary directory.")
        st.info("Try running the Docker container with the proper volume mounts: `docker run -p 8501:8501 --volume /tmp/accent-detector:/app/uploads accent-detector`")
    elif "not found" in lowered and "ffmpeg" in lowered:
        st.error("FFmpeg is not installed or not found in PATH.")
        st.info("If running locally, please install FFmpeg. If using Docker, the container may be misconfigured.")


def download_video(url, video_path="video.mp4", cookies_file=None):
    """Download the media behind *url* to *video_path* using yt-dlp.

    The download is staged in a private temporary directory and moved to
    *video_path* (resolved against the current working directory when relative)
    only once it has completed, so the file still exists after the staging
    directory is cleaned up.

    Args:
        url: Public video/audio URL. YouTube links are retried through a few
            alternative front-ends.
        video_path: Destination file path for the downloaded media.
        cookies_file: Optional path to a cookies.txt passed to yt-dlp.

    Returns:
        True if a media file now exists at *video_path*, otherwise False.
        Failures are reported through the Streamlit UI rather than raised.
    """
    import shutil

    # Determine if this is a YouTube URL
    is_youtube = "youtube" in url.lower() or "youtu.be" in url.lower()

    destination = os.path.abspath(video_path)
    # mkdtemp gives a unique, owner-only directory (no name collisions between
    # concurrent downloads, no world-writable directory in /tmp).
    temp_dir = tempfile.mkdtemp(prefix="video_download_")
    staging_path = os.path.join(temp_dir, os.path.basename(destination))

    ydl_opts = {
        "outtmpl": staging_path,
        # Keep yt-dlp's cache inside the staging dir instead of rewriting the
        # process-wide HOME/APPDATA/XDG_CACHE_HOME variables.
        "cachedir": os.path.join(temp_dir, ".cache"),
        "quiet": False,
        "verbose": True,  # More detailed output for debugging
        "format": "bestaudio/best",  # Prefer audio formats since we only need audio
        "postprocessors": [{
            "key": "FFmpegExtractAudio",
            "preferredcodec": "wav",
        }] if is_youtube else [],  # Extract audio directly for YouTube
        "noplaylist": True,
        "extractor_retries": 5,
        "socket_timeout": 45,
        "retry_sleep_functions": {
            "http": lambda n: 5 * (n + 1),  # 5, 10, 15, 20, 25 seconds
        },
        # NOTE(review): this disables TLS certificate validation for downloads;
        # kept for compatibility, but consider removing it.
        "nocheckcertificate": True,
        "ignoreerrors": False,  # Don't ignore errors (we want to handle them)
    }

    # Add cookies if provided
    if cookies_file and os.path.exists(cookies_file):
        ydl_opts["cookiefile"] = cookies_file
        st.info("Using provided cookies file for authentication")
        # Make sure the cookies file is readable
        try:
            os.chmod(cookies_file, 0o644)
        except Exception as e:
            st.warning(f"Could not set permissions on cookies file: {str(e)}. Continuing anyway.")

    try:
        if is_youtube:
            st.info("Attempting to download from YouTube. This might take longer...")
            downloaded = False
            for alt_url, alt_name in _youtube_candidates(url):
                st.info(f"Trying {alt_name}... Please wait.")
                try:
                    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                        ydl.download([alt_url])
                except Exception as download_error:
                    error_msg = str(download_error)
                    st.warning(f"{alt_name} download attempt failed: {error_msg}")
                    # A permission problem will hit every mirror, so stop early.
                    if "permission" in error_msg.lower() or "access" in error_msg.lower():
                        st.error("Permission error detected. Stopping download attempts.")
                        raise
                else:
                    st.success(f"Successfully downloaded using {alt_name}")
                    downloaded = True
                    break
            if not downloaded:
                st.error("All YouTube download methods failed.")
                return False
        else:
            # For non-YouTube URLs
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

        produced = _locate_download(temp_dir, staging_path)
        if produced is None:
            st.error(f"Video downloaded but file not found: {video_path}")
            return False

        # Hand the file over before the staging directory is removed below.
        os.makedirs(os.path.dirname(destination), exist_ok=True)
        shutil.move(produced, destination)
        return True
    except Exception as e:
        error_msg = str(e)
        st.error(f"Download error: {error_msg}")
        _explain_download_error(error_msg, is_youtube)
        return False
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
def extract_audio(video_path="video.mp4", audio_path="audio.wav"):
    """Extract the audio track of *video_path* into a 16 kHz mono PCM WAV file.

    Args:
        video_path: Input media file readable by ffmpeg.
        audio_path: Output WAV path; an existing file is overwritten.

    Returns:
        True if ffmpeg produced a non-empty file at *audio_path*.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with an error (the error
            is also shown in the UI).
        FileNotFoundError: the ffmpeg executable is not installed / not on PATH.
    """
    command = [
        "ffmpeg",
        "-y",  # never wait for an overwrite confirmation (matches the upload path)
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        audio_path,
    ]
    try:
        subprocess.run(command, check=True, capture_output=True)
    except subprocess.CalledProcessError as e:
        st.error(f"Error extracting audio: {e}")
        if e.stderr:
            # ffmpeg output is not guaranteed to be valid UTF-8.
            st.error(f"ffmpeg output: {e.stderr.decode('utf-8', errors='replace')}")
        raise
    return os.path.exists(audio_path) and os.path.getsize(audio_path) > 0
class AccentDetector:
    """Analyse a speech recording: language identification, then accent labelling.

    Both models are loaded in ``__init__``; each stage is disabled (with a UI
    message) if its model cannot be loaded, and the methods fall back to
    neutral answers instead of raising.
    """

    def __init__(self):
        # Initialize language identification model
        self.have_lang_id = False
        try:
            if EncoderClassifier is not None:
                self.lang_id = EncoderClassifier.from_hparams(
                    source="speechbrain/lang-id-commonlanguage_ecapa",
                    savedir="tmp_model",
                )
                self.have_lang_id = True
            else:
                st.error("SpeechBrain not available. Language identification disabled.")
        except Exception as e:
            st.error(f"Error loading language ID model: {str(e)}")

        # Initialize the accent classifier
        # NOTE(review): this checkpoint is published as a SpeechBrain model; it may
        # not be loadable through transformers' Auto* classes, in which case accent
        # classification is always disabled -- confirm the intended model id.
        self.have_accent_model = False
        try:
            self.model_name = "speechbrain/lang-id-voxlingua107-ecapa"
            if HAS_AUTO_PROCESSOR:
                self.processor = AutoProcessor.from_pretrained(self.model_name)
            else:
                # Fall back to using feature_extractor
                from transformers import AutoFeatureExtractor
                self.processor = AutoFeatureExtractor.from_pretrained(self.model_name)
            self.model = AutoModelForAudioClassification.from_pretrained(self.model_name)
            self.have_accent_model = True
        except Exception as e:
            st.warning(f"Could not load accent model: {str(e)}")
            self.have_accent_model = False

    @staticmethod
    def _label_text(label):
        """Unwrap the label returned by the classifier (a list of labels) into a string."""
        while isinstance(label, (list, tuple)) and label:
            label = label[0]
        return str(label)

    @staticmethod
    def _as_probability(score):
        """Convert the classifier score to a confidence in ``[0, 1]``.

        A non-positive score is presumably a log-likelihood (TODO confirm for
        the loaded model) and is exponentiated; positive scores are passed
        through and capped at 1.
        """
        import math

        value = float(score)
        if value <= 0.0:
            value = math.exp(value)
        return min(value, 1.0)

    def is_english(self, audio_path, threshold=0.7):
        """Determine whether the speech in *audio_path* is English.

        Args:
            audio_path: Audio file passed to the language-ID model.
            threshold: Accepted for backward compatibility; the decision is
                currently based on the predicted label only.

        Returns:
            ``(is_english, language_label, confidence)``. If the model is
            unavailable or fails, English is assumed with confidence 1.0 or
            0.5 respectively.
        """
        if not getattr(self, "have_lang_id", False):
            # If language ID model is not available, assume English
            st.warning("Language identification is not available. Assuming English speech.")
            return True, "en", 1.0

        try:
            _, score, _, label = self.lang_id.classify_file(audio_path)
            lang = self._label_text(label)
            score = self._as_probability(score)
            normalized = lang.strip().lower()
            # Prefix match rather than substring match: "eng" also occurs inside
            # other language names (e.g. "Bengali").
            is_english = normalized == "en" or normalized.startswith(("en-", "en:", "eng"))
            return is_english, lang, score
        except Exception as e:
            st.warning(f"Error identifying language: {str(e)}. Assuming English speech.")
            return True, "en", 0.5

    def classify_accent(self, audio_path, is_english=None):
        """Classify the specific English accent.

        Args:
            audio_path: Audio file to analyse (resampled to 16 kHz).
            is_english: Result of a previous language check, if the caller has
                one; when None the check is run here only if it is needed.

        Returns:
            ``(accent_label, confidence)`` with confidence in ``[0, 1]``;
            ``("Unknown English Accent", 0.0)`` if the model is unavailable or
            fails.
        """
        if not self.have_accent_model:
            return "Unknown English Accent", 0.0

        try:
            # Load and preprocess audio
            audio, sr = librosa.load(audio_path, sr=16000)
            inputs = self.processor(audio, sampling_rate=sr, return_tensors="pt")

            with torch.no_grad():
                outputs = self.model(**inputs)

            probs = outputs.logits.softmax(dim=-1)[0]
            prediction_id = probs.argmax().item()
            confidence = probs[prediction_id].item()
            accent_code = self.model.config.id2label[prediction_id]

            if accent_code.startswith('en-'):
                return ENGLISH_ACCENTS.get(accent_code, f"English ({accent_code})"), confidence

            # Not an English accent code: fall back to the language pre-classification.
            if is_english is None:
                is_english, _, _ = self.is_english(audio_path)
            accent = "General English" if is_english else f"Non-English ({accent_code})"
            return accent, confidence * 0.7  # Reduce confidence for non-specific matches
        except Exception as e:
            st.error(f"Error in accent classification: {str(e)}")
            return "Unknown English Accent", 0.0

    @staticmethod
    def _fallback_explanation(accent, confidence, is_english, language):
        """Template explanation used when OpenAI is unavailable or fails."""
        if is_english:
            return f"The speaker has a {accent} accent with {confidence*100:.1f}% confidence. The speech was identified as English."
        return f"The speech was identified as {language}, not English. English confidence is low."

    def generate_explanation(self, audio_path, accent, confidence, is_english, language):
        """Generate an explanation of the results, using the OpenAI API if available.

        Args:
            audio_path: Unused; kept for interface compatibility.
            accent: Accent label to describe.
            confidence: Accent confidence in ``[0, 1]``.
            is_english: Whether the speech was identified as English.
            language: Detected language label.

        Returns:
            A short explanation string; a fixed template if OpenAI is not
            configured or the request fails.
        """
        if not have_openai:
            return self._fallback_explanation(accent, confidence, is_english, language)

        try:
            import openai

            # Use the results already computed by the caller instead of
            # re-running language identification on the audio.
            prompt = f"""
            Audio analysis detected a speaker with the following characteristics:
            - Primary accent/language: {accent}
            - Confidence score: {confidence*100:.1f}%
            - Detected language category: {language}
            - Is English: {is_english}

            Based on this information, provide a 2-3 sentence summary about the speaker's accent.
            Focus on how clear their English is and any notable accent characteristics.
            This is for hiring purposes to evaluate English speaking abilities.
            """
            response = openai.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are an accent analysis specialist providing factual assessments."},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=150,
            )
            return response.choices[0].message.content.strip()
        except Exception as e:
            st.error(f"Error generating explanation: {str(e)}")
            return self._fallback_explanation(accent, confidence, is_english, language)

    @staticmethod
    def _waveform_figure(audio_path):
        """Return a matplotlib figure of the waveform (simplified if rendering fails)."""
        import io

        samples, _ = librosa.load(audio_path, sr=None)
        fig, ax = plt.subplots(figsize=(10, 2))
        ax.plot(samples)
        ax.set_xlabel('Sample')
        ax.set_ylabel('Amplitude')
        ax.set_title('Audio Waveform')
        fig.tight_layout()
        try:
            # Render into memory to verify the figure can be saved; reopening a
            # NamedTemporaryFile by name does not work on Windows.
            fig.savefig(io.BytesIO(), format="png")
            return fig
        except Exception as viz_save_error:
            st.warning(f"Could not save visualization: {str(viz_save_error)}. Using simpler visualization.")
            plt.close(fig)  # release the abandoned figure

        # Downsample for performance
        step = max(1, len(samples) // 1000)
        reduced = samples[::step]
        fig2, ax2 = plt.subplots(figsize=(8, 2))
        ax2.plot(np.arange(len(reduced)), reduced)
        ax2.set_title("Audio Waveform (simplified)")
        return fig2

    def analyze_audio(self, audio_path):
        """Run the complete analysis pipeline on *audio_path*.

        Returns:
            dict with keys ``is_english``, ``accent``, ``accent_confidence``
            (percent), ``english_confidence`` (percent), ``language_detected``,
            ``explanation`` and ``audio_viz`` (matplotlib figure or None).
        """
        is_english, lang, lang_score = self.is_english(audio_path)

        if is_english:
            accent, accent_confidence = self.classify_accent(audio_path, is_english=True)
            english_confidence = lang_score * 100  # Scale to percentage
        else:
            accent = f"Non-English ({lang})"
            accent_confidence = lang_score
            english_confidence = max(0, min(30, lang_score * 50))  # Cap at 30% if non-English

        explanation = self.generate_explanation(audio_path, accent, accent_confidence, is_english, lang)

        try:
            audio_viz = self._waveform_figure(audio_path)
        except Exception as e:
            st.warning(f"Could not generate audio visualization: {str(e)}")
            audio_viz = None

        return {
            "is_english": is_english,
            "accent": accent,
            "accent_confidence": accent_confidence * 100,  # Scale to percentage
            "english_confidence": english_confidence,
            "language_detected": lang,
            "explanation": explanation,
            "audio_viz": audio_viz,
        }
def _stderr_text(error):
    """Decode ffmpeg's captured stderr without failing on non-UTF-8 bytes."""
    return error.stderr.decode("utf-8", errors="replace") if error.stderr else ""


def _ffmpeg_to_wav(source_path, wav_path, input_options=()):
    """Convert *source_path* to 16 kHz mono PCM WAV; return True if output is non-empty."""
    # Demuxer options must precede the "-i" they apply to.
    command = [
        "ffmpeg", "-y", *input_options, "-i", source_path,
        "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
        wav_path,
    ]
    subprocess.run(command, check=True, capture_output=True)
    return os.path.exists(wav_path) and os.path.getsize(wav_path) > 0


def _resolve_uploads_dir():
    """Return the directory used for uploaded and converted files, creating it if needed."""
    uploads_dir = os.path.join(os.getcwd(), "uploads")
    os.makedirs(uploads_dir, exist_ok=True)

    # Prefer Streamlit's own upload path if available
    streamlit_uploads_path = os.environ.get('STREAMLIT_UPLOADS_PATH')
    if streamlit_uploads_path and os.path.isdir(streamlit_uploads_path):
        uploads_dir = streamlit_uploads_path
        st.info(f"Using Streamlit's upload directory: {uploads_dir}")

    # NOTE(review): world-writable permissions are kept for Docker volume
    # mounts; tighten if the deployment allows it.
    try:
        os.chmod(uploads_dir, 0o777)
    except Exception as chmod_error:
        st.warning(f"Could not set permissions on uploads directory: {str(chmod_error)}. Continuing anyway.")

    st.info(f"Upload directory: {uploads_dir} (exists: {os.path.exists(uploads_dir)}, writable: {os.access(uploads_dir, os.W_OK)})")
    return uploads_dir


def _save_upload(uploaded, uploads_dir, timestamp):
    """Write an uploaded file object to disk and return the path it was saved to."""
    safe_filename = ''.join(c if c.isalnum() or c in '._- ' else '_' for c in uploaded.name)
    target = os.path.join(uploads_dir, f"uploaded_{timestamp}_{safe_filename}")
    st.info(f"Saving uploaded file to: {target}")
    try:
        # Write in chunks to handle large files better
        chunk_size = 1024 * 1024  # 1MB chunks
        buffer = uploaded.getbuffer()
        with open(target, "wb") as f:
            for start in range(0, len(buffer), chunk_size):
                f.write(buffer[start:start + chunk_size])
        st.success(f"File saved successfully: {os.path.getsize(target)} bytes")
        return target
    except Exception as write_error:
        st.error(f"Error writing uploaded file: {str(write_error)}")

    # Try the system temp directory as a fallback
    target = os.path.join(tempfile.gettempdir(), f"uploaded_{timestamp}_{safe_filename}")
    st.warning(f"Trying alternative location: {target}")
    try:
        with open(target, "wb") as f:
            f.write(uploaded.getbuffer())
    except Exception as alt_write_error:
        st.error(f"Alternative write also failed: {str(alt_write_error)}")
        raise
    return target


def _convert_audio_to_wav(source_path, wav_path, file_extension):
    """Convert a compressed audio file to WAV; return the path that should be analysed.

    Returns *wav_path* on success and *source_path* if every conversion
    attempt failed (the analysis then tries the original file).
    """
    st.info(f"Converting {file_extension} to WAV format for analysis...")
    tolerant_input = [
        '-err_detect', 'ignore_err',
        '-analyzeduration', '10000000',
        '-probesize', '10000000',
    ]
    try:
        if _ffmpeg_to_wav(source_path, wav_path, tolerant_input):
            st.success(f"Audio converted successfully: {os.path.getsize(wav_path)} bytes")
            return wav_path

        st.warning("Conversion produced an empty file. Trying fallback conversion method...")
        subprocess.run(['ffmpeg', '-y', '-i', source_path, wav_path], check=True, capture_output=True)
        if os.path.exists(wav_path) and os.path.getsize(wav_path) > 0:
            return wav_path
        st.warning("Fallback conversion also failed. Using original file.")
    except subprocess.CalledProcessError as e:
        st.warning(f"Conversion warning: {e}")
        if e.stderr:
            st.warning(f"FFmpeg error: {_stderr_text(e)}")
        st.info("Using original file instead.")
    return source_path


def process_uploaded_audio(file_input):
    """Analyse an uploaded audio (or MP4 video) file and return the detection results.

    The input is converted to 16 kHz mono WAV when needed and passed to
    ``AccentDetector.analyze_audio``. This function takes ownership of the
    input file: the saved upload (including a path passed in by the caller)
    and any converted file are deleted before it returns or raises.

    Args:
        file_input: Either a Streamlit uploaded-file object or a string path
            to a file already on disk.

    Returns:
        The result dict produced by ``AccentDetector.analyze_audio``.

    Raises:
        Exception: any failure is reported in the UI and re-raised
            (``subprocess.CalledProcessError`` for ffmpeg failures on MP4).
    """
    audio_path = None
    temp_input_path = None
    wav_path = None

    try:
        # Nanosecond resolution avoids filename clashes between near-simultaneous uploads.
        timestamp = str(time.time_ns())
        uploads_dir = _resolve_uploads_dir()

        if isinstance(file_input, str):
            temp_input_path = file_input
            st.info(f"Processing from saved file: {os.path.basename(temp_input_path)}")
        else:
            temp_input_path = _save_upload(file_input, uploads_dir, timestamp)
        file_extension = os.path.splitext(temp_input_path)[1].lower()

        if file_extension == ".mp4":
            st.info("Extracting audio from video file...")
            wav_path = os.path.join(uploads_dir, f"extracted_audio_{timestamp}.wav")
            try:
                extracted = _ffmpeg_to_wav(temp_input_path, wav_path)
            except subprocess.CalledProcessError as e:
                st.error(f"Error extracting audio: {e}")
                if e.stderr:
                    st.error(f"FFmpeg output: {_stderr_text(e)}")
                raise
            if not extracted:
                raise RuntimeError("ffmpeg produced no audio output for the uploaded video")
            st.success(f"Audio extracted successfully to {wav_path}")
            audio_path = wav_path
        elif file_extension in (".mp3", ".m4a", ".ogg", ".flac"):
            wav_path = os.path.join(uploads_dir, f"converted_audio_{timestamp}.wav")
            audio_path = _convert_audio_to_wav(temp_input_path, wav_path, file_extension)
        else:
            # For already WAV files, use them directly
            audio_path = temp_input_path
            st.info(f"Using WAV file directly: {audio_path}")

        detector = AccentDetector()
        return detector.analyze_audio(audio_path)

    except Exception as e:
        import traceback

        error_msg = str(e)
        st.error(f"Error processing audio: {error_msg}")
        st.error(f"Error details: {traceback.format_exc()}")

        # Show file info if available (files are removed afterwards in `finally`)
        for label, path in (("Input", temp_input_path), ("Audio", audio_path)):
            if path and os.path.exists(path):
                st.info(f"{label} file exists: {path}, size: {os.path.getsize(path)} bytes")
            elif path:
                st.warning(f"{label} file does not exist: {path}")

        # Check for common error types
        lowered = error_msg.lower()
        if "ffmpeg" in lowered:
            st.warning("FFmpeg error detected. The audio conversion failed.")
            st.info("Try a different audio format or check if FFmpeg is installed correctly.")
        elif "permission" in lowered:
            st.warning("Permission error detected.")
            st.info("Check that the uploads directory is writable.")
        elif "no such file" in lowered:
            st.warning("File not found error detected.")
            st.info("The file may have been moved, deleted, or not saved correctly.")
        raise
    finally:
        # Always remove the working files, on success and on failure alike.
        for path in {temp_input_path, wav_path}:
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                except OSError as cleanup_error:
                    st.warning(f"Could not remove temporary file {path}: {cleanup_error}")
# --- Streamlit App ---
st.set_page_config(
    page_title="🎤 English Accent Detector",
    page_icon="🎤",
    layout="wide"
)


def _render_results(results):
    """Display one analysis result (scores, explanation and waveform) in two columns."""
    st.success("✅ Analysis Complete!")
    col1, col2 = st.columns([2, 1])

    with col1:
        st.subheader("Accent Analysis Results")
        st.markdown(f"**Detected Accent:** {results['accent']}")
        st.markdown(f"**English Proficiency:** {results['english_confidence']:.1f}%")
        st.markdown(f"**Accent Confidence:** {results['accent_confidence']:.1f}%")
        st.markdown("### Expert Analysis")
        st.info(results['explanation'])

    with col2:
        figure = results['audio_viz']
        if figure:
            try:
                st.pyplot(figure)
            except Exception as viz_error:
                st.warning(f"Could not display visualization: {viz_error}")
                st.info("Audio analysis was successful even though visualization failed.")
            finally:
                # Figures stay registered with pyplot until closed; release them per run.
                plt.close(figure)


def _remove_files(*paths):
    """Delete the given temporary files, reporting (not raising) any failure."""
    for path in paths:
        try:
            if path and os.path.exists(path):
                os.remove(path)
        except OSError as e:
            st.warning(f"Couldn't clean up temporary files: {str(e)}")


st.title("🎤 English Accent Detection Tool")
st.markdown("""
This application analyzes a speaker's English accent from video URLs or audio uploads,
providing detailed insights for hiring evaluation purposes.
""")

# Add container for tips
with st.container():
    st.info("""
    💡 **Tips for best results:**
    - Use **Loom** or **Vimeo** videos (more reliable than YouTube)
    - For YouTube videos, you may need to provide cookies
    - Audio clips of 15-30 seconds work best
    - Clear speech with minimal background noise is ideal
    """)

st.markdown("""
This app analyzes a speaker's English accent from a video or audio source.
It provides:
- Classification of the accent (British, American, etc.)
- Confidence score for English proficiency
- Explanation of accent characteristics
""")

# Create tabs for different input methods
tab1, tab2 = st.tabs(["Video URL", "Upload Audio"])

with tab1:
    st.markdown("### 🎬 Analyze video from URL")
    url = st.text_input("Enter a public video URL",
                        placeholder="https://www.loom.com/..., https://vimeo.com/..., or direct MP4 link")

    # NOTE(review): this value is not passed on; download_video always tries the
    # alternative front-ends for YouTube links.
    use_alternative = st.checkbox("Try alternative YouTube source (for authentication issues)",
                                  value=True,
                                  help="Uses an alternative frontend (Invidious) that may bypass YouTube restrictions")

    st.caption("⚠️ **Note**: YouTube videos often require authentication. For best results, use Loom, Vimeo or direct video links.")

    # File uploader for cookies.txt. The cookies are only written to disk while
    # an analysis runs, so no credential file is left behind between reruns.
    cookies_file = None
    uploaded_cookies = st.file_uploader("Upload cookies.txt file for YouTube (if needed)",
                                        type="txt",
                                        help="Only needed for YouTube videos that require authentication")
    if uploaded_cookies is not None:
        st.success("Cookies file uploaded successfully!")

    with st.expander("Having trouble with YouTube videos?"):
        st.markdown("""
        ### YouTube Authentication Issues

        YouTube's anti-bot measures often block automated video downloads. To solve this:

        #### Option 1: Use Alternative Video Sources (Recommended)
        These typically work without authentication issues:
        - [Loom](https://www.loom.com/) - Great for screen recordings
        - [Vimeo](https://vimeo.com/) - High-quality video hosting
        - [Streamable](https://streamable.com/) - Simple video sharing
        - Any direct MP4 link

        #### Option 2: Upload Cookies for YouTube
        1. Install a browser extension like [Get cookies.txt](https://chrome.google.com/webstore/detail/get-cookiestxt-locally/cclelndahbckbenkjhflpdbgdldlbecc)
        2. Login to YouTube in your browser
        3. Use the extension to export cookies to a .txt file
        4. Upload the cookies.txt file using the uploader above

        #### Option 3: Use Audio Upload Instead
        The 'Upload Audio' tab allows direct analysis of audio files without URL issues.
        """)

    if st.button("Analyze Video"):
        if not url:
            st.warning("Please enter a valid URL")
        else:
            # Generate unique filenames using timestamp to avoid conflicts
            timestamp = str(int(time.time()))
            video_path = f"video_{timestamp}.mp4"
            audio_path = f"audio_{timestamp}.wav"
            try:
                status = st.empty()

                if uploaded_cookies is not None:
                    with tempfile.NamedTemporaryFile(prefix="cookies_", suffix=".txt", delete=False) as f:
                        f.write(uploaded_cookies.getbuffer())
                        cookies_file = f.name

                status.text("Downloading video...")
                if not download_video(url, video_path, cookies_file):
                    st.error("Failed to download video")
                else:
                    status.text("Extracting audio...")
                    if not extract_audio(video_path, audio_path):
                        st.error("Failed to extract audio")
                    else:
                        status.text("Analyzing accent... (this may take a moment)")
                        detector = AccentDetector()
                        results = detector.analyze_audio(audio_path)
                        _render_results(results)
                        # Show audio playback
                        st.audio(audio_path)
            except Exception as e:
                st.error(f"Error during analysis: {str(e)}")
            finally:
                # Clean up on success and on failure alike
                _remove_files(video_path, audio_path, cookies_file)

with tab2:
    st.markdown("### 🎵 Upload Audio File")
    st.caption("**Recommended option!** Direct audio upload is more reliable than video URLs.")

    # Add some information about file size limits
    st.info("📝 **File Requirements**: \n"
            "• Maximum file size: 200MB \n"
            "• Supported formats: WAV, MP3, M4A, OGG, FLAC, MP4 \n"
            "• Recommended length: 15-60 seconds of clear speech")

    uploaded_file = st.file_uploader("Upload an audio file",
                                     type=["wav", "mp3", "m4a", "ogg", "flac", "mp4"],
                                     help="Support for WAV, MP3, M4A, OGG, FLAC and MP4 formats",
                                     accept_multiple_files=False)

    if uploaded_file is not None:
        # Show a preview of the audio
        st.markdown("#### Audio Preview:")
        try:
            st.audio(uploaded_file)
            st.markdown("#### Ready for Analysis")
            col1, col2 = st.columns([1, 3])
            with col1:
                analyze_button = st.button("Analyze Audio", type="primary", use_container_width=True)
            with col2:
                st.caption("Tip: 15-30 seconds of clear speech works best for accent detection")
        except Exception as preview_error:
            st.warning(f"Could not preview audio: {str(preview_error)}")
            # If preview fails, still allow analysis
            analyze_button = st.button("Analyze Audio (Preview Failed)", type="primary")
            st.caption("Proceeding with analysis might still work even if preview failed")

        if analyze_button:
            with st.spinner("Analyzing audio... (this may take 15-30 seconds)"):
                try:
                    # Check file size before processing (no copy of the payload needed)
                    file_size_mb = uploaded_file.size / (1024 * 1024)
                    if file_size_mb > 190:  # Stay below the 200MB limit with some buffer
                        st.error(f"File size ({file_size_mb:.1f}MB) is too large. Maximum allowed is 190MB.")
                        st.info("Tip: Try trimming your audio to just the speech segment for better results.")
                    else:
                        progress_bar = st.progress(0)

                        file_extension = os.path.splitext(uploaded_file.name)[1].lower()
                        if file_extension == '.mp4':
                            st.info("Processing video file - extracting audio track...")
                        elif file_extension in ['.mp3', '.m4a', '.ogg', '.flac']:
                            st.info(f"Processing {file_extension} audio file...")

                        progress_bar.progress(25, text="Saving file...")
                        progress_bar.progress(50, text="Analyzing audio...")
                        # process_uploaded_audio saves the upload itself under a
                        # sanitised filename, so the client-supplied name is never
                        # used directly in a path here.
                        results = process_uploaded_audio(uploaded_file)
                        progress_bar.progress(100, text="Analysis complete!")

                        _render_results(results)

                except subprocess.CalledProcessError as e:
                    st.error("Error processing audio file")
                    st.error(f"FFmpeg error: {e.stderr.decode('utf-8', errors='replace') if e.stderr else str(e)}")
                    st.info("Troubleshooting tips:\n"
                            "• Try a different audio file format (WAV or MP3 recommended)\n"
                            "• Make sure the file is not corrupted\n"
                            "• Try a shorter audio clip")
                except PermissionError as e:
                    st.error(f"Permission error: {str(e)}")
                    st.info("The app doesn't have permission to access or create temporary files. "
                            "This could be due to Docker container permissions. "
                            "Contact the administrator or try using a different file.")
                except OSError as e:
                    st.error(f"System error: {str(e)}")
                    st.info("Check that the file isn't corrupted and try with a smaller audio clip.")
                except Exception as e:
                    error_msg = str(e)
                    st.error(f"Error during analysis: {error_msg}")
                    if "403" in error_msg:
                        st.warning("Received a 403 Forbidden error. This may be due to: \n"
                                   "• File size exceeding limits\n"
                                   "• Temporary file permission issues\n"
                                   "• Network restrictions")
                        st.info("Try a smaller audio file (less than 50MB) or a different format.")
                    elif "timeout" in error_msg.lower():
                        st.warning("The request timed out. Try a shorter audio clip or check your internet connection.")
                    elif "memory" in error_msg.lower():
                        st.warning("Out of memory error. Try a shorter audio clip.")
                    else:
                        st.info("If the problem persists, try a different audio file format such as MP3 or WAV.")

# Add footer with deployment info
st.markdown("---")
st.markdown("Deployed using Streamlit • Built with SpeechBrain and Transformers")

# Add a section for how it works
with st.expander("ℹ️ How It Works"):
    st.markdown("""
    This app uses a multi-stage process to analyze a speaker's accent:

    1. **Audio Extraction**: The audio track is extracted from the input video or directly processed from uploaded audio.

    2. **Language Identification**: First, we determine if the speech is English using SpeechBrain's language identification model.

    3. **Accent Classification**: For English speech, we analyze the specific accent using a transformer-based model trained on diverse accent data.

    4. **English Proficiency Score**: A confidence score is calculated based on both language identification and accent clarity.

    5. **Analysis Summary**: An explanation is generated describing accent characteristics relevant for hiring evaluations.
    """)
# Add debug function for troubleshooting HTTP errors
def debug_http_errors():
    """Render diagnostics for upload failures: limits, environment, write access and ffmpeg.

    Output goes to the Streamlit UI only; nothing is returned and no check
    raises (each failure is shown as an error entry instead).
    """
    st.warning("⚠️ HTTP 400 Error Debugging Mode")

    # Report the limit Streamlit is actually configured with instead of a fixed number.
    upload_limit_mb = st.get_option("server.maxUploadSize")
    st.markdown(f"""
    ### Common HTTP 400 Error Causes:

    1. **File size exceeds limits** (current limit: {upload_limit_mb}MB)
    2. **File format incompatibility**
    3. **Network interruption** during upload
    4. **Server-side timeout** during processing
    5. **Permissions issues** in container
    """)

    # Show environment info
    st.subheader("Environment Information")
    env_info = {
        "STREAMLIT_UPLOADS_PATH": os.environ.get("STREAMLIT_UPLOADS_PATH", "Not set"),
        "STREAMLIT_SERVER_MAX_UPLOAD_SIZE": os.environ.get("STREAMLIT_SERVER_MAX_UPLOAD_SIZE", "Not set"),
        "Current directory": os.getcwd(),
        "Python version": sys.version,
    }
    for key, value in env_info.items():
        st.code(f"{key}: {value}")

    # Check if uploads directory is writable (creating it is part of the check)
    uploads_dir = os.environ.get("STREAMLIT_UPLOADS_PATH", os.path.join(os.getcwd(), "uploads"))
    try:
        os.makedirs(uploads_dir, exist_ok=True)
        with tempfile.NamedTemporaryFile(mode="w", dir=uploads_dir, prefix="test_write_") as probe:
            probe.write("Test write permission")
        st.success(f"✓ Upload directory is writable: {uploads_dir}")
    except Exception as e:
        st.error(f"✗ Cannot write to upload directory: {str(e)}")

    # Test ffmpeg: a non-zero exit status or a hang counts as a failure too
    try:
        subprocess.run(["ffmpeg", "-version"], capture_output=True, text=True, check=True, timeout=10)
        st.success("✓ FFmpeg is available")
    except Exception as e:
        st.error(f"✗ FFmpeg error: {str(e)}")
# Troubleshooting expander: an opt-in diagnostics switch plus an
# "alternative upload" toggle that only shows an informational notice.
troubleshooting_panel = st.expander("🔧 Troubleshooting Tools")

# Debug mode flag for the app; the diagnostics render inside the expander.
debug_mode = troubleshooting_panel.checkbox("Enable Debug Mode for HTTP 400 Errors")
if debug_mode:
    with troubleshooting_panel:
        debug_http_errors()

# Option for the user to try a different upload method
alt_upload = troubleshooting_panel.checkbox("Use alternative upload method (for HTTP 400 errors)")
if alt_upload:
    troubleshooting_panel.info("Using alternative upload method that may bypass some HTTP 400 errors")