import gradio as gr
import os
import requests
import uuid
import zipfile
import tempfile
import logging
import nltk
from pathlib import Path
from typing import List, Dict, Any
from pypdf import PdfReader
from bs4 import BeautifulSoup

nltk.download('punkt')
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Utility to log messages
def log(message: str):
    logger.info(message)
# File and Web Processing Utilities
def chunk_text(text: str, max_chunk_size: int) -> List[str]:
    """Breaks large text into chunks of at most max_chunk_size characters."""
    tokenizer = nltk.data.load('tokenizers/punkt/english.pickle')
    sentences = tokenizer.tokenize(text)
    chunks = []
    current_chunk = ""
    for sentence in sentences:
        # Start a new chunk before the limit is exceeded; never emit an empty chunk
        if current_chunk and len(current_chunk) + len(sentence) + 1 > max_chunk_size:
            chunks.append(current_chunk.strip())
            current_chunk = ""
        current_chunk += sentence + " "
    if current_chunk:
        chunks.append(current_chunk.strip())
    return chunks
def read_pdf(file_path: str) -> str:
    """Reads text content from a PDF file."""
    try:
        reader = PdfReader(file_path)
        # extract_text() can return None for pages without a text layer
        return "\n".join(page.extract_text() or "" for page in reader.pages)
    except Exception as e:
        logger.error(f"Error reading PDF: {e}")
        return ""
def read_txt(file_path: str) -> str:
    """Reads content from a TXT file."""
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()
    except Exception as e:
        logger.error(f"Error reading TXT file: {e}")
        return ""
def read_zip(zip_path: str) -> str:
    """Extracts and processes text and PDF files within a ZIP archive."""
    extracted_data = []
    try:
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            for file_info in zip_ref.infolist():
                if file_info.filename.endswith((".txt", ".pdf")):
                    with zip_ref.open(file_info) as file:
                        try:
                            if file_info.filename.endswith(".txt"):
                                extracted_data.append(file.read().decode("utf-8", errors="replace"))
                            elif file_info.filename.endswith(".pdf"):
                                # Write the PDF to a temporary file so PdfReader can open it by path
                                with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
                                    temp_file.write(file.read())
                                    temp_path = temp_file.name
                                extracted_data.append(read_pdf(temp_path))
                                os.remove(temp_path)
                        except Exception as e:
                            logger.error(f"Error processing file in ZIP: {e}")
        return "\n".join(extracted_data)
    except Exception as e:
        logger.error(f"Error extracting ZIP: {e}")
        return ""
def fetch_url(url: str, max_depth: int) -> str:
    """Fetches and scrapes text content from a webpage, following links up to max_depth."""
    visited = set()
    to_visit = [(url, 0)]
    results = []
    while to_visit:
        current_url, depth = to_visit.pop(0)
        if current_url in visited:
            continue
        visited.add(current_url)
        if depth < max_depth:
            try:
                response = requests.get(current_url, timeout=10)
                response.raise_for_status()
                soup = BeautifulSoup(response.content, 'lxml')
                results.append(soup.get_text(separator=" ", strip=True))
                for link in soup.find_all("a", href=True):
                    absolute_url = requests.compat.urljoin(current_url, link["href"])
                    if absolute_url.startswith("http") and absolute_url not in visited:
                        to_visit.append((absolute_url, depth + 1))
            except Exception as e:
                logger.error(f"Error fetching URL {current_url}: {e}")
    return "\n".join(results)
# Main Workflow Processing
def process_workflow(command: str, issue_details: str, files: List[Path], url: str, token: str, max_depth: int) -> Dict[str, Any]:
    """Processes user input and performs the selected command."""
    datasets = []
    errors = []
    try:
        # Add issue details to the dataset
        if issue_details:
            datasets.append(issue_details)
        # Process uploaded files
        if files:
            for file in files:
                # gr.Files may yield path strings, Path objects, or file wrappers with a .name attribute
                file_path = str(file) if isinstance(file, (str, Path)) else file.name
                if file_path.endswith(".pdf"):
                    datasets.append(read_pdf(file_path))
                elif file_path.endswith(".txt"):
                    datasets.append(read_txt(file_path))
                elif file_path.endswith(".zip"):
                    datasets.append(read_zip(file_path))
        # Fetch URL content
        if url:
            datasets.append(fetch_url(url, max_depth=max_depth))
        combined_data = "\n".join(datasets)
        # Execute the selected command
        if command == "Analyze Issue":
            analysis = chunk_text(combined_data, 8192)
            return {"analysis": analysis}
        elif command == "Propose Resolution":
            resolution = f"Proposed resolution based on:\n\n{combined_data}"
            return {"resolution": resolution}
        elif command == "Generate PR":
            return {"pr_content": combined_data, "message": "Pull request content generated."}
        else:
            return {"error": "Invalid command"}
    except Exception as e:
        errors.append(str(e))
        return {"error": "\n".join(errors)}
# Gradio Interface
with gr.Blocks() as demo:
    gr.Markdown("# GitHub Issue Resolver - Advanced Edition")
    gr.Markdown("Analyze issues, propose resolutions, and generate PRs.")

    # Input Fields
    with gr.Row():
        command = gr.Dropdown(["Analyze Issue", "Propose Resolution", "Generate PR"], label="Command")
        issue_details = gr.Textbox(label="Issue Details", lines=4, placeholder="Describe the issue or paste details.")
    files = gr.Files(label="Upload Files", file_types=[".pdf", ".txt", ".zip"])
    url = gr.Textbox(label="Documentation URL", placeholder="Enter related documentation URL.")
    token = gr.Textbox(label="GitHub Token", type="password", placeholder="Enter your GitHub token securely.")
    max_depth = gr.Slider(label="Web Crawl Depth", minimum=1, maximum=10, value=3, step=1)

    # Outputs
    result_output = gr.JSON(label="Results")
    process_button = gr.Button("Process")

    # Button Logic
    process_button.click(
        process_workflow,
        inputs=[command, issue_details, files, url, token, max_depth],
        outputs=[result_output]
    )

# Launch Application
demo.launch()