import datetime
import time
import sys
import json
import traceback
import requests
import zipfile
import uuid
import os
import io
import re
import subprocess
import concurrent.futures
import threading
from io import StringIO, BytesIO
from typing import List, Dict, Any
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings("ignore")
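# The blanket warning filter above is presumably meant to silence urllib3's
# InsecureRequestWarning triggered by the verify=False requests made below.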
# Characters used to encode version numbers in 3GPP archive file names
chars = "0123456789abcdefghijklmnopqrstuvwxyz"
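# Each of the three version fields (major.technical.editorial) maps to a single
# base-36 character, e.g. version 18.3.0 -> "i30", giving archive names such as
# 38331-i30.zip (illustrative example).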
# Locks for thread-safe operations
print_lock = threading.Lock()
dict_lock = threading.Lock()
scope_lock = threading.Lock()
# Global dictionaries and counters
indexed_specifications = {}
documents_by_spec_num = {}
processed_count = 0
total_count = 0
regex = r"^(\d+[a-z]?(?:\.\d+)*)\t[\ \S]+$"
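# Matches table-of-contents style clause headings such as "4\tGeneral" or
# "6a.1.2\tProcedures": a clause number (digits, optional letter, dotted
# sub-clauses), a tab, then the heading text.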
def get_text(specification: str, version: str):
    """Download the ZIP for a specification/version from the 3GPP archive and
    return the document text as a list of non-empty, stripped lines.
    Requires the LibreOffice CLI for the .doc/.docx to .txt conversion."""
    doc_id = specification
    series = doc_id.split(".")[0]
    response = requests.get(
        f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip",
        verify=False,
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    )
    if response.status_code != 200:
        raise Exception(f"Failed to download the ZIP for {specification}-{version}")
    zip_bytes = io.BytesIO(response.content)
    with zipfile.ZipFile(zip_bytes) as zf:
        for file_name in zf.namelist():
            if file_name.endswith("zip"):
                print("Nested ZIP found")
                inner_bytes = io.BytesIO(zf.read(file_name))
                with zipfile.ZipFile(inner_bytes) as inner_zf:
                    for file_name2 in inner_zf.namelist():
                        if file_name2.endswith("doc") or file_name2.endswith("docx"):
                            if "cover" in file_name2.lower():
                                print("Skipping cover file")
                                continue
                            ext = file_name2.split(".")[-1]
                            doc_bytes = inner_zf.read(file_name2)
                            temp_id = str(uuid.uuid4())
                            input_path = f"/tmp/{temp_id}.{ext}"
                            output_path = f"/tmp/{temp_id}.txt"
                            with open(input_path, "wb") as f:
                                f.write(doc_bytes)
                            subprocess.run([
                                "libreoffice",
                                "--headless",
                                "--convert-to", "txt",
                                "--outdir", "/tmp",
                                input_path
                            ], check=True)
                            with open(output_path, "r", encoding="utf-8") as f:
                                txt_data = [line.strip() for line in f if line.strip()]
                            os.remove(input_path)
                            os.remove(output_path)
                            return txt_data
            elif file_name.endswith("doc") or file_name.endswith("docx"):
                if "cover" in file_name.lower():
                    print("Skipping cover file")
                    continue
                ext = file_name.split(".")[-1]
                doc_bytes = zf.read(file_name)
                temp_id = str(uuid.uuid4())
                input_path = f"/tmp/{temp_id}.{ext}"
                output_path = f"/tmp/{temp_id}.txt"
                print("Writing document to disk")
                with open(input_path, "wb") as f:
                    f.write(doc_bytes)
                print("Converting with LibreOffice")
                subprocess.run([
                    "libreoffice",
                    "--headless",
                    "--convert-to", "txt",
                    "--outdir", "/tmp",
                    input_path
                ], check=True)
                print("Reading converted TXT")
                with open(output_path, "r", encoding="utf-8") as f:
                    txt_data = [line.strip() for line in f if line.strip()]
                os.remove(input_path)
                os.remove(output_path)
                return txt_data
    raise Exception(f"No .doc/.docx file found in the ZIP for {specification}-{version}")
def get_spec_content(specification: str, version: str):
    """Split the specification text into a dict mapping each section heading
    to its content, using the table of contents between the first two
    'Foreword' occurrences to identify real headings."""
    text = get_text(specification, version)
    # Locate the first two occurrences of "Foreword": the first is the ToC
    # entry, the second is the actual heading in the document body.
    forewords = []
    for x in range(len(text)):
        line = text[x]
        if "Foreword" in line:
            forewords.append(x)
        if len(forewords) >= 2:
            break
    toc_brut = text[forewords[1]:]
    chapters = []
    for line in toc_brut:
        m = re.search(regex, line)
        # Keep body lines that look like clause headings and also appear in the ToC.
        if m and any(line in c for c in text[forewords[0]:forewords[1]]):
            chapters.append(line)
            print(line)
    real_toc_indexes = {}
    for chapter in chapters:
        real_toc_indexes[chapter] = text.index(chapter)
    document = {}
    toc = list(real_toc_indexes.keys())
    index_toc = list(real_toc_indexes.values())
    curr_index = 0
    # Each section's content spans from its heading to the next heading.
    for x in range(1, len(toc)):
        document[toc[curr_index].replace("\t", " ")] = re.sub(r"[\ \t]+", " ", "\n".join(text[index_toc[curr_index]+1:index_toc[x]]))
        curr_index = x
    document[toc[curr_index].replace("\t", " ")] = re.sub(r"\s+", " ", " ".join(text[index_toc[curr_index]+1:]))
    print(len(toc)-1, toc[curr_index], curr_index)
    return document
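# Usage sketch (illustrative only; get_spec_content is not called from main()):
#   sections = get_spec_content("38.331", "i30")  # hypothetical spec number / version code
#   for heading, body in sections.items():
#       print(heading, len(body))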
def process_specification(spec: Dict[str, Any], columns: List[str]) -> None:
    """Process a single specification entry (called from worker threads)."""
    global processed_count, indexed_specifications, documents_by_spec_num
    try:
        if spec.get('vers', None) is None:
            return
        doc_id = str(spec["spec_num"])
        series = doc_id.split(".")[0]
        a, b, c = str(spec["vers"]).split(".")
        # Build the archive URL according to the version encoding
        if not (int(a) > 35 or int(b) > 35 or int(c) > 35):
            # Each version field fits in a single base-36 character
            version_code = f"{chars[int(a)]}{chars[int(b)]}{chars[int(c)]}"
        else:
            # Otherwise each field is zero-padded to two digits
            version_code = f"{int(a):02d}{int(b):02d}{int(c):02d}"
        spec_url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
        string = f"{spec['spec_num']}+-+{spec['title']}+-+{spec['type']}+-+{spec['vers']}+-+{spec['WG']}+-+Rel-{spec['vers'].split('.')[0]}"
        metadata = {
            "id": str(spec["spec_num"]),
            "title": spec["title"],
            "type": spec["type"],
            "release": str(spec["vers"].split(".")[0]),
            "version": str(spec["vers"]),
            "working_group": spec["WG"],
            "url": spec_url
        }
        # Update the global dictionary under lock
        with dict_lock:
            indexed_specifications[string] = metadata
            processed_count += 1
        # Report progress under lock
        with print_lock:
            sys.stdout.write(f"\rProcessing: {processed_count}/{total_count} specifications")
            sys.stdout.flush()
    except Exception as e:
        with print_lock:
            print(f"\nError while processing {spec.get('spec_num', 'unknown')}: {str(e)}")
def main():
    global total_count, documents_by_spec_num
    start_time = time.time()
    # Fetch the specification list from the 3GPP site
    print("Fetching specifications from 3GPP...")
    response = requests.get(
        'https://www.3gpp.org/dynareport?code=status-report.htm',
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'},
        verify=False
    )
    # Parse the HTML tables
    dfs = pd.read_html(
        StringIO(response.text),
        encoding="utf-8"
    )
    for x in range(len(dfs)):
        dfs[x] = dfs[x].replace({np.nan: None})
    # Extract the required columns
    columns_needed = [0, 1, 2, 3, 4]
    extracted_dfs = [df.iloc[:, columns_needed] for df in dfs]
    columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]
    # Build the list of specification records
    specifications = []
    for df in extracted_dfs:
        for _, row in df.iterrows():
            doc = row.to_list()
            doc_dict = dict(zip(columns, doc))
            specifications.append(doc_dict)
    total_count = len(specifications)
    print(f"Processing {total_count} specifications with multithreading...")
    try:
        # Load previously indexed documents if a cache archive exists
        if os.path.exists("indexed_docs_content.zip"):
            with zipfile.ZipFile("indexed_docs_content.zip") as zf:
                for file_name in zf.namelist():
                    if file_name.endswith(".json"):
                        doc_bytes = zf.read(file_name)
                        documents_by_spec_num = json.loads(doc_bytes.decode("utf-8"))
            print(f"Loaded {len(documents_by_spec_num)} documents from the cache.")
        # Process specifications concurrently with a thread pool
        with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
            futures = [executor.submit(process_specification, spec, columns) for spec in specifications]
            concurrent.futures.wait(futures)
    finally:
        # Always re-save the document cache as a compressed JSON archive
        json_str = json.dumps(documents_by_spec_num, indent=4, ensure_ascii=False)
        json_bytes = json_str.encode("utf-8")
        with zipfile.ZipFile("indexed_docs_content.zip", "w", compression=zipfile.ZIP_DEFLATED) as archive:
            archive.writestr("indexed_documents.json", json_bytes)
    elapsed_time = time.time() - start_time
    print(f"\nProcessing finished in {elapsed_time:.2f} seconds")
    print("Results saved to the ZIP archive")

if __name__ == "__main__":
    main()