Spaces:
Running
Running
File size: 6,387 Bytes
576227b 967c695 576227b 967c695 576227b 967c695 576227b 967c695 576227b 967c695 576227b 967c695 576227b 967c695 576227b 967c695 576227b 967c695 576227b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
import importlib
import importlib.util
import os
import types
import pip
from google.genai import types
import sys
from src.budget_manager import BudgetManager
from src.singleton import singleton
from src.utils.suppress_outputs import suppress_output
from tools.get_agents_tool import GetAgents
from tools.tool_deletor import ToolDeletor
# Module-level list. Nothing in the visible code reads or writes it;
# ToolLoader keeps its own `toolsImported` attribute.
toolsImported = []
# Absolute path of the directory scanned for tool modules. "./tools" is
# resolved against the current working directory when this module is imported.
TOOLS_DIRECTORY = os.path.abspath("./tools")
class Tool:
    """Wrap one tool class: instantiate it, expose its schema, and run it.

    After construction the instance carries:
      * ``tool``         - the instantiated tool object
      * ``inputSchema``  - the tool's ``inputSchema`` mapping
      * ``name`` / ``description`` - read from ``inputSchema``
      * ``dependencies`` - the tool's declared package requirements
    """

    def __init__(self, toolClass):
        # load_tool is run through the suppress_output wrapper.
        suppress_output(self.load_tool)(toolClass)

    def load_tool(self, toolClass):
        """Instantiate ``toolClass``, copy its metadata, and ensure its dependencies import.

        Args:
            toolClass: A zero-argument callable returning an object with
                ``inputSchema`` (containing ``"name"`` and ``"description"``),
                ``dependencies`` (iterable of requirement strings) and ``run``.
        """
        self.tool = toolClass()
        self.inputSchema = self.tool.inputSchema
        self.name = self.inputSchema["name"]
        self.description = self.inputSchema["description"]
        self.dependencies = self.tool.dependencies
        for package in self.dependencies:
            self._ensure_dependency(package)

    @staticmethod
    def _ensure_dependency(package):
        """Import ``package``; if that fails, install it with pip (best effort)."""
        import subprocess

        # Requirement strings may look like "name==1.2.3"; only the part before
        # "==" is used, both for the import probe and for the install.
        base_name = package.split('==')[0]
        try:
            __import__(base_name)
            return
        except ImportError:
            pass

        print(f"Installing {package}")
        # pip has no supported in-process API, so it is invoked as a subprocess
        # of the current interpreter. A failed install is not raised.
        completed = subprocess.run(
            [sys.executable, "-m", "pip", "install", base_name],
            capture_output=True,
            text=True,
            check=False,
        )
        # Re-emit pip's output through print() so that any Python-level
        # redirection of sys.stdout / sys.stderr in effect still applies.
        if completed.stdout:
            print(completed.stdout, end="")
        if completed.stderr:
            print(completed.stderr, end="", file=sys.stderr)
        # Let the import system notice packages installed after startup.
        importlib.invalidate_caches()

    def run(self, query):
        """Call the wrapped tool's ``run`` with ``query`` expanded as keyword arguments."""
        return self.tool.run(**query)
@singleton
class ToolLoader:
    """Load tool modules from TOOLS_DIRECTORY, account for their costs, and dispatch calls.

    Results returned for failures are dicts with the keys ``"status"``,
    ``"message"`` and ``"output"``.
    """

    # Class-level default; load_tools() assigns a fresh list on the instance.
    toolsImported = []
    # Created once, when the class body is evaluated.
    budget_manager = BudgetManager()

    def __init__(self):
        self.load_tools()
        self.load_costs()

    def load_costs(self):
        """Add the creation cost of every known agent and loaded tool to the budget expense."""
        get_agents = GetAgents()
        agents = get_agents.run()["agents"]
        for agent in agents:
            agentConfig = agents[agent]
            if agentConfig["create_cost"] is not None:
                self.budget_manager.add_to_expense(agentConfig["create_cost"])
        for tool in self.toolsImported:
            schema = tool.inputSchema
            # Tools may omit "create_cost" entirely or set it to None.
            if "create_cost" in schema and schema["create_cost"] is not None:
                self.budget_manager.add_to_expense(schema["create_cost"])
        print(f"Budget Remaining: {self.budget_manager.get_current_remaining_budget()}")

    def load_tools(self):
        """Import every ``*.py`` file in TOOLS_DIRECTORY (except ``__init__.py``) as a Tool.

        The tool class of each module is the first name listed in the module's
        ``__all__``. The resulting list replaces ``self.toolsImported``.
        """
        newToolsImported = []
        for filename in os.listdir(TOOLS_DIRECTORY):
            if not filename.endswith(".py") or filename == "__init__.py":
                continue
            module_name = filename[:-3]
            spec = importlib.util.spec_from_file_location(
                module_name, f"{TOOLS_DIRECTORY}/{filename}"
            )
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            class_name = module.__all__[0]
            toolClass = getattr(module, class_name)
            newToolsImported.append(Tool(toolClass))
        self.toolsImported = newToolsImported

    def runTool(self, toolName, query):
        """Run the tool called ``toolName`` with ``query`` after checking the budget.

        Args:
            toolName: Name to match against each loaded tool's ``name``.
            query: Mapping of arguments passed on to the tool.

        Returns:
            The tool's own result; or the error dict produced by
            ``update_budget`` when the budget is insufficient; or an error dict
            when no tool with that name is loaded.
        """
        print(f"Budget Remaining: {self.budget_manager.get_current_remaining_budget()}")
        for tool in self.toolsImported:
            if tool.name == toolName:
                # update_budget returns an error dict when the budget is too
                # small; in that case the tool must not be run.
                budget_error = self.update_budget(query, tool.inputSchema)
                if budget_error is not None:
                    return budget_error
                return tool.run(query)
        print(f"Budget Remaining: {self.budget_manager.get_current_remaining_budget()}")
        return {
            "status": "error",
            "message": f"Tool {toolName} not found",
            "output": None
        }

    def update_budget(self, query, inputSchema):
        """Check (and, for creation costs, charge) the budget for one tool call.

        Args:
            query: Arguments of the call; used to pick the created type when
                the schema has a ``"creates"`` section.
            inputSchema: The tool's schema; ``"creates"`` and ``"invoke_cost"``
                are both optional.

        Returns:
            ``None`` when the call may proceed, otherwise an error dict.
        """
        if "creates" in inputSchema:
            selector = inputSchema["creates"]["selector"]
            if selector in query:
                create_cost = inputSchema["creates"]["types"][query[selector]]["create_cost"]
                if not self.budget_manager.can_spend(create_cost):
                    return {
                        "status": "error",
                        "message": f"Do not have enough budget to create the tool. "
                        +f"Creating the tool costs {create_cost} but only {self.budget_manager.get_current_remaining_budget()} is remaining",
                        "output": None
                    }
                self.budget_manager.add_to_expense(create_cost)
        if "invoke_cost" in inputSchema:
            invoke_cost = inputSchema["invoke_cost"]
            # NOTE(review): the invoke cost is only checked here, never added
            # to the expense - confirm whether it is charged elsewhere.
            if not self.budget_manager.can_spend(invoke_cost):
                return {
                    "status": "error",
                    "message": f"Do not have enough budget to invoke the tool. "
                    +f"Invoking the tool costs {invoke_cost} but only {self.budget_manager.get_current_remaining_budget()} is remaining",
                    "output": None
                }
        return None

    def getTools(self):
        """Return one ``types.Tool`` per loaded tool, built from its input schema.

        Each tool's ``inputSchema["parameters"]`` supplies the parameter type,
        the per-property type and description, and the optional ``"required"``
        list (defaulting to an empty list).
        """
        toolsList = []
        for tool in self.toolsImported:
            schema = tool.inputSchema
            param_spec = schema["parameters"]
            parameters = types.Schema()
            parameters.type = param_spec["type"]
            parameters.properties = {
                prop: types.Schema(
                    type=value["type"],
                    description=value["description"]
                )
                for prop, value in param_spec["properties"].items()
            }
            parameters.required = param_spec.get("required", [])
            function = types.FunctionDeclaration(
                name=schema["name"],
                description=schema["description"],
                parameters=parameters,
            )
            toolsList.append(types.Tool(function_declarations=[function]))
        return toolsList

    def delete_tool(self, toolName, toolFile):
        """Delete a tool via ToolDeletor and drop it from the loaded list.

        Args:
            toolName: Name of the tool to delete.
            toolFile: Path passed to ``ToolDeletor.run`` as ``file_path``.

        Returns:
            A success dict, or an error dict if any exception is raised.
        """
        try:
            tool_deletor = ToolDeletor()
            tool_deletor.run(name=toolName, file_path=toolFile)
            # Rebuild in place instead of removing while iterating, which
            # would skip the element following each removed entry.
            self.toolsImported[:] = [
                tool for tool in self.toolsImported if tool.name != toolName
            ]
            return {
                "status": "success",
                "message": f"Tool {toolName} deleted",
                "output": None
            }
        except Exception:
            # Any failure is reported with the same "not found" message.
            return {
                "status": "error",
                "message": f"Tool {toolName} not found",
                "output": None
            }
# Module-level loader instance, built at import time by calling the
# @singleton-decorated ToolLoader.
toolLoader = ToolLoader()
# Example usage
# print(toolLoader.getTools())
# print(toolLoader.runTool("AgentCreator", {"agent_name": "Kunla","base_model":"llama3.2","system_prompt": "You love making the indian dish called Kulcha. You declare that in every conversation you have in a witty way." }))