|
from abc import ABC, abstractmethod |
|
from typing import Dict, Type, Any, Optional, Tuple |
|
import os |
|
import json |
|
import ollama |
|
from src.manager.utils.singleton import singleton |
|
from src.manager.utils.streamlit_interface import output_assistant_response |
|
from google import genai |
|
from google.genai import types |
|
from google.genai.types import * |
|
import os |
|
from dotenv import load_dotenv |
|
from src.manager.budget_manager import BudgetManager |
|
|
|
MODEL_PATH = "./src/models/" |
|
MODEL_FILE_PATH = "./src/models/models.json" |
|
|
|
class Agent(ABC): |
|
|
|
def __init__(self, agent_name: str, base_model: str, system_prompt: str, creation_cost: str, invoke_cost: str): |
|
self.agent_name = agent_name |
|
self.base_model = base_model |
|
self.system_prompt = system_prompt |
|
self.creation_cost = creation_cost |
|
self.invoke_cost = invoke_cost |
|
self.create_model() |
|
|
|
@abstractmethod |
|
def create_model(self) -> None: |
|
"""Create and Initialize agent""" |
|
pass |
|
|
|
@abstractmethod |
|
def ask_agent(self, prompt: str) -> str: |
|
"""ask agent a question""" |
|
pass |
|
|
|
@abstractmethod |
|
def delete_agent(self) ->None: |
|
"""delete agent""" |
|
pass |
|
|
|
def get_costs(self): |
|
return { |
|
"create_cost": self.creation_cost, |
|
"invoke_cost": self.invoke_cost |
|
} |
|
|
|
class OllamaAgent(Agent): |
|
|
|
def create_model(self): |
|
ollama_response = ollama.create( |
|
model = self.agent_name, |
|
from_ = self.base_model, |
|
system = self.system_prompt, |
|
stream = False |
|
) |
|
|
|
def ask_agent(self, prompt): |
|
output_assistant_response(f"Asked Agent {self.agent_name} a question") |
|
agent_response = ollama.chat( |
|
model=self.agent_name, |
|
messages=[{"role": "user", "content": prompt}], |
|
) |
|
output_assistant_response(f"Agent {self.agent_name} answered with {agent_response.message.content}") |
|
return agent_response.message.content |
|
|
|
def delete_agent(self): |
|
ollama.delete(self.agent_name) |
|
|
|
class GeminiAgent(Agent): |
|
def __init__(self, agent_name: str, base_model: str, system_prompt: str, creation_cost: str, invoke_cost: str): |
|
load_dotenv() |
|
self.api_key = os.getenv("GEMINI_KEY") |
|
if not self.api_key: |
|
raise ValueError("Google API key is required for Gemini models. Set GOOGLE_API_KEY environment variable or pass api_key parameter.") |
|
|
|
|
|
self.client = genai.Client(api_key=self.api_key) |
|
|
|
|
|
super().__init__(agent_name, base_model, system_prompt, creation_cost, invoke_cost) |
|
|
|
def create_model(self): |
|
self.messages = [] |
|
|
|
def ask_agent(self, prompt): |
|
response = self.client.models.generate_content( |
|
model=self.base_model, |
|
contents=prompt, |
|
config=types.GenerateContentConfig( |
|
system_instruction=self.system_prompt, |
|
) |
|
) |
|
return response.text |
|
|
|
def delete_agent(self): |
|
self.messages = [] |
|
@singleton |
|
class AgentManager(): |
|
budget_manager: BudgetManager = BudgetManager() |
|
def __init__(self): |
|
self._agents: Dict[str, Agent] = {} |
|
self._agent_types ={ |
|
"ollama": OllamaAgent, |
|
"gemini": GeminiAgent |
|
} |
|
|
|
self._load_agents() |
|
|
|
def create_agent(self, agent_name: str, |
|
base_model: str, system_prompt: str, |
|
description: str = "", create_cost: float = 0, |
|
invoke_cost: float = 0, |
|
**additional_params) -> Tuple[Agent, int]: |
|
|
|
if agent_name in self._agents: |
|
raise ValueError(f"Agent {agent_name} already exists") |
|
|
|
self._agents[agent_name] = self.create_agent_class( |
|
agent_name, |
|
base_model, |
|
system_prompt, |
|
description=description, |
|
create_cost=create_cost, |
|
invoke_cost=invoke_cost, |
|
**additional_params |
|
) |
|
|
|
|
|
self._save_agent( |
|
agent_name, |
|
base_model, |
|
system_prompt, |
|
description=description, |
|
create_cost=create_cost, |
|
invoke_cost=invoke_cost, |
|
**additional_params |
|
) |
|
return (self._agents[agent_name], self.budget_manager.get_current_remaining_budget()) |
|
|
|
def validate_budget(self, amount: float) -> None: |
|
if not self.budget_manager.can_spend(amount): |
|
raise ValueError(f"Do not have enough budget to create the tool. " |
|
+f"Creating the tool costs {amount} but only {self.budget_manager.get_current_remaining_budget()} is remaining") |
|
|
|
def create_agent_class(self, agent_name: str, base_model: str, system_prompt: str, description: str = "", create_cost: float = 0, invoke_cost: float = 0, |
|
**additional_params) -> Agent: |
|
agent_type = self._get_agent_type(base_model) |
|
agent_class = self._agent_types.get(agent_type) |
|
|
|
if not agent_class: |
|
raise ValueError(f"Unsupported base model {base_model}") |
|
|
|
created_agent = agent_class(agent_name, base_model, system_prompt, create_cost,invoke_cost) |
|
|
|
self.validate_budget(create_cost) |
|
|
|
self.budget_manager.add_to_expense(create_cost) |
|
|
|
return created_agent |
|
|
|
def get_agent(self, agent_name: str) -> Agent: |
|
"""Get existing agent by name""" |
|
if agent_name not in self._agents: |
|
raise ValueError(f"Agent {agent_name} does not exists") |
|
return self._agents[agent_name] |
|
|
|
def list_agents(self) -> dict: |
|
"""Return agent information (name, description, costs)""" |
|
try: |
|
if os.path.exists(MODEL_FILE_PATH): |
|
with open(MODEL_FILE_PATH, "r", encoding="utf8") as f: |
|
full_models = json.loads(f.read()) |
|
|
|
|
|
simplified_agents = {} |
|
for name, data in full_models.items(): |
|
simplified_agents[name] = { |
|
"description": data.get("description", ""), |
|
"create_cost": data.get("create_cost", 0), |
|
"invoke_cost": data.get("invoke_cost", 0), |
|
"base_model": data.get("base_model", ""), |
|
} |
|
return simplified_agents |
|
else: |
|
return {} |
|
except Exception as e: |
|
output_assistant_response(f"Error listing agents: {e}") |
|
return {} |
|
|
|
def delete_agent(self, agent_name: str) -> int: |
|
agent = self.get_agent(agent_name) |
|
|
|
self.budget_manager.remove_from_expense(agent.creation_cost) |
|
agent.delete_agent() |
|
|
|
del self._agents[agent_name] |
|
try: |
|
if os.path.exists(MODEL_FILE_PATH): |
|
with open(MODEL_FILE_PATH, "r", encoding="utf8") as f: |
|
models = json.loads(f.read()) |
|
|
|
del models[agent_name] |
|
with open(MODEL_FILE_PATH, "w", encoding="utf8") as f: |
|
f.write(json.dumps(models, indent=4)) |
|
except Exception as e: |
|
output_assistant_response(f"Error deleting agent: {e}") |
|
return self.budget_manager.get_current_remaining_budget() |
|
|
|
def ask_agent(self, agent_name: str, prompt: str) -> Tuple[str,int]: |
|
agent = self.get_agent(agent_name) |
|
|
|
self.validate_budget(agent.invoke_cost) |
|
|
|
response = agent.ask_agent(prompt) |
|
return (response, self.budget_manager.get_current_remaining_budget()) |
|
|
|
def _save_agent(self, agent_name: str, base_model: str, system_prompt: str, |
|
description: str = "", create_cost: float = 0, invoke_cost: float = 0, |
|
**additional_params) -> None: |
|
"""Save a single agent to the models.json file""" |
|
try: |
|
|
|
os.makedirs(MODEL_PATH, exist_ok=True) |
|
|
|
|
|
try: |
|
with open(MODEL_FILE_PATH, "r", encoding="utf8") as f: |
|
models = json.loads(f.read()) |
|
except (FileNotFoundError, json.JSONDecodeError): |
|
models = {} |
|
|
|
|
|
models[agent_name] = { |
|
"base_model": base_model, |
|
"description": description, |
|
"system_prompt": system_prompt, |
|
"create_cost": create_cost, |
|
"invoke_cost": invoke_cost, |
|
} |
|
|
|
|
|
for key, value in additional_params.items(): |
|
models[agent_name][key] = value |
|
|
|
|
|
with open(MODEL_FILE_PATH, "w", encoding="utf8") as f: |
|
f.write(json.dumps(models, indent=4)) |
|
|
|
except Exception as e: |
|
output_assistant_response(f"Error saving agent {agent_name}: {e}") |
|
|
|
def _get_agent_type(self, base_model)->str: |
|
|
|
if base_model == "llama3.2": |
|
return "ollama" |
|
elif base_model == "mistral": |
|
return "ollama" |
|
elif "gemini" in base_model: |
|
return "gemini" |
|
else: |
|
return "unknown" |
|
|
|
def _load_agents(self) -> None: |
|
"""Load agent configurations from disk""" |
|
try: |
|
if not os.path.exists(MODEL_FILE_PATH): |
|
return |
|
|
|
with open(MODEL_FILE_PATH, "r", encoding="utf8") as f: |
|
models = json.loads(f.read()) |
|
|
|
for name, data in models.items(): |
|
if name in self._agents: |
|
continue |
|
base_model = data["base_model"] |
|
system_prompt = data["system_prompt"] |
|
creation_cost = data["create_cost"] |
|
invoke_cost = data["invoke_cost"] |
|
model_type = self._get_agent_type(base_model) |
|
manager_class = self._agent_types.get(model_type) |
|
|
|
if manager_class: |
|
|
|
self._agents[name] = self.create_agent_class( |
|
name, |
|
base_model, |
|
system_prompt, |
|
description=data.get("description", ""), |
|
create_cost=creation_cost, |
|
invoke_cost=invoke_cost, |
|
**data.get("additional_params", {}) |
|
) |
|
self._agents[name] = manager_class( |
|
name, |
|
base_model, |
|
system_prompt, |
|
creation_cost, |
|
invoke_cost |
|
) |
|
except Exception as e: |
|
output_assistant_response(f"Error loading agents: {e}") |