import os
import time
import warnings

import requests
from dotenv import load_dotenv
import numpy as np
import pandas as pd
from huggingface_hub import configure_http_backend


def backend_factory() -> requests.Session:
    """Build a requests session with TLS verification disabled, used by huggingface_hub."""
    session = requests.Session()
    session.verify = False
    return session


configure_http_backend(backend_factory=backend_factory)
warnings.filterwarnings("ignore")
os.environ["CURL_CA_BUNDLE"] = ""
load_dotenv()

# Imported after the HTTP backend and environment are configured.
from datasets import load_dataset, Dataset

import threading
import zipfile
import sys
import subprocess
import re
import traceback
import io
import concurrent.futures
import hashlib

CHARS = "0123456789abcdefghijklmnopqrstuvwxyz"
DICT_LOCK = threading.Lock()
DOCUMENT_LOCK = threading.Lock()
STOP_EVENT = threading.Event()

# Load the already-indexed spec sections and group them by document id.
spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")
spec_contents = spec_contents["train"].to_list()
documents_by_spec_num = {}
for section in spec_contents:
    if section["doc_id"] not in documents_by_spec_num:
        documents_by_spec_num[section["doc_id"]] = {
            "content": {section["section"]: section["content"]},
            "hash": section["hash"],
        }
    else:
        documents_by_spec_num[section["doc_id"]]["content"][section["section"]] = section["content"]

indexed_specifications = {}
specifications_passed = set()
processed_count = 0
total_count = 0
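# Illustrative example (not part of the original script): each entry of
# documents_by_spec_num is assumed to look like
#   documents_by_spec_num["38.331"] = {
#       "content": {"1 Scope": "...", "5.3.3 RRC connection establishment": "..."},
#       "hash": "<md5 of doc_id + version code>",
#   }
# i.e. one dict of section title -> section text, plus the hash of the version
# it was extracted from. The spec number "38.331" is only a sample value.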
def get_text(specification: str, version: str, second: bool = False):
    """Download the spec ZIP from the 3GPP archive and return its text as a list of lines."""
    if STOP_EVENT.is_set():
        return []
    doc_id = specification
    series = doc_id.split(".")[0]
    content = []
    print(f"\n[INFO] Trying to fetch specification {doc_id} version {version}", flush=True)
    response = requests.get(
        f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip",
        verify=False,
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    )

    if response.status_code != 200:
        if second:
            # Already retried with the latest known version; give up instead of recursing again.
            print(f"\n[ERROR] ZIP download failed again for {specification}-{version}", flush=True)
            return []
        print(f"\n[ERROR] ZIP download failed for {specification}-{version}. Retrying with the latest available version", flush=True)
        last_possible_version = requests.post(
            'https://organizedprogrammers-3gppdocfinder.hf.space/find',
            verify=False,
            headers={"Content-Type": "application/json"},
            json={"doc_id": specification}
        )
        if last_possible_version.status_code != 200:
            print(f"\n[ERROR] Could not resolve a fallback version for {specification}-{version}: {last_possible_version.status_code}", flush=True)
            return []
        data = last_possible_version.json()
        return get_text(specification, data['version'], True)

    zip_bytes = io.BytesIO(response.content)
    zip_file = zipfile.ZipFile(zip_bytes)

    def extract_text(zf: zipfile.ZipFile, filename: str):
        # Only convert the main .doc/.docx files; skip cover pages and annexes.
        if (filename.endswith(".doc") or filename.endswith(".docx")) and ("cover" not in filename.lower() and "annex" not in filename.lower()):
            doc_bytes = zf.read(filename)
            input_path = f"/tmp/{filename}"
            output_path = "/tmp"
            changed_ext_filename = re.sub(r"\.docx?$", ".txt", filename)
            output_file = f"/tmp/{changed_ext_filename}"
            with open(input_path, "wb") as f:
                f.write(doc_bytes)
            try:
                print("\n[INFO] Trying to convert DOC/DOCX -> TXT", flush=True)
                try:
                    subprocess.run(
                        ["libreoffice", "--headless", "--convert-to", "txt:Text", "--outdir", output_path, input_path],
                        check=True,
                        capture_output=True,
                        timeout=60 * 5
                    )
                except subprocess.TimeoutExpired:
                    print("[SKIP] Conversion took too long!")
                    return []
                except subprocess.CalledProcessError as e:
                    print(f"\n[ERROR] LibreOffice failed: {e}", flush=True)
                    return []
                if os.path.exists(output_file):
                    with open(output_file, "r", encoding="utf-8") as f:
                        return [line.strip() for line in f if line.strip()]
            finally:
                if os.path.exists(input_path):
                    os.remove(input_path)
                if os.path.exists(output_file):
                    os.remove(output_file)
        return []

    # Some archives wrap the actual document in a single nested ZIP.
    for fileinfo in zip_file.infolist():
        if STOP_EVENT.is_set():
            return []
        if fileinfo.filename.endswith(".zip") and len(zip_file.namelist()) == 1:
            nested_zip_bytes = io.BytesIO(zip_file.read(fileinfo.filename))
            zip_file = zipfile.ZipFile(nested_zip_bytes)
            break

    for filename in zip_file.namelist():
        if STOP_EVENT.is_set():
            return []
        content.extend(extract_text(zip_file, filename))

    if content:
        print("\n[INFO] Conversion finished", flush=True)
    else:
        print("\n[ERROR] No text could be extracted", flush=True)
    return content


def get_spec_content(specification: str, version: str):
    if STOP_EVENT.is_set():
        return {}
    print("\n[INFO] Trying to fetch the document text", flush=True)
    text = get_text(specification, version)
    if not text or STOP_EVENT.is_set():
        return {}
    print(f"\n[INFO] Text of {specification}-{version} retrieved", flush=True)

    chapters = []
    chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+[^\.]$")  # e.g. "3.5.2.1<TAB>Introduction"
    for i, line in enumerate(text):
        if STOP_EVENT.is_set():
            return {}
        if chapter_regex.fullmatch(line):
            chapters.append((i, line))

    # Slice the text between consecutive headings to rebuild the sections.
    document = {}
    for i in range(len(chapters)):
        if STOP_EVENT.is_set():
            return {}
        start_index, chapter_title = chapters[i]
        end_index = chapters[i + 1][0] if i + 1 < len(chapters) else len(text)
        content_lines = text[start_index + 1:end_index]
        document[chapter_title.replace("\t", " ")] = "\n".join(content_lines)

    print("\n[INFO] Document assembled", flush=True)
    return document
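# Illustrative examples (not in the original script) of the version encoding used
# in 3GPP archive filenames, as implemented by version_to_code() below: each of the
# three version components is mapped to a single base-36 digit when it is below 36,
# otherwise all three components are zero-padded to two decimal digits.
#   "18.3.0"  -> "i30"
#   "6.13.0"  -> "6d0"
#   "10.40.0" -> "104000"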
def version_to_code(version_str):
    """Turn a dotted version such as "18.3.0" into the code used in archive filenames."""
    parts = version_str.split('.')
    if len(parts) != 3:
        return None
    try:
        x, y, z = [int(p) for p in parts]
    except ValueError:
        return None
    if x < 36 and y < 36 and z < 36:
        return f"{CHARS[x]}{CHARS[y]}{CHARS[z]}"
    else:
        return f"{str(x).zfill(2)}{str(y).zfill(2)}{str(z).zfill(2)}"


def hasher(specification: str, version_code: str):
    return hashlib.md5(f"{specification}{version_code}".encode()).hexdigest()


def get_scope(content):
    for title, text in content.items():
        if title.lower().endswith("scope"):
            return text
    return ""


def process_specification(spec):
    global processed_count, indexed_specifications, documents_by_spec_num
    if STOP_EVENT.is_set():
        return
    try:
        if not spec.get('vers'):
            return
        doc_id = str(spec['spec_num'])
        document = None
        version_code = version_to_code(str(spec['vers']))
        if not version_code:
            return

        with DOCUMENT_LOCK:
            if doc_id in documents_by_spec_num and documents_by_spec_num[doc_id]["hash"] == hasher(doc_id, version_code) and doc_id not in specifications_passed:
                # Same version already indexed: reuse the cached sections.
                document = documents_by_spec_num[doc_id]
                specifications_passed.add(doc_id)
                print(f"\n[INFO] Document already present for {doc_id} (version {spec['vers']})", flush=True)
            elif doc_id in specifications_passed:
                print(f"\n[INFO] Document already present for {doc_id} [latest version already indexed]", flush=True)
            else:
                print(f"\n[INFO] Trying to fetch document {doc_id} (version {spec['vers']})", flush=True)
                sections = get_spec_content(doc_id, version_code)
                if sections:
                    # Wrap the sections the same way as the cached entries so the
                    # "scope" lookup below works in both cases.
                    document = {"content": sections, "hash": hasher(doc_id, version_code)}
                    documents_by_spec_num[doc_id] = document
                    specifications_passed.add(doc_id)
                    print(f"\n[INFO] Document extracted for {doc_id} (version {spec['vers']})", flush=True)

        series = doc_id.split(".")[0]
        url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
        string_key = f"{spec['spec_num']}+-+{spec['title']}+-+{spec['type']}+-+{spec['vers']}+-+{spec['WG']}"
        metadata = {
            "id": doc_id,
            "title": spec["title"],
            "type": spec["type"],
            "version": str(spec["vers"]),
            "working_group": spec["WG"],
            "url": url,
            "scope": "" if not document else get_scope(document["content"])
        }

        with DICT_LOCK:
            indexed_specifications[string_key] = metadata
            processed_count += 1
            sys.stdout.write(f"\rProcessing: {processed_count}/{total_count} specifications...")
            sys.stdout.flush()

    except Exception as e:
        traceback.print_exception(e)
        print(f"\n[ERROR] Failed to process {spec.get('spec_num', 'unknown')} v{spec.get('vers')}: {e}", flush=True)


def sauvegarder(indexed_specifications, documents_by_spec_num):
    print("\nSaving...", flush=True)
    flat_metadata = list(indexed_specifications.values())
    flat_docs = []
    for doc_id, data in documents_by_spec_num.items():
        for title, content in data["content"].items():
            flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})

    push_spec_content = Dataset.from_list(flat_docs)
    push_spec_metadata = Dataset.from_list(flat_metadata)
    push_spec_content.push_to_hub("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF_TOKEN"])
    push_spec_metadata.push_to_hub("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF_TOKEN"])
    print("Save complete.", flush=True)
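# Illustrative example (not part of the original script) of the rows sauvegarder()
# pushes to the Hub. One row per extracted section:
#   {"hash": "<md5>", "doc_id": "38.331", "section": "1 Scope", "content": "..."}
# and one metadata row per specification:
#   {"id": "38.331", "title": "...", "type": "...", "version": "18.3.0",
#    "working_group": "R2", "url": "https://www.3gpp.org/ftp/Specs/archive/...", "scope": "..."}
# The concrete values shown here are samples only.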
def main():
    global total_count
    start_time = time.time()

    # Fetch the list of specifications from the 3GPP site
    print("Fetching the specification list from 3GPP...")
    response = requests.get(
        'https://www.3gpp.org/dynareport?code=status-report.htm',
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'},
        verify=False
    )

    # Parse the HTML tables (the page is already in memory, so no storage options are needed)
    dfs = pd.read_html(
        io.StringIO(response.text),
        encoding="utf-8"
    )
    for x in range(len(dfs)):
        dfs[x] = dfs[x].replace({np.nan: None})

    # Keep only the columns we need
    columns_needed = [0, 1, 2, 3, 4]
    extracted_dfs = [df.iloc[:, columns_needed] for df in dfs]
    columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]

    # Build the list of specification dicts
    specifications = []
    for df in extracted_dfs:
        for _, row in df.iterrows():
            doc = row.to_list()
            doc_dict = dict(zip(columns, doc))
            specifications.append(doc_dict)

    total_count = len(specifications)
    print(f"Processing {total_count} specifications with multithreading...")
    if os.path.exists("indexed_docs_content.zip"):
        print(f"Loading {len(documents_by_spec_num)} documents from the cache.")

    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
            futures = [executor.submit(process_specification, spec) for spec in specifications]
            while True:
                if all(f.done() for f in futures):
                    break
                if STOP_EVENT.is_set():
                    break
                time.sleep(0.35)
    except Exception as e:
        print(f"\nUnexpected error in the ThreadPool: {e}", flush=True)

    print("\nSaving results...", flush=True)
    sauvegarder(indexed_specifications, documents_by_spec_num)

    elapsed_time = time.time() - start_time
    print(f"\nProcessing finished in {elapsed_time:.2f} seconds.", flush=True)
    print("Metadata pushed to the 'OrganizedProgrammers/3GPPSpecMetadata' dataset.", flush=True)
    print("Document content pushed to the 'OrganizedProgrammers/3GPPSpecContent' dataset.", flush=True)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupt detected (Ctrl+C). Stopping running tasks...", flush=True)
        STOP_EVENT.set()
        time.sleep(2)
        sauvegarder(indexed_specifications, documents_by_spec_num)
        print("Clean shutdown of the script.", flush=True)
        sys.exit(0)
    except Exception as e:
        print(f"\nUnexpected error: {e}", flush=True)
        sauvegarder(indexed_specifications, documents_by_spec_num)
        sys.exit(1)
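# Usage note (assumptions, not stated in the original script): the script expects an
# HF_TOKEN entry in a .env file (loaded via load_dotenv) with write access to the
# OrganizedProgrammers datasets, and a `libreoffice` binary on the PATH for the
# DOC/DOCX -> TXT conversion. A typical invocation would simply be:
#   python index_3gpp_specs.py
# where the file name is only a placeholder for however this script is saved.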