import requests
from bs4 import BeautifulSoup
from datasets import load_dataset, Dataset
import os
import time
import warnings
import re
import concurrent.futures
import threading
from huggingface_hub import configure_http_backend

# Route huggingface_hub traffic through a session with TLS verification
# disabled, matching the verify=False used for the 3GPP requests below.
def backend_factory() -> requests.Session:
    session = requests.Session()
    session.verify = False
    return session

configure_http_backend(backend_factory=backend_factory)
warnings.filterwarnings("ignore")  # silence the InsecureRequestWarning noise
class TsgDocIndexer:
    def __init__(self, max_workers=16):
        self.indexer = self.load_indexer()
        self.main_ftp_url = "https://3gpp.org/ftp"
        self.valid_doc_pattern = re.compile(r'^(S[1-6P]|C[1-6P]|R[1-6P])-\d+', flags=re.IGNORECASE)
        self.max_workers = max_workers

        # Locks for thread-safe operations
        self.print_lock = threading.Lock()
        self.indexer_lock = threading.Lock()

        # Progress counters
        self.total_indexed = 0
        self.processed_count = 0
        self.total_count = 0
    def load_indexer(self):
        """Load the existing index from the Hugging Face dataset"""
        all_docs = {}
        tdoc_locations = load_dataset("OrganizedProgrammers/3GPPTDocLocation", token=os.environ["HF_TOKEN"])
        tdoc_locations = tdoc_locations["train"].to_list()
        for doc in tdoc_locations:
            all_docs[doc["doc_id"]] = doc["url"]
        return all_docs
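
    # The index is a flat mapping from TDoc ID to download URL, along these
    # lines (illustrative values, not real entries):
    #   {"S2-2301234": "https://3gpp.org/ftp/tsg_sa/.../S2-2301234.zip"}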
    def save_indexer(self):
        """Save the updated index"""
        data = []
        for doc_id, url in self.indexer.items():
            data.append({"doc_id": doc_id, "url": url})
        dataset = Dataset.from_list(data)
        dataset.push_to_hub("OrganizedProgrammers/3GPPTDocLocation", token=os.environ["HF_TOKEN"])
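
    # Note: push_to_hub commits a fresh snapshot of the whole index on each
    # run, so the Hub dataset should always mirror self.indexer in full.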
    def get_docs_from_url(self, url):
        """Fetch the list of documents/directories at a given URL"""
        try:
            response = requests.get(url, verify=False, timeout=10)
            soup = BeautifulSoup(response.text, "html.parser")
            return [item.get_text() for item in soup.select("tr td a")]
        except Exception as e:
            with self.print_lock:
                print(f"Error accessing {url}: {e}")
            return []
    def is_valid_document_pattern(self, filename):
        """Check whether the filename matches the required TDoc pattern"""
        return bool(self.valid_doc_pattern.match(filename))

    def is_zip_file(self, filename):
        """Check whether the file is a ZIP"""
        return filename.lower().endswith('.zip')
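
    # Illustrative behaviour of the pattern above (filenames are made up):
    #   "S2-2301234.zip"  -> valid (SA WG2)    "CP-231045.zip" -> valid (CT plenary)
    #   "Agenda_draft.zip" -> rejected, indexed only when workshop=True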
    def extract_doc_id(self, filename):
        """Extract the document ID from a filename that matches the pattern"""
        if self.is_valid_document_pattern(filename):
            # Return the full ID (such as S1-12345)
            full_id = filename.split('.')[0]  # drop the extension, if any
            return full_id.split('_')[0]      # drop any suffix after an underscore
        return None
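
    # For example (hypothetical filename): extract_doc_id("S1-123456_rev1.zip")
    # returns "S1-123456"; anything failing the pattern returns None.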
    def process_zip_files(self, files_list, base_url, workshop=False):
        """Scan a file listing and index every valid ZIP found"""
        indexed_count = 0
        for file in files_list:
            if file in ['./', '../', 'ZIP/', 'zip/']:
                continue

            # Only consider ZIP files matching the pattern (workshops skip the pattern check)
            if self.is_zip_file(file) and (self.is_valid_document_pattern(file) or workshop):
                file_url = f"{base_url}/{file}"

                # Extract the document ID
                doc_id = self.extract_doc_id(file)
                if doc_id is None:
                    doc_id = file.split('.')[0]

                if doc_id:
                    # Skip files already indexed at this URL
                    with self.indexer_lock:
                        if doc_id in self.indexer and self.indexer[doc_id] == file_url:
                            continue

                        # Add or update the index entry
                        self.indexer[doc_id] = file_url
                        indexed_count += 1
                        self.total_indexed += 1

        return indexed_count
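
    # Design note: the "already indexed?" lookup and the dict update above run
    # under a single indexer_lock acquisition, so concurrent process_meeting
    # workers cannot race between the check and the write.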
    def process_meeting(self, meeting, wg_url, workshop=False):
        """Process a single meeting folder (run from a worker thread)"""
        try:
            if meeting in ['./', '../']:
                return 0

            meeting_url = f"{wg_url}/{meeting}"
            with self.print_lock:
                print(f"  Checking meeting: {meeting}")

            # List the meeting folder and locate the documents subfolder
            meeting_contents = self.get_docs_from_url(meeting_url)
            key = None
            if "docs" in [x.lower() for x in meeting_contents]:
                key = "docs"
            elif "tdocs" in [x.lower() for x in meeting_contents]:
                key = "tdocs"
            elif "tdoc" in [x.lower() for x in meeting_contents]:
                key = "tdoc"

            if key is not None:
                docs_url = f"{meeting_url}/{key}"
                with self.print_lock:
                    print(f"    Checking documents in {docs_url}")

                # Fetch the list of files in the Docs folder
                docs_files = self.get_docs_from_url(docs_url)

                # 1. Index the ZIP files sitting directly in the Docs folder
                docs_indexed_count = self.process_zip_files(docs_files, docs_url, workshop)
                if docs_indexed_count > 0:
                    with self.print_lock:
                        print(f"    {docs_indexed_count} new ZIP files indexed in Docs")

                # 2. Check the ZIP subfolder, if it exists
                if "zip" in [x.lower() for x in docs_files]:
                    zip_url = f"{docs_url}/zip"
                    with self.print_lock:
                        print(f"    Checking ZIP folder: {zip_url}")

                    # Fetch and index the files in the ZIP subfolder
                    zip_files = self.get_docs_from_url(zip_url)
                    zip_indexed_count = self.process_zip_files(zip_files, zip_url, workshop)
                    if zip_indexed_count > 0:
                        with self.print_lock:
                            print(f"    {zip_indexed_count} new ZIP files indexed in the ZIP folder")

            # Update the progress counter
            with self.indexer_lock:
                self.processed_count += 1

            # Report progress
            with self.print_lock:
                progress = (self.processed_count / self.total_count) * 100 if self.total_count > 0 else 0
                print(f"\rProgress: {self.processed_count}/{self.total_count} meetings processed ({progress:.1f}%)", end="")

            return 1  # meeting processed successfully
        except Exception as e:
            with self.print_lock:
                print(f"\nError while processing meeting {meeting}: {str(e)}")
            return 0
    def process_workgroup(self, wg, main_url):
        """Process one working group, fanning its meetings out to worker threads"""
        if wg in ['./', '../']:
            return

        wg_url = f"{main_url}/{wg}"
        with self.print_lock:
            print(f"  Checking working group: {wg}")

        # Fetch the meeting folders
        meeting_folders = self.get_docs_from_url(wg_url)

        # Add them to the overall progress total
        self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])

        # Process the meetings in parallel with a ThreadPoolExecutor
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_meeting, meeting, wg_url)
                       for meeting in meeting_folders if meeting not in ['./', '../']]

            # Wait for all tasks to finish
            concurrent.futures.wait(futures)
    def index_all_tdocs(self):
        """Index every ZIP document in the 3GPP FTP tree, using multithreading"""
        print("Starting indexing of 3GPP ZIP documents (S1-S6, SP, C1-C6, CP, R1-R6, RP)...")
        start_time = time.time()
        docs_count_before = len(self.indexer)

        # Main TSG groups
        main_groups = ["tsg_sa", "tsg_ct", "tsg_ran"]  # add others if needed

        for main_tsg in main_groups:
            print(f"\nIndexing {main_tsg.upper()}...")
            main_url = f"{self.main_ftp_url}/{main_tsg}"

            # Fetch the working groups
            workgroups = self.get_docs_from_url(main_url)

            # Process each working group sequentially
            # (the meetings inside each one are processed in parallel)
            for wg in workgroups:
                self.process_workgroup(wg, main_url)

        docs_count_after = len(self.indexer)
        new_docs_count = docs_count_after - docs_count_before

        print(f"\nIndexing finished in {time.time() - start_time:.2f} seconds")
        print(f"New ZIP documents indexed: {new_docs_count}")
        print(f"Total documents in the index: {docs_count_after}")

        return self.indexer
    def index_all_workshops(self):
        """Index every ZIP document under the 3GPP 'workshop' folder"""
        print("Starting indexing of 3GPP workshop ZIPs...")
        start_time = time.time()
        docs_count_before = len(self.indexer)

        print("\nIndexing the 'workshop' folder")
        main_url = f"{self.main_ftp_url}/workshop"

        # Fetch the meeting folders
        meeting_folders = self.get_docs_from_url(main_url)

        # Add them to the overall progress total
        self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])

        # Process the meetings in parallel with a ThreadPoolExecutor
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self.process_meeting, meeting, main_url, workshop=True)
                       for meeting in meeting_folders if meeting not in ['./', '../']]
            concurrent.futures.wait(futures)

        docs_count_after = len(self.indexer)
        new_docs_count = docs_count_after - docs_count_before

        print(f"\nIndexing finished in {time.time() - start_time:.2f} seconds")
        print(f"New ZIP documents indexed: {new_docs_count}")
        print(f"Total documents in the index: {docs_count_after}")

        return self.indexer
def main():
    # Number of worker threads (tune to the available resources)
    max_workers = 16

    indexer = TsgDocIndexer(max_workers=max_workers)

    try:
        indexer.index_all_tdocs()
        indexer.index_all_workshops()
    except Exception as e:
        print(f"Error during indexing: {e}")
    finally:
        indexer.save_indexer()
        print("Index saved.")

if __name__ == "__main__":
    main()
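
# Usage sketch (assumes the script is saved as app.py and that HF_TOKEN holds
# a token with write access to OrganizedProgrammers/3GPPTDocLocation):
#   HF_TOKEN=hf_xxx python app.py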