# GithubAnalyzerr / app.py
import requests
import json
import os
import base64
import re
import ast
import networkx as nx
# Make sure radon is installed: pip install radon
try:
import radon.metrics as metrics
import radon.complexity as complexity
except ImportError:
print("Warning: Radon library not found. Code complexity analysis will be limited.")
# Provide dummy functions if radon is not available
class DummyRadon:
def cc_visit(self, *args, **kwargs): return 0
def cc_visit_ast(self, *args, **kwargs): return 0
def mi_visit(self, *args, **kwargs): return None
metrics = DummyRadon()
complexity = DummyRadon()
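# Illustrative note (not from the original code): with radon installed, these helpers
# are typically called on raw source text, e.g.
#   blocks = complexity.cc_visit(source_code)        # list of function/class blocks
#   avg_cc = sum(b.complexity for b in blocks) / max(len(blocks), 1)
#   mi = metrics.mi_visit(source_code, multi=True)   # maintainability index (0-100)
# The DummyRadon fallback above only mirrors the call names so later code can run
# without radon, returning neutral values instead of real metrics.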
from datetime import datetime, timedelta, timezone
from collections import defaultdict, Counter
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
# Ensure IPython is available or handle its absence
try:
from IPython.display import display, Markdown, HTML
IPYTHON_AVAILABLE = True
except ImportError:
IPYTHON_AVAILABLE = False
# Define dummy display functions if not in IPython
def display(*args, **kwargs): print(*args)
def Markdown(text): return f"--- Markdown ---\n{text}\n---------------"  # display() prints it
def HTML(text): return f"----- HTML -----\n{text}\n--------------"  # display() prints it
import numpy as np
# Ensure PyGithub is installed: pip install PyGithub
try:
from github import Github, GithubException
except ImportError:
print("Warning: PyGithub library not found. Some features might be limited.")
Github = None # Set to None if not available
GithubException = Exception # Use base Exception
import time
# Ensure python-dotenv is installed: pip install python-dotenv
try:
from dotenv import load_dotenv
except ImportError:
print("Warning: python-dotenv not found. .env file will not be loaded.")
def load_dotenv(): pass # Dummy function
# Import Neo4j and Gemini libraries
# Ensure neo4j is installed: pip install neo4j
try:
from neo4j import GraphDatabase, basic_auth
except ImportError:
print("Warning: Neo4j library not found. Graph features will be disabled.")
GraphDatabase = None # Set to None
basic_auth = None
# Ensure google-generativeai is installed: pip install google-generativeai
try:
import google.generativeai as genai
except ImportError:
print("Warning: google-generativeai library not found. Gemini features will be disabled.")
genai = None # Set to None
# Import Vizro and Gradio
# Ensure vizro, vizro-plotly, plotly, gradio are installed
# pip install vizro vizro-plotly plotly gradio pandas networkx matplotlib numpy
try:
import vizro.plotly.express as px
import vizro
import vizro.models as vzm
import plotly.graph_objects as go
except ImportError:
print("Critical Error: Vizro or Plotly libraries not found. Dashboard generation will fail.")
# Define dummy classes/functions to avoid NameErrors later, though functionality will be broken
class DummyVzm:
Card = lambda **kwargs: None
Graph = lambda **kwargs: None
Page = lambda **kwargs: None
Dashboard = lambda **kwargs: type('obj', (object,), {'save': lambda self, path: print(f"Vizro not installed, cannot save to {path}")})()
vzm = DummyVzm()
px = None
go = None
vizro = None
try:
import gradio as gr
except ImportError:
print("Critical Error: Gradio library not found. Cannot launch the UI.")
gr = None # Set to None
# --- GitHubRepoInfo Class (Keep as provided, ensuring dependencies like PyGithub are handled) ---
class GitHubRepoInfo:
"""Enhanced class to get comprehensive information about a GitHub repository."""
def __init__(self, token=None):
"""Initialize with optional GitHub API token."""
self.base_url = "https://api.github.com"
self.headers = {"Accept": "application/vnd.github.v3+json"}
self.token = token
self.github = None # Initialize github attribute
# Set up authentication
if token:
self.headers["Authorization"] = f"token {token}"
if Github: # Check if PyGithub was imported
try:
self.github = Github(token)
self.github.get_user().login # Test connection
except Exception as e:
print(f"Warning: Failed to initialize PyGithub with token: {e}")
self.github = Github() # Fallback to unauthenticated
else:
print("Warning: PyGithub not installed. Cannot use authenticated PyGithub client.")
self.github = None # Explicitly set to None
elif os.environ.get("GITHUB_TOKEN"):
self.token = os.environ.get("GITHUB_TOKEN")
self.headers["Authorization"] = f"token {self.token}"
if Github:
try:
self.github = Github(self.token)
self.github.get_user().login # Test connection
except Exception as e:
print(f"Warning: Failed to initialize PyGithub with token: {e}")
self.github = Github() # Fallback to unauthenticated
else:
print("Warning: PyGithub not installed. Cannot use authenticated PyGithub client.")
self.github = None
else:
if Github:
self.github = Github() # Unauthenticated
else:
print("Warning: PyGithub not installed. Cannot use authenticated PyGithub client.")
self.github = None
# Configure rate limit handling
self.rate_limit_remaining = 5000 # Optimistic default; refreshed from the API below when possible
self.rate_limit_reset = datetime.now()
# Initialize rate limit info if possible
if self.github:
try:
rate_limit = self.github.get_rate_limit()
self.rate_limit_remaining = rate_limit.core.remaining
self.rate_limit_reset = datetime.fromtimestamp(rate_limit.core.reset)
except Exception as e:
# self.github is truthy in this branch, so always report the failure
print(f"Warning: Could not get initial rate limit from PyGithub: {e}")
# Check rate limit via REST if PyGithub failed or wasn't used
elif self.token:
try:
response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers)
if response.status_code == 200:
rate_data = response.json()
self.rate_limit_remaining = rate_data["resources"]["core"]["remaining"]
self.rate_limit_reset = datetime.fromtimestamp(rate_data["resources"]["core"]["reset"])
else:
print(f"Warning: Could not get initial rate limit via REST: Status {response.status_code}")
except Exception as e:
print(f"Warning: Could not get initial rate limit via REST: {e}")
def _check_rate_limit(self):
"""Check API rate limit and wait if necessary."""
# Update rate limit info before checking
try:
response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers)
if response.status_code == 200:
rate_data = response.json()
# Ensure keys exist before accessing
core_limits = rate_data.get("resources", {}).get("core", {})
self.rate_limit_remaining = core_limits.get("remaining", self.rate_limit_remaining) # Use old value if missing
reset_timestamp = core_limits.get("reset")
if reset_timestamp:
self.rate_limit_reset = datetime.fromtimestamp(reset_timestamp)
# No else needed, just use previous values if update fails
except Exception as e:
print(f"Warning: Failed to update rate limit info: {e}")
# Proceed with potentially outdated values
if self.rate_limit_remaining <= 10:
reset_time = self.rate_limit_reset
# Use timezone-naive comparison
current_time = datetime.now()
if reset_time > current_time:
wait_time = (reset_time - current_time).total_seconds() + 10 # Add buffer
if wait_time > 0: # Only wait if reset time is in the future
print(f"Rate limit nearly exhausted. Waiting {wait_time:.0f} seconds for reset.")
time.sleep(wait_time)
# Re-fetch rate limit after waiting
self._check_rate_limit()
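# For reference (shape taken from the public GitHub REST API), the /rate_limit
# payload indexed above looks roughly like:
#   {"resources": {"core": {"limit": 5000, "remaining": 4990, "reset": 1700000000}}}
# where "reset" is a Unix timestamp, which is why it is fed to datetime.fromtimestamp.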
def _paginated_get(self, url, params=None, max_items=None):
"""Handle paginated API responses with rate limit awareness."""
if params is None:
params = {}
items = []
page = 1
# Use a smaller default per_page to be safer with rate limits if unauthenticated
default_per_page = 100 if self.token else 30
per_page = min(100, params.get("per_page", default_per_page))
params["per_page"] = per_page
while True:
self._check_rate_limit() # Check before each request
params["page"] = page
try:
response = requests.get(url, headers=self.headers, params=params, timeout=20) # Add timeout
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
page_items = response.json()
if not page_items: # No more items
break
items.extend(page_items)
page += 1
# Check if we've reached the requested limit
if max_items and len(items) >= max_items:
return items[:max_items]
# Check if we've reached the end (GitHub returns fewer items than requested)
if len(page_items) < per_page:
break
except requests.exceptions.RequestException as e:
print(f"Error during paginated request to {url} (page {page}): {e}")
# Decide whether to break or retry (here we break)
break
except json.JSONDecodeError as e:
print(f"Error decoding JSON response from {url} (page {page}): {e}")
break
return items
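# Minimal usage sketch (hypothetical repo, not part of the original file):
#   info = GitHubRepoInfo(token=os.getenv("GITHUB_TOKEN"))
#   commits = info._paginated_get(f"{info.base_url}/repos/octocat/Hello-World/commits",
#                                 max_items=50)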
def get_repo_info(self, owner, repo):
"""Get basic repository information."""
self._check_rate_limit()
url = f"{self.base_url}/repos/{owner}/{repo}"
try:
response = requests.get(url, headers=self.headers, timeout=15)
response.raise_for_status() # Check for 4xx/5xx errors
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error getting repository info for {owner}/{repo}: {e}")
return None # Return None on failure
# ... (other GitHubRepoInfo methods - assume they return sensible defaults like [] or {} on failure) ...
# --- Add safe defaults to methods that might return None unexpectedly ---
def get_languages(self, owner, repo):
"""Get languages used in the repository."""
self._check_rate_limit()
url = f"{self.base_url}/repos/{owner}/{repo}/languages"
try:
response = requests.get(url, headers=self.headers, timeout=15)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Error getting languages for {owner}/{repo}: {e}")
return {} # Return empty dict on failure
def get_contributors(self, owner, repo, max_contributors=None):
"""Get repository contributors with pagination support."""
url = f"{self.base_url}/repos/{owner}/{repo}/contributors"
# _paginated_get should already handle errors and return a list
return self._paginated_get(url, max_items=max_contributors) or [] # Ensure list return
def get_commits(self, owner, repo, params=None, max_commits=None):
"""Get commits with enhanced filtering and pagination."""
url = f"{self.base_url}/repos/{owner}/{repo}/commits"
return self._paginated_get(url, params=params, max_items=max_commits) or [] # Ensure list return
def _get_stats_with_retry(self, url):
"""Helper for stats endpoints that might return 202."""
retries = 3
delay = 5 # Initial delay in seconds
for i in range(retries):
self._check_rate_limit()
try:
response = requests.get(url, headers=self.headers, timeout=30) # Longer timeout for stats
if response.status_code == 200:
return response.json()
elif response.status_code == 202 and i < retries - 1:
print(f"GitHub is computing statistics for {url.split('/stats/')[1]}, waiting {delay}s and retrying ({i+1}/{retries})...")
time.sleep(delay)
delay *= 2 # Exponential backoff
continue
elif response.status_code == 204: # No content, valid response but empty data
print(f"No content (204) returned for {url.split('/stats/')[1]}. Returning empty list.")
return []
else:
print(f"Error getting stats from {url}: Status {response.status_code}, Body: {response.text[:200]}")
return [] # Return empty list on other errors
except requests.exceptions.RequestException as e:
print(f"Request error getting stats from {url}: {e}")
return [] # Return empty list on request error
print(f"Failed to get stats from {url} after {retries} retries.")
return [] # Return empty list after all retries fail
def get_commit_activity(self, owner, repo):
"""Get commit activity stats for the past year."""
url = f"{self.base_url}/repos/{owner}/{repo}/stats/commit_activity"
return self._get_stats_with_retry(url)
def get_code_frequency(self, owner, repo):
"""Get weekly code addition and deletion statistics."""
url = f"{self.base_url}/repos/{owner}/{repo}/stats/code_frequency"
return self._get_stats_with_retry(url)
def get_contributor_activity(self, owner, repo):
"""Get contributor commit activity over time."""
url = f"{self.base_url}/repos/{owner}/{repo}/stats/contributors"
return self._get_stats_with_retry(url)
def get_branches(self, owner, repo):
"""Get repository branches."""
url = f"{self.base_url}/repos/{owner}/{repo}/branches"
return self._paginated_get(url) or []
def get_releases(self, owner, repo, max_releases=None):
"""Get repository releases with pagination support."""
url = f"{self.base_url}/repos/{owner}/{repo}/releases"
return self._paginated_get(url, max_items=max_releases) or []
def get_issues(self, owner, repo, state="all", max_issues=None, params=None):
"""Get repository issues with enhanced filtering."""
url = f"{self.base_url}/repos/{owner}/{repo}/issues"
if params is None:
params = {}
params["state"] = state
return self._paginated_get(url, params=params, max_items=max_issues) or []
def get_pull_requests(self, owner, repo, state="all", max_prs=None, params=None):
"""Get repository pull requests with enhanced filtering."""
url = f"{self.base_url}/repos/{owner}/{repo}/pulls"
if params is None:
params = {}
params["state"] = state
return self._paginated_get(url, params=params, max_items=max_prs) or []
def get_contents(self, owner, repo, path="", ref=None):
"""Get repository contents at the specified path."""
self._check_rate_limit()
url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}"
params = {}
if ref:
params["ref"] = ref
try:
response = requests.get(url, headers=self.headers, params=params, timeout=15)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
# Handle 404 specifically for contents
if hasattr(e, 'response') and e.response is not None and e.response.status_code == 404:
print(f"Contents not found at path '{path}' in {owner}/{repo}.")
else:
print(f"Error getting contents for {owner}/{repo} at path '{path}': {e}")
return [] # Return empty list on failure
def get_readme(self, owner, repo, ref=None):
"""Get repository README file."""
self._check_rate_limit()
url = f"{self.base_url}/repos/{owner}/{repo}/readme"
params = {}
if ref:
params["ref"] = ref
try:
response = requests.get(url, headers=self.headers, params=params, timeout=15)
response.raise_for_status()
data = response.json()
if data.get("content"):
try:
content = base64.b64decode(data["content"]).decode("utf-8")
return {
"name": data.get("name", "README"),
"path": data.get("path", "README.md"),
"content": content
}
except (UnicodeDecodeError, base64.binascii.Error) as decode_error:
print(f"Error decoding README content: {decode_error}")
return None # Cannot decode
return None # No content key
except requests.exceptions.RequestException as e:
if hasattr(e, 'response') and e.response is not None and e.response.status_code == 404:
print(f"README not found for {owner}/{repo}.")
else:
print(f"Error getting README for {owner}/{repo}: {e}")
return None
def get_file_content(self, owner, repo, path, ref=None):
"""Get the content of a specific file in the repository."""
self._check_rate_limit()
url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}"
params = {}
if ref:
params["ref"] = ref
try:
response = requests.get(url, headers=self.headers, params=params, timeout=15)
response.raise_for_status()
data = response.json()
if data.get("type") == "file" and data.get("content"):
try:
content = base64.b64decode(data["content"]).decode("utf-8")
return content
except (UnicodeDecodeError, base64.binascii.Error):
# Don't print error here, return indicator
return "[Binary file content not displayed]"
elif data.get("type") != "file":
print(f"Path '{path}' is not a file.")
return None
else:
# File exists but no content? Unlikely but handle.
return "" # Return empty string for empty file
except requests.exceptions.RequestException as e:
if hasattr(e, 'response') and e.response is not None and e.response.status_code == 404:
print(f"File not found at path '{path}' in {owner}/{repo}.")
else:
print(f"Error getting file content for {owner}/{repo}, path '{path}': {e}")
return None
# --- Methods like is_text_file, analyze_ast, analyze_js_ts are generally okay ---
# ... (keep them as they are) ...
# --- Ensure get_all_text_files handles errors from get_contents/get_file_content ---
def get_all_text_files(self, owner, repo, path="", max_files=50, ref=None, _current_count=0):
"""Get content of all text files in the repository (with limit)."""
if _current_count >= max_files:
return [], _current_count
# Get contents for the current path
contents = self.get_contents(owner, repo, path, ref) # Returns [] on error
text_files = []
file_count = _current_count
if not isinstance(contents, list):
print(f"Warning: get_contents did not return a list for path '{path}'. Skipping.")
return [], file_count
# Process current directory
for item in contents:
if file_count >= max_files:
break
# Ensure item is a dictionary and has 'type' and 'name'
if not isinstance(item, dict) or 'type' not in item or 'name' not in item:
print(f"Warning: Skipping malformed item in contents: {item}")
continue
item_path = item.get("path") # Get path safely
if not item_path:
print(f"Warning: Skipping item with missing path: {item}")
continue
if item["type"] == "file" and self.is_text_file(item["name"]):
content = self.get_file_content(owner, repo, item_path, ref)
# Check if content is valid text (not None or binary indicator)
if content and content != "[Binary file content not displayed]":
text_files.append({
"name": item["name"],
"path": item_path,
"content": content
})
file_count += 1
elif item["type"] == "dir":
# Recursively get text files from subdirectories
if file_count < max_files:
try:
subdir_files, file_count = self.get_all_text_files(
owner, repo, item_path, max_files, ref, file_count
)
text_files.extend(subdir_files)
except Exception as e_rec:
print(f"Error processing subdirectory '{item_path}': {e_rec}")
# Continue with other items in the current directory
return text_files, file_count # Return count for recursive calls
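# Usage sketch (hypothetical repo): the method returns the collected files plus the
# running count used by its recursive calls:
#   files, count = info.get_all_text_files("octocat", "Hello-World", max_files=10)
#   # files -> [{"name": ..., "path": ..., "content": ...}, ...]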
# --- Ensure get_documentation_files handles errors ---
def get_documentation_files(self, owner, repo, ref=None):
"""Get documentation files from the repository."""
doc_paths = [
"README.md", "CONTRIBUTING.md", "CODE_OF_CONDUCT.md", "SECURITY.md",
"SUPPORT.md", # Files first
"docs", "doc", "documentation", "wiki", # Common Dirs
".github/ISSUE_TEMPLATE", ".github/PULL_REQUEST_TEMPLATE.md"
]
doc_files = []
# 1. Get top-level files first
root_contents = self.get_contents(owner, repo, "", ref)
if isinstance(root_contents, list):
for item in root_contents:
if isinstance(item, dict) and item.get("type") == "file" and item.get("name") in doc_paths:
path = item.get("path")
if path:
content = self.get_file_content(owner, repo, path, ref)
if content and content != "[Binary file content not displayed]":
doc_files.append({
"name": item["name"],
"path": path,
"content": content
})
# 2. Check specific doc directories
doc_dirs_to_check = ["docs", "doc", "documentation", "wiki", ".github"]
for doc_dir in doc_dirs_to_check:
try:
dir_contents = self.get_contents(owner, repo, doc_dir, ref)
if isinstance(dir_contents, list): # It's a directory
for item in dir_contents:
if isinstance(item, dict) and item.get("type") == "file":
item_name = item.get("name", "").lower()
item_path = item.get("path")
if item_path and item_name.endswith((".md", ".rst", ".txt")):
content = self.get_file_content(owner, repo, item_path, ref)
if content and content != "[Binary file content not displayed]":
doc_files.append({
"name": item["name"],
"path": item_path,
"content": content
})
except Exception as e:
print(f"Error processing documentation path '{doc_dir}': {e}")
continue # Skip this path
return doc_files
# ... (rest of GitHubRepoInfo, display methods, etc. - keep as provided but be mindful of data access in display) ...
# Add specific error handling in display methods if needed, though Gradio errors often hide underlying data issues.
def get_all_info(self, owner, repo):
"""Get comprehensive information about a repository with enhanced metrics."""
print(f"--- Fetching data for {owner}/{repo} ---")
result = {
"timestamp": datetime.now().isoformat()
}
print("Getting basic repo info...")
basic_info = self.get_repo_info(owner, repo)
if not basic_info:
print(f"CRITICAL: Could not retrieve basic repository information for {owner}/{repo}. Aborting analysis.")
return None # Cannot proceed without basic info
result["basic_info"] = basic_info
print("Getting languages...")
result["languages"] = self.get_languages(owner, repo) # Returns {} on error
print("Getting contributors...")
result["contributors"] = self.get_contributors(owner, repo, max_contributors=30) # Returns [] on error
print("Getting recent commits...")
result["recent_commits"] = self.get_commits(owner, repo, max_commits=30) # Returns [] on error
print("Getting branches...")
result["branches"] = self.get_branches(owner, repo) # Returns [] on error
print("Getting releases...")
result["releases"] = self.get_releases(owner, repo, max_releases=10) # Returns [] on error
print("Getting open issues...")
result["open_issues"] = self.get_issues(owner, repo, state="open", max_issues=50) # Returns [] on error
print("Getting open pull requests...")
result["open_pull_requests"] = self.get_pull_requests(owner, repo, state="open", max_prs=50) # Returns [] on error
print("Getting root contents...")
result["root_contents"] = self.get_contents(owner, repo) # Returns [] on error
print("Analyzing repository content (README, Docs, Code Files)...")
# This relies on other methods returning sensible defaults
try:
# Call get_all_text_files outside get_repo_text_summary to pass count correctly
all_text_files_content, _ = self.get_all_text_files(owner, repo, max_files=30)
# Pass the fetched content to get_repo_text_summary to avoid redundant API calls
result["text_content"] = self.get_repo_text_summary(owner, repo, pre_fetched_files=all_text_files_content)
except Exception as e:
print(f"Error during text content analysis: {e}")
result["text_content"] = {"error": str(e)} # Store error indicator
print("Analyzing repository activity over time...")
# This relies on stats methods returning [] on error/202 timeout
try:
result["temporal_analysis"] = self.get_temporal_analysis(owner, repo)
except Exception as e:
print(f"Error during temporal analysis: {e}")
result["temporal_analysis"] = {"error": str(e)} # Store error indicator
print(f"--- Finished fetching data for {owner}/{repo} ---")
return result
# Modify get_repo_text_summary to accept pre-fetched files
def get_repo_text_summary(self, owner, repo, max_files=25, pre_fetched_files=None):
"""Extract and summarize text content from the repository with improved metrics."""
# Get README
readme = self.get_readme(owner, repo) # Returns None on error
# Get documentation
docs = self.get_documentation_files(owner, repo) # Returns [] on error
# Get key code files if not provided
if pre_fetched_files is None:
print("Fetching text files within get_repo_text_summary...")
text_files, _ = self.get_all_text_files(owner, repo, max_files=max_files) # Returns [] on error
else:
print("Using pre-fetched text files in get_repo_text_summary.")
text_files = pre_fetched_files # Use the provided list
# Analyze code files
code_summary = {}
complexity_metrics = {
'cyclomatic_complexity': [],
'maintainability_index': [],
'comment_ratios': []
}
for file in text_files:
# Basic check for file structure
if not isinstance(file, dict) or 'name' not in file or 'content' not in file or 'path' not in file:
print(f"Skipping malformed file data in text summary: {file}")
continue
ext = os.path.splitext(file["name"])[1].lower()
if ext in ['.py', '.js', '.ts', '.jsx', '.tsx']: # Add other relevant code extensions if needed
try:
file_summary = self.extract_code_summary(file["content"], file["path"])
if file_summary: # Ensure summary generation didn't fail
code_summary[file["path"]] = file_summary
# Collect complexity metrics safely
if file_summary.get('complexity'):
cc = file_summary['complexity'].get('overall')
# Ensure cc is a number before appending
if isinstance(cc, (int, float)):
complexity_metrics['cyclomatic_complexity'].append((file["path"], cc))
mi = file_summary['complexity'].get('maintainability_index')
# Ensure mi is a number before appending
if isinstance(mi, (int, float)):
complexity_metrics['maintainability_index'].append((file["path"], mi))
if file_summary.get('metrics'):
comment_ratio = file_summary['metrics'].get('comment_ratio')
# Ensure ratio is a number before appending
if isinstance(comment_ratio, (int, float)):
complexity_metrics['comment_ratios'].append((file["path"], comment_ratio))
except Exception as e_sum:
print(f"Error extracting code summary for {file.get('path', 'unknown file')}: {e_sum}")
# Analyze dependencies (can be slow, consider limiting files further if needed)
# Use the already fetched text_files for dependency analysis
dependencies = self.analyze_dependencies(owner, repo, pre_fetched_code_files=text_files)
# Summarize repository content by file type
file_types = defaultdict(int)
for file in text_files:
if isinstance(file, dict) and 'name' in file: # Check again
ext = os.path.splitext(file["name"])[1].lower()
if ext: # Avoid counting files with no extension
file_types[ext] += 1
# Calculate aggregate code metrics safely
total_code_lines = 0
total_comment_lines = 0
analyzed_code_files = 0
for path, summary in code_summary.items():
if summary and summary.get('metrics'):
analyzed_code_files += 1
total_code_lines += summary['metrics'].get('code_lines', 0) or 0
total_comment_lines += summary['metrics'].get('comment_lines', 0) or 0
aggregate_metrics = {
'total_files_analyzed': len(text_files), # All text files fetched
'code_files_summarized': analyzed_code_files, # Files where summary succeeded
'total_code_lines': total_code_lines,
'total_comment_lines': total_comment_lines,
'average_comment_ratio': (total_comment_lines / total_code_lines) if total_code_lines > 0 else 0
}
return {
"readme": readme, # Can be None
"documentation": docs, # Should be list
"code_summary": code_summary, # Dict of summaries
"complexity_metrics": complexity_metrics, # Dict of lists
"dependencies": dependencies, # Dict
"file_type_counts": dict(file_types), # Dict
"aggregate_metrics": aggregate_metrics, # Dict
"text_files": text_files # List of fetched files
}
# Modify analyze_dependencies to accept pre-fetched files
def analyze_dependencies(self, owner, repo, max_files=100, pre_fetched_code_files=None):
"""Analyze code dependencies across the repository."""
if pre_fetched_code_files is None:
# Get Python and JavaScript files if not provided
print("Fetching text files within analyze_dependencies...")
text_files, _ = self.get_all_text_files(owner, repo, max_files=max_files)
# Filter for Python and JS/TS files
code_files = [f for f in text_files if isinstance(f, dict) and f.get("name", "").endswith(('.py', '.js', '.ts', '.jsx', '.tsx'))]
else:
print("Using pre-fetched files in analyze_dependencies.")
# Assume pre_fetched_code_files are already filtered if needed, or filter here
code_files = [f for f in pre_fetched_code_files if isinstance(f, dict) and f.get("name", "").endswith(('.py', '.js', '.ts', '.jsx', '.tsx'))]
# Track dependencies
dependencies = {
'internal': defaultdict(set), # File to file dependencies
'external': defaultdict(set), # External package dependencies by file
'modules': defaultdict(set) # Defined modules/components by file
}
# Extract module names from file paths
file_to_module = {}
for file in code_files:
# Add checks here too
if not isinstance(file, dict) or 'path' not in file or 'content' not in file: continue
# Convert file path to potential module name
module_path = os.path.splitext(file["path"])[0].replace('/', '.')
file_to_module[file["path"]] = module_path
# Track what each file defines
try:
summary = self.extract_code_summary(file["content"], file["path"])
if not summary: continue # Skip if summary failed
if file.get("name", "").endswith('.py'):
for function in summary.get("functions", []):
# Ensure function is a string before adding
if isinstance(function, str):
dependencies['modules'][file["path"]].add(f"{module_path}.{function}")
for class_name in summary.get("classes", []):
# Ensure class_name is a string before adding
if isinstance(class_name, str):
dependencies['modules'][file["path"]].add(f"{module_path}.{class_name}")
else: # JS/TS files
for export in summary.get("exports", []):
# Ensure export is a string before adding
if isinstance(export, str):
dependencies['modules'][file["path"]].add(export)
except Exception as e_dep_mod:
print(f"Error processing module definitions for {file.get('path', 'unknown file')}: {e_dep_mod}")
# Analyze imports/dependencies
for file in code_files:
if not isinstance(file, dict) or 'path' not in file or 'content' not in file: continue
try:
summary = self.extract_code_summary(file["content"], file["path"])
if not summary: continue
for imp in summary.get("imports", []):
# Ensure import is a string
if not isinstance(imp, str) or not imp: continue
# Check if this is an internal import
is_internal = False
target_dep_path = None # Store the resolved internal path
if file.get("name","").endswith('.py'):
# For Python, check if the import matches any module path
# Normalize potential relative imports starting with '.'
current_module_parts = file_to_module[file["path"]].split('.')
if imp.startswith('.'):
# Resolve relative import (basic attempt)
level = 0
while imp.startswith('.'):
level += 1
imp = imp[1:]
base_parts = current_module_parts[:-level] if level > 0 else current_module_parts[:-1] # Go up levels or stay in package
resolved_imp = '.'.join(base_parts + [imp] if imp else base_parts) # Handle 'from . import foo' vs 'from ..bar import baz'
else:
resolved_imp = imp # Absolute import
# Check against known module paths
for f_path, m_path in file_to_module.items():
# Exact match or parent package match
if resolved_imp == m_path or resolved_imp.startswith(f"{m_path}."):
target_dep_path = f_path
break
# Check if import is trying to import a specific module file directly
# e.g. import mypackage.module -> check if file path matches mypackage/module.py
potential_file_path = resolved_imp.replace('.', '/') + '.py'
if potential_file_path == f_path:
target_dep_path = f_path
break
else: # JS/TS
# For JS/TS, check relative imports or alias paths (more complex, basic check here)
if imp.startswith('./') or imp.startswith('../') or imp.startswith('@/'): # Basic checks
is_internal = True # Assume internal for now
# Basic resolution attempt
src_dir = os.path.dirname(file["path"])
target_path_base = os.path.normpath(os.path.join(src_dir, imp))
# Try adding common extensions
for ext in ['.js', '.ts', '.jsx', '.tsx', '/index.js', '/index.ts']:
test_path = f"{target_path_base}{ext}"
if test_path in file_to_module:
target_dep_path = test_path
break
# Check path without extension too (might be dir import)
if target_path_base in file_to_module:
target_dep_path = target_path_base
break
# If a target internal path was found, add the dependency
if target_dep_path:
# Ensure the target path actually exists in our list of files
if target_dep_path in file_to_module:
dependencies['internal'][file["path"]].add(target_dep_path)
is_internal = True # Confirm it was internal
# If not internal, consider it external
if not is_internal:
# Clean up the import name (remove relative path parts, take package name)
# Handle scoped packages like @angular/core -> @angular/core
# Handle imports like 'react-dom/client' -> react-dom
if '/' in imp and not imp.startswith('.') and not imp.startswith('@'):
package_base = imp.split('/')[0]
elif imp.startswith('@'):
parts = imp.split('/')
package_base = '/'.join(parts[:2]) if len(parts) >= 2 else parts[0] # Keep scope like @scope/package
else:
package_base = imp
# Add only non-empty strings
if package_base:
dependencies['external'][file["path"]].add(package_base)
except Exception as e_dep_ana:
print(f"Error processing dependencies for {file.get('path', 'unknown file')}: {e_dep_ana}")
return dependencies
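# Shape of the returned mapping (hypothetical paths, for illustration only):
#   {'internal': {'pkg/app.py': {'pkg/utils.py'}},
#    'external': {'pkg/app.py': {'requests', 'pandas'}},
#    'modules':  {'pkg/utils.py': {'pkg.utils.helper', 'pkg.utils.Helper'}}}
# The sets are converted to lists later by convert_sets_to_lists / CustomJSONEncoder.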
# --- get_temporal_analysis: Ensure sub-methods return [] and handle potential errors ---
def get_temporal_analysis(self, owner, repo):
"""Perform temporal analysis of repository activity."""
# Get commit activity over time
commit_activity = self.get_commit_activity(owner, repo) or [] # Ensure list
# Get code frequency (additions/deletions over time)
code_frequency = self.get_code_frequency(owner, repo) or [] # Ensure list
# Get contributor activity
contributor_activity = self.get_contributor_activity(owner, repo) or [] # Ensure list
# Get issue and PR timelines (These methods already return dicts with lists/values)
# Add error handling around the calls themselves
try:
issue_timeline = self.get_issue_timeline(owner, repo)
except Exception as e:
print(f"Error getting issue timeline: {e}")
issue_timeline = {} # Default empty dict
try:
pr_timeline = self.get_pr_timeline(owner, repo)
except Exception as e:
print(f"Error getting PR timeline: {e}")
pr_timeline = {} # Default empty dict
# Process data for visualization safely
# - Weekly commit counts
weekly_commits = []
if isinstance(commit_activity, list): # Check if list
for week in commit_activity:
# Check if item is a dict with expected keys
if isinstance(week, dict) and 'week' in week and 'total' in week and 'days' in week:
try:
date = datetime.fromtimestamp(week['week'])
weekly_commits.append({
'date': date.strftime('%Y-%m-%d'),
'total': int(week['total']), # Ensure integer
'days': week['days'] # Daily breakdown within the week
})
except (TypeError, ValueError) as e:
print(f"Skipping invalid commit activity week data: {week}, Error: {e}")
else:
print(f"Skipping malformed commit activity week data: {week}")
else:
print(f"Warning: Commit activity data is not a list: {type(commit_activity)}")
# - Weekly code changes
weekly_code_changes = []
if isinstance(code_frequency, list): # Check if list
for item in code_frequency:
# Check if item is a list/tuple of 3 numbers
if isinstance(item, (list, tuple)) and len(item) == 3:
try:
date = datetime.fromtimestamp(item[0])
additions = int(item[1])
deletions = int(item[2]) # Keep positive for calculation
weekly_code_changes.append({
'date': date.strftime('%Y-%m-%d'),
'additions': additions,
'deletions': deletions # Store as positive deletions
})
except (TypeError, ValueError, IndexError) as e:
print(f"Skipping invalid code frequency data: {item}, Error: {e}")
else:
print(f"Skipping malformed code frequency data: {item}")
else:
print(f"Warning: Code frequency data is not a list: {type(code_frequency)}")
# - Contributor timeline
contributor_timeline = {}
if isinstance(contributor_activity, list): # Check if list
for contributor in contributor_activity:
# Check structure
if (isinstance(contributor, dict) and
'author' in contributor and isinstance(contributor['author'], dict) and 'login' in contributor['author'] and
'weeks' in contributor and isinstance(contributor['weeks'], list)):
author = contributor['author']['login']
weeks_data = contributor['weeks']
if author not in contributor_timeline:
contributor_timeline[author] = []
for week in weeks_data:
# Check week structure and values
if (isinstance(week, dict) and all(k in week for k in ['w', 'c', 'a', 'd']) and
isinstance(week['c'], int) and week['c'] >= 0): # Check commit count is valid non-negative int
if week['c'] > 0: # Only include weeks with commits
try:
date = datetime.fromtimestamp(week['w'])
contributor_timeline[author].append({
'date': date.strftime('%Y-%m-%d'),
'commits': int(week['c']),
'additions': int(week['a']),
'deletions': int(week['d'])
})
except (TypeError, ValueError) as e:
print(f"Skipping invalid contributor week data for {author}: {week}, Error: {e}")
# No else needed, just skip malformed week data silently or add print if desired
else:
print(f"Skipping malformed contributor activity data: {contributor}")
else:
print(f"Warning: Contributor activity data is not a list: {type(contributor_activity)}")
# Ensure issue/pr timelines are dicts before returning
issue_timeline = issue_timeline if isinstance(issue_timeline, dict) else {}
pr_timeline = pr_timeline if isinstance(pr_timeline, dict) else {}
return {
'weekly_commits': weekly_commits, # List
'weekly_code_changes': weekly_code_changes, # List
'contributor_timeline': contributor_timeline, # Dict
'issue_timeline': issue_timeline, # Dict
'pr_timeline': pr_timeline # Dict
}
# --- Pull Request Details (Ensure PyGithub is checked) ---
def get_pull_request_details(self, owner, repo, pr_number):
"""Get detailed information for a specific Pull Request using PyGithub."""
if not self.github: # Check if PyGithub client was initialized
print("PyGithub client not initialized or installed. Cannot fetch PR details.")
# Fallback maybe? Try direct REST call if needed
# For now, return None
return None
try:
# Ensure owner/repo are strings and pr_number is int
if not isinstance(owner, str) or not isinstance(repo, str):
raise ValueError("Owner and repo must be strings.")
pr_number = int(pr_number)
repo_obj = self.github.get_repo(f"{owner}/{repo}")
pr = repo_obj.get_pull(pr_number)
# Extract relevant information into a dictionary safely
details = {
"number": pr.number,
"title": pr.title or "N/A",
"state": pr.state or "N/A", # 'open', 'closed'
"merged": pr.merged or False,
"body": pr.body or "", # Ensure body is string
"url": pr.html_url or "N/A",
"created_at": pr.created_at.isoformat() if pr.created_at else None,
"updated_at": pr.updated_at.isoformat() if pr.updated_at else None,
"closed_at": pr.closed_at.isoformat() if pr.closed_at else None,
"merged_at": pr.merged_at.isoformat() if pr.merged_at else None,
"author": pr.user.login if pr.user else "N/A",
"commits_count": pr.commits if pr.commits is not None else 0,
"additions": pr.additions if pr.additions is not None else 0,
"deletions": pr.deletions if pr.deletions is not None else 0,
"changed_files_count": pr.changed_files if pr.changed_files is not None else 0,
"labels": [label.name for label in pr.labels] if pr.labels else [],
"assignees": [assignee.login for assignee in pr.assignees] if pr.assignees else [],
"milestone": pr.milestone.title if pr.milestone else None,
"repo_full_name": f"{owner}/{repo}", # Add repo context
# Add more fields if needed (e.g., comments, reviews)
}
return details
except GithubException as e:
if e.status == 404:
print(f"Error: Pull Request #{pr_number} not found in {owner}/{repo}.")
elif e.status == 401:
print(f"Error: Unauthorized (401). Check your GitHub token permissions for {owner}/{repo}.")
elif e.status == 403:
print(f"Error: Forbidden (403). Check token permissions or rate limits for {owner}/{repo}.")
else:
print(f"GitHub API Error fetching PR #{pr_number} details: Status={e.status}, Data={e.data}")
return None
except ValueError as e: # Catch potential int conversion error
print(f"Error: Invalid PR number '{pr_number}'. Must be an integer. {e}")
return None
except Exception as e: # Catch any other unexpected errors
print(f"An unexpected error occurred fetching PR details for #{pr_number}: {e}")
return None
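# Usage sketch (hypothetical PR number):
#   details = info.get_pull_request_details("octocat", "Hello-World", 42)
#   if details:
#       print(details["title"], details["additions"], details["deletions"])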
# --- Colab Helpers (Keep as provided) ---
try:
from google.colab import files
IN_COLAB = True
except ImportError:
IN_COLAB = False
# ...(keep download_file and save_json_to_colab functions)...
# Use the provided robust JSON helpers
class CustomJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, set):
return list(obj)
elif isinstance(obj, (datetime, np.datetime64)):
# Handle both standard datetime and numpy datetime64
if isinstance(obj, np.datetime64):
# Convert numpy datetime64 to standard datetime
ts = pd.to_datetime(obj)
# Ensure it's offset-naive or convert to UTC for ISO format
if ts.tzinfo is not None:
ts = ts.tz_convert(None) # Make naive if aware
return ts.isoformat()
# Ensure standard datetime is offset-naive or convert to UTC
if obj.tzinfo is not None:
obj = obj.astimezone(timezone.utc).replace(tzinfo=None) # Convert to UTC and make naive
return obj.isoformat()
elif isinstance(obj, (np.int64, np.int32)):
return int(obj)
elif isinstance(obj, (np.float64, np.float32)):
return float(obj)
elif isinstance(obj, np.bool_): # Handle numpy bool
return bool(obj)
elif isinstance(obj, np.ndarray): # Handle numpy arrays
return obj.tolist() # Convert to list
# Be careful with complex objects, might expose too much or fail
# Let the base class default method raise the TypeError for others
try:
return super(CustomJSONEncoder, self).default(obj)
except TypeError:
return str(obj) # Fallback to string representation for unknown types
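# Usage sketch: the encoder is meant to be passed to json.dumps / json.dump, e.g.
#   json.dumps({"when": datetime.now(), "langs": {"Python", "Go"}}, cls=CustomJSONEncoder)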
def convert_sets_to_lists(obj):
# Recursive function to convert sets and handle numpy types
if isinstance(obj, dict):
return {k: convert_sets_to_lists(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_sets_to_lists(i) for i in obj]
elif isinstance(obj, set):
# Convert set elements as well
return [convert_sets_to_lists(i) for i in sorted(list(obj))] # Sort for consistent output
elif isinstance(obj, tuple):
return tuple(convert_sets_to_lists(i) for i in obj)
# Handle numpy types specifically
elif isinstance(obj, np.integer): # covers all widths; avoids np.int_/np.float_ aliases removed in NumPy 2.0
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.datetime64):
# Consistent conversion to ISO format string (naive UTC)
ts = pd.to_datetime(obj).to_pydatetime() # Convert to standard datetime
if ts.tzinfo is not None:
ts = ts.astimezone(timezone.utc).replace(tzinfo=None)
return ts.isoformat() + "Z" # Add Z for UTC indication
elif isinstance(obj, np.bool_):
return bool(obj)
elif isinstance(obj, np.ndarray):
return convert_sets_to_lists(obj.tolist()) # Convert numpy arrays to lists
elif isinstance(obj, pd.Timestamp): # Handle Pandas Timestamp
ts = obj.to_pydatetime()
if ts.tzinfo is not None:
ts = ts.astimezone(timezone.utc).replace(tzinfo=None)
return ts.isoformat() + "Z"
elif isinstance(obj, datetime): # Handle standard datetime
if obj.tzinfo is not None:
obj = obj.astimezone(timezone.utc).replace(tzinfo=None)
return obj.isoformat() + "Z"
else:
# Attempt to handle other non-serializable types gracefully
# Test if the specific object is serializable by default
try:
json.dumps(obj) # Test serialization
return obj # Return as is if serializable
except TypeError:
# If not serializable by default, convert to string as a fallback
print(f"Warning: Converting non-serializable type {type(obj)} to string.")
return str(obj)
def save_json_to_colab(data, filename='/content/repo_info.json'):
"""Save JSON data to a file in Colab and provide download option."""
# Deep conversion to handle nested structures and numpy types
try:
print("Converting data for JSON serialization...")
converted_data = convert_sets_to_lists(data)
print("Conversion complete. Saving JSON...")
except Exception as e:
print(f"Error during data conversion for JSON: {e}")
print("Attempting to save raw data (might fail)...")
# Fallback to trying without full conversion, might still fail
converted_data = data
try:
with open(filename, 'w', encoding='utf-8') as f:
# Use the custom encoder for any remaining types if conversion missed something
json.dump(converted_data, f, indent=2, cls=CustomJSONEncoder, ensure_ascii=False)
print(f"Data successfully saved to {filename}")
if IN_COLAB:
try:
print("To download the JSON file in Colab, run the following cell:")
print(f"from google.colab import files")
print(f"files.download('{filename}')")
except NameError: # files might not be imported if not in Colab context truly
pass
except TypeError as e:
print(f"Error saving JSON: {e}")
print("There might be non-serializable data types remaining even after conversion attempt.")
print("Consider inspecting the data structure for problematic types.")
except Exception as e:
print(f"An unexpected error occurred during JSON saving: {e}")
# --- GraphRepoAnalyzer Class (Check initializations and data access) ---
class GraphRepoAnalyzer:
"""Integrates GitHub analysis with Neo4j and Gemini."""
def __init__(self, github_token=None, neo4j_uri=None, neo4j_user=None, neo4j_password=None, gemini_api_key=None):
"""Initialize with credentials."""
load_dotenv() # Load .env file if it exists
self.github_token = github_token or os.getenv("GITHUB_TOKEN")
self.neo4j_uri = neo4j_uri or os.getenv("NEO4J_URI")
self.neo4j_user = neo4j_user or os.getenv("NEO4J_USERNAME")
self.neo4j_password = neo4j_password or os.getenv("NEO4J_PASSWORD")
self.gemini_api_key = gemini_api_key or os.getenv("GOOGLE_API_KEY")
# Initialize github_analyzer using the potentially updated GitHubRepoInfo
# Pass the token directly
print("Initializing GitHubRepoInfo...")
self.github_analyzer = GitHubRepoInfo(token=self.github_token)
print("GitHubRepoInfo initialized.")
self.neo4j_driver = None
# Check if Neo4j library was imported
if GraphDatabase and basic_auth and all([self.neo4j_uri, self.neo4j_user, self.neo4j_password]):
try:
print(f"Attempting to connect to Neo4j at {self.neo4j_uri}...")
# Use basic_auth for Neo4j driver authentication
self.neo4j_driver = GraphDatabase.driver(self.neo4j_uri, auth=basic_auth(self.neo4j_user, self.neo4j_password))
self.neo4j_driver.verify_connectivity()
print("Successfully connected to Neo4j.")
self._create_neo4j_constraints()
except Exception as e:
print(f"Error connecting to Neo4j: {e}")
print("Graph features will be disabled.")
self.neo4j_driver = None
else:
if not (GraphDatabase and basic_auth):
print("Neo4j library not installed. Graph features disabled.")
else:
print("Warning: Neo4j credentials not fully provided or library missing. Graph features will be disabled.")
self.gemini_model = None
# Check if Gemini library was imported
if genai and self.gemini_api_key:
try:
print("Configuring Google Generative AI...")
genai.configure(api_key=self.gemini_api_key)
# Use a known stable model, check Gemini docs for latest recommended models
# 'gemini-1.5-flash-latest' is often a good balance
# model_name = 'gemini-1.5-flash-latest'
# Let's stick to the user's specified model if possible, fallback otherwise
model_name = 'gemini-1.5-pro-latest' # User's original choice in one definition
# Check if the model exists (basic check)
# available_models = [m.name for m in genai.list_models() if 'generateContent' in m.supported_generation_methods]
# if model_name not in available_models:
# print(f"Warning: Model '{model_name}' not found or doesn't support generateContent. Trying 'gemini-1.5-flash-latest'.")
# model_name = 'gemini-1.5-flash-latest'
# if model_name not in available_models:
# print("Error: Could not find a suitable Gemini model.")
# raise ValueError("No suitable Gemini model found.")
print(f"Initializing Gemini model: {model_name}")
self.gemini_model = genai.GenerativeModel(model_name)
# Test call (optional, might consume quota)
# self.gemini_model.generate_content("Test")
print("Gemini model initialized.")
except Exception as e:
print(f"Error initializing Gemini: {e}")
self.gemini_model = None
else:
if not genai:
print("Google Generative AI library not installed. Gemini features disabled.")
else:
print("Warning: Google API Key not provided or library missing. Gemini features will be disabled.")
self.repo_data = None
self.repo_full_name = None # Store repo name for context
self.owner = None # Store owner
self.repo = None # Store repo name
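# Expected environment variables (names taken from the os.getenv calls above;
# values below are illustrative placeholders only):
#   GITHUB_TOKEN=ghp_xxx
#   NEO4J_URI=bolt://localhost:7687
#   NEO4J_USERNAME=neo4j
#   NEO4J_PASSWORD=xxx
#   GOOGLE_API_KEY=xxx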
# ... (rest of GraphRepoAnalyzer methods, ensure self.repo_data is checked before use) ...
# --- analyze_repo: Ensure it handles None return from get_all_info ---
def analyze_repo(self, owner, repo, display=True, save_json=False, export_text=False):
"""Fetch, analyze, display, and optionally populate graph."""
# Validate inputs
if not owner or not isinstance(owner, str):
print("Error: Repository owner must be provided as a string.")
self.repo_data = None
return # Stop processing
if not repo or not isinstance(repo, str):
print("Error: Repository name must be provided as a string.")
self.repo_data = None
return # Stop processing
self.owner = owner.strip()
self.repo = repo.strip()
self.repo_full_name = f"{self.owner}/{self.repo}"
print(f"\n--- Starting Analysis for {self.repo_full_name} ---")
# Reset previous data
self.repo_data = None
# Use the github_analyzer instance associated with this GraphRepoAnalyzer
if not self.github_analyzer:
print("Error: GitHubRepoInfo analyzer not initialized.")
return
try:
self.repo_data = self.github_analyzer.get_all_info(self.owner, self.repo)
except Exception as e:
print(f"An unexpected error occurred during get_all_info: {e}")
import traceback
traceback.print_exc() # Print stack trace for debugging
self.repo_data = None # Ensure repo_data is None on error
# Check if analysis succeeded and returned data
if self.repo_data and isinstance(self.repo_data, dict) and "basic_info" in self.repo_data:
print(f"--- Analysis Complete for {self.repo_full_name} ---")
# Proceed with display, save, export, populate etc.
if display and IPYTHON_AVAILABLE: # Only display if in IPython environment
print("\nGenerating visualizations and analysis (requires IPython environment)...")
try:
# Wrap display calls in try/except as they can fail with odd data
self.github_analyzer.display_repo_info(self.repo_data)
self.github_analyzer.display_code_files(self.repo_data) # Show code preview
except Exception as display_error:
print(f"Error during display generation: {display_error}")
elif display and not IPYTHON_AVAILABLE:
print("\nSkipping visualizations: Not in an IPython environment (like Colab or Jupyter).")
if self.neo4j_driver:
try:
# Use Gradio input later, for script execution use environment variable or fixed logic
populate_graph = os.getenv("POPULATE_NEO4J", "false").lower() == 'true'
# populate = input("\nPopulate Neo4j graph with this data? (y/n): ").lower() == 'y'
if populate_graph:
print("\nAttempting to populate Neo4j graph...")
self.populate_neo4j_graph()
else:
print("\nSkipping Neo4j population.")
except Exception as neo4j_error:
print(f"Error during Neo4j interaction prompt or population: {neo4j_error}")
if save_json:
# Use fixed path or environment variable for non-interactive saving
default_filename = f'./{self.repo}_info.json'
filename = os.getenv("JSON_OUTPUT_PATH", default_filename)
# filename = input(f"Enter filename for JSON output (default: {default_filename}): ") or default_filename
print(f"\nSaving analysis results to JSON: {filename}")
save_json_to_colab(self.repo_data, filename) # Use the enhanced save function
if export_text:
# Use fixed path or environment variable for non-interactive saving
default_dir = f'./{self.repo}_text'
output_dir = os.getenv("TEXT_EXPORT_DIR", default_dir)
# output_dir = input(f"Enter output directory for text export (default: {default_dir}): ") or default_dir
print(f"\nExporting text content to directory: {output_dir}")
self.github_analyzer.export_repo_text(self.repo_data, output_dir)
else:
# This case handles where get_all_info returned None or an invalid structure
print(f"--- Failed to get complete repository information for {self.repo_full_name} ---")
# self.repo_data is already None or invalid
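# Usage sketch (hypothetical repository):
#   analyzer = GraphRepoAnalyzer()
#   analyzer.analyze_repo("octocat", "Hello-World", display=False, save_json=False)
#   # analyzer.repo_data is then either a populated dict or None on failure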
# --- summarize_pull_request: Add checks ---
def summarize_pull_request(self, pr_number_str, role):
"""Fetches PR details and generates a role-based summary using Gemini."""
if not self.gemini_model:
return "Gemini model not initialized. Cannot generate summary."
if not self.owner or not self.repo:
return "Repository owner and name not set. Analyze a repository first or provide them."
if not self.github_analyzer:
return "GitHub Analyzer not initialized."
# Validate PR number
try:
pr_number = int(pr_number_str)
except (ValueError, TypeError):
return f"Invalid Pull Request number: '{pr_number_str}'. Please provide an integer."
# Validate Role
valid_roles = ["Developer", "Manager", "Team Lead", "Product Owner", "Program Manager", "General"]
if role not in valid_roles:
return f"Invalid role: '{role}'. Please choose from: {', '.join(valid_roles)}"
print(f"\nFetching details for PR #{pr_number} in {self.repo_full_name}...")
# get_pull_request_details handles its own errors and returns None on failure
pr_details = self.github_analyzer.get_pull_request_details(self.owner, self.repo, pr_number)
if not pr_details:
# Error message was already printed by get_pull_request_details
return f"Could not retrieve details for PR #{pr_number}. See previous error messages."
print(f"Generating summary for role: {role}...")
# Generate the role-specific prompt
try:
prompt = self._get_pr_summary_prompt(pr_details, role)
except Exception as e:
print(f"Error generating Gemini prompt: {e}")
return "Error preparing the summary request."
# Send to Gemini and Get Response
try:
# print("--- Sending Prompt to Gemini ---")
# print(prompt[:1000] + "..." if len(prompt) > 1000 else prompt) # Debug: Print truncated prompt
# print("-----------------------------")
# Use safety_settings to reduce refusals for code-related content if needed
# safety_settings = [
# {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
# {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
# {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
# {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"},
# ]
# response = self.gemini_model.generate_content(prompt, safety_settings=safety_settings)
response = self.gemini_model.generate_content(prompt)
# Check for empty or blocked response
if not response.parts:
# Check prompt feedback for blockage reason
block_reason = response.prompt_feedback.block_reason if response.prompt_feedback else "Unknown"
print(f"Warning: Gemini response was empty or blocked. Reason: {block_reason}")
return f"Summary generation failed. The request may have been blocked (Reason: {block_reason})."
summary_text = response.text
print("\n--- Gemini PR Summary ---")
# Don't use display(Markdown()) here as it might not work outside notebooks
# Return the raw text for Gradio Markdown component
print(summary_text) # Print to console as well
print("------------------------")
return summary_text # Return raw text
except Exception as e:
print(f"Error communicating with Gemini for PR summary: {e}")
return f"Error asking Gemini: {e}"
# --- create_vizro_dashboard: Add robust data checks ---
def create_vizro_dashboard(self, output_dir='./vizro_dashboard'):
"""Create a Vizro dashboard from repository data."""
# Check if Vizro is installed
if not vzm or not px or not go:
print("Vizro/Plotly not installed. Cannot create dashboard.")
return None
# Check if data exists and is minimally valid
if not self.repo_data or not isinstance(self.repo_data, dict) or not self.repo_data.get("basic_info"):
print("No valid repository data available. Run analyze_repo() first.")
return None
print("Creating Vizro dashboard...")
# Create output directory if it doesn't exist
try:
if not os.path.exists(output_dir):
os.makedirs(output_dir)
except OSError as e:
print(f"Error creating output directory {output_dir}: {e}")
return None # Cannot proceed without output dir
# --- Safely Extract Data ---
basic_info = self.repo_data.get("basic_info", {}) # Default to empty dict
repo_name = basic_info.get("full_name", "Unknown Repo")
languages_data = self.repo_data.get("languages") # Can be None or {}
contributors_data = self.repo_data.get("contributors") # Can be None or []
temporal_analysis = self.repo_data.get("temporal_analysis", {}) # Default to {}
text_content = self.repo_data.get("text_content", {}) # Default to {}
dependencies_data = text_content.get("dependencies", {}) # Default to {}
complexity_metrics = text_content.get("complexity_metrics", {}) # Default to {}
# --- Create dashboard pages ---
pages = []
all_components = {} # Store components to avoid duplicates if needed
# --- 1. Overview Page ---
print("Building Overview Page...")
overview_components = []
try:
# Basic repository info card - use .get for safety
repo_info_md = f"""
# {basic_info.get('full_name', 'N/A')}
**Description:** {basic_info.get('description', 'No description')}
**Stars:** {basic_info.get('stargazers_count', 'N/A')} |
**Forks:** {basic_info.get('forks_count', 'N/A')} |
**Open Issues:** {basic_info.get('open_issues_count', 'N/A')}
**Created:** {basic_info.get('created_at', 'N/A')} |
**Last Updated:** {basic_info.get('updated_at', 'N/A')}
**Default Branch:** {basic_info.get('default_branch', 'N/A')}
**License:** {(basic_info.get('license') or {}).get('name', 'Not specified')}
""" # Safe access: 'license' can be null in the API response
overview_components.append(vzm.Card(text=repo_info_md, title="Repository Info"))
all_components['repo_info_card'] = overview_components[-1]
# Languages pie chart
if isinstance(languages_data, dict) and languages_data:
langs_prep_data = []
total_bytes = sum(v for v in languages_data.values() if isinstance(v, (int, float)))
if total_bytes > 0:
for lang, bytes_count in languages_data.items():
if isinstance(bytes_count, (int, float)) and bytes_count > 0:
percentage = (bytes_count / total_bytes) * 100
langs_prep_data.append({
"Language": str(lang), # Ensure string
"Bytes": bytes_count,
"Percentage": percentage
})
if langs_prep_data: # Check if we have data to plot
langs_df = pd.DataFrame(langs_prep_data)
lang_pie_fig = px.pie(
langs_df,
values="Percentage",
names="Language",
title="Language Distribution"
)
lang_pie = vzm.Graph(figure=lang_pie_fig)
overview_components.append(lang_pie) # Add the Graph component directly (vzm.Card does not accept child components)
all_components['lang_pie'] = overview_components[-1]
else:
print("No valid language data to plot.")
else:
print("Language data present but total bytes are zero or invalid.")
else:
print("No language data found or data is not a dictionary.")
# Contributors bar chart
if isinstance(contributors_data, list) and contributors_data:
contrib_prep_data = []
for contributor in contributors_data[:15]: # Limit display
if isinstance(contributor, dict) and 'login' in contributor and 'contributions' in contributor:
contrib_prep_data.append({
"Username": str(contributor['login']), # Ensure string
"Contributions": int(contributor['contributions']) # Ensure int
})
if contrib_prep_data: # Check if data to plot
contrib_df = pd.DataFrame(contrib_prep_data)
contrib_bar_fig = px.bar(
contrib_df,
x="Username",
y="Contributions",
title="Top Contributors"
)
contrib_bar = vzm.Graph(figure=contrib_bar_fig)
overview_components.append(contrib_bar) # Add the Graph component directly
all_components['contrib_bar'] = overview_components[-1]
else:
print("No valid contributor data to plot.")
else:
print("No contributor data found or data is not a list.")
# Add overview page if components exist
if overview_components:
pages.append(
vzm.Page(
title="Overview",
components=overview_components,
path="overview" # Add unique path
)
)
else:
print("Skipping Overview page: No components generated.")
except Exception as e:
print(f"Error building Overview page: {e}")
# Optionally add an error card to the dashboard
# overview_components.append(vzm.Card(text=f"Error building overview: {e}"))
# --- 2. Activity Page ---
print("Building Activity Page...")
activity_components = []
try:
# Commit activity over time
weekly_commits = temporal_analysis.get("weekly_commits", [])
if isinstance(weekly_commits, list) and weekly_commits:
commits_prep_data = []
for week in weekly_commits:
if isinstance(week, dict) and 'date' in week and 'total' in week:
try:
# Validate date and convert total to int
date_val = pd.to_datetime(week['date'])
commits_val = int(week['total'])
commits_prep_data.append({"Date": date_val, "Commits": commits_val})
except (ValueError, TypeError):
continue # Skip invalid entries
if commits_prep_data:
commits_df = pd.DataFrame(commits_prep_data)
if not commits_df.empty:
commits_line_fig = px.line(
commits_df,
x="Date",
y="Commits",
title="Weekly Commit Activity"
)
commits_line = vzm.Graph(figure=commits_line_fig)
activity_components.append(commits_line) # Add the Graph component directly
all_components['commits_line'] = activity_components[-1]
else:
print("No valid commit data to plot.")
else:
print("No weekly commit data found or data is not a list.")
# Code changes over time
weekly_code_changes = temporal_analysis.get("weekly_code_changes", [])
if isinstance(weekly_code_changes, list) and weekly_code_changes:
changes_prep_data = []
for week in weekly_code_changes:
if isinstance(week, dict) and 'date' in week and 'additions' in week and 'deletions' in week:
try:
date_val = pd.to_datetime(week['date'])
additions_val = int(week['additions'])
deletions_val = int(week['deletions'])
changes_prep_data.append({
"Date": date_val,
"Additions": additions_val,
"Deletions": -abs(deletions_val) # Make negative for relative bar chart
})
except (ValueError, TypeError):
continue # Skip invalid entries
if changes_prep_data:
changes_df = pd.DataFrame(changes_prep_data)
if not changes_df.empty:
changes_fig = go.Figure()
changes_fig.add_trace(go.Bar(
x=changes_df["Date"], y=changes_df["Additions"], name="Additions", marker_color="green"
))
changes_fig.add_trace(go.Bar(
x=changes_df["Date"], y=changes_df["Deletions"], name="Deletions", marker_color="red"
))
changes_fig.update_layout(title="Weekly Code Changes", barmode="relative", xaxis_title="Date", yaxis_title="Lines Changed")
changes_chart = vzm.Graph(figure=changes_fig)
activity_components.append(changes_chart) # Add the Graph component directly
all_components['changes_chart'] = activity_components[-1]
else:
print("No valid code change data to plot.")
else:
print("No weekly code change data found or data is not a list.")
# Issue resolution times
issue_timeline = temporal_analysis.get("issue_timeline", {})
if isinstance(issue_timeline, dict):
resolution_times = issue_timeline.get('resolution_times', [])
if isinstance(resolution_times, list) and resolution_times:
# Convert to hours safely, cap at one week (168 hours)
rt_hours = []
for rt in resolution_times:
if isinstance(rt, (int, float)) and rt >= 0:
rt_hours.append(min(rt, 168))
if rt_hours: # Check if we have valid data after cleaning
rt_hours_array = np.array(rt_hours) # For numpy functions
issue_resolution_fig = px.histogram(
x=rt_hours_array,
title="Issue Resolution Times (Capped at 1 Week)",
labels={"x": "Hours to Resolution"}
)
mean_rt = np.mean(rt_hours_array)
median_rt = np.median(rt_hours_array)
issue_resolution_fig.add_vline(x=mean_rt, line_dash="dash", line_color="red", annotation_text=f"Mean: {mean_rt:.2f} hrs")
issue_resolution_fig.add_vline(x=median_rt, line_dash="dash", line_color="green", annotation_text=f"Median: {median_rt:.2f} hrs")
resolution_hist = vzm.Graph(figure=issue_resolution_fig)
activity_components.append(resolution_hist) # Add the Graph component directly
all_components['issue_res_hist'] = activity_components[-1]
else:
print("No valid numeric issue resolution times found.")
else:
print("No issue resolution times found or data is not a list.")
else:
print("Issue timeline data is not a dictionary.")
# Add activity page if components exist
if activity_components:
pages.append(
vzm.Page(
title="Activity",
components=activity_components,
path="activity" # Add unique path
)
)
else:
print("Skipping Activity page: No components generated.")
except Exception as e:
print(f"Error building Activity page: {e}")
# --- 3. Code Quality Page ---
print("Building Code Quality Page...")
code_components = []
try:
# Code complexity metrics
cyclomatic_complexity = complexity_metrics.get("cyclomatic_complexity", [])
if isinstance(cyclomatic_complexity, list) and cyclomatic_complexity:
complexity_prep_data = []
for item in cyclomatic_complexity:
if isinstance(item, (list, tuple)) and len(item) == 2:
path, cc = item
if isinstance(path, str) and isinstance(cc, (int, float)):
complexity_prep_data.append({
"File": os.path.basename(path),
"Path": path,
"Complexity": cc
})
if complexity_prep_data:
complexity_prep_data.sort(key=lambda x: x["Complexity"], reverse=True)
top_complex_files = complexity_prep_data[:15] # Show top 15
complex_df = pd.DataFrame(top_complex_files)
if not complex_df.empty:
complex_bar_fig = px.bar(
complex_df, x="File", y="Complexity", title="Most Complex Files (Top 15)", hover_data=["Path"]
)
complex_bar = vzm.Graph(figure=complex_bar_fig)
code_components.append(complex_bar) # Add the Graph component directly
all_components['complex_bar'] = code_components[-1]
# Complexity histogram (using all valid data)
cc_values = [d["Complexity"] for d in complexity_prep_data]
if cc_values:
cc_hist_fig = px.histogram(
x=cc_values, title="Cyclomatic Complexity Distribution", labels={"x": "Complexity"}
)
cc_hist = vzm.Graph(figure=cc_hist_fig)
code_components.append(cc_hist) # Add the Graph component directly
all_components['cc_hist'] = code_components[-1]
else:
print("No valid cyclomatic complexity data found.")
else:
print("No cyclomatic complexity data found or data is not a list.")
# Comment ratio by file
comment_ratios = complexity_metrics.get("comment_ratios", [])
if isinstance(comment_ratios, list) and comment_ratios:
comment_prep_data = []
for item in comment_ratios:
if isinstance(item, (list, tuple)) and len(item) == 2:
path, ratio = item
if isinstance(path, str) and isinstance(ratio, (int, float)) and ratio >= 0:
comment_prep_data.append({
"File": os.path.basename(path),
"Path": path,
"Comment Ratio": ratio
})
if comment_prep_data:
comment_prep_data.sort(key=lambda x: x["Comment Ratio"], reverse=True)
top_commented_files = comment_prep_data[:15] # Show top 15
comment_df = pd.DataFrame(top_commented_files)
if not comment_df.empty:
comment_bar_fig = px.bar(
comment_df, x="File", y="Comment Ratio", title="Files with Highest Comment Ratio (Top 15)", hover_data=["Path"]
)
comment_bar = vzm.Graph(figure=comment_bar_fig)
code_components.append(comment_bar) # Add the Graph component directly
all_components['comment_bar'] = code_components[-1]
else:
print("No valid comment ratio data found.")
else:
print("No comment ratio data found or data is not a list.")
# Add code quality page if components exist
if code_components:
pages.append(
vzm.Page(
title="Code Quality",
components=code_components,
path="code_quality" # Add unique path
)
)
else:
print("Skipping Code Quality page: No components generated.")
except Exception as e:
print(f"Error building Code Quality page: {e}")
# --- 4. Dependencies Page ---
print("Building Dependencies Page...")
dependencies_components = []
try:
# External dependencies
external_deps = dependencies_data.get("external", {})
if isinstance(external_deps, dict) and external_deps:
ext_counts = Counter()
for file_path, deps_set in external_deps.items():
if isinstance(deps_set, (set, list)): # Handle set or list
for dep in deps_set:
if isinstance(dep, str): # Ensure dep is string
ext_counts[dep] += 1
if ext_counts:
top_deps = ext_counts.most_common(15) # Show top 15
deps_prep_data = [{"Package": pkg, "Count": count} for pkg, count in top_deps]
deps_df = pd.DataFrame(deps_prep_data)
if not deps_df.empty:
deps_bar_fig = px.bar(
deps_df, x="Package", y="Count", title="Most Used External Dependencies (Top 15)"
)
deps_bar = vzm.Graph(figure=deps_bar_fig)
dependencies_components.append(deps_bar) # Add the Graph component directly
all_components['deps_bar'] = dependencies_components[-1]
else:
print("No external dependency data counted.")
else:
print("No external dependency data found or data is not a dictionary.")
# Internal dependencies graph (only for smaller graphs)
internal_deps = dependencies_data.get("internal", {})
if isinstance(internal_deps, dict) and internal_deps:
num_nodes_internal = len(set(internal_deps.keys()) | set(d for deps in internal_deps.values() for d in deps))
if num_nodes_internal <= 75: # Increased limit slightly
print(f"Attempting internal dependency graph ({num_nodes_internal} nodes)...")
try:
# Create NetworkX graph
G = nx.DiGraph()
nodes_added = set()
for source, targets in internal_deps.items():
if isinstance(source, str):
source_name = os.path.basename(source)
if source not in nodes_added:
G.add_node(source, name=source_name)
nodes_added.add(source)
if isinstance(targets, (set, list)):
for target in targets:
if isinstance(target, str):
target_name = os.path.basename(target)
if target not in nodes_added:
G.add_node(target, name=target_name)
nodes_added.add(target)
# Add edge only if both nodes were added successfully
if source in G and target in G:
G.add_edge(source, target)
if G.number_of_nodes() > 0 and G.number_of_edges() > 0:
# Get position layout
pos = nx.spring_layout(G, seed=42, k=0.6, iterations=50) # Adjust layout params
# Create graph visualization
edge_x, edge_y = [], []
for edge in G.edges():
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
edge_x.extend([x0, x1, None])
edge_y.extend([y0, y1, None])
edge_trace = go.Scatter(x=edge_x, y=edge_y, line=dict(width=0.5, color='#888'), hoverinfo='none', mode='lines')
node_x, node_y, node_text, node_color_val = [], [], [], []
for node in G.nodes():
x, y = pos[node]
node_x.append(x)
node_y.append(y)
node_text.append(G.nodes[node].get('name', node))
degree = G.degree(node) # Use degree for size/color
node_color_val.append(degree)
node_trace = go.Scatter(
x=node_x, y=node_y, mode='markers+text', hoverinfo='text', text=node_text,
textposition="top center", textfont=dict(size=8, color='black'),
marker=dict(showscale=True, colorscale='YlGnBu', size=10, color=node_color_val,
colorbar=dict(thickness=15, title='Node Degree', xanchor='left', titleside='right'))
)
dep_fig = go.Figure(data=[edge_trace, node_trace],
layout=go.Layout(
title='Internal File Dependency Network (Nodes <= 75)', showlegend=False, hovermode='closest',
margin=dict(b=20,l=5,r=5,t=40),
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False))
)
dep_graph_viz = vzm.Graph(figure=dep_fig)
dependencies_components.append(dep_graph_viz) # Add the Graph component directly
all_components['dep_graph'] = dependencies_components[-1]
else:
print("Internal dependency graph has no nodes or edges after processing.")
except Exception as e_graph:
print(f"Error generating internal dependency network visualization: {e_graph}")
else:
print(f"Skipping internal dependency graph: Too large ({num_nodes_internal} nodes).")
else:
print("No internal dependency data found or data is not a dictionary.")
# Add dependencies page if components exist
if dependencies_components:
pages.append(
vzm.Page(
title="Dependencies",
components=dependencies_components,
path="dependencies" # Add unique path
)
)
else:
print("Skipping Dependencies page: No components generated.")
except Exception as e:
print(f"Error building Dependencies page: {e}")
# --- Create the dashboard ---
if not pages:
print("No pages were generated for the dashboard. Cannot create dashboard.")
return None
try:
# Define navigation if multiple pages exist
navigation = None
if len(pages) > 1:
navigation=vzm.Navigation(pages=[p.title for p in pages]) # Use titles for navigation links
dashboard = vzm.Dashboard(
title=f"GitHub Analysis: {repo_name}",
pages=pages,
navigation=navigation
)
# Exporting to a standalone HTML file is skipped here; the dashboard object is
# built below and returned so the caller can decide how to serve or export it.
# Build dashboard (required before getting HTML string or running)
print("Building dashboard...")
vizro.Vizro().build(dashboard) # build() is a method of the Vizro class, not a module-level function
print("Dashboard built.")
# Nothing is written to disk at this point; the caller (Gradio or a standalone run) decides how to serve it.
# Return the dashboard object itself rather than a file path.
return dashboard # Return the dashboard object for potential further use
except Exception as e:
print(f"Error creating or building Vizro dashboard object: {e}")
import traceback
traceback.print_exc()
return None
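# Usage sketch (an assumption, not exercised by the pipeline above): the Dashboard
# object returned by create_vizro_dashboard() can be served standalone, e.g.
#
#   analyzer = GraphRepoAnalyzer()
#   analyzer.analyze_repo("octocat", "Hello-World", display=False)
#   dashboard = analyzer.create_vizro_dashboard()
#   if dashboard is not None:
#       vizro.Vizro().build(dashboard).run()  # starts a local Dash server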
# --- Gradio Interface ---
def create_gradio_interface():
"""Create a Gradio interface for the GitHub repository analyzer."""
if not gr:
print("Gradio library not found. Cannot create interface.")
return None
# Shared state to store the analyzer instance
analyzer_instance = None
def analyze_repository_gradio(owner, repo, github_token=None, neo4j_uri=None, neo4j_user=None, neo4j_password=None, gemini_api_key=None):
"""Gradio callback function to analyze a repository."""
nonlocal analyzer_instance
print(f"\n--- Gradio: analyze_repository_gradio called for {owner}/{repo} ---")
report = f"Starting analysis for {owner}/{repo}...\n"
dashboard_html_content = "" # Placeholder for the (currently disabled) dashboard embedding path; not sent to the UI
try:
# Ensure owner and repo are provided
if not owner or not repo:
report += "\nError: Please provide both Repository Owner and Name."
yield report # Emit the error message (a bare return in this generator would discard it)
return
# Instantiate the analyzer (or reuse if desired, but new instance is safer for credentials)
# Pass credentials safely, using None if empty string
analyzer_instance = GraphRepoAnalyzer(
github_token=github_token if github_token else None,
neo4j_uri=neo4j_uri if neo4j_uri else None,
neo4j_user=neo4j_user if neo4j_user else None,
neo4j_password=neo4j_password if neo4j_password else None,
gemini_api_key=gemini_api_key if gemini_api_key else None
)
report += f"Analyzer initialized for {owner}/{repo}.\n"
yield report # Update Gradio UI (single output: the report Markdown)
# Analyze repository (this prints logs to console)
# Set display=False as we handle output via Gradio components
# Set save/export to False unless specifically controlled via UI
analyzer_instance.analyze_repo(owner, repo, display=False, save_json=False, export_text=False)
# Check if analysis was successful
if not analyzer_instance.repo_data:
report += f"\nError: Failed to analyze repository: {owner}/{repo}. Check console logs for details (e.g., invalid name, token issues, rate limits)."
# analyzer_instance remains None or has no data
yield report # Emit the error report and stop
return
report += f"\nAnalysis complete for {analyzer_instance.repo_full_name}.\nGenerating dashboard and report...\n"
yield report # Update UI
# --- Generate Report String ---
try:
basic_info = analyzer_instance.repo_data.get("basic_info", {})
report += f"""
### Repository Analysis: {basic_info.get('full_name', 'N/A')}
**Description:** {basic_info.get('description', 'No description')}
**Statistics:**
- Stars: {basic_info.get('stargazers_count', 'N/A')}
- Forks: {basic_info.get('forks_count', 'N/A')}
- Open Issues: {basic_info.get('open_issues_count', 'N/A')}
"""
# Add language info safely
languages = analyzer_instance.repo_data.get("languages")
if isinstance(languages, dict) and languages:
report += "**Language Summary:**\n"
total = sum(v for v in languages.values() if isinstance(v, (int, float)))
if total > 0:
# Sort languages by percentage
lang_items = []
for lang, b_count in languages.items():
if isinstance(b_count, (int, float)) and b_count > 0:
lang_items.append((lang, (b_count / total) * 100))
# Sort descending by percentage
lang_items.sort(key=lambda item: item[1], reverse=True)
for lang, percentage in lang_items[:5]: # Show top 5
report += f"- {lang}: {percentage:.1f}%\n"
if len(lang_items) > 5:
report += "- ... (other languages)\n"
else:
report += "- (No valid language byte counts found)\n"
else:
report += "**Language Summary:** Not available.\n"
# Add code metrics if available
text_content = analyzer_instance.repo_data.get("text_content", {})
agg_metrics = text_content.get("aggregate_metrics")
if isinstance(agg_metrics, dict):
report += f"""
**Code Metrics (Approximate):**
- Text Files Analyzed: {agg_metrics.get('total_files_analyzed', 'N/A')}
- Code Files Summarized: {agg_metrics.get('code_files_summarized', 'N/A')}
- Total Code Lines: {agg_metrics.get('total_code_lines', 'N/A')}
- Comment Ratio: {agg_metrics.get('average_comment_ratio', -1):.2f}
""" # Use -1 or similar to indicate if ratio couldn't be calculated
else:
report += "\n**Code Metrics:** Not available.\n"
except Exception as report_err:
print(f"Error generating report section: {report_err}")
report += f"\nError generating parts of the report: {report_err}"
# --- Generate Dashboard ---
# Use a temporary directory for Gradio deployment
dashboard_dir = f"./gradio_dashboards/{owner}_{repo}"
dashboard_obj = analyzer_instance.create_vizro_dashboard(output_dir=dashboard_dir)
if dashboard_obj:
# Instead of saving, we would need to serve it; Vizro does not readily expose the dashboard as a raw HTML string.
# Option 1: Save to file and load into IFrame (might have security issues / path issues)
# dashboard_path = os.path.join(dashboard_dir, 'dashboard.html') # create_vizro_dashboard doesn't save anymore
# vizro.run() # This blocks and runs a server - not ideal for embedding
# For Gradio, the cleaner approach is usually *not* to embed Vizro directly,
# but to extract the underlying Plotly figures and display them with gr.Plot
# (see the commented sketch after the Option 2 notes below).
# For now we simply return the report; the Vizro dashboard remains useful
# when the script is run standalone.
report += "\n\n**Dashboard Note:** Interactive dashboard generation logic exists but embedding Vizro directly in Gradio is complex. The dashboard can be generated by running the script standalone."
print("Dashboard object created, but not embedding in Gradio output for simplicity.")
# dashboard_html_content = f'<p>Vizro dashboard created but cannot be directly embedded here. Run script standalone.</p>'
# Option 2: If you *really* need to embed, save and use an iframe (less reliable)
# try:
# dashboard_path_rel = os.path.join(dashboard_dir, 'dashboard.html')
# dashboard_path_abs = os.path.abspath(dashboard_path_rel)
# # Vizro's save method implicitly builds and saves
# dashboard_obj.save(dashboard_path_abs)
# print(f"Dashboard saved to: {dashboard_path_abs}")
# # IMPORTANT: Gradio needs to be able to access this path.
# # This might only work if Gradio serves from the same root or paths are configured.
# # Use relative path for iframe src if possible, requires Gradio server setup.
# # Using absolute file URI might work locally but not when deployed.
# # dashboard_html_content = f'<iframe src="file:///{dashboard_path_abs}" width="100%" height="600px" style="border:none;"></iframe>'
# # Safer: Provide a link
# dashboard_html_content = f'<p>Dashboard saved to: <a href="file:///{dashboard_path_abs}" target="_blank">{dashboard_path_abs}</a> (Link may only work locally)</p>'
# report += f"\n\n**Dashboard:** Saved locally. See link below (may only work on the server machine)."
# except Exception as vizro_save_err:
# print(f"Error saving Vizro dashboard: {vizro_save_err}")
# report += f"\n\n**Dashboard:** Error saving dashboard: {vizro_save_err}"
# dashboard_html_content = f'<p>Error saving dashboard: {vizro_save_err}</p>'
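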
else:
report += "\n\n**Dashboard:** Failed to generate dashboard object."
dashboard_html_content = "<p>Failed to generate dashboard.</p>"
print("--- Gradio analysis function finished ---")
yield report # Final update
except Exception as e:
print(f"--- Error in analyze_repository_gradio for {owner}/{repo} ---")
import traceback
traceback.print_exc()
report += f"\n\nCritical Error during analysis: {str(e)}"
# Ensure analyzer_instance is cleared if it failed early
analyzer_instance = None
# Yield the error report (this function is a generator)
yield report
def summarize_pr_gradio(owner, repo, pr_number_str, role, github_token=None, gemini_api_key=None):
"""Gradio callback function to summarize a PR."""
print(f"\n--- Gradio: summarize_pr_gradio called for PR #{pr_number_str} in {owner}/{repo} ---")
summary_output = "Starting PR summarization...\n"
try:
# Ensure owner, repo, pr_number, role, and gemini_key are provided
if not all([owner, repo, pr_number_str, role, gemini_api_key]):
required = {'owner': owner, 'repo': repo, 'pr_number_str': pr_number_str, 'role': role, 'gemini_api_key': gemini_api_key}
missing = [name for name, val in required.items() if not val] # locals() inside a comprehension does not see these names on older Python versions
summary_output += f"Error: Please provide all required fields (Missing: {', '.join(missing)})."
return summary_output
# --- Use a temporary analyzer instance for PR summary ---
# This avoids issues if the main analysis failed or used different credentials
# We only need GitHub and Gemini parts for this.
pr_analyzer = GraphRepoAnalyzer(
github_token=github_token if github_token else None,
gemini_api_key=gemini_api_key # Required
)
if not pr_analyzer.github_analyzer:
summary_output += "Error: Could not initialize GitHub analyzer (check token/installation)."
return summary_output
if not pr_analyzer.gemini_model:
summary_output += "Error: Could not initialize Gemini model (check API key/installation)."
return summary_output
# Set repo context for the analyzer
pr_analyzer.owner = owner
pr_analyzer.repo = repo
pr_analyzer.repo_full_name = f"{owner}/{repo}"
# Call the summarize_pull_request method (which now returns text)
summary = pr_analyzer.summarize_pull_request(pr_number_str, role) # Handles int conversion and validation internally
# summarize_pull_request returns the summary text or an error message
summary_output = summary # Assign the result directly
print("--- Gradio PR summary function finished ---")
return summary_output
except Exception as e:
print(f"--- Error in summarize_pr_gradio ---")
import traceback
traceback.print_exc()
summary_output += f"\n\nCritical Error during PR summarization: {str(e)}"
return summary_output
# --- Define Gradio UI ---
with gr.Blocks(title="GitHub Repository Analyzer", theme=gr.themes.Soft()) as app:
gr.Markdown("# GitHub Repository Analyzer & PR Summarizer")
gr.Markdown("Analyze GitHub repositories using GitHub API, generate reports, and summarize Pull Requests using Google Gemini.")
with gr.Tab("Repository Analysis"):
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### Repository Input")
owner_input = gr.Textbox(label="Repository Owner", placeholder="e.g., 'google' or 'openai'")
repo_input = gr.Textbox(label="Repository Name", placeholder="e.g., 'guetzli' or 'whisper'")
gr.Markdown("### Credentials (Optional / Recommended)")
github_token = gr.Textbox(label="GitHub Token", type="password", placeholder="Enter personal access token (optional, increases rate limit)")
with gr.Accordion("Advanced Settings (Neo4j/Gemini - Optional for Analysis)", open=False):
neo4j_uri = gr.Textbox(label="Neo4j URI", placeholder="bolt://localhost:7687")
neo4j_user = gr.Textbox(label="Neo4j Username", placeholder="neo4j")
neo4j_password = gr.Textbox(label="Neo4j Password", type="password")
# Gemini key needed here if we add repo Q&A later
# gemini_api_key_analysis = gr.Textbox(label="Google API Key (for Repo Q&A)", type="password", placeholder="Enter Google API Key (if using Repo Q&A)")
analyze_btn = gr.Button("Analyze Repository", variant="primary")
with gr.Column(scale=2):
gr.Markdown("### Analysis Output")
report_output = gr.Markdown(label="Analysis Report", value="Analysis results will appear here...")
# Removed dashboard HTML output as direct embedding is unreliable
# dashboard_output = gr.HTML(label="Dashboard Preview") # Keep if attempting iframe later
# Wire the button click event
analyze_btn.click(
analyze_repository_gradio,
inputs=[
owner_input, repo_input, github_token,
neo4j_uri, neo4j_user, neo4j_password,
# gemini_api_key_analysis # Pass if adding Repo Q&A
],
# Output only the report for now
outputs=[report_output] # Removed dashboard_output
)
with gr.Tab("PR Summarizer (Requires Gemini)"):
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### PR Input")
pr_owner_input = gr.Textbox(label="Repository Owner", placeholder="Owner of the repo containing the PR")
pr_repo_input = gr.Textbox(label="Repository Name", placeholder="Name of the repo containing the PR")
pr_number_input = gr.Textbox(label="PR Number", placeholder="e.g., 123") # Use Textbox for flexibility
pr_role_input = gr.Dropdown(
choices=["Developer", "Manager", "Team Lead", "Product Owner", "Program Manager", "General"],
label="Your Role (Tailors Summary)",
value="Developer" # Default value
)
gr.Markdown("### Credentials")
pr_github_token = gr.Textbox(label="GitHub Token (Optional)", type="password", placeholder="Needed for private repos or high rate limits")
pr_gemini_api_key = gr.Textbox(label="Google API Key (Required)", type="password", placeholder="Enter Google API Key for Gemini")
summarize_btn = gr.Button("Summarize PR", variant="primary")
with gr.Column(scale=2):
gr.Markdown("### PR Summary Output")
pr_summary_output = gr.Markdown(label="Gemini PR Summary", value="PR Summary will appear here...")
# Wire the button click event
summarize_btn.click(
summarize_pr_gradio,
inputs=[
pr_owner_input, pr_repo_input, pr_number_input,
pr_role_input, pr_github_token, pr_gemini_api_key
],
outputs=pr_summary_output
)
return app
# Main function to run the app
def main():
"""Run the GitHub Repository Analyzer with Gradio interface."""
# Load environment variables (optional, credentials can be entered in UI)
load_dotenv()
print("Starting Gradio application...")
# Check if Gradio is available before launching
if not gr:
print("Gradio library is not available. Cannot launch UI.")
return
# Create and launch the Gradio interface
try:
app = create_gradio_interface()
if app:
# Set share=False for local testing, share=True to create public link (use with caution)
# Set debug=True for more detailed logs during development
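# Note (assumption): analyze_repository_gradio yields progress updates, and on
# some Gradio versions streamed yields only appear when the queue is enabled,
# e.g. app.queue().launch(...) instead of app.launch(...); left unchanged here.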
app.launch(share=False, debug=True)
else:
print("Failed to create Gradio interface.")
except Exception as e:
print(f"Error launching Gradio app: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
# Add basic checks for critical libraries before running main
if None in [gr, pd, np, requests, nx]:
missing = []
if not gr: missing.append("gradio")
if not pd: missing.append("pandas")
if not np: missing.append("numpy")
if not requests: missing.append("requests")
if not nx: missing.append("networkx")
print(f"Error: Missing critical libraries: {', '.join(missing)}. Please install them.")
print("e.g., pip install gradio pandas numpy requests networkx PyGithub neo4j google-generativeai vizro vizro-plotly plotly python-dotenv radon matplotlib")
else:
main()