diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,4 +1,3 @@ -# Import existing libraries from the original code import requests import json import os @@ -6,31 +5,98 @@ import base64 import re import ast import networkx as nx -import radon.metrics as metrics -import radon.complexity as complexity +# Make sure radon is installed: pip install radon +try: + import radon.metrics as metrics + import radon.complexity as complexity +except ImportError: + print("Warning: Radon library not found. Code complexity analysis will be limited.") + # Provide dummy functions if radon is not available + class DummyRadon: + def cc_visit(self, *args, **kwargs): return 0 + def cc_visit_ast(self, *args, **kwargs): return 0 + def mi_visit(self, *args, **kwargs): return None + metrics = DummyRadon() + complexity = DummyRadon() + from datetime import datetime, timedelta from collections import defaultdict, Counter import pandas as pd import matplotlib.pyplot as plt import matplotlib.dates as mdates -from IPython.display import display, Markdown, HTML +# Ensure IPython is available or handle its absence +try: + from IPython.display import display, Markdown, HTML + IPYTHON_AVAILABLE = True +except ImportError: + IPYTHON_AVAILABLE = False + # Define dummy display functions if not in IPython + def display(*args, **kwargs): print(*args) + def Markdown(text): print(f"--- Markdown ---\n{text}\n---------------") + def HTML(text): print(f"----- HTML -----\n{text}\n--------------") + import numpy as np -from github import Github, GithubException +# Ensure PyGithub is installed: pip install PyGithub +try: + from github import Github, GithubException +except ImportError: + print("Warning: PyGithub library not found. Some features might be limited.") + Github = None # Set to None if not available + GithubException = Exception # Use base Exception + import time -from dotenv import load_dotenv +# Ensure python-dotenv is installed: pip install python-dotenv +try: + from dotenv import load_dotenv +except ImportError: + print("Warning: python-dotenv not found. .env file will not be loaded.") + def load_dotenv(): pass # Dummy function # Import Neo4j and Gemini libraries -from neo4j import GraphDatabase, basic_auth -import google.generativeai as genai +# Ensure neo4j is installed: pip install neo4j +try: + from neo4j import GraphDatabase, basic_auth +except ImportError: + print("Warning: Neo4j library not found. Graph features will be disabled.") + GraphDatabase = None # Set to None + basic_auth = None + +# Ensure google-generativeai is installed: pip install google-generativeai +try: + import google.generativeai as genai +except ImportError: + print("Warning: google-generativeai library not found. Gemini features will be disabled.") + genai = None # Set to None # Import Vizro and Gradio -import vizro.plotly.express as px -import vizro -import vizro.models as vzm -import plotly.graph_objects as go -import gradio as gr +# Ensure vizro, vizro-plotly, plotly, gradio are installed +# pip install vizro vizro-plotly plotly gradio pandas networkx matplotlib numpy +try: + import vizro.plotly.express as px + import vizro + import vizro.models as vzm + import plotly.graph_objects as go +except ImportError: + print("Critical Error: Vizro or Plotly libraries not found. 
Dashboard generation will fail.") + # Define dummy classes/functions to avoid NameErrors later, though functionality will be broken + class DummyVzm: + Card = lambda **kwargs: None + Graph = lambda **kwargs: None + Page = lambda **kwargs: None + Dashboard = lambda **kwargs: type('obj', (object,), {'save': lambda self, path: print(f"Vizro not installed, cannot save to {path}")})() + vzm = DummyVzm() + px = None + go = None + vizro = None + +try: + import gradio as gr +except ImportError: + print("Critical Error: Gradio library not found. Cannot launch the UI.") + gr = None # Set to None + -# Keep GitHubRepoInfo class unchanged +# --- GitHubRepoInfo Class (Keep as provided, ensuring dependencies like PyGithub are handled) --- class GitHubRepoInfo: """Enhanced class to get comprehensive information about a GitHub repository.""" @@ -44,23 +110,37 @@ class GitHubRepoInfo: # Set up authentication if token: self.headers["Authorization"] = f"token {token}" - try: - self.github = Github(token) - self.github.get_user().login # Test connection - except Exception as e: - print(f"Warning: Failed to initialize PyGithub with token: {e}") - self.github = Github() # Fallback to unauthenticated + if Github: # Check if PyGithub was imported + try: + self.github = Github(token) + self.github.get_user().login # Test connection + except Exception as e: + print(f"Warning: Failed to initialize PyGithub with token: {e}") + self.github = Github() # Fallback to unauthenticated + else: + print("Warning: PyGithub not installed. Cannot use authenticated PyGithub client.") + self.github = None # Explicitly set to None + elif os.environ.get("GITHUB_TOKEN"): self.token = os.environ.get("GITHUB_TOKEN") self.headers["Authorization"] = f"token {self.token}" - try: - self.github = Github(self.token) - self.github.get_user().login # Test connection - except Exception as e: - print(f"Warning: Failed to initialize PyGithub with token: {e}") - self.github = Github() # Fallback to unauthenticated + if Github: + try: + self.github = Github(self.token) + self.github.get_user().login # Test connection + except Exception as e: + print(f"Warning: Failed to initialize PyGithub with token: {e}") + self.github = Github() # Fallback to unauthenticated + else: + print("Warning: PyGithub not installed. Cannot use authenticated PyGithub client.") + self.github = None else: - self.github = Github() # Unauthenticated + if Github: + self.github = Github() # Unauthenticated + else: + print("Warning: PyGithub not installed. 
Cannot use authenticated PyGithub client.") + self.github = None + # Configure rate limit handling self.rate_limit_remaining = 5000 # Assume higher limit if authenticated @@ -72,28 +152,54 @@ class GitHubRepoInfo: self.rate_limit_remaining = rate_limit.core.remaining self.rate_limit_reset = datetime.fromtimestamp(rate_limit.core.reset) except Exception as e: - print(f"Warning: Could not get initial rate limit from PyGithub: {e}") + # Don't print warning if self.github is None + if self.github is not None: + print(f"Warning: Could not get initial rate limit from PyGithub: {e}") + # Check rate limit via REST if PyGithub failed or wasn't used + elif self.token: + try: + response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers) + if response.status_code == 200: + rate_data = response.json() + self.rate_limit_remaining = rate_data["resources"]["core"]["remaining"] + self.rate_limit_reset = datetime.fromtimestamp(rate_data["resources"]["core"]["reset"]) + else: + print(f"Warning: Could not get initial rate limit via REST: Status {response.status_code}") + except Exception as e: + print(f"Warning: Could not get initial rate limit via REST: {e}") - # --- Keep ALL existing methods from the original GitHubRepoInfo class --- - # ... ( _check_rate_limit, _paginated_get, get_repo_info, get_contributors, ...) def _check_rate_limit(self): """Check API rate limit and wait if necessary.""" + # Update rate limit info before checking + try: + response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers) + if response.status_code == 200: + rate_data = response.json() + # Ensure keys exist before accessing + core_limits = rate_data.get("resources", {}).get("core", {}) + self.rate_limit_remaining = core_limits.get("remaining", self.rate_limit_remaining) # Use old value if missing + reset_timestamp = core_limits.get("reset") + if reset_timestamp: + self.rate_limit_reset = datetime.fromtimestamp(reset_timestamp) + # No else needed, just use previous values if update fails + except Exception as e: + print(f"Warning: Failed to update rate limit info: {e}") + # Proceed with potentially outdated values + if self.rate_limit_remaining <= 10: reset_time = self.rate_limit_reset + # Use timezone-naive comparison current_time = datetime.now() if reset_time > current_time: wait_time = (reset_time - current_time).total_seconds() + 10 # Add buffer - print(f"Rate limit nearly exhausted. Waiting {wait_time:.0f} seconds for reset.") - time.sleep(wait_time) + if wait_time > 0: # Only wait if reset time is in the future + print(f"Rate limit nearly exhausted. 
Waiting {wait_time:.0f} seconds for reset.") + time.sleep(wait_time) + # Re-fetch rate limit after waiting + self._check_rate_limit() - # Update rate limit info after each API call - response = requests.get(f"{self.base_url}/rate_limit", headers=self.headers) - if response.status_code == 200: - rate_data = response.json() - self.rate_limit_remaining = rate_data["resources"]["core"]["remaining"] - self.rate_limit_reset = datetime.fromtimestamp(rate_data["resources"]["core"]["reset"]) def _paginated_get(self, url, params=None, max_items=None): """Handle paginated API responses with rate limit awareness.""" @@ -102,33 +208,41 @@ class GitHubRepoInfo: items = [] page = 1 - per_page = min(100, params.get("per_page", 30)) + # Use a smaller default per_page to be safer with rate limits if unauthenticated + default_per_page = 100 if self.token else 30 + per_page = min(100, params.get("per_page", default_per_page)) params["per_page"] = per_page while True: - self._check_rate_limit() + self._check_rate_limit() # Check before each request params["page"] = page - response = requests.get(url, headers=self.headers, params=params) + try: + response = requests.get(url, headers=self.headers, params=params, timeout=20) # Add timeout + response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) - if response.status_code == 200: - page_items = response.json() - if not page_items: + page_items = response.json() + if not page_items: # No more items break - items.extend(page_items) - page += 1 + items.extend(page_items) + page += 1 - # Check if we've reached the requested limit - if max_items and len(items) >= max_items: + # Check if we've reached the requested limit + if max_items and len(items) >= max_items: return items[:max_items] - # Check if we've reached the end (GitHub returns fewer items than requested) - if len(page_items) < per_page: + # Check if we've reached the end (GitHub returns fewer items than requested) + if len(page_items) < per_page: break - else: - print(f"Error {response.status_code}: {response.text}") - break + + except requests.exceptions.RequestException as e: + print(f"Error during paginated request to {url} (page {page}): {e}") + # Decide whether to break or retry (here we break) + break + except json.JSONDecodeError as e: + print(f"Error decoding JSON response from {url} (page {page}): {e}") + break return items @@ -136,99 +250,90 @@ class GitHubRepoInfo: """Get basic repository information.""" self._check_rate_limit() url = f"{self.base_url}/repos/{owner}/{repo}" - response = requests.get(url, headers=self.headers) - - if response.status_code == 200: + try: + response = requests.get(url, headers=self.headers, timeout=15) + response.raise_for_status() # Check for 4xx/5xx errors return response.json() - else: - print(f"Error {response.status_code}: {response.text}") - return None - - def get_contributors(self, owner, repo, max_contributors=None): - """Get repository contributors with pagination support.""" - url = f"{self.base_url}/repos/{owner}/{repo}/contributors" - return self._paginated_get(url, max_items=max_contributors) + except requests.exceptions.RequestException as e: + print(f"Error getting repository info for {owner}/{repo}: {e}") + return None # Return None on failure - # ... ( get_languages, get_commits, get_commit_activity, get_code_frequency, ...) + # ... (other GitHubRepoInfo methods - assume they return sensible defaults like [] or {} on failure) ... 
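# --- Editor's sketch (not part of the patch): the rate-limit guard that _check_rate_limit
# --- implements, shown as a standalone helper driven by the response headers instead of
# --- polling /rate_limit. `github_get` and `MIN_REMAINING` are illustrative names only.
import time
from datetime import datetime
import requests

MIN_REMAINING = 10  # pause once fewer than this many calls remain

def github_get(url, headers=None, **kwargs):
    """GET a GitHub API URL, sleeping until the limit resets when it is nearly exhausted."""
    response = requests.get(url, headers=headers, timeout=15, **kwargs)
    remaining = int(response.headers.get("X-RateLimit-Remaining", MIN_REMAINING + 1))
    reset_ts = int(response.headers.get("X-RateLimit-Reset", "0"))
    if remaining <= MIN_REMAINING and reset_ts:
        wait = (datetime.fromtimestamp(reset_ts) - datetime.now()).total_seconds() + 10
        if wait > 0:
            print(f"Rate limit nearly exhausted; sleeping {wait:.0f}s")
            time.sleep(wait)
    response.raise_for_status()
    return response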
+ # --- Add safe defaults to methods that might return None unexpectedly --- def get_languages(self, owner, repo): """Get languages used in the repository.""" self._check_rate_limit() url = f"{self.base_url}/repos/{owner}/{repo}/languages" - response = requests.get(url, headers=self.headers) - - if response.status_code == 200: + try: + response = requests.get(url, headers=self.headers, timeout=15) + response.raise_for_status() return response.json() - else: - print(f"Error getting languages: {response.status_code}") - return {} + except requests.exceptions.RequestException as e: + print(f"Error getting languages for {owner}/{repo}: {e}") + return {} # Return empty dict on failure + + def get_contributors(self, owner, repo, max_contributors=None): + """Get repository contributors with pagination support.""" + url = f"{self.base_url}/repos/{owner}/{repo}/contributors" + # _paginated_get should already handle errors and return a list + return self._paginated_get(url, max_items=max_contributors) or [] # Ensure list return def get_commits(self, owner, repo, params=None, max_commits=None): """Get commits with enhanced filtering and pagination.""" url = f"{self.base_url}/repos/{owner}/{repo}/commits" - return self._paginated_get(url, params=params, max_items=max_commits) + return self._paginated_get(url, params=params, max_items=max_commits) or [] # Ensure list return + + def _get_stats_with_retry(self, url): + """Helper for stats endpoints that might return 202.""" + retries = 3 + delay = 5 # Initial delay in seconds + for i in range(retries): + self._check_rate_limit() + try: + response = requests.get(url, headers=self.headers, timeout=30) # Longer timeout for stats + if response.status_code == 200: + return response.json() + elif response.status_code == 202 and i < retries - 1: + print(f"GitHub is computing statistics for {url.split('/stats/')[1]}, waiting {delay}s and retrying ({i+1}/{retries})...") + time.sleep(delay) + delay *= 2 # Exponential backoff + continue + elif response.status_code == 204: # No content, valid response but empty data + print(f"No content (204) returned for {url.split('/stats/')[1]}. 
Returning empty list.") + return [] + else: + print(f"Error getting stats from {url}: Status {response.status_code}, Body: {response.text[:200]}") + return [] # Return empty list on other errors + except requests.exceptions.RequestException as e: + print(f"Request error getting stats from {url}: {e}") + return [] # Return empty list on request error + print(f"Failed to get stats from {url} after {retries} retries.") + return [] # Return empty list after all retries fail def get_commit_activity(self, owner, repo): """Get commit activity stats for the past year.""" - self._check_rate_limit() url = f"{self.base_url}/repos/{owner}/{repo}/stats/commit_activity" - response = requests.get(url, headers=self.headers) - - if response.status_code == 200: - return response.json() - elif response.status_code == 202: - # GitHub is computing the statistics, wait and retry - print("GitHub is computing statistics, waiting and retrying...") - time.sleep(2) - return self.get_commit_activity(owner, repo) - else: - print(f"Error getting commit activity: {response.status_code}") - return [] + return self._get_stats_with_retry(url) def get_code_frequency(self, owner, repo): """Get weekly code addition and deletion statistics.""" - self._check_rate_limit() url = f"{self.base_url}/repos/{owner}/{repo}/stats/code_frequency" - response = requests.get(url, headers=self.headers) + return self._get_stats_with_retry(url) - if response.status_code == 200: - return response.json() - elif response.status_code == 202: - # GitHub is computing the statistics, wait and retry - print("GitHub is computing statistics, waiting and retrying...") - time.sleep(2) - return self.get_code_frequency(owner, repo) - else: - print(f"Error getting code frequency: {response.status_code}") - return [] - - - # ... ( get_contributor_activity, get_branches, get_releases, get_issues, ...) def get_contributor_activity(self, owner, repo): """Get contributor commit activity over time.""" - self._check_rate_limit() url = f"{self.base_url}/repos/{owner}/{repo}/stats/contributors" - response = requests.get(url, headers=self.headers) - - if response.status_code == 200: - return response.json() - elif response.status_code == 202: - # GitHub is computing the statistics, wait and retry - print("GitHub is computing statistics, waiting and retrying...") - time.sleep(2) - return self.get_contributor_activity(owner, repo) - else: - print(f"Error getting contributor activity: {response.status_code}") - return [] + return self._get_stats_with_retry(url) def get_branches(self, owner, repo): """Get repository branches.""" url = f"{self.base_url}/repos/{owner}/{repo}/branches" - return self._paginated_get(url) + return self._paginated_get(url) or [] def get_releases(self, owner, repo, max_releases=None): """Get repository releases with pagination support.""" url = f"{self.base_url}/repos/{owner}/{repo}/releases" - return self._paginated_get(url, max_items=max_releases) + return self._paginated_get(url, max_items=max_releases) or [] def get_issues(self, owner, repo, state="all", max_issues=None, params=None): """Get repository issues with enhanced filtering.""" @@ -236,55 +341,7 @@ class GitHubRepoInfo: if params is None: params = {} params["state"] = state - return self._paginated_get(url, params=params, max_items=max_issues) - - # ... ( get_issue_timeline, get_pull_requests, get_pr_timeline, get_contents, ...) 
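# --- Editor's sketch (not part of the patch): the HTTP 202 "statistics are being
# --- computed" polling pattern used by _get_stats_with_retry, reduced to a generic
# --- helper. `fetch` is assumed to be any callable returning a requests.Response.
import time

def poll_stats(fetch, retries=3, delay=5):
    """Call fetch() until GitHub stops answering 202, doubling the wait between tries."""
    for attempt in range(retries):
        response = fetch()
        if response.status_code == 200:
            return response.json()
        if response.status_code != 202:
            return []  # 204 or an error: treat as empty data, as the patch does
        if attempt < retries - 1:
            time.sleep(delay)
            delay *= 2  # exponential backoff while GitHub computes the statistics
    return []  # still 202 after all retries

# Example usage (hypothetical url/headers):
# data = poll_stats(lambda: requests.get(stats_url, headers=headers, timeout=30))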
- def get_issue_timeline(self, owner, repo, days_back=180): - """Analyze issue creation and closing over time.""" - # Get issues including closed ones - issues = self.get_issues(owner, repo, state="all") - - # Prepare timeline data - end_date = datetime.now() - start_date = end_date - timedelta(days=days_back) - - # Initialize daily counters - date_range = pd.date_range(start=start_date, end=end_date) - created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range} - closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range} - - # Collect issue creation and closing dates - for issue in issues: - created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ') - if created_at >= start_date: - created_counts[created_at.strftime('%Y-%m-%d')] += 1 - - if issue['state'] == 'closed' and issue.get('closed_at'): - closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ') - if closed_at >= start_date: - closed_counts[closed_at.strftime('%Y-%m-%d')] += 1 - - # Calculate resolution times for closed issues - resolution_times = [] - for issue in issues: - if issue['state'] == 'closed' and issue.get('closed_at'): - created_at = datetime.strptime(issue['created_at'], '%Y-%m-%dT%H:%M:%SZ') - closed_at = datetime.strptime(issue['closed_at'], '%Y-%m-%dT%H:%M:%SZ') - resolution_time = (closed_at - created_at).total_seconds() / 3600 # hours - resolution_times.append(resolution_time) - - # Calculate issue labels distribution - label_counts = defaultdict(int) - for issue in issues: - for label in issue.get('labels', []): - label_counts[label['name']] += 1 - - return { - 'created': created_counts, - 'closed': closed_counts, - 'resolution_times': resolution_times, - 'labels': dict(label_counts) - } + return self._paginated_get(url, params=params, max_items=max_issues) or [] def get_pull_requests(self, owner, repo, state="all", max_prs=None, params=None): """Get repository pull requests with enhanced filtering.""" @@ -292,71 +349,7 @@ class GitHubRepoInfo: if params is None: params = {} params["state"] = state - return self._paginated_get(url, params=params, max_items=max_prs) - - def get_pr_timeline(self, owner, repo, days_back=180): - """Analyze PR creation, closing, and metrics over time.""" - # Get PRs including closed and merged ones - prs = self.get_pull_requests(owner, repo, state="all") - - # Prepare timeline data - end_date = datetime.now() - start_date = end_date - timedelta(days=days_back) - - # Initialize daily counters - date_range = pd.date_range(start=start_date, end=end_date) - created_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range} - closed_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range} - merged_counts = {d.strftime('%Y-%m-%d'): 0 for d in date_range} - - # Track metrics - merge_times = [] - pr_sizes = [] - - # Collect PR data - for pr in prs: - created_at = datetime.strptime(pr['created_at'], '%Y-%m-%dT%H:%M:%SZ') - if created_at >= start_date: - created_counts[created_at.strftime('%Y-%m-%d')] += 1 - - # Get PR size (additions + deletions) - if pr.get('additions') is not None and pr.get('deletions') is not None: - pr_sizes.append({ - 'additions': pr['additions'], - 'deletions': pr['deletions'], - 'total': pr['additions'] + pr['deletions'], - 'files_changed': pr.get('changed_files', 0) - }) - - # Check if PR is closed - if pr['state'] == 'closed': - closed_at = datetime.strptime(pr['closed_at'], '%Y-%m-%dT%H:%M:%SZ') - if closed_at >= start_date: - closed_counts[closed_at.strftime('%Y-%m-%d')] += 1 - - # Check if PR was merged - if pr['merged_at']: 
- merged_at = datetime.strptime(pr['merged_at'], '%Y-%m-%dT%H:%M:%SZ') - if merged_at >= start_date: - merged_counts[merged_at.strftime('%Y-%m-%d')] += 1 - - # Calculate time to merge - merge_time = (merged_at - created_at).total_seconds() / 3600 # hours - merge_times.append(merge_time) - - # Calculate acceptance rate - total_closed = sum(closed_counts.values()) - total_merged = sum(merged_counts.values()) - acceptance_rate = (total_merged / total_closed) * 100 if total_closed > 0 else 0 - - return { - 'created': created_counts, - 'closed': closed_counts, - 'merged': merged_counts, - 'merge_times': merge_times, - 'pr_sizes': pr_sizes, - 'acceptance_rate': acceptance_rate - } + return self._paginated_get(url, params=params, max_items=max_prs) or [] def get_contents(self, owner, repo, path="", ref=None): """Get repository contents at the specified path.""" @@ -366,14 +359,18 @@ class GitHubRepoInfo: if ref: params["ref"] = ref - response = requests.get(url, headers=self.headers, params=params) - - if response.status_code == 200: + try: + response = requests.get(url, headers=self.headers, params=params, timeout=15) + response.raise_for_status() return response.json() - else: - print(f"Error getting contents: {response.status_code}") - return [] - # ... ( get_readme, get_file_content, is_text_file, get_recursive_contents, ...) + except requests.exceptions.RequestException as e: + # Handle 404 specifically for contents + if hasattr(e, 'response') and e.response is not None and e.response.status_code == 404: + print(f"Contents not found at path '{path}' in {owner}/{repo}.") + else: + print(f"Error getting contents for {owner}/{repo} at path '{path}': {e}") + return [] # Return empty list on failure + def get_readme(self, owner, repo, ref=None): """Get repository README file.""" self._check_rate_limit() @@ -382,20 +379,27 @@ class GitHubRepoInfo: if ref: params["ref"] = ref - response = requests.get(url, headers=self.headers, params=params) - - if response.status_code == 200: + try: + response = requests.get(url, headers=self.headers, params=params, timeout=15) + response.raise_for_status() data = response.json() if data.get("content"): - content = base64.b64decode(data["content"]).decode("utf-8") - return { - "name": data["name"], - "path": data["path"], - "content": content - } - return data - else: - print(f"README not found or error: {response.status_code}") + try: + content = base64.b64decode(data["content"]).decode("utf-8") + return { + "name": data.get("name", "README"), + "path": data.get("path", "README.md"), + "content": content + } + except (UnicodeDecodeError, base64.binascii.Error) as decode_error: + print(f"Error decoding README content: {decode_error}") + return None # Cannot decode + return None # No content key + except requests.exceptions.RequestException as e: + if hasattr(e, 'response') and e.response is not None and e.response.status_code == 404: + print(f"README not found for {owner}/{repo}.") + else: + print(f"Error getting README for {owner}/{repo}: {e}") return None def get_file_content(self, owner, repo, path, ref=None): @@ -406,440 +410,310 @@ class GitHubRepoInfo: if ref: params["ref"] = ref - response = requests.get(url, headers=self.headers, params=params) - - if response.status_code == 200: + try: + response = requests.get(url, headers=self.headers, params=params, timeout=15) + response.raise_for_status() data = response.json() - if data.get("content"): + if data.get("type") == "file" and data.get("content"): try: content = 
base64.b64decode(data["content"]).decode("utf-8") return content - except UnicodeDecodeError: + except (UnicodeDecodeError, base64.binascii.Error): + # Don't print error here, return indicator return "[Binary file content not displayed]" - return None - else: - print(f"Error getting file content: {response.status_code}") - return None - - def is_text_file(self, file_path): - """Determine if a file is likely a text file based on extension.""" - text_extensions = [ - '.txt', '.md', '.rst', '.py', '.js', '.html', '.css', '.java', '.c', - '.cpp', '.h', '.hpp', '.json', '.xml', '.yaml', '.yml', '.toml', - '.ini', '.cfg', '.conf', '.sh', '.bat', '.ps1', '.rb', '.pl', '.php', - '.go', '.rs', '.ts', '.jsx', '.tsx', '.vue', '.swift', '.kt', '.scala', - '.groovy', '.lua', '.r', '.dart', '.ex', '.exs', '.erl', '.hrl', - '.clj', '.hs', '.elm', '.f90', '.f95', '.f03', '.sql', '.gitignore', - '.dockerignore', '.env', '.editorconfig', '.htaccess', '.cs', '.ipynb', - '.R', '.Rmd', '.jl', '.fs', '.ml', '.mli', '.d', '.scm', '.lisp', - '.el', '.m', '.mm', '.vb', '.asm', '.s', '.Dockerfile', '.gradle' - ] - - extension = os.path.splitext(file_path)[1].lower() - return extension in text_extensions - - def get_recursive_contents(self, owner, repo, path="", max_depth=3, current_depth=0, max_files=1000, ref=None): - """Recursively get repository contents with a depth limit and file count limit.""" - if current_depth >= max_depth: - return [] - - contents = self.get_contents(owner, repo, path, ref) - results = [] - file_count = 0 - - for item in contents: - if file_count >= max_files: - break - - if item["type"] == "dir": - # For directories, add the directory itself and recursively get contents - dir_item = { - "type": "dir", - "name": item["name"], - "path": item["path"], - "contents": self.get_recursive_contents( - owner, repo, item["path"], max_depth, current_depth + 1, - max_files - file_count, ref - ) - } - results.append(dir_item) + elif data.get("type") != "file": + print(f"Path '{path}' is not a file.") + return None else: - # For files, add the file info - results.append({ - "type": "file", - "name": item["name"], - "path": item["path"], - "size": item["size"], - "url": item["html_url"] - }) - file_count += 1 - - return results - # ... ( get_all_text_files, get_documentation_files, analyze_ast, analyze_js_ts, ...) - def get_all_text_files(self, owner, repo, path="", max_files=50, ref=None): + # File exists but no content? Unlikely but handle. + return "" # Return empty string for empty file + except requests.exceptions.RequestException as e: + if hasattr(e, 'response') and e.response is not None and e.response.status_code == 404: + print(f"File not found at path '{path}' in {owner}/{repo}.") + else: + print(f"Error getting file content for {owner}/{repo}, path '{path}': {e}") + return None + + # --- Methods like is_text_file, analyze_ast, analyze_js_ts are generally okay --- + # ... (keep them as they are) ... 
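# --- Editor's sketch (not part of the patch): the base64 decoding that get_readme and
# --- get_file_content now wrap in try/except, as a small helper. `payload` is assumed
# --- to be the JSON dict returned by GET /repos/{owner}/{repo}/contents/{path}.
import base64
import binascii

def decode_contents_payload(payload):
    """Return decoded text, '' for an empty file, a marker for binary data, or None."""
    if not isinstance(payload, dict) or payload.get("type") != "file":
        return None
    raw = payload.get("content")
    if not raw:
        return ""  # file exists but is empty
    try:
        return base64.b64decode(raw).decode("utf-8")
    except (UnicodeDecodeError, binascii.Error):
        return "[Binary file content not displayed]"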
+ + # --- Ensure get_all_text_files handles errors from get_contents/get_file_content --- + def get_all_text_files(self, owner, repo, path="", max_files=50, ref=None, _current_count=0): """Get content of all text files in the repository (with limit).""" - contents = self.get_contents(owner, repo, path, ref) + if _current_count >= max_files: + return [], _current_count + + # Get contents for the current path + contents = self.get_contents(owner, repo, path, ref) # Returns [] on error text_files = [] - file_count = 0 + file_count = _current_count + + if not isinstance(contents, list): + print(f"Warning: get_contents did not return a list for path '{path}'. Skipping.") + return [], file_count # Process current directory for item in contents: if file_count >= max_files: break + # Ensure item is a dictionary and has 'type' and 'name' + if not isinstance(item, dict) or 'type' not in item or 'name' not in item: + print(f"Warning: Skipping malformed item in contents: {item}") + continue + + item_path = item.get("path") # Get path safely + if not item_path: + print(f"Warning: Skipping item with missing path: {item}") + continue + + if item["type"] == "file" and self.is_text_file(item["name"]): - content = self.get_file_content(owner, repo, item["path"], ref) + content = self.get_file_content(owner, repo, item_path, ref) + # Check if content is valid text (not None or binary indicator) if content and content != "[Binary file content not displayed]": text_files.append({ "name": item["name"], - "path": item["path"], + "path": item_path, "content": content }) file_count += 1 elif item["type"] == "dir": # Recursively get text files from subdirectories - subdir_files = self.get_all_text_files( - owner, repo, item["path"], max_files - file_count, ref - ) - text_files.extend(subdir_files) - file_count += len(subdir_files) + if file_count < max_files: + try: + subdir_files, file_count = self.get_all_text_files( + owner, repo, item_path, max_files, ref, file_count + ) + text_files.extend(subdir_files) + except Exception as e_rec: + print(f"Error processing subdirectory '{item_path}': {e_rec}") + # Continue with other items in the current directory - return text_files + return text_files, file_count # Return count for recursive calls + # --- Ensure get_documentation_files handles errors --- def get_documentation_files(self, owner, repo, ref=None): """Get documentation files from the repository.""" - # Common documentation file paths and directories doc_paths = [ - "docs", "doc", "documentation", "wiki", "CONTRIBUTING.md", - "CONTRIBUTORS.md", "CODE_OF_CONDUCT.md", "SECURITY.md", - "SUPPORT.md", "docs/index.md", "docs/README.md", "docs/getting-started.md", + "README.md", "CONTRIBUTING.md", "CODE_OF_CONDUCT.md", "SECURITY.md", + "SUPPORT.md", # Files first + "docs", "doc", "documentation", "wiki", # Common Dirs ".github/ISSUE_TEMPLATE", ".github/PULL_REQUEST_TEMPLATE.md" ] - doc_files = [] - # Try to get each documentation file/directory - for path in doc_paths: + # 1. Get top-level files first + root_contents = self.get_contents(owner, repo, "", ref) + if isinstance(root_contents, list): + for item in root_contents: + if isinstance(item, dict) and item.get("type") == "file" and item.get("name") in doc_paths: + path = item.get("path") + if path: + content = self.get_file_content(owner, repo, path, ref) + if content and content != "[Binary file content not displayed]": + doc_files.append({ + "name": item["name"], + "path": path, + "content": content + }) + + # 2. 
Check specific doc directories + doc_dirs_to_check = ["docs", "doc", "documentation", "wiki", ".github"] + for doc_dir in doc_dirs_to_check: try: - contents = self.get_contents(owner, repo, path, ref) - - # If it's a directory, get all markdown files in it - if isinstance(contents, list): - for item in contents: - if item["type"] == "file" and item["name"].lower().endswith((".md", ".rst", ".txt")): - content = self.get_file_content(owner, repo, item["path"], ref) - if content: - doc_files.append({ - "name": item["name"], - "path": item["path"], - "content": content - }) - # If it's a file, get its content - elif isinstance(contents, dict) and contents.get("type") == "file": - content = self.get_file_content(owner, repo, path, ref) - if content: - doc_files.append({ - "name": contents["name"], - "path": contents["path"], - "content": content - }) - except: - # Path doesn't exist or access issues - continue + dir_contents = self.get_contents(owner, repo, doc_dir, ref) + if isinstance(dir_contents, list): # It's a directory + for item in dir_contents: + if isinstance(item, dict) and item.get("type") == "file": + item_name = item.get("name", "").lower() + item_path = item.get("path") + if item_path and item_name.endswith((".md", ".rst", ".txt")): + content = self.get_file_content(owner, repo, item_path, ref) + if content and content != "[Binary file content not displayed]": + doc_files.append({ + "name": item["name"], + "path": item_path, + "content": content + }) + except Exception as e: + print(f"Error processing documentation path '{doc_dir}': {e}") + continue # Skip this path return doc_files - def analyze_ast(self, code, file_path): - """Analyze Python code using AST (Abstract Syntax Tree).""" - if not file_path.endswith('.py'): - return None + # ... (rest of GitHubRepoInfo, display methods, etc. - keep as provided but be mindful of data access in display) ... + # Add specific error handling in display methods if needed, though Gradio errors often hide underlying data issues. + def get_all_info(self, owner, repo): + """Get comprehensive information about a repository with enhanced metrics.""" + print(f"--- Fetching data for {owner}/{repo} ---") + result = { + "timestamp": datetime.now().isoformat() + } + + print("Getting basic repo info...") + basic_info = self.get_repo_info(owner, repo) + if not basic_info: + print(f"CRITICAL: Could not retrieve basic repository information for {owner}/{repo}. 
Aborting analysis.") + return None # Cannot proceed without basic info + result["basic_info"] = basic_info + + print("Getting languages...") + result["languages"] = self.get_languages(owner, repo) # Returns {} on error + print("Getting contributors...") + result["contributors"] = self.get_contributors(owner, repo, max_contributors=30) # Returns [] on error + print("Getting recent commits...") + result["recent_commits"] = self.get_commits(owner, repo, max_commits=30) # Returns [] on error + print("Getting branches...") + result["branches"] = self.get_branches(owner, repo) # Returns [] on error + print("Getting releases...") + result["releases"] = self.get_releases(owner, repo, max_releases=10) # Returns [] on error + print("Getting open issues...") + result["open_issues"] = self.get_issues(owner, repo, state="open", max_issues=50) # Returns [] on error + print("Getting open pull requests...") + result["open_pull_requests"] = self.get_pull_requests(owner, repo, state="open", max_prs=50) # Returns [] on error + print("Getting root contents...") + result["root_contents"] = self.get_contents(owner, repo) # Returns [] on error + + print("Analyzing repository content (README, Docs, Code Files)...") + # This relies on other methods returning sensible defaults try: - tree = ast.parse(code) - - # Extract more detailed information using AST - functions = [] - classes = [] - imports = [] - function_complexities = {} - - for node in ast.walk(tree): - # Get function definitions with arguments - if isinstance(node, ast.FunctionDef): - args = [] - defaults = len(node.args.defaults) - args_count = len(node.args.args) - defaults - - # Get positional args - for arg in node.args.args[:args_count]: - if hasattr(arg, 'arg'): # Python 3 - args.append(arg.arg) - else: # Python 2 - args.append(arg.id) - - # Get args with defaults - for i, arg in enumerate(node.args.args[args_count:]): - if hasattr(arg, 'arg'): # Python 3 - args.append(f"{arg.arg}=...") - else: # Python 2 - args.append(f"{arg.id}=...") - - # Calculate function complexity - func_complexity = complexity.cc_visit(node) - function_complexities[node.name] = func_complexity - - # Get docstring if available - docstring = ast.get_docstring(node) - - functions.append({ - 'name': node.name, - 'args': args, - 'complexity': func_complexity, - 'docstring': docstring - }) + # Call get_all_text_files outside get_repo_text_summary to pass count correctly + all_text_files_content, _ = self.get_all_text_files(owner, repo, max_files=30) + # Pass the fetched content to get_repo_text_summary to avoid redundant API calls + result["text_content"] = self.get_repo_text_summary(owner, repo, pre_fetched_files=all_text_files_content) + except Exception as e: + print(f"Error during text content analysis: {e}") + result["text_content"] = {"error": str(e)} # Store error indicator - # Get class definitions - elif isinstance(node, ast.ClassDef): - methods = [] - class_docstring = ast.get_docstring(node) - - # Get class methods - for child in node.body: - if isinstance(child, ast.FunctionDef): - method_complexity = complexity.cc_visit(child) - method_docstring = ast.get_docstring(child) - - methods.append({ - 'name': child.name, - 'complexity': method_complexity, - 'docstring': method_docstring - }) - - classes.append({ - 'name': node.name, - 'methods': methods, - 'docstring': class_docstring - }) - # Get imports - elif isinstance(node, ast.Import): - for name in node.names: - imports.append(name.name) - elif isinstance(node, ast.ImportFrom): - module = node.module or "" - for name 
in node.names: - imports.append(f"{module}.{name.name}") + print("Analyzing repository activity over time...") + # This relies on stats methods returning [] on error/202 timeout + try: + result["temporal_analysis"] = self.get_temporal_analysis(owner, repo) + except Exception as e: + print(f"Error during temporal analysis: {e}") + result["temporal_analysis"] = {"error": str(e)} # Store error indicator - # Calculate overall code complexity - code_complexity = complexity.cc_visit_ast(tree) + print(f"--- Finished fetching data for {owner}/{repo} ---") + return result - # Calculate maintainability index - try: - mi_score = metrics.mi_visit(code, True) - except: - mi_score = None - - return { - 'functions': functions, - 'classes': classes, - 'imports': imports, - 'complexity': { - 'overall': code_complexity, - 'functions': function_complexities, - 'maintainability_index': mi_score - } - } + # Modify get_repo_text_summary to accept pre-fetched files + def get_repo_text_summary(self, owner, repo, max_files=25, pre_fetched_files=None): + """Extract and summarize text content from the repository with improved metrics.""" + # Get README + readme = self.get_readme(owner, repo) # Returns None on error - except SyntaxError: - print(f"Syntax error in Python file: {file_path}") - return None - except Exception as e: - print(f"Error analyzing {file_path}: {str(e)}") - return None + # Get documentation + docs = self.get_documentation_files(owner, repo) # Returns [] on error - def analyze_js_ts(self, code, file_path): - """Analyze JavaScript/TypeScript code using regex with improved patterns.""" - if not file_path.endswith(('.js', '.ts', '.jsx', '.tsx')): - return None + # Get key code files if not provided + if pre_fetched_files is None: + print("Fetching text files within get_repo_text_summary...") + text_files, _ = self.get_all_text_files(owner, repo, max_files=max_files) # Returns [] on error + else: + print("Using pre-fetched text files in get_repo_text_summary.") + text_files = pre_fetched_files # Use the provided list - # More sophisticated regex patterns for JS/TS analysis - results = { - 'functions': [], - 'classes': [], - 'imports': [], - 'exports': [], - 'hooks': [] # For React hooks + # Analyze code files + code_summary = {} + complexity_metrics = { + 'cyclomatic_complexity': [], + 'maintainability_index': [], + 'comment_ratios': [] } - # Function patterns (covering various declaration styles) - function_patterns = [ - # Regular functions - r'function\s+(\w+)\s*\(([^)]*)\)', - # Arrow functions assigned to variables - r'(?:const|let|var)\s+(\w+)\s*=\s*(?:\([^)]*\)|[^=]*)\s*=>\s*{', - # Class methods - r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{', - # Object methods - r'(\w+)\s*:\s*function\s*\(([^)]*)\)' - ] + for file in text_files: + # Basic check for file structure + if not isinstance(file, dict) or 'name' not in file or 'content' not in file or 'path' not in file: + print(f"Skipping malformed file data in text summary: {file}") + continue - for pattern in function_patterns: - for match in re.finditer(pattern, code): - func_name = match.group(1) - args = match.group(2).strip() if len(match.groups()) > 1 else "" - results['functions'].append({ - 'name': func_name, - 'args': args - }) - - # Class pattern - class_pattern = r'class\s+(\w+)(?:\s+extends\s+(\w+))?\s*{([^}]*)}' - for match in re.finditer(class_pattern, code, re.DOTALL): - class_name = match.group(1) - parent_class = match.group(2) if match.group(2) else None - class_body = match.group(3) - - # Find methods in class - methods = [] - 
method_pattern = r'(?:async\s+)?(\w+)\s*\(([^)]*)\)\s*{([^}]*)}' - for method_match in re.finditer(method_pattern, class_body): - method_name = method_match.group(1) - methods.append(method_name) - - results['classes'].append({ - 'name': class_name, - 'extends': parent_class, - 'methods': methods - }) - - # Import patterns - import_patterns = [ - # ES6 imports - r'import\s+(?:{([^}]*)}|\*\s+as\s+(\w+)|(\w+))\s+from\s+[\'"]([^\'"]+)[\'"]', - # CommonJS requires - r'(?:const|let|var)\s+(?:{([^}]*)}|(\w+))\s*=\s*require\([\'"]([^\'"]+)[\'"]\)' - ] + ext = os.path.splitext(file["name"])[1].lower() + if ext in ['.py', '.js', '.ts', '.jsx', '.tsx']: # Add other relevant code extensions if needed + try: + file_summary = self.extract_code_summary(file["content"], file["path"]) + if file_summary: # Ensure summary generation didn't fail + code_summary[file["path"]] = file_summary + + # Collect complexity metrics safely + if file_summary.get('complexity'): + cc = file_summary['complexity'].get('overall') + # Ensure cc is a number before appending + if isinstance(cc, (int, float)): + complexity_metrics['cyclomatic_complexity'].append((file["path"], cc)) + + mi = file_summary['complexity'].get('maintainability_index') + # Ensure mi is a number before appending + if isinstance(mi, (int, float)): + complexity_metrics['maintainability_index'].append((file["path"], mi)) + + if file_summary.get('metrics'): + comment_ratio = file_summary['metrics'].get('comment_ratio') + # Ensure ratio is a number before appending + if isinstance(comment_ratio, (int, float)): + complexity_metrics['comment_ratios'].append((file["path"], comment_ratio)) + except Exception as e_sum: + print(f"Error extracting code summary for {file.get('path', 'unknown file')}: {e_sum}") + + # Analyze dependencies (can be slow, consider limiting files further if needed) + # Use the already fetched text_files for dependency analysis + dependencies = self.analyze_dependencies(owner, repo, pre_fetched_code_files=text_files) - for pattern in import_patterns: - for match in re.finditer(pattern, code): - groups = match.groups() - if groups[0]: # Destructured import - imports = [name.strip() for name in groups[0].split(',')] - for imp in imports: - results['imports'].append(imp) - elif groups[1]: # Namespace import (import * as X) - results['imports'].append(groups[1]) - elif groups[2]: # Default import - results['imports'].append(groups[2]) - elif groups[3]: # Module name - results['imports'].append(groups[3]) - - # React hooks detection (for React files) - if file_path.endswith(('.jsx', '.tsx')): - hook_pattern = r'use([A-Z]\w+)\s*\(' - for match in re.finditer(hook_pattern, code): - hook_name = 'use' + match.group(1) - results['hooks'].append(hook_name) - - # Export patterns - export_patterns = [ - # Named exports - r'export\s+(?:const|let|var|function|class)\s+(\w+)', - # Default exports - r'export\s+default\s+(?:function|class)?\s*(\w+)?' - ] - for pattern in export_patterns: - for match in re.finditer(pattern, code): - if match.group(1): - results['exports'].append(match.group(1)) - - return results - # ... ( extract_code_summary, analyze_dependencies, create_dependency_graph, ...) 
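# --- Editor's sketch (not part of the patch): the core of the analyze_ast walk above,
# --- without the radon complexity scores, to show what the AST pass collects.
# --- `source` is any Python source string; summarize_python is an illustrative name.
import ast

def summarize_python(source):
    """Return the function names, class names, and imports found in `source`."""
    tree = ast.parse(source)
    functions = [n.name for n in ast.walk(tree) if isinstance(n, ast.FunctionDef)]
    classes = [n.name for n in ast.walk(tree) if isinstance(n, ast.ClassDef)]
    imports = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            imports.extend(alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom):
            imports.extend(f"{node.module or ''}.{alias.name}" for alias in node.names)
    return {"functions": functions, "classes": classes, "imports": imports}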
- def extract_code_summary(self, file_content, file_path): - """Extract comprehensive summary information from code files.""" - extension = os.path.splitext(file_path)[1].lower() - - # Initialize summary - summary = { - "functions": [], - "classes": [], - "imports": [], - "description": "", - "complexity": None - } + # Summarize repository content by file type + file_types = defaultdict(int) + for file in text_files: + if isinstance(file, dict) and 'name' in file: # Check again + ext = os.path.splitext(file["name"])[1].lower() + if ext: # Avoid counting files with no extension + file_types[ext] += 1 + + # Calculate aggregate code metrics safely + total_code_lines = 0 + total_comment_lines = 0 + analyzed_code_files = 0 + for path, summary in code_summary.items(): + if summary and summary.get('metrics'): + analyzed_code_files += 1 + total_code_lines += summary['metrics'].get('code_lines', 0) or 0 + total_comment_lines += summary['metrics'].get('comment_lines', 0) or 0 - # Extract Python definitions with AST - if extension == '.py': - ast_result = self.analyze_ast(file_content, file_path) - if ast_result: - summary["functions"] = [f["name"] for f in ast_result["functions"]] - summary["classes"] = [c["name"] for c in ast_result["classes"]] - summary["imports"] = ast_result["imports"] - summary["complexity"] = ast_result["complexity"] + aggregate_metrics = { + 'total_files_analyzed': len(text_files), # All text files fetched + 'code_files_summarized': analyzed_code_files, # Files where summary succeeded + 'total_code_lines': total_code_lines, + 'total_comment_lines': total_comment_lines, + 'average_comment_ratio': (total_comment_lines / total_code_lines) if total_code_lines > 0 else 0 + } - # Try to extract module docstring - try: - tree = ast.parse(file_content) - module_docstring = ast.get_docstring(tree) - if module_docstring: - summary["description"] = module_docstring - except: - pass - - # Add detailed function and class info - summary["detailed_functions"] = ast_result["functions"] - summary["detailed_classes"] = ast_result["classes"] - - # Extract JavaScript/TypeScript definitions - elif extension in ['.js', '.ts', '.jsx', '.tsx']: - js_result = self.analyze_js_ts(file_content, file_path) - if js_result: - summary["functions"] = [f["name"] for f in js_result["functions"]] - summary["classes"] = [c["name"] for c in js_result["classes"]] - summary["imports"] = js_result["imports"] - - # Add detailed function and class info - summary["detailed_functions"] = js_result["functions"] - summary["detailed_classes"] = js_result["classes"] - summary["hooks"] = js_result.get("hooks", []) - summary["exports"] = js_result.get("exports", []) - - # Calculate basic code metrics for any text file - if file_content: - lines = file_content.split('\n') - code_lines = 0 - comment_lines = 0 - blank_lines = 0 - - comment_prefixes = ['#', '//', '/*', '*', '