# Source: property_verification_bot / models / cross_validation.py
# (scraped upload-page header comment-ified: uploaded by sksameermujahid,
#  "Upload 45 files", commit 14cb7ae — kept for provenance only)
# models/cross_validation.py
import re
from datetime import datetime
from .logging_config import logger
from .model_loader import load_model
from typing import Dict, Any, List, Union
import os
def safe_int_convert(value: Any) -> int:
    """Coerce *value* to an int, tolerating Indian currency strings.

    Strings may carry rupee symbols, thousands separators, and padding;
    empty or unparseable values yield 0 instead of raising.
    """
    try:
        if isinstance(value, str):
            # Drop the currency symbol, thousands separators, and padding.
            value = value.replace('₹', '').replace(',', '').strip()
            if not value:
                return 0
        return int(float(value))
    except (ValueError, TypeError):
        return 0
def safe_float_convert(value: Any) -> float:
    """Coerce *value* to a float, tolerating Indian currency strings.

    Strings may carry rupee symbols, thousands separators, and padding;
    empty or unparseable values yield 0.0 instead of raising.
    """
    try:
        if isinstance(value, str):
            # Drop the currency symbol, thousands separators, and padding.
            value = value.replace('₹', '').replace(',', '').strip()
            if not value:
                return 0.0
        return float(value)
    except (ValueError, TypeError):
        return 0.0
def extract_numbers_from_text(text: str) -> List[int]:
    """Return every standalone integer that appears in *text*, in order.

    Falsy input (empty string or None) yields an empty list.
    """
    if not text:
        return []
    return list(map(int, re.findall(r'\b\d+\b', text)))
def find_room_mentions(text: str) -> Dict[str, List[int]]:
    """Locate room counts (bedroom/bathroom/room) mentioned in free text.

    Matching runs over the lowercased text. Returns a mapping such as
    ``{'bedroom': [3], 'bathroom': [2]}``; a key is omitted entirely when
    its pattern finds no match. Falsy input yields an empty dict.
    """
    if not text:
        return {}
    lowered = text.lower()
    room_patterns = (
        ('bedroom', r'(\d+)\s*(?:bedroom|bed|BHK|bhk)'),
        ('bathroom', r'(\d+)\s*(?:bathroom|bath|washroom)'),
        ('room', r'(\d+)\s*(?:room|rooms)'),
    )
    found: Dict[str, List[int]] = {}
    for label, pattern in room_patterns:
        counts = [int(hit) for hit in re.findall(pattern, lowered)]
        if counts:
            found[label] = counts
    return found
def analyze_property_description(description: str, property_data: Dict[str, Any]) -> Dict[str, Any]:
    """Cross-check a free-text description against the structured listing.

    Compares room counts mentioned in the description with the stated
    bedroom/bathroom fields, checks that the property type appears in the
    text, and scans for scam-like wording.

    Args:
        description: Free-text listing description (may be empty).
        property_data: Structured listing fields; reads 'bedrooms',
            'bathrooms' and 'property_type'.

    Returns:
        Dict with 'room_mentions', 'property_type_mentions',
        'amenity_mentions', 'inconsistencies' and 'suspicious_patterns'.
        Each suspicious-pattern entry now carries a 'type' slug so
        downstream consumers can index entry['type'] safely.
    """
    if not description:
        return {
            'room_mentions': {},
            'property_type_mentions': [],
            'amenity_mentions': [],
            'inconsistencies': [],
            'suspicious_patterns': []
        }
    analysis = {
        'room_mentions': find_room_mentions(description),
        'property_type_mentions': [],
        'amenity_mentions': [],
        'inconsistencies': [],
        'suspicious_patterns': []
    }
    # Bedroom consistency: compare the largest count mentioned in the text
    # against the stated field.
    if 'bedroom' in analysis['room_mentions']:
        stated_bedrooms = safe_int_convert(property_data.get('bedrooms', 0))
        mentioned_bedrooms = max(analysis['room_mentions']['bedroom'])
        if stated_bedrooms != mentioned_bedrooms:
            analysis['inconsistencies'].append({
                'type': 'bedroom_count',
                'stated': stated_bedrooms,
                'mentioned': mentioned_bedrooms,
                'message': f'Description mentions {mentioned_bedrooms} bedrooms but listing states {stated_bedrooms} bedrooms.'
            })
    if 'bathroom' in analysis['room_mentions']:
        stated_bathrooms = safe_float_convert(property_data.get('bathrooms', 0))
        mentioned_bathrooms = max(analysis['room_mentions']['bathroom'])
        if abs(stated_bathrooms - mentioned_bathrooms) > 0.5:  # Allow for half bathrooms
            analysis['inconsistencies'].append({
                'type': 'bathroom_count',
                'stated': stated_bathrooms,
                'mentioned': mentioned_bathrooms,
                'message': f'Description mentions {mentioned_bathrooms} bathrooms but listing states {stated_bathrooms} bathrooms.'
            })
    # Property-type consistency. FIX: guard against a None/non-string field,
    # which previously raised AttributeError on .lower().
    property_type = str(property_data.get('property_type') or '').lower()
    if property_type and property_type not in description.lower():
        analysis['inconsistencies'].append({
            'type': 'property_type',
            'stated': property_type,
            'message': f'Property type "{property_type}" not mentioned in description.'
        })
    # Scam-like wording. Each tuple is (regex, type-slug, human reason).
    # FIX: entries now include a 'type' key — perform_cross_validation
    # indexes suspicious['type'] and previously hit a KeyError because the
    # dicts only carried 'pattern'/'reason'/'message'.
    suspicious_patterns = [
        (r'too good to be true', 'unrealistic_claims', 'Unrealistic claims'),
        (r'guaranteed.*return', 'return_promises', 'Suspicious return promises'),
        (r'no.*verification', 'avoiding_verification', 'Avoiding verification'),
        (r'urgent.*sale', 'pressure_tactics', 'Pressure tactics'),
        (r'below.*market', 'unrealistic_pricing', 'Unrealistic pricing')
    ]
    for pattern, slug, reason in suspicious_patterns:
        if re.search(pattern, description.lower()):
            analysis['suspicious_patterns'].append({
                'type': slug,
                'pattern': pattern,
                'reason': reason,
                'message': f'Suspicious pattern detected: {reason}'
            })
    return analysis
def analyze_location_consistency(data: Dict[str, Any]) -> Dict[str, Any]:
    """Validate location fields: city/state pairing, PIN code, coordinates.

    Args:
        data: Listing fields; reads 'city', 'state', 'zip', 'latitude'
            and 'longitude'.

    Returns:
        Dict with 'inconsistencies' and 'suspicious_patterns' lists; each
        inconsistency entry has a 'type' and a human-readable 'message'.
    """
    analysis = {
        'inconsistencies': [],
        'suspicious_patterns': []
    }
    # City/state consistency against well-known Indian pairs.
    city = data.get('city', '').lower()
    state = data.get('state', '').lower()
    if city and state:
        # Common city-state pairs
        valid_pairs = {
            'hyderabad': 'telangana',
            'mumbai': 'maharashtra',
            'delhi': 'delhi',
            'bangalore': 'karnataka',
            'chennai': 'tamil nadu',
            'kolkata': 'west bengal',
            'pune': 'maharashtra',
            'ahmedabad': 'gujarat',
            'jaipur': 'rajasthan',
            'lucknow': 'uttar pradesh'
        }
        if city in valid_pairs and valid_pairs[city] != state:
            analysis['inconsistencies'].append({
                'type': 'city_state_mismatch',
                'city': city,
                'state': state,
                'message': f'City {city} is typically in {valid_pairs[city]}, not {state}'
            })
    # Indian PIN codes are exactly six digits.
    zip_code = str(data.get('zip', '')).strip()
    if zip_code:
        if not re.match(r'^\d{6}$', zip_code):
            analysis['inconsistencies'].append({
                'type': 'invalid_zip',
                'zip': zip_code,
                'message': 'Invalid zip code format. Should be 6 digits.'
            })
    # Coordinate sanity check against India's bounding box.
    try:
        lat = safe_float_convert(data.get('latitude', 0))
        lng = safe_float_convert(data.get('longitude', 0))
        # FIX: (0, 0) is the "not provided" default — the old code reported
        # missing coordinates as being outside India. Only validate when at
        # least one coordinate was actually supplied.
        if lat != 0 or lng != 0:
            # India's approximate boundaries
            india_bounds = {
                'lat_min': 6.0,
                'lat_max': 38.0,
                'lng_min': 67.0,
                'lng_max': 98.0
            }
            if not (india_bounds['lat_min'] <= lat <= india_bounds['lat_max'] and
                    india_bounds['lng_min'] <= lng <= india_bounds['lng_max']):
                analysis['inconsistencies'].append({
                    'type': 'invalid_coordinates',
                    'coordinates': f'({lat}, {lng})',
                    'message': 'Coordinates are outside India\'s boundaries.'
                })
    except (ValueError, TypeError):
        analysis['inconsistencies'].append({
            'type': 'invalid_coordinates',
            'message': 'Invalid coordinate format.'
        })
    return analysis
def analyze_property_specifications(data: Dict[str, Any]) -> Dict[str, Any]:
    """Sanity-check room counts, floor area, build year, and market value.

    Args:
        data: Listing fields; reads 'bedrooms', 'bathrooms', 'total_rooms',
            'sq_ft', 'year_built' and 'market_value'.

    Returns:
        Dict with 'inconsistencies' (hard contradictions) and
        'suspicious_values' (unusual but possible figures).
    """
    analysis = {
        'inconsistencies': [],
        'suspicious_values': []
    }
    bedrooms = safe_int_convert(data.get('bedrooms', 0))
    bathrooms = safe_float_convert(data.get('bathrooms', 0))
    total_rooms = safe_int_convert(data.get('total_rooms', 0))
    # FIX: only compare when a total was actually supplied — the old check
    # flagged every listing that omitted 'total_rooms' (default 0).
    if total_rooms > 0 and total_rooms < (bedrooms + int(bathrooms)):
        analysis['inconsistencies'].append({
            'type': 'room_count_mismatch',
            'total_rooms': total_rooms,
            'bedrooms': bedrooms,
            'bathrooms': bathrooms,
            'message': f'Total rooms ({total_rooms}) is less than sum of bedrooms and bathrooms ({bedrooms + int(bathrooms)})'
        })
    # Square-footage reasonableness per bedroom.
    sq_ft = safe_float_convert(data.get('sq_ft', 0))
    # FIX: require bedrooms > 0 — the old code computed a ratio of 0 for
    # listings without bedrooms and mis-flagged them as "unusually small".
    if sq_ft > 0 and bedrooms > 0:
        sq_ft_per_bedroom = sq_ft / bedrooms
        if sq_ft_per_bedroom < 200:
            analysis['suspicious_values'].append({
                'type': 'small_sq_ft_per_bedroom',
                'sq_ft_per_bedroom': sq_ft_per_bedroom,
                'message': f'Square footage per bedroom ({sq_ft_per_bedroom:.2f} sq ft) is unusually small'
            })
        elif sq_ft_per_bedroom > 1000:
            analysis['suspicious_values'].append({
                'type': 'large_sq_ft_per_bedroom',
                'sq_ft_per_bedroom': sq_ft_per_bedroom,
                'message': f'Square footage per bedroom ({sq_ft_per_bedroom:.2f} sq ft) is unusually large'
            })
    # Build-year reasonableness: not in the future, not implausibly old.
    year_built = safe_int_convert(data.get('year_built', 0))
    current_year = datetime.now().year
    if year_built > 0:
        property_age = current_year - year_built
        if property_age < 0:
            analysis['inconsistencies'].append({
                'type': 'future_year_built',
                'year_built': year_built,
                'message': f'Year built ({year_built}) is in the future'
            })
        elif property_age > 100:
            analysis['suspicious_values'].append({
                'type': 'very_old_property',
                'age': property_age,
                'message': f'Property is unusually old ({property_age} years)'
            })
    # Market-value reasonableness via price per square foot (INR).
    market_value = safe_float_convert(data.get('market_value', 0))
    if market_value > 0:
        price_per_sqft = market_value / sq_ft if sq_ft > 0 else 0
        if price_per_sqft > 0:
            if price_per_sqft < 1000:
                analysis['suspicious_values'].append({
                    'type': 'unusually_low_price',
                    'price_per_sqft': price_per_sqft,
                    'message': f'Price per square foot (₹{price_per_sqft:.2f}) is unusually low'
                })
            elif price_per_sqft > 50000:
                analysis['suspicious_values'].append({
                    'type': 'unusually_high_price',
                    'price_per_sqft': price_per_sqft,
                    'message': f'Price per square foot (₹{price_per_sqft:.2f}) is unusually high'
                })
    return analysis
def analyze_document(document_path: str) -> Dict[str, Any]:
    """Run a lightweight authenticity check on a single document path.

    Only PDF paths are accepted; actual content inspection is stubbed out
    (a real implementation would parse the PDF). Never raises: every
    failure is reported through the returned dict's 'error' field.
    """
    def _unverified(summary: str, error: str) -> Dict[str, Any]:
        # Common shape for every outcome that cannot be verified.
        return {
            'type': 'unknown',
            'confidence': 0.0,
            'authenticity': 'could not verify',
            'authenticity_confidence': 0.0,
            'summary': summary,
            'has_signatures': False,
            'has_dates': False,
            'error': error
        }
    try:
        if not document_path or not isinstance(document_path, str):
            return _unverified('Invalid document path', 'Invalid document path')
        extension = os.path.splitext(document_path)[1].lower()
        if extension != '.pdf':
            return _unverified('Invalid document format', 'Only PDF documents are supported')
        # Placeholder verdict; a real implementation would analyze the PDF here.
        return {
            'type': 'property_document',
            'confidence': 0.8,
            'authenticity': 'verified',
            'authenticity_confidence': 0.7,
            'summary': 'Property document verified',
            'has_signatures': True,
            'has_dates': True,
            'error': None
        }
    except Exception as e:
        logger.error(f"Error analyzing document: {str(e)}")
        return _unverified('Error analyzing document', str(e))
def analyze_image(image_path: str) -> Dict[str, Any]:
    """Run a lightweight validity check on a single image path.

    Only JPG/JPEG/PNG paths are accepted; actual image inspection is
    stubbed out. Never raises: failures are reported via 'error'.
    """
    def _rejected(description: str, error: str) -> Dict[str, Any]:
        # Common shape for every outcome that is not a verified image.
        return {
            'is_property_image': False,
            'confidence': 0.0,
            'description': description,
            'error': error
        }
    try:
        if not image_path or not isinstance(image_path, str):
            return _rejected('Invalid image path', 'Invalid image path')
        extension = os.path.splitext(image_path)[1].lower()
        if extension not in ('.jpg', '.jpeg', '.png'):
            return _rejected('Invalid image format', 'Only JPG and PNG images are supported')
        # Placeholder verdict; a real implementation would analyze pixels here.
        return {
            'is_property_image': True,
            'confidence': 0.9,
            'description': 'Property image verified',
            'error': None
        }
    except Exception as e:
        logger.error(f"Error analyzing image: {str(e)}")
        return _rejected('Error analyzing image', str(e))
def analyze_documents_and_images(data: Dict[str, Any]) -> Dict[str, Any]:
    """Verify every attached document/image and compute aggregate scores.

    Reads 'documents' and 'images' from *data* (a single path string or a
    list of paths), runs the per-file analyzers, and reports counts plus
    percentage verification scores.
    """
    report = {
        'documents': [],
        'images': [],
        'document_verification_score': 0.0,
        'image_verification_score': 0.0,
        'total_documents': 0,
        'total_images': 0,
        'verified_documents': 0,
        'verified_images': 0
    }
    def _normalize(raw) -> List[str]:
        # Accept a single path or a list; strip stray '×' markers and padding.
        # NOTE: an entry that is only '×' survives as an empty string (it
        # passes the pre-cleanup filter), matching the original counting.
        if not raw:
            return []
        candidates = [raw] if isinstance(raw, str) else raw
        return [entry.replace('×', '').strip()
                for entry in candidates
                if entry and isinstance(entry, str) and entry.strip()]
    doc_paths = _normalize(data.get('documents', []))
    report['total_documents'] = len(doc_paths)
    for path in doc_paths:
        if not path:  # skip entries that cleaned down to nothing
            continue
        verdict = analyze_document(path)
        report['documents'].append(verdict)
        if verdict['authenticity'] == 'verified':
            report['verified_documents'] += 1
    image_paths = _normalize(data.get('images', []))
    report['total_images'] = len(image_paths)
    for path in image_paths:
        if not path:
            continue
        verdict = analyze_image(path)
        report['images'].append(verdict)
        if verdict['is_property_image']:
            report['verified_images'] += 1
    # Percentage scores; denominators guarded against division by zero.
    if report['total_documents'] > 0:
        report['document_verification_score'] = (report['verified_documents'] / report['total_documents']) * 100
    if report['total_images'] > 0:
        report['image_verification_score'] = (report['verified_images'] / report['total_images']) * 100
    return report
def perform_cross_validation(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Run every consistency/fraud check over a property listing.

    Aggregates basic-field validation, location and specification
    consistency, description analysis, and document/image verification
    into a flat list of check dicts, each carrying 'check', 'status',
    'message', 'details', 'severity', 'recommendation' and 'category'.

    Args:
        data: Raw property listing fields.

    Returns:
        List of check-result dicts. On an internal failure, a single
        'cross_validation_error' entry is returned instead.
    """
    cross_checks = []
    try:
        # FIX: a zero-shot classifier ("typeform/mobilebert-uncased-mnli")
        # was loaded here but never used anywhere in the function; the dead
        # load (and the unused 'processed_data' dict) have been removed to
        # avoid wasted startup time and memory.
        # Initialize analysis sections
        analysis_sections = {
            'basic_info': [],
            'location': [],
            'specifications': [],
            'documents': [],
            'fraud_indicators': []
        }
        # Basic Information Validation ('2' is a known bogus placeholder name)
        property_name = str(data.get('property_name', '')).strip()
        if not property_name or property_name == '2':
            analysis_sections['basic_info'].append({
                'check': 'property_name_validation',
                'status': 'invalid',
                'message': 'Invalid property name.',
                'details': 'Please provide a descriptive name for the property.',
                'severity': 'high',
                'recommendation': 'Add a proper name for the property.'
            })
        property_type = str(data.get('property_type', '')).strip()
        if not property_type:
            analysis_sections['basic_info'].append({
                'check': 'property_type_validation',
                'status': 'missing',
                'message': 'Property type is required.',
                'details': 'Please specify the type of property.',
                'severity': 'high',
                'recommendation': 'Select a property type.'
            })
        status = str(data.get('status', '')).strip()
        if not status:
            analysis_sections['basic_info'].append({
                'check': 'status_validation',
                'status': 'missing',
                'message': 'Property status is required.',
                'details': 'Please specify if the property is for sale or rent.',
                'severity': 'high',
                'recommendation': 'Select the property status.'
            })
        # Market Value Analysis
        market_value = safe_float_convert(data.get('market_value', 0))
        if market_value <= 0:
            analysis_sections['basic_info'].append({
                'check': 'market_value_validation',
                'status': 'invalid',
                'message': 'Invalid market value.',
                'details': 'The market value must be a realistic amount.',
                'severity': 'high',
                'recommendation': 'Please provide a valid market value.'
            })
        # Location Analysis
        location_analysis = analyze_location_consistency(data)
        for inconsistency in location_analysis['inconsistencies']:
            analysis_sections['location'].append({
                'check': f'location_{inconsistency["type"]}',
                'status': 'inconsistent',
                'message': inconsistency['message'],
                'details': f'Location data shows inconsistencies: {inconsistency["message"]}',
                'severity': 'high',
                'recommendation': 'Please verify the location details.'
            })
        # Property Specifications Analysis
        specs_analysis = analyze_property_specifications(data)
        for inconsistency in specs_analysis['inconsistencies']:
            analysis_sections['specifications'].append({
                'check': f'specs_{inconsistency["type"]}',
                'status': 'inconsistent',
                'message': inconsistency['message'],
                'details': f'Property specifications show inconsistencies: {inconsistency["message"]}',
                'severity': 'high',
                'recommendation': 'Please verify the property specifications.'
            })
        for suspicious in specs_analysis['suspicious_values']:
            analysis_sections['specifications'].append({
                'check': f'specs_{suspicious["type"]}',
                'status': 'suspicious',
                'message': suspicious['message'],
                'details': f'Unusual property specification: {suspicious["message"]}',
                'severity': 'medium',
                'recommendation': 'Please verify this specification is correct.'
            })
        # Description Analysis
        description = str(data.get('description', '')).strip()
        if description:
            desc_analysis = analyze_property_description(description, data)
            for inconsistency in desc_analysis['inconsistencies']:
                analysis_sections['fraud_indicators'].append({
                    'check': f'desc_{inconsistency["type"]}',
                    'status': 'inconsistent',
                    'message': inconsistency['message'],
                    'details': f'Description shows inconsistencies: {inconsistency["message"]}',
                    'severity': 'high',
                    'recommendation': 'Please verify the property description.'
                })
            for suspicious in desc_analysis['suspicious_patterns']:
                # FIX: suspicious-pattern entries historically carried only
                # 'pattern'/'reason'/'message', so indexing ["type"] raised
                # KeyError; .get keeps this robust either way.
                analysis_sections['fraud_indicators'].append({
                    'check': f'desc_suspicious_{suspicious.get("type", "pattern")}',
                    'status': 'suspicious',
                    'message': suspicious['message'],
                    'details': f'Suspicious pattern in description: {suspicious["reason"]}',
                    'severity': 'high',
                    'recommendation': 'Please review the property description for accuracy.'
                })
        # Documents & Images Analysis
        media_analysis = analyze_documents_and_images(data)
        # Helper: were any real (non-placeholder) file paths supplied?
        def check_files_exist(files):
            if not files:
                return False
            if isinstance(files, str):
                files = [files]
            return any(f and isinstance(f, str) and f.strip() and not f.endswith('×') for f in files)
        # Add document analysis results
        if media_analysis['total_documents'] == 0:
            documents = data.get('documents', [])
            if check_files_exist(documents):
                # Files exist but couldn't be analyzed
                analysis_sections['documents'].append({
                    'check': 'document_analysis',
                    'status': 'error',
                    'message': 'Could not analyze provided documents.',
                    'details': 'Please ensure documents are in PDF format and are accessible.',
                    'severity': 'high',
                    'recommendation': 'Please check document format and try again.'
                })
            else:
                analysis_sections['documents'].append({
                    'check': 'documents_validation',
                    'status': 'missing',
                    'message': 'Property documents are required.',
                    'details': 'Please upload relevant property documents in PDF format.',
                    'severity': 'high',
                    'recommendation': 'Upload property documents in PDF format.'
                })
        else:
            for doc in media_analysis['documents']:
                if doc.get('error'):
                    analysis_sections['documents'].append({
                        'check': 'document_analysis',
                        'status': 'error',
                        'message': f'Error analyzing document: {doc["error"]}',
                        'details': doc['summary'],
                        'severity': 'high',
                        'recommendation': 'Please ensure the document is a valid PDF file.'
                    })
                elif doc['authenticity'] != 'verified':
                    analysis_sections['documents'].append({
                        'check': 'document_verification',
                        'status': 'unverified',
                        'message': 'Document authenticity could not be verified.',
                        'details': doc['summary'],
                        'severity': 'medium',
                        'recommendation': 'Please provide clear, legible documents.'
                    })
        # Add image analysis results
        if media_analysis['total_images'] == 0:
            images = data.get('images', [])
            if check_files_exist(images):
                # Files exist but couldn't be analyzed
                analysis_sections['documents'].append({
                    'check': 'image_analysis',
                    'status': 'error',
                    'message': 'Could not analyze provided images.',
                    'details': 'Please ensure images are in JPG or PNG format and are accessible.',
                    'severity': 'high',
                    'recommendation': 'Please check image format and try again.'
                })
            else:
                analysis_sections['documents'].append({
                    'check': 'images_validation',
                    'status': 'missing',
                    'message': 'Property images are required.',
                    'details': 'Please upload at least one image of the property.',
                    'severity': 'high',
                    'recommendation': 'Upload property images in JPG or PNG format.'
                })
        else:
            for img in media_analysis['images']:
                if img.get('error'):
                    analysis_sections['documents'].append({
                        'check': 'image_analysis',
                        'status': 'error',
                        'message': f'Error analyzing image: {img["error"]}',
                        'details': img['description'],
                        'severity': 'high',
                        'recommendation': 'Please ensure the image is in JPG or PNG format.'
                    })
                elif not img['is_property_image']:
                    analysis_sections['documents'].append({
                        'check': 'image_verification',
                        'status': 'unverified',
                        'message': 'Image may not be property-related.',
                        'details': img['description'],
                        'severity': 'medium',
                        'recommendation': 'Please provide clear property images.'
                    })
        # Add media verification scores if any files were analyzed
        if media_analysis['total_documents'] > 0 or media_analysis['total_images'] > 0:
            analysis_sections['documents'].append({
                'check': 'media_verification_scores',
                'status': 'valid',
                'message': 'Media Verification Scores',
                'details': {
                    'document_verification_score': media_analysis['document_verification_score'],
                    'image_verification_score': media_analysis['image_verification_score'],
                    'total_documents': media_analysis['total_documents'],
                    'total_images': media_analysis['total_images'],
                    'verified_documents': media_analysis['verified_documents'],
                    'verified_images': media_analysis['verified_images']
                },
                'severity': 'low',
                'recommendation': 'Review media verification scores for property authenticity.'
            })
        # Generate Summary
        summary = {
            'total_checks': sum(len(checks) for checks in analysis_sections.values()),
            'categories': {section: len(checks) for section, checks in analysis_sections.items()},
            'severity_counts': {
                'high': 0,
                'medium': 0,
                'low': 0
            },
            'status_counts': {
                'valid': 0,
                'invalid': 0,
                'suspicious': 0,
                'inconsistent': 0,
                'missing': 0,
                'error': 0,
                'unverified': 0
            },
            'fraud_risk_level': 'low',
            'media_verification': {
                'document_score': media_analysis['document_verification_score'],
                'image_score': media_analysis['image_verification_score']
            }
        }
        # Tally severity and status counts across every section.
        for section_checks in analysis_sections.values():
            for check in section_checks:
                if check['severity'] in summary['severity_counts']:
                    summary['severity_counts'][check['severity']] += 1
                if check['status'] in summary['status_counts']:
                    summary['status_counts'][check['status']] += 1
        # Fraud risk escalates with the number of high-severity findings.
        high_severity_issues = summary['severity_counts']['high']
        if high_severity_issues > 5:
            summary['fraud_risk_level'] = 'high'
        elif high_severity_issues > 2:
            summary['fraud_risk_level'] = 'medium'
        # Add summary to analysis
        analysis_sections['summary'] = [{
            'check': 'summary_analysis',
            'status': 'valid',
            'message': 'Property Analysis Summary',
            'details': summary,
            'severity': 'low',
            'recommendation': f'Fraud Risk Level: {summary["fraud_risk_level"].upper()}. Review all findings and address high severity issues first.'
        }]
        # Flatten sections to a single list, tagging each check's category.
        for section_name, checks in analysis_sections.items():
            for check in checks:
                check['category'] = section_name
                cross_checks.append(check)
        return cross_checks
    except Exception as e:
        logger.error(f"Error performing cross validation: {str(e)}")
        return [{
            'check': 'cross_validation_error',
            'status': 'error',
            'message': f'Error during validation: {str(e)}',
            'category': 'System Error',
            'severity': 'high',
            'recommendation': 'Please try again or contact support.'
        }]