# NOTE(review): the three lines above this file's docstring were Hugging Face
# Spaces UI residue ("Spaces: Running Running") captured with the page, not code.
""" | |
Response Formatter Utility for Hugging Face BasicAgent. | |
This module centralizes answer format handling and validation to ensure | |
all responses meet HF evaluation requirements. Extracted from BasicAgent | |
to provide clean separation of concerns. | |
Key Features: | |
- HF evaluation format compliance (no "FINAL ANSWER:" prefix) | |
- Response quality validation and scoring | |
- Format consistency checks | |
- Clean answer processing and sanitization | |
- Configurable formatting options | |
- Comprehensive validation functions | |
Author: Phase 2A Step 4 Implementation | |
""" | |
import logging
import re
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Union

# Module-level logger; handlers/levels are configured by the host application.
logger = logging.getLogger(__name__)
class ResponseType(Enum):
    """Types of responses for different formatting needs.

    Drives type-specific formatting and quality scoring in
    ResponseFormatter (e.g. CALCULATION answers are scored for
    numbers/units, ERROR answers get an apology prefix).
    """

    SIMPLE_ANSWER = "simple_answer"
    CALCULATION = "calculation"
    MULTI_STEP = "multi_step"
    EXPLANATION = "explanation"
    ERROR = "error"
    TIMEOUT = "timeout"
class FormatStandard(Enum):
    """Format standards for response validation."""

    HF_EVALUATION = "hf_evaluation"  # Hugging Face evaluation format
    GAIA_STANDARD = "gaia_standard"  # GAIA benchmark format
    GENERAL = "general"              # General purpose format
@dataclass
class FormatConfig:
    """Configuration for response formatting.

    Note: the ``@dataclass`` decorator is required here — instances are
    created with keyword arguments elsewhere in this module (e.g.
    ``FormatConfig(remove_markdown=True, ...)``), which needs the
    generated ``__init__``.
    """

    max_length: int = 2000          # hard cap on formatted answer length
    min_length: int = 1             # minimum acceptable answer length
    remove_markdown: bool = True    # strip markdown formatting elements
    remove_prefixes: bool = True    # strip forbidden "ANSWER:"-style prefixes
    strip_whitespace: bool = True   # trim leading/trailing whitespace
    normalize_spaces: bool = True   # collapse whitespace runs to single spaces
    ensure_period: bool = False     # append '.' when answer lacks terminal punctuation
    format_standard: FormatStandard = FormatStandard.HF_EVALUATION
@dataclass
class ValidationResult:
    """Result of response validation.

    Note: the ``@dataclass`` decorator is required here — instances are
    created with keyword arguments in ``ResponseFormatter`` (e.g.
    ``ValidationResult(is_valid=..., ...)``).
    """

    is_valid: bool                # overall pass/fail verdict
    quality_score: float          # 0.0 to 1.0
    format_score: float           # 0.0 to 1.0
    issues: List[str]             # human-readable problems found
    suggestions: List[str]        # remediation hints paired with issues
    metadata: Dict[str, Any]      # auxiliary stats (length, word count, ...)
@dataclass
class FormattedResponse:
    """Container for formatted response with metadata.

    Note: the ``@dataclass`` decorator is required here — instances are
    created with keyword arguments in ``ResponseFormatter.format_response``.
    """

    answer: str                          # final, cleaned answer text
    original_answer: str                 # raw input before any processing
    response_type: ResponseType          # classification used for formatting
    format_config: FormatConfig          # config the formatter applied
    validation: ValidationResult         # quality/format assessment
    processing_metadata: Dict[str, Any]  # caller-supplied context
class ResponseFormatter:
    """
    Central response formatter for HF evaluation compliance.

    Handles all answer formatting, validation, and quality assessment
    to ensure responses meet Hugging Face evaluation requirements.
    """

    # HF evaluation forbidden prefixes (upper/lower/title case variants,
    # each with and without a trailing colon).
    FORBIDDEN_PREFIXES = [
        f"{case_variant}{suffix}"
        for prefix in ["FINAL ANSWER", "ANSWER", "RESULT", "CONCLUSION"]
        for suffix in [":", ""]
        for case_variant in [prefix.upper(), prefix.lower(), prefix.title()]
    ]

    # Markdown removal patterns (enhanced for complete cleanup).
    # 2-tuples are (pattern, replacement); 3-tuples add re flags.
    MARKDOWN_PATTERNS = [
        # Code blocks (various formats) - more comprehensive
        (r'```[\s\S]*?```', ''),                 # Fenced code blocks (multiline)
        (r'~~~[\s\S]*?~~~', ''),                 # Alternative fenced blocks
        (r'```[^`\n]*```', ''),                  # Single-line fenced blocks
        (r'```[^`]*?```', ''),                   # Any fenced blocks
        (r'`{3,}[\s\S]*?`{3,}', ''),             # Multiple backticks
        (r'`([^`\n]+)`', r'\1'),                 # Inline code (preserve content)
        (r'`([^`]*)`', r'\1'),                   # Any inline code
        # Bold/Italic formatting
        (r'\*\*(.*?)\*\*', r'\1'),               # Bold **text**
        (r'\*(.*?)\*', r'\1'),                   # Italic *text*
        (r'__(.*?)__', r'\1'),                   # Bold __text__
        (r'_(.*?)_', r'\1'),                     # Italic _text_
        # Headers
        (r'#{1,6}\s*(.+)', r'\1'),               # Headers with content
        (r'#{1,6}\s*', ''),                      # Empty headers
        # Links and references
        (r'\[([^\]]+)\]\([^)]+\)', r'\1'),       # Links [text](url)
        (r'\[([^\]]+)\]\[[^\]]*\]', r'\1'),      # Reference links
        # Lists and other formatting
        (r'^\s*[-*+]\s+', '', re.MULTILINE),     # Unordered lists
        (r'^\s*\d+\.\s+', '', re.MULTILINE),     # Ordered lists
        (r'^\s*>\s+', '', re.MULTILINE),         # Blockquotes
        (r'^\s*\|.*\|\s*$', '', re.MULTILINE),   # Table rows
        (r'^\s*[-:|\s]+\s*$', '', re.MULTILINE), # Table separators
    ]

    # Quality assessment patterns: presence of each bumps quality score.
    QUALITY_INDICATORS = {
        'numbers': r'\b\d+(?:\.\d+)?\b',
        'units': r'\b(?:meters?|feet|inches?|cm|mm|kg|lbs?|celsius|fahrenheit|°[CF])\b',
        'calculations': r'[+\-*/=]|\bequals?\b|\bresult\b',
        'explanations': r'\b(?:because|since|therefore|however|furthermore)\b',
        'structure': r'(?:first|second|third|finally|in conclusion)',
    }

    def __init__(self, config: Optional[FormatConfig] = None):
        """Initialize the response formatter.

        Args:
            config: Formatting options; defaults to HF-evaluation settings.
        """
        self.config = config or FormatConfig()
        logger.debug(f"ResponseFormatter initialized with {self.config.format_standard.value} standard")

    def format_response(
        self,
        answer: str,
        response_type: ResponseType = ResponseType.SIMPLE_ANSWER,
        metadata: Optional[Dict[str, Any]] = None
    ) -> FormattedResponse:
        """Format response according to HF evaluation requirements.

        Runs a fixed pipeline: basic cleanup -> markdown removal (optional)
        -> forbidden-prefix removal -> type-specific formatting -> final
        cleanup -> validation.

        Args:
            answer: Raw answer text (may be empty).
            response_type: Classification controlling type-specific steps.
            metadata: Optional caller context, stored on the result.

        Returns:
            FormattedResponse with the cleaned answer and validation data.
        """
        if not answer:
            return self._create_empty_response(metadata or {})

        original_answer = answer
        processing_metadata = metadata or {}

        # Stage 1: Basic cleanup
        formatted_answer = self._basic_cleanup(answer)

        # Stage 2: Handle markdown (if configured)
        if self.config.remove_markdown:
            formatted_answer = self._remove_markdown(formatted_answer)

        # Stage 3: Remove forbidden prefixes (after markdown removal, so
        # prefixes hidden inside bold/header markup are still caught)
        formatted_answer = self._remove_forbidden_prefixes(formatted_answer)

        # Stage 4: Response type specific formatting
        formatted_answer = self._type_specific_formatting(formatted_answer, response_type)

        # Stage 5: Final cleanup (length limits, optional period)
        formatted_answer = self._final_cleanup(formatted_answer)

        # Stage 6: Validate formatted response
        validation = self._validate_response(formatted_answer, response_type)

        return FormattedResponse(
            answer=formatted_answer,
            original_answer=original_answer,
            response_type=response_type,
            format_config=self.config,
            validation=validation,
            processing_metadata=processing_metadata
        )

    def _basic_cleanup(self, answer: str) -> str:
        """Strip surrounding whitespace and collapse internal runs of it."""
        if not answer:
            return ""
        if self.config.strip_whitespace:
            answer = answer.strip()
        if self.config.normalize_spaces:
            answer = re.sub(r'\s+', ' ', answer)
        return answer

    def _remove_forbidden_prefixes(self, answer: str) -> str:
        """Remove HF evaluation forbidden prefixes with case-insensitive matching."""
        if not self.config.remove_prefixes:
            return answer

        # Build forbidden prefixes with all case variations. "FINAL ANSWER"
        # comes before "ANSWER" so the longer prefix wins.
        forbidden_prefixes = []
        base_prefixes = ["FINAL ANSWER", "ANSWER", "RESULT", "CONCLUSION"]
        for prefix in base_prefixes:
            for suffix in [":", ""]:
                forbidden_prefixes.extend([
                    f"{prefix.upper()}{suffix}",
                    f"{prefix.lower()}{suffix}",
                    f"{prefix.title()}{suffix}",
                    f"{prefix.capitalize()}{suffix}"
                ])

        # Case-insensitive prefix removal; only the first match is stripped.
        answer_lower = answer.lower()
        for prefix in forbidden_prefixes:
            prefix_lower = prefix.lower()
            if answer_lower.startswith(prefix_lower):
                answer = answer[len(prefix):].strip()
                logger.debug(f"Removed forbidden prefix: {prefix}")
                break

        # Also check for common remaining patterns with case-insensitive regex.
        additional_patterns = [
            r'^Answer:\s*',
            r'^Result:\s*',
            r'^Solution:\s*',
            r'^Response:\s*',
            r'^Final\s*Answer:\s*',
            r'^Conclusion:\s*',
        ]
        for pattern in additional_patterns:
            if re.match(pattern, answer, re.IGNORECASE):
                answer = re.sub(pattern, '', answer, flags=re.IGNORECASE).strip()
                logger.debug(f"Removed pattern: {pattern}")
                break

        return answer

    def _remove_markdown(self, answer: str) -> str:
        """Remove markdown formatting elements via MARKDOWN_PATTERNS."""
        for pattern_info in self.MARKDOWN_PATTERNS:
            # Entries are either (pattern, repl) or (pattern, repl, flags).
            if len(pattern_info) == 3:
                pattern, replacement, flags = pattern_info
                answer = re.sub(pattern, replacement, answer, flags=flags)
            else:
                pattern, replacement = pattern_info
                answer = re.sub(pattern, replacement, answer)

        # Clean up multiple whitespace and empty lines left behind.
        answer = re.sub(r'\n\s*\n', '\n', answer)  # Remove empty lines
        answer = re.sub(r'\s+', ' ', answer)       # Normalize spaces
        return answer.strip()

    def _type_specific_formatting(self, answer: str, response_type: ResponseType) -> str:
        """Dispatch to the formatter matching the response type."""
        if response_type == ResponseType.CALCULATION:
            return self._format_calculation(answer)
        elif response_type == ResponseType.MULTI_STEP:
            return self._format_multi_step(answer)
        elif response_type == ResponseType.EXPLANATION:
            return self._format_explanation(answer)
        elif response_type == ResponseType.ERROR:
            return self._format_error(answer)
        elif response_type == ResponseType.TIMEOUT:
            return self._format_timeout(answer)
        else:
            return self._format_simple_answer(answer)

    def _format_calculation(self, answer: str) -> str:
        """Format calculation responses.

        Currently a pass-through: the trailing-number extraction below is
        detected but intentionally not applied (see the `pass`).
        """
        number_match = re.search(r'\b(\d+(?:\.\d+)?)\s*(?:degrees?|°|[CF]|meters?|feet|%)?$', answer)
        if number_match and len(answer.split()) > 3:
            # If there's a clear final number and the answer is verbose,
            # consider extracting just the number for simple calculations.
            pass
        return answer

    def _format_multi_step(self, answer: str) -> str:
        """Format multi-step explanations (currently a pass-through)."""
        return answer

    def _format_explanation(self, answer: str) -> str:
        """Format explanation responses (currently a pass-through)."""
        return answer

    def _format_error(self, answer: str) -> str:
        """Format error responses with a user-friendly apology prefix."""
        if not answer.startswith("I apologize") and not answer.startswith("I'm sorry"):
            answer = f"I apologize, but {answer.lower()}"
        return answer

    def _format_timeout(self, answer: str) -> str:
        """Format timeout responses (currently a pass-through)."""
        return answer

    def _format_simple_answer(self, answer: str) -> str:
        """Format simple answer responses (currently a pass-through)."""
        return answer

    def _final_cleanup(self, answer: str) -> str:
        """Enforce length limits and optional terminal punctuation."""
        # Truncate with an ellipsis so total length stays within max_length.
        if len(answer) > self.config.max_length:
            answer = answer[:self.config.max_length - 3] + "..."
            logger.warning(f"Answer truncated to {self.config.max_length} characters")

        # Too-short answers are only logged here; scoring penalizes them.
        if len(answer.strip()) < self.config.min_length:
            logger.warning("Answer below minimum length")

        if self.config.ensure_period and answer and not answer.endswith(('.', '!', '?')):
            answer += '.'

        return answer.strip()

    def _validate_response(self, answer: str, response_type: ResponseType) -> ValidationResult:
        """
        Validate formatted response for quality and format compliance.

        Args:
            answer: Formatted answer to validate
            response_type: Type of response being validated

        Returns:
            ValidationResult with scores and feedback
        """
        issues = []
        suggestions = []

        format_score = self._calculate_format_score(answer, issues, suggestions)
        quality_score = self._calculate_quality_score(answer, response_type, issues, suggestions)

        # Valid only when both scores clear their thresholds and the
        # answer meets the configured minimum length.
        is_valid = (
            format_score >= 0.7 and
            quality_score >= 0.5 and
            len(answer.strip()) >= self.config.min_length
        )

        metadata = {
            'answer_length': len(answer),
            'word_count': len(answer.split()),
            'has_numbers': bool(re.search(r'\d', answer)),
            'response_type': response_type.value,
            'format_standard': self.config.format_standard.value
        }

        return ValidationResult(
            is_valid=is_valid,
            quality_score=quality_score,
            format_score=format_score,
            issues=issues,
            suggestions=suggestions,
            metadata=metadata
        )

    def _calculate_format_score(self, answer: str, issues: List[str], suggestions: List[str]) -> float:
        """Calculate format compliance score (1.0 minus penalties, floored at 0)."""
        score = 1.0

        # Check for forbidden prefixes (should be rare after formatting).
        # Break after the first hit: the list contains both "PREFIX:" and
        # "PREFIX" variants, which would otherwise double-penalize one prefix.
        for prefix in self.FORBIDDEN_PREFIXES:
            if answer.startswith(prefix):
                score -= 0.3  # Reduced penalty since this indicates formatting failure
                issues.append(f"Formatting failed to remove prefix: {prefix}")
                suggestions.append(f"Check prefix removal logic")
                break

        if len(answer) > self.config.max_length:
            score -= 0.2
            issues.append("Answer exceeds maximum length")
            suggestions.append("Shorten the response")

        if len(answer.strip()) < self.config.min_length:
            score -= 0.3
            issues.append("Answer below minimum length")
            suggestions.append("Provide a more detailed response")

        # Check for markdown artifacts (if removal is enabled).
        if self.config.remove_markdown:
            markdown_artifacts = ['**', '__', '```', '##', '###']
            for artifact in markdown_artifacts:
                if artifact in answer:
                    score -= 0.1
                    issues.append(f"Contains markdown artifact: {artifact}")

        return max(0.0, score)

    def _calculate_quality_score(
        self,
        answer: str,
        response_type: ResponseType,
        issues: List[str],
        suggestions: List[str]
    ) -> float:
        """Calculate response quality score (base 0.5 plus bonuses, capped at 1.0)."""
        score = 0.5  # Base score

        # Each matched quality indicator adds a small bonus.
        for indicator, pattern in self.QUALITY_INDICATORS.items():
            if re.search(pattern, answer, re.IGNORECASE):
                score += 0.1

        # Response type specific scoring.
        if response_type == ResponseType.CALCULATION:
            score += self._score_calculation_quality(answer, issues, suggestions)
        elif response_type == ResponseType.EXPLANATION:
            score += self._score_explanation_quality(answer, issues, suggestions)

        # General quality checks.
        if len(answer.split()) > 1:
            score += 0.1  # Multi-word answers generally better
        if not answer.lower().startswith(('i don\'t know', 'i\'m not sure')):
            score += 0.1  # Confident answers

        return min(1.0, score)

    def _score_calculation_quality(self, answer: str, issues: List[str], suggestions: List[str]) -> float:
        """Score calculation-specific quality (numbers, units, operators)."""
        score = 0.0
        if re.search(r'\b\d+(?:\.\d+)?\b', answer):
            score += 0.2
        if re.search(r'\b(?:degrees?|°|meters?|feet|%)\b', answer):
            score += 0.1
        if re.search(r'[+\-*/=]', answer):
            score += 0.1
        return score

    def _score_explanation_quality(self, answer: str, issues: List[str], suggestions: List[str]) -> float:
        """Score explanation-specific quality (connectors, multi-sentence structure)."""
        score = 0.0
        connectors = ['because', 'since', 'therefore', 'however', 'furthermore']
        for connector in connectors:
            if connector in answer.lower():
                score += 0.05
        if len(answer.split('.')) > 1:
            score += 0.1  # Multi-sentence explanations
        return score

    def _create_empty_response(self, metadata: Dict[str, Any]) -> FormattedResponse:
        """Create an always-invalid response for empty/null input."""
        validation = ValidationResult(
            is_valid=False,
            quality_score=0.0,
            format_score=0.0,
            issues=["Empty or null input"],
            suggestions=["Provide a valid answer"],
            metadata=metadata
        )
        return FormattedResponse(
            answer="",
            original_answer="",
            response_type=ResponseType.ERROR,
            format_config=self.config,
            validation=validation,
            processing_metadata=metadata
        )

    def validate_hf_compliance(self, answer: str) -> Tuple[bool, List[str]]:
        """
        Quick validation for HF evaluation compliance with improved accuracy.

        Args:
            answer: Answer to validate (should be pre-formatted)

        Returns:
            Tuple of (is_compliant, issues_list)
        """
        issues = []

        # Check forbidden prefixes with case-insensitive matching.
        # Use the same logic as the removal function for consistency.
        forbidden_patterns = [
            r'^FINAL\s*ANSWER\s*:?\s*',
            r'^ANSWER\s*:?\s*',
            r'^RESULT\s*:?\s*',
            r'^CONCLUSION\s*:?\s*',
            r'^SOLUTION\s*:?\s*',
            r'^RESPONSE\s*:?\s*',
        ]
        for pattern in forbidden_patterns:
            if re.match(pattern, answer, re.IGNORECASE):
                issues.append(f"Contains forbidden prefix pattern: {pattern}")
                break  # Only report first match to avoid duplicates

        # Check basic requirements.
        if not answer.strip():
            issues.append("Empty answer")
        if len(answer) > 2000:
            issues.append("Answer too long")

        # Check for obvious formatting artifacts (be more lenient):
        # only flag markers at the very start that don't look like paired
        # markup wrapping actual content.
        if re.match(r'^(\*\*|__|```)', answer):
            if not re.match(r'^(\*\*|__|```).*(\*\*|__|```).*$', answer):
                issues.append("Contains markdown formatting artifacts")

        # Additional check for common false positives: don't flag answers
        # that are just numbers or very short simple responses.
        if answer.strip().isdigit() or len(answer.strip()) < 10:
            issues = [issue for issue in issues if "formatting artifacts" not in issue]

        return len(issues) == 0, issues
def format_for_hf_evaluation(answer: str) -> str:
    """
    Quick format function for HF evaluation compliance.

    Convenience wrapper that runs the default ResponseFormatter pipeline
    and returns only the cleaned answer text.

    Args:
        answer: Raw answer to format

    Returns:
        Formatted answer ready for HF evaluation
    """
    formatter = ResponseFormatter()
    formatted = formatter.format_response(answer)
    return formatted.answer
def validate_answer_format(answer: str) -> Tuple[bool, List[str], float]:
    """
    Quick validation function for answer format.

    Checks HF compliance on the RAW answer first, then formats it and
    merges both sets of issues; validity requires passing both checks.

    Args:
        answer: Answer to validate (raw, unformatted)

    Returns:
        Tuple of (is_valid, issues_list, quality_score)
    """
    formatter = ResponseFormatter()

    # Validate the raw answer first (before formatting).
    is_compliant, compliance_issues = formatter.validate_hf_compliance(answer)

    # Then format and get full validation.
    formatted = formatter.format_response(answer)

    # Combine compliance issues with formatting validation.
    all_issues = compliance_issues + formatted.validation.issues
    is_valid = is_compliant and formatted.validation.is_valid

    return (
        is_valid,
        all_issues,
        formatted.validation.quality_score
    )
class BasicAgentFormatter:
    """Specialized formatter for BasicAgent integration."""

    def __init__(self):
        """Initialize with HF evaluation optimized config."""
        self.formatter = ResponseFormatter(FormatConfig(
            remove_markdown=True,
            remove_prefixes=True,
            strip_whitespace=True,
            normalize_spaces=True,
            format_standard=FormatStandard.HF_EVALUATION
        ))

    def format_agent_response(
        self,
        question: str,
        answer: str,
        response_type: ResponseType = ResponseType.SIMPLE_ANSWER,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Format agent response for HF evaluation.

        Args:
            question: Original question (for context)
            answer: Agent's raw answer
            response_type: Type of response
            metadata: Additional metadata

        Returns:
            Formatted answer ready for submission
        """
        # Copy the caller's metadata so the update below does not mutate
        # the dict the caller passed in.
        processing_metadata = dict(metadata) if metadata else {}
        processing_metadata.update({
            'question': question,
            'agent_type': 'BasicAgent',
            'processing_timestamp': None  # Could add timestamp if needed
        })

        formatted = self.formatter.format_response(
            answer,
            response_type,
            processing_metadata
        )
        return formatted.answer