Spaces:
Runtime error
Runtime error
import gradio as gr | |
import os | |
import aiohttp | |
import asyncio | |
from git import Repo, GitCommandError | |
from pathlib import Path | |
from datetime import datetime | |
import shutil | |
import json | |
import logging | |
import re | |
from typing import Dict, List, Optional, Tuple | |
import subprocess | |
import plotly.graph_objects as go | |
from transformers import pipeline # For local fallback | |
import threading | |
# ========== Configuration ========== | |
WORKSPACE = Path("/tmp/issue_workspace") | |
WORKSPACE.mkdir(exist_ok=True) | |
GITHUB_API = "https://api.github.com/repos" | |
HF_INFERENCE_API = "https://api-inference.huggingface.co/models" | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# Free Hugging Face models for selection | |
HF_MODELS = { | |
"Mistral-8x7B (Powerful)": "mistralai/Mixtral-8x7B-Instruct-v0.1", | |
"DistilBERT (Lightweight)": "distilbert-base-uncased", | |
"BART (Summarization)": "facebook/bart-large" | |
} | |
DEFAULT_MODEL = "mistralai/Mixtral-8x7B-Instruct-v0.1" | |
# ========== Theme ========== | |
theme = gr.themes.Default( | |
primary_hue="indigo", | |
secondary_hue="slate", | |
radius_size="lg", | |
).set( | |
button_primary_background_fill="*primary_600", | |
button_primary_text_color="white", | |
block_label_background_fill="*primary_500", | |
block_label_text_color="white", | |
body_background_fill="gray_50", | |
) | |
# ========== Issue Manager ========== | |
class IssueManager: | |
def __init__(self): | |
self.issues: Dict[int, dict] = {} | |
self.repo_url: Optional[str] = None | |
self.repo: Optional[Repo] = None | |
self.current_issue: Optional[int] = None | |
self.github_token: Optional[str] = None | |
self.hf_token: Optional[str] = None | |
self.collaborators: Dict[str, str] = {} | |
self.local_nlp = None # Local fallback | |
self.webhook_active = False | |
async def crawl_issues(self, repo_url: str, github_token: str, hf_token: str) -> Tuple[bool, str]: | |
self.repo_url = repo_url | |
self.github_token = github_token | |
self.hf_token = hf_token | |
match = re.match(r"https://github.com/([^/]+)/([^/]+)", repo_url) | |
if not match: | |
return False, "Invalid GitHub URL format" | |
owner, repo_name = match.groups() | |
all_issues = [] | |
page = 1 | |
headers = {"Authorization": f"Bearer {github_token}", "Accept": "application/vnd.github+json"} | |
try: | |
async with aiohttp.ClientSession() as session: | |
while True: | |
url = f"{GITHUB_API}/{owner}/{repo_name}/issues?page={page}&state=open" | |
async with session.get(url, headers=headers) as response: | |
if response.status == 403: | |
return False, "GitHub rate limit exceeded or invalid token" | |
if response.status != 200: | |
return False, f"GitHub API error: {response.status}" | |
issues = await response.json() | |
if not issues: | |
break | |
all_issues.extend(issues) | |
page += 1 | |
self.issues = { | |
i['number']: {**i, 'severity': self._determine_severity(i)} | |
for i in all_issues if not i.get('assignee') | |
} | |
return True, f"Found {len(self.issues)} unresolved/unassigned issues" | |
except Exception as e: | |
logger.error(f"Crawl error: {e}") | |
return False, str(e) | |
def _determine_severity(self, issue: dict) -> str: | |
body = (issue.get('body') or '').lower() | |
labels = [l['name'].lower() for l in issue.get('labels', [])] | |
if any(l in labels for l in ['critical', 'urgent', 'security']) or 'crash' in body: | |
return "Critical" | |
elif any(l in labels for l in ['high', 'important']) or 'error' in body: | |
return "High" | |
elif any(l in labels for l in ['medium', 'bug']) or 'issue' in body: | |
return "Medium" | |
return "Low" | |
async def clone_and_work(self, issue_number: int) -> Tuple[Dict, str, str]: | |
try: | |
repo_path = WORKSPACE / f"repo-{issue_number}" | |
if repo_path.exists(): | |
shutil.rmtree(repo_path) | |
self.repo = Repo.clone_from(self.repo_url, repo_path) | |
branch = f"issue-{issue_number}" | |
self.repo.git.checkout('-b', branch) | |
self.current_issue = issue_number | |
issue = self.issues[issue_number] | |
diff = "" | |
return {"success": f"Working on #{issue_number}"}, issue['body'], diff | |
except GitCommandError as e: | |
return {"error": str(e)}, "", "" | |
async def suggest_resolution(self, issue_number: int, model: str) -> str: | |
"""Use HF Inference API or local fallback""" | |
issue = self.issues[issue_number] | |
prompt = f"Suggest a detailed resolution for this GitHub issue:\nTitle: {issue['title']}\nBody: {issue['body']}" | |
if self.hf_token: | |
headers = {"Authorization": f"Bearer {self.hf_token}"} | |
payload = {"inputs": prompt, "parameters": {"max_new_tokens": 200}} | |
try: | |
async with aiohttp.ClientSession() as session: | |
async with session.post(f"{HF_INFERENCE_API}/{model}", headers=headers, json=payload) as resp: | |
if resp.status == 200: | |
result = await resp.json() | |
return result[0].get("generated_text", "No suggestion").strip() | |
elif resp.status == 429: | |
return await self._local_suggestion(prompt) | |
return f"HF API error: {resp.status}" | |
except Exception as e: | |
logger.error(f"HF suggestion error: {e}") | |
return await self._local_suggestion(prompt) | |
return await self._local_suggestion(prompt) | |
async def _local_suggestion(self, prompt: str) -> str: | |
"""Fallback to local model""" | |
if not self.local_nlp: | |
self.local_nlp = pipeline("text-generation", model="distilgpt2", device=-1) # CPU-only, lightweight | |
try: | |
result = self.local_nlp(prompt, max_length=200, num_return_sequences=1) | |
return result[0]["generated_text"].strip() | |
except Exception as e: | |
logger.error(f"Local fallback error: {e}") | |
return "Local suggestion unavailable" | |
async def resolve_and_verify(self, resolution: str) -> Tuple[Dict, str]: | |
if not self.current_issue or not self.repo: | |
return {"error": "No active issue/repo"}, "" | |
issue_number = self.current_issue | |
repo_path = self.repo.working_dir | |
try: | |
resolution_dir = repo_path / "resolutions" / str(issue_number) | |
resolution_dir.mkdir(parents=True, exist_ok=True) | |
timestamp = datetime.utcnow().strftime("%Y%m%d-%H%M%S") | |
resolution_file = resolution_dir / f"resolution_{timestamp}.txt" | |
resolution_file.write_text(resolution) | |
self.repo.git.add(all=True) | |
self.repo.index.commit(f"Resolve #{issue_number}") | |
diff = self.repo.git.diff('HEAD^', 'HEAD') | |
v1 = await self._verify_resolution(issue_number, resolution) | |
await asyncio.sleep(1) | |
v2 = await self._verify_resolution(issue_number, resolution) | |
if v1["status"] and v2["status"]: | |
self.repo.git.push('origin', f"issue-{issue_number}") | |
pr_url = await self._create_pr(issue_number) | |
return { | |
"success": f"Issue #{issue_number} resolved and verified", | |
"pr_url": pr_url, | |
"verification": [v1, v2] | |
}, diff | |
return {"error": "Verification failed", "details": [v1, v2]}, diff | |
except Exception as e: | |
logger.error(f"Resolve error: {e}") | |
return {"error": str(e)}, "" | |
async def _verify_resolution(self, issue_number: int, resolution: str) -> Dict: | |
issue = self.issues[issue_number] | |
body = issue['body'].lower() | |
try: | |
diff = self.repo.git.diff('HEAD^', 'HEAD') | |
if 'file' in body: | |
file_match = re.search(r'file:\s*(\S+)', body) | |
if file_match and file_match.group(1) not in diff: | |
return {"status": False, "message": "Resolution doesn’t modify mentioned file"} | |
if (self.repo.working_dir / "tests").exists(): | |
proc = await asyncio.create_subprocess_exec( | |
"pytest", cwd=self.repo.working_dir, stdout=asyncio.subprocess.PIPE, | |
stderr=asyncio.subprocess.PIPE | |
) | |
stdout, stderr = await proc.communicate() | |
if proc.returncode != 0: | |
return {"status": False, "message": f"Tests failed: {stderr.decode()}"} | |
return {"status": True, "message": "Resolution verified"} | |
except Exception as e: | |
return {"status": False, "message": str(e)} | |
async def _create_pr(self, issue_number: int) -> str: | |
owner, repo_name = re.match(r"https://github.com/([^/]+)/([^/]+)", self.repo_url).groups() | |
headers = {"Authorization": f"Bearer {self.github_token}", "Accept": "application/vnd.github+json"} | |
pr_data = { | |
"title": f"Resolve #{issue_number}: {self.issues[issue_number]['title']}", | |
"body": f"Resolution for #{issue_number}\nVerification: Passed twice", | |
"head": f"issue-{issue_number}", | |
"base": "main" | |
} | |
async with aiohttp.ClientSession() as session: | |
resp = await session.post(f"{GITHUB_API}/{owner}/{repo_name}/pulls", headers=headers, json=pr_data) | |
if resp.status == 201: | |
return (await resp.json())['html_url'] | |
raise Exception(f"PR creation failed: {await resp.text()}") | |
def get_stats(self) -> go.Figure: | |
severity_counts = {s: sum(1 for i in self.issues.values() if i['severity'] == s) for s in ["Critical", "High", "Medium", "Low"]} | |
return go.Figure( | |
data=[go.Bar(x=list(severity_counts.keys()), y=list(severity_counts.values()))], | |
layout={"title": "Issue Severity Distribution"} | |
) | |
async def webhook_listener(self): | |
"""Simulated webhook for real-time updates""" | |
while self.webhook_active: | |
await asyncio.sleep(10) # Poll every 10s (replace with real webhook in production) | |
await self.crawl_issues(self.repo_url, self.github_token, self.hf_token) | |
logger.info("Webhook: Issues refreshed") | |
manager = IssueManager() | |
# ========== UI ========== | |
def create_ui(): | |
with gr.Blocks(theme=theme, title="Hugging Face Git Resolver") as app: | |
gr.Markdown("# 🤗 Hugging Face Git Resolver\nFree AI-powered issue resolution for all") | |
current_diff = gr.State(value="") | |
with gr.Row(): | |
repo_url = gr.Textbox(label="GitHub Repository URL", placeholder="https://github.com/username/repo") | |
github_token = gr.Textbox(label="GitHub Token", type="password") | |
hf_token = gr.Textbox(label="Hugging Face Token", type="password") | |
model_select = gr.Dropdown(choices=list(HF_MODELS.keys()), value="Mistral-8x7B (Powerful)", label="AI Model") | |
crawl_btn = gr.Button("Crawl Issues", variant="primary") | |
with gr.Tabs(): | |
with gr.Tab("Issues"): | |
status = gr.Textbox(label="Status") | |
issue_list = gr.Dataframe( | |
headers=["Number", "Title", "Severity"], | |
datatype=["number", "str", "str"], | |
interactive=True | |
) | |
progress = gr.Progress() | |
with gr.Tab("Work"): | |
with gr.Row(): | |
issue_num = gr.Number(label="Issue Number") | |
work_btn = gr.Button("Start Work") | |
issue_body = gr.Markdown() | |
with gr.Row(): | |
resolution = gr.Textbox(label="Resolution", lines=5) | |
suggest_btn = gr.Button("AI Suggest") | |
diff_view = gr.HTML(label="Changes Preview") | |
resolve_btn = gr.Button("Resolve & Verify", variant="primary") | |
result = gr.JSON() | |
with gr.Tab("Dashboard"): | |
stats_plot = gr.Plot(label="Issue Stats") | |
collab_status = gr.Textbox(label="Collaborators Online", value="No collaborators yet") | |
# Event Handlers | |
async def crawl_and_display(url, g_token, h_token, model): | |
success, msg = await manager.crawl_issues(url, g_token, h_token) | |
if success: | |
sorted_issues = sorted(manager.issues.values(), key=lambda x: {"Critical": 0, "High": 1, "Medium": 2, "Low": 3}[x['severity']]) | |
df = [[i['number'], i['title'], i['severity']] for i in sorted_issues] | |
if not manager.webhook_active: | |
manager.webhook_active = True | |
asyncio.create_task(manager.webhook_listener()) | |
return msg, gr.Dataframe(value=df), manager.get_stats() | |
return msg, gr.Dataframe(), None | |
async def start_work(num): | |
result, body, diff = await manager.clone_and_work(int(num)) | |
diff_html = f"<pre>{diff}</pre>" if diff else "No changes yet" | |
return result, body, diff_html, diff | |
async def suggest_res(num, model): | |
suggestion = await manager.suggest_resolution(int(num), HF_MODELS[model]) | |
return suggestion | |
async def resolve_issue(res, diff_state): | |
result, diff = await manager.resolve_and_verify(res) | |
diff_html = f"<pre>{diff}</pre>" if diff else "No changes" | |
return result, diff_html, diff | |
def select_issue(evt: gr.SelectData): | |
num = evt.value[0] | |
return num, manager.issues[num]['body'] | |
issue_list.select(select_issue, None, [issue_num, issue_body]) | |
crawl_btn.click(crawl_and_display, [repo_url, github_token, hf_token, model_select], [status, issue_list, stats_plot]) | |
work_btn.click(start_work, [issue_num], [status, issue_body, diff_view, current_diff]) | |
suggest_btn.click(suggest_res, [issue_num, model_select], [resolution]) | |
resolve_btn.click(resolve_issue, [resolution, current_diff], [result, diff_view, current_diff]) | |
async def update_collab(): | |
while True: | |
await asyncio.sleep(5) | |
manager.collaborators["user1"] = "Working on #5" # Simulated | |
yield "\n".join(f"{k}: {v}" for k, v in manager.collaborators.items()) | |
app.load(fn=update_collab, inputs=None, outputs=collab_status, _js="() => setInterval(() => gradioApp().dispatchEvent(new Event('update_collab')), 5000)") | |
return app | |
# ========== Execution ========== | |
if __name__ == "__main__": | |
app = create_ui() | |
app.launch(share=True) |