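"""GitHub Repository Analyzer.

Fetches repository metadata, activity, and code metrics from the GitHub API,
builds a Vizro dashboard and a PyVis dependency network, optionally summarizes
pull requests with Google Gemini, and serves everything through a Gradio UI.
"""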
import requests
import json
import os
import base64
import re
import ast
import networkx as nx
import radon.metrics as metrics
import radon.complexity as complexity
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import pandas as pd
import numpy as np
from github import Github, GithubException
import time
from dotenv import load_dotenv
# Visualization imports
import vizro.plotly.express as px
import vizro
import vizro.models as vzm
import plotly.graph_objects as go
import gradio as gr
from pyvis.network import Network
# Google Gemini AI (optional)
try:
import google.generativeai as genai
GEMINI_AVAILABLE = True
except ImportError:
GEMINI_AVAILABLE = False
print("Google Generative AI package not found. PR summarization feature will be disabled.")
class GitHubRepoInfo:
"""Enhanced class to get comprehensive information about a GitHub repository."""
def __init__(self, token=None):
"""Initialize with optional GitHub API token."""
self.base_url = "https://api.github.com"
self.headers = {"Accept": "application/vnd.github.v3+json"}
self.token = token
self.github = None # Initialize github attribute
# Set up authentication
if token:
self.headers["Authorization"] = f"token {token}"
try:
self.github = Github(token)
self.github.get_user().login # Test connection
except Exception as e:
print(f"Warning: Failed to initialize PyGithub with token: {e}")
self.github = Github() # Fallback to unauthenticated
elif os.environ.get("GITHUB_TOKEN"):
self.token = os.environ.get("GITHUB_TOKEN")
self.headers["Authorization"] = f"token {self.token}"
try:
self.github = Github(self.token)
self.github.get_user().login # Test connection
except Exception as e:
print(f"Warning: Failed to initialize PyGithub with token: {e}")
self.github = Github() # Fallback to unauthenticated
else:
self.github = Github() # Unauthenticated
        # Configure rate limit handling (GitHub allows ~5000 req/hr authenticated, 60 unauthenticated)
        self.rate_limit_remaining = 5000 if self.token else 60
self.rate_limit_reset = datetime.now()
# Initialize rate limit info if possible
if self.github:
try:
rate_limit = self.github.get_rate_limit()
self.rate_limit_remaining = rate_limit.core.remaining
self.rate_limit_reset = datetime.fromtimestamp(rate_limit.core.reset)
except Exception as e:
print(f"Warning: Could not get initial rate limit from PyGithub: {e}")
def _check_rate_limit(self):
"""Check API rate limit and wait if necessary."""
if self.rate_limit_remaining <= 10:
reset_time = self.rate_limit_reset
current_time = datetime.now()
if reset_time > current_time:
wait_time = (reset_time - current_time).total_seconds() + 10 # Add buffer
print(f"Rate limit nearly exhausted. Waiting {wait_time:.0f} seconds for reset.")
time.sleep(wait_time)
# Update rate limit info after each API call
response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers)
if response.status_code == 200:
rate_data = response.json()
self.rate_limit_remaining = rate_data["resources"]["core"]["remaining"]
self.rate_limit_reset = datetime.fromtimestamp(rate_data["resources"]["core"]["reset"])
def _paginated_get(self, url, params=None, max_items=None):
"""Handle paginated API responses with rate limit awareness."""
if params is None:
params = {}
items = []
page = 1
per_page = min(100, params.get("per_page", 30))
params["per_page"] = per_page
while True:
self._check_rate_limit()
params["page"] = page
response = requests.get(url, headers=self.headers, params=params)
if response.status_code == 200:
page_items = response.json()
if not page_items:
break
items.extend(page_items)
page += 1
# Check if we've reached the requested limit
if max_items and len(items) >= max_items:
return items[:max_items]
# Check if we've reached the end (GitHub returns fewer items than requested)
if len(page_items) < per_page:
break
else:
print(f"Error {response.status_code}: {response.text}")
break
return items
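    # Illustrative use of _paginated_get (repository name is hypothetical):
    #   issues = self._paginated_get(
    #       f"{self.base_url}/repos/octocat/Hello-World/issues",
    #       params={"state": "closed", "per_page": 100},
    #       max_items=200,
    #   )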
def get_repo_info(self, owner, repo):
"""Get basic repository information."""
self._check_rate_limit()
url = f"{self.base_url}/repos/{owner}/{repo}"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
return response.json()
else:
print(f"Error {response.status_code}: {response.text}")
return None
def get_contributors(self, owner, repo, max_contributors=None):
"""Get repository contributors with pagination support."""
url = f"{self.base_url}/repos/{owner}/{repo}/contributors"
return self._paginated_get(url, max_items=max_contributors)
def get_languages(self, owner, repo):
"""Get languages used in the repository."""
self._check_rate_limit()
url = f"{self.base_url}/repos/{owner}/{repo}/languages"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
return response.json()
else:
print(f"Error getting languages: {response.status_code}")
return {}
def get_commits(self, owner, repo, params=None, max_commits=None):
"""Get commits with enhanced filtering and pagination."""
url = f"{self.base_url}/repos/{owner}/{repo}/commits"
return self._paginated_get(url, params=params, max_items=max_commits)
def get_commit_activity(self, owner, repo):
"""Get commit activity stats for the past year."""
self._check_rate_limit()
url = f"{self.base_url}/repos/{owner}/{repo}/stats/commit_activity"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
return response.json()
elif response.status_code == 202:
# GitHub is computing the statistics, wait and retry
print("GitHub is computing statistics, waiting and retrying...")
time.sleep(2)
return self.get_commit_activity(owner, repo)
else:
print(f"Error getting commit activity: {response.status_code}")
return []
def get_code_frequency(self, owner, repo):
"""Get weekly code addition and deletion statistics."""
self._check_rate_limit()
url = f"{self.base_url}/repos/{owner}/{repo}/stats/code_frequency"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
return response.json()
elif response.status_code == 202:
# GitHub is computing the statistics, wait and retry
print("GitHub is computing statistics, waiting and retrying...")
time.sleep(2)
return self.get_code_frequency(owner, repo)
else:
print(f"Error getting code frequency: {response.status_code}")
return []
def get_contributor_activity(self, owner, repo):
"""Get contributor commit activity over time."""
self._check_rate_limit()
url = f"{self.base_url}/repos/{owner}/{repo}/stats/contributors"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
return response.json()
elif response.status_code == 202:
# GitHub is computing the statistics, wait and retry
print("GitHub is computing statistics, waiting and retrying...")
time.sleep(2)
return self.get_contributor_activity(owner, repo)
else:
print(f"Error getting contributor activity: {response.status_code}")
return []
def get_branches(self, owner, repo):
"""Get repository branches."""
url = f"{self.base_url}/repos/{owner}/{repo}/branches"
return self._paginated_get(url)
def get_releases(self, owner, repo, max_releases=None):
"""Get repository releases with pagination support."""
url = f"{self.base_url}/repos/{owner}/{repo}/releases"
return self._paginated_get(url, max_items=max_releases)
def get_issues(self, owner, repo, state="all", max_issues=None, params=None):
"""Get repository issues with enhanced filtering."""
url = f"{self.base_url}/repos/{owner}/{repo}/issues"
if params is None:
params = {}
params["state"] = state
return self._paginated_get(url, params=params, max_items=max_issues)
def get_issue_timeline(self, owner, repo, days_back=180):
"""Analyze issue creation and closing over time."""
        # Get issues including closed ones; the issues endpoint also returns pull
        # requests (identified by a 'pull_request' key), so filter those out.
        issues = self.get_issues(owner, repo, state="all")
        issues = [issue for issue in issues if 'pull_request' not in issue]
# Prepare timeline data
end_date = datetime.now()
start_date = end_date - timedelta(days=days_back)
# Initialize daily counters
date_range = pd.date_range(start=start_date, end=end_date)
created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
# Collect issue creation and closing dates
for issue in issues:
created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ')
if created_at >= start_date:
created_counts[created_at.strftime('%Y-%m-%d')] += 1
if issue['state'] == 'closed' and issue.get('closed_at'):
closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
if closed_at >= start_date:
closed_counts[closed_at.strftime('%Y-%m-%d')] += 1
# Calculate resolution times for closed issues
resolution_times = []
for issue in issues:
if issue['state'] == 'closed' and issue.get('closed_at'):
created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ')
closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
resolution_time = (closed_at - created_at).total_seconds() / 3600 # hours
resolution_times.append(resolution_time)
# Calculate issue labels distribution
label_counts = defaultdict(int)
for issue in issues:
for label in issue.get('labels', []):
label_counts[label['name']] += 1
return {
'created': created_counts,
'closed': closed_counts,
'resolution_times': resolution_times,
'labels': dict(label_counts)
}
def get_pull_requests(self, owner, repo, state="all", max_prs=None, params=None):
"""Get repository pull requests with enhanced filtering."""
url = f"{self.base_url}/repos/{owner}/{repo}/pulls"
if params is None:
params = {}
params["state"] = state
return self._paginated_get(url, params=params, max_items=max_prs)
def get_pr_timeline(self, owner, repo, days_back=180):
"""Analyze PR creation, closing, and metrics over time."""
# Get PRs including closed and merged ones
prs = self.get_pull_requests(owner, repo, state="all")
# Prepare timeline data
end_date = datetime.now()
start_date = end_date - timedelta(days=days_back)
# Initialize daily counters
date_range = pd.date_range(start=start_date, end=end_date)
created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
merged_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
# Track metrics
merge_times = []
pr_sizes = []
# Collect PR data
for pr in prs:
created_at = datetime.strptime(pr['created_at'], '%Y-%m-%dT%H:%M:%SZ')
if created_at >= start_date:
created_counts[created_at.strftime('%Y-%m-%d')] += 1
# Get PR size (additions + deletions)
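            # Note: the list-pull-requests endpoint typically omits additions/deletions/
            # changed_files (those come from the single-PR endpoint), so this block may be skipped.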
if pr.get('additions') is not None and pr.get('deletions') is not None:
pr_sizes.append({
'additions': pr['additions'],
'deletions': pr['deletions'],
'total': pr['additions'] + pr['deletions'],
'files_changed': pr.get('changed_files', 0)
})
# Check if PR is closed
            if pr['state'] == 'closed' and pr.get('closed_at'):
closed_at = datetime.strptime(pr['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
if closed_at >= start_date:
closed_counts[closed_at.strftime('%Y-%m-%d')] += 1
# Check if PR was merged
if pr['merged_at']:
merged_at = datetime.strptime(pr['merged_at'], '%Y-%m-%dT%H:%M:%SZ')
if merged_at >= start_date:
merged_counts[merged_at.strftime('%Y-%m-%d')] += 1
# Calculate time to merge
merge_time = (merged_at - created_at).total_seconds() / 3600 # hours
merge_times.append(merge_time)
# Calculate acceptance rate
total_closed = sum(closed_counts.values())
total_merged = sum(merged_counts.values())
acceptance_rate = (total_merged / total_closed) * 100 if total_closed > 0 else 0
return {
'created': created_counts,
'closed': closed_counts,
'merged': merged_counts,
'merge_times': merge_times,
'pr_sizes': pr_sizes,
'acceptance_rate': acceptance_rate
}
def get_contents(self, owner, repo, path="", ref=None):
"""Get repository contents at the specified path."""
self._check_rate_limit()
url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}"
params = {}
if ref:
params["ref"] = ref
response = requests.get(url, headers=self.headers, params=params)
if response.status_code == 200:
return response.json()
else:
print(f"Error getting contents: {response.status_code}")
return []
def get_readme(self, owner, repo, ref=None):
"""Get repository README file."""
self._check_rate_limit()
url = f"{self.base_url}/repos/{owner}/{repo}/readme"
params = {}
if ref:
params["ref"] = ref
response = requests.get(url, headers=self.headers, params=params)
if response.status_code == 200:
data = response.json()
if data.get("content"):
content = base64.b64decode(data["content"]).decode("utf-8")
return {
"name": data["name"],
"path": data["path"],
"content": content
}
return data
else:
print(f"README not found or error: {response.status_code}")
return None
def get_file_content(self, owner, repo, path, ref=None):
"""Get the content of a specific file in the repository."""
self._check_rate_limit()
url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}"
params = {}
if ref:
params["ref"] = ref
response = requests.get(url, headers=self.headers, params=params)
if response.status_code == 200:
data = response.json()
if data.get("content"):
try:
content = base64.b64decode(data["content"]).decode("utf-8")
return content
except UnicodeDecodeError:
return "[Binary file content not displayed]"
return None
else:
print(f"Error getting file content: {response.status_code}")
return None
def is_text_file(self, file_path):
"""Determine if a file is likely a text file based on extension."""
text_extensions = [
'.txt', '.md', '.rst', '.py', '.js', '.html', '.css', '.java', '.c',
'.cpp', '.h', '.hpp', '.json', '.xml', '.yaml', '.yml', '.toml',
'.ini', '.cfg', '.conf', '.sh', '.bat', '.ps1', '.rb', '.pl', '.php',
'.go', '.rs', '.ts', '.jsx', '.tsx', '.vue', '.swift', '.kt', '.scala',
'.groovy', '.lua', '.r', '.dart', '.ex', '.exs', '.erl', '.hrl',
'.clj', '.hs', '.elm', '.f90', '.f95', '.f03', '.sql', '.gitignore',
'.dockerignore', '.env', '.editorconfig', '.htaccess', '.cs', '.ipynb',
'.R', '.Rmd', '.jl', '.fs', '.ml', '.mli', '.d', '.scm', '.lisp',
'.el', '.m', '.mm', '.vb', '.asm', '.s', '.Dockerfile', '.gradle'
]
extension = os.path.splitext(file_path)[1].lower()
return extension in text_extensions
def get_recursive_contents(self, owner, repo, path="", max_depth=3, current_depth=0, max_files=1000, ref=None):
"""Recursively get repository contents with a depth limit and file count limit."""
if current_depth >= max_depth:
return []
contents = self.get_contents(owner, repo, path, ref)
results = []
file_count = 0
for item in contents:
if file_count >= max_files:
break
if item["type"] == "dir":
# For directories, add the directory itself and recursively get contents
dir_item = {
"type": "dir",
"name": item["name"],
"path": item["path"],
"contents": self.get_recursive_contents(
owner, repo, item["path"], max_depth, current_depth + 1,
max_files - file_count, ref
)
}
results.append(dir_item)
else:
# For files, add the file info
results.append({
"type": "file",
"name": item["name"],
"path": item["path"],
"size": item["size"],
"url": item["html_url"]
})
file_count += 1
return results
def get_all_text_files(self, owner, repo, path="", max_files=50, ref=None):
"""Get content of all text files in the repository (with limit)."""
contents = self.get_contents(owner, repo, path, ref)
text_files = []
file_count = 0
# Process current directory
for item in contents:
if file_count >= max_files:
break
if item["type"] == "file" and self.is_text_file(item["name"]):
content = self.get_file_content(owner, repo, item["path"], ref)
if content and content != "[Binary file content not displayed]":
text_files.append({
"name": item["name"],
"path": item["path"],
"content": content
})
file_count += 1
elif item["type"] == "dir":
# Recursively get text files from subdirectories
subdir_files = self.get_all_text_files(
owner, repo, item["path"], max_files - file_count, ref
)
text_files.extend(subdir_files)
file_count += len(subdir_files)
return text_files
def get_documentation_files(self, owner, repo, ref=None):
"""Get documentation files from the repository."""
# Common documentation file paths and directories
doc_paths = [
"docs", "doc", "documentation", "wiki", "CONTRIBUTING.md",
"CONTRIBUTORS.md", "CODE_OF_CONDUCT.md", "SECURITY.md",
"SUPPORT.md", "docs/index.md", "docs/README.md", "docs/getting-started.md",
".github/ISSUE_TEMPLATE", ".github/PULL_REQUEST_TEMPLATE.md"
]
doc_files = []
# Try to get each documentation file/directory
for path in doc_paths:
try:
contents = self.get_contents(owner, repo, path, ref)
# If it's a directory, get all markdown files in it
if isinstance(contents, list):
for item in contents:
if item["type"] == "file" and item["name"].lower().endswith((".md", ".rst", ".txt")):
content = self.get_file_content(owner, repo, item["path"], ref)
if content:
doc_files.append({
"name": item["name"],
"path": item["path"],
"content": content
})
# If it's a file, get its content
elif isinstance(contents, dict) and contents.get("type") == "file":
content = self.get_file_content(owner, repo, path, ref)
if content:
doc_files.append({
"name": contents["name"],
"path": contents["path"],
"content": content
})
            except Exception:
# Path doesn't exist or access issues
continue
return doc_files
def analyze_ast(self, code, file_path):
"""Analyze Python code using AST (Abstract Syntax Tree)."""
if not file_path.endswith('.py'):
return None
try:
tree = ast.parse(code)
# Extract more detailed information using AST
functions = []
classes = []
imports = []
            function_complexities = {}
            # Pre-compute cyclomatic complexity with radon; cc_visit expects source text,
            # not AST nodes. Flatten class methods so they can be looked up by name too.
            cc_by_name = {}
            try:
                for block in complexity.cc_visit(code):
                    cc_by_name[block.name] = block.complexity
                    for method in getattr(block, 'methods', []):
                        cc_by_name[method.name] = method.complexity
            except Exception:
                pass
for node in ast.walk(tree):
# Get function definitions with arguments
if isinstance(node, ast.FunctionDef):
                    args = []
                    defaults = len(node.args.defaults)
                    args_count = len(node.args.args) - defaults
                    # Positional args without defaults
                    for arg in node.args.args[:args_count]:
                        args.append(arg.arg)
                    # Args with defaults (ast.arg always exposes .arg on Python 3)
                    for arg in node.args.args[args_count:]:
                        args.append(f"{arg.arg}=...")
                    # Look up the pre-computed cyclomatic complexity for this function
                    func_complexity = cc_by_name.get(node.name)
                    function_complexities[node.name] = func_complexity
# Get docstring if available
docstring = ast.get_docstring(node)
functions.append({
'name': node.name,
'args': args,
'complexity': func_complexity,
'docstring': docstring
})
# Get class definitions
elif isinstance(node, ast.ClassDef):
methods = []
class_docstring = ast.get_docstring(node)
# Get class methods
for child in node.body:
if isinstance(child, ast.FunctionDef):
                            method_complexity = cc_by_name.get(child.name)
method_docstring = ast.get_docstring(child)
methods.append({
'name': child.name,
'complexity': method_complexity,
'docstring': method_docstring
})
classes.append({
'name': node.name,
'methods': methods,
'docstring': class_docstring
})
# Get imports
elif isinstance(node, ast.Import):
for name in node.names:
imports.append(name.name)
elif isinstance(node, ast.ImportFrom):
module = node.module or ""
for name in node.names:
imports.append(f"{module}.{name.name}")
            # Overall cyclomatic complexity: average across all blocks radon found
            code_complexity = (sum(cc_by_name.values()) / len(cc_by_name)) if cc_by_name else 0
# Calculate maintainability index
try:
mi_score = metrics.mi_visit(code, True)
            except Exception:
mi_score = None
return {
'functions': functions,
'classes': classes,
'imports': imports,
'complexity': {
'overall': code_complexity,
'functions': function_complexities,
'maintainability_index': mi_score
}
}
except SyntaxError:
print(f"Syntax error in Python file: {file_path}")
return None
except Exception as e:
print(f"Error analyzing {file_path}: {str(e)}")
return None
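    # Illustrative example of analyze_ast on a small snippet (file name is hypothetical):
    #   info = GitHubRepoInfo().analyze_ast("def add(a, b):\n    return a + b\n", "math_utils.py")
    #   info["functions"][0]["name"]  -> "add"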
def analyze_js_ts(self, code, file_path):
"""Analyze JavaScript/TypeScript code using regex with improved patterns."""
if not file_path.endswith(('.js', '.ts', '.jsx', '.tsx')):
return None
# More sophisticated regex patterns for JS/TS analysis
results = {
'functions': [],
'classes': [],
'imports': [],
'exports': [],
'hooks': [] # For React hooks
}
# Function patterns (covering various declaration styles)
function_patterns = [
# Regular functions
r'function\s+(\w+)\s*\(([^)]*)\)',
# Arrow functions assigned to variables
r'(?:const|let|var)\s+(\w+)\s*=\s*(?:\([^)]*\)|[^=]*)\s*=>\s*{',
# Class methods
r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{',
# Object methods
r'(\w+)\s*:\s*function\s*\(([^)]*)\)'
]
for pattern in function_patterns:
for match in re.finditer(pattern, code):
func_name = match.group(1)
args = match.group(2).strip() if len(match.groups()) > 1 else ""
results['functions'].append({
'name': func_name,
'args': args
})
# Class pattern
class_pattern = r'class\s+(\w+)(?:\s+extends\s+(\w+))?\s*{([^}]*)}'
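        # Heuristic only: [^}]* stops at the first closing brace, so classes with nested
        # blocks get a truncated body; adequate for a rough structural scan.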
for match in re.finditer(class_pattern, code, re.DOTALL):
class_name = match.group(1)
parent_class = match.group(2) if match.group(2) else None
class_body = match.group(3)
# Find methods in class
methods = []
method_pattern = r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{([^}]*)}'
for method_match in re.finditer(method_pattern, class_body):
method_name = method_match.group(1)
methods.append(method_name)
results['classes'].append({
'name': class_name,
'extends': parent_class,
'methods': methods
})
# Import patterns
import_patterns = [
# ES6 imports
r'import\s+(?:{([^}]*)}|\*\s+as\s+(\w+)|(\w+))\s+from\s+[\'"]([^\'"]+)[\'"]',
# CommonJS requires
r'(?:const|let|var)\s+(?:{([^}]*)}|(\w+))\s*=\s*require\([\'"]([^\'"]+)[\'"]\)'
]
for pattern in import_patterns:
for match in re.finditer(pattern, code):
groups = match.groups()
if groups[0]: # Destructured import
imports = [name.strip() for name in groups[0].split(',')]
for imp in imports:
results['imports'].append(imp)
elif groups[1]: # Namespace import (import * as X)
results['imports'].append(groups[1])
elif groups[2]: # Default import
results['imports'].append(groups[2])
elif groups[3]: # Module name
results['imports'].append(groups[3])
# React hooks detection (for React files)
if file_path.endswith(('.jsx', '.tsx')):
hook_pattern = r'use([A-Z]\w+)\s*\('
for match in re.finditer(hook_pattern, code):
hook_name = 'use' + match.group(1)
results['hooks'].append(hook_name)
# Export patterns
export_patterns = [
# Named exports
r'export\s+(?:const|let|var|function|class)\s+(\w+)',
# Default exports
r'export\s+default\s+(?:function|class)?\s*(\w+)?'
]
for pattern in export_patterns:
for match in re.finditer(pattern, code):
if match.group(1):
results['exports'].append(match.group(1))
return results
def extract_code_summary(self, file_content, file_path):
"""Extract comprehensive summary information from code files."""
extension = os.path.splitext(file_path)[1].lower()
# Initialize summary
summary = {
"functions": [],
"classes": [],
"imports": [],
"description": "",
"complexity": None
}
# Extract Python definitions with AST
if extension == '.py':
ast_result = self.analyze_ast(file_content, file_path)
if ast_result:
summary["functions"] = [f["name"] for f in ast_result["functions"]]
summary["classes"] = [c["name"] for c in ast_result["classes"]]
summary["imports"] = ast_result["imports"]
summary["complexity"] = ast_result["complexity"]
# Try to extract module docstring
try:
tree = ast.parse(file_content)
module_docstring = ast.get_docstring(tree)
if module_docstring:
summary["description"] = module_docstring
                except Exception:
pass
# Add detailed function and class info
summary["detailed_functions"] = ast_result["functions"]
summary["detailed_classes"] = ast_result["classes"]
# Extract JavaScript/TypeScript definitions
elif extension in ['.js', '.ts', '.jsx', '.tsx']:
js_result = self.analyze_js_ts(file_content, file_path)
if js_result:
summary["functions"] = [f["name"] for f in js_result["functions"]]
summary["classes"] = [c["name"] for c in js_result["classes"]]
summary["imports"] = js_result["imports"]
# Add detailed function and class info
summary["detailed_functions"] = js_result["functions"]
summary["detailed_classes"] = js_result["classes"]
summary["hooks"] = js_result.get("hooks", [])
summary["exports"] = js_result.get("exports", [])
# Calculate basic code metrics for any text file
if file_content:
lines = file_content.split('\n')
code_lines = 0
comment_lines = 0
blank_lines = 0
comment_prefixes = ['#', '//', '/*', '*', '<!--']
for line in lines:
line = line.strip()
if not line:
blank_lines += 1
elif any(line.startswith(prefix) for prefix in comment_prefixes):
comment_lines += 1
else:
code_lines += 1
summary["metrics"] = {
"total_lines": len(lines),
"code_lines": code_lines,
"comment_lines": comment_lines,
"blank_lines": blank_lines,
"comment_ratio": comment_lines / max(1, code_lines + comment_lines)
}
return summary
def analyze_dependencies(self, owner, repo, max_files=100):
"""Analyze code dependencies across the repository."""
# Get Python and JavaScript files
text_files = self.get_all_text_files(owner, repo, max_files=max_files)
# Filter for Python and JS/TS files
code_files = [f for f in text_files if f["name"].endswith(('.py', '.js', '.ts', '.jsx', '.tsx'))]
# Track dependencies
dependencies = {
'internal': defaultdict(set), # File to file dependencies
'external': defaultdict(set), # External package dependencies by file
'modules': defaultdict(set) # Defined modules/components by file
}
# Extract module names from file paths
file_to_module = {}
for file in code_files:
# Convert file path to potential module name
module_path = os.path.splitext(file["path"])[0].replace('/', '.')
file_to_module[file["path"]] = module_path
# Track what each file defines
summary = self.extract_code_summary(file["content"], file["path"])
if file["name"].endswith('.py'):
for function in summary.get("functions", []):
dependencies['modules'][file["path"]].add(f"{module_path}.{function}")
for class_name in summary.get("classes", []):
dependencies['modules'][file["path"]].add(f"{module_path}.{class_name}")
else: # JS/TS files
for export in summary.get("exports", []):
dependencies['modules'][file["path"]].add(export)
# Analyze imports/dependencies
for file in code_files:
summary = self.extract_code_summary(file["content"], file["path"])
for imp in summary.get("imports", []):
# Check if this is an internal import
is_internal = False
if file["name"].endswith('.py'):
# For Python, check if the import matches any module path
for module_path in file_to_module.values():
if imp == module_path or imp.startswith(f"{module_path}."):
is_internal = True
# Find the file that defines this module
for f_path, m_path in file_to_module.items():
if m_path == imp.split('.')[0]:
dependencies['internal'][file["path"]].add(f_path)
break
else:
# For JS/TS, check relative imports
if imp.startswith('./') or imp.startswith('../'):
is_internal = True
# Try to resolve the relative import
src_dir = os.path.dirname(file["path"])
target_path = os.path.normpath(os.path.join(src_dir, imp))
# Add known extensions if not specified
if '.' not in os.path.basename(target_path):
for ext in ['.js', '.ts', '.jsx', '.tsx']:
test_path = f"{target_path}{ext}"
if test_path in file_to_module:
dependencies['internal'][file["path"]].add(test_path)
break
# If not internal, consider it external
if not is_internal:
# Clean up the import name (remove relative path parts)
if not file["name"].endswith('.py'):
imp = imp.split('/')[0] # Take the package name part
dependencies['external'][file["path"]].add(imp)
return dependencies
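    # The returned structure looks roughly like (paths are illustrative):
    #   {'internal': {'src/app.py': {'src/utils.py'}},
    #    'external': {'src/app.py': {'requests'}},
    #    'modules':  {'src/utils.py': {'src.utils.helper'}}}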
def create_dependency_graph(self, dependencies):
"""Create a NetworkX graph from dependencies for visualization."""
G = nx.DiGraph()
# Add nodes for files
for file_path in dependencies['internal'].keys():
G.add_node(file_path, type='file')
# Add edges for internal dependencies
for file_path, deps in dependencies['internal'].items():
for dep in deps:
G.add_edge(file_path, dep)
# Add nodes and edges for external dependencies
external_nodes = set()
for file_path, deps in dependencies['external'].items():
for dep in deps:
external_node = f"ext:{dep}"
if external_node not in external_nodes:
G.add_node(external_node, type='external')
external_nodes.add(external_node)
G.add_edge(file_path, external_node)
return G
def get_repo_text_summary(self, owner, repo, max_files=25):
"""Extract and summarize text content from the repository with improved metrics."""
# Get README
readme = self.get_readme(owner, repo)
# Get documentation
docs = self.get_documentation_files(owner, repo)
# Get key code files (limit to avoid API rate limits)
text_files = self.get_all_text_files(owner, repo, max_files=max_files)
# Analyze code files
code_summary = {}
complexity_metrics = {
'cyclomatic_complexity': [],
'maintainability_index': [],
'comment_ratios': []
}
for file in text_files:
ext = os.path.splitext(file["name"])[1].lower()
if ext in ['.py', '.js', '.ts', '.jsx', '.tsx']:
file_summary = self.extract_code_summary(file["content"], file["path"])
code_summary[file["path"]] = file_summary
# Collect complexity metrics
if file_summary.get('complexity'):
cc = file_summary['complexity'].get('overall')
if cc is not None:
complexity_metrics['cyclomatic_complexity'].append((file["path"], cc))
mi = file_summary['complexity'].get('maintainability_index')
if mi is not None:
complexity_metrics['maintainability_index'].append((file["path"], mi))
if file_summary.get('metrics'):
comment_ratio = file_summary['metrics'].get('comment_ratio', 0)
complexity_metrics['comment_ratios'].append((file["path"], comment_ratio))
# Analyze dependencies
dependencies = self.analyze_dependencies(owner, repo, max_files=max_files)
# Summarize repository content by file type
file_types = defaultdict(int)
for file in text_files:
ext = os.path.splitext(file["name"])[1].lower()
file_types[ext] += 1
# Calculate aggregate code metrics
total_code_lines = sum(summary.get('metrics', {}).get('code_lines', 0)
for summary in code_summary.values())
total_comment_lines = sum(summary.get('metrics', {}).get('comment_lines', 0)
for summary in code_summary.values())
aggregate_metrics = {
'total_files': len(text_files),
'total_code_lines': total_code_lines,
'total_comment_lines': total_comment_lines,
'average_comment_ratio': (total_comment_lines / total_code_lines) if total_code_lines > 0 else 0
}
return {
"readme": readme,
"documentation": docs,
"code_summary": code_summary,
"complexity_metrics": complexity_metrics,
"dependencies": dependencies,
"file_type_counts": dict(file_types),
"aggregate_metrics": aggregate_metrics,
"text_files": text_files # Include the actual text file contents
}
def get_temporal_analysis(self, owner, repo):
"""Perform temporal analysis of repository activity."""
# Get commit activity over time
commit_activity = self.get_commit_activity(owner, repo)
# Get code frequency (additions/deletions over time)
code_frequency = self.get_code_frequency(owner, repo)
# Get contributor activity
contributor_activity = self.get_contributor_activity(owner, repo)
# Get issue and PR timelines
issue_timeline = self.get_issue_timeline(owner, repo)
pr_timeline = self.get_pr_timeline(owner, repo)
# Process data for visualization
# - Weekly commit counts
weekly_commits = []
if commit_activity:
for week in commit_activity:
date = datetime.fromtimestamp(week['week'])
weekly_commits.append({
'date': date.strftime('%Y-%m-%d'),
'total': week['total'],
'days': week['days'] # Daily breakdown within the week
})
# - Weekly code changes
weekly_code_changes = []
if code_frequency:
for item in code_frequency:
date = datetime.fromtimestamp(item[0])
weekly_code_changes.append({
'date': date.strftime('%Y-%m-%d'),
'additions': item[1],
'deletions': -item[2] # Convert to positive for visualization
})
# - Contributor timeline
contributor_timeline = {}
if contributor_activity:
for contributor in contributor_activity:
                # Some contributors (e.g. deleted accounts) have no 'author' object
                author_info = contributor.get('author') or {}
                author = author_info.get('login', 'unknown')
weeks = contributor['weeks']
if author not in contributor_timeline:
contributor_timeline[author] = []
for week in weeks:
if week['c'] > 0: # Only include weeks with commits
date = datetime.fromtimestamp(week['w'])
contributor_timeline[author].append({
'date': date.strftime('%Y-%m-%d'),
'commits': week['c'],
'additions': week['a'],
'deletions': week['d']
})
return {
'weekly_commits': weekly_commits,
'weekly_code_changes': weekly_code_changes,
'contributor_timeline': contributor_timeline,
'issue_timeline': issue_timeline,
'pr_timeline': pr_timeline
}
def get_all_info(self, owner, repo):
"""Get comprehensive information about a repository with enhanced metrics."""
result = {
"timestamp": datetime.now().isoformat(),
"basic_info": self.get_repo_info(owner, repo)
}
if not result["basic_info"]:
print(f"Could not retrieve repository information for {owner}/{repo}")
return None
print("Getting repository statistics...")
# Get additional information
result["languages"] = self.get_languages(owner, repo)
result["contributors"] = self.get_contributors(owner, repo, max_contributors=30)
result["recent_commits"] = self.get_commits(owner, repo, max_commits=30)
result["branches"] = self.get_branches(owner, repo)
result["releases"] = self.get_releases(owner, repo, max_releases=10)
result["open_issues"] = self.get_issues(owner, repo, state="open", max_issues=50)
result["open_pull_requests"] = self.get_pull_requests(owner, repo, state="open", max_prs=50)
result["root_contents"] = self.get_contents(owner, repo)
print("Analyzing repository content...")
# Get text content and documentation
result["text_content"] = self.get_repo_text_summary(owner, repo, max_files=30)
print("Analyzing repository activity over time...")
# Get temporal analysis
result["temporal_analysis"] = self.get_temporal_analysis(owner, repo)
return result
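    # Illustrative stand-alone use of the low-level client (repository name is hypothetical):
    #   info = GitHubRepoInfo(token=os.getenv("GITHUB_TOKEN")).get_all_info("octocat", "Hello-World")
    #   stars = info["basic_info"]["stargazers_count"] if info else None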
def get_pull_request_details(self, owner, repo, pr_number):
"""Get detailed information for a specific Pull Request using PyGithub."""
if not self.github:
print("PyGithub client not initialized. Cannot fetch PR details.")
return None
try:
repo_obj = self.github.get_repo(f"{owner}/{repo}")
pr = repo_obj.get_pull(pr_number)
# Extract relevant information into a dictionary
details = {
"number": pr.number,
"title": pr.title,
"state": pr.state, # 'open', 'closed'
"merged": pr.merged,
"body": pr.body or "", # Ensure body is string
"url": pr.html_url,
"created_at": pr.created_at.isoformat() if pr.created_at else None,
"updated_at": pr.updated_at.isoformat() if pr.updated_at else None,
"closed_at": pr.closed_at.isoformat() if pr.closed_at else None,
"merged_at": pr.merged_at.isoformat() if pr.merged_at else None,
"author": pr.user.login if pr.user else "N/A",
"commits_count": pr.commits,
"additions": pr.additions,
"deletions": pr.deletions,
"changed_files_count": pr.changed_files,
"labels": [label.name for label in pr.labels],
"assignees": [assignee.login for assignee in pr.assignees],
"milestone": pr.milestone.title if pr.milestone else None,
"repo_full_name": f"{owner}/{repo}", # Add repo context
}
return details
except GithubException as e:
if e.status == 404:
print(f"Error: Pull Request #{pr_number} not found in {owner}/{repo}.")
else:
print(f"Error fetching PR #{pr_number} details: {e}")
return None
except Exception as e:
print(f"An unexpected error occurred fetching PR details: {e}")
return None
class RepoAnalyzer:
"""Streamlined class to analyze GitHub repositories."""
def __init__(self, github_token=None, gemini_api_key=None):
"""Initialize with GitHub and optional Gemini credentials."""
load_dotenv() # Load .env file if it exists
self.github_token = github_token or os.getenv("GITHUB_TOKEN")
self.gemini_api_key = gemini_api_key or os.getenv("GOOGLE_API_KEY")
# Initialize GitHub analyzer
self.github_analyzer = GitHubRepoInfo(token=self.github_token)
# Initialize Gemini model if API key is provided
self.gemini_model = None
if self.gemini_api_key and GEMINI_AVAILABLE:
try:
genai.configure(api_key=self.gemini_api_key)
self.gemini_model = genai.GenerativeModel('gemini-1.5-pro-latest')
print("Gemini model initialized for PR summarization.")
except Exception as e:
print(f"Error initializing Gemini: {e}")
self.repo_data = None
self.owner = None
self.repo = None
self.repo_full_name = None
def analyze_repo(self, owner, repo):
"""Analyze a GitHub repository and store the data."""
self.owner = owner
self.repo = repo
self.repo_full_name = f"{owner}/{repo}"
print(f"\nFetching repository information for {self.repo_full_name}...")
self.repo_data = self.github_analyzer.get_all_info(owner, repo)
if not self.repo_data:
print(f"Failed to get repository information for {self.repo_full_name}")
return False
print(f"Successfully analyzed repository: {self.repo_full_name}")
return True
def _get_pr_summary_prompt(self, pr_details, role):
"""Generate a prompt for Gemini to summarize PR based on role."""
# Extract key details safely
title = pr_details.get('title', 'N/A')
body = pr_details.get('body', 'No description provided.')
pr_number = pr_details.get('number', 'N/A')
repo_name = pr_details.get('repo_full_name', 'N/A')
author = pr_details.get('author', 'N/A')
state = pr_details.get('state', 'N/A')
merged_status = 'Merged' if pr_details.get('merged') else ('Closed' if state == 'closed' else 'Open')
created_at = pr_details.get('created_at', 'N/A')
commits_count = pr_details.get('commits_count', 'N/A')
changed_files = pr_details.get('changed_files_count', 'N/A')
additions = pr_details.get('additions', 'N/A')
deletions = pr_details.get('deletions', 'N/A')
labels = ', '.join(pr_details.get('labels', [])) or 'None'
# Truncate long body
max_body_len = 1500
truncated_body = body[:max_body_len] + ('...' if len(body) > max_body_len else '')
base_prompt = f"""
You are an AI assistant specializing in summarizing GitHub Pull Requests.
Analyze the following Pull Request details from repository '{repo_name}' and provide a summary tailored for a '{role}'.
**Pull Request #{pr_number}: {title}**
* **Author:** {author}
* **Status:** {state.capitalize()} ({merged_status})
* **Created:** {created_at}
* **Commits:** {commits_count}
* **Changed Files:** {changed_files}
* **Code Churn:** +{additions} / -{deletions} lines
* **Labels:** {labels}
* **Description/Body:**
{truncated_body}
---
"""
role_instructions = ""
# Define role-specific instructions
if role == 'Developer':
role_instructions = """
**Summary Focus (Developer):**
* Summarize the core technical changes and their purpose.
* Identify key files, modules, or functions affected.
* Mention any potential technical complexities, risks, or areas needing careful code review (based *only* on the description and metadata).
* Note any mention of tests added or modified.
* Be concise and focus on technical aspects relevant for peer review or understanding the change.
"""
elif role == 'Manager' or role == 'Team Lead':
role_instructions = """
**Summary Focus (Manager/Team Lead):**
* Explain the high-level purpose and business value (what problem does this PR solve or what feature does it add?).
* Summarize the overall status (e.g., Ready for Review, Needs Work, Merged, Blocked?).
* Give a sense of the PR's size/complexity (e.g., Small/Medium/Large based on file/line changes and description).
* Highlight any mentioned risks, blockers, or dependencies on other work.
* Include the author and key dates (created, merged/closed).
* Focus on information needed for tracking progress and impact.
"""
elif role == 'Program Manager' or role == 'Product Owner':
role_instructions = """
**Summary Focus (Program/Product Manager):**
* Describe the user-facing impact or the feature/bug fix being addressed.
* Relate the PR to product goals or requirements if possible (based on title/body/labels).
* Note the status (especially if merged or closed).
* Mention associated issues or tickets if referenced in the body (though not explicitly provided here, look for patterns like '#123').
* Focus on 'what' and 'why' from a product perspective.
"""
else: # Default/General
role_instructions = """
**Summary Focus (General):**
* State the main goal or purpose of the PR clearly.
* Identify the author and the current status (Open/Closed/Merged).
* Provide a brief, balanced overview of the key changes made.
* Keep the summary accessible to a wider audience.
"""
return base_prompt + role_instructions + "\n**Summary:**"
def summarize_pull_request(self, pr_number, role='Developer'):
"""Summarize a pull request using Gemini AI."""
if not self.gemini_model:
return "Gemini model not initialized. Cannot generate summary."
if not self.owner or not self.repo:
return "Repository owner and name not set. Analyze a repository first."
print(f"\nFetching details for PR #{pr_number} in {self.repo_full_name}...")
pr_details = self.github_analyzer.get_pull_request_details(self.owner, self.repo, pr_number)
if not pr_details:
return f"Could not retrieve details for PR #{pr_number}."
print(f"Generating summary for role: {role}...")
# Generate the role-specific prompt
prompt = self._get_pr_summary_prompt(pr_details, role)
try:
response = self.gemini_model.generate_content(prompt)
summary_text = response.text
return summary_text
except Exception as e:
print(f"Error communicating with Gemini for PR summary: {e}")
return f"Error generating PR summary: {e}"
def create_dependency_network_html(self, output_file="dependency_network.html"):
"""Create an interactive network visualization of dependencies using PyVis."""
if not self.repo_data:
print("No repository data available.")
return None
# Get the dependencies
dependencies = self.repo_data.get("text_content", {}).get("dependencies", {})
if not dependencies:
print("No dependency data available.")
return None
internal_deps = dependencies.get('internal', {})
external_deps = dependencies.get('external', {})
# Create NetworkX graph first
G = nx.DiGraph()
# Add file nodes and internal dependencies
for file_path, deps in internal_deps.items():
file_name = os.path.basename(file_path)
G.add_node(file_path, label=file_name, title=file_path, group="file")
for dep in deps:
dep_name = os.path.basename(dep)
G.add_node(dep, label=dep_name, title=dep, group="file")
G.add_edge(file_path, dep)
# Add external dependencies
for file_path, deps in external_deps.items():
if file_path not in G.nodes:
file_name = os.path.basename(file_path)
G.add_node(file_path, label=file_name, title=file_path, group="file")
for dep in deps:
ext_node = f"ext:{dep}"
G.add_node(ext_node, label=dep, title=dep, group="external")
G.add_edge(file_path, ext_node)
# Create PyVis network from NetworkX graph
net = Network(height="750px", width="100%", directed=True, notebook=False)
# Set network options for better visualization
net.set_options("""
{
"physics": {
"hierarchicalRepulsion": {
"centralGravity": 0.0,
"springLength": 100,
"springConstant": 0.01,
"nodeDistance": 120
},
"maxVelocity": 50,
"minVelocity": 0.1,
"solver": "hierarchicalRepulsion"
},
"layout": {
"improvedLayout": true
}
}
""")
# Add nodes with properties from NetworkX graph
for node, node_attrs in G.nodes(data=True):
group = node_attrs.get('group', 'file')
# Set colors based on node type
color = "#97c2fc" if group == "file" else "#fb7e81" # blue for files, red for external
net.add_node(
node,
label=node_attrs.get('label', str(node)),
title=node_attrs.get('title', str(node)),
color=color
)
# Add edges
for source, target in G.edges():
net.add_edge(source, target)
# Generate and save the HTML file
net.save_graph(output_file)
print(f"Dependency network visualization saved to {output_file}")
return output_file
def create_vizro_dashboard(self, output_dir='./vizro_dashboard'):
"""Create a Vizro dashboard from repository data."""
if not self.repo_data:
print("No repository data available. Run analyze_repo() first.")
return None
# Create output directory if it doesn't exist
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Extract repository data
repo_name = self.repo_data["basic_info"]["full_name"]
basic_info = self.repo_data["basic_info"]
# Create dashboard pages
pages = []
# 1. Overview Page
overview_components = []
# Basic repository info as a card
repo_info_md = f"""
# {basic_info['full_name']}
**Description:** {basic_info.get('description', 'No description')}
**Stars:** {basic_info['stargazers_count']} |
**Forks:** {basic_info['forks_count']} |
**Open Issues:** {basic_info['open_issues_count']}
**Created:** {basic_info['created_at']} |
**Last Updated:** {basic_info['updated_at']}
**Default Branch:** {basic_info['default_branch']}
**License:** {basic_info['license']['name'] if basic_info.get('license') else 'Not specified'}
"""
overview_components.append(vzm.Card(text=repo_info_md))
# Languages pie chart
if self.repo_data.get("languages"):
langs_data = []
total = sum(self.repo_data["languages"].values())
for lang, bytes_count in self.repo_data["languages"].items():
percentage = (bytes_count / total) * 100
langs_data.append({
"Language": lang,
"Bytes": bytes_count,
"Percentage": percentage
})
langs_df = pd.DataFrame(langs_data)
lang_pie = vzm.Graph(
figure=px.pie(
langs_df,
values="Percentage",
names="Language",
title="Language Distribution"
)
)
            overview_components.append(lang_pie)
# Contributors bar chart
if self.repo_data.get("contributors"):
contrib_data = []
for contributor in self.repo_data["contributors"][:15]:
contrib_data.append({
"Username": contributor['login'],
"Contributions": contributor['contributions']
})
contrib_df = pd.DataFrame(contrib_data)
contrib_bar = vzm.Graph(
figure=px.bar(
contrib_df,
x="Username",
y="Contributions",
title="Top Contributors"
)
)
            overview_components.append(contrib_bar)
# Add overview page
pages.append(
vzm.Page(
title="Overview",
components=overview_components
)
)
# 2. Activity Page
activity_components = []
# Commit activity over time
weekly_commits = self.repo_data.get("temporal_analysis", {}).get("weekly_commits", [])
if weekly_commits:
commits_df = pd.DataFrame([
{"Date": week['date'], "Commits": week['total']}
for week in weekly_commits
])
commits_line = vzm.Graph(
figure=px.line(
commits_df,
x="Date",
y="Commits",
title="Weekly Commit Activity"
)
)
            activity_components.append(commits_line)
# Code changes over time
weekly_code_changes = self.repo_data.get("temporal_analysis", {}).get("weekly_code_changes", [])
if weekly_code_changes:
changes_data = []
for week in weekly_code_changes:
changes_data.append({
"Date": week['date'],
"Additions": week['additions'],
"Deletions": -abs(week['deletions']) # Make negative for visualization
})
changes_df = pd.DataFrame(changes_data)
# Create a stacked bar chart
changes_fig = go.Figure()
changes_fig.add_trace(go.Bar(
x=changes_df["Date"],
y=changes_df["Additions"],
name="Additions",
marker_color="green"
))
changes_fig.add_trace(go.Bar(
x=changes_df["Date"],
y=changes_df["Deletions"],
name="Deletions",
marker_color="red"
))
changes_fig.update_layout(
title="Weekly Code Changes",
barmode="relative"
)
changes_chart = vzm.Graph(figure=changes_fig)
            activity_components.append(changes_chart)
# Issue resolution times
issue_timeline = self.repo_data.get("temporal_analysis", {}).get("issue_timeline", {})
if issue_timeline and issue_timeline.get('resolution_times'):
resolution_times = issue_timeline['resolution_times']
# Convert to hours for better visualization (cap at one week)
rt_hours = [min(rt, 168) for rt in resolution_times if rt is not None]
# Create histogram
issue_resolution_fig = px.histogram(
x=rt_hours,
title="Issue Resolution Times (Capped at 1 Week)",
labels={"x": "Hours to Resolution", "y": "Number of Issues"}
)
# Add mean and median lines
if rt_hours:
mean_rt = np.mean(rt_hours)
median_rt = np.median(rt_hours)
issue_resolution_fig.add_vline(
x=mean_rt,
line_dash="dash",
line_color="red",
annotation_text=f"Mean: {mean_rt:.2f} hours"
)
issue_resolution_fig.add_vline(
x=median_rt,
line_dash="dash",
line_color="green",
annotation_text=f"Median: {median_rt:.2f} hours"
)
resolution_hist = vzm.Graph(figure=issue_resolution_fig)
            activity_components.append(resolution_hist)
# Add activity page
pages.append(
vzm.Page(
title="Activity",
components=activity_components
)
)
# 3. Code Quality Page
code_components = []
# Code complexity metrics
complexity_metrics = self.repo_data.get("text_content", {}).get("complexity_metrics", {})
cyclomatic_complexity = complexity_metrics.get("cyclomatic_complexity", [])
if cyclomatic_complexity:
# Prepare data for top complex files
complexity_data = []
for path, cc in cyclomatic_complexity:
# Ensure cc is numeric
if isinstance(cc, (int, float)):
complexity_data.append({
"File": os.path.basename(path),
"Path": path,
"Complexity": cc
})
if complexity_data:
# Sort by complexity
complexity_data.sort(key=lambda x: x["Complexity"], reverse=True)
# Take top 10
top_complex_files = complexity_data[:10]
complex_df = pd.DataFrame(top_complex_files)
complex_bar = vzm.Graph(
figure=px.bar(
complex_df,
x="File",
y="Complexity",
title="Most Complex Files",
hover_data=["Path"]
)
)
                code_components.append(complex_bar)
# Complexity histogram
cc_values = [d["Complexity"] for d in complexity_data]
cc_hist = vzm.Graph(
figure=px.histogram(
x=cc_values,
title="Cyclomatic Complexity Distribution",
labels={"x": "Complexity", "y": "Number of Files"}
)
)
                code_components.append(cc_hist)
# Comment ratio by file
comment_ratios = complexity_metrics.get("comment_ratios", [])
if comment_ratios:
comment_data = []
for path, ratio in comment_ratios:
comment_data.append({
"File": os.path.basename(path),
"Path": path,
"Comment Ratio": ratio
})
# Sort by ratio
comment_data.sort(key=lambda x: x["Comment Ratio"], reverse=True)
# Take top 10
top_commented_files = comment_data[:10]
comment_df = pd.DataFrame(top_commented_files)
comment_bar = vzm.Graph(
figure=px.bar(
comment_df,
x="File",
y="Comment Ratio",
title="Most Commented Files",
hover_data=["Path"]
)
)
            code_components.append(comment_bar)
# Add code quality page
pages.append(
vzm.Page(
title="Code Quality",
components=code_components
)
)
# 4. Dependencies Page
dependencies = self.repo_data.get("text_content", {}).get("dependencies", {})
if dependencies:
dependencies_components = []
# External dependencies
external_deps = dependencies.get("external", {})
if external_deps:
# Count packages
ext_counts = Counter()
for file_deps in external_deps.values():
ext_counts.update(dep for dep in file_deps if isinstance(dep, str))
# Get top dependencies
top_deps = ext_counts.most_common(10)
deps_data = []
for pkg, count in top_deps:
deps_data.append({
"Package": pkg,
"Count": count
})
deps_df = pd.DataFrame(deps_data)
deps_bar = vzm.Graph(
figure=px.bar(
deps_df,
x="Package",
y="Count",
title="Most Used External Dependencies"
)
)
                dependencies_components.append(deps_bar)
# Create dependency network visualization with PyVis in a separate HTML file
# and embed a note about it in the dashboard
try:
network_file = self.create_dependency_network_html(
output_file=os.path.join(output_dir, "dependency_network.html")
)
if network_file:
network_note = f"""
## Code Dependency Network
An interactive visualization of code dependencies has been created as a separate file:
`{os.path.basename(network_file)}`
Open this file in a web browser to explore the code dependency network.
"""
dependencies_components.append(vzm.Card(text=network_note))
except Exception as e:
print(f"Error creating dependency network: {e}")
# Add dependencies page if we have components
if dependencies_components:
pages.append(
vzm.Page(
title="Dependencies",
components=dependencies_components
)
)
# Create the dashboard
dashboard = vzm.Dashboard(
title=f"GitHub Repository Analysis: {repo_name}",
pages=pages
)
# Export dashboard
dashboard_path = os.path.join(output_dir, "dashboard.html")
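        # Assumption: the installed Vizro version exposes Dashboard.save() for static HTML
        # export; if it does not, the except branch below logs the error and the Gradio
        # preview will simply report that the dashboard file was not found.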
try:
dashboard.save(dashboard_path)
print(f"Vizro dashboard saved to {dashboard_path}")
return dashboard_path
except Exception as e:
print(f"Error saving dashboard: {e}")
return None
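# Illustrative headless use without the Gradio UI (repository name is hypothetical):
#   analyzer = RepoAnalyzer(github_token=os.getenv("GITHUB_TOKEN"))
#   if analyzer.analyze_repo("octocat", "Hello-World"):
#       analyzer.create_vizro_dashboard(output_dir="./vizro_dashboard")
#       analyzer.create_dependency_network_html("./vizro_dashboard/dependency_network.html")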
# Create Gradio interface
def create_gradio_interface():
"""Create a Gradio interface for the GitHub repository analyzer."""
def analyze_repository(owner, repo, github_token=None, gemini_api_key=None):
"""Function to analyze a repository and return a Vizro dashboard."""
try:
analyzer = RepoAnalyzer(
github_token=github_token if github_token else None,
gemini_api_key=gemini_api_key if gemini_api_key else None
)
# Analyze repository
success = analyzer.analyze_repo(owner, repo)
if not success:
return None, None, f"Failed to analyze repository: {owner}/{repo}. Check the repository name and your GitHub token."
# Create Vizro dashboard
dashboard_path = analyzer.create_vizro_dashboard(output_dir='./vizro_dashboard')
# Create dependency network visualization
network_path = analyzer.create_dependency_network_html(output_file='./vizro_dashboard/dependency_network.html')
# Generate a simple report
basic_info = analyzer.repo_data["basic_info"]
report = f"""
### Repository Analysis: {basic_info['full_name']}
**Description:** {basic_info.get('description', 'No description')}
**Statistics:**
- Stars: {basic_info['stargazers_count']}
- Forks: {basic_info['forks_count']}
- Open Issues: {basic_info['open_issues_count']}
**Interactive Dashboard:**
The full interactive Vizro dashboard has been created at: `{dashboard_path}`
**Dependency Network:**
The interactive dependency network visualization has been created at: `{network_path}`
**Language Summary:**
"""
# Add language info
if analyzer.repo_data.get("languages"):
langs = analyzer.repo_data["languages"]
total = sum(langs.values())
for lang, bytes_count in sorted(langs.items(), key=lambda x: x[1], reverse=True):
percentage = (bytes_count / total) * 100
report += f"- {lang}: {percentage:.1f}%\n"
# Add code metrics if available
if analyzer.repo_data.get("text_content", {}).get("aggregate_metrics"):
metrics = analyzer.repo_data["text_content"]["aggregate_metrics"]
report += f"""
**Code Metrics:**
- Total Files Analyzed: {metrics.get('total_files', 'N/A')}
- Total Code Lines: {metrics.get('total_code_lines', 'N/A')}
- Comment Ratio: {metrics.get('average_comment_ratio', 'N/A'):.2f}
"""
return dashboard_path, network_path, report
except Exception as e:
return None, None, f"Error analyzing repository: {str(e)}"
def summarize_pr(owner, repo, pr_number, role, github_token=None, gemini_api_key=None):
"""Function to summarize a PR for Gradio."""
try:
analyzer = RepoAnalyzer(
github_token=github_token if github_token else None,
gemini_api_key=gemini_api_key if gemini_api_key else None
)
            # Set repository context directly; PR summarization does not need a full analysis
            analyzer.owner = owner
            analyzer.repo = repo
            analyzer.repo_full_name = f"{owner}/{repo}"
# Summarize the PR
summary = analyzer.summarize_pull_request(int(pr_number), role)
return summary
except Exception as e:
return f"Error summarizing PR: {str(e)}"
def view_dashboard(dashboard_path):
"""Load dashboard content for the iframe."""
try:
if not dashboard_path or not os.path.exists(dashboard_path):
return "Dashboard file not found"
with open(dashboard_path, 'r', encoding='utf-8') as f:
html_content = f.read()
return html_content
except Exception as e:
return f"Error loading dashboard: {str(e)}"
def view_network(network_path):
"""Load network visualization content for the iframe."""
try:
if not network_path or not os.path.exists(network_path):
return "Network visualization file not found"
with open(network_path, 'r', encoding='utf-8') as f:
html_content = f.read()
return html_content
except Exception as e:
return f"Error loading network visualization: {str(e)}"
# UI Components
with gr.Blocks(title="GitHub Repository Analyzer") as app:
gr.Markdown("# GitHub Repository Analyzer with Vizro and PyVis")
gr.Markdown("Analyze GitHub repositories, visualize code dependencies, and summarize pull requests")
with gr.Tab("Repository Analysis"):
with gr.Row():
with gr.Column(scale=1):
owner_input = gr.Textbox(label="Repository Owner (Username/Organization)")
repo_input = gr.Textbox(label="Repository Name")
github_token = gr.Textbox(label="GitHub Token (Optional)", type="password")
gemini_api_key = gr.Textbox(label="Google API Key (Optional, for PR Summaries)", type="password")
analyze_btn = gr.Button("Analyze Repository")
with gr.Column(scale=2):
report_output = gr.Markdown(label="Analysis Report")
# Store paths but don't display them
dashboard_path_state = gr.State()
network_path_state = gr.State()
with gr.Tabs():
with gr.TabItem("Dashboard"):
# Fix: Remove height parameter from HTML component
dashboard_frame = gr.HTML(label="Dashboard Preview")
with gr.TabItem("Dependency Network"):
# Fix: Remove height parameter from HTML component
network_frame = gr.HTML(label="Dependency Network")
analyze_btn.click(
analyze_repository,
inputs=[
owner_input, repo_input, github_token, gemini_api_key
],
outputs=[dashboard_path_state, network_path_state, report_output]
)
# Update iframes when paths change
dashboard_path_state.change(
view_dashboard,
inputs=[dashboard_path_state],
outputs=[dashboard_frame]
)
network_path_state.change(
view_network,
inputs=[network_path_state],
outputs=[network_frame]
)
with gr.Tab("PR Summarizer"):
with gr.Row():
with gr.Column(scale=1):
pr_owner_input = gr.Textbox(label="Repository Owner")
pr_repo_input = gr.Textbox(label="Repository Name")
pr_number_input = gr.Number(label="PR Number", precision=0)
pr_role_input = gr.Dropdown(
choices=["Developer", "Manager", "Team Lead", "Product Owner", "Program Manager", "General"],
label="Your Role",
value="Developer"
)
pr_github_token = gr.Textbox(label="GitHub Token (Optional)", type="password")
pr_gemini_api_key = gr.Textbox(label="Google API Key (Required for Gemini)", type="password")
summarize_btn = gr.Button("Summarize PR")
with gr.Column(scale=2):
pr_summary_output = gr.Markdown(label="PR Summary")
summarize_btn.click(
summarize_pr,
inputs=[
pr_owner_input, pr_repo_input, pr_number_input,
pr_role_input, pr_github_token, pr_gemini_api_key
],
outputs=pr_summary_output
)
return app
# Main function to run the app
def main():
"""Run the GitHub Repository Analyzer with Gradio interface."""
# Load environment variables
load_dotenv()
# Create and launch the Gradio interface
app = create_gradio_interface()
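    # share=True exposes a temporary public Gradio link; set share=False to keep it local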
app.launch(share=True)
if __name__ == "__main__":
main()