# GitBot / app.py
import gradio as gr
import logging
import os
import tempfile
import zipfile
from typing import Any, Dict, List

import nltk
import requests
from bs4 import BeautifulSoup
from pypdf import PdfReader

# Punkt sentence-tokenizer model used by chunk_text
nltk.download('punkt')
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# Utility to log messages
def log(message: str):
    logger.info(message)
# File and Web Processing Utilities
def chunk_text(text: str, max_chunk_size: int) -> List[str]:
    """Breaks large text into chunks of at most max_chunk_size characters."""
    sentences = nltk.sent_tokenize(text)
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        # Close the current chunk when adding this sentence would exceed the
        # limit; the guard avoids appending an empty chunk on the first pass.
        if current_chunk and len(current_chunk) + len(sentence) + 1 > max_chunk_size:
            chunks.append(current_chunk.strip())
            current_chunk = ""
        current_chunk += sentence + " "
    if current_chunk:
        chunks.append(current_chunk.strip())
    return chunks
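# Behavior sketch for chunk_text (illustrative values, not from the original
# app): sentences are packed greedily, so a multi-sentence chunk never exceeds
# max_chunk_size, though a single oversized sentence becomes its own chunk.
#   chunk_text("One. Two. Three.", max_chunk_size=10)
#   -> ["One. Two.", "Three."]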
def read_pdf(file_path: str) -> str:
    """Reads text content from a PDF file."""
    try:
        reader = PdfReader(file_path)
        # extract_text() can return None for pages with no extractable text.
        return "\n".join(page.extract_text() or "" for page in reader.pages)
    except Exception as e:
        logger.error(f"Error reading PDF: {e}")
        return ""
def read_txt(file_path: str) -> str:
    """Reads content from a TXT file."""
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()
    except Exception as e:
        logger.error(f"Error reading TXT file: {e}")
        return ""
def read_zip(zip_path: str) -> str:
    """Extracts and processes text and PDF files within a ZIP archive."""
    extracted_data = []
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            for file_info in zip_ref.infolist():
                if file_info.filename.endswith((".txt", ".pdf")):
                    with zip_ref.open(file_info) as file:
                        try:
                            if file_info.filename.endswith(".txt"):
                                extracted_data.append(file.read().decode("utf-8"))
                            elif file_info.filename.endswith(".pdf"):
                                # Write the PDF to a temporary file so that
                                # PdfReader can open it by path; tempfile is
                                # portable, unlike a hard-coded /tmp path.
                                with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
                                    temp_file.write(file.read())
                                    temp_path = temp_file.name
                                extracted_data.append(read_pdf(temp_path))
                                os.remove(temp_path)
                        except Exception as e:
                            logger.error(f"Error processing file in ZIP: {e}")
        return "\n".join(extracted_data)
    except Exception as e:
        logger.error(f"Error extracting ZIP: {e}")
        return ""
def fetch_url(url: str, max_depth: int) -> str:
    """Fetches and scrapes text content from a webpage, following links
    breadth-first up to max_depth levels."""
    visited = set()
    to_visit = [(url, 0)]
    results = []
    while to_visit:
        current_url, depth = to_visit.pop(0)
        if current_url in visited:
            continue
        visited.add(current_url)
        if depth < max_depth:
            try:
                response = requests.get(current_url, timeout=10)
                response.raise_for_status()
                soup = BeautifulSoup(response.content, 'lxml')
                results.append(soup.get_text())
                # Queue absolute links for the next crawl level.
                for link in soup.find_all("a", href=True):
                    absolute_url = requests.compat.urljoin(current_url, link.get('href'))
                    if absolute_url.startswith("http") and absolute_url not in visited:
                        to_visit.append((absolute_url, depth + 1))
            except Exception as e:
                logger.error(f"Error fetching URL {current_url}: {e}")
    return "\n".join(results)
# Main Workflow Processing
def process_workflow(command: str, issue_details: str, files: List[Any], url: str, token: str, max_depth: int) -> Dict[str, Any]:
    """Processes user input and performs the selected command.

    The GitHub token is accepted but not yet used; it is reserved for
    authenticated GitHub API calls.
    """
    datasets = []
    errors = []
    try:
        # Add issue details to dataset
        if issue_details:
            datasets.append(issue_details)
        # Process uploaded files (Gradio may pass plain path strings or
        # tempfile wrappers, so normalize to a path string first)
        if files:
            for file in files:
                file_path = file if isinstance(file, str) else file.name
                if file_path.endswith(".pdf"):
                    datasets.append(read_pdf(file_path))
                elif file_path.endswith(".txt"):
                    datasets.append(read_txt(file_path))
                elif file_path.endswith(".zip"):
                    datasets.append(read_zip(file_path))
        # Fetch URL content
        if url:
            datasets.append(fetch_url(url, max_depth=max_depth))
        # Join once up front; a backslash inside an f-string expression is a
        # SyntaxError before Python 3.12.
        combined_data = "\n".join(datasets)
        # Execute commands
        if command == "Analyze Issue":
            analysis = chunk_text(combined_data, 8192)
            return {"analysis": analysis}
        elif command == "Propose Resolution":
            resolution = f"Proposed resolution based on:\n\n{combined_data}"
            return {"resolution": resolution}
        elif command == "Generate PR":
            return {"pr_content": combined_data, "message": "Pull request content generated."}
        else:
            return {"error": "Invalid command"}
    except Exception as e:
        errors.append(str(e))
        return {"error": "\n".join(errors)}
# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("# GitHub Issue Resolver - Advanced Edition")
    gr.Markdown("Analyze issues, propose resolutions, and generate PRs.")
    # Input Fields
    with gr.Row():
        command = gr.Dropdown(["Analyze Issue", "Propose Resolution", "Generate PR"], label="Command")
        issue_details = gr.Textbox(label="Issue Details", lines=4, placeholder="Describe the issue or paste details.")
    files = gr.Files(label="Upload Files", file_types=[".pdf", ".txt", ".zip"])
    url = gr.Textbox(label="Documentation URL", placeholder="Enter related documentation URL.")
    token = gr.Textbox(label="GitHub Token", type="password", placeholder="Enter your GitHub token securely.")
    max_depth = gr.Slider(label="Web Crawl Depth", minimum=1, maximum=10, value=3, step=1)
    # Outputs
    result_output = gr.JSON(label="Results")
    process_button = gr.Button("Process")
    # Button Logic
    process_button.click(
        process_workflow,
        inputs=[command, issue_details, files, url, token, max_depth],
        outputs=[result_output]
    )
# Launch Application
demo.launch()
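# If the app needs to be reachable from outside localhost (an assumption, not
# part of the original setup), Gradio supports binding all interfaces, e.g.:
#   demo.launch(server_name="0.0.0.0")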