|
import os |
|
import gradio as gr |
|
from gradio import ChatMessage |
|
import torch |
|
import torch._dynamo |
|
from transformers import AutoModelForCausalLM, AutoTokenizer |
|
from threading import Thread |
|
from huggingface_hub import hf_hub_download, login |
|
from dotenv import load_dotenv |
|
import re |
|
from llama_cpp import Llama |
|
from typing import Iterator |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
HF_TOKEN = os.getenv("HF_TOKEN") |
|
if HF_TOKEN: |
|
login(token=HF_TOKEN) |
|
|
|
|
|
try: |
|
import spaces |
|
SPACES_AVAILABLE = True |
|
except ImportError: |
|
SPACES_AVAILABLE = False |
|
|
|
|
|
torch._dynamo.config.suppress_errors = True |
|
torch._dynamo.disable() |
|
|
|
|
|
MODEL_ID = "somosnlp-hackathon-2025/iberotales-gemma-3-1b-it-es" |
|
GGUF_MODEL_ID = "somosnlp-hackathon-2025/iberotales-gemma-3-1b-it-es-finetune-gguf" |
|
GGUF_FILENAME = "gemma-3-finetune.Q8_0.gguf" |
|
GGUF_REVISION = "main" |
|
MAX_MAX_NEW_TOKENS = 2048 |
|
DEFAULT_MAX_NEW_TOKENS = 2048 |
|
|
|
|
|
IS_HF_SPACE = any([ |
|
os.getenv("SPACE_ID") is not None, |
|
os.getenv("SPACE_AUTHOR_NAME") is not None, |
|
os.getenv("SPACE_REPO_NAME") is not None, |
|
os.getenv("SPACE_HOST") is not None, |
|
]) |
|
|
|
|
|
DEFAULT_SYSTEM_MESSAGE = """Resuelve el siguiente problema. |
|
Primero, piensa en voz alta qué debes hacer, paso por paso y de forma resumida, entre <think> y </think>. |
|
Luego, da la respuesta final entre <SOLUTION> y </SOLUTION>. |
|
No escribas nada fuera de ese formato.""" |
|
|
|
|
|
PERSONAJES_POR_PAIS = { |
|
"🇦🇷 Argentina": [ |
|
{"nombre": "La Difunta Correa", "imagen": "images/ar1.jpg", "descripcion": "Santa popular que murió de sed siguiendo a su esposo reclutado"}, |
|
{"nombre": "El Lobizón", "imagen": "images/ar2.jpg", "descripcion": "Hombre lobo de la tradición gaucha, séptimo hijo varón maldito"}, |
|
{"nombre": "La Telesita", "imagen": "images/ar3.webp", "descripcion": "Bailarina folklórica que se aparece en festivales y zambas"} |
|
], |
|
"🇧🇴 Bolivia": [ |
|
{"nombre": "El Tío del Cerro Rico", "imagen": "images/bo1.webp", "descripcion": "Señor de las minas que protege y castiga a los mineros"}, |
|
{"nombre": "El Ekeko", "imagen": "images/bo2.jpg", "descripcion": "Dios aymara de la abundancia y la fortuna con jorobas"}, |
|
{"nombre": "El Jichi", "imagen": "images/bo3.webp", "descripcion": "Serpiente protectora de ríos y lagunas en la cultura andina"} |
|
] |
|
}; |
|
|
|
|
|
model = None |
|
tokenizer = None |
|
current_personajes = [] |
|
|
|
def load_model(): |
|
"""Cargar modelo y tokenizador""" |
|
global model, tokenizer |
|
|
|
if torch.cuda.is_available(): |
|
try: |
|
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) |
|
model = AutoModelForCausalLM.from_pretrained( |
|
MODEL_ID, |
|
torch_dtype=torch.float32, |
|
device_map="auto", |
|
trust_remote_code=True, |
|
) |
|
if tokenizer.pad_token is None: |
|
tokenizer.pad_token = tokenizer.eos_token |
|
return True |
|
except Exception as e: |
|
print(f"Error GPU: {e}") |
|
return False |
|
else: |
|
try: |
|
local_model_path = os.path.join("models", GGUF_FILENAME) |
|
if os.path.exists(local_model_path): |
|
model_path = local_model_path |
|
else: |
|
model_path = hf_hub_download( |
|
repo_id=GGUF_MODEL_ID, |
|
filename=GGUF_FILENAME, |
|
revision=GGUF_REVISION, |
|
local_dir="./models", |
|
force_download=False, |
|
resume_download=True |
|
) |
|
tokenizer = AutoTokenizer.from_pretrained("google/gemma-3-1b-it") |
|
model = Llama( |
|
model_path=model_path, |
|
n_ctx=2048, |
|
n_threads=4, |
|
n_gpu_layers=0 |
|
) |
|
return True |
|
except Exception as e: |
|
print(f"Error GGUF: {e}") |
|
return False |
|
|
|
model_loaded = load_model() |
|
|
|
def format_chat_history(messages: list, exclude_last_user: bool = True) -> list: |
|
"""Formatea el historial de chat para el modelo""" |
|
formatted_history = [] |
|
messages_to_process = messages[:] |
|
if exclude_last_user and messages_to_process and messages_to_process[-1].get("role") == "user": |
|
messages_to_process = messages_to_process[:-1] |
|
|
|
for message in messages_to_process: |
|
current_role = message.get("role") |
|
current_content = message.get("content", "").strip() |
|
|
|
if current_role == "assistant" and message.get("metadata"): |
|
continue |
|
if not current_content: |
|
continue |
|
|
|
if formatted_history and formatted_history[-1]["role"] == current_role: |
|
formatted_history[-1]["content"] += "\n\n" + current_content |
|
else: |
|
formatted_history.append({ |
|
"role": current_role, |
|
"content": current_content |
|
}) |
|
|
|
return formatted_history |
|
|
|
def stream_iberotales_response( |
|
user_message: str, |
|
messages: list, |
|
system_message: str = DEFAULT_SYSTEM_MESSAGE, |
|
max_new_tokens: int = DEFAULT_MAX_NEW_TOKENS, |
|
temperature: float = 0.7, |
|
top_p: float = 0.95, |
|
top_k: int = 50, |
|
repetition_penalty: float = 1.2, |
|
) -> Iterator[list]: |
|
"""Genera respuesta con streaming""" |
|
global model, tokenizer |
|
|
|
if model is None or tokenizer is None: |
|
messages.append(ChatMessage(role="assistant", content="Error: Modelo no disponible.")) |
|
yield messages |
|
return |
|
|
|
try: |
|
chat_history = format_chat_history(messages, exclude_last_user=True) |
|
conversation = [] |
|
if system_message.strip(): |
|
conversation.append({"role": "system", "content": system_message.strip()}) |
|
conversation.extend(chat_history) |
|
conversation.append({"role": "user", "content": user_message}) |
|
|
|
|
|
for i in range(1, len(conversation)): |
|
if conversation[i]["role"] == conversation[i-1]["role"] and conversation[i-1]["role"] != "system": |
|
messages.append(ChatMessage(role="assistant", content="Error: Reinicia la conversación.")) |
|
yield messages |
|
return |
|
|
|
prompt = tokenizer.apply_chat_template(conversation, tokenize=False, add_generation_prompt=True) |
|
response = model( |
|
prompt, |
|
max_tokens=max_new_tokens, |
|
temperature=temperature, |
|
top_p=top_p, |
|
top_k=top_k, |
|
repeat_penalty=repetition_penalty, |
|
stream=True |
|
) |
|
|
|
full_response = "" |
|
thinking_message_index = None |
|
solution_message_index = None |
|
in_think_block = False |
|
in_solution_block = False |
|
thinking_complete = False |
|
|
|
for chunk in response: |
|
if chunk["choices"][0]["finish_reason"] is None: |
|
new_text = chunk["choices"][0]["text"] |
|
full_response += new_text |
|
|
|
|
|
if "<think>" in full_response and not thinking_complete: |
|
if not in_think_block: |
|
in_think_block = True |
|
if thinking_message_index is None: |
|
messages.append(ChatMessage( |
|
role="assistant", |
|
content="", |
|
metadata={"title": "🤔 Pensando..."} |
|
)) |
|
thinking_message_index = len(messages) - 1 |
|
|
|
think_start = full_response.find("<think>") + 7 |
|
if "</think>" in full_response: |
|
think_end = full_response.find("</think>") |
|
current_thinking = full_response[think_start:think_end].strip() |
|
thinking_complete = True |
|
in_think_block = False |
|
else: |
|
current_thinking = full_response[think_start:].strip() |
|
|
|
if thinking_message_index is not None: |
|
messages[thinking_message_index] = ChatMessage( |
|
role="assistant", |
|
content=current_thinking, |
|
metadata={"title": "🤔 Pensando..."} |
|
) |
|
yield messages |
|
|
|
|
|
if "<SOLUTION>" in full_response: |
|
if not in_solution_block: |
|
in_solution_block = True |
|
if solution_message_index is None: |
|
messages.append(ChatMessage(role="assistant", content="")) |
|
solution_message_index = len(messages) - 1 |
|
|
|
solution_start = full_response.find("<SOLUTION>") + 10 |
|
if "</SOLUTION>" in full_response: |
|
solution_end = full_response.find("</SOLUTION>") |
|
current_solution = full_response[solution_start:solution_end].strip() |
|
in_solution_block = False |
|
else: |
|
current_solution = full_response[solution_start:].strip() |
|
|
|
if solution_message_index is not None and current_solution: |
|
messages[solution_message_index] = ChatMessage( |
|
role="assistant", |
|
content=current_solution |
|
) |
|
yield messages |
|
|
|
|
|
if full_response.strip() and solution_message_index is None: |
|
clean_response = full_response |
|
if "<think>" in clean_response and "</think>" in clean_response: |
|
clean_response = re.sub(r'<think>.*?</think>', '', clean_response, flags=re.DOTALL) |
|
if "<SOLUTION>" in clean_response and "</SOLUTION>" in clean_response: |
|
clean_response = re.sub(r'<SOLUTION>(.*?)</SOLUTION>', r'\1', clean_response, flags=re.DOTALL) |
|
|
|
clean_response = clean_response.strip() |
|
if clean_response: |
|
messages.append(ChatMessage(role="assistant", content=clean_response)) |
|
yield messages |
|
|
|
except Exception as e: |
|
messages.append(ChatMessage(role="assistant", content=f"Error: {str(e)}")) |
|
yield messages |
|
|
|
def user_message(msg: str, history: list) -> tuple[str, list]: |
|
"""Añade mensaje del usuario al historial""" |
|
history.append(ChatMessage(role="user", content=msg)) |
|
return "", history |
|
|
|
def actualizar_personajes(pais_seleccionado): |
|
"""Actualiza la galería de personajes según el país seleccionado""" |
|
global current_personajes |
|
personajes = PERSONAJES_POR_PAIS.get(pais_seleccionado, []) |
|
current_personajes = personajes |
|
|
|
if not personajes: |
|
return [], "Selecciona un país para ver sus personajes" |
|
|
|
|
|
imagenes = [] |
|
for p in personajes: |
|
if os.path.exists(p["imagen"]): |
|
imagenes.append((p["imagen"], f"{p['nombre']}: {p['descripcion']}")) |
|
else: |
|
|
|
imagenes.append(("data:image/svg+xml;base64,PHN2ZyB3aWR0aD0iMTAwIiBoZWlnaHQ9IjEwMCIgeG1sbnM9Imh0dHA6Ly93d3cudzMub3JnLzIwMDAvc3ZnIj48cmVjdCB3aWR0aD0iMTAwIiBoZWlnaHQ9IjEwMCIgZmlsbD0iI2NjYyIvPjx0ZXh0IHg9IjUwIiB5PSI1MCIgZm9udC1mYW1pbHk9IkFyaWFsIiBmb250LXNpemU9IjEyIiBmaWxsPSIjNjY2IiB0ZXh0LWFuY2hvcj0ibWlkZGxlIiBkeT0iLjNlbSI+SW1hZ2VuPC90ZXh0Pjwvc3ZnPg==", f"{p['nombre']}: {p['descripcion']}")) |
|
|
|
return imagenes, f"Personajes de {pais_seleccionado}" |
|
|
|
def crear_prompt_desde_personaje(evt: gr.SelectData): |
|
"""Crea un prompt basado en el personaje seleccionado""" |
|
global current_personajes |
|
|
|
try: |
|
if evt.index is not None and evt.index < len(current_personajes): |
|
personaje = current_personajes[evt.index] |
|
return f"Crea una historia sobre {personaje['nombre']}, {personaje['descripcion']}" |
|
else: |
|
return "Crea una historia sobre un personaje mítico" |
|
except Exception as e: |
|
print(f"Error al crear prompt: {e}") |
|
return "Crea una historia sobre un personaje mítico" |
|
|
|
|
|
if IS_HF_SPACE and SPACES_AVAILABLE and torch.cuda.is_available(): |
|
stream_iberotales_response = spaces.GPU(stream_iberotales_response) |
|
|
|
|
|
custom_css = """ |
|
.gradio-container { |
|
max-width: 1400px !important; |
|
margin: auto; |
|
padding-top: 1.5rem; |
|
} |
|
#galeria .grid-wrap { |
|
max-height: 350px; |
|
overflow-y: auto; |
|
} |
|
#galeria .grid-container { |
|
grid-template-columns: repeat(1, 1fr) !important; |
|
gap: 0.5rem; |
|
} |
|
#galeria .thumbnail-item { |
|
aspect-ratio: 1; |
|
max-height: 100px; |
|
} |
|
#galeria .thumbnail-item img { |
|
object-fit: cover; |
|
width: 100%; |
|
height: 100%; |
|
border-radius: 8px; |
|
} |
|
.header-info { |
|
background: linear-gradient(135deg, #2c3e50 0%, #1a1a2e 100%); |
|
color: white; |
|
padding: 1rem; |
|
border-radius: 12px; |
|
margin-bottom: 1rem; |
|
text-align: center; |
|
} |
|
""" |
|
|
|
|
|
with gr.Blocks(fill_height=True, title="Iberotales", css=custom_css) as demo: |
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
gr.HTML(""" |
|
<div class="header-info"> |
|
<h1>📚 Iberotales</h1> |
|
<p><strong>Autor:</strong> David Quispe | <a href="https://github.com/mcdaqc/Iberotales" target="_blank" style="text-decoration: none;">GitHub</a> | <a href="https://huggingface.co/somosnlp-hackathon-2025/iberotales-gemma-3-1b-it-es" target="_blank" style="text-decoration: none;">Modelo</a> | <a href="https://huggingface.co/somosnlp-hackathon-2025/iberotales-gemma-3-1b-it-es-finetune-gguf" target="_blank" style="text-decoration: none;">GGUF</a></p> |
|
<p><em>Alineando modelos de lenguaje con la narrativa de mitos y leyendas de Iberoamérica.</em></p> |
|
<p><em>Hackathon SomosNLP 2025</em></p> |
|
</div> |
|
""") |
|
|
|
with gr.Row(): |
|
|
|
with gr.Column(scale=1, min_width=320): |
|
gr.Markdown("### 🗃️ Pokédex de Personajes") |
|
|
|
pais_dropdown = gr.Dropdown( |
|
choices=list(PERSONAJES_POR_PAIS.keys()), |
|
value="🇦🇷 Argentina", |
|
label="País", |
|
container=False |
|
) |
|
|
|
galeria_personajes = gr.Gallery( |
|
value=[], |
|
label="Personajes", |
|
show_label=False, |
|
elem_id="galeria", |
|
columns=1, |
|
rows=4, |
|
height=350, |
|
object_fit="cover", |
|
preview=False |
|
) |
|
|
|
|
|
with gr.Column(scale=2): |
|
chatbot = gr.Chatbot( |
|
type="messages", |
|
show_label=False, |
|
height=400, |
|
avatar_images=(None, "🏛️") |
|
) |
|
|
|
with gr.Row(): |
|
input_box = gr.Textbox( |
|
placeholder="Escribe tu historia o selecciona un personaje...", |
|
show_label=False, |
|
scale=4, |
|
container=False |
|
) |
|
send_button = gr.Button("📤", scale=1, variant="primary") |
|
|
|
with gr.Row(): |
|
clear_button = gr.Button("🗑️ Limpiar", scale=1, size="sm") |
|
|
|
with gr.Column(scale=3): |
|
with gr.Row(): |
|
max_tokens = gr.Slider(100, MAX_MAX_NEW_TOKENS, DEFAULT_MAX_NEW_TOKENS, label="Tokens", container=False) |
|
temperature = gr.Slider(0.1, 2.0, 0.7, label="Temp", container=False) |
|
|
|
|
|
msg_store = gr.State("") |
|
|
|
|
|
def submit_message(msg, history): |
|
if not msg.strip(): |
|
return msg, history |
|
return "", user_message(msg, history)[1] |
|
|
|
def generate_response(msg, history, max_tok, temp): |
|
yield from stream_iberotales_response(msg, history, DEFAULT_SYSTEM_MESSAGE, max_tok, temp) |
|
|
|
|
|
pais_dropdown.change( |
|
fn=actualizar_personajes, |
|
inputs=[pais_dropdown], |
|
outputs=[galeria_personajes, gr.Textbox(visible=False)] |
|
) |
|
|
|
|
|
demo.load( |
|
fn=actualizar_personajes, |
|
inputs=[pais_dropdown], |
|
outputs=[galeria_personajes, gr.Textbox(visible=False)] |
|
) |
|
|
|
|
|
galeria_personajes.select( |
|
fn=crear_prompt_desde_personaje, |
|
outputs=[input_box] |
|
) |
|
|
|
|
|
input_box.submit( |
|
lambda msg, hist: (msg, submit_message(msg, hist)[1]), |
|
inputs=[input_box, chatbot], |
|
outputs=[msg_store, chatbot], |
|
queue=False |
|
).then( |
|
generate_response, |
|
inputs=[msg_store, chatbot, max_tokens, temperature], |
|
outputs=chatbot |
|
) |
|
|
|
send_button.click( |
|
lambda msg, hist: (msg, submit_message(msg, hist)[1]), |
|
inputs=[input_box, chatbot], |
|
outputs=[msg_store, chatbot], |
|
queue=False |
|
).then( |
|
generate_response, |
|
inputs=[msg_store, chatbot, max_tokens, temperature], |
|
outputs=chatbot |
|
) |
|
|
|
clear_button.click( |
|
lambda: ([], "", ""), |
|
outputs=[chatbot, input_box, msg_store], |
|
queue=False |
|
) |
|
|
|
|
|
if __name__ == "__main__": |
|
if model_loaded: |
|
demo.launch(share=False, show_error=True) |
|
else: |
|
print("Error al cargar el modelo.") |