# DocIndexer / 3gpp_spec_indexer.py
import os
import time
import warnings
import requests
from dotenv import load_dotenv
import numpy as np
import pandas as pd
from huggingface_hub import configure_http_backend
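# Route all huggingface_hub HTTP traffic through a custom requests session with TLS verification disabled.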
def backend_factory() -> requests.Session:
session = requests.Session()
session.verify = False
return session
configure_http_backend(backend_factory=backend_factory)
warnings.filterwarnings("ignore")
os.environ["CURL_CA_BUNDLE"] = ""
load_dotenv()
from datasets import load_dataset, Dataset
import threading
import zipfile
import sys
import subprocess
import re
import traceback
import io
import concurrent.futures
import hashlib
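# Base-36 alphabet used to encode 3GPP version numbers, plus the locks and
# event that coordinate the worker threads and signal an early shutdown.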
CHARS = "0123456789abcdefghijklmnopqrstuvwxyz"
DICT_LOCK = threading.Lock()
DOCUMENT_LOCK = threading.Lock()
STOP_EVENT = threading.Event()
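# Reload the previously indexed section content from the Hub and rebuild an
# in-memory index keyed by specification number, so unchanged documents can be reused.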
spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")
spec_contents = spec_contents["train"].to_list()
documents_by_spec_num = {}
for section in spec_contents:
    if section["doc_id"] not in documents_by_spec_num:
documents_by_spec_num[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
else:
documents_by_spec_num[section["doc_id"]]["content"][section["section"]] = section["content"]
indexed_specifications = {}
specifications_passed = set()
processed_count = 0
total_count = 0
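# Download the specification ZIP from the 3GPP archive (falling back to the
# doc-finder service for the latest version), then convert the DOC/DOCX files
# inside it to plain text with LibreOffice.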
def get_text(specification: str, version: str, second: bool = False):
"""Récupère les bytes du PDF à partir d'une spécification et d'une version."""
if STOP_EVENT.is_set():
return []
doc_id = specification
series = doc_id.split(".")[0]
content = []
print(f"\n[INFO] Tentative de récupération de la spécification {doc_id} version {version}", flush=True)
response = requests.get(
f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip",
verify=False,
headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
)
    if response.status_code != 200:
        # The requested version is unavailable; fall back (once) to the latest version reported by the doc-finder service.
        if second:
            print(f"\n[ERROR] Fallback download also failed for {specification}-{version}.", flush=True)
            return []
        print(f"\n[ERROR] Failed to download the ZIP for {specification}-{version}. Trying the latest available version", flush=True)
        last_possible_version = requests.post('https://organizedprogrammers-3gppdocfinder.hf.space/find', verify=False, headers={"Content-Type": "application/json"}, json={"doc_id": specification})
        if last_possible_version.status_code != 200:
            print(f"\n[ERROR] Failed to resolve the latest version for {specification}. HTTP {last_possible_version.status_code}", flush=True)
            return []
        data = last_possible_version.json()
        return get_text(specification, data['version'], True)
zip_bytes = io.BytesIO(response.content)
zip_file = zipfile.ZipFile(zip_bytes)
    def extract_text(zf: zipfile.ZipFile, filename: str):
        if (filename.endswith(".doc") or filename.endswith(".docx")) and ("cover" not in filename.lower() and "annex" not in filename.lower()):
            doc_bytes = zf.read(filename)
input_path = f"/tmp/{filename}"
output_path = "/tmp"
            changed_ext_filename = re.sub(r"\.docx?$", ".txt", filename)
output_file = f"/tmp/{changed_ext_filename}"
with open(input_path, "wb") as f:
f.write(doc_bytes)
try:
print(f"\n[INFO] Tentative de conversion DOC/DOCX -> TXT", flush=True)
try:
subprocess.run(
["libreoffice", "--headless", "--convert-to", "txt:Text", "--outdir", output_path, input_path],
check=True,
capture_output=True,
timeout=60*5
)
except subprocess.TimeoutExpired as e:
print("[SKIP] Trop long !")
return []
except subprocess.CalledProcessError as e:
print(f"\n[ERREUR] LibreOffice a échoué : {e}", flush=True)
return []
if os.path.exists(output_file):
with open(output_file, "r", encoding="utf-8") as f:
return [line.strip() for line in f if line.strip()]
finally:
if os.path.exists(input_path):
os.remove(input_path)
if os.path.exists(output_file):
os.remove(output_file)
return []
for fileinfo in zip_file.infolist():
if STOP_EVENT.is_set():
return []
if fileinfo.filename.endswith(".zip") and len(zip_file.namelist()) == 1:
nested_zip_bytes = io.BytesIO(zip_file.read(fileinfo.filename))
zip_file = zipfile.ZipFile(nested_zip_bytes)
break
for filename in zip_file.namelist():
if STOP_EVENT.is_set():
return []
content.extend(extract_text(zip_file, filename))
if content:
print("\n[INFO] Conversion terminé", flush=True)
else:
print(f"\n[ERREUR] Pas réussi", flush=True)
return content
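# Split the extracted text into numbered sections: every line matching the
# chapter-heading pattern opens a new section that runs until the next heading.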
def get_spec_content(specification: str, version:str):
if STOP_EVENT.is_set():
return {}
print("\n[INFO] Tentative de récupération du texte", flush=True)
text = get_text(specification, version)
if not text or STOP_EVENT.is_set():
return {}
print(f"\n[INFO] Texte {specification}-{version} récupéré", flush=True)
chapters = []
chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+[^\.]$") # 3.5.2.1 Introduction
for i, line in enumerate(text):
if STOP_EVENT.is_set():
return {}
if chapter_regex.fullmatch(line):
chapters.append((i, line))
document = {}
for i in range(len(chapters)):
if STOP_EVENT.is_set():
return {}
start_index, chapter_title = chapters[i]
end_index = chapters[i+1][0] if i + 1 < len(chapters) else len(text)
content_lines = text[start_index + 1:end_index]
document[chapter_title.replace("\t", " ")] = "\n".join(content_lines)
print(f"\n[INFO] Document fini", flush=True)
return document
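# 3GPP encodes a version "x.y.z" as three base-36 characters when every part
# is below 36 (e.g. "18.3.0" -> "i30"), otherwise as three zero-padded pairs.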
def version_to_code(version_str):
parts = version_str.split('.')
if len(parts) != 3: return None
try:
x, y, z = [int(p) for p in parts]
except ValueError:
return None
if x < 36 and y < 36 and z < 36:
return f"{CHARS[x]}{CHARS[y]}{CHARS[z]}"
else:
return f"{str(x).zfill(2)}{str(y).zfill(2)}{str(z).zfill(2)}"
def hasher(specification: str, version_code: str):
return hashlib.md5(f"{specification}{version_code}".encode()).hexdigest()
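# Return the text of the section whose title ends with "Scope", if any.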
def get_scope(content):
for title, text in content.items():
if title.lower().endswith("scope"):
return text
return ""
def process_specification(spec):
global processed_count, indexed_specifications, documents_by_spec_num
if STOP_EVENT.is_set():
return
try:
if not spec.get('vers'): return
doc_id = str(spec['spec_num'])
document = None
version_code = version_to_code(str(spec['vers']))
if not version_code: return
with DOCUMENT_LOCK:
            if doc_id in documents_by_spec_num and documents_by_spec_num[doc_id]["hash"] == hasher(doc_id, version_code) and doc_id not in specifications_passed:
                document = documents_by_spec_num[doc_id]
                specifications_passed.add(doc_id)
                print(f"\n[INFO] Document already cached for {doc_id} (version {spec['vers']})", flush=True)
            elif doc_id in specifications_passed:
                # Reuse the already-cached content so the scope can still be filled in below.
                document = documents_by_spec_num[doc_id]
                print(f"\n[INFO] Document already cached for {doc_id} (latest version present)", flush=True)
            else:
                print(f"\n[INFO] Attempting to retrieve document {doc_id} (version {spec['vers']})", flush=True)
                fetched = get_spec_content(doc_id, version_code)
                if fetched:
                    # Store the freshly extracted sections under the same {"content", "hash"} structure used by the cache.
                    documents_by_spec_num[doc_id] = {"content": fetched, "hash": hasher(doc_id, version_code)}
                    document = documents_by_spec_num[doc_id]
                    specifications_passed.add(doc_id)
                    print(f"\n[INFO] Document extracted for {doc_id} (version {spec['vers']})", flush=True)
series = doc_id.split(".")[0]
url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
string_key = f"{spec['spec_num']}+-+{spec['title']}+-+{spec['type']}+-+{spec['vers']}+-+{spec['WG']}"
metadata = {
"id": doc_id,
"title": spec["title"],
"type": spec["type"],
"version": str(spec["vers"]),
"working_group": spec["WG"],
"url": url,
"scope": "" if not document else get_scope(document["content"])
}
with DICT_LOCK:
indexed_specifications[string_key] = metadata
processed_count += 1
sys.stdout.write(f"\rTraitement: {processed_count}/{total_count} spécifications...")
sys.stdout.flush()
except Exception as e:
traceback.print_exception(e)
print(f"\n[ERREUR] Échec du traitement de {spec.get('spec_num', 'inconnu')} v{spec.get('vers')}: {e}", flush=True)
def sauvegarder(indexed_specifications, documents_by_spec_num):
print("\nSauvegarde en cours...", flush=True)
flat_metadata = [metadata for _, metadata in indexed_specifications.items()]
flat_docs = []
for doc_id, data in documents_by_spec_num.items():
for title, content in data["content"].items():
flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
push_spec_content = Dataset.from_list(flat_docs)
push_spec_metadata = Dataset.from_list(flat_metadata)
push_spec_content.push_to_hub("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF_TOKEN"])
push_spec_metadata.push_to_hub("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF_TOKEN"])
print("Sauvegarde terminée.", flush=True)
def main():
global total_count
start_time = time.time()
    # Fetch the specification list from the 3GPP site
    print("Fetching specifications from 3GPP...")
response = requests.get(
f'https://www.3gpp.org/dynareport?code=status-report.htm',
headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'},
verify=False
)
    # Parse the HTML tables
dfs = pd.read_html(
io.StringIO(response.text),
storage_options={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'},
encoding="utf-8"
)
for x in range(len(dfs)):
dfs[x] = dfs[x].replace({np.nan: None})
    # Keep only the needed columns
columns_needed = [0, 1, 2, 3, 4]
extracted_dfs = [df.iloc[:, columns_needed] for df in dfs]
columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]
    # Build the list of specification dicts
specifications = []
for df in extracted_dfs:
for index, row in df.iterrows():
doc = row.to_list()
doc_dict = dict(zip(columns, doc))
specifications.append(doc_dict)
total_count = len(specifications)
print(f"Traitement de {total_count} spécifications avec multithreading...")
    if os.path.exists("indexed_docs_content.zip"):
        print(f"Loading {len(documents_by_spec_num)} documents from the cache.")
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
futures = [executor.submit(process_specification, spec) for spec in specifications]
while True:
if all(f.done() for f in futures):
break
if STOP_EVENT.is_set():
break
time.sleep(0.35)
except Exception as e:
print(f"\nErreur inattendue dans le ThreadPool : {e}", flush=True)
print("\nSauvegarde des résultats...", flush=True)
sauvegarder(indexed_specifications, documents_by_spec_num)
elapsed_time = time.time() - start_time
print(f"\nTraitement terminé en {elapsed_time:.2f} secondes.", flush=True)
print(f"Métadonnées sauvegardées dans 'indexed_specifications.json'.", flush=True)
print(f"Contenu des documents sauvegardé dans 'indexed_docs_content.zip'.", flush=True)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nInterruption détectée (Ctrl+C). Arrêt des tâches en cours...", flush=True)
STOP_EVENT.set()
time.sleep(2)
sauvegarder(indexed_specifications, documents_by_spec_num)
print("Arrêt propre du script.", flush=True)
sys.exit(0)
except Exception as e:
print(f"\nErreur inattendue : {e}", flush=True)
sauvegarder(indexed_specifications, documents_by_spec_num)
sys.exit(1)