import requests
import json
import os
import base64
import re
import ast
import networkx as nx

# Make sure radon is installed: pip install radon
try:
    import radon.metrics as metrics
    import radon.complexity as complexity
except ImportError:
    print("Warning: Radon library not found. Code complexity analysis will be limited.")
    # Provide dummy functions if radon is not available
    class DummyRadon:
        def cc_visit(self, *args, **kwargs): return 0
        def cc_visit_ast(self, *args, **kwargs): return 0
        def mi_visit(self, *args, **kwargs): return None
    metrics = DummyRadon()
    complexity = DummyRadon()

# timezone is needed for the UTC conversions in the JSON helpers below
from datetime import datetime, timedelta, timezone
from collections import defaultdict, Counter
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

# Ensure IPython is available or handle its absence
try:
    from IPython.display import display, Markdown, HTML
    IPYTHON_AVAILABLE = True
except ImportError:
    IPYTHON_AVAILABLE = False
    # Define dummy display functions if not in IPython
    def display(*args, **kwargs): print(*args)
    def Markdown(text): print(f"--- Markdown ---\n{text}\n---------------")
    def HTML(text): print(f"----- HTML -----\n{text}\n--------------")

import numpy as np

# Ensure PyGithub is installed: pip install PyGithub
try:
    from github import Github, GithubException
except ImportError:
    print("Warning: PyGithub library not found. Some features might be limited.")
    Github = None  # Set to None if not available
    GithubException = Exception  # Use base Exception

import time

# Ensure python-dotenv is installed: pip install python-dotenv
try:
    from dotenv import load_dotenv
except ImportError:
    print("Warning: python-dotenv not found. .env file will not be loaded.")
    def load_dotenv(): pass  # Dummy function

# Import Neo4j and Gemini libraries
# Ensure neo4j is installed: pip install neo4j
try:
    from neo4j import GraphDatabase, basic_auth
except ImportError:
    print("Warning: Neo4j library not found. Graph features will be disabled.")
    GraphDatabase = None  # Set to None
    basic_auth = None

# Ensure google-generativeai is installed: pip install google-generativeai
try:
    import google.generativeai as genai
except ImportError:
    print("Warning: google-generativeai library not found. Gemini features will be disabled.")
    genai = None  # Set to None

# Import Vizro and Gradio
# Ensure vizro, vizro-plotly, plotly, gradio are installed:
# pip install vizro vizro-plotly plotly gradio pandas networkx matplotlib numpy
try:
    import vizro.plotly.express as px
    import vizro
    import vizro.models as vzm
    import plotly.graph_objects as go
except ImportError:
    print("Critical Error: Vizro or Plotly libraries not found. Dashboard generation will fail.")
    # Define dummy classes/functions to avoid NameErrors later, though functionality will be broken
    class DummyVzm:
        Card = lambda **kwargs: None
        Graph = lambda **kwargs: None
        Page = lambda **kwargs: None
        Dashboard = lambda **kwargs: type('obj', (object,), {'save': lambda self, path: print(f"Vizro not installed, cannot save to {path}")})()
    vzm = DummyVzm()
    px = None
    go = None
    vizro = None

try:
    import gradio as gr
except ImportError:
    print("Critical Error: Gradio library not found. Cannot launch the UI.")
    gr = None  # Set to None
# --- GitHubRepoInfo Class (Keep as provided, ensuring dependencies like PyGithub are handled) --- | |
class GitHubRepoInfo: | |
"""Enhanced class to get comprehensive information about a GitHub repository.""" | |
def __init__(self, token=None): | |
"""Initialize with optional GitHub API token.""" | |
self.base_url = "https://api.github.com" | |
self.headers = {"Accept": "application/vnd.github.v3+json"} | |
self.token = token | |
self.github = None # Initialize github attribute | |
# Set up authentication | |
if token: | |
self.headers["Authorization"] = f"token {token}" | |
if Github: # Check if PyGithub was imported | |
try: | |
self.github = Github(token) | |
self.github.get_user().login # Test connection | |
except Exception as e: | |
print(f"Warning: Failed to initialize PyGithub with token: {e}") | |
self.github = Github() # Fallback to unauthenticated | |
else: | |
print("Warning: PyGithub not installed. Cannot use authenticated PyGithub client.") | |
self.github = None # Explicitly set to None | |
elif os.environ.get("GITHUB_TOKEN"): | |
self.token = os.environ.get("GITHUB_TOKEN") | |
self.headers["Authorization"] = f"token {self.token}" | |
if Github: | |
try: | |
self.github = Github(self.token) | |
self.github.get_user().login # Test connection | |
except Exception as e: | |
print(f"Warning: Failed to initialize PyGithub with token: {e}") | |
self.github = Github() # Fallback to unauthenticated | |
else: | |
print("Warning: PyGithub not installed. Cannot use authenticated PyGithub client.") | |
self.github = None | |
        else:
            if Github:
                self.github = Github()  # Unauthenticated client
            else:
                print("Warning: PyGithub not installed. Falling back to raw REST calls only.")
                self.github = None
        # Configure rate limit handling (optimistic default; refined below when possible)
        self.rate_limit_remaining = 5000
        self.rate_limit_reset = datetime.now()
        # Initialize rate limit info if possible
        if self.github:
            try:
                rate_limit = self.github.get_rate_limit()
                self.rate_limit_remaining = rate_limit.core.remaining
                reset = rate_limit.core.reset
                if isinstance(reset, datetime):
                    # PyGithub exposes reset as a datetime; keep it naive to match _check_rate_limit below
                    self.rate_limit_reset = reset.replace(tzinfo=None)
                else:
                    self.rate_limit_reset = datetime.fromtimestamp(reset)
            except Exception as e:
                print(f"Warning: Could not get initial rate limit from PyGithub: {e}")
# Check rate limit via REST if PyGithub failed or wasn't used | |
elif self.token: | |
try: | |
response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers) | |
if response.status_code == 200: | |
rate_data = response.json() | |
self.rate_limit_remaining = rate_data["resources"]["core"]["remaining"] | |
self.rate_limit_reset = datetime.fromtimestamp(rate_data["resources"]["core"]["reset"]) | |
else: | |
print(f"Warning: Could not get initial rate limit via REST: Status {response.status_code}") | |
except Exception as e: | |
print(f"Warning: Could not get initial rate limit via REST: {e}") | |
def _check_rate_limit(self): | |
"""Check API rate limit and wait if necessary.""" | |
# Update rate limit info before checking | |
try: | |
response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers) | |
if response.status_code == 200: | |
rate_data = response.json() | |
# Ensure keys exist before accessing | |
core_limits = rate_data.get("resources", {}).get("core", {}) | |
self.rate_limit_remaining = core_limits.get("remaining", self.rate_limit_remaining) # Use old value if missing | |
reset_timestamp = core_limits.get("reset") | |
if reset_timestamp: | |
self.rate_limit_reset = datetime.fromtimestamp(reset_timestamp) | |
# No else needed, just use previous values if update fails | |
except Exception as e: | |
print(f"Warning: Failed to update rate limit info: {e}") | |
# Proceed with potentially outdated values | |
if self.rate_limit_remaining <= 10: | |
reset_time = self.rate_limit_reset | |
# Use timezone-naive comparison | |
current_time = datetime.now() | |
if reset_time > current_time: | |
wait_time = (reset_time - current_time).total_seconds() + 10 # Add buffer | |
if wait_time > 0: # Only wait if reset time is in the future | |
print(f"Rate limit nearly exhausted. Waiting {wait_time:.0f} seconds for reset.") | |
time.sleep(wait_time) | |
# Re-fetch rate limit after waiting | |
self._check_rate_limit() | |
def _paginated_get(self, url, params=None, max_items=None): | |
"""Handle paginated API responses with rate limit awareness.""" | |
if params is None: | |
params = {} | |
items = [] | |
page = 1 | |
# Use a smaller default per_page to be safer with rate limits if unauthenticated | |
default_per_page = 100 if self.token else 30 | |
per_page = min(100, params.get("per_page", default_per_page)) | |
params["per_page"] = per_page | |
while True: | |
self._check_rate_limit() # Check before each request | |
params["page"] = page | |
try: | |
response = requests.get(url, headers=self.headers, params=params, timeout=20) # Add timeout | |
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) | |
page_items = response.json() | |
if not page_items: # No more items | |
break | |
items.extend(page_items) | |
page += 1 | |
# Check if we've reached the requested limit | |
if max_items and len(items) >= max_items: | |
return items[:max_items] | |
# Check if we've reached the end (GitHub returns fewer items than requested) | |
if len(page_items) < per_page: | |
break | |
except requests.exceptions.RequestException as e: | |
print(f"Error during paginated request to {url} (page {page}): {e}") | |
# Decide whether to break or retry (here we break) | |
break | |
except json.JSONDecodeError as e: | |
print(f"Error decoding JSON response from {url} (page {page}): {e}") | |
break | |
return items | |
def get_repo_info(self, owner, repo): | |
"""Get basic repository information.""" | |
self._check_rate_limit() | |
url = f"{self.base_url}/repos/{owner}/{repo}" | |
try: | |
response = requests.get(url, headers=self.headers, timeout=15) | |
response.raise_for_status() # Check for 4xx/5xx errors | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
print(f"Error getting repository info for {owner}/{repo}: {e}") | |
return None # Return None on failure | |
# ... (other GitHubRepoInfo methods - assume they return sensible defaults like [] or {} on failure) ... | |
# --- Add safe defaults to methods that might return None unexpectedly --- | |
def get_languages(self, owner, repo): | |
"""Get languages used in the repository.""" | |
self._check_rate_limit() | |
url = f"{self.base_url}/repos/{owner}/{repo}/languages" | |
try: | |
response = requests.get(url, headers=self.headers, timeout=15) | |
response.raise_for_status() | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
print(f"Error getting languages for {owner}/{repo}: {e}") | |
return {} # Return empty dict on failure | |
def get_contributors(self, owner, repo, max_contributors=None): | |
"""Get repository contributors with pagination support.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/contributors" | |
# _paginated_get should already handle errors and return a list | |
return self._paginated_get(url, max_items=max_contributors) or [] # Ensure list return | |
def get_commits(self, owner, repo, params=None, max_commits=None): | |
"""Get commits with enhanced filtering and pagination.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/commits" | |
return self._paginated_get(url, params=params, max_items=max_commits) or [] # Ensure list return | |
def _get_stats_with_retry(self, url): | |
"""Helper for stats endpoints that might return 202.""" | |
retries = 3 | |
delay = 5 # Initial delay in seconds | |
for i in range(retries): | |
self._check_rate_limit() | |
try: | |
response = requests.get(url, headers=self.headers, timeout=30) # Longer timeout for stats | |
if response.status_code == 200: | |
return response.json() | |
elif response.status_code == 202 and i < retries - 1: | |
print(f"GitHub is computing statistics for {url.split('/stats/')[1]}, waiting {delay}s and retrying ({i+1}/{retries})...") | |
time.sleep(delay) | |
delay *= 2 # Exponential backoff | |
continue | |
elif response.status_code == 204: # No content, valid response but empty data | |
print(f"No content (204) returned for {url.split('/stats/')[1]}. Returning empty list.") | |
return [] | |
else: | |
print(f"Error getting stats from {url}: Status {response.status_code}, Body: {response.text[:200]}") | |
return [] # Return empty list on other errors | |
except requests.exceptions.RequestException as e: | |
print(f"Request error getting stats from {url}: {e}") | |
return [] # Return empty list on request error | |
print(f"Failed to get stats from {url} after {retries} retries.") | |
return [] # Return empty list after all retries fail | |
def get_commit_activity(self, owner, repo): | |
"""Get commit activity stats for the past year.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/stats/commit_activity" | |
return self._get_stats_with_retry(url) | |
def get_code_frequency(self, owner, repo): | |
"""Get weekly code addition and deletion statistics.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/stats/code_frequency" | |
return self._get_stats_with_retry(url) | |
def get_contributor_activity(self, owner, repo): | |
"""Get contributor commit activity over time.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/stats/contributors" | |
return self._get_stats_with_retry(url) | |
def get_branches(self, owner, repo): | |
"""Get repository branches.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/branches" | |
return self._paginated_get(url) or [] | |
def get_releases(self, owner, repo, max_releases=None): | |
"""Get repository releases with pagination support.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/releases" | |
return self._paginated_get(url, max_items=max_releases) or [] | |
def get_issues(self, owner, repo, state="all", max_issues=None, params=None): | |
"""Get repository issues with enhanced filtering.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/issues" | |
if params is None: | |
params = {} | |
params["state"] = state | |
return self._paginated_get(url, params=params, max_items=max_issues) or [] | |
def get_pull_requests(self, owner, repo, state="all", max_prs=None, params=None): | |
"""Get repository pull requests with enhanced filtering.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/pulls" | |
if params is None: | |
params = {} | |
params["state"] = state | |
return self._paginated_get(url, params=params, max_items=max_prs) or [] | |
def get_contents(self, owner, repo, path="", ref=None): | |
"""Get repository contents at the specified path.""" | |
self._check_rate_limit() | |
url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}" | |
params = {} | |
if ref: | |
params["ref"] = ref | |
try: | |
response = requests.get(url, headers=self.headers, params=params, timeout=15) | |
response.raise_for_status() | |
return response.json() | |
except requests.exceptions.RequestException as e: | |
# Handle 404 specifically for contents | |
if hasattr(e, 'response') and e.response is not None and e.response.status_code == 404: | |
print(f"Contents not found at path '{path}' in {owner}/{repo}.") | |
else: | |
print(f"Error getting contents for {owner}/{repo} at path '{path}': {e}") | |
return [] # Return empty list on failure | |
def get_readme(self, owner, repo, ref=None): | |
"""Get repository README file.""" | |
self._check_rate_limit() | |
url = f"{self.base_url}/repos/{owner}/{repo}/readme" | |
params = {} | |
if ref: | |
params["ref"] = ref | |
try: | |
response = requests.get(url, headers=self.headers, params=params, timeout=15) | |
response.raise_for_status() | |
data = response.json() | |
if data.get("content"): | |
try: | |
content = base64.b64decode(data["content"]).decode("utf-8") | |
return { | |
"name": data.get("name", "README"), | |
"path": data.get("path", "README.md"), | |
"content": content | |
} | |
except (UnicodeDecodeError, base64.binascii.Error) as decode_error: | |
print(f"Error decoding README content: {decode_error}") | |
return None # Cannot decode | |
return None # No content key | |
except requests.exceptions.RequestException as e: | |
if hasattr(e, 'response') and e.response is not None and e.response.status_code == 404: | |
print(f"README not found for {owner}/{repo}.") | |
else: | |
print(f"Error getting README for {owner}/{repo}: {e}") | |
return None | |
def get_file_content(self, owner, repo, path, ref=None): | |
"""Get the content of a specific file in the repository.""" | |
self._check_rate_limit() | |
url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}" | |
params = {} | |
if ref: | |
params["ref"] = ref | |
try: | |
response = requests.get(url, headers=self.headers, params=params, timeout=15) | |
response.raise_for_status() | |
data = response.json() | |
if data.get("type") == "file" and data.get("content"): | |
try: | |
content = base64.b64decode(data["content"]).decode("utf-8") | |
return content | |
except (UnicodeDecodeError, base64.binascii.Error): | |
# Don't print error here, return indicator | |
return "[Binary file content not displayed]" | |
elif data.get("type") != "file": | |
print(f"Path '{path}' is not a file.") | |
return None | |
else: | |
# File exists but no content? Unlikely but handle. | |
return "" # Return empty string for empty file | |
except requests.exceptions.RequestException as e: | |
if hasattr(e, 'response') and e.response is not None and e.response.status_code == 404: | |
print(f"File not found at path '{path}' in {owner}/{repo}.") | |
else: | |
print(f"Error getting file content for {owner}/{repo}, path '{path}': {e}") | |
return None | |
# --- Methods like is_text_file, analyze_ast, analyze_js_ts are generally okay --- | |
# ... (keep them as they are) ... | |
# --- Ensure get_all_text_files handles errors from get_contents/get_file_content --- | |
def get_all_text_files(self, owner, repo, path="", max_files=50, ref=None, _current_count=0): | |
"""Get content of all text files in the repository (with limit).""" | |
if _current_count >= max_files: | |
return [], _current_count | |
# Get contents for the current path | |
contents = self.get_contents(owner, repo, path, ref) # Returns [] on error | |
text_files = [] | |
file_count = _current_count | |
if not isinstance(contents, list): | |
print(f"Warning: get_contents did not return a list for path '{path}'. Skipping.") | |
return [], file_count | |
# Process current directory | |
for item in contents: | |
if file_count >= max_files: | |
break | |
# Ensure item is a dictionary and has 'type' and 'name' | |
if not isinstance(item, dict) or 'type' not in item or 'name' not in item: | |
print(f"Warning: Skipping malformed item in contents: {item}") | |
continue | |
item_path = item.get("path") # Get path safely | |
if not item_path: | |
print(f"Warning: Skipping item with missing path: {item}") | |
continue | |
if item["type"] == "file" and self.is_text_file(item["name"]): | |
content = self.get_file_content(owner, repo, item_path, ref) | |
# Check if content is valid text (not None or binary indicator) | |
if content and content != "[Binary file content not displayed]": | |
text_files.append({ | |
"name": item["name"], | |
"path": item_path, | |
"content": content | |
}) | |
file_count += 1 | |
elif item["type"] == "dir": | |
# Recursively get text files from subdirectories | |
if file_count < max_files: | |
try: | |
subdir_files, file_count = self.get_all_text_files( | |
owner, repo, item_path, max_files, ref, file_count | |
) | |
text_files.extend(subdir_files) | |
except Exception as e_rec: | |
print(f"Error processing subdirectory '{item_path}': {e_rec}") | |
# Continue with other items in the current directory | |
return text_files, file_count # Return count for recursive calls | |
# --- Ensure get_documentation_files handles errors --- | |
def get_documentation_files(self, owner, repo, ref=None): | |
"""Get documentation files from the repository.""" | |
doc_paths = [ | |
"README.md", "CONTRIBUTING.md", "CODE_OF_CONDUCT.md", "SECURITY.md", | |
"SUPPORT.md", # Files first | |
"docs", "doc", "documentation", "wiki", # Common Dirs | |
".github/ISSUE_TEMPLATE", ".github/PULL_REQUEST_TEMPLATE.md" | |
] | |
doc_files = [] | |
# 1. Get top-level files first | |
root_contents = self.get_contents(owner, repo, "", ref) | |
if isinstance(root_contents, list): | |
for item in root_contents: | |
if isinstance(item, dict) and item.get("type") == "file" and item.get("name") in doc_paths: | |
path = item.get("path") | |
if path: | |
content = self.get_file_content(owner, repo, path, ref) | |
if content and content != "[Binary file content not displayed]": | |
doc_files.append({ | |
"name": item["name"], | |
"path": path, | |
"content": content | |
}) | |
# 2. Check specific doc directories | |
doc_dirs_to_check = ["docs", "doc", "documentation", "wiki", ".github"] | |
for doc_dir in doc_dirs_to_check: | |
try: | |
dir_contents = self.get_contents(owner, repo, doc_dir, ref) | |
if isinstance(dir_contents, list): # It's a directory | |
for item in dir_contents: | |
if isinstance(item, dict) and item.get("type") == "file": | |
item_name = item.get("name", "").lower() | |
item_path = item.get("path") | |
if item_path and item_name.endswith((".md", ".rst", ".txt")): | |
content = self.get_file_content(owner, repo, item_path, ref) | |
if content and content != "[Binary file content not displayed]": | |
doc_files.append({ | |
"name": item["name"], | |
"path": item_path, | |
"content": content | |
}) | |
except Exception as e: | |
print(f"Error processing documentation path '{doc_dir}': {e}") | |
continue # Skip this path | |
return doc_files | |
# ... (rest of GitHubRepoInfo, display methods, etc. - keep as provided but be mindful of data access in display) ... | |
# Add specific error handling in display methods if needed, though Gradio errors often hide underlying data issues. | |
def get_all_info(self, owner, repo): | |
"""Get comprehensive information about a repository with enhanced metrics.""" | |
print(f"--- Fetching data for {owner}/{repo} ---") | |
result = { | |
"timestamp": datetime.now().isoformat() | |
} | |
print("Getting basic repo info...") | |
basic_info = self.get_repo_info(owner, repo) | |
if not basic_info: | |
print(f"CRITICAL: Could not retrieve basic repository information for {owner}/{repo}. Aborting analysis.") | |
return None # Cannot proceed without basic info | |
result["basic_info"] = basic_info | |
print("Getting languages...") | |
result["languages"] = self.get_languages(owner, repo) # Returns {} on error | |
print("Getting contributors...") | |
result["contributors"] = self.get_contributors(owner, repo, max_contributors=30) # Returns [] on error | |
print("Getting recent commits...") | |
result["recent_commits"] = self.get_commits(owner, repo, max_commits=30) # Returns [] on error | |
print("Getting branches...") | |
result["branches"] = self.get_branches(owner, repo) # Returns [] on error | |
print("Getting releases...") | |
result["releases"] = self.get_releases(owner, repo, max_releases=10) # Returns [] on error | |
print("Getting open issues...") | |
result["open_issues"] = self.get_issues(owner, repo, state="open", max_issues=50) # Returns [] on error | |
print("Getting open pull requests...") | |
result["open_pull_requests"] = self.get_pull_requests(owner, repo, state="open", max_prs=50) # Returns [] on error | |
print("Getting root contents...") | |
result["root_contents"] = self.get_contents(owner, repo) # Returns [] on error | |
print("Analyzing repository content (README, Docs, Code Files)...") | |
# This relies on other methods returning sensible defaults | |
try: | |
# Call get_all_text_files outside get_repo_text_summary to pass count correctly | |
all_text_files_content, _ = self.get_all_text_files(owner, repo, max_files=30) | |
# Pass the fetched content to get_repo_text_summary to avoid redundant API calls | |
result["text_content"] = self.get_repo_text_summary(owner, repo, pre_fetched_files=all_text_files_content) | |
except Exception as e: | |
print(f"Error during text content analysis: {e}") | |
result["text_content"] = {"error": str(e)} # Store error indicator | |
print("Analyzing repository activity over time...") | |
# This relies on stats methods returning [] on error/202 timeout | |
try: | |
result["temporal_analysis"] = self.get_temporal_analysis(owner, repo) | |
except Exception as e: | |
print(f"Error during temporal analysis: {e}") | |
result["temporal_analysis"] = {"error": str(e)} # Store error indicator | |
print(f"--- Finished fetching data for {owner}/{repo} ---") | |
return result | |
# Modify get_repo_text_summary to accept pre-fetched files | |
def get_repo_text_summary(self, owner, repo, max_files=25, pre_fetched_files=None): | |
"""Extract and summarize text content from the repository with improved metrics.""" | |
# Get README | |
readme = self.get_readme(owner, repo) # Returns None on error | |
# Get documentation | |
docs = self.get_documentation_files(owner, repo) # Returns [] on error | |
# Get key code files if not provided | |
if pre_fetched_files is None: | |
print("Fetching text files within get_repo_text_summary...") | |
text_files, _ = self.get_all_text_files(owner, repo, max_files=max_files) # Returns [] on error | |
else: | |
print("Using pre-fetched text files in get_repo_text_summary.") | |
text_files = pre_fetched_files # Use the provided list | |
# Analyze code files | |
code_summary = {} | |
complexity_metrics = { | |
'cyclomatic_complexity': [], | |
'maintainability_index': [], | |
'comment_ratios': [] | |
} | |
for file in text_files: | |
# Basic check for file structure | |
if not isinstance(file, dict) or 'name' not in file or 'content' not in file or 'path' not in file: | |
print(f"Skipping malformed file data in text summary: {file}") | |
continue | |
ext = os.path.splitext(file["name"])[1].lower() | |
if ext in ['.py', '.js', '.ts', '.jsx', '.tsx']: # Add other relevant code extensions if needed | |
try: | |
file_summary = self.extract_code_summary(file["content"], file["path"]) | |
if file_summary: # Ensure summary generation didn't fail | |
code_summary[file["path"]] = file_summary | |
# Collect complexity metrics safely | |
if file_summary.get('complexity'): | |
cc = file_summary['complexity'].get('overall') | |
# Ensure cc is a number before appending | |
if isinstance(cc, (int, float)): | |
complexity_metrics['cyclomatic_complexity'].append((file["path"], cc)) | |
mi = file_summary['complexity'].get('maintainability_index') | |
# Ensure mi is a number before appending | |
if isinstance(mi, (int, float)): | |
complexity_metrics['maintainability_index'].append((file["path"], mi)) | |
if file_summary.get('metrics'): | |
comment_ratio = file_summary['metrics'].get('comment_ratio') | |
# Ensure ratio is a number before appending | |
if isinstance(comment_ratio, (int, float)): | |
complexity_metrics['comment_ratios'].append((file["path"], comment_ratio)) | |
except Exception as e_sum: | |
print(f"Error extracting code summary for {file.get('path', 'unknown file')}: {e_sum}") | |
# Analyze dependencies (can be slow, consider limiting files further if needed) | |
# Use the already fetched text_files for dependency analysis | |
dependencies = self.analyze_dependencies(owner, repo, pre_fetched_code_files=text_files) | |
# Summarize repository content by file type | |
file_types = defaultdict(int) | |
for file in text_files: | |
if isinstance(file, dict) and 'name' in file: # Check again | |
ext = os.path.splitext(file["name"])[1].lower() | |
if ext: # Avoid counting files with no extension | |
file_types[ext] += 1 | |
# Calculate aggregate code metrics safely | |
total_code_lines = 0 | |
total_comment_lines = 0 | |
analyzed_code_files = 0 | |
for path, summary in code_summary.items(): | |
if summary and summary.get('metrics'): | |
analyzed_code_files += 1 | |
total_code_lines += summary['metrics'].get('code_lines', 0) or 0 | |
total_comment_lines += summary['metrics'].get('comment_lines', 0) or 0 | |
aggregate_metrics = { | |
'total_files_analyzed': len(text_files), # All text files fetched | |
'code_files_summarized': analyzed_code_files, # Files where summary succeeded | |
'total_code_lines': total_code_lines, | |
'total_comment_lines': total_comment_lines, | |
'average_comment_ratio': (total_comment_lines / total_code_lines) if total_code_lines > 0 else 0 | |
} | |
return { | |
"readme": readme, # Can be None | |
"documentation": docs, # Should be list | |
"code_summary": code_summary, # Dict of summaries | |
"complexity_metrics": complexity_metrics, # Dict of lists | |
"dependencies": dependencies, # Dict | |
"file_type_counts": dict(file_types), # Dict | |
"aggregate_metrics": aggregate_metrics, # Dict | |
"text_files": text_files # List of fetched files | |
} | |
# Modify analyze_dependencies to accept pre-fetched files | |
def analyze_dependencies(self, owner, repo, max_files=100, pre_fetched_code_files=None): | |
"""Analyze code dependencies across the repository.""" | |
if pre_fetched_code_files is None: | |
# Get Python and JavaScript files if not provided | |
print("Fetching text files within analyze_dependencies...") | |
text_files, _ = self.get_all_text_files(owner, repo, max_files=max_files) | |
# Filter for Python and JS/TS files | |
code_files = [f for f in text_files if isinstance(f, dict) and f.get("name", "").endswith(('.py', '.js', '.ts', '.jsx', '.tsx'))] | |
else: | |
print("Using pre-fetched files in analyze_dependencies.") | |
# Assume pre_fetched_code_files are already filtered if needed, or filter here | |
code_files = [f for f in pre_fetched_code_files if isinstance(f, dict) and f.get("name", "").endswith(('.py', '.js', '.ts', '.jsx', '.tsx'))] | |
# Track dependencies | |
dependencies = { | |
'internal': defaultdict(set), # File to file dependencies | |
'external': defaultdict(set), # External package dependencies by file | |
'modules': defaultdict(set) # Defined modules/components by file | |
} | |
# Extract module names from file paths | |
file_to_module = {} | |
for file in code_files: | |
# Add checks here too | |
if not isinstance(file, dict) or 'path' not in file or 'content' not in file: continue | |
# Convert file path to potential module name | |
module_path = os.path.splitext(file["path"])[0].replace('/', '.') | |
file_to_module[file["path"]] = module_path | |
# Track what each file defines | |
try: | |
summary = self.extract_code_summary(file["content"], file["path"]) | |
if not summary: continue # Skip if summary failed | |
if file.get("name", "").endswith('.py'): | |
for function in summary.get("functions", []): | |
# Ensure function is a string before adding | |
if isinstance(function, str): | |
dependencies['modules'][file["path"]].add(f"{module_path}.{function}") | |
for class_name in summary.get("classes", []): | |
# Ensure class_name is a string before adding | |
if isinstance(class_name, str): | |
dependencies['modules'][file["path"]].add(f"{module_path}.{class_name}") | |
else: # JS/TS files | |
for export in summary.get("exports", []): | |
# Ensure export is a string before adding | |
if isinstance(export, str): | |
dependencies['modules'][file["path"]].add(export) | |
except Exception as e_dep_mod: | |
print(f"Error processing module definitions for {file.get('path', 'unknown file')}: {e_dep_mod}") | |
# Analyze imports/dependencies | |
for file in code_files: | |
if not isinstance(file, dict) or 'path' not in file or 'content' not in file: continue | |
try: | |
summary = self.extract_code_summary(file["content"], file["path"]) | |
if not summary: continue | |
for imp in summary.get("imports", []): | |
# Ensure import is a string | |
if not isinstance(imp, str) or not imp: continue | |
# Check if this is an internal import | |
is_internal = False | |
target_dep_path = None # Store the resolved internal path | |
if file.get("name","").endswith('.py'): | |
# For Python, check if the import matches any module path | |
# Normalize potential relative imports starting with '.' | |
current_module_parts = file_to_module[file["path"]].split('.') | |
if imp.startswith('.'): | |
# Resolve relative import (basic attempt) | |
level = 0 | |
while imp.startswith('.'): | |
level += 1 | |
imp = imp[1:] | |
base_parts = current_module_parts[:-level] if level > 0 else current_module_parts[:-1] # Go up levels or stay in package | |
resolved_imp = '.'.join(base_parts + [imp] if imp else base_parts) # Handle 'from . import foo' vs 'from ..bar import baz' | |
else: | |
resolved_imp = imp # Absolute import | |
# Check against known module paths | |
for f_path, m_path in file_to_module.items(): | |
# Exact match or parent package match | |
if resolved_imp == m_path or resolved_imp.startswith(f"{m_path}."): | |
target_dep_path = f_path | |
break | |
# Check if import is trying to import a specific module file directly | |
# e.g. import mypackage.module -> check if file path matches mypackage/module.py | |
potential_file_path = resolved_imp.replace('.', '/') + '.py' | |
if potential_file_path == f_path: | |
target_dep_path = f_path | |
break | |
else: # JS/TS | |
# For JS/TS, check relative imports or alias paths (more complex, basic check here) | |
if imp.startswith('./') or imp.startswith('../') or imp.startswith('@/'): # Basic checks | |
is_internal = True # Assume internal for now | |
# Basic resolution attempt | |
src_dir = os.path.dirname(file["path"]) | |
target_path_base = os.path.normpath(os.path.join(src_dir, imp)) | |
# Try adding common extensions | |
for ext in ['.js', '.ts', '.jsx', '.tsx', '/index.js', '/index.ts']: | |
test_path = f"{target_path_base}{ext}" | |
if test_path in file_to_module: | |
target_dep_path = test_path | |
break | |
# Check path without extension too (might be dir import) | |
if target_path_base in file_to_module: | |
target_dep_path = target_path_base | |
break | |
# If a target internal path was found, add the dependency | |
if target_dep_path: | |
# Ensure the target path actually exists in our list of files | |
if target_dep_path in file_to_module: | |
dependencies['internal'][file["path"]].add(target_dep_path) | |
is_internal = True # Confirm it was internal | |
# If not internal, consider it external | |
if not is_internal: | |
# Clean up the import name (remove relative path parts, take package name) | |
# Handle scoped packages like @angular/core -> @angular/core | |
# Handle imports like 'react-dom/client' -> react-dom | |
if '/' in imp and not imp.startswith('.') and not imp.startswith('@'): | |
package_base = imp.split('/')[0] | |
elif imp.startswith('@'): | |
parts = imp.split('/') | |
package_base = '/'.join(parts[:2]) if len(parts) >= 2 else parts[0] # Keep scope like @scope/package | |
else: | |
package_base = imp | |
# Add only non-empty strings | |
if package_base: | |
dependencies['external'][file["path"]].add(package_base) | |
except Exception as e_dep_ana: | |
print(f"Error processing dependencies for {file.get('path', 'unknown file')}: {e_dep_ana}") | |
return dependencies | |
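    # Illustrative sketch (a hypothetical helper, not part of the original class): one way a
    # caller might flatten the dict returned by analyze_dependencies() above into a NetworkX
    # graph for metrics or visualization. The "pkg:" node prefix is an assumption made here.
    def build_dependency_graph_sketch(self, dependencies):
        """Hedged example: turn the internal/external dependency maps into a directed graph."""
        graph = nx.DiGraph()
        # Internal edges: source file -> target file
        for src, targets in dependencies.get('internal', {}).items():
            for dst in targets:
                graph.add_edge(src, dst, kind='internal')
        # External edges: source file -> external package (prefixed to avoid name clashes)
        for src, packages in dependencies.get('external', {}).items():
            for pkg in packages:
                graph.add_edge(src, f"pkg:{pkg}", kind='external')
        return graph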
# --- get_temporal_analysis: Ensure sub-methods return [] and handle potential errors --- | |
def get_temporal_analysis(self, owner, repo): | |
"""Perform temporal analysis of repository activity.""" | |
# Get commit activity over time | |
commit_activity = self.get_commit_activity(owner, repo) or [] # Ensure list | |
# Get code frequency (additions/deletions over time) | |
code_frequency = self.get_code_frequency(owner, repo) or [] # Ensure list | |
# Get contributor activity | |
contributor_activity = self.get_contributor_activity(owner, repo) or [] # Ensure list | |
# Get issue and PR timelines (These methods already return dicts with lists/values) | |
# Add error handling around the calls themselves | |
try: | |
issue_timeline = self.get_issue_timeline(owner, repo) | |
except Exception as e: | |
print(f"Error getting issue timeline: {e}") | |
issue_timeline = {} # Default empty dict | |
try: | |
pr_timeline = self.get_pr_timeline(owner, repo) | |
except Exception as e: | |
print(f"Error getting PR timeline: {e}") | |
pr_timeline = {} # Default empty dict | |
# Process data for visualization safely | |
# - Weekly commit counts | |
weekly_commits = [] | |
if isinstance(commit_activity, list): # Check if list | |
for week in commit_activity: | |
# Check if item is a dict with expected keys | |
if isinstance(week, dict) and 'week' in week and 'total' in week and 'days' in week: | |
try: | |
date = datetime.fromtimestamp(week['week']) | |
weekly_commits.append({ | |
'date': date.strftime('%Y-%m-%d'), | |
'total': int(week['total']), # Ensure integer | |
'days': week['days'] # Daily breakdown within the week | |
}) | |
except (TypeError, ValueError) as e: | |
print(f"Skipping invalid commit activity week data: {week}, Error: {e}") | |
else: | |
print(f"Skipping malformed commit activity week data: {week}") | |
else: | |
print(f"Warning: Commit activity data is not a list: {type(commit_activity)}") | |
# - Weekly code changes | |
weekly_code_changes = [] | |
if isinstance(code_frequency, list): # Check if list | |
for item in code_frequency: | |
# Check if item is a list/tuple of 3 numbers | |
if isinstance(item, (list, tuple)) and len(item) == 3: | |
try: | |
date = datetime.fromtimestamp(item[0]) | |
additions = int(item[1]) | |
deletions = int(item[2]) # Keep positive for calculation | |
weekly_code_changes.append({ | |
'date': date.strftime('%Y-%m-%d'), | |
'additions': additions, | |
'deletions': deletions # Store as positive deletions | |
}) | |
except (TypeError, ValueError, IndexError) as e: | |
print(f"Skipping invalid code frequency data: {item}, Error: {e}") | |
else: | |
print(f"Skipping malformed code frequency data: {item}") | |
else: | |
print(f"Warning: Code frequency data is not a list: {type(code_frequency)}") | |
# - Contributor timeline | |
contributor_timeline = {} | |
if isinstance(contributor_activity, list): # Check if list | |
for contributor in contributor_activity: | |
# Check structure | |
if (isinstance(contributor, dict) and | |
'author' in contributor and isinstance(contributor['author'], dict) and 'login' in contributor['author'] and | |
'weeks' in contributor and isinstance(contributor['weeks'], list)): | |
author = contributor['author']['login'] | |
weeks_data = contributor['weeks'] | |
if author not in contributor_timeline: | |
contributor_timeline[author] = [] | |
for week in weeks_data: | |
# Check week structure and values | |
if (isinstance(week, dict) and all(k in week for k in ['w', 'c', 'a', 'd']) and | |
isinstance(week['c'], int) and week['c'] >= 0): # Check commit count is valid non-negative int | |
if week['c'] > 0: # Only include weeks with commits | |
try: | |
date = datetime.fromtimestamp(week['w']) | |
contributor_timeline[author].append({ | |
'date': date.strftime('%Y-%m-%d'), | |
'commits': int(week['c']), | |
'additions': int(week['a']), | |
'deletions': int(week['d']) | |
}) | |
except (TypeError, ValueError) as e: | |
print(f"Skipping invalid contributor week data for {author}: {week}, Error: {e}") | |
# No else needed, just skip malformed week data silently or add print if desired | |
else: | |
print(f"Skipping malformed contributor activity data: {contributor}") | |
else: | |
print(f"Warning: Contributor activity data is not a list: {type(contributor_activity)}") | |
# Ensure issue/pr timelines are dicts before returning | |
issue_timeline = issue_timeline if isinstance(issue_timeline, dict) else {} | |
pr_timeline = pr_timeline if isinstance(pr_timeline, dict) else {} | |
return { | |
'weekly_commits': weekly_commits, # List | |
'weekly_code_changes': weekly_code_changes, # List | |
'contributor_timeline': contributor_timeline, # Dict | |
'issue_timeline': issue_timeline, # Dict | |
'pr_timeline': pr_timeline # Dict | |
} | |
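    # Illustrative sketch (a hypothetical helper, not part of the original class): shows how the
    # 'weekly_commits' list returned by get_temporal_analysis() above could be charted with the
    # pandas/matplotlib imports already present in this module. The output path is an assumption.
    def plot_weekly_commits_sketch(self, temporal_analysis, output_path="weekly_commits.png"):
        """Hedged example: save a simple line chart of weekly commit totals."""
        weekly = temporal_analysis.get('weekly_commits', [])
        if not weekly:
            print("No weekly commit data available to plot.")
            return None
        df = pd.DataFrame(weekly)
        df['date'] = pd.to_datetime(df['date'])
        fig, ax = plt.subplots(figsize=(10, 4))
        ax.plot(df['date'], df['total'])
        ax.set_title("Commits per week")
        ax.set_xlabel("Week")
        ax.set_ylabel("Commits")
        ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
        fig.autofmt_xdate()
        fig.savefig(output_path, bbox_inches='tight')
        plt.close(fig)
        return output_path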
# --- Pull Request Details (Ensure PyGithub is checked) --- | |
def get_pull_request_details(self, owner, repo, pr_number): | |
"""Get detailed information for a specific Pull Request using PyGithub.""" | |
if not self.github: # Check if PyGithub client was initialized | |
print("PyGithub client not initialized or installed. Cannot fetch PR details.") | |
# Fallback maybe? Try direct REST call if needed | |
# For now, return None | |
return None | |
try: | |
# Ensure owner/repo are strings and pr_number is int | |
if not isinstance(owner, str) or not isinstance(repo, str): | |
raise ValueError("Owner and repo must be strings.") | |
pr_number = int(pr_number) | |
repo_obj = self.github.get_repo(f"{owner}/{repo}") | |
pr = repo_obj.get_pull(pr_number) | |
# Extract relevant information into a dictionary safely | |
details = { | |
"number": pr.number, | |
"title": pr.title or "N/A", | |
"state": pr.state or "N/A", # 'open', 'closed' | |
"merged": pr.merged or False, | |
"body": pr.body or "", # Ensure body is string | |
"url": pr.html_url or "N/A", | |
"created_at": pr.created_at.isoformat() if pr.created_at else None, | |
"updated_at": pr.updated_at.isoformat() if pr.updated_at else None, | |
"closed_at": pr.closed_at.isoformat() if pr.closed_at else None, | |
"merged_at": pr.merged_at.isoformat() if pr.merged_at else None, | |
"author": pr.user.login if pr.user else "N/A", | |
"commits_count": pr.commits if pr.commits is not None else 0, | |
"additions": pr.additions if pr.additions is not None else 0, | |
"deletions": pr.deletions if pr.deletions is not None else 0, | |
"changed_files_count": pr.changed_files if pr.changed_files is not None else 0, | |
"labels": [label.name for label in pr.labels] if pr.labels else [], | |
"assignees": [assignee.login for assignee in pr.assignees] if pr.assignees else [], | |
"milestone": pr.milestone.title if pr.milestone else None, | |
"repo_full_name": f"{owner}/{repo}", # Add repo context | |
# Add more fields if needed (e.g., comments, reviews) | |
} | |
return details | |
except GithubException as e: | |
if e.status == 404: | |
print(f"Error: Pull Request #{pr_number} not found in {owner}/{repo}.") | |
elif e.status == 401: | |
print(f"Error: Unauthorized (401). Check your GitHub token permissions for {owner}/{repo}.") | |
elif e.status == 403: | |
print(f"Error: Forbidden (403). Check token permissions or rate limits for {owner}/{repo}.") | |
else: | |
print(f"GitHub API Error fetching PR #{pr_number} details: Status={e.status}, Data={e.data}") | |
return None | |
except ValueError as e: # Catch potential int conversion error | |
print(f"Error: Invalid PR number '{pr_number}'. Must be an integer. {e}") | |
return None | |
except Exception as e: # Catch any other unexpected errors | |
print(f"An unexpected error occurred fetching PR details for #{pr_number}: {e}") | |
return None | |
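# Illustrative usage sketch (not invoked anywhere in the app): how the GitHubRepoInfo class
# above might be exercised on its own. The default repository is a placeholder and the helper
# name is hypothetical.
def _example_basic_repo_lookup(owner="octocat", repo="Hello-World"):
    """Hedged example: fetch basic info and the language breakdown for one repository."""
    info = GitHubRepoInfo(token=os.environ.get("GITHUB_TOKEN"))
    basic = info.get_repo_info(owner, repo)  # returns None on failure
    if basic:
        print(f"{basic.get('full_name')}: {basic.get('stargazers_count', 0)} stars")
    languages = info.get_languages(owner, repo)  # returns {} on failure
    for lang, byte_count in sorted(languages.items(), key=lambda kv: -kv[1]):
        print(f"  {lang}: {byte_count} bytes")
    return basic, languages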
# --- Colab Helpers (Keep as provided) --- | |
try: | |
from google.colab import files | |
IN_COLAB = True | |
except ImportError: | |
IN_COLAB = False | |
# ...(keep download_file and save_json_to_colab functions)... | |
# Use the provided robust JSON helpers | |
class CustomJSONEncoder(json.JSONEncoder): | |
def default(self, obj): | |
if isinstance(obj, set): | |
return list(obj) | |
elif isinstance(obj, (datetime, np.datetime64)): | |
# Handle both standard datetime and numpy datetime64 | |
if isinstance(obj, np.datetime64): | |
# Convert numpy datetime64 to standard datetime | |
ts = pd.to_datetime(obj) | |
# Ensure it's offset-naive or convert to UTC for ISO format | |
if ts.tzinfo is not None: | |
ts = ts.tz_convert(None) # Make naive if aware | |
return ts.isoformat() | |
# Ensure standard datetime is offset-naive or convert to UTC | |
if obj.tzinfo is not None: | |
obj = obj.astimezone(timezone.utc).replace(tzinfo=None) # Convert to UTC and make naive | |
return obj.isoformat() | |
elif isinstance(obj, (np.int64, np.int32)): | |
return int(obj) | |
elif isinstance(obj, (np.float64, np.float32)): | |
return float(obj) | |
elif isinstance(obj, np.bool_): # Handle numpy bool | |
return bool(obj) | |
elif isinstance(obj, np.ndarray): # Handle numpy arrays | |
return obj.tolist() # Convert to list | |
# Be careful with complex objects, might expose too much or fail | |
# Let the base class default method raise the TypeError for others | |
try: | |
return super(CustomJSONEncoder, self).default(obj) | |
except TypeError: | |
return str(obj) # Fallback to string representation for unknown types | |
def convert_sets_to_lists(obj): | |
# Recursive function to convert sets and handle numpy types | |
if isinstance(obj, dict): | |
return {k: convert_sets_to_lists(v) for k, v in obj.items()} | |
elif isinstance(obj, list): | |
return [convert_sets_to_lists(i) for i in obj] | |
elif isinstance(obj, set): | |
# Convert set elements as well | |
return [convert_sets_to_lists(i) for i in sorted(list(obj))] # Sort for consistent output | |
elif isinstance(obj, tuple): | |
return tuple(convert_sets_to_lists(i) for i in obj) | |
# Handle numpy types specifically | |
    elif isinstance(obj, np.integer):
        # np.integer / np.floating cover every fixed-width size and avoid the
        # np.int_ / np.float_ aliases removed in NumPy 2.0
        return int(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
elif isinstance(obj, np.datetime64): | |
# Consistent conversion to ISO format string (naive UTC) | |
ts = pd.to_datetime(obj).to_pydatetime() # Convert to standard datetime | |
if ts.tzinfo is not None: | |
ts = ts.astimezone(timezone.utc).replace(tzinfo=None) | |
return ts.isoformat() + "Z" # Add Z for UTC indication | |
elif isinstance(obj, np.bool_): | |
return bool(obj) | |
elif isinstance(obj, np.ndarray): | |
return convert_sets_to_lists(obj.tolist()) # Convert numpy arrays to lists | |
elif isinstance(obj, pd.Timestamp): # Handle Pandas Timestamp | |
ts = obj.to_pydatetime() | |
if ts.tzinfo is not None: | |
ts = ts.astimezone(timezone.utc).replace(tzinfo=None) | |
return ts.isoformat() + "Z" | |
elif isinstance(obj, datetime): # Handle standard datetime | |
if obj.tzinfo is not None: | |
obj = obj.astimezone(timezone.utc).replace(tzinfo=None) | |
return obj.isoformat() + "Z" | |
else: | |
# Attempt to handle other non-serializable types gracefully | |
# Test if the specific object is serializable by default | |
try: | |
json.dumps(obj) # Test serialization | |
return obj # Return as is if serializable | |
except TypeError: | |
# If not serializable by default, convert to string as a fallback | |
print(f"Warning: Converting non-serializable type {type(obj)} to string.") | |
return str(obj) | |
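# Illustrative sketch (a hypothetical helper, never called by the app): demonstrates that
# convert_sets_to_lists() plus CustomJSONEncoder above can serialize the mixed set / numpy /
# datetime structures this module produces.
def _example_json_serialization():
    """Hedged example: serialize a small structure containing sets, numpy scalars and datetimes."""
    sample = {
        "modules": {"repo.py": {"GitHubRepoInfo", "GraphRepoAnalyzer"}},  # set -> sorted list
        "stars": np.int64(42),                                            # numpy int -> int
        "ratio": np.float64(0.25),                                        # numpy float -> float
        "fetched_at": datetime.now(),                                     # datetime -> ISO string
    }
    cleaned = convert_sets_to_lists(sample)
    return json.dumps(cleaned, indent=2, cls=CustomJSONEncoder)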
def save_json_to_colab(data, filename='/content/repo_info.json'): | |
"""Save JSON data to a file in Colab and provide download option.""" | |
# Deep conversion to handle nested structures and numpy types | |
try: | |
print("Converting data for JSON serialization...") | |
converted_data = convert_sets_to_lists(data) | |
print("Conversion complete. Saving JSON...") | |
except Exception as e: | |
print(f"Error during data conversion for JSON: {e}") | |
print("Attempting to save raw data (might fail)...") | |
# Fallback to trying without full conversion, might still fail | |
converted_data = data | |
try: | |
with open(filename, 'w', encoding='utf-8') as f: | |
# Use the custom encoder for any remaining types if conversion missed something | |
json.dump(converted_data, f, indent=2, cls=CustomJSONEncoder, ensure_ascii=False) | |
print(f"Data successfully saved to {filename}") | |
if IN_COLAB: | |
try: | |
print("To download the JSON file in Colab, run the following cell:") | |
print(f"from google.colab import files") | |
print(f"files.download('{filename}')") | |
except NameError: # files might not be imported if not in Colab context truly | |
pass | |
except TypeError as e: | |
print(f"Error saving JSON: {e}") | |
print("There might be non-serializable data types remaining even after conversion attempt.") | |
print("Consider inspecting the data structure for problematic types.") | |
except Exception as e: | |
print(f"An unexpected error occurred during JSON saving: {e}") | |
# --- GraphRepoAnalyzer Class (Check initializations and data access) --- | |
class GraphRepoAnalyzer: | |
"""Integrates GitHub analysis with Neo4j and Gemini.""" | |
def __init__(self, github_token=None, neo4j_uri=None, neo4j_user=None, neo4j_password=None, gemini_api_key=None): | |
"""Initialize with credentials.""" | |
load_dotenv() # Load .env file if it exists | |
self.github_token = github_token or os.getenv("GITHUB_TOKEN") | |
self.neo4j_uri = neo4j_uri or os.getenv("NEO4J_URI") | |
self.neo4j_user = neo4j_user or os.getenv("NEO4J_USERNAME") | |
self.neo4j_password = neo4j_password or os.getenv("NEO4J_PASSWORD") | |
self.gemini_api_key = gemini_api_key or os.getenv("GOOGLE_API_KEY") | |
# Initialize github_analyzer using the potentially updated GitHubRepoInfo | |
# Pass the token directly | |
print("Initializing GitHubRepoInfo...") | |
self.github_analyzer = GitHubRepoInfo(token=self.github_token) | |
print("GitHubRepoInfo initialized.") | |
self.neo4j_driver = None | |
# Check if Neo4j library was imported | |
if GraphDatabase and basic_auth and all([self.neo4j_uri, self.neo4j_user, self.neo4j_password]): | |
try: | |
print(f"Attempting to connect to Neo4j at {self.neo4j_uri}...") | |
# Use basic_auth for Neo4j driver authentication | |
self.neo4j_driver = GraphDatabase.driver(self.neo4j_uri, auth=basic_auth(self.neo4j_user, self.neo4j_password)) | |
self.neo4j_driver.verify_connectivity() | |
print("Successfully connected to Neo4j.") | |
self._create_neo4j_constraints() | |
except Exception as e: | |
print(f"Error connecting to Neo4j: {e}") | |
print("Graph features will be disabled.") | |
self.neo4j_driver = None | |
else: | |
if not (GraphDatabase and basic_auth): | |
print("Neo4j library not installed. Graph features disabled.") | |
else: | |
print("Warning: Neo4j credentials not fully provided or library missing. Graph features will be disabled.") | |
self.gemini_model = None | |
# Check if Gemini library was imported | |
if genai and self.gemini_api_key: | |
try: | |
print("Configuring Google Generative AI...") | |
genai.configure(api_key=self.gemini_api_key) | |
# Use a known stable model, check Gemini docs for latest recommended models | |
# 'gemini-1.5-flash-latest' is often a good balance | |
# model_name = 'gemini-1.5-flash-latest' | |
# Let's stick to the user's specified model if possible, fallback otherwise | |
model_name = 'gemini-1.5-pro-latest' # User's original choice in one definition | |
# Check if the model exists (basic check) | |
# available_models = [m.name for m in genai.list_models() if 'generateContent' in m.supported_generation_methods] | |
# if model_name not in available_models: | |
# print(f"Warning: Model '{model_name}' not found or doesn't support generateContent. Trying 'gemini-1.5-flash-latest'.") | |
# model_name = 'gemini-1.5-flash-latest' | |
# if model_name not in available_models: | |
# print("Error: Could not find a suitable Gemini model.") | |
# raise ValueError("No suitable Gemini model found.") | |
print(f"Initializing Gemini model: {model_name}") | |
self.gemini_model = genai.GenerativeModel(model_name) | |
# Test call (optional, might consume quota) | |
# self.gemini_model.generate_content("Test") | |
print("Gemini model initialized.") | |
except Exception as e: | |
print(f"Error initializing Gemini: {e}") | |
self.gemini_model = None | |
else: | |
if not genai: | |
print("Google Generative AI library not installed. Gemini features disabled.") | |
else: | |
print("Warning: Google API Key not provided or library missing. Gemini features will be disabled.") | |
self.repo_data = None | |
self.repo_full_name = None # Store repo name for context | |
self.owner = None # Store owner | |
self.repo = None # Store repo name | |
# ... (rest of GraphRepoAnalyzer methods, ensure self.repo_data is checked before use) ... | |
# --- analyze_repo: Ensure it handles None return from get_all_info --- | |
def analyze_repo(self, owner, repo, display=True, save_json=False, export_text=False): | |
"""Fetch, analyze, display, and optionally populate graph.""" | |
# Validate inputs | |
if not owner or not isinstance(owner, str): | |
print("Error: Repository owner must be provided as a string.") | |
self.repo_data = None | |
return # Stop processing | |
if not repo or not isinstance(repo, str): | |
print("Error: Repository name must be provided as a string.") | |
self.repo_data = None | |
return # Stop processing | |
self.owner = owner.strip() | |
self.repo = repo.strip() | |
self.repo_full_name = f"{self.owner}/{self.repo}" | |
print(f"\n--- Starting Analysis for {self.repo_full_name} ---") | |
# Reset previous data | |
self.repo_data = None | |
# Use the github_analyzer instance associated with this GraphRepoAnalyzer | |
if not self.github_analyzer: | |
print("Error: GitHubRepoInfo analyzer not initialized.") | |
return | |
try: | |
self.repo_data = self.github_analyzer.get_all_info(self.owner, self.repo) | |
except Exception as e: | |
print(f"An unexpected error occurred during get_all_info: {e}") | |
import traceback | |
traceback.print_exc() # Print stack trace for debugging | |
self.repo_data = None # Ensure repo_data is None on error | |
# Check if analysis succeeded and returned data | |
if self.repo_data and isinstance(self.repo_data, dict) and "basic_info" in self.repo_data: | |
print(f"--- Analysis Complete for {self.repo_full_name} ---") | |
# Proceed with display, save, export, populate etc. | |
if display and IPYTHON_AVAILABLE: # Only display if in IPython environment | |
print("\nGenerating visualizations and analysis (requires IPython environment)...") | |
try: | |
# Wrap display calls in try/except as they can fail with odd data | |
self.github_analyzer.display_repo_info(self.repo_data) | |
self.github_analyzer.display_code_files(self.repo_data) # Show code preview | |
except Exception as display_error: | |
print(f"Error during display generation: {display_error}") | |
elif display and not IPYTHON_AVAILABLE: | |
print("\nSkipping visualizations: Not in an IPython environment (like Colab or Jupyter).") | |
if self.neo4j_driver: | |
try: | |
# Use Gradio input later, for script execution use environment variable or fixed logic | |
populate_graph = os.getenv("POPULATE_NEO4J", "false").lower() == 'true' | |
# populate = input("\nPopulate Neo4j graph with this data? (y/n): ").lower() == 'y' | |
if populate_graph: | |
print("\nAttempting to populate Neo4j graph...") | |
self.populate_neo4j_graph() | |
else: | |
print("\nSkipping Neo4j population.") | |
except Exception as neo4j_error: | |
print(f"Error during Neo4j interaction prompt or population: {neo4j_error}") | |
if save_json: | |
# Use fixed path or environment variable for non-interactive saving | |
default_filename = f'./{self.repo}_info.json' | |
filename = os.getenv("JSON_OUTPUT_PATH", default_filename) | |
# filename = input(f"Enter filename for JSON output (default: {default_filename}): ") or default_filename | |
print(f"\nSaving analysis results to JSON: {filename}") | |
save_json_to_colab(self.repo_data, filename) # Use the enhanced save function | |
if export_text: | |
# Use fixed path or environment variable for non-interactive saving | |
default_dir = f'./{self.repo}_text' | |
output_dir = os.getenv("TEXT_EXPORT_DIR", default_dir) | |
# output_dir = input(f"Enter output directory for text export (default: {default_dir}): ") or default_dir | |
print(f"\nExporting text content to directory: {output_dir}") | |
self.github_analyzer.export_repo_text(self.repo_data, output_dir) | |
else: | |
# This case handles where get_all_info returned None or an invalid structure | |
print(f"--- Failed to get complete repository information for {self.repo_full_name} ---") | |
# self.repo_data is already None or invalid | |
# --- summarize_pull_request: Add checks --- | |
def summarize_pull_request(self, pr_number_str, role): | |
"""Fetches PR details and generates a role-based summary using Gemini.""" | |
if not self.gemini_model: | |
return "Gemini model not initialized. Cannot generate summary." | |
if not self.owner or not self.repo: | |
return "Repository owner and name not set. Analyze a repository first or provide them." | |
if not self.github_analyzer: | |
return "GitHub Analyzer not initialized." | |
# Validate PR number | |
try: | |
pr_number = int(pr_number_str) | |
except (ValueError, TypeError): | |
return f"Invalid Pull Request number: '{pr_number_str}'. Please provide an integer." | |
# Validate Role | |
valid_roles = ["Developer", "Manager", "Team Lead", "Product Owner", "Program Manager", "General"] | |
if role not in valid_roles: | |
return f"Invalid role: '{role}'. Please choose from: {', '.join(valid_roles)}" | |
print(f"\nFetching details for PR #{pr_number} in {self.repo_full_name}...") | |
# get_pull_request_details handles its own errors and returns None on failure | |
pr_details = self.github_analyzer.get_pull_request_details(self.owner, self.repo, pr_number) | |
if not pr_details: | |
# Error message was already printed by get_pull_request_details | |
return f"Could not retrieve details for PR #{pr_number}. See previous error messages." | |
print(f"Generating summary for role: {role}...") | |
# Generate the role-specific prompt | |
try: | |
prompt = self._get_pr_summary_prompt(pr_details, role) | |
except Exception as e: | |
print(f"Error generating Gemini prompt: {e}") | |
return "Error preparing the summary request." | |
# Send to Gemini and Get Response | |
try: | |
# print("--- Sending Prompt to Gemini ---") | |
# print(prompt[:1000] + "..." if len(prompt) > 1000 else prompt) # Debug: Print truncated prompt | |
# print("-----------------------------") | |
# Use safety_settings to reduce refusals for code-related content if needed | |
# safety_settings = [ | |
# {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
# {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
# {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
# {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE"}, | |
# ] | |
# response = self.gemini_model.generate_content(prompt, safety_settings=safety_settings) | |
response = self.gemini_model.generate_content(prompt) | |
# Check for empty or blocked response | |
if not response.parts: | |
# Check prompt feedback for blockage reason | |
block_reason = response.prompt_feedback.block_reason if response.prompt_feedback else "Unknown" | |
print(f"Warning: Gemini response was empty or blocked. Reason: {block_reason}") | |
return f"Summary generation failed. The request may have been blocked (Reason: {block_reason})." | |
summary_text = response.text | |
print("\n--- Gemini PR Summary ---") | |
# Don't use display(Markdown()) here as it might not work outside notebooks | |
# Return the raw text for Gradio Markdown component | |
print(summary_text) # Print to console as well | |
print("------------------------") | |
return summary_text # Return raw text | |
except Exception as e: | |
print(f"Error communicating with Gemini for PR summary: {e}") | |
return f"Error asking Gemini: {e}" | |
# --- create_vizro_dashboard: Add robust data checks --- | |
def create_vizro_dashboard(self, output_dir='./vizro_dashboard'): | |
"""Create a Vizro dashboard from repository data.""" | |
# Check if Vizro is installed | |
if not vzm or not px or not go: | |
print("Vizro/Plotly not installed. Cannot create dashboard.") | |
return None | |
# Check if data exists and is minimally valid | |
if not self.repo_data or not isinstance(self.repo_data, dict) or not self.repo_data.get("basic_info"): | |
print("No valid repository data available. Run analyze_repo() first.") | |
return None | |
print("Creating Vizro dashboard...") | |
# Create output directory if it doesn't exist | |
try: | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
except OSError as e: | |
print(f"Error creating output directory {output_dir}: {e}") | |
return None # Cannot proceed without output dir | |
# --- Safely Extract Data --- | |
basic_info = self.repo_data.get("basic_info", {}) # Default to empty dict | |
repo_name = basic_info.get("full_name", "Unknown Repo") | |
languages_data = self.repo_data.get("languages") # Can be None or {} | |
contributors_data = self.repo_data.get("contributors") # Can be None or [] | |
temporal_analysis = self.repo_data.get("temporal_analysis", {}) # Default to {} | |
text_content = self.repo_data.get("text_content", {}) # Default to {} | |
dependencies_data = text_content.get("dependencies", {}) # Default to {} | |
complexity_metrics = text_content.get("complexity_metrics", {}) # Default to {} | |
# --- Create dashboard pages --- | |
pages = [] | |
all_components = {} # Store components to avoid duplicates if needed | |
# --- 1. Overview Page --- | |
print("Building Overview Page...") | |
overview_components = [] | |
try: | |
# Basic repository info card - use .get for safety | |
repo_info_md = f""" | |
# {basic_info.get('full_name', 'N/A')} | |
**Description:** {basic_info.get('description', 'No description')} | |
**Stars:** {basic_info.get('stargazers_count', 'N/A')} | | |
**Forks:** {basic_info.get('forks_count', 'N/A')} | | |
**Open Issues:** {basic_info.get('open_issues_count', 'N/A')} | |
**Created:** {basic_info.get('created_at', 'N/A')} | | |
**Last Updated:** {basic_info.get('updated_at', 'N/A')} | |
**Default Branch:** {basic_info.get('default_branch', 'N/A')} | |
**License:** {(basic_info.get('license') or {}).get('name', 'Not specified')} | |
""" # license can be None in the API payload, so fall back to an empty dict before reading 'name' | |
overview_components.append(vzm.Card(text=repo_info_md, title="Repository Info")) | |
all_components['repo_info_card'] = overview_components[-1] | |
# Languages pie chart | |
if isinstance(languages_data, dict) and languages_data: | |
langs_prep_data = [] | |
total_bytes = sum(v for v in languages_data.values() if isinstance(v, (int, float))) | |
if total_bytes > 0: | |
for lang, bytes_count in languages_data.items(): | |
if isinstance(bytes_count, (int, float)) and bytes_count > 0: | |
percentage = (bytes_count / total_bytes) * 100 | |
langs_prep_data.append({ | |
"Language": str(lang), # Ensure string | |
"Bytes": bytes_count, | |
"Percentage": percentage | |
}) | |
if langs_prep_data: # Check if we have data to plot | |
langs_df = pd.DataFrame(langs_prep_data) | |
lang_pie_fig = px.pie( | |
langs_df, | |
values="Percentage", | |
names="Language", | |
title="Language Distribution" | |
) | |
lang_pie = vzm.Graph(figure=lang_pie_fig) | |
overview_components.append(lang_pie) # vzm.Card has no 'children' field; add the Graph component to the page directly | |
all_components['lang_pie'] = overview_components[-1] | |
else: | |
print("No valid language data to plot.") | |
else: | |
print("Language data present but total bytes are zero or invalid.") | |
else: | |
print("No language data found or data is not a dictionary.") | |
# Contributors bar chart | |
if isinstance(contributors_data, list) and contributors_data: | |
contrib_prep_data = [] | |
for contributor in contributors_data[:15]: # Limit display | |
if isinstance(contributor, dict) and 'login' in contributor and 'contributions' in contributor: | |
contrib_prep_data.append({ | |
"Username": str(contributor['login']), # Ensure string | |
"Contributions": int(contributor['contributions']) # Ensure int | |
}) | |
if contrib_prep_data: # Check if data to plot | |
contrib_df = pd.DataFrame(contrib_prep_data) | |
contrib_bar_fig = px.bar( | |
contrib_df, | |
x="Username", | |
y="Contributions", | |
title="Top Contributors" | |
) | |
contrib_bar = vzm.Graph(figure=contrib_bar_fig) | |
overview_components.append(contrib_bar) # Add the Graph component directly (Card takes text, not children) | |
all_components['contrib_bar'] = overview_components[-1] | |
else: | |
print("No valid contributor data to plot.") | |
else: | |
print("No contributor data found or data is not a list.") | |
# Add overview page if components exist | |
if overview_components: | |
pages.append( | |
vzm.Page( | |
title="Overview", | |
components=overview_components, | |
path="overview" # Add unique path | |
) | |
) | |
else: | |
print("Skipping Overview page: No components generated.") | |
except Exception as e: | |
print(f"Error building Overview page: {e}") | |
# Optionally add an error card to the dashboard | |
# overview_components.append(vzm.Card(text=f"Error building overview: {e}")) | |
# --- 2. Activity Page --- | |
print("Building Activity Page...") | |
activity_components = [] | |
try: | |
# Commit activity over time | |
weekly_commits = temporal_analysis.get("weekly_commits", []) | |
if isinstance(weekly_commits, list) and weekly_commits: | |
commits_prep_data = [] | |
for week in weekly_commits: | |
if isinstance(week, dict) and 'date' in week and 'total' in week: | |
try: | |
# Validate date and convert total to int | |
date_val = pd.to_datetime(week['date']) | |
commits_val = int(week['total']) | |
commits_prep_data.append({"Date": date_val, "Commits": commits_val}) | |
except (ValueError, TypeError): | |
continue # Skip invalid entries | |
if commits_prep_data: | |
commits_df = pd.DataFrame(commits_prep_data) | |
if not commits_df.empty: | |
commits_line_fig = px.line( | |
commits_df, | |
x="Date", | |
y="Commits", | |
title="Weekly Commit Activity" | |
) | |
commits_line = vzm.Graph(figure=commits_line_fig) | |
activity_components.append(commits_line) # Add the Graph component directly | |
all_components['commits_line'] = activity_components[-1] | |
else: | |
print("No valid commit data to plot.") | |
else: | |
print("No weekly commit data found or data is not a list.") | |
# Code changes over time | |
weekly_code_changes = temporal_analysis.get("weekly_code_changes", []) | |
if isinstance(weekly_code_changes, list) and weekly_code_changes: | |
changes_prep_data = [] | |
for week in weekly_code_changes: | |
if isinstance(week, dict) and 'date' in week and 'additions' in week and 'deletions' in week: | |
try: | |
date_val = pd.to_datetime(week['date']) | |
additions_val = int(week['additions']) | |
deletions_val = int(week['deletions']) | |
changes_prep_data.append({ | |
"Date": date_val, | |
"Additions": additions_val, | |
"Deletions": -abs(deletions_val) # Make negative for relative bar chart | |
}) | |
except (ValueError, TypeError): | |
continue # Skip invalid entries | |
if changes_prep_data: | |
changes_df = pd.DataFrame(changes_prep_data) | |
if not changes_df.empty: | |
changes_fig = go.Figure() | |
changes_fig.add_trace(go.Bar( | |
x=changes_df["Date"], y=changes_df["Additions"], name="Additions", marker_color="green" | |
)) | |
changes_fig.add_trace(go.Bar( | |
x=changes_df["Date"], y=changes_df["Deletions"], name="Deletions", marker_color="red" | |
)) | |
changes_fig.update_layout(title="Weekly Code Changes", barmode="relative", xaxis_title="Date", yaxis_title="Lines Changed") | |
changes_chart = vzm.Graph(figure=changes_fig) | |
activity_components.append(changes_chart) # Add the Graph component directly | |
all_components['changes_chart'] = activity_components[-1] | |
else: | |
print("No valid code change data to plot.") | |
else: | |
print("No weekly code change data found or data is not a list.") | |
# Issue resolution times | |
issue_timeline = temporal_analysis.get("issue_timeline", {}) | |
if isinstance(issue_timeline, dict): | |
resolution_times = issue_timeline.get('resolution_times', []) | |
if isinstance(resolution_times, list) and resolution_times: | |
# Convert to hours safely, cap at one week (168 hours) | |
rt_hours = [] | |
for rt in resolution_times: | |
if isinstance(rt, (int, float)) and rt >= 0: | |
rt_hours.append(min(rt, 168)) | |
if rt_hours: # Check if we have valid data after cleaning | |
rt_hours_array = np.array(rt_hours) # For numpy functions | |
issue_resolution_fig = px.histogram( | |
x=rt_hours_array, | |
title="Issue Resolution Times (Capped at 1 Week)", | |
labels={"x": "Hours to Resolution"} | |
) | |
mean_rt = np.mean(rt_hours_array) | |
median_rt = np.median(rt_hours_array) | |
issue_resolution_fig.add_vline(x=mean_rt, line_dash="dash", line_color="red", annotation_text=f"Mean: {mean_rt:.2f} hrs") | |
issue_resolution_fig.add_vline(x=median_rt, line_dash="dash", line_color="green", annotation_text=f"Median: {median_rt:.2f} hrs") | |
resolution_hist = vzm.Graph(figure=issue_resolution_fig) | |
activity_components.append(resolution_hist) # Add the Graph component directly | |
all_components['issue_res_hist'] = activity_components[-1] | |
else: | |
print("No valid numeric issue resolution times found.") | |
else: | |
print("No issue resolution times found or data is not a list.") | |
else: | |
print("Issue timeline data is not a dictionary.") | |
# Add activity page if components exist | |
if activity_components: | |
pages.append( | |
vzm.Page( | |
title="Activity", | |
components=activity_components, | |
path="activity" # Add unique path | |
) | |
) | |
else: | |
print("Skipping Activity page: No components generated.") | |
except Exception as e: | |
print(f"Error building Activity page: {e}") | |
# --- 3. Code Quality Page --- | |
print("Building Code Quality Page...") | |
code_components = [] | |
try: | |
# Code complexity metrics | |
cyclomatic_complexity = complexity_metrics.get("cyclomatic_complexity", []) | |
if isinstance(cyclomatic_complexity, list) and cyclomatic_complexity: | |
complexity_prep_data = [] | |
for item in cyclomatic_complexity: | |
if isinstance(item, (list, tuple)) and len(item) == 2: | |
path, cc = item | |
if isinstance(path, str) and isinstance(cc, (int, float)): | |
complexity_prep_data.append({ | |
"File": os.path.basename(path), | |
"Path": path, | |
"Complexity": cc | |
}) | |
if complexity_prep_data: | |
complexity_prep_data.sort(key=lambda x: x["Complexity"], reverse=True) | |
top_complex_files = complexity_prep_data[:15] # Show top 15 | |
complex_df = pd.DataFrame(top_complex_files) | |
if not complex_df.empty: | |
complex_bar_fig = px.bar( | |
complex_df, x="File", y="Complexity", title="Most Complex Files (Top 15)", hover_data=["Path"] | |
) | |
complex_bar = vzm.Graph(figure=complex_bar_fig) | |
code_components.append(complex_bar) # Add the Graph component directly | |
all_components['complex_bar'] = code_components[-1] | |
# Complexity histogram (using all valid data) | |
cc_values = [d["Complexity"] for d in complexity_prep_data] | |
if cc_values: | |
cc_hist_fig = px.histogram( | |
x=cc_values, title="Cyclomatic Complexity Distribution", labels={"x": "Complexity"} | |
) | |
cc_hist = vzm.Graph(figure=cc_hist_fig) | |
code_components.append(cc_hist) # Add the Graph component directly | |
all_components['cc_hist'] = code_components[-1] | |
else: | |
print("No valid cyclomatic complexity data found.") | |
else: | |
print("No cyclomatic complexity data found or data is not a list.") | |
# Comment ratio by file | |
comment_ratios = complexity_metrics.get("comment_ratios", []) | |
if isinstance(comment_ratios, list) and comment_ratios: | |
comment_prep_data = [] | |
for item in comment_ratios: | |
if isinstance(item, (list, tuple)) and len(item) == 2: | |
path, ratio = item | |
if isinstance(path, str) and isinstance(ratio, (int, float)) and ratio >= 0: | |
comment_prep_data.append({ | |
"File": os.path.basename(path), | |
"Path": path, | |
"Comment Ratio": ratio | |
}) | |
if comment_prep_data: | |
comment_prep_data.sort(key=lambda x: x["Comment Ratio"], reverse=True) | |
top_commented_files = comment_prep_data[:15] # Show top 15 | |
comment_df = pd.DataFrame(top_commented_files) | |
if not comment_df.empty: | |
comment_bar_fig = px.bar( | |
comment_df, x="File", y="Comment Ratio", title="Files with Highest Comment Ratio (Top 15)", hover_data=["Path"] | |
) | |
comment_bar = vzm.Graph(figure=comment_bar_fig) | |
code_components.append(comment_bar) # Add the Graph component directly | |
all_components['comment_bar'] = code_components[-1] | |
else: | |
print("No valid comment ratio data found.") | |
else: | |
print("No comment ratio data found or data is not a list.") | |
# Add code quality page if components exist | |
if code_components: | |
pages.append( | |
vzm.Page( | |
title="Code Quality", | |
components=code_components, | |
path="code_quality" # Add unique path | |
) | |
) | |
else: | |
print("Skipping Code Quality page: No components generated.") | |
except Exception as e: | |
print(f"Error building Code Quality page: {e}") | |
# --- 4. Dependencies Page --- | |
print("Building Dependencies Page...") | |
dependencies_components = [] | |
try: | |
# External dependencies | |
external_deps = dependencies_data.get("external", {}) | |
if isinstance(external_deps, dict) and external_deps: | |
ext_counts = Counter() | |
for file_path, deps_set in external_deps.items(): | |
if isinstance(deps_set, (set, list)): # Handle set or list | |
for dep in deps_set: | |
if isinstance(dep, str): # Ensure dep is string | |
ext_counts[dep] += 1 | |
if ext_counts: | |
top_deps = ext_counts.most_common(15) # Show top 15 | |
deps_prep_data = [{"Package": pkg, "Count": count} for pkg, count in top_deps] | |
deps_df = pd.DataFrame(deps_prep_data) | |
if not deps_df.empty: | |
deps_bar_fig = px.bar( | |
deps_df, x="Package", y="Count", title="Most Used External Dependencies (Top 15)" | |
) | |
deps_bar = vzm.Graph(figure=deps_bar_fig) | |
dependencies_components.append(deps_bar) # Add the Graph component directly | |
all_components['deps_bar'] = dependencies_components[-1] | |
else: | |
print("No external dependency data counted.") | |
else: | |
print("No external dependency data found or data is not a dictionary.") | |
# Internal dependencies graph (only for smaller graphs) | |
internal_deps = dependencies_data.get("internal", {}) | |
if isinstance(internal_deps, dict) and internal_deps: | |
num_nodes_internal = len(set(internal_deps.keys()) | set(d for deps in internal_deps.values() for d in deps)) | |
if num_nodes_internal <= 75: # Increased limit slightly | |
print(f"Attempting internal dependency graph ({num_nodes_internal} nodes)...") | |
try: | |
# Create NetworkX graph | |
G = nx.DiGraph() | |
nodes_added = set() | |
for source, targets in internal_deps.items(): | |
if isinstance(source, str): | |
source_name = os.path.basename(source) | |
if source not in nodes_added: | |
G.add_node(source, name=source_name) | |
nodes_added.add(source) | |
if isinstance(targets, (set, list)): | |
for target in targets: | |
if isinstance(target, str): | |
target_name = os.path.basename(target) | |
if target not in nodes_added: | |
G.add_node(target, name=target_name) | |
nodes_added.add(target) | |
# Add edge only if both nodes were added successfully | |
if source in G and target in G: | |
G.add_edge(source, target) | |
if G.number_of_nodes() > 0 and G.number_of_edges() > 0: | |
# Get position layout | |
pos = nx.spring_layout(G, seed=42, k=0.6, iterations=50) # Adjust layout params | |
# Create graph visualization | |
edge_x, edge_y = [], [] | |
for edge in G.edges(): | |
x0, y0 = pos[edge[0]] | |
x1, y1 = pos[edge[1]] | |
edge_x.extend([x0, x1, None]) | |
edge_y.extend([y0, y1, None]) | |
edge_trace = go.Scatter(x=edge_x, y=edge_y, line=dict(width=0.5, color='#888'), hoverinfo='none', mode='lines') | |
node_x, node_y, node_text, node_color_val = [], [], [], [] | |
for node in G.nodes(): | |
x, y = pos[node] | |
node_x.append(x) | |
node_y.append(y) | |
node_text.append(G.nodes[node].get('name', node)) | |
degree = G.degree(node) # Use degree for size/color | |
node_color_val.append(degree) | |
node_trace = go.Scatter( | |
x=node_x, y=node_y, mode='markers+text', hoverinfo='text', text=node_text, | |
textposition="top center", textfont=dict(size=8, color='black'), | |
marker=dict(showscale=True, colorscale='YlGnBu', size=10, color=node_color_val, | |
colorbar=dict(thickness=15, title=dict(text='Node Degree', side='right'), xanchor='left'))  # 'titleside' is deprecated; use colorbar.title.side | |
) | |
dep_fig = go.Figure(data=[edge_trace, node_trace], | |
layout=go.Layout( | |
title='Internal File Dependency Network (Nodes <= 75)', showlegend=False, hovermode='closest', | |
margin=dict(b=20,l=5,r=5,t=40), | |
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False), | |
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False)) | |
) | |
dep_graph_viz = vzm.Graph(figure=dep_fig) | |
dependencies_components.append(dep_graph_viz) # Add the Graph component directly | |
all_components['dep_graph'] = dependencies_components[-1] | |
else: | |
print("Internal dependency graph has no nodes or edges after processing.") | |
except Exception as e_graph: | |
print(f"Error generating internal dependency network visualization: {e_graph}") | |
else: | |
print(f"Skipping internal dependency graph: Too large ({num_nodes_internal} nodes).") | |
else: | |
print("No internal dependency data found or data is not a dictionary.") | |
# Add dependencies page if components exist | |
if dependencies_components: | |
pages.append( | |
vzm.Page( | |
title="Dependencies", | |
components=dependencies_components, | |
path="dependencies" # Add unique path | |
) | |
) | |
else: | |
print("Skipping Dependencies page: No components generated.") | |
except Exception as e: | |
print(f"Error building Dependencies page: {e}") | |
# --- Create the dashboard --- | |
if not pages: | |
print("No pages were generated for the dashboard. Cannot create dashboard.") | |
return None | |
try: | |
# Define navigation if multiple pages exist | |
navigation = None | |
if len(pages) > 1: | |
navigation=vzm.Navigation(pages=[p.title for p in pages]) # Use titles for navigation links | |
dashboard = vzm.Dashboard( | |
title=f"GitHub Analysis: {repo_name}", | |
pages=pages, | |
navigation=navigation | |
) | |
# Note: no file is written to disk here; the built dashboard object is returned so the | |
# caller decides how to serve it (see the standalone sketch after this method). | |
# Build dashboard (required before getting HTML string or running) | |
print("Building dashboard...") | |
vizro.Vizro().build(dashboard) # build() is a method of a Vizro() instance, not a module-level function | |
print("Dashboard built.") | |
# We don't save to file here, Gradio will handle serving if we run it | |
# Instead of returning path, maybe return the dashboard object or indicate success | |
return dashboard # Return the dashboard object for potential further use | |
except Exception as e: | |
print(f"Error creating or building Vizro dashboard object: {e}") | |
import traceback | |
traceback.print_exc() | |
return None | |
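# Standalone serving sketch (not called anywhere in this file): shows how the dashboard object | |
# returned by create_vizro_dashboard() could be served outside Gradio. It assumes the standard | |
# Vizro flow, Vizro().build(dashboard).run(), which starts a local Dash server; owner/repo and | |
# the optional token are placeholders supplied by the caller. | |
def run_vizro_dashboard_standalone(owner, repo, github_token=None): | |
    """Analyze a repository and serve its Vizro dashboard locally (sketch).""" | |
    if vizro is None or px is None or go is None: | |
        print("Vizro/Plotly not installed; cannot serve a dashboard.") | |
        return | |
    analyzer = GraphRepoAnalyzer(github_token=github_token) | |
    analyzer.analyze_repo(owner, repo, display=False, save_json=False, export_text=False) | |
    dashboard = analyzer.create_vizro_dashboard() | |
    if dashboard: | |
        vizro.Vizro().build(dashboard).run()  # serves a local Dash server (port 8050 by default) | |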
# --- Gradio Interface --- | |
def create_gradio_interface(): | |
"""Create a Gradio interface for the GitHub repository analyzer.""" | |
if not gr: | |
print("Gradio library not found. Cannot create interface.") | |
return None | |
# Shared state to store the analyzer instance | |
analyzer_instance = None | |
def analyze_repository_gradio(owner, repo, github_token=None, neo4j_uri=None, neo4j_user=None, neo4j_password=None, gemini_api_key=None): | |
"""Gradio callback function to analyze a repository.""" | |
nonlocal analyzer_instance | |
print(f"\n--- Gradio: analyze_repository_gradio called for {owner}/{repo} ---") | |
report = f"Starting analysis for {owner}/{repo}...\n" | |
dashboard_html_content = "" # Default empty dashboard | |
try: | |
# Ensure owner and repo are provided | |
if not owner or not repo: | |
report += "\nError: Please provide both Repository Owner and Name." | |
yield report # Surface the error in the UI; a plain `return value` from a generator is discarded | |
return | |
# Instantiate the analyzer (or reuse if desired, but new instance is safer for credentials) | |
# Pass credentials safely, using None if empty string | |
analyzer_instance = GraphRepoAnalyzer( | |
github_token=github_token if github_token else None, | |
neo4j_uri=neo4j_uri if neo4j_uri else None, | |
neo4j_user=neo4j_user if neo4j_user else None, | |
neo4j_password=neo4j_password if neo4j_password else None, | |
gemini_api_key=gemini_api_key if gemini_api_key else None | |
) | |
report += f"Analyzer initialized for {owner}/{repo}.\n" | |
yield report # Update the report output (the only wired Gradio output) | |
# Analyze repository (this prints logs to console) | |
# Set display=False as we handle output via Gradio components | |
# Set save/export to False unless specifically controlled via UI | |
analyzer_instance.analyze_repo(owner, repo, display=False, save_json=False, export_text=False) | |
# Check if analysis was successful | |
if not analyzer_instance.repo_data: | |
report += f"\nError: Failed to analyze repository: {owner}/{repo}. Check console logs for details (e.g., invalid name, token issues, rate limits)." | |
# analyzer_instance remains None or has no data | |
yield report # Surface the error report, then stop the generator | |
return | |
report += f"\nAnalysis complete for {analyzer_instance.repo_full_name}.\nGenerating dashboard and report...\n" | |
yield report # Update UI | |
# --- Generate Report String --- | |
try: | |
basic_info = analyzer_instance.repo_data.get("basic_info", {}) | |
report += f""" | |
### Repository Analysis: {basic_info.get('full_name', 'N/A')} | |
**Description:** {basic_info.get('description', 'No description')} | |
**Statistics:** | |
- Stars: {basic_info.get('stargazers_count', 'N/A')} | |
- Forks: {basic_info.get('forks_count', 'N/A')} | |
- Open Issues: {basic_info.get('open_issues_count', 'N/A')} | |
""" | |
# Add language info safely | |
languages = analyzer_instance.repo_data.get("languages") | |
if isinstance(languages, dict) and languages: | |
report += "**Language Summary:**\n" | |
total = sum(v for v in languages.values() if isinstance(v, (int, float))) | |
if total > 0: | |
# Sort languages by percentage | |
lang_items = [] | |
for lang, b_count in languages.items(): | |
if isinstance(b_count, (int, float)) and b_count > 0: | |
lang_items.append((lang, (b_count / total) * 100)) | |
# Sort descending by percentage | |
lang_items.sort(key=lambda item: item[1], reverse=True) | |
for lang, percentage in lang_items[:5]: # Show top 5 | |
report += f"- {lang}: {percentage:.1f}%\n" | |
if len(lang_items) > 5: | |
report += "- ... (other languages)\n" | |
else: | |
report += "- (No valid language byte counts found)\n" | |
else: | |
report += "**Language Summary:** Not available.\n" | |
# Add code metrics if available | |
text_content = analyzer_instance.repo_data.get("text_content", {}) | |
agg_metrics = text_content.get("aggregate_metrics") | |
if isinstance(agg_metrics, dict): | |
report += f""" | |
**Code Metrics (Approximate):** | |
- Text Files Analyzed: {agg_metrics.get('total_files_analyzed', 'N/A')} | |
- Code Files Summarized: {agg_metrics.get('code_files_summarized', 'N/A')} | |
- Total Code Lines: {agg_metrics.get('total_code_lines', 'N/A')} | |
- Comment Ratio: {agg_metrics.get('average_comment_ratio', -1):.2f} | |
""" # Use -1 or similar to indicate if ratio couldn't be calculated | |
else: | |
report += "\n**Code Metrics:** Not available.\n" | |
except Exception as report_err: | |
print(f"Error generating report section: {report_err}") | |
report += f"\nError generating parts of the report: {report_err}" | |
# --- Generate Dashboard --- | |
# Use a temporary directory for Gradio deployment | |
dashboard_dir = f"./gradio_dashboards/{owner}_{repo}" | |
dashboard_obj = analyzer_instance.create_vizro_dashboard(output_dir=dashboard_dir) | |
if dashboard_obj: | |
# Instead of saving, we want to serve it. Vizro doesn't directly give HTML string easily. | |
# Option 1: Save to file and load into IFrame (might have security issues / path issues) | |
# dashboard_path = os.path.join(dashboard_dir, 'dashboard.html') # create_vizro_dashboard doesn't save anymore | |
# vizro.run() # This blocks and runs a server - not ideal for embedding | |
# For Gradio, the best approach is often *not* to embed Vizro directly, | |
# but rather extract the Plotly figures and display them using gr.Plot. | |
# Let's try a simpler approach first: generate static plots for Gradio. | |
# --- Alternative: Generate static plots for Gradio --- | |
# This avoids Vizro complexity within Gradio's environment for now. | |
# We will just return the report. The dashboard creation is still useful if run standalone. | |
report += "\n\n**Dashboard Note:** Interactive dashboard generation logic exists but embedding Vizro directly in Gradio is complex. The dashboard can be generated by running the script standalone." | |
print("Dashboard object created, but not embedding in Gradio output for simplicity.") | |
# dashboard_html_content = f'<p>Vizro dashboard created but cannot be directly embedded here. Run script standalone.</p>' | |
# Option 2: If you *really* need to embed, save and use an iframe (less reliable) | |
# try: | |
# dashboard_path_rel = os.path.join(dashboard_dir, 'dashboard.html') | |
# dashboard_path_abs = os.path.abspath(dashboard_path_rel) | |
# # Vizro's save method implicitly builds and saves | |
# dashboard_obj.save(dashboard_path_abs) | |
# print(f"Dashboard saved to: {dashboard_path_abs}") | |
# # IMPORTANT: Gradio needs to be able to access this path. | |
# # This might only work if Gradio serves from the same root or paths are configured. | |
# # Use relative path for iframe src if possible, requires Gradio server setup. | |
# # Using absolute file URI might work locally but not when deployed. | |
# # dashboard_html_content = f'<iframe src="file:///{dashboard_path_abs}" width="100%" height="600px" style="border:none;"></iframe>' | |
# # Safer: Provide a link | |
# dashboard_html_content = f'<p>Dashboard saved to: <a href="file:///{dashboard_path_abs}" target="_blank">{dashboard_path_abs}</a> (Link may only work locally)</p>' | |
# report += f"\n\n**Dashboard:** Saved locally. See link below (may only work on the server machine)." | |
# except Exception as vizro_save_err: | |
# print(f"Error saving Vizro dashboard: {vizro_save_err}") | |
# report += f"\n\n**Dashboard:** Error saving dashboard: {vizro_save_err}" | |
# dashboard_html_content = f'<p>Error saving dashboard: {vizro_save_err}</p>' | |
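# Option 3 (sketch, not wired up here): skip Vizro inside Gradio and hand individual Plotly | |
# figures to gr.Plot components instead, e.g. an extra output `plot_output = gr.Plot()` in the UI | |
# and a figure built from the already-extracted data: | |
#   langs = analyzer_instance.repo_data.get("languages") or {} | |
#   fig = px.pie(names=list(langs.keys()), values=list(langs.values()), title="Language Distribution") | |
#   yield report, fig  # with outputs=[report_output, plot_output] | |
# The names above (plot_output, fig) are illustrative only. | |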
else: | |
report += "\n\n**Dashboard:** Failed to generate dashboard object." | |
dashboard_html_content = "<p>Failed to generate dashboard.</p>" | |
print("--- Gradio analysis function finished ---") | |
yield report # Final update | |
except Exception as e: | |
print(f"--- Error in analyze_repository_gradio for {owner}/{repo} ---") | |
import traceback | |
traceback.print_exc() | |
report += f"\n\nCritical Error during analysis: {str(e)}" | |
# Ensure analyzer_instance is cleared if it failed early | |
analyzer_instance = None | |
# Yield the error report; a generator must yield (not return) a value for Gradio to display it | |
yield report | |
def summarize_pr_gradio(owner, repo, pr_number_str, role, github_token=None, gemini_api_key=None): | |
"""Gradio callback function to summarize a PR.""" | |
print(f"\n--- Gradio: summarize_pr_gradio called for PR #{pr_number_str} in {owner}/{repo} ---") | |
summary_output = "Starting PR summarization...\n" | |
try: | |
# Ensure owner, repo, pr_number, role, and gemini_key are provided | |
required = {"owner": owner, "repo": repo, "pr_number_str": pr_number_str, "role": role, "gemini_api_key": gemini_api_key}  # explicit mapping is clearer and safer than inspecting locals() | |
if not all(required.values()): | |
missing = [name for name, val in required.items() if not val] | |
summary_output += f"Error: Please provide all required fields (Missing: {', '.join(missing)})." | |
return summary_output | |
# --- Use a temporary analyzer instance for PR summary --- | |
# This avoids issues if the main analysis failed or used different credentials | |
# We only need GitHub and Gemini parts for this. | |
pr_analyzer = GraphRepoAnalyzer( | |
github_token=github_token if github_token else None, | |
gemini_api_key=gemini_api_key # Required | |
) | |
if not pr_analyzer.github_analyzer: | |
summary_output += "Error: Could not initialize GitHub analyzer (check token/installation)." | |
return summary_output | |
if not pr_analyzer.gemini_model: | |
summary_output += "Error: Could not initialize Gemini model (check API key/installation)." | |
return summary_output | |
# Set repo context for the analyzer | |
pr_analyzer.owner = owner | |
pr_analyzer.repo = repo | |
pr_analyzer.repo_full_name = f"{owner}/{repo}" | |
# Call the summarize_pull_request method (which now returns text) | |
summary = pr_analyzer.summarize_pull_request(pr_number_str, role) # Handles int conversion and validation internally | |
# summarize_pull_request returns the summary text or an error message | |
summary_output = summary # Assign the result directly | |
print("--- Gradio PR summary function finished ---") | |
return summary_output | |
except Exception as e: | |
print(f"--- Error in summarize_pr_gradio ---") | |
import traceback | |
traceback.print_exc() | |
summary_output += f"\n\nCritical Error during PR summarization: {str(e)}" | |
return summary_output | |
# --- Define Gradio UI --- | |
with gr.Blocks(title="GitHub Repository Analyzer", theme=gr.themes.Soft()) as app: | |
gr.Markdown("# GitHub Repository Analyzer & PR Summarizer") | |
gr.Markdown("Analyze GitHub repositories using GitHub API, generate reports, and summarize Pull Requests using Google Gemini.") | |
with gr.Tab("Repository Analysis"): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### Repository Input") | |
owner_input = gr.Textbox(label="Repository Owner", placeholder="e.g., 'google' or 'openai'") | |
repo_input = gr.Textbox(label="Repository Name", placeholder="e.g., 'guetzli' or 'whisper'") | |
gr.Markdown("### Credentials (Optional / Recommended)") | |
github_token = gr.Textbox(label="GitHub Token", type="password", placeholder="Enter personal access token (optional, increases rate limit)") | |
with gr.Accordion("Advanced Settings (Neo4j/Gemini - Optional for Analysis)", open=False): | |
neo4j_uri = gr.Textbox(label="Neo4j URI", placeholder="bolt://localhost:7687") | |
neo4j_user = gr.Textbox(label="Neo4j Username", placeholder="neo4j") | |
neo4j_password = gr.Textbox(label="Neo4j Password", type="password") | |
# Gemini key needed here if we add repo Q&A later | |
# gemini_api_key_analysis = gr.Textbox(label="Google API Key (for Repo Q&A)", type="password", placeholder="Enter Google API Key (if using Repo Q&A)") | |
analyze_btn = gr.Button("Analyze Repository", variant="primary") | |
with gr.Column(scale=2): | |
gr.Markdown("### Analysis Output") | |
report_output = gr.Markdown(label="Analysis Report", value="Analysis results will appear here...") | |
# Removed dashboard HTML output as direct embedding is unreliable | |
# dashboard_output = gr.HTML(label="Dashboard Preview") # Keep if attempting iframe later | |
# Wire the button click event | |
analyze_btn.click( | |
analyze_repository_gradio, | |
inputs=[ | |
owner_input, repo_input, github_token, | |
neo4j_uri, neo4j_user, neo4j_password, | |
# gemini_api_key_analysis # Pass if adding Repo Q&A | |
], | |
# Output only the report for now | |
outputs=[report_output] # Removed dashboard_output | |
) | |
with gr.Tab("PR Summarizer (Requires Gemini)"): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### PR Input") | |
pr_owner_input = gr.Textbox(label="Repository Owner", placeholder="Owner of the repo containing the PR") | |
pr_repo_input = gr.Textbox(label="Repository Name", placeholder="Name of the repo containing the PR") | |
pr_number_input = gr.Textbox(label="PR Number", placeholder="e.g., 123") # Use Textbox for flexibility | |
pr_role_input = gr.Dropdown( | |
choices=["Developer", "Manager", "Team Lead", "Product Owner", "Program Manager", "General"], | |
label="Your Role (Tailors Summary)", | |
value="Developer" # Default value | |
) | |
gr.Markdown("### Credentials") | |
pr_github_token = gr.Textbox(label="GitHub Token (Optional)", type="password", placeholder="Needed for private repos or high rate limits") | |
pr_gemini_api_key = gr.Textbox(label="Google API Key (Required)", type="password", placeholder="Enter Google API Key for Gemini") | |
summarize_btn = gr.Button("Summarize PR", variant="primary") | |
with gr.Column(scale=2): | |
gr.Markdown("### PR Summary Output") | |
pr_summary_output = gr.Markdown(label="Gemini PR Summary", value="PR Summary will appear here...") | |
# Wire the button click event | |
summarize_btn.click( | |
summarize_pr_gradio, | |
inputs=[ | |
pr_owner_input, pr_repo_input, pr_number_input, | |
pr_role_input, pr_github_token, pr_gemini_api_key | |
], | |
outputs=pr_summary_output | |
) | |
return app | |
# Main function to run the app | |
def main(): | |
"""Run the GitHub Repository Analyzer with Gradio interface.""" | |
# Load environment variables (optional, credentials can be entered in UI) | |
load_dotenv() | |
print("Starting Gradio application...") | |
# Check if Gradio is available before launching | |
if not gr: | |
print("Gradio library is not available. Cannot launch UI.") | |
return | |
# Create and launch the Gradio interface | |
try: | |
app = create_gradio_interface() | |
if app: | |
# Set share=False for local testing, share=True to create public link (use with caution) | |
# Set debug=True for more detailed logs during development | |
app.launch(share=False, debug=True) | |
else: | |
print("Failed to create Gradio interface.") | |
except Exception as e: | |
print(f"Error launching Gradio app: {e}") | |
import traceback | |
traceback.print_exc() | |
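# Deployment note (sketch): when hosting this script on a platform such as Hugging Face Spaces, | |
# Gradio usually needs to bind to all interfaces rather than the localhost default, e.g. | |
#   app.launch(server_name="0.0.0.0", server_port=7860) | |
# The launch() call in main() above uses the local defaults. | |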
if __name__ == "__main__": | |
# Add basic checks for critical libraries before running main | |
if None in [gr, pd, np, requests, nx]: | |
missing = [] | |
if not gr: missing.append("gradio") | |
if not pd: missing.append("pandas") | |
if not np: missing.append("numpy") | |
if not requests: missing.append("requests") | |
if not nx: missing.append("networkx") | |
print(f"Error: Missing critical libraries: {', '.join(missing)}. Please install them.") | |
print("e.g., pip install gradio pandas numpy requests networkx PyGithub neo4j google-generativeai vizro vizro-plotly plotly python-dotenv radon matplotlib") | |
else: | |
main() |