""" Utility functions for Salesforce B2B Commerce migration assistant. """ import re import json import logging from typing import Dict, List, Tuple, Optional, Any # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Validation schema for Apex code VALIDATION_SCHEMA = { "syntax_errors": ["missing_semicolon", "unclosed_bracket", "invalid_syntax"], "security_issues": ["soql_injection", "hardcoded_credentials", "unsafe_dml"], "performance_issues": ["governor_limits", "bulk_operations", "inefficient_queries"], "b2b_commerce_issues": ["deprecated_apis", "missing_null_checks", "incorrect_field_references"] } # B2B Commerce patterns for detection B2B_COMMERCE_PATTERNS = { "cloudcraze_reference": r"(ccrz__|E_[A-Z]|CloudCraze)", "trigger_pattern": r"trigger\s+\w+\s+on\s+\w+", "apex_class_pattern": r"(public|private|global)\s+class\s+\w+", "soql_pattern": r"SELECT\s+.+\s+FROM\s+\w+", "dml_pattern": r"(insert|update|delete|upsert)\s+\w+" } def validate_apex_syntax(code: str) -> Tuple[bool, List[Dict[str, Any]]]: """ Validate Apex code syntax and return issues found. Args: code: Apex code to validate Returns: Tuple of (is_valid, list_of_issues) """ issues = [] # Basic syntax checks if not code.strip(): issues.append({"type": "error", "message": "Empty code provided", "line": 0}) return False, issues # Check for balanced brackets brackets = {"(": ")", "{": "}", "[": "]"} stack = [] for i, char in enumerate(code): if char in brackets: stack.append((char, i)) elif char in brackets.values(): if not stack: issues.append({"type": "error", "message": f"Unmatched closing bracket '{char}'", "line": code[:i].count('\n') + 1}) else: open_char, _ = stack.pop() if brackets[open_char] != char: issues.append({"type": "error", "message": f"Mismatched bracket pair", "line": code[:i].count('\n') + 1}) # Check for unclosed brackets if stack: for char, pos in stack: issues.append({"type": "error", "message": f"Unclosed bracket '{char}'", "line": code[:pos].count('\n') + 1}) # Check for missing semicolons (basic check) lines = code.split('\n') for line_num, line in enumerate(lines, 1): stripped = line.strip() if stripped and not stripped.endswith((';', '{', '}', '//', '/*', '*/', '*')): if any(keyword in stripped for keyword in ['if', 'for', 'while', 'try', 'catch', 'class', 'trigger']): continue if re.search(r'\b(insert|update|delete|upsert|return)\b', stripped): issues.append({"type": "warning", "message": "Possible missing semicolon", "line": line_num}) is_valid = not any(issue["type"] == "error" for issue in issues) return is_valid, issues def perform_skeptical_evaluation(code: str, code_type: str = "trigger") -> Dict[str, List[str]]: """ Perform skeptical evaluation of code to find potential issues. Args: code: Code to evaluate code_type: Type of code (trigger, class, object) Returns: Dictionary with categorized issues """ evaluation = { "security_concerns": [], "performance_issues": [], "b2b_commerce_issues": [], "best_practice_violations": [] } # Security checks if re.search(r"String\.format|String\.valueOf.*user", code, re.IGNORECASE): evaluation["security_concerns"].append("Potential SOQL injection vulnerability") if re.search(r"password\s*=\s*['\"][^'\"]+['\"]", code, re.IGNORECASE): evaluation["security_concerns"].append("Hardcoded credentials detected") # Performance checks if re.search(r"for\s*\([^)]*:[^)]*\)\s*\{[^}]*\b(insert|update|delete|upsert)\b", code): evaluation["performance_issues"].append("DML operation inside loop - governor limit risk") if re.search(r"for\s*\([^)]*:[^)]*\)\s*\{[^}]*\bSELECT\b", code, re.IGNORECASE): evaluation["performance_issues"].append("SOQL query inside loop - governor limit risk") # B2B Commerce specific checks if re.search(B2B_COMMERCE_PATTERNS["cloudcraze_reference"], code): evaluation["b2b_commerce_issues"].append("CloudCraze references need migration to B2B LEX") if code_type == "trigger" and not re.search(r"Trigger\.(isInsert|isUpdate|isDelete)", code): evaluation["best_practice_violations"].append("Missing trigger context checks") return evaluation def extract_code_blocks(response: str) -> str: """ Extract code blocks from AI response. Args: response: AI model response Returns: Extracted code or empty string if none found """ # Look for code blocks with various delimiters patterns = [ r"```(?:apex|java|salesforce)?\s*(.*?)```", r"```\s*(.*?)```", r"(.*?)", r"`([^`]+)`" ] for pattern in patterns: matches = re.findall(pattern, response, re.DOTALL | re.IGNORECASE) if matches: # Return the longest match (most likely to be the main code block) return max(matches, key=len).strip() return "" def format_structured_explanation(response: str, code_output: str) -> str: """ Format the AI response into a structured explanation. Args: response: Raw AI response code_output: Extracted code Returns: Formatted explanation """ explanation = "" # Extract key changes section key_changes_match = re.search(r"## KEY CHANGES.*?(?=##|$)", response, re.DOTALL | re.IGNORECASE) if key_changes_match: explanation += "**Key Changes:**\n" + key_changes_match.group(0).replace("## KEY CHANGES", "").strip() + "\n\n" # Extract critical issues section critical_issues_match = re.search(r"## CRITICAL ISSUES.*?(?=##|$)", response, re.DOTALL | re.IGNORECASE) if critical_issues_match: explanation += "**Critical Issues Fixed:**\n" + critical_issues_match.group(0).replace("## CRITICAL ISSUES", "").strip() + "\n\n" # Extract warnings section warnings_match = re.search(r"## (?:REMAINING )?WARNINGS.*?(?=##|$)", response, re.DOTALL | re.IGNORECASE) if warnings_match: explanation += "**Remaining Warnings:**\n" + warnings_match.group(0).replace("## REMAINING WARNINGS", "").replace("## WARNINGS", "").strip() return explanation if explanation else "Analysis completed. See full response for details." def format_object_conversion_explanation(response: str, code_output: str) -> str: """ Format object conversion explanation. Args: response: Raw AI response code_output: Extracted code Returns: Formatted explanation """ explanation = "" # Extract mapping section mapping_match = re.search(r"## B2B LEX OBJECT MAPPING.*?(?=##|$)", response, re.DOTALL | re.IGNORECASE) if mapping_match: explanation += "**Object Mapping:**\n" + mapping_match.group(0).replace("## B2B LEX OBJECT MAPPING", "").strip() + "\n\n" # Extract field mappings field_mappings_match = re.search(r"## FIELD MAPPINGS.*?(?=##|$)", response, re.DOTALL | re.IGNORECASE) if field_mappings_match: explanation += "**Field Mappings:**\n" + field_mappings_match.group(0).replace("## FIELD MAPPINGS", "").strip() + "\n\n" # Extract migration steps steps_match = re.search(r"## MIGRATION STEPS.*?(?=##|$)", response, re.DOTALL | re.IGNORECASE) if steps_match: explanation += "**Migration Steps:**\n" + steps_match.group(0).replace("## MIGRATION STEPS", "").strip() return explanation if explanation else "Conversion completed. See full response for details." def extract_validation_metrics(validation_text: str) -> Optional[Dict[str, float]]: """ Extract validation metrics from AI response. Args: validation_text: AI validation response Returns: Dictionary of metrics or None if parsing fails """ try: # Look for JSON block in the response json_match = re.search(r"```json\s*(.*?)```", validation_text, re.DOTALL) if json_match: metrics_data = json.loads(json_match.group(1)) return { "quality_rating": float(metrics_data.get("quality_rating", 0)), "accuracy": float(metrics_data.get("accuracy", 0)), "completeness": float(metrics_data.get("completeness", 0)), "best_practices_alignment": float(metrics_data.get("best_practices_alignment", 0)), "syntax_validity": float(metrics_data.get("syntax_validity", 0)), "security_score": float(metrics_data.get("security_score", 0)), "performance_score": float(metrics_data.get("performance_score", 0)) } except (json.JSONDecodeError, ValueError, KeyError) as e: logger.warning(f"Failed to parse validation metrics: {e}") return None def normalize_metrics(metrics: Dict[str, float]) -> Dict[str, float]: """ Normalize metrics to 0-1 scale. Args: metrics: Raw metrics dictionary Returns: Normalized metrics """ normalized = {} for key, value in metrics.items(): if key == "quality_rating": normalized[key] = value / 10.0 # Convert 1-10 scale to 0-1 else: normalized[key] = max(0.0, min(1.0, value)) # Clamp to 0-1 return normalized def generate_test_cases(code: str, code_type: str = "trigger") -> str: """ Generate basic test case templates for the given code. Args: code: Code to generate tests for code_type: Type of code (trigger, class, object) Returns: Test case template as string """ if code_type == "trigger": # Extract trigger name and object trigger_match = re.search(r"trigger\s+(\w+)\s+on\s+(\w+)", code, re.IGNORECASE) if trigger_match: trigger_name = trigger_match.group(1) sobject_name = trigger_match.group(2) return f"""@isTest public class {trigger_name}Test {{ @isTest static void testInsert() {{ // Test insert scenario {sobject_name} testRecord = new {sobject_name}(); // Set required fields Test.startTest(); insert testRecord; Test.stopTest(); // Assert expected behavior }} @isTest static void testUpdate() {{ // Test update scenario {sobject_name} testRecord = new {sobject_name}(); // Set required fields insert testRecord; Test.startTest(); // Modify fields update testRecord; Test.stopTest(); // Assert expected behavior }} }}""" return "// Test case template not available for this code type"