|
import ast
import base64
import contextlib
import io
import os
import traceback
import uuid
from typing import Dict, Any

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from langchain_core.tools import tool
from PIL import Image
|
|
|
class CodeInterpreter:
    """Run Python snippets in a persistent namespace and report what they produced.

    Every execution captures stdout/stderr, exports the open matplotlib
    figures as base64-encoded PNGs and summarises each non-empty pandas
    DataFrame found in the interpreter namespace.

    SECURITY: the submitted code is handed to ``exec`` in-process with the
    full ``__builtins__``. ``allowed_modules`` and ``max_execution_time`` are
    stored on the instance but nothing in this class enforces them, so this
    class must not be treated as a sandbox for untrusted code.
    """

    def __init__(self, allowed_modules=None, max_execution_time=30, working_directory=None):
        """Store the configuration and build the namespace shared by all runs.

        Args:
            allowed_modules: Module names recorded on the instance; a default
                list is used when the argument is falsy. Not enforced here.
            max_execution_time: Time budget recorded on the instance.
                Not enforced here.
            working_directory: Directory under which one sub-directory per
                execution is created. Defaults to the current directory and
                is created when missing.
        """
        self.allowed_modules = allowed_modules or [
            "numpy", "pandas", "matplotlib", "scipy", "sklearn",
            "math", "random", "statistics", "datetime", "collections",
            "itertools", "functools", "operator", "re", "json",
            "sympy", "networkx", "nltk", "PIL", "pytesseract",
            "cmath", "uuid", "tempfile", "requests", "urllib",
        ]
        self.max_execution_time = max_execution_time
        self.working_directory = working_directory or os.getcwd()
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(self.working_directory, exist_ok=True)

        # Namespace handed to exec(); it lives on the instance, so names
        # defined by one execution remain visible to the following ones.
        self.globals = {
            "__builtins__": __builtins__,
            "np": np,
            "pd": pd,
            "plt": plt,
            "Image": Image,
        }

    @staticmethod
    def _new_result(execution_id: str) -> Dict[str, Any]:
        """Return a result record pre-filled with the 'error' defaults."""
        return {
            "execution_id": execution_id,
            "status": "error",
            "stdout": "",
            "stderr": "",
            "result": None,
            "plots": [],
            "dataframes": [],
        }

    def execute_code(self, code: str, language: str = "python") -> Dict[str, Any]:
        """Execute ``code`` and return a result record.

        Args:
            code: Source code to run.
            language: Language name, compared case-insensitively. Only
                ``"python"`` is supported.

        Returns:
            A dict with the keys ``execution_id``, ``status`` (``"success"``
            or ``"error"``), ``stdout``, ``stderr``, ``result``, ``plots``
            and ``dataframes``. An unsupported language yields an ``"error"``
            record whose ``stderr`` names the language; no code is run.
        """
        language = language.lower()
        execution_id = str(uuid.uuid4())

        if language != "python":
            result = self._new_result(execution_id)
            result["stderr"] = f"Unsupported Language: {language}"
            return result

        try:
            return self._execute_python(code, execution_id)
        except Exception:
            # _execute_python reports user-code failures itself; anything
            # that still escapes is returned as a traceback rather than
            # being mislabelled as a language problem.
            result = self._new_result(execution_id)
            result["stderr"] = traceback.format_exc()
            return result

    def _execute_python(self, code: str, execution_id: str) -> dict:
        """Run Python ``code`` in the shared namespace and collect its outputs.

        Args:
            code: Python source to execute.
            execution_id: Identifier used for the result record and for the
                per-execution directory that receives the plot files.

        Returns:
            The result record described in ``execute_code``. ``stdout`` is
            filled in on success and on failure; on failure ``stderr`` holds
            the captured stderr (if any) followed by the traceback.
        """
        output_buffer = io.StringIO()
        error_buffer = io.StringIO()
        result = self._new_result(execution_id)

        try:
            exec_dir = os.path.join(self.working_directory, execution_id)
            os.makedirs(exec_dir, exist_ok=True)
            plt.switch_backend("Agg")

            with contextlib.redirect_stdout(output_buffer), contextlib.redirect_stderr(error_buffer):
                exec_result = self._run_user_code(code)

            result["plots"] = self._export_plots(exec_dir)
            result["dataframes"] = self._summarize_dataframes()
            result["result"] = exec_result
            # Warnings written to stderr by code that succeeded are kept.
            result["stderr"] = error_buffer.getvalue()
            result["status"] = "success"
        except (Exception, SystemExit):
            # SystemExit is not an Exception subclass; catching it keeps
            # user code that calls exit()/sys.exit() from stopping the host.
            result["status"] = "error"
            captured = error_buffer.getvalue()
            trace = traceback.format_exc()
            result["stderr"] = f"{captured}\n{trace}" if captured else trace
        finally:
            # Output printed before a failure is still useful to the caller.
            result["stdout"] = output_buffer.getvalue()
            # pyplot keeps figures registered until they are closed; without
            # this, later executions would report this run's plots again.
            plt.close("all")

        return result

    def _run_user_code(self, code: str) -> Any:
        """Execute ``code``; return the value of a trailing expression, else None."""
        # exec() itself always returns None, so a final bare expression is
        # split off and evaluated separately to obtain its value.
        tree = ast.parse(code, filename="<string>", mode="exec")
        trailing = None
        if tree.body and isinstance(tree.body[-1], ast.Expr):
            trailing = ast.Expression(body=tree.body.pop().value)

        exec(compile(tree, "<string>", "exec"), self.globals)
        if trailing is None:
            return None
        return eval(compile(trailing, "<string>", "eval"), self.globals)

    def _export_plots(self, exec_dir: str) -> list:
        """Save every open figure as a PNG in ``exec_dir`` and return them base64-encoded."""
        plots = []
        for index, fig_num in enumerate(plt.get_fignums()):
            fig = plt.figure(fig_num)
            img_path = os.path.join(exec_dir, f"plot_{index}.png")
            fig.savefig(img_path)
            with open(img_path, "rb") as img_file:
                encoded = base64.b64encode(img_file.read()).decode("utf-8")
            plots.append({"figure_number": fig_num, "data": encoded})
        return plots

    def _summarize_dataframes(self) -> list:
        """Describe each non-empty DataFrame currently bound in the namespace."""
        return [
            {
                "name": name,
                "head": value.head().to_dict(),
                "shape": value.shape,
                "dtypes": str(value.dtypes),
            }
            for name, value in self.globals.items()
            if isinstance(value, pd.DataFrame) and len(value) > 0
        ]
|
|
|
|
|
# Single interpreter shared by every tool call. CodeInterpreter keeps its
# exec namespace on the instance, so names defined by one execution are
# still visible to the next one.
interpreter_instance = CodeInterpreter()
|
@tool
def execute_code_lang(code: str, language: str = "python") -> str:
    """Execute code in python
    Args:
        code (str): The source code to execute.
        language (str): The language of the code. Supported: "python".
    Returns:
        A string summarizing the execution results (stdout, stderr, errors, plots, dataframes if any).
    """
    # NOTE: the @tool decorator uses the docstring above as the tool
    # description, so its wording is deliberately left as it was.
    language = language.lower()
    if language != "python":
        return f"❌ Unsupported language: {language}."

    outcome = interpreter_instance.execute_code(code, language=language)
    label = language.upper()

    # Failure path first: a headline plus the error log when one exists.
    if outcome["status"] != "success":
        failure = [f"❌ Code execution failed in **{label}**"]
        error_log = outcome.get("stderr")
        if error_log:
            failure.append(f"\n**Error Log:**\n```\n{error_log.strip()}\n```")
        return "\n".join(failure)

    # Success path: headline, then one Markdown section per non-empty field,
    # in the order stdout, stderr, result, dataframes, plots.
    sections = [f"✅ Code executed successfully in **{label}**"]

    stdout_text = outcome.get("stdout")
    if stdout_text:
        sections.append(f"\n**Standard Output:**\n```\n{stdout_text.strip()}\n```")

    stderr_text = outcome.get("stderr")
    if stderr_text:
        sections.append(f"\n**Standard Error (if any):**\n```\n{stderr_text.strip()}\n```")

    value = outcome.get("result")
    if value is not None:
        sections.append(f"\n**Execution Result:**\n```\n{str(value).strip()}\n```")

    for frame in outcome.get("dataframes") or []:
        sections.append(f"\n**DataFrame `{frame['name']}` (Shape: {frame['shape']})**")
        # The interpreter ships head() as a plain dict; rebuild a frame so the
        # preview is rendered as a table.
        preview = pd.DataFrame(frame["head"])
        sections.append(f"First 5 rows:\n```\n{preview!s}\n```")

    plots = outcome.get("plots")
    if plots:
        sections.append(f"\n**Generated {len(plots)} plot(s)** (Image data returned separately)")

    return "\n".join(sections)
|
|