import sys import signal import shutil import logging import time import os from datetime import datetime from typing import List, Dict import requests import gradio as gr import atexit import subprocess import webbrowser import urllib.parse import warnings import torch import flask import spaces from accelerate import Accelerator # Constants INPUT_DIRECTORY = 'input' OUTPUT_DIRECTORY = 'output' LOGS_DIRECTORY = 'logs' RESOLUTIONS_DIRECTORY = 'resolutions' REPOS_DIRECTORY = 'repos' # Set up logging def initialize_logger(): log_file = f"{LOGS_DIRECTORY}/github_bot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler() ] ) return logging.getLogger(__name__) # Set up environment @spaces.GPU def initialize_environment(input_file, output_directory): directories = [LOGS_DIRECTORY, RESOLUTIONS_DIRECTORY, REPOS_DIRECTORY, input_file, output_directory] for directory in directories: os.makedirs(directory, exist_ok=True) return initialize_logger() # GitHub API handler class GitHubAPI: def __init__(self, token: str): self.token = token self.headers = { 'Authorization': f'token {token}', 'Accept': 'application/vnd.github.v3+json' } self.base_url = "https://api.github.com" def _check_rate_limit(self) -> bool: try: response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers) response.raise_for_status() limits = response.json() remaining = limits['resources']['core']['remaining'] reset_time = limits['resources']['core']['reset'] if remaining < 10: wait_time = max(0, reset_time - int(time.time())) if wait_time > 0: logger.warning(f"Rate limit nearly exceeded. Waiting {wait_time} seconds...") time.sleep(wait_time) return False return True except Exception as e: logger.error(f"Error checking rate limit: {str(e)}") return True def get_repository(self, owner: str, repo: str) -> Dict[str, Any]: try: response = requests.get(f"{self.base_url}/repos/{owner}/{repo}", headers=self.headers) response.raise_for_status() return response.json() except Exception as e: logger.error(f"Error getting repository: {str(e)}") return {} def get_issues(self, owner: str, repo: str) -> List[Dict[str, Any]]: try: response = requests.get(f"{self.base_url}/repos/{owner}/{repo}/issues", headers=self.headers) response.raise_for_status() return response.json() except Exception as e: logger.error(f"Error getting issues: {str(e)}") return [] def create_issue(self, owner: str, repo: str, title: str, body: str) -> Dict[str, Any]: try: response = requests.post(f"{self.base_url}/repos/{owner}/{repo}/issues", headers=self.headers, json={'title': title, 'body': body}) response.raise_for_status() return response.json() except Exception as e: logger.error(f"Error creating issue: {str(e)}") return {} def update_issue(self, owner: str, repo: str, issue_number: int, title: str, body: str) -> Dict[str, Any]: try: response = requests.patch(f"{self.base_url}/repos/{owner}/{repo}/issues/{issue_number}", headers=self.headers, json={'title': title, 'body': body}) response.raise_for_status() return response.json() except Exception as e: logger.error(f"Error updating issue: {str(e)}") return {} @app.route('/fetch-issues', methods=['POST']) def fetch_issues(): data = request.get_json() github_token = data['githubToken'] repo_url = data['repoUrl'] owner, repo = repo_url.split('/')[-2:] github_api = GitHubAPI(github_token) issues = github_api.get_issues(owner, repo) return jsonify({'issues': issues}) @app.route('/resolve-issue', methods=['POST']) def resolve_issue(): data = request.get_json() github_token = data['githubToken'] repo_url = data['repoUrl'] issue = data['issue'] resolution = data['resolution'] forked_repo_url = data['forkedRepoUrl'] owner, repo = repo_url.split('/')[-2:] github_api = GitHubAPI(github_token) issue_number = issue['number'] github_api.update_issue(owner, repo, issue_number, issue['title'], resolution) return jsonify({'output': f"Issue {issue_number} resolved"}) @app.route('/extract-info', methods=['POST']) def extract_info(): data = request.get_json() url = data['url'] # Extract info from URL return jsonify({'output': f"Info extracted from {url}"}) def run_flask_app(): app.run(debug=True, use_reloader=False) # HTML and CSS integration custom_html = """ GitHub Issue Manager

GitHub Issue Manager

""" def create_gradio_interface(): with gr.Blocks(css=None, theme=None) as demo: gr.HTML(custom_html) return demo # Cleanup function def cleanup(): try: temp_dirs = [REPOS_DIRECTORY] for dir_name in temp_dirs: if os.path.exists(dir_name): shutil.rmtree(dir_name) logging.shutdown() except Exception as e: print(f"Error during cleanup: {str(e)}") # Signal handler def signal_handler(signum, frame): logger.info(f"Received signal {signum}") cleanup() sys.exit(0) if __name__ == "__main__": # Register cleanup handlers atexit.register(cleanup) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # Run Flask app in a separate thread flask_thread = Thread(target=run_flask_app) flask_thread.start() # Create Gradio interface demo = create_gradio_interface() demo.launch()