|
""" |
|
Utility functions for Salesforce B2B Commerce migration assistant. |
|
""" |
|
|
|
import re |
|
import json |
|
import logging |
|
from typing import Dict, List, Tuple, Optional, Any |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
VALIDATION_SCHEMA = { |
|
"syntax_errors": ["missing_semicolon", "unclosed_bracket", "invalid_syntax"], |
|
"security_issues": ["soql_injection", "hardcoded_credentials", "unsafe_dml"], |
|
"performance_issues": ["governor_limits", "bulk_operations", "inefficient_queries"], |
|
"b2b_commerce_issues": ["deprecated_apis", "missing_null_checks", "incorrect_field_references"] |
|
} |
|
|
|
|
|
B2B_COMMERCE_PATTERNS = { |
|
"cloudcraze_reference": r"(ccrz__|E_[A-Z]|CloudCraze)", |
|
"trigger_pattern": r"trigger\s+\w+\s+on\s+\w+", |
|
"apex_class_pattern": r"(public|private|global)\s+class\s+\w+", |
|
"soql_pattern": r"SELECT\s+.+\s+FROM\s+\w+", |
|
"dml_pattern": r"(insert|update|delete|upsert)\s+\w+" |
|
} |
|
|
|
def validate_apex_syntax(code: str) -> Tuple[bool, List[Dict[str, Any]]]: |
|
""" |
|
Validate Apex code syntax and return issues found. |
|
|
|
Args: |
|
code: Apex code to validate |
|
|
|
Returns: |
|
Tuple of (is_valid, list_of_issues) |
|
""" |
|
issues = [] |
|
|
|
|
|
if not code.strip(): |
|
issues.append({"type": "error", "message": "Empty code provided", "line": 0}) |
|
return False, issues |
|
|
|
|
|
brackets = {"(": ")", "{": "}", "[": "]"} |
|
stack = [] |
|
|
|
for i, char in enumerate(code): |
|
if char in brackets: |
|
stack.append((char, i)) |
|
elif char in brackets.values(): |
|
if not stack: |
|
issues.append({"type": "error", "message": f"Unmatched closing bracket '{char}'", "line": code[:i].count('\n') + 1}) |
|
else: |
|
open_char, _ = stack.pop() |
|
if brackets[open_char] != char: |
|
issues.append({"type": "error", "message": f"Mismatched bracket pair", "line": code[:i].count('\n') + 1}) |
|
|
|
|
|
if stack: |
|
for char, pos in stack: |
|
issues.append({"type": "error", "message": f"Unclosed bracket '{char}'", "line": code[:pos].count('\n') + 1}) |
|
|
|
|
|
lines = code.split('\n') |
|
for line_num, line in enumerate(lines, 1): |
|
stripped = line.strip() |
|
if stripped and not stripped.endswith((';', '{', '}', '//', '/*', '*/', '*')): |
|
if any(keyword in stripped for keyword in ['if', 'for', 'while', 'try', 'catch', 'class', 'trigger']): |
|
continue |
|
if re.search(r'\b(insert|update|delete|upsert|return)\b', stripped): |
|
issues.append({"type": "warning", "message": "Possible missing semicolon", "line": line_num}) |
|
|
|
is_valid = not any(issue["type"] == "error" for issue in issues) |
|
return is_valid, issues |
|
|
|
def perform_skeptical_evaluation(code: str, code_type: str = "trigger") -> Dict[str, List[str]]: |
|
""" |
|
Perform skeptical evaluation of code to find potential issues. |
|
|
|
Args: |
|
code: Code to evaluate |
|
code_type: Type of code (trigger, class, object) |
|
|
|
Returns: |
|
Dictionary with categorized issues |
|
""" |
|
evaluation = { |
|
"security_concerns": [], |
|
"performance_issues": [], |
|
"b2b_commerce_issues": [], |
|
"best_practice_violations": [] |
|
} |
|
|
|
|
|
if re.search(r"String\.format|String\.valueOf.*user", code, re.IGNORECASE): |
|
evaluation["security_concerns"].append("Potential SOQL injection vulnerability") |
|
|
|
if re.search(r"password\s*=\s*['\"][^'\"]+['\"]", code, re.IGNORECASE): |
|
evaluation["security_concerns"].append("Hardcoded credentials detected") |
|
|
|
|
|
if re.search(r"for\s*\([^)]*:[^)]*\)\s*\{[^}]*\b(insert|update|delete|upsert)\b", code): |
|
evaluation["performance_issues"].append("DML operation inside loop - governor limit risk") |
|
|
|
if re.search(r"for\s*\([^)]*:[^)]*\)\s*\{[^}]*\bSELECT\b", code, re.IGNORECASE): |
|
evaluation["performance_issues"].append("SOQL query inside loop - governor limit risk") |
|
|
|
|
|
if re.search(B2B_COMMERCE_PATTERNS["cloudcraze_reference"], code): |
|
evaluation["b2b_commerce_issues"].append("CloudCraze references need migration to B2B LEX") |
|
|
|
if code_type == "trigger" and not re.search(r"Trigger\.(isInsert|isUpdate|isDelete)", code): |
|
evaluation["best_practice_violations"].append("Missing trigger context checks") |
|
|
|
return evaluation |
|
|
|
def extract_code_blocks(response: str) -> str: |
|
""" |
|
Extract code blocks from AI response. |
|
|
|
Args: |
|
response: AI model response |
|
|
|
Returns: |
|
Extracted code or empty string if none found |
|
""" |
|
|
|
patterns = [ |
|
r"```(?:apex|java|salesforce)?\s*(.*?)```", |
|
r"```\s*(.*?)```", |
|
r"<code>(.*?)</code>", |
|
r"`([^`]+)`" |
|
] |
|
|
|
for pattern in patterns: |
|
matches = re.findall(pattern, response, re.DOTALL | re.IGNORECASE) |
|
if matches: |
|
|
|
return max(matches, key=len).strip() |
|
|
|
return "" |
|
|
|
def format_structured_explanation(response: str, code_output: str) -> str: |
|
""" |
|
Format the AI response into a structured explanation. |
|
|
|
Args: |
|
response: Raw AI response |
|
code_output: Extracted code |
|
|
|
Returns: |
|
Formatted explanation |
|
""" |
|
explanation = "" |
|
|
|
|
|
key_changes_match = re.search(r"## KEY CHANGES.*?(?=##|$)", response, re.DOTALL | re.IGNORECASE) |
|
if key_changes_match: |
|
explanation += "**Key Changes:**\n" + key_changes_match.group(0).replace("## KEY CHANGES", "").strip() + "\n\n" |
|
|
|
|
|
critical_issues_match = re.search(r"## CRITICAL ISSUES.*?(?=##|$)", response, re.DOTALL | re.IGNORECASE) |
|
if critical_issues_match: |
|
explanation += "**Critical Issues Fixed:**\n" + critical_issues_match.group(0).replace("## CRITICAL ISSUES", "").strip() + "\n\n" |
|
|
|
|
|
warnings_match = re.search(r"## (?:REMAINING )?WARNINGS.*?(?=##|$)", response, re.DOTALL | re.IGNORECASE) |
|
if warnings_match: |
|
explanation += "**Remaining Warnings:**\n" + warnings_match.group(0).replace("## REMAINING WARNINGS", "").replace("## WARNINGS", "").strip() |
|
|
|
return explanation if explanation else "Analysis completed. See full response for details." |
|
|
|
def format_object_conversion_explanation(response: str, code_output: str) -> str: |
|
""" |
|
Format object conversion explanation. |
|
|
|
Args: |
|
response: Raw AI response |
|
code_output: Extracted code |
|
|
|
Returns: |
|
Formatted explanation |
|
""" |
|
explanation = "" |
|
|
|
|
|
mapping_match = re.search(r"## B2B LEX OBJECT MAPPING.*?(?=##|$)", response, re.DOTALL | re.IGNORECASE) |
|
if mapping_match: |
|
explanation += "**Object Mapping:**\n" + mapping_match.group(0).replace("## B2B LEX OBJECT MAPPING", "").strip() + "\n\n" |
|
|
|
|
|
field_mappings_match = re.search(r"## FIELD MAPPINGS.*?(?=##|$)", response, re.DOTALL | re.IGNORECASE) |
|
if field_mappings_match: |
|
explanation += "**Field Mappings:**\n" + field_mappings_match.group(0).replace("## FIELD MAPPINGS", "").strip() + "\n\n" |
|
|
|
|
|
steps_match = re.search(r"## MIGRATION STEPS.*?(?=##|$)", response, re.DOTALL | re.IGNORECASE) |
|
if steps_match: |
|
explanation += "**Migration Steps:**\n" + steps_match.group(0).replace("## MIGRATION STEPS", "").strip() |
|
|
|
return explanation if explanation else "Conversion completed. See full response for details." |
|
|
|
def extract_validation_metrics(validation_text: str) -> Optional[Dict[str, float]]: |
|
""" |
|
Extract validation metrics from AI response. |
|
|
|
Args: |
|
validation_text: AI validation response |
|
|
|
Returns: |
|
Dictionary of metrics or None if parsing fails |
|
""" |
|
try: |
|
|
|
json_match = re.search(r"```json\s*(.*?)```", validation_text, re.DOTALL) |
|
if json_match: |
|
metrics_data = json.loads(json_match.group(1)) |
|
return { |
|
"quality_rating": float(metrics_data.get("quality_rating", 0)), |
|
"accuracy": float(metrics_data.get("accuracy", 0)), |
|
"completeness": float(metrics_data.get("completeness", 0)), |
|
"best_practices_alignment": float(metrics_data.get("best_practices_alignment", 0)), |
|
"syntax_validity": float(metrics_data.get("syntax_validity", 0)), |
|
"security_score": float(metrics_data.get("security_score", 0)), |
|
"performance_score": float(metrics_data.get("performance_score", 0)) |
|
} |
|
except (json.JSONDecodeError, ValueError, KeyError) as e: |
|
logger.warning(f"Failed to parse validation metrics: {e}") |
|
|
|
return None |
|
|
|
def normalize_metrics(metrics: Dict[str, float]) -> Dict[str, float]: |
|
""" |
|
Normalize metrics to 0-1 scale. |
|
|
|
Args: |
|
metrics: Raw metrics dictionary |
|
|
|
Returns: |
|
Normalized metrics |
|
""" |
|
normalized = {} |
|
for key, value in metrics.items(): |
|
if key == "quality_rating": |
|
normalized[key] = value / 10.0 |
|
else: |
|
normalized[key] = max(0.0, min(1.0, value)) |
|
|
|
return normalized |
|
|
|
def generate_test_cases(code: str, code_type: str = "trigger") -> str: |
|
""" |
|
Generate basic test case templates for the given code. |
|
|
|
Args: |
|
code: Code to generate tests for |
|
code_type: Type of code (trigger, class, object) |
|
|
|
Returns: |
|
Test case template as string |
|
""" |
|
if code_type == "trigger": |
|
|
|
trigger_match = re.search(r"trigger\s+(\w+)\s+on\s+(\w+)", code, re.IGNORECASE) |
|
if trigger_match: |
|
trigger_name = trigger_match.group(1) |
|
sobject_name = trigger_match.group(2) |
|
|
|
return f"""@isTest |
|
public class {trigger_name}Test {{ |
|
@isTest |
|
static void testInsert() {{ |
|
// Test insert scenario |
|
{sobject_name} testRecord = new {sobject_name}(); |
|
// Set required fields |
|
|
|
Test.startTest(); |
|
insert testRecord; |
|
Test.stopTest(); |
|
|
|
// Assert expected behavior |
|
}} |
|
|
|
@isTest |
|
static void testUpdate() {{ |
|
// Test update scenario |
|
{sobject_name} testRecord = new {sobject_name}(); |
|
// Set required fields |
|
insert testRecord; |
|
|
|
Test.startTest(); |
|
// Modify fields |
|
update testRecord; |
|
Test.stopTest(); |
|
|
|
// Assert expected behavior |
|
}} |
|
}}""" |
|
|
|
return "// Test case template not available for this code type" |