DocIndexer / 3gpp_tdoc_indexer.py
om4r932's picture
Fix test
a49b92f
from datetime import datetime
import requests
from bs4 import BeautifulSoup
from datasets import load_dataset, Dataset
import json
import os
import time
import warnings
import re
import concurrent.futures
import threading
from typing import List, Dict, Any
from huggingface_hub import configure_http_backend
def backend_factory() -> requests.Session:
session = requests.Session()
session.verify = False
return session
configure_http_backend(backend_factory=backend_factory)
warnings.filterwarnings("ignore")
class TsgDocIndexer:
def __init__(self, max_workers=16):
self.indexer = self.load_indexer()
self.main_ftp_url = "https://3gpp.org/ftp"
self.dataset = load_dataset("OrganizedProgrammers/3GPPTDocLocation")
self.valid_doc_pattern = re.compile(r'^(S[1-6P]|C[1-6P]|R[1-6P])-\d+', flags=re.IGNORECASE)
self.max_workers = max_workers
# Verrous pour les opérations thread-safe
self.print_lock = threading.Lock()
self.indexer_lock = threading.Lock()
# Compteurs pour le suivi
self.total_indexed = 0
self.processed_count = 0
self.total_count = 0
def load_indexer(self):
"""Load existing index if available"""
all_docs = {}
tdoc_locations = load_dataset("OrganizedProgrammers/3GPPTDocLocation", token=os.environ["HF_TOKEN"])
tdoc_locations = tdoc_locations["train"].to_list()
for doc in tdoc_locations:
all_docs[doc["doc_id"]] = doc["url"]
return all_docs
def save_indexer(self):
"""Save the updated index"""
data = []
for doc_id, url in self.indexer.items():
data.append({"doc_id": doc_id, "url": url})
dataset = Dataset.from_list(data)
dataset.push_to_hub("OrganizedProgrammers/3GPPTDocLocation", token=os.environ["HF_TOKEN"])
def get_docs_from_url(self, url):
"""Récupérer la liste des documents/répertoires depuis une URL"""
try:
response = requests.get(url, verify=False, timeout=10)
soup = BeautifulSoup(response.text, "html.parser")
return [item.get_text() for item in soup.select("tr td a")]
except Exception as e:
with self.print_lock:
print(f"Erreur lors de l'accès à {url}: {e}")
return []
def is_valid_document_pattern(self, filename):
"""Vérifier si le nom de fichier correspond au motif requis"""
return bool(self.valid_doc_pattern.match(filename))
def is_zip_file(self, filename):
"""Vérifier si le fichier est un ZIP"""
return filename.lower().endswith('.zip')
def extract_doc_id(self, filename):
"""Extraire l'identifiant du document à partir du nom de fichier s'il correspond au motif"""
if self.is_valid_document_pattern(filename):
match = self.valid_doc_pattern.match(filename)
if match:
# Retourner le motif complet (comme S1-12345)
full_id = filename.split('.')[0] # Enlever l'extension si présente
return full_id.split('_')[0] # Enlever les suffixes après underscore si présents
return None
def process_zip_files(self, files_list, base_url, workshop=False):
"""Traiter une liste de fichiers pour trouver et indexer les ZIP valides"""
indexed_count = 0
for file in files_list:
if file in ['./', '../', 'ZIP/', 'zip/']:
continue
# Vérifier si c'est un fichier ZIP et s'il correspond au motif
if self.is_zip_file(file) and (self.is_valid_document_pattern(file) or workshop):
file_url = f"{base_url}/{file}"
# Extraire l'ID du document
doc_id = self.extract_doc_id(file)
if doc_id is None:
doc_id = file.split('.')[0]
if doc_id:
# Vérifier si ce fichier est déjà indexé
with self.indexer_lock:
if doc_id in self.indexer and self.indexer[doc_id] == file_url:
continue
# Ajouter ou mettre à jour l'index
self.indexer[doc_id] = file_url
indexed_count += 1
self.total_indexed += 1
return indexed_count
def process_meeting(self, meeting, wg_url, workshop=False):
"""Traiter une réunion individuelle avec multithreading"""
try:
if meeting in ['./', '../']:
return 0
meeting_url = f"{wg_url}/{meeting}"
with self.print_lock:
print(f" Vérification de la réunion: {meeting}")
# Vérifier le contenu de la réunion
meeting_contents = self.get_docs_from_url(meeting_url)
key = None
if "docs" in [x.lower() for x in meeting_contents]:
key = "docs"
elif "tdocs" in [x.lower() for x in meeting_contents]:
key = "tdocs"
elif "tdoc" in [x.lower() for x in meeting_contents]:
key = "tdoc"
if key is not None:
docs_url = f"{meeting_url}/{key}"
with self.print_lock:
print(f" Vérification des documents dans {docs_url}")
# Récupérer la liste des fichiers dans le dossier Docs
docs_files = self.get_docs_from_url(docs_url)
# 1. Indexer les fichiers ZIP directement dans le dossier Docs
docs_indexed_count = self.process_zip_files(docs_files, docs_url, workshop)
if docs_indexed_count > 0:
with self.print_lock:
print(f" {docs_indexed_count} nouveaux fichiers ZIP indexés dans Docs")
# 2. Vérifier le sous-dossier ZIP s'il existe
if "zip" in [x.lower() for x in docs_files]:
zip_url = f"{docs_url}/zip"
with self.print_lock:
print(f" Vérification du dossier ZIP: {zip_url}")
# Récupérer les fichiers dans le sous-dossier ZIP
zip_files = self.get_docs_from_url(zip_url)
# Indexer les fichiers ZIP dans le sous-dossier ZIP
zip_indexed_count = self.process_zip_files(zip_files, zip_url, workshop)
if zip_indexed_count > 0:
with self.print_lock:
print(f" {zip_indexed_count} nouveaux fichiers ZIP indexés dans le dossier ZIP")
# Mise à jour du compteur de progression
with self.indexer_lock:
self.processed_count += 1
# Affichage de la progression
with self.print_lock:
progress = (self.processed_count / self.total_count) * 100 if self.total_count > 0 else 0
print(f"\rProgression: {self.processed_count}/{self.total_count} réunions traitées ({progress:.1f}%)", end="")
return 1 # Réunion traitée avec succès
except Exception as e:
with self.print_lock:
print(f"\nErreur lors du traitement de la réunion {meeting}: {str(e)}")
return 0
def process_workgroup(self, wg, main_url):
"""Traiter un groupe de travail avec multithreading pour ses réunions"""
if wg in ['./', '../']:
return
wg_url = f"{main_url}/{wg}"
with self.print_lock:
print(f" Vérification du groupe de travail: {wg}")
# Récupérer les dossiers de réunion
meeting_folders = self.get_docs_from_url(wg_url)
# Ajouter au compteur total
self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
# Utiliser ThreadPoolExecutor pour traiter les réunions en parallèle
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [executor.submit(self.process_meeting, meeting, wg_url)
for meeting in meeting_folders if meeting not in ['./', '../']]
# Attendre que toutes les tâches soient terminées
concurrent.futures.wait(futures)
def index_all_tdocs(self):
"""Indexer tous les documents ZIP dans la structure FTP 3GPP avec multithreading"""
print("Démarrage de l'indexation des documents ZIP 3GPP (S1-S6, SP, C1-C6, CP)...")
start_time = time.time()
docs_count_before = len(self.indexer)
# Principaux groupes TSG
main_groups = ["tsg_sa", "tsg_ct", "tsg_ran"] # Ajouter d'autres si nécessaire
for main_tsg in main_groups:
print(f"\nIndexation de {main_tsg.upper()}...")
main_url = f"{self.main_ftp_url}/{main_tsg}"
# Récupérer les groupes de travail
workgroups = self.get_docs_from_url(main_url)
# Traiter chaque groupe de travail séquentiellement
# (mais les réunions à l'intérieur seront traitées en parallèle)
for wg in workgroups:
self.process_workgroup(wg, main_url)
docs_count_after = len(self.indexer)
new_docs_count = docs_count_after - docs_count_before
print(f"\nIndexation terminée en {time.time() - start_time:.2f} secondes")
print(f"Nouveaux documents ZIP indexés: {new_docs_count}")
print(f"Total des documents dans l'index: {docs_count_after}")
return self.indexer
def index_all_workshops(self):
print("Démarrage de l'indexation des workshops ZIP 3GPP...")
start_time = time.time()
docs_count_before = len(self.indexer)
print("\nIndexation du dossier 'workshop'")
main_url = f"{self.main_ftp_url}/workshop"
# Récupérer les dossiers de réunion
meeting_folders = self.get_docs_from_url(main_url)
# Ajouter au compteur total
self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
# Utiliser ThreadPoolExecutor pour traiter les réunions en parallèle
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [executor.submit(self.process_meeting, meeting, main_url, workshop=True)
for meeting in meeting_folders if meeting not in ['./', '../']]
concurrent.futures.wait(futures)
docs_count_after = len(self.indexer)
new_docs_count = docs_count_after - docs_count_before
print(f"\nIndexation terminée en {time.time() - start_time:.2f} secondes")
print(f"Nouveaux documents ZIP indexés: {new_docs_count}")
print(f"Total des documents dans l'index: {docs_count_after}")
return self.indexer
def main():
# Nombre de workers pour le multithreading (ajustable selon les ressources disponibles)
max_workers = 16
indexer = TsgDocIndexer(max_workers=max_workers)
try:
indexer.index_all_tdocs()
indexer.index_all_workshops()
except Exception as e:
print(f"Erreur lors de l'indexation: {e}")
finally:
indexer.save_indexer()
print("Index sauvegardé.")
if __name__ == "__main__":
main()