# gaia-enhanced-agent/utils/response_formatter.py
# GAIA Agent Deployment — Complete Enhanced GAIA Agent (Phase 1-6 improvements)
# Source snapshot: commit 9a6a4dc
"""
Response Formatter Utility for Hugging Face BasicAgent.
This module centralizes answer format handling and validation to ensure
all responses meet HF evaluation requirements. Extracted from BasicAgent
to provide clean separation of concerns.
Key Features:
- HF evaluation format compliance (no "FINAL ANSWER:" prefix)
- Response quality validation and scoring
- Format consistency checks
- Clean answer processing and sanitization
- Configurable formatting options
- Comprehensive validation functions
Author: Phase 2A Step 4 Implementation
"""
import re
import logging
from typing import Dict, Any, Optional, List, Tuple, Union
from dataclasses import dataclass
from enum import Enum
# Module-level logger, named after this module per standard logging convention.
logger = logging.getLogger(__name__)
class ResponseType(Enum):
    """Types of responses for different formatting needs."""
    SIMPLE_ANSWER = "simple_answer"  # short, direct factual answer (default)
    CALCULATION = "calculation"      # numeric / mathematical result
    MULTI_STEP = "multi_step"        # answer assembled from several steps
    EXPLANATION = "explanation"      # reasoning-heavy narrative answer
    ERROR = "error"                  # user-facing failure message
    TIMEOUT = "timeout"              # processing-deadline message
class FormatStandard(Enum):
    """Format standards for response validation."""
    HF_EVALUATION = "hf_evaluation"  # Hugging Face evaluation format
    GAIA_STANDARD = "gaia_standard"  # GAIA benchmark format
    GENERAL = "general"              # General purpose format
@dataclass
class FormatConfig:
    """Configuration for response formatting."""
    max_length: int = 2000            # hard cap on answer length in characters
    min_length: int = 1               # minimum acceptable answer length
    remove_markdown: bool = True      # strip markdown (code fences, bold, links, ...)
    remove_prefixes: bool = True      # strip forbidden prefixes ("FINAL ANSWER:", ...)
    strip_whitespace: bool = True     # trim leading/trailing whitespace
    normalize_spaces: bool = True     # collapse whitespace runs to single spaces
    ensure_period: bool = False       # append '.' when answer lacks end punctuation
    format_standard: FormatStandard = FormatStandard.HF_EVALUATION  # target standard
@dataclass
class ValidationResult:
    """Result of response validation."""
    is_valid: bool                # overall pass/fail (format + quality + length)
    quality_score: float          # 0.0 to 1.0
    format_score: float           # 0.0 to 1.0
    issues: List[str]             # problems found during validation
    suggestions: List[str]        # remediation hints paired with the issues
    metadata: Dict[str, Any]      # measurements (length, word count, ...)
@dataclass
class FormattedResponse:
    """Container for formatted response with metadata."""
    answer: str                          # formatted answer, ready for submission
    original_answer: str                 # raw answer before any processing
    response_type: ResponseType          # type used for type-specific formatting
    format_config: FormatConfig          # configuration the formatter ran with
    validation: ValidationResult         # format/quality validation outcome
    processing_metadata: Dict[str, Any]  # caller-supplied context passed through
class ResponseFormatter:
    """
    Central response formatter for HF evaluation compliance.

    Handles all answer formatting, validation, and quality assessment to
    ensure responses meet Hugging Face evaluation requirements: no
    "FINAL ANSWER:"-style prefixes, no markdown artifacts, and sane
    length bounds.
    """

    # HF evaluation forbidden prefixes (case variations).  Kept as a public,
    # byte-compatible constant for external callers; internal matching goes
    # through _PREFIX_RE below, which is equivalent but boundary-safe.
    FORBIDDEN_PREFIXES = [
        f"{case_variant}{suffix}"
        for prefix in ["FINAL ANSWER", "ANSWER", "RESULT", "CONCLUSION"]
        for suffix in [":", ""]
        for case_variant in [prefix.upper(), prefix.lower(), prefix.title()]
    ]

    # Compiled once at class creation.  The \b keeps a bare prefix from
    # eating the start of a legitimate word: the previous startswith() check
    # turned "Answers vary" into "s vary".  The longest alternative comes
    # first so "Final Answer: 42" is stripped in full, not as "ANSWER".
    _PREFIX_RE = re.compile(
        r'^(?:FINAL\s*ANSWER|ANSWER|RESULT|CONCLUSION)\b\s*:?\s*',
        re.IGNORECASE,
    )

    # Markdown removal patterns (enhanced for complete cleanup).
    # Entries are (pattern, replacement) or (pattern, replacement, flags).
    MARKDOWN_PATTERNS = [
        # Code blocks (various formats) - more comprehensive
        (r'```[\s\S]*?```', ''),                  # Fenced code blocks (multiline)
        (r'~~~[\s\S]*?~~~', ''),                  # Alternative fenced blocks
        (r'```[^`\n]*```', ''),                   # Single-line fenced blocks
        (r'```[^`]*?```', ''),                    # Any fenced blocks
        (r'`{3,}[\s\S]*?`{3,}', ''),              # Multiple backticks
        (r'`([^`\n]+)`', r'\1'),                  # Inline code (preserve content)
        (r'`([^`]*)`', r'\1'),                    # Any inline code
        # Bold/Italic formatting
        (r'\*\*(.*?)\*\*', r'\1'),                # Bold **text**
        (r'\*(.*?)\*', r'\1'),                    # Italic *text*
        (r'__(.*?)__', r'\1'),                    # Bold __text__
        (r'_(.*?)_', r'\1'),                      # Italic _text_
        # Headers
        (r'#{1,6}\s*(.+)', r'\1'),                # Headers with content
        (r'#{1,6}\s*', ''),                       # Empty headers
        # Links and references
        (r'\[([^\]]+)\]\([^)]+\)', r'\1'),        # Links [text](url)
        (r'\[([^\]]+)\]\[[^\]]*\]', r'\1'),       # Reference links
        # Lists and other formatting
        (r'^\s*[-*+]\s+', '', re.MULTILINE),      # Unordered lists
        (r'^\s*\d+\.\s+', '', re.MULTILINE),      # Ordered lists
        (r'^\s*>\s+', '', re.MULTILINE),          # Blockquotes
        (r'^\s*\|.*\|\s*$', '', re.MULTILINE),    # Table rows
        (r'^\s*[-:|\s]+\s*$', '', re.MULTILINE),  # Table separators
    ]

    # Quality assessment patterns: presence of each bumps the quality score.
    QUALITY_INDICATORS = {
        'numbers': r'\b\d+(?:\.\d+)?\b',
        'units': r'\b(?:meters?|feet|inches?|cm|mm|kg|lbs?|celsius|fahrenheit|°[CF])\b',
        'calculations': r'[+\-*/=]|\bequals?\b|\bresult\b',
        'explanations': r'\b(?:because|since|therefore|however|furthermore)\b',
        'structure': r'(?:first|second|third|finally|in conclusion)',
    }

    def __init__(self, config: Optional[FormatConfig] = None):
        """Initialize the response formatter.

        Args:
            config: Formatting options; defaults to HF-evaluation settings.
        """
        self.config = config or FormatConfig()
        logger.debug(f"ResponseFormatter initialized with {self.config.format_standard.value} standard")

    def format_response(
        self,
        answer: str,
        response_type: ResponseType = ResponseType.SIMPLE_ANSWER,
        metadata: Optional[Dict[str, Any]] = None
    ) -> FormattedResponse:
        """Format a raw answer according to HF evaluation requirements.

        Runs a six-stage pipeline: basic cleanup, markdown removal,
        prefix removal, type-specific formatting, final cleanup, and
        validation of the result.

        Args:
            answer: Raw answer text (may be empty or None).
            response_type: Kind of answer; drives type-specific formatting.
            metadata: Optional caller metadata carried through unchanged.

        Returns:
            FormattedResponse bundling the cleaned answer, the original
            text, and the validation result.
        """
        if not answer:
            return self._create_empty_response(metadata or {})

        original_answer = answer
        processing_metadata = metadata or {}

        # Stage 1: Basic cleanup (whitespace handling)
        formatted_answer = self._basic_cleanup(answer)

        # Stage 2: Markdown removal (if configured)
        if self.config.remove_markdown:
            formatted_answer = self._remove_markdown(formatted_answer)

        # Stage 3: Forbidden prefixes — after markdown removal so that
        # "**Answer:** 42" is handled once the bold markers are gone.
        formatted_answer = self._remove_forbidden_prefixes(formatted_answer)

        # Stage 4: Response-type specific formatting
        formatted_answer = self._type_specific_formatting(formatted_answer, response_type)

        # Stage 5: Final cleanup (length bounds, terminal period)
        formatted_answer = self._final_cleanup(formatted_answer)

        # Stage 6: Validate the formatted result
        validation = self._validate_response(formatted_answer, response_type)

        return FormattedResponse(
            answer=formatted_answer,
            original_answer=original_answer,
            response_type=response_type,
            format_config=self.config,
            validation=validation,
            processing_metadata=processing_metadata
        )

    def _basic_cleanup(self, answer: str) -> str:
        """Strip and normalize whitespace per the active config."""
        if not answer:
            return ""
        if self.config.strip_whitespace:
            answer = answer.strip()
        if self.config.normalize_spaces:
            # NOTE: this collapses newlines before markdown removal runs, so
            # the MULTILINE list/table patterns mainly apply when
            # normalize_spaces is disabled.
            answer = re.sub(r'\s+', ' ', answer)
        return answer

    def _remove_forbidden_prefixes(self, answer: str) -> str:
        """Remove HF-forbidden leading prefixes, case-insensitively.

        A prefix is only stripped when it ends at a word boundary, so an
        answer that merely begins with a word like "Answers" or "Results"
        is left untouched.  Stripping repeats so stacked prefixes
        ("FINAL ANSWER: Answer: 42") are fully removed.
        """
        if not self.config.remove_prefixes:
            return answer

        match = self._PREFIX_RE.match(answer)
        while match:
            answer = answer[match.end():].strip()
            logger.debug(f"Removed forbidden prefix: {match.group(0).strip()!r}")
            match = self._PREFIX_RE.match(answer)

        # Secondary prefixes not covered by _PREFIX_RE; these require a
        # colon, so plain sentences starting with these words are kept.
        for pattern in (r'^Solution\s*:\s*', r'^Response\s*:\s*'):
            stripped = re.sub(pattern, '', answer, flags=re.IGNORECASE)
            if stripped != answer:
                answer = stripped.strip()
                logger.debug(f"Removed pattern: {pattern}")
                break
        return answer

    def _remove_markdown(self, answer: str) -> str:
        """Strip markdown syntax while preserving the underlying text."""
        for pattern_info in self.MARKDOWN_PATTERNS:
            if len(pattern_info) == 3:
                pattern, replacement, flags = pattern_info
                answer = re.sub(pattern, replacement, answer, flags=flags)
            else:
                pattern, replacement = pattern_info
                answer = re.sub(pattern, replacement, answer)
        # Collapse the whitespace left behind by removed elements.
        answer = re.sub(r'\n\s*\n', '\n', answer)  # drop empty lines
        answer = re.sub(r'\s+', ' ', answer)       # normalize spaces
        return answer.strip()

    def _type_specific_formatting(self, answer: str, response_type: ResponseType) -> str:
        """Dispatch the answer to the handler for its response type."""
        handlers = {
            ResponseType.CALCULATION: self._format_calculation,
            ResponseType.MULTI_STEP: self._format_multi_step,
            ResponseType.EXPLANATION: self._format_explanation,
            ResponseType.ERROR: self._format_error,
            ResponseType.TIMEOUT: self._format_timeout,
        }
        handler = handlers.get(response_type, self._format_simple_answer)
        return handler(answer)

    def _format_calculation(self, answer: str) -> str:
        """Format calculation responses (currently a pass-through).

        A future refinement could extract just the trailing number from a
        verbose calculation; for now the answer is returned unchanged.
        """
        return answer

    def _format_multi_step(self, answer: str) -> str:
        """Format multi-step explanations (pass-through)."""
        return answer

    def _format_explanation(self, answer: str) -> str:
        """Format explanation responses (pass-through)."""
        return answer

    def _format_error(self, answer: str) -> str:
        """Prefix error text with an apology when one is not already present.

        The remainder of the message is lower-cased so it reads as a
        continuation of "I apologize, but ...".
        """
        if not answer.startswith("I apologize") and not answer.startswith("I'm sorry"):
            answer = f"I apologize, but {answer.lower()}"
        return answer

    def _format_timeout(self, answer: str) -> str:
        """Format timeout responses (pass-through)."""
        return answer

    def _format_simple_answer(self, answer: str) -> str:
        """Format simple answers (pass-through; keep concise and direct)."""
        return answer

    def _final_cleanup(self, answer: str) -> str:
        """Enforce length limits and optional terminal punctuation."""
        if len(answer) > self.config.max_length:
            # Reserve three characters for the ellipsis marker.
            answer = answer[:self.config.max_length - 3] + "..."
            logger.warning(f"Answer truncated to {self.config.max_length} characters")
        if len(answer.strip()) < self.config.min_length:
            # Too-short answers are only flagged here; validation scores
            # them down and marks them invalid.
            logger.warning("Answer below minimum length")
        if self.config.ensure_period and answer and not answer.endswith(('.', '!', '?')):
            answer += '.'
        return answer.strip()

    def _validate_response(self, answer: str, response_type: ResponseType) -> ValidationResult:
        """
        Validate formatted response for quality and format compliance.

        Args:
            answer: Formatted answer to validate
            response_type: Type of response being validated

        Returns:
            ValidationResult with scores and feedback
        """
        issues = []
        suggestions = []

        # Format and quality scores; both append to issues/suggestions.
        format_score = self._calculate_format_score(answer, issues, suggestions)
        quality_score = self._calculate_quality_score(answer, response_type, issues, suggestions)

        # Overall verdict: both scores above threshold and length acceptable.
        is_valid = (
            format_score >= 0.7 and
            quality_score >= 0.5 and
            len(answer.strip()) >= self.config.min_length
        )

        metadata = {
            'answer_length': len(answer),
            'word_count': len(answer.split()),
            'has_numbers': bool(re.search(r'\d', answer)),
            'response_type': response_type.value,
            'format_standard': self.config.format_standard.value
        }

        return ValidationResult(
            is_valid=is_valid,
            quality_score=quality_score,
            format_score=format_score,
            issues=issues,
            suggestions=suggestions,
            metadata=metadata
        )

    def _calculate_format_score(self, answer: str, issues: List[str], suggestions: List[str]) -> float:
        """Score format compliance in [0, 1]; appends findings in place."""
        score = 1.0

        # A surviving forbidden prefix means the formatting pipeline failed.
        # Penalize once: the previous per-variant startswith() loop double-
        # counted overlapping variants such as "ANSWER:" and "ANSWER".
        prefix_match = self._PREFIX_RE.match(answer)
        if prefix_match:
            score -= 0.3
            issues.append(f"Formatting failed to remove prefix: {prefix_match.group(0).strip()}")
            suggestions.append("Check prefix removal logic")

        # Length constraints.
        if len(answer) > self.config.max_length:
            score -= 0.2
            issues.append("Answer exceeds maximum length")
            suggestions.append("Shorten the response")
        if len(answer.strip()) < self.config.min_length:
            score -= 0.3
            issues.append("Answer below minimum length")
            suggestions.append("Provide a more detailed response")

        # Markdown artifacts that should have been removed.
        if self.config.remove_markdown:
            for artifact in ('**', '__', '```', '##', '###'):
                if artifact in answer:
                    score -= 0.1
                    issues.append(f"Contains markdown artifact: {artifact}")
        return max(0.0, score)

    def _calculate_quality_score(
        self,
        answer: str,
        response_type: ResponseType,
        issues: List[str],
        suggestions: List[str]
    ) -> float:
        """Score answer quality in [0, 1] from heuristic indicators."""
        score = 0.5  # base score for any non-empty answer

        # +0.1 for each quality indicator present (numbers, units, ...).
        for indicator, pattern in self.QUALITY_INDICATORS.items():
            if re.search(pattern, answer, re.IGNORECASE):
                score += 0.1

        # Response-type specific bonuses.
        if response_type == ResponseType.CALCULATION:
            score += self._score_calculation_quality(answer, issues, suggestions)
        elif response_type == ResponseType.EXPLANATION:
            score += self._score_explanation_quality(answer, issues, suggestions)

        # General heuristics.
        if len(answer.split()) > 1:
            score += 0.1  # multi-word answers are generally more informative
        if not answer.lower().startswith(('i don\'t know', 'i\'m not sure')):
            score += 0.1  # reward confident answers
        return min(1.0, score)

    def _score_calculation_quality(self, answer: str, issues: List[str], suggestions: List[str]) -> float:
        """Score calculation-specific quality (issues/suggestions unused,
        kept for signature symmetry with the explanation scorer)."""
        score = 0.0
        if re.search(r'\b\d+(?:\.\d+)?\b', answer):
            score += 0.2  # has a numerical result
        if re.search(r'\b(?:degrees?|°|meters?|feet|%)\b', answer):
            score += 0.1  # includes units
        if re.search(r'[+\-*/=]', answer):
            score += 0.1  # shows calculation steps
        return score

    def _score_explanation_quality(self, answer: str, issues: List[str], suggestions: List[str]) -> float:
        """Score explanation-specific quality (issues/suggestions unused,
        kept for signature symmetry with the calculation scorer)."""
        score = 0.0
        # +0.05 per logical connector found.
        for connector in ('because', 'since', 'therefore', 'however', 'furthermore'):
            if connector in answer.lower():
                score += 0.05
        if len(answer.split('.')) > 1:
            score += 0.1  # multi-sentence explanations
        return score

    def _create_empty_response(self, metadata: Dict[str, Any]) -> FormattedResponse:
        """Build the invalid FormattedResponse returned for empty input."""
        validation = ValidationResult(
            is_valid=False,
            quality_score=0.0,
            format_score=0.0,
            issues=["Empty or null input"],
            suggestions=["Provide a valid answer"],
            metadata=metadata
        )
        return FormattedResponse(
            answer="",
            original_answer="",
            response_type=ResponseType.ERROR,
            format_config=self.config,
            validation=validation,
            processing_metadata=metadata
        )

    def validate_hf_compliance(self, answer: str) -> Tuple[bool, List[str]]:
        """
        Quick validation for HF evaluation compliance.

        Args:
            answer: Answer to validate (should be pre-formatted)

        Returns:
            Tuple of (is_compliant, issues_list)
        """
        issues = []

        # Forbidden leading prefixes.  The \b avoids flagging answers that
        # merely start with one of these words ("Answers vary ...").
        forbidden_patterns = [
            r'^FINAL\s*ANSWER\b\s*:?\s*',
            r'^ANSWER\b\s*:?\s*',
            r'^RESULT\b\s*:?\s*',
            r'^CONCLUSION\b\s*:?\s*',
            r'^SOLUTION\b\s*:?\s*',
            r'^RESPONSE\b\s*:?\s*',
        ]
        for pattern in forbidden_patterns:
            if re.match(pattern, answer, re.IGNORECASE):
                issues.append(f"Contains forbidden prefix pattern: {pattern}")
                break  # report only the first match to avoid duplicates

        # Basic requirements.
        if not answer.strip():
            issues.append("Empty answer")
        if len(answer) > 2000:
            issues.append("Answer too long")

        # Leading markdown markers are only flagged when they do not appear
        # paired later in the text (stray formatting vs. actual content).
        if re.match(r'^(\*\*|__|```)', answer):
            if not re.match(r'^(\*\*|__|```).*(\*\*|__|```).*$', answer):
                issues.append("Contains markdown formatting artifacts")

        # Plain numbers and very short answers are almost always legitimate;
        # drop artifact warnings for them to avoid false positives.
        if answer.strip().isdigit() or len(answer.strip()) < 10:
            issues = [issue for issue in issues if "formatting artifacts" not in issue]
        return len(issues) == 0, issues
def format_for_hf_evaluation(answer: str) -> str:
    """
    Quick format function for HF evaluation compliance.

    Args:
        answer: Raw answer to format

    Returns:
        Formatted answer ready for HF evaluation
    """
    # One-shot convenience wrapper around a default-configured formatter.
    return ResponseFormatter().format_response(answer).answer
def validate_answer_format(answer: str) -> Tuple[bool, List[str], float]:
    """
    Quick validation function for answer format.

    Args:
        answer: Answer to validate (raw, unformatted)

    Returns:
        Tuple of (is_valid, issues_list, quality_score)
    """
    fmt = ResponseFormatter()
    # Compliance check on the raw text first, before any formatting runs.
    compliant, raw_issues = fmt.validate_hf_compliance(answer)
    # Then format the answer and collect the full validation result.
    result = fmt.format_response(answer).validation
    # Merge both issue lists; valid only if both checks pass.
    combined_issues = raw_issues + result.issues
    overall_valid = compliant and result.is_valid
    return overall_valid, combined_issues, result.quality_score
class BasicAgentFormatter:
    """Specialized formatter for BasicAgent integration."""

    def __init__(self):
        """Initialize with an HF-evaluation optimized configuration."""
        self.formatter = ResponseFormatter(FormatConfig(
            remove_markdown=True,
            remove_prefixes=True,
            strip_whitespace=True,
            normalize_spaces=True,
            format_standard=FormatStandard.HF_EVALUATION
        ))

    def format_agent_response(
        self,
        question: str,
        answer: str,
        response_type: ResponseType = ResponseType.SIMPLE_ANSWER,
        metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Format agent response for HF evaluation.

        Args:
            question: Original question (for context)
            answer: Agent's raw answer
            response_type: Type of response
            metadata: Additional metadata

        Returns:
            Formatted answer ready for submission
        """
        # Copy the caller's dict: the previous in-place update() leaked the
        # 'question'/'agent_type' keys back into the caller's metadata.
        processing_metadata = dict(metadata) if metadata else {}
        processing_metadata.update({
            'question': question,
            'agent_type': 'BasicAgent',
            'processing_timestamp': None  # Could add timestamp if needed
        })
        formatted = self.formatter.format_response(
            answer,
            response_type,
            processing_metadata
        )
        return formatted.answer