# DocIndexer / etsi_spec_indexer.py
import os
import time
import warnings
from dotenv import load_dotenv
import numpy as np
import requests
import pandas as pd
warnings.filterwarnings("ignore")
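# NOTE: TLS certificate verification is deliberately disabled throughout this
# script (empty CURL_CA_BUNDLE, verify=False everywhere), presumably to cope
# with an intercepting proxy. Re-enable it if your environment allows.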
os.environ["CURL_CA_BUNDLE"] = ""
load_dotenv()
from huggingface_hub import configure_http_backend
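# Route all huggingface_hub traffic through a requests.Session with TLS
# verification turned off, consistent with the rest of the script.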
def backend_factory() -> requests.Session:
session = requests.Session()
session.verify = False
return session
configure_http_backend(backend_factory=backend_factory)
from datasets import load_dataset, Dataset
from datasets.data_files import EmptyDatasetError
import threading
import zipfile
import sys
import fitz
import re
import json
import traceback
import io
import concurrent.futures
import hashlib
CHARS = "0123456789abcdefghijklmnopqrstuvwxyz"
DICT_LOCK = threading.Lock()
DOCUMENT_LOCK = threading.Lock()
STOP_EVENT = threading.Event()
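# documents_by_spec_num maps a spec number to its extracted sections and a
# content hash: {doc_id: {"content": {section_title: text}, "hash": md5}}.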
documents_by_spec_num = {}
try:
spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent", token=os.environ.get("HF_TOKEN"))
spec_contents = spec_contents["train"].to_list()
for section in spec_contents:
        if section["doc_id"] not in documents_by_spec_num:
            documents_by_spec_num[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
        else:
            documents_by_spec_num[section["doc_id"]]["content"][section["section"]] = section["content"]
except EmptyDatasetError:
    print("Dataset is empty!")
indexed_specifications = {}
specifications_passed = set()
processed_count = 0
total_count = 0
session = requests.Session()
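# Log in to the ETSI EOL portal; credentials come from the EOL_USER and
# EOL_PASSWORD environment variables.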
req = session.post("https://portal.etsi.org/ETSIPages/LoginEOL.ashx", verify=False, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}, data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}))
print("Fetching specifications from ETSI...", req.status_code)
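# CSV export of the ETSI standards search, filtered to published, current
# deliverables; the TR query is the TS query with stdType swapped.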
url_ts = "https://www.etsi.org/?option=com_standardssearch&view=data&format=csv&includeScope=1&page=1&search=&title=1&etsiNumber=1&content=0&version=0&onApproval=0&published=1&withdrawn=0&historical=0&isCurrent=1&superseded=0&harmonized=0&keyword=&TB=&stdType=TS&frequency=&mandate=&collection=&sort=1"
url_tr = url_ts.replace("stdType=TS", "stdType=TR")
data_ts = requests.get(url_ts, verify=False).content
data_tr = requests.get(url_tr, verify=False).content
df_ts = pd.read_csv(io.StringIO(data_ts.decode('utf-8')), sep=";", skiprows=1, index_col=False)
df_tr = pd.read_csv(io.StringIO(data_tr.decode('utf-8')), sep=";", skiprows=1, index_col=False)
backup_ts = df_ts["ETSI deliverable"]
backup_tr = df_tr["ETSI deliverable"]
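# A deliverable reads like "ETSI TS 103 666-1 V15.0.0": extract the number
# ("103 666-1") and the version ("15.0.0"). The exact number is illustrative.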
df_ts["ETSI deliverable"] = df_ts["ETSI deliverable"].str.extract(r"\s*ETSI TS (\d+ \d+(?:-\d+(?:-\d+)?)?)")
df_tr["ETSI deliverable"] = df_tr["ETSI deliverable"].str.extract(r"\s*ETSI TR (\d+ \d+(?:-\d+(?:-\d+)?)?)")
version1 = backup_ts.str.extract(r"\s*ETSI TS \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
version2 = backup_tr.str.extract(r"\s*ETSI TR \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
df_ts["Version"] = version1[0]
df_tr["Version"] = version2[0]
def ver_tuple(v):
    # Versions that failed the regex extraction come through as NaN; sort them lowest.
    if not isinstance(v, str):
        return (0, 0, 0)
    return tuple(map(int, v.split(".")))
df_ts["temp"] = df_ts["Version"].apply(ver_tuple)
df_tr["temp"] = df_tr["Version"].apply(ver_tuple)
df_ts["Type"] = "TS"
df_tr["Type"] = "TR"
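# Keep only the highest version of each deliverable, and drop entries whose
# title mentions 3GPP, presumably because those are transposed 3GPP documents.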
df = pd.concat([df_ts, df_tr])
unique_df = df.loc[df.groupby("ETSI deliverable")["temp"].idxmax()]
unique_df = unique_df.drop(columns="temp")
unique_df = unique_df[(~unique_df["title"].str.contains("3GPP", case=True, na=False))]
df = df.drop(columns="temp")
df = df[(~df["title"].str.contains("3GPP", case=True, na=False))]
def get_text(specification: str):
if STOP_EVENT.is_set():
return None, []
    print(f"\n[INFO] Attempting to retrieve specification {specification}", flush=True)
response = session.get(
unique_df[unique_df["ETSI deliverable"] == specification].iloc[0]["PDF link"],
verify=False,
headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
)
    if response.status_code != 200:
        print(f"\n[ERROR] Failed to download the PDF for {specification}: HTTP {response.status_code}", flush=True)
        return None, []
pdf = fitz.open(stream=response.content, filetype="pdf")
return pdf, pdf.get_toc()
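# Example usage (the spec number is hypothetical):
#   pdf, toc = get_text("103 666-1")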
def get_spec_content(specification: str):
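    # Split the full text into {section title: body}, using the TOC titles as
    # delimiters and collapsing runs of whitespace to single spaces.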
def extract_sections(text, titles):
sections = {}
        # Sort the titles by their position in the text
sorted_titles = sorted(titles, key=lambda t: text.find(t))
for i, title in enumerate(sorted_titles):
start = text.find(title)
if i + 1 < len(sorted_titles):
end = text.find(sorted_titles[i + 1])
sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip().rstrip())
else:
sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip().rstrip())
return sections
if STOP_EVENT.is_set():
return {}
    print("\n[INFO] Attempting to retrieve the text", flush=True)
    pdf, doc_toc = get_text(specification)
    # get_text() returns (None, []) on failure; bail out before touching pdf.
    if pdf is None or not doc_toc or STOP_EVENT.is_set():
        print("\n[ERROR] No text/table of contents found!")
        return {}
    # Skip the front matter: start at the first page referenced by the TOC.
    first = doc_toc[0][2] - 1
    text = []
    for page in pdf[first:]:
        text.append("\n".join([line.strip() for line in page.get_text().splitlines()]))
    text = "\n".join(text)
    if not text or STOP_EVENT.is_set():
        print("\n[ERROR] No text found!")
        return {}
    print(f"\n[INFO] Text for {specification} retrieved", flush=True)
titles = []
for level, title, page in doc_toc:
if STOP_EVENT.is_set():
return {}
        # A TOC title like "1 Scope" appears in the extracted text with the
        # section number and heading on separate lines, hence the "\n" join.
        if title and title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text:
            titles.append('\n'.join(title.strip().split(" ", 1)))
return extract_sections(text, titles)
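# MD5 over spec number + version: used as a cheap change marker so a cached
# document is only re-extracted when its version moves.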
def hasher(specification: str, version: str):
return hashlib.md5(f"{specification}{version}".encode()).hexdigest()
def get_scope(content):
for title, text in content.items():
if title.lower().endswith("scope"):
return text
return ""
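# Worker executed per spec row: reuse the cached document when the
# (spec, version) hash matches, otherwise download and re-extract it,
# then record the spec's metadata under a composite key.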
def process_specification(spec):
global processed_count, indexed_specifications, documents_by_spec_num
if STOP_EVENT.is_set():
return
    version = spec.get("Version")
    doc_id = str(spec.get("ETSI deliverable"))
    # Rows whose version failed to parse arrive as NaN rather than None.
    if not isinstance(version, str):
        return
    try:
document = None
with DOCUMENT_LOCK:
            if doc_id in documents_by_spec_num and documents_by_spec_num[doc_id]["hash"] == hasher(doc_id, version) and doc_id not in specifications_passed:
                document = documents_by_spec_num[doc_id]
                specifications_passed.add(doc_id)
                print(f"\n[INFO] Document already present for {doc_id} (version {spec['Version']})", flush=True)
            elif doc_id in specifications_passed:
                document = documents_by_spec_num[doc_id]
                print(f"\n[INFO] Document already present for {doc_id} [latest version present]")
            else:
                print(f"\n[INFO] Attempting to retrieve document {doc_id} (version {spec['Version']})", flush=True)
document = get_spec_content(doc_id)
if document:
documents_by_spec_num[doc_id] = {"content": document, "hash": hasher(doc_id, version)}
document = {"content": document, "hash": hasher(doc_id, version)}
specifications_passed.add(doc_id)
                    print(f"\n[INFO] Document extracted for {doc_id} (version {spec['Version']})", flush=True)
string_key = f"{doc_id}+-+{spec['title']}+-+{spec['Type']}+-+{spec['Version']}"
metadata = {
"id": str(doc_id),
"title": spec["title"],
"type": spec["Type"],
"version": version,
"url": spec["PDF link"],
"scope": "" if not document else get_scope(document["content"])
}
with DICT_LOCK:
indexed_specifications[string_key] = metadata
processed_count += 1
            sys.stdout.write(f"\rProcessed: {processed_count}/{total_count} specifications...")
sys.stdout.flush()
except Exception as e:
traceback.print_exception(e)
        print(f"\n[ERROR] Failed to process {doc_id} {version}: {e}", flush=True)
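# Flatten the in-memory dicts into two datasets (one row per section, one row
# per spec) and push both to the Hub.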
def sauvegarder(indexed_specifications, documents_by_spec_num):
    print("\nSaving...", flush=True)
    flat_metadata = list(indexed_specifications.values())
flat_docs = []
for doc_id, data in documents_by_spec_num.items():
for title, content in data["content"].items():
flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
push_spec_content = Dataset.from_list(flat_docs)
push_spec_metadata = Dataset.from_list(flat_metadata)
push_spec_content.push_to_hub("OrganizedProgrammers/ETSISpecContent", token=os.environ["HF_TOKEN"])
push_spec_metadata.push_to_hub("OrganizedProgrammers/ETSISpecMetadata", token=os.environ["HF_TOKEN"])
    print("Save complete.", flush=True)
def main():
global total_count
start_time = time.time()
specifications = df.to_dict(orient="records")
total_count = len(specifications)
    print(f"Processing {total_count} specifications with multithreading...")
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
futures = [executor.submit(process_specification, spec) for spec in specifications]
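            # Poll rather than block on as_completed() so the main thread stays
            # responsive to STOP_EVENT (set by Ctrl+C below).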
while True:
if all(f.done() for f in futures):
break
if STOP_EVENT.is_set():
break
time.sleep(0.35)
except Exception as e:
        print(f"\nUnexpected error in the ThreadPool: {e}", flush=True)
    print("\nSaving results...", flush=True)
sauvegarder(indexed_specifications, documents_by_spec_num)
elapsed_time = time.time() - start_time
    print(f"\nProcessing finished in {elapsed_time:.2f} seconds.", flush=True)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
        print("\nInterrupt detected (Ctrl+C). Stopping running tasks...", flush=True)
STOP_EVENT.set()
time.sleep(2)
sauvegarder(indexed_specifications, documents_by_spec_num)
        print("Clean shutdown of the script.", flush=True)
sys.exit(0)
except Exception as e:
        print(f"\nUnexpected error: {e}", flush=True)
sauvegarder(indexed_specifications, documents_by_spec_num)
sys.exit(1)
# print(get_spec_content("188 005-1"))