import gradio as gr
import os
import aiohttp
import asyncio
import hashlib  # used by IssueManager._get_issue_hash
from git import Repo, GitCommandError, InvalidGitRepositoryError, NoSuchPathError
from http.server import BaseHTTPRequestHandler  # base class for WebhookHandler below
from pathlib import Path
from datetime import datetime, timedelta, timezone
import shutil
import json
import logging
import re
from typing import Dict, List, Optional, Tuple, Any
import subprocess
import plotly.express as px
import plotly.graph_objects as go
import time
import random
import pandas as pd
from collections import Counter
import string
from concurrent.futures import ThreadPoolExecutor
from hdbscan import HDBSCAN
import websockets
from websockets.server import WebSocketServerProtocol
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK

# Configure logging before anything below tries to use `logger`.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Assuming code_editor is available, e.g., installed via pip or included locally
try:
    from code_editor import code_editor
except ImportError:
    logger.error("The 'code_editor' Gradio component is not installed or available.")
    logger.error("Please install it, e.g., 'pip install gradio_code_editor'")
    def code_editor(*args, **kwargs):
        logger.warning("Using dummy code_editor. Code editing and collaboration will not function.")
        return gr.Textbox(label=kwargs.get('label', 'Code Editor (Unavailable)'), interactive=False, value="Error: Code editor component not found.")

# ========== Configuration ==========
WORKSPACE = Path("./issue_workspace")
WORKSPACE.mkdir(exist_ok=True)
GITHUB_API = "https://api.github.com/repos"
HF_INFERENCE_API = "https://api-inference.huggingface.co/models"
WEBHOOK_PORT = 8000
WS_PORT = 8001
executor = ThreadPoolExecutor(max_workers=4)
# Example HF models (replace with your actual models)
HF_MODELS = {
    "Mistral-8x7B": "mistralai/Mixtral-8x7B-Instruct-v0.1",
    "Llama-2-7B-chat": "meta-llama/Llama-2-7b-chat-hf",
    "CodeLlama-34B": "codellama/CodeLlama-34b-Instruct-hf",
}
DEFAULT_MODEL_KEY = "Mistral-8x7B"
DEFAULT_MODEL_ID = HF_MODELS.get(DEFAULT_MODEL_KEY, None)
# Model for cheap idle tasks (summaries, info gaps, hypothesis).
# Use a smaller one if available, otherwise fall back to the default.
DEFAULT_IDLE_MODEL_KEY = DEFAULT_MODEL_KEY  # Fallback to default
DEFAULT_IDLE_MODEL_ID = HF_MODELS.get(DEFAULT_IDLE_MODEL_KEY, DEFAULT_MODEL_ID)

# --- Idle State Configuration ---
STALE_ISSUE_THRESHOLD_DAYS = 30
MAX_SUMMARY_COMPUTATIONS_PER_CYCLE = 2
MAX_CONTEXT_COMPUTATIONS_PER_CYCLE = 3
MAX_MISSING_INFO_COMPUTATIONS_PER_CYCLE = 1
MAX_ANALYSIS_COMPUTATIONS_PER_CYCLE = 1
RECLUSTER_THRESHOLD = 5
IDLE_PROCESSING_INTERVAL_SECONDS = 60.0
# ========== Placeholder OTCodeEditor Class ========== | |
# WARNING: This is a placeholder and DOES NOT implement Operational Transformation. | |
# Concurrent edits WILL lead to data loss or inconsistencies. | |
class OTCodeEditor: | |
def __init__(self, initial_value: Dict[str, str]): | |
self.files: Dict[str, str] = initial_value.copy() | |
self.revision = 0 # Basic revision counter, not used for OT logic | |
logger.debug(f"OTCodeEditor initialized with files: {list(self.files.keys())}") | |
def apply_delta(self, delta: Dict[str, Any]): | |
# VERY basic placeholder: This logs the delta but does NOT perform OT. | |
# It does NOT handle concurrent edits safely. | |
logger.warning(f"Placeholder apply_delta called. Delta: {str(delta)[:200]}. " | |
"WARNING: Full Operational Transformation is NOT implemented. Concurrent edits are UNSAFE.") | |
# Increment revision regardless for basic tracking | |
self.revision += 1 | |
def get_content(self) -> Dict[str, str]: | |
return self.files.copy() | |
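# Illustrative usage of the placeholder editor (a sketch only; the delta payload shape
# shown here is an assumption, since apply_delta currently ignores its structure):
#
#     editor = OTCodeEditor({"main.py": "print('hello')\n"})
#     editor.apply_delta({"file": "main.py", "ops": [{"insert": "# comment\n"}]})  # hypothetical ops format
#     editor.get_content()  # -> {"main.py": "print('hello')\n"} (content unchanged; only revision increments)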
# ========== Modern Theme ========== | |
try: | |
theme = gr.themes.Soft( | |
primary_hue="violet", | |
secondary_hue="emerald", | |
radius_size="lg", | |
font=[gr.themes.GoogleFont("Inter"), "ui-sans-serif", "system-ui"] | |
).set( | |
button_primary_background_fill="linear-gradient(90deg, #8B5CF6 0%, #EC4899 100%)", | |
button_primary_text_color="white", | |
block_label_text_size="lg", | |
block_label_text_weight="600", | |
block_title_text_size="lg", | |
block_title_text_weight="800", | |
panel_background_fill="white", | |
block_shadow="*shadow_drop_lg", | |
) | |
except AttributeError as e: | |
logger.warning(f"Could not apply all theme settings (might be Gradio version difference): {e}. Using default Soft theme.") | |
theme = gr.themes.Soft() | |
# ========== Enhanced Webhook Handler ========== | |
class WebhookHandler(BaseHTTPRequestHandler): | |
manager_instance = None | |
main_loop = None # Store reference to the main asyncio loop | |
def do_POST(self): | |
content_length = int(self.headers['Content-Length']) | |
try: | |
payload_bytes = self.rfile.read(content_length) | |
payload = json.loads(payload_bytes.decode('utf-8')) | |
except json.JSONDecodeError: | |
logger.error(f"Invalid JSON payload received: {payload_bytes[:500]}") | |
self.send_response(400) | |
self.send_header("Content-type", "text/plain") | |
self.end_headers() | |
self.wfile.write(b"Invalid JSON payload") | |
return | |
except Exception as e: | |
logger.error(f"Error reading webhook payload: {e}") | |
self.send_response(500) | |
self.end_headers() | |
return | |
event = self.headers.get('X-GitHub-Event') | |
delivery_id = self.headers.get('X-GitHub-Delivery') | |
logger.info(f"Received GitHub webhook event: {event} (Delivery ID: {delivery_id})") | |
if event == 'issues' and WebhookHandler.manager_instance and WebhookHandler.main_loop: | |
action = payload.get('action') | |
logger.info(f"Issue action: {action}") | |
# Handle common actions that affect issue state or content | |
if action in ['opened', 'reopened', 'closed', 'assigned', 'unassigned', 'edited', 'labeled', 'unlabeled', 'milestoned', 'demilestoned']: | |
if WebhookHandler.main_loop.is_running(): | |
# Schedule the async handler to run in the main event loop | |
asyncio.run_coroutine_threadsafe( | |
WebhookHandler.manager_instance.handle_webhook_event(event, action, payload), | |
WebhookHandler.main_loop | |
) | |
logger.debug(f"Scheduled webhook processing for action '{action}' in main loop.") | |
else: | |
logger.error("Asyncio event loop is not running in the target thread for webhook.") | |
else: | |
logger.info(f"Webhook action '{action}' received but not actively handled by current logic.") | |
elif event == 'ping': | |
logger.info("Received GitHub webhook ping.") | |
else: | |
logger.warning(f"Unhandled event type: {event} or manager/loop not initialized.") | |
self.send_response(200) | |
self.send_header("Content-type", "text/plain") | |
self.end_headers() | |
self.wfile.write(b"OK") | |
# ========== AI-Powered Issue Manager ========== | |
class IssueManager: | |
def __init__(self): | |
self.issues: Dict[int, dict] = {} | |
self.repo_url: Optional[str] = None | |
self.repo_owner: Optional[str] = None | |
self.repo_name: Optional[str] = None | |
self.repo_local_path: Optional[Path] = None | |
self.repo: Optional[Repo] = None | |
self.github_token: Optional[str] = None | |
self.hf_token: Optional[str] = None | |
self.collaborators: Dict[str, dict] = {} | |
self.points: int = 0 | |
self.severity_rules: Dict[str, List[str]] = { | |
"Critical": ["critical", "urgent", "security", "crash", "blocker", "p0", "s0"], | |
"High": ["high", "important", "error", "regression", "major", "p1", "s1"], | |
"Medium": ["medium", "bug", "performance", "minor", "p2", "s2"], | |
"Low": ["low", "documentation", "enhancement", "trivial", "feature", "p3", "s3", "chore", "refactor", "question", "help wanted"] | |
} | |
self.issue_clusters: Dict[int, List[int]] = {} | |
self.issue_list_for_clustering: List[dict] = [] | |
self.ws_clients: List[WebSocketServerProtocol] = [] | |
self.code_editors: Dict[int, OTCodeEditor] = {} | |
self.main_loop = asyncio.get_event_loop() | |
self.broadcast_task: Optional[asyncio.Task] = None | |
self.idle_task: Optional[asyncio.Task] = None | |
# --- State for Idle Processing Results --- | |
self.precomputed_context: Dict[int, Dict[str, Any]] = {} | |
self.precomputed_summaries: Dict[int, Dict[str, Any]] = {} | |
self.precomputed_missing_info: Dict[int, Dict[str, Any]] = {} | |
self.precomputed_analysis: Dict[int, Dict[str, Any]] = {} | |
self.code_embeddings: Dict[str, List[float]] = {} | |
self.potential_duplicates: Dict[int, List[int]] = {} | |
self.stale_issues: List[int] = [] | |
self.high_priority_candidates: List[int] = [] | |
self.last_webhook_time: float = 0.0 | |
self.needs_recluster: bool = False | |
self._webhook_change_count = 0 | |
# --- Configuration for Idle Tasks --- | |
self.idle_processing_interval = IDLE_PROCESSING_INTERVAL_SECONDS | |
self.max_context_computations_per_cycle = MAX_CONTEXT_COMPUTATIONS_PER_CYCLE | |
self.max_summary_computations_per_cycle = MAX_SUMMARY_COMPUTATIONS_PER_CYCLE | |
self.max_missing_info_computations_per_cycle = MAX_MISSING_INFO_COMPUTATIONS_PER_CYCLE | |
self.max_analysis_computations_per_cycle = MAX_ANALYSIS_COMPUTATIONS_PER_CYCLE | |
self.stale_issue_threshold_days = STALE_ISSUE_THRESHOLD_DAYS | |
self.recluster_threshold = RECLUSTER_THRESHOLD | |
def start_broadcast_loop(self): | |
"""Starts the periodic broadcast task.""" | |
if not self.broadcast_task or self.broadcast_task.done(): | |
if self.main_loop.is_running(): | |
self.broadcast_task = self.main_loop.create_task(self.broadcast_collaboration_status()) | |
logger.info("Started collaboration status broadcast loop.") | |
else: | |
logger.error("Cannot start broadcast loop: Main event loop is not running.") | |
def stop_broadcast_loop(self): | |
"""Stops the periodic broadcast task.""" | |
if self.broadcast_task and not self.broadcast_task.done(): | |
try: | |
self.broadcast_task.cancel() | |
except asyncio.CancelledError: | |
pass # Expected | |
logger.info("Stopped collaboration status broadcast loop.") | |
self.broadcast_task = None | |
def _get_issue_hash(self, issue_data: dict) -> str: | |
"""Generates a hash based on key issue content for caching AI suggestions.""" | |
content = f"{issue_data.get('title', '')}{issue_data.get('body', '')}{','.join(issue_data.get('labels',[]))}" | |
return hashlib.md5(content.encode()).hexdigest() | |
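    # Example (illustrative): two issues with identical title, body, and label set produce
    # the same MD5 hash, so a cache keyed on this hash can serve one AI suggestion for both.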
async def cached_suggestion(self, issue_hash: str, model_key: str) -> str: | |
"""Retrieves or generates an AI suggestion, using an LRU cache based on issue content hash.""" | |
found_issue = None | |
for issue in self.issues.values(): | |
if self._get_issue_hash(issue) == issue_hash: | |
found_issue = issue | |
break | |
if not found_issue: | |
logger.error(f"Could not find issue data for hash {issue_hash} in current state. Suggestion might be based on outdated info if generated.") | |
return "Error: Issue data for this suggestion request (hash) not found in current state. The issue might have been updated or closed. Please re-select the issue." | |
        if model_key not in HF_MODELS:
            return f"Error: Invalid model key: {model_key}"
logger.info(f"Cache miss or first request for issue hash {issue_hash}. Requesting suggestion from {model_key}.") | |
return await self.suggest_resolution(found_issue, model_key) | |
async def handle_webhook_event(self, event: str, action: str, payload: dict): | |
"""Processes incoming webhook events to update the issue state.""" | |
logger.info(f"Processing webhook event: {event}, action: {action}") | |
issue_data = payload.get('issue') | |
repo_data = payload.get('repository') | |
if not issue_data or not repo_data: | |
logger.warning("Webhook payload missing 'issue' or 'repository' data.") | |
return | |
event_repo_url = repo_data.get('html_url') | |
if event_repo_url != self.repo_url: | |
logger.info(f"Ignoring webhook event for different repository: {event_repo_url}") | |
return | |
issue_number = issue_data.get('number') | |
if not issue_number: | |
logger.warning("Webhook issue data missing 'number'.") | |
return | |
needs_ui_update = False | |
significant_change = False # Flag for changes affecting clustering/content/AI caches | |
if action == 'closed': | |
logger.info(f"Webhook: Removing closed issue {issue_number} from active list.") | |
if self.issues.pop(issue_number, None): | |
needs_ui_update = True | |
significant_change = True | |
# Clean up associated cached/computed data | |
self.precomputed_context.pop(issue_number, None) | |
self.precomputed_summaries.pop(issue_number, None) | |
self.precomputed_missing_info.pop(issue_number, None) | |
self.precomputed_analysis.pop(issue_number, None) | |
self.potential_duplicates.pop(issue_number, None) | |
if issue_number in self.stale_issues: self.stale_issues.remove(issue_number) | |
if issue_number in self.high_priority_candidates: self.high_priority_candidates.remove(issue_number) | |
self.code_editors.pop(issue_number, None) | |
elif action in ['opened', 'reopened', 'edited', 'assigned', 'unassigned', 'labeled', 'unlabeled', 'milestoned', 'demilestoned']: | |
logger.info(f"Webhook: Adding/Updating issue {issue_number} (action: {action}).") | |
processed_data = self._process_issue_data(issue_data) | |
old_issue = self.issues.get(issue_number) | |
if not old_issue or \ | |
old_issue.get('body') != processed_data.get('body') or \ | |
old_issue.get('title') != processed_data.get('title') or \ | |
set(old_issue.get('labels', [])) != set(processed_data.get('labels', [])): | |
significant_change = True | |
logger.info(f"Significant change detected for issue {issue_number} (content/labels).") | |
# Invalidate ALL precomputed AI state on significant edit | |
self.precomputed_context.pop(issue_number, None) | |
self.precomputed_summaries.pop(issue_number, None) | |
self.precomputed_missing_info.pop(issue_number, None) | |
self.precomputed_analysis.pop(issue_number, None) | |
# Check if state-related fields changed (affecting idle processing lists) | |
if not old_issue or \ | |
old_issue.get('updated_at') != processed_data.get('updated_at') or \ | |
old_issue.get('assignee') != processed_data.get('assignee') or \ | |
set(old_issue.get('labels', [])) != set(processed_data.get('labels', [])): | |
logger.debug(f"State-related change detected for issue {issue_number} (update time, assignee, labels). Idle loop will re-evaluate.") | |
self.issues[issue_number] = processed_data | |
needs_ui_update = True | |
else: | |
logger.info(f"Ignoring webhook action '{action}' for issue {issue_number} (already filtered).") | |
# --- Track changes for idle processing --- | |
if needs_ui_update: | |
self.last_webhook_time = time.time() | |
if significant_change: | |
self._increment_change_counter() | |
# Rebuild the list used for clustering immediately if a significant change occurred | |
self.issue_list_for_clustering = list(self.issues.values()) | |
logger.info("Issue list for clustering updated due to significant webhook change.") | |
await self.broadcast_issue_update() | |
def _increment_change_counter(self): | |
"""Increments change counter and sets recluster flag if threshold reached.""" | |
self._webhook_change_count += 1 | |
logger.debug(f"Significant change detected. Change count: {self._webhook_change_count}/{self.recluster_threshold}") | |
if self._webhook_change_count >= self.recluster_threshold: | |
self.needs_recluster = True | |
logger.info(f"Change threshold ({self.recluster_threshold}) reached. Flagging for re-clustering.") | |
def _process_issue_data(self, issue_data: dict) -> dict: | |
"""Helper to structure issue data consistently.""" | |
return { | |
"id": issue_data['number'], | |
"title": issue_data.get('title', 'No Title Provided'), | |
"body": issue_data.get('body', ''), | |
"state": issue_data.get('state', 'unknown'), | |
"labels": sorted([label['name'] for label in issue_data.get('labels', [])]), | |
"assignee": issue_data.get('assignee', {}).get('login') if issue_data.get('assignee') else None, | |
"url": issue_data.get('html_url', '#'), | |
"created_at": issue_data.get('created_at'), | |
"updated_at": issue_data.get('updated_at'), | |
} | |
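    # Shape of the dict returned by _process_issue_data (values illustrative):
    # {"id": 42, "title": "Crash on startup", "body": "...", "state": "open",
    #  "labels": ["bug", "p1"], "assignee": "octocat",
    #  "url": "https://github.com/owner/repo/issues/42",
    #  "created_at": "2024-01-01T00:00:00Z", "updated_at": "2024-01-02T00:00:00Z"}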
async def crawl_issues(self, repo_url: str, github_token: Optional[str], hf_token: Optional[str]) -> Tuple[List[List], go.Figure, str, go.Figure]: | |
""" | |
Crawls issues, resets state, clones repo, clusters, starts background tasks. | |
Returns dataframe data, stats plot, status message, and analytics plot. | |
""" | |
if not repo_url or not hf_token: | |
logger.error("Repository URL and Hugging Face Token are required.") | |
# Return empty plot of the correct type for analytics | |
empty_fig = go.Figure() | |
empty_fig.update_layout(title="Issue Severity Distribution", xaxis={"visible": False}, yaxis={"visible": False}, | |
annotations=[{"text": "Scan needed.", "xref": "paper", "yref": "paper", "showarrow": False}], | |
plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)') | |
return [], empty_fig, "Error: Repository URL and Hugging Face Token are required.", empty_fig | |
logger.info(f"Starting new issue crawl and setup for {repo_url}") | |
# --- Reset Manager State --- | |
self.stop_idle_processing() | |
self.stop_broadcast_loop() | |
self.issues = {} | |
self.code_editors = {} | |
self.issue_clusters = {} | |
self.issue_list_for_clustering = [] | |
        # cached_suggestion is a plain coroutine here; clear its cache only if a caching
        # decorator (e.g. an async-aware LRU cache) has been applied elsewhere.
        if hasattr(self.cached_suggestion, "cache_clear"):
            self.cached_suggestion.cache_clear()
self.precomputed_context = {} | |
self.precomputed_summaries = {} | |
self.precomputed_missing_info = {} | |
self.precomputed_analysis = {} | |
self.code_embeddings = {} | |
self.potential_duplicates = {} | |
self.stale_issues = [] | |
self.high_priority_candidates = [] | |
self.needs_recluster = False | |
self._webhook_change_count = 0 | |
self.last_webhook_time = time.time() | |
self.repo = None | |
self.repo_url = repo_url.strip() | |
self.github_token = github_token | |
self.hf_token = hf_token | |
logger.info("Internal state reset for new crawl.") | |
# --- Repository Cloning/Updating --- | |
match = re.match(r"https?://github\.com/([^/]+)/([^/]+)", self.repo_url) | |
if not match: | |
logger.error(f"Invalid GitHub URL format: {self.repo_url}") | |
empty_fig = go.Figure() | |
empty_fig.update_layout(title="Issue Severity Distribution", annotations=[{"text": "Invalid URL.", "xref": "paper", "yref": "paper", "showarrow": False}], plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)') | |
return [], empty_fig, "Error: Invalid GitHub URL format. Use https://github.com/owner/repo", empty_fig | |
self.repo_owner, self.repo_name = match.groups() | |
self.repo_local_path = WORKSPACE / f"{self.repo_owner}_{self.repo_name}" | |
try: | |
if self.repo_local_path.exists(): | |
logger.info(f"Attempting to update existing repository clone at {self.repo_local_path}") | |
try: | |
self.repo = Repo(self.repo_local_path) | |
if self.repo.remotes: | |
                        remote_url = next(iter(self.repo.remotes.origin.urls), None)
expected_urls = [self.repo_url, self.repo_url + ".git"] | |
if remote_url not in expected_urls: | |
logger.warning(f"Existing repo path {self.repo_local_path} has different remote URL ('{remote_url}' vs '{self.repo_url}'). Re-cloning.") | |
shutil.rmtree(self.repo_local_path) | |
self.repo = Repo.clone_from(self.repo_url, self.repo_local_path, progress=lambda op, cur, tot, msg: logger.debug(f"Clone progress: {msg}")) | |
else: | |
logger.info("Pulling latest changes...") | |
self.repo.remotes.origin.fetch(progress=lambda op, cur, tot, msg: logger.debug(f"Fetch progress: {msg}")) | |
self.repo.remotes.origin.pull(progress=lambda op, cur, tot, msg: logger.debug(f"Pull progress: {msg}")) | |
if self.repo.git.rev_parse('--is-shallow-repository').strip() == 'true': | |
logger.info("Repository is shallow, unshallowing...") | |
self.repo.git.fetch('--unshallow') | |
else: | |
logger.warning(f"Existing repo at {self.repo_local_path} has no remotes defined. Re-cloning.") | |
shutil.rmtree(self.repo_local_path) | |
self.repo = Repo.clone_from(self.repo_url, self.repo_local_path, progress=lambda op, cur, tot, msg: logger.debug(f"Clone progress: {msg}")) | |
except (InvalidGitRepositoryError, NoSuchPathError): | |
logger.warning(f"Invalid or missing Git repository at {self.repo_local_path}. Re-cloning.") | |
if self.repo_local_path.exists(): shutil.rmtree(self.repo_local_path) | |
self.repo = Repo.clone_from(self.repo_url, self.repo_local_path, progress=lambda op, cur, tot, msg: logger.debug(f"Clone progress: {msg}")) | |
except GitCommandError as git_err: | |
logger.error(f"Git pull/update error: {git_err}. Trying to proceed with existing copy, but it might be stale.") | |
if not self.repo: | |
try: self.repo = Repo(self.repo_local_path) | |
except Exception: logger.error("Failed to even load existing repo after pull error.") | |
else: | |
logger.info(f"Cloning repository {self.repo_url} to {self.repo_local_path}") | |
self.repo = Repo.clone_from(self.repo_url, self.repo_local_path, progress=lambda op, cur, tot, msg: logger.debug(f"Clone progress: {msg}")) | |
logger.info("Repository clone/update process finished.") | |
if not self.repo: | |
raise Exception("Repository object could not be initialized.") | |
except GitCommandError as e: | |
logger.error(f"Failed to clone/update repository: {e}") | |
empty_fig = go.Figure() | |
empty_fig.update_layout(title="Issue Severity Distribution", annotations=[{"text": "Repo Error.", "xref": "paper", "yref": "paper", "showarrow": False}], plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)') | |
return [], empty_fig, f"Error cloning/updating repository: {e}. Check URL, permissions, and network.", empty_fig | |
except Exception as e: | |
logger.exception(f"An unexpected error occurred during repository handling: {e}") | |
empty_fig = go.Figure() | |
empty_fig.update_layout(title="Issue Severity Distribution", annotations=[{"text": "Repo Error.", "xref": "paper", "yref": "paper", "showarrow": False}], plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)') | |
return [], empty_fig, f"An unexpected error occurred during repo setup: {e}", empty_fig | |
# --- Issue Fetching --- | |
api_url = f"{GITHUB_API}/{self.repo_owner}/{self.repo_name}/issues?state=open&per_page=100" | |
headers = {"Accept": "application/vnd.github.v3+json"} | |
if github_token: | |
headers["Authorization"] = f"token {github_token}" | |
try: | |
all_issues_data = [] | |
page = 1 | |
logger.info(f"Fetching open issues from GitHub API (repo: {self.repo_owner}/{self.repo_name})...") | |
async with aiohttp.ClientSession(headers=headers) as session: | |
while True: | |
paginated_url = f"{api_url}&page={page}" | |
logger.debug(f"Fetching URL: {paginated_url}") | |
async with session.get(paginated_url) as response: | |
rate_limit_remaining = response.headers.get('X-RateLimit-Remaining') | |
logger.debug(f"GitHub API Response Status: {response.status}, RateLimit Remaining: {rate_limit_remaining}") | |
response.raise_for_status() | |
issues_page_data = await response.json() | |
if not issues_page_data: break | |
logger.info(f"Fetched page {page} with {len(issues_page_data)} items.") | |
all_issues_data.extend(issues_page_data) | |
link_header = response.headers.get('Link') | |
if link_header and 'rel="next"' not in link_header: | |
logger.debug("No 'next' link found in Link header. Assuming last page.") | |
break | |
page += 1 | |
await asyncio.sleep(0.1) | |
logger.info(f"Total items fetched (including potential PRs): {len(all_issues_data)}") | |
self.issues = { | |
issue_data['number']: self._process_issue_data(issue_data) | |
for issue_data in all_issues_data | |
if 'pull_request' not in issue_data | |
} | |
logger.info(f"Filtered out pull requests, {len(self.issues)} actual open issues remaining.") | |
empty_fig = go.Figure() | |
empty_fig.update_layout(title="Issue Severity Distribution", xaxis={"visible": False}, yaxis={"visible": False}, | |
annotations=[{"text": "No issues found.", "xref": "paper", "yref": "paper", "showarrow": False}], | |
plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)') | |
if not self.issues: | |
logger.warning("No open issues found for this repository.") | |
return [], empty_fig, "No open issues found in the repository.", empty_fig | |
# --- Clustering and UI Data Prep --- | |
self.issue_list_for_clustering = list(self.issues.values()) | |
logger.info("Clustering issues...") | |
await self._cluster_similar_issues() | |
# --- Initial Idle Task Prep (Run synchronously after load) --- | |
logger.info("Identifying potential duplicates based on initial clusters...") | |
self._identify_potential_duplicates() | |
logger.info("Identifying potentially stale issues...") | |
self._identify_stale_issues() | |
logger.info("Identifying high priority candidates...") | |
self._identify_high_priority_candidates() | |
# --- Prepare Dataframe Output & Stats --- | |
dataframe_data = [] | |
severity_counts = {"Critical": 0, "High": 0, "Medium": 0, "Low": 0, "Unknown": 0} | |
index_to_cluster_id: Dict[int, int] = {} | |
for cluster_id, indices in self.issue_clusters.items(): | |
for index in indices: | |
if 0 <= index < len(self.issue_list_for_clustering): | |
index_to_cluster_id[index] = cluster_id | |
else: | |
logger.warning(f"Clustering returned invalid index {index} for list of length {len(self.issue_list_for_clustering)}") | |
for i, issue in enumerate(self.issue_list_for_clustering): | |
severity = self._determine_severity(issue['labels']) | |
severity_counts[severity] += 1 | |
cluster_id = index_to_cluster_id.get(i, -1) | |
dataframe_data.append([ | |
issue['id'], | |
issue['title'], | |
severity, | |
cluster_id if cluster_id != -1 else "N/A" | |
]) | |
logger.info("Generating statistics plot...") | |
stats_fig = self._generate_stats_plot(severity_counts) | |
# --- Start Background Tasks --- | |
self.start_broadcast_loop() | |
self.start_idle_processing() | |
success_msg = f"Found {len(self.issues)} open issues. Clustered into {len(self.issue_clusters)} groups. Repo ready. Background analysis started." | |
logger.info(success_msg) | |
# Return both plots | |
return dataframe_data, stats_fig, success_msg, stats_fig # Mypy may complain about return type mismatch if not explicitly handled | |
except aiohttp.ClientResponseError as e: | |
logger.error(f"GitHub API request failed: Status={e.status}, Message='{e.message}', URL='{e.request_info.url}'") | |
error_msg = f"Error fetching issues: {e.status} - {e.message}. Check token/URL." | |
if e.status == 404: error_msg = f"Error: Repository not found at {self.repo_url}." | |
elif e.status == 401: error_msg = "Error: Invalid GitHub token or insufficient permissions for this repository." | |
            elif e.status == 403:
                rate_limit_remaining = e.headers.get('X-RateLimit-Remaining', 'unknown') if e.headers else 'unknown'
                rate_limit_reset = e.headers.get('X-RateLimit-Reset') if e.headers else None
                reset_time_str = "unknown"
                if rate_limit_reset:
                    try: reset_time_str = datetime.fromtimestamp(int(rate_limit_reset), timezone.utc).strftime('%Y-%m-%d %H:%M:%S %Z')
                    except ValueError: pass
                error_msg = f"Error: GitHub API rate limit likely exceeded or access forbidden (Remaining: {rate_limit_remaining}). Reset time: {reset_time_str}. Check token or wait."
self.stop_idle_processing() | |
self.stop_broadcast_loop() | |
empty_fig = go.Figure() | |
empty_fig.update_layout(title="Issue Severity Distribution", annotations=[{"text": "API Error.", "xref": "paper", "yref": "paper", "showarrow": False}], plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)') | |
return [], empty_fig, error_msg, empty_fig | |
except Exception as e: | |
self.stop_idle_processing() | |
self.stop_broadcast_loop() | |
logger.exception(f"An unexpected error occurred during issue crawl: {e}") | |
empty_fig = go.Figure() | |
empty_fig.update_layout(title="Issue Severity Distribution", annotations=[{"text": "Unexpected Error.", "xref": "paper", "yref": "paper", "showarrow": False}], plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)') | |
return [], empty_fig, f"An unexpected error occurred: {e}", empty_fig | |
def _determine_severity(self, labels: List[str]) -> str: | |
"""Determines issue severity based on labels using predefined rules.""" | |
labels_lower = {label.lower().strip() for label in labels} | |
for severity, keywords in self.severity_rules.items(): | |
if any(keyword in label for keyword in keywords for label in labels_lower): | |
return severity | |
return "Unknown" | |
def _generate_stats_plot(self, severity_counts: Dict[str, int]) -> go.Figure: | |
"""Generates a Plotly bar chart for issue severity distribution.""" | |
filtered_counts = {k: v for k, v in severity_counts.items() if v > 0} | |
if not filtered_counts: | |
fig = go.Figure() | |
fig.update_layout(title="Issue Severity Distribution", xaxis={"visible": False}, yaxis={"visible": False}, | |
annotations=[{"text": "No issues to display.", "xref": "paper", "yref": "paper", "showarrow": False, "font": "size": 16}], # Removed the extra } | |
plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)') | |
return fig | |
severities = list(filtered_counts.keys()) | |
counts = list(filtered_counts.values()) | |
order = ['Critical', 'High', 'Medium', 'Low', 'Unknown'] | |
severities_sorted = sorted(severities, key=lambda x: order.index(x) if x in order else len(order)) | |
counts_sorted = [filtered_counts[s] for s in severities_sorted] | |
fig = px.bar(x=severities_sorted, y=counts_sorted, title="Issue Severity Distribution", | |
labels={'x': 'Severity', 'y': 'Number of Issues'}, color=severities_sorted, | |
color_discrete_map={'Critical': '#DC2626', 'High': '#F97316', 'Medium': '#FACC15', 'Low': '#84CC16', 'Unknown': '#6B7280'}, | |
text=counts_sorted) | |
fig.update_layout(xaxis_title=None, yaxis_title="Number of Issues", plot_bgcolor='rgba(0,0,0,0)', | |
paper_bgcolor='rgba(0,0,0,0)', showlegend=False, | |
xaxis={'categoryorder':'array', 'categoryarray': order}) | |
fig.update_traces(textposition='outside') | |
return fig | |
async def _cluster_similar_issues(self): | |
"""Generates embeddings and clusters issues using HDBSCAN. Uses self.issue_list_for_clustering.""" | |
if not self.issue_list_for_clustering: | |
logger.warning("Cannot cluster issues: No issues loaded or list is empty.") | |
self.issue_clusters = {} | |
return | |
if not self.hf_token: | |
logger.error("Cannot cluster issues: Hugging Face token missing.") | |
self.issue_clusters = {} | |
return | |
num_issues = len(self.issue_list_for_clustering) | |
logger.info(f"Generating embeddings for {num_issues} issues for clustering...") | |
try: | |
texts_to_embed = [ | |
f"Title: {i.get('title','')} Body: {i.get('body','')[:1500]}" | |
for i in self.issue_list_for_clustering | |
] | |
embeddings = await self._generate_embeddings(texts_to_embed) | |
if embeddings is None or not isinstance(embeddings, list) or len(embeddings) != num_issues: | |
logger.error(f"Failed to generate valid embeddings for clustering. Expected {num_issues}, got {type(embeddings)} len {len(embeddings) if embeddings else 'N/A'}.") | |
self.issue_clusters = {} | |
return | |
logger.info(f"Generated {len(embeddings)} embeddings. Running HDBSCAN clustering...") | |
clusterer = HDBSCAN(min_cluster_size=2, metric='cosine', allow_single_cluster=True, gen_min_span_tree=True) | |
clusters = clusterer.fit_predict(embeddings) | |
new_issue_clusters: Dict[int, List[int]] = {} | |
noise_count = 0 | |
for i, cluster_id in enumerate(clusters): | |
cluster_id_int = int(cluster_id) | |
if cluster_id_int == -1: | |
noise_count += 1 | |
continue | |
if cluster_id_int not in new_issue_clusters: | |
new_issue_clusters[cluster_id_int] = [] | |
new_issue_clusters[cluster_id_int].append(i) | |
self.issue_clusters = new_issue_clusters | |
logger.info(f"Clustering complete. Found {len(self.issue_clusters)} clusters (min size 2) with {noise_count} noise points.") | |
# Reset the change counter and flag after successful clustering | |
self._webhook_change_count = 0 | |
self.needs_recluster = False | |
logger.debug("Reset webhook change counter and recluster flag after clustering.") | |
except Exception as e: | |
logger.exception(f"Error during issue clustering: {e}") | |
self.issue_clusters = {} | |
def _identify_potential_duplicates(self): | |
"""Populates self.potential_duplicates based on self.issue_clusters and self.issue_list_for_clustering.""" | |
self.potential_duplicates = {} | |
if not self.issue_clusters or not self.issue_list_for_clustering: | |
logger.debug("Skipping duplicate identification: No clusters or issue list.") | |
return | |
index_to_id = {} | |
try: | |
for i, issue in enumerate(self.issue_list_for_clustering): | |
issue_id = issue.get('id') | |
if issue_id is None: | |
logger.warning(f"Issue at index {i} in clustering list is missing an ID.") | |
continue | |
index_to_id[i] = issue_id | |
except Exception as e: | |
logger.error(f"Error creating index-to-ID map for duplicate check: {e}. Issue list might be inconsistent.") | |
return | |
for cluster_id, indices in self.issue_clusters.items(): | |
if len(indices) > 1: | |
cluster_issue_ids = [index_to_id[i] for i in indices if i in index_to_id] | |
if len(cluster_issue_ids) > 1: | |
for issue_id in cluster_issue_ids: | |
self.potential_duplicates[issue_id] = [other_id for other_id in cluster_issue_ids if other_id != issue_id] | |
logger.info(f"Identified potential duplicates for {len(self.potential_duplicates)} issues based on clustering.") | |
async def _generate_embeddings(self, texts: List[str]): | |
"""Generates sentence embeddings using Hugging Face Inference API.""" | |
if not self.hf_token: | |
logger.error("Hugging Face token is not set. Cannot generate embeddings.") | |
return None | |
if not texts: | |
logger.warning("Embedding generation requested with empty text list.") | |
return [] | |
model_id = "sentence-transformers/all-mpnet-base-v2" | |
api_url = f"{HF_INFERENCE_API}/{model_id}" | |
headers = {"Authorization": f"Bearer {self.hf_token}"} | |
timeout = aiohttp.ClientTimeout(total=180) | |
logger.info(f"Requesting embeddings from {api_url} for {len(texts)} texts.") | |
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session: | |
try: | |
payload = {"inputs": texts, "options": {"wait_for_model": True} | |
async with session.post(api_url, json=payload) as response: | |
rate_limit_remaining = response.headers.get('X-Ratelimit-Remaining') | |
logger.debug(f"HF Embedding API Response Status: {response.status}, RateLimit Remaining: {rate_limit_remaining}") | |
response.raise_for_status() | |
result = await response.json() | |
if isinstance(result, list) and all(isinstance(emb, list) and all(isinstance(f, float) for f in emb) for emb in result): | |
if len(result) == len(texts): | |
logger.info(f"Successfully received {len(result)} embeddings of expected dimension.") | |
return result | |
else: | |
logger.error(f"HF Embedding API returned wrong number of embeddings: Got {len(result)}, expected {len(texts)}.") | |
return None | |
elif isinstance(result, dict) and 'error' in result: | |
error_msg = result['error'] | |
estimated_time = result.get('estimated_time') | |
logger.error(f"HF Inference API embedding error: {error_msg}" + (f" (Estimated time: {estimated_time}s)" if estimated_time else "")) | |
return None | |
else: | |
logger.error(f"Unexpected embedding format received: Type={type(result)}. Response: {str(result)[:500]}") | |
return None | |
            except aiohttp.ClientResponseError as e:
                # aiohttp.ClientResponseError carries status/message/headers but not the response body.
                logger.error(f"HF Inference API embedding request failed: Status={e.status}, Message='{e.message}'")
                return None
except asyncio.TimeoutError: | |
logger.error(f"HF Inference API embedding request timed out after {timeout.total} seconds.") | |
return None | |
except Exception as e: | |
logger.exception(f"Unexpected error during embedding generation: {e}") | |
return None | |
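    # Expected successful response shape from the feature-extraction endpoint (illustrative):
    # a list with one embedding per input text, e.g. [[0.01, -0.02, ...], [0.03, ...]];
    # for sentence-transformers/all-mpnet-base-v2 each vector has 768 dimensions.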
async def generate_code_patch(self, issue_number: int, model_key: str) -> dict: | |
"""Generates a code patch suggestion using a selected AI model.""" | |
if issue_number not in self.issues: | |
return {"error": f"Issue {issue_number} not found."} | |
if not self.hf_token: | |
return {"error": "Hugging Face token not set."} | |
if model_key not in HF_MODELS: | |
return {"error": f"Invalid model key: {model_key}"} | |
if not self.repo_local_path or not self.repo: | |
return {"error": "Repository not cloned/available locally. Please scan the repository first."} | |
issue = self.issues[issue_number] | |
model_id = HF_MODELS[model_key] | |
logger.info(f"Generating patch for issue {issue_number} ('{issue.get('title', 'N/A')[:50]}...') using model {model_id}") | |
# --- Context Gathering --- | |
context_str = "Context gathering failed or not available." | |
context_source = "Error" | |
start_time_context = time.time() | |
if issue_number in self.precomputed_context: | |
context_data = self.precomputed_context[issue_number] | |
timestamp_str = datetime.fromtimestamp(context_data.get('timestamp', 0)).strftime('%Y-%m-%d %H:%M:%S') | |
if context_data.get("error"): | |
context_str = f"Pre-computed context retrieval failed: {context_data['error']}" | |
context_source = f"Pre-computed (Failed @ {timestamp_str})" | |
elif context_data.get("content"): | |
context_str = context_data["content"] | |
num_files = len(context_data.get('files',[])) | |
context_source = f"Pre-computed ({num_files} files @ {timestamp_str})" | |
else: | |
context_str = "Pre-computed context was empty or unavailable." | |
context_source = f"Pre-computed (Empty @ {timestamp_str})" | |
logger.info(f"Using pre-computed context for issue {issue_number} (Source: {context_source})") | |
else: | |
logger.info(f"No pre-computed context found for issue {issue_number}, computing now.") | |
context_source = "Computed On-Demand" | |
context_result = await self._get_code_context(issue) | |
if "error" in context_result and context_result["error"]: | |
context_str = f"Error retrieving context: {context_result['error']}" | |
context_source += " (Error)" | |
else: | |
context_str = context_result.get("content", "No specific context found.") | |
context_source += f" ({len(context_result.get('files',[]))} files)" | |
self.precomputed_context[issue_number] = { | |
"content": context_str, | |
"files": context_result.get("files", []), | |
"error": context_result.get("error"), | |
"timestamp": time.time() | |
} | |
context_duration = time.time() - start_time_context | |
logger.info(f"Computed context on-demand in {context_duration:.2f}s. Source: {context_source}") | |
# --- Get Pre-computed Info --- | |
summary_text = self._get_precomputed_text(issue_number, self.precomputed_summaries, "summary", "Summary") | |
missing_info_text = self._get_precomputed_text(issue_number, self.precomputed_missing_info, "info_needed", "Missing Info Analysis") | |
analysis_text = self._get_precomputed_text(issue_number, self.precomputed_analysis, "hypothesis", "Preliminary Analysis") | |
duplicate_info = self._get_duplicate_info_text(issue_number) | |
# --- Enhanced Prompt --- | |
prompt = f"""You are an expert software engineer AI assistant generating a minimal `diff` code patch. | |
## Issue Details: | |
### Issue ID: {issue.get('id', 'N/A')} | |
### Title: {issue.get('title', 'N/A')} | |
### Labels: {', '.join(issue.get('labels', []))} | |
### Body: | |
{issue.get('body', 'N/A')} | |
## AI Pre-computation Results: | |
### Summary: {summary_text} | |
### Missing Information Analysis: {missing_info_text} | |
### Preliminary Hypothesis: {analysis_text} | |
{duplicate_info} | |
## Relevant Code Context (Source: {context_source}): | |
{context_str} | |
(Context may be truncated or incomplete. Base patch only on provided code.) | |
## Instructions: | |
1. **Analyze:** Review all provided information (issue details, AI analysis, code context). | |
2. **Identify Changes:** Determine minimal code modifications *within the provided context* to address the issue. | |
3. **Format:** Generate a standard `diff` patch (--- a/..., +++ b/..., @@ ..., +/-, space for context). Use exact relative paths from context. | |
4. **Output:** Provide a concise explanation *before* the ```diff ... ``` block. | |
5. **Constraints:** If context is insufficient, state "Insufficient context to generate patch." and explain why. **Do not generate a diff block.** Do not invent code/paths.

## Patch Suggestion:
"""
# --- Call Inference API --- | |
api_url = f"{HF_INFERENCE_API}/{model_id}" | |
headers = {"Authorization": f"Bearer {self.hf_token}"} | |
payload = { | |
"inputs": prompt, | |
"parameters": {"max_new_tokens": 2048, "temperature": 0.1, "return_full_text": False, "do_sample": False}, | |
"options": {"wait_for_model": True} | |
} | |
timeout = aiohttp.ClientTimeout(total=180) | |
try: | |
start_time_api = time.time() | |
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session: | |
async with session.post(api_url, json=payload) as response: | |
api_duration = time.time() - start_time_api | |
rate_limit_remaining = response.headers.get('X-Ratelimit-Remaining') | |
logger.debug(f"HF Patch API Response Status: {response.status}, Duration: {api_duration:.2f}s, RateLimit Remaining: {rate_limit_remaining}") | |
response.raise_for_status() | |
result = await response.json() | |
if result and isinstance(result, list) and 'generated_text' in result[0]: | |
generated_text = result[0].get('generated_text', '').strip() | |
logger.info(f"Received patch suggestion from {model_id} ({len(generated_text)} chars).") | |
diff_match = re.search(r"```diff\n(.*?)```", generated_text, re.DOTALL | re.IGNORECASE) | |
explanation = generated_text.split("```diff")[0].strip() if diff_match else generated_text | |
if not explanation: explanation = "(No explanation provided by AI.)" | |
if diff_match: | |
patch_content = diff_match.group(1).strip() | |
if not re.search(r'^(--- |\+\+\+ |@@ )', patch_content, re.MULTILINE): | |
logger.warning(f"Generated patch for issue {issue_number} might lack standard diff headers or spacing.") | |
return {"explanation": explanation, "patch": patch_content, "model_used": model_id} | |
else: | |
if re.search(r"(insufficient context|cannot generate|unable to create patch|context required)", explanation, re.IGNORECASE): | |
logger.warning(f"AI indicated insufficient context for issue {issue_number}.") | |
return {"explanation": explanation, "patch": None, "model_used": model_id} | |
else: | |
logger.warning(f"No diff block found in patch suggestion response for issue {issue_number}.") | |
return {"explanation": f"(AI response did not contain a ```diff block. Full response below)\n---\n{generated_text}", "patch": None, "model_used": model_id} | |
elif isinstance(result, dict) and 'error' in result: | |
error_msg = result['error'] | |
estimated_time = result.get('estimated_time') | |
logger.error(f"HF Inference API patch error for issue {issue_number}: {error_msg}" + (f" (Est: {estimated_time}s)" if estimated_time else "")) | |
return {"error": f"AI model error: {error_msg}"} | |
else: | |
logger.error(f"Unexpected patch response format from {model_id} for issue {issue_number}: {str(result)[:500]}") | |
return {"error": "Unexpected response format from AI model."} | |
        except aiohttp.ClientResponseError as e:
            # aiohttp.ClientResponseError exposes status/message/headers but not the response body.
            logger.error(f"HF Inference API patch request failed for issue {issue_number}: Status={e.status}, Message='{e.message}'")
            return {"error": f"AI model request failed ({e.status}: {e.message}). Check model/token/API status."}
except asyncio.TimeoutError: | |
logger.error(f"HF Inference API patch request timed out ({timeout.total}s) for issue {issue_number}.") | |
return {"error": "AI model request timed out. The model might be overloaded or the request too complex."} | |
except Exception as e: | |
logger.exception(f"Error generating code patch for issue {issue_number}: {e}") | |
return {"error": f"An unexpected error occurred during patch generation: {e}"} | |
async def _get_code_context(self, issue: dict) -> dict: | |
"""Retrieves relevant code context based on file paths mentioned in the issue.""" | |
if not self.repo or not self.repo_local_path: | |
return {"content": "Local repository not available.", "files": [], "error": "Local repository not available."} | |
issue_id = issue.get('id', 'N/A') | |
issue_body = issue.get('body', '') or "" | |
issue_title = issue.get('title', '') or "" | |
text_to_search = f"{issue_title}\n{issue_body}" | |
if not text_to_search.strip(): | |
return {"content": "No issue title or body provided to search for file paths.", "files": [], "error": None} | |
path_pattern = r""" | |
(?<![:/]) | |
(?: | |
[\'"`]? | |
( | |
(?: (?: \./ | / )? [\w./_-]+ / )? | |
[\w._-]+ | |
\. | |
(?:py|js|jsx|ts|tsx|java|c|cpp|h|hpp|cs|go|rs|php|rb|html|css|scss|less|md|yaml|yml|json|toml|sh|bash|zsh|ipynb|config|cfg|ini|xml|sql|gradle|tf|tfvars|dockerfile|Makefile|Dockerfile|txt|log) | |
) | |
[\'"`]? | |
) | |
(?= [\s\n\.,;:"')\]!>] | $ ) | |
""" | |
potential_files = set(re.findall(path_pattern, text_to_search, re.VERBOSE | re.IGNORECASE)) | |
if not potential_files: | |
return {"content": "No file paths matching common patterns found in the issue title or body.", "files": [], "error": None} | |
logger.info(f"Found {len(potential_files)} potential file references in issue {issue_id}: {potential_files}") | |
context_content = "" | |
max_context_length = 6000 | |
files_included = [] | |
files_not_found = [] | |
files_read_error = [] | |
files_skipped_length = [] | |
for file_path_str in potential_files: | |
normalized_path_str = file_path_str.strip('\'"` ./\\') | |
relative_path = Path(normalized_path_str.replace('\\', '/')) | |
full_path = self.repo_local_path / relative_path | |
if full_path.is_file(): | |
try: | |
file_content = full_path.read_text(encoding='utf-8', errors='ignore') | |
snippet_len = 1000 | |
content_snippet = f"---\nFile: {relative_path}\n---\n{file_content[:snippet_len]}{' [...]' if len(file_content) > snippet_len else ''}\n\n" | |
if len(context_content) + len(content_snippet) <= max_context_length: | |
context_content += content_snippet | |
files_included.append(str(relative_path)) | |
else: | |
logger.warning(f"Skipping file {relative_path} for context in issue {issue_id} due to total length limit ({max_context_length} chars).") | |
files_skipped_length.append(str(relative_path)) | |
except OSError as e: | |
logger.warning(f"Could not read file {full_path} for issue {issue_id}: {e}") | |
files_read_error.append(str(relative_path)) | |
except Exception as e: | |
logger.warning(f"Unexpected error reading file {full_path} for issue {issue_id}: {e}") | |
files_read_error.append(str(relative_path)) | |
else: | |
logger.info(f"Potential path '{relative_path}' (from '{file_path_str}') not found or not a file in local repo for issue {issue_id}.") | |
files_not_found.append(str(relative_path)) | |
final_content = "" | |
error_status = None | |
if files_included: | |
final_content = context_content.strip() | |
logger.info(f"Included context from {len(files_included)} files for issue {issue_id}: {files_included}") | |
else: | |
final_content = "No content could be retrieved from the potential file paths found." | |
logger.warning(f"Context generation for issue {issue_id} resulted in no included files.") | |
if potential_files: # If paths were found but none included | |
error_status = "No readable or found files among potential paths." | |
status_notes = [] | |
if files_not_found: | |
status_notes.append(f"Files mentioned but not found: {files_not_found}") | |
logger.info(f"Files mentioned but not found for issue {issue_id}: {files_not_found}") | |
if files_read_error: | |
status_notes.append(f"Files failed to read: {files_read_error}") | |
logger.warning(f"Files mentioned but failed to read for issue {issue_id}: {files_read_error}") | |
if files_skipped_length: | |
status_notes.append(f"File content skipped due to length limit: {files_skipped_length}") | |
logger.warning(f"File content skipped due to length limit for issue {issue_id}: {files_skipped_length}") | |
if status_notes: | |
final_content += "\n\n--- Context Notes ---\n" + "\n".join(status_notes) | |
if error_status is None and (files_not_found or files_read_error): | |
error_status = "Some mentioned files were not found or could not be read." | |
return {"content": final_content.strip(), "files": files_included, "error": error_status} | |
async def suggest_resolution(self, issue: dict, model_key: str) -> str: | |
"""Suggests a resolution description using a selected AI model.""" | |
if not self.hf_token: | |
return "Error: Hugging Face token not set." | |
if model_key not in HF_MODELS: | |
return f"Error: Invalid model key: {model_key}" | |
model_id = HF_MODELS[model_key] | |
issue_id = issue.get('id','N/A') | |
logger.info(f"Requesting resolution suggestion for issue {issue_id} ('{issue.get('title', 'N/A')[:50]}...') using {model_id}") | |
# --- Get Pre-computed Info --- | |
summary_text = self._get_precomputed_text(issue_id, self.precomputed_summaries, "summary", "Summary") | |
missing_info_text = self._get_precomputed_text(issue_id, self.precomputed_missing_info, "info_needed", "Missing Info Analysis") | |
analysis_text = self._get_precomputed_text(issue_id, self.precomputed_analysis, "hypothesis", "Preliminary Analysis") | |
duplicate_info = self._get_duplicate_info_text(issue_id) | |
# Enhanced Prompt | |
prompt = f"""You are a helpful AI assistant acting as a senior software engineer. Analyze the following GitHub issue and provide concise, actionable, step-by-step suggestions on how to approach its resolution # # Issue Details: | |
### Issue ID: {issue.get('id', 'N/A')} | |
### Title: {issue.get('title', 'N/A')} | |
### Labels: {', '.join(issue.get('labels', []))} | |
### Body: | |
{issue.get('body', 'N/A')[:1500]} | |
## AI Pre-computation Results: | |
### Summary: {summary_text} | |
### Missing Information Analysis: {missing_info_text} | |
### Preliminary Hypothesis: {analysis_text} | |
{duplicate_info} | |
## Suggested Resolution Approach: | |
Based on *all* the information provided above, outline a potential plan: | |
1. **Address Missing Information (If Any):** | |
* [If the 'Missing Information Analysis' identified gaps, state how to get that information (e.g., "Request logs from the user," "Ask for steps to reproduce"). If none needed, state "Issue description seems reasonably complete."] | |
2. **Refine Understanding / Verify Hypothesis:** | |
* [Based on the 'Preliminary Hypothesis' and issue details, state the most likely root cause or goal. Mention any ambiguities or areas needing confirmation.] | |
3. **Identify Relevant Code Areas (Hypothesize):** | |
* [Based on keywords, error messages, paths, or conventions, list specific files/functions/classes likely needing investigation. Acknowledge if this is speculative.] | |
4. **Propose Implementation / Investigation Steps:** | |
* [Describe core logic changes, debugging steps, configuration updates, or API interactions needed. Break it down logically.] | |
5. **Testing Recommendations:** | |
* [Suggest specific unit, integration, or manual tests crucial for verification. Mention key scenarios.] | |
6. **Next Steps (Standard Workflow):** | |
* [Mention branching, coding, testing, committing, PR.] | |
**Important:** If critical information is still missing despite the AI analysis, emphasize that in step 1. Do not invent details. | |
""" | |
api_url = f"{HF_INFERENCE_API}/{model_id}" | |
headers = {"Authorization": f"Bearer {self.hf_token}"} | |
payload = { | |
"inputs": prompt, | |
"parameters": {"max_new_tokens": 1024, "temperature": 0.6, "return_full_text": False, "do_sample": True, "top_p": 0.9}, | |
"options": {"wait_for_model": True} | |
} | |
timeout = aiohttp.ClientTimeout(total=90) | |
try: | |
start_time_api = time.time() | |
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session: | |
async with session.post(api_url, json=payload) as response: | |
api_duration = time.time() - start_time_api | |
rate_limit_remaining = response.headers.get('X-Ratelimit-Remaining') | |
logger.debug(f"HF Suggestion API Response Status: {response.status}, Duration: {api_duration:.2f}s, RateLimit Remaining: {rate_limit_remaining}") | |
response.raise_for_status() | |
result = await response.json() | |
if result and isinstance(result, list) and 'generated_text' in result[0]: | |
suggestion = result[0].get('generated_text', 'AI Error: No suggestion text generated.').strip() | |
logger.info(f"Received suggestion from {model_id} for issue {issue_id} ({len(suggestion)} chars).") | |
return suggestion | |
elif isinstance(result, dict) and 'error' in result: | |
error_msg = result['error'] | |
estimated_time = result.get('estimated_time') | |
logger.error(f"HF Inference API suggestion error for issue {issue_id}: {error_msg}" + (f" (Est: {estimated_time}s)" if estimated_time else "")) | |
return f"Error: AI model returned an error: {error_msg}" | |
else: | |
logger.error(f"Unexpected suggestion response format from {model_id} for issue {issue_id}: {str(result)[:500]}") | |
return "Error: Received unexpected response format from AI model." | |
        except aiohttp.ClientResponseError as e:
            # aiohttp.ClientResponseError exposes status/message/headers but not the response body.
            logger.error(f"HF Inference API suggestion request failed for issue {issue_id}: Status={e.status}, Message='{e.message}'")
            return f"Error: AI model request failed ({e.status}: {e.message}). Check model/token/API status."
except asyncio.TimeoutError: | |
logger.error(f"HF Inference API suggestion request timed out ({timeout.total}s) for issue {issue_id}.") | |
return "Error: AI model request timed out. The model might be busy." | |
except Exception as e: | |
logger.exception(f"Error suggesting resolution for issue {issue_id}: {e}") | |
return f"An unexpected error occurred during suggestion generation: {e}" | |
def _get_precomputed_text(self, issue_id: int, data_dict: dict, key: str, name: str) -> str: | |
"""Safely retrieves precomputed text, handling errors and pending states.""" | |
if issue_id in data_dict: | |
entry = data_dict[issue_id] | |
timestamp = entry.get("timestamp", 0) | |
is_recent = time.time() - timestamp < self.idle_processing_interval * 2 | |
if entry.get("error"): | |
return f"{name} Error (at {datetime.fromtimestamp(timestamp).strftime('%H:%M:%S')}): {entry['error']}" | |
elif entry.get(key) is not None: # Check key exists and is not None | |
return entry[key] | |
else: # No error, but key might be missing or None | |
if is_recent: | |
return f"({name} computation pending...)" | |
else: | |
return f"({name} not computed or result was empty)" | |
else: | |
return f"({name} not computed yet)" | |
def _get_duplicate_info_text(self, issue_id: int) -> str: | |
"""Formats duplicate info text.""" | |
if issue_id in self.potential_duplicates: | |
dup_ids = self.potential_duplicates[issue_id] | |
if dup_ids: | |
return f"\n### Potential Duplicate Issues:\nIssue IDs: {', '.join(map(str, dup_ids))}" | |
return "" | |
async def broadcast_collaboration_status(self): | |
"""Periodically sends collaborator status to all connected clients.""" | |
while True: | |
try: | |
await asyncio.sleep(5) | |
if not self.ws_clients: continue | |
status_payload = json.dumps({"type": "collaboration_status", "collaborators": self.collaborators}) | |
active_clients_snapshot = list(self.ws_clients) | |
tasks = [] | |
disconnected_clients = [] | |
for client in active_clients_snapshot: | |
if client.closed: | |
disconnected_clients.append(client) | |
continue | |
# Use standard socket exceptions for checks | |
try: | |
tasks.append(client.send(status_payload)) | |
except (ConnectionClosed, ConnectionAbortedError, ConnectionResetError) as e: | |
logger.warning(f"Client {getattr(client, 'client_id', client.remote_address)} seems disconnected before send: {e}. Marking for removal.") | |
disconnected_clients.append(client) | |
except Exception as e: | |
logger.error(f"Unexpected error preparing send to client {getattr(client, 'client_id', client.remote_address)}: {e}. Marking for removal.") | |
disconnected_clients.append(client) | |
if tasks: | |
results = await asyncio.gather(*tasks, return_exceptions=True) | |
for i, result in enumerate(results): | |
client = active_clients_snapshot[i] | |
# Use standard socket exceptions for checks | |
if isinstance(result, (ConnectionClosed, ConnectionClosedOK, ConnectionAbortedError, ConnectionResetError)): | |
disconnected_clients.append(client) | |
elif isinstance(result, Exception): | |
logger.error(f"Error sending status to client {getattr(client, 'client_id', client.remote_address)}: {result}. Marking for removal.") | |
disconnected_clients.append(client) | |
if disconnected_clients: | |
unique_disconnected = list(set(disconnected_clients)) | |
logger.info(f"Removing {len(unique_disconnected)} disconnected clients after broadcast.") | |
for client in unique_disconnected: | |
self.remove_ws_client(client) | |
except asyncio.CancelledError: | |
logger.info("Broadcast loop cancelled.") | |
break | |
except Exception as e: | |
logger.exception(f"Error in broadcast loop: {e}") | |
await asyncio.sleep(10) | |
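# Illustrative wire format of the status broadcast consumed by the frontend JS (names/values are examples): | |
# {"type": "collaboration_status", "collaborators": {"client_9f3a7c21": {"name": "Alice", "status": "Editing Issue #42"}}} | |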
async def handle_code_editor_update(self, issue_num: int, delta_str: str, sender_client_id: str): | |
""" | |
Applies a delta from one client and broadcasts it to others. | |
WARNING: Lacks Operational Transformation - concurrent edits are UNSAFE. | |
""" | |
if issue_num not in self.code_editors: | |
logger.warning(f"Received code update for non-existent editor instance for issue {issue_num}. Ignoring.") | |
return | |
if issue_num not in self.issues: | |
logger.warning(f"Received code update for non-existent issue {issue_num} in manager. Ignoring.") | |
return | |
logger.warning(f"Handling code editor update for issue {issue_num} from {sender_client_id}. " | |
"WARNING: NO OT IMPLEMENTED - Last write wins / potential conflicts.") | |
try: | |
delta_obj = json.loads(delta_str) | |
self.code_editors[issue_num].apply_delta(delta_obj) | |
logger.info(f"Applied delta for issue {issue_num} from client {sender_client_id} (Placeholder OT Logic - Revision {self.code_editors[issue_num].revision})") | |
update_payload = json.dumps({ | |
"type": "code_update", | |
"issue_num": issue_num, | |
"delta": delta_str, | |
"senderId": sender_client_id | |
}) | |
tasks = [] | |
clients_for_tasks = []  # parallel list so results from gather() map back to the right client | |
disconnected_clients = [] | |
active_clients_snapshot = list(self.ws_clients) | |
for client in active_clients_snapshot: | |
client_ws_id = getattr(client, 'client_id', None) | |
if client_ws_id != sender_client_id: | |
if client.closed: | |
disconnected_clients.append(client) | |
continue | |
# Use standard socket exceptions for checks | |
try: | |
tasks.append(client.send(update_payload)) | |
clients_for_tasks.append(client) | |
except (ConnectionClosed, ConnectionAbortedError, ConnectionResetError) as e: | |
logger.warning(f"Client {getattr(client, 'client_id', client.remote_address)} seems disconnected before send: {e}. Marking for removal.") | |
disconnected_clients.append(client) | |
except Exception as e: | |
logger.error(f"Unexpected error preparing send to client {getattr(client, 'client_id', client.remote_address)}: {e}. Marking for removal.") | |
disconnected_clients.append(client) | |
if tasks: | |
logger.debug(f"Broadcasting code update for issue {issue_num} to {len(tasks)} other clients.") | |
results = await asyncio.gather(*tasks, return_exceptions=True) | |
for i, result in enumerate(results): | |
if isinstance(result, Exception): | |
failed_client = clients_for_tasks[i]  # results align with tasks, not with the full snapshot | |
failed_client_id = getattr(failed_client, 'client_id', 'Unknown') | |
logger.warning(f"Failed to broadcast code update to client {failed_client_id}: {result}") | |
# Use standard socket exceptions for checks | |
if isinstance(result, (ConnectionClosed, ConnectionClosedOK, ConnectionAbortedError, ConnectionResetError)): | |
disconnected_clients.append(failed_client) | |
if disconnected_clients: | |
unique_disconnected = list(set(disconnected_clients)) | |
logger.info(f"Removing {len(unique_disconnected)} clients after code update broadcast failure.") | |
for client in unique_disconnected: | |
if client: self.remove_ws_client(client) | |
except json.JSONDecodeError: | |
logger.error(f"Received invalid JSON delta for issue {issue_num} from {sender_client_id}: {delta_str[:200]}") | |
except Exception as e: | |
logger.exception(f"Error handling code editor update for issue {issue_num} from {sender_client_id}: {e}") | |
async def broadcast_issue_update(self): | |
"""Notifies clients that the issue list/data has changed (e.g., due to webhook).""" | |
if not self.ws_clients: return | |
logger.info("Broadcasting 'issues_updated' notification to all clients.") | |
update_payload = json.dumps({"type": "issues_updated"}) | |
active_clients_snapshot = list(self.ws_clients) | |
tasks = [] | |
clients_for_tasks = []  # parallel list so results from gather() map back to the right client | |
disconnected_clients = [] | |
for client in active_clients_snapshot: | |
if client.closed: | |
disconnected_clients.append(client) | |
continue | |
# Use standard socket exceptions for checks | |
try: | |
tasks.append(client.send(update_payload)) | |
clients_for_tasks.append(client) | |
except (ConnectionClosed, ConnectionAbortedError, ConnectionResetError) as e: | |
logger.warning(f"Client {getattr(client, 'client_id', client.remote_address)} seems disconnected before send: {e}. Marking for removal.") | |
disconnected_clients.append(client) | |
except Exception as e: | |
logger.error(f"Unexpected error preparing send to client {getattr(client, 'client_id', client.remote_address)}: {e}. Marking for removal.") | |
disconnected_clients.append(client) | |
if tasks: | |
results = await asyncio.gather(*tasks, return_exceptions=True) | |
for i, result in enumerate(results): | |
client = clients_for_tasks[i]  # results align with tasks, not with the full snapshot | |
# Use standard socket exceptions for checks | |
if isinstance(result, (ConnectionClosed, ConnectionClosedOK, ConnectionAbortedError, ConnectionResetError)): | |
disconnected_clients.append(client) | |
elif isinstance(result, Exception): | |
logger.error(f"Error sending issue update to client {getattr(client, 'client_id', client.remote_address)}: {result}. Marking for removal.") | |
disconnected_clients.append(client) | |
if disconnected_clients: | |
unique_disconnected = list(set(disconnected_clients)) | |
logger.info(f"Removing {len(unique_disconnected)} clients after issue update broadcast.") | |
for client in unique_disconnected: | |
self.remove_ws_client(client) | |
def remove_ws_client(self, client_to_remove: WebSocketServerProtocol): | |
"""Safely removes a client from the list and collaborator dict.""" | |
client_id = getattr(client_to_remove, 'client_id', None) | |
client_addr = client_to_remove.remote_address | |
client_desc = f"{client_id or 'Unknown ID'} ({client_addr})" | |
removed_from_list = False | |
removed_from_collab = False | |
try: | |
self.ws_clients.remove(client_to_remove) | |
removed_from_list = True | |
logger.info(f"Removed WebSocket client from list: {client_desc} (Remaining: {len(self.ws_clients)})") | |
except ValueError: | |
logger.debug(f"Client {client_desc} already removed from list or not found.") | |
pass | |
if client_id and client_id in self.collaborators: | |
del self.collaborators[client_id] | |
removed_from_collab = True | |
logger.info(f"Removed collaborator entry for {client_id}.") | |
if (removed_from_list or removed_from_collab) and self.main_loop.is_running(): | |
asyncio.run_coroutine_threadsafe(self.broadcast_collaboration_status_once(), self.main_loop) | |
logger.debug(f"Scheduled immediate status broadcast after removing client {client_desc}.") | |
async def broadcast_collaboration_status_once(self): | |
"""Sends a single collaboration status update immediately.""" | |
if not self.ws_clients: return | |
status_payload = json.dumps({"type": "collaboration_status", "collaborators": self.collaborators}) | |
active_clients_snapshot = list(self.ws_clients) | |
tasks = [] | |
clients_for_tasks = []  # parallel list so results from gather() map back to the right client | |
disconnected_clients = [] | |
for client in active_clients_snapshot: | |
if not client.closed: | |
# Use standard socket exceptions for checks | |
try: | |
tasks.append(client.send(status_payload)) | |
clients_for_tasks.append(client) | |
except (ConnectionClosed, ConnectionAbortedError, ConnectionResetError) as e: | |
logger.warning(f"Client {getattr(client, 'client_id', client.remote_address)} seems disconnected before send: {e}. Marking for removal.") | |
disconnected_clients.append(client) | |
except Exception as e: | |
logger.error(f"Unexpected error preparing send to client {getattr(client, 'client_id', client.remote_address)}: {e}. Marking for removal.") | |
disconnected_clients.append(client) | |
else: | |
disconnected_clients.append(client) | |
if tasks: | |
results = await asyncio.gather(*tasks, return_exceptions=True) | |
for i, result in enumerate(results): | |
if isinstance(result, Exception): | |
client = clients_for_tasks[i]  # results align with tasks, not with the full snapshot | |
client_id = getattr(client, 'client_id', 'Unknown') | |
logger.warning(f"Error during single status broadcast to client {client_id}: {result}") | |
# Use standard socket exceptions for checks | |
if isinstance(result, (ConnectionClosed, ConnectionClosedOK, ConnectionAbortedError, ConnectionResetError)): | |
disconnected_clients.append(client) | |
if disconnected_clients: | |
unique_disconnected = list(set(disconnected_clients)) | |
logger.info(f"Removing {len(unique_disconnected)} clients found disconnected during single broadcast.") | |
for client in unique_disconnected: | |
self.remove_ws_client(client) | |
def _identify_stale_issues(self): | |
"""Identifies issues not updated recently based on 'updated_at'.""" | |
self.stale_issues = [] | |
threshold = timedelta(days=self.stale_issue_threshold_days) | |
now_aware = datetime.now(timezone.utc) | |
for issue_id, issue_data in self.issues.items(): | |
updated_at_str = issue_data.get("updated_at") | |
if updated_at_str: | |
try: | |
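# GitHub returns ISO-8601 timestamps such as "2024-01-05T12:34:56Z"; datetime.fromisoformat() on | |
# Python < 3.11 does not accept the trailing "Z", hence the replace() below. | |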
updated_at = datetime.fromisoformat(updated_at_str.replace("Z", "+00:00")) | |
if now_aware - updated_at > threshold: | |
self.stale_issues.append(issue_id) | |
except (ValueError, TypeError) as e: | |
logger.warning(f"Could not parse 'updated_at' ('{updated_at_str}') for issue {issue_id}: {e}") | |
logger.info(f"Identified {len(self.stale_issues)} potentially stale issues (updated > {self.stale_issue_threshold_days} days ago).") | |
def _identify_high_priority_candidates(self): | |
"""Identifies high-priority issues (e.g., Critical/High severity).""" | |
self.high_priority_candidates = [] | |
for issue_id, issue_data in self.issues.items(): | |
severity = self._determine_severity(issue_data.get('labels', [])) | |
if severity in ["Critical", "High"]: | |
self.high_priority_candidates.append(issue_id) | |
logger.info(f"Identified {len(self.high_priority_candidates)} high-priority candidates (Critical/High severity).") | |
async def _compute_and_store_summary(self, issue_id: int): | |
"""Generates and stores a summary for a given issue using an LLM (Idle Task).""" | |
if issue_id not in self.issues: | |
logger.warning(f"Skipping summary generation for issue {issue_id}: Issue no longer exists.") | |
return | |
if not self.hf_token: | |
self.precomputed_summaries[issue_id] = {"error": "HF token not set", "timestamp": time.time()} | |
return | |
try: | |
issue = self.issues[issue_id] | |
model_id = DEFAULT_IDLE_MODEL_ID # Use designated idle model | |
logger.info(f"Idle Task: Generating summary for issue {issue_id} using {model_id}") | |
start_time = time.time() | |
prompt = f"""Concisely summarize the following GitHub issue in 1-2 sentences. Focus on the core problem or request reported by the user. | |
Issue Title: {issue.get('title', 'N/A')} | |
Issue Body (first 1000 chars): | |
{issue.get('body', 'N/A')[:1000]} | |
Summary:""" | |
api_url = f"{HF_INFERENCE_API}/{model_id}" | |
headers = {"Authorization": f"Bearer {self.hf_token}"} | |
payload = { | |
"inputs": prompt, | |
"parameters": {"max_new_tokens": 128, "temperature": 0.2, "return_full_text": False, "do_sample": False}, | |
"options": {"wait_for_model": True} | |
} | |
timeout = aiohttp.ClientTimeout(total=60) | |
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session: | |
async with session.post(api_url, json=payload) as response: | |
response.raise_for_status() | |
result = await response.json() | |
duration = time.time() - start_time | |
if result and isinstance(result, list) and 'generated_text' in result[0]: | |
summary = result[0].get('generated_text', '').strip() or "(AI generated empty summary)" | |
self.precomputed_summaries[issue_id] = {"summary": summary, "error": None, "timestamp": time.time()} | |
logger.info(f"Stored summary for issue {issue_id} (took {duration:.2f}s).") | |
elif isinstance(result, dict) and 'error' in result: | |
raise ValueError(f"API Error: {result['error']}") | |
else: | |
raise ValueError(f"Unexpected API response format: {str(result)[:200]}") | |
except Exception as e: | |
err_msg = f"Failed summary: {e}" | |
logger.error(f"Failed to generate summary for issue {issue_id}: {e}", exc_info=False) # Keep log cleaner | |
self.precomputed_summaries[issue_id] = {"error": err_msg, "summary": None, "timestamp": time.time()} | |
async def _compute_and_store_missing_info(self, issue_id: int): | |
"""Idle Task: Use LLM to identify missing information needed for an issue.""" | |
if issue_id not in self.issues: return | |
if not self.hf_token: | |
self.precomputed_missing_info[issue_id] = {"error": "HF token not set", "timestamp": time.time()} | |
return | |
try: | |
issue = self.issues[issue_id] | |
model_id = DEFAULT_IDLE_MODEL_ID # Use cheap model | |
logger.info(f"Idle Task: Identifying missing info for issue {issue_id} using {model_id}") | |
start_time = time.time() | |
prompt = f"""Analyze the following GitHub issue description. Identify critical information potentially missing for effective debugging or resolution. List the missing items concisely (e.g., "Steps to reproduce", "Error logs", "Expected vs. Actual behavior", "Environment details"). If the description seems reasonably complete, respond with ONLY the word "None". | |
Issue Title: {issue.get('title', 'N/A')} | |
Issue Body: | |
{issue.get('body', 'N/A')[:1500]} | |
Missing Information:""" | |
api_url = f"{HF_INFERENCE_API}/{model_id}" | |
headers = {"Authorization": f"Bearer {self.hf_token}"} | |
payload = { | |
"inputs": prompt, | |
"parameters": {"max_new_tokens": 64, "temperature": 0.1, "return_full_text": False, "do_sample": False}, | |
"options": {"wait_for_model": True} | |
} | |
timeout = aiohttp.ClientTimeout(total=45) | |
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session: | |
async with session.post(api_url, json=payload) as response: | |
response.raise_for_status() | |
result = await response.json() | |
duration = time.time() - start_time | |
if result and isinstance(result, list) and 'generated_text' in result[0]: | |
info_needed = result[0].get('generated_text', '').strip() | |
if info_needed.lower() == "none" or not info_needed: | |
info_needed = "None needed." | |
self.precomputed_missing_info[issue_id] = {"info_needed": info_needed, "error": None, "timestamp": time.time()} | |
logger.info(f"Stored missing info analysis for issue {issue_id} (took {duration:.2f}s): '{info_needed[:50]}...'") | |
elif isinstance(result, dict) and 'error' in result: | |
raise ValueError(f"API Error: {result['error']}") | |
else: | |
raise ValueError(f"Unexpected API response format: {str(result)[:200]}") | |
except Exception as e: | |
err_msg = f"Failed missing info analysis: {e}" | |
logger.error(f"Failed missing info analysis for issue {issue_id}: {e}", exc_info=False) | |
self.precomputed_missing_info[issue_id] = {"error": err_msg, "info_needed": None, "timestamp": time.time()} | |
async def _compute_and_store_preliminary_analysis(self, issue_id: int): | |
"""Idle Task: Use LLM for a very concise preliminary hypothesis.""" | |
if issue_id not in self.issues: return | |
if not self.hf_token: | |
self.precomputed_analysis[issue_id] = {"error": "HF token not set", "timestamp": time.time()} | |
return | |
try: | |
issue = self.issues[issue_id] | |
model_id = DEFAULT_IDLE_MODEL_ID # Use cheap model | |
logger.info(f"Idle Task: Generating preliminary analysis for issue {issue_id} using {model_id}") | |
start_time = time.time() | |
prompt = f"""Analyze the GitHub issue below. Provide a single, concise sentence hypothesizing the root cause OR the main goal. Start with "Hypothesis:". If unsure, respond ONLY with "Hypothesis: Further investigation needed.". | |
Issue Title: {issue.get('title', 'N/A')} | |
Issue Body: | |
{issue.get('body', 'N/A')[:1500]} | |
Response:""" | |
api_url = f"{HF_INFERENCE_API}/{model_id}" | |
headers = {"Authorization": f"Bearer {self.hf_token}"} | |
payload = { | |
"inputs": prompt, | |
"parameters": {"max_new_tokens": 80, "temperature": 0.3, "return_full_text": False, "do_sample": False}, | |
"options": {"wait_for_model": True} | |
} | |
timeout = aiohttp.ClientTimeout(total=45) | |
async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session: | |
async with session.post(api_url, json=payload) as response: | |
response.raise_for_status() | |
result = await response.json() | |
duration = time.time() - start_time | |
if result and isinstance(result, list) and 'generated_text' in result[0]: | |
hypothesis = result[0].get('generated_text', '').strip() | |
if not hypothesis.lower().startswith("hypothesis:"): | |
hypothesis = "Hypothesis: (Analysis unclear)" | |
elif len(hypothesis) < 15: | |
hypothesis = "Hypothesis: (Analysis failed or too short)" | |
self.precomputed_analysis[issue_id] = {"hypothesis": hypothesis, "error": None, "timestamp": time.time()} | |
logger.info(f"Stored preliminary analysis for issue {issue_id} (took {duration:.2f}s): '{hypothesis[:60]}...'") | |
elif isinstance(result, dict) and 'error' in result: | |
raise ValueError(f"API Error: {result['error']}") | |
else: | |
raise ValueError(f"Unexpected API response format: {str(result)[:200]}") | |
except Exception as e: | |
err_msg = f"Failed preliminary analysis: {e}" | |
logger.error(f"Failed preliminary analysis for issue {issue_id}: {e}", exc_info=False) | |
self.precomputed_analysis[issue_id] = {"error": err_msg, "hypothesis": None, "timestamp": time.time()} | |
def start_idle_processing(self): | |
"""Starts the background idle processing task if conditions are met.""" | |
if not self.idle_task or self.idle_task.done(): | |
if self.hf_token and self.repo and self.main_loop.is_running(): | |
self.idle_task = self.main_loop.create_task(self.run_idle_processing()) | |
logger.info(f"Started background idle processing task (interval: {self.idle_processing_interval}s).") | |
else: | |
missing = [] | |
if not self.hf_token: missing.append("HF Token") | |
if not self.repo: missing.append("Repository") | |
if not self.main_loop.is_running(): missing.append("Running Event Loop") | |
logger.warning(f"Cannot start idle processing: Missing {', '.join(missing)}.") | |
else: | |
logger.debug("Idle processing task already running.") | |
def stop_idle_processing(self): | |
"""Stops the background idle processing task.""" | |
if self.idle_task and not self.idle_task.done(): | |
# Task.cancel() only requests cancellation; CancelledError is raised inside the task, not here. | |
self.idle_task.cancel() | |
logger.info("Stopped background idle processing task.") | |
self.idle_task = None | |
async def run_idle_processing(self): | |
"""Main loop for background analysis, including new idle thinking tasks.""" | |
logger.info("Idle processing loop starting.") | |
try: | |
while True: | |
await asyncio.sleep(self.idle_processing_interval) | |
if not self.issues or not self.repo or not self.hf_token: | |
logger.debug("Idle processing skipped: No issues, repo, or HF token.") | |
continue | |
logger.info(f"--- Starting idle processing cycle ---") | |
cycle_tasks = [] | |
start_time_cycle = time.time() | |
# 1. Re-clustering (if needed) | |
if self.needs_recluster: | |
logger.info("Idle Task: Scheduling re-clustering and duplicate identification...") | |
self.issue_list_for_clustering = list(self.issues.values()) | |
cycle_tasks.append(self._run_clustering_and_duplicates_async()) | |
else: | |
logger.debug("No re-clustering needed in this cycle.") | |
# 2. Identify Stale & High Priority Issues (Sync) | |
try: | |
self._identify_stale_issues() | |
self._identify_high_priority_candidates() | |
except Exception as e: | |
logger.error(f"Error during synchronous stale/priority identification: {e}") | |
# --- Identify Issues Needing Work --- | |
all_issue_ids = list(self.issues.keys()) | |
random.shuffle(all_issue_ids) | |
issues_needing_context = [i for i in all_issue_ids if i not in self.precomputed_context] | |
issues_needing_summary = [i for i in all_issue_ids if i not in self.precomputed_summaries] | |
issues_needing_missing_info = [i for i in all_issue_ids if i not in self.precomputed_missing_info] | |
issues_needing_analysis = [i for i in all_issue_ids if i not in self.precomputed_analysis] | |
priority_summary_candidates = [i for i in self.high_priority_candidates if i in issues_needing_summary] | |
other_summary_candidates = [i for i in issues_needing_summary if i not in self.high_priority_candidates] | |
ordered_summary_candidates = priority_summary_candidates + other_summary_candidates | |
logger.debug(f"Idle candidates: Ctx:{len(issues_needing_context)}, Sum:{len(issues_needing_summary)}, " | |
f"Info:{len(issues_needing_missing_info)}, Anl:{len(issues_needing_analysis)}") | |
# 3. Schedule Context Pre-computation (I/O) | |
context_computed_count = 0 | |
for issue_id in issues_needing_context: | |
if context_computed_count < self.max_context_computations_per_cycle: | |
cycle_tasks.append(self._compute_and_store_context(issue_id)) | |
context_computed_count += 1 | |
else: break | |
if context_computed_count > 0: logger.info(f"Scheduled {context_computed_count} context computations.") | |
# 4. Schedule Summary Generation (LLM - Medium Cost) | |
summary_computed_count = 0 | |
for issue_id in ordered_summary_candidates: | |
if summary_computed_count < self.max_summary_computations_per_cycle: | |
cycle_tasks.append(self._compute_and_store_summary(issue_id)) | |
summary_computed_count += 1 | |
else: break | |
if summary_computed_count > 0: logger.info(f"Scheduled {summary_computed_count} summary computations.") | |
# 5. Schedule Missing Info Analysis (LLM - Low Cost) | |
missing_info_count = 0 | |
for issue_id in issues_needing_missing_info: | |
if missing_info_count < self.max_missing_info_computations_per_cycle: | |
cycle_tasks.append(self._compute_and_store_missing_info(issue_id)) | |
missing_info_count += 1 | |
else: break | |
if missing_info_count > 0: logger.info(f"Scheduled {missing_info_count} missing info analyses.") | |
# 6. Schedule Preliminary Analysis (LLM - Low Cost) | |
analysis_count = 0 | |
for issue_id in issues_needing_analysis: | |
if analysis_count < self.max_analysis_computations_per_cycle: | |
cycle_tasks.append(self._compute_and_store_preliminary_analysis(issue_id)) | |
analysis_count += 1 | |
else: break | |
if analysis_count > 0: logger.info(f"Scheduled {analysis_count} preliminary analyses.") | |
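# Note: the caps above throttle LLM usage per cycle; with a backlog of N issues awaiting a given | |
# analysis, it takes roughly N / max_<task>_computations_per_cycle idle cycles to drain that queue. | |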
# --- Execute Scheduled Async Tasks --- | |
if cycle_tasks: | |
logger.info(f"Executing {len(cycle_tasks)} async idle tasks for this cycle...") | |
results = await asyncio.gather(*cycle_tasks, return_exceptions=True) | |
num_errors = 0 | |
for i, result in enumerate(results): | |
if isinstance(result, Exception): | |
num_errors += 1 | |
logger.error(f"Error encountered in background idle task {i+1}/{len(cycle_tasks)}: {result}", exc_info=False) # Keep log cleaner | |
cycle_duration = time.time() - start_time_cycle | |
logger.info(f"Idle processing cycle finished in {cycle_duration:.2f} seconds. {len(results)} tasks processed ({num_errors} errors).") | |
else: | |
logger.info("No async idle tasks to perform in this cycle.") | |
logger.info(f"--- Finished idle processing cycle ---") | |
except asyncio.CancelledError: | |
logger.info("Idle processing loop cancelled.") | |
except Exception as e: | |
logger.exception(f"Critical error in idle processing loop: {e}") | |
await asyncio.sleep(self.idle_processing_interval * 2) | |
finally: | |
logger.info("Idle processing loop finished.") | |
async def _compute_and_store_context(self, issue_id: int): | |
"""Helper async task to compute and store context for one issue during idle time.""" | |
if issue_id not in self.issues: | |
logger.warning(f"Skipping context computation for issue {issue_id}: Issue no longer exists.") | |
return | |
try: | |
logger.debug(f"Starting background context computation for issue {issue_id}...") | |
start_time = time.time() | |
issue_data = self.issues[issue_id] | |
context_result = await self._get_code_context(issue_data) | |
duration = time.time() - start_time | |
computed_data = { | |
"content": context_result.get("content"), | |
"files": context_result.get("files", []), | |
"error": context_result.get("error"), | |
"timestamp": time.time() | |
} | |
self.precomputed_context[issue_id] = computed_data | |
log_msg = f"Stored context result for issue {issue_id} (found {len(computed_data['files'])} files, took {duration:.2f}s)." | |
if computed_data['error']: | |
log_msg += f" Error: {computed_data['error']}" | |
logger.info(log_msg) | |
except Exception as e: | |
logger.exception(f"Failed to compute context for issue {issue_id} in background task: {e}") | |
self.precomputed_context[issue_id] = {"error": f"Unexpected computation error: {e}", "timestamp": time.time(), "content": None, "files": []} | |
async def _run_clustering_and_duplicates_async(self): | |
"""Runs clustering and then identifies duplicates as a single background task unit.""" | |
try: | |
logger.info("Background Task: Starting re-clustering issues...") | |
start_time = time.time() | |
await self._cluster_similar_issues() # This resets flags on success internally | |
cluster_duration = time.time() - start_time | |
if self.issue_clusters: | |
logger.info(f"Background Task: Clustering finished in {cluster_duration:.2f}s. Identifying duplicates...") | |
start_time_dup = time.time() | |
self._identify_potential_duplicates() | |
dup_duration = time.time() - start_time_dup | |
logger.info(f"Background Task: Duplicate identification finished in {dup_duration:.2f}s.") | |
else: | |
logger.warning("Background Task: Clustering did not produce results. Skipping duplicate identification.") | |
self._webhook_change_count = 0 | |
self.needs_recluster = False | |
except Exception as e: | |
logger.error(f"Error during background clustering/duplicate identification task: {e}", exc_info=True) | |
# ========== Gradio UI Definition ========== | |
def create_ui(manager: IssueManager) -> gr.Blocks: | |
"""Creates the Gradio interface.""" | |
# --- Helper Functions for UI --- | |
def generate_issue_preview(issue_num: Optional[int]) -> str: | |
"""Generates HTML preview including new idle thinking results.""" | |
if issue_num is None or issue_num not in manager.issues: | |
return "<p style='color: #6b7280;'>Select an issue from the 'Issue Board' tab.</p>" | |
try: | |
issue = manager.issues[issue_num] | |
html_body = markdown2.markdown( | |
issue.get('body', '*No description provided.*') or '*No description provided.*', | |
extras=["fenced-code-blocks", "tables", "strike", "task_list", "code-friendly", "html-classes", "nofollow", "spoiler"] | |
) | |
labels_html = ' '.join(f'<span style=\'background-color: #f3f4f6; color: #374151; border: 1px solid #d1d5db; padding: 2px 6px; border-radius: 4px; font-size: 0.85em; display: inline-block; margin-right: 4px; margin-bottom: 4px;\'>{gr.Textbox.sanitize_html(l)}</span>' for l in issue.get('labels', [])) or '<span style="color: #6b7280;">None</span>' | |
status_indicators = [] | |
if issue_num in manager.stale_issues: | |
status_indicators.append(f"<span title='No updates in >{manager.stale_issue_threshold_days} days' style='color: #b91c1c; font-weight: bold; font-size: 0.9em; background-color: #fee2e2; padding: 1px 4px; border-radius: 3px;'>[Stale]</span>") | |
if issue_num in manager.high_priority_candidates: | |
severity = manager._determine_severity(issue.get('labels', [])) | |
if severity == "Critical": color, bgcolor = "#ef4444", "#fee2e2" | |
elif severity == "High": color, bgcolor = "#f97316", "#ffedd5" | |
else: color, bgcolor = "#c2410c", "#fffbeb" | |
status_indicators.append(f"<span title='Marked as {severity}' style='color: {color}; font-weight: bold; font-size: 0.9em; background-color: {bgcolor}; padding: 1px 4px; border-radius: 3px;'>[{severity}]</span>") | |
status_html = " ".join(status_indicators) | |
# --- Get Precomputed Data with Helper --- | |
summary_text = manager._get_precomputed_text(issue_num, manager.precomputed_summaries, "summary", "Summary") | |
missing_info_text = manager._get_precomputed_text(issue_num, manager.precomputed_missing_info, "info_needed", "Missing Info") | |
analysis_text = manager._get_precomputed_text(issue_num, manager.precomputed_analysis, "hypothesis", "Analysis") | |
duplicate_text = manager._get_duplicate_info_text(issue_num) | |
# --- Format AI Sections --- | |
ai_sections = [] | |
if not summary_text.startswith("("): # Don't show if pending/empty | |
ai_sections.append(f""" | |
<div style="font-size: 0.9em; margin-top: 10px; border-top: 1px dashed #eee; padding-top: 8px; background-color: #f0e6ff; padding: 6px 10px; border-radius: 4px; border: 1px solid #ddd6fe;"> | |
<strong>🤖 AI Summary:</strong> {gr.Textbox.sanitize_html(summary_text)} | |
</div>""") | |
elif "Error" in summary_text: | |
ai_sections.append(f""" | |
<div style="font-size: 0.9em; margin-top: 10px; border-top: 1px dashed #eee; padding-top: 8px; background-color: #fee2e2; padding: 6px 10px; border-radius: 4px; border: 1px solid #fecaca;"> | |
<strong>🤖 {gr.Textbox.sanitize_html(summary_text)}</strong> | |
</div>""") | |
if not missing_info_text.startswith("("): | |
color = "#fffbeb" if "None needed." not in missing_info_text else "#f0fdf4" | |
border_color = "#fef3c7" if "None needed." not in missing_info_text else "#bbf7d0" | |
ai_sections.append(f""" | |
<div style="font-size: 0.9em; margin-top: 8px; background-color: {color}; padding: 6px 10px; border-radius: 4px; border: 1px solid {border_color};"> | |
<strong>🤔 AI Missing Info Analysis:</strong> {gr.Textbox.sanitize_html(missing_info_text)} | |
</div>""") | |
elif "Error" in missing_info_text: | |
ai_sections.append(f""" | |
<div style="font-size: 0.9em; margin-top: 8px; background-color: #fee2e2; padding: 6px 10px; border-radius: 4px; border: 1px solid #fecaca;"> | |
<strong>🤔 {gr.Textbox.sanitize_html(missing_info_text)}</strong> | |
</div>""") | |
if not analysis_text.startswith("("): | |
ai_sections.append(f""" | |
<div style="font-size: 0.9em; margin-top: 8px; background-color: #e0f2fe; padding: 6px 10px; border-radius: 4px; border: 1px solid #bae6fd;"> | |
<strong>🔬 AI Preliminary Analysis:</strong> {gr.Textbox.sanitize_html(analysis_text)} | |
</div>""") | |
elif "Error" in analysis_text: | |
ai_sections.append(f""" | |
<div style="font-size: 0.9em; margin-top: 8px; background-color: #fee2e2; padding: 6px 10px; border-radius: 4px; border: 1px solid #fecaca;"> | |
<strong>🔬 {gr.Textbox.sanitize_html(analysis_text)}</strong> | |
</div>""") | |
if duplicate_text: | |
dup_ids = manager.potential_duplicates.get(issue_num, []) | |
dup_links = ", ".join([f"<span style='cursor: help; color: #4338ca; text-decoration: underline dotted;' title='Issue #{dup_id} has similar content'>#{dup_id}</span>" for dup_id in dup_ids]) | |
ai_sections.append(f""" | |
<div style="font-size: 0.9em; margin-top: 8px; background-color: #fffbeb; padding: 6px 10px; border-radius: 4px; border: 1px solid #fef3c7;"> | |
<strong style="color: #d97706;">⚠️ Potential Duplicates:</strong> {dup_links} | |
</div>""") | |
ai_sections_html = "\n".join(ai_sections) | |
# Construct the full preview HTML | |
preview_html = f""" | |
<div style="border: 1px solid #e5e7eb; padding: 15px; border-radius: 8px; background-color: #ffffff; font-family: 'Inter', sans-serif; display: flex; flex-direction: column; max-height: 80vh;"> | |
<h4 style="margin-top: 0; margin-bottom: 10px; font-size: 1.1em; display: flex; justify-content: space-between; align-items: center;"> | |
<a href='{issue.get('url', '#')}' target='_blank' style='color: #6d28d9; text-decoration: none; font-weight: 600;' title="Open issue #{issue['id']} on GitHub"> | |
#{issue['id']} - {gr.Textbox.sanitize_html(issue.get('title', 'N/A'))} | |
</a> | |
<span style="margin-left: 10px; flex-shrink: 0;">{status_html}</span> | |
</h4> | |
<hr style='margin: 10px 0; border-top: 1px solid #e5e7eb;'> | |
<div style="font-size: 0.9em; color: #4b5563; margin-bottom: 10px; display: flex; justify-content: space-between; flex-wrap: wrap; gap: 15px;"> | |
<span><strong>State:</strong> <span style="text-transform: capitalize; background-color: {'#dcfce7' if issue.get('state') == 'open' else '#fee2e2'}; color: {'#166534' if issue.get('state') == 'open' else '#991b1b'}; padding: 1px 6px; border-radius: 3px; font-size: 0.9em; border: 1px solid {'#86efac' if issue.get('state') == 'open' else '#fecaca'};">{issue.get('state', 'N/A')}</span></span> | |
<span><strong>Assignee:</strong> {issue.get('assignee') or '<span style="color: #6b7280;">None</span>'}</span> | |
<span><strong>Severity:</strong> {manager._determine_severity(issue.get('labels', []))}</span> | |
</div> | |
<div style="font-size: 0.9em; color: #4b5563; margin-bottom: 12px;"> | |
<strong>Labels:</strong> {labels_html} | |
</div> | |
<!-- AI Analysis Sections --> | |
{ai_sections_html} | |
<!-- Issue Body --> | |
<div style="margin-top: 15px; border-top: 1px solid #eee; padding-top: 10px; font-size: 0.95em; line-height: 1.6; overflow-y: auto; flex-grow: 1; max-height: 35vh;"> | |
<h5 style="margin-bottom: 5px; margin-top: 0; color: #374151;">Description:</h5> | |
{html_body} | |
</div> | |
</div> | |
""" | |
return preview_html | |
except Exception as e: | |
logger.exception(f"Error generating issue preview for {issue_num}: {e}") | |
return f"<p style='color: red;'>Error generating preview for issue {issue_num}. Check logs.</p>" | |
async def get_ai_suggestion_wrapper(issue_num: Optional[int], model_key: str, progress=gr.Progress()) -> str: | |
"""UI wrapper for getting AI suggestions, handles state and progress.""" | |
progress(0, desc="Preparing request...") | |
if issue_num is None or issue_num not in manager.issues: | |
return "⚠️ Error: Please select a valid issue first." | |
if not manager.hf_token: | |
return "🔒 Error: Hugging Face Token is not configured." | |
issue = manager.issues[issue_num] | |
issue_hash = manager._get_issue_hash(issue) | |
logger.info(f"Requesting suggestion for issue {issue_num} (hash: {issue_hash}) using model {model_key}.") | |
try: | |
progress(0.3, desc=f"Querying {model_key}...") | |
suggestion = await manager.cached_suggestion(issue_hash, model_key) | |
progress(1, desc="Suggestion received.") | |
if suggestion.lower().startswith("error:"): | |
return f"⚠️ {suggestion}" | |
else: | |
return f"**💡 Suggestion based on {model_key}:**\n\n---\n{suggestion}" | |
except Exception as e: | |
logger.exception(f"Error in get_ai_suggestion_wrapper for issue {issue_num}: {e}") | |
return f"❌ An unexpected error occurred while getting the suggestion: {e}" | |
async def get_ai_patch_wrapper(issue_num: Optional[int], model_key: str, progress=gr.Progress()) -> str: | |
"""UI wrapper for getting AI patches, handles state and progress.""" | |
progress(0, desc="Preparing request...") | |
if issue_num is None or issue_num not in manager.issues: | |
return "⚠️ Error: Please select a valid issue first." | |
if not manager.hf_token: | |
return "🔒 Error: Hugging Face Token is not configured." | |
if not manager.repo: | |
return "❌ Error: Repository not loaded. Please scan the repository first." | |
logger.info(f"Requesting patch for issue {issue_num} using model {model_key}.") | |
progress(0.1, desc="Gathering code context (using cache if available)...") | |
try: | |
progress(0.4, desc=f"Querying {model_key} for patch...") | |
result = await manager.generate_code_patch(issue_num, model_key) | |
progress(1, desc="Patch result received.") | |
if "error" in result: | |
logger.error(f"Patch generation failed for issue {issue_num}: {result['error']}") | |
return f"**❌ Error generating patch:**\n\n{result['error']}" | |
else: | |
model_used = result.get('model_used', model_key) | |
explanation = result.get('explanation', '(No explanation provided)') | |
patch_content = result.get('patch') | |
if patch_content: | |
patch_content_sanitized = patch_content.replace('`', '\\`') | |
logger.info(f"Successfully generated patch for issue {issue_num} using {model_used}.") | |
return f"""**🩹 Patch Suggestion from {model_used}:** | |
**Explanation:** | |
{explanation} | |
--- | |
**Patch:** | |
```diff | |
{patch_content_sanitized} | |
```""" | |
else: | |
logger.warning(f"AI provided explanation but no patch for issue {issue_num}. Explanation: {explanation}") | |
if re.search(r"(insufficient context|cannot generate|unable to create patch)", explanation, re.IGNORECASE): | |
status_msg = "AI indicated insufficient context" | |
else: | |
status_msg = "AI did not generate a diff block" | |
return f"""**🩹 Patch Generation Result from {model_used}:** | |
**Explanation:** | |
{explanation} | |
--- | |
**({status_msg})**""" | |
except Exception as e: | |
logger.exception(f"Error in get_ai_patch_wrapper for issue {issue_num}: {e}") | |
return f"❌ An unexpected error occurred while generating the patch: {e}" | |
async def handle_issue_select(evt: gr.SelectData, current_state: dict): | |
""" | |
Handles issue selection in the Dataframe: updates preview, loads code context into editor. | |
Uses Gradio State to get the current selected issue ID if needed, though event data is primary. | |
""" | |
# Keyed by the component objects (defined later in create_ui) so Gradio can route each update. | |
default_response = { | |
selected_issue_id_state: gr.update(value=None), | |
issue_preview_html: gr.update(value="<p style='color: #6b7280;'>Select an issue from the table.</p>"), | |
code_edit_component: gr.update(value={"placeholder.txt": "# Select an issue to load relevant code context."}, interactive=True), | |
ai_output_display: gr.update(value="*AI suggestions and patches will appear here after selecting an issue and action.*") | |
} | |
if evt.index is None or not hasattr(evt, 'value') or not evt.value or evt.value[0] is None: | |
logger.info("Issue deselected or invalid selection event.") | |
return default_response | |
try: | |
selected_id = int(evt.value[0]) | |
logger.info(f"Issue selected via Dataframe: ID {selected_id}") | |
if selected_id not in manager.issues: | |
logger.error(f"Selected issue ID {selected_id} not found in manager's issue list.") | |
return { | |
**default_response, | |
"issue_preview_html": gr.update(value=f"<p style='color: red; font-weight: bold;'>Error: Issue {selected_id} not found in the current list. Try re-scanning.</p>"), | |
} | |
issue_data = manager.issues[selected_id] | |
files_content: Dict[str, str] = {} | |
context_source_msg = "Error determining context source" | |
context_load_start = time.time() | |
if selected_id in manager.precomputed_context: | |
context_data = manager.precomputed_context[selected_id] | |
timestamp_str = datetime.fromtimestamp(context_data.get('timestamp', 0)).strftime('%Y-%m-%d %H:%M:%S') | |
if context_data.get("error"): | |
context_source_msg = f"Pre-computed (Failed @ {timestamp_str})" | |
files_content["error_context.txt"] = f"# Error loading pre-computed context:\n# {context_data['error']}" | |
elif context_data.get("files"): | |
context_source_msg = f"Pre-computed ({len(context_data['files'])} files @ {timestamp_str})" | |
logger.info(f"Loading {len(context_data['files'])} files from pre-computed context for issue {selected_id}: {context_data['files']}") | |
loaded_count = 0 | |
for file_path_str in context_data["files"]: | |
full_path = manager.repo_local_path / file_path_str | |
try: | |
files_content[file_path_str] = full_path.read_text(encoding='utf-8', errors='ignore') | |
loaded_count += 1 | |
except Exception as e: | |
logger.warning(f"Error reading pre-computed file {full_path} for issue {selected_id}: {e}") | |
files_content[file_path_str] = f"# Error reading file: {e}" | |
if loaded_count == 0 and context_data["files"]: | |
files_content["error_reading_files.txt"] = "# Precomputed context found file references, but failed to read any file content." | |
else: | |
context_source_msg = f"Pre-computed (No files found @ {timestamp_str})" | |
files_content[f"issue_{selected_id}_context.md"] = context_data.get("content", "# No specific code context found (pre-computed).") | |
else: | |
logger.info(f"Context not pre-computed for issue {selected_id}, computing on demand for editor.") | |
context_source_msg = "Computed On-Demand" | |
context_result = await manager._get_code_context(issue_data) | |
manager.precomputed_context[selected_id] = { | |
"content": context_result.get("content"), | |
"files": context_result.get("files", []), | |
"error": context_result.get("error"), | |
"timestamp": time.time() | |
} | |
if "error" in context_result and context_result["error"]: | |
context_source_msg += f" (Error: {context_result['error']})" | |
files_content["error_context.txt"] = f"# Error loading context on demand:\n# {context_result['error']}" | |
elif context_result.get("files"): | |
context_source_msg += f" ({len(context_result['files'])} files)" | |
logger.info(f"Loading {len(context_result['files'])} files computed on-demand for issue {selected_id}: {context_result['files']}") | |
loaded_count = 0 | |
for file_path_str in context_result["files"]: | |
full_path = manager.repo_local_path / file_path_str | |
try: | |
files_content[file_path_str] = full_path.read_text(encoding='utf-8', errors='ignore') | |
loaded_count +=1 | |
except Exception as e: | |
logger.warning(f"Error reading on-demand file {full_path} for issue {selected_id}: {e}") | |
files_content[file_path_str] = f"# Error reading file: {e}" | |
if loaded_count == 0 and context_result["files"]: | |
files_content["error_reading_files.txt"] = "# Context computation found file references, but failed to read any file content." | |
else: | |
context_source_msg += " (No files found)" | |
files_content[f"issue_{selected_id}_context.md"] = context_result.get("content", "# No specific code context found.") | |
context_load_duration = time.time() - context_load_start | |
logger.info(f"Context loading for editor took {context_load_duration:.2f}s. Source: {context_source_msg}") | |
if not files_content: | |
files_content["placeholder.txt"] = f"# No relevant files found or context failed to load for issue {selected_id}." | |
manager.code_editors[selected_id] = OTCodeEditor(initial_value=files_content) | |
logger.info(f"Initialized/Updated OT editor state for issue {selected_id} with files: {list(files_content.keys())}") | |
updates = { | |
"selected_issue_id_state": gr.update(value=selected_id), | |
"issue_preview_html": gr.update(value=generate_issue_preview(selected_id)), | |
"code_edit_component": gr.update(value=files_content, interactive=True), | |
"ai_output_display": gr.update(value=f"*Context loaded ({context_source_msg}). Ready for AI actions or editing.*") | |
} | |
return updates | |
except (ValueError, TypeError, IndexError, KeyError, AttributeError) as e: | |
logger.exception(f"Error processing issue selection event (evt.value: {getattr(evt, 'value', 'N/A')}): {e}") | |
return { | |
**default_response, | |
"issue_preview_html": gr.update(value="<p style='color: red; font-weight: bold;'>Error loading issue details. Please check logs and try again.</p>"), | |
"code_edit_component": gr.update(value={"error.txt": f"# Error loading code context for selection.\n# Error: {e}"}, interactive=False), | |
"ai_output_display": gr.update(value="*Error processing selection. See logs.*") | |
} | |
# --- Gradio Blocks --- | |
with gr.Blocks(theme=theme, title="AI Issue Resolver Pro", css="#collab-list .collab-item { margin-bottom: 4px; font-size: 0.9em; } .gradio-container { max-width: 1600px !important; }") as demo_app: | |
gr.Markdown(""" | |
<div style="text-align: center; margin-bottom: 20px;"> | |
<h1 style="color: #6d28d9; font-weight: 800;">🚀 AI Issue Resolver Pro</h1> | |
<p style="color: #4b5563; font-size: 1.1em;">Collaborative Issue Resolution Powered by AI</p> | |
</div> | |
""") | |
selected_issue_id_state = gr.State(value=None) | |
with gr.Row(variant="panel", elem_id="config-panel"): | |
with gr.Column(scale=3): | |
repo_url = gr.Textbox(label="GitHub Repository URL", placeholder="https://github.com/owner/repo", info="Enter the full URL.", elem_id="repo_url_inp") | |
with gr.Row(): | |
github_token = gr.Textbox(label="GitHub Token (Optional)", type="password", placeholder="ghp_...", info="For private repos or higher rate limits.", elem_id="github_token_inp") | |
hf_token = gr.Textbox(label="Hugging Face Token", type="password", placeholder="hf_...", info="Required for AI features.", elem_id="hf_token_inp") | |
with gr.Column(scale=1, min_width=250): | |
model_select = gr.Dropdown(choices=list(HF_MODELS.keys()), value=DEFAULT_MODEL_KEY, | |
label="🤖 Select AI Model", info="Used for suggestions & patches.", elem_id="model_select_dd") | |
crawl_btn = gr.Button("🛰️ Scan Repository Issues", variant="primary", icon="🔍", elem_id="crawl_btn") | |
status_output = gr.Textbox(label="Status Log", interactive=False, lines=1, max_lines=1, | |
placeholder="Status updates appear here... Idle tasks may run in background.", | |
elem_id="status_output_txt") | |
with gr.Tabs(elem_id="main-tabs"): | |
with gr.Tab("📋 Issue Board", id="board", elem_id="tab-board"): | |
with gr.Row(equal_height=False): | |
with gr.Column(scale=3): | |
gr.Markdown("### Open Issues (Select Row to View/Edit)") | |
issue_list = gr.Dataframe( | |
headers=["ID", "Title", "Severity", "Cluster"], | |
datatype=["number", "str", "str", "str"], | |
interactive=True, | |
row_count=(15, "dynamic"), | |
col_count=(4, "fixed"), | |
wrap=True, | |
elem_id="issue_list_df", | |
label="Issues" | |
) | |
with gr.Column(scale=2, min_width=350): | |
gr.Markdown("### Issue Severity Distribution") | |
stats_plot = gr.Plot(label="Severity Plot", elem_id="stats_plot_viz") | |
collab_status = gr.HTML(""" | |
<div style="margin-top: 20px; border: 1px solid #e5e7eb; padding: 10px; border-radius: 8px; background-color: #f9fafb;"> | |
<h4 style="margin-bottom: 5px; color: #374151; font-size: 1em;">👥 Active Collaborators</h4> | |
<div id="collab-list" style="min-height: 30px; max-height: 100px; overflow-y: auto; padding-top: 5px;"> | |
<span style="color: #888;">Connecting...</span> | |
</div> | |
</div> | |
""", elem_id="collab_status_html") | |
with gr.Tab("💻 Resolution Studio", id="studio", elem_id="tab-studio"): | |
with gr.Row(): | |
with gr.Column(scale=1, min_width=450): | |
gr.Markdown("### Selected Issue Details") | |
issue_preview_html = gr.HTML("<p style='color: #6b7280;'>Select an issue from the 'Issue Board' tab.</p>", elem_id="issue_preview_div") | |
with gr.Accordion("🛠️ AI Assistance Tools", open=True, elem_id="ai_tools_accordion"): | |
suggest_btn = gr.Button("🧠 Suggest Resolution Steps", icon="💡", elem_id="suggest_btn") | |
patch_btn = gr.Button("📝 Generate Code Patch", icon="🩹", elem_id="patch_btn") | |
gr.Markdown("### AI Output") | |
ai_output_display = gr.Markdown(value="*AI suggestions and patches will appear here...*", elem_id="ai_output_md") | |
with gr.Column(scale=2, min_width=600): | |
gr.Markdown("### Collaborative Code Editor (Context-Aware)") | |
gr.Markdown("<p style='color: orange; font-weight: bold;'>⚠️ Warning: Real-time collaborative editing is experimental and may lose data with simultaneous edits. Save work frequently!</p>") | |
code_edit_component = code_editor( | |
label="Code Context / Editor", | |
language="python", | |
interactive=True, | |
elem_id="code_editor_component" | |
) | |
with gr.Tab("📈 Analytics", id="analytics", elem_id="tab-analytics"): | |
gr.Markdown("### Repository Analytics") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("#### Issue Status Overview") | |
analytics_severity_plot = gr.Plot(label="Severity Distribution (Analytics)", elem_id="analytics_severity_plot") | |
with gr.Column(scale=1): | |
gr.Markdown("#### Cluster Analysis (Top 5)") | |
cluster_info_df = gr.Dataframe( | |
headers=["Cluster ID", "Issue Count", "Top Keywords (Example)"], | |
datatype=["number", "number", "str"], | |
value=[["Loading...", 0, ""]], | |
label="Issue Clusters", elem_id="cluster_info_df" | |
) | |
gr.Markdown("*(More analytics like resolution times, contributor stats, etc., could be added.)*") | |
# --- Analytics Helper --- | |
def update_cluster_analytics(mgr: IssueManager): | |
"""Generates data for the cluster analytics dataframe.""" | |
if not mgr.issue_clusters or not mgr.issue_list_for_clustering: | |
return [["N/A", 0, "No clusters found"]] | |
cluster_data = [] | |
stop_words = {'this', 'that', 'with', 'issue', 'error', 'file', 'code', 'when', 'from', 'the', 'and', 'for', 'not', 'are', 'you', 'can', 'use', 'how', 'get', 'all', 'has', 'have', 'but', 'like', 'just', 'will', 'should'} | |
def get_top_keywords(indices): | |
text = "" | |
for idx in indices: | |
if 0 <= idx < len(mgr.issue_list_for_clustering): | |
issue = mgr.issue_list_for_clustering[idx] | |
text += issue.get('title', '') + " " + issue.get('body', '')[:200] + " " | |
text = text.lower() | |
text = text.translate(str.maketrans('', '', string.punctuation)) | |
words = [w for w in text.split() if len(w) > 3 and w not in stop_words] | |
if not words: return "N/A" | |
counts = Counter(words) | |
top_3 = [word for word, count in counts.most_common(3)] | |
return ", ".join(top_3) | |
sorted_clusters = sorted(mgr.issue_clusters.items(), key=lambda item: len(item[1]), reverse=True) | |
for cluster_id, indices in sorted_clusters[:5]: | |
keywords = get_top_keywords(indices) | |
cluster_data.append([cluster_id, len(indices), keywords]) | |
if not cluster_data: | |
return [["N/A", 0, "No clusters found"]] | |
return cluster_data | |
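# Example of the rows produced for the cluster dataframe (values are illustrative): | |
# [[3, 12, "login, timeout, session"], [7, 9, "crash, upload, memory"]] | |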
# --- Event Handlers --- | |
crawl_btn.click( | |
fn=manager.crawl_issues, | |
inputs=[repo_url, github_token, hf_token], | |
outputs=[issue_list, stats_plot, status_output, analytics_severity_plot], | |
api_name="crawl_issues", | |
show_progress="full" | |
).then( | |
fn=lambda: update_cluster_analytics(manager), | |
inputs=[], | |
outputs=[cluster_info_df] | |
) | |
issue_list.select( | |
fn=handle_issue_select, | |
inputs=[selected_issue_id_state], | |
outputs=[selected_issue_id_state, issue_preview_html, code_edit_component, ai_output_display], | |
show_progress="minimal", | |
) | |
suggest_btn.click( | |
fn=get_ai_suggestion_wrapper, | |
inputs=[selected_issue_id_state, model_select], | |
outputs=[ai_output_display], | |
api_name="suggest_resolution", | |
show_progress="full" | |
) | |
patch_btn.click( | |
fn=get_ai_patch_wrapper, | |
inputs=[selected_issue_id_state, model_select], | |
outputs=[ai_output_display], | |
api_name="generate_patch", | |
show_progress="full" | |
) | |
# --- JavaScript for WebSocket Communication --- | |
def web_socket_js(ws_port): | |
client_id = f"client_{hashlib.sha1(os.urandom(16)).hexdigest()[:8]}" | |
logger.info(f"Generated Client ID for WebSocket: {client_id}") | |
return f""" | |
<script> | |
(function() { // IIFE | |
if (window.collabWsInitialized) { | |
console.log('WebSocket script already initialized. Skipping setup.'); | |
if (!window.collabWs || window.collabWs.readyState === WebSocket.CLOSED) { | |
console.log('Attempting reconnect from existing script...'); | |
connectWebSocket(); | |
} | |
return; | |
} | |
window.collabWsInitialized = true; | |
console.log('Initializing WebSocket connection script (Client ID: {client_id})...'); | |
console.warn('Code Editor Collaboration WARNING: This feature uses a placeholder implementation without Operational Transformation (OT). Simultaneous edits by multiple users WILL likely lead to data inconsistencies or loss. Use with caution and save frequently.'); | |
const clientId = '{client_id}'; | |
window.clientId = clientId; | |
let wsUrl; | |
const wsPort = {ws_port}; | |
const hostname = window.location.hostname; | |
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; | |
if (hostname === 'localhost' || hostname === '127.0.0.1' || hostname.endsWith('.gradio.live')) { | |
wsUrl = `${protocol}//${hostname}:${wsPort}`; | |
console.log('Detected local/gradio.live environment, using direct WebSocket URL:', wsUrl); | |
} else { | |
const wsPath = '/ws'; | |
wsUrl = `${protocol}//${window.location.host}${wsPath}`; | |
console.log('Detected non-local environment, assuming proxied WebSocket URL:', wsUrl); | |
} | |
let collabWs = null; | |
window.collabWs = collabWs; | |
let aceEditorInstance = null; | |
let currentIssueId = null; | |
let reconnectAttempts = 0; | |
const maxReconnectAttempts = 7; | |
let connectInterval = null; | |
let editorSetupAttempts = 0; | |
const maxEditorSetupAttempts = 15; | |
let editorChangeListenerAttached = false; | |
let lastSentDeltaTimestamp = 0; | |
const deltaSendDebounceMs = 250; | |
function updateCollabList(collaborators) { | |
const collabListDiv = document.getElementById('collab-list'); | |
if (!collabListDiv) return; | |
const activeCollaborators = Object.entries(collaborators || {}) | |
.filter(([id, info]) => id !== clientId); | |
if (activeCollaborators.length > 0) { | |
collabListDiv.innerHTML = activeCollaborators | |
.map(([id, info]) => ` | |
<div class="collab-item" title="ID: ${id}"> | |
<span style="font-weight: 500;">${info.name || id.substring(0, 8)}:</span> | |
<span style="color: #555;">${info.status || 'Idle'}</span> | |
</div>`) | |
.join(''); | |
} else { | |
collabListDiv.innerHTML = '<span style="color: #6b7280;">You are the only active user.</span>'; | |
} | |
} | |
function updateStatusBar(message, isError = false) { | |
const statusBar = document.querySelector('#status_output_txt textarea'); | |
if (statusBar) { | |
const timestamp = new Date().toLocaleTimeString(); | |
statusBar.value = `[${timestamp}] ${message}`; | |
statusBar.style.color = isError ? '#D32F2F' : '#333'; | |
statusBar.style.fontWeight = isError ? 'bold' : 'normal'; | |
} | |
} | |
function connectWebSocket() { | |
if (collabWs && (collabWs.readyState === WebSocket.OPEN || collabWs.readyState === WebSocket.CONNECTING)) { | |
console.log(`WebSocket already ${(collabWs.readyState === WebSocket.OPEN) ? 'open' : 'connecting'}. State: ${collabWs.readyState}`); | |
return; | |
} | |
console.log(`Attempting WebSocket connection to ${wsUrl} (Attempt ${reconnectAttempts + 1}/${maxReconnectAttempts})...`); | |
updateStatusBar(`Connecting collaboration service (Attempt ${reconnectAttempts + 1})...`); | |
try { | |
collabWs = new WebSocket(wsUrl); | |
} catch (e) { | |
console.error("WebSocket constructor failed:", e); | |
updateStatusBar("Collaboration connection failed (init error).", true); | |
// Handle reconnection attempt here as well | |
handleWsClose(); | |
return; // Prevent setting up listeners on failed constructor | |
} | |
window.collabWs = collabWs; | |
collabWs.onopen = function(event) { | |
console.log('WebSocket connection established.'); | |
updateStatusBar('Collaboration connected.'); | |
reconnectAttempts = 0; | |
if(connectInterval) { clearInterval(connectInterval); connectInterval = null; } | |
sendWsMessage({ type: 'join', clientId: clientId, name: `User_${clientId.substring(0,4)}` }); | |
setupCodeEditorListener(); | |
}; | |
collabWs.onmessage = function(event) { | |
try { | |
const data = JSON.parse(event.data); | |
if (data.type === 'collaboration_status') { | |
updateCollabList(data.collaborators); | |
} else if (data.type === 'code_update') { | |
const receivedIssueNum = parseInt(data.issue_num, 10); | |
if (aceEditorInstance && receivedIssueNum === currentIssueId && data.senderId !== clientId) { | |
console.warn(`Applying remote delta for issue ${receivedIssueNum} from ${data.senderId}. WARNING: NO OT!`); | |
try { | |
const delta = JSON.parse(data.delta); | |
// Add ignore flag for local change listener | |
aceEditorInstance.getSession().getDocument().applyDeltas([{...delta, ignore: true}]); | |
} catch (e) { console.error('Failed to parse or apply remote delta:', e, data.delta); } | |
} | |
} else if (data.type === 'issues_updated') { | |
console.log('Received notification: Issue list updated on server.'); | |
updateStatusBar('Issue list updated on server. Refreshing the page or re-scanning is recommended.'); | |
const crawlButton = document.getElementById('crawl_btn'); | |
if (crawlButton) { | |
crawlButton.style.backgroundColor = '#fef08a'; | |
setTimeout(() => { crawlButton.style.backgroundColor = '' }, 2000); | |
} | |
} else { | |
console.warn("Received unknown WebSocket message type:", data.type, data); | |
} | |
} catch (e) { | |
console.error('Failed to parse WebSocket message or update UI:', e, event.data); | |
} | |
}; | |
collabWs.onclose = function(event) { | |
console.warn(`WebSocket connection closed: Code=${event.code}, Reason='${event.reason || 'N/A'}', Clean=${event.wasClean}`); | |
handleWsClose(); // Centralize close handling | |
}; | |
collabWs.onerror = function(error) { | |
console.error('WebSocket error event:', error); | |
updateStatusBar('Collaboration connection error.', true); | |
// onclose will likely fire after onerror. | |
}; | |
} | |
function handleWsClose() { | |
const collabListDiv = document.getElementById('collab-list'); | |
if (collabListDiv) collabListDiv.innerHTML = '<span style="color: #D32F2F; font-weight: bold;">Collaboration Disconnected</span>'; | |
updateStatusBar('Collaboration disconnected.', true); | |
collabWs = null; window.collabWs = null; | |
aceEditorInstance = null; // Clear editor instance on disconnect | |
editorChangeListenerAttached = false; // Allow re-attaching on reconnect | |
if (reconnectAttempts < maxReconnectAttempts) { | |
const delay = Math.pow(2, reconnectAttempts) * 1500 + Math.random() * 1000; | |
console.log(`Attempting to reconnect WebSocket in approx. ${Math.round(delay / 1000)} seconds...`); | |
setTimeout(connectWebSocket, delay); | |
reconnectAttempts++; | |
} else { | |
console.error('Max WebSocket reconnection attempts reached.'); | |
updateStatusBar('Collaboration failed - Max reconnect attempts reached.', true); | |
} | |
} | |
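// Illustrative backoff values from the delay formula in handleWsClose above (2^attempts * 1500ms plus up to 1000ms jitter): | |
// the first retry waits ~1.5-2.5s, the second ~3-4s, the third ~6-7s, the fourth ~12-13s, and so on | |
// until maxReconnectAttempts is exhausted. | |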
function sendWsMessage(message) { | |
if (collabWs && collabWs.readyState === WebSocket.OPEN) { | |
try { | |
collabWs.send(JSON.stringify(message)); | |
} catch (e) { | |
console.error("Failed to stringify or send WebSocket message:", e, message); | |
} | |
} else { | |
console.warn('WebSocket not connected. Cannot send message:', message); | |
} | |
} | |
window.sendWsMessage = sendWsMessage; | |
function setupCodeEditorListener() { | |
// Find the ACE editor instance managed by the Gradio component | |
// This relies on the internal structure of gradio_code_editor | |
const editorElement = document.querySelector('#code_editor_component .ace_editor'); | |
if (typeof ace === 'undefined' || !editorElement) { | |
if (editorSetupAttempts < maxEditorSetupAttempts) { | |
console.warn(`Ace library or editor element not found yet. Retrying editor setup (${editorSetupAttempts + 1}/${maxEditorSetupAttempts})...`); | |
editorSetupAttempts++; | |
setTimeout(setupCodeEditorListener, 1500 + Math.random() * 500); | |
} else { | |
console.error("Ace library or editor element not found after multiple attempts. Code editor collaboration disabled."); | |
updateStatusBar("Code editor library or UI element failed to load.", true); | |
} | |
return; | |
} | |
// Prevent double initialization | |
if (editorElement.dataset.collabInitialized === 'true' && aceEditorInstance) { | |
// Editor is already set up, just ensure the correct issue ID is tracked | |
updateTrackedIssueId(); | |
return; | |
} | |
try { | |
// Get the ACE editor instance | |
aceEditorInstance = ace.edit(editorElement); | |
if (!aceEditorInstance) { throw new Error("ace.edit(element) returned null or undefined."); } | |
console.log('Ace Editor instance acquired successfully:', aceEditorInstance); | |
editorElement.dataset.collabInitialized = 'true'; // Mark as initialized | |
if (!editorChangeListenerAttached) { | |
console.log("Attaching Ace editor 'change' listener..."); | |
aceEditorInstance.getSession().on('change', function(delta) { | |
// Check for the custom 'ignore' flag added when applying remote deltas | |
if (delta.ignore) { return; } // Ignore remote changes | |
if (currentIssueId !== null) { | |
const now = Date.now(); | |
// Simple debounce to avoid flooding server | |
if (now - lastSentDeltaTimestamp > deltaSendDebounceMs) { | |
console.debug(`User change detected on issue ${currentIssueId}. Sending delta:`, delta); | |
sendWsMessage({ | |
type: 'code_update', | |
issue_num: currentIssueId, | |
delta: JSON.stringify(delta), | |
clientId: clientId | |
}); | |
lastSentDeltaTimestamp = now; | |
} | |
} | |
}); | |
// Add focus/blur listeners for status updates | |
aceEditorInstance.on('focus', () => { | |
if(currentIssueId !== null) { | |
sendWsMessage({ type: 'status_update', clientId: clientId, status: `Editing Issue #${currentIssueId}`}); | |
} | |
}); | |
aceEditorInstance.on('blur', () => { | |
const status = currentIssueId !== null ? `Viewing Issue #${currentIssueId}` : 'Idle'; | |
sendWsMessage({ type: 'status_update', clientId: clientId, status: status}); | |
}); | |
editorChangeListenerAttached = true; | |
console.log('Ace editor listeners attached successfully.'); | |
} | |
editorSetupAttempts = 0; // Reset attempts on success | |
updateTrackedIssueId(); // Ensure correct issue ID is tracked after setup | |
} catch (e) { | |
console.error('Failed to initialize Ace editor instance or attach listeners:', e); | |
if (editorElement) delete editorElement.dataset.collabInitialized; // Allow retry | |
aceEditorInstance = null; | |
editorChangeListenerAttached = false; | |
if (editorSetupAttempts < maxEditorSetupAttempts) { | |
console.warn(`Retrying editor setup after error (${editorSetupAttempts + 1}/${maxEditorSetupAttempts})...`); | |
editorSetupAttempts++; | |
setTimeout(setupCodeEditorListener, 2000 + Math.random() * 500); | |
} else { | |
console.error("Max editor setup attempts failed after error. Collaboration disabled."); | |
updateStatusBar("Code editor setup failed repeatedly.", true); | |
} | |
} | |
} | |
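// For reference (assuming the Ace 1.x delta format): each change delta passed to the listener above looks | |
// roughly like { start: {row, column}, end: {row, column}, action: 'insert' | 'remove', lines: [...] }; | |
// this is the object that gets JSON-stringified into the 'code_update' messages. Illustrative, not exhaustive. | |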
// Function to update the internally tracked issue ID | |
function updateTrackedIssueId() { | |
// This is the most fragile part, relying on Gradio's internal structure. | |
// We need to find the hidden input element that Gradio uses to store the | |
// state variable 'selected_issue_id_state'. Its exact location/attributes | |
// might change between Gradio versions. | |
// A common pattern is a hidden input within the component's parent structure. | |
// We'll look for an input with type="hidden" and a value that looks like an integer. | |
// This is heuristic and might need adjustment if Gradio changes significantly. | |
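// Illustrative example of the element we hope to match: <input type="hidden" value="42">, | |
// where the value holds the currently selected issue number. | |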
let newIssueId = null; | |
const hiddenInputs = document.querySelectorAll('input[type="hidden"]'); | |
for (const input of hiddenInputs) { | |
const value = input.value; | |
if (value && value !== 'null' && /^\d+$/.test(value)) { | |
// Found a hidden input with an integer value. | |
// There is no reliable way to confirm it is the 'selected_issue_id_state' input, | |
// so we take the first plausible match; a robust approach would require Gradio | |
// to expose component IDs to JavaScript. | |
try { | |
newIssueId = parseInt(value, 10); | |
break; // Accept the first plausible candidate and stop searching | |
} catch (e) { | |
console.debug("Could not parse hidden input value as int:", value, e); | |
} | |
} | |
} | |
if (newIssueId !== currentIssueId) { | |
console.log(`Updating tracked issue ID: from ${currentIssueId} to ${newIssueId}`); | |
currentIssueId = newIssueId; | |
const status = currentIssueId !== null ? `Viewing Issue #${currentIssueId}` : 'Idle'; | |
sendWsMessage({ type: 'status_update', clientId: clientId, status: status}); | |
// If we just got a new issue ID and the editor wasn't set up, try setting it up | |
if (currentIssueId !== null && !aceEditorInstance) { | |
clearTimeout(window.setupEditorTimeout); | |
window.setupEditorTimeout = setTimeout(setupCodeEditorListener, 250); | |
} | |
} | |
} | |
// --- Initialization and Observers --- | |
connectWebSocket(); | |
// Use a MutationObserver to detect changes in the DOM, specifically looking | |
// for changes that might indicate a new issue has been selected (which | |
// updates the hidden state variable and potentially the editor component). | |
const observerTargetNode = document.body; // Observe the entire body | |
const observerConfig = { childList: true, subtree: true, attributes: true, attributeFilter: ['value'] }; | |
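// attributeFilter: ['value'] limits attribute callbacks to 'value' changes, which is how updates to | |
// the hidden issue-ID state input surface in the DOM; childList + subtree catch re-rendered components. | |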
const observerCallback = (mutationsList, observer) => { | |
let editorNeedsCheck = false; | |
let issueIdCheckNeeded = false; | |
for (const mutation of mutationsList) { | |
// Check if the code editor component or its contents changed | |
if (mutation.target.closest('#code_editor_component') || | |
(mutation.target.id === 'code_editor_component' && mutation.type === 'childList')) { | |
editorNeedsCheck = true; | |
} | |
// Check if any hidden input's value changed | |
if (mutation.type === 'attributes' && mutation.attributeName === 'value' && mutation.target.type === 'hidden') { | |
// We can't tell which specific hidden input is the one we care about, so re-check | |
// all hidden inputs whenever any hidden input's value changes. | |
issueIdCheckNeeded = true; | |
} | |
// Also check childList changes on the main Gradio container, as this | |
// might indicate components being added/removed or updated. | |
if (mutation.type === 'childList' && mutation.target.classList && mutation.target.classList.contains('gradio-container')) { | |
issueIdCheckNeeded = true; // Assume issue ID might have changed if main container children change | |
editorNeedsCheck = true; // Editor might have been re-rendered | |
} | |
} | |
if (editorNeedsCheck) { | |
// Debounce editor setup check | |
clearTimeout(window.setupEditorTimeout); | |
window.setupEditorTimeout = setTimeout(setupCodeEditorListener, 250); | |
} | |
if (issueIdCheckNeeded) { | |
// Debounce issue ID check | |
clearTimeout(window.issueIdCheckTimeout); | |
window.issueIdCheckTimeout = setTimeout(updateTrackedIssueId, 100); // Shorter debounce for responsiveness | |
} | |
}; | |
const observer = new MutationObserver(observerCallback); | |
if (observerTargetNode) { | |
console.log("Starting MutationObserver to detect Gradio UI changes (incl. hidden state)."); | |
observer.observe(observerTargetNode, observerConfig); | |
} else { | |
console.error("Could not find observer target node (document.body)."); | |
} | |
// Initial setup attempts | |
setTimeout(setupCodeEditorListener, 700); | |
setTimeout(updateTrackedIssueId, 500); // Also check initial issue ID state | |
})(); // End IIFE | |
</script> | |
""" | |
demo_app.load(_js=web_socket_js(WS_PORT), fn=None, inputs=None, outputs=None) | |
return demo_app | |
# ========== WebSocket Server Logic ========== | |
async def handle_ws_connection(websocket: WebSocketServerProtocol, path: str, manager: IssueManager): | |
"""Handles incoming WebSocket connections and messages for collaboration.""" | |
client_id = f"client_{hashlib.sha1(os.urandom(16)).hexdigest()[:8]}" | |
setattr(websocket, 'client_id', client_id) | |
remote_addr = websocket.remote_address | |
logger.info(f"WebSocket client connected: {remote_addr} assigned ID {client_id}") | |
manager.ws_clients.append(websocket) | |
logger.info(f"Client list size: {len(manager.ws_clients)}") | |
try: | |
async for message in websocket: | |
try: | |
data = json.loads(message) | |
msg_type = data.get("type") | |
sender_id = client_id | |
logger.debug(f"Received WS message type '{msg_type}' from {sender_id} ({remote_addr})") | |
if msg_type == "join": | |
client_name = data.get("name", f"User_{sender_id[:4]}") | |
manager.collaborators[sender_id] = {"name": client_name, "status": "Connected"} | |
logger.info(f"Client {sender_id} ({client_name}) joined collaboration. Current collaborators: {list(manager.collaborators.keys())}") | |
await manager.broadcast_collaboration_status_once() | |
elif msg_type == "code_update": | |
issue_num = data.get("issue_num") | |
delta_str = data.get("delta") | |
if issue_num is not None and delta_str is not None: | |
await manager.handle_code_editor_update(int(issue_num), delta_str, sender_id) | |
else: | |
logger.warning(f"Invalid 'code_update' message from {sender_id}: Missing issue_num or delta. Data: {str(data)[:200]}") | |
elif msg_type == "status_update": | |
status = data.get("status", "Idle") | |
if sender_id in manager.collaborators: | |
manager.collaborators[sender_id]["status"] = status | |
await manager.broadcast_collaboration_status_once() | |
else: | |
logger.warning(f"Received status update from client {sender_id} not in collaborator list. Adding/Updating.") | |
manager.collaborators[sender_id] = {"name": f"User_{sender_id[:4]} (Re-added)", "status": status} | |
await manager.broadcast_collaboration_status_once() | |
else: | |
logger.warning(f"Unknown WebSocket message type '{msg_type}' received from {sender_id} ({remote_addr}). Message: {str(message)[:200]}") | |
except json.JSONDecodeError: | |
logger.error(f"Received invalid JSON over WebSocket from {sender_id} ({remote_addr}): {str(message)[:200]}...") | |
except Exception as e: | |
logger.exception(f"Error processing WebSocket message from {sender_id} ({remote_addr}): {e}") | |
# Catch standard socket exceptions for disconnects | |
except (ConnectionClosed, ConnectionClosedOK, ConnectionAbortedError, ConnectionResetError) as e: | |
logger.info(f"WebSocket client {client_id} ({remote_addr}) disconnected: Code={getattr(e, 'code', 'N/A')}, Reason='{getattr(e, 'reason', 'N/A')}'") | |
except Exception as e: | |
logger.exception(f"Unexpected error in WebSocket handler for {client_id} ({remote_addr}): {e}") | |
finally: | |
logger.info(f"Cleaning up connection for client {client_id} ({remote_addr})") | |
manager.remove_ws_client(websocket) | |
async def start_websocket_server(manager: IssueManager, port: int): | |
"""Starts the WebSocket server.""" | |
handler_with_manager = lambda ws, path: handle_ws_connection(ws, path, manager) | |
server = None | |
stop_event = asyncio.Future() | |
# Completing this future signals the server loop below to shut down | |
async def stop_server(): | |
if not stop_event.done(): | |
stop_event.set_result(True) | |
# Expose the stop callback on the manager so other parts of the app can request a shutdown; task cancellation also works | |
manager.stop_ws_server = stop_server # Example of making it accessible | |
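# Illustrative use of the hook set above: any coroutine running on this loop can request a clean | |
# shutdown via `await manager.stop_ws_server()`. | |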
try: | |
server = await websockets.serve( | |
handler_with_manager, | |
"0.0.0.0", | |
port, | |
ping_interval=20, | |
ping_timeout=20 | |
) | |
logger.info(f"WebSocket server started successfully on ws://0.0.0.0:{port}") | |
await stop_event # Keep running until stop_event is set | |
except OSError as e: | |
logger.error(f"Failed to start WebSocket server on port {port}: {e}. Is the port already in use?") | |
raise SystemExit(f"WebSocket Port {port} unavailable. Application cannot start.") | |
except asyncio.CancelledError: | |
logger.info("WebSocket server task cancelled.") | |
except Exception as e: | |
logger.exception(f"An unexpected error occurred starting or running the WebSocket server: {e}") | |
if not stop_event.done(): stop_event.set_result(True) # Ensure loop terminates on error | |
raise | |
finally: | |
if server: | |
logger.info("Attempting to stop WebSocket server...") | |
server.close() | |
await server.wait_closed() | |
logger.info("WebSocket server stopped.") | |
if not stop_event.done(): # Ensure future is completed | |
stop_event.set_result(True) | |
def run_webhook_server(manager: IssueManager, port: int, main_loop: asyncio.AbstractEventLoop): | |
"""Starts the HTTP webhook server in a separate thread.""" | |
WebhookHandler.manager_instance = manager | |
WebhookHandler.main_loop = main_loop | |
httpd = None | |
try: | |
server_address = ("0.0.0.0", port) | |
httpd = HTTPServer(server_address, WebhookHandler) | |
logger.info(f"Webhook HTTP server starting on http://0.0.0.0:{port}") | |
httpd.serve_forever() | |
except OSError as e: | |
logger.error(f"Failed to start Webhook server on port {port}: {e}. Is the port already in use?") | |
except Exception as e: | |
logger.exception(f"Unexpected error in Webhook server thread: {e}") | |
finally: | |
if httpd: | |
logger.info("Shutting down Webhook HTTP server...") | |
httpd.server_close() | |
logger.info("Webhook server thread finished.") | |
# ========== Main Execution ========== | |
if __name__ == "__main__": | |
print("--- Acknowledging potential TensorFlow/CUDA warnings ---") | |
print("If you see warnings like 'Could not load dynamic library...' or related to CUDA/GPU,") | |
print("they are often harmless if you are not using a local GPU-accelerated model.") | |
print("Hugging Face Inference API calls run remotely and do not require local GPU setup.") | |
print("--- Starting Application ---") | |
main_loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(main_loop) | |
manager = IssueManager() | |
ws_task = None | |
try: | |
# Start the webhook server in a separate thread | |
webhook_thread = threading.Thread( | |
target=run_webhook_server, | |
args=(manager, WEBHOOK_PORT, main_loop), | |
name="WebhookServerThread", | |
daemon=True | |
) | |
webhook_thread.start() | |
# Create the Gradio UI | |
app = create_ui(manager) | |
# Schedule the WebSocket server on the main loop; it starts once the loop runs below | |
ws_task = main_loop.create_task(start_websocket_server(manager, WS_PORT)) | |
# Launch Gradio without blocking this thread, then drive the asyncio loop here | |
app.launch( | |
server_name="0.0.0.0", | |
server_port=7860, | |
share=False, | |
debug=True, | |
inbrowser=True, | |
prevent_thread_lock=True | |
) | |
main_loop.run_forever() | |
except KeyboardInterrupt: | |
logger.info("Gradio app interrupted. Initiating shutdown...") | |
except Exception as e: | |
logger.exception(f"An unexpected error occurred: {e}") | |
finally: | |
# Ensure graceful shutdown of the WebSocket server task | |
logger.info("Cancelling WebSocket server task...") | |
if ws_task and not ws_task.done(): | |
ws_task.cancel() | |
try: | |
# The loop is no longer running at this point, so it can finish the cancelled task | |
main_loop.run_until_complete(ws_task) | |
except asyncio.CancelledError: | |
pass | |
except Exception as e: | |
logger.error(f"Error during WebSocket cancellation: {e}") | |
if main_loop.is_running(): | |
logger.info("Stopping asyncio event loop...") | |
main_loop.stop() | |
main_loop.close() | |
logger.info("Application shutdown complete.") | |