import os
import numpy as np
from PIL import Image
import gradio as gr
from deepface import DeepFace
from datasets import load_dataset
import pickle
from io import BytesIO
from huggingface_hub import upload_file, hf_hub_download, list_repo_files
from pathlib import Path
import gc
import requests
import time
import shutil
import tarfile
import tensorflow as tf
# GPU configuration
print("Available GPU devices:", tf.config.list_physical_devices('GPU'))

# Configure GPU memory
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        # Allow memory growth so TensorFlow does not grab all GPU memory up front
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("✅ GPU configured correctly")
        # Restrict TensorFlow to the first GPU
        tf.config.set_visible_devices(gpus[0], 'GPU')
        print(f"✅ Using GPU: {gpus[0]}")
    except RuntimeError as e:
        print(f"⚠️ Error configuring GPU: {e}")
else:
    print("⚠️ No GPU detected, falling back to CPU")

# Use mixed precision
tf.keras.mixed_precision.set_global_policy('mixed_float16')
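# Note: mixed_float16 keeps activations in float16 while variables stay in
# float32, which roughly halves activation memory and speeds up inference on
# GPUs with Tensor Cores; on CPU-only runs it can actually be slower.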
# 🔁 Reset temporary storage if it exists
def clean_temp_dirs():
    print("🧹 Cleaning temporary folders...")
    for folder in ["embeddings", "batches"]:
        path = Path(folder)
        if path.exists() and path.is_dir():
            shutil.rmtree(path)
            print(f"✅ Folder removed: {folder}")
        path.mkdir(exist_ok=True)

clean_temp_dirs()
# 📁 Parameters
DATASET_ID = "Segizu/facial-recognition-preview"
EMBEDDINGS_SUBFOLDER = "embeddings"
LOCAL_EMB_DIR = Path("embeddings")
LOCAL_EMB_DIR.mkdir(exist_ok=True)
HF_TOKEN = os.getenv("HF_TOKEN")
headers = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

# 💾 Storage limits
MAX_TEMP_STORAGE_GB = 40
UPLOAD_EVERY = 50  # NOTE: currently unused; build_database uses its own archive_batch_size
def get_folder_size(path):
    total = 0
    for dirpath, _, filenames in os.walk(path):
        for f in filenames:
            fp = os.path.join(dirpath, f)
            total += os.path.getsize(fp)
    return total / (1024 ** 3)
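# Illustrative usage (hypothetical numbers):
#   get_folder_size(".")           # e.g. 0.42 -> GB used by the working dir
#   get_folder_size("embeddings")  # size of the local embeddings cache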
def preprocess_image(img: Image.Image) -> np.ndarray:
    # Convert to RGB if needed
    if img.mode != 'RGB':
        img = img.convert('RGB')

    # Honor the EXIF orientation tag if present
    try:
        exif = img._getexif()
        if exif is not None:
            orientation = exif.get(274)  # 274 is the EXIF orientation tag
            if orientation is not None:
                # Rotate the image according to the EXIF orientation
                if orientation == 3:
                    img = img.rotate(180, expand=True)
                elif orientation == 6:
                    img = img.rotate(270, expand=True)
                elif orientation == 8:
                    img = img.rotate(90, expand=True)
    except Exception:
        pass  # No EXIF data (or a read error): keep the original image

    # Try to detect and crop the face
    try:
        # Convert to a numpy array for DeepFace
        img_array = np.array(img)

        # Detect faces (RetinaFace backend, GPU-accelerated when available)
        face_objs = DeepFace.extract_faces(
            img_path=img_array,
            target_size=(160, 160),
            detector_backend='retinaface',
            enforce_detection=False
        )

        if face_objs and len(face_objs) > 0:
            # A face was found: return the cropped, aligned face
            img_array = face_objs[0]['face']
            return img_array
    except Exception:
        pass  # Detection failed: fall through to plain resizing

    # No face detected (or detection failed): resize the original image
    img_resized = img.resize((160, 160), Image.Resampling.LANCZOS)
    return np.array(img_resized)
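# Quick local sanity check (hypothetical file name, commented out so the
# Space does not run it at startup):
#   sample = Image.open("sample.jpg")
#   arr = preprocess_image(sample)
#   print(arr.shape)  # expected: (160, 160, 3), ready for the Facenet model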
# ✅ Load the CSV of image URLs from the dataset
dataset = load_dataset(
    "csv",
    data_files="metadata.csv",
    split="train",
    column_names=["image"],
    header=0
)
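# Expected metadata.csv layout (illustrative, a single "image" column of URLs):
#   image
#   https://example.com/photos/person_001.jpg
#   https://example.com/photos/person_002.jpg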
def build_database():
    print(f"📊 Temporary storage in use at START: {get_folder_size('.'):.2f} GB")
    print("🔄 Generating embeddings...")
    batch_size = 10
    archive_batch_size = 50
    batch_files = []
    batch_index = 0
    ARCHIVE_DIR = Path("batches")
    ARCHIVE_DIR.mkdir(exist_ok=True)

    for i in range(0, len(dataset), batch_size):
        batch = dataset[i:i + batch_size]
        print(f"📦 Batch {i // batch_size + 1}/{(len(dataset) + batch_size - 1) // batch_size}")

        for j in range(len(batch["image"])):
            image_url = batch["image"][j]
            if not isinstance(image_url, str) or not image_url.startswith("http") or image_url.strip().lower() == "image":
                print(f"⚠️ Skipping {i + j} - invalid URL: {image_url}")
                continue

            name = f"image_{i + j}"
            filename = LOCAL_EMB_DIR / f"{name}.pkl"

            # Skip if the archive for the current batch index was already uploaded.
            # Note: this check works at archive granularity, not per image.
            try:
                hf_hub_download(
                    repo_id=DATASET_ID,
                    repo_type="dataset",
                    filename=f"{EMBEDDINGS_SUBFOLDER}/batch_{batch_index:03}.tar.gz",
                    token=HF_TOKEN
                )
                print(f"⏩ Already on the Hub: {name}.pkl")
                continue
            except Exception:
                pass

            try:
                response = requests.get(image_url, headers=headers, timeout=10)
                response.raise_for_status()
                img = Image.open(BytesIO(response.content)).convert("RGB")

                img_processed = preprocess_image(img)
                embedding = DeepFace.represent(
                    img_path=img_processed,
                    model_name="Facenet",
                    enforce_detection=False
                )[0]["embedding"]

                with open(filename, "wb") as f:
                    pickle.dump({"name": name, "img": img, "embedding": embedding}, f)
                batch_files.append(filename)

                del img_processed
                gc.collect()

                # Pack and upload once enough files accumulate or storage runs low
                if len(batch_files) >= archive_batch_size or get_folder_size(".") > MAX_TEMP_STORAGE_GB:
                    archive_path = ARCHIVE_DIR / f"batch_{batch_index:03}.tar.gz"
                    with tarfile.open(archive_path, "w:gz") as tar:
                        for file in batch_files:
                            tar.add(file, arcname=file.name)
                    print(f"📦 Packed: {archive_path}")

                    upload_file(
                        path_or_fileobj=str(archive_path),
                        path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}",
                        repo_id=DATASET_ID,
                        repo_type="dataset",
                        token=HF_TOKEN
                    )
                    print(f"✅ Uploaded: {archive_path.name}")

                    # Free local space after the upload
                    for f in batch_files:
                        f.unlink()
                    archive_path.unlink()
                    print("🧹 Cleanup done after upload")

                    batch_files = []
                    batch_index += 1
                    time.sleep(2)
                    print(f"📊 Current storage in use: {get_folder_size('.'):.2f} GB")

            except Exception as e:
                print(f"❌ Error on {name}: {e}")
                continue

    # Pack and upload any remaining files
    if batch_files:
        archive_path = ARCHIVE_DIR / f"batch_{batch_index:03}.tar.gz"
        with tarfile.open(archive_path, "w:gz") as tar:
            for file in batch_files:
                tar.add(file, arcname=file.name)
        print(f"📦 Final pack: {archive_path}")

        upload_file(
            path_or_fileobj=str(archive_path),
            path_in_repo=f"{EMBEDDINGS_SUBFOLDER}/{archive_path.name}",
            repo_id=DATASET_ID,
            repo_type="dataset",
            token=HF_TOKEN
        )
        for f in batch_files:
            f.unlink()
        archive_path.unlink()
        print("✅ Final upload and cleanup done")
# 🔍 Search for similar faces
def find_similar_faces(uploaded_image: Image.Image):
    if uploaded_image is None:
        return [], "⚠ Please upload an image first"

    try:
        print("🔄 Processing input image...")

        # Convert to RGB if needed
        if uploaded_image.mode != 'RGB':
            uploaded_image = uploaded_image.convert('RGB')

        # Log the image dimensions
        print(f"📐 Image dimensions: {uploaded_image.size}")

        img_processed = preprocess_image(uploaded_image)
        print("✅ Image preprocessed correctly")

        # First try with enforce_detection=True
        try:
            query_embedding = DeepFace.represent(
                img_path=img_processed,
                model_name="Facenet",
                enforce_detection=True,
                detector_backend='retinaface'
            )[0]["embedding"]
            print("✅ Face detected with enforce_detection=True")
        except Exception as e:
            print(f"⚠ Could not detect a face with enforce_detection=True, retrying with False: {str(e)}")
            # On failure, retry with enforce_detection=False
            query_embedding = DeepFace.represent(
                img_path=img_processed,
                model_name="Facenet",
                enforce_detection=False,
                detector_backend='retinaface'
            )[0]["embedding"]
            print("✅ Embedding generated with enforce_detection=False")

        del img_processed
        gc.collect()
    except Exception as e:
        print(f"❌ Error processing image: {str(e)}")
        return [], f"⚠ Error processing image: {str(e)}"

    similarities = []
    print("🔍 Searching the database for similar faces...")

    try:
        embedding_files = [
            f for f in list_repo_files(DATASET_ID, repo_type="dataset", token=HF_TOKEN)
            if f.startswith(f"{EMBEDDINGS_SUBFOLDER}/") and f.endswith(".tar.gz")
        ]
        print(f"📁 Found {len(embedding_files)} embedding archives")
    except Exception as e:
        print(f"❌ Error listing files: {str(e)}")
        return [], f"⚠ Error listing files: {str(e)}"

    # Process archives in batches for better throughput
    batch_size = 10
    for i in range(0, len(embedding_files), batch_size):
        batch_files = embedding_files[i:i + batch_size]
        print(f"📦 Processing batch {i//batch_size + 1}/{(len(embedding_files) + batch_size - 1)//batch_size}")

        for file_path in batch_files:
            try:
                file_bytes = requests.get(
                    f"https://huggingface.co/datasets/{DATASET_ID}/resolve/main/{file_path}",
                    headers=headers,
                    timeout=30
                ).content

                # Write the tar.gz to a temporary file
                temp_archive = Path("temp_archive.tar.gz")
                with open(temp_archive, "wb") as f:
                    f.write(file_bytes)

                # Extract its contents
                with tarfile.open(temp_archive, "r:gz") as tar:
                    tar.extractall(path="temp_extract")

                # Score every .pkl record in the archive
                for pkl_file in Path("temp_extract").glob("*.pkl"):
                    with open(pkl_file, "rb") as f:
                        record = pickle.load(f)

                    name = record["name"]
                    img = record["img"]
                    emb = record["embedding"]

                    dist = np.linalg.norm(np.array(query_embedding) - np.array(emb))
                    sim_score = 1 / (1 + dist)
                    similarities.append((sim_score, name, np.array(img)))

                # Clean up temporary files
                shutil.rmtree("temp_extract")
                temp_archive.unlink()

            except Exception as e:
                print(f"⚠ Error processing {file_path}: {e}")
                continue

    if not similarities:
        return [], "⚠ No matches found in the database"

    print(f"✅ Found {len(similarities)} candidate matches")
    # Sort by score only: comparing the image arrays on ties would raise
    similarities.sort(key=lambda x: x[0], reverse=True)
    top = similarities[:5]

    gallery = [(img, f"{name} - Similarity: {sim:.2f}") for sim, name, img in top]
    summary = "\n".join([f"{name} - Similarity: {sim:.2f}" for sim, name, _ in top])
    return gallery, summary
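# Scoring note: sim_score = 1 / (1 + ||q - e||_2) maps the Euclidean distance
# between Facenet embeddings into (0, 1], so 1.0 means identical embeddings
# and scores shrink toward 0 as the faces diverge.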
# 🎛️ Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("## 🔍 Face recognition with DeepFace + ZeroGPU")

    with gr.Row():
        image_input = gr.Image(label="📤 Upload an image", type="pil")
        find_btn = gr.Button("🔎 Find similar faces")

    gallery = gr.Gallery(label="📸 Similar faces")
    summary = gr.Textbox(label="🧠 Details", lines=6)
    find_btn.click(fn=find_similar_faces, inputs=image_input, outputs=[gallery, summary])

    with gr.Row():
        build_btn = gr.Button("⚙️ Build embeddings database (uses GPU)")
    build_btn.click(fn=build_database, inputs=[], outputs=[])

demo.launch(share=True)
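# Note: build_database can run for a long time; calling demo.queue() before
# demo.launch() is a common Gradio pattern to keep long-running handlers from
# hitting request timeouts.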