import json
import logging
import os
import re
import time
import traceback
from datetime import datetime

import google.generativeai as genai
import psutil
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from google.api_core import exceptions as api_core_exceptions
from pydantic import BaseModel

import config
from models import load_spacy, load_summarizer, load_whisper
from services import (
    create_enhanced_summary_prompt,
    format_summary_to_markdown,
    process_summary,
    process_transcription,
)
from utils import get_language_name, webm_to_wav

logger = logging.getLogger(__name__)

app = FastAPI(
    title="Transcription and Summarization API",
    description="API using Faster-Whisper, spaCy, and Hugging Face Transformers",
    version="1.0.0",
)

# Gemini is only configured when a key is present; the Gemini-backed
# endpoints will fail at request time otherwise.
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
    logger.critical("GEMINI_API_KEY environment variable not set.")
else:
    genai.configure(api_key=api_key)

# Models are loaded once at import time and shared by all requests.
logger.info("Application starting up - loading models...")
whisper_model = load_whisper(config)
summarizer_pipeline = load_summarizer(config)
nlp_spacy = load_spacy(config)
logger.info("Model loading complete.")

origins = ["http://localhost:8080"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

if not whisper_model:
    logger.critical(
        "Whisper model failed to load. Transcription endpoint will be unavailable."
    )
if not summarizer_pipeline:
    logger.critical(
        "Summarizer pipeline failed to load. Summarization endpoint will be unavailable."
    )
if not nlp_spacy:
    logger.warning(
        "SpaCy model failed to load. Summarization will proceed without spaCy preprocessing."
    )


class TranscriptInput(BaseModel):
    """Request body shared by the transcript-based endpoints."""

    transcript: str
    language: str


class PerformanceMetrics:
    """Helper class to track performance metrics"""

    @staticmethod
    def get_system_metrics():
        """Get current system resource usage.

        Returns:
            dict with ``cpu_percent``, ``memory_percent``, ``memory_used_mb``
            and ``memory_available_mb``.
        """
        # Take a single memory snapshot so the three memory figures are
        # consistent with each other.
        memory = psutil.virtual_memory()
        return {
            "cpu_percent": psutil.cpu_percent(interval=None),
            "memory_percent": memory.percent,
            "memory_used_mb": memory.used / (1024 * 1024),
            "memory_available_mb": memory.available / (1024 * 1024),
        }

    @staticmethod
    def calculate_processing_rate(data_size, processing_time):
        """Calculate processing rate (MB/s or words/s).

        Returns 0 when ``processing_time`` is zero or negative to avoid a
        division by zero.
        """
        if processing_time <= 0:
            return 0
        return data_size / processing_time

    @staticmethod
    def format_performance_report(metrics):
        """Format performance metrics into a readable report.

        Args:
            metrics: dict that may contain ``processing_time`` (seconds),
                ``system_metrics`` and ``throughput``; missing keys fall
                back to ``0`` / ``{}``.

        Returns:
            dict with a UTC timestamp, the rounded processing time, the
            system metrics, the throughput and a resource-efficiency summary.
        """
        processing_time = round(metrics.get("processing_time", 0), 3)
        system_metrics = metrics.get("system_metrics", {})
        return {
            "timestamp": datetime.utcnow().isoformat(),
            "processing_time_seconds": processing_time,
            "system_metrics": system_metrics,
            "throughput": metrics.get("throughput", {}),
            "resource_efficiency": {
                "time_per_operation": processing_time,
                "memory_efficiency": f"{system_metrics.get('memory_percent', 0):.1f}% used",
            },
        }


@app.post("/calculate-result")
def calculate_result(input: TranscriptInput):
    """Compute simple text statistics for a transcript.

    Returns word/character counts, the average word length and a duration
    estimate based on an assumed speech rate of 150 words per minute,
    together with a performance report.

    Raises:
        HTTPException: 400 when the transcript is empty after stripping,
            500 when the calculation fails unexpectedly.
    """
    start_time = time.time()
    # Called for its side effect: with interval=None psutil reports CPU usage
    # since the previous cpu_percent() call, so this resets the baseline.
    PerformanceMetrics.get_system_metrics()

    # Validate outside the try block so the 400 is not caught by the generic
    # handler below and converted into a 500.
    transcript_text = input.transcript.strip()
    if not transcript_text:
        raise HTTPException(status_code=400, detail="Transcript cannot be empty.")

    try:
        language_code = input.language
        word_count = len(transcript_text.split())
        char_count = len(transcript_text)
        average_word_length = round(char_count / word_count, 2) if word_count else 0

        # Estimating duration using an average speech rate (~150 words per minute)
        estimated_duration_minutes = round(word_count / 150, 2)
        estimated_duration_seconds = int(estimated_duration_minutes * 60)

        # Estimate WPM just for report usage
        wpm = 150  # assumption based on average speech rate

        processing_time = time.time() - start_time
        end_metrics = PerformanceMetrics.get_system_metrics()

        result = {
            "language": language_code,
            "word_count": word_count,
            "character_count": char_count,
            "average_word_length": average_word_length,
            "estimated_duration_minutes": estimated_duration_minutes,
            "estimated_duration_seconds": estimated_duration_seconds,
            "assumed_words_per_minute": wpm,
        }

        # Add performance metrics
        performance = PerformanceMetrics.format_performance_report({
            "processing_time": processing_time,
            "system_metrics": end_metrics,
            "throughput": {
                "words_per_second": PerformanceMetrics.calculate_processing_rate(word_count, processing_time),
                "characters_per_second": PerformanceMetrics.calculate_processing_rate(char_count, processing_time),
            },
        })

        return {"metrics": result, "performance": performance}
    except Exception as e:
        logger.error(f"Error in /calculate-result: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to calculate transcript metrics.")


@app.post("/transcribe")
async def transcription(
    audio_file: UploadFile = File(...),
    enable_diarization: bool = Form(False),
):
    """Transcribe an uploaded .webm or .wav audio file.

    WebM uploads are converted to wav first. The response contains the
    transcript, detected language, audio duration and a performance report;
    speakers and segments are added when diarization is enabled and produced
    segments.

    Raises:
        HTTPException: 503 when the Whisper model is not loaded, 400 for an
            unsupported format or a ``ValueError`` during processing, 500 for
            any other failure.
    """
    if whisper_model is None:
        raise HTTPException(status_code=503, detail="Transcription service unavailable.")

    start_time = time.time()
    # Resets psutil's cpu_percent baseline (see get_system_metrics).
    PerformanceMetrics.get_system_metrics()

    try:
        content_type = audio_file.content_type
        # The filename is optional on uploads; fall back to "" so the
        # extension check below cannot raise AttributeError.
        filename = audio_file.filename or ""
        content = await audio_file.read()
        file_size_mb = len(content) / (1024 * 1024)
        logger.warning(f"Received file: {audio_file.filename}, content_type: {content_type}, size: {file_size_mb:.2f}MB")

        # File processing timing
        file_process_start = time.time()
        if content_type in ("audio/webm", "video/webm"):
            wav_path = webm_to_wav(content)
            try:
                with open(wav_path, "rb") as f:
                    wav_bytes = f.read()
            finally:
                # Always clean up the temporary file, even if reading failed.
                try:
                    os.remove(wav_path)
                except OSError as cleanup_err:
                    logger.warning(f"Could not remove temporary file {wav_path}: {cleanup_err}")
        elif (
            content_type in ("audio/wav", "audio/x-wav", "audio/vnd.wave", "application/octet-stream")
            or filename.endswith(".wav")
        ):
            logger.warning("[+] wav processing")
            wav_bytes = content
        else:
            raise HTTPException(status_code=400, detail="Unsupported audio format. Use .webm or .wav")
        file_process_time = time.time() - file_process_start

        # Transcription timing
        # NOTE(review): process_transcription appears to be a synchronous call;
        # inside this async endpoint it would block the event loop. Consider
        # offloading it to a worker thread once model thread-safety is confirmed.
        transcription_start = time.time()
        transcript, info, diarized_segments = process_transcription(
            wav_bytes, whisper_model, enable_diarization=enable_diarization
        )
        transcription_time = time.time() - transcription_start

        total_processing_time = time.time() - start_time
        end_metrics = PerformanceMetrics.get_system_metrics()
        logger.info(f"Transcription successful. Language: {info.language}, Total Time: {total_processing_time:.2f}s")

        # Unique speakers in order of first appearance.
        speakers = []
        if diarized_segments:
            for segment in diarized_segments:
                if segment["speaker"] not in speakers:
                    speakers.append(segment["speaker"])

        # Calculate performance metrics
        word_count = len(transcript.split())
        performance = PerformanceMetrics.format_performance_report({
            "processing_time": total_processing_time,
            "system_metrics": end_metrics,
            "throughput": {
                "mb_per_second": PerformanceMetrics.calculate_processing_rate(file_size_mb, total_processing_time),
                "words_per_second": PerformanceMetrics.calculate_processing_rate(word_count, total_processing_time),
                "audio_duration_vs_processing_ratio": (
                    round(info.duration / total_processing_time, 2) if total_processing_time > 0 else 0
                ),
            },
        })

        # Add detailed timing breakdown
        performance["timing_breakdown"] = {
            "file_processing_seconds": round(file_process_time, 3),
            "transcription_seconds": round(transcription_time, 3),
            "total_processing_seconds": round(total_processing_time, 3),
            "file_size_mb": round(file_size_mb, 2),
            "audio_duration_seconds": round(info.duration, 2),
        }

        response = {
            "transcript": transcript,
            "language": info.language,
            "duration": info.duration,
            "performance": performance,
        }
        if enable_diarization and diarized_segments:
            response["speakers"] = speakers
            response["segments"] = diarized_segments
        return response
    except HTTPException:
        # Preserve deliberate HTTP errors (e.g. the 400 above) unchanged.
        raise
    except ValueError as ve:
        logger.error(f"Value error during transcription processing: {ve}")
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        logger.error(f"Unhandled error during transcription: {e}\n{traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Internal server error during transcription.")


@app.post("/summarize")
async def summarize(input: TranscriptInput):
    """Summarize a transcript with Gemini using a plain prompt.

    Raises:
        HTTPException: 400 for an empty transcript or a blocked prompt,
            429 when the Gemini API reports resource exhaustion, 500 for any
            other failure.
    """
    if not input.transcript or not input.transcript.strip():
        raise HTTPException(status_code=400, detail="Transcript cannot be empty.")

    start_time = time.time()
    # Resets psutil's cpu_percent baseline (see get_system_metrics).
    PerformanceMetrics.get_system_metrics()

    try:
        prompt = f"""
        Summarize the following text concisely:

        Transcript:
        \"\"\"
        {input.transcript}
        \"\"\"
        """

        api_call_start = time.time()
        model = genai.GenerativeModel('gemini-1.5-flash')
        # Use the async client so the network round trip does not block the
        # event loop of this async endpoint.
        response = await model.generate_content_async(prompt)
        api_call_time = time.time() - api_call_start

        total_processing_time = time.time() - start_time
        end_metrics = PerformanceMetrics.get_system_metrics()

        summary_text = response.text
        logger.info(f"Gemini /summarize response text: '{summary_text}'")

        # Calculate performance metrics
        input_word_count = len(input.transcript.split())
        output_word_count = len(summary_text.split())
        performance = PerformanceMetrics.format_performance_report({
            "processing_time": total_processing_time,
            "system_metrics": end_metrics,
            "throughput": {
                "input_words_per_second": PerformanceMetrics.calculate_processing_rate(
                    input_word_count, total_processing_time
                ),
                "compression_ratio": (
                    round(input_word_count / output_word_count, 2) if output_word_count > 0 else 0
                ),
            },
        })
        performance["timing_breakdown"] = {
            "api_call_seconds": round(api_call_time, 3),
            "total_processing_seconds": round(total_processing_time, 3),
            "input_word_count": input_word_count,
            "output_word_count": output_word_count,
        }

        return {"summary": summary_text, "performance": performance}
    except api_core_exceptions.ResourceExhausted as e:
        logger.error(f"Gemini API rate limit exceeded: {e}")
        raise HTTPException(status_code=429, detail="API rate limit exceeded. Please wait and try again.")
    except genai.types.BlockedPromptError as e:
        logger.error(f"The prompt was blocked: {e}")
        raise HTTPException(status_code=400, detail="The request was blocked by the content safety filter.")
    except Exception as e:
        logger.error(f"An unexpected error occurred during basic summarization: {e}", exc_info=True)
        # Do not echo the raw exception text to the client; the details are
        # in the server log.
        raise HTTPException(status_code=500, detail="An internal server error occurred during summarization.")


@app.post("/smart-summary")
def smart_summarize(input: TranscriptInput):
    """Summarize a transcript with the locally loaded summarizer pipeline.

    Raises:
        HTTPException: 503 when the summarizer pipeline is not loaded, 400
            for an empty transcript or a ``ValueError`` during processing,
            500 for any other failure.
    """
    if summarizer_pipeline is None:
        raise HTTPException(status_code=503, detail="Summarization service unavailable.")
    # Reject whitespace-only transcripts too, like the other endpoints do.
    if not input.transcript or not input.transcript.strip():
        raise HTTPException(status_code=400, detail="Transcript cannot be empty.")

    start_time = time.time()
    # Resets psutil's cpu_percent baseline (see get_system_metrics).
    PerformanceMetrics.get_system_metrics()

    try:
        summary = process_summary(input.transcript, summarizer_pipeline, nlp_spacy, config)

        total_processing_time = time.time() - start_time
        end_metrics = PerformanceMetrics.get_system_metrics()

        # Calculate performance metrics
        input_word_count = len(input.transcript.split())
        output_word_count = len(summary.split())
        performance = PerformanceMetrics.format_performance_report({
            "processing_time": total_processing_time,
            "system_metrics": end_metrics,
            "throughput": {
                "words_per_second": PerformanceMetrics.calculate_processing_rate(
                    input_word_count, total_processing_time
                ),
                "compression_ratio": (
                    round(input_word_count / output_word_count, 2) if output_word_count > 0 else 0
                ),
            },
        })
        performance["timing_breakdown"] = {
            "total_processing_seconds": round(total_processing_time, 3),
            "input_word_count": input_word_count,
            "output_word_count": output_word_count,
        }

        return {"summary": summary, "performance": performance}
    except ValueError as ve:
        logger.error(f"Value error during summary processing: {ve}")
        raise HTTPException(status_code=400, detail=str(ve))
    except Exception as e:
        logger.error(f"Unhandled error during summarization: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error during summarization.")
@app.post("/enhanced-summary")
async def enhanced_summary(input: TranscriptInput):
    """Produce a structured Markdown summary of a transcript via Gemini.

    Builds a prompt with ``create_enhanced_summary_prompt``, asks Gemini for
    a JSON response, parses it and renders it with
    ``format_summary_to_markdown``.

    Raises:
        HTTPException: 400 for an empty transcript or a blocked prompt,
            429 when the Gemini API reports resource exhaustion, 500 when the
            model response is not valid JSON or any other failure occurs.
    """
    if not input.transcript or not input.transcript.strip():
        raise HTTPException(status_code=400, detail="Transcript cannot be empty.")

    start_time = time.time()
    # Called for its side effect: with interval=None psutil reports CPU usage
    # since the previous cpu_percent() call, so this resets the baseline.
    PerformanceMetrics.get_system_metrics()

    try:
        code = input.language
        logger.info(f"Detected language code: {code}")
        language_name = get_language_name(code)
        logger.info(f"Detected language name: {language_name}")

        prompt_creation_start = time.time()
        prompt = create_enhanced_summary_prompt(input.transcript, language_name)
        prompt_creation_time = time.time() - prompt_creation_start

        api_call_start = time.time()
        model = genai.GenerativeModel('gemini-1.5-flash')
        # Use the async client so the network round trip does not block the
        # event loop of this async endpoint.
        response = await model.generate_content_async(
            contents=prompt,
            generation_config=genai.GenerationConfig(response_mime_type="application/json"),
        )
        api_call_time = time.time() - api_call_start

        json_parsing_start = time.time()
        try:
            # Strip an optional ```json ... ``` fence before parsing.
            cleaned_text = re.sub(r"```json\s*(.*)\s*```", r"\1", response.text, flags=re.DOTALL)
            summary_json = json.loads(cleaned_text)
            logger.info(f"Received JSON from Gemini: {summary_json}")
        except (json.JSONDecodeError, TypeError) as e:
            logger.error(f"Failed to parse LLM response as JSON: {e}\nResponse text: {response.text}")
            raise HTTPException(
                status_code=500,
                detail="Failed to generate a structured summary due to an invalid model response.",
            ) from e
        json_parsing_time = time.time() - json_parsing_start

        formatting_start = time.time()
        formatted_markdown = format_summary_to_markdown(summary_json, code)
        formatting_time = time.time() - formatting_start

        total_processing_time = time.time() - start_time
        end_metrics = PerformanceMetrics.get_system_metrics()
        logger.info(f"Formatted Markdown: {formatted_markdown}")

        # Calculate performance metrics
        input_word_count = len(input.transcript.split())
        output_word_count = len(formatted_markdown.split())
        performance = PerformanceMetrics.format_performance_report({
            "processing_time": total_processing_time,
            "system_metrics": end_metrics,
            "throughput": {
                "words_per_second": PerformanceMetrics.calculate_processing_rate(
                    input_word_count, total_processing_time
                ),
                "compression_ratio": (
                    round(input_word_count / output_word_count, 2) if output_word_count > 0 else 0
                ),
            },
        })
        performance["timing_breakdown"] = {
            "prompt_creation_seconds": round(prompt_creation_time, 3),
            "api_call_seconds": round(api_call_time, 3),
            "json_parsing_seconds": round(json_parsing_time, 3),
            "markdown_formatting_seconds": round(formatting_time, 3),
            "total_processing_seconds": round(total_processing_time, 3),
            "input_word_count": input_word_count,
            "output_word_count": output_word_count,
        }

        return {"summary": formatted_markdown, "performance": performance}
    except HTTPException:
        # Let deliberate HTTP errors (such as the invalid-JSON 500 above)
        # through with their own detail message instead of masking them.
        raise
    except api_core_exceptions.ResourceExhausted as e:
        logger.error(f"Gemini API rate limit exceeded: {e}")
        raise HTTPException(status_code=429, detail="API rate limit exceeded. Please wait and try again.")
    except genai.types.BlockedPromptError as e:
        logger.error(f"The prompt was blocked: {e}")
        raise HTTPException(status_code=400, detail="The request was blocked by the content safety filter.")
    except Exception as e:
        logger.error(f"An unexpected error occurred during summarization: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="An internal server error occurred during summarization.")


@app.get("/performance-stats")
async def get_performance_stats():
    """Get current system performance statistics"""
    current_metrics = PerformanceMetrics.get_system_metrics()
    return {
        "timestamp": datetime.utcnow().isoformat(),
        "system_status": {
            "cpu_usage_percent": current_metrics["cpu_percent"],
            "memory_usage_percent": current_metrics["memory_percent"],
            "memory_used_mb": round(current_metrics["memory_used_mb"], 2),
            "memory_available_mb": round(current_metrics["memory_available_mb"], 2),
        },
        "service_status": {
            "whisper_available": whisper_model is not None,
            "summarizer_available": summarizer_pipeline is not None,
            "spacy_available": nlp_spacy is not None,
            # Startup only configures Gemini for a non-empty key, so an empty
            # string must be reported as "not configured" as well.
            "gemini_configured": bool(api_key),
        },
    }