import requests
import json
import os
import base64
import re
import ast
import networkx as nx
import radon.metrics as metrics
import radon.complexity as complexity
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import pandas as pd
import numpy as np
from github import Github, GithubException
import time
from dotenv import load_dotenv

# Visualization imports
import vizro.plotly.express as px
import vizro
import vizro.models as vzm
import plotly.graph_objects as go
import gradio as gr
from pyvis.network import Network

# Google Gemini AI (optional)
try:
    import google.generativeai as genai
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
    print("Google Generative AI package not found. PR summarization feature will be disabled.")


class GitHubRepoInfo:
    """Enhanced class to get comprehensive information about a GitHub repository."""

    def __init__(self, token=None):
        """Initialize with optional GitHub API token."""
        self.base_url = "https://api.github.com"
        self.headers = {"Accept": "application/vnd.github.v3+json"}
        self.token = token
        self.github = None  # Initialize github attribute

        # Set up authentication
        if token:
            self.headers["Authorization"] = f"token {token}"
            try:
                self.github = Github(token)
                self.github.get_user().login  # Test connection
            except Exception as e:
                print(f"Warning: Failed to initialize PyGithub with token: {e}")
                self.github = Github()  # Fallback to unauthenticated
        elif os.environ.get("GITHUB_TOKEN"):
            self.token = os.environ.get("GITHUB_TOKEN")
            self.headers["Authorization"] = f"token {self.token}"
            try:
                self.github = Github(self.token)
                self.github.get_user().login  # Test connection
            except Exception as e:
                print(f"Warning: Failed to initialize PyGithub with token: {e}")
                self.github = Github()  # Fallback to unauthenticated
        else:
            self.github = Github()  # Unauthenticated

        # Configure rate limit handling
        self.rate_limit_remaining = 5000  # Assume higher limit if authenticated
        self.rate_limit_reset = datetime.now()

        # Initialize rate limit info if possible
        if self.github:
            try:
                rate_limit = self.github.get_rate_limit()
                self.rate_limit_remaining = rate_limit.core.remaining
                # PyGithub already exposes the reset time as a datetime
                self.rate_limit_reset = rate_limit.core.reset
            except Exception as e:
                print(f"Warning: Could not get initial rate limit from PyGithub: {e}")
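    # Usage sketch (illustrative, not part of the class): assumes a .env file that
    # provides GITHUB_TOKEN; loading it and constructing the helper would look like
    # the commented lines below.
    #
    #   load_dotenv()
    #   info = GitHubRepoInfo(token=os.environ.get("GITHUB_TOKEN"))
    #   print(info.rate_limit_remaining)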
    def _check_rate_limit(self):
        """Check API rate limit and wait if necessary."""
        if self.rate_limit_remaining <= 10:
            reset_time = self.rate_limit_reset
            current_time = datetime.now()
            if reset_time > current_time:
                wait_time = (reset_time - current_time).total_seconds() + 10  # Add buffer
                print(f"Rate limit nearly exhausted. Waiting {wait_time:.0f} seconds for reset.")
                time.sleep(wait_time)

        # Update rate limit info after each API call
        response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers)
        if response.status_code == 200:
            rate_data = response.json()
            self.rate_limit_remaining = rate_data["resources"]["core"]["remaining"]
            self.rate_limit_reset = datetime.fromtimestamp(rate_data["resources"]["core"]["reset"])

    def _paginated_get(self, url, params=None, max_items=None):
        """Handle paginated API responses with rate limit awareness."""
        if params is None:
            params = {}

        items = []
        page = 1
        per_page = min(100, params.get("per_page", 30))
        params["per_page"] = per_page

        while True:
            self._check_rate_limit()
            params["page"] = page
            response = requests.get(url, headers=self.headers, params=params)

            if response.status_code == 200:
                page_items = response.json()
                if not page_items:
                    break

                items.extend(page_items)
                page += 1

                # Check if we've reached the requested limit
                if max_items and len(items) >= max_items:
                    return items[:max_items]

                # Check if we've reached the end (GitHub returns fewer items than requested)
                if len(page_items) < per_page:
                    break
            else:
                print(f"Error {response.status_code}: {response.text}")
                break

        return items

    def get_repo_info(self, owner, repo):
        """Get basic repository information."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}"
        response = requests.get(url, headers=self.headers)

        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error {response.status_code}: {response.text}")
            return None

    def get_contributors(self, owner, repo, max_contributors=None):
        """Get repository contributors with pagination support."""
        url = f"{self.base_url}/repos/{owner}/{repo}/contributors"
        return self._paginated_get(url, max_items=max_contributors)

    def get_languages(self, owner, repo):
        """Get languages used in the repository."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/languages"
        response = requests.get(url, headers=self.headers)

        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error getting languages: {response.status_code}")
            return {}

    def get_commits(self, owner, repo, params=None, max_commits=None):
        """Get commits with enhanced filtering and pagination."""
        url = f"{self.base_url}/repos/{owner}/{repo}/commits"
        return self._paginated_get(url, params=params, max_items=max_commits)

    def get_commit_activity(self, owner, repo):
        """Get commit activity stats for the past year."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/stats/commit_activity"
        response = requests.get(url, headers=self.headers)

        if response.status_code == 200:
            return response.json()
        elif response.status_code == 202:
            # GitHub is computing the statistics, wait and retry
            print("GitHub is computing statistics, waiting and retrying...")
            time.sleep(2)
            return self.get_commit_activity(owner, repo)
        else:
            print(f"Error getting commit activity: {response.status_code}")
            return []

    def get_code_frequency(self, owner, repo):
        """Get weekly code addition and deletion statistics."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/stats/code_frequency"
        response = requests.get(url, headers=self.headers)

        if response.status_code == 200:
            return response.json()
        elif response.status_code == 202:
            # GitHub is computing the statistics, wait and retry
            print("GitHub is computing statistics, waiting and retrying...")
            time.sleep(2)
            return self.get_code_frequency(owner, repo)
        else:
            print(f"Error getting code frequency: {response.status_code}")
            return []
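    # Example (illustrative): fetching filtered commits and weekly activity through the
    # helpers above. The owner/repo values and the ISO date are placeholders.
    #
    #   recent = info.get_commits(
    #       "octocat", "Hello-World",
    #       params={"since": "2024-01-01T00:00:00Z"},
    #       max_commits=200,
    #   )
    #   weekly = info.get_commit_activity("octocat", "Hello-World")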
    def get_contributor_activity(self, owner, repo):
        """Get contributor commit activity over time."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/stats/contributors"
        response = requests.get(url, headers=self.headers)

        if response.status_code == 200:
            return response.json()
        elif response.status_code == 202:
            # GitHub is computing the statistics, wait and retry
            print("GitHub is computing statistics, waiting and retrying...")
            time.sleep(2)
            return self.get_contributor_activity(owner, repo)
        else:
            print(f"Error getting contributor activity: {response.status_code}")
            return []

    def get_branches(self, owner, repo):
        """Get repository branches."""
        url = f"{self.base_url}/repos/{owner}/{repo}/branches"
        return self._paginated_get(url)

    def get_releases(self, owner, repo, max_releases=None):
        """Get repository releases with pagination support."""
        url = f"{self.base_url}/repos/{owner}/{repo}/releases"
        return self._paginated_get(url, max_items=max_releases)

    def get_issues(self, owner, repo, state="all", max_issues=None, params=None):
        """Get repository issues with enhanced filtering."""
        url = f"{self.base_url}/repos/{owner}/{repo}/issues"
        if params is None:
            params = {}
        params["state"] = state
        return self._paginated_get(url, params=params, max_items=max_issues)

    def get_issue_timeline(self, owner, repo, days_back=180):
        """Analyze issue creation and closing over time."""
        # Get issues including closed ones
        # (the GitHub issues endpoint also returns pull requests)
        issues = self.get_issues(owner, repo, state="all")

        # Prepare timeline data
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days_back)

        # Initialize daily counters
        date_range = pd.date_range(start=start_date, end=end_date)
        created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
        closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}

        # Collect issue creation and closing dates
        for issue in issues:
            created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ')
            if created_at >= start_date:
                created_counts[created_at.strftime('%Y-%m-%d')] += 1

            if issue['state'] == 'closed' and issue.get('closed_at'):
                closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
                if closed_at >= start_date:
                    closed_counts[closed_at.strftime('%Y-%m-%d')] += 1

        # Calculate resolution times for closed issues
        resolution_times = []
        for issue in issues:
            if issue['state'] == 'closed' and issue.get('closed_at'):
                created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ')
                closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
                resolution_time = (closed_at - created_at).total_seconds() / 3600  # hours
                resolution_times.append(resolution_time)

        # Calculate issue labels distribution
        label_counts = defaultdict(int)
        for issue in issues:
            for label in issue.get('labels', []):
                label_counts[label['name']] += 1

        return {
            'created': created_counts,
            'closed': closed_counts,
            'resolution_times': resolution_times,
            'labels': dict(label_counts)
        }

    def get_pull_requests(self, owner, repo, state="all", max_prs=None, params=None):
        """Get repository pull requests with enhanced filtering."""
        url = f"{self.base_url}/repos/{owner}/{repo}/pulls"
        if params is None:
            params = {}
        params["state"] = state
        return self._paginated_get(url, params=params, max_items=max_prs)
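    # Example (illustrative): summarizing the issue timeline returned above.
    # Placeholder repository; the median is computed with numpy, imported as np.
    #
    #   timeline = info.get_issue_timeline("octocat", "Hello-World", days_back=90)
    #   if timeline["resolution_times"]:
    #       print(f"Median resolution: {np.median(timeline['resolution_times']):.1f} h")
    #   top_labels = sorted(timeline["labels"].items(), key=lambda kv: kv[1], reverse=True)[:5]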
    def get_pr_timeline(self, owner, repo, days_back=180):
        """Analyze PR creation, closing, and metrics over time."""
        # Get PRs including closed and merged ones
        prs = self.get_pull_requests(owner, repo, state="all")

        # Prepare timeline data
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days_back)

        # Initialize daily counters
        date_range = pd.date_range(start=start_date, end=end_date)
        created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
        closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
        merged_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}

        # Track metrics
        merge_times = []
        pr_sizes = []

        # Collect PR data
        for pr in prs:
            created_at = datetime.strptime(pr['created_at'], '%Y-%m-%dT%H:%M:%SZ')
            if created_at >= start_date:
                created_counts[created_at.strftime('%Y-%m-%d')] += 1

            # Get PR size (additions + deletions)
            # Note: the PR list endpoint usually omits these fields, so sizes are
            # only recorded when they are present in the response
            if pr.get('additions') is not None and pr.get('deletions') is not None:
                pr_sizes.append({
                    'additions': pr['additions'],
                    'deletions': pr['deletions'],
                    'total': pr['additions'] + pr['deletions'],
                    'files_changed': pr.get('changed_files', 0)
                })

            # Check if PR is closed
            if pr['state'] == 'closed':
                closed_at = datetime.strptime(pr['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
                if closed_at >= start_date:
                    closed_counts[closed_at.strftime('%Y-%m-%d')] += 1

                # Check if PR was merged
                if pr['merged_at']:
                    merged_at = datetime.strptime(pr['merged_at'], '%Y-%m-%dT%H:%M:%SZ')
                    if merged_at >= start_date:
                        merged_counts[merged_at.strftime('%Y-%m-%d')] += 1

                    # Calculate time to merge
                    merge_time = (merged_at - created_at).total_seconds() / 3600  # hours
                    merge_times.append(merge_time)

        # Calculate acceptance rate
        total_closed = sum(closed_counts.values())
        total_merged = sum(merged_counts.values())
        acceptance_rate = (total_merged / total_closed) * 100 if total_closed > 0 else 0

        return {
            'created': created_counts,
            'closed': closed_counts,
            'merged': merged_counts,
            'merge_times': merge_times,
            'pr_sizes': pr_sizes,
            'acceptance_rate': acceptance_rate
        }

    def get_contents(self, owner, repo, path="", ref=None):
        """Get repository contents at the specified path."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}"
        params = {}
        if ref:
            params["ref"] = ref

        response = requests.get(url, headers=self.headers, params=params)

        if response.status_code == 200:
            return response.json()
        else:
            print(f"Error getting contents: {response.status_code}")
            return []

    def get_readme(self, owner, repo, ref=None):
        """Get repository README file."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/readme"
        params = {}
        if ref:
            params["ref"] = ref

        response = requests.get(url, headers=self.headers, params=params)

        if response.status_code == 200:
            data = response.json()
            if data.get("content"):
                content = base64.b64decode(data["content"]).decode("utf-8")
                return {
                    "name": data["name"],
                    "path": data["path"],
                    "content": content
                }
            return data
        else:
            print(f"README not found or error: {response.status_code}")
            return None

    def get_file_content(self, owner, repo, path, ref=None):
        """Get the content of a specific file in the repository."""
        self._check_rate_limit()
        url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}"
        params = {}
        if ref:
            params["ref"] = ref

        response = requests.get(url, headers=self.headers, params=params)

        if response.status_code == 200:
            data = response.json()
            if data.get("content"):
                try:
                    content = base64.b64decode(data["content"]).decode("utf-8")
                    return content
                except UnicodeDecodeError:
                    return "[Binary file content not displayed]"
            return None
        else:
            print(f"Error getting file content: {response.status_code}")
            return None
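    # Example (illustrative): reading the README and a single file at a given ref.
    # Repository name, file path, and branch are placeholders.
    #
    #   readme = info.get_readme("octocat", "Hello-World")
    #   if readme:
    #       print(readme["path"], len(readme["content"]))
    #   setup_py = info.get_file_content("octocat", "Hello-World", "setup.py", ref="main")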
    def is_text_file(self, file_path):
        """Determine if a file is likely a text file based on extension."""
        text_extensions = [
            '.txt', '.md', '.rst', '.py', '.js', '.html', '.css', '.java',
            '.c', '.cpp', '.h', '.hpp', '.json', '.xml', '.yaml', '.yml',
            '.toml', '.ini', '.cfg', '.conf', '.sh', '.bat', '.ps1', '.rb',
            '.pl', '.php', '.go', '.rs', '.ts', '.jsx', '.tsx', '.vue',
            '.swift', '.kt', '.scala', '.groovy', '.lua', '.r', '.dart',
            '.ex', '.exs', '.erl', '.hrl', '.clj', '.hs', '.elm', '.f90',
            '.f95', '.f03', '.sql', '.gitignore', '.dockerignore', '.env',
            '.editorconfig', '.htaccess', '.cs', '.ipynb', '.R', '.Rmd',
            '.jl', '.fs', '.ml', '.mli', '.d', '.scm', '.lisp', '.el',
            '.m', '.mm', '.vb', '.asm', '.s', '.Dockerfile', '.gradle'
        ]
        extension = os.path.splitext(file_path)[1].lower()
        return extension in text_extensions

    def get_recursive_contents(self, owner, repo, path="", max_depth=3, current_depth=0, max_files=1000, ref=None):
        """Recursively get repository contents with a depth limit and file count limit."""
        if current_depth >= max_depth:
            return []

        contents = self.get_contents(owner, repo, path, ref)
        results = []
        file_count = 0

        for item in contents:
            if file_count >= max_files:
                break

            if item["type"] == "dir":
                # For directories, add the directory itself and recursively get contents
                dir_item = {
                    "type": "dir",
                    "name": item["name"],
                    "path": item["path"],
                    "contents": self.get_recursive_contents(
                        owner, repo, item["path"], max_depth, current_depth + 1,
                        max_files - file_count, ref
                    )
                }
                results.append(dir_item)
            else:
                # For files, add the file info
                results.append({
                    "type": "file",
                    "name": item["name"],
                    "path": item["path"],
                    "size": item["size"],
                    "url": item["html_url"]
                })
                file_count += 1

        return results

    def get_all_text_files(self, owner, repo, path="", max_files=50, ref=None):
        """Get content of all text files in the repository (with limit)."""
        contents = self.get_contents(owner, repo, path, ref)
        text_files = []
        file_count = 0

        # Process current directory
        for item in contents:
            if file_count >= max_files:
                break

            if item["type"] == "file" and self.is_text_file(item["name"]):
                content = self.get_file_content(owner, repo, item["path"], ref)
                if content and content != "[Binary file content not displayed]":
                    text_files.append({
                        "name": item["name"],
                        "path": item["path"],
                        "content": content
                    })
                    file_count += 1
            elif item["type"] == "dir":
                # Recursively get text files from subdirectories
                subdir_files = self.get_all_text_files(
                    owner, repo, item["path"], max_files - file_count, ref
                )
                text_files.extend(subdir_files)
                file_count += len(subdir_files)

        return text_files

    def get_documentation_files(self, owner, repo, ref=None):
        """Get documentation files from the repository."""
        # Common documentation file paths and directories
        doc_paths = [
            "docs", "doc", "documentation", "wiki",
            "CONTRIBUTING.md", "CONTRIBUTORS.md", "CODE_OF_CONDUCT.md",
            "SECURITY.md", "SUPPORT.md",
            "docs/index.md", "docs/README.md", "docs/getting-started.md",
            ".github/ISSUE_TEMPLATE", ".github/PULL_REQUEST_TEMPLATE.md"
        ]

        doc_files = []

        # Try to get each documentation file/directory
        for path in doc_paths:
            try:
                contents = self.get_contents(owner, repo, path, ref)

                # If it's a directory, get all markdown files in it
                if isinstance(contents, list):
                    for item in contents:
                        if item["type"] == "file" and item["name"].lower().endswith((".md", ".rst", ".txt")):
                            content = self.get_file_content(owner, repo, item["path"], ref)
                            if content:
                                doc_files.append({
                                    "name": item["name"],
                                    "path": item["path"],
                                    "content": content
                                })
                # If it's a file, get its content
                elif isinstance(contents, dict) and contents.get("type") == "file":
                    content = self.get_file_content(owner, repo, path, ref)
                    if content:
                        doc_files.append({
                            "name": contents["name"],
                            "path": contents["path"],
                            "content": content
                        })
            except Exception:
                # Path doesn't exist or access issues
                continue

        return doc_files
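    # Example (illustrative): collecting text and documentation files with the helpers
    # above. Placeholder repository; limits kept small to stay within API rate limits.
    #
    #   sources = info.get_all_text_files("octocat", "Hello-World", max_files=20)
    #   docs = info.get_documentation_files("octocat", "Hello-World")
    #   print(len(sources), "source files,", len(docs), "doc files")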
    def analyze_ast(self, code, file_path):
        """Analyze Python code using AST (Abstract Syntax Tree)."""
        if not file_path.endswith('.py'):
            return None

        try:
            tree = ast.parse(code)

            # Extract more detailed information using AST
            functions = []
            classes = []
            imports = []
            function_complexities = {}

            for node in ast.walk(tree):
                # Get function definitions with arguments
                if isinstance(node, ast.FunctionDef):
                    args = []
                    defaults = len(node.args.defaults)
                    args_count = len(node.args.args) - defaults

                    # Get positional args
                    for arg in node.args.args[:args_count]:
                        if hasattr(arg, 'arg'):  # Python 3
                            args.append(arg.arg)
                        else:  # Python 2
                            args.append(arg.id)

                    # Get args with defaults
                    for i, arg in enumerate(node.args.args[args_count:]):
                        if hasattr(arg, 'arg'):  # Python 3
                            args.append(f"{arg.arg}=...")
                        else:  # Python 2
                            args.append(f"{arg.id}=...")

                    # Calculate function complexity
                    # (radon's cc_visit expects source text, so use cc_visit_ast on the node)
                    func_blocks = complexity.cc_visit_ast(node)
                    func_complexity = func_blocks[0].complexity if func_blocks else None
                    function_complexities[node.name] = func_complexity

                    # Get docstring if available
                    docstring = ast.get_docstring(node)

                    functions.append({
                        'name': node.name,
                        'args': args,
                        'complexity': func_complexity,
                        'docstring': docstring
                    })

                # Get class definitions
                elif isinstance(node, ast.ClassDef):
                    methods = []
                    class_docstring = ast.get_docstring(node)

                    # Get class methods
                    for child in node.body:
                        if isinstance(child, ast.FunctionDef):
                            method_blocks = complexity.cc_visit_ast(child)
                            method_complexity = method_blocks[0].complexity if method_blocks else None
                            method_docstring = ast.get_docstring(child)
                            methods.append({
                                'name': child.name,
                                'complexity': method_complexity,
                                'docstring': method_docstring
                            })

                    classes.append({
                        'name': node.name,
                        'methods': methods,
                        'docstring': class_docstring
                    })

                # Get imports
                elif isinstance(node, ast.Import):
                    for name in node.names:
                        imports.append(name.name)
                elif isinstance(node, ast.ImportFrom):
                    module = node.module or ""
                    for name in node.names:
                        imports.append(f"{module}.{name.name}")

            # Calculate overall code complexity
            code_complexity = complexity.cc_visit_ast(tree)

            # Calculate maintainability index
            try:
                mi_score = metrics.mi_visit(code, True)
            except Exception:
                mi_score = None

            return {
                'functions': functions,
                'classes': classes,
                'imports': imports,
                'complexity': {
                    'overall': code_complexity,
                    'functions': function_complexities,
                    'maintainability_index': mi_score
                }
            }
        except SyntaxError:
            print(f"Syntax error in Python file: {file_path}")
            return None
        except Exception as e:
            print(f"Error analyzing {file_path}: {str(e)}")
            return None
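    # Example (illustrative): running the AST analyzer on an in-memory snippet.
    # The source string and file name below are made up for demonstration.
    #
    #   sample = "def add(a, b=1):\n    \"\"\"Add two numbers.\"\"\"\n    return a + b\n"
    #   report = info.analyze_ast(sample, "sample.py")
    #   if report:
    #       print(report["functions"][0]["name"], report["complexity"]["maintainability_index"])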
    def analyze_js_ts(self, code, file_path):
        """Analyze JavaScript/TypeScript code using regex with improved patterns."""
        if not file_path.endswith(('.js', '.ts', '.jsx', '.tsx')):
            return None

        # More sophisticated regex patterns for JS/TS analysis
        results = {
            'functions': [],
            'classes': [],
            'imports': [],
            'exports': [],
            'hooks': []  # For React hooks
        }

        # Function patterns (covering various declaration styles)
        function_patterns = [
            # Regular functions
            r'function\s+(\w+)\s*\(([^)]*)\)',
            # Arrow functions assigned to variables
            r'(?:const|let|var)\s+(\w+)\s*=\s*(?:\([^)]*\)|[^=]*)\s*=>\s*{',
            # Class methods
            r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{',
            # Object methods
            r'(\w+)\s*:\s*function\s*\(([^)]*)\)'
        ]

        for pattern in function_patterns:
            for match in re.finditer(pattern, code):
                func_name = match.group(1)
                args = match.group(2).strip() if len(match.groups()) > 1 else ""
                results['functions'].append({
                    'name': func_name,
                    'args': args
                })

        # Class pattern
        class_pattern = r'class\s+(\w+)(?:\s+extends\s+(\w+))?\s*{([^}]*)}'
        for match in re.finditer(class_pattern, code, re.DOTALL):
            class_name = match.group(1)
            parent_class = match.group(2) if match.group(2) else None
            class_body = match.group(3)

            # Find methods in class
            methods = []
            method_pattern = r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{([^}]*)}'
            for method_match in re.finditer(method_pattern, class_body):
                method_name = method_match.group(1)
                methods.append(method_name)

            results['classes'].append({
                'name': class_name,
                'extends': parent_class,
                'methods': methods
            })

        # Import patterns
        import_patterns = [
            # ES6 imports
            r'import\s+(?:{([^}]*)}|\*\s+as\s+(\w+)|(\w+))\s+from\s+[\'"]([^\'"]+)[\'"]',
            # CommonJS requires
            r'(?:const|let|var)\s+(?:{([^}]*)}|(\w+))\s*=\s*require\([\'"]([^\'"]+)[\'"]\)'
        ]

        for pattern in import_patterns:
            for match in re.finditer(pattern, code):
                groups = match.groups()
                if groups[0]:  # Destructured import
                    imports = [name.strip() for name in groups[0].split(',')]
                    for imp in imports:
                        results['imports'].append(imp)
                elif groups[1]:  # Namespace import (import * as X)
                    results['imports'].append(groups[1])
                elif groups[2]:  # Default import
                    results['imports'].append(groups[2])
                elif groups[3]:  # Module name
                    results['imports'].append(groups[3])

        # React hooks detection (for React files)
        if file_path.endswith(('.jsx', '.tsx')):
            hook_pattern = r'use([A-Z]\w+)\s*\('
            for match in re.finditer(hook_pattern, code):
                hook_name = 'use' + match.group(1)
                results['hooks'].append(hook_name)

        # Export patterns
        export_patterns = [
            # Named exports
            r'export\s+(?:const|let|var|function|class)\s+(\w+)',
            # Default exports
            r'export\s+default\s+(?:function|class)?\s*(\w+)?'
        ]

        for pattern in export_patterns:
            for match in re.finditer(pattern, code):
                if match.group(1):
                    results['exports'].append(match.group(1))

        return results

    def extract_code_summary(self, file_content, file_path):
        """Extract comprehensive summary information from code files."""
        extension = os.path.splitext(file_path)[1].lower()

        # Initialize summary
        summary = {
            "functions": [],
            "classes": [],
            "imports": [],
            "description": "",
            "complexity": None
        }

        # Extract Python definitions with AST
        if extension == '.py':
            ast_result = self.analyze_ast(file_content, file_path)
            if ast_result:
                summary["functions"] = [f["name"] for f in ast_result["functions"]]
                summary["classes"] = [c["name"] for c in ast_result["classes"]]
                summary["imports"] = ast_result["imports"]
                summary["complexity"] = ast_result["complexity"]

                # Try to extract module docstring
                try:
                    tree = ast.parse(file_content)
                    module_docstring = ast.get_docstring(tree)
                    if module_docstring:
                        summary["description"] = module_docstring
                except Exception:
                    pass

                # Add detailed function and class info
                summary["detailed_functions"] = ast_result["functions"]
                summary["detailed_classes"] = ast_result["classes"]

        # Extract JavaScript/TypeScript definitions
        elif extension in ['.js', '.ts', '.jsx', '.tsx']:
            js_result = self.analyze_js_ts(file_content, file_path)
            if js_result:
                summary["functions"] = [f["name"] for f in js_result["functions"]]
                summary["classes"] = [c["name"] for c in js_result["classes"]]
                summary["imports"] = js_result["imports"]

                # Add detailed function and class info
                summary["detailed_functions"] = js_result["functions"]
                summary["detailed_classes"] = js_result["classes"]
                summary["hooks"] = js_result.get("hooks", [])
                summary["exports"] = js_result.get("exports", [])

        # Calculate basic code metrics for any text file
        if file_content:
            lines = file_content.split('\n')
            code_lines = 0
            comment_lines = 0
            blank_lines = 0
            comment_prefixes = ['#', '//', '/*', '*', '