GitBot / app.py
acecalisto3's picture
Update app.py
3e6657e verified
raw
history blame
61.8 kB
import gradio as gr
import os
import aiohttp
import asyncio
from git import Repo, GitCommandError
from pathlib import Path
from datetime import datetime
import shutil
import json
import logging
import re
from typing import Dict, List, Optional, Tuple
import subprocess
import plotly.express as px
import plotly.graph_objects as go
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
import speech_recognition as sr
# Removed duplicate import: from code_editor import code_editor
from functools import lru_cache
import hashlib
import markdown2
from concurrent.futures import ThreadPoolExecutor
from hdbscan import HDBSCAN
import websockets
from websockets.exceptions import ConnectionClosed
from code_editor import code_editor, OTCodeEditor # Assuming OTCodeEditor is also in code_editor
# ========== Configuration ==========
WORKSPACE = Path("/tmp/issue_workspace")
WORKSPACE.mkdir(exist_ok=True)
GITHUB_API = "https://api.github.com/repos"
HF_INFERENCE_API = "https://api-inference.huggingface.co/models"
WEBHOOK_PORT = 8000
WS_PORT = 8001
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
executor = ThreadPoolExecutor(max_workers=4)
HF_MODELS = {
"Mistral-8x7B": "mistralai/Mixtral-8x7B-Instruct-v0.1",
"Llama-3-8B": "meta-llama/Meta-Llama-3-8B",
"CodeLlama-34B": "codellama/CodeLlama-34b-Instruct-hf",
"StarCoder2": "bigcode/starcoder2-15b"
}
# Default Model
DEFAULT_MODEL = "mistralai/Mixtral-8x7B-Instruct-v0.1"
# ========== Modern Theme ==========
# Define the base theme
theme = gr.themes.Soft(
primary_hue="violet",
secondary_hue="emerald",
radius_size="lg",
font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui"]
).set(
# Apply custom settings using .set()
button_primary_background_fill="linear-gradient(90deg, #8B5CF6 0%, #EC4899 100%)",
button_primary_text_color="white",
# button_primary_border_radius="12px", # <-- FIX: Removed this line causing the TypeError
block_label_text_size="lg",
block_label_text_weight="600",
block_title_text_size="lg",
block_title_text_weight="800",
panel_background_fill="white",
# panel_border_radius="16px", # Assuming this might also cause issues if not supported, commented out as a precaution. Uncomment if needed and supported.
block_shadow="*shadow_drop_lg",
)
# ========== Enhanced Webhook Handler ==========
class WebhookHandler(BaseHTTPRequestHandler):
# Keep a reference to the manager instance
manager_instance = None
def do_POST(self):
content_length = int(self.headers['Content-Length'])
try:
payload = json.loads(self.rfile.read(content_length).decode('utf-8'))
except json.JSONDecodeError:
self.send_response(400)
self.end_headers()
self.wfile.write(b"Invalid JSON payload")
return
except Exception as e:
logger.error(f"Error reading webhook payload: {e}")
self.send_response(500)
self.end_headers()
return
event = self.headers.get('X-GitHub-Event')
logger.info(f"Received GitHub webhook event: {event}")
if event == 'issues' and WebhookHandler.manager_instance:
action = payload.get('action')
logger.info(f"Issue action: {action}")
if action in ['opened', 'reopened', 'closed', 'assigned']:
# Ensure the event loop is running in the webhook thread if needed
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(
WebhookHandler.manager_instance.handle_webhook_event(event, action, payload),
loop
)
elif event == 'ping':
logger.info("Received GitHub webhook ping.")
else:
logger.warning(f"Unhandled event type: {event} or manager not initialized.")
self.send_response(200)
self.end_headers()
self.wfile.write(b"OK")
# ========== AI-Powered Issue Manager ==========
class IssueManager:
def __init__(self):
self.issues: Dict[int, dict] = {}
self.repo_url: Optional[str] = None
self.repo: Optional[Repo] = None
self.current_issue: Optional[int] = None
self.github_token: Optional[str] = None
self.hf_token: Optional[str] = None
self.collaborators: Dict[str, dict] = {} # Example: {"user1": {"status": "editing file.py"}}
self.points: int = 0
self.severity_rules: Dict[str, List[str]] = {
"Critical": ["critical", "urgent", "security", "crash", "blocker"],
"High": ["high", "important", "error", "regression", "major"],
"Medium": ["medium", "bug", "performance", "minor"],
"Low": ["low", "documentation", "enhancement", "trivial", "feature"]
}
self.issue_clusters: Dict[int, List[int]] = {} # Store clusters: {cluster_id: [issue_index1, issue_index2]}
self.issue_list_for_clustering: List[dict] = [] # Store issues in list order for clustering index mapping
# self._init_local_models() # Consider lazy loading or conditional loading
self.ws_clients: List[websockets.WebSocketClientProtocol] = []
self.code_editors: Dict[int, OTCodeEditor] = {} # Store code editors for each issue
# Placeholder for local model initialization - implement actual loading if needed
def _init_local_models(self):
logger.info("Initializing local models (placeholder)...")
# self.code_model = pipeline(...)
# self.summarizer = pipeline(...)
logger.info("Local models initialized (placeholder).")
# Simple hash for caching based on issue content
def _get_issue_hash(self, issue_data: dict) -> str:
content = f"{issue_data.get('title', '')}{issue_data.get('body', '')}"
return hashlib.md5(content.encode()).hexdigest()
@lru_cache(maxsize=100)
async def cached_suggestion(self, issue_hash: str, model: str) -> str:
# Find the issue corresponding to the hash (inefficient, improve if needed)
found_issue = None
for issue in self.issues.values():
if self._get_issue_hash(issue) == issue_hash:
found_issue = issue
break
if not found_issue:
return "Error: Issue not found for the given hash."
logger.info(f"Cache miss or first request for issue hash {issue_hash}. Requesting suggestion from {model}.")
return await self.suggest_resolution(found_issue, model)
async def handle_webhook_event(self, event: str, action: str, payload: dict):
logger.info(f"Processing webhook event: {event}, action: {action}")
issue_data = payload.get('issue')
if not issue_data:
logger.warning("Webhook payload missing 'issue' data.")
return
issue_number = issue_data.get('number')
if not issue_number:
logger.warning("Webhook issue data missing 'number'.")
return
if action == 'closed':
logger.info(f"Removing closed issue {issue_number} from active list.")
self.issues.pop(issue_number, None)
# Optionally remove associated editor, etc.
self.code_editors.pop(issue_number, None)
elif action in ['opened', 'reopened', 'edited']: # Handle edited issues too
logger.info(f"Adding/Updating issue {issue_number} from webhook.")
self.issues[issue_number] = issue_data
# Potentially trigger re-clustering or update specific issue details
elif action == 'assigned':
logger.info(f"Issue {issue_number} assigned to {payload.get('assignee', {}).get('login', 'N/A')}")
self.issues[issue_number] = issue_data # Update issue data
else:
logger.info(f"Ignoring action '{action}' for issue {issue_number}.")
# Consider triggering a UI update after handling the webhook
# This might involve re-crawling or just updating the specific issue
await self.broadcast_issue_update() # Example function to notify clients
async def crawl_issues(self, repo_url: str, github_token: str, hf_token: str) -> Tuple[List[List], go.Figure, str]:
"""
Crawls issues, updates internal state, performs clustering, and returns data for UI update.
"""
if not repo_url or not github_token or not hf_token:
return [], go.Figure(), "Error: Repository URL, GitHub Token, and HF Token are required."
logger.info(f"Starting issue crawl for {repo_url}")
self.repo_url = repo_url
self.github_token = github_token
self.hf_token = hf_token
self.issues = {} # Reset issues before crawl
# Extract owner/repo from URL
match = re.match(r"https?://github\.com/([^/]+)/([^/]+)", repo_url)
if not match:
logger.error(f"Invalid GitHub URL format: {repo_url}")
return [], go.Figure(), "Error: Invalid GitHub URL format. Use https://github.com/owner/repo"
owner, repo_name = match.groups()
api_url = f"{GITHUB_API}/{owner}/{repo_name}/issues?state=open" # Fetch only open issues
headers = {
"Authorization": f"token {github_token}",
"Accept": "application/vnd.github.v3+json"
}
try:
async with aiohttp.ClientSession(headers=headers) as session:
async with session.get(api_url) as response:
response.raise_for_status() # Raise exception for bad status codes
issues_data = await response.json()
logger.info(f"Fetched {len(issues_data)} open issues.")
for issue in issues_data:
issue_number = issue['number']
self.issues[issue_number] = {
"id": issue_number,
"title": issue.get('title', 'No Title'),
"body": issue.get('body', ''),
"state": issue.get('state', 'unknown'),
"labels": [label['name'] for label in issue.get('labels', [])],
"assignee": issue.get('assignee', {}).get('login') if issue.get('assignee') else None,
"url": issue.get('html_url', '#')
# Add other relevant fields if needed
}
if not self.issues:
logger.info("No open issues found.")
return [], go.Figure(), "No open issues found in the repository."
# Prepare data for clustering
self.issue_list_for_clustering = list(self.issues.values())
logger.info("Clustering issues...")
await self._cluster_similar_issues() # Update self.issue_clusters
# Prepare data for Gradio Dataframe
dataframe_data = []
severity_counts = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0, "Unknown": 0}
# Map clustered indices back to issue numbers and determine severity
cluster_map = {} # {issue_index: cluster_id}
for cluster_id, indices in self.issue_clusters.items():
for index in indices:
cluster_map[index] = cluster_id
for i, issue in enumerate(self.issue_list_for_clustering):
severity = self._determine_severity(issue['labels'])
severity_counts[severity] += 1
cluster_id = cluster_map.get(i, -1) # -1 for noise/unclustered
dataframe_data.append([
issue['id'],
issue['title'],
severity,
cluster_id if cluster_id != -1 else "N/A" # Display N/A for noise
])
logger.info("Generating statistics plot...")
stats_fig = self._generate_stats_plot(severity_counts)
success_msg = f"Found {len(self.issues)} open issues. Clustered into {len(self.issue_clusters)} groups (excluding noise)."
logger.info(success_msg)
return dataframe_data, stats_fig, success_msg
except aiohttp.ClientResponseError as e:
logger.error(f"GitHub API request failed: {e.status} {e.message}")
error_msg = f"Error fetching issues: {e.status} - {e.message}. Check token permissions and repo URL."
if e.status == 404:
error_msg = f"Error: Repository not found at {repo_url}. Check the URL."
elif e.status == 401:
error_msg = "Error: Invalid GitHub token or insufficient permissions."
return [], go.Figure(), error_msg
except GitCommandError as e:
logger.error(f"Git clone error: {e}")
return [], go.Figure(), f"Error cloning repository: {e}"
except Exception as e:
logger.exception(f"An unexpected error occurred during issue crawl: {e}") # Log full traceback
return [], go.Figure(), f"An unexpected error occurred: {e}"
def _determine_severity(self, labels: List[str]) -> str:
"""Determines issue severity based on labels."""
labels_lower = [label.lower() for label in labels]
for severity, keywords in self.severity_rules.items():
if any(keyword in label for keyword in keywords for label in labels_lower):
return severity
return "Unknown" # Default if no matching label found
def _generate_stats_plot(self, severity_counts: Dict[str, int]) -> go.Figure:
"""Generates a Plotly bar chart for issue severity distribution."""
severities = list(severity_counts.keys())
counts = list(severity_counts.values())
fig = px.bar(
x=severities,
y=counts,
title="Issue Severity Distribution",
labels={'x': 'Severity', 'y': 'Number of Issues'},
color=severities, # Color bars by severity
color_discrete_map={ # Define colors
'Critical': '#DC2626', # Red
'High': '#F97316', # Orange
'Medium': '#FACC15', # Yellow
'Low': '#84CC16', # Lime
'Unknown': '#6B7280' # Gray
}
)
fig.update_layout(
showlegend=False, # Hide legend if coloring by severity directly
yaxis_title="Number of Issues",
xaxis_title="Severity Level",
plot_bgcolor='rgba(0,0,0,0)', # Transparent background
paper_bgcolor='rgba(0,0,0,0)'
)
return fig
async def _cluster_similar_issues(self):
"""Generates embeddings and clusters issues using HDBSCAN."""
if not self.issue_list_for_clustering or not self.hf_token:
logger.warning("Cannot cluster issues: No issues loaded or HF token missing.")
self.issue_clusters = {}
return
logger.info("Generating embeddings for clustering...")
try:
embeddings = await self._generate_embeddings([f"{i.get('title','')} {i.get('body','')}" for i in self.issue_list_for_clustering])
if not embeddings or len(embeddings) != len(self.issue_list_for_clustering):
logger.error("Failed to generate valid embeddings for all issues.")
self.issue_clusters = {}
return
logger.info(f"Generated {len(embeddings)} embeddings. Running HDBSCAN...")
# Use HDBSCAN for density-based clustering
# min_cluster_size: minimum number of samples in a cluster
# metric: distance metric used
# allow_single_cluster: If True, allows forming a single large cluster
clusterer = HDBSCAN(min_cluster_size=2, metric='cosine', allow_single_cluster=True, gen_min_span_tree=True)
clusters = clusterer.fit_predict(embeddings)
self.issue_clusters = {}
for i, cluster_id in enumerate(clusters):
if cluster_id == -1: # HDBSCAN uses -1 for noise points
continue # Skip noise points
if cluster_id not in self.issue_clusters:
self.issue_clusters[cluster_id] = []
self.issue_clusters[cluster_id].append(i) # Store original index
logger.info(f"Clustering complete. Found {len(self.issue_clusters)} clusters (excluding noise).")
except Exception as e:
logger.exception(f"Error during issue clustering: {e}")
self.issue_clusters = {}
async def _generate_embeddings(self, texts: List[str]):
"""Generates sentence embeddings using Hugging Face Inference API."""
if not self.hf_token:
logger.error("Hugging Face token is not set. Cannot generate embeddings.")
return None
# Recommended embedding model (check HF for alternatives if needed)
model_id = "sentence-transformers/all-mpnet-base-v2"
api_url = f"{HF_INFERENCE_API}/{model_id}"
headers = {"Authorization": f"Bearer {self.hf_token}"}
logger.info(f"Requesting embeddings from {api_url} for {len(texts)} texts.")
async with aiohttp.ClientSession(headers=headers) as session:
try:
response = await session.post(api_url, json={"inputs": texts})
response.raise_for_status()
result = await response.json()
# Check if the result is a list of embeddings (floats)
if isinstance(result, list) and all(isinstance(emb, list) for emb in result):
logger.info(f"Successfully received {len(result)} embeddings.")
return result
else:
logger.error(f"Unexpected embedding format received: {type(result)}. Full response: {result}")
return None
except aiohttp.ClientResponseError as e:
logger.error(f"HF Inference API request failed: {e.status} {e.message}")
logger.error(f"Response body: {await e.response.text()}")
return None
except Exception as e:
logger.exception(f"An unexpected error occurred during embedding generation: {e}")
return None
async def generate_code_patch(self, issue_number: int, model_key: str) -> dict:
"""Generates a code patch suggestion using a selected AI model."""
if issue_number not in self.issues:
return {"error": f"Issue {issue_number} not found."}
if not self.hf_token:
return {"error": "Hugging Face token not set."}
if model_key not in HF_MODELS:
return {"error": f"Invalid model key: {model_key}"}
issue = self.issues[issue_number]
model_id = HF_MODELS[model_key]
logger.info(f"Generating patch for issue {issue_number} using model {model_id}")
# --- Context Gathering (Simplified) ---
# In a real scenario, this needs to be much smarter:
# - Identify relevant files based on issue text, stack traces, etc.
# - Potentially use git history or blame to find relevant code sections.
# For now, we'll use a placeholder or skip context if too complex.
context = "Context gathering not implemented. Provide code snippets in the issue description."
# context = await self._get_code_context(issue_number) # Uncomment if implemented
# --- Prompt Engineering ---
prompt = f"""You are an expert programmer tasked with fixing a bug described in a GitHub issue.
Analyze the following issue and provide a code patch in standard `diff` format.
Focus only on the necessary changes to resolve the problem described.
Explain your reasoning briefly before the patch.
## Issue Title: {issue.get('title', 'N/A')}
## Issue Body:
{issue.get('body', 'N/A')}
## Relevant Code Context (if available):
{context}
## Instructions:
1. Analyze the issue and the context.
2. Determine the code changes needed.
3. Provide the changes as a Git diff block (```diff ... ```).
4. If you cannot determine a patch, explain why.
## Patch Suggestion:
"""
# --- Call Inference API ---
api_url = f"{HF_INFERENCE_API}/{model_id}"
headers = {"Authorization": f"Bearer {self.hf_token}"}
payload = {
"inputs": prompt,
"parameters": { # Adjust parameters as needed
"max_new_tokens": 1024, # Max length of the generated patch + explanation
"temperature": 0.3, # Lower temperature for more deterministic code
"return_full_text": False, # Only get the generated part
"do_sample": True,
"top_p": 0.9,
}
}
try:
async with aiohttp.ClientSession(headers=headers) as session:
async with session.post(api_url, json=payload) as response:
response.raise_for_status()
result = await response.json()
if result and isinstance(result, list):
generated_text = result[0].get('generated_text', '')
logger.info(f"Received patch suggestion from {model_id}")
# Basic extraction of diff block (improve if needed)
diff_match = re.search(r"```diff\n(.*?)```", generated_text, re.DOTALL)
explanation = generated_text.split("```diff")[0].strip()
patch = diff_match.group(1).strip() if diff_match else "No diff block found in response."
return {
"explanation": explanation,
"patch": patch,
"model_used": model_id
}
else:
logger.error(f"Unexpected response format from {model_id}: {result}")
return {"error": "Received unexpected response format from AI model."}
except aiohttp.ClientResponseError as e:
logger.error(f"HF Inference API request failed for patch generation: {e.status} {e.message}")
logger.error(f"Response body: {await e.response.text()}")
return {"error": f"AI model request failed ({e.status}). Check model availability and HF token."}
except Exception as e:
logger.exception(f"Error generating code patch: {e}")
return {"error": f"An unexpected error occurred: {e}"}
async def _get_code_context(self, issue_number: int) -> str:
"""Placeholder for retrieving relevant code context for an issue."""
# This needs a proper implementation based on how the repo is managed
# - Clone/pull the repo if not present/up-to-date
# - Identify relevant files (e.g., using file paths mentioned in the issue, heuristics)
# - Read relevant parts of the files
logger.warning(f"Code context retrieval for issue {issue_number} is not fully implemented.")
# Example: Look for file paths in the issue body
# issue_body = self.issues.get(issue_number, {}).get('body', '')
# Find potential file paths (very basic example)
# potential_files = re.findall(r'[\w/.-]+\.(?:py|js|java|cpp|c|ts|html|css)', issue_body)
# Read content from these files if they exist in the workspace repo
return "Code context retrieval is currently a placeholder."
async def suggest_resolution(self, issue: dict, model_key: str) -> str:
"""Suggests a resolution description using a selected AI model."""
if not self.hf_token:
return "Error: Hugging Face token not set."
if model_key not in HF_MODELS:
return f"Error: Invalid model key: {model_key}"
model_id = HF_MODELS[model_key]
logger.info(f"Requesting resolution suggestion for issue {issue.get('id','N/A')} using {model_id}")
prompt = f"""Analyze the following GitHub issue and provide a concise, step-by-step suggestion on how to resolve it. Focus on the technical steps required.
## Issue Title: {issue.get('title', 'N/A')}
## Issue Body:
{issue.get('body', 'N/A')}
## Labels: {', '.join(issue.get('labels', []))}
## Suggested Resolution Steps:
"""
api_url = f"{HF_INFERENCE_API}/{model_id}"
headers = {"Authorization": f"Bearer {self.hf_token}"}
payload = {
"inputs": prompt,
"parameters": {
"max_new_tokens": 512,
"temperature": 0.7, # Higher temp for more creative suggestions
"return_full_text": False,
"do_sample": True,
"top_p": 0.95,
}
}
try:
async with aiohttp.ClientSession(headers=headers) as session:
async with session.post(api_url, json=payload) as response:
response.raise_for_status()
result = await response.json()
if result and isinstance(result, list):
suggestion = result[0].get('generated_text', 'No suggestion generated.')
logger.info(f"Received suggestion from {model_id}")
return suggestion.strip()
else:
logger.error(f"Unexpected response format from {model_id} for suggestion: {result}")
return "Error: Received unexpected response format from AI model."
except aiohttp.ClientResponseError as e:
logger.error(f"HF Inference API request failed for suggestion: {e.status} {e.message}")
logger.error(f"Response body: {await e.response.text()}")
return f"Error: AI model request failed ({e.status}). Check model availability and HF token."
except Exception as e:
logger.exception(f"Error suggesting resolution: {e}")
return f"An unexpected error occurred: {e}"
# --- WebSocket Methods ---
async def broadcast_collaboration_status(self):
"""Periodically sends collaborator status to all connected clients."""
while True:
await asyncio.sleep(5) # Send updates every 5 seconds
if not self.ws_clients:
continue
status_payload = json.dumps({
"type": "collaboration_status",
"collaborators": self.collaborators
})
# Use asyncio.gather to send concurrently, handling potential errors
results = await asyncio.gather(
*[client.send(status_payload) for client in self.ws_clients],
return_exceptions=True # Don't let one failed send stop others
)
# Log any errors that occurred during broadcast
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.warning(f"Failed to send status to client {i}: {result}")
async def handle_code_editor_update(self, issue_num: int, delta: str, client_id: str):
"""Applies a delta from one client and broadcasts it to others."""
if issue_num not in self.code_editors:
logger.warning(f"Received code update for non-existent editor for issue {issue_num}")
return # Or initialize editor: self.code_editors[issue_num] = OTCodeEditor(...)
try:
# Apply the delta to the server-side authoritative state
self.code_editors[issue_num].apply_delta(json.loads(delta))
logger.info(f"Applied delta for issue {issue_num} from client {client_id}")
# Broadcast the delta to all *other* connected clients
update_payload = json.dumps({
"type": "code_update",
"issue_num": issue_num,
"delta": delta # Send the original delta
})
tasks = []
for client in self.ws_clients:
# Check if the client has an associated ID and avoid sending back to originator
client_ws_id = getattr(client, 'client_id', None)
if client_ws_id != client_id:
tasks.append(client.send(update_payload))
if tasks:
results = await asyncio.gather(*tasks, return_exceptions=True)
# Log errors during broadcast
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.warning(f"Failed to broadcast code update to client {i}: {result}")
except json.JSONDecodeError:
logger.error(f"Received invalid JSON delta for issue {issue_num}: {delta}")
except Exception as e:
logger.exception(f"Error handling code editor update for issue {issue_num}: {e}")
async def broadcast_issue_update(self):
"""Notifies clients that the issue list/data has changed."""
if not self.ws_clients:
return
logger.info("Broadcasting issue update notification to clients.")
update_payload = json.dumps({"type": "issues_updated"})
results = await asyncio.gather(
*[client.send(update_payload) for client in self.ws_clients],
return_exceptions=True
)
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.warning(f"Failed to send issue update notification to client {i}: {result}")
# ========== Gradio UI Definition ==========
def create_ui(manager: IssueManager):
"""Creates the Gradio interface."""
# --- Helper Functions for UI ---
def generate_issue_preview(issue_num: Optional[int]) -> str:
"""Generates HTML preview for a selected issue."""
if issue_num is None or issue_num not in manager.issues:
return "<p>Select an issue from the board to see details.</p>"
issue = manager.issues[issue_num]
# Convert markdown body to HTML
html_body = markdown2.markdown(issue.get('body', '*No description provided.*'))
# Basic styling
preview_html = f"""
<div style="border: 1px solid #e5e7eb; padding: 15px; border-radius: 8px; background-color: #f9fafb;">
<h4><a href='{issue.get('url', '#')}' target='_blank' style='color: #6d28d9; text-decoration: none;'>#{issue['id']} - {issue.get('title', 'N/A')}</a></h4>
<hr style='margin: 10px 0; border-top: 1px solid #e5e7eb;'>
<p><strong>State:</strong> {issue.get('state', 'N/A')} | <strong>Assignee:</strong> {issue.get('assignee', 'None')}</p>
<p><strong>Labels:</strong> {' | '.join(f'<span style=\'background-color: #eee; padding: 2px 5px; border-radius: 4px; font-size: 0.9em;\'>{l}</span>' for l in issue.get('labels', [])) or 'None'}</p>
<div style="margin-top: 10px; max-height: 300px; overflow-y: auto; border-top: 1px dashed #ccc; padding-top: 10px;">
{html_body}
</div>
</div>
"""
return preview_html
async def get_ai_suggestion(issue_num: Optional[int], model_key: str) -> str:
"""Wrapper to get AI suggestion for the chat."""
if issue_num is None or issue_num not in manager.issues:
return "Please select a valid issue first."
issue = manager.issues[issue_num]
issue_hash = manager._get_issue_hash(issue) # Use hash for caching
# Use cached_suggestion which handles the actual API call via lru_cache
suggestion = await manager.cached_suggestion(issue_hash, HF_MODELS[model_key])
# Format for chat
return f"**Suggestion based on {model_key}:**\n\n{suggestion}"
async def get_ai_patch(issue_num: Optional[int], model_key: str) -> str:
"""Wrapper to get AI patch for the chat."""
if issue_num is None or issue_num not in manager.issues:
return "Please select a valid issue first."
result = await manager.generate_code_patch(issue_num, model_key)
if "error" in result:
return f"**Error generating patch:** {result['error']}"
else:
# Format for chat display
return f"""**Patch Suggestion from {result.get('model_used', model_key)}:**
**Explanation:**
{result.get('explanation', 'N/A')}
**Patch:**
```diff
{result.get('patch', 'N/A')}
```"""
# --- Gradio Blocks ---
with gr.Blocks(theme=theme, title="🤖 AI Issue Resolver Pro", css=".gradio-container {max-width: 1400px !important;}") as app:
gr.Markdown("""
<div style="text-align: center; margin-bottom: 20px;">
<h1 style="color: #6d28d9; font-weight: 800;">🚀 AI Issue Resolver Pro</h1>
<p style="color: #4b5563; font-size: 1.1em;">Next-generation issue resolution powered by AI collaboration</p>
</div>
""")
# --- Configuration Row ---
with gr.Row(variant="panel", elem_id="config-panel"):
with gr.Column(scale=3):
repo_url = gr.Textbox(label="GitHub Repository URL", placeholder="[https://github.com/owner/repo](https://github.com/owner/repo)", info="Enter the full URL of the public GitHub repository.", elem_id="repo_url")
with gr.Row():
github_token = gr.Textbox(label="GitHub Token (Optional)", type="password", info="Required for private repos or higher rate limits.", elem_id="github_token")
hf_token = gr.Textbox(label="Hugging Face Token", type="password", info="Required for AI model interactions.", elem_id="hf_token")
with gr.Column(scale=1):
# Removed language select as code editor handles it
model_select = gr.Dropdown(choices=list(HF_MODELS.keys()), value="Mistral-8x7B",
label="🤖 Select AI Model", info="Choose the AI for suggestions and patches.", elem_id="model_select")
crawl_btn = gr.Button("🛰️ Scan Repository Issues", variant="primary", icon="🔍", elem_id="crawl_btn")
status_output = gr.Textbox(label="Status", interactive=False, lines=1, placeholder="Status updates will appear here...", elem_id="status_output")
# --- Main Tabs ---
with gr.Tabs(elem_id="main-tabs"):
# --- Issue Board Tab ---
with gr.Tab("📋 Issue Board", id="board", elem_id="tab-board"):
with gr.Row():
with gr.Column(scale=3):
gr.Markdown("### Open Issues")
issue_list = gr.Dataframe(
headers=["ID", "Title", "Severity", "Cluster"],
datatype=["number", "str", "str", "str"], # Cluster ID shown as str
interactive=True,
height=500,
wrap=True, # Wrap long titles
elem_id="issue_list_df"
)
with gr.Column(scale=2):
gr.Markdown("### Issue Severity")
stats_plot = gr.Plot(elem_id="stats_plot")
# Placeholder for collaborators - updated via JS
collab_status = gr.HTML("""
<div style="margin-top: 20px; border: 1px solid #e5e7eb; padding: 10px; border-radius: 8px;">
<h4 style="margin-bottom: 5px; color: #374151;">👥 Active Collaborators</h4>
<div id="collab-list" style="font-size: 0.9em; max-height: 100px; overflow-y: auto;">
Connecting...
</div>
</div>
""", elem_id="collab_status_html")
# --- Resolution Studio Tab ---
with gr.Tab("💻 Resolution Studio", id="studio", elem_id="tab-studio"):
with gr.Row():
# Left Column: Issue Details & AI Tools
with gr.Column(scale=1):
gr.Markdown("### Selected Issue")
# Hidden number input to store selected issue ID
selected_issue_id = gr.Number(label="Selected Issue ID", visible=False, precision=0, elem_id="selected_issue_id")
issue_preview_html = gr.HTML(
"<p style='color: #6b7280;'>Select an issue from the 'Issue Board' tab.</p>",
elem_id="issue_preview"
)
with gr.Accordion("🛠️ AI Assistance Tools", open=True, elem_id="ai_tools_accordion"):
suggest_btn = gr.Button("🧠 Suggest Resolution Steps", icon="💡", elem_id="suggest_btn")
patch_btn = gr.Button("📝 Generate Code Patch", icon="🩹", elem_id="patch_btn")
# Add placeholders for other buttons if needed
# test_btn = gr.Button("🧪 Create Tests (Future)", icon="🔬", interactive=False)
# impact_btn = gr.Button("📊 Impact Analysis (Future)", icon="📈", interactive=False)
chat_output_display = gr.Textbox(label="AI Output", lines=10, interactive=False, placeholder="AI suggestions and patches will appear here...", elem_id="ai_output_display")
# Right Column: Code Editor & Chat (removed chat interface)
with gr.Column(scale=2):
gr.Markdown("### Collaborative Code Editor")
# Use the imported code_editor component
# We'll update its value dynamically when an issue is selected
code_edit_component = code_editor(
label="Code Editor",
# Initial value can be empty or a placeholder message
value={"main.py": "# Select an issue to load relevant code (placeholder)"},
# Language is set dynamically if needed, or defaults
language="python", # Default language
elem_id="code_editor_component"
)
# Hidden input to trigger code editor updates from server->client WS messages
code_editor_update_trigger = gr.Textbox(visible=False, elem_id="code-editor-update-trigger")
# --- Analytics Tab (Placeholder) ---
with gr.Tab("📈 Analytics", id="analytics", elem_id="tab-analytics"):
gr.Markdown("### Analytics Dashboard (Placeholder)")
gr.Markdown("Future home for resolution timelines, achievement badges, and more detailed metrics.")
# with gr.Row():
# gr.Markdown("#### 📅 Resolution Timeline")
# timeline = gr.Timeline() # Requires specific data format
# with gr.Row():
# gr.Markdown("#### 🏆 Achievement System")
# badges = gr.HTML("<div class='badges'>Coming Soon!</div>")
# --- Event Handlers ---
# 1. Crawl Button Click
crawl_btn.click(
fn=manager.crawl_issues,
inputs=[repo_url, github_token, hf_token],
outputs=[issue_list, stats_plot, status_output],
api_name="crawl_issues" # For API access if needed
)
# 2. Issue Selection in Dataframe
async def handle_issue_select(evt: gr.SelectData):
"""Handles issue selection: updates preview, loads code (placeholder)."""
if evt.index[0] is None: # No row selected
return {
selected_issue_id: None,
issue_preview_html: "<p style='color: #6b7280;'>Select an issue from the table.</p>",
# Reset code editor or show placeholder
code_edit_component: gr.update(value={"placeholder.txt": "# Select an issue to load code."})
}
selected_id = int(evt.value[0]) # Get ID from the first column ('ID') of the selected row
logger.info(f"Issue selected: ID {selected_id}")
# Update the hidden ID field
updates = {selected_issue_id: selected_id}
# Generate and update the HTML preview
preview_html = generate_issue_preview(selected_id)
updates[issue_preview_html] = preview_html
# --- Code Loading Logic (Placeholder) ---
# This needs real implementation: Find relevant files for the issue
# and load their content into the editor component's value format.
# Example: Fetch files related to the issue (needs implementation)
# files_content = await fetch_relevant_code_for_issue(selected_id)
files_content = {
f"issue_{selected_id}_code.py": f"# Code related to issue {selected_id}\n# (Replace with actual file content)\n\nprint('Hello from issue {selected_id}')",
"README.md": f"# Issue {selected_id}\n\nDetails about the issue..."
}
updates[code_edit_component] = gr.update(value=files_content)
# --- End Placeholder ---
return updates
issue_list.select(
fn=handle_issue_select,
inputs=[], # Event data is passed automatically
outputs=[selected_issue_id, issue_preview_html, code_edit_component],
show_progress="minimal"
)
# 3. Suggest Resolution Button Click
suggest_btn.click(
fn=get_ai_suggestion,
inputs=[selected_issue_id, model_select],
outputs=[chat_output_display],
api_name="suggest_resolution"
)
# 4. Generate Patch Button Click
patch_btn.click(
fn=get_ai_patch,
inputs=[selected_issue_id, model_select],
outputs=[chat_output_display],
api_name="generate_patch"
)
# 5. Code Editor Change (User typing) -> Send update via WebSocket
# This requires JavaScript to capture the 'change' event from the Ace editor
# instance within the code_editor component and send it over WebSocket.
# The Python backend then receives it via handle_ws_connection.
# 6. WebSocket Message (Server -> Client) -> Trigger UI Update
# This uses JavaScript to listen for WebSocket messages and update Gradio components.
# Example: Update collaborator list, trigger code editor update.
# --- JavaScript for WebSocket Communication ---
def web_socket_js(ws_port):
# Generate unique client ID for this session
client_id = f"client_{hashlib.sha1(os.urandom(16)).hexdigest()[:8]}"
logger.info(f"Generated Client ID for WebSocket: {client_id}")
return f"""
<script>
// Ensure this runs only once
if (!window.collabWs) {{
console.log('Initializing WebSocket connection...');
const wsUrl = `ws://localhost:{ws_port}`; // Use localhost for local Gradio run
// For Hugging Face Spaces, you need to use the public WS endpoint:
// const wsUrl = `wss://YOUR_SPACE_NAME.hf.space/ws`; // Adjust if using custom domain/port mapping
window.collabWs = new WebSocket(wsUrl);
window.clientId = '{client_id}'; // Store client ID globally for this session
window.collabWs.onopen = function(event) {{
console.log('WebSocket connection established.');
// Optionally send a join message
window.collabWs.send(JSON.stringify({{ type: 'join', clientId: window.clientId }}));
// Initial update for collaborator list (optional)
const collabListDiv = document.getElementById('collab-list');
if (collabListDiv) collabListDiv.innerHTML = 'Connected.';
}};
window.collabWs.onmessage = function(event) {{
// console.log('WebSocket message received:', event.data);
try {{
const data = JSON.parse(event.data);
if (data.type === 'collaboration_status') {{
const collabListDiv = document.getElementById('collab-list');
if (collabListDiv) {{
if (Object.keys(data.collaborators).length > 0) {{
collabListDiv.innerHTML = Object.entries(data.collaborators)
.map(([id, info]) => `<div class="collab-item">${info.name || id}: ${info.status || 'Idle'}</div>`)
.join('');
}} else {{
collabListDiv.innerHTML = 'No other collaborators active.';
}}
}}
}} else if (data.type === 'code_update') {{
console.log('Received code update delta for issue:', data.issue_num);
// Find the Gradio Textbox used as a trigger
const triggerTextbox = document.getElementById('code-editor-update-trigger').querySelector('textarea');
if (triggerTextbox) {{
// Set its value to the received delta (JSON string)
// This change event will be picked up by Gradio if a .change() listener is attached
// However, directly manipulating the Ace editor instance is more reliable if possible.
// For now, we assume the code_editor component handles incoming deltas internally
// or provides a JS API. If not, this trigger approach is a fallback.
triggerTextbox.value = JSON.stringify(data); // Pass full data
// Manually dispatch an input event to ensure Gradio detects the change
triggerTextbox.dispatchEvent(new Event('input', {{ bubbles: true }}));
console.log('Triggered Gradio update for code editor.');
// --- Ideal approach: Directly update Ace Editor ---
// This requires the code_editor component to expose its Ace instance
// or provide a JS function like `window.updateCodeEditor(issueNum, delta)`
/*
if (window.ace && window.aceEditors && window.aceEditors[data.issue_num]) {{
const editor = window.aceEditors[data.issue_num];
editor.getSession().getDocument().applyDeltas([JSON.parse(data.delta)]);
console.log('Applied delta directly to Ace editor for issue:', data.issue_num);
}} else {{
console.warn('Ace editor instance not found for issue:', data.issue_num);
}}
*/
}} else {{
console.error('Code editor update trigger textbox not found.');
}}
}} else if (data.type === 'issues_updated') {{
console.log('Received issues updated notification.');
// Optionally trigger a refresh or show a notification
// Example: Update status bar
const statusBar = document.getElementById('status_output').querySelector('textarea');
if (statusBar) {{
statusBar.value = 'Issue list updated. Refresh may be needed.';
statusBar.dispatchEvent(new Event('input', {{ bubbles: true }}));
}}
// More robust: Trigger the crawl button's click event via JS? (Can be complex)
}}
}} catch (e) {{
console.error('Failed to parse WebSocket message or update UI:', e);
}}
}};
window.collabWs.onclose = function(event) {{
console.warn('WebSocket connection closed:', event.code, event.reason);
const collabListDiv = document.getElementById('collab-list');
if (collabListDiv) collabListDiv.innerHTML = '<span style="color: red;">Disconnected</span>';
// Implement reconnection logic if needed
}};
window.collabWs.onerror = function(error) {{
console.error('WebSocket error:', error);
const collabListDiv = document.getElementById('collab-list');
if (collabListDiv) collabListDiv.innerHTML = '<span style="color: red;">Connection Error</span>';
}};
// Function to send messages (e.g., code changes)
window.sendWsMessage = function(message) {{
if (window.collabWs && window.collabWs.readyState === WebSocket.OPEN) {{
window.collabWs.send(JSON.stringify(message));
}} else {{
console.error('WebSocket not connected. Cannot send message.');
}}
}};
// --- JS Integration with Code Editor Component ---
// This part is CRUCIAL and depends heavily on how the `code_editor`
// component is implemented (e.g., using Ace Editor).
// We need to:
// 1. Get the editor instance(s).
// 2. Attach a listener to its 'change' event (which provides deltas).
// 3. When a change occurs, send the delta via `sendWsMessage`.
// Example assuming Ace Editor and the component stores instances:
function setupCodeEditorListener() {{
// This needs to run *after* the Gradio component is rendered
// and the editor is initialized. Using setTimeout is a common hack.
setTimeout(() => {{
const editorElement = document.querySelector('#code_editor_component'); // Find editor container
// Find the actual Ace instance (this depends on the component's structure)
// This is a GUESS - inspect the component's HTML/JS to find the correct way
let aceEditor;
if (window.ace && editorElement) {{
// Try common ways Ace is attached
aceEditor = window.ace.edit(editorElement.querySelector('.ace_editor')); // Common pattern
// Or maybe the component stores it globally?
// aceEditor = window.activeAceEditor;
}}
if (aceEditor) {{
console.log('Ace Editor instance found. Attaching change listener.');
aceEditor.getSession().on('change', function(delta) {{
// Only send changes made by the user (ignore programmatic changes)
if (aceEditor.curOp && aceEditor.curOp.command.name) {{
console.log('Code changed by user:', delta);
const issueIdElem = document.getElementById('selected_issue_id').querySelector('input');
const currentIssueId = issueIdElem ? parseInt(issueIdElem.value, 10) : null;
if (currentIssueId !== null && !isNaN(currentIssueId)) {{
window.sendWsMessage({{
type: 'code_update',
issue_num: currentIssueId,
delta: JSON.stringify(delta), // Send delta as JSON string
clientId: window.clientId
}});
}} else {{
console.warn('No valid issue selected, cannot send code update.');
}}
}}
}});
}} else {{
console.warn('Could not find Ace Editor instance to attach listener. Collaboration may not work.');
// Retry after a delay?
// setTimeout(setupCodeEditorListener, 2000);
}}
}}, 1500); // Wait 1.5 seconds for Gradio/Ace to initialize
}}
// Call setup after initial load and potentially after issue selection changes
// if the editor instance is recreated.
setupCodeEditorListener();
// Re-attach listener if the editor component updates (e.g., on issue select)
// This requires observing changes to the component's container
const observer = new MutationObserver((mutationsList, observer) => {{
for(const mutation of mutationsList) {{
if (mutation.type === 'childList' && mutation.addedNodes.length > 0) {{
// Check if the editor element was re-added/modified significantly
if (document.querySelector('#code_editor_component .ace_editor')) {{
console.log("Code editor component updated, re-attaching listener...");
setupCodeEditorListener();
break; // Assume we only need to re-attach once per mutation batch
}}
}}
}}
}});
const targetNode = document.getElementById('code_editor_component');
if(targetNode) {{
observer.observe(targetNode, {{ childList: true, subtree: true }});
}}
}} else {{
console.log('WebSocket connection already initialized.');
}}
</script>
"""
# Inject the JavaScript into the Gradio app
app.load(_js=web_socket_js(WS_PORT), fn=None, inputs=None, outputs=None)
return app
# ========== WebSocket Server Logic ==========
async def handle_ws_connection(websocket: websockets.WebSocketServerProtocol, path: str, manager: IssueManager):
"""Handles incoming WebSocket connections and messages."""
client_id = None # Initialize client_id for this connection
manager.ws_clients.append(websocket)
logger.info(f"WebSocket client connected: {websocket.remote_address}")
try:
async for message in websocket:
try:
data = json.loads(message)
msg_type = data.get("type")
logger.debug(f"Received WS message: {data}") # Log received message content
if msg_type == "join":
client_id = data.get("clientId", f"anon_{websocket.id}")
setattr(websocket, 'client_id', client_id) # Associate ID with socket object
manager.collaborators[client_id] = {"name": client_id, "status": "Connected"} # Add to collaborators
logger.info(f"Client {client_id} joined.")
# Don't await broadcast here, let the periodic task handle it
elif msg_type == "code_update":
issue_num = data.get("issue_num")
delta = data.get("delta")
sender_id = data.get("clientId") # ID of the client who sent the update
if issue_num is not None and delta and sender_id:
# Pass client_id to handler to avoid broadcasting back to sender
await manager.handle_code_editor_update(issue_num, delta, sender_id)
else:
logger.warning(f"Invalid code_update message received: {data}")
elif msg_type == "status_update": # Client updates their status
sender_id = data.get("clientId")
status = data.get("status", "Idle")
if sender_id and sender_id in manager.collaborators:
manager.collaborators[sender_id]["status"] = status
logger.info(f"Client {sender_id} status updated: {status}")
# Don't await broadcast here
else:
logger.warning(f"Unknown WebSocket message type received: {msg_type}")
except json.JSONDecodeError:
logger.error(f"Received invalid JSON over WebSocket: {message}")
except Exception as e:
logger.exception(f"Error processing WebSocket message: {e}")
except ConnectionClosed as e:
logger.info(f"WebSocket client disconnected: {websocket.remote_address} (Code: {e.code}, Reason: {e.reason})")
except Exception as e:
logger.exception(f"Unexpected error in WebSocket handler: {e}")
finally:
logger.info(f"Cleaning up connection for client {client_id if client_id else websocket.remote_address}")
manager.ws_clients.remove(websocket)
if client_id and client_id in manager.collaborators:
del manager.collaborators[client_id] # Remove collaborator on disconnect
logger.info(f"Removed collaborator {client_id}.")
# Don't await broadcast here
async def start_websocket_server(manager: IssueManager, port: int):
"""Starts the WebSocket server."""
# Pass manager instance to the connection handler factory
handler = lambda ws, path: handle_ws_connection(ws, path, manager)
async with websockets.serve(handler, "localhost", port):
logger.info(f"WebSocket server started on ws://localhost:{port}")
await asyncio.Future() # Run forever
def run_webhook_server(manager: IssueManager, port: int):
"""Starts the HTTP webhook server in a separate thread."""
WebhookHandler.manager_instance = manager # Pass manager instance to the class
server_address = ("", port)
httpd = HTTPServer(server_address, WebhookHandler)
logger.info(f"Webhook HTTP server started on port {port}")
httpd.serve_forever()
# ========== Main Execution ==========
if __name__ == "__main__":
# --- Setup ---
manager = IssueManager()
# --- Start Background Servers ---
# 1. Webhook Server (HTTP)
webhook_thread = threading.Thread(target=run_webhook_server, args=(manager, WEBHOOK_PORT), daemon=True)
webhook_thread.start()
# 2. WebSocket Server (Runs in main asyncio loop)
# We need to run the WebSocket server and the collaborator status broadcast
# within an asyncio event loop.
async def main_async_tasks():
# Start the periodic broadcast task
broadcast_task = asyncio.create_task(manager.broadcast_collaboration_status())
# Start the WebSocket server
websocket_server_task = asyncio.create_task(start_websocket_server(manager, WS_PORT))
await asyncio.gather(broadcast_task, websocket_server_task)
# Run the asyncio tasks in a separate thread
asyncio_thread = threading.Thread(target=lambda: asyncio.run(main_async_tasks()), daemon=True)
asyncio_thread.start()
# --- Create and Launch Gradio App ---
app = create_ui(manager)
app.launch(
# share=True, # Enable for public access (use with caution)
server_name="0.0.0.0", # Bind to all interfaces for accessibility in containers/networks
server_port=7860, # Default Gradio port
favicon_path="[https://huggingface.co/front/assets/huggingface_logo-noborder.svg](https://huggingface.co/front/assets/huggingface_logo-noborder.svg)"
)
logger.info("Gradio app launched. Webhook and WebSocket servers running in background.")