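# Indexer for 3GPP specifications: scrapes the 3GPP status report, downloads each
# specification archive, converts its DOC/DOCX files to text with headless LibreOffice,
# splits the text into numbered sections, and pushes the content and metadata to the
# OrganizedProgrammers datasets on the Hugging Face Hub.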
import os
import time
import warnings
import requests
from dotenv import load_dotenv
import numpy as np
import pandas as pd
from huggingface_hub import configure_http_backend


def backend_factory() -> requests.Session:
    # Route all huggingface_hub traffic through a session with TLS verification disabled.
    session = requests.Session()
    session.verify = False
    return session


configure_http_backend(backend_factory=backend_factory)
warnings.filterwarnings("ignore")
os.environ["CURL_CA_BUNDLE"] = ""
load_dotenv()
from datasets import load_dataset, Dataset
import threading
import zipfile
import sys
import subprocess
import re
import traceback
import io
import concurrent.futures
import hashlib
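# Shared state: CHARS is the base-36 alphabet used for 3GPP version codes, the two
# locks guard the result dictionaries across worker threads, and STOP_EVENT lets a
# Ctrl+C propagate a clean shutdown request to every worker.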
CHARS = "0123456789abcdefghijklmnopqrstuvwxyz"
DICT_LOCK = threading.Lock()
DOCUMENT_LOCK = threading.Lock()
STOP_EVENT = threading.Event()
spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")
spec_contents = spec_contents["train"].to_list()

# Rebuild the per-specification cache from the previously published dataset.
documents_by_spec_num = {}
for section in spec_contents:
    if section["doc_id"] not in documents_by_spec_num:
        documents_by_spec_num[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
    else:
        documents_by_spec_num[section["doc_id"]]["content"][section["section"]] = section["content"]

indexed_specifications = {}
specifications_passed = set()
processed_count = 0
total_count = 0
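# get_text downloads the spec archive straight from the 3GPP FTP mirror; if the exact
# version is missing it asks the 3gppdocfinder Space for the latest published version
# and retries once. DOC/DOCX members are converted to plain text with a headless
# LibreOffice run and returned as a list of non-empty lines.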
def get_text(specification: str, version: str, second: bool = False):
    """Download the spec ZIP for a given specification/version and return its text lines."""
    if STOP_EVENT.is_set():
        return []
    doc_id = specification
    series = doc_id.split(".")[0]
    content = []
    print(f"\n[INFO] Trying to fetch specification {doc_id} version {version}", flush=True)
    response = requests.get(
        f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip",
        verify=False,
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    )
    if response.status_code != 200:
        if second:
            # Already retried with the latest known version; give up.
            return []
        print(f"\n[ERROR] Failed to download the ZIP for {specification}-{version}. Retrying with the latest available version", flush=True)
        last_possible_version = requests.post('https://organizedprogrammers-3gppdocfinder.hf.space/find', verify=False, headers={"Content-Type": "application/json"}, json={"doc_id": specification})
        if last_possible_version.status_code != 200:
            print(f"\n[ERROR] Second ZIP download attempt failed for {specification}-{version}. {last_possible_version.status_code}", flush=True)
            return []
        data = last_possible_version.json()
        return get_text(specification, data['version'], True)
    zip_bytes = io.BytesIO(response.content)
    zip_file = zipfile.ZipFile(zip_bytes)

    def extract_text(zf: zipfile.ZipFile, filename: str):
        # Convert a single DOC/DOCX member to text, skipping cover pages and annexes.
        if (filename.endswith(".doc") or filename.endswith(".docx")) and ("cover" not in filename.lower() and "annex" not in filename.lower()):
            doc_bytes = zf.read(filename)
            basename = os.path.basename(filename)
            input_path = f"/tmp/{basename}"
            output_path = "/tmp"
            changed_ext_filename = re.sub(r"\.docx?$", ".txt", basename)
            output_file = f"/tmp/{changed_ext_filename}"
            with open(input_path, "wb") as f:
                f.write(doc_bytes)
            try:
                print("\n[INFO] Converting DOC/DOCX -> TXT", flush=True)
                try:
                    subprocess.run(
                        ["libreoffice", "--headless", "--convert-to", "txt:Text", "--outdir", output_path, input_path],
                        check=True,
                        capture_output=True,
                        timeout=60 * 5
                    )
                except subprocess.TimeoutExpired:
                    print("[SKIP] Conversion timed out!")
                    return []
                except subprocess.CalledProcessError as e:
                    print(f"\n[ERROR] LibreOffice failed: {e}", flush=True)
                    return []
                if os.path.exists(output_file):
                    with open(output_file, "r", encoding="utf-8") as f:
                        return [line.strip() for line in f if line.strip()]
            finally:
                if os.path.exists(input_path):
                    os.remove(input_path)
                if os.path.exists(output_file):
                    os.remove(output_file)
        return []

    # Some archives contain a single nested ZIP; unwrap it before extracting.
    for fileinfo in zip_file.infolist():
        if STOP_EVENT.is_set():
            return []
        if fileinfo.filename.endswith(".zip") and len(zip_file.namelist()) == 1:
            nested_zip_bytes = io.BytesIO(zip_file.read(fileinfo.filename))
            zip_file = zipfile.ZipFile(nested_zip_bytes)
            break
    for filename in zip_file.namelist():
        if STOP_EVENT.is_set():
            return []
        content.extend(extract_text(zip_file, filename))
    if content:
        print("\n[INFO] Conversion finished", flush=True)
    else:
        print("\n[ERROR] No text could be extracted", flush=True)
    return content
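# get_spec_content splits the extracted text into numbered sections. A heading is a
# line such as "3.5.2.1\tIntroduction": a clause number, a tab, then a title that does
# not end with a period, presumably to exclude ordinary sentences that happen to start
# with a number.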
def get_spec_content(specification: str, version: str):
    if STOP_EVENT.is_set():
        return {}
    print("\n[INFO] Trying to retrieve the text", flush=True)
    text = get_text(specification, version)
    if not text or STOP_EVENT.is_set():
        return {}
    print(f"\n[INFO] Text for {specification}-{version} retrieved", flush=True)
    chapters = []
    chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+[^\.]$")  # e.g. "3.5.2.1\tIntroduction"
    for i, line in enumerate(text):
        if STOP_EVENT.is_set():
            return {}
        if chapter_regex.fullmatch(line):
            chapters.append((i, line))
    document = {}
    for i in range(len(chapters)):
        if STOP_EVENT.is_set():
            return {}
        start_index, chapter_title = chapters[i]
        end_index = chapters[i + 1][0] if i + 1 < len(chapters) else len(text)
        content_lines = text[start_index + 1:end_index]
        document[chapter_title.replace("\t", " ")] = "\n".join(content_lines)
    print("\n[INFO] Document assembled", flush=True)
    return document
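# 3GPP archive filenames encode the x.y.z version as three base-36 characters
# (e.g. 18.3.0 -> "i30"); components of 36 or more fall back to zero-padded two-digit decimals.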
def version_to_code(version_str):
    parts = version_str.split('.')
    if len(parts) != 3:
        return None
    try:
        x, y, z = [int(p) for p in parts]
    except ValueError:
        return None
    if x < 36 and y < 36 and z < 36:
        return f"{CHARS[x]}{CHARS[y]}{CHARS[z]}"
    else:
        return f"{str(x).zfill(2)}{str(y).zfill(2)}{str(z).zfill(2)}"


def hasher(specification: str, version_code: str):
    return hashlib.md5(f"{specification}{version_code}".encode()).hexdigest()


def get_scope(content):
    for title, text in content.items():
        if title.lower().endswith("scope"):
            return text
    return ""
def process_specification(spec):
    global processed_count, indexed_specifications, documents_by_spec_num
    if STOP_EVENT.is_set():
        return
    try:
        if not spec.get('vers'):
            return
        doc_id = str(spec['spec_num'])
        document = None
        version_code = version_to_code(str(spec['vers']))
        if not version_code:
            return
        with DOCUMENT_LOCK:
            if doc_id in documents_by_spec_num and documents_by_spec_num[doc_id]["hash"] == hasher(doc_id, version_code) and doc_id not in specifications_passed:
                document = documents_by_spec_num[doc_id]
                specifications_passed.add(doc_id)
                print(f"\n[INFO] Document already cached for {doc_id} (version {spec['vers']})", flush=True)
            elif doc_id in specifications_passed:
                print(f"\n[INFO] Document already cached for {doc_id} [latest version present]", flush=True)
            else:
                print(f"\n[INFO] Trying to retrieve document {doc_id} (version {spec['vers']})", flush=True)
                document = get_spec_content(doc_id, version_code)
                if document:
                    documents_by_spec_num[doc_id] = {"content": document, "hash": hasher(doc_id, version_code)}
                    specifications_passed.add(doc_id)
                    # Normalize to the cached shape so the scope lookup below works for both paths.
                    document = documents_by_spec_num[doc_id]
                    print(f"\n[INFO] Document extracted for {doc_id} (version {spec['vers']})", flush=True)
        series = doc_id.split(".")[0]
        url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
        string_key = f"{spec['spec_num']}+-+{spec['title']}+-+{spec['type']}+-+{spec['vers']}+-+{spec['WG']}"
        metadata = {
            "id": doc_id,
            "title": spec["title"],
            "type": spec["type"],
            "version": str(spec["vers"]),
            "working_group": spec["WG"],
            "url": url,
            "scope": "" if not document else get_scope(document["content"])
        }
        with DICT_LOCK:
            indexed_specifications[string_key] = metadata
            processed_count += 1
            sys.stdout.write(f"\rProcessing: {processed_count}/{total_count} specifications...")
            sys.stdout.flush()
    except Exception as e:
        traceback.print_exception(e)
        print(f"\n[ERROR] Failed to process {spec.get('spec_num', 'unknown')} v{spec.get('vers')}: {e}", flush=True)
def sauvegarder(indexed_specifications, documents_by_spec_num):
    print("\nSaving...", flush=True)
    flat_metadata = list(indexed_specifications.values())
    flat_docs = []
    for doc_id, data in documents_by_spec_num.items():
        for title, content in data["content"].items():
            flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
    push_spec_content = Dataset.from_list(flat_docs)
    push_spec_metadata = Dataset.from_list(flat_metadata)
    push_spec_content.push_to_hub("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF_TOKEN"])
    push_spec_metadata.push_to_hub("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF_TOKEN"])
    print("Save complete.", flush=True)
def main():
    global total_count
    start_time = time.time()

    # Fetch the specification list from the 3GPP site
    print("Fetching specifications from 3GPP...")
    response = requests.get(
        'https://www.3gpp.org/dynareport?code=status-report.htm',
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'},
        verify=False
    )

    # Parse the HTML tables
    dfs = pd.read_html(io.StringIO(response.text), encoding="utf-8")
    for x in range(len(dfs)):
        dfs[x] = dfs[x].replace({np.nan: None})

    # Keep only the needed columns
    columns_needed = [0, 1, 2, 3, 4]
    extracted_dfs = [df.iloc[:, columns_needed] for df in dfs]
    columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]

    # Build one dict per specification row
    specifications = []
    for df in extracted_dfs:
        for index, row in df.iterrows():
            doc = row.to_list()
            doc_dict = dict(zip(columns, doc))
            specifications.append(doc_dict)
    total_count = len(specifications)
    print(f"Processing {total_count} specifications with multithreading...")
    if documents_by_spec_num:
        print(f"Loaded {len(documents_by_spec_num)} documents from the existing dataset cache.")
    try:
        with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
            futures = [executor.submit(process_specification, spec) for spec in specifications]
            # Poll instead of blocking so STOP_EVENT can interrupt the whole batch.
            while True:
                if all(f.done() for f in futures):
                    break
                if STOP_EVENT.is_set():
                    break
                time.sleep(0.35)
    except Exception as e:
        print(f"\nUnexpected error in the thread pool: {e}", flush=True)
    print("\nSaving results...", flush=True)
    sauvegarder(indexed_specifications, documents_by_spec_num)
    elapsed_time = time.time() - start_time
    print(f"\nProcessing finished in {elapsed_time:.2f} seconds.", flush=True)
    print("Metadata pushed to 'OrganizedProgrammers/3GPPSpecMetadata'.", flush=True)
    print("Document content pushed to 'OrganizedProgrammers/3GPPSpecContent'.", flush=True)
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupt detected (Ctrl+C). Stopping running tasks...", flush=True)
        STOP_EVENT.set()
        time.sleep(2)
        sauvegarder(indexed_specifications, documents_by_spec_num)
        print("Clean shutdown of the script.", flush=True)
        sys.exit(0)
    except Exception as e:
        print(f"\nUnexpected error: {e}", flush=True)
        sauvegarder(indexed_specifications, documents_by_spec_num)
        sys.exit(1)