Spaces:
Running
Running
import nbformat | |
from nbconvert import HTMLExporter | |
from traitlets.config import Config | |
import json | |
import copy | |
from jinja2 import DictLoader | |
import datetime | |
system_template = """\ | |
<details> | |
<summary style="display: flex; align-items: center; cursor: pointer;"> | |
<span style="background-color: #374151; color: white; padding: 0px 4px; border-radius: 3px; font-weight: 500; display: inline-block;">System:</span> | |
<span class="arrow" style="margin-left: 8px; font-size: 12px;">▶</span> | |
</summary> | |
<div style="margin-top: 8px; padding: 8px; background-color: #f9fafb; border-radius: 4px; border-left: 3px solid #374151;"> | |
{} | |
</div> | |
</details> | |
<style> | |
details > summary .arrow {{ | |
display: inline-block; | |
transition: transform 0.2s; | |
}} | |
details[open] > summary .arrow {{ | |
transform: rotate(90deg); | |
}} | |
details > summary {{ | |
list-style: none; | |
}} | |
details > summary::-webkit-details-marker {{ | |
display: none; | |
}} | |
</style> | |
""" | |
user_template = """\ | |
<span style="background-color: #166534; color: white; padding: 0px 4px; border-radius: 3px; font-weight: 500; display: inline-block; margin-bottom: 0px;">User:</span> {}""" | |
assistant_thinking_template = """\ | |
<span style="background-color: #1d5b8e; color: white; padding: 0px 4px; border-radius: 3px; font-weight: 500; display: inline-block; margin-bottom: 0px;">Assistant:</span> {}""" | |
assistant_final_answer_template = """<div class="alert alert-block alert-warning"> | |
<b>Assistant:</b> Final answer: {} | |
</div> | |
""" | |
header_message = """<p align="center"> | |
<img src="https://huggingface.co/spaces/lvwerra/jupyter-agent/resolve/main/jupyter-agent.png" alt="Jupyter Agent Logo" /> | |
</p> | |
<p style="text-align:center;">Let a LLM agent write and execute code inside a notebook!</p>""" | |
bad_html_bad = """input[type="file"] { | |
display: block; | |
}""" | |
EXECUTING_WIDGET = """ | |
<div style="display: flex; align-items: center; gap: 8px; padding: 8px 12px; background-color: #e3f2fd; border-radius: 6px; border-left: 3px solid #2196f3;"> | |
<div style="display: flex; gap: 4px;"> | |
<div style="width: 6px; height: 6px; background-color: #2196f3; border-radius: 50%; animation: pulse 1.5s ease-in-out infinite;"></div> | |
<div style="width: 6px; height: 6px; background-color: #2196f3; border-radius: 50%; animation: pulse 1.5s ease-in-out 0.1s infinite;"></div> | |
<div style="width: 6px; height: 6px; background-color: #2196f3; border-radius: 50%; animation: pulse 1.5s ease-in-out 0.2s infinite;"></div> | |
</div> | |
<span style="color: #1976d2; font-size: 14px; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;"> | |
Executing code... | |
</span> | |
</div> | |
<style> | |
@keyframes pulse { | |
0%, 80%, 100% { | |
opacity: 0.3; | |
transform: scale(0.8); | |
} | |
40% { | |
opacity: 1; | |
transform: scale(1); | |
} | |
} | |
</style> | |
""" | |
GENERATING_WIDGET = """ | |
<div style="display: flex; align-items: center; gap: 8px; padding: 8px 12px; background-color: #f3e5f5; border-radius: 6px; border-left: 3px solid #9c27b0;"> | |
<div style="width: 80px; height: 4px; background-color: #e1bee7; border-radius: 2px; overflow: hidden;"> | |
<div style="width: 30%; height: 100%; background-color: #9c27b0; border-radius: 2px; animation: progress 2s ease-in-out infinite;"></div> | |
</div> | |
<span style="color: #7b1fa2; font-size: 14px; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;"> | |
Generating... | |
</span> | |
</div> | |
<style> | |
@keyframes progress { | |
0% { transform: translateX(-100%); } | |
100% { transform: translateX(250%); } | |
} | |
</style> | |
""" | |
DONE_WIDGET = """ | |
<div style="display: flex; align-items: center; gap: 8px; padding: 8px 12px; background-color: #e8f5e8; border-radius: 6px; border-left: 3px solid #4caf50;"> | |
<div style="width: 16px; height: 16px; background-color: #4caf50; border-radius: 50%; display: flex; align-items: center; justify-content: center;"> | |
<svg width="10" height="8" viewBox="0 0 10 8" fill="none"> | |
<path d="M1 4L3.5 6.5L9 1" stroke="white" stroke-width="1.5" stroke-linecap="round" stroke-linejoin="round"/> | |
</svg> | |
</div> | |
<span style="color: #2e7d32; font-size: 14px; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;"> | |
Generation complete | |
</span> | |
</div> | |
""" | |
DONE_WIDGET = """ | |
<div style="display: flex; align-items: center; gap: 8px; padding: 8px 12px; background-color: #e8f5e8; border-radius: 6px; border-left: 3px solid #4caf50; animation: fadeInOut 4s ease-in-out forwards;"> | |
<div style="width: 16px; height: 16px; background-color: #4caf50; border-radius: 50%; display: flex; align-items: center; justify-content: center;"> | |
<svg width="10" height="8" viewBox="0 0 10 8" fill="none"> | |
<path d="M1 4L3.5 6.5L9 1" stroke="white" stroke-width="1.5" stroke-linecap="round" stroke-linejoin="round"/> | |
</svg> | |
</div> | |
<span style="color: #2e7d32; font-size: 14px; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;"> | |
Generation complete | |
</span> | |
</div> | |
<style> | |
@keyframes fadeInOut { | |
0% { opacity: 0; transform: translateY(10px); } | |
15% { opacity: 1; transform: translateY(0); } | |
85% { opacity: 1; transform: translateY(0); } | |
100% { opacity: 0; transform: translateY(-10px); } | |
} | |
</style> | |
""" | |
ERROR_HTML = """\ | |
<div style="display: flex; align-items: center; gap: 8px; padding: 12px; background-color: #ffebee; border-radius: 6px; border-left: 3px solid #f44336; margin: 8px 0;"> | |
<div style="width: 20px; height: 20px; background-color: #f44336; border-radius: 50%; display: flex; align-items: center; justify-content: center; color: white; font-weight: bold; font-size: 12px;"> | |
! | |
</div> | |
<div style="color: #c62828; font-size: 14px; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;"> | |
<strong>Error:</strong> {} | |
</div> | |
</div>""" | |
STOPPED_SANDBOX_HTML = """ | |
<div style="display: flex; align-items: center; gap: 8px; padding: 8px 12px; background-color: #f5f5f5; border-radius: 6px; border-left: 3px solid #9e9e9e; margin-bottom: 16px;"> | |
<div style="width: 16px; height: 16px; background-color: #9e9e9e; border-radius: 50%; display: flex; align-items: center; justify-content: center; color: white; font-weight: bold; font-size: 10px;"> | |
⏹ | |
</div> | |
<div style="flex: 1;"> | |
<div style="margin-bottom: 4px; font-size: 13px; color: #757575; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; font-weight: 500;"> | |
Sandbox stopped | |
</div> | |
<div style="width: 100%; height: 8px; background-color: #e0e0e0; border-radius: 4px; overflow: hidden;"> | |
<div style="height: 100%; background-color: #9e9e9e; border-radius: 4px; width: 100%;"></div> | |
</div> | |
<div style="display: flex; justify-content: space-between; margin-top: 4px; font-size: 11px; color: #757575; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;"> | |
<span>Started: {start_time}</span> | |
<span>Expired: {end_time}</span> | |
</div> | |
</div> | |
</div> | |
""" | |
TIMEOUT_HTML = """ | |
<div style="display: flex; align-items: center; gap: 8px; padding: 8px 12px; background-color: #fff3e0; border-radius: 6px; border-left: 3px solid #ff9800; margin-bottom: 16px;"> | |
<div style="width: 16px; height: 16px; background-color: #ff9800; border-radius: 50%; display: flex; align-items: center; justify-content: center; color: white; font-weight: bold; font-size: 10px;"> | |
⏱ | |
</div> | |
<div style="flex: 1;"> | |
<div style="margin-bottom: 4px; font-size: 13px; color: #f57c00; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; font-weight: 500;"> | |
The E2B Sandbox for code execution has a timeout of {total_seconds} seconds. | |
</div> | |
<div style="width: 100%; height: 8px; background-color: #ffe0b3; border-radius: 4px; overflow: hidden;"> | |
<div id="progress-bar-{unique_id}" style="height: 100%; background: linear-gradient(90deg, #ff9800 0%, #f57c00 50%, #f44336 100%); border-radius: 4px; width: {current_progress}%; animation: progress-fill-{unique_id} {remaining_seconds}s linear forwards;"></div> | |
</div> | |
<div style="display: flex; justify-content: space-between; margin-top: 4px; font-size: 11px; color: #f57c00; font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;"> | |
<span>Started: {start_time}</span> | |
<span>Expires: {end_time}</span> | |
</div> | |
</div> | |
</div> | |
<style> | |
@keyframes progress-fill-{unique_id} {{ | |
from {{ width: {current_progress}%; }} | |
to {{ width: 100%; }} | |
}} | |
</style> | |
""" | |
# just make the code font a bit smaller | |
custom_css = """ | |
<style type="text/css"> | |
/* Code font size */ | |
.highlight pre, .highlight code, | |
div.input_area pre, div.output_area pre { | |
font-size: 12px !important; | |
line-height: 1.4 !important; | |
} | |
/* Fix prompt truncation */ | |
.jp-InputPrompt, .jp-OutputPrompt { | |
text-overflow: clip !important; | |
} | |
</style> | |
""" | |
# Configure the exporter | |
config = Config() | |
html_exporter = HTMLExporter(config=config, template_name="classic") | |
class JupyterNotebook: | |
def __init__(self, messages=None): | |
self.exec_count = 0 | |
self.countdown_info = None | |
if messages is None: | |
messages = [] | |
self.data, self.code_cell_counter = self.create_base_notebook(messages) | |
def create_base_notebook(self, messages): | |
base_notebook = { | |
"metadata": { | |
"kernel_info": {"name": "python3"}, | |
"language_info": { | |
"name": "python", | |
"version": "3.12", | |
}, | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 0, | |
"cells": [] | |
} | |
# Add header | |
base_notebook["cells"].append({ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": header_message | |
}) | |
# Set initial data | |
self.data = base_notebook | |
# Add empty code cell if no messages | |
if len(messages) == 0: | |
self.data["cells"].append({ | |
"cell_type": "code", | |
"execution_count": None, | |
"metadata": {}, | |
"source": "", | |
"outputs": [] | |
}) | |
return self.data, 0 | |
# Process messages using existing methods | |
i = 0 | |
while i < len(messages): | |
message = messages[i] | |
if message["role"] == "system": | |
self.add_markdown(message["content"], "system") | |
elif message["role"] == "user": | |
self.add_markdown(message["content"], "user") | |
elif message["role"] == "assistant": | |
if "tool_calls" in message: | |
# Add assistant thinking if there's content | |
if message.get("content"): | |
self.add_markdown(message["content"], "assistant") | |
# Process tool calls - we know the next message(s) will be tool responses | |
for tool_call in message["tool_calls"]: | |
if tool_call["function"]["name"] == "add_and_execute_jupyter_code_cell": | |
tool_args = json.loads(tool_call["function"]["arguments"]) | |
code = tool_args["code"] | |
# Get the next tool response (guaranteed to exist) | |
tool_message = messages[i + 1] | |
if tool_message["role"] == "tool" and tool_message.get("tool_call_id") == tool_call["id"]: | |
# Use the raw execution directly! | |
execution = tool_message["raw_execution"] | |
self.add_code_execution(code, execution, parsed=True) | |
i += 1 # Skip the tool message since we just processed it | |
else: | |
# Regular assistant message | |
self.add_markdown(message["content"], "assistant") | |
elif message["role"] == "tool": | |
# Skip - should have been handled with corresponding tool_calls | |
# This shouldn't happen given our assumptions, but just in case | |
pass | |
i += 1 | |
return self.data, 0 | |
def _update_countdown_cell(self): | |
if not self.countdown_info: | |
return | |
start_time = self.countdown_info['start_time'] | |
end_time = self.countdown_info['end_time'] | |
current_time = datetime.datetime.now(datetime.timezone.utc) | |
remaining_time = end_time - current_time | |
# Show stopped message if expired | |
if remaining_time.total_seconds() <= 0: | |
# Format display for stopped sandbox | |
start_display = start_time.strftime("%H:%M") | |
end_display = end_time.strftime("%H:%M") | |
stopped_html = STOPPED_SANDBOX_HTML.format( | |
start_time=start_display, | |
end_time=end_display | |
) | |
# Update countdown cell to show stopped message | |
stopped_cell = { | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": stopped_html | |
} | |
# Find and update existing countdown cell | |
for i, cell in enumerate(self.data["cells"]): | |
if cell.get("cell_type") == "markdown" and ("⏱" in str(cell.get("source", "")) or "⏹" in str(cell.get("source", ""))): | |
self.data["cells"][i] = stopped_cell | |
break | |
return | |
# Calculate current progress | |
total_duration = end_time - start_time | |
elapsed_time = current_time - start_time | |
current_progress = (elapsed_time.total_seconds() / total_duration.total_seconds()) * 100 | |
current_progress = max(0, min(100, current_progress)) | |
# Format display | |
start_display = start_time.strftime("%H:%M") | |
end_display = end_time.strftime("%H:%M") | |
remaining_seconds = int(remaining_time.total_seconds()) | |
remaining_minutes = remaining_seconds // 60 | |
remaining_secs = remaining_seconds % 60 | |
remaining_display = f"{remaining_minutes}:{remaining_secs:02d}" | |
# Generate unique ID to avoid CSS conflicts when updating | |
unique_id = int(current_time.timestamp() * 1000) % 100000 | |
# Calculate total timeout duration in seconds | |
total_seconds = int(total_duration.total_seconds()) | |
countdown_html = TIMEOUT_HTML.format( | |
start_time=start_display, | |
end_time=end_display, | |
current_progress=current_progress, | |
remaining_seconds=remaining_seconds, | |
unique_id=unique_id, | |
total_seconds=total_seconds | |
) | |
# Update or insert the countdown cell | |
countdown_cell = { | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": countdown_html | |
} | |
# Find existing countdown cell by looking for the timer emoji | |
found_countdown = False | |
for i, cell in enumerate(self.data["cells"]): | |
if cell.get("cell_type") == "markdown" and "⏱" in str(cell.get("source", "")): | |
# Update existing countdown cell | |
self.data["cells"][i] = countdown_cell | |
found_countdown = True | |
break | |
if not found_countdown: | |
# Insert new countdown cell at position 1 (after header) | |
self.data["cells"].insert(1, countdown_cell) | |
def add_sandbox_countdown(self, start_time, end_time): | |
# Store the countdown info for later updates | |
self.countdown_info = { | |
'start_time': start_time, | |
'end_time': end_time, | |
'cell_index': 1 # Remember where we put it | |
} | |
def add_code_execution(self, code, execution, parsed=False): | |
self.exec_count += 1 | |
self.data["cells"].append({ | |
"cell_type": "code", | |
"execution_count": self.exec_count, | |
"metadata": {}, | |
"source": code, | |
"outputs": execution if parsed else self.parse_exec_result_nb(execution) | |
}) | |
def add_code(self, code): | |
"""Add a code cell without execution results""" | |
self.exec_count += 1 | |
self.data["cells"].append({ | |
"cell_type": "code", | |
"execution_count": self.exec_count, | |
"metadata": {}, | |
"source": code, | |
"outputs": [] | |
}) | |
def append_execution(self, execution): | |
"""Append execution results to the immediate previous cell if it's a code cell""" | |
if (len(self.data["cells"]) > 0 and | |
self.data["cells"][-1]["cell_type"] == "code"): | |
self.data["cells"][-1]["outputs"] = self.parse_exec_result_nb(execution) | |
else: | |
raise ValueError("Cannot append execution: previous cell is not a code cell") | |
def add_markdown(self, markdown, role="markdown"): | |
if role == "system": | |
system_message = markdown if markdown else "default" | |
markdown_formatted = system_template.format(system_message.replace('\n', '<br>')) | |
elif role == "user": | |
markdown_formatted = user_template.format(markdown.replace('\n', '<br>')) | |
elif role == "assistant": | |
markdown_formatted = assistant_thinking_template.format(markdown) | |
markdown_formatted = markdown_formatted.replace('<think>', '<think>') | |
markdown_formatted = markdown_formatted.replace('</think>', '</think>') | |
else: | |
# Default case for raw markdown | |
markdown_formatted = markdown | |
self.data["cells"].append({ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": markdown_formatted | |
}) | |
def add_error(self, error_message): | |
"""Add an error message cell to the notebook""" | |
error_html = ERROR_HTML.format(error_message) | |
self.data["cells"].append({ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": error_html | |
}) | |
def add_final_answer(self, answer): | |
self.data["cells"].append({ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": assistant_final_answer_template.format(answer) | |
}) | |
def parse_exec_result_nb(self, execution): | |
"""Convert an E2B Execution object to Jupyter notebook cell output format""" | |
outputs = [] | |
if execution.logs.stdout: | |
outputs.append({ | |
'output_type': 'stream', | |
'name': 'stdout', | |
'text': ''.join(execution.logs.stdout) | |
}) | |
if execution.logs.stderr: | |
outputs.append({ | |
'output_type': 'stream', | |
'name': 'stderr', | |
'text': ''.join(execution.logs.stderr) | |
}) | |
if execution.error: | |
outputs.append({ | |
'output_type': 'error', | |
'ename': execution.error.name, | |
'evalue': execution.error.value, | |
'traceback': [line for line in execution.error.traceback.split('\n')] | |
}) | |
for result in execution.results: | |
output = { | |
'output_type': 'execute_result' if result.is_main_result else 'display_data', | |
'metadata': {}, | |
'data': {} | |
} | |
if result.text: | |
output['data']['text/plain'] = result.text | |
if result.html: | |
output['data']['text/html'] = result.html | |
if result.png: | |
output['data']['image/png'] = result.png | |
if result.svg: | |
output['data']['image/svg+xml'] = result.svg | |
if result.jpeg: | |
output['data']['image/jpeg'] = result.jpeg | |
if result.pdf: | |
output['data']['application/pdf'] = result.pdf | |
if result.latex: | |
output['data']['text/latex'] = result.latex | |
if result.json: | |
output['data']['application/json'] = result.json | |
if result.javascript: | |
output['data']['application/javascript'] = result.javascript | |
if result.is_main_result and execution.execution_count is not None: | |
output['execution_count'] = execution.execution_count | |
if output['data']: | |
outputs.append(output) | |
return outputs | |
def filter_base64_images(self, message): | |
"""Filter out base64 encoded images from message content""" | |
if isinstance(message, dict) and 'nbformat' in message: | |
for output in message['nbformat']: | |
if 'data' in output: | |
for key in list(output['data'].keys()): | |
if key.startswith('image/') or key == 'application/pdf': | |
output['data'][key] = '<placeholder_image>' | |
return message | |
def render(self, mode="default"): | |
if self.countdown_info is not None: | |
self._update_countdown_cell() | |
render_data = copy.deepcopy(self.data) | |
if mode == "generating": | |
render_data["cells"].append({ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": GENERATING_WIDGET | |
}) | |
elif mode == "executing": | |
render_data["cells"].append({ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": EXECUTING_WIDGET | |
}) | |
elif mode == "done": | |
render_data["cells"].append({ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": DONE_WIDGET | |
}) | |
elif mode != "default": | |
raise ValueError(f"Render mode should be generating, executing or done. Given: {mode}.") | |
notebook = nbformat.from_dict(render_data) | |
notebook_body, _ = html_exporter.from_notebook_node(notebook) | |
notebook_body = notebook_body.replace(bad_html_bad, "") | |
# make code font a bit smaller with custom css | |
if "<head>" in notebook_body: | |
notebook_body = notebook_body.replace("</head>", f"{custom_css}</head>") | |
return notebook_body |