Spaces:
Running
Running
import importlib | |
import importlib.util | |
import os | |
import types | |
import pip | |
from google.genai import types | |
import sys | |
from src.budget_manager import BudgetManager | |
from src.utils.singleton import singleton | |
from src.utils.suppress_outputs import suppress_output | |
from default_tools.get_agents_tool import GetAgents | |
from default_tools.tool_deletor import ToolDeletor | |
from src.utils.streamlit_interface import output_assistant_response | |
toolsImported = [] | |
TOOLS_DIRECTORIES = [os.path.abspath("./default_tools"), os.path.abspath("./tools")] | |
class Tool: | |
def __init__(self, toolClass): | |
suppress_output(self.load_tool)(toolClass) | |
def load_tool(self, toolClass): | |
self.tool = toolClass() | |
self.inputSchema = self.tool.inputSchema | |
self.name = self.inputSchema["name"] | |
self.description = self.inputSchema["description"] | |
self.dependencies = self.tool.dependencies | |
for package in self.tool.dependencies: | |
try: | |
__import__(package.split('==')[0]) | |
except ImportError: | |
print(f"Installing {package}") | |
if '==' in package: | |
package = package.split('==')[0] | |
pip.main(['install', package]) | |
def run(self, query): | |
return self.tool.run(**query) | |
class ToolLoader: | |
toolsImported = [] | |
budget_manager = BudgetManager() | |
def __init__(self): | |
self.load_tools() | |
self.load_costs() | |
def load_costs(self): | |
get_agents = GetAgents() | |
agents = get_agents.run()["agents"] | |
for agent in agents: | |
agentConfig = agents[agent] | |
if agentConfig["create_cost"] is not None: | |
self.budget_manager.add_to_expense(agentConfig["create_cost"]) | |
for tool in self.toolsImported: | |
if "create_cost" in tool.inputSchema: | |
if tool.inputSchema["create_cost"] is not None: | |
self.budget_manager.add_to_expense(tool.inputSchema["create_cost"]) | |
output_assistant_response(f"Budget Remaining: {self.budget_manager.get_current_remaining_budget()}") | |
def load_tools(self): | |
newToolsImported = [] | |
for directory in TOOLS_DIRECTORIES: | |
for filename in os.listdir(directory): | |
if filename.endswith(".py") and filename != "__init__.py": | |
module_name = filename[:-3] | |
spec = importlib.util.spec_from_file_location(module_name, f"{directory}/{filename}") | |
foo = importlib.util.module_from_spec(spec) | |
spec.loader.exec_module(foo) | |
class_name = foo.__all__[0] | |
toolClass = getattr(foo, class_name) | |
toolObj = Tool(toolClass) | |
newToolsImported.append(toolObj) | |
self.toolsImported = newToolsImported | |
def runTool(self, toolName, query): | |
output_assistant_response(f"Budget Remaining: {self.budget_manager.get_current_remaining_budget()}") | |
for tool in self.toolsImported: | |
if tool.name == toolName: | |
self.update_budget(query, tool.inputSchema) | |
return tool.run(query) | |
output_assistant_response(f"Budget Remaining: {self.budget_manager.get_current_remaining_budget()}") | |
return { | |
"status": "error", | |
"message": f"Tool {toolName} not found", | |
"output": None | |
} | |
def update_budget(self, query, inputSchema): | |
if "creates" in inputSchema: | |
selector = inputSchema["creates"]["selector"] | |
if selector in query: | |
create_cost = inputSchema["creates"]["types"][query[selector]]["create_cost"] | |
if not self.budget_manager.can_spend(create_cost): | |
return { | |
"status": "error", | |
"message": f"Do not have enough budget to create the tool. " | |
+f"Creating the tool costs {create_cost} but only {self.budget_manager.get_current_remaining_budget()} is remaining", | |
"output": None | |
} | |
self.budget_manager.add_to_expense(create_cost) | |
if "invoke_cost" in inputSchema: | |
invoke_cost = inputSchema["invoke_cost"] | |
if not self.budget_manager.can_spend(invoke_cost): | |
return { | |
"status": "error", | |
"message": f"Do not have enough budget to invoke the tool. " | |
+f"Invoking the tool costs {invoke_cost} but only {self.budget_manager.get_current_remaining_budget()} is remaining", | |
"output": None | |
} | |
def getTools(self): | |
toolsList = [] | |
for tool in self.toolsImported: | |
parameters = types.Schema() | |
parameters.type = tool.inputSchema["parameters"]["type"] | |
properties = {} | |
for prop, value in tool.inputSchema["parameters"]["properties"].items(): | |
properties[prop] = types.Schema( | |
type=value["type"], | |
description=value["description"] | |
) | |
parameters.properties = properties | |
parameters.required = tool.inputSchema["parameters"].get("required", []) | |
function = types.FunctionDeclaration( | |
name=tool.inputSchema["name"], | |
description=tool.inputSchema["description"], | |
parameters=parameters, | |
) | |
toolType = types.Tool(function_declarations=[function]) | |
toolsList.append(toolType) | |
return toolsList | |
def delete_tool(self, toolName, toolFile): | |
try: | |
tool_deletor = ToolDeletor() | |
tool_deletor.run(name=toolName, file_path=toolFile) | |
for tool in self.toolsImported: | |
if tool.name == toolName: | |
self.toolsImported.remove(tool) | |
return { | |
"status": "success", | |
"message": f"Tool {toolName} deleted", | |
"output": None | |
} | |
except Exception as e: | |
return { | |
"status": "error", | |
"message": f"Tool {toolName} not found", | |
"output": None | |
} | |
# toolLoader = ToolLoader() | |
# Example usage | |
# print(toolLoader.getTools()) | |
# print(toolLoader.runTool("AgentCreator", {"agent_name": "Kunla","base_model":"llama3.2","system_prompt": "You love making the indian dish called Kulcha. You declare that in every conversation you have in a witty way." })) |