Spaces:
Running
on
Zero
Running
on
Zero
File size: 30,992 Bytes
e83cc4c 611206a e83cc4c 6197eab 1487b33 611206a 1487b33 c0fe80d e83cc4c ba55edb 611206a e83cc4c 4d1f920 ba55edb 4d1f920 ba55edb 6197eab ba55edb 4d1f920 ba55edb 4d1f920 6197eab 4d1f920 6197eab 4d1f920 ba55edb 6197eab ba55edb 6197eab ba55edb 6197eab ba55edb 6197eab ba55edb 611206a 4a76a7e 4d1f920 ba55edb 6197eab ba55edb 6197eab ba55edb 6197eab ba55edb 4d1f920 6197eab 3172319 e83cc4c 4d1f920 ba55edb 4d1f920 ba55edb 4d1f920 ba55edb 4d1f920 e83cc4c c0fe80d 3172319 c0fe80d ba55edb c0fe80d 3172319 c0fe80d 3172319 c0fe80d 4d1f920 3172319 4d1f920 3172319 c0fe80d 3172319 c0fe80d ba55edb c0fe80d 3172319 c0fe80d 3172319 c0fe80d e83cc4c 4d1f920 e83cc4c 4d1f920 e83cc4c 4d1f920 e83cc4c 4d1f920 e83cc4c 6197eab e83cc4c c0fe80d 3172319 c0fe80d 3172319 c0fe80d 3172319 c0fe80d 3172319 c0fe80d 3172319 e83cc4c c0fe80d 4d1f920 e83cc4c 3172319 c0fe80d 3172319 c0fe80d 1487b33 c0fe80d e83cc4c c0fe80d 6197eab c0fe80d 6197eab ba55edb 6197eab ba55edb 6197eab ba55edb 27687e0 ba55edb 6197eab 27687e0 c0fe80d 6197eab c0fe80d 6197eab c0fe80d 6197eab 27687e0 6197eab 27687e0 6197eab c0fe80d 6197eab c0fe80d 6197eab c0fe80d 6197eab 27687e0 6197eab 27687e0 c0fe80d 6197eab 27687e0 c0fe80d 27687e0 1487b33 ba55edb 6197eab ba55edb 6197eab ba55edb 6197eab ba55edb 6197eab ba55edb 6197eab ba55edb 6197eab ba55edb 611206a c0fe80d 611206a ba55edb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 |
import re
import os
import numpy as np
import matplotlib.pyplot as plt
import gradio as gr
from typing import Dict, List, Any, Optional, Tuple
import cv2
from PIL import Image
import tempfile
import uuid
import time
import traceback
import spaces
from detection_model import DetectionModel
from color_mapper import ColorMapper
from evaluation_metrics import EvaluationMetrics
from style import Style
from image_processor import ImageProcessor
from video_processor import VideoProcessor
from llm_enhancer import LLMEnhancer
from ui_manager import UIManager
# Initialize Processors with LLM support
image_processor = None
video_processor = None
ui_manager = None
def initialize_processors():
"""
Initialize the image and video processors with LLM support.
Returns:
bool: True if initialization was successful, False otherwise
"""
global image_processor, video_processor
try:
print("Attempting to initialize ImageProcessor with LLM support...")
image_processor = ImageProcessor(use_llm=True, llm_model_path="meta-llama/Llama-3.2-3B-Instruct")
print("ImageProcessor initialized successfully with LLM")
# 檢查狀態
if hasattr(image_processor, 'scene_analyzer'):
if image_processor.scene_analyzer is not None:
print(f"scene_analyzer initialized: {type(image_processor.scene_analyzer)}")
if hasattr(image_processor.scene_analyzer, 'use_llm'):
print(f"scene_analyzer.use_llm available: {image_processor.scene_analyzer.use_llm}")
else:
print("WARNING: scene_analyzer is None after initialization")
else:
print("WARNING: scene_analyzer attribute not found in image_processor")
# 初始化獨立的VideoProcessor
video_processor = VideoProcessor()
print("VideoProcessor initialized successfully as independent module")
return True
except Exception as e:
print(f"Error initializing processors with LLM: {e}")
import traceback
traceback.print_exc()
# Create fallback processor without LLM
try:
print("Attempting fallback initialization without LLM...")
image_processor = ImageProcessor(use_llm=False, enable_places365=False)
video_processor = VideoProcessor()
print("Fallback processors initialized successfully without LLM and Places365")
return True
except Exception as fallback_error:
print(f"Fatal error: Cannot initialize processors: {fallback_error}")
import traceback
traceback.print_exc()
image_processor = None
video_processor = None
return False
def initialize_ui_manager():
"""
Initialize the UI manager and set up references to processors.
Returns:
UIManager: Initialized UI manager instance
"""
global ui_manager, image_processor
ui_manager = UIManager()
# Set image processor reference for dynamic class retrieval
if image_processor:
ui_manager.set_image_processor(image_processor)
return ui_manager
@spaces.GPU(duration=180)
def handle_image_upload(image, model_name, confidence_threshold, filter_classes=None, use_llm=True, enable_landmark=True):
"""
Processes a single uploaded image.
Args:
image: PIL Image object
model_name: Name of the YOLO model to use
confidence_threshold: Confidence threshold for detections
filter_classes: List of class names/IDs to filter
use_llm: Whether to use LLM for enhanced descriptions
enable_landmark: Whether to enable landmark detection
Returns:
Tuple: (result_image, result_text, formatted_stats, plot_figure,
scene_description_html, original_desc_html, activities_list_data,
safety_data, zones, lighting)
"""
# Enhanced safety check for image_processor
if image_processor is None:
error_msg = "Image processor is not initialized. Please restart the application or check system dependencies."
print(f"ERROR: {error_msg}")
# Create error plot
fig, ax = plt.subplots(figsize=(8, 6))
ax.text(0.5, 0.5, "Initialization Error\nProcessor Not Available",
color="red", ha="center", va="center", fontsize=14, fontweight="bold")
ax.axis('off')
return (None, error_msg, {}, fig, f"<div style='color: red; font-weight: bold;'>Error: {error_msg}</div>",
"<div style='color: red;'>Error: System not initialized</div>",
[["System Error"]], [["System Error"]], {}, {"time_of_day": "error", "confidence": 0})
# Additional safety check for processor attributes
if not hasattr(image_processor, 'use_llm'):
error_msg = "Image processor is corrupted. Missing required attributes."
print(f"ERROR: {error_msg}")
fig, ax = plt.subplots(figsize=(8, 6))
ax.text(0.5, 0.5, "Processor Error\nCorrupted State",
color="red", ha="center", va="center", fontsize=14, fontweight="bold")
ax.axis('off')
return (None, error_msg, {}, fig, f"<div style='color: red; font-weight: bold;'>Error: {error_msg}</div>",
"<div style='color: red;'>Error: Processor corrupted</div>",
[["Processor Error"]], [["Processor Error"]], {}, {"time_of_day": "error", "confidence": 0})
print(f"DIAGNOSTIC: Image upload handled with enable_landmark={enable_landmark}, use_llm={use_llm}")
print(f"Processing image with model: {model_name}, confidence: {confidence_threshold}, use_llm: {use_llm}, enable_landmark: {enable_landmark}")
try:
image_processor.use_llm = use_llm
# 確保 scene_analyzer 不是 None
if hasattr(image_processor, 'scene_analyzer') and image_processor.scene_analyzer is not None:
if hasattr(image_processor.scene_analyzer, 'use_llm'):
image_processor.scene_analyzer.use_llm = use_llm
print(f"Updated existing scene_analyzer use_llm setting to: {use_llm}")
# 檢查並設置 landmark detection
if hasattr(image_processor.scene_analyzer, 'use_landmark_detection'):
# 設置所有相關標記
image_processor.scene_analyzer.use_landmark_detection = enable_landmark
image_processor.scene_analyzer.enable_landmark = enable_landmark
# 確保處理器也設置了這選項(檢測地標用)
image_processor.enable_landmark = enable_landmark
# 檢查並設置更深層次的組件
if hasattr(image_processor.scene_analyzer, 'scene_describer') and image_processor.scene_analyzer.scene_describer is not None:
image_processor.scene_analyzer.scene_describer.enable_landmark = enable_landmark
# 檢查並設置CLIP Analyzer
if hasattr(image_processor.scene_analyzer, 'clip_analyzer') and image_processor.scene_analyzer.clip_analyzer is not None:
if hasattr(image_processor.scene_analyzer.clip_analyzer, 'enable_landmark'):
image_processor.scene_analyzer.clip_analyzer.enable_landmark = enable_landmark
# 檢查並設置LLM方面
if hasattr(image_processor.scene_analyzer, 'llm_enhancer') and image_processor.scene_analyzer.llm_enhancer is not None:
if hasattr(image_processor.scene_analyzer.llm_enhancer, 'enable_landmark'):
image_processor.scene_analyzer.llm_enhancer.enable_landmark = enable_landmark
print(f"Updated LLM enhancer enable_landmark to: {enable_landmark}")
print(f"Updated all landmark detection settings to: {enable_landmark}")
else:
print("WARNING: scene_analyzer is None or not available")
if hasattr(image_processor, 'enable_landmark'):
image_processor.enable_landmark = enable_landmark
# 設置更深層次的組別
if hasattr(image_processor.scene_analyzer, 'scene_describer'):
image_processor.scene_analyzer.scene_describer.enable_landmark = enable_landmark
# 設置CLIP分析器上的標記
if hasattr(image_processor.scene_analyzer, 'clip_analyzer'):
if hasattr(image_processor.scene_analyzer.clip_analyzer, 'enable_landmark'):
image_processor.scene_analyzer.clip_analyzer.enable_landmark = enable_landmark
# 如果有LLM增強器,也設置它
if hasattr(image_processor.scene_analyzer, 'llm_enhancer'):
image_processor.scene_analyzer.llm_enhancer.enable_landmark = enable_landmark
print(f"Updated LLM enhancer enable_landmark to: {enable_landmark}")
print(f"Updated all landmark detection settings to: {enable_landmark}")
class_ids_to_filter = None
if filter_classes:
class_ids_to_filter = []
available_classes_dict = dict(ui_manager.get_all_classes())
name_to_id = {name: id for id, name in available_classes_dict.items()}
for class_str in filter_classes:
class_name_or_id = class_str.split(":")[0].strip()
class_id = -1
try:
class_id = int(class_name_or_id)
if class_id not in available_classes_dict:
class_id = -1
except ValueError:
if class_name_or_id in name_to_id:
class_id = name_to_id[class_name_or_id]
elif class_str in name_to_id: # Check full string "id: name"
class_id = name_to_id[class_str]
if class_id != -1:
class_ids_to_filter.append(class_id)
else:
print(f"Warning: Could not parse class filter: {class_str}")
print(f"Filtering image results for class IDs: {class_ids_to_filter}")
# Call the existing image processing logic
print(f"DEBUG: app.py 傳遞 enable_landmark={enable_landmark} 到 process_image")
result_image, result_text, stats = image_processor.process_image(
image,
model_name,
confidence_threshold,
class_ids_to_filter,
enable_landmark
)
# Format stats for JSON display
formatted_stats = image_processor.format_json_for_display(stats)
# Prepare visualization data for the plot
plot_figure = None
if stats and "class_statistics" in stats and stats["class_statistics"]:
available_classes_dict = dict(ui_manager.get_all_classes())
viz_data = image_processor.prepare_visualization_data(stats, available_classes_dict)
if "error" not in viz_data:
plot_figure = EvaluationMetrics.create_enhanced_stats_plot(viz_data)
else:
fig, ax = plt.subplots(figsize=(8, 6))
ax.text(0.5, 0.5, viz_data["error"], ha='center', va='center', fontsize=12)
ax.axis('off')
plot_figure = fig
else:
fig, ax = plt.subplots(figsize=(8, 6))
ax.text(0.5, 0.5, "No detection data for plot", ha='center', va='center', fontsize=12)
ax.axis('off')
plot_figure = fig
# Extract scene analysis info
scene_analysis = stats.get("scene_analysis", {})
scene_desc = scene_analysis.get("description", "Scene analysis requires detected objects.")
# Ensure scene_desc is a string before adding HTML
if not isinstance(scene_desc, str):
scene_desc = str(scene_desc)
def clean_description(desc):
if not desc:
return ""
# 先過濾問答格式
if "Questions:" in desc:
desc = desc.split("Questions:")[0].strip()
if "Answers:" in desc:
desc = desc.split("Answers:")[0].strip()
# 然後按行過濾代碼和其他非敘述內容
lines = desc.split('\n')
clean_lines = []
skip_block = False
for line in lines:
# 檢測問題格式
if re.match(r'^\d+\.\s+(What|How|Why|When|Where|Who|The)', line):
continue
# 檢查需要跳過的行
if line.strip().startswith(':param') or line.strip().startswith('"""'):
continue
if line.strip().startswith("Exercise") or "class SceneDescriptionSystem" in line:
skip_block = True
continue
if ('def generate_scene_description' in line or
'def enhance_scene_descriptions' in line or
'def __init__' in line):
skip_block = True
continue
if line.strip().startswith('#TEST'):
skip_block = True
continue
if skip_block and line.strip() == "":
skip_block = False
# 如果不需要跳過
if not skip_block:
clean_lines.append(line)
cleaned_text = '\n'.join(clean_lines)
# 如果清理後為空,返回原始描述的第一段作為保險
if not cleaned_text.strip():
paragraphs = [p.strip() for p in desc.split('\n\n') if p.strip()]
if paragraphs:
return paragraphs[0]
return desc
return cleaned_text
# 獲取和處理場景描述
scene_analysis = stats.get("scene_analysis", {})
print("Processing scene_analysis:", scene_analysis.keys())
# 獲取原始描述
scene_desc = scene_analysis.get("description", "Scene analysis requires detected objects.")
if not isinstance(scene_desc, str):
scene_desc = str(scene_desc)
print(f"Original scene description (first 50 chars): {scene_desc[:50]}...")
# determine original description
clean_scene_desc = clean_description(scene_desc)
print(f"Cleaned scene description (first 50 chars): {clean_scene_desc[:50]}...")
if not clean_scene_desc.strip():
clean_scene_desc = scene_desc
scene_desc_html = f"<div>{clean_scene_desc}</div>"
# 獲取LLM增強描述並且確保設置默認值為空字符串而非 None,不然會有None type Error
enhanced_description = scene_analysis.get("enhanced_description", "")
if enhanced_description is None:
enhanced_description = ""
if not enhanced_description or not enhanced_description.strip():
print("WARNING: LLM enhanced description is empty!")
# bedge & label
llm_badge = ""
description_to_show = ""
# 在 Original Scene Analysis 折疊區顯示原始的描述
if use_llm and enhanced_description:
llm_badge = '<span style="display:inline-block; margin-left:8px; padding:3px 10px; border-radius:12px; background: linear-gradient(90deg, #38b2ac, #4299e1); color:white; font-size:0.7rem; font-weight:bold; box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1); border: 1px solid rgba(255, 255, 255, 0.2);">LLM Enhanced</span>'
description_to_show = enhanced_description
else:
llm_badge = '<span style="display:inline-block; margin-left:8px; padding:3px 10px; border-radius:12px; background-color:#718096; color:white; font-size:0.7rem; font-weight:bold;">Basic</span>'
description_to_show = clean_scene_desc
# 使用LLM敘述時會有徽章標籤在標題上
scene_description_html = f'''
<div>
<div class="section-heading" style="font-size:1.2rem; margin-top:15px;">Scene Description {llm_badge}
<span style="font-size:0.8rem; color:#666; font-weight:normal; display:block; margin-top:2px;">
{('(Enhanced by AI language model)' if use_llm and enhanced_description else '(Based on object detection)')}
</span>
</div>
<div style="padding:15px; background-color:#ffffff; border-radius:8px; border:1px solid #e2e8f0; margin-bottom:20px; box-shadow:0 1px 3px rgba(0,0,0,0.05);">
{description_to_show}
</div>
</div>
'''
# 原始描述只在使用 LLM 且有增強敘述時會在折疊區顯示
original_desc_visibility = "block" if use_llm and enhanced_description else "none"
original_desc_html = f'''
<div id="original_scene_analysis_accordion" style="display: {original_desc_visibility};">
<div style="padding:15px; background-color:#f0f0f0; border-radius:8px; border:1px solid #e2e8f0;">
{clean_scene_desc}
</div>
</div>
'''
# Prepare activities list
activities_list = scene_analysis.get("possible_activities", [])
if not activities_list:
activities_list_data = [["No specific activities inferred"]] # Data for Dataframe
else:
activities_list_data = [[activity] for activity in activities_list]
# Prepare safety concerns list
safety_concerns_list = scene_analysis.get("safety_concerns", [])
if not safety_concerns_list:
safety_data = [["No safety concerns detected"]] # Data for Dataframe
else:
safety_data = [[concern] for concern in safety_concerns_list]
zones = scene_analysis.get("functional_zones", {})
lighting = scene_analysis.get("lighting_conditions", {"time_of_day": "unknown", "confidence": 0})
# 如果描述為空,記錄警告
if not clean_scene_desc.strip():
print("WARNING: Scene description is empty after cleaning!")
if not enhanced_description.strip():
print("WARNING: LLM enhanced description is empty!")
return (result_image, result_text, formatted_stats, plot_figure,
scene_description_html, original_desc_html,
activities_list_data, safety_data, zones, lighting)
except Exception as e:
print(f"Error in handle_image_upload: {e}")
import traceback
error_msg = f"Error processing image: {str(e)}\n{traceback.format_exc()}"
fig, ax = plt.subplots()
ax.text(0.5, 0.5, "Processing Error", color="red", ha="center", va="center")
ax.axis('off')
# Ensure return structure matches outputs even on error
return (None, error_msg, {}, fig, f"<div>Error: {str(e)}</div>", "Error",
[["Error"]], [["Error"]], {}, {"time_of_day": "error", "confidence": 0})
def download_video_from_url(video_url, max_duration_minutes=10):
"""
Downloads a video from a YouTube URL and returns the local path to the downloaded file.
Args:
video_url (str): URL of the YouTube video to download
max_duration_minutes (int): Maximum allowed video duration in minutes
Returns:
tuple: (Path to the downloaded video file or None, Error message or None)
"""
try:
# Create a temporary directory to store the video
temp_dir = tempfile.gettempdir()
output_filename = f"downloaded_{uuid.uuid4().hex}.mp4"
output_path = os.path.join(temp_dir, output_filename)
# Check if it's a YouTube URL
if "youtube.com" in video_url or "youtu.be" in video_url:
# Import yt-dlp here to avoid dependency if not needed
import yt_dlp
# Setup yt-dlp options
ydl_opts = {
'format': 'best[ext=mp4]/best', # Best quality MP4 or best available format
'outtmpl': output_path,
'noplaylist': True,
'quiet': False, # Set to True to reduce output
'no_warnings': False,
}
# First extract info to check duration
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
print(f"Extracting info from YouTube URL: {video_url}")
info_dict = ydl.extract_info(video_url, download=False)
# Check if video exists
if not info_dict:
return None, "Could not retrieve video information. Please check the URL."
video_title = info_dict.get('title', 'Unknown Title')
duration = info_dict.get('duration', 0)
print(f"Video title: {video_title}")
print(f"Video duration: {duration} seconds")
# Check video duration
if duration > max_duration_minutes * 60:
return None, f"Video is too long ({duration} seconds). Maximum duration is {max_duration_minutes} minutes."
# Download the video
print(f"Downloading YouTube video: {video_title}")
ydl.download([video_url])
# Verify the file exists and has content
if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
return None, "Download failed: Empty or missing file."
print(f"Successfully downloaded video to: {output_path}")
return output_path, None
else:
return None, "Only YouTube URLs are supported at this time. Please enter a valid YouTube URL."
except Exception as e:
import traceback
error_details = traceback.format_exc()
print(f"Error downloading video: {e}\n{error_details}")
return None, f"Error downloading video: {str(e)}"
def generate_basic_video_summary(analysis_results: Dict) -> str:
"""
生成基本的視頻統計摘要
Args:
analysis_results (Dict): 新的分析結果結構
Returns:
str: 詳細的統計摘要
"""
summary_lines = ["=== Video Analysis Summary ===", ""]
# process info
processing_info = analysis_results.get("processing_info", {})
duration = processing_info.get("video_duration_seconds", 0)
total_frames = processing_info.get("total_frames", 0)
analyzed_frames = processing_info.get("frames_analyzed", 0)
summary_lines.extend([
f"Video Duration: {duration:.1f} seconds ({total_frames} total frames)",
f"Frames Analyzed: {analyzed_frames} frames (every {processing_info.get('processing_interval', 1)} frames)",
""
])
# object detected summary
object_summary = analysis_results.get("object_summary", {})
total_objects = object_summary.get("total_unique_objects_detected", 0)
object_types = object_summary.get("object_types_found", 0)
summary_lines.extend([
f"Objects Detected: {total_objects} total objects across {object_types} categories",
""
])
# detailed counting number
detailed_counts = object_summary.get("detailed_counts", {})
if detailed_counts:
summary_lines.extend([
"Object Breakdown:",
*[f" • {count} {name}(s)" for name, count in detailed_counts.items()],
""
])
# 實用分析摘要
practical_analytics = analysis_results.get("practical_analytics", {})
# 物體密度分析
density_info = practical_analytics.get("object_density", {})
if density_info:
objects_per_min = density_info.get("objects_per_minute", 0)
peak_periods = density_info.get("peak_activity_periods", [])
summary_lines.extend([
f"Activity Level: {objects_per_min:.1f} objects per minute",
f"Peak Activity Periods: {len(peak_periods)} identified",
""
])
# 場景適合性
scene_info = practical_analytics.get("scene_appropriateness", {})
if scene_info.get("scene_detected", False):
scene_name = scene_info.get("scene_name", "unknown")
appropriateness = scene_info.get("appropriateness_score", 0)
summary_lines.extend([
f"Scene Type: {scene_name}",
f"Object-Scene Compatibility: {appropriateness:.1%}",
""
])
# 品質指標
quality_info = practical_analytics.get("quality_metrics", {})
if quality_info:
quality_grade = quality_info.get("quality_grade", "unknown")
overall_confidence = quality_info.get("overall_confidence", 0)
summary_lines.extend([
f"Detection Quality: {quality_grade.title()} (avg confidence: {overall_confidence:.3f})",
""
])
summary_lines.append(f"Processing completed in {processing_info.get('processing_time_seconds', 0):.1f} seconds.")
return "\n".join(summary_lines)
@spaces.GPU
def handle_video_upload(video_input, video_url, input_type, model_name,
confidence_threshold, process_interval):
"""
處理影片上傳的函數
Args:
video_input: 上傳的視頻文件
video_url: 視頻URL(如果使用URL輸入)
input_type: 輸入類型("upload" 或 "url")
model_name: YOLO模型名稱
confidence_threshold: 置信度閾值
process_interval: 處理間隔(每N幀處理一次)
Returns:
Tuple: (output_video_path, summary_html, formatted_stats, object_details)
"""
if video_processor is None:
error_msg = "Error: Video processor not initialized."
error_html = f"<div class='video-summary-content-wrapper'><pre>{error_msg}</pre></div>"
empty_object_details = {}
return None, error_html, {"error": "Video processor not available"}, empty_object_details
video_path = None
# 根據輸入類型處理
if input_type == "upload" and video_input:
video_path = video_input
print(f"Processing uploaded video file: {video_path}")
elif input_type == "url" and video_url:
print(f"Processing video from URL: {video_url}")
video_path, error_msg = download_video_from_url(video_url)
if error_msg:
error_html = f"<div class='video-summary-content-wrapper'><pre>{error_msg}</pre></div>"
empty_object_details = {}
return None, error_html, {"error": error_msg}, empty_object_details
if not video_path:
error_msg = "Please provide a video file or valid URL."
error_html = f"<div class='video-summary-content-wrapper'><pre>{error_msg}</pre></div>"
empty_object_details = {}
return None, error_html, {"error": "No video input provided"}, empty_object_details
print(f"Starting practical video analysis: model={model_name}, confidence={confidence_threshold}, interval={process_interval}")
processing_start_time = time.time()
try:
output_video_path, analysis_results = video_processor.process_video(
video_path=video_path,
model_name=model_name,
confidence_threshold=confidence_threshold,
process_interval=int(process_interval)
)
print(f"Video processing function returned: path={output_video_path}")
if output_video_path is None:
error_msg = analysis_results.get("error", "Unknown error occurred during video processing")
error_html = f"<div class='video-summary-content-wrapper'><pre>Processing failed: {error_msg}</pre></div>"
empty_object_details = {}
return None, error_html, analysis_results, empty_object_details
# 生成摘要,直接用統計數據
basic_summary = generate_basic_video_summary(analysis_results)
# Final Result
processing_time = time.time() - processing_start_time
processing_info = analysis_results.get("processing_info", {})
summary_lines = [
f"Video processing completed in {processing_time:.2f} seconds.",
f"Analyzed {processing_info.get('frames_analyzed', 0)} frames out of {processing_info.get('total_frames', 0)} total frames.",
f"Processing interval: every {process_interval} frames",
basic_summary
]
summary_content = '\n'.join(summary_lines)
summary_html = f"<div class='video-summary-content-wrapper'><pre>{summary_content}</pre></div>"
# 提取物體詳情數據用於第四個輸出組件
timeline_analysis = analysis_results.get("timeline_analysis", {})
object_appearances = timeline_analysis.get("object_appearances", {})
# 格式化物體詳情數據以便在JSON組件中顯示
object_details_formatted = {}
for obj_name, details in object_appearances.items():
object_details_formatted[obj_name] = {
"Estimated Count": details.get("estimated_count", 0),
"First Appearance": details.get("first_appearance", "Unknown"),
"Last Seen": details.get("last_seen", "Unknown"),
"Duration in Video": details.get("duration_in_video", "Unknown"),
"Detection Confidence": details.get("detection_confidence", 0.0),
"First Appearance (seconds)": details.get("first_appearance_seconds", 0),
"Duration (seconds)": details.get("duration_seconds", 0)
}
print(f"Extracted object details for {len(object_details_formatted)} object types")
return output_video_path, summary_html, analysis_results, object_details_formatted
except Exception as e:
print(f"Error in handle_video_upload: {e}")
traceback.print_exc()
error_msg = f"Video processing failed: {str(e)}"
error_html = f"<div class='video-summary-content-wrapper'><pre>{error_msg}</pre></div>"
empty_object_details = {}
return None, error_html, {"error": str(e)}, empty_object_details
def main():
"""主函數,初始化並啟動Gradio"""
global ui_manager
print("=== VisionScout Application Starting ===")
print("Initializing processors...")
initialization_success = initialize_processors()
if not initialization_success:
print("ERROR: Failed to initialize processors. Application cannot start.")
return
print("Initializing UI manager...")
ui_manager = initialize_ui_manager()
print("Creating Gradio interface...")
demo_interface = ui_manager.create_interface(
handle_image_upload_fn=handle_image_upload,
handle_video_upload_fn=handle_video_upload,
download_video_from_url_fn=download_video_from_url
)
print("Launching application...")
demo_interface.launch(debug=True)
if __name__ == "__main__":
main()
|