import gradio as gr from collections import defaultdict import cv2 import numpy as np import tempfile from ultralytics import YOLO import os import telegram # Import de la bibliothèque import asyncio # Nécessaire pour les opérations asynchrones # --- CONFIGURATION TELEGRAM --- # Récupère les identifiants depuis les variables d'environnement TELEGRAM_BOT_TOKEN = "8489885905:AAFoNtXgszbL28yX8ogvQdL6lBHXa9lwHHI" TELEGRAM_CHAT_ID = "-1002689269933" # 1. Charger le modèle une seule fois au démarrage de l'application print("Chargement du modèle YOLOv8...") model = YOLO("yolov11n.pt") print("Modèle chargé.") # 2. Fonction de traitement vidéo (inchangée) def track_objects(video_in_path): cap = cv2.VideoCapture(video_in_path) if not cap.isOpened(): raise gr.Error("Erreur : Impossible d'ouvrir la vidéo d'entrée.") width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = cap.get(cv2.CAP_PROP_FPS) video_out_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name fourcc = cv2.VideoWriter_fourcc(*'mp4v') out = cv2.VideoWriter(video_out_path, fourcc, fps, (width, height)) track_history = defaultdict(lambda: []) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) print(f"Début du traitement de {total_frames} images.") while cap.isOpened(): success, frame = cap.read() if not success: break results = model.track(frame, persist=True) annotated_frame = results[0].plot() # .plot() gère déjà le cas où il n'y a pas de détection if results[0].boxes is not None and results[0].boxes.id is not None: boxes = results[0].boxes.xywh.cpu() track_ids = results[0].boxes.id.int().cpu().tolist() for box, track_id in zip(boxes, track_ids): x, y, w, h = box track = track_history[track_id] track.append((float(x), float(y))) if len(track) > 30: track.pop(0) points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2)) cv2.polylines(annotated_frame, [points], isClosed=False, color=(230, 230, 230), thickness=3) out.write(annotated_frame) cap.release() out.release() print("Traitement vidéo terminé.") return video_out_path # 3. Nouvelle fonction pour envoyer la vidéo sur Telegram async def send_to_telegram(video_path): if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID: error_msg = "Erreur : Les variables d'environnement TELEGRAM_BOT_TOKEN et TELEGRAM_CHAT_ID ne sont pas configurées." print(error_msg) return error_msg try: print("Initialisation du bot Telegram...") bot = telegram.Bot(token=TELEGRAM_BOT_TOKEN) print(f"Envoi de la vidéo {video_path} au chat ID {TELEGRAM_CHAT_ID}...") # Envoi de la vidéo async with bot: with open(video_path, 'rb') as video_file: await bot.send_video( chat_id=TELEGRAM_CHAT_ID, video=video_file, caption="Voici la vidéo traitée avec le suivi d'objets YOLOv8." ) success_msg = "Vidéo envoyée avec succès sur Telegram !" print(success_msg) return success_msg except Exception as e: error_msg = f"Une erreur est survenue lors de l'envoi sur Telegram : {e}" print(error_msg) return error_msg # 4. Fonction principale qui orchestre le tout pour Gradio async def process_and_send(video_in_path): if video_in_path is None: raise gr.Error("Veuillez d'abord uploader une vidéo.") # Étape 1: Traiter la vidéo gr.Info("Le traitement de la vidéo a commencé...") processed_video_path = track_objects(video_in_path) # Étape 2: Envoyer sur Telegram gr.Info("Traitement terminé. Envoi sur Telegram...") telegram_status = await send_to_telegram(processed_video_path) # Étape 3: Renvoyer les résultats à l'interface Gradio return processed_video_path, telegram_status # 5. Créer l'interface Gradio with gr.Blocks() as demo: gr.Markdown("# Démonstration de Suivi d'Objets avec YOLOv8 et envoi sur Telegram") gr.Markdown("Uploadez une vidéo. Une fois le traitement terminé, le résultat s'affichera ici et sera envoyé sur votre chat Telegram.") with gr.Row(): video_input = gr.Video(label="Vidéo d'entrée") video_output = gr.Video(label="Vidéo avec suivi") status_textbox = gr.Textbox(label="Statut de l'envoi Telegram", interactive=False) btn = gr.Button("Lancer le suivi et Envoyer sur Telegram") btn.click( fn=process_and_send, inputs=video_input, outputs=[video_output, status_textbox] ) # Lancer l'application if __name__ == "__main__": demo.launch()