# Captured from a Hugging Face Spaces page; Space status at capture time: Runtime error.
import atexit
import http.client
import logging
import os
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import urllib.parse
import warnings
import webbrowser
from datetime import datetime
from typing import Dict, List

import gradio as gr
import requests
# NOTE(review): `spaces` is kept ahead of torch/accelerate, as in the original
# import order — confirm whether ZeroGPU requires this ordering.
import spaces
import torch
from accelerate import Accelerator

warnings.filterwarnings('ignore', category=UserWarning, module='__main__')
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
def initialize_environment():
    """Prepare working directories, logging and the global exception hook.

    Creates the ``logs``, ``resolutions`` and ``repos`` directories relative to
    the current working directory, configures root logging to write to both a
    timestamped file under ``logs/`` and the console, and installs a
    ``sys.excepthook`` that logs uncaught exceptions.

    Returns:
        logging.Logger: the logger named after this module.
    """
    for directory in ('logs', 'resolutions', 'repos'):
        os.makedirs(directory, exist_ok=True)

    log_file = f"logs/github_bot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            # Explicit UTF-8 so log lines containing non-ASCII text (e.g. issue
            # titles) do not depend on the platform's default encoding.
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler(),
        ],
    )

    def handle_exception(exc_type, exc_value, exc_traceback):
        """Log uncaught exceptions; leave Ctrl+C to the default hook."""
        if issubclass(exc_type, KeyboardInterrupt):
            sys.__excepthook__(exc_type, exc_value, exc_traceback)
            return
        logging.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))

    sys.excepthook = handle_exception
    return logging.getLogger(__name__)
# Module-wide logger; obtaining it runs initialize_environment() once at import time.
logger = initialize_environment()
class GitHubAPI:
    """Small client for the GitHub REST API (v3) with basic rate-limit handling.

    Args:
        token: GitHub token sent in the ``Authorization`` header.
        timeout: Seconds to wait for each HTTP request before giving up, so a
            stalled connection cannot block the caller indefinitely.
    """

    # When fewer than this many core requests remain, wait for the quota reset.
    RATE_LIMIT_FLOOR = 10

    def __init__(self, token: str, timeout: float = 10.0):
        self.token = token
        self.timeout = timeout
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.base_url = "https://api.github.com"

    @staticmethod
    def _seconds_until_reset(remaining: int, reset_epoch: int, now: float) -> int:
        """Return how long to wait before the next request.

        Args:
            remaining: Requests left in the current quota window.
            reset_epoch: Epoch second at which the quota window resets.
            now: Current time as an epoch timestamp.

        Returns:
            ``0`` when enough quota remains or the reset time has already
            passed; otherwise the whole seconds left until the reset.
        """
        if remaining >= GitHubAPI.RATE_LIMIT_FLOOR:
            return 0
        return max(0, reset_epoch - int(now))

    def _check_rate_limit(self) -> bool:
        """Check the core rate limit and wait for the reset when nearly exhausted.

        Returns:
            ``True`` when the caller may proceed. After waiting out the quota
            window the request is allowed to continue (previously the wait was
            followed by ``False``, which discarded the request that had just
            been waited for). Failures of the check itself are logged and also
            yield ``True`` so the real request can surface the actual error.
        """
        try:
            response = requests.get(
                f"{self.base_url}/rate_limit",
                headers=self.headers,
                timeout=self.timeout,
            )
            response.raise_for_status()
            core = response.json()['resources']['core']
            wait_time = self._seconds_until_reset(core['remaining'], core['reset'], time.time())
            if wait_time > 0:
                logger.warning(f"Rate limit nearly exceeded. Waiting {wait_time} seconds...")
                time.sleep(wait_time)
            return True
        except Exception as e:
            logger.error(f"Error checking rate limit: {str(e)}")
            return True

    def get_repository(self, owner: str, repo: str) -> Dict:
        """Return the repository metadata for ``owner/repo``.

        Raises:
            requests.HTTPError: on a non-success HTTP status (logged, re-raised).
            Exception: any other failure is logged and re-raised unchanged.
        """
        try:
            response = requests.get(
                f"{self.base_url}/repos/{owner}/{repo}",
                headers=self.headers,
                timeout=self.timeout,
            )
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            logger.error(f"HTTP error getting repository info: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Error getting repository info: {str(e)}")
            raise

    def get_issues(self, owner: str, repo: str, state: str = 'open') -> List[Dict]:
        """Fetch issues of ``owner/repo`` in the given state, excluding pull requests.

        Returns:
            The issue dicts from the response (entries carrying a
            ``pull_request`` key are filtered out), or an empty list when the
            rate-limit check declines or any error occurs (errors are logged).
        """
        if not self._check_rate_limit():
            return []
        try:
            response = requests.get(
                f"{self.base_url}/repos/{owner}/{repo}/issues",
                headers=self.headers,
                params={'state': state},
                timeout=self.timeout,
            )
            response.raise_for_status()
            issues = response.json()
            # The issues endpoint also lists pull requests; keep real issues only.
            return [issue for issue in issues if 'pull_request' not in issue]
        except requests.HTTPError as e:
            logger.error(f"HTTP error fetching issues: {str(e)}")
            return []
        except Exception as e:
            logger.error(f"Error fetching issues: {str(e)}")
            return []
class GitHubBot:
    """High-level operations used by the UI: list issues and record/push a resolution."""

    def __init__(self):
        # Set by initialize_api(); None until a token has been supplied.
        self.github_api = None

    def initialize_api(self, token: str):
        """Create the GitHub API client for the given token."""
        self.github_api = GitHubAPI(token)

    def fetch_issues(self, token: str, username: str, repo: str) -> List[Dict]:
        """Return the issues of ``username/repo``; an empty list on any error (logged)."""
        try:
            self.initialize_api(token)
            return self.github_api.get_issues(username, repo)
        except Exception as e:
            logger.error(f"Error fetching issues: {str(e)}")
            return []

    @staticmethod
    def _repo_dir_name(forked_repo: str) -> str:
        """Return the last path component of a repository URL/path.

        Trailing slashes are ignored so ``.../repo/`` yields ``repo`` rather
        than an empty name.
        """
        return forked_repo.rstrip('/').split('/')[-1]

    def resolve_issue(self, token: str, username: str, repo: str, issue_number: int, resolution: str, forked_repo: str) -> str:
        """Save a resolution note, then clone, commit and push the forked repository.

        Args:
            token: GitHub token used to verify the target repository.
            username: Owner of the target repository.
            repo: Name of the target repository.
            issue_number: Number of the issue being resolved.
            resolution: Free-text resolution; written to a markdown file and
                embedded (URL-quoted) in the commit message.
            forked_repo: URL or path of the fork to clone and push to.

        Returns:
            ``"Resolution saved: <file>"`` on success, otherwise an
            ``"Error resolving issue: ..."`` message (the error is also logged).
        """
        try:
            self.initialize_api(token)
            # Raises if the target repository is not reachable with this token.
            self.github_api.get_repository(username, repo)

            # Create resolution file
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            resolution_file = f"resolutions/resolution_{issue_number}_{timestamp}.md"
            with open(resolution_file, "w", encoding="utf-8") as f:
                f.write(f"# Resolution for Issue #{issue_number}\n\n{resolution}")

            # Clone the fork into a fresh temporary directory so repeated runs
            # never collide with a previous clone.
            repo_name = self._repo_dir_name(forked_repo)
            clone_dir = os.path.join(tempfile.mkdtemp(prefix="github_bot_"), repo_name)
            # "--" stops git from parsing a user-supplied value starting with
            # "-" as a command-line option.
            subprocess.run(['git', 'clone', '--', forked_repo, clone_dir], check=True)
            logger.info(f"Cloned {forked_repo} into {clone_dir}")

            # NOTE(review): this blocks on stdin, which a hosted web app will
            # normally not have — confirm the intended manual workflow.
            input(" Apply the fix manually and stage the changes (press ENTER)? ")

            # Run git inside the clone via cwd= instead of os.chdir(), which
            # would permanently move the whole process out of the directory
            # that holds the relative 'resolutions/' and 'logs/' paths.
            subprocess.run(['git', 'add', '.'], cwd=clone_dir, check=True)
            # NOTE(review): the resolution text is URL-quoted inside the commit
            # message; kept as in the original — confirm this is intended.
            subprocess.run(['git', 'commit', '-m', f"Resolved issue #{issue_number} ({urllib.parse.quote(resolution)})"], cwd=clone_dir, check=True)
            subprocess.run(['git', 'push', 'origin', 'HEAD'], cwd=clone_dir, check=True)

            # Open Pull Request page
            # NOTE(review): the URL has no owner segment and names a branch this
            # method never creates; kept as in the original — verify the target.
            webbrowser.open(f'https://github.com/{repo_name}/compare/master...{username}:{repo_name}_resolved_issue_{issue_number}')
            return f"Resolution saved: {resolution_file}"
        except Exception as e:
            error_msg = f"Error resolving issue: {str(e)}"
            logger.error(error_msg)
            return error_msg
def handle_issue_selection(token, username, repo, issue_number, resolution, forked_repo):
    """Resolve one issue with a fresh GitHubBot and return its status message."""
    return GitHubBot().resolve_issue(
        token,
        username,
        repo,
        issue_number,
        resolution,
        forked_repo,
    )
# Stylesheet passed to gr.Blocks(css=...). Class names match the elem_classes
# used by the interface components.
# NOTE(review): cdn.tailwindcss.com appears to serve a script rather than a
# stylesheet, so the second @import may have no effect — confirm.
custom_css = """
/* DaisyUI and Tailwind are loaded from CDN */
@import url('https://cdn.jsdelivr.net/npm/daisyui@3.1.0/dist/full.css');
@import url('https://cdn.tailwindcss.com');
/* Custom gradient background */
body {
    background: linear-gradient(to bottom right, #121212, #303030) !important;
}
/* Card styling */
.container {
    max-width: 800px;
    margin: 2rem auto;
    padding: 0 1rem;
}
.card {
    background: #2a2a2a;
    border-radius: 1rem;
    box-shadow: 0 10px 15px -3px rgba(0, 0, 0, 0.1);
    padding: 2rem;
}
/* Form controls */
.input, .select, .textarea {
    background: #363636;
    border: 1px solid #4a4a4a;
    color: #ffffff;
    border-radius: 0.5rem;
    padding: 0.75rem;
    width: 100%;
    margin-bottom: 1rem;
}
/* Buttons */
.button {
    width: 100%;
    padding: 0.75rem;
    border-radius: 0.5rem;
    font-weight: bold;
    cursor: pointer;
    transition: all 0.2s;
}
.button-primary {
    background: #570df8;
    color: white;
}
.button-success {
    background: #36d399;
    color: white;
}
/* Labels */
.label {
    color: #a3a3a3;
    margin-bottom: 0.5rem;
    display: block;
}
/* Output area */
.output-area {
    margin-top: 2rem;
    padding: 1rem;
    background: #363636;
    border-radius: 0.5rem;
    color: #ffffff;
}
"""
def create_gradio_interface():
    """Build the Gradio Blocks UI for fetching issues and submitting a resolution.

    The interface collects a GitHub token, repository owner and name, lists the
    repository's issues in a dropdown, and passes the selected issue plus a
    resolution text and fork path to ``handle_issue_selection``.

    Returns:
        The configured ``gr.Blocks`` instance (not yet launched).
    """
    bot = GitHubBot()
    text_input_classes = "input input-bordered input-primary"

    def fetch_issues_handler(token, username, repo):
        """Return a Dropdown update listing issues, or a single explanatory entry."""
        if not all([token, username, repo]):
            return gr.Dropdown(choices=["Please provide all required fields"])
        issues = bot.fetch_issues(token, username, repo)
        if not issues:
            return gr.Dropdown(choices=["No issues found or error occurred"])
        return gr.Dropdown(
            choices=[f"#{issue['number']}: {issue['title']}" for issue in issues]
        )

    def resolve_issue_handler(token, username, repo, selected_issue, resolution, forked_repo):
        """Parse the issue number from the dropdown entry and run the resolution."""
        if not selected_issue:
            return "Please select an issue first"
        # Mirror the fetch handler's validation so nothing is written or cloned
        # when a required field is missing.
        if not all([token, username, repo, forked_repo]):
            return "Please provide all required fields"
        try:
            # Dropdown entries look like "#<number>: <title>".
            issue_number = int(selected_issue.split(':')[0][1:])
            return handle_issue_selection(token, username, repo, issue_number, resolution, forked_repo)
        except Exception as e:
            return f"Error: {str(e)}"

    # NOTE(review): the grouping below was reconstructed from a flattened
    # source listing — confirm which components belong to which gr.Group.
    with gr.Blocks(css=custom_css, theme=gr.themes.Base()) as demo:
        gr.HTML("""
            <div class="container">
            <h1 class="text-4xl font-bold text-white mb-8 text-center">GitHub Issue Manager</h1>
            <div class="card">
        """)
        with gr.Group():
            github_token = gr.Textbox(
                label="GitHub Token",
                placeholder="Enter your GitHub token",
                type="password",
                elem_classes=text_input_classes
            )
            github_username = gr.Textbox(
                label="Repository Owner",
                placeholder="Enter repository owner username",
                elem_classes=text_input_classes
            )
            github_repo = gr.Textbox(
                label="Repository Name",
                placeholder="Enter repository name",
                elem_classes=text_input_classes
            )
        with gr.Group():
            fetch_button = gr.Button(
                "Fetch Issues",
                elem_classes="button button-primary"
            )
            issue_dropdown = gr.Dropdown(
                choices=[],
                label="Select Issue",
                interactive=True,
                elem_classes="select select-primary"
            )
        with gr.Group():
            resolution_text = gr.Textbox(
                label="Resolution",
                placeholder="Enter your resolution here...",
                lines=5,
                elem_classes="textarea textarea-primary"
            )
            forked_repo = gr.Textbox(
                label="Forked Repository Path",
                placeholder="Enter the path of the forked repository",
                elem_classes=text_input_classes
            )
            resolve_button = gr.Button(
                "Resolve Issue",
                elem_classes="button button-success"
            )
        output_text = gr.Textbox(
            label="Output",
            interactive=False,
            elem_classes="output-area"
        )
        gr.HTML("</div></div>")  # Close card and container divs

        # Connect components
        fetch_button.click(
            fetch_issues_handler,
            inputs=[github_token, github_username, github_repo],
            outputs=[issue_dropdown]
        )
        resolve_button.click(
            resolve_issue_handler,
            inputs=[
                github_token,
                github_username,
                github_repo,
                issue_dropdown,
                resolution_text,
                forked_repo
            ],
            outputs=[output_text]
        )
    return demo
def cleanup():
    """Remove temporary working directories and shut logging down.

    Any failure is reported with ``print`` rather than raised, so shutdown
    continues regardless.
    """
    try:
        leftovers = [path for path in ['repos'] if os.path.exists(path)]
        for path in leftovers:
            shutil.rmtree(path)
        logging.shutdown()
    except Exception as err:
        print(f"Error during cleanup: {str(err)}")
def signal_handler(signum, frame):
    """Log the received signal, run cleanup, and exit with status 0."""
    logger.info(f"Received signal {signum}")
    cleanup()
    # Equivalent to sys.exit(0).
    raise SystemExit(0)
def main():
    """Register shutdown hooks, build the interface and launch the Gradio server.

    Raises:
        Exception: any launch failure is logged and re-raised.
    """
    # initialize_zero_gpu is not defined in this module; calling it
    # unconditionally raised NameError at startup. Call it only if some other
    # part of the program provides it.
    zero_gpu_init = globals().get("initialize_zero_gpu")
    if callable(zero_gpu_init):
        zero_gpu_init()
    else:
        logger.warning("initialize_zero_gpu is not defined; skipping ZeroGPU initialization")

    # Register cleanup handlers
    atexit.register(cleanup)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        # Create and launch interface
        demo = create_gradio_interface()

        # Configure launch parameters
        is_on_spaces = os.getenv("SPACE_ID") is not None
        launch_kwargs = {
            "server_name": "0.0.0.0",
            "server_port": 7860,
            "debug": True,
        }
        if not is_on_spaces:
            # Outside Spaces, request a public share link.
            launch_kwargs["share"] = True
            logger.info("Running in local mode with public URL enabled")
        else:
            logger.info("Running on Hugging Face Spaces")

        # Launch application
        logger.info("Launching Gradio interface...")
        demo = demo.queue()
        demo.launch(**launch_kwargs)
    except Exception as e:
        logger.error(f"Error launching application: {str(e)}")
        raise
    finally:
        logger.info("Application shutdown")


if __name__ == "__main__":
    main()