Spaces:
Sleeping
Sleeping
import os | |
from smolagents import CodeAgent, ToolCallingAgent | |
from smolagents import OpenAIServerModel | |
from tools.fetch import fetch_webpage | |
from tools.yttranscript import get_youtube_transcript, get_youtube_title_description | |
import myprompts | |
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
import torch | |
# --- Basic Agent Definition --- | |
class BasicAgent: | |
def __init__(self): | |
print("BasicAgent initialized.") | |
def __call__(self, question: str) -> str: | |
print(f"Agent received question (first 50 chars): {question[:50]}...") | |
try: | |
# Use the reviewer agent to determine if the question can be answered by a model or requires code | |
print("Calling reviewer agent...") | |
reviewer_answer = reviewer_agent.run(myprompts.review_prompt + "\nThe question is:\n" + question) | |
print(f"Reviewer agent answer: {reviewer_answer}") | |
question = question + '\n' + myprompts.output_format | |
fixed_answer = "" | |
if reviewer_answer == "code": | |
fixed_answer = gaia_agent.run(question) | |
print(f"Code agent answer: {fixed_answer}") | |
elif reviewer_answer == "model": | |
# If the reviewer agent suggests using the model, we can proceed with the model agent | |
print("Using model agent to answer the question.") | |
fixed_answer = model_agent.run(myprompts.model_prompt + "\nThe question is:\n" + question) | |
print(f"Model agent answer: {fixed_answer}") | |
return fixed_answer | |
except Exception as e: | |
error = f"An error occurred while processing the question: {e}" | |
print(error) | |
return error | |
# Load model and tokenizer | |
model_id = "LiquidAI/LFM2-1.2B" | |
model = AutoModelForCausalLM.from_pretrained( | |
model_id, | |
device_map="auto", | |
torch_dtype=torch.bfloat16, # Fixed: was string, should be torch dtype | |
trust_remote_code=True, | |
# attn_implementation="flash_attention_2" # <- uncomment on compatible GPU | |
) | |
tokenizer = AutoTokenizer.from_pretrained(model_id) | |
# Create a wrapper class that matches the expected interface | |
class LocalLlamaModel: | |
def __init__(self, model, tokenizer): | |
self.model = model | |
self.tokenizer = tokenizer | |
self.device = model.device if hasattr(model, 'device') else 'cpu' | |
def _extract_text_from_messages(self, messages): | |
"""Extract text content from ChatMessage objects or handle string input""" | |
if isinstance(messages, str): | |
return messages | |
elif isinstance(messages, list): | |
# Handle list of ChatMessage objects | |
text_parts = [] | |
for msg in messages: | |
if hasattr(msg, 'content'): | |
# Handle ChatMessage with content attribute | |
if isinstance(msg.content, list): | |
# Content is a list of content items | |
for content_item in msg.content: | |
if isinstance(content_item, dict) and 'text' in content_item: | |
text_parts.append(content_item['text']) | |
elif hasattr(content_item, 'text'): | |
text_parts.append(content_item.text) | |
elif isinstance(msg.content, str): | |
text_parts.append(msg.content) | |
elif isinstance(msg, dict) and 'content' in msg: | |
# Handle dictionary format | |
text_parts.append(str(msg['content'])) | |
else: | |
# Fallback: convert to string | |
text_parts.append(str(msg)) | |
return '\n'.join(text_parts) | |
else: | |
return str(messages) | |
def generate(self, prompt, max_new_tokens=512*5, **kwargs): | |
try: | |
print("Prompt: ", prompt) | |
print("Prompt type: ", type(prompt)) | |
# Extract text from the prompt (which might be ChatMessage objects) | |
text_prompt = self._extract_text_from_messages(prompt) | |
print("Extracted text prompt:", text_prompt[:200] + "..." if len(text_prompt) > 200 else text_prompt) | |
# Tokenize the text prompt | |
inputs = self.tokenizer(text_prompt, return_tensors="pt").to(self.model.device) | |
input_ids = inputs['input_ids'] | |
# Generate output | |
with torch.no_grad(): | |
output = self.model.generate( | |
input_ids, | |
do_sample=True, | |
temperature=0.3, | |
min_p=0.15, | |
repetition_penalty=1.05, | |
max_new_tokens=max_new_tokens, | |
pad_token_id=self.tokenizer.eos_token_id, # Handle padding | |
) | |
# Decode only the new tokens (exclude the input) | |
new_tokens = output[0][len(input_ids[0]):] | |
response = self.tokenizer.decode(new_tokens, skip_special_tokens=True) | |
return response.strip() | |
except Exception as e: | |
print(f"Error in model generation: {e}") | |
return f"Error generating response: {str(e)}" | |
def __call__(self, prompt, max_new_tokens=512, **kwargs): | |
"""Make the model callable like a function""" | |
return self.generate(prompt, max_new_tokens, **kwargs) | |
# Create the model instance | |
wrapped_model = LocalLlamaModel(model, tokenizer) | |
# Now create your agents - these should work with the wrapped model | |
reviewer_agent = ToolCallingAgent(model=wrapped_model, tools=[]) | |
model_agent = ToolCallingAgent(model=wrapped_model, tools=[fetch_webpage]) | |
gaia_agent = CodeAgent( | |
tools=[fetch_webpage, get_youtube_title_description, get_youtube_transcript], | |
model=wrapped_model | |
) | |
if __name__ == "__main__": | |
# Example usage | |
question = "What was the actual enrollment of the Malko competition in 2023?" | |
agent = BasicAgent() | |
answer = agent(question) | |
print(f"Answer: {answer}") |