File size: 6,387 Bytes
576227b
 
 
 
 
 
 
 
967c695
576227b
 
967c695
 
576227b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
967c695
576227b
 
 
967c695
 
 
 
 
 
 
 
 
 
 
 
 
 
 
576227b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
967c695
576227b
 
967c695
576227b
967c695
576227b
 
 
 
 
967c695
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
576227b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
967c695
 
576227b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import importlib
import importlib.util
import os
import types
import pip
from google.genai import types
import sys

from src.budget_manager import BudgetManager
from src.singleton import singleton
from src.utils.suppress_outputs import suppress_output
from tools.get_agents_tool import GetAgents
from tools.tool_deletor import ToolDeletor

# Module-level list of imported tools. Nothing else in this file reads it
# (ToolLoader keeps its own ``toolsImported``); kept for external importers.
toolsImported = list()

# Absolute path of the directory scanned for tool modules. It is resolved
# against the current working directory at import time.
TOOLS_DIRECTORY = os.path.abspath(os.path.join(os.curdir, "tools"))

class Tool:
    """Wrap one tool class: instantiate it, cache its schema, ensure its deps.

    Attributes set by ``load_tool``:
        tool: the instantiated tool object.
        inputSchema: the tool's ``inputSchema`` mapping.
        name / description: copied from ``inputSchema``.
        dependencies: the tool's declared package requirements.
    """

    def __init__(self, toolClass):
        # Loading is routed through the project's suppress_output wrapper
        # (presumably to silence import/install chatter -- confirm in
        # src.utils.suppress_outputs).
        suppress_output(self.load_tool)(toolClass)

    def load_tool(self, toolClass):
        """Instantiate ``toolClass`` and install any dependency that fails to import.

        Args:
            toolClass: zero-argument callable returning an object exposing
                ``inputSchema`` (with "name" and "description" keys),
                ``dependencies`` (iterable of requirement strings, optionally
                pinned with ``==``) and ``run(**kwargs)``.
        """
        # Local import so the block does not depend on a new file-level import.
        import subprocess

        self.tool = toolClass()
        self.inputSchema = self.tool.inputSchema
        self.name = self.inputSchema["name"]
        self.description = self.inputSchema["description"]
        self.dependencies = self.tool.dependencies
        for package in self.dependencies:
            # The importable name is taken to be the requirement without its pin.
            requirement = package.split('==')[0]
            try:
                importlib.import_module(requirement)
            except ImportError:
                print(f"Installing {package}")
                # pip has no supported in-process API (pip.main was removed in
                # pip 10), so run it as a subprocess of the current interpreter.
                # NOTE(review): the version pin is dropped on install, exactly
                # as the previous code did -- confirm that this is intended.
                result = subprocess.run(
                    [sys.executable, "-m", "pip", "install", requirement],
                    check=False,
                )
                if result.returncode != 0:
                    # Stay best-effort like the old return-code based call.
                    print(f"Failed to install {requirement} (pip exit code {result.returncode})")

    def run(self, query):
        """Invoke the wrapped tool, expanding ``query`` into keyword arguments."""
        return self.tool.run(**query)

@singleton
class ToolLoader:
    """Discover tool modules, expose them as function declarations, and run them.

    Tools are loaded from ``TOOLS_DIRECTORY``; creation costs of known agents
    and tools are registered with the shared ``BudgetManager`` on construction.
    The class is wrapped by the project's ``singleton`` decorator (presumably
    so every ``ToolLoader()`` call yields the same instance -- see
    src.singleton).
    """

    # Class-level defaults; load_tools() rebinds toolsImported on the instance.
    toolsImported = []
    budget_manager = BudgetManager()

    def __init__(self):
        self.load_tools()
        self.load_costs()

    def load_costs(self):
        """Add the create cost of every known agent and loaded tool to the budget."""
        get_agents = GetAgents()
        agents = get_agents.run()["agents"]
        for agent in agents:
            agentConfig = agents[agent]
            if agentConfig["create_cost"] is not None:
                self.budget_manager.add_to_expense(agentConfig["create_cost"])
        for tool in self.toolsImported:
            schema = tool.inputSchema
            # Tools may omit "create_cost" entirely or set it to None.
            if "create_cost" in schema and schema["create_cost"] is not None:
                self.budget_manager.add_to_expense(schema["create_cost"])

        print(f"Budget Remaining: {self.budget_manager.get_current_remaining_budget()}")

    def load_tools(self):
        """Import every ``*.py`` file in ``TOOLS_DIRECTORY`` and wrap it in ``Tool``.

        The first name in each module's ``__all__`` is taken as the tool class.
        A module without ``__all__`` raises and aborts the load.
        """
        newToolsImported = []
        for filename in os.listdir(TOOLS_DIRECTORY):
            if not filename.endswith(".py") or filename == "__init__.py":
                continue
            module_name = filename[:-3]
            module_path = os.path.join(TOOLS_DIRECTORY, filename)
            spec = importlib.util.spec_from_file_location(module_name, module_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            class_name = module.__all__[0]
            toolClass = getattr(module, class_name)
            newToolsImported.append(Tool(toolClass))
        # Swap in the new list only after every module loaded successfully.
        self.toolsImported = newToolsImported

    def runTool(self, toolName, query):
        """Run the tool named ``toolName`` with ``query`` as keyword arguments.

        Returns:
            The tool's own result, or an error dict (``status``/``message``/
            ``output``) when the budget check fails or the tool is unknown.
        """
        print(f"Budget Remaining: {self.budget_manager.get_current_remaining_budget()}")
        for tool in self.toolsImported:
            if tool.name == toolName:
                # update_budget returns an error dict when the budget is
                # insufficient; honour it instead of running the tool anyway.
                budget_error = self.update_budget(query, tool.inputSchema)
                if budget_error is not None:
                    return budget_error
                return tool.run(query)
        print(f"Budget Remaining: {self.budget_manager.get_current_remaining_budget()}")
        return {
            "status": "error",
            "message": f"Tool {toolName} not found",
            "output": None
        }

    def update_budget(self, query, inputSchema):
        """Check the budget for a call and charge any creation cost.

        Args:
            query: keyword arguments about to be passed to the tool.
            inputSchema: the tool's schema; may carry "creates" (with a
                "selector" key and per-type "create_cost") and "invoke_cost".

        Returns:
            ``None`` when the call may proceed, otherwise an error dict.
        """
        if "creates" in inputSchema:
            selector = inputSchema["creates"]["selector"]
            if selector in query:
                create_cost = inputSchema["creates"]["types"][query[selector]]["create_cost"]
                if not self.budget_manager.can_spend(create_cost):
                    return {
                        "status": "error",
                        "message": f"Do not have enough budget to create the tool. "
                        +f"Creating the tool costs {create_cost} but only {self.budget_manager.get_current_remaining_budget()} is remaining",
                        "output": None
                    }
                # NOTE(review): charged before the invoke check below, so a
                # failed invoke check leaves this cost booked.
                self.budget_manager.add_to_expense(create_cost)
        if "invoke_cost" in inputSchema:
            invoke_cost = inputSchema["invoke_cost"]
            # NOTE(review): the invoke cost is only checked here, never added
            # to the expense -- confirm whether it is charged elsewhere.
            if not self.budget_manager.can_spend(invoke_cost):
                return {
                    "status": "error",
                    "message": f"Do not have enough budget to invoke the tool. "
                    +f"Invoking the tool costs {invoke_cost} but only {self.budget_manager.get_current_remaining_budget()} is remaining",
                    "output": None
                }
        return None

    def getTools(self):
        """Return one google-genai ``types.Tool`` per loaded tool.

        Each declaration is built from the tool's ``inputSchema``: name,
        description, and a parameter schema with typed, described properties.
        """
        toolsList = []
        for tool in self.toolsImported:
            schema = tool.inputSchema
            parameters = types.Schema()
            parameters.type = schema["parameters"]["type"]
            properties = {}
            for prop, value in schema["parameters"]["properties"].items():
                properties[prop] = types.Schema(
                    type=value["type"],
                    description=value["description"]
                )
            parameters.properties = properties
            # "required" is optional in a tool schema.
            parameters.required = schema["parameters"].get("required", [])
            function = types.FunctionDeclaration(
                name=schema["name"],
                description=schema["description"],
                parameters=parameters,
            )
            toolsList.append(types.Tool(function_declarations=[function]))
        return toolsList

    def delete_tool(self, toolName, toolFile):
        """Delete a tool through ``ToolDeletor`` and drop it from the loaded list.

        Returns:
            A success dict when the tool was registered and removed, otherwise
            an error dict. This includes the case where the deletor ran but no
            loaded tool carries ``toolName``, which used to return ``None``.
        """
        not_found = {
            "status": "error",
            "message": f"Tool {toolName} not found",
            "output": None
        }
        try:
            tool_deletor = ToolDeletor()
            tool_deletor.run(name=toolName, file_path=toolFile)
        except Exception:
            # Any failure of the deletor is reported as "not found", as before.
            return not_found
        for tool in self.toolsImported:
            if tool.name == toolName:
                # Safe despite mutating during iteration: we return right away.
                self.toolsImported.remove(tool)
                return {
                    "status": "success",
                    "message": f"Tool {toolName} deleted",
                    "output": None
                }
        return not_found

# Instantiate the loader at import time: ToolLoader.__init__ loads every tool
# module and then registers the known creation costs.
toolLoader = ToolLoader()

# Example usage
# print(toolLoader.getTools())
# print(toolLoader.runTool("AgentCreator", {"agent_name": "Kunla","base_model":"llama3.2","system_prompt": "You love making the indian dish called Kulcha. You declare that in every conversation you have in a witty way." }))