""" Response Formatter Utility for Hugging Face BasicAgent. This module centralizes answer format handling and validation to ensure all responses meet HF evaluation requirements. Extracted from BasicAgent to provide clean separation of concerns. Key Features: - HF evaluation format compliance (no "FINAL ANSWER:" prefix) - Response quality validation and scoring - Format consistency checks - Clean answer processing and sanitization - Configurable formatting options - Comprehensive validation functions Author: Phase 2A Step 4 Implementation """ import re import logging from typing import Dict, Any, Optional, List, Tuple, Union from dataclasses import dataclass from enum import Enum logger = logging.getLogger(__name__) class ResponseType(Enum): """Types of responses for different formatting needs.""" SIMPLE_ANSWER = "simple_answer" CALCULATION = "calculation" MULTI_STEP = "multi_step" EXPLANATION = "explanation" ERROR = "error" TIMEOUT = "timeout" class FormatStandard(Enum): """Format standards for response validation.""" HF_EVALUATION = "hf_evaluation" # Hugging Face evaluation format GAIA_STANDARD = "gaia_standard" # GAIA benchmark format GENERAL = "general" # General purpose format @dataclass class FormatConfig: """Configuration for response formatting.""" max_length: int = 2000 min_length: int = 1 remove_markdown: bool = True remove_prefixes: bool = True strip_whitespace: bool = True normalize_spaces: bool = True ensure_period: bool = False format_standard: FormatStandard = FormatStandard.HF_EVALUATION @dataclass class ValidationResult: """Result of response validation.""" is_valid: bool quality_score: float # 0.0 to 1.0 format_score: float # 0.0 to 1.0 issues: List[str] suggestions: List[str] metadata: Dict[str, Any] @dataclass class FormattedResponse: """Container for formatted response with metadata.""" answer: str original_answer: str response_type: ResponseType format_config: FormatConfig validation: ValidationResult processing_metadata: Dict[str, Any] class ResponseFormatter: """ Central response formatter for HF evaluation compliance. Handles all answer formatting, validation, and quality assessment to ensure responses meet Hugging Face evaluation requirements. """ # HF evaluation forbidden prefixes (case variations) FORBIDDEN_PREFIXES = [ f"{case_variant}{suffix}" for prefix in ["FINAL ANSWER", "ANSWER", "RESULT", "CONCLUSION"] for suffix in [":", ""] for case_variant in [prefix.upper(), prefix.lower(), prefix.title()] ] # Markdown removal patterns (enhanced for complete cleanup) MARKDOWN_PATTERNS = [ # Code blocks (various formats) - more comprehensive (r'```[\s\S]*?```', ''), # Fenced code blocks (multiline) (r'~~~[\s\S]*?~~~', ''), # Alternative fenced blocks (r'```[^`\n]*```', ''), # Single-line fenced blocks (r'```[^`]*?```', ''), # Any fenced blocks (r'`{3,}[\s\S]*?`{3,}', ''), # Multiple backticks (r'`([^`\n]+)`', r'\1'), # Inline code (preserve content) (r'`([^`]*)`', r'\1'), # Any inline code # Bold/Italic formatting (r'\*\*(.*?)\*\*', r'\1'), # Bold **text** (r'\*(.*?)\*', r'\1'), # Italic *text* (r'__(.*?)__', r'\1'), # Bold __text__ (r'_(.*?)_', r'\1'), # Italic _text_ # Headers (r'#{1,6}\s*(.+)', r'\1'), # Headers with content (r'#{1,6}\s*', ''), # Empty headers # Links and references (r'\[([^\]]+)\]\([^)]+\)', r'\1'), # Links [text](url) (r'\[([^\]]+)\]\[[^\]]*\]', r'\1'), # Reference links # Lists and other formatting (r'^\s*[-*+]\s+', '', re.MULTILINE), # Unordered lists (r'^\s*\d+\.\s+', '', re.MULTILINE), # Ordered lists (r'^\s*>\s+', '', re.MULTILINE), # Blockquotes (r'^\s*\|.*\|\s*$', '', re.MULTILINE), # Table rows (r'^\s*[-:|\s]+\s*$', '', re.MULTILINE), # Table separators ] # Quality assessment patterns QUALITY_INDICATORS = { 'numbers': r'\b\d+(?:\.\d+)?\b', 'units': r'\b(?:meters?|feet|inches?|cm|mm|kg|lbs?|celsius|fahrenheit|°[CF])\b', 'calculations': r'[+\-*/=]|\bequals?\b|\bresult\b', 'explanations': r'\b(?:because|since|therefore|however|furthermore)\b', 'structure': r'(?:first|second|third|finally|in conclusion)', } def __init__(self, config: Optional[FormatConfig] = None): """Initialize the response formatter.""" self.config = config or FormatConfig() logger.debug(f"ResponseFormatter initialized with {self.config.format_standard.value} standard") def format_response( self, answer: str, response_type: ResponseType = ResponseType.SIMPLE_ANSWER, metadata: Optional[Dict[str, Any]] = None ) -> FormattedResponse: """Format response according to HF evaluation requirements.""" if not answer: return self._create_empty_response(metadata or {}) original_answer = answer processing_metadata = metadata or {} # Stage 1: Basic cleanup formatted_answer = self._basic_cleanup(answer) # Stage 2: Handle markdown (if configured) if self.config.remove_markdown: formatted_answer = self._remove_markdown(formatted_answer) # Stage 3: Remove forbidden prefixes (after markdown removal) formatted_answer = self._remove_forbidden_prefixes(formatted_answer) # Stage 4: Response type specific formatting formatted_answer = self._type_specific_formatting(formatted_answer, response_type) # Stage 5: Final cleanup and validation formatted_answer = self._final_cleanup(formatted_answer) # Stage 6: Validate formatted response validation = self._validate_response(formatted_answer, response_type) return FormattedResponse( answer=formatted_answer, original_answer=original_answer, response_type=response_type, format_config=self.config, validation=validation, processing_metadata=processing_metadata ) def _basic_cleanup(self, answer: str) -> str: """Perform basic cleanup operations.""" if not answer: return "" # Strip whitespace if self.config.strip_whitespace: answer = answer.strip() # Normalize spaces if self.config.normalize_spaces: answer = re.sub(r'\s+', ' ', answer) return answer def _remove_forbidden_prefixes(self, answer: str) -> str: """Remove HF evaluation forbidden prefixes with case-insensitive matching.""" if not self.config.remove_prefixes: return answer # Define forbidden prefixes with all case variations forbidden_prefixes = [] base_prefixes = ["FINAL ANSWER", "ANSWER", "RESULT", "CONCLUSION"] for prefix in base_prefixes: for suffix in [":", ""]: # Add all case variations forbidden_prefixes.extend([ f"{prefix.upper()}{suffix}", f"{prefix.lower()}{suffix}", f"{prefix.title()}{suffix}", f"{prefix.capitalize()}{suffix}" ]) # Case-insensitive prefix removal answer_lower = answer.lower() for prefix in forbidden_prefixes: prefix_lower = prefix.lower() if answer_lower.startswith(prefix_lower): answer = answer[len(prefix):].strip() logger.debug(f"Removed forbidden prefix: {prefix}") break # Also check for common remaining patterns with case-insensitive regex additional_patterns = [ r'^Answer:\s*', r'^Result:\s*', r'^Solution:\s*', r'^Response:\s*', r'^Final\s*Answer:\s*', r'^Conclusion:\s*', ] for pattern in additional_patterns: if re.match(pattern, answer, re.IGNORECASE): answer = re.sub(pattern, '', answer, flags=re.IGNORECASE).strip() logger.debug(f"Removed pattern: {pattern}") break return answer def _remove_markdown(self, answer: str) -> str: """Remove markdown formatting elements.""" for pattern_info in self.MARKDOWN_PATTERNS: if len(pattern_info) == 3: pattern, replacement, flags = pattern_info answer = re.sub(pattern, replacement, answer, flags=flags) else: pattern, replacement = pattern_info answer = re.sub(pattern, replacement, answer) # Clean up multiple whitespace and empty lines answer = re.sub(r'\n\s*\n', '\n', answer) # Remove empty lines answer = re.sub(r'\s+', ' ', answer) # Normalize spaces return answer.strip() def _type_specific_formatting(self, answer: str, response_type: ResponseType) -> str: """Apply formatting specific to response type.""" if response_type == ResponseType.CALCULATION: return self._format_calculation(answer) elif response_type == ResponseType.MULTI_STEP: return self._format_multi_step(answer) elif response_type == ResponseType.EXPLANATION: return self._format_explanation(answer) elif response_type == ResponseType.ERROR: return self._format_error(answer) elif response_type == ResponseType.TIMEOUT: return self._format_timeout(answer) else: return self._format_simple_answer(answer) def _format_calculation(self, answer: str) -> str: """Format calculation responses.""" # For mathematical answers, ensure clean presentation # Extract final numerical answer if present number_match = re.search(r'\b(\d+(?:\.\d+)?)\s*(?:degrees?|°|[CF]|meters?|feet|%)?$', answer) if number_match and len(answer.split()) > 3: # If there's a clear final number and the answer is verbose, # consider extracting just the number for simple calculations pass return answer def _format_multi_step(self, answer: str) -> str: """Format multi-step explanations.""" # Ensure logical flow for multi-step answers return answer def _format_explanation(self, answer: str) -> str: """Format explanation responses.""" # Ensure explanations are clear and concise return answer def _format_error(self, answer: str) -> str: """Format error responses.""" # Ensure error messages are user-friendly if not answer.startswith("I apologize") and not answer.startswith("I'm sorry"): answer = f"I apologize, but {answer.lower()}" return answer def _format_timeout(self, answer: str) -> str: """Format timeout responses.""" # Ensure timeout messages are clear return answer def _format_simple_answer(self, answer: str) -> str: """Format simple answer responses.""" # For simple answers, keep concise and direct return answer def _final_cleanup(self, answer: str) -> str: """Perform final cleanup operations.""" # Length constraints if len(answer) > self.config.max_length: answer = answer[:self.config.max_length-3] + "..." logger.warning(f"Answer truncated to {self.config.max_length} characters") # Ensure minimum length if len(answer.strip()) < self.config.min_length: logger.warning("Answer below minimum length") # Ensure period if configured if self.config.ensure_period and answer and not answer.endswith(('.', '!', '?')): answer += '.' return answer.strip() def _validate_response(self, answer: str, response_type: ResponseType) -> ValidationResult: """ Validate formatted response for quality and format compliance. Args: answer: Formatted answer to validate response_type: Type of response being validated Returns: ValidationResult with scores and feedback """ issues = [] suggestions = [] # Format validation format_score = self._calculate_format_score(answer, issues, suggestions) # Quality validation quality_score = self._calculate_quality_score(answer, response_type, issues, suggestions) # Overall validation is_valid = ( format_score >= 0.7 and quality_score >= 0.5 and len(answer.strip()) >= self.config.min_length ) metadata = { 'answer_length': len(answer), 'word_count': len(answer.split()), 'has_numbers': bool(re.search(r'\d', answer)), 'response_type': response_type.value, 'format_standard': self.config.format_standard.value } return ValidationResult( is_valid=is_valid, quality_score=quality_score, format_score=format_score, issues=issues, suggestions=suggestions, metadata=metadata ) def _calculate_format_score(self, answer: str, issues: List[str], suggestions: List[str]) -> float: """Calculate format compliance score.""" score = 1.0 # Check for forbidden prefixes (should be rare after formatting) # Only penalize if prefixes somehow remain after formatting for prefix in self.FORBIDDEN_PREFIXES: if answer.startswith(prefix): score -= 0.3 # Reduced penalty since this indicates formatting failure issues.append(f"Formatting failed to remove prefix: {prefix}") suggestions.append(f"Check prefix removal logic") # Check length constraints if len(answer) > self.config.max_length: score -= 0.2 issues.append("Answer exceeds maximum length") suggestions.append("Shorten the response") if len(answer.strip()) < self.config.min_length: score -= 0.3 issues.append("Answer below minimum length") suggestions.append("Provide a more detailed response") # Check for markdown artifacts (if removal is enabled) if self.config.remove_markdown: markdown_artifacts = ['**', '__', '```', '##', '###'] for artifact in markdown_artifacts: if artifact in answer: score -= 0.1 issues.append(f"Contains markdown artifact: {artifact}") return max(0.0, score) def _calculate_quality_score( self, answer: str, response_type: ResponseType, issues: List[str], suggestions: List[str] ) -> float: """Calculate response quality score.""" score = 0.5 # Base score # Check for quality indicators for indicator, pattern in self.QUALITY_INDICATORS.items(): if re.search(pattern, answer, re.IGNORECASE): score += 0.1 # Response type specific scoring if response_type == ResponseType.CALCULATION: score += self._score_calculation_quality(answer, issues, suggestions) elif response_type == ResponseType.EXPLANATION: score += self._score_explanation_quality(answer, issues, suggestions) # General quality checks if len(answer.split()) > 1: score += 0.1 # Multi-word answers generally better if not answer.lower().startswith(('i don\'t know', 'i\'m not sure')): score += 0.1 # Confident answers return min(1.0, score) def _score_calculation_quality(self, answer: str, issues: List[str], suggestions: List[str]) -> float: """Score calculation-specific quality.""" score = 0.0 # Check for numerical result if re.search(r'\b\d+(?:\.\d+)?\b', answer): score += 0.2 # Check for units when appropriate if re.search(r'\b(?:degrees?|°|meters?|feet|%)\b', answer): score += 0.1 # Check for calculation steps if re.search(r'[+\-*/=]', answer): score += 0.1 return score def _score_explanation_quality(self, answer: str, issues: List[str], suggestions: List[str]) -> float: """Score explanation-specific quality.""" score = 0.0 # Check for logical connectors connectors = ['because', 'since', 'therefore', 'however', 'furthermore'] for connector in connectors: if connector in answer.lower(): score += 0.05 # Check for structure if len(answer.split('.')) > 1: score += 0.1 # Multi-sentence explanations return score def _create_empty_response(self, metadata: Dict[str, Any]) -> FormattedResponse: """Create response for empty input.""" validation = ValidationResult( is_valid=False, quality_score=0.0, format_score=0.0, issues=["Empty or null input"], suggestions=["Provide a valid answer"], metadata=metadata ) return FormattedResponse( answer="", original_answer="", response_type=ResponseType.ERROR, format_config=self.config, validation=validation, processing_metadata=metadata ) def validate_hf_compliance(self, answer: str) -> Tuple[bool, List[str]]: """ Quick validation for HF evaluation compliance with improved accuracy. Args: answer: Answer to validate (should be pre-formatted) Returns: Tuple of (is_compliant, issues_list) """ issues = [] # Check forbidden prefixes with case-insensitive matching # Use the same logic as the removal function for consistency forbidden_patterns = [ r'^FINAL\s*ANSWER\s*:?\s*', r'^ANSWER\s*:?\s*', r'^RESULT\s*:?\s*', r'^CONCLUSION\s*:?\s*', r'^SOLUTION\s*:?\s*', r'^RESPONSE\s*:?\s*', ] for pattern in forbidden_patterns: if re.match(pattern, answer, re.IGNORECASE): issues.append(f"Contains forbidden prefix pattern: {pattern}") break # Only report first match to avoid duplicates # Check basic requirements if not answer.strip(): issues.append("Empty answer") if len(answer) > 2000: issues.append("Answer too long") # Check for obvious formatting artifacts (be more lenient) # Only flag if these appear at the very start and aren't part of content if re.match(r'^(\*\*|__|```)', answer): # Check if it's actually formatting or just content that starts with these if not re.match(r'^(\*\*|__|```).*(\*\*|__|```).*$', answer): issues.append("Contains markdown formatting artifacts") # Additional check for common false positives # Don't flag answers that are just numbers or simple responses if answer.strip().isdigit() or len(answer.strip()) < 10: # These are likely correct simple answers, remove any false positive issues issues = [issue for issue in issues if "formatting artifacts" not in issue] return len(issues) == 0, issues def format_for_hf_evaluation(answer: str) -> str: """ Quick format function for HF evaluation compliance. Args: answer: Raw answer to format Returns: Formatted answer ready for HF evaluation """ formatter = ResponseFormatter() formatted = formatter.format_response(answer) return formatted.answer def validate_answer_format(answer: str) -> Tuple[bool, List[str], float]: """ Quick validation function for answer format. Args: answer: Answer to validate (raw, unformatted) Returns: Tuple of (is_valid, issues_list, quality_score) """ formatter = ResponseFormatter() # Validate the raw answer first (before formatting) is_compliant, compliance_issues = formatter.validate_hf_compliance(answer) # Then format and get full validation formatted = formatter.format_response(answer) # Combine compliance issues with formatting validation all_issues = compliance_issues + formatted.validation.issues is_valid = is_compliant and formatted.validation.is_valid return ( is_valid, all_issues, formatted.validation.quality_score ) class BasicAgentFormatter: """Specialized formatter for BasicAgent integration.""" def __init__(self): """Initialize with HF evaluation optimized config.""" self.formatter = ResponseFormatter(FormatConfig( remove_markdown=True, remove_prefixes=True, strip_whitespace=True, normalize_spaces=True, format_standard=FormatStandard.HF_EVALUATION )) def format_agent_response( self, question: str, answer: str, response_type: ResponseType = ResponseType.SIMPLE_ANSWER, metadata: Optional[Dict[str, Any]] = None ) -> str: """ Format agent response for HF evaluation. Args: question: Original question (for context) answer: Agent's raw answer response_type: Type of response metadata: Additional metadata Returns: Formatted answer ready for submission """ processing_metadata = metadata or {} processing_metadata.update({ 'question': question, 'agent_type': 'BasicAgent', 'processing_timestamp': None # Could add timestamp if needed }) formatted = self.formatter.format_response( answer, response_type, processing_metadata ) return formatted.answer