"""
Utility functions for Salesforce B2B Commerce migration assistant.
"""
import re
import json
import logging
from typing import Dict, List, Tuple, Optional, Any

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Validation schema for Apex code
VALIDATION_SCHEMA = {
    "syntax_errors": ["missing_semicolon", "unclosed_bracket", "invalid_syntax"],
    "security_issues": ["soql_injection", "hardcoded_credentials", "unsafe_dml"],
    "performance_issues": ["governor_limits", "bulk_operations", "inefficient_queries"],
    "b2b_commerce_issues": ["deprecated_apis", "missing_null_checks", "incorrect_field_references"]
}

# B2B Commerce patterns for detection
B2B_COMMERCE_PATTERNS = {
    "cloudcraze_reference": r"(ccrz__|E_[A-Z]|CloudCraze)",
    "trigger_pattern": r"trigger\s+\w+\s+on\s+\w+",
    "apex_class_pattern": r"(public|private|global)\s+class\s+\w+",
    "soql_pattern": r"SELECT\s+.+\s+FROM\s+\w+",
    "dml_pattern": r"(insert|update|delete|upsert)\s+\w+"
}

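
# Illustrative sketch (added for clarity, not part of the original module): the entries
# above are plain regular expressions, so a quick scan of a snippet looks like this.
# The sample snippet and the demo function name are hypothetical.
def _demo_scan_for_patterns() -> None:
    snippet = "public class OrderHelper { ccrz__E_Order__c rec; }"
    hits = [name for name, pattern in B2B_COMMERCE_PATTERNS.items()
            if re.search(pattern, snippet, re.IGNORECASE)]
    # Expected hits: cloudcraze_reference and apex_class_pattern
    logger.info("Patterns matched: %s", hits)

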
def validate_apex_syntax(code: str) -> Tuple[bool, List[Dict[str, Any]]]:
    """
    Validate Apex code syntax and return issues found.

    Args:
        code: Apex code to validate

    Returns:
        Tuple of (is_valid, list_of_issues)
    """
    issues = []

    # Basic syntax checks
    if not code.strip():
        issues.append({"type": "error", "message": "Empty code provided", "line": 0})
        return False, issues

    # Check for balanced brackets
    # Note: this simple scan does not skip brackets inside string literals or comments
    brackets = {"(": ")", "{": "}", "[": "]"}
    stack = []
    for i, char in enumerate(code):
        if char in brackets:
            stack.append((char, i))
        elif char in brackets.values():
            if not stack:
                issues.append({"type": "error", "message": f"Unmatched closing bracket '{char}'", "line": code[:i].count('\n') + 1})
            else:
                open_char, _ = stack.pop()
                if brackets[open_char] != char:
                    issues.append({"type": "error", "message": "Mismatched bracket pair", "line": code[:i].count('\n') + 1})

    # Check for unclosed brackets
    if stack:
        for char, pos in stack:
            issues.append({"type": "error", "message": f"Unclosed bracket '{char}'", "line": code[:pos].count('\n') + 1})

    # Check for missing semicolons (basic heuristic)
    lines = code.split('\n')
    for line_num, line in enumerate(lines, 1):
        stripped = line.strip()
        # Skip blank lines, comment lines, annotations, and lines that already end a statement or block
        if not stripped or stripped.startswith(('//', '/*', '*', '@')):
            continue
        if stripped.endswith((';', '{', '}', '*/')):
            continue
        # Control-flow and declaration keywords do not require a trailing semicolon
        if any(keyword in stripped for keyword in ('if', 'for', 'while', 'try', 'catch', 'class', 'trigger')):
            continue
        if re.search(r'\b(insert|update|delete|upsert|return)\b', stripped):
            issues.append({"type": "warning", "message": "Possible missing semicolon", "line": line_num})

    is_valid = not any(issue["type"] == "error" for issue in issues)
    return is_valid, issues


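# Illustrative usage sketch (added for clarity; the sample trigger and the demo
# function name are hypothetical, not part of the original module).
def _demo_validate_apex_syntax() -> None:
    sample = "trigger AccountTrigger on Account (before insert) {\n    insert acc\n}"
    is_valid, issues = validate_apex_syntax(sample)
    # Expected: is_valid is True (only a warning is reported for the missing semicolon)
    logger.info("valid=%s, issues=%s", is_valid, issues)

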
def perform_skeptical_evaluation(code: str, code_type: str = "trigger") -> Dict[str, List[str]]:
    """
    Perform skeptical evaluation of code to find potential issues.

    Args:
        code: Code to evaluate
        code_type: Type of code (trigger, class, object)

    Returns:
        Dictionary with categorized issues
    """
    evaluation = {
        "security_concerns": [],
        "performance_issues": [],
        "b2b_commerce_issues": [],
        "best_practice_violations": []
    }

    # Security checks (heuristic pattern matches, not a full static analysis)
    if re.search(r"String\.format|String\.valueOf.*user", code, re.IGNORECASE):
        evaluation["security_concerns"].append("Potential SOQL injection vulnerability")
    if re.search(r"password\s*=\s*['\"][^'\"]+['\"]", code, re.IGNORECASE):
        evaluation["security_concerns"].append("Hardcoded credentials detected")

    # Performance checks
    if re.search(r"for\s*\([^)]*:[^)]*\)\s*\{[^}]*\b(insert|update|delete|upsert)\b", code, re.IGNORECASE):
        evaluation["performance_issues"].append("DML operation inside loop - governor limit risk")
    if re.search(r"for\s*\([^)]*:[^)]*\)\s*\{[^}]*\bSELECT\b", code, re.IGNORECASE):
        evaluation["performance_issues"].append("SOQL query inside loop - governor limit risk")

    # B2B Commerce specific checks
    if re.search(B2B_COMMERCE_PATTERNS["cloudcraze_reference"], code):
        evaluation["b2b_commerce_issues"].append("CloudCraze references need migration to B2B LEX")
    if code_type == "trigger" and not re.search(r"Trigger\.(isInsert|isUpdate|isDelete)", code):
        evaluation["best_practice_violations"].append("Missing trigger context checks")

    return evaluation


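# Illustrative usage sketch (added for clarity; the sample trigger below is hypothetical):
# a CloudCraze-era trigger with DML inside a loop should surface a performance issue,
# a B2B Commerce migration issue, and a missing trigger-context-check violation.
def _demo_perform_skeptical_evaluation() -> None:
    sample = (
        "trigger ccrz__OrderTrigger on ccrz__E_Order__c (after update) {\n"
        "    for (ccrz__E_Order__c o : Trigger.new) {\n"
        "        update o;\n"
        "    }\n"
        "}"
    )
    report = perform_skeptical_evaluation(sample, code_type="trigger")
    logger.info("Evaluation report: %s", report)

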
def extract_code_blocks(response: str) -> str:
    """
    Extract code blocks from AI response.

    Args:
        response: AI model response

    Returns:
        Extracted code or empty string if none found
    """
    # Look for code blocks with various delimiters
    patterns = [
        r"```(?:apex|java|salesforce)?\s*(.*?)```",
        r"```\s*(.*?)```",
        r"<code>(.*?)</code>",
        r"`([^`]+)`"
    ]
    for pattern in patterns:
        matches = re.findall(pattern, response, re.DOTALL | re.IGNORECASE)
        if matches:
            # Return the longest match (most likely to be the main code block)
            return max(matches, key=len).strip()
    return ""


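# Illustrative usage sketch (added for clarity; the response text is hypothetical):
# extract_code_blocks() returns the body of the longest code block it finds.
def _demo_extract_code_blocks() -> None:
    response = (
        "Here is the converted class:\n"
        "```apex\n"
        "public class OrderService {\n"
        "    // migrated logic\n"
        "}\n"
        "```\n"
        "Let me know if you need tests."
    )
    code = extract_code_blocks(response)
    logger.info("Extracted %d characters of code", len(code))

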
def format_structured_explanation(response: str, code_output: str) -> str:
    """
    Format the AI response into a structured explanation.

    Args:
        response: Raw AI response
        code_output: Extracted code (not used in the formatting itself)

    Returns:
        Formatted explanation
    """
    explanation = ""

    # Extract key changes section (capture the body in a group so the heading is
    # dropped regardless of the casing the model used)
    key_changes_match = re.search(r"## KEY CHANGES\s*(.*?)(?=##|$)", response, re.DOTALL | re.IGNORECASE)
    if key_changes_match:
        explanation += "**Key Changes:**\n" + key_changes_match.group(1).strip() + "\n\n"

    # Extract critical issues section
    critical_issues_match = re.search(r"## CRITICAL ISSUES\s*(.*?)(?=##|$)", response, re.DOTALL | re.IGNORECASE)
    if critical_issues_match:
        explanation += "**Critical Issues Fixed:**\n" + critical_issues_match.group(1).strip() + "\n\n"

    # Extract warnings section
    warnings_match = re.search(r"## (?:REMAINING )?WARNINGS\s*(.*?)(?=##|$)", response, re.DOTALL | re.IGNORECASE)
    if warnings_match:
        explanation += "**Remaining Warnings:**\n" + warnings_match.group(1).strip()

    return explanation if explanation else "Analysis completed. See full response for details."


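# Illustrative usage sketch (added for clarity; the response below is a made-up example
# of the "## KEY CHANGES / ## CRITICAL ISSUES" layout this formatter expects).
def _demo_format_structured_explanation() -> None:
    response = (
        "## KEY CHANGES\n- Replaced ccrz__E_Order__c with OrderSummary\n\n"
        "## CRITICAL ISSUES\n- Removed SOQL query from loop\n"
    )
    summary = format_structured_explanation(response, code_output="")
    logger.info("Formatted summary:\n%s", summary)

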
def format_object_conversion_explanation(response: str, code_output: str) -> str:
    """
    Format object conversion explanation.

    Args:
        response: Raw AI response
        code_output: Extracted code (not used in the formatting itself)

    Returns:
        Formatted explanation
    """
    explanation = ""

    # Extract mapping section (capture the body so the heading is dropped regardless of casing)
    mapping_match = re.search(r"## B2B LEX OBJECT MAPPING\s*(.*?)(?=##|$)", response, re.DOTALL | re.IGNORECASE)
    if mapping_match:
        explanation += "**Object Mapping:**\n" + mapping_match.group(1).strip() + "\n\n"

    # Extract field mappings
    field_mappings_match = re.search(r"## FIELD MAPPINGS\s*(.*?)(?=##|$)", response, re.DOTALL | re.IGNORECASE)
    if field_mappings_match:
        explanation += "**Field Mappings:**\n" + field_mappings_match.group(1).strip() + "\n\n"

    # Extract migration steps
    steps_match = re.search(r"## MIGRATION STEPS\s*(.*?)(?=##|$)", response, re.DOTALL | re.IGNORECASE)
    if steps_match:
        explanation += "**Migration Steps:**\n" + steps_match.group(1).strip()

    return explanation if explanation else "Conversion completed. See full response for details."


def extract_validation_metrics(validation_text: str) -> Optional[Dict[str, float]]:
    """
    Extract validation metrics from AI response.

    Args:
        validation_text: AI validation response

    Returns:
        Dictionary of metrics or None if parsing fails
    """
    try:
        # Look for a JSON block in the response
        json_match = re.search(r"```json\s*(.*?)```", validation_text, re.DOTALL)
        if json_match:
            metrics_data = json.loads(json_match.group(1))
            return {
                "quality_rating": float(metrics_data.get("quality_rating", 0)),
                "accuracy": float(metrics_data.get("accuracy", 0)),
                "completeness": float(metrics_data.get("completeness", 0)),
                "best_practices_alignment": float(metrics_data.get("best_practices_alignment", 0)),
                "syntax_validity": float(metrics_data.get("syntax_validity", 0)),
                "security_score": float(metrics_data.get("security_score", 0)),
                "performance_score": float(metrics_data.get("performance_score", 0))
            }
    except (json.JSONDecodeError, ValueError, KeyError) as e:
        logger.warning(f"Failed to parse validation metrics: {e}")
    return None


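# Illustrative usage sketch (added for clarity; the validation_text below shows the
# fenced ```json block shape this parser expects).
def _demo_extract_validation_metrics() -> None:
    validation_text = (
        "Overall the conversion looks solid.\n"
        "```json\n"
        '{"quality_rating": 8, "accuracy": 0.9, "completeness": 0.85}\n'
        "```"
    )
    metrics = extract_validation_metrics(validation_text)
    # Missing keys default to 0.0, so the full seven-key dictionary is still returned
    logger.info("Parsed metrics: %s", metrics)

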
def normalize_metrics(metrics: Dict[str, float]) -> Dict[str, float]:
    """
    Normalize metrics to 0-1 scale.

    Args:
        metrics: Raw metrics dictionary

    Returns:
        Normalized metrics
    """
    normalized = {}
    for key, value in metrics.items():
        if key == "quality_rating":
            # Convert the 1-10 rating to 0-1, clamping out-of-range values
            normalized[key] = max(0.0, min(1.0, value / 10.0))
        else:
            normalized[key] = max(0.0, min(1.0, value))  # Clamp to 0-1
    return normalized


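# Illustrative usage sketch (added for clarity): quality_rating is rescaled from a
# 1-10 rating, while the other scores are simply clamped to the 0-1 range.
def _demo_normalize_metrics() -> None:
    raw = {"quality_rating": 8.0, "accuracy": 1.2, "security_score": 0.7}
    normalized = normalize_metrics(raw)
    # Expected: {"quality_rating": 0.8, "accuracy": 1.0, "security_score": 0.7}
    logger.info("Normalized metrics: %s", normalized)

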
def generate_test_cases(code: str, code_type: str = "trigger") -> str:
    """
    Generate basic test case templates for the given code.

    Args:
        code: Code to generate tests for
        code_type: Type of code (trigger, class, object)

    Returns:
        Test case template as string
    """
    if code_type == "trigger":
        # Extract trigger name and object
        trigger_match = re.search(r"trigger\s+(\w+)\s+on\s+(\w+)", code, re.IGNORECASE)
        if trigger_match:
            trigger_name = trigger_match.group(1)
            sobject_name = trigger_match.group(2)
            return f"""@isTest
public class {trigger_name}Test {{
    @isTest
    static void testInsert() {{
        // Test insert scenario
        {sobject_name} testRecord = new {sobject_name}();
        // Set required fields
        Test.startTest();
        insert testRecord;
        Test.stopTest();
        // Assert expected behavior
    }}

    @isTest
    static void testUpdate() {{
        // Test update scenario
        {sobject_name} testRecord = new {sobject_name}();
        // Set required fields
        insert testRecord;
        Test.startTest();
        // Modify fields
        update testRecord;
        Test.stopTest();
        // Assert expected behavior
    }}
}}"""
    return "// Test case template not available for this code type"
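

# Minimal smoke-test entry point (added as an illustrative sketch; the sample trigger
# is hypothetical). Run with `python utils.py` to exercise a couple of the helpers above.
if __name__ == "__main__":
    demo_trigger = "trigger AccountTrigger on Account (before insert) {\n    insert acc\n}"
    valid, found_issues = validate_apex_syntax(demo_trigger)
    print(f"Syntax valid: {valid}, issues: {found_issues}")
    print(generate_test_cases(demo_trigger, code_type="trigger"))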