File size: 7,518 Bytes
9726fac
 
1e2d981
9726fac
2332ba1
 
 
 
 
 
 
 
 
 
1e2d981
9726fac
 
 
e5f6777
1e2d981
 
2332ba1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1e2d981
 
2332ba1
6a34e6a
 
2332ba1
1e2d981
9702f0e
 
 
e5f6777
9726fac
 
 
 
 
1e2d981
 
 
9726fac
 
9702f0e
9726fac
 
 
 
e5f6777
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2332ba1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e5f6777
9726fac
1d9d6ab
9a28b27
9726fac
2332ba1
 
9726fac
1c58dec
e5f6777
2332ba1
 
 
 
 
 
 
 
 
 
 
 
e8e2a24
2332ba1
 
 
 
e8e2a24
2332ba1
 
1c58dec
9726fac
1e2d981
2332ba1
 
 
 
 
 
 
 
 
 
 
9726fac
 
 
8a8d916
e5f6777
 
1e2d981
 
d24f851
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
import socket
import subprocess
import gradio as gr
from openai import OpenAI
import json
import sys
from io import StringIO
import traceback
import matplotlib

matplotlib.use("Agg")  # Use non-interactive backend
import matplotlib.pyplot as plt
import base64
from io import BytesIO


# Kick off the start-up script in the background at import time. Popen does
# not wait for the script to finish, so the rest of the module keeps loading.
# NOTE(review): presumably this script launches the local llama.cpp server
# that `client` below talks to -- confirm against start.sh.
subprocess.Popen("bash /home/user/app/start.sh", shell=True)

# OpenAI-compatible client aimed at a server on local port 8000. The API key
# is a placeholder value; the generous timeout (600, presumably seconds)
# leaves room for slow local generation.
client = OpenAI(base_url="http://0.0.0.0:8000/v1", api_key="sk-local", timeout=600)


def execute_python_code(code):
    """Execute Python source in-process and capture its stdout and plot.

    SECURITY: the code is run with ``exec`` and full builtins, in the same
    process as the web app. This is NOT a sandbox -- anything passed in can
    read files, spawn processes, or import arbitrary modules. Only feed it
    code you are prepared to run with the app's privileges.

    Args:
        code: Python source text to execute.

    Returns:
        A dict with three keys:
          * ``success`` -- ``True`` if the code ran to completion.
          * ``output``  -- captured stdout on success, otherwise the error
            message followed by the traceback.
          * ``plot``    -- base64-encoded PNG of the current matplotlib
            figure if the code left one open, else ``None``.
    """
    # Start with no open figures. ``plt.clf()`` is the wrong tool here: it
    # implicitly creates a figure when none exists, which made every run look
    # as if it had produced a (blank) plot.
    plt.close("all")

    exec_globals = {
        "plt": plt,
        "matplotlib": matplotlib,
        "__builtins__": __builtins__,
        "json": json,
    }
    # Pre-load convenience modules, but treat them as genuinely optional so a
    # missing numpy/pandas install does not break every execution.
    for module_name in ("math", "numpy", "pandas"):
        try:
            exec_globals[module_name] = __import__(module_name)
        except ImportError:
            continue

    old_stdout = sys.stdout
    captured = StringIO()
    sys.stdout = captured
    try:
        exec(code, exec_globals)

        plot_data = None
        if plt.get_fignums():  # The executed code left at least one figure open
            buf = BytesIO()
            plt.savefig(buf, format="png", bbox_inches="tight", dpi=150)
            plot_data = base64.b64encode(buf.getvalue()).decode()

        return {"success": True, "output": captured.getvalue(), "plot": plot_data}

    except (Exception, SystemExit) as e:
        # SystemExit is caught as well so that ``sys.exit()`` inside the
        # executed snippet is reported as a failure instead of escaping.
        error_msg = f"Error: {str(e)}\n{traceback.format_exc()}"
        return {"success": False, "output": error_msg, "plot": None}

    finally:
        # Always undo the global side effects, even for exceptions that are
        # not handled above (e.g. KeyboardInterrupt): restore the real stdout
        # and drop figures so they cannot leak into the next call.
        sys.stdout = old_stdout
        plt.close("all")


def handle_function_call(function_name, arguments):
    """Run the tool the model asked for and return its result as text.

    Args:
        function_name: Name of the requested tool ("browser_search" or
            "code_interpreter").
        arguments: Dict of decoded arguments for that tool.

    Returns:
        A human-readable string: the tool's result, a failure report, or an
        "Unknown function" notice for any other name.
    """
    if function_name == "browser_search":
        # Placeholder only: no search backend is wired in yet.
        query = arguments.get("query", "")
        max_results = arguments.get("max_results", 5)
        return f"Search results for '{query}' (max {max_results} results): [Implementation needed]"

    if function_name != "code_interpreter":
        return f"Unknown function: {function_name}"

    # From here on we are handling the code interpreter.
    code = arguments.get("code", "")
    if not code:
        return "No code provided to execute."

    result = execute_python_code(code)
    if not result["success"]:
        return f"Code execution failed:\n\n```\n{result['output']}\n```"

    response = f"Code executed successfully:\n\n```\n{result['output']}\n```"
    if result["plot"]:
        # Only a short prefix of the image data is echoed into the chat.
        response += f"\n\n[Plot generated - base64 data: {result['plot'][:50]}...]"
    return response


def _tool_definitions():
    """Return the tool (function-calling) schemas advertised to the model."""
    return [
        {
            "type": "function",
            "function": {
                "name": "browser_search",
                "description": (
                    "Search the web for a given query and return the most relevant results."
                ),
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The search query string.",
                        },
                        "max_results": {
                            "type": "integer",
                            "description": (
                                "Maximum number of search results to return. "
                                "If omitted the service will use its default."
                            ),
                            "default": 5,
                        },
                    },
                    "required": ["query"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "code_interpreter",
                "description": (
                    "Execute Python code and return the results. "
                    "Can generate plots, perform calculations, and data analysis."
                ),
                "parameters": {
                    "type": "object",
                    "properties": {
                        "code": {
                            "type": "string",
                            "description": "The Python code to execute.",
                        },
                    },
                    "required": ["code"],
                },
            },
        },
    ]


def _build_messages(message, history, system_message):
    """Assemble the chat-completions message list from the chat inputs."""
    messages = []
    if system_message:
        messages.append({"role": "system", "content": system_message})

    for turn in history or []:
        if isinstance(turn, dict):
            # Role/content dict entries (Gradio's "messages" history format).
            # Only plain-text entries are forwarded; anything else is skipped.
            role = turn.get("role")
            content = turn.get("content")
            if role and content and isinstance(content, str):
                messages.append({"role": role, "content": content})
            continue

        # Legacy (user, assistant) pair format.
        user, assistant = turn
        if user:
            messages.append({"role": "user", "content": user})
        if assistant:
            messages.append({"role": "assistant", "content": assistant})

    messages.append({"role": "user", "content": message})
    return messages


def _accumulate_tool_calls(pending, tool_call_deltas):
    """Merge streamed tool-call fragments into ``pending``, keyed by call index."""
    for fragment in tool_call_deltas:
        function = getattr(fragment, "function", None)
        if not function:
            continue

        index = getattr(fragment, "index", None)
        if index is None:
            # No index supplied: a fragment carrying a name starts a new call,
            # an unnamed one continues the most recent call.
            starts_new_call = bool(function.name) or not pending
            index = len(pending) if starts_new_call else len(pending) - 1

        entry = pending.setdefault(index, {"name": "", "arguments": ""})
        if function.name:
            entry["name"] = function.name
        if function.arguments:
            # Arguments arrive as pieces of one JSON document; concatenate now
            # and parse only once the stream has finished.
            entry["arguments"] += function.arguments


def _parse_tool_arguments(raw_arguments):
    """Decode accumulated JSON arguments; return ``(arguments, error_text)``."""
    if not raw_arguments.strip():
        return {}, None
    try:
        parsed = json.loads(raw_arguments)
    except json.JSONDecodeError as exc:
        return None, f"Invalid JSON arguments: {exc}"
    if not isinstance(parsed, dict):
        return None, "Invalid arguments: expected a JSON object"
    return parsed, None


def respond(
    message,
    history: list[tuple[str, str]] = [],
    system_message=None,
    max_tokens=None,
    temperature=0.7,
):
    """Stream the model's reply to ``message``, running any requested tools.

    This is a generator: each yield is the full reply text accumulated so
    far (reasoning text and answer text are appended to the same string).
    After the stream ends, every tool call the model requested is executed
    and its result is appended as a "Function Result" section.

    Args:
        message: The new user message.
        history: Earlier turns, either as ``(user, assistant)`` pairs or as
            ``{"role": ..., "content": ...}`` dicts. ``None`` is treated as
            empty. The shared ``[]`` default is kept so the signature stays
            unchanged; it is never mutated.
        system_message: Optional system prompt placed first.
        max_tokens: Optional generation limit forwarded to the server.
        temperature: Sampling temperature forwarded to the server.

    Yields:
        The cumulative reply text, or a fixed error string if the request
        or the stream raises.
    """
    messages = _build_messages(message, history, system_message)

    try:
        stream = client.chat.completions.create(
            model="Deepseek-R1-0528-Qwen3-8B",
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            stream=True,
            tools=_tool_definitions(),
        )

        print("messages", messages)
        output = ""
        pending_tool_calls = {}

        for chunk in stream:
            # Some chunks may carry no choices at all; skip them instead of
            # failing on choices[0].
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta
            if delta is None:
                continue

            tool_call_deltas = getattr(delta, "tool_calls", None)
            if tool_call_deltas:
                _accumulate_tool_calls(pending_tool_calls, tool_call_deltas)

            # Reasoning text takes precedence over answer text within a chunk.
            reasoning = getattr(delta, "reasoning_content", None)
            content = getattr(delta, "content", None)
            if reasoning:
                output += reasoning
            elif content:
                output += content

            yield output

        # The stream is complete, so each call's argument JSON is now whole.
        for index in sorted(pending_tool_calls):
            call = pending_tool_calls[index]
            arguments, problem = _parse_tool_arguments(call["arguments"])
            if problem is None:
                func_result = handle_function_call(call["name"], arguments)
            else:
                # Report malformed arguments for this call without discarding
                # the text that was already streamed.
                func_result = problem
            output += f"\n\n**Function Result ({call['name']}):**\n{func_result}"
            yield output

    except Exception as e:
        print(f"[Error] {e}")
        yield "⚠️ Llama.cpp server error"


# Chat UI backed by the `respond` generator defined in this module.
demo = gr.ChatInterface(respond)

if __name__ == "__main__":
    # Start the web app only when run as a script, not when imported.
    # NOTE(review): show_api=False presumably hides Gradio's API docs page -- confirm.
    demo.launch(show_api=False)