from datetime import datetime import requests from bs4 import BeautifulSoup from datasets import load_dataset, Dataset import json import os import time import warnings import re import concurrent.futures import threading from typing import List, Dict, Any warnings.filterwarnings("ignore") class TsgDocIndexer: def __init__(self, max_workers=16): self.indexer = self.load_indexer() self.main_ftp_url = "https://3gpp.org/ftp" self.dataset = load_dataset("OrganizedProgrammers/3GPPTDocLocation") self.valid_doc_pattern = re.compile(r'^(S[1-6P]|C[1-6P]|R[1-6P])-\d+', flags=re.IGNORECASE) self.max_workers = max_workers # Verrous pour les opérations thread-safe self.print_lock = threading.Lock() self.indexer_lock = threading.Lock() # Compteurs pour le suivi self.total_indexed = 0 self.processed_count = 0 self.total_count = 0 def load_indexer(self): """Load existing index if available""" all_docs = {} tdoc_locations = load_dataset("OrganizedProgrammers/3GPPTDocLocation", token=os.environ["HF_TOKEN"]) tdoc_locations = tdoc_locations["train"].to_list() for doc in tdoc_locations: all_docs[doc["doc_id"]] = doc["url"] return all_docs def save_indexer(self): """Save the updated index""" data = [] for doc_id, url in self.indexer.items(): data.append({"doc_id": doc_id, "url": url}) dataset = Dataset.from_list(data) dataset.push_to_hub("OrganizedProgrammers/3GPPTDocLocation", token=os.environ["HF_TOKEN"]) def get_docs_from_url(self, url): """Récupérer la liste des documents/répertoires depuis une URL""" try: response = requests.get(url, verify=False, timeout=10) soup = BeautifulSoup(response.text, "html.parser") return [item.get_text() for item in soup.select("tr td a")] except Exception as e: with self.print_lock: print(f"Erreur lors de l'accès à {url}: {e}") return [] def is_valid_document_pattern(self, filename): """Vérifier si le nom de fichier correspond au motif requis""" return bool(self.valid_doc_pattern.match(filename)) def is_zip_file(self, filename): """Vérifier si le fichier est un ZIP""" return filename.lower().endswith('.zip') def extract_doc_id(self, filename): """Extraire l'identifiant du document à partir du nom de fichier s'il correspond au motif""" if self.is_valid_document_pattern(filename): match = self.valid_doc_pattern.match(filename) if match: # Retourner le motif complet (comme S1-12345) full_id = filename.split('.')[0] # Enlever l'extension si présente return full_id.split('_')[0] # Enlever les suffixes après underscore si présents return None def process_zip_files(self, files_list, base_url, workshop=False): """Traiter une liste de fichiers pour trouver et indexer les ZIP valides""" indexed_count = 0 for file in files_list: if file in ['./', '../', 'ZIP/', 'zip/']: continue # Vérifier si c'est un fichier ZIP et s'il correspond au motif if self.is_zip_file(file) and (self.is_valid_document_pattern(file) or workshop): file_url = f"{base_url}/{file}" # Extraire l'ID du document doc_id = self.extract_doc_id(file) if doc_id is None: doc_id = file.split('.')[0] if doc_id: # Vérifier si ce fichier est déjà indexé with self.indexer_lock: if doc_id in self.indexer and self.indexer[doc_id] == file_url: continue # Ajouter ou mettre à jour l'index self.indexer[doc_id] = file_url indexed_count += 1 self.total_indexed += 1 return indexed_count def process_meeting(self, meeting, wg_url, workshop=False): """Traiter une réunion individuelle avec multithreading""" try: if meeting in ['./', '../']: return 0 meeting_url = f"{wg_url}/{meeting}" with self.print_lock: print(f" Vérification de la réunion: {meeting}") # Vérifier le contenu de la réunion meeting_contents = self.get_docs_from_url(meeting_url) key = None if "docs" in [x.lower() for x in meeting_contents]: key = "docs" elif "tdocs" in [x.lower() for x in meeting_contents]: key = "tdocs" elif "tdoc" in [x.lower() for x in meeting_contents]: key = "tdoc" if key is not None: docs_url = f"{meeting_url}/{key}" with self.print_lock: print(f" Vérification des documents dans {docs_url}") # Récupérer la liste des fichiers dans le dossier Docs docs_files = self.get_docs_from_url(docs_url) # 1. Indexer les fichiers ZIP directement dans le dossier Docs docs_indexed_count = self.process_zip_files(docs_files, docs_url, workshop) if docs_indexed_count > 0: with self.print_lock: print(f" {docs_indexed_count} nouveaux fichiers ZIP indexés dans Docs") # 2. Vérifier le sous-dossier ZIP s'il existe if "zip" in [x.lower() for x in docs_files]: zip_url = f"{docs_url}/zip" with self.print_lock: print(f" Vérification du dossier ZIP: {zip_url}") # Récupérer les fichiers dans le sous-dossier ZIP zip_files = self.get_docs_from_url(zip_url) # Indexer les fichiers ZIP dans le sous-dossier ZIP zip_indexed_count = self.process_zip_files(zip_files, zip_url, workshop) if zip_indexed_count > 0: with self.print_lock: print(f" {zip_indexed_count} nouveaux fichiers ZIP indexés dans le dossier ZIP") # Mise à jour du compteur de progression with self.indexer_lock: self.processed_count += 1 # Affichage de la progression with self.print_lock: progress = (self.processed_count / self.total_count) * 100 if self.total_count > 0 else 0 print(f"\rProgression: {self.processed_count}/{self.total_count} réunions traitées ({progress:.1f}%)", end="") return 1 # Réunion traitée avec succès except Exception as e: with self.print_lock: print(f"\nErreur lors du traitement de la réunion {meeting}: {str(e)}") return 0 def process_workgroup(self, wg, main_url): """Traiter un groupe de travail avec multithreading pour ses réunions""" if wg in ['./', '../']: return wg_url = f"{main_url}/{wg}" with self.print_lock: print(f" Vérification du groupe de travail: {wg}") # Récupérer les dossiers de réunion meeting_folders = self.get_docs_from_url(wg_url) # Ajouter au compteur total self.total_count += len([m for m in meeting_folders if m not in ['./', '../']]) # Utiliser ThreadPoolExecutor pour traiter les réunions en parallèle with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = [executor.submit(self.process_meeting, meeting, wg_url) for meeting in meeting_folders if meeting not in ['./', '../']] # Attendre que toutes les tâches soient terminées concurrent.futures.wait(futures) def index_all_tdocs(self): """Indexer tous les documents ZIP dans la structure FTP 3GPP avec multithreading""" print("Démarrage de l'indexation des documents ZIP 3GPP (S1-S6, SP, C1-C6, CP)...") start_time = time.time() docs_count_before = len(self.indexer) # Principaux groupes TSG main_groups = ["tsg_sa", "tsg_ct", "tsg_ran"] # Ajouter d'autres si nécessaire for main_tsg in main_groups: print(f"\nIndexation de {main_tsg.upper()}...") main_url = f"{self.main_ftp_url}/{main_tsg}" # Récupérer les groupes de travail workgroups = self.get_docs_from_url(main_url) # Traiter chaque groupe de travail séquentiellement # (mais les réunions à l'intérieur seront traitées en parallèle) for wg in workgroups: self.process_workgroup(wg, main_url) docs_count_after = len(self.indexer) new_docs_count = docs_count_after - docs_count_before print(f"\nIndexation terminée en {time.time() - start_time:.2f} secondes") print(f"Nouveaux documents ZIP indexés: {new_docs_count}") print(f"Total des documents dans l'index: {docs_count_after}") return self.indexer def index_all_workshops(self): print("Démarrage de l'indexation des workshops ZIP 3GPP...") start_time = time.time() docs_count_before = len(self.indexer) print("\nIndexation du dossier 'workshop'") main_url = f"{self.main_ftp_url}/workshop" # Récupérer les dossiers de réunion meeting_folders = self.get_docs_from_url(main_url) # Ajouter au compteur total self.total_count += len([m for m in meeting_folders if m not in ['./', '../']]) # Utiliser ThreadPoolExecutor pour traiter les réunions en parallèle with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = [executor.submit(self.process_meeting, meeting, main_url, workshop=True) for meeting in meeting_folders if meeting not in ['./', '../']] concurrent.futures.wait(futures) docs_count_after = len(self.indexer) new_docs_count = docs_count_after - docs_count_before print(f"\nIndexation terminée en {time.time() - start_time:.2f} secondes") print(f"Nouveaux documents ZIP indexés: {new_docs_count}") print(f"Total des documents dans l'index: {docs_count_after}") return self.indexer def main(): # Nombre de workers pour le multithreading (ajustable selon les ressources disponibles) max_workers = 16 indexer = TsgDocIndexer(max_workers=max_workers) try: indexer.index_all_tdocs() indexer.index_all_workshops() except Exception as e: print(f"Erreur lors de l'indexation: {e}") finally: indexer.save_indexer() print("Index sauvegardé.") if __name__ == "__main__": main()