import os
import logging
from typing import Dict, List, Optional, Any
from pathlib import Path
from smolagents import Model
try:
    from llama_cpp import Llama
    LLAMA_CPP_AVAILABLE = True
except ImportError:
    LLAMA_CPP_AVAILABLE = False
    print("llama_cpp module not available; install llama-cpp-python to use LlamaCppModel")
logger = logging.getLogger("LlamaCppModel")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
class LlamaCppModel(Model):
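    """smolagents-compatible Model wrapper around a local GGUF model loaded with llama-cpp-python."""
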
    def __init__(self, model_path: str, n_ctx: int = 2048, n_gpu_layers: int = 0,
                 max_tokens: int = 512, temperature: float = 0.7, verbose: bool = True):
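        """Load the GGUF model at model_path; raises if llama_cpp is missing or the file does not exist."""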
        super().__init__()
        self.model_path = model_path
        self.n_ctx = n_ctx
        self.max_tokens = max_tokens
        self.temperature = temperature
        self.verbose = verbose
        self.llm = None
        if not LLAMA_CPP_AVAILABLE:
            logger.error("llama_cpp is not installed. Please install with 'pip install llama-cpp-python'")
            raise ImportError("llama_cpp is required but not installed.")
        if not os.path.exists(model_path):
            logger.error(f"Model file not found at: {model_path}")
            raise FileNotFoundError(f"Model file not found at: {model_path}")
        try:
            logger.info(f"Loading Llama model from: {model_path}")
            self.llm = Llama(model_path=model_path, n_ctx=n_ctx, n_gpu_layers=n_gpu_layers, verbose=verbose)
            logger.info("Llama model loaded successfully.")
        except Exception as e:
            logger.exception(f"Failed to initialize Llama model: {e}")
            raise
    def generate(self, prompt: str, **kwargs) -> str:
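        """Run a single completion on the raw prompt and return the generated text (or an error string)."""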
        try:
            logger.debug(f"Generating with prompt: {prompt[:100]}...")
            response = self.llm(prompt=prompt, max_tokens=self.max_tokens, temperature=self.temperature, echo=False)
            logger.debug(f"Raw response: {response}")
            if isinstance(response, dict) and 'choices' in response:
                text = response['choices'][0]['text'].strip()
            elif isinstance(response, list):
                text = response[0].get('text', '').strip()
            else:
                logger.warning("Unexpected response format from Llama.")
                text = str(response)
            logger.debug(f"Generated text: {text}")
            return text
        except Exception as e:
            logger.exception(f"Error generating text: {e}")
            return f"Error generating response: {e}"
    def generate_with_tools(self, messages: List[Dict[str, Any]],
                            tools: Optional[List[Dict[str, Any]]] = None, **kwargs) -> Dict[str, Any]:
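        """Flatten chat messages (and optional tool descriptions) into a prompt, then return an assistant message dict."""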
        try:
            prompt = self._format_messages_to_prompt(messages, tools)
            logger.debug(f"Formatted prompt: {prompt}")
            completion = self.generate(prompt)
            return {"message": {"role": "assistant", "content": completion}}
        except Exception as e:
            logger.exception(f"Error generating with tools: {e}")
            return {"message": {"role": "assistant", "content": f"Error: {e}"}}
    def _format_messages_to_prompt(self, messages: List[Dict[str, Any]],
                                   tools: Optional[List[Dict[str, Any]]] = None) -> str:
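        """Build a plain-text prompt: optional tool list, then 'Role: content' blocks, ending with 'Assistant: '."""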
formatted_prompt = "" | |
if tools: | |
tool_desc = "\n".join([f"Tool {i+1}: {t['name']} - {t['description']}" for i, t in enumerate(tools)]) | |
formatted_prompt += f"Available tools:\n{tool_desc}\n\n" | |
for msg in messages: | |
role = msg.get("role", "") | |
content = msg.get("content", "") | |
if isinstance(content, list): | |
content = " ".join([c.get("text", str(c)) if isinstance(c, dict) else str(c) for c in content]) | |
formatted_prompt += f"{role.capitalize()}: {content}\n\n" | |
formatted_prompt += "Assistant: " | |
logger.debug(f"Constructed prompt: {formatted_prompt}") | |
return formatted_prompt | |
# Example usage (for testing):
# model = LlamaCppModel(model_path="/path/to/your/llama-model.gguf")
# print(model.generate("Hello, how are you?"))
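
# A minimal, runnable smoke-test sketch of the class above. The LLAMACPP_MODEL_PATH
# environment variable name and the fallback path are illustrative assumptions, not
# part of the module's API; point them at any local GGUF file to try it out.
if __name__ == "__main__":
    model_path = os.environ.get("LLAMACPP_MODEL_PATH", "/path/to/your/llama-model.gguf")
    if os.path.exists(model_path):
        model = LlamaCppModel(model_path=model_path, n_ctx=2048, n_gpu_layers=0)
        # Plain completion on a raw prompt.
        print(model.generate("Hello, how are you?"))
        # Chat-style call through the tool-aware entry point (no tools supplied).
        reply = model.generate_with_tools([{"role": "user", "content": "Name three prime numbers."}])
        print(reply["message"]["content"])
    else:
        print(f"No model found at {model_path}; set LLAMACPP_MODEL_PATH to run this example.")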