"""GitHub issue-resolution bot.

Runs a Flask JSON API (issue listing / resolution / URL inspection) in a
background thread alongside a Gradio UI.  Issue "resolution" writes a
markdown note, clones a fork, waits for a manual fix, then commits, pushes
and opens the GitHub compare page in a browser.
"""

import atexit
import logging
import os
import shutil
import signal
import subprocess
import sys
import time
import webbrowser
from datetime import datetime
from threading import Thread
from typing import Any, Dict, List
from urllib.parse import quote, urlparse

import gradio as gr
import requests
from flask import Flask, jsonify, request, send_from_directory

device = "cuda"

# ---------------------------------------------------------------------------
# Optional Hugging Face Spaces support.
# The original applied ``@spaces.GPU()`` without ever importing ``spaces``,
# which crashed the module at import time.  Fall back to a no-op decorator
# factory when the package is not installed.
# ---------------------------------------------------------------------------
try:
    import spaces  # type: ignore
except ImportError:
    class _SpacesStub:
        """Stand-in exposing a no-op ``GPU`` decorator factory."""

        @staticmethod
        def GPU(*_args, **_kwargs):
            def decorator(fn):
                return fn

            return decorator

    spaces = _SpacesStub()  # type: ignore[assignment]

# Seconds before an outbound HTTP request is abandoned; prevents the Flask
# worker thread from hanging forever on a stalled GitHub API call.
REQUEST_TIMEOUT = 10


@spaces.GPU()
def stream_chat(
    message: str,
    history: list,
    system_prompt: str,
    temperature: float = 0.8,
    max_new_tokens: int = 1024,
    top_p: float = 1.0,
    top_k: int = 20,
    penalty: float = 1.2,
):
    """Assemble a chat conversation and generation kwargs for an LLM turn.

    NOTE(review): this function references module globals ``tokenizer``,
    ``model`` and ``TextIteratorStreamer`` that are not defined anywhere in
    this file — it will raise ``NameError`` if called.  It also builds
    ``generate_kwargs`` but never launches generation nor yields from the
    streamer, so it appears truncated.  Kept behavior-compatible pending the
    missing model-loading code; ``penalty`` is likewise accepted but unused.
    """
    print(f'message: {message}')
    print(f'history: {history}')

    # System prompt first, then alternating user/assistant turns from history.
    conversation = [
        {"role": "system", "content": system_prompt}
    ]
    for prompt, answer in history:
        conversation.extend([
            {"role": "user", "content": prompt},
            {"role": "assistant", "content": answer},
        ])
    conversation.append({"role": "user", "content": message})

    input_ids = tokenizer.apply_chat_template(
        conversation, add_generation_prompt=True, return_tensors="pt"
    ).to(model.device)
    streamer = TextIteratorStreamer(
        tokenizer, timeout=60.0, skip_prompt=True, skip_special_tokens=True
    )
    generate_kwargs = dict(
        input_ids=input_ids,
        max_new_tokens=max_new_tokens,
        do_sample=temperature != 0,  # greedy decoding when temperature is 0
        top_p=top_p,
        top_k=top_k,
        temperature=temperature,
        eos_token_id=[128001, 128008, 128009],  # presumably Llama-3 EOS ids — TODO confirm
        streamer=streamer,
    )
    # NOTE(review): original ended here without using generate_kwargs.


# Constants: working directories created at import time.
INPUT_DIRECTORY = 'input'
OUTPUT_DIRECTORY = 'output'
LOGS_DIRECTORY = 'logs'
RESOLUTIONS_DIRECTORY = 'resolutions'
REPOS_DIRECTORY = 'repos'


def initialize_environment(input_file, output_directory):
    """Create all working directories (idempotent).

    NOTE(review): despite its name, ``input_file`` is used as a directory
    path; parameter name kept for call compatibility.
    """
    directories = [
        LOGS_DIRECTORY,
        RESOLUTIONS_DIRECTORY,
        REPOS_DIRECTORY,
        input_file,
        output_directory,
    ]
    for directory in directories:
        os.makedirs(directory, exist_ok=True)


def initialize_logger():
    """Configure root logging to a timestamped file plus the console.

    Returns:
        logging.Logger: a logger named after this module.
    """
    log_file = f"{LOGS_DIRECTORY}/github_bot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(),
        ],
    )
    return logging.getLogger(__name__)


# Initialize environment and logger at import time (side effects: creates
# directories and a log file).
initialize_environment(INPUT_DIRECTORY, OUTPUT_DIRECTORY)
logger = initialize_logger()  # Initialize logger globally


class GitHubAPI:
    """Thin wrapper over the GitHub REST v3 API with rate-limit awareness."""

    def __init__(self, token: str):
        self.token = token
        self.headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.base_url = "https://api.github.com"

    def _check_rate_limit(self) -> bool:
        """Sleep until the core rate limit resets when nearly exhausted.

        Returns:
            bool: True when it is OK to proceed immediately; False when we
            had to wait for a reset (callers treat this as "skip the call").
            On any error checking the limit we optimistically return True.
        """
        try:
            response = requests.get(
                f"{self.base_url}/rate_limit",
                headers=self.headers,
                timeout=REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            limits = response.json()
            remaining = limits['resources']['core']['remaining']
            reset_time = limits['resources']['core']['reset']
            if remaining < 10:
                # reset is a Unix timestamp; wait out the remainder.
                wait_time = max(0, reset_time - int(time.time()))
                if wait_time > 0:
                    logger.warning(f"Rate limit nearly exceeded. Waiting {wait_time} seconds...")
                    time.sleep(wait_time)
                return False
            return True
        except requests.exceptions.RequestException as e:
            logger.error(f"Error checking rate limit: {str(e)}")
            return True

    def get_repository(self, owner: str, repo: str) -> Dict:
        """Fetch repository metadata; raises on HTTP or network failure."""
        try:
            response = requests.get(
                f"{self.base_url}/repos/{owner}/{repo}",
                headers=self.headers,
                timeout=REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            return response.json()
        except requests.HTTPError as e:
            logger.error(f"HTTP error getting repository info: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Error getting repository info: {str(e)}")
            raise

    def get_issues(self, owner: str, repo: str, state: str = 'open') -> List[Dict]:
        """Return the repo's issues (PRs filtered out); [] on any failure."""
        if not self._check_rate_limit():
            return []
        try:
            response = requests.get(
                f"{self.base_url}/repos/{owner}/{repo}/issues",
                headers=self.headers,
                params={'state': state},
                timeout=REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            issues = response.json()
            # The issues endpoint also returns pull requests; PRs carry a
            # 'pull_request' key, so drop them.
            return [issue for issue in issues if 'pull_request' not in issue]
        except Exception as e:
            logger.error(f"Error fetching issues: {str(e)}")
            return []


class GitHubBot:
    """High-level bot: fetches issues and drives the manual-fix workflow."""

    def __init__(self):
        # Lazily created per call with the caller-supplied token.
        self.github_api = None

    def initialize_api(self, token: str):
        """(Re)build the API client for the given token."""
        self.github_api = GitHubAPI(token)

    def fetch_issues(self, token: str, owner: str, repo: str) -> List[Dict]:
        """List open issues for owner/repo; [] on any failure."""
        try:
            self.initialize_api(token)
            return self.github_api.get_issues(owner, repo)
        except Exception as e:
            logger.error(f"Error fetching issues: {str(e)}")
            return []

    def resolve_issue(self, token: str, owner: str, repo: str, issue_number: int,
                      resolution: str, forked_repo: str) -> str:
        """Record a resolution note, then clone/commit/push the fork.

        Blocks on ``input()`` waiting for a human to apply the fix — only
        usable from an interactive console, not from the Flask routes.

        Returns:
            str: success message with the resolution file path, or an error
            message (errors are returned, not raised).
        """
        try:
            self.initialize_api(token)
            self.github_api.get_repository(owner, repo)  # validates repo/token

            # Create resolution file
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            resolution_file = f"{RESOLUTIONS_DIRECTORY}/resolution_{issue_number}_{timestamp}.md"
            with open(resolution_file, "w") as f:
                f.write(f"# Resolution for Issue #{issue_number}\n\n{resolution}")

            # Clone the forked repo.  NOTE(review): the clone target is never
            # cleaned up and a re-run with an existing directory will fail.
            subprocess.run(['git', 'clone', forked_repo, '/tmp/' + forked_repo.split('/')[-1]], check=True)

            # Change to the cloned directory.  NOTE(review): os.chdir is a
            # process-wide side effect and is never undone.
            os.chdir('/tmp/' + forked_repo.split('/')[-1])

            # Assuming manual intervention now
            input(" Apply the fix manually and stage the changes (press ENTER)? ")

            # Commit and push the modifications
            subprocess.run(['git', 'add', '.'], check=True)
            subprocess.run(['git', 'commit', '-m', f"Resolved issue #{issue_number} ({quote(resolution)})"], check=True)
            subprocess.run(['git', 'push', 'origin', 'HEAD'], check=True)

            # Open Pull Request page.  NOTE(review): using the repo *name* as
            # the compare owner looks wrong — verify the intended URL shape.
            webbrowser.open(f'https://github.com/{forked_repo.split("/")[-1]}/compare/master...{owner}:{forked_repo.split("/")[-1]}_resolved_issue_{issue_number}')

            return f"Resolution saved: {resolution_file}"
        except Exception as e:
            error_msg = f"Error resolving issue: {str(e)}"
            logger.error(error_msg)
            return error_msg


def handle_issue_selection(token, owner, repo, issue_number, resolution, forked_repo):
    """Convenience wrapper: resolve one issue with a fresh bot instance."""
    bot = GitHubBot()
    result = bot.resolve_issue(token, owner, repo, issue_number, resolution, forked_repo)
    return result


# Token used when extracting info from github.com URLs.  The original code
# referenced an undefined ``github_token`` global (NameError at runtime);
# read it from the environment instead.
github_token = os.environ.get("GITHUB_TOKEN", "")


def extract_info_from_url(url: str) -> Dict[str, Any]:
    """Fetch a URL and summarize it; for GitHub repo URLs also list issues.

    Returns a dict with status_code/headers/content (first 500 chars), an
    'issues' list for github.com repo URLs, or an 'error' message.
    """
    info = {}
    try:
        response = requests.get(url, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        info['status_code'] = response.status_code
        info['headers'] = dict(response.headers)
        info['content'] = response.text[:500]  # Limit content to first 500 characters for brevity

        parsed_url = urlparse(url)
        if 'github.com' in parsed_url.netloc:
            # Path looks like /owner/repo[/...]; index 0 is the empty string
            # before the leading slash.
            parts = parsed_url.path.split('/')
            if len(parts) > 2:
                owner = parts[1]
                repo = parts[2]
                issues = bot.fetch_issues(github_token, owner, repo)
                info['issues'] = issues
        elif 'huggingface.co' in parsed_url.netloc:
            # Add Hugging Face specific handling if needed
            pass
    except requests.HTTPError as e:
        info['error'] = f"HTTP error: {str(e)}"
    except Exception as e:
        info['error'] = f"Error: {str(e)}"
    return info


# Initialize GitHubBot globally
bot = GitHubBot()


def fetch_issues(token, repo_url):
    """Fetch issues for an 'owner/repo' URL; returns the error string on failure."""
    try:
        parts = repo_url.split('/')
        if len(parts) < 2:
            raise ValueError("Repository URL is not in the correct format. Expected format: 'owner/repo'.")
        owner, repo = parts[-2], parts[-1]
        issues = bot.fetch_issues(token, owner, repo)
        return issues
    except Exception as e:
        return str(e)


def resolve_issue(token, repo_url, issue_number, resolution, forked_repo_url):
    """Resolve an issue given an 'owner/repo' URL; returns an error string on failure."""
    try:
        parts = repo_url.split('/')
        if len(parts) < 2:
            raise ValueError("Repository URL is not in the correct format. Expected format: 'owner/repo'.")
        owner, repo = parts[-2], parts[-1]
        result = bot.resolve_issue(token, owner, repo, issue_number, resolution, forked_repo_url)
        return result
    except Exception as e:
        return str(e)


def extract_info(url):
    """Wrapper around extract_info_from_url; returns an error string on failure."""
    try:
        info = extract_info_from_url(url)
        return info
    except Exception as e:
        return str(e)


def submit_issue(token, repo_url, issue_number, resolution, forked_repo_url):
    """Identical to resolve_issue; kept as a separate entry point for callers.

    NOTE(review): duplicate of resolve_issue — consider delegating.
    """
    try:
        parts = repo_url.split('/')
        if len(parts) < 2:
            raise ValueError("Repository URL is not in the correct format. Expected format: 'owner/repo'.")
        owner, repo = parts[-2], parts[-1]
        result = bot.resolve_issue(token, owner, repo, issue_number, resolution, forked_repo_url)
        return result
    except Exception as e:
        return str(e)


app = Flask(__name__)


@app.route('/index')
def serve_index():
    """Serve the static front-end page from the current directory."""
    return send_from_directory('.', 'index.html')


@app.route('/fetch_issues', methods=['POST'])
def fetch_issues_route():
    """POST {token, repoUrl} -> {'issues': [...]} (400 on missing fields)."""
    data = request.get_json()
    token = data.get('token')
    repo_url = data.get('repoUrl')
    if not token or not repo_url:
        return jsonify({'error': 'Missing token or repoUrl'}), 400
    issues = fetch_issues(token, repo_url)
    return jsonify({'issues': issues})


@app.route('/resolve_issue', methods=['POST'])
def resolve_issue_route():
    """POST {token, repoUrl, issueNumber, resolution[, forkedRepoUrl]}.

    NOTE(review): the underlying workflow blocks on console input(), so this
    route will hang a worker until someone answers at the terminal.
    """
    data = request.get_json()
    token = data.get('token')
    repo_url = data.get('repoUrl')
    issue_number = data.get('issueNumber')
    resolution = data.get('resolution')
    forked_repo_url = data.get('forkedRepoUrl')
    if not token or not repo_url or not issue_number or not resolution:
        return jsonify({'error': 'Missing required fields'}), 400
    result = resolve_issue(token, repo_url, issue_number, resolution, forked_repo_url)
    return jsonify({'result': result})


@app.route('/extract_info', methods=['POST'])
def extract_info_route():
    """POST {url} -> info dict from extract_info_from_url (400 if url missing)."""
    data = request.get_json()
    url = data.get('url')
    if not url:
        return jsonify({'error': 'Missing URL'}), 400
    info = extract_info(url)
    return jsonify(info)


def run_flask_app():
    """Run Flask; reloader disabled because this runs in a non-main thread."""
    app.run(debug=True, use_reloader=False)


# HTML rendered inside the Gradio UI.  The original referenced an undefined
# ``custom_html`` (NameError at startup); provide a minimal placeholder.
custom_html = "<h1>GitHub Issue Resolver</h1><p>Use the Flask API endpoints to interact with the bot.</p>"


def create_gradio_interface():
    """Build the Gradio Blocks UI that hosts the custom HTML front-end."""
    with gr.Blocks(css=None, theme=None) as demo:
        gr.HTML(custom_html)
    return demo


def cleanup():
    """Remove temporary repo clones and flush logging handlers (best-effort)."""
    try:
        temp_dirs = [REPOS_DIRECTORY]
        for dir_name in temp_dirs:
            if os.path.exists(dir_name):
                shutil.rmtree(dir_name)
        logging.shutdown()
    except Exception as e:
        # Cleanup runs at exit; logging may already be shut down, so print.
        print(f"Error during cleanup: {str(e)}")


def signal_handler(signum, frame):
    """Log the signal, clean up, and exit with status 0."""
    logger.info(f"Received signal {signum}")
    cleanup()
    sys.exit(0)


if __name__ == "__main__":
    # Register cleanup handlers
    atexit.register(cleanup)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Run Flask app in a separate thread
    flask_thread = Thread(target=run_flask_app)
    flask_thread.start()

    # Create Gradio interface
    demo = create_gradio_interface()
    demo.launch()