# models/cross_validation.py
"""Cross-validation of property listing data.

Checks a property record for internal consistency (rooms vs. description,
city vs. state, coordinates vs. India's bounds, price vs. area), scans the
free-text description for fraud-indicative phrasing, and performs a basic
format-level verification of attached documents (PDF) and images (JPG/PNG).

The public entry point is :func:`perform_cross_validation`, which returns a
flat list of check dicts (each with ``check``/``status``/``message``/
``severity``/``recommendation``/``category`` keys).
"""
import os
import re
from datetime import datetime
from typing import Dict, Any, List, Union

from .logging_config import logger
from .model_loader import load_model


def safe_int_convert(value: Any) -> int:
    """Safely convert a value to integer.

    Strips currency symbols (₹), commas and whitespace from strings before
    converting. Returns 0 for falsy input or on any conversion failure.
    """
    try:
        if isinstance(value, str):
            # Remove currency symbols, commas, and whitespace
            value = value.replace('₹', '').replace(',', '').strip()
        # Go through float() so strings like "3.0" still convert to int.
        return int(float(value)) if value else 0
    except (ValueError, TypeError):
        return 0


def safe_float_convert(value: Any) -> float:
    """Safely convert a value to float.

    Strips currency symbols (₹), commas and whitespace from strings before
    converting. Returns 0.0 for falsy input or on any conversion failure.
    """
    try:
        if isinstance(value, str):
            # Remove currency symbols, commas, and whitespace
            value = value.replace('₹', '').replace(',', '').strip()
        return float(value) if value else 0.0
    except (ValueError, TypeError):
        return 0.0


def extract_numbers_from_text(text: str) -> List[int]:
    """Extract all standalone integers from *text* using a word-boundary regex."""
    if not text:
        return []
    return [int(num) for num in re.findall(r'\b\d+\b', text)]


def find_room_mentions(text: str) -> Dict[str, List[int]]:
    """Find mentions of rooms, bedrooms, bathrooms in *text*.

    Returns a dict mapping ``'bedroom'``/``'bathroom'``/``'room'`` to the list
    of counts mentioned; keys are present only when at least one match exists.
    """
    if not text:
        return {}
    patterns = {
        'bedroom': r'(\d+)\s*(?:bedroom|bed|BHK|bhk)',
        'bathroom': r'(\d+)\s*(?:bathroom|bath|washroom)',
        'room': r'(\d+)\s*(?:room|rooms)'
    }
    results = {}
    for key, pattern in patterns.items():
        matches = re.findall(pattern, text.lower())
        if matches:
            results[key] = [int(match) for match in matches]
    return results


def analyze_property_description(description: str, property_data: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze a property description for consistency with the listing data.

    Compares room counts mentioned in the text with the stated counts, checks
    that the stated property type appears in the description, and scans for
    fraud-indicative phrases.

    Returns a dict with ``room_mentions``, ``property_type_mentions``,
    ``amenity_mentions``, ``inconsistencies`` and ``suspicious_patterns``.
    """
    if not description:
        return {
            'room_mentions': {},
            'property_type_mentions': [],
            'amenity_mentions': [],
            'inconsistencies': [],
            'suspicious_patterns': []
        }

    analysis = {
        'room_mentions': find_room_mentions(description),
        'property_type_mentions': [],
        'amenity_mentions': [],
        'inconsistencies': [],
        'suspicious_patterns': []
    }

    # Check room number consistency
    if 'bedroom' in analysis['room_mentions']:
        stated_bedrooms = safe_int_convert(property_data.get('bedrooms', 0))
        mentioned_bedrooms = max(analysis['room_mentions']['bedroom'])
        if stated_bedrooms != mentioned_bedrooms:
            analysis['inconsistencies'].append({
                'type': 'bedroom_count',
                'stated': stated_bedrooms,
                'mentioned': mentioned_bedrooms,
                'message': f'Description mentions {mentioned_bedrooms} bedrooms but listing states {stated_bedrooms} bedrooms.'
            })

    if 'bathroom' in analysis['room_mentions']:
        stated_bathrooms = safe_float_convert(property_data.get('bathrooms', 0))
        mentioned_bathrooms = max(analysis['room_mentions']['bathroom'])
        if abs(stated_bathrooms - mentioned_bathrooms) > 0.5:  # Allow for half bathrooms
            analysis['inconsistencies'].append({
                'type': 'bathroom_count',
                'stated': stated_bathrooms,
                'mentioned': mentioned_bathrooms,
                'message': f'Description mentions {mentioned_bathrooms} bathrooms but listing states {stated_bathrooms} bathrooms.'
            })

    # Check property type consistency.
    # BUGFIX: coerce to str before .lower() — a non-string value (e.g. None
    # stored under the key) previously raised AttributeError here.
    property_type = str(property_data.get('property_type') or '').lower()
    if property_type and property_type not in description.lower():
        analysis['inconsistencies'].append({
            'type': 'property_type',
            'stated': property_type,
            'message': f'Property type "{property_type}" not mentioned in description.'
        })

    # Check for suspicious patterns
    suspicious_patterns = [
        (r'too good to be true', 'Unrealistic claims'),
        (r'guaranteed.*return', 'Suspicious return promises'),
        (r'no.*verification', 'Avoiding verification'),
        (r'urgent.*sale', 'Pressure tactics'),
        (r'below.*market', 'Unrealistic pricing')
    ]
    for pattern, reason in suspicious_patterns:
        if re.search(pattern, description.lower()):
            # BUGFIX: include a 'type' key (slugified reason) — downstream
            # perform_cross_validation reads suspicious["type"] and previously
            # hit a KeyError whenever a suspicious pattern matched.
            analysis['suspicious_patterns'].append({
                'type': reason.lower().replace(' ', '_'),
                'pattern': pattern,
                'reason': reason,
                'message': f'Suspicious pattern detected: {reason}'
            })

    return analysis


def analyze_location_consistency(data: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze location data for consistency and validity.

    Checks known city/state pairings, the 6-digit Indian zip format, and that
    coordinates fall within India's approximate bounding box.
    """
    analysis = {
        'inconsistencies': [],
        'suspicious_patterns': []
    }

    # Check city-state consistency.
    # BUGFIX: coerce to str before .lower() — non-string values stored under
    # 'city'/'state' previously raised AttributeError.
    city = str(data.get('city') or '').lower()
    state = str(data.get('state') or '').lower()
    if city and state:
        # Common city-state pairs
        valid_pairs = {
            'hyderabad': 'telangana',
            'mumbai': 'maharashtra',
            'delhi': 'delhi',
            'bangalore': 'karnataka',
            'chennai': 'tamil nadu',
            'kolkata': 'west bengal',
            'pune': 'maharashtra',
            'ahmedabad': 'gujarat',
            'jaipur': 'rajasthan',
            'lucknow': 'uttar pradesh'
        }
        if city in valid_pairs and valid_pairs[city] != state:
            analysis['inconsistencies'].append({
                'type': 'city_state_mismatch',
                'city': city,
                'state': state,
                'message': f'City {city} is typically in {valid_pairs[city]}, not {state}'
            })

    # Check zip code format (Indian PIN codes are 6 digits)
    zip_code = str(data.get('zip', '')).strip()
    if zip_code:
        if not re.match(r'^\d{6}$', zip_code):
            analysis['inconsistencies'].append({
                'type': 'invalid_zip',
                'zip': zip_code,
                'message': 'Invalid zip code format. Should be 6 digits.'
            })

    # Check coordinates
    try:
        lat = safe_float_convert(data.get('latitude', 0))
        lng = safe_float_convert(data.get('longitude', 0))
        # India's approximate boundaries
        india_bounds = {
            'lat_min': 6.0, 'lat_max': 38.0,
            'lng_min': 67.0, 'lng_max': 98.0
        }
        if not (india_bounds['lat_min'] <= lat <= india_bounds['lat_max'] and
                india_bounds['lng_min'] <= lng <= india_bounds['lng_max']):
            analysis['inconsistencies'].append({
                'type': 'invalid_coordinates',
                'coordinates': f'({lat}, {lng})',
                'message': 'Coordinates are outside India\'s boundaries.'
            })
    except (ValueError, TypeError):
        # Defensive: safe_float_convert should not raise, kept as a safety net.
        analysis['inconsistencies'].append({
            'type': 'invalid_coordinates',
            'message': 'Invalid coordinate format.'
        })

    return analysis


def analyze_property_specifications(data: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze property specifications for consistency and reasonableness.

    Flags room-count mismatches, unusual square footage per bedroom, a future
    or very old build year, and price-per-square-foot outliers (INR).
    """
    analysis = {
        'inconsistencies': [],
        'suspicious_values': []
    }

    # Check room count consistency
    bedrooms = safe_int_convert(data.get('bedrooms', 0))
    bathrooms = safe_float_convert(data.get('bathrooms', 0))
    total_rooms = safe_int_convert(data.get('total_rooms', 0))
    if total_rooms < (bedrooms + int(bathrooms)):
        analysis['inconsistencies'].append({
            'type': 'room_count_mismatch',
            'total_rooms': total_rooms,
            'bedrooms': bedrooms,
            'bathrooms': bathrooms,
            'message': f'Total rooms ({total_rooms}) is less than sum of bedrooms and bathrooms ({bedrooms + int(bathrooms)})'
        })

    # Check square footage reasonableness
    sq_ft = safe_float_convert(data.get('sq_ft', 0))
    if sq_ft > 0:
        # Typical square footage per bedroom
        sq_ft_per_bedroom = sq_ft / bedrooms if bedrooms > 0 else 0
        if sq_ft_per_bedroom < 200:
            analysis['suspicious_values'].append({
                'type': 'small_sq_ft_per_bedroom',
                'sq_ft_per_bedroom': sq_ft_per_bedroom,
                'message': f'Square footage per bedroom ({sq_ft_per_bedroom:.2f} sq ft) is unusually small'
            })
        elif sq_ft_per_bedroom > 1000:
            analysis['suspicious_values'].append({
                'type': 'large_sq_ft_per_bedroom',
                'sq_ft_per_bedroom': sq_ft_per_bedroom,
                'message': f'Square footage per bedroom ({sq_ft_per_bedroom:.2f} sq ft) is unusually large'
            })

    # Check year built reasonableness
    year_built = safe_int_convert(data.get('year_built', 0))
    current_year = datetime.now().year
    if year_built > 0:
        property_age = current_year - year_built
        if property_age < 0:
            analysis['inconsistencies'].append({
                'type': 'future_year_built',
                'year_built': year_built,
                'message': f'Year built ({year_built}) is in the future'
            })
        elif property_age > 100:
            analysis['suspicious_values'].append({
                'type': 'very_old_property',
                'age': property_age,
                'message': f'Property is unusually old ({property_age} years)'
            })

    # Check market value reasonableness
    market_value = safe_float_convert(data.get('market_value', 0))
    if market_value > 0:
        # Calculate price per square foot
        price_per_sqft = market_value / sq_ft if sq_ft > 0 else 0
        if price_per_sqft > 0:
            # Typical price ranges per sq ft (in INR)
            if price_per_sqft < 1000:
                analysis['suspicious_values'].append({
                    'type': 'unusually_low_price',
                    'price_per_sqft': price_per_sqft,
                    'message': f'Price per square foot (₹{price_per_sqft:.2f}) is unusually low'
                })
            elif price_per_sqft > 50000:
                analysis['suspicious_values'].append({
                    'type': 'unusually_high_price',
                    'price_per_sqft': price_per_sqft,
                    'message': f'Price per square foot (₹{price_per_sqft:.2f}) is unusually high'
                })

    return analysis


def analyze_document(document_path: str) -> Dict[str, Any]:
    """Analyze a single document for authenticity and content.

    Currently a format-level stub: validates the path and checks the extension
    is ``.pdf``; passing documents get fixed placeholder confidence values.
    Always returns a dict with the same keys; never raises.
    """
    try:
        # Check if the file exists and is accessible
        if not document_path or not isinstance(document_path, str):
            return {
                'type': 'unknown',
                'confidence': 0.0,
                'authenticity': 'could not verify',
                'authenticity_confidence': 0.0,
                'summary': 'Invalid document path',
                'has_signatures': False,
                'has_dates': False,
                'error': 'Invalid document path'
            }

        # Get file extension
        _, ext = os.path.splitext(document_path)
        ext = ext.lower()

        # Check if it's a PDF
        if ext != '.pdf':
            return {
                'type': 'unknown',
                'confidence': 0.0,
                'authenticity': 'could not verify',
                'authenticity_confidence': 0.0,
                'summary': 'Invalid document format',
                'has_signatures': False,
                'has_dates': False,
                'error': 'Only PDF documents are supported'
            }

        # Basic document analysis
        # In a real implementation, you would use a PDF analysis library here
        return {
            'type': 'property_document',
            'confidence': 0.8,
            'authenticity': 'verified',
            'authenticity_confidence': 0.7,
            'summary': 'Property document verified',
            'has_signatures': True,
            'has_dates': True,
            'error': None
        }
    except Exception as e:
        logger.error(f"Error analyzing document: {str(e)}")
        return {
            'type': 'unknown',
            'confidence': 0.0,
            'authenticity': 'could not verify',
            'authenticity_confidence': 0.0,
            'summary': 'Error analyzing document',
            'has_signatures': False,
            'has_dates': False,
            'error': str(e)
        }


def analyze_image(image_path: str) -> Dict[str, Any]:
    """Analyze a single image for property-related content.

    Currently a format-level stub: validates the path and checks the extension
    is ``.jpg``/``.jpeg``/``.png``. Always returns a dict with the same keys;
    never raises.
    """
    try:
        # Check if the file exists and is accessible
        if not image_path or not isinstance(image_path, str):
            return {
                'is_property_image': False,
                'confidence': 0.0,
                'description': 'Invalid image path',
                'error': 'Invalid image path'
            }

        # Get file extension
        _, ext = os.path.splitext(image_path)
        ext = ext.lower()

        # Check if it's a valid image format
        if ext not in ['.jpg', '.jpeg', '.png']:
            return {
                'is_property_image': False,
                'confidence': 0.0,
                'description': 'Invalid image format',
                'error': 'Only JPG and PNG images are supported'
            }

        # Basic image analysis
        # In a real implementation, you would use an image analysis library here
        return {
            'is_property_image': True,
            'confidence': 0.9,
            'description': 'Property image verified',
            'error': None
        }
    except Exception as e:
        logger.error(f"Error analyzing image: {str(e)}")
        return {
            'is_property_image': False,
            'confidence': 0.0,
            'description': 'Error analyzing image',
            'error': str(e)
        }


def analyze_documents_and_images(data: Dict[str, Any]) -> Dict[str, Any]:
    """Analyze all documents and images in the property data.

    Runs :func:`analyze_document` / :func:`analyze_image` over each attachment
    and computes percentage verification scores per media type.
    """
    analysis = {
        'documents': [],
        'images': [],
        'document_verification_score': 0.0,
        'image_verification_score': 0.0,
        'total_documents': 0,
        'total_images': 0,
        'verified_documents': 0,
        'verified_images': 0
    }

    # Helper function to clean file paths
    def clean_file_paths(files):
        if not files:
            return []
        if isinstance(files, str):
            files = [files]
        # Remove any '×' characters and clean the paths
        # (the '×' appears to be a UI remove-button artifact — TODO confirm)
        return [f.replace('×', '').strip() for f in files if f and isinstance(f, str) and f.strip()]

    # Analyze documents
    documents = clean_file_paths(data.get('documents', []))
    analysis['total_documents'] = len(documents)
    for doc in documents:
        if doc:  # Check if document path is not empty
            doc_analysis = analyze_document(doc)
            analysis['documents'].append(doc_analysis)
            if doc_analysis['authenticity'] == 'verified':
                analysis['verified_documents'] += 1

    # Analyze images
    images = clean_file_paths(data.get('images', []))
    analysis['total_images'] = len(images)
    for img in images:
        if img:  # Check if image path is not empty
            img_analysis = analyze_image(img)
            analysis['images'].append(img_analysis)
            if img_analysis['is_property_image']:
                analysis['verified_images'] += 1

    # Calculate verification scores
    if analysis['total_documents'] > 0:
        analysis['document_verification_score'] = (analysis['verified_documents'] / analysis['total_documents']) * 100
    if analysis['total_images'] > 0:
        analysis['image_verification_score'] = (analysis['verified_images'] / analysis['total_images']) * 100

    return analysis


def perform_cross_validation(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Perform comprehensive cross-validation of property data.

    Aggregates all per-aspect analyses (basic info, location, specifications,
    description, media) plus a summary into a flat list of check dicts, each
    tagged with its ``category``. On any internal failure, returns a single
    'cross_validation_error' check instead of raising.
    """
    cross_checks = []
    try:
        # Load the tiny model for classification.
        # NOTE(review): the loaded classifier is never used below; the call is
        # kept for its side effects / failure semantics — TODO confirm intent.
        load_model("zero-shot-classification", "typeform/mobilebert-uncased-mnli")

        # Initialize analysis sections
        analysis_sections = {
            'basic_info': [],
            'location': [],
            'specifications': [],
            'documents': [],
            'fraud_indicators': []
        }

        # Basic Information Validation
        property_name = str(data.get('property_name', '')).strip()
        # '2' appears to be a known bad placeholder value — TODO confirm
        if not property_name or property_name == '2':
            analysis_sections['basic_info'].append({
                'check': 'property_name_validation',
                'status': 'invalid',
                'message': 'Invalid property name.',
                'details': 'Please provide a descriptive name for the property.',
                'severity': 'high',
                'recommendation': 'Add a proper name for the property.'
            })

        property_type = str(data.get('property_type', '')).strip()
        if not property_type:
            analysis_sections['basic_info'].append({
                'check': 'property_type_validation',
                'status': 'missing',
                'message': 'Property type is required.',
                'details': 'Please specify the type of property.',
                'severity': 'high',
                'recommendation': 'Select a property type.'
            })

        status = str(data.get('status', '')).strip()
        if not status:
            analysis_sections['basic_info'].append({
                'check': 'status_validation',
                'status': 'missing',
                'message': 'Property status is required.',
                'details': 'Please specify if the property is for sale or rent.',
                'severity': 'high',
                'recommendation': 'Select the property status.'
            })

        # Market Value Analysis
        market_value = safe_float_convert(data.get('market_value', 0))
        if market_value <= 0:
            analysis_sections['basic_info'].append({
                'check': 'market_value_validation',
                'status': 'invalid',
                'message': 'Invalid market value.',
                'details': 'The market value must be a realistic amount.',
                'severity': 'high',
                'recommendation': 'Please provide a valid market value.'
            })

        # Location Analysis
        location_analysis = analyze_location_consistency(data)
        for inconsistency in location_analysis['inconsistencies']:
            analysis_sections['location'].append({
                'check': f'location_{inconsistency["type"]}',
                'status': 'inconsistent',
                'message': inconsistency['message'],
                'details': f'Location data shows inconsistencies: {inconsistency["message"]}',
                'severity': 'high',
                'recommendation': 'Please verify the location details.'
            })

        # Property Specifications Analysis
        specs_analysis = analyze_property_specifications(data)
        for inconsistency in specs_analysis['inconsistencies']:
            analysis_sections['specifications'].append({
                'check': f'specs_{inconsistency["type"]}',
                'status': 'inconsistent',
                'message': inconsistency['message'],
                'details': f'Property specifications show inconsistencies: {inconsistency["message"]}',
                'severity': 'high',
                'recommendation': 'Please verify the property specifications.'
            })
        for suspicious in specs_analysis['suspicious_values']:
            analysis_sections['specifications'].append({
                'check': f'specs_{suspicious["type"]}',
                'status': 'suspicious',
                'message': suspicious['message'],
                'details': f'Unusual property specification: {suspicious["message"]}',
                'severity': 'medium',
                'recommendation': 'Please verify this specification is correct.'
            })

        # Description Analysis
        description = str(data.get('description', '')).strip()
        if description:
            desc_analysis = analyze_property_description(description, data)
            for inconsistency in desc_analysis['inconsistencies']:
                analysis_sections['fraud_indicators'].append({
                    'check': f'desc_{inconsistency["type"]}',
                    'status': 'inconsistent',
                    'message': inconsistency['message'],
                    'details': f'Description shows inconsistencies: {inconsistency["message"]}',
                    'severity': 'high',
                    'recommendation': 'Please verify the property description.'
                })
            # Relies on the 'type' key now emitted by analyze_property_description
            # (previously a KeyError whenever a suspicious pattern matched).
            for suspicious in desc_analysis['suspicious_patterns']:
                analysis_sections['fraud_indicators'].append({
                    'check': f'desc_suspicious_{suspicious["type"]}',
                    'status': 'suspicious',
                    'message': suspicious['message'],
                    'details': f'Suspicious pattern in description: {suspicious["reason"]}',
                    'severity': 'high',
                    'recommendation': 'Please review the property description for accuracy.'
                })

        # Documents & Images Analysis
        media_analysis = analyze_documents_and_images(data)

        # Helper function to check if files exist in data
        def check_files_exist(files):
            if not files:
                return False
            if isinstance(files, str):
                files = [files]
            return any(f and isinstance(f, str) and f.strip() and not f.endswith('×') for f in files)

        # Add document analysis results
        if media_analysis['total_documents'] == 0:
            # Check if documents were actually provided in the data
            documents = data.get('documents', [])
            if check_files_exist(documents):
                # Files exist but couldn't be analyzed
                analysis_sections['documents'].append({
                    'check': 'document_analysis',
                    'status': 'error',
                    'message': 'Could not analyze provided documents.',
                    'details': 'Please ensure documents are in PDF format and are accessible.',
                    'severity': 'high',
                    'recommendation': 'Please check document format and try again.'
                })
            else:
                analysis_sections['documents'].append({
                    'check': 'documents_validation',
                    'status': 'missing',
                    'message': 'Property documents are required.',
                    'details': 'Please upload relevant property documents in PDF format.',
                    'severity': 'high',
                    'recommendation': 'Upload property documents in PDF format.'
                })
        else:
            for doc in media_analysis['documents']:
                if doc.get('error'):
                    analysis_sections['documents'].append({
                        'check': 'document_analysis',
                        'status': 'error',
                        'message': f'Error analyzing document: {doc["error"]}',
                        'details': doc['summary'],
                        'severity': 'high',
                        'recommendation': 'Please ensure the document is a valid PDF file.'
                    })
                elif doc['authenticity'] != 'verified':
                    analysis_sections['documents'].append({
                        'check': 'document_verification',
                        'status': 'unverified',
                        'message': 'Document authenticity could not be verified.',
                        'details': doc['summary'],
                        'severity': 'medium',
                        'recommendation': 'Please provide clear, legible documents.'
                    })

        # Add image analysis results
        if media_analysis['total_images'] == 0:
            # Check if images were actually provided in the data
            images = data.get('images', [])
            if check_files_exist(images):
                # Files exist but couldn't be analyzed
                analysis_sections['documents'].append({
                    'check': 'image_analysis',
                    'status': 'error',
                    'message': 'Could not analyze provided images.',
                    'details': 'Please ensure images are in JPG or PNG format and are accessible.',
                    'severity': 'high',
                    'recommendation': 'Please check image format and try again.'
                })
            else:
                analysis_sections['documents'].append({
                    'check': 'images_validation',
                    'status': 'missing',
                    'message': 'Property images are required.',
                    'details': 'Please upload at least one image of the property.',
                    'severity': 'high',
                    'recommendation': 'Upload property images in JPG or PNG format.'
                })
        else:
            for img in media_analysis['images']:
                if img.get('error'):
                    analysis_sections['documents'].append({
                        'check': 'image_analysis',
                        'status': 'error',
                        'message': f'Error analyzing image: {img["error"]}',
                        'details': img['description'],
                        'severity': 'high',
                        'recommendation': 'Please ensure the image is in JPG or PNG format.'
                    })
                elif not img['is_property_image']:
                    analysis_sections['documents'].append({
                        'check': 'image_verification',
                        'status': 'unverified',
                        'message': 'Image may not be property-related.',
                        'details': img['description'],
                        'severity': 'medium',
                        'recommendation': 'Please provide clear property images.'
                    })

        # Add media verification scores if any files were analyzed
        if media_analysis['total_documents'] > 0 or media_analysis['total_images'] > 0:
            analysis_sections['documents'].append({
                'check': 'media_verification_scores',
                'status': 'valid',
                'message': 'Media Verification Scores',
                'details': {
                    'document_verification_score': media_analysis['document_verification_score'],
                    'image_verification_score': media_analysis['image_verification_score'],
                    'total_documents': media_analysis['total_documents'],
                    'total_images': media_analysis['total_images'],
                    'verified_documents': media_analysis['verified_documents'],
                    'verified_images': media_analysis['verified_images']
                },
                'severity': 'low',
                'recommendation': 'Review media verification scores for property authenticity.'
            })

        # Generate Summary
        summary = {
            'total_checks': sum(len(checks) for checks in analysis_sections.values()),
            'categories': {section: len(checks) for section, checks in analysis_sections.items()},
            'severity_counts': {
                'high': 0,
                'medium': 0,
                'low': 0
            },
            'status_counts': {
                'valid': 0,
                'invalid': 0,
                'suspicious': 0,
                'inconsistent': 0,
                'missing': 0,
                'error': 0,
                'unverified': 0
            },
            'fraud_risk_level': 'low',
            'media_verification': {
                'document_score': media_analysis['document_verification_score'],
                'image_score': media_analysis['image_verification_score']
            }
        }

        # Calculate statistics
        for section_checks in analysis_sections.values():
            for check in section_checks:
                if check['severity'] in summary['severity_counts']:
                    summary['severity_counts'][check['severity']] += 1
                if check['status'] in summary['status_counts']:
                    summary['status_counts'][check['status']] += 1

        # Calculate fraud risk level
        high_severity_issues = summary['severity_counts']['high']
        if high_severity_issues > 5:
            summary['fraud_risk_level'] = 'high'
        elif high_severity_issues > 2:
            summary['fraud_risk_level'] = 'medium'

        # Add summary to analysis
        analysis_sections['summary'] = [{
            'check': 'summary_analysis',
            'status': 'valid',
            'message': 'Property Analysis Summary',
            'details': summary,
            'severity': 'low',
            'recommendation': f'Fraud Risk Level: {summary["fraud_risk_level"].upper()}. Review all findings and address high severity issues first.'
        }]

        # Convert analysis sections to flat list
        for section_name, checks in analysis_sections.items():
            for check in checks:
                check['category'] = section_name
                cross_checks.append(check)

        return cross_checks

    except Exception as e:
        logger.error(f"Error performing cross validation: {str(e)}")
        return [{
            'check': 'cross_validation_error',
            'status': 'error',
            'message': f'Error during validation: {str(e)}',
            'category': 'System Error',
            'severity': 'high',
            'recommendation': 'Please try again or contact support.'
        }]