import os import time import warnings from dotenv import load_dotenv import numpy as np import requests import pandas as pd warnings.filterwarnings("ignore") os.environ["CURL_CA_BUNDLE"] = "" load_dotenv() from huggingface_hub import configure_http_backend def backend_factory() -> requests.Session: session = requests.Session() session.verify = False return session configure_http_backend(backend_factory=backend_factory) from datasets import load_dataset, Dataset from datasets.data_files import EmptyDatasetError import threading import zipfile import sys import fitz import re import json import traceback import io import concurrent.futures import hashlib CHARS = "0123456789abcdefghijklmnopqrstuvwxyz" DICT_LOCK = threading.Lock() DOCUMENT_LOCK = threading.Lock() STOP_EVENT = threading.Event() documents_by_spec_num = {} try: spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent", token=os.environ.get("HF_TOKEN")) spec_contents = spec_contents["train"].to_list() for section in spec_contents: if section["doc_id"] not in documents_by_spec_num.keys(): documents_by_spec_num[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]} else: documents_by_spec_num[section["doc_id"]]["content"][section["section"]] = section["content"] except EmptyDatasetError as e: print("Base de données vide !") indexed_specifications = {} specifications_passed = set() processed_count = 0 total_count = 0 session = requests.Session() req = session.post("https://portal.etsi.org/ETSIPages/LoginEOL.ashx", verify=False, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}, data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")})) print("Récupération des spécifications depuis ETSI...", req.status_code) url_ts = "https://www.etsi.org/?option=com_standardssearch&view=data&format=csv&includeScope=1&page=1&search=&title=1&etsiNumber=1&content=0&version=0&onApproval=0&published=1&withdrawn=0&historical=0&isCurrent=1&superseded=0&harmonized=0&keyword=&TB=&stdType=TS&frequency=&mandate=&collection=&sort=1" url_tr = url_ts.replace("stdType=TS", "stdType=TR") data_ts = requests.get(url_ts, verify=False).content data_tr = requests.get(url_tr, verify=False).content df_ts = pd.read_csv(io.StringIO(data_ts.decode('utf-8')), sep=";", skiprows=1, index_col=False) df_tr = pd.read_csv(io.StringIO(data_tr.decode('utf-8')), sep=";", skiprows=1, index_col=False) backup_ts = df_ts["ETSI deliverable"] backup_tr = df_tr["ETSI deliverable"] df_ts["ETSI deliverable"] = df_ts["ETSI deliverable"].str.extract(r"\s*ETSI TS (\d+ \d+(?:-\d+(?:-\d+)?)?)") df_tr["ETSI deliverable"] = df_tr["ETSI deliverable"].str.extract(r"\s*ETSI TR (\d+ \d+(?:-\d+(?:-\d+)?)?)") version1 = backup_ts.str.extract(r"\s*ETSI TS \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)") version2 = backup_tr.str.extract(r"\s*ETSI TR \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)") df_ts["Version"] = version1[0] df_tr["Version"] = version2[0] def ver_tuple(v): return tuple(map(int, v.split("."))) df_ts["temp"] = df_ts["Version"].apply(ver_tuple) df_tr["temp"] = df_tr["Version"].apply(ver_tuple) df_ts["Type"] = "TS" df_tr["Type"] = "TR" df = pd.concat([df_ts, df_tr]) unique_df = df.loc[df.groupby("ETSI deliverable")["temp"].idxmax()] unique_df = unique_df.drop(columns="temp") unique_df = unique_df[(~unique_df["title"].str.contains("3GPP", case=True, na=False))] df = df.drop(columns="temp") df = df[(~df["title"].str.contains("3GPP", case=True, na=False))] def get_text(specification: str): if STOP_EVENT.is_set(): return None, [] print(f"\n[INFO] Tentative de récupération de la spécification {specification}", flush=True) response = session.get( unique_df[unique_df["ETSI deliverable"] == specification].iloc[0]["PDF link"], verify=False, headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'} ) if response.status_code != 200: print(f"\n[ERREUR] Echec du téléchargement du PDF pour {specification}. {req.status_code}", flush=True) return None, [] pdf = fitz.open(stream=response.content, filetype="pdf") return pdf, pdf.get_toc() def get_spec_content(specification: str): def extract_sections(text, titles): sections = {} # On trie les titres selon leur position dans le texte sorted_titles = sorted(titles, key=lambda t: text.find(t)) for i, title in enumerate(sorted_titles): start = text.find(title) if i + 1 < len(sorted_titles): end = text.find(sorted_titles[i + 1]) sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip().rstrip()) else: sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip().rstrip()) return sections if STOP_EVENT.is_set(): return {} print("\n[INFO] Tentative de récupération du texte", flush=True) pdf, doc_toc = get_text(specification) text = [] first = 0 for level, title, page in doc_toc: first = page - 1 break for page in pdf[first:]: text.append("\n".join([line.strip() for line in page.get_text().splitlines()])) text = "\n".join(text) if not text or STOP_EVENT.is_set() or not doc_toc: print("\n[ERREUR] Pas de texte/table of contents trouvé !") return {} print(f"\n[INFO] Texte {specification} récupéré", flush=True) titles = [] for level, title, page in doc_toc: if STOP_EVENT.is_set(): return {} if title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text: titles.append('\n'.join(title.strip().split(" ", 1))) return extract_sections(text, titles) def hasher(specification: str, version: str): return hashlib.md5(f"{specification}{version}".encode()).hexdigest() def get_scope(content): for title, text in content.items(): if title.lower().endswith("scope"): return text return "" def process_specification(spec): global processed_count, indexed_specifications, documents_by_spec_num if STOP_EVENT.is_set(): return try: version = spec.get('Version') if not version: return doc_id = str(spec.get("ETSI deliverable")) document = None with DOCUMENT_LOCK: if doc_id in documents_by_spec_num and documents_by_spec_num[doc_id]["hash"] == hasher(doc_id, version) and not doc_id in specifications_passed: document = documents_by_spec_num[doc_id] specifications_passed.add(doc_id) print(f"\n[INFO] Document déjà présent pour {doc_id} (version {spec['Version']})", flush=True) elif doc_id in specifications_passed: document = documents_by_spec_num[doc_id] print(f"\n[INFO] Document déjà présent pour {doc_id} [dernière version présent]") else: print(f"\n[INFO] Tentative de récupération du document {doc_id} (version {spec['Version']})", flush=True) document = get_spec_content(doc_id) if document: documents_by_spec_num[doc_id] = {"content": document, "hash": hasher(doc_id, version)} document = {"content": document, "hash": hasher(doc_id, version)} specifications_passed.add(doc_id) print(f"\n[INFO] Document extrait pour {doc_id} (version {spec['Version']})", flush=True) string_key = f"{doc_id}+-+{spec['title']}+-+{spec['Type']}+-+{spec['Version']}" metadata = { "id": str(doc_id), "title": spec["title"], "type": spec["Type"], "version": version, "url": spec["PDF link"], "scope": "" if not document else get_scope(document["content"]) } with DICT_LOCK: indexed_specifications[string_key] = metadata processed_count += 1 sys.stdout.write(f"\rTraitement: {processed_count}/{total_count} spécifications...") sys.stdout.flush() except Exception as e: traceback.print_exception(e) print(f"\n[ERREUR] Échec du traitement de {doc_id} {version}: {e}", flush=True) def sauvegarder(indexed_specifications, documents_by_spec_num): print("\nSauvegarde en cours...", flush=True) flat_metadata = [metadata for _, metadata in indexed_specifications.items()] flat_docs = [] for doc_id, data in documents_by_spec_num.items(): for title, content in data["content"].items(): flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content}) push_spec_content = Dataset.from_list(flat_docs) push_spec_metadata = Dataset.from_list(flat_metadata) push_spec_content.push_to_hub("OrganizedProgrammers/ETSISpecContent", token=os.environ["HF_TOKEN"]) push_spec_metadata.push_to_hub("OrganizedProgrammers/ETSISpecMetadata", token=os.environ["HF_TOKEN"]) print("Sauvegarde terminée.", flush=True) def main(): global total_count start_time = time.time() specifications = df.to_dict(orient="records") total_count = len(specifications) print(f"Traitement de {total_count} spécifications avec multithreading...") try: with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor: futures = [executor.submit(process_specification, spec) for spec in specifications] while True: if all(f.done() for f in futures): break if STOP_EVENT.is_set(): break time.sleep(0.35) except Exception as e: print(f"\nErreur inattendue dans le ThreadPool : {e}", flush=True) print("\nSauvegarde des résultats...", flush=True) sauvegarder(indexed_specifications, documents_by_spec_num) elapsed_time = time.time() - start_time print(f"\nTraitement terminé en {elapsed_time:.2f} secondes.", flush=True) if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\nInterruption détectée (Ctrl+C). Arrêt des tâches en cours...", flush=True) STOP_EVENT.set() time.sleep(2) sauvegarder(indexed_specifications, documents_by_spec_num) print("Arrêt propre du script.", flush=True) sys.exit(0) except Exception as e: print(f"\nErreur inattendue : {e}", flush=True) sauvegarder(indexed_specifications, documents_by_spec_num) sys.exit(1) # print(get_spec_content("188 005-1"))