GitBot / app.py
acecalisto3's picture
Update app.py
d55a3b2 verified
raw
history blame
11.9 kB
import sys
import signal
import shutil
import logging
import time
import os
from datetime import datetime
from typing import List, Dict, Any
import requests
import gradio as gr
import atexit
import subprocess
from urllib.parse import urlparse, quote
import webbrowser
from flask import Flask, request, jsonify, send_from_directory
from threading import Thread
device = "cuda"
@spaces.GPU()
def stream_chat(
message: str,
history: list,
system_prompt: str,
temperature: float = 0.8,
max_new_tokens: int = 1024,
top_p: float = 1.0,
top_k: int = 20,
penalty: float = 1.2,
):
print(f'message: {message}')
print(f'history: {history}')
conversation = [
{"role": "system", "content": system_prompt}
]
for prompt, answer in history:
conversation.extend([
{"role": "user", "content": prompt},
{"role": "assistant", "content": answer},
])
conversation.append({"role": "user", "content": message})
input_ids = tokenizer.apply_chat_template(conversation, add_generation_prompt=True, return_tensors="pt").to(model.device)
streamer = TextIteratorStreamer(tokenizer, timeout=60.0, skip_prompt=True, skip_special_tokens=True)
generate_kwargs = dict(
input_ids=input_ids,
max_new_tokens=max_new_tokens,
do_sample=temperature != 0,
top_p=top_p,
top_k=top_k,
temperature=temperature,
eos_token_id=[128001, 128008, 128009],
streamer=streamer,
)
# Constants
INPUT_DIRECTORY = 'input'
OUTPUT_DIRECTORY = 'output'
LOGS_DIRECTORY = 'logs'
RESOLUTIONS_DIRECTORY = 'resolutions'
REPOS_DIRECTORY = 'repos'
# Set up environment
def initialize_environment(input_file, output_directory):
directories = [LOGS_DIRECTORY, RESOLUTIONS_DIRECTORY, REPOS_DIRECTORY, input_file, output_directory]
for directory in directories:
os.makedirs(directory, exist_ok=True)
# Set up logging
def initialize_logger():
log_file = f"{LOGS_DIRECTORY}/github_bot_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
return logging.getLogger(__name__)
# Initialize environment and logger
initialize_environment(INPUT_DIRECTORY, OUTPUT_DIRECTORY)
logger = initialize_logger() # Initialize logger globally
# GitHub API handler
class GitHubAPI:
def __init__(self, token: str):
self.token = token
self.headers = {
'Authorization': f'token {token}',
'Accept': 'application/vnd.github.v3+json'
}
self.base_url = "https://api.github.com"
def _check_rate_limit(self) -> bool:
try:
response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers)
response.raise_for_status()
limits = response.json()
remaining = limits['resources']['core']['remaining']
reset_time = limits['resources']['core']['reset']
if remaining < 10:
wait_time = max(0, reset_time - int(time.time()))
if wait_time > 0:
logger.warning(f"Rate limit nearly exceeded. Waiting {wait_time} seconds...")
time.sleep(wait_time)
return False
return True
except requests.exceptions.RequestException as e:
logger.error(f"Error checking rate limit: {str(e)}")
return True
def get_repository(self, owner: str, repo: str) -> Dict:
try:
response = requests.get(f"{self.base_url}/repos/{owner}/{repo}", headers=self.headers)
response.raise_for_status()
return response.json()
except requests.HTTPError as e:
logger.error(f"HTTP error getting repository info: {str(e)}")
raise
except Exception as e:
logger.error(f"Error getting repository info: {str(e)}")
raise
def get_issues(self, owner: str, repo: str, state: str = 'open') -> List[Dict]:
if not self._check_rate_limit():
return []
try:
response = requests.get(f"{self.base_url}/repos/{owner}/{repo}/issues", headers=self.headers, params={'state': state})
response.raise_for_status()
issues = response.json()
return [issue for issue in issues if 'pull_request' not in issue]
except Exception as e:
logger.error(f"Error fetching issues: {str(e)}")
return []
# GitHub Bot
class GitHubBot:
def __init__(self):
self.github_api = None
def initialize_api(self, token: str):
self.github_api = GitHubAPI(token)
def fetch_issues(self, token: str, owner: str, repo: str) -> List[Dict]:
try:
self.initialize_api(token)
return self.github_api.get_issues(owner, repo)
except Exception as e:
logger.error(f"Error fetching issues: {str(e)}")
return []
def resolve_issue(self, token: str, owner: str, repo: str, issue_number: int, resolution: str, forked_repo: str) -> str:
try:
self.initialize_api(token)
self.github_api.get_repository(owner, repo)
# Create resolution file
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
resolution_file = f"{RESOLUTIONS_DIRECTORY}/resolution_{issue_number}_{timestamp}.md"
with open(resolution_file, "w") as f:
f.write(f"# Resolution for Issue #{issue_number}\n\n{resolution}")
# Clone the forked repo
subprocess.run(['git', 'clone', forked_repo, '/tmp/' + forked_repo.split('/')[-1]], check=True)
# Change to the cloned directory
os.chdir('/tmp/' + forked_repo.split('/')[-1])
# Assuming manual intervention now
input(" Apply the fix manually and stage the changes (press ENTER)? ")
# Commit and push the modifications
subprocess.run(['git', 'add', '.'], check=True)
subprocess.run(['git', 'commit', '-m', f"Resolved issue #{issue_number} ({quote(resolution)})"], check=True)
subprocess.run(['git', 'push', 'origin', 'HEAD'], check=True)
# Open Pull Request page
webbrowser.open(f'https://github.com/{forked_repo.split("/")[-1]}/compare/master...{owner}:{forked_repo.split("/")[-1]}_resolved_issue_{issue_number}')
return f"Resolution saved: {resolution_file}"
except Exception as e:
error_msg = f"Error resolving issue: {str(e)}"
logger.error(error_msg)
return error_msg
def handle_issue_selection(token, owner, repo, issue_number, resolution, forked_repo):
bot = GitHubBot()
result = bot.resolve_issue(token, owner, repo, issue_number, resolution, forked_repo)
return result
def extract_info_from_url(url: str) -> Dict[str, Any]:
info = {}
try:
response = requests.get(url)
response.raise_for_status()
info['status_code'] = response.status_code
info['headers'] = dict(response.headers)
info['content'] = response.text[:500] # Limit content to first 500 characters for brevity
parsed_url = urlparse(url)
if 'github.com' in parsed_url.netloc:
parts = parsed_url.path.split('/')
if len(parts) > 2:
owner = parts[1]
repo = parts[2]
issues = bot.fetch_issues(github_token, owner, repo)
info['issues'] = issues
elif 'huggingface.co' in parsed_url.netloc:
# Add Hugging Face specific handling if needed
pass
except requests.HTTPError as e:
info['error'] = f"HTTP error: {str(e)}"
except Exception as e:
info['error'] = f"Error: {str(e)}"
return info
# Initialize GitHubBot globally
bot = GitHubBot()
# Define missing functions with validation
def fetch_issues(token, repo_url):
try:
parts = repo_url.split('/')
if len(parts) < 2:
raise ValueError("Repository URL is not in the correct format. Expected format: 'owner/repo'.")
owner, repo = parts[-2], parts[-1]
issues = bot.fetch_issues(token, owner, repo)
return issues
except Exception as e:
return str(e)
def resolve_issue(token, repo_url, issue_number, resolution, forked_repo_url):
try:
parts = repo_url.split('/')
if len(parts) < 2:
raise ValueError("Repository URL is not in the correct format. Expected format: 'owner/repo'.")
owner, repo = parts[-2], parts[-1]
result = bot.resolve_issue(token, owner, repo, issue_number, resolution, forked_repo_url)
return result
except Exception as e:
return str(e)
def extract_info(url):
try:
info = extract_info_from_url(url)
return info
except Exception as e:
return str(e)
def submit_issue(token, repo_url, issue_number, resolution, forked_repo_url):
try:
parts = repo_url.split('/')
if len(parts) < 2:
raise ValueError("Repository URL is not in the correct format. Expected format: 'owner/repo'.")
owner, repo = parts[-2], parts[-1]
result = bot.resolve_issue(token, owner, repo, issue_number, resolution, forked_repo_url)
return result
except Exception as e:
return str(e)
app = Flask(__name__)
@app.route('/index')
def serve_index():
return send_from_directory('.', 'index.html')
@app.route('/fetch_issues', methods=['POST'])
def fetch_issues_route():
data = request.get_json()
token = data.get('token')
repo_url = data.get('repoUrl')
if not token or not repo_url:
return jsonify({'error': 'Missing token or repoUrl'}), 400
issues = fetch_issues(token, repo_url)
return jsonify({'issues': issues})
@app.route('/resolve_issue', methods=['POST'])
def resolve_issue_route():
data = request.get_json()
token = data.get('token')
repo_url = data.get('repoUrl')
issue_number = data.get('issueNumber')
resolution = data.get('resolution')
forked_repo_url = data.get('forkedRepoUrl')
if not token or not repo_url or not issue_number or not resolution:
return jsonify({'error': 'Missing required fields'}), 400
result = resolve_issue(token, repo_url, issue_number, resolution, forked_repo_url)
return jsonify({'result': result})
@app.route('/extract_info', methods=['POST'])
def extract_info_route():
data = request.get_json()
url = data.get('url')
if not url:
return jsonify({'error': 'Missing URL'}), 400
info = extract_info(url)
return jsonify(info)
def run_flask_app():
app.run(debug=True, use_reloader=False)
def create_gradio_interface():
with gr.Blocks(css=None, theme=None) as demo:
gr.HTML(custom_html)
return demo
# Cleanup function
def cleanup():
try:
temp_dirs = [REPOS_DIRECTORY]
for dir_name in temp_dirs:
if os.path.exists(dir_name):
shutil.rmtree(dir_name)
logging.shutdown()
except Exception as e:
print(f"Error during cleanup: {str(e)}")
# Signal handler
def signal_handler(signum, frame):
logger.info(f"Received signal {signum}")
cleanup()
sys.exit(0)
if __name__ == "__main__":
# Register cleanup handlers
atexit.register(cleanup)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Run Flask app in a separate thread
flask_thread = Thread(target=run_flask_app)
flask_thread.start()
# Create Gradio interface
demo = create_gradio_interface()
demo.launch()