diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,519 +1,3530 @@ -import gradio as gr -import google.generativeai as genai -import os -from dotenv import load_dotenv -from github import Github +# Import existing libraries from the original code +import requests import json -from pathlib import Path -from datetime import datetime -from collections import defaultdict +import os import base64 -from typing import Dict, List, Any, Optional, Tuple -import tempfile -from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type +import re +import ast +import networkx as nx +import radon.metrics as metrics +import radon.complexity as complexity +from datetime import datetime, timedelta +from collections import defaultdict, Counter +import pandas as pd +import matplotlib.pyplot as plt +import matplotlib.dates as mdates +from IPython.display import display, Markdown, HTML +import numpy as np +from github import Github, GithubException import time -import os +from dotenv import load_dotenv +# Import Neo4j and Gemini libraries +from neo4j import GraphDatabase, basic_auth +import google.generativeai as genai +# Import Vizro and Gradio +import vizro.plotly.express as px +import vizro +import vizro.models as vzm +import plotly.graph_objects as go +import gradio as gr -# Load environment variables -load_dotenv() +# Keep GitHubRepoInfo class unchanged +class GitHubRepoInfo: + """Enhanced class to get comprehensive information about a GitHub repository.""" + + def __init__(self, token=None): + """Initialize with optional GitHub API token.""" + self.base_url = "https://api.github.com" + self.headers = {"Accept": "application/vnd.github.v3+json"} + self.token = token + self.github = None # Initialize github attribute + + # Set up authentication + if token: + self.headers["Authorization"] = f"token {token}" + try: + self.github = Github(token) + self.github.get_user().login # Test connection + except Exception as e: + print(f"Warning: Failed to initialize PyGithub with token: {e}") + self.github = Github() # Fallback to unauthenticated + elif os.environ.get("GITHUB_TOKEN"): + self.token = os.environ.get("GITHUB_TOKEN") + self.headers["Authorization"] = f"token {self.token}" + try: + self.github = Github(self.token) + self.github.get_user().login # Test connection + except Exception as e: + print(f"Warning: Failed to initialize PyGithub with token: {e}") + self.github = Github() # Fallback to unauthenticated + else: + self.github = Github() # Unauthenticated + + # Configure rate limit handling + self.rate_limit_remaining = 5000 # Assume higher limit if authenticated + self.rate_limit_reset = datetime.now() + # Initialize rate limit info if possible + if self.github: + try: + rate_limit = self.github.get_rate_limit() + self.rate_limit_remaining = rate_limit.core.remaining + self.rate_limit_reset = datetime.fromtimestamp(rate_limit.core.reset) + except Exception as e: + print(f"Warning: Could not get initial rate limit from PyGithub: {e}") + + + # --- Keep ALL existing methods from the original GitHubRepoInfo class --- + # ... ( _check_rate_limit, _paginated_get, get_repo_info, get_contributors, ...) + def _check_rate_limit(self): + """Check API rate limit and wait if necessary.""" + if self.rate_limit_remaining <= 10: + reset_time = self.rate_limit_reset + current_time = datetime.now() + + if reset_time > current_time: + wait_time = (reset_time - current_time).total_seconds() + 10 # Add buffer + print(f"Rate limit nearly exhausted. 
Waiting {wait_time:.0f} seconds for reset.") + time.sleep(wait_time) + + # Update rate limit info after each API call + response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers) + if response.status_code == 200: + rate_data = response.json() + self.rate_limit_remaining = rate_data["resources"]["core"]["remaining"] + self.rate_limit_reset = datetime.fromtimestamp(rate_data["resources"]["core"]["reset"]) + + def _paginated_get(self, url, params=None, max_items=None): + """Handle paginated API responses with rate limit awareness.""" + if params is None: + params = {} + + items = [] + page = 1 + per_page = min(100, params.get("per_page", 30)) + params["per_page"] = per_page + + while True: + self._check_rate_limit() + + params["page"] = page + response = requests.get(url, headers=self.headers, params=params) + + if response.status_code == 200: + page_items = response.json() + if not page_items: + break + + items.extend(page_items) + page += 1 + + # Check if we've reached the requested limit + if max_items and len(items) >= max_items: + return items[:max_items] + + # Check if we've reached the end (GitHub returns fewer items than requested) + if len(page_items) < per_page: + break + else: + print(f"Error {response.status_code}: {response.text}") + break + + return items + + def get_repo_info(self, owner, repo): + """Get basic repository information.""" + self._check_rate_limit() + url = f"{self.base_url}/repos/{owner}/{repo}" + response = requests.get(url, headers=self.headers) + + if response.status_code == 200: + return response.json() + else: + print(f"Error {response.status_code}: {response.text}") + return None + + def get_contributors(self, owner, repo, max_contributors=None): + """Get repository contributors with pagination support.""" + url = f"{self.base_url}/repos/{owner}/{repo}/contributors" + return self._paginated_get(url, max_items=max_contributors) + + # ... ( get_languages, get_commits, get_commit_activity, get_code_frequency, ...) 
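A minimal usage sketch of the client above, assuming a token in the GITHUB_TOKEN environment variable; "octocat"/"Hello-World" is a placeholder repository, not part of this diff:

    # Instantiate the wrapper and fetch basic repository metadata.
    repo_info = GitHubRepoInfo(token=os.environ.get("GITHUB_TOKEN"))
    info = repo_info.get_repo_info("octocat", "Hello-World")
    if info:
        print(info["full_name"], info["stargazers_count"])

    # _paginated_get enforces max_items, so at most 10 contributors come back.
    for contributor in repo_info.get_contributors("octocat", "Hello-World", max_contributors=10):
        print(contributor["login"], contributor["contributions"])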
+    # ... (get_languages, get_commits, get_commit_activity, get_code_frequency, ...)
+    def get_languages(self, owner, repo):
+        """Get languages used in the repository."""
+        self._check_rate_limit()
+        url = f"{self.base_url}/repos/{owner}/{repo}/languages"
+        response = requests.get(url, headers=self.headers)
+
+        if response.status_code == 200:
+            return response.json()
+        else:
+            print(f"Error getting languages: {response.status_code}")
+            return {}
+
+    def get_commits(self, owner, repo, params=None, max_commits=None):
+        """Get commits with enhanced filtering and pagination."""
+        url = f"{self.base_url}/repos/{owner}/{repo}/commits"
+        return self._paginated_get(url, params=params, max_items=max_commits)
+
+    def get_commit_activity(self, owner, repo):
+        """Get commit activity stats for the past year."""
+        self._check_rate_limit()
+        url = f"{self.base_url}/repos/{owner}/{repo}/stats/commit_activity"
+        response = requests.get(url, headers=self.headers)
+
+        if response.status_code == 200:
+            return response.json()
+        elif response.status_code == 202:
+            # GitHub is computing the statistics, wait and retry
+            print("GitHub is computing statistics, waiting and retrying...")
+            time.sleep(2)
+            return self.get_commit_activity(owner, repo)
+        else:
+            print(f"Error getting commit activity: {response.status_code}")
+            return []
+
+    def get_code_frequency(self, owner, repo):
+        """Get weekly code addition and deletion statistics."""
+        self._check_rate_limit()
+        url = f"{self.base_url}/repos/{owner}/{repo}/stats/code_frequency"
+        response = requests.get(url, headers=self.headers)
+
+        if response.status_code == 200:
+            return response.json()
+        elif response.status_code == 202:
+            # GitHub is computing the statistics, wait and retry
+            print("GitHub is computing statistics, waiting and retrying...")
+            time.sleep(2)
+            return self.get_code_frequency(owner, repo)
+        else:
+            print(f"Error getting code frequency: {response.status_code}")
+            return []
+
+    # ... (get_contributor_activity, get_branches, get_releases, get_issues, ...)
+    def get_contributor_activity(self, owner, repo):
+        """Get contributor commit activity over time."""
+        self._check_rate_limit()
+        url = f"{self.base_url}/repos/{owner}/{repo}/stats/contributors"
+        response = requests.get(url, headers=self.headers)
+
+        if response.status_code == 200:
+            return response.json()
+        elif response.status_code == 202:
+            # GitHub is computing the statistics, wait and retry
+            print("GitHub is computing statistics, waiting and retrying...")
+            time.sleep(2)
+            return self.get_contributor_activity(owner, repo)
+        else:
+            print(f"Error getting contributor activity: {response.status_code}")
+            return []
+
+    def get_branches(self, owner, repo):
+        """Get repository branches."""
+        url = f"{self.base_url}/repos/{owner}/{repo}/branches"
+        return self._paginated_get(url)
+
+    def get_releases(self, owner, repo, max_releases=None):
+        """Get repository releases with pagination support."""
+        url = f"{self.base_url}/repos/{owner}/{repo}/releases"
+        return self._paginated_get(url, max_items=max_releases)
+
+    def get_issues(self, owner, repo, state="all", max_issues=None, params=None):
+        """Get repository issues with enhanced filtering."""
+        url = f"{self.base_url}/repos/{owner}/{repo}/issues"
+        if params is None:
+            params = {}
+        params["state"] = state
+        return self._paginated_get(url, params=params, max_items=max_issues)
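One caveat with the three /stats/ endpoints above: they retry recursively on HTTP 202 with no upper bound, so a repository whose statistics never finish computing would recurse indefinitely. A bounded alternative is sketched below; _get_stats and max_retries are illustrative names introduced here, not part of the committed class:

    def _get_stats(self, owner, repo, endpoint, max_retries=5):
        """Fetch a /stats/{endpoint} resource, retrying a bounded number of times on HTTP 202."""
        url = f"{self.base_url}/repos/{owner}/{repo}/stats/{endpoint}"
        for attempt in range(max_retries):
            self._check_rate_limit()
            response = requests.get(url, headers=self.headers)
            if response.status_code == 200:
                return response.json()
            if response.status_code == 202:
                # GitHub is still computing the statistics; back off and retry.
                time.sleep(2 * (attempt + 1))
                continue
            print(f"Error getting {endpoint}: {response.status_code}")
            return []
        return []  # Gave up while GitHub was still computing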
+    # ... (get_issue_timeline, get_pull_requests, get_pr_timeline, get_contents, ...)
+    def get_issue_timeline(self, owner, repo, days_back=180):
+        """Analyze issue creation and closing over time."""
+        # Get issues including closed ones
+        issues = self.get_issues(owner, repo, state="all")
+
+        # Prepare timeline data
+        end_date = datetime.now()
+        start_date = end_date - timedelta(days=days_back)
+
+        # Initialize daily counters
+        date_range = pd.date_range(start=start_date, end=end_date)
+        created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
+        closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
+
+        # Collect issue creation and closing dates
+        for issue in issues:
+            created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ')
+            if created_at >= start_date:
+                created_counts[created_at.strftime('%Y-%m-%d')] += 1
+
+            if issue['state'] == 'closed' and issue.get('closed_at'):
+                closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
+                if closed_at >= start_date:
+                    closed_counts[closed_at.strftime('%Y-%m-%d')] += 1
+
+        # Calculate resolution times for closed issues
+        resolution_times = []
+        for issue in issues:
+            if issue['state'] == 'closed' and issue.get('closed_at'):
+                created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ')
+                closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
+                resolution_time = (closed_at - created_at).total_seconds() / 3600  # hours
+                resolution_times.append(resolution_time)
+
+        # Calculate issue labels distribution
+        label_counts = defaultdict(int)
+        for issue in issues:
+            for label in issue.get('labels', []):
+                label_counts[label['name']] += 1
 
-# Configure API keys
-GITHUB_TOKEN = os.getenv("github_api")
-GEMINI_API_KEY = os.getenv("gemini_api")
+        return {
+            'created': created_counts,
+            'closed': closed_counts,
+            'resolution_times': resolution_times,
+            'labels': dict(label_counts)
+        }
 
-if not GITHUB_TOKEN or not GEMINI_API_KEY:
-    raise ValueError("Both GITHUB_TOKEN and GEMINI_API_KEY must be set in environment")
+    def get_pull_requests(self, owner, repo, state="all", max_prs=None, params=None):
+        """Get repository pull requests with enhanced filtering."""
+        url = f"{self.base_url}/repos/{owner}/{repo}/pulls"
+        if params is None:
+            params = {}
+        params["state"] = state
+        return self._paginated_get(url, params=params, max_items=max_prs)
+
+    def get_pr_timeline(self, owner, repo, days_back=180):
+        """Analyze PR creation, closing, and metrics over time."""
+        # Get PRs including closed and merged ones
+        prs = self.get_pull_requests(owner, repo, state="all")
+
+        # Prepare timeline data
+        end_date = datetime.now()
+        start_date = end_date - timedelta(days=days_back)
+
+        # Initialize daily counters
+        date_range = pd.date_range(start=start_date, end=end_date)
+        created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
+        closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
+        merged_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range}
+
+        # Track metrics
+        merge_times = []
+        pr_sizes = []
+
+        # Collect PR data
+        for pr in prs:
+            created_at = datetime.strptime(pr['created_at'], '%Y-%m-%dT%H:%M:%SZ')
+            if created_at >= start_date:
+                created_counts[created_at.strftime('%Y-%m-%d')] += 1
+
+            # Get PR size (additions + deletions)
+            if pr.get('additions') is not None and pr.get('deletions') is not None:
+                pr_sizes.append({
+                    'additions': pr['additions'],
+                    'deletions': pr['deletions'],
+                    'total': pr['additions'] + pr['deletions'],
+                    'files_changed': pr.get('changed_files', 0)
+                })
+
+            # Check if PR is closed
+            if pr['state'] == 'closed':
+                closed_at = datetime.strptime(pr['closed_at'], '%Y-%m-%dT%H:%M:%SZ')
+                if closed_at >= start_date:
+                    closed_counts[closed_at.strftime('%Y-%m-%d')] += 1
+
+                # Check if PR was merged
+                if pr['merged_at']:
+                    merged_at = datetime.strptime(pr['merged_at'], '%Y-%m-%dT%H:%M:%SZ')
+                    if merged_at >= start_date:
+                        merged_counts[merged_at.strftime('%Y-%m-%d')] += 1
+
+                    # Calculate time to merge
+                    merge_time = (merged_at - created_at).total_seconds() / 3600  # hours
+                    merge_times.append(merge_time)
+
+        # Calculate acceptance rate
+        total_closed = sum(closed_counts.values())
+        total_merged = sum(merged_counts.values())
+        acceptance_rate = (total_merged / total_closed) * 100 if total_closed > 0 else 0
+
+        return {
+            'created': created_counts,
+            'closed': closed_counts,
+            'merged': merged_counts,
+            'merge_times': merge_times,
+            'pr_sizes': pr_sizes,
+            'acceptance_rate': acceptance_rate
+        }
+
+    def get_contents(self, owner, repo, path="", ref=None):
+        """Get repository contents at the specified path."""
+        self._check_rate_limit()
+        url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}"
+        params = {}
+        if ref:
+            params["ref"] = ref
+
+        response = requests.get(url, headers=self.headers, params=params)
+
+        if response.status_code == 200:
+            return response.json()
+        else:
+            print(f"Error getting contents: {response.status_code}")
+            return []
+
+    # ... (get_readme, get_file_content, is_text_file, get_recursive_contents, ...)
+    def get_readme(self, owner, repo, ref=None):
+        """Get repository README file."""
+        self._check_rate_limit()
+        url = f"{self.base_url}/repos/{owner}/{repo}/readme"
+        params = {}
+        if ref:
+            params["ref"] = ref
+
+        response = requests.get(url, headers=self.headers, params=params)
+
+        if response.status_code == 200:
+            data = response.json()
+            if data.get("content"):
+                content = base64.b64decode(data["content"]).decode("utf-8")
+                return {
+                    "name": data["name"],
+                    "path": data["path"],
+                    "content": content
+                }
+            return data
+        else:
+            print(f"README not found or error: {response.status_code}")
+            return None
+
+    def get_file_content(self, owner, repo, path, ref=None):
+        """Get the content of a specific file in the repository."""
+        self._check_rate_limit()
+        url = f"{self.base_url}/repos/{owner}/{repo}/contents/{path}"
+        params = {}
+        if ref:
+            params["ref"] = ref
+
+        response = requests.get(url, headers=self.headers, params=params)
+
+        if response.status_code == 200:
+            data = response.json()
+            if data.get("content"):
+                try:
+                    content = base64.b64decode(data["content"]).decode("utf-8")
+                    return content
+                except UnicodeDecodeError:
+                    return "[Binary file content not displayed]"
+            return None
+        else:
+            print(f"Error getting file content: {response.status_code}")
+            return None
+
+    def is_text_file(self, file_path):
+        """Determine if a file is likely a text file based on extension."""
+        text_extensions = [
+            '.txt', '.md', '.rst', '.py', '.js', '.html', '.css', '.java', '.c',
+            '.cpp', '.h', '.hpp', '.json', '.xml', '.yaml', '.yml', '.toml',
+            '.ini', '.cfg', '.conf', '.sh', '.bat', '.ps1', '.rb', '.pl', '.php',
+            '.go', '.rs', '.ts', '.jsx', '.tsx', '.vue', '.swift', '.kt', '.scala',
+            '.groovy', '.lua', '.r', '.dart', '.ex', '.exs', '.erl', '.hrl',
+            '.clj', '.hs', '.elm', '.f90', '.f95', '.f03', '.sql', '.gitignore',
+            '.dockerignore', '.env', '.editorconfig', '.htaccess', '.cs', '.ipynb',
+            '.R', '.Rmd', '.jl', '.fs', '.ml', '.mli', '.d', '.scm', '.lisp',
+            '.el', '.m', '.mm', '.vb', '.asm', '.s', '.Dockerfile', '.gradle'
+        ]
+
+        extension = os.path.splitext(file_path)[1].lower()
+        return extension in text_extensions
+
+    def get_recursive_contents(self, owner, repo, path="", max_depth=3, current_depth=0, max_files=1000, ref=None):
+        """Recursively get repository contents with a depth limit and file count limit."""
+        if current_depth >= max_depth:
+            return []
+
+        contents = self.get_contents(owner, repo, path, ref)
+        results = []
+        file_count = 0
+
+        for item in contents:
+            if file_count >= max_files:
+                break
+
+            if item["type"] == "dir":
+                # For directories, add the directory itself and recursively get contents
+                dir_item = {
+                    "type": "dir",
+                    "name": item["name"],
+                    "path": item["path"],
+                    "contents": self.get_recursive_contents(
+                        owner, repo, item["path"], max_depth, current_depth + 1,
+                        max_files - file_count, ref
+                    )
+                }
+                results.append(dir_item)
+            else:
+                # For files, add the file info
+                results.append({
+                    "type": "file",
+                    "name": item["name"],
+                    "path": item["path"],
+                    "size": item["size"],
+                    "url": item["html_url"]
+                })
+                file_count += 1
+
+        return results
+
+    # ... (get_all_text_files, get_documentation_files, analyze_ast, analyze_js_ts, ...)
+    def get_all_text_files(self, owner, repo, path="", max_files=50, ref=None):
+        """Get content of all text files in the repository (with limit)."""
+        contents = self.get_contents(owner, repo, path, ref)
+        text_files = []
+        file_count = 0
+
+        # Process current directory
+        for item in contents:
+            if file_count >= max_files:
+                break
+
+            if item["type"] == "file" and self.is_text_file(item["name"]):
+                content = self.get_file_content(owner, repo, item["path"], ref)
+                if content and content != "[Binary file content not displayed]":
+                    text_files.append({
+                        "name": item["name"],
+                        "path": item["path"],
+                        "content": content
+                    })
+                    file_count += 1
+            elif item["type"] == "dir":
+                # Recursively get text files from subdirectories
+                subdir_files = self.get_all_text_files(
+                    owner, repo, item["path"], max_files - file_count, ref
+                )
+                text_files.extend(subdir_files)
+                file_count += len(subdir_files)
+
+        return text_files
+
+    def get_documentation_files(self, owner, repo, ref=None):
+        """Get documentation files from the repository."""
+        # Common documentation file paths and directories
+        doc_paths = [
+            "docs", "doc", "documentation", "wiki", "CONTRIBUTING.md",
+            "CONTRIBUTORS.md", "CODE_OF_CONDUCT.md", "SECURITY.md",
+            "SUPPORT.md", "docs/index.md", "docs/README.md", "docs/getting-started.md",
+            ".github/ISSUE_TEMPLATE", ".github/PULL_REQUEST_TEMPLATE.md"
+        ]
+
+        doc_files = []
+
+        # Try to get each documentation file/directory
+        for path in doc_paths:
+            try:
+                contents = self.get_contents(owner, repo, path, ref)
+
+                # If it's a directory, get all markdown files in it
+                if isinstance(contents, list):
+                    for item in contents:
+                        if item["type"] == "file" and item["name"].lower().endswith((".md", ".rst", ".txt")):
+                            content = self.get_file_content(owner, repo, item["path"], ref)
+                            if content:
+                                doc_files.append({
+                                    "name": item["name"],
+                                    "path": item["path"],
+                                    "content": content
+                                })
+                # If it's a file, get its content
+                elif isinstance(contents, dict) and contents.get("type") == "file":
+                    content = self.get_file_content(owner, repo, path, ref)
+                    if content:
+                        doc_files.append({
+                            "name": contents["name"],
+                            "path": contents["path"],
+                            "content": content
+                        })
+            except:
+                # Path doesn't exist or access issues
+                continue
+
+        return doc_files
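The nested list-of-dicts built by get_recursive_contents above (directories carry a "contents" key, files a "path") can be awkward to consume directly. A small walker that flattens it into plain file paths; flatten_tree is a helper invented here for illustration, and the repository coordinates are placeholders as in the earlier sketch:

    def flatten_tree(entries, collected=None):
        """Collect file paths from the nested structure returned by get_recursive_contents."""
        if collected is None:
            collected = []
        for entry in entries:
            if entry["type"] == "file":
                collected.append(entry["path"])
            elif entry["type"] == "dir":
                flatten_tree(entry["contents"], collected)
        return collected

    tree = repo_info.get_recursive_contents("octocat", "Hello-World", max_depth=2)
    for path in flatten_tree(tree):
        print(path)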
+    def analyze_ast(self, code, file_path):
+        """Analyze Python code using AST (Abstract Syntax Tree)."""
+        if not file_path.endswith('.py'):
+            return None
 
-# Initialize APIs
-gh = Github(GITHUB_TOKEN)
-genai.configure(api_key=GEMINI_API_KEY)
-model = genai.GenerativeModel(
-    model_name="gemini-1.5-pro-latest",
-    generation_config = {
-        "temperature": 1,
-        "top_p": 0.95,
-        "top_k": 40,
-        "max_output_tokens": 8192,
-        "response_mime_type": "text/plain",
-},
-
-    safety_settings=[
-        {
-            "category": "HARM_CATEGORY_HARASSMENT",
-            "threshold": "BLOCK_MEDIUM_AND_ABOVE"
-        },
-        {
-            "category": "HARM_CATEGORY_HATE_SPEECH",
-            "threshold": "BLOCK_MEDIUM_AND_ABOVE"
-        },
-        {
-            "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
-            "threshold": "BLOCK_MEDIUM_AND_ABOVE"
-        },
-        {
-            "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
-            "threshold": "BLOCK_MEDIUM_AND_ABOVE"
-        },
-    ]
-)
-
-RELEVANT_EXTENSIONS = {
-    ".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".cpp", ".c", ".h",
-    ".hpp", ".rb", ".php", ".go", ".rs", ".swift", ".kt"
-}
-
-class RepositoryAnalyzer:
-    """Handles GitHub repository analysis"""
-
-    def __init__(self, repo_url: str):
-        # Extract owner and repo name from URL
-        parts = repo_url.rstrip('/').split('/')
-        if len(parts) < 2:
-            raise ValueError("Invalid repository URL format")
-
-        self.repo_name = parts[-1]
-        self.owner = parts[-2]
-        self.repo = gh.get_repo(f"{self.owner}/{self.repo_name}")
-        self.analysis_data: Dict[str, Any] = {}
-
-    def analyze(self) -> Dict[str, Any]:
-        """Perform complete repository analysis"""
         try:
-            # Basic repository information
-            self.analysis_data["basic_info"] = {
-                "name": self.repo.name,
-                "owner": self.repo.owner.login,
-                "description": self.repo.description or "No description available",
-                "stars": self.repo.stargazers_count,
-                "forks": self.repo.forks_count,
-                "created_at": self.repo.created_at.isoformat(),
-                "last_updated": self.repo.updated_at.isoformat(),
-                "primary_language": self.repo.language or "Not specified",
-            }
+            tree = ast.parse(code)
+
+            # Extract more detailed information using AST
+            functions = []
+            classes = []
+            imports = []
+            function_complexities = {}
+
+            for node in ast.walk(tree):
+                # Get function definitions with arguments
+                if isinstance(node, ast.FunctionDef):
+                    args = []
+                    defaults = len(node.args.defaults)
+                    args_count = len(node.args.args) - defaults
+
+                    # Get positional args
+                    for arg in node.args.args[:args_count]:
+                        if hasattr(arg, 'arg'):  # Python 3
+                            args.append(arg.arg)
+                        else:  # Python 2
+                            args.append(arg.id)
+
+                    # Get args with defaults
+                    for i, arg in enumerate(node.args.args[args_count:]):
+                        if hasattr(arg, 'arg'):  # Python 3
+                            args.append(f"{arg.arg}=...")
+                        else:  # Python 2
+                            args.append(f"{arg.id}=...")
+
+                    # Calculate function complexity
+                    func_complexity = complexity.cc_visit(node)
+                    function_complexities[node.name] = func_complexity
+
+                    # Get docstring if available
+                    docstring = ast.get_docstring(node)
+
+                    functions.append({
+                        'name': node.name,
+                        'args': args,
+                        'complexity': func_complexity,
+                        'docstring': docstring
+                    })
+
+                # Get class definitions
+                elif isinstance(node, ast.ClassDef):
+                    methods = []
+                    class_docstring = ast.get_docstring(node)
+
+                    # Get class methods
+                    for child in node.body:
+                        if isinstance(child, ast.FunctionDef):
+                            method_complexity = complexity.cc_visit(child)
+                            method_docstring = ast.get_docstring(child)
+
+                            methods.append({
+                                'name': child.name,
+                                'complexity': method_complexity,
+                                'docstring': method_docstring
+                            })
+
+                    classes.append({
+                        'name': node.name,
+                        'methods': methods,
+                        'docstring': class_docstring
+                    })
+
+                # Get imports
+                elif isinstance(node, ast.Import):
+                    for name in node.names:
+                        imports.append(name.name)
+                elif isinstance(node, ast.ImportFrom):
+                    module = node.module or ""
+                    for name in node.names:
+                        imports.append(f"{module}.{name.name}")
+
+            # Calculate overall code complexity
+            code_complexity = complexity.cc_visit_ast(tree)
+
+            # Calculate maintainability index
+            try:
+                mi_score = metrics.mi_visit(code, True)
+            except:
+                mi_score = None
+
+            return {
+                'functions': functions,
+                'classes': classes,
+                'imports': imports,
+                'complexity': {
+                    'overall': code_complexity,
+                    'functions': function_complexities,
+                    'maintainability_index': mi_score
+                }
+            }
+
+        except SyntaxError:
+            print(f"Syntax error in Python file: {file_path}")
+            return None
+        except Exception as e:
+            print(f"Error analyzing {file_path}: {str(e)}")
+            return None
+
+    def analyze_js_ts(self, code, file_path):
+        """Analyze JavaScript/TypeScript code using regex with improved patterns."""
+        if not file_path.endswith(('.js', '.ts', '.jsx', '.tsx')):
+            return None
+
+        # More sophisticated regex patterns for JS/TS analysis
+        results = {
+            'functions': [],
+            'classes': [],
+            'imports': [],
+            'exports': [],
+            'hooks': []  # For React hooks
+        }
+
+        # Function patterns (covering various declaration styles)
+        function_patterns = [
+            # Regular functions
+            r'function\s+(\w+)\s*\(([^)]*)\)',
+            # Arrow functions assigned to variables
+            r'(?:const|let|var)\s+(\w+)\s*=\s*(?:\([^)]*\)|[^=]*)\s*=>\s*{',
+            # Class methods
+            r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{',
+            # Object methods
+            r'(\w+)\s*:\s*function\s*\(([^)]*)\)'
+        ]
+
+        for pattern in function_patterns:
+            for match in re.finditer(pattern, code):
+                func_name = match.group(1)
+                args = match.group(2).strip() if len(match.groups()) > 1 else ""
+                results['functions'].append({
+                    'name': func_name,
+                    'args': args
+                })
+
+        # Class pattern
+        class_pattern = r'class\s+(\w+)(?:\s+extends\s+(\w+))?\s*{([^}]*)}'
+        for match in re.finditer(class_pattern, code, re.DOTALL):
+            class_name = match.group(1)
+            parent_class = match.group(2) if match.group(2) else None
+            class_body = match.group(3)
+
+            # Find methods in class
+            methods = []
+            method_pattern = r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{([^}]*)}'
+            for method_match in re.finditer(method_pattern, class_body):
+                method_name = method_match.group(1)
+                methods.append(method_name)
+
+            results['classes'].append({
+                'name': class_name,
+                'extends': parent_class,
+                'methods': methods
+            })
+
+        # Import patterns
+        import_patterns = [
+            # ES6 imports
+            r'import\s+(?:{([^}]*)}|\*\s+as\s+(\w+)|(\w+))\s+from\s+[\'"]([^\'"]+)[\'"]',
+            # CommonJS requires
+            r'(?:const|let|var)\s+(?:{([^}]*)}|(\w+))\s*=\s*require\([\'"]([^\'"]+)[\'"]\)'
+        ]
+
+        for pattern in import_patterns:
+            for match in re.finditer(pattern, code):
+                groups = match.groups()
+                if groups[0]:  # Destructured import
+                    imports = [name.strip() for name in groups[0].split(',')]
+                    for imp in imports:
+                        results['imports'].append(imp)
+                elif groups[1]:  # Namespace import (import * as X)
+                    results['imports'].append(groups[1])
+                elif groups[2]:  # Default import
+                    results['imports'].append(groups[2])
+                elif groups[3]:  # Module name
+                    results['imports'].append(groups[3])
+
+        # React hooks detection (for React files)
+        if file_path.endswith(('.jsx', '.tsx')):
+            hook_pattern = r'use([A-Z]\w+)\s*\('
+            for match in re.finditer(hook_pattern, code):
+                hook_name = 'use' + match.group(1)
+                results['hooks'].append(hook_name)
+
+        # Export patterns
+        export_patterns = [
+            # Named exports
+            r'export\s+(?:const|let|var|function|class)\s+(\w+)',
+            # Default exports
+            r'export\s+default\s+(?:function|class)?\s*(\w+)?'
+        ]
+
+        for pattern in export_patterns:
+            for match in re.finditer(pattern, code):
+                if match.group(1):
+                    results['exports'].append(match.group(1))
+
+        return results
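A quick self-test of the regex-based analyzer on an in-memory snippet. The sample source is invented for illustration, and the exact output depends on the patterns above, which can also yield duplicates (a declaration may match both the regular-function and class-method patterns):

    sample_js = """
    import React from 'react';
    function add(a, b) { return a + b; }
    const square = (x) => { return x * x; };
    export default add;
    """

    analyzer = GitHubRepoInfo()
    js_summary = analyzer.analyze_js_ts(sample_js, "sample.js")
    if js_summary:
        print([f['name'] for f in js_summary['functions']])  # names captured by the function patterns
        print(js_summary['imports'], js_summary['exports'])  # e.g. ['React'] and ['add']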
+    # ... (extract_code_summary, analyze_dependencies, create_dependency_graph, ...)
+    def extract_code_summary(self, file_content, file_path):
+        """Extract comprehensive summary information from code files."""
+        extension = os.path.splitext(file_path)[1].lower()
+
+        # Initialize summary
+        summary = {
+            "functions": [],
+            "classes": [],
+            "imports": [],
+            "description": "",
+            "complexity": None
+        }
 
-            # Analyze repository structure
-            self.analysis_data["structure"] = self._analyze_structure()
+        # Extract Python definitions with AST
+        if extension == '.py':
+            ast_result = self.analyze_ast(file_content, file_path)
+            if ast_result:
+                summary["functions"] = [f["name"] for f in ast_result["functions"]]
+                summary["classes"] = [c["name"] for c in ast_result["classes"]]
+                summary["imports"] = ast_result["imports"]
+                summary["complexity"] = ast_result["complexity"]
 
-            # Analyze code patterns
-            self.analysis_data["code_patterns"] = self._analyze_code_patterns()
+                # Try to extract module docstring
+                try:
+                    tree = ast.parse(file_content)
+                    module_docstring = ast.get_docstring(tree)
+                    if module_docstring:
+                        summary["description"] = module_docstring
+                except:
+                    pass
 
-            # Analyze commit history
-            self.analysis_data["commit_history"] = self._analyze_commits()
+                # Add detailed function and class info
+                summary["detailed_functions"] = ast_result["functions"]
+                summary["detailed_classes"] = ast_result["classes"]
+
+        # Extract JavaScript/TypeScript definitions
+        elif extension in ['.js', '.ts', '.jsx', '.tsx']:
+            js_result = self.analyze_js_ts(file_content, file_path)
+            if js_result:
+                summary["functions"] = [f["name"] for f in js_result["functions"]]
+                summary["classes"] = [c["name"] for c in js_result["classes"]]
+                summary["imports"] = js_result["imports"]
+
+                # Add detailed function and class info
+                summary["detailed_functions"] = js_result["functions"]
+                summary["detailed_classes"] = js_result["classes"]
+                summary["hooks"] = js_result.get("hooks", [])
+                summary["exports"] = js_result.get("exports", [])
+
+        # Calculate basic code metrics for any text file
+        if file_content:
+            lines = file_content.split('\n')
+            code_lines = 0
+            comment_lines = 0
+            blank_lines = 0
+
+            comment_prefixes = ['#', '//', '/*', '*', '