# Import existing libraries from the original code
import requests
import json
import os
import base64
import re
import ast
import networkx as nx
import radon.metrics as metrics
import radon.complexity as complexity
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from IPython.display import display, Markdown, HTML
import numpy as np
from github import Github, GithubException
import time
from dotenv import load_dotenv

# Import Neo4j and Gemini libraries
from neo4j import GraphDatabase, basic_auth
import google.generativeai as genai

# Import Vizro and Gradio
import vizro.plotly.express as px
import vizro
import vizro.models as vzm
import plotly.graph_objects as go
import gradio as gr


# Keep GitHubRepoInfo class unchanged
class GitHubRepoInfo:
    """Enhanced class to get comprehensive information about a GitHub repository."""

    def __init__(self, token=None):
        """Initialize with optional GitHub API token."""
        self.base_url = "https://api.github.com"
        self.headers = {"Accept": "application/vnd.github.v3+json"}
        self.token = token
        self.github = None  # Initialize github attribute
        # Set up authentication
        if token:
            self.headers["Authorization"] = f"token {token}"
            try:
                self.github = Github(token)
                self.github.get_user().login  # Test connection
            except Exception as e:
                print(f"Warning: Failed to initialize PyGithub with token: {e}")
                self.github = Github()  # Fall back to unauthenticated
        elif os.environ.get("GITHUB_TOKEN"):
            self.token = os.environ.get("GITHUB_TOKEN")
            self.headers["Authorization"] = f"token {self.token}"
            try:
                self.github = Github(self.token)
                self.github.get_user().login  # Test connection
            except Exception as e:
                print(f"Warning: Failed to initialize PyGithub with token: {e}")
                self.github = Github()  # Fall back to unauthenticated
        else:
            self.github = Github()  # Unauthenticated
        # Configure rate limit handling
        self.rate_limit_remaining = 5000  # Assume the higher authenticated limit
        self.rate_limit_reset = datetime.now()
        # Initialize rate limit info if possible
        if self.github:
            try:
                rate_limit = self.github.get_rate_limit()
                self.rate_limit_remaining = rate_limit.core.remaining
                # PyGithub already returns a datetime here, so no conversion is needed
                self.rate_limit_reset = rate_limit.core.reset
            except Exception as e:
                print(f"Warning: Could not get initial rate limit from PyGithub: {e}")

    # --- Keep ALL existing methods from the original GitHubRepoInfo class ---
    # ... (_check_rate_limit, _paginated_get, get_repo_info, get_contributors, ...)

    def _check_rate_limit(self):
        """Check the API rate limit and wait if necessary."""
        if self.rate_limit_remaining <= 10:
            reset_time = self.rate_limit_reset
            current_time = datetime.now()
            if reset_time > current_time:
                wait_time = (reset_time - current_time).total_seconds() + 10  # Add a buffer
                print(f"Rate limit nearly exhausted. Waiting {wait_time:.0f} seconds for reset.")
                time.sleep(wait_time)
        # Refresh rate limit info before the next API call
        response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers)
        if response.status_code == 200:
            rate_data = response.json()
            self.rate_limit_remaining = rate_data["resources"]["core"]["remaining"]
            self.rate_limit_reset = datetime.fromtimestamp(rate_data["resources"]["core"]["reset"])

    def _paginated_get(self, url, params=None, max_items=None):
        """Handle paginated API responses with rate limit awareness."""
        if params is None:
            params = {}
        items = []
        page = 1
        per_page = min(100, params.get("per_page", 30))
        params["per_page"] = per_page
        while True:
            self._check_rate_limit()
            params["page"] = page
            response = requests.get(url, headers=self.headers, params=params)
            if response.status_code == 200:
                page_items = response.json()
                if not page_items:
                    break
                items.extend(page_items)
                page += 1
                # Check if we've reached the requested limit
                if max_items and len(items) >= max_items:
                    return items[:max_items]
                # Check if we've reached the end (GitHub returns fewer items than requested)
                if len(page_items) < per_page:
                    break
            else:
                print(f"Error {response.status_code}: {response.text}")
                break
        return items
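
    # A minimal usage sketch for the paginated helper (hypothetical repository):
    # the method keeps requesting pages until GitHub returns an empty or short
    # page, or until max_items is reached.
    #
    #   info = GitHubRepoInfo()
    #   stars = info._paginated_get(
    #       f"{info.base_url}/repos/octocat/Hello-World/stargazers",
    #       params={"per_page": 100},
    #       max_items=250,
    #   )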

    def get_repo_info(self, owner, repo):
        """Get basic repository information."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error {response.status_code}: {response.text}")
            return None

    def get_contributors(self, owner, repo, max_contributors=None):
        """Get repository contributors with pagination support."""
        url = f"{self.base_url}/repos/{owner}/{repo}/contributors"
        return self._paginated_get(url, max_items=max_contributors)

    # ... (get_languages, get_commits, get_commit_activity, get_code_frequency, ...)
    def get_languages(self, owner, repo):
        """Get the languages used in the repository."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/languages"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error getting languages: {response.status_code}")
            return {}

    def get_commits(self, owner, repo, params=None, max_commits=None):
        """Get commits with enhanced filtering and pagination."""
        url = f"{self.base_url}/repos/{owner}/{repo}/commits"
        return self._paginated_get(url, params=params, max_items=max_commits)

    def get_commit_activity(self, owner, repo, retries=5):
        """Get commit activity stats for the past year."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/stats/commit_activity"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 202 and retries > 0:
            # GitHub is computing the statistics; wait and retry (bounded to avoid infinite recursion)
            print("GitHub is computing statistics, waiting and retrying...")
            time.sleep(2)
            return self.get_commit_activity(owner, repo, retries - 1)
        else:
            print(f"Error getting commit activity: {response.status_code}")
            return []

    def get_code_frequency(self, owner, repo, retries=5):
        """Get weekly code addition and deletion statistics."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/stats/code_frequency"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 202 and retries > 0:
            # GitHub is computing the statistics; wait and retry (bounded to avoid infinite recursion)
            print("GitHub is computing statistics, waiting and retrying...")
            time.sleep(2)
            return self.get_code_frequency(owner, repo, retries - 1)
        else:
            print(f"Error getting code frequency: {response.status_code}")
            return []

    # ... (get_contributor_activity, get_branches, get_releases, get_issues, ...)
    def get_contributor_activity(self, owner, repo, retries=5):
        """Get contributor commit activity over time."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/stats/contributors"
        response = requests.get(url, headers=self.headers)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 202 and retries > 0:
            # GitHub is computing the statistics; wait and retry (bounded to avoid infinite recursion)
            print("GitHub is computing statistics, waiting and retrying...")
            time.sleep(2)
            return self.get_contributor_activity(owner, repo, retries - 1)
        else:
            print(f"Error getting contributor activity: {response.status_code}")
            return []

    def get_branches(self, owner, repo):
        """Get repository branches."""
        url = f"{self.base_url}/repos/{owner}/{repo}/branches"
        return self._paginated_get(url)

    def get_releases(self, owner, repo, max_releases=None):
        """Get repository releases with pagination support."""
        url = f"{self.base_url}/repos/{owner}/{repo}/releases"
        return self._paginated_get(url, max_items=max_releases)

    def get_issues(self, owner, repo, state="all", max_issues=None, params=None):
        """Get repository issues with enhanced filtering."""
        url = f"{self.base_url}/repos/{owner}/{repo}/issues"
        if params is None:
            params = {}
        params["state"] = state
        return self._paginated_get(url, params=params, max_items=max_issues)

    # ... (get_issue_timeline, get_pull_requests, get_pr_timeline, get_contents, ...)
    def get_issue_timeline(self, owner, repo, days_back=180):
        """Analyze issue creation and closing over time."""
        # Get issues, including closed ones
        issues = self.get_issues(owner, repo, state="all")
        # The issues endpoint also returns pull requests; skip those so the
        # timeline reflects actual issues only
        issues = [issue for issue in issues if 'pull_request' not in issue]
        # Prepare timeline data
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days_back)
        # Initialize daily counters
        date_range = pd.date_range(start=start_date, end=end_date)
        created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
        closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
        # Collect issue creation and closing dates
        for issue in issues:
            created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ')
            if created_at >= start_date:
                created_counts[created_at.strftime('%Y-%m-%d')] += 1
            if issue['state'] == 'closed' and issue.get('closed_at'):
                closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
                if closed_at >= start_date:
                    closed_counts[closed_at.strftime('%Y-%m-%d')] += 1
        # Calculate resolution times for closed issues
        resolution_times = []
        for issue in issues:
            if issue['state'] == 'closed' and issue.get('closed_at'):
                created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ')
                closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
                resolution_time = (closed_at - created_at).total_seconds() / 3600  # hours
                resolution_times.append(resolution_time)
        # Calculate the issue label distribution
        label_counts = defaultdict(int)
        for issue in issues:
            for label in issue.get('labels', []):
                label_counts[label['name']] += 1
        return {
            'created': created_counts,
            'closed': closed_counts,
            'resolution_times': resolution_times,
            'labels': dict(label_counts)
        }
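
    # The returned dict maps 'YYYY-MM-DD' strings to daily counts; a
    # hypothetical example of the shape:
    #
    #   {'created': {'2024-01-01': 2, ...},
    #    'closed': {'2024-01-01': 1, ...},
    #    'resolution_times': [4.5, 72.0, ...],   # hours per closed issue
    #    'labels': {'bug': 12, 'enhancement': 7}}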

    def get_pull_requests(self, owner, repo, state="all", max_prs=None, params=None):
        """Get repository pull requests with enhanced filtering."""
        url = f"{self.base_url}/repos/{owner}/{repo}/pulls"
        if params is None:
            params = {}
        params["state"] = state
        return self._paginated_get(url, params=params, max_items=max_prs)

    def get_pr_timeline(self, owner, repo, days_back=180):
        """Analyze PR creation, closing, and metrics over time."""
        # Get PRs, including closed and merged ones
        prs = self.get_pull_requests(owner, repo, state="all")
        # Prepare timeline data
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days_back)
        # Initialize daily counters
        date_range = pd.date_range(start=start_date, end=end_date)
        created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
        closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
        merged_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
        # Track metrics
        merge_times = []
        pr_sizes = []
        # Collect PR data
        for pr in prs:
            created_at = datetime.strptime(pr['created_at'], '%Y-%m-%dT%H:%M:%SZ')
            if created_at >= start_date:
                created_counts[created_at.strftime('%Y-%m-%d')] += 1
            # Get PR size (additions + deletions); note that the list endpoint
            # omits these fields, so they are only present for detailed PR objects
            if pr.get('additions') is not None and pr.get('deletions') is not None:
                pr_sizes.append({
                    'additions': pr['additions'],
                    'deletions': pr['deletions'],
                    'total': pr['additions'] + pr['deletions'],
                    'files_changed': pr.get('changed_files', 0)
                })
            # Check if the PR is closed
            if pr['state'] == 'closed' and pr.get('closed_at'):
                closed_at = datetime.strptime(pr['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
                if closed_at >= start_date:
                    closed_counts[closed_at.strftime('%Y-%m-%d')] += 1
                # Check if the PR was merged
                if pr['merged_at']:
                    merged_at = datetime.strptime(pr['merged_at'], '%Y-%m-%dT%H:%M:%SZ')
                    if merged_at >= start_date:
                        merged_counts[merged_at.strftime('%Y-%m-%d')] += 1
                    # Calculate time to merge
                    merge_time = (merged_at - created_at).total_seconds() / 3600  # hours
                    merge_times.append(merge_time)
        # Calculate the acceptance rate
        total_closed = sum(closed_counts.values())
        total_merged = sum(merged_counts.values())
        acceptance_rate = (total_merged / total_closed) * 100 if total_closed > 0 else 0
        return {
            'created': created_counts,
            'closed': closed_counts,
            'merged': merged_counts,
            'merge_times': merge_times,
            'pr_sizes': pr_sizes,
            'acceptance_rate': acceptance_rate
        }

    def get_contents(self, owner, repo, path="", ref=None):
        """Get repository contents at the specified path."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}"
        params = {}
        if ref:
            params["ref"] = ref
        response = requests.get(url, headers=self.headers, params=params)
        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error getting contents: {response.status_code}")
            return []

    # ... (get_readme, get_file_content, is_text_file, get_recursive_contents, ...)
    def get_readme(self, owner, repo, ref=None):
        """Get the repository README file."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/readme"
        params = {}
        if ref:
            params["ref"] = ref
        response = requests.get(url, headers=self.headers, params=params)
        if response.status_code == 200:
            data = response.json()
            if data.get("content"):
                content = base64.b64decode(data["content"]).decode("utf-8")
                return {
                    "name": data["name"],
                    "path": data["path"],
                    "content": content
                }
            return data
        else:
            print(f"README not found or error: {response.status_code}")
            return None

    def get_file_content(self, owner, repo, path, ref=None):
        """Get the content of a specific file in the repository."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}"
        params = {}
        if ref:
            params["ref"] = ref
        response = requests.get(url, headers=self.headers, params=params)
        if response.status_code == 200:
            data = response.json()
            if data.get("content"):
                try:
                    content = base64.b64decode(data["content"]).decode("utf-8")
                    return content
                except UnicodeDecodeError:
                    return "[Binary file content not displayed]"
            return None
        else:
            print(f"Error getting file content: {response.status_code}")
            return None

    def is_text_file(self, file_path):
        """Determine if a file is likely a text file based on its extension."""
        text_extensions = [
            '.txt', '.md', '.rst', '.py', '.js', '.html', '.css', '.java', '.c',
            '.cpp', '.h', '.hpp', '.json', '.xml', '.yaml', '.yml', '.toml',
            '.ini', '.cfg', '.conf', '.sh', '.bat', '.ps1', '.rb', '.pl', '.php',
            '.go', '.rs', '.ts', '.jsx', '.tsx', '.vue', '.swift', '.kt', '.scala',
            '.groovy', '.lua', '.r', '.dart', '.ex', '.exs', '.erl', '.hrl',
            '.clj', '.hs', '.elm', '.f90', '.f95', '.f03', '.sql', '.gitignore',
            '.dockerignore', '.env', '.editorconfig', '.htaccess', '.cs', '.ipynb',
            '.R', '.Rmd', '.jl', '.fs', '.ml', '.mli', '.d', '.scm', '.lisp',
            '.el', '.m', '.mm', '.vb', '.asm', '.s', '.Dockerfile', '.gradle'
        ]
        extension = os.path.splitext(file_path)[1].lower()
        return extension in text_extensions

    def get_recursive_contents(self, owner, repo, path="", max_depth=3, current_depth=0, max_files=1000, ref=None):
        """Recursively get repository contents with depth and file count limits."""
        if current_depth >= max_depth:
            return []
        contents = self.get_contents(owner, repo, path, ref)
        # The contents endpoint returns a dict (not a list) when the path is a file
        if not isinstance(contents, list):
            return []
        results = []
        file_count = 0
        for item in contents:
            if file_count >= max_files:
                break
            if item["type"] == "dir":
                # For directories, add the directory itself and recursively get its contents
                dir_item = {
                    "type": "dir",
                    "name": item["name"],
                    "path": item["path"],
                    "contents": self.get_recursive_contents(
                        owner, repo, item["path"], max_depth, current_depth + 1,
                        max_files - file_count, ref
                    )
                }
                results.append(dir_item)
            else:
                # For files, add the file info
                results.append({
                    "type": "file",
                    "name": item["name"],
                    "path": item["path"],
                    "size": item["size"],
                    "url": item["html_url"]
                })
                file_count += 1
        return results

    # ... (get_all_text_files, get_documentation_files, analyze_ast, analyze_js_ts, ...)
    def get_all_text_files(self, owner, repo, path="", max_files=50, ref=None):
        """Get the content of all text files in the repository (with a limit)."""
        contents = self.get_contents(owner, repo, path, ref)
        # The contents endpoint returns a dict (not a list) when the path is a file
        if not isinstance(contents, list):
            return []
        text_files = []
        file_count = 0
        # Process the current directory
        for item in contents:
            if file_count >= max_files:
                break
            if item["type"] == "file" and self.is_text_file(item["name"]):
                content = self.get_file_content(owner, repo, item["path"], ref)
                if content and content != "[Binary file content not displayed]":
                    text_files.append({
                        "name": item["name"],
                        "path": item["path"],
                        "content": content
                    })
                    file_count += 1
            elif item["type"] == "dir":
                # Recursively get text files from subdirectories
                subdir_files = self.get_all_text_files(
                    owner, repo, item["path"], max_files - file_count, ref
                )
                text_files.extend(subdir_files)
                file_count += len(subdir_files)
        return text_files

    def get_documentation_files(self, owner, repo, ref=None):
        """Get documentation files from the repository."""
        # Common documentation file paths and directories
        doc_paths = [
            "docs", "doc", "documentation", "wiki", "CONTRIBUTING.md",
            "CONTRIBUTORS.md", "CODE_OF_CONDUCT.md", "SECURITY.md",
            "SUPPORT.md", "docs/index.md", "docs/README.md", "docs/getting-started.md",
            ".github/ISSUE_TEMPLATE", ".github/PULL_REQUEST_TEMPLATE.md"
        ]
        doc_files = []
        # Try to get each documentation file/directory
        for path in doc_paths:
            try:
                contents = self.get_contents(owner, repo, path, ref)
                # If it's a directory, get all markdown files in it
                if isinstance(contents, list):
                    for item in contents:
                        if item["type"] == "file" and item["name"].lower().endswith((".md", ".rst", ".txt")):
                            content = self.get_file_content(owner, repo, item["path"], ref)
                            if content:
                                doc_files.append({
                                    "name": item["name"],
                                    "path": item["path"],
                                    "content": content
                                })
                # If it's a file, get its content
                elif isinstance(contents, dict) and contents.get("type") == "file":
                    content = self.get_file_content(owner, repo, path, ref)
                    if content:
                        doc_files.append({
                            "name": contents["name"],
                            "path": contents["path"],
                            "content": content
                        })
            except Exception:
                # Path doesn't exist or access issues
                continue
        return doc_files

    def analyze_ast(self, code, file_path):
        """Analyze Python code using its AST (Abstract Syntax Tree)."""
        if not file_path.endswith('.py'):
            return None
        try:
            tree = ast.parse(code)
            # Extract more detailed information using the AST
            functions = []
            classes = []
            imports = []
            function_complexities = {}
            for node in ast.walk(tree):
                # Get function definitions with arguments
                if isinstance(node, ast.FunctionDef):
                    args = []
                    defaults = len(node.args.defaults)
                    args_count = len(node.args.args) - defaults
                    # Get positional args
                    for arg in node.args.args[:args_count]:
                        args.append(arg.arg)
                    # Get args with defaults
                    for arg in node.args.args[args_count:]:
                        args.append(f"{arg.arg}=...")
                    # Calculate function complexity (radon's cc_visit expects source
                    # text, so cc_visit_ast is used for an AST node)
                    func_blocks = complexity.cc_visit_ast(node)
                    func_complexity = func_blocks[0].complexity if func_blocks else None
                    function_complexities[node.name] = func_complexity
                    # Get the docstring if available
                    docstring = ast.get_docstring(node)
                    functions.append({
                        'name': node.name,
                        'args': args,
                        'complexity': func_complexity,
                        'docstring': docstring
                    })
                # Get class definitions
                elif isinstance(node, ast.ClassDef):
                    methods = []
                    class_docstring = ast.get_docstring(node)
                    # Get class methods
                    for child in node.body:
                        if isinstance(child, ast.FunctionDef):
                            method_blocks = complexity.cc_visit_ast(child)
                            method_complexity = method_blocks[0].complexity if method_blocks else None
                            method_docstring = ast.get_docstring(child)
                            methods.append({
                                'name': child.name,
                                'complexity': method_complexity,
                                'docstring': method_docstring
                            })
                    classes.append({
                        'name': node.name,
                        'methods': methods,
                        'docstring': class_docstring
                    })
                # Get imports
                elif isinstance(node, ast.Import):
                    for name in node.names:
                        imports.append(name.name)
                elif isinstance(node, ast.ImportFrom):
                    module = node.module or ""
                    for name in node.names:
                        imports.append(f"{module}.{name.name}")
            # Calculate overall code complexity (total across all blocks)
            code_complexity = sum(block.complexity for block in complexity.cc_visit_ast(tree))
            # Calculate the maintainability index
            try:
                mi_score = metrics.mi_visit(code, True)
            except Exception:
                mi_score = None
            return {
                'functions': functions,
                'classes': classes,
                'imports': imports,
                'complexity': {
                    'overall': code_complexity,
                    'functions': function_complexities,
                    'maintainability_index': mi_score
                }
            }
        except SyntaxError:
            print(f"Syntax error in Python file: {file_path}")
            return None
        except Exception as e:
            print(f"Error analyzing {file_path}: {str(e)}")
            return None
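
    # A minimal sketch of what analyze_ast returns for a hypothetical snippet:
    #
    #   info = GitHubRepoInfo()
    #   result = info.analyze_ast("def add(a, b=1):\n    return a + b\n", "example.py")
    #   result["functions"][0]["name"]   # -> "add"
    #   result["functions"][0]["args"]   # -> ["a", "b=..."]
    #   result["complexity"]["overall"]  # -> 1 (a single straight-line function)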

    def analyze_js_ts(self, code, file_path):
        """Analyze JavaScript/TypeScript code using regex with improved patterns."""
        if not file_path.endswith(('.js', '.ts', '.jsx', '.tsx')):
            return None
        # More sophisticated regex patterns for JS/TS analysis; these are still
        # heuristics and can over-match (e.g. control-flow keywords), unlike a real parser
        results = {
            'functions': [],
            'classes': [],
            'imports': [],
            'exports': [],
            'hooks': []  # For React hooks
        }
        # Function patterns (covering various declaration styles)
        function_patterns = [
            # Regular functions
            r'function\s+(\w+)\s*\(([^)]*)\)',
            # Arrow functions assigned to variables
            r'(?:const|let|var)\s+(\w+)\s*=\s*(?:\([^)]*\)|[^=]*)\s*=>\s*{',
            # Class methods
            r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{',
            # Object methods
            r'(\w+)\s*:\s*function\s*\(([^)]*)\)'
        ]
        for pattern in function_patterns:
            for match in re.finditer(pattern, code):
                func_name = match.group(1)
                args = match.group(2).strip() if len(match.groups()) > 1 else ""
                results['functions'].append({
                    'name': func_name,
                    'args': args
                })
        # Class pattern
        class_pattern = r'class\s+(\w+)(?:\s+extends\s+(\w+))?\s*{([^}]*)}'
        for match in re.finditer(class_pattern, code, re.DOTALL):
            class_name = match.group(1)
            parent_class = match.group(2) if match.group(2) else None
            class_body = match.group(3)
            # Find methods in the class
            methods = []
            method_pattern = r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{([^}]*)}'
            for method_match in re.finditer(method_pattern, class_body):
                method_name = method_match.group(1)
                methods.append(method_name)
            results['classes'].append({
                'name': class_name,
                'extends': parent_class,
                'methods': methods
            })
        # Import patterns
        import_patterns = [
            # ES6 imports
            r'import\s+(?:{([^}]*)}|\*\s+as\s+(\w+)|(\w+))\s+from\s+[\'"]([^\'"]+)[\'"]',
            # CommonJS requires
            r'(?:const|let|var)\s+(?:{([^}]*)}|(\w+))\s*=\s*require\([\'"]([^\'"]+)[\'"]\)'
        ]
        for pattern in import_patterns:
            for match in re.finditer(pattern, code):
                groups = match.groups()
                if groups[0]:  # Destructured import
                    imports = [name.strip() for name in groups[0].split(',')]
                    for imp in imports:
                        results['imports'].append(imp)
                elif groups[1]:  # Namespace import (import * as X)
                    results['imports'].append(groups[1])
                elif groups[2]:  # Default import
                    results['imports'].append(groups[2])
                elif groups[3]:  # Module name
                    results['imports'].append(groups[3])
        # React hooks detection (for React files)
        if file_path.endswith(('.jsx', '.tsx')):
            hook_pattern = r'use([A-Z]\w+)\s*\('
            for match in re.finditer(hook_pattern, code):
                hook_name = 'use' + match.group(1)
                results['hooks'].append(hook_name)
        # Export patterns
        export_patterns = [
            # Named exports
            r'export\s+(?:const|let|var|function|class)\s+(\w+)',
            # Default exports
            r'export\s+default\s+(?:function|class)?\s*(\w+)?'
        ]
        for pattern in export_patterns:
            for match in re.finditer(pattern, code):
                if match.group(1):
                    results['exports'].append(match.group(1))
        return results
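
    # A quick illustration of the regex heuristics on a hypothetical snippet:
    #
    #   js = "import React from 'react';\nexport function greet(name) { return name; }"
    #   info = GitHubRepoInfo()
    #   out = info.analyze_js_ts(js, "greet.js")
    #   # out['functions'] includes {'name': 'greet', 'args': 'name'}
    #   # out['imports'] includes 'React'; out['exports'] includes 'greet'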

    # ... (extract_code_summary, analyze_dependencies, create_dependency_graph, ...)
    def extract_code_summary(self, file_content, file_path):
        """Extract comprehensive summary information from code files."""
        extension = os.path.splitext(file_path)[1].lower()
        # Initialize the summary
        summary = {
            "functions": [],
            "classes": [],
            "imports": [],
            "description": "",
            "complexity": None
        }
        # Extract Python definitions with the AST
        if extension == '.py':
            ast_result = self.analyze_ast(file_content, file_path)
            if ast_result:
                summary["functions"] = [f["name"] for f in ast_result["functions"]]
                summary["classes"] = [c["name"] for c in ast_result["classes"]]
                summary["imports"] = ast_result["imports"]
                summary["complexity"] = ast_result["complexity"]
                # Try to extract the module docstring
                try:
                    tree = ast.parse(file_content)
                    module_docstring = ast.get_docstring(tree)
                    if module_docstring:
                        summary["description"] = module_docstring
                except Exception:
                    pass
                # Add detailed function and class info
                summary["detailed_functions"] = ast_result["functions"]
                summary["detailed_classes"] = ast_result["classes"]
        # Extract JavaScript/TypeScript definitions
        elif extension in ['.js', '.ts', '.jsx', '.tsx']:
            js_result = self.analyze_js_ts(file_content, file_path)
            if js_result:
                summary["functions"] = [f["name"] for f in js_result["functions"]]
                summary["classes"] = [c["name"] for c in js_result["classes"]]
                summary["imports"] = js_result["imports"]
                # Add detailed function and class info
                summary["detailed_functions"] = js_result["functions"]
                summary["detailed_classes"] = js_result["classes"]
                summary["hooks"] = js_result.get("hooks", [])
                summary["exports"] = js_result.get("exports", [])
        # Calculate basic code metrics for any text file
        if file_content:
            lines = file_content.split('\n')
            code_lines = 0
            comment_lines = 0
            blank_lines = 0
            comment_prefixes = ['#', '//', '/*', '*', '<!--']
            for line in lines:
                line = line.strip()
                if not line:
                    blank_lines += 1
                elif any(line.startswith(prefix) for prefix in comment_prefixes):
                    comment_lines += 1
                else:
                    code_lines += 1
            summary["metrics"] = {
                "total_lines": len(lines),
                "code_lines": code_lines,
                "comment_lines": comment_lines,
                "blank_lines": blank_lines,
                "comment_ratio": comment_lines / max(1, code_lines + comment_lines)
            }
        return summary

    def analyze_dependencies(self, owner, repo, max_files=100):
        """Analyze code dependencies across the repository."""
        # Get Python and JavaScript files
        text_files = self.get_all_text_files(owner, repo, max_files=max_files)
        # Filter for Python and JS/TS files
        code_files = [f for f in text_files if f["name"].endswith(('.py', '.js', '.ts', '.jsx', '.tsx'))]
        # Track dependencies
        dependencies = {
            'internal': defaultdict(set),   # File-to-file dependencies
            'external': defaultdict(set),   # External package dependencies by file
            'modules': defaultdict(set)     # Defined modules/components by file
        }
        # Extract module names from file paths
        file_to_module = {}
        for file in code_files:
            # Convert the file path to a potential module name
            module_path = os.path.splitext(file["path"])[0].replace('/', '.')
            file_to_module[file["path"]] = module_path
            # Track what each file defines
            summary = self.extract_code_summary(file["content"], file["path"])
            if file["name"].endswith('.py'):
                for function in summary.get("functions", []):
                    dependencies['modules'][file["path"]].add(f"{module_path}.{function}")
                for class_name in summary.get("classes", []):
                    dependencies['modules'][file["path"]].add(f"{module_path}.{class_name}")
            else:  # JS/TS files
                for export in summary.get("exports", []):
                    dependencies['modules'][file["path"]].add(export)
        # Analyze imports/dependencies
        for file in code_files:
            summary = self.extract_code_summary(file["content"], file["path"])
            for imp in summary.get("imports", []):
                # Check if this is an internal import
                is_internal = False
                if file["name"].endswith('.py'):
                    # For Python, check whether the import matches any module path
                    for module_path in file_to_module.values():
                        if imp == module_path or imp.startswith(f"{module_path}."):
                            is_internal = True
                            # Find the file that defines this module
                            for f_path, m_path in file_to_module.items():
                                if m_path == imp.split('.')[0]:
                                    dependencies['internal'][file["path"]].add(f_path)
                                    break
                else:
                    # For JS/TS, check relative imports
                    if imp.startswith('./') or imp.startswith('../'):
                        is_internal = True
                        # Try to resolve the relative import
                        src_dir = os.path.dirname(file["path"])
                        target_path = os.path.normpath(os.path.join(src_dir, imp))
                        # Add known extensions if none is specified
                        if '.' not in os.path.basename(target_path):
                            for ext in ['.js', '.ts', '.jsx', '.tsx']:
                                test_path = f"{target_path}{ext}"
                                if test_path in file_to_module:
                                    dependencies['internal'][file["path"]].add(test_path)
                                    break
                # If not internal, consider it external
                if not is_internal:
                    # Clean up the import name (keep only the package part)
                    if not file["name"].endswith('.py'):
                        imp = imp.split('/')[0]
                    dependencies['external'][file["path"]].add(imp)
        return dependencies

    def create_dependency_graph(self, dependencies):
        """Create a NetworkX graph from dependencies for visualization."""
        G = nx.DiGraph()
        # Add nodes for files
        for file_path in dependencies['internal'].keys():
            G.add_node(file_path, type='file')
        # Add edges for internal dependencies
        for file_path, deps in dependencies['internal'].items():
            for dep in deps:
                G.add_edge(file_path, dep)
        # Add nodes and edges for external dependencies
        external_nodes = set()
        for file_path, deps in dependencies['external'].items():
            for dep in deps:
                external_node = f"ext:{dep}"
                if external_node not in external_nodes:
                    G.add_node(external_node, type='external')
                    external_nodes.add(external_node)
                G.add_edge(file_path, external_node)
        return G
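
    # Sketch of inspecting the resulting graph (hypothetical dependency dict):
    #
    #   deps = {'internal': {'a.py': {'b.py'}}, 'external': {'a.py': {'requests'}}}
    #   G = info.create_dependency_graph(deps)
    #   G.number_of_nodes()  # -> 3 ('a.py', 'b.py' added via the edge, 'ext:requests')
    #   G.number_of_edges()  # -> 2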

    # ... (get_repo_text_summary, get_temporal_analysis, get_all_info, ...)
    def get_repo_text_summary(self, owner, repo, max_files=25):
        """Extract and summarize text content from the repository with improved metrics."""
        # Get the README
        readme = self.get_readme(owner, repo)
        # Get documentation
        docs = self.get_documentation_files(owner, repo)
        # Get key code files (limited to avoid API rate limits)
        text_files = self.get_all_text_files(owner, repo, max_files=max_files)
        # Analyze code files
        code_summary = {}
        complexity_metrics = {
            'cyclomatic_complexity': [],
            'maintainability_index': [],
            'comment_ratios': []
        }
        for file in text_files:
            ext = os.path.splitext(file["name"])[1].lower()
            if ext in ['.py', '.js', '.ts', '.jsx', '.tsx']:
                file_summary = self.extract_code_summary(file["content"], file["path"])
                code_summary[file["path"]] = file_summary
                # Collect complexity metrics
                if file_summary.get('complexity'):
                    cc = file_summary['complexity'].get('overall')
                    if cc is not None:
                        complexity_metrics['cyclomatic_complexity'].append((file["path"], cc))
                    mi = file_summary['complexity'].get('maintainability_index')
                    if mi is not None:
                        complexity_metrics['maintainability_index'].append((file["path"], mi))
                if file_summary.get('metrics'):
                    comment_ratio = file_summary['metrics'].get('comment_ratio', 0)
                    complexity_metrics['comment_ratios'].append((file["path"], comment_ratio))
        # Analyze dependencies
        dependencies = self.analyze_dependencies(owner, repo, max_files=max_files)
        # Summarize repository content by file type
        file_types = defaultdict(int)
        for file in text_files:
            ext = os.path.splitext(file["name"])[1].lower()
            file_types[ext] += 1
        # Calculate aggregate code metrics
        total_code_lines = sum(summary.get('metrics', {}).get('code_lines', 0)
                               for summary in code_summary.values())
        total_comment_lines = sum(summary.get('metrics', {}).get('comment_lines', 0)
                                  for summary in code_summary.values())
        aggregate_metrics = {
            'total_files': len(text_files),
            'total_code_lines': total_code_lines,
            'total_comment_lines': total_comment_lines,
            'average_comment_ratio': (total_comment_lines / total_code_lines) if total_code_lines > 0 else 0
        }
        return {
            "readme": readme,
            "documentation": docs,
            "code_summary": code_summary,
            "complexity_metrics": complexity_metrics,
            "dependencies": dependencies,
            "file_type_counts": dict(file_types),
            "aggregate_metrics": aggregate_metrics,
            "text_files": text_files  # Include the actual text file contents
        }

    def get_temporal_analysis(self, owner, repo):
        """Perform temporal analysis of repository activity."""
        # Get commit activity over time
        commit_activity = self.get_commit_activity(owner, repo)
        # Get code frequency (additions/deletions over time)
        code_frequency = self.get_code_frequency(owner, repo)
        # Get contributor activity
        contributor_activity = self.get_contributor_activity(owner, repo)
        # Get issue and PR timelines
        issue_timeline = self.get_issue_timeline(owner, repo)
        pr_timeline = self.get_pr_timeline(owner, repo)
        # Process data for visualization
        # - Weekly commit counts
        weekly_commits = []
        if commit_activity:
            for week in commit_activity:
                date = datetime.fromtimestamp(week['week'])
                weekly_commits.append({
                    'date': date.strftime('%Y-%m-%d'),
                    'total': week['total'],
                    'days': week['days']  # Daily breakdown within the week
                })
        # - Weekly code changes
        weekly_code_changes = []
        if code_frequency:
            for item in code_frequency:
                date = datetime.fromtimestamp(item[0])
                weekly_code_changes.append({
                    'date': date.strftime('%Y-%m-%d'),
                    'additions': item[1],
                    'deletions': -item[2]  # The API reports deletions as negative; flip the sign
                })
        # - Contributor timeline
        contributor_timeline = {}
        if contributor_activity:
            for contributor in contributor_activity:
                # 'author' can be None for deleted accounts
                author = (contributor.get('author') or {}).get('login', 'unknown')
                weeks = contributor['weeks']
                if author not in contributor_timeline:
                    contributor_timeline[author] = []
                for week in weeks:
                    if week['c'] > 0:  # Only include weeks with commits
                        date = datetime.fromtimestamp(week['w'])
                        contributor_timeline[author].append({
                            'date': date.strftime('%Y-%m-%d'),
                            'commits': week['c'],
                            'additions': week['a'],
                            'deletions': week['d']
                        })
        return {
            'weekly_commits': weekly_commits,
            'weekly_code_changes': weekly_code_changes,
            'contributor_timeline': contributor_timeline,
            'issue_timeline': issue_timeline,
            'pr_timeline': pr_timeline
        }

    def get_all_info(self, owner, repo):
        """Get comprehensive information about a repository with enhanced metrics."""
        result = {
            "timestamp": datetime.now().isoformat(),
            "basic_info": self.get_repo_info(owner, repo)
        }
        if not result["basic_info"]:
            print(f"Could not retrieve repository information for {owner}/{repo}")
            return None
        print("Getting repository statistics...")
        # Get additional information
        result["languages"] = self.get_languages(owner, repo)
        result["contributors"] = self.get_contributors(owner, repo, max_contributors=30)
        result["recent_commits"] = self.get_commits(owner, repo, max_commits=30)
        result["branches"] = self.get_branches(owner, repo)
        result["releases"] = self.get_releases(owner, repo, max_releases=10)
        result["open_issues"] = self.get_issues(owner, repo, state="open", max_issues=50)
        result["open_pull_requests"] = self.get_pull_requests(owner, repo, state="open", max_prs=50)
        result["root_contents"] = self.get_contents(owner, repo)
        print("Analyzing repository content...")
        # Get text content and documentation
        result["text_content"] = self.get_repo_text_summary(owner, repo, max_files=30)
        print("Analyzing repository activity over time...")
        # Get temporal analysis
        result["temporal_analysis"] = self.get_temporal_analysis(owner, repo)
        return result
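
    # End-to-end usage sketch (hypothetical repository; set GITHUB_TOKEN to
    # avoid the low unauthenticated rate limit):
    #
    #   info = GitHubRepoInfo(token=os.environ.get("GITHUB_TOKEN"))
    #   repo_data = info.get_all_info("octocat", "Hello-World")
    #   if repo_data:
    #       info.display_repo_info(repo_data)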

    # ... (display_repo_info, display_code_files, export_repo_text)
    def display_repo_info(self, repo_data):
        """Display repository information in a Colab-friendly format with enhanced visualizations."""
        if not repo_data or not repo_data["basic_info"]:
            return
        basic = repo_data["basic_info"]
        # Display basic repository information
        display(HTML(f"""
        <h1 style="text-align:center;">Repository: {basic['full_name']}</h1>
        <div style="text-align:center;"><img src="{basic.get('owner', {}).get('avatar_url', '')}" width="100" height="100" style="border-radius:50%"></div>
        <div style="background-color:#f5f5f5; padding:15px; border-radius:5px; margin:10px 0;">
            <p><strong>Description:</strong> {basic['description'] or 'No description'}</p>
            <p><strong>URL:</strong> <a href="{basic['html_url']}" target="_blank">{basic['html_url']}</a></p>
            <p><strong>Created:</strong> {basic['created_at']}</p>
            <p><strong>Last updated:</strong> {basic['updated_at']}</p>
            <p><strong>Default branch:</strong> {basic['default_branch']}</p>
            <p><strong>Stars:</strong> {basic['stargazers_count']}</p>
            <p><strong>Forks:</strong> {basic['forks_count']}</p>
            <p><strong>Open issues:</strong> {basic['open_issues_count']}</p>
            <p><strong>License:</strong> {basic['license']['name'] if basic.get('license') else 'Not specified'}</p>
            <p><strong>Topics:</strong> {', '.join(basic.get('topics', ['None']))}</p>
        </div>
        """))
        # Display the language distribution
        if repo_data["languages"]:
            display(Markdown("## Languages"))
            # Create a DataFrame for languages
            lang_data = []
            total = sum(repo_data["languages"].values())
            for lang, bytes_count in repo_data["languages"].items():
                percentage = (bytes_count / total) * 100
                lang_data.append({
                    "Language": lang,
                    "Bytes": bytes_count,
                    "Percentage": percentage
                })
            lang_df = pd.DataFrame(lang_data)
            display(lang_df)
            # Create a pie chart
            plt.figure(figsize=(10, 6))
            plt.pie(lang_df["Percentage"], labels=lang_df["Language"], autopct='%1.1f%%')
            plt.title("Language Distribution")
            plt.axis('equal')
            plt.show()
        # Display contributors
        if repo_data["contributors"]:
            display(Markdown("## Top Contributors"))
            # Create a DataFrame for contributors
            contrib_data = []
            for contributor in repo_data["contributors"][:15]:
                contrib_data.append({
                    "Username": contributor['login'],
                    "Contributions": contributor['contributions'],
                    "Profile": contributor['html_url']
                })
            contrib_df = pd.DataFrame(contrib_data)
            display(contrib_df)
            # Create a bar chart
            plt.figure(figsize=(12, 6))
            plt.bar(contrib_df["Username"], contrib_df["Contributions"])
            plt.title("Top Contributors")
            plt.xlabel("Contributor")
            plt.ylabel("Number of Contributions")
            plt.xticks(rotation=45, ha='right')
            plt.tight_layout()
            plt.show()
        # Display recent commits
        if repo_data["recent_commits"]:
            display(Markdown("## Recent Commits"))
            commit_data = []
            for commit in repo_data["recent_commits"][:10]:
                author = commit['commit']['author']['name']
                message = commit['commit']['message'].split('\n')[0]  # First line only
                date = commit['commit']['author']['date']
                commit_data.append({
                    "Author": author,
                    "Date": date,
                    "Message": message,
                    "URL": commit.get('html_url', '')
                })
            commit_df = pd.DataFrame(commit_data)
            display(commit_df)
        # Display the repository structure
        if repo_data["root_contents"]:
            display(Markdown("## Repository Structure"))
            dir_content = []
            for item in repo_data["root_contents"]:
                dir_content.append({
                    "Name": item["name"],
                    "Type": item["type"],
                    "Size": item.get("size", ""),
                    "URL": item.get("html_url", "")
                })
            dir_df = pd.DataFrame(dir_content)
            display(dir_df)
        # Display a README preview if available
        if repo_data["text_content"]["readme"]:
            display(Markdown("## README Preview"))
            readme = repo_data["text_content"]["readme"]
            display(Markdown(f"**{readme['name']}**"))
            # Show a preview of the README content (first few lines)
            lines = readme["content"].split("\n")
            preview_lines = lines[:min(15, len(lines))]
            preview = "\n".join(preview_lines)
            display(Markdown(preview))
            if len(lines) > 15:
                display(Markdown("*... (content truncated) ...*"))
        # Display the code summary
        if repo_data["text_content"]["code_summary"]:
            display(Markdown("## Code Summary"))
            # Count total functions and classes
            total_functions = sum(len(summary.get("functions", [])) for summary in repo_data["text_content"]["code_summary"].values())
            total_classes = sum(len(summary.get("classes", [])) for summary in repo_data["text_content"]["code_summary"].values())
            # Get aggregate metrics
            agg_metrics = repo_data["text_content"]["aggregate_metrics"]
            display(HTML(f"""
            <div style="background-color:#e8f4f8; padding:15px; border-radius:5px; margin:10px 0;">
                <p><strong>Total Files Analyzed:</strong> {agg_metrics['total_files']}</p>
                <p><strong>Total Code Lines:</strong> {agg_metrics['total_code_lines']}</p>
                <p><strong>Total Comment Lines:</strong> {agg_metrics['total_comment_lines']}</p>
                <p><strong>Comment Ratio:</strong> {agg_metrics['average_comment_ratio']:.2f}</p>
                <p><strong>Total Functions:</strong> {total_functions}</p>
                <p><strong>Total Classes:</strong> {total_classes}</p>
            </div>
            """))
            # Display complexity metrics
            if repo_data["text_content"]["complexity_metrics"]["cyclomatic_complexity"]:
                display(Markdown("### Code Complexity"))
                # Get the 10 most complex files
                complexity_data = repo_data["text_content"]["complexity_metrics"]["cyclomatic_complexity"]
                complexity_data.sort(key=lambda x: x[1], reverse=True)
                complex_files = []
                for path, cc in complexity_data[:10]:
                    complex_files.append({
                        "File": os.path.basename(path),
                        "Path": path,
                        "Cyclomatic Complexity": cc
                    })
                complex_df = pd.DataFrame(complex_files)
                display(complex_df)
                # Plot the complexity distribution; keep numeric values only
                cc_values = []
                for _, cc in complexity_data:
                    try:
                        # Handle both direct numbers and lists that might contain complexity values
                        if isinstance(cc, (int, float)):
                            cc_values.append(float(cc))
                        elif isinstance(cc, list) and len(cc) > 0:
                            # If it's a list, use the first numeric value
                            for val in cc:
                                if isinstance(val, (int, float)):
                                    cc_values.append(float(val))
                                    break
                    except (ValueError, TypeError):
                        # Skip values that can't be converted to float
                        continue
                if cc_values:  # Only plot if we have data
                    plt.figure(figsize=(10, 6))
                    plt.hist(cc_values, bins=10, alpha=0.7)
                    plt.title("Cyclomatic Complexity Distribution")
                    plt.xlabel("Complexity")
                    plt.ylabel("Number of Files")
                    plt.axvline(np.mean(cc_values), color='r', linestyle='dashed', linewidth=1, label=f"Mean: {np.mean(cc_values):.2f}")
                    plt.legend()
                    plt.tight_layout()
                    plt.show()
            # Display the maintainability index if available
            if repo_data["text_content"]["complexity_metrics"]["maintainability_index"]:
                mi_data = repo_data["text_content"]["complexity_metrics"]["maintainability_index"]
                # Keep numeric values only
                mi_values = [float(mi) for _, mi in mi_data if mi is not None]
                if mi_values:  # Only plot if we have data
                    plt.figure(figsize=(10, 6))
                    plt.hist(mi_values, bins=10, alpha=0.7)
                    plt.title("Maintainability Index Distribution")
                    plt.xlabel("Maintainability Index (higher is better)")
                    plt.ylabel("Number of Files")
                    plt.axvline(np.mean(mi_values), color='g', linestyle='dashed', linewidth=1, label=f"Mean: {np.mean(mi_values):.2f}")
                    plt.legend()
                    plt.tight_layout()
                    plt.show()
        # Display the file type distribution
        if repo_data["text_content"]["file_type_counts"]:
            display(Markdown("### File Type Distribution"))
            file_type_data = []
            for ext, count in repo_data["text_content"]["file_type_counts"].items():
                if ext:  # Skip empty extensions
                    file_type_data.append({
                        "Extension": ext,
                        "Count": count
                    })
            file_type_df = pd.DataFrame(file_type_data)
            display(file_type_df)
            # Create a bar chart
            plt.figure(figsize=(10, 6))
            plt.bar(file_type_df["Extension"], file_type_df["Count"])
            plt.title("File Type Distribution")
            plt.xlabel("File Extension")
            plt.ylabel("Count")
            plt.xticks(rotation=45, ha='right')
            plt.tight_layout()
            plt.show()
        # Display the dependency graph if available
        if repo_data["text_content"]["dependencies"]:
            display(Markdown("## Code Dependencies"))
            # Create the dependency graph
            G = self.create_dependency_graph(repo_data["text_content"]["dependencies"])
            # Display dependency statistics
            internal_deps = repo_data["text_content"]["dependencies"]["internal"]
            external_deps = repo_data["text_content"]["dependencies"]["external"]
            # Count unique external dependencies
            all_external = set()
            for deps in external_deps.values():
                all_external.update(deps)
            # Find the most imported packages
            ext_counts = Counter()
            for deps in external_deps.values():
                ext_counts.update(deps)
            top_imports = ext_counts.most_common(10)
            display(HTML(f"""
            <div style="background-color:#e8f4f8; padding:15px; border-radius:5px; margin:10px 0;">
                <p><strong>Files with Dependencies:</strong> {len(internal_deps) + len(external_deps)}</p>
                <p><strong>Internal Dependency Relationships:</strong> {sum(len(deps) for deps in internal_deps.values())}</p>
                <p><strong>Unique External Dependencies:</strong> {len(all_external)}</p>
            </div>
            """))
            # Display the most imported packages
            if top_imports:
                display(Markdown("### Most Used External Dependencies"))
                imports_data = []
                for pkg, count in top_imports:
                    imports_data.append({
                        "Package": pkg,
                        "Used in # Files": count
                    })
                imports_df = pd.DataFrame(imports_data)
                display(imports_df)
            # Visualize the dependency network (if not too large)
            if len(G.nodes) <= 50:  # Only visualize if not too complex
                try:
                    display(Markdown("### Dependency Network"))
                    plt.figure(figsize=(12, 12))
                    # Node colors based on type
                    node_colors = []
                    for node in G.nodes:
                        if G.nodes[node].get('type') == 'external':
                            node_colors.append('red')
                        else:
                            node_colors.append('skyblue')
                    # Node sizes based on connections
                    node_sizes = [100 + 50 * G.degree(node) for node in G.nodes]
                    # Layout for the graph (seeded for reproducibility)
                    pos = nx.spring_layout(G, k=0.5, iterations=50, seed=42)
                    # Draw the graph
                    nx.draw_networkx(
                        G, pos,
                        with_labels=False,
                        node_color=node_colors,
                        node_size=node_sizes,
                        alpha=0.7,
                        arrows=True,
                        arrowsize=10,
                        width=0.5
                    )
                    # Add labels for external dependencies
                    external_labels = {node: node.replace('ext:', '')
                                       for node in G.nodes
                                       if G.nodes[node].get('type') == 'external'}
                    nx.draw_networkx_labels(
                        G, pos,
                        labels=external_labels,
                        font_size=8,
                        font_color='black'
                    )
                    plt.title("Code Dependency Network (red = external)")
                    plt.axis('off')
                    plt.tight_layout()
                    plt.show()
                except Exception as e:
                    print(f"Error generating dependency network visualization: {str(e)}")
                    print("Skipping network visualization due to data compatibility issues.")
        # Display temporal analysis
        if repo_data["temporal_analysis"]["weekly_commits"]:
            display(Markdown("## Repository Activity Over Time"))
            # Commit activity over time
            weekly_commits = repo_data["temporal_analysis"]["weekly_commits"]
            if weekly_commits:
                display(Markdown("### Weekly Commit Activity"))
                # Extract the date and commit series for plotting
                dates = [datetime.strptime(week['date'], '%Y-%m-%d') for week in weekly_commits]
                commits = [week['total'] for week in weekly_commits]
                try:
                    plt.figure(figsize=(14, 6))
                    plt.plot(dates, commits, marker='o', linestyle='-', alpha=0.7)
                    plt.title("Weekly Commit Activity")
                    plt.xlabel("Date")
                    plt.ylabel("Number of Commits")
                    plt.grid(True, alpha=0.3)
                    # Format the x-axis to show dates nicely
                    plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
                    plt.gca().xaxis.set_major_locator(mdates.MonthLocator(interval=1))
                    plt.gcf().autofmt_xdate()
                    plt.tight_layout()
                    plt.show()
                except Exception as e:
                    print(f"Error generating commit activity chart: {str(e)}")
                    print("Displaying raw data instead:")
                    activity_df = pd.DataFrame({
                        'Date': [week['date'] for week in weekly_commits],
                        'Commits': [week['total'] for week in weekly_commits]
                    })
                    display(activity_df.head(10))
            # Code changes over time
            weekly_code_changes = repo_data["temporal_analysis"]["weekly_code_changes"]
            if weekly_code_changes:
                display(Markdown("### Weekly Code Changes"))
                # Extract the date and change series for plotting
                dates = [datetime.strptime(week['date'], '%Y-%m-%d') for week in weekly_code_changes]
                additions = [week['additions'] for week in weekly_code_changes]
                deletions = [week['deletions'] for week in weekly_code_changes]
                try:
                    # Convert the data to a proper format for plotting
                    plot_dates = np.array(dates)
                    plot_additions = np.array([float(a) for a in additions])
                    plot_deletions = np.array([float(d) for d in deletions])
                    plt.figure(figsize=(14, 6))
                    plt.bar(plot_dates, plot_additions, color='green', alpha=0.6, label='Additions')
                    plt.bar(plot_dates, plot_deletions, color='red', alpha=0.6, label='Deletions')
                    plt.title("Weekly Code Changes")
                    plt.xlabel("Date")
                    plt.ylabel("Lines Changed")
                    plt.legend()
                    # Format the x-axis to show dates nicely
                    plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))
                    plt.gca().xaxis.set_major_locator(mdates.MonthLocator(interval=1))
                    plt.gcf().autofmt_xdate()
                    plt.tight_layout()
                    plt.show()
                except Exception as e:
                    print(f"Error generating code changes chart: {str(e)}")
                    print("Displaying raw data instead:")
                    changes_df = pd.DataFrame({
                        'Date': [week['date'] for week in weekly_code_changes],
                        'Additions': [week['additions'] for week in weekly_code_changes],
                        'Deletions': [week['deletions'] for week in weekly_code_changes]
                    })
                    display(changes_df.head(10))
        # Display issue resolution metrics
        issue_timeline = repo_data["temporal_analysis"]["issue_timeline"]
        if issue_timeline and issue_timeline.get('resolution_times'):
            display(Markdown("### Issue Resolution Statistics"))
            resolution_times = issue_timeline['resolution_times']
            if resolution_times:
                # Calculate statistics
                avg_resolution = np.mean(resolution_times)
                median_resolution = np.median(resolution_times)
                display(HTML(f"""
                <div style="background-color:#f5f5f5; padding:15px; border-radius:5px; margin:10px 0;">
                    <p><strong>Average Time to Close Issues:</strong> {avg_resolution:.2f} hours ({avg_resolution/24:.2f} days)</p>
                    <p><strong>Median Time to Close Issues:</strong> {median_resolution:.2f} hours ({median_resolution/24:.2f} days)</p>
                    <p><strong>Issues Analyzed:</strong> {len(resolution_times)}</p>
                </div>
                """))
                # Plot a histogram of resolution times
                try:
                    plt.figure(figsize=(10, 6))
                    # Ensure all values are floats and clip to one week for readability
                    resolution_times_clean = np.array([float(rt) for rt in resolution_times if rt is not None])
                    plt.hist(np.clip(resolution_times_clean, 0, 168), bins=20, alpha=0.7)
                    plt.title("Issue Resolution Times (Capped at 1 Week)")
                    plt.xlabel("Hours to Resolution")
                    plt.ylabel("Number of Issues")
                    plt.axvline(avg_resolution, color='r', linestyle='dashed', linewidth=1, label=f"Mean: {avg_resolution:.2f} hours")
                    plt.axvline(median_resolution, color='g', linestyle='dashed', linewidth=1, label=f"Median: {median_resolution:.2f} hours")
                    plt.legend()
                    plt.tight_layout()
                    plt.show()
                except Exception as e:
                    print(f"Error generating issue resolution histogram: {str(e)}")
                    print("Skipping histogram visualization due to data compatibility issues.")
            # Display issue label analysis
            if issue_timeline.get('labels'):
                top_labels = sorted(issue_timeline['labels'].items(), key=lambda x: x[1], reverse=True)[:10]
                if top_labels:
                    display(Markdown("### Top Issue Labels"))
                    labels = [label for label, _ in top_labels]
                    counts = [count for _, count in top_labels]
                    try:
                        plt.figure(figsize=(10, 6))
                        # Limit label length for display and handle potential non-string labels
                        cleaned_labels = []
                        for label in labels:
                            if isinstance(label, str):
                                # Truncate long labels
                                if len(label) > 20:
                                    cleaned_labels.append(label[:17] + "...")
                                else:
                                    cleaned_labels.append(label)
                            else:
                                # Convert non-string labels to strings
                                cleaned_labels.append(str(label))
                        plt.bar(cleaned_labels, counts, alpha=0.7)
                        plt.title("Most Common Issue Labels")
                        plt.xlabel("Label")
                        plt.ylabel("Count")
                        plt.xticks(rotation=45, ha='right')
                        plt.tight_layout()
                        plt.show()
                    except Exception as e:
                        print(f"Error generating issue labels chart: {str(e)}")
                        print("Skipping labels visualization due to data compatibility issues.")
        # Display PR statistics
        pr_timeline = repo_data["temporal_analysis"]["pr_timeline"]
        if pr_timeline:
            display(Markdown("### Pull Request Statistics"))
            # Display the PR acceptance rate
            acceptance_rate = pr_timeline.get('acceptance_rate', 0)
            display(HTML(f"""
            <div style="background-color:#2c2c2c; color:#f5f5f5; padding:15px; border-radius:8px; margin:10px 0;">
                <p><strong>PR Acceptance Rate:</strong> {acceptance_rate:.2f}%</p>
            </div>
            """))
            # Display PR merge time statistics
            if pr_timeline.get('merge_times'):
                merge_times = pr_timeline['merge_times']
                if merge_times:
                    avg_merge = np.mean(merge_times)
                    median_merge = np.median(merge_times)
                    display(HTML(f"""
                    <div style="background-color:#2c2c2c; color:#f5f5f5; padding:15px; border-radius:8px; margin:10px 0;">
                        <p><strong>Average Time to Merge PRs:</strong> {avg_merge:.2f} hours ({avg_merge/24:.2f} days)</p>
                        <p><strong>Median Time to Merge PRs:</strong> {median_merge:.2f} hours ({median_merge/24:.2f} days)</p>
                        <p><strong>PRs Analyzed:</strong> {len(merge_times)}</p>
                    </div>
                    """))
                    # Plot a histogram of merge times
                    try:
                        plt.figure(figsize=(10, 6))
                        # Ensure all values are floats and clip to one week for readability
                        merge_times_clean = np.array([float(mt) for mt in merge_times if mt is not None])
                        plt.hist(np.clip(merge_times_clean, 0, 168), bins=20, alpha=0.7)
                        plt.title("PR Merge Times (Capped at 1 Week)")
                        plt.xlabel("Hours to Merge")
                        plt.ylabel("Number of PRs")
                        plt.axvline(avg_merge, color='r', linestyle='dashed', linewidth=1, label=f"Mean: {avg_merge:.2f} hours")
                        plt.axvline(median_merge, color='g', linestyle='dashed', linewidth=1, label=f"Median: {median_merge:.2f} hours")
                        plt.legend()
                        plt.tight_layout()
                        plt.show()
                    except Exception as e:
                        print(f"Error generating PR merge time histogram: {str(e)}")
                        print("Skipping histogram visualization due to data compatibility issues.")
def display_code_files(self, repo_data, max_files=5): | |
"""Display code files with syntax highlighting and complexity metrics.""" | |
if not repo_data or not repo_data["text_content"] or not repo_data["text_content"]["text_files"]: | |
return | |
display(Markdown("## Code File Preview")) | |
# Filter for Python/JavaScript/TypeScript files | |
code_files = [ | |
file for file in repo_data["text_content"]["text_files"] | |
if file["name"].endswith(('.py', '.js', '.ts', '.jsx', '.tsx')) | |
] | |
# Sort by complexity if available | |
complexity_metrics = repo_data["text_content"]["complexity_metrics"]["cyclomatic_complexity"] | |
complexity_dict = {path: cc for path, cc in complexity_metrics} | |
# Sort files by complexity (if available) or by file size | |
if complexity_dict: | |
code_files.sort(key=lambda x: complexity_dict.get(x["path"], 0), reverse=True) | |
else: | |
code_files.sort(key=lambda x: len(x["content"]), reverse=True) | |
# Display up to max_files | |
for i, file in enumerate(code_files[:max_files]): | |
file_path = file["path"] | |
complexity = complexity_dict.get(file_path, "N/A") | |
display(Markdown(f"### {file_path} (Complexity: {complexity})")) | |
# Get code summary | |
summary = repo_data["text_content"]["code_summary"].get(file_path, {}) | |
# Display functions and classes | |
if summary.get("functions") or summary.get("classes"): | |
func_list = ", ".join(summary.get("functions", [])) | |
class_list = ", ".join(summary.get("classes", [])) | |
display(HTML(f""" | |
<div style="background-color:#2c2c2c; color:#f5f5f5; padding:10px; border-radius:5px; margin:5px 0; font-size:0.9em;"> | |
<p><strong>Functions:</strong> {func_list or "None"}</p> | |
<p><strong>Classes:</strong> {class_list or "None"}</p> | |
</div> | |
""")) | |
# Get file extension for syntax highlighting | |
ext = os.path.splitext(file["name"])[1][1:] # Remove the dot | |
# Display code with syntax highlighting (first 100 lines max) | |
code = file["content"] | |
lines = code.split("\n") | |
preview_lines = lines[:min(100, len(lines))] | |
preview = "\n".join(preview_lines) | |
display(Markdown(f"```{ext}\n{preview}\n```")) | |
if len(lines) > 100: | |
display(Markdown(f"*... ({len(lines) - 100} more lines) ...*")) | |
def export_repo_text(self, repo_data, output_dir='/content/repo_text'): | |
"""Export repository text content and analysis to files in Colab.""" | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
# Write README | |
if repo_data["text_content"]["readme"] and repo_data["text_content"]["readme"].get("content"): | |
readme_path = os.path.join(output_dir, "README.md") | |
with open(readme_path, 'w', encoding='utf-8') as f: | |
f.write(repo_data["text_content"]["readme"]["content"]) | |
# Write documentation files | |
if repo_data["text_content"]["documentation"]: | |
docs_dir = os.path.join(output_dir, "docs") | |
if not os.path.exists(docs_dir): | |
os.makedirs(docs_dir) | |
for doc in repo_data["text_content"]["documentation"]: | |
# Create directory structure if the doc name includes subdirectories | |
doc_path = os.path.join(docs_dir, doc["name"]) | |
parent_dir = os.path.dirname(doc_path) | |
if parent_dir: | |
os.makedirs(parent_dir, exist_ok=True) | |
with open(doc_path, 'w', encoding='utf-8') as f: | |
f.write(doc["content"]) | |
# Write code files | |
code_dir = os.path.join(output_dir, "code") | |
if not os.path.exists(code_dir): | |
os.makedirs(code_dir) | |
for file in repo_data["text_content"]["text_files"]: | |
if os.path.splitext(file["name"])[1].lower() in ['.py', '.js', '.ts', '.jsx', '.tsx']: | |
# Preserve the repo-relative path so same-named files in different directories don't overwrite each other | |
file_path = os.path.join(code_dir, file["path"]) | |
os.makedirs(os.path.dirname(file_path), exist_ok=True) | |
with open(file_path, 'w', encoding='utf-8') as f: | |
f.write(file["content"]) | |
# Write enhanced repository summary | |
summary_path = os.path.join(output_dir, "repo_summary.md") | |
with open(summary_path, 'w', encoding='utf-8') as f: | |
# Get basic info | |
basic = repo_data["basic_info"] | |
f.write(f"# Repository Summary: {basic['full_name']}\n\n") | |
f.write(f"**Description:** {basic['description'] or 'No description'}\n\n") | |
f.write(f"**URL:** {basic['html_url']}\n") | |
f.write(f"**Created:** {basic['created_at']}\n") | |
f.write(f"**Last updated:** {basic['updated_at']}\n") | |
f.write(f"**Default branch:** {basic['default_branch']}\n") | |
f.write(f"**Stars:** {basic['stargazers_count']}\n") | |
f.write(f"**Forks:** {basic['forks_count']}\n") | |
f.write(f"**Open issues:** {basic['open_issues_count']}\n\n") | |
# Analysis timestamp | |
f.write(f"*Analysis performed: {repo_data['timestamp']}*\n\n") | |
# Languages | |
if repo_data["languages"]: | |
f.write("## Languages\n\n") | |
total = sum(repo_data["languages"].values()) | |
for lang, bytes_count in repo_data["languages"].items(): | |
percentage = (bytes_count / total) * 100 | |
f.write(f"- **{lang}**: {percentage:.1f}% ({bytes_count} bytes)\n") | |
f.write("\n") | |
# Contributors | |
if repo_data["contributors"]: | |
f.write("## Top Contributors\n\n") | |
for i, contributor in enumerate(repo_data["contributors"][:10], 1): | |
f.write(f"{i}. {contributor['login']} - {contributor['contributions']} contributions\n") | |
f.write("\n") | |
# Repository Activity | |
if repo_data["temporal_analysis"]["weekly_commits"]: | |
f.write("## Repository Activity\n\n") | |
# Recent commit activity | |
recent_weeks = repo_data["temporal_analysis"]["weekly_commits"][-10:] | |
f.write("### Recent Commit Activity\n\n") | |
f.write("| Week | Commits |\n") | |
f.write("|------|--------|\n") | |
for week in recent_weeks: | |
f.write(f"| {week['date']} | {week['total']} |\n") | |
f.write("\n") | |
# Issue and PR stats | |
issue_timeline = repo_data["temporal_analysis"]["issue_timeline"] | |
pr_timeline = repo_data["temporal_analysis"]["pr_timeline"] | |
if issue_timeline and issue_timeline.get('resolution_times'): | |
avg_resolution = np.mean(issue_timeline['resolution_times']) | |
median_resolution = np.median(issue_timeline['resolution_times']) | |
f.write("### Issue Statistics\n\n") | |
f.write(f"- Average time to close issues: {avg_resolution:.2f} hours ({avg_resolution/24:.2f} days)\n") | |
f.write(f"- Median time to close issues: {median_resolution:.2f} hours ({median_resolution/24:.2f} days)\n") | |
f.write(f"- Issues analyzed: {len(issue_timeline['resolution_times'])}\n\n") | |
if pr_timeline and pr_timeline.get('merge_times'): | |
avg_merge = np.mean(pr_timeline['merge_times']) | |
median_merge = np.median(pr_timeline['merge_times']) | |
f.write("### Pull Request Statistics\n\n") | |
f.write(f"- PR acceptance rate: {pr_timeline['acceptance_rate']:.2f}%\n") | |
f.write(f"- Average time to merge PRs: {avg_merge:.2f} hours ({avg_merge/24:.2f} days)\n") | |
f.write(f"- Median time to merge PRs: {median_merge:.2f} hours ({median_merge/24:.2f} days)\n") | |
f.write(f"- PRs analyzed: {len(pr_timeline['merge_times'])}\n\n") | |
# Code Complexity | |
if repo_data["text_content"]["complexity_metrics"]["cyclomatic_complexity"]: | |
f.write("## Code Complexity\n\n") | |
complexity_data = repo_data["text_content"]["complexity_metrics"]["cyclomatic_complexity"] | |
complexity_data.sort(key=lambda x: x[1] if isinstance(x[1], (int, float)) else 0, reverse=True) # Guard against non-numeric entries | |
f.write("### Most Complex Files\n\n") | |
f.write("| File | Cyclomatic Complexity |\n") | |
f.write("|------|------------------------|\n") | |
for path, cc in complexity_data[:10]: | |
f.write(f"| {path} | {cc} |\n") | |
f.write("\n") | |
# Get aggregate metrics | |
""" | |
cc_values = [cc for _, cc in complexity_data] | |
f.write(f"- **Average complexity**: {np.mean(cc_values):.2f}\n") | |
f.write(f"- **Median complexity**: {np.median(cc_values):.2f}\n") | |
f.write(f"- **Max complexity**: {np.max(cc_values)}\n") | |
f.write(f"- **Files analyzed**: {len(cc_values)}\n\n") | |
""" | |
cc_values = [] | |
for _, cc in complexity_data: | |
try: | |
# Handle different possible types | |
if isinstance(cc, (int, float)): | |
cc_values.append(float(cc)) | |
elif isinstance(cc, list) and len(cc) > 0: | |
# If it's a list, try to get first numeric item | |
cc_values.append(float(cc[0])) | |
else: | |
# Try simple conversion as fallback | |
cc_values.append(float(cc)) | |
except (ValueError, TypeError): | |
# Skip this value if conversion fails | |
continue | |
if cc_values: | |
f.write(f"- **Average complexity**: {np.mean(cc_values):.2f}\n") | |
f.write(f"- **Median complexity**: {np.median(cc_values):.2f}\n") | |
f.write(f"- **Max complexity**: {max(cc_values)}\n") | |
f.write(f"- **Files analyzed**: {len(cc_values)}\n\n") | |
else: | |
f.write("- **Complexity metrics**: Could not be calculated\n\n") | |
# Code Dependencies | |
if repo_data["text_content"]["dependencies"]: | |
f.write("## Code Dependencies\n\n") | |
external_deps = repo_data["text_content"]["dependencies"]["external"] | |
# Find most imported packages (a single Counter handles both counting and ranking) | |
ext_counts = Counter() | |
for deps in external_deps.values(): | |
ext_counts.update(deps) | |
top_imports = ext_counts.most_common(10) | |
f.write("### Most Used External Dependencies\n\n") | |
f.write("| Package | Used in # Files |\n") | |
f.write("|---------|----------------|\n") | |
for pkg, count in top_imports: | |
f.write(f"| {pkg} | {count} |\n") | |
f.write("\n") | |
# Code Summary | |
if repo_data["text_content"]["code_summary"]: | |
f.write("## Code Structure\n\n") | |
# Get summary of most significant files | |
complexity_data = repo_data["text_content"]["complexity_metrics"]["cyclomatic_complexity"] | |
complexity_data.sort(key=lambda x: x[1] if isinstance(x[1], (int, float)) else 0, reverse=True) # Guard against non-numeric entries | |
for path, _ in complexity_data[:5]: | |
summary = repo_data["text_content"]["code_summary"].get(path) | |
if summary: | |
f.write(f"### {path}\n\n") | |
if summary.get("description"): | |
f.write(f"{summary['description']}\n\n") | |
if summary.get("classes"): | |
f.write("**Classes:**\n\n") | |
for cls in summary["classes"]: | |
f.write(f"- `{cls}`\n") | |
f.write("\n") | |
if summary.get("functions"): | |
f.write("**Functions:**\n\n") | |
for func in summary["functions"]: | |
f.write(f"- `{func}()`\n") | |
f.write("\n") | |
if summary.get("imports"): | |
f.write("**Imports:**\n\n") | |
for imp in summary["imports"][:10]: # Limit to top 10 | |
if isinstance(imp, tuple): | |
imp = ' '.join(filter(None, imp)) | |
f.write(f"- `{imp}`\n") | |
f.write("\n") | |
# --- NEW METHOD for getting specific PR details --- | |
def get_pull_request_details(self, owner, repo, pr_number): | |
"""Get detailed information for a specific Pull Request using PyGithub.""" | |
if not self.github: | |
print("PyGithub client not initialized. Cannot fetch PR details.") | |
# A direct REST fallback could be added here; for now just return None | |
return None | |
try: | |
repo_obj = self.github.get_repo(f"{owner}/{repo}") | |
pr = repo_obj.get_pull(pr_number) | |
# Extract relevant information into a dictionary | |
details = { | |
"number": pr.number, | |
"title": pr.title, | |
"state": pr.state, # 'open', 'closed' | |
"merged": pr.merged, | |
"body": pr.body or "", # Ensure body is string | |
"url": pr.html_url, | |
"created_at": pr.created_at.isoformat() if pr.created_at else None, | |
"updated_at": pr.updated_at.isoformat() if pr.updated_at else None, | |
"closed_at": pr.closed_at.isoformat() if pr.closed_at else None, | |
"merged_at": pr.merged_at.isoformat() if pr.merged_at else None, | |
"author": pr.user.login if pr.user else "N/A", | |
"commits_count": pr.commits, | |
"additions": pr.additions, | |
"deletions": pr.deletions, | |
"changed_files_count": pr.changed_files, | |
"labels": [label.name for label in pr.labels], | |
"assignees": [assignee.login for assignee in pr.assignees], | |
"milestone": pr.milestone.title if pr.milestone else None, | |
"repo_full_name": f"{owner}/{repo}", # Add repo context | |
# Add more fields if needed (e.g., comments, reviews) | |
} | |
return details | |
except GithubException as e: | |
if e.status == 404: | |
print(f"Error: Pull Request #{pr_number} not found in {owner}/{repo}.") | |
else: | |
print(f"Error fetching PR #{pr_number} details: {e}") | |
return None | |
except Exception as e: | |
print(f"An unexpected error occurred fetching PR details: {e}") | |
return None | |
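# Usage sketch: "octocat" / "Hello-World" and PR #1 are placeholder values, not part of | |
# this project; this just illustrates the shape of the dict returned above. | |
def _demo_pr_details(info=None): | |
info = info or GitHubRepoInfo() | |
details = info.get_pull_request_details("octocat", "Hello-World", 1) | |
if details: | |
print(details["title"], details["state"], details["changed_files_count"]) | |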
# --- Colab Helpers (Keep these as provided) --- | |
try: | |
from google.colab import files | |
IN_COLAB = True | |
except ImportError: | |
IN_COLAB = False | |
# ...(keep download_file and save_json_to_colab functions)... | |
class CustomJSONEncoder(json.JSONEncoder): | |
def default(self, obj): | |
if isinstance(obj, set): | |
return list(obj) | |
elif isinstance(obj, (datetime, np.datetime64)): | |
# Handle both standard datetime and numpy datetime64 | |
if isinstance(obj, np.datetime64): | |
# Convert numpy datetime64 to standard datetime | |
ts = pd.to_datetime(obj) | |
return ts.isoformat() | |
return obj.isoformat() | |
elif isinstance(obj, (np.int64, np.int32)): | |
return int(obj) | |
elif isinstance(obj, (np.float64, np.float32)): | |
return float(obj) | |
elif hasattr(obj, '__dict__'): | |
# Be careful with complex objects, might expose too much | |
# Consider filtering attributes if needed | |
return {k: v for k, v in obj.__dict__.items() if not k.startswith('_') and not callable(v)} | |
# Let the base class default method raise the TypeError | |
return super(CustomJSONEncoder, self).default(obj) | |
def convert_sets_to_lists(obj): | |
# Recursive function to convert sets and handle numpy types | |
if isinstance(obj, dict): | |
return {k: convert_sets_to_lists(v) for k, v in obj.items()} | |
elif isinstance(obj, list): | |
return [convert_sets_to_lists(i) for i in obj] | |
elif isinstance(obj, set): | |
return [convert_sets_to_lists(i) for i in obj] | |
elif isinstance(obj, tuple): | |
return tuple(convert_sets_to_lists(i) for i in obj) | |
elif isinstance(obj, (np.int64, np.int32)): | |
return int(obj) | |
elif isinstance(obj, (np.float64, np.float32)): | |
return float(obj) | |
elif isinstance(obj, np.datetime64): | |
ts = pd.to_datetime(obj) | |
return ts.isoformat() | |
elif isinstance(obj, np.bool_): | |
return bool(obj) | |
elif isinstance(obj, np.ndarray): | |
return convert_sets_to_lists(obj.tolist()) # Convert numpy arrays to lists | |
else: | |
# Attempt to handle other non-serializable types gracefully | |
try: | |
json.dumps(obj) # Test if serializable | |
return obj | |
except TypeError: | |
return str(obj) # Convert to string as a fallback | |
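# Quick sanity check for the two serialization helpers: convert_sets_to_lists pre-processes | |
# containers, and CustomJSONEncoder catches any stragglers at dump time (sample values are illustrative). | |
def _demo_json_helpers(): | |
sample = {"langs": {"Python", "Cypher"}, "stars": np.int64(42), "when": np.datetime64("2024-01-01")} | |
print(json.dumps(convert_sets_to_lists(sample), indent=2, cls=CustomJSONEncoder)) | |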
def save_json_to_colab(data, filename='/content/repo_info.json'): | |
"""Save JSON data to a file in Colab and provide download option.""" | |
converted_data = convert_sets_to_lists(data) | |
try: | |
with open(filename, 'w') as f: | |
json.dump(converted_data, f, indent=2, cls=CustomJSONEncoder) | |
print(f"Data saved to {filename}") | |
if IN_COLAB: | |
print("To download the JSON file, run the following cell:") | |
print(f"from google.colab import files") | |
print(f"files.download('{filename}')") | |
except TypeError as e: | |
print(f"Error saving JSON: {e}") | |
print("There might be non-serializable data types remaining.") | |
# Keep GraphRepoAnalyzer class mostly unchanged | |
class GraphRepoAnalyzer: | |
"""Integrates GitHub analysis with Neo4j and Gemini.""" | |
# --- Keep ALL existing methods from the previous version --- | |
# ... ( __init__, close, _create_neo4j_constraints, _run_cypher, ...) | |
def __init__(self, github_token=None, neo4j_uri=None, neo4j_user=None, neo4j_password=None, gemini_api_key=None): | |
"""Initialize with credentials.""" | |
load_dotenv() # Load .env file if it exists | |
self.github_token = github_token or os.getenv("GITHUB_TOKEN") | |
self.neo4j_uri = neo4j_uri or os.getenv("NEO4J_URI") | |
self.neo4j_user = neo4j_user or os.getenv("NEO4J_USERNAME") | |
self.neo4j_password = neo4j_password or os.getenv("NEO4J_PASSWORD") | |
self.gemini_api_key = gemini_api_key or os.getenv("GOOGLE_API_KEY") | |
self.neo4j_driver = None | |
self.gemini_model = None | |
# Initialize the GitHub analyzer used for all API fetching | |
self.github_analyzer = GitHubRepoInfo(token=self.github_token) | |
if not all([self.neo4j_uri, self.neo4j_user, self.neo4j_password]): | |
print("Warning: Neo4j credentials not fully provided. Graph features will be disabled.") | |
else: | |
try: | |
# Use basic_auth for Neo4j driver authentication | |
self.neo4j_driver = GraphDatabase.driver(self.neo4j_uri, auth=basic_auth(self.neo4j_user, self.neo4j_password)) | |
self.neo4j_driver.verify_connectivity() | |
print("Successfully connected to Neo4j.") | |
self._create_neo4j_constraints() | |
except Exception as e: | |
print(f"Error connecting to Neo4j: {e}") | |
print("Graph features will be disabled.") | |
self.neo4j_driver = None | |
if not self.gemini_api_key: | |
print("Warning: Google API Key not provided. Gemini features will be disabled.") | |
else: | |
try: | |
genai.configure(api_key=self.gemini_api_key) | |
self.gemini_model = genai.GenerativeModel('gemini-2.0-flash-thinking-exp-01-21') | |
print("Gemini model 'gemini-2.0-flash-thinking-exp-01-21' initialized.") | |
except Exception as e: | |
print(f"Error initializing Gemini: {e}") | |
self.gemini_model = None | |
self.repo_data = None | |
self.repo_full_name = None # Store repo name for context | |
self.owner = None # Store owner | |
self.repo = None # Store repo name | |
def close(self): | |
"""Close the Neo4j driver connection.""" | |
if self.neo4j_driver: | |
self.neo4j_driver.close() | |
print("Neo4j connection closed.") | |
def _create_neo4j_constraints(self): | |
"""Create unique constraints for better performance and data integrity.""" | |
if not self.neo4j_driver: return | |
constraints = [ | |
"CREATE CONSTRAINT repo_name IF NOT EXISTS FOR (r:Repository) REQUIRE r.fullName IS UNIQUE;", | |
"CREATE CONSTRAINT user_login IF NOT EXISTS FOR (u:User) REQUIRE u.login IS UNIQUE;", | |
"CREATE CONSTRAINT commit_sha IF NOT EXISTS FOR (c:Commit) REQUIRE c.sha IS UNIQUE;", | |
"CREATE CONSTRAINT file_path IF NOT EXISTS FOR (f:File) REQUIRE f.path IS UNIQUE;", | |
"CREATE CONSTRAINT lang_name IF NOT EXISTS FOR (l:Language) REQUIRE l.name IS UNIQUE;", | |
"CREATE CONSTRAINT dep_name IF NOT EXISTS FOR (d:Dependency) REQUIRE d.name IS UNIQUE;", | |
"CREATE CONSTRAINT issue_num IF NOT EXISTS FOR (i:Issue) REQUIRE i.number IS UNIQUE;", # Assumes issue number is unique within repo context - adjust if needed | |
"CREATE CONSTRAINT pr_num IF NOT EXISTS FOR (p:PullRequest) REQUIRE p.number IS UNIQUE;", # Same assumption for PRs | |
] | |
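# Note on the Issue/PR constraints above: plain uniqueness on `number` breaks as soon as a | |
# second repository is loaded, since issue/PR numbers are only unique per repo. A sketch of a | |
# composite alternative (NODE KEY constraints require Neo4j Enterprise) would be: | |
#   CREATE CONSTRAINT issue_repo_num IF NOT EXISTS | |
#   FOR (i:Issue) REQUIRE (i.repoFullName, i.number) IS NODE KEY; | |
# which assumes a repoFullName property is set on each Issue node at creation time. | |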
try: | |
with self.neo4j_driver.session() as session: | |
for constraint in constraints: | |
session.run(constraint) | |
print("Neo4j constraints ensured.") | |
except Exception as e: | |
print(f"Error creating Neo4j constraints: {e}") | |
def _run_cypher(self, query, parameters=None): | |
"""Helper function to run Cypher queries.""" | |
if not self.neo4j_driver: | |
print("Neo4j connection not available.") | |
return None | |
try: | |
with self.neo4j_driver.session() as session: | |
result = session.run(query, parameters) | |
return [record.data() for record in result] # Return results as list of dicts | |
except Exception as e: | |
print(f"Error running Cypher query: {e}") | |
print(f"Query: {query}") | |
print(f"Parameters: {parameters}") | |
return None | |
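# Minimal usage sketch for _run_cypher (assumes a live driver and a populated graph): | |
def _demo_run_cypher(analyzer): | |
rows = analyzer._run_cypher( | |
"MATCH (r:Repository) RETURN r.fullName AS name, r.stars AS stars LIMIT $n", {"n": 5}) | |
for row in rows or []: | |
print(row["name"], row["stars"]) | |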
# ... ( _populate_basic_info, _populate_contributors, _populate_commits, ...) | |
def _populate_basic_info(self, tx, repo_node, basic_info): | |
"""Populate repository owner and language links.""" | |
# A node object can't be bound as a Cypher pattern variable via parameters, | |
# so match the repository by its unique fullName property instead. | |
full_name = repo_node["fullName"] | |
owner_login = basic_info.get('owner', {}).get('login') | |
if owner_login: | |
tx.run(""" | |
MATCH (r:Repository {fullName: $full_name}) | |
MERGE (u:User {login: $owner_login}) | |
ON CREATE SET u.avatarUrl = $avatar_url, u.type = $owner_type | |
MERGE (r)-[:OWNED_BY]->(u) | |
""", full_name=full_name, owner_login=owner_login, | |
avatar_url=basic_info.get('owner', {}).get('avatar_url'), | |
owner_type=basic_info.get('owner', {}).get('type')) | |
# Add languages | |
languages = self.repo_data.get("languages", {}) | |
for lang, bytes_count in languages.items(): | |
tx.run(""" | |
MATCH (r:Repository {fullName: $full_name}) | |
MERGE (l:Language {name: $lang}) | |
MERGE (r)-[rel:USES_LANGUAGE]->(l) | |
SET rel.bytes = $bytes_count | |
""", full_name=full_name, lang=lang, bytes_count=bytes_count) | |
def _populate_contributors(self, tx, repo_node): | |
"""Populate contributors.""" | |
full_name = repo_node["fullName"] | |
for contrib in self.repo_data.get("contributors", []): | |
tx.run(""" | |
MATCH (r:Repository {fullName: $full_name}) | |
MERGE (u:User {login: $login}) | |
ON CREATE SET u.avatarUrl = $avatar_url, u.profileUrl = $profile_url | |
MERGE (r)-[rel:HAS_CONTRIBUTOR]->(u) | |
SET rel.contributions = $contributions | |
""", full_name=full_name, login=contrib['login'], | |
avatar_url=contrib.get('avatar_url'), | |
profile_url=contrib.get('html_url'), | |
contributions=contrib['contributions']) | |
def _populate_commits(self, tx, repo_node): | |
"""Populate recent commits and link authors.""" | |
full_name = repo_node["fullName"] | |
for commit_data in self.repo_data.get("recent_commits", []): | |
sha = commit_data['sha'] | |
commit_info = commit_data['commit'] | |
author_info = commit_info.get('author', {}) | |
# Top-level author/committer are null when the commit isn't linked to a GitHub account | |
author_login = (commit_data.get('author') or {}).get('login') | |
committer_login = (commit_data.get('committer') or {}).get('login') | |
# Create commit node | |
tx.run(""" | |
MATCH (r:Repository {fullName: $full_name}) | |
MERGE (c:Commit {sha: $sha}) | |
ON CREATE SET c.message = $message, c.date = datetime($date) | |
MERGE (r)-[:HAS_COMMIT]->(c) | |
""", full_name=full_name, sha=sha, | |
message=commit_info.get('message', '')[:500], # Limit message size | |
date=author_info.get('date')) # Use author date | |
# Link author (if GitHub user) | |
if author_login: | |
tx.run(""" | |
MATCH (c:Commit {sha: $sha}) | |
MERGE (u:User {login: $login}) | |
MERGE (u)-[:AUTHORED]->(c) | |
""", sha=sha, login=author_login) | |
# Else, could store author name/email on commit node if needed | |
# Link committer (if GitHub user and different from author) | |
if committer_login and committer_login != author_login: | |
tx.run(""" | |
MATCH (c:Commit {sha: $sha}) | |
MERGE (u:User {login: $login}) | |
MERGE (u)-[:COMMITTED]->(c) | |
""", sha=sha, login=committer_login) | |
# ... ( _populate_files_and_code, _populate_dependencies, populate_neo4j_graph, ...) | |
def _populate_files_and_code(self, tx, repo_node): | |
"""Populate files, basic structure, and code analysis results.""" | |
full_name = repo_node["fullName"] | |
code_summary = self.repo_data.get("text_content", {}).get("code_summary", {}) | |
text_files = self.repo_data.get("text_content", {}).get("text_files", []) | |
# Create file nodes first | |
for file_info in text_files: | |
path = file_info['path'] | |
name = file_info['name'] | |
extension = os.path.splitext(name)[1].lower() | |
is_code = extension in ['.py', '.js', '.ts', '.jsx', '.tsx'] # Add more if needed | |
tx.run(""" | |
MATCH (r:Repository {fullName: $full_name}) | |
MERGE (f:File {path: $path}) | |
ON CREATE SET f.name = $name, f.extension = $extension, f.isCode = $is_code | |
MERGE (r)-[:CONTAINS_FILE]->(f) | |
""", full_name=full_name, path=path, name=name, extension=extension, is_code=is_code) | |
# If it's a code file with analysis, add details | |
if path in code_summary: | |
summary = code_summary[path] | |
metrics = summary.get('metrics', {}) | |
complexity = summary.get('complexity', {}) | |
# Add metrics | |
if metrics: | |
tx.run(""" | |
MATCH (f:File {path: $path}) | |
SET f.linesTotal = $total, f.linesCode = $code, f.linesComment = $comment, f.linesBlank = $blank, f.commentRatio = $ratio | |
""", path=path, total=metrics.get('total_lines'), code=metrics.get('code_lines'), | |
comment=metrics.get('comment_lines'), blank=metrics.get('blank_lines'), | |
ratio=metrics.get('comment_ratio')) | |
# Add complexity | |
if complexity: | |
tx.run(""" | |
MATCH (f:File {path: $path}) | |
SET f.complexityCyclomatic = $cc, f.maintainabilityIndex = $mi | |
""", path=path, cc=complexity.get('overall'), mi=complexity.get('maintainability_index')) | |
# Add Functions (if language supports detailed analysis) | |
for func in summary.get("detailed_functions", []): | |
# Ensure func_name is a string | |
func_name = str(func.get('name', 'unknown_function')) | |
tx.run(""" | |
MATCH (f:File {path: $path}) | |
MERGE (fn:Function {name: $func_name, file: $path}) // Unique by name + file path | |
ON CREATE SET fn.args = $args, fn.complexity = $cc, fn.docstring = $doc | |
MERGE (f)-[:DEFINES_FUNCTION]->(fn) | |
""", path=path, func_name=func_name, | |
args=json.dumps(func.get('args', [])), # Store args as JSON string | |
cc=func.get('complexity'), | |
doc=func.get('docstring', '')[:200]) # Limit docstring | |
# Add Classes (if language supports detailed analysis) | |
for cls in summary.get("detailed_classes", []): | |
# Ensure cls_name is a string | |
cls_name = str(cls.get('name', 'unknown_class')) | |
tx.run(""" | |
MATCH (f:File {path: $path}) | |
MERGE (cl:Class {name: $cls_name, file: $path}) // Unique by name + file path | |
ON CREATE SET cl.methods = $methods, cl.docstring = $doc, cl.extends = $extends | |
MERGE (f)-[:DEFINES_CLASS]->(cl) | |
""", path=path, cls_name=cls_name, | |
methods=json.dumps([m['name'] for m in cls.get('methods', [])]), # Store method names | |
doc=cls.get('docstring', '')[:200], | |
extends=cls.get('extends')) # If JS/TS analysis provides it | |
def _populate_dependencies(self, tx, repo_node): | |
"""Populate internal and external code dependencies.""" | |
dependencies = self.repo_data.get("text_content", {}).get("dependencies", {}) | |
internal_deps = dependencies.get('internal', {}) | |
external_deps = dependencies.get('external', {}) | |
# Internal Dependencies (File -> File) | |
for source_path, target_paths in internal_deps.items(): | |
for target_path in target_paths: | |
# MATCH only succeeds if both file nodes already exist (the old WHERE EXISTS(source.path) form is redundant and removed in Neo4j 5) | |
tx.run(""" | |
MATCH (source:File {path: $source_path}), (target:File {path: $target_path}) | |
MERGE (source)-[:DEPENDS_ON]->(target) | |
""", source_path=source_path, target_path=target_path) | |
# External Dependencies (File -> Dependency) | |
for source_path, package_names in external_deps.items(): | |
for package_name in package_names: | |
# Ensure package name is valid before creating | |
if package_name and isinstance(package_name, str): | |
tx.run(""" | |
MATCH (source:File {path: $source_path}) | |
WHERE EXISTS(source.path) // Ensure source file exists | |
MERGE (dep:Dependency {name: $package_name}) | |
MERGE (source)-[:IMPORTS]->(dep) | |
""", source_path=source_path, package_name=package_name) | |
def populate_neo4j_graph(self): | |
"""Populate the Neo4j graph with data from self.repo_data.""" | |
if not self.neo4j_driver: | |
print("Neo4j connection not available. Skipping graph population.") | |
return | |
if not self.repo_data or not self.repo_data.get("basic_info"): | |
print("No repository data available to populate the graph.") | |
return | |
basic_info = self.repo_data["basic_info"] | |
full_name = basic_info['full_name'] | |
print(f"Populating Neo4j graph for repository: {full_name}") | |
try: | |
with self.neo4j_driver.session(database="neo4j") as session: # Ensure using correct database if needed | |
# Create/Merge Repository Node | |
repo_result = session.execute_write( | |
lambda tx: tx.run(""" | |
MERGE (r:Repository {fullName: $full_name}) | |
ON CREATE SET | |
r.name = $name, | |
r.owner = $owner, | |
r.description = $description, | |
r.url = $url, | |
r.createdAt = datetime($created_at), | |
r.updatedAt = datetime($updated_at), | |
r.stars = $stars, | |
r.forks = $forks, | |
r.openIssues = $open_issues, | |
r.language = $language, | |
r.license = $license | |
RETURN r | |
""", full_name=full_name, | |
name=basic_info['name'], | |
owner=basic_info['owner']['login'], | |
description=basic_info.get('description', ''), | |
url=basic_info['html_url'], | |
created_at=basic_info['created_at'], | |
updated_at=basic_info['updated_at'], | |
stars=basic_info['stargazers_count'], | |
forks=basic_info['forks_count'], | |
open_issues=basic_info['open_issues_count'], | |
language=basic_info.get('language'), | |
license=(basic_info.get('license') or {}).get('name') # license can be null in the API payload | |
).single()[0] # Get the repo node itself | |
) | |
# Call helper functions within transactions for atomicity | |
session.execute_write(self._populate_basic_info, repo_result, basic_info) | |
session.execute_write(self._populate_contributors, repo_result) | |
session.execute_write(self._populate_commits, repo_result) | |
session.execute_write(self._populate_files_and_code, repo_result) | |
session.execute_write(self._populate_dependencies, repo_result) | |
# Add calls for issues, PRs etc. if needed | |
print(f"Successfully populated graph for {full_name}.") | |
except Exception as e: | |
print(f"Error populating Neo4j graph: {e}") | |
# ... ( analyze_repo, _get_graph_summary_for_llm, _node_to_string, ...) | |
def analyze_repo(self, owner, repo, display=True, save_json=False, export_text=False): | |
"""Fetch, analyze, display, and optionally populate graph.""" | |
self.owner = owner | |
self.repo = repo | |
self.repo_full_name = f"{owner}/{repo}" | |
print(f"\nFetching repository information for {self.repo_full_name}...") | |
# Use the github_analyzer instance associated with this GraphRepoAnalyzer | |
self.repo_data = self.github_analyzer.get_all_info(owner, repo) | |
if self.repo_data: | |
if display: | |
print("\nGenerating visualizations and analysis...") | |
self.github_analyzer.display_repo_info(self.repo_data) | |
self.github_analyzer.display_code_files(self.repo_data) # Show code preview | |
if self.neo4j_driver: | |
populate = input("\nPopulate Neo4j graph with this data? (y/n): ").lower() == 'y' | |
if populate: | |
self.populate_neo4j_graph() | |
if save_json: | |
default_filename = f'/content/{self.repo}_info.json' if IN_COLAB else f'./{self.repo}_info.json' | |
filename = input(f"Enter filename for JSON output (default: {default_filename}): ") or default_filename | |
save_json_to_colab(self.repo_data, filename) # Use the enhanced save function | |
if export_text: | |
default_dir = f'/content/{self.repo}_text' if IN_COLAB else f'./{self.repo}_text' | |
output_dir = input(f"Enter output directory for text export (default: {default_dir}): ") or default_dir | |
self.github_analyzer.export_repo_text(self.repo_data, output_dir) | |
else: | |
print(f"Failed to get repository information for {self.repo_full_name}") | |
def _get_graph_summary_for_llm(self, max_nodes=10, max_rels=20): | |
"""Fetch a small, representative sample of the graph for LLM context.""" | |
if not self.neo4j_driver or not self.repo_full_name: | |
return "No graph data available." | |
# Get counts | |
node_counts_query = "MATCH (n) RETURN labels(n) AS label, count(*) AS count" | |
rel_counts_query = "MATCH ()-[r]->() RETURN type(r) AS type, count(*) AS count" | |
node_counts = self._run_cypher(node_counts_query) | |
rel_counts = self._run_cypher(rel_counts_query) | |
# Get sample nodes/rels related to the repo | |
sample_query = """ | |
MATCH (repo:Repository {fullName: $repo_name}) | |
// Get repo node, owner, some contributors, some files, some commits | |
OPTIONAL MATCH (repo)-[:OWNED_BY]->(owner:User) | |
OPTIONAL MATCH (repo)-[:HAS_CONTRIBUTOR]->(contrib:User) | |
WITH repo, owner, collect(contrib)[..5] AS contributors // Limit contributors | |
OPTIONAL MATCH (repo)-[:CONTAINS_FILE]->(file:File) | |
WITH repo, owner, contributors, collect(file)[..10] AS files // Limit files | |
OPTIONAL MATCH (repo)-[:HAS_COMMIT]->(commit:Commit) | |
WITH repo, owner, contributors, files, collect(commit)[..5] AS commits // Limit commits | |
// Get relationships between these sampled nodes | |
CALL apoc.path.subgraphNodes([repo, owner] + contributors + files + commits, { | |
maxLevel: 1, relationshipFilter:'>' // Only outgoing relationships from these nodes | |
}) YIELD node | |
MATCH (n)-[r]->(m) | |
WHERE n IN [repo, owner] + contributors + files + commits AND m IN [repo, owner] + contributors + files + commits | |
RETURN n AS source, type(r) AS relationship, m AS target | |
LIMIT $max_rels | |
""" | |
# Note: the query above needs APOC installed in Neo4j for subgraphNodes; | |
# if it yields nothing, fall back to fetching specific relationships manually. | |
# _run_cypher swallows errors and returns None, so test the result rather than relying on an exception. | |
graph_sample = self._run_cypher(sample_query, {"repo_name": self.repo_full_name, "max_rels": max_rels}) | |
if not graph_sample: | |
print("APOC query failed or returned no rows; trying simpler graph sample query.") | |
sample_query_simple = """ | |
MATCH (repo:Repository {fullName: $repo_name}) | |
OPTIONAL MATCH (repo)-[r1:OWNED_BY|HAS_CONTRIBUTOR|CONTAINS_FILE|HAS_COMMIT|USES_LANGUAGE]->(related) | |
WITH repo, type(r1) as rel_type, related LIMIT $max_rels | |
RETURN repo AS source, rel_type AS relationship, related AS target | |
UNION | |
// Users author/commit Commits rather than the Repository itself, so go through HAS_COMMIT | |
MATCH (repo:Repository {fullName: $repo_name})-[:HAS_COMMIT]->(c:Commit)<-[r2:AUTHORED|COMMITTED]-(user:User) | |
WITH c, type(r2) as rel_type, user LIMIT $max_rels | |
RETURN user AS source, rel_type AS relationship, c AS target | |
UNION | |
MATCH (file:File)<-[:CONTAINS_FILE]-(repo:Repository {fullName: $repo_name}) | |
OPTIONAL MATCH (file)-[r3:DEFINES_FUNCTION|DEFINES_CLASS|DEPENDS_ON|IMPORTS]->(related_code) | |
WITH file, type(r3) as rel_type, related_code LIMIT $max_rels | |
RETURN file AS source, rel_type AS relationship, related_code AS target | |
""" | |
graph_sample = self._run_cypher(sample_query_simple, {"repo_name": self.repo_full_name, "max_rels": max_rels}) | |
summary = "Graph Context Summary:\n" | |
if node_counts: | |
summary += "Node Counts: " + ", ".join([f"{c['label'][0]}: {c['count']}" for c in node_counts if c['label']]) + "\n" | |
if rel_counts: | |
summary += "Relationship Counts: " + ", ".join([f"{r['type']}: {r['count']}" for r in rel_counts if r['type']]) + "\n" | |
if graph_sample: | |
summary += f"\nSample Relationships (up to {max_rels}):\n" | |
for rel in graph_sample: | |
# Safely extract node properties for display | |
source_repr = self._node_to_string(rel.get('source')) | |
target_repr = self._node_to_string(rel.get('target')) | |
rel_type = rel.get('relationship', 'UNKNOWN_REL') | |
if source_repr and target_repr and rel_type: | |
summary += f"- ({source_repr})-[:{rel_type}]->({target_repr})\n" | |
else: | |
summary += "No specific graph sample retrieved.\n" | |
return summary.strip() | |
def _node_to_string(self, node): | |
"""Helper to create a string representation of a Neo4j node.""" | |
if not node or not hasattr(node, 'labels') or not hasattr(node, 'items'): | |
return None | |
label = list(node.labels)[0] if node.labels else 'Node' | |
props = dict(node.items()) | |
# Choose a representative property | |
if 'fullName' in props: name = props['fullName'] | |
elif 'login' in props: name = props['login'] | |
elif 'path' in props: name = os.path.basename(props['path']) # Show file name | |
elif 'name' in props: name = props['name'] | |
elif 'sha' in props: name = props['sha'][:7] # Short SHA | |
elif 'number' in props: name = f"#{props['number']}" | |
else: name = node.element_id # Fallback to element ID | |
# Limit name length | |
name_str = str(name) | |
if len(name_str) > 40: | |
name_str = name_str[:37] + "..." | |
return f"{label}:{name_str}" | |
def _get_pr_summary_prompt(self, pr_details, role): | |
"""Generates the Gemini prompt for PR summarization based on role.""" | |
# Extract key details safely | |
title = pr_details.get('title', 'N/A') | |
body = pr_details.get('body', 'No description provided.') | |
pr_number = pr_details.get('number', 'N/A') | |
repo_name = pr_details.get('repo_full_name', 'N/A') | |
author = pr_details.get('author', 'N/A') | |
state = pr_details.get('state', 'N/A') | |
merged_status = 'Merged' if pr_details.get('merged') else ('Closed' if state == 'closed' else 'Open') | |
created_at = pr_details.get('created_at', 'N/A') | |
commits_count = pr_details.get('commits_count', 'N/A') | |
changed_files = pr_details.get('changed_files_count', 'N/A') | |
additions = pr_details.get('additions', 'N/A') | |
deletions = pr_details.get('deletions', 'N/A') | |
labels = ', '.join(pr_details.get('labels', [])) or 'None' | |
# Truncate long body | |
max_body_len = 1500 | |
truncated_body = body[:max_body_len] + ('...' if len(body) > max_body_len else '') | |
base_prompt = f""" | |
You are an AI assistant specializing in summarizing GitHub Pull Requests. | |
Analyze the following Pull Request details from repository '{repo_name}' and provide a summary tailored for a '{role}'. | |
**Pull Request #{pr_number}: {title}** | |
* **Author:** {author} | |
* **Status:** {state.capitalize()} ({merged_status}) | |
* **Created:** {created_at} | |
* **Commits:** {commits_count} | |
* **Changed Files:** {changed_files} | |
* **Code Churn:** +{additions} / -{deletions} lines | |
* **Labels:** {labels} | |
* **Description/Body:** | |
{truncated_body} | |
--- | |
""" | |
role_instructions = "" | |
# Define role-specific instructions | |
if role == 'Developer': | |
role_instructions = """ | |
**Summary Focus (Developer):** | |
* Summarize the core technical changes and their purpose. | |
* Identify key files, modules, or functions affected. | |
* Mention any potential technical complexities, risks, or areas needing careful code review (based *only* on the description and metadata). | |
* Note any mention of tests added or modified. | |
* Be concise and focus on technical aspects relevant for peer review or understanding the change. | |
""" | |
elif role == 'Manager' or role == 'Team Lead': | |
role_instructions = """ | |
**Summary Focus (Manager/Team Lead):** | |
* Explain the high-level purpose and business value (what problem does this PR solve or what feature does it add?). | |
* Summarize the overall status (e.g., Ready for Review, Needs Work, Merged, Blocked?). | |
* Give a sense of the PR's size/complexity (e.g., Small/Medium/Large based on file/line changes and description). | |
* Highlight any mentioned risks, blockers, or dependencies on other work. | |
* Include the author and key dates (created, merged/closed). | |
* Focus on information needed for tracking progress and impact. | |
""" | |
elif role == 'Program Manager' or role == 'Product Owner': | |
role_instructions = """ | |
**Summary Focus (Program/Product Manager):** | |
* Describe the user-facing impact or the feature/bug fix being addressed. | |
* Relate the PR to product goals or requirements if possible (based on title/body/labels). | |
* Note the status (especially if merged or closed). | |
* Mention associated issues or tickets if referenced in the body (though not explicitly provided here, look for patterns like '#123'). | |
* Focus on 'what' and 'why' from a product perspective. | |
""" | |
else: # Default/General | |
role_instructions = """ | |
**Summary Focus (General):** | |
* State the main goal or purpose of the PR clearly. | |
* Identify the author and the current status (Open/Closed/Merged). | |
* Provide a brief, balanced overview of the key changes made. | |
* Keep the summary accessible to a wider audience. | |
""" | |
return base_prompt + role_instructions + "\n**Summary:**" # Ask for summary explicitly | |
def summarize_pull_request(self, pr_number, role): | |
"""Fetches PR details and generates a role-based summary using Gemini.""" | |
if not self.gemini_model: | |
return "Gemini model not initialized. Cannot generate summary." | |
if not self.owner or not self.repo: | |
return "Repository owner and name not set. Analyze a repository first." | |
# Use the github_analyzer instance created in __init__ | |
if not self.github_analyzer: | |
return "GitHub Analyzer not initialized." | |
print(f"\nFetching details for PR #{pr_number} in {self.repo_full_name}...") | |
pr_details = self.github_analyzer.get_pull_request_details(self.owner, self.repo, pr_number) | |
if not pr_details: | |
return f"Could not retrieve details for PR #{pr_number}." | |
print(f"Generating summary for role: {role}...") | |
# Generate the role-specific prompt | |
prompt = self._get_pr_summary_prompt(pr_details, role) | |
# Send to Gemini and get the response | |
try: | |
# print("--- Sending Prompt to Gemini ---") | |
# print(prompt[:1000] + "..." if len(prompt) > 1000 else prompt) # Debug: Print truncated prompt | |
# print("-----------------------------") | |
response = self.gemini_model.generate_content(prompt) | |
print("\n--- Gemini PR Summary ---") | |
summary_text = response.text | |
display(Markdown(summary_text)) | |
print("------------------------") | |
return summary_text | |
except Exception as e: | |
print(f"Error communicating with Gemini for PR summary: {e}") | |
return f"Error asking Gemini: {e}" | |
# ... ( _get_repo_summary_for_llm, ask_gemini_about_repo ) | |
def _get_repo_summary_for_llm(self): | |
"""Create a concise text summary of the repo_data for the LLM prompt.""" | |
if not self.repo_data or not self.repo_data.get("basic_info"): | |
return "No repository data available." | |
basic = self.repo_data["basic_info"] | |
summary = f"Repository Summary: {basic['full_name']}\n" | |
summary += f"Description: {basic.get('description', 'N/A')}\n" | |
summary += f"Stars: {basic.get('stargazers_count', 0)}, Forks: {basic.get('forks_count', 0)}, Open Issues: {basic.get('open_issues_count', 0)}\n" | |
summary += f"Main Language: {basic.get('language', 'N/A')}\n" | |
summary += f"Last Updated: {basic.get('updated_at', 'N/A')}\n" | |
if self.repo_data.get("languages"): | |
langs = list(self.repo_data["languages"].keys()) | |
summary += f"Languages Used: {', '.join(langs[:5])}{'...' if len(langs) > 5 else ''}\n" | |
if self.repo_data.get("contributors"): | |
contribs = [c['login'] for c in self.repo_data["contributors"][:5]] | |
summary += f"Top Contributors: {', '.join(contribs)}{'...' if len(self.repo_data['contributors']) > 5 else ''}\n" | |
if self.repo_data.get("text_content", {}).get("aggregate_metrics"): | |
metrics = self.repo_data["text_content"]["aggregate_metrics"] | |
summary += f"Code Metrics (approx): {metrics.get('total_code_lines', 0)} LoC, Comment Ratio: {metrics.get('average_comment_ratio', 0):.2f}\n" | |
# Add complexity summary if available | |
complexity_data = self.repo_data.get("text_content", {}).get("complexity_metrics",{}).get("cyclomatic_complexity", []) | |
if complexity_data: | |
cc_values = [c[1] for c in complexity_data if isinstance(c[1], (int, float))] # Extract valid numbers | |
if cc_values: | |
summary += f"Avg Cyclomatic Complexity: {np.mean(cc_values):.2f}\n" | |
# Add dependency summary if available | |
deps = self.repo_data.get("text_content", {}).get("dependencies", {}).get("external", {}) | |
if deps: | |
ext_counts = Counter() | |
for dep_list in deps.values(): | |
ext_counts.update(dep for dep in dep_list if isinstance(dep, str)) # Count valid string deps | |
top_deps = ext_counts.most_common(5) | |
if top_deps: | |
summary += f"Top External Dependencies: {', '.join([d[0] for d in top_deps])}\n" | |
return summary.strip() | |
def ask_gemini_about_repo(self, question): | |
"""Ask Gemini a question about the analyzed repository, using graph context.""" | |
if not self.gemini_model: | |
return "Gemini model not initialized. Please provide GOOGLE_API_KEY." | |
if not self.repo_data: | |
return "No repository has been analyzed yet. Run analyze_repo() first." | |
print("\nAsking Gemini...") | |
# 1. Get Base Summary Context (from fetched GitHub data) | |
repo_summary = self._get_repo_summary_for_llm() | |
# 2. Get Graph Context (GraphRAG - Retrieval Step) | |
# (Simple version: get generic graph summary. Advanced: tailor query to question) | |
graph_context = self._get_graph_summary_for_llm() # Use the helper | |
# 3. Construct the Prompt | |
prompt = f"""You are an expert software engineering assistant analyzing the GitHub repository '{self.repo_full_name}'. | |
You have access to the following information: | |
**Repository Summary (from GitHub API):** | |
{repo_summary} | |
**Knowledge Graph Context (Sample from Neo4j):** | |
{graph_context} | |
--- | |
Based *only* on the information provided above, please answer the following question: | |
**Question:** {question} | |
--- | |
Provide a concise and informative answer, referencing the data sources (summary or graph) where possible. If the information isn't available in the provided context, state that explicitly. | |
""" | |
# 4. Send to Gemini and Get Response | |
try: | |
print("--- Sending Prompt to Gemini ---") | |
print(prompt[:1000] + "..." if len(prompt) > 1000 else prompt) # Print truncated prompt for review | |
print("-----------------------------") | |
response = self.gemini_model.generate_content(prompt) | |
print("\n--- Gemini's Response ---") | |
# Display response using Markdown for better formatting | |
display(Markdown(response.text)) | |
print("------------------------") | |
return response.text | |
except Exception as e: | |
print(f"Error communicating with Gemini: {e}") | |
return f"Error asking Gemini: {e}" | |
def create_vizro_dashboard(self, output_dir='./vizro_dashboard'): | |
"""Create a Vizro dashboard from repository data.""" | |
if not self.repo_data: | |
print("No repository data available. Run analyze_repo() first.") | |
return None | |
# Create output directory if it doesn't exist | |
if not os.path.exists(output_dir): | |
os.makedirs(output_dir) | |
# Extract repository data | |
repo_name = self.repo_data["basic_info"]["full_name"] | |
basic_info = self.repo_data["basic_info"] | |
# Create dashboard pages | |
pages = [] | |
# 1. Overview Page | |
overview_components = [] | |
# Basic repository info as a card | |
repo_info_md = f""" | |
# {basic_info['full_name']} | |
**Description:** {basic_info.get('description', 'No description')} | |
**Stars:** {basic_info['stargazers_count']} | | |
**Forks:** {basic_info['forks_count']} | | |
**Open Issues:** {basic_info['open_issues_count']} | |
**Created:** {basic_info['created_at']} | | |
**Last Updated:** {basic_info['updated_at']} | |
**Default Branch:** {basic_info['default_branch']} | |
**License:** {basic_info['license']['name'] if basic_info.get('license') else 'Not specified'} | |
""" | |
overview_components.append(vzm.Card(text=repo_info_md)) | |
# Languages pie chart | |
if self.repo_data.get("languages"): | |
langs_data = [] | |
total = sum(self.repo_data["languages"].values()) | |
for lang, bytes_count in self.repo_data["languages"].items(): | |
percentage = (bytes_count / total) * 100 | |
langs_data.append({ | |
"Language": lang, | |
"Bytes": bytes_count, | |
"Percentage": percentage | |
}) | |
langs_df = pd.DataFrame(langs_data) | |
lang_pie = vzm.Graph( | |
figure=px.pie( | |
langs_df, | |
values="Percentage", | |
names="Language", | |
title="Language Distribution" | |
) | |
) | |
overview_components.append(lang_pie) # vzm.Graph is itself a page component; vzm.Card only accepts text/href | |
# Contributors bar chart | |
if self.repo_data.get("contributors"): | |
contrib_data = [] | |
for contributor in self.repo_data["contributors"][:15]: | |
contrib_data.append({ | |
"Username": contributor['login'], | |
"Contributions": contributor['contributions'] | |
}) | |
contrib_df = pd.DataFrame(contrib_data) | |
contrib_bar = vzm.Graph( | |
figure=px.bar( | |
contrib_df, | |
x="Username", | |
y="Contributions", | |
title="Top Contributors" | |
) | |
) | |
overview_components.append(contrib_bar) | |
# Add overview page | |
pages.append( | |
vzm.Page( | |
title="Overview", | |
components=overview_components | |
) | |
) | |
        # 2. Activity Page
        activity_components = []

        # Commit activity over time
        weekly_commits = self.repo_data.get("temporal_analysis", {}).get("weekly_commits", [])
        if weekly_commits:
            commits_df = pd.DataFrame([
                {"Date": week['date'], "Commits": week['total']}
                for week in weekly_commits
            ])
            commits_line = vzm.Graph(
                figure=px.line(
                    commits_df,
                    x="Date",
                    y="Commits",
                    title="Weekly Commit Activity"
                )
            )
            activity_components.append(commits_line)

        # Code changes over time
        weekly_code_changes = self.repo_data.get("temporal_analysis", {}).get("weekly_code_changes", [])
        if weekly_code_changes:
            changes_data = []
            for week in weekly_code_changes:
                changes_data.append({
                    "Date": week['date'],
                    "Additions": week['additions'],
                    "Deletions": -abs(week['deletions'])  # Negative so deletions plot below the axis
                })
            changes_df = pd.DataFrame(changes_data)
            # Diverging bar chart: additions above the axis, deletions below
            changes_fig = go.Figure()
            changes_fig.add_trace(go.Bar(
                x=changes_df["Date"],
                y=changes_df["Additions"],
                name="Additions",
                marker_color="green"
            ))
            changes_fig.add_trace(go.Bar(
                x=changes_df["Date"],
                y=changes_df["Deletions"],
                name="Deletions",
                marker_color="red"
            ))
            changes_fig.update_layout(
                title="Weekly Code Changes",
                barmode="relative"
            )
            activity_components.append(vzm.Graph(figure=changes_fig))

        # Issue resolution times
        issue_timeline = self.repo_data.get("temporal_analysis", {}).get("issue_timeline", {})
        if issue_timeline and issue_timeline.get('resolution_times'):
            resolution_times = issue_timeline['resolution_times']
            # Convert to hours for better visualization (cap at one week = 168 hours)
            rt_hours = [min(rt, 168) for rt in resolution_times if rt is not None]
            if rt_hours:
                issue_resolution_fig = px.histogram(
                    x=rt_hours,
                    title="Issue Resolution Times (Capped at 1 Week)",
                    labels={"x": "Hours to Resolution", "y": "Number of Issues"}
                )
                # Add mean and median reference lines
                mean_rt = np.mean(rt_hours)
                median_rt = np.median(rt_hours)
                issue_resolution_fig.add_vline(
                    x=mean_rt,
                    line_dash="dash",
                    line_color="red",
                    annotation_text=f"Mean: {mean_rt:.2f} hours"
                )
                issue_resolution_fig.add_vline(
                    x=median_rt,
                    line_dash="dash",
                    line_color="green",
                    annotation_text=f"Median: {median_rt:.2f} hours"
                )
                activity_components.append(vzm.Graph(figure=issue_resolution_fig))

        # Add activity page
        pages.append(
            vzm.Page(
                title="Activity",
                components=activity_components
            )
        )
        # 3. Code Quality Page
        code_components = []

        # Code complexity metrics
        complexity_metrics = self.repo_data.get("text_content", {}).get("complexity_metrics", {})
        cyclomatic_complexity = complexity_metrics.get("cyclomatic_complexity", [])
        if cyclomatic_complexity:
            # Prepare data, keeping only numeric complexity scores
            complexity_data = []
            for path, cc in cyclomatic_complexity:
                if isinstance(cc, (int, float)):
                    complexity_data.append({
                        "File": os.path.basename(path),
                        "Path": path,
                        "Complexity": cc
                    })
            if complexity_data:
                # Top 10 most complex files
                complexity_data.sort(key=lambda x: x["Complexity"], reverse=True)
                complex_df = pd.DataFrame(complexity_data[:10])
                complex_bar = vzm.Graph(
                    figure=px.bar(
                        complex_df,
                        x="File",
                        y="Complexity",
                        title="Most Complex Files",
                        hover_data=["Path"]
                    )
                )
                code_components.append(complex_bar)

                # Complexity histogram over all analyzed files
                cc_values = [d["Complexity"] for d in complexity_data]
                cc_hist = vzm.Graph(
                    figure=px.histogram(
                        x=cc_values,
                        title="Cyclomatic Complexity Distribution",
                        labels={"x": "Complexity", "y": "Number of Files"}
                    )
                )
                code_components.append(cc_hist)

        # Comment ratio by file
        comment_ratios = complexity_metrics.get("comment_ratios", [])
        if comment_ratios:
            comment_data = []
            for path, ratio in comment_ratios:
                comment_data.append({
                    "File": os.path.basename(path),
                    "Path": path,
                    "Comment Ratio": ratio
                })
            # Top 10 files by comment ratio
            comment_data.sort(key=lambda x: x["Comment Ratio"], reverse=True)
            comment_df = pd.DataFrame(comment_data[:10])
            comment_bar = vzm.Graph(
                figure=px.bar(
                    comment_df,
                    x="File",
                    y="Comment Ratio",
                    title="Most Commented Files",
                    hover_data=["Path"]
                )
            )
            code_components.append(comment_bar)

        # Add code quality page
        pages.append(
            vzm.Page(
                title="Code Quality",
                components=code_components
            )
        )
        # 4. Dependencies Page
        dependencies = self.repo_data.get("text_content", {}).get("dependencies", {})
        if dependencies:
            dependencies_components = []

            # External dependencies
            external_deps = dependencies.get("external", {})
            if external_deps:
                # Count package occurrences across files
                ext_counts = Counter()
                for file_deps in external_deps.values():
                    ext_counts.update(dep for dep in file_deps if isinstance(dep, str))
                # Top 10 dependencies
                deps_data = [{"Package": pkg, "Count": count} for pkg, count in ext_counts.most_common(10)]
                deps_df = pd.DataFrame(deps_data)
                deps_bar = vzm.Graph(
                    figure=px.bar(
                        deps_df,
                        x="Package",
                        y="Count",
                        title="Most Used External Dependencies"
                    )
                )
                dependencies_components.append(deps_bar)

            # Internal dependencies
            internal_deps = dependencies.get("internal", {})
            if internal_deps and len(internal_deps) <= 50:  # Only draw smaller graphs
                try:
                    # Build a directed NetworkX graph of file-to-file imports
                    G = nx.DiGraph()
                    for source, targets in internal_deps.items():
                        source_name = os.path.basename(source)
                        G.add_node(source, name=source_name)
                        for target in targets:
                            target_name = os.path.basename(target)
                            G.add_node(target, name=target_name)
                            G.add_edge(source, target)

                    # Force-directed layout (seeded for reproducibility)
                    pos = nx.spring_layout(G, seed=42)

                    # Edge segments (None entries separate individual edges)
                    edge_x = []
                    edge_y = []
                    for edge in G.edges():
                        x0, y0 = pos[edge[0]]
                        x1, y1 = pos[edge[1]]
                        edge_x.extend([x0, x1, None])
                        edge_y.extend([y0, y1, None])
                    edge_trace = go.Scatter(
                        x=edge_x, y=edge_y,
                        line=dict(width=0.5, color='#888'),
                        hoverinfo='none',
                        mode='lines')

                    node_x = []
                    node_y = []
                    node_text = []
                    for node in G.nodes():
                        x, y = pos[node]
                        node_x.append(x)
                        node_y.append(y)
                        node_text.append(G.nodes[node].get('name', node))
                    node_trace = go.Scatter(
                        x=node_x, y=node_y,
                        mode='markers+text',
                        hoverinfo='text',
                        text=node_text,
                        textposition="top center",
                        marker=dict(
                            showscale=True,
                            colorscale='YlGnBu',
                            size=10,
                            colorbar=dict(
                                thickness=15,
                                # colorbar.titleside is deprecated; use the nested title dict
                                title=dict(text='Node Connections', side='right'),
                                xanchor='left'
                            )
                        ),
                        textfont=dict(
                            family="Arial",
                            size=8,
                            color="black"
                        ),
                    )

                    # Color nodes by total degree (in-degree + out-degree)
                    node_trace.marker.color = [G.degree(node) for node in G.nodes()]

                    fig = go.Figure(data=[edge_trace, node_trace],
                                    layout=go.Layout(
                                        title='File Dependency Network',
                                        showlegend=False,
                                        hovermode='closest',
                                        margin=dict(b=20, l=5, r=5, t=40),
                                        xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                                        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False))
                                    )
                    dependencies_components.append(vzm.Graph(figure=fig))
                except Exception as e:
                    print(f"Error generating dependency network: {e}")

            # Add dependencies page if we have components
            if dependencies_components:
                pages.append(
                    vzm.Page(
                        title="Dependencies",
                        components=dependencies_components
                    )
                )
        # Create the dashboard
        dashboard = vzm.Dashboard(
            title=f"GitHub Repository Analysis: {repo_name}",
            pages=pages
        )

        # Note: vzm.Dashboard has no save() method and Vizro has no built-in
        # one-call static HTML export; dashboards are served as Dash apps.
        # To serve this one:  vizro.Vizro().build(dashboard).run()
        print(f"Vizro dashboard created for {repo_name}; serve it with vizro.Vizro().build(dashboard).run()")
        return dashboard
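
# Example usage (sketch; the repo names are placeholders and this assumes
# analyze_repo() succeeds with the given token):
#   analyzer = GraphRepoAnalyzer(github_token=os.environ.get("GITHUB_TOKEN"))
#   analyzer.analyze_repo("octocat", "Hello-World")
#   dashboard = analyzer.create_vizro_dashboard()
#   vizro.Vizro().build(dashboard).run()  # serves the dashboard locally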
# Create Gradio interface
def create_gradio_interface():
    """Create a Gradio interface for the GitHub repository analyzer."""

    def analyze_repository(owner, repo, github_token=None, neo4j_uri=None, neo4j_user=None, neo4j_password=None, gemini_api_key=None):
        """Analyze a repository and return a dashboard status snippet plus a Markdown report."""
        try:
            analyzer = GraphRepoAnalyzer(
                github_token=github_token if github_token else None,
                neo4j_uri=neo4j_uri if neo4j_uri else None,
                neo4j_user=neo4j_user if neo4j_user else None,
                neo4j_password=neo4j_password if neo4j_password else None,
                gemini_api_key=gemini_api_key if gemini_api_key else None
            )

            # Analyze repository
            analyzer.analyze_repo(owner, repo, display=False, save_json=False, export_text=False)
            if not analyzer.repo_data:
                return None, f"Failed to analyze repository: {owner}/{repo}. Check the repository name and your GitHub token."

            # Create the Vizro dashboard model (see create_vizro_dashboard for how to serve it)
            analyzer.create_vizro_dashboard(output_dir='./vizro_dashboard')
            dashboard_html = (
                "<p>Vizro dashboard created. Serve it with "
                "<code>vizro.Vizro().build(dashboard).run()</code>.</p>"
            )

            # Generate a simple report
            basic_info = analyzer.repo_data["basic_info"]
            report = f"""
### Repository Analysis: {basic_info['full_name']}

**Description:** {basic_info.get('description', 'No description')}

**Statistics:**
- Stars: {basic_info['stargazers_count']}
- Forks: {basic_info['forks_count']}
- Open Issues: {basic_info['open_issues_count']}

**Interactive Dashboard:**
A Vizro dashboard has been created; serve it with `vizro.Vizro().build(dashboard).run()`.

**Language Summary:**
"""
            # Add language info
            if analyzer.repo_data.get("languages"):
                langs = analyzer.repo_data["languages"]
                total = sum(langs.values())
                for lang, bytes_count in sorted(langs.items(), key=lambda x: x[1], reverse=True):
                    percentage = (bytes_count / total) * 100
                    report += f"- {lang}: {percentage:.1f}%\n"

            # Add code metrics if available ("agg" avoids shadowing the radon.metrics import)
            agg = analyzer.repo_data.get("text_content", {}).get("aggregate_metrics")
            if agg:
                # Format the ratio only if it is numeric; the old code applied
                # :.2f to the 'N/A' fallback string, which raises ValueError
                avg_ratio = agg.get('average_comment_ratio')
                avg_ratio_str = f"{avg_ratio:.2f}" if isinstance(avg_ratio, (int, float)) else 'N/A'
                report += f"""
**Code Metrics:**
- Total Files Analyzed: {agg.get('total_files', 'N/A')}
- Total Code Lines: {agg.get('total_code_lines', 'N/A')}
- Comment Ratio: {avg_ratio_str}
"""
            return dashboard_html, report
        except Exception as e:
            return None, f"Error analyzing repository: {str(e)}"
    def summarize_pr(owner, repo, pr_number, role, github_token=None, gemini_api_key=None):
        """Summarize a PR for Gradio."""
        try:
            if pr_number is None:
                return "Please provide a PR number."
            analyzer = GraphRepoAnalyzer(
                github_token=github_token if github_token else None,
                gemini_api_key=gemini_api_key if gemini_api_key else None
            )
            # Set repo info directly; a full analyze_repo() run is not needed for a PR summary
            analyzer.owner = owner
            analyzer.repo = repo
            analyzer.repo_full_name = f"{owner}/{repo}"
            # Summarize the PR
            return analyzer.summarize_pull_request(int(pr_number), role)
        except Exception as e:
            return f"Error summarizing PR: {str(e)}"
    # UI Components
    with gr.Blocks(title="GitHub Repository Analyzer") as app:
        gr.Markdown("# GitHub Repository Analyzer with Vizro and Gemini Integration")
        gr.Markdown("Analyze GitHub repositories, create interactive dashboards, and summarize pull requests.")

        with gr.Tab("Repository Analysis"):
            with gr.Row():
                with gr.Column():
                    owner_input = gr.Textbox(label="Repository Owner (Username/Organization)")
                    repo_input = gr.Textbox(label="Repository Name")
                    github_token = gr.Textbox(label="GitHub Token (Optional)", type="password")
                    with gr.Accordion("Advanced Settings (Optional)", open=False):
                        neo4j_uri = gr.Textbox(label="Neo4j URI")
                        neo4j_user = gr.Textbox(label="Neo4j Username")
                        neo4j_password = gr.Textbox(label="Neo4j Password", type="password")
                        gemini_api_key = gr.Textbox(label="Google API Key (for Gemini)", type="password")
                    analyze_btn = gr.Button("Analyze Repository")
                with gr.Column():
                    report_output = gr.Markdown(label="Analysis Report")
                    dashboard_output = gr.HTML(label="Dashboard Preview")
            analyze_btn.click(
                analyze_repository,
                inputs=[
                    owner_input, repo_input, github_token,
                    neo4j_uri, neo4j_user, neo4j_password,
                    gemini_api_key
                ],
                outputs=[dashboard_output, report_output]
            )

        with gr.Tab("PR Summarizer"):
            with gr.Row():
                with gr.Column():
                    pr_owner_input = gr.Textbox(label="Repository Owner")
                    pr_repo_input = gr.Textbox(label="Repository Name")
                    pr_number_input = gr.Number(label="PR Number", precision=0)
                    pr_role_input = gr.Dropdown(
                        choices=["Developer", "Manager", "Team Lead", "Product Owner", "Program Manager", "General"],
                        label="Your Role"
                    )
                    pr_github_token = gr.Textbox(label="GitHub Token (Optional)", type="password")
                    pr_gemini_api_key = gr.Textbox(label="Google API Key (Required for Gemini)", type="password")
                    summarize_btn = gr.Button("Summarize PR")
                with gr.Column():
                    pr_summary_output = gr.Markdown(label="PR Summary")
            summarize_btn.click(
                summarize_pr,
                inputs=[
                    pr_owner_input, pr_repo_input, pr_number_input,
                    pr_role_input, pr_github_token, pr_gemini_api_key
                ],
                outputs=pr_summary_output
            )

    return app
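
# Example (sketch): build the interface without auto-launching, e.g. to pass
# server options explicitly (values shown are typical Spaces settings, not
# requirements of this app):
#   demo = create_gradio_interface()
#   demo.launch(server_name="0.0.0.0", server_port=7860)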
# Main function to run the app
def main():
    """Run the GitHub Repository Analyzer with Gradio interface."""
    # Load environment variables (e.g. GITHUB_TOKEN) from a local .env file
    load_dotenv()
    # Create and launch the Gradio interface
    app = create_gradio_interface()
    app.launch(share=True, debug=True)


if __name__ == "__main__":
    main()
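
# A .env file read by load_dotenv() might look like this (placeholder values;
# GITHUB_TOKEN is the variable name used elsewhere in this app, the rest are
# assumed names matching the optional fields in the UI):
#   GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxx
#   NEO4J_URI=bolt://localhost:7687
#   NEO4J_USER=neo4j
#   NEO4J_PASSWORD=your-password
#   GOOGLE_API_KEY=your-gemini-api-key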