import requests | |
import json | |
import os | |
import base64 | |
import re | |
import ast | |
import networkx as nx | |
import radon.metrics as metrics | |
import radon.complexity as complexity | |
from datetime import datetime, timedelta | |
from collections import defaultdict, Counter | |
import pandas as pd | |
import numpy as np | |
from github import Github, GithubException | |
import time | |
from dotenv import load_dotenv | |
# Visualization imports | |
import vizro.plotly.express as px | |
import vizro | |
import vizro.models as vzm | |
import plotly.graph_objects as go | |
import gradio as gr | |
from pyvis.network import Network | |
# Google Gemini AI (optional) | |
try: | |
import google.generativeai as genai | |
GEMINI_AVAILABLE = True | |
except ImportError: | |
GEMINI_AVAILABLE = False | |
print("Google Generative AI package not found. PR summarization feature will be disabled.") | |
class GitHubRepoInfo: | |
"""Enhanced class to get comprehensive information about a GitHub repository.""" | |
def __init__(self, token=None): | |
"""Initialize with optional GitHub API token.""" | |
self.base_url = "https://api.github.com" | |
self.headers = {"Accept": "application/vnd.github.v3+json"} | |
        self.token = token or os.environ.get("GITHUB_TOKEN")
        self.github = None  # Initialize github attribute
        # Set up authentication (explicit token first, then GITHUB_TOKEN from the environment)
        if self.token:
            self.headers["Authorization"] = f"token {self.token}"
            try:
                self.github = Github(self.token)
                self.github.get_user().login  # Test connection
            except Exception as e:
                print(f"Warning: Failed to initialize PyGithub with token: {e}")
                self.github = Github()  # Fall back to unauthenticated
        else:
            self.github = Github()  # Unauthenticated
# Configure rate limit handling | |
self.rate_limit_remaining = 5000 # Assume higher limit if authenticated | |
self.rate_limit_reset = datetime.now() | |
# Initialize rate limit info if possible | |
if self.github: | |
try: | |
rate_limit = self.github.get_rate_limit() | |
self.rate_limit_remaining = rate_limit.core.remaining | |
self.rate_limit_reset = datetime.fromtimestamp(rate_limit.core.reset) | |
except Exception as e: | |
print(f"Warning: Could not get initial rate limit from PyGithub: {e}") | |
def _check_rate_limit(self): | |
"""Check API rate limit and wait if necessary.""" | |
if self.rate_limit_remaining <= 10: | |
reset_time = self.rate_limit_reset | |
current_time = datetime.now() | |
if reset_time > current_time: | |
wait_time = (reset_time - current_time).total_seconds() + 10 # Add buffer | |
print(f"Rate limit nearly exhausted. Waiting {wait_time:.0f} seconds for reset.") | |
time.sleep(wait_time) | |
        # Refresh rate limit info before the next API call
        # (requests to /rate_limit do not count against the core quota)
response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers) | |
if response.status_code == 200: | |
rate_data = response.json() | |
self.rate_limit_remaining = rate_data["resources"]["core"]["remaining"] | |
self.rate_limit_reset = datetime.fromtimestamp(rate_data["resources"]["core"]["reset"]) | |
def _paginated_get(self, url, params=None, max_items=None): | |
"""Handle paginated API responses with rate limit awareness.""" | |
if params is None: | |
params = {} | |
items = [] | |
page = 1 | |
per_page = min(100, params.get("per_page", 30)) | |
params["per_page"] = per_page | |
while True: | |
self._check_rate_limit() | |
params["page"] = page | |
response = requests.get(url, headers=self.headers, params=params) | |
if response.status_code == 200: | |
page_items = response.json() | |
if not page_items: | |
break | |
items.extend(page_items) | |
page += 1 | |
# Check if we've reached the requested limit | |
if max_items and len(items) >= max_items: | |
return items[:max_items] | |
# Check if we've reached the end (GitHub returns fewer items than requested) | |
if len(page_items) < per_page: | |
break | |
else: | |
print(f"Error {response.status_code}: {response.text}") | |
break | |
return items | |
def get_repo_info(self, owner, repo): | |
"""Get basic repository information.""" | |
self._check_rate_limit() | |
url = f"{self.base_url}/repos/{owner}/{repo}" | |
response = requests.get(url, headers=self.headers) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
print(f"Error {response.status_code}: {response.text}") | |
return None | |
def get_contributors(self, owner, repo, max_contributors=None): | |
"""Get repository contributors with pagination support.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/contributors" | |
return self._paginated_get(url, max_items=max_contributors) | |
def get_languages(self, owner, repo): | |
"""Get languages used in the repository.""" | |
self._check_rate_limit() | |
url = f"{self.base_url}/repos/{owner}/{repo}/languages" | |
response = requests.get(url, headers=self.headers) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
print(f"Error getting languages: {response.status_code}") | |
return {} | |
def get_commits(self, owner, repo, params=None, max_commits=None): | |
"""Get commits with enhanced filtering and pagination.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/commits" | |
return self._paginated_get(url, params=params, max_items=max_commits) | |
    def get_commit_activity(self, owner, repo, max_retries=5):
        """Get commit activity stats for the past year."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/stats/commit_activity"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 202 and max_retries > 0:
            # GitHub is computing the statistics; wait and retry a limited number of times
            print("GitHub is computing statistics, waiting and retrying...")
            time.sleep(2)
            return self.get_commit_activity(owner, repo, max_retries - 1)
        else:
            print(f"Error getting commit activity: {response.status_code}")
            return []
    def get_code_frequency(self, owner, repo, max_retries=5):
        """Get weekly code addition and deletion statistics."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/stats/code_frequency"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 202 and max_retries > 0:
            # GitHub is computing the statistics; wait and retry a limited number of times
            print("GitHub is computing statistics, waiting and retrying...")
            time.sleep(2)
            return self.get_code_frequency(owner, repo, max_retries - 1)
        else:
            print(f"Error getting code frequency: {response.status_code}")
            return []
    def get_contributor_activity(self, owner, repo, max_retries=5):
        """Get contributor commit activity over time."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/stats/contributors"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 202 and max_retries > 0:
            # GitHub is computing the statistics; wait and retry a limited number of times
            print("GitHub is computing statistics, waiting and retrying...")
            time.sleep(2)
            return self.get_contributor_activity(owner, repo, max_retries - 1)
        else:
            print(f"Error getting contributor activity: {response.status_code}")
            return []
def get_branches(self, owner, repo): | |
"""Get repository branches.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/branches" | |
return self._paginated_get(url) | |
def get_releases(self, owner, repo, max_releases=None): | |
"""Get repository releases with pagination support.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/releases" | |
return self._paginated_get(url, max_items=max_releases) | |
def get_issues(self, owner, repo, state="all", max_issues=None, params=None): | |
"""Get repository issues with enhanced filtering.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/issues" | |
if params is None: | |
params = {} | |
params["state"] = state | |
return self._paginated_get(url, params=params, max_items=max_issues) | |
def get_issue_timeline(self, owner, repo, days_back=180): | |
"""Analyze issue creation and closing over time.""" | |
# Get issues including closed ones | |
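        # Note: the GitHub issues API also returns pull requests, so PRs opened in this
        # window are counted here as issues as well.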
issues = self.get_issues(owner, repo, state="all") | |
# Prepare timeline data | |
end_date = datetime.now() | |
start_date = end_date - timedelta(days=days_back) | |
# Initialize daily counters | |
date_range = pd.date_range(start=start_date, end=end_date) | |
created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range} | |
closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range} | |
# Collect issue creation and closing dates | |
for issue in issues: | |
created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ') | |
if created_at >= start_date: | |
created_counts[created_at.strftime('%Y-%m-%d')] += 1 | |
if issue['state'] == 'closed' and issue.get('closed_at'): | |
closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ') | |
if closed_at >= start_date: | |
closed_counts[closed_at.strftime('%Y-%m-%d')] += 1 | |
# Calculate resolution times for closed issues | |
resolution_times = [] | |
for issue in issues: | |
if issue['state'] == 'closed' and issue.get('closed_at'): | |
created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ') | |
closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ') | |
resolution_time = (closed_at - created_at).total_seconds() / 3600 # hours | |
resolution_times.append(resolution_time) | |
# Calculate issue labels distribution | |
label_counts = defaultdict(int) | |
for issue in issues: | |
for label in issue.get('labels', []): | |
label_counts[label['name']] += 1 | |
return { | |
'created': created_counts, | |
'closed': closed_counts, | |
'resolution_times': resolution_times, | |
'labels': dict(label_counts) | |
} | |
def get_pull_requests(self, owner, repo, state="all", max_prs=None, params=None): | |
"""Get repository pull requests with enhanced filtering.""" | |
url = f"{self.base_url}/repos/{owner}/{repo}/pulls" | |
if params is None: | |
params = {} | |
params["state"] = state | |
return self._paginated_get(url, params=params, max_items=max_prs) | |
def get_pr_timeline(self, owner, repo, days_back=180): | |
"""Analyze PR creation, closing, and metrics over time.""" | |
# Get PRs including closed and merged ones | |
prs = self.get_pull_requests(owner, repo, state="all") | |
# Prepare timeline data | |
end_date = datetime.now() | |
start_date = end_date - timedelta(days=days_back) | |
# Initialize daily counters | |
date_range = pd.date_range(start=start_date, end=end_date) | |
created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range} | |
closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range} | |
merged_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range} | |
# Track metrics | |
merge_times = [] | |
pr_sizes = [] | |
# Collect PR data | |
for pr in prs: | |
created_at = datetime.strptime(pr['created_at'], '%Y-%m-%dT%H:%M:%SZ') | |
if created_at >= start_date: | |
created_counts[created_at.strftime('%Y-%m-%d')] += 1 | |
# Get PR size (additions + deletions) | |
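                # Note: the list-pulls endpoint usually omits additions/deletions/changed_files,
                # so sizes are only recorded when those fields happen to be present.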
if pr.get('additions') is not None and pr.get('deletions') is not None: | |
pr_sizes.append({ | |
'additions': pr['additions'], | |
'deletions': pr['deletions'], | |
'total': pr['additions'] + pr['deletions'], | |
'files_changed': pr.get('changed_files', 0) | |
}) | |
# Check if PR is closed | |
if pr['state'] == 'closed': | |
closed_at = datetime.strptime(pr['closed_at'], '%Y-%m-%dT%H:%M:%SZ') | |
if closed_at >= start_date: | |
closed_counts[closed_at.strftime('%Y-%m-%d')] += 1 | |
# Check if PR was merged | |
if pr['merged_at']: | |
merged_at = datetime.strptime(pr['merged_at'], '%Y-%m-%dT%H:%M:%SZ') | |
if merged_at >= start_date: | |
merged_counts[merged_at.strftime('%Y-%m-%d')] += 1 | |
# Calculate time to merge | |
merge_time = (merged_at - created_at).total_seconds() / 3600 # hours | |
merge_times.append(merge_time) | |
# Calculate acceptance rate | |
total_closed = sum(closed_counts.values()) | |
total_merged = sum(merged_counts.values()) | |
acceptance_rate = (total_merged / total_closed) * 100 if total_closed > 0 else 0 | |
return { | |
'created': created_counts, | |
'closed': closed_counts, | |
'merged': merged_counts, | |
'merge_times': merge_times, | |
'pr_sizes': pr_sizes, | |
'acceptance_rate': acceptance_rate | |
} | |
def get_contents(self, owner, repo, path="", ref=None): | |
"""Get repository contents at the specified path.""" | |
self._check_rate_limit() | |
url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}" | |
params = {} | |
if ref: | |
params["ref"] = ref | |
response = requests.get(url, headers=self.headers, params=params) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
print(f"Error getting contents: {response.status_code}") | |
return [] | |
def get_readme(self, owner, repo, ref=None): | |
"""Get repository README file.""" | |
self._check_rate_limit() | |
url = f"{self.base_url}/repos/{owner}/{repo}/readme" | |
params = {} | |
if ref: | |
params["ref"] = ref | |
response = requests.get(url, headers=self.headers, params=params) | |
if response.status_code == 200: | |
data = response.json() | |
if data.get("content"): | |
content = base64.b64decode(data["content"]).decode("utf-8") | |
return { | |
"name": data["name"], | |
"path": data["path"], | |
"content": content | |
} | |
return data | |
else: | |
print(f"README not found or error: {response.status_code}") | |
return None | |
def get_file_content(self, owner, repo, path, ref=None): | |
"""Get the content of a specific file in the repository.""" | |
self._check_rate_limit() | |
url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}" | |
params = {} | |
if ref: | |
params["ref"] = ref | |
response = requests.get(url, headers=self.headers, params=params) | |
if response.status_code == 200: | |
data = response.json() | |
if data.get("content"): | |
try: | |
content = base64.b64decode(data["content"]).decode("utf-8") | |
return content | |
except UnicodeDecodeError: | |
return "[Binary file content not displayed]" | |
return None | |
else: | |
print(f"Error getting file content: {response.status_code}") | |
return None | |
def is_text_file(self, file_path): | |
"""Determine if a file is likely a text file based on extension.""" | |
text_extensions = [ | |
'.txt', '.md', '.rst', '.py', '.js', '.html', '.css', '.java', '.c', | |
'.cpp', '.h', '.hpp', '.json', '.xml', '.yaml', '.yml', '.toml', | |
'.ini', '.cfg', '.conf', '.sh', '.bat', '.ps1', '.rb', '.pl', '.php', | |
'.go', '.rs', '.ts', '.jsx', '.tsx', '.vue', '.swift', '.kt', '.scala', | |
'.groovy', '.lua', '.r', '.dart', '.ex', '.exs', '.erl', '.hrl', | |
'.clj', '.hs', '.elm', '.f90', '.f95', '.f03', '.sql', '.gitignore', | |
'.dockerignore', '.env', '.editorconfig', '.htaccess', '.cs', '.ipynb', | |
'.R', '.Rmd', '.jl', '.fs', '.ml', '.mli', '.d', '.scm', '.lisp', | |
'.el', '.m', '.mm', '.vb', '.asm', '.s', '.Dockerfile', '.gradle' | |
] | |
extension = os.path.splitext(file_path)[1].lower() | |
return extension in text_extensions | |
def get_recursive_contents(self, owner, repo, path="", max_depth=3, current_depth=0, max_files=1000, ref=None): | |
"""Recursively get repository contents with a depth limit and file count limit.""" | |
if current_depth >= max_depth: | |
return [] | |
contents = self.get_contents(owner, repo, path, ref) | |
results = [] | |
file_count = 0 | |
for item in contents: | |
if file_count >= max_files: | |
break | |
if item["type"] == "dir": | |
# For directories, add the directory itself and recursively get contents | |
dir_item = { | |
"type": "dir", | |
"name": item["name"], | |
"path": item["path"], | |
"contents": self.get_recursive_contents( | |
owner, repo, item["path"], max_depth, current_depth + 1, | |
max_files - file_count, ref | |
) | |
} | |
                results.append(dir_item)
                # Count files found in the subdirectory toward the overall limit
                file_count += len(dir_item["contents"])
else: | |
# For files, add the file info | |
results.append({ | |
"type": "file", | |
"name": item["name"], | |
"path": item["path"], | |
"size": item["size"], | |
"url": item["html_url"] | |
}) | |
file_count += 1 | |
return results | |
def get_all_text_files(self, owner, repo, path="", max_files=50, ref=None): | |
"""Get content of all text files in the repository (with limit).""" | |
contents = self.get_contents(owner, repo, path, ref) | |
text_files = [] | |
file_count = 0 | |
# Process current directory | |
for item in contents: | |
if file_count >= max_files: | |
break | |
if item["type"] == "file" and self.is_text_file(item["name"]): | |
content = self.get_file_content(owner, repo, item["path"], ref) | |
if content and content != "[Binary file content not displayed]": | |
text_files.append({ | |
"name": item["name"], | |
"path": item["path"], | |
"content": content | |
}) | |
file_count += 1 | |
elif item["type"] == "dir": | |
# Recursively get text files from subdirectories | |
subdir_files = self.get_all_text_files( | |
owner, repo, item["path"], max_files - file_count, ref | |
) | |
text_files.extend(subdir_files) | |
file_count += len(subdir_files) | |
return text_files | |
def get_documentation_files(self, owner, repo, ref=None): | |
"""Get documentation files from the repository.""" | |
# Common documentation file paths and directories | |
doc_paths = [ | |
"docs", "doc", "documentation", "wiki", "CONTRIBUTING.md", | |
"CONTRIBUTORS.md", "CODE_OF_CONDUCT.md", "SECURITY.md", | |
"SUPPORT.md", "docs/index.md", "docs/README.md", "docs/getting-started.md", | |
".github/ISSUE_TEMPLATE", ".github/PULL_REQUEST_TEMPLATE.md" | |
] | |
doc_files = [] | |
# Try to get each documentation file/directory | |
for path in doc_paths: | |
try: | |
contents = self.get_contents(owner, repo, path, ref) | |
# If it's a directory, get all markdown files in it | |
if isinstance(contents, list): | |
for item in contents: | |
if item["type"] == "file" and item["name"].lower().endswith((".md", ".rst", ".txt")): | |
content = self.get_file_content(owner, repo, item["path"], ref) | |
if content: | |
doc_files.append({ | |
"name": item["name"], | |
"path": item["path"], | |
"content": content | |
}) | |
# If it's a file, get its content | |
elif isinstance(contents, dict) and contents.get("type") == "file": | |
content = self.get_file_content(owner, repo, path, ref) | |
if content: | |
doc_files.append({ | |
"name": contents["name"], | |
"path": contents["path"], | |
"content": content | |
}) | |
            except Exception:
# Path doesn't exist or access issues | |
continue | |
return doc_files | |
def analyze_ast(self, code, file_path): | |
"""Analyze Python code using AST (Abstract Syntax Tree).""" | |
if not file_path.endswith('.py'): | |
return None | |
try: | |
tree = ast.parse(code) | |
# Extract more detailed information using AST | |
            functions = []
            classes = []
            imports = []
            # Pre-compute cyclomatic complexity for every function/method in the file.
            # radon's cc_visit expects source text, so it is run once on the whole file
            # and the results are looked up by block name below.
            try:
                cc_blocks = complexity.cc_visit(code)
            except Exception:
                cc_blocks = []
            block_complexities = {block.name: block.complexity for block in cc_blocks}
            function_complexities = {}
for node in ast.walk(tree): | |
# Get function definitions with arguments | |
if isinstance(node, ast.FunctionDef): | |
args = [] | |
defaults = len(node.args.defaults) | |
args_count = len(node.args.args) - defaults | |
                    # Get positional args
                    for arg in node.args.args[:args_count]:
                        args.append(arg.arg)
                    # Get args with defaults
                    for arg in node.args.args[args_count:]:
                        args.append(f"{arg.arg}=...")
                    # Look up the pre-computed cyclomatic complexity for this function
                    func_complexity = block_complexities.get(node.name)
                    function_complexities[node.name] = func_complexity
# Get docstring if available | |
docstring = ast.get_docstring(node) | |
functions.append({ | |
'name': node.name, | |
'args': args, | |
'complexity': func_complexity, | |
'docstring': docstring | |
}) | |
# Get class definitions | |
elif isinstance(node, ast.ClassDef): | |
methods = [] | |
class_docstring = ast.get_docstring(node) | |
# Get class methods | |
for child in node.body: | |
if isinstance(child, ast.FunctionDef): | |
                            method_complexity = block_complexities.get(child.name)
method_docstring = ast.get_docstring(child) | |
methods.append({ | |
'name': child.name, | |
'complexity': method_complexity, | |
'docstring': method_docstring | |
}) | |
classes.append({ | |
'name': node.name, | |
'methods': methods, | |
'docstring': class_docstring | |
}) | |
# Get imports | |
elif isinstance(node, ast.Import): | |
for name in node.names: | |
imports.append(name.name) | |
elif isinstance(node, ast.ImportFrom): | |
module = node.module or "" | |
for name in node.names: | |
imports.append(f"{module}.{name.name}") | |
            # Overall complexity: average cyclomatic complexity across all blocks in the file
            code_complexity = (
                sum(block.complexity for block in cc_blocks) / len(cc_blocks)
                if cc_blocks else None
            )
            # Calculate maintainability index
            try:
                mi_score = metrics.mi_visit(code, True)
            except Exception:
                mi_score = None
return { | |
'functions': functions, | |
'classes': classes, | |
'imports': imports, | |
'complexity': { | |
'overall': code_complexity, | |
'functions': function_complexities, | |
'maintainability_index': mi_score | |
} | |
} | |
except SyntaxError: | |
print(f"Syntax error in Python file: {file_path}") | |
return None | |
except Exception as e: | |
print(f"Error analyzing {file_path}: {str(e)}") | |
return None | |
def analyze_js_ts(self, code, file_path): | |
"""Analyze JavaScript/TypeScript code using regex with improved patterns.""" | |
if not file_path.endswith(('.js', '.ts', '.jsx', '.tsx')): | |
return None | |
# More sophisticated regex patterns for JS/TS analysis | |
results = { | |
'functions': [], | |
'classes': [], | |
'imports': [], | |
'exports': [], | |
'hooks': [] # For React hooks | |
} | |
# Function patterns (covering various declaration styles) | |
function_patterns = [ | |
# Regular functions | |
r'function\s+(\w+)\s*\(([^)]*)\)', | |
# Arrow functions assigned to variables | |
r'(?:const|let|var)\s+(\w+)\s*=\s*(?:\([^)]*\)|[^=]*)\s*=>\s*{', | |
# Class methods | |
r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{', | |
# Object methods | |
r'(\w+)\s*:\s*function\s*\(([^)]*)\)' | |
] | |
for pattern in function_patterns: | |
for match in re.finditer(pattern, code): | |
func_name = match.group(1) | |
args = match.group(2).strip() if len(match.groups()) > 1 else "" | |
results['functions'].append({ | |
'name': func_name, | |
'args': args | |
}) | |
# Class pattern | |
class_pattern = r'class\s+(\w+)(?:\s+extends\s+(\w+))?\s*{([^}]*)}' | |
for match in re.finditer(class_pattern, code, re.DOTALL): | |
class_name = match.group(1) | |
parent_class = match.group(2) if match.group(2) else None | |
class_body = match.group(3) | |
# Find methods in class | |
methods = [] | |
method_pattern = r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{([^}]*)}' | |
for method_match in re.finditer(method_pattern, class_body): | |
method_name = method_match.group(1) | |
methods.append(method_name) | |
results['classes'].append({ | |
'name': class_name, | |
'extends': parent_class, | |
'methods': methods | |
}) | |
# Import patterns | |
import_patterns = [ | |
# ES6 imports | |
r'import\s+(?:{([^}]*)}|\*\s+as\s+(\w+)|(\w+))\s+from\s+[\'"]([^\'"]+)[\'"]', | |
# CommonJS requires | |
r'(?:const|let|var)\s+(?:{([^}]*)}|(\w+))\s*=\s*require\([\'"]([^\'"]+)[\'"]\)' | |
] | |
for pattern in import_patterns: | |
for match in re.finditer(pattern, code): | |
groups = match.groups() | |
if groups[0]: # Destructured import | |
imports = [name.strip() for name in groups[0].split(',')] | |
for imp in imports: | |
results['imports'].append(imp) | |
elif groups[1]: # Namespace import (import * as X) | |
results['imports'].append(groups[1]) | |
elif groups[2]: # Default import | |
results['imports'].append(groups[2]) | |
elif groups[3]: # Module name | |
results['imports'].append(groups[3]) | |
# React hooks detection (for React files) | |
if file_path.endswith(('.jsx', '.tsx')): | |
hook_pattern = r'use([A-Z]\w+)\s*\(' | |
for match in re.finditer(hook_pattern, code): | |
hook_name = 'use' + match.group(1) | |
results['hooks'].append(hook_name) | |
# Export patterns | |
export_patterns = [ | |
# Named exports | |
r'export\s+(?:const|let|var|function|class)\s+(\w+)', | |
# Default exports | |
r'export\s+default\s+(?:function|class)?\s*(\w+)?' | |
] | |
for pattern in export_patterns: | |
for match in re.finditer(pattern, code): | |
if match.group(1): | |
results['exports'].append(match.group(1)) | |
return results | |
def extract_code_summary(self, file_content, file_path): | |
"""Extract comprehensive summary information from code files.""" | |
extension = os.path.splitext(file_path)[1].lower() | |
# Initialize summary | |
summary = { | |
"functions": [], | |
"classes": [], | |
"imports": [], | |
"description": "", | |
"complexity": None | |
} | |
# Extract Python definitions with AST | |
if extension == '.py': | |
ast_result = self.analyze_ast(file_content, file_path) | |
if ast_result: | |
summary["functions"] = [f["name"] for f in ast_result["functions"]] | |
summary["classes"] = [c["name"] for c in ast_result["classes"]] | |
summary["imports"] = ast_result["imports"] | |
summary["complexity"] = ast_result["complexity"] | |
# Try to extract module docstring | |
try: | |
tree = ast.parse(file_content) | |
module_docstring = ast.get_docstring(tree) | |
if module_docstring: | |
summary["description"] = module_docstring | |
                except Exception:
pass | |
# Add detailed function and class info | |
summary["detailed_functions"] = ast_result["functions"] | |
summary["detailed_classes"] = ast_result["classes"] | |
# Extract JavaScript/TypeScript definitions | |
elif extension in ['.js', '.ts', '.jsx', '.tsx']: | |
js_result = self.analyze_js_ts(file_content, file_path) | |
if js_result: | |
summary["functions"] = [f["name"] for f in js_result["functions"]] | |
summary["classes"] = [c["name"] for c in js_result["classes"]] | |
summary["imports"] = js_result["imports"] | |
# Add detailed function and class info | |
summary["detailed_functions"] = js_result["functions"] | |
summary["detailed_classes"] = js_result["classes"] | |
summary["hooks"] = js_result.get("hooks", []) | |
summary["exports"] = js_result.get("exports", []) | |
# Calculate basic code metrics for any text file | |
if file_content: | |
lines = file_content.split('\n') | |
code_lines = 0 | |
comment_lines = 0 | |
blank_lines = 0 | |
comment_prefixes = ['#', '//', '/*', '*', '<!--'] | |
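            # Rough heuristic: a line counts as a comment only if it starts with a common
            # comment marker; docstring bodies and block comments are not detected.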
for line in lines: | |
line = line.strip() | |
if not line: | |
blank_lines += 1 | |
elif any(line.startswith(prefix) for prefix in comment_prefixes): | |
comment_lines += 1 | |
else: | |
code_lines += 1 | |
summary["metrics"] = { | |
"total_lines": len(lines), | |
"code_lines": code_lines, | |
"comment_lines": comment_lines, | |
"blank_lines": blank_lines, | |
"comment_ratio": comment_lines / max(1, code_lines + comment_lines) | |
} | |
return summary | |
def analyze_dependencies(self, owner, repo, max_files=100): | |
"""Analyze code dependencies across the repository.""" | |
# Get Python and JavaScript files | |
text_files = self.get_all_text_files(owner, repo, max_files=max_files) | |
# Filter for Python and JS/TS files | |
code_files = [f for f in text_files if f["name"].endswith(('.py', '.js', '.ts', '.jsx', '.tsx'))] | |
# Track dependencies | |
dependencies = { | |
'internal': defaultdict(set), # File to file dependencies | |
'external': defaultdict(set), # External package dependencies by file | |
'modules': defaultdict(set) # Defined modules/components by file | |
} | |
# Extract module names from file paths | |
file_to_module = {} | |
for file in code_files: | |
# Convert file path to potential module name | |
module_path = os.path.splitext(file["path"])[0].replace('/', '.') | |
file_to_module[file["path"]] = module_path | |
# Track what each file defines | |
summary = self.extract_code_summary(file["content"], file["path"]) | |
if file["name"].endswith('.py'): | |
for function in summary.get("functions", []): | |
dependencies['modules'][file["path"]].add(f"{module_path}.{function}") | |
for class_name in summary.get("classes", []): | |
dependencies['modules'][file["path"]].add(f"{module_path}.{class_name}") | |
else: # JS/TS files | |
for export in summary.get("exports", []): | |
dependencies['modules'][file["path"]].add(export) | |
# Analyze imports/dependencies | |
for file in code_files: | |
summary = self.extract_code_summary(file["content"], file["path"]) | |
for imp in summary.get("imports", []): | |
# Check if this is an internal import | |
is_internal = False | |
if file["name"].endswith('.py'): | |
# For Python, check if the import matches any module path | |
for module_path in file_to_module.values(): | |
if imp == module_path or imp.startswith(f"{module_path}."): | |
is_internal = True | |
# Find the file that defines this module | |
for f_path, m_path in file_to_module.items(): | |
if m_path == imp.split('.')[0]: | |
dependencies['internal'][file["path"]].add(f_path) | |
break | |
else: | |
# For JS/TS, check relative imports | |
if imp.startswith('./') or imp.startswith('../'): | |
is_internal = True | |
# Try to resolve the relative import | |
src_dir = os.path.dirname(file["path"]) | |
target_path = os.path.normpath(os.path.join(src_dir, imp)) | |
# Add known extensions if not specified | |
if '.' not in os.path.basename(target_path): | |
for ext in ['.js', '.ts', '.jsx', '.tsx']: | |
test_path = f"{target_path}{ext}" | |
if test_path in file_to_module: | |
dependencies['internal'][file["path"]].add(test_path) | |
break | |
# If not internal, consider it external | |
if not is_internal: | |
# Clean up the import name (remove relative path parts) | |
if not file["name"].endswith('.py'): | |
imp = imp.split('/')[0] # Take the package name part | |
dependencies['external'][file["path"]].add(imp) | |
return dependencies | |
def create_dependency_graph(self, dependencies): | |
"""Create a NetworkX graph from dependencies for visualization.""" | |
G = nx.DiGraph() | |
# Add nodes for files | |
for file_path in dependencies['internal'].keys(): | |
G.add_node(file_path, type='file') | |
# Add edges for internal dependencies | |
for file_path, deps in dependencies['internal'].items(): | |
for dep in deps: | |
G.add_edge(file_path, dep) | |
# Add nodes and edges for external dependencies | |
external_nodes = set() | |
for file_path, deps in dependencies['external'].items(): | |
for dep in deps: | |
external_node = f"ext:{dep}" | |
if external_node not in external_nodes: | |
G.add_node(external_node, type='external') | |
external_nodes.add(external_node) | |
G.add_edge(file_path, external_node) | |
return G | |
def get_repo_text_summary(self, owner, repo, max_files=25): | |
"""Extract and summarize text content from the repository with improved metrics.""" | |
# Get README | |
readme = self.get_readme(owner, repo) | |
# Get documentation | |
docs = self.get_documentation_files(owner, repo) | |
# Get key code files (limit to avoid API rate limits) | |
text_files = self.get_all_text_files(owner, repo, max_files=max_files) | |
# Analyze code files | |
code_summary = {} | |
complexity_metrics = { | |
'cyclomatic_complexity': [], | |
'maintainability_index': [], | |
'comment_ratios': [] | |
} | |
for file in text_files: | |
ext = os.path.splitext(file["name"])[1].lower() | |
if ext in ['.py', '.js', '.ts', '.jsx', '.tsx']: | |
file_summary = self.extract_code_summary(file["content"], file["path"]) | |
code_summary[file["path"]] = file_summary | |
# Collect complexity metrics | |
if file_summary.get('complexity'): | |
cc = file_summary['complexity'].get('overall') | |
if cc is not None: | |
complexity_metrics['cyclomatic_complexity'].append((file["path"], cc)) | |
mi = file_summary['complexity'].get('maintainability_index') | |
if mi is not None: | |
complexity_metrics['maintainability_index'].append((file["path"], mi)) | |
if file_summary.get('metrics'): | |
comment_ratio = file_summary['metrics'].get('comment_ratio', 0) | |
complexity_metrics['comment_ratios'].append((file["path"], comment_ratio)) | |
# Analyze dependencies | |
dependencies = self.analyze_dependencies(owner, repo, max_files=max_files) | |
# Summarize repository content by file type | |
file_types = defaultdict(int) | |
for file in text_files: | |
ext = os.path.splitext(file["name"])[1].lower() | |
file_types[ext] += 1 | |
# Calculate aggregate code metrics | |
total_code_lines = sum(summary.get('metrics', {}).get('code_lines', 0) | |
for summary in code_summary.values()) | |
total_comment_lines = sum(summary.get('metrics', {}).get('comment_lines', 0) | |
for summary in code_summary.values()) | |
aggregate_metrics = { | |
'total_files': len(text_files), | |
'total_code_lines': total_code_lines, | |
'total_comment_lines': total_comment_lines, | |
'average_comment_ratio': (total_comment_lines / total_code_lines) if total_code_lines > 0 else 0 | |
} | |
return { | |
"readme": readme, | |
"documentation": docs, | |
"code_summary": code_summary, | |
"complexity_metrics": complexity_metrics, | |
"dependencies": dependencies, | |
"file_type_counts": dict(file_types), | |
"aggregate_metrics": aggregate_metrics, | |
"text_files": text_files # Include the actual text file contents | |
} | |
def get_temporal_analysis(self, owner, repo): | |
"""Perform temporal analysis of repository activity.""" | |
# Get commit activity over time | |
commit_activity = self.get_commit_activity(owner, repo) | |
# Get code frequency (additions/deletions over time) | |
code_frequency = self.get_code_frequency(owner, repo) | |
# Get contributor activity | |
contributor_activity = self.get_contributor_activity(owner, repo) | |
# Get issue and PR timelines | |
issue_timeline = self.get_issue_timeline(owner, repo) | |
pr_timeline = self.get_pr_timeline(owner, repo) | |
# Process data for visualization | |
# - Weekly commit counts | |
weekly_commits = [] | |
if commit_activity: | |
for week in commit_activity: | |
date = datetime.fromtimestamp(week['week']) | |
weekly_commits.append({ | |
'date': date.strftime('%Y-%m-%d'), | |
'total': week['total'], | |
'days': week['days'] # Daily breakdown within the week | |
}) | |
# - Weekly code changes | |
weekly_code_changes = [] | |
if code_frequency: | |
for item in code_frequency: | |
date = datetime.fromtimestamp(item[0]) | |
weekly_code_changes.append({ | |
'date': date.strftime('%Y-%m-%d'), | |
'additions': item[1], | |
'deletions': -item[2] # Convert to positive for visualization | |
}) | |
# - Contributor timeline | |
contributor_timeline = {} | |
if contributor_activity: | |
for contributor in contributor_activity: | |
                author = contributor['author']['login'] if contributor.get('author') else 'unknown'
weeks = contributor['weeks'] | |
if author not in contributor_timeline: | |
contributor_timeline[author] = [] | |
for week in weeks: | |
if week['c'] > 0: # Only include weeks with commits | |
date = datetime.fromtimestamp(week['w']) | |
contributor_timeline[author].append({ | |
'date': date.strftime('%Y-%m-%d'), | |
'commits': week['c'], | |
'additions': week['a'], | |
'deletions': week['d'] | |
}) | |
return { | |
'weekly_commits': weekly_commits, | |
'weekly_code_changes': weekly_code_changes, | |
'contributor_timeline': contributor_timeline, | |
'issue_timeline': issue_timeline, | |
'pr_timeline': pr_timeline | |
} | |
def get_all_info(self, owner, repo): | |
"""Get comprehensive information about a repository with enhanced metrics.""" | |
result = { | |
"timestamp": datetime.now().isoformat(), | |
"basic_info": self.get_repo_info(owner, repo) | |
} | |
if not result["basic_info"]: | |
print(f"Could not retrieve repository information for {owner}/{repo}") | |
return None | |
print("Getting repository statistics...") | |
# Get additional information | |
result["languages"] = self.get_languages(owner, repo) | |
result["contributors"] = self.get_contributors(owner, repo, max_contributors=30) | |
result["recent_commits"] = self.get_commits(owner, repo, max_commits=30) | |
result["branches"] = self.get_branches(owner, repo) | |
result["releases"] = self.get_releases(owner, repo, max_releases=10) | |
result["open_issues"] = self.get_issues(owner, repo, state="open", max_issues=50) | |
result["open_pull_requests"] = self.get_pull_requests(owner, repo, state="open", max_prs=50) | |
result["root_contents"] = self.get_contents(owner, repo) | |
print("Analyzing repository content...") | |
# Get text content and documentation | |
result["text_content"] = self.get_repo_text_summary(owner, repo, max_files=30) | |
print("Analyzing repository activity over time...") | |
# Get temporal analysis | |
result["temporal_analysis"] = self.get_temporal_analysis(owner, repo) | |
return result | |
def get_pull_request_details(self, owner, repo, pr_number): | |
"""Get detailed information for a specific Pull Request using PyGithub.""" | |
if not self.github: | |
print("PyGithub client not initialized. Cannot fetch PR details.") | |
return None | |
try: | |
repo_obj = self.github.get_repo(f"{owner}/{repo}") | |
pr = repo_obj.get_pull(pr_number) | |
# Extract relevant information into a dictionary | |
details = { | |
"number": pr.number, | |
"title": pr.title, | |
"state": pr.state, # 'open', 'closed' | |
"merged": pr.merged, | |
"body": pr.body or "", # Ensure body is string | |
"url": pr.html_url, | |
"created_at": pr.created_at.isoformat() if pr.created_at else None, | |
"updated_at": pr.updated_at.isoformat() if pr.updated_at else None, | |
"closed_at": pr.closed_at.isoformat() if pr.closed_at else None, | |
"merged_at": pr.merged_at.isoformat() if pr.merged_at else None, | |
"author": pr.user.login if pr.user else "N/A", | |
"commits_count": pr.commits, | |
"additions": pr.additions, | |
"deletions": pr.deletions, | |
"changed_files_count": pr.changed_files, | |
"labels": [label.name for label in pr.labels], | |
"assignees": [assignee.login for assignee in pr.assignees], | |
"milestone": pr.milestone.title if pr.milestone else None, | |
"repo_full_name": f"{owner}/{repo}", # Add repo context | |
} | |
return details | |
except GithubException as e: | |
if e.status == 404: | |
print(f"Error: Pull Request #{pr_number} not found in {owner}/{repo}.") | |
else: | |
print(f"Error fetching PR #{pr_number} details: {e}") | |
return None | |
except Exception as e: | |
print(f"An unexpected error occurred fetching PR details: {e}") | |
return None | |
class RepoAnalyzer: | |
"""Streamlined class to analyze GitHub repositories.""" | |
def __init__(self, github_token=None, gemini_api_key=None): | |
"""Initialize with GitHub and optional Gemini credentials.""" | |
load_dotenv() # Load .env file if it exists | |
self.github_token = github_token or os.getenv("GITHUB_TOKEN") | |
self.gemini_api_key = gemini_api_key or os.getenv("GOOGLE_API_KEY") | |
# Initialize GitHub analyzer | |
self.github_analyzer = GitHubRepoInfo(token=self.github_token) | |
# Initialize Gemini model if API key is provided | |
self.gemini_model = None | |
if self.gemini_api_key and GEMINI_AVAILABLE: | |
try: | |
genai.configure(api_key=self.gemini_api_key) | |
self.gemini_model = genai.GenerativeModel('gemini-1.5-pro-latest') | |
print("Gemini model initialized for PR summarization.") | |
except Exception as e: | |
print(f"Error initializing Gemini: {e}") | |
self.repo_data = None | |
self.owner = None | |
self.repo = None | |
self.repo_full_name = None | |
def analyze_repo(self, owner, repo): | |
"""Analyze a GitHub repository and store the data.""" | |
self.owner = owner | |
self.repo = repo | |
self.repo_full_name = f"{owner}/{repo}" | |
print(f"\nFetching repository information for {self.repo_full_name}...") | |
self.repo_data = self.github_analyzer.get_all_info(owner, repo) | |
if not self.repo_data: | |
print(f"Failed to get repository information for {self.repo_full_name}") | |
return False | |
print(f"Successfully analyzed repository: {self.repo_full_name}") | |
return True | |
def _get_pr_summary_prompt(self, pr_details, role): | |
"""Generate a prompt for Gemini to summarize PR based on role.""" | |
# Extract key details safely | |
title = pr_details.get('title', 'N/A') | |
body = pr_details.get('body', 'No description provided.') | |
pr_number = pr_details.get('number', 'N/A') | |
repo_name = pr_details.get('repo_full_name', 'N/A') | |
author = pr_details.get('author', 'N/A') | |
state = pr_details.get('state', 'N/A') | |
merged_status = 'Merged' if pr_details.get('merged') else ('Closed' if state == 'closed' else 'Open') | |
created_at = pr_details.get('created_at', 'N/A') | |
commits_count = pr_details.get('commits_count', 'N/A') | |
changed_files = pr_details.get('changed_files_count', 'N/A') | |
additions = pr_details.get('additions', 'N/A') | |
deletions = pr_details.get('deletions', 'N/A') | |
labels = ', '.join(pr_details.get('labels', [])) or 'None' | |
# Truncate long body | |
max_body_len = 1500 | |
truncated_body = body[:max_body_len] + ('...' if len(body) > max_body_len else '') | |
base_prompt = f""" | |
You are an AI assistant specializing in summarizing GitHub Pull Requests. | |
Analyze the following Pull Request details from repository '{repo_name}' and provide a summary tailored for a '{role}'. | |
**Pull Request #{pr_number}: {title}** | |
* **Author:** {author} | |
* **Status:** {state.capitalize()} ({merged_status}) | |
* **Created:** {created_at} | |
* **Commits:** {commits_count} | |
* **Changed Files:** {changed_files} | |
* **Code Churn:** +{additions} / -{deletions} lines | |
* **Labels:** {labels} | |
* **Description/Body:** | |
{truncated_body} | |
--- | |
""" | |
role_instructions = "" | |
# Define role-specific instructions | |
if role == 'Developer': | |
role_instructions = """ | |
**Summary Focus (Developer):** | |
* Summarize the core technical changes and their purpose. | |
* Identify key files, modules, or functions affected. | |
* Mention any potential technical complexities, risks, or areas needing careful code review (based *only* on the description and metadata). | |
* Note any mention of tests added or modified. | |
* Be concise and focus on technical aspects relevant for peer review or understanding the change. | |
""" | |
elif role == 'Manager' or role == 'Team Lead': | |
role_instructions = """ | |
**Summary Focus (Manager/Team Lead):** | |
* Explain the high-level purpose and business value (what problem does this PR solve or what feature does it add?). | |
* Summarize the overall status (e.g., Ready for Review, Needs Work, Merged, Blocked?). | |
* Give a sense of the PR's size/complexity (e.g., Small/Medium/Large based on file/line changes and description). | |
* Highlight any mentioned risks, blockers, or dependencies on other work. | |
* Include the author and key dates (created, merged/closed). | |
* Focus on information needed for tracking progress and impact. | |
""" | |
elif role == 'Program Manager' or role == 'Product Owner': | |
role_instructions = """ | |
**Summary Focus (Program/Product Manager):** | |
* Describe the user-facing impact or the feature/bug fix being addressed. | |
* Relate the PR to product goals or requirements if possible (based on title/body/labels). | |
* Note the status (especially if merged or closed). | |
* Mention associated issues or tickets if referenced in the body (though not explicitly provided here, look for patterns like '#123'). | |
* Focus on 'what' and 'why' from a product perspective. | |
""" | |
else: # Default/General | |
role_instructions = """ | |
**Summary Focus (General):** | |
* State the main goal or purpose of the PR clearly. | |
* Identify the author and the current status (Open/Closed/Merged). | |
* Provide a brief, balanced overview of the key changes made. | |
* Keep the summary accessible to a wider audience. | |
""" | |
return base_prompt + role_instructions + "\n**Summary:**" | |
def summarize_pull_request(self, pr_number, role='Developer'): | |
"""Summarize a pull request using Gemini AI.""" | |
if not self.gemini_model: | |
return "Gemini model not initialized. Cannot generate summary." | |
if not self.owner or not self.repo: | |
return "Repository owner and name not set. Analyze a repository first." | |
print(f"\nFetching details for PR #{pr_number} in {self.repo_full_name}...") | |
pr_details = self.github_analyzer.get_pull_request_details(self.owner, self.repo, pr_number) | |
if not pr_details: | |
return f"Could not retrieve details for PR #{pr_number}." | |
print(f"Generating summary for role: {role}...") | |
# Generate the role-specific prompt | |
prompt = self._get_pr_summary_prompt(pr_details, role) | |
try: | |
response = self.gemini_model.generate_content(prompt) | |
summary_text = response.text | |
return summary_text | |
except Exception as e: | |
print(f"Error communicating with Gemini for PR summary: {e}") | |
return f"Error generating PR summary: {e}" | |
def create_dependency_network_html(self, output_file="dependency_network.html"): | |
"""Create an interactive network visualization of dependencies using PyVis.""" | |
if not self.repo_data: | |
print("No repository data available.") | |
return None | |
# Get the dependencies | |
dependencies = self.repo_data.get("text_content", {}).get("dependencies", {}) | |
if not dependencies: | |
print("No dependency data available.") | |
return None | |
internal_deps = dependencies.get('internal', {}) | |
external_deps = dependencies.get('external', {}) | |
# Create NetworkX graph first | |
G = nx.DiGraph() | |
# Add file nodes and internal dependencies | |
for file_path, deps in internal_deps.items(): | |
file_name = os.path.basename(file_path) | |
G.add_node(file_path, label=file_name, title=file_path, group="file") | |
for dep in deps: | |
dep_name = os.path.basename(dep) | |
G.add_node(dep, label=dep_name, title=dep, group="file") | |
G.add_edge(file_path, dep) | |
# Add external dependencies | |
for file_path, deps in external_deps.items(): | |
if file_path not in G.nodes: | |
file_name = os.path.basename(file_path) | |
G.add_node(file_path, label=file_name, title=file_path, group="file") | |
for dep in deps: | |
ext_node = f"ext:{dep}" | |
G.add_node(ext_node, label=dep, title=dep, group="external") | |
G.add_edge(file_path, ext_node) | |
# Create PyVis network from NetworkX graph | |
net = Network(height="750px", width="100%", directed=True, notebook=False) | |
# Set network options for better visualization | |
net.set_options(""" | |
{ | |
"physics": { | |
"hierarchicalRepulsion": { | |
"centralGravity": 0.0, | |
"springLength": 100, | |
"springConstant": 0.01, | |
"nodeDistance": 120 | |
}, | |
"maxVelocity": 50, | |
"minVelocity": 0.1, | |
"solver": "hierarchicalRepulsion" | |
}, | |
"layout": { | |
"improvedLayout": true | |
} | |
} | |
""") | |
# Add nodes with properties from NetworkX graph | |
for node, node_attrs in G.nodes(data=True): | |
group = node_attrs.get('group', 'file') | |
# Set colors based on node type | |
color = "#97c2fc" if group == "file" else "#fb7e81" # blue for files, red for external | |
net.add_node( | |
node, | |
label=node_attrs.get('label', str(node)), | |
title=node_attrs.get('title', str(node)), | |
color=color | |
) | |
# Add edges | |
for source, target in G.edges(): | |
net.add_edge(source, target) | |
# Generate and save the HTML file | |
net.save_graph(output_file) | |
print(f"Dependency network visualization saved to {output_file}") | |
return output_file | |
def create_vizro_dashboard(self, output_dir='./vizro_dashboard'): | |
"""Create a Vizro dashboard from repository data.""" | |
if not self.repo_data: | |
print("No repository data available. Run analyze_repo() first.") | |
return None | |
# Create output directory if it doesn't exist | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
# Extract repository data | |
repo_name = self.repo_data["basic_info"]["full_name"] | |
basic_info = self.repo_data["basic_info"] | |
# Create dashboard pages | |
pages = [] | |
# 1. Overview Page | |
overview_components = [] | |
# Basic repository info as a card | |
repo_info_md = f""" | |
# {basic_info['full_name']} | |
**Description:** {basic_info.get('description', 'No description')} | |
**Stars:** {basic_info['stargazers_count']} | | |
**Forks:** {basic_info['forks_count']} | | |
**Open Issues:** {basic_info['open_issues_count']} | |
**Created:** {basic_info['created_at']} | | |
**Last Updated:** {basic_info['updated_at']} | |
**Default Branch:** {basic_info['default_branch']} | |
**License:** {basic_info['license']['name'] if basic_info.get('license') else 'Not specified'} | |
""" | |
overview_components.append(vzm.Card(text=repo_info_md)) | |
# Languages pie chart | |
if self.repo_data.get("languages"): | |
langs_data = [] | |
total = sum(self.repo_data["languages"].values()) | |
for lang, bytes_count in self.repo_data["languages"].items(): | |
percentage = (bytes_count / total) * 100 | |
langs_data.append({ | |
"Language": lang, | |
"Bytes": bytes_count, | |
"Percentage": percentage | |
}) | |
langs_df = pd.DataFrame(langs_data) | |
lang_pie = vzm.Graph( | |
figure=px.pie( | |
langs_df, | |
values="Percentage", | |
names="Language", | |
title="Language Distribution" | |
) | |
) | |
            overview_components.append(lang_pie)
# Contributors bar chart | |
if self.repo_data.get("contributors"): | |
contrib_data = [] | |
for contributor in self.repo_data["contributors"][:15]: | |
contrib_data.append({ | |
"Username": contributor['login'], | |
"Contributions": contributor['contributions'] | |
}) | |
contrib_df = pd.DataFrame(contrib_data) | |
contrib_bar = vzm.Graph( | |
figure=px.bar( | |
contrib_df, | |
x="Username", | |
y="Contributions", | |
title="Top Contributors" | |
) | |
) | |
            overview_components.append(contrib_bar)
# Add overview page | |
pages.append( | |
vzm.Page( | |
title="Overview", | |
components=overview_components | |
) | |
) | |
# 2. Activity Page | |
activity_components = [] | |
# Commit activity over time | |
weekly_commits = self.repo_data.get("temporal_analysis", {}).get("weekly_commits", []) | |
if weekly_commits: | |
commits_df = pd.DataFrame([ | |
{"Date": week['date'], "Commits": week['total']} | |
for week in weekly_commits | |
]) | |
commits_line = vzm.Graph( | |
figure=px.line( | |
commits_df, | |
x="Date", | |
y="Commits", | |
title="Weekly Commit Activity" | |
) | |
) | |
            activity_components.append(commits_line)
# Code changes over time | |
weekly_code_changes = self.repo_data.get("temporal_analysis", {}).get("weekly_code_changes", []) | |
if weekly_code_changes: | |
changes_data = [] | |
for week in weekly_code_changes: | |
changes_data.append({ | |
"Date": week['date'], | |
"Additions": week['additions'], | |
"Deletions": -abs(week['deletions']) # Make negative for visualization | |
}) | |
changes_df = pd.DataFrame(changes_data) | |
# Create a stacked bar chart | |
changes_fig = go.Figure() | |
changes_fig.add_trace(go.Bar( | |
x=changes_df["Date"], | |
y=changes_df["Additions"], | |
name="Additions", | |
marker_color="green" | |
)) | |
changes_fig.add_trace(go.Bar( | |
x=changes_df["Date"], | |
y=changes_df["Deletions"], | |
name="Deletions", | |
marker_color="red" | |
)) | |
changes_fig.update_layout( | |
title="Weekly Code Changes", | |
barmode="relative" | |
) | |
            changes_chart = vzm.Graph(figure=changes_fig)
            activity_components.append(changes_chart)
# Issue resolution times | |
issue_timeline = self.repo_data.get("temporal_analysis", {}).get("issue_timeline", {}) | |
if issue_timeline and issue_timeline.get('resolution_times'): | |
resolution_times = issue_timeline['resolution_times'] | |
# Convert to hours for better visualization (cap at one week) | |
rt_hours = [min(rt, 168) for rt in resolution_times if rt is not None] | |
# Create histogram | |
issue_resolution_fig = px.histogram( | |
x=rt_hours, | |
title="Issue Resolution Times (Capped at 1 Week)", | |
labels={"x": "Hours to Resolution", "y": "Number of Issues"} | |
) | |
# Add mean and median lines | |
if rt_hours: | |
mean_rt = np.mean(rt_hours) | |
median_rt = np.median(rt_hours) | |
issue_resolution_fig.add_vline( | |
x=mean_rt, | |
line_dash="dash", | |
line_color="red", | |
annotation_text=f"Mean: {mean_rt:.2f} hours" | |
) | |
issue_resolution_fig.add_vline( | |
x=median_rt, | |
line_dash="dash", | |
line_color="green", | |
annotation_text=f"Median: {median_rt:.2f} hours" | |
) | |
            resolution_hist = vzm.Graph(figure=issue_resolution_fig)
            activity_components.append(resolution_hist)
        # Add activity page (only if there is something to show)
        if activity_components:
            pages.append(
                vzm.Page(
                    title="Activity",
                    components=activity_components
                )
            )
# 3. Code Quality Page | |
code_components = [] | |
# Code complexity metrics | |
complexity_metrics = self.repo_data.get("text_content", {}).get("complexity_metrics", {}) | |
cyclomatic_complexity = complexity_metrics.get("cyclomatic_complexity", []) | |
if cyclomatic_complexity: | |
# Prepare data for top complex files | |
complexity_data = [] | |
for path, cc in cyclomatic_complexity: | |
# Ensure cc is numeric | |
if isinstance(cc, (int, float)): | |
complexity_data.append({ | |
"File": os.path.basename(path), | |
"Path": path, | |
"Complexity": cc | |
}) | |
if complexity_data: | |
# Sort by complexity | |
complexity_data.sort(key=lambda x: x["Complexity"], reverse=True) | |
# Take top 10 | |
top_complex_files = complexity_data[:10] | |
complex_df = pd.DataFrame(top_complex_files) | |
complex_bar = vzm.Graph( | |
figure=px.bar( | |
complex_df, | |
x="File", | |
y="Complexity", | |
title="Most Complex Files", | |
hover_data=["Path"] | |
) | |
) | |
                code_components.append(complex_bar)
# Complexity histogram | |
cc_values = [d["Complexity"] for d in complexity_data] | |
cc_hist = vzm.Graph( | |
figure=px.histogram( | |
x=cc_values, | |
title="Cyclomatic Complexity Distribution", | |
labels={"x": "Complexity", "y": "Number of Files"} | |
) | |
) | |
                code_components.append(cc_hist)
# Comment ratio by file | |
comment_ratios = complexity_metrics.get("comment_ratios", []) | |
if comment_ratios: | |
comment_data = [] | |
for path, ratio in comment_ratios: | |
comment_data.append({ | |
"File": os.path.basename(path), | |
"Path": path, | |
"Comment Ratio": ratio | |
}) | |
# Sort by ratio | |
comment_data.sort(key=lambda x: x["Comment Ratio"], reverse=True) | |
# Take top 10 | |
top_commented_files = comment_data[:10] | |
comment_df = pd.DataFrame(top_commented_files) | |
comment_bar = vzm.Graph( | |
figure=px.bar( | |
comment_df, | |
x="File", | |
y="Comment Ratio", | |
title="Most Commented Files", | |
hover_data=["Path"] | |
) | |
) | |
            code_components.append(comment_bar)
        # Add code quality page (only if there is something to show)
        if code_components:
            pages.append(
                vzm.Page(
                    title="Code Quality",
                    components=code_components
                )
            )
# 4. Dependencies Page | |
dependencies = self.repo_data.get("text_content", {}).get("dependencies", {}) | |
if dependencies: | |
dependencies_components = [] | |
# External dependencies | |
external_deps = dependencies.get("external", {}) | |
if external_deps: | |
# Count packages | |
ext_counts = Counter() | |
for file_deps in external_deps.values(): | |
ext_counts.update(dep for dep in file_deps if isinstance(dep, str)) | |
# Get top dependencies | |
top_deps = ext_counts.most_common(10) | |
deps_data = [] | |
for pkg, count in top_deps: | |
deps_data.append({ | |
"Package": pkg, | |
"Count": count | |
}) | |
deps_df = pd.DataFrame(deps_data) | |
deps_bar = vzm.Graph( | |
figure=px.bar( | |
deps_df, | |
x="Package", | |
y="Count", | |
title="Most Used External Dependencies" | |
) | |
) | |
                dependencies_components.append(deps_bar)
# Create dependency network visualization with PyVis in a separate HTML file | |
# and embed a note about it in the dashboard | |
try: | |
network_file = self.create_dependency_network_html( | |
output_file=os.path.join(output_dir, "dependency_network.html") | |
) | |
if network_file: | |
network_note = f""" | |
## Code Dependency Network | |
An interactive visualization of code dependencies has been created as a separate file: | |
`{os.path.basename(network_file)}` | |
Open this file in a web browser to explore the code dependency network. | |
""" | |
dependencies_components.append(vzm.Card(text=network_note)) | |
except Exception as e: | |
print(f"Error creating dependency network: {e}") | |
# Add dependencies page if we have components | |
if dependencies_components: | |
pages.append( | |
vzm.Page( | |
title="Dependencies", | |
components=dependencies_components | |
) | |
) | |
# Create the dashboard | |
dashboard = vzm.Dashboard( | |
title=f"GitHub Repository Analysis: {repo_name}", | |
pages=pages | |
) | |
# Export dashboard | |
dashboard_path = os.path.join(output_dir, "dashboard.html") | |
try: | |
dashboard.save(dashboard_path) | |
print(f"Vizro dashboard saved to {dashboard_path}") | |
return dashboard_path | |
except Exception as e: | |
print(f"Error saving dashboard: {e}") | |
return None | |
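# Illustrative headless usage of the analyzer (a sketch, not part of the dashboard code):
# it assumes the RepoAnalyzer class defined earlier in this module exposes analyze_repo,
# create_vizro_dashboard and create_dependency_network_html exactly as the Gradio handlers
# below call them. The owner/repo defaults are placeholder examples.
def example_headless_analysis(owner="pandas-dev", repo="pandas", output_dir="./vizro_dashboard"):
    """Run the analysis pipeline without the Gradio UI and return the generated file paths."""
    analyzer = RepoAnalyzer(github_token=os.environ.get("GITHUB_TOKEN"))
    if not analyzer.analyze_repo(owner, repo):
        return None, None
    dashboard_path = analyzer.create_vizro_dashboard(output_dir=output_dir)
    network_path = analyzer.create_dependency_network_html(
        output_file=os.path.join(output_dir, "dependency_network.html")
    )
    return dashboard_path, network_path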
# Create Gradio interface | |
def create_gradio_interface(): | |
"""Create a Gradio interface for the GitHub repository analyzer.""" | |
def analyze_repository(owner, repo, github_token=None, gemini_api_key=None): | |
"""Function to analyze a repository and return a Vizro dashboard.""" | |
try: | |
analyzer = RepoAnalyzer( | |
github_token=github_token if github_token else None, | |
gemini_api_key=gemini_api_key if gemini_api_key else None | |
) | |
# Analyze repository | |
success = analyzer.analyze_repo(owner, repo) | |
if not success: | |
return None, None, f"Failed to analyze repository: {owner}/{repo}. Check the repository name and your GitHub token." | |
# Create Vizro dashboard | |
dashboard_path = analyzer.create_vizro_dashboard(output_dir='./vizro_dashboard') | |
# Create dependency network visualization | |
network_path = analyzer.create_dependency_network_html(output_file='./vizro_dashboard/dependency_network.html') | |
# Generate a simple report | |
basic_info = analyzer.repo_data["basic_info"] | |
report = f""" | |
### Repository Analysis: {basic_info['full_name']} | |
**Description:** {basic_info.get('description') or 'No description'}
**Statistics:** | |
- Stars: {basic_info['stargazers_count']} | |
- Forks: {basic_info['forks_count']} | |
- Open Issues: {basic_info['open_issues_count']} | |
**Interactive Dashboard:** | |
The full interactive Vizro dashboard has been created at: `{dashboard_path}` | |
**Dependency Network:** | |
The interactive dependency network visualization has been created at: `{network_path}` | |
**Language Summary:** | |
""" | |
# Add language info | |
if analyzer.repo_data.get("languages"): | |
langs = analyzer.repo_data["languages"] | |
total = sum(langs.values()) | |
for lang, bytes_count in sorted(langs.items(), key=lambda x: x[1], reverse=True): | |
percentage = (bytes_count / total) * 100 | |
report += f"- {lang}: {percentage:.1f}%\n" | |
# Add code metrics if available | |
if analyzer.repo_data.get("text_content", {}).get("aggregate_metrics"): | |
metrics = analyzer.repo_data["text_content"]["aggregate_metrics"] | |
report += f""" | |
**Code Metrics:** | |
- Total Files Analyzed: {metrics.get('total_files', 'N/A')} | |
- Total Code Lines: {metrics.get('total_code_lines', 'N/A')} | |
- Comment Ratio: {metrics.get('average_comment_ratio', 'N/A'):.2f} | |
""" | |
return dashboard_path, network_path, report | |
except Exception as e: | |
return None, None, f"Error analyzing repository: {str(e)}" | |
def summarize_pr(owner, repo, pr_number, role, github_token=None, gemini_api_key=None): | |
"""Function to summarize a PR for Gradio.""" | |
try: | |
analyzer = RepoAnalyzer( | |
github_token=github_token if github_token else None, | |
gemini_api_key=gemini_api_key if gemini_api_key else None | |
) | |
# Analyze the repository first so the PR summary has repository context
success = analyzer.analyze_repo(owner, repo) | |
if not success: | |
return f"Failed to analyze repository: {owner}/{repo}. Check the repository name and your GitHub token." | |
# Summarize the PR | |
summary = analyzer.summarize_pull_request(int(pr_number), role) | |
return summary | |
except Exception as e: | |
return f"Error summarizing PR: {str(e)}" | |
def view_dashboard(dashboard_path): | |
"""Load dashboard content for the iframe.""" | |
try: | |
if not dashboard_path or not os.path.exists(dashboard_path): | |
return "Dashboard file not found" | |
with open(dashboard_path, 'r', encoding='utf-8') as f: | |
html_content = f.read() | |
return html_content | |
except Exception as e: | |
return f"Error loading dashboard: {str(e)}" | |
def view_network(network_path): | |
"""Load network visualization content for the iframe.""" | |
try: | |
if not network_path or not os.path.exists(network_path): | |
return "Network visualization file not found" | |
with open(network_path, 'r', encoding='utf-8') as f: | |
html_content = f.read() | |
return html_content | |
except Exception as e: | |
return f"Error loading network visualization: {str(e)}" | |
# UI Components | |
with gr.Blocks(title="GitHub Repository Analyzer") as app: | |
gr.Markdown("# GitHub Repository Analyzer with Vizro and PyVis") | |
gr.Markdown("Analyze GitHub repositories, visualize code dependencies, and summarize pull requests") | |
with gr.Tab("Repository Analysis"): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
owner_input = gr.Textbox(label="Repository Owner (Username/Organization)") | |
repo_input = gr.Textbox(label="Repository Name") | |
github_token = gr.Textbox(label="GitHub Token (Optional)", type="password") | |
gemini_api_key = gr.Textbox(label="Google API Key (Optional, for PR Summaries)", type="password") | |
analyze_btn = gr.Button("Analyze Repository") | |
with gr.Column(scale=2): | |
report_output = gr.Markdown(label="Analysis Report") | |
# Store paths but don't display them | |
dashboard_path_state = gr.State() | |
network_path_state = gr.State() | |
with gr.Tabs(): | |
with gr.TabItem("Dashboard"): | |
# Render the saved dashboard HTML inline (gr.HTML takes no height parameter)
dashboard_frame = gr.HTML(label="Dashboard Preview") | |
with gr.TabItem("Dependency Network"): | |
# Render the saved PyVis network HTML inline (gr.HTML takes no height parameter)
network_frame = gr.HTML(label="Dependency Network") | |
analyze_btn.click(
analyze_repository,
inputs=[
owner_input, repo_input, github_token, gemini_api_key
],
outputs=[dashboard_path_state, network_path_state, report_output]
).then(
# Chain the preview updates after the analysis finishes; this avoids relying on
# change events for gr.State, which older Gradio releases do not support.
view_dashboard,
inputs=[dashboard_path_state],
outputs=[dashboard_frame]
).then(
view_network,
inputs=[network_path_state],
outputs=[network_frame]
)
with gr.Tab("PR Summarizer"): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
pr_owner_input = gr.Textbox(label="Repository Owner") | |
pr_repo_input = gr.Textbox(label="Repository Name") | |
pr_number_input = gr.Number(label="PR Number", precision=0) | |
pr_role_input = gr.Dropdown( | |
choices=["Developer", "Manager", "Team Lead", "Product Owner", "Program Manager", "General"], | |
label="Your Role", | |
value="Developer" | |
) | |
pr_github_token = gr.Textbox(label="GitHub Token (Optional)", type="password") | |
pr_gemini_api_key = gr.Textbox(label="Google API Key (Required for Gemini)", type="password") | |
summarize_btn = gr.Button("Summarize PR") | |
with gr.Column(scale=2): | |
pr_summary_output = gr.Markdown(label="PR Summary") | |
summarize_btn.click( | |
summarize_pr, | |
inputs=[ | |
pr_owner_input, pr_repo_input, pr_number_input, | |
pr_role_input, pr_github_token, pr_gemini_api_key | |
], | |
outputs=pr_summary_output | |
) | |
return app | |
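# For local-only use, the interface returned above can also be launched without a public
# share link, e.g. (standard Gradio launch options):
#   create_gradio_interface().launch(server_name="0.0.0.0", server_port=7860, share=False)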
# Main function to run the app | |
def main(): | |
"""Run the GitHub Repository Analyzer with Gradio interface.""" | |
# Load environment variables | |
load_dotenv() | |
# Create and launch the Gradio interface | |
app = create_gradio_interface() | |
app.launch(share=True) | |
if __name__ == "__main__": | |
main() |