Spaces:
Running
Running
import os | |
import time | |
import warnings | |
from dotenv import load_dotenv | |
import numpy as np | |
import requests | |
import pandas as pd | |
warnings.filterwarnings("ignore") | |
os.environ["CURL_CA_BUNDLE"] = "" | |
load_dotenv() | |
from huggingface_hub import configure_http_backend | |
def backend_factory() -> requests.Session: | |
session = requests.Session() | |
session.verify = False | |
return session | |
configure_http_backend(backend_factory=backend_factory) | |
from datasets import load_dataset, Dataset | |
from datasets.data_files import EmptyDatasetError | |
import threading | |
import zipfile | |
import sys | |
import fitz | |
import re | |
import json | |
import traceback | |
import io | |
import concurrent.futures | |
import hashlib | |
CHARS = "0123456789abcdefghijklmnopqrstuvwxyz" | |
DICT_LOCK = threading.Lock() | |
DOCUMENT_LOCK = threading.Lock() | |
STOP_EVENT = threading.Event() | |
documents_by_spec_num = {} | |
try: | |
spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent", token=os.environ.get("HF_TOKEN")) | |
spec_contents = spec_contents["train"].to_list() | |
for section in spec_contents: | |
if section["doc_id"] not in documents_by_spec_num.keys(): | |
documents_by_spec_num[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]} | |
else: | |
documents_by_spec_num[section["doc_id"]]["content"][section["section"]] = section["content"] | |
except EmptyDatasetError as e: | |
print("Base de données vide !") | |
indexed_specifications = {} | |
specifications_passed = set() | |
processed_count = 0 | |
total_count = 0 | |
session = requests.Session() | |
req = session.post("https://portal.etsi.org/ETSIPages/LoginEOL.ashx", verify=False, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36"}, data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")})) | |
print("Récupération des spécifications depuis ETSI...", req.status_code) | |
url_ts = "https://www.etsi.org/?option=com_standardssearch&view=data&format=csv&includeScope=1&page=1&search=&title=1&etsiNumber=1&content=0&version=0&onApproval=0&published=1&withdrawn=0&historical=0&isCurrent=1&superseded=0&harmonized=0&keyword=&TB=&stdType=TS&frequency=&mandate=&collection=&sort=1" | |
url_tr = url_ts.replace("stdType=TS", "stdType=TR") | |
data_ts = requests.get(url_ts, verify=False).content | |
data_tr = requests.get(url_tr, verify=False).content | |
df_ts = pd.read_csv(io.StringIO(data_ts.decode('utf-8')), sep=";", skiprows=1, index_col=False) | |
df_tr = pd.read_csv(io.StringIO(data_tr.decode('utf-8')), sep=";", skiprows=1, index_col=False) | |
backup_ts = df_ts["ETSI deliverable"] | |
backup_tr = df_tr["ETSI deliverable"] | |
df_ts["ETSI deliverable"] = df_ts["ETSI deliverable"].str.extract(r"\s*ETSI TS (\d+ \d+(?:-\d+(?:-\d+)?)?)") | |
df_tr["ETSI deliverable"] = df_tr["ETSI deliverable"].str.extract(r"\s*ETSI TR (\d+ \d+(?:-\d+(?:-\d+)?)?)") | |
version1 = backup_ts.str.extract(r"\s*ETSI TS \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)") | |
version2 = backup_tr.str.extract(r"\s*ETSI TR \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)") | |
df_ts["Version"] = version1[0] | |
df_tr["Version"] = version2[0] | |
def ver_tuple(v): | |
return tuple(map(int, v.split("."))) | |
df_ts["temp"] = df_ts["Version"].apply(ver_tuple) | |
df_tr["temp"] = df_tr["Version"].apply(ver_tuple) | |
df_ts["Type"] = "TS" | |
df_tr["Type"] = "TR" | |
df = pd.concat([df_ts, df_tr]) | |
unique_df = df.loc[df.groupby("ETSI deliverable")["temp"].idxmax()] | |
unique_df = unique_df.drop(columns="temp") | |
unique_df = unique_df[(~unique_df["title"].str.contains("3GPP", case=True, na=False))] | |
df = df.drop(columns="temp") | |
df = df[(~df["title"].str.contains("3GPP", case=True, na=False))] | |
def get_text(specification: str): | |
if STOP_EVENT.is_set(): | |
return None, [] | |
print(f"\n[INFO] Tentative de récupération de la spécification {specification}", flush=True) | |
response = session.get( | |
unique_df[unique_df["ETSI deliverable"] == specification].iloc[0]["PDF link"], | |
verify=False, | |
headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'} | |
) | |
if response.status_code != 200: | |
print(f"\n[ERREUR] Echec du téléchargement du PDF pour {specification}. {req.status_code}", flush=True) | |
return None, [] | |
pdf = fitz.open(stream=response.content, filetype="pdf") | |
return pdf, pdf.get_toc() | |
def get_spec_content(specification: str): | |
def extract_sections(text, titles): | |
sections = {} | |
# On trie les titres selon leur position dans le texte | |
sorted_titles = sorted(titles, key=lambda t: text.find(t)) | |
for i, title in enumerate(sorted_titles): | |
start = text.find(title) | |
if i + 1 < len(sorted_titles): | |
end = text.find(sorted_titles[i + 1]) | |
sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip().rstrip()) | |
else: | |
sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip().rstrip()) | |
return sections | |
if STOP_EVENT.is_set(): | |
return {} | |
print("\n[INFO] Tentative de récupération du texte", flush=True) | |
pdf, doc_toc = get_text(specification) | |
text = [] | |
first = 0 | |
for level, title, page in doc_toc: | |
first = page - 1 | |
break | |
for page in pdf[first:]: | |
text.append("\n".join([line.strip() for line in page.get_text().splitlines()])) | |
text = "\n".join(text) | |
if not text or STOP_EVENT.is_set() or not doc_toc: | |
print("\n[ERREUR] Pas de texte/table of contents trouvé !") | |
return {} | |
print(f"\n[INFO] Texte {specification} récupéré", flush=True) | |
titles = [] | |
for level, title, page in doc_toc: | |
if STOP_EVENT.is_set(): | |
return {} | |
if title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text: | |
titles.append('\n'.join(title.strip().split(" ", 1))) | |
return extract_sections(text, titles) | |
def hasher(specification: str, version: str): | |
return hashlib.md5(f"{specification}{version}".encode()).hexdigest() | |
def get_scope(content): | |
for title, text in content.items(): | |
if title.lower().endswith("scope"): | |
return text | |
return "" | |
def process_specification(spec): | |
global processed_count, indexed_specifications, documents_by_spec_num | |
if STOP_EVENT.is_set(): | |
return | |
try: | |
version = spec.get('Version') | |
if not version: return | |
doc_id = str(spec.get("ETSI deliverable")) | |
document = None | |
with DOCUMENT_LOCK: | |
if doc_id in documents_by_spec_num and documents_by_spec_num[doc_id]["hash"] == hasher(doc_id, version) and not doc_id in specifications_passed: | |
document = documents_by_spec_num[doc_id] | |
specifications_passed.add(doc_id) | |
print(f"\n[INFO] Document déjà présent pour {doc_id} (version {spec['Version']})", flush=True) | |
elif doc_id in specifications_passed: | |
document = documents_by_spec_num[doc_id] | |
print(f"\n[INFO] Document déjà présent pour {doc_id} [dernière version présent]") | |
else: | |
print(f"\n[INFO] Tentative de récupération du document {doc_id} (version {spec['Version']})", flush=True) | |
document = get_spec_content(doc_id) | |
if document: | |
documents_by_spec_num[doc_id] = {"content": document, "hash": hasher(doc_id, version)} | |
document = {"content": document, "hash": hasher(doc_id, version)} | |
specifications_passed.add(doc_id) | |
print(f"\n[INFO] Document extrait pour {doc_id} (version {spec['Version']})", flush=True) | |
string_key = f"{doc_id}+-+{spec['title']}+-+{spec['Type']}+-+{spec['Version']}" | |
metadata = { | |
"id": str(doc_id), | |
"title": spec["title"], | |
"type": spec["Type"], | |
"version": version, | |
"url": spec["PDF link"], | |
"scope": "" if not document else get_scope(document["content"]) | |
} | |
with DICT_LOCK: | |
indexed_specifications[string_key] = metadata | |
processed_count += 1 | |
sys.stdout.write(f"\rTraitement: {processed_count}/{total_count} spécifications...") | |
sys.stdout.flush() | |
except Exception as e: | |
traceback.print_exception(e) | |
print(f"\n[ERREUR] Échec du traitement de {doc_id} {version}: {e}", flush=True) | |
def sauvegarder(indexed_specifications, documents_by_spec_num): | |
print("\nSauvegarde en cours...", flush=True) | |
flat_metadata = [metadata for _, metadata in indexed_specifications.items()] | |
flat_docs = [] | |
for doc_id, data in documents_by_spec_num.items(): | |
for title, content in data["content"].items(): | |
flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content}) | |
push_spec_content = Dataset.from_list(flat_docs) | |
push_spec_metadata = Dataset.from_list(flat_metadata) | |
push_spec_content.push_to_hub("OrganizedProgrammers/ETSISpecContent", token=os.environ["HF_TOKEN"]) | |
push_spec_metadata.push_to_hub("OrganizedProgrammers/ETSISpecMetadata", token=os.environ["HF_TOKEN"]) | |
print("Sauvegarde terminée.", flush=True) | |
def main(): | |
global total_count | |
start_time = time.time() | |
specifications = df.to_dict(orient="records") | |
total_count = len(specifications) | |
print(f"Traitement de {total_count} spécifications avec multithreading...") | |
try: | |
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor: | |
futures = [executor.submit(process_specification, spec) for spec in specifications] | |
while True: | |
if all(f.done() for f in futures): | |
break | |
if STOP_EVENT.is_set(): | |
break | |
time.sleep(0.35) | |
except Exception as e: | |
print(f"\nErreur inattendue dans le ThreadPool : {e}", flush=True) | |
print("\nSauvegarde des résultats...", flush=True) | |
sauvegarder(indexed_specifications, documents_by_spec_num) | |
elapsed_time = time.time() - start_time | |
print(f"\nTraitement terminé en {elapsed_time:.2f} secondes.", flush=True) | |
if __name__ == "__main__": | |
try: | |
main() | |
except KeyboardInterrupt: | |
print("\nInterruption détectée (Ctrl+C). Arrêt des tâches en cours...", flush=True) | |
STOP_EVENT.set() | |
time.sleep(2) | |
sauvegarder(indexed_specifications, documents_by_spec_num) | |
print("Arrêt propre du script.", flush=True) | |
sys.exit(0) | |
except Exception as e: | |
print(f"\nErreur inattendue : {e}", flush=True) | |
sauvegarder(indexed_specifications, documents_by_spec_num) | |
sys.exit(1) | |
# print(get_spec_content("188 005-1")) |