# Source: Hugging Face Space by sksameermujahid ("Upload 45 files", commit 14cb7ae).
# app.py
from flask import Flask, render_template, request, jsonify
from flask_cors import CORS
import base64
import io
import re
import json
import uuid
import time
import asyncio
from geopy.geocoders import Nominatim
from datetime import datetime
from models.logging_config import logger
from models.model_loader import load_model
from models.image_analysis import analyze_image
from models.pdf_analysis import extract_pdf_text, analyze_pdf_content
from models.property_summary import generate_property_summary
from models.fraud_classification import classify_fraud
from models.trust_score import generate_trust_score
from models.suggestions import generate_suggestions
from models.text_quality import assess_text_quality
from models.address_verification import verify_address
from models.cross_validation import perform_cross_validation
from models.location_analysis import analyze_location
from models.price_analysis import analyze_price
from models.legal_analysis import analyze_legal_details
from models.property_specs import verify_property_specs
from models.market_value import analyze_market_value
from models.image_quality import assess_image_quality
from models.property_relation import check_if_property_related
import torch
import numpy as np
import concurrent.futures
from PIL import Image
app = Flask(__name__)
CORS(app) # Enable CORS for frontend
# Initialize geocoder
geocoder = Nominatim(user_agent="indian_property_verifier", timeout=10)
def make_json_serializable(obj):
    """Recursively convert *obj* into JSON-serializable Python primitives.

    Handles nested lists/tuples/dicts, sets, torch tensors, and numpy
    arrays/scalars; anything unrecognized is stringified as a last resort
    so serialization never raises to the caller.

    Args:
        obj: Arbitrary value produced by the analysis pipeline.

    Returns:
        A structure composed only of bool/int/float/str/None/list/dict.
    """
    try:
        if isinstance(obj, (bool, int, float, str, type(None))):
            return obj
        elif isinstance(obj, (list, tuple)):
            return [make_json_serializable(item) for item in obj]
        elif isinstance(obj, (set, frozenset)):
            # Sets are not a JSON type; emit a list (sorted for a
            # deterministic order when elements are comparable).
            try:
                return [make_json_serializable(item) for item in sorted(obj)]
            except TypeError:
                return [make_json_serializable(item) for item in obj]
        elif isinstance(obj, dict):
            # JSON object keys must be strings.
            return {str(key): make_json_serializable(value) for key, value in obj.items()}
        elif torch.is_tensor(obj):
            # Single-element tensors collapse to a Python scalar.
            return obj.item() if obj.numel() == 1 else obj.tolist()
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif np.isscalar(obj):
            # numpy scalar types (np.int32, ...) expose .item().
            return obj.item() if hasattr(obj, 'item') else float(obj)
        else:
            # Fallback for datetimes, UUIDs, custom objects, etc.
            return str(obj)
    except Exception as e:
        logger.error(f"Error serializing object: {str(e)}")
        return str(obj)
@app.route('/')
def index():
    """Serve the single-page frontend."""
    home_page = render_template('index.html')
    return home_page
@app.route('/get-location', methods=['POST'])
def get_location():
    """Reverse-geocode a latitude/longitude pair restricted to India.

    Expects a JSON body {"latitude": ..., "longitude": ...}. On success
    returns a structured Indian address (street, area, city, state, PIN);
    on failure returns an error payload with a matching HTTP status.
    """
    try:
        data = request.json or {}
        latitude = data.get('latitude')
        longitude = data.get('longitude')
        # Explicit None/empty check: a numeric 0 is a real coordinate and
        # must reach the range validation below, not a "missing" error.
        if latitude in (None, '') or longitude in (None, ''):
            logger.warning("Missing latitude or longitude")
            return jsonify({
                'status': 'error',
                'message': 'Latitude and longitude are required'
            }), 400
        # Validate the format and that the point lies inside India's
        # approximate bounding box.
        try:
            lat, lng = float(latitude), float(longitude)
            if not (6.5 <= lat <= 37.5 and 68.0 <= lng <= 97.5):
                return jsonify({
                    'status': 'error',
                    'message': 'Coordinates are outside India'
                }), 400
        except ValueError:
            return jsonify({
                'status': 'error',
                'message': 'Invalid coordinates format'
            }), 400

        def _first(components, *keys):
            # First non-empty value among the candidate component keys.
            for key in keys:
                value = components.get(key, '')
                if value:
                    return value
            return ''

        # Retry geocoding up to 3 times (Nominatim may be slow/rate-limited).
        attempts = 3
        for attempt in range(attempts):
            try:
                # Use the validated float coordinates for the lookup.
                location = geocoder.reverse((lat, lng), exactly_one=True)
                if location:
                    address_components = location.raw.get('address', {})
                    # Indian-specific fallback chains per component.
                    city = _first(address_components, 'city', 'town', 'village', 'suburb')
                    state = _first(address_components, 'state', 'state_district')
                    # Indian PIN codes are exactly 6 digits; drop anything else.
                    postal_code = address_components.get('postcode', '')
                    if postal_code and not re.match(r'^\d{6}$', postal_code):
                        postal_code = ''
                    road = _first(address_components, 'road', 'street')
                    area = _first(address_components, 'suburb', 'neighbourhood')
                    return jsonify({
                        'status': 'success',
                        'address': location.address,
                        'street': road,
                        'area': area,
                        'city': city,
                        'state': state,
                        'country': 'India',
                        'postal_code': postal_code,
                        'latitude': latitude,
                        'longitude': longitude,
                        'formatted_address': f"{road}, {area}, {city}, {state}, India - {postal_code}"
                    })
                logger.warning(f"Geocoding failed on attempt {attempt + 1}")
            except Exception as e:
                logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}")
            # Back off before retrying, but not after the final attempt.
            if attempt < attempts - 1:
                time.sleep(1)
        return jsonify({
            'status': 'error',
            'message': 'Could not determine location after retries'
        }), 500
    except Exception as e:
        logger.error(f"Error in get_location: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500
def calculate_final_verdict(results):
    """
    Derive the overall verdict for a property listing.

    Combines trust, fraud, quality, and completeness signals from every
    analysis stage into one weighted score, then labels the listing
    'legitimate', 'suspicious', or 'fraudulent' with a confidence value,
    plus human-readable critical issues, warnings, and recommendations.
    """
    try:
        # Pull each analysis section out of the aggregate results.
        trust_score = results.get('trust_score', {}).get('score', 0)
        fraud_classification = results.get('fraud_classification', {})
        quality_assessment = results.get('quality_assessment', {})
        specs_verification = results.get('specs_verification', {})
        cross_validation = results.get('cross_validation', [])
        location_analysis = results.get('location_analysis', {})
        price_analysis = results.get('price_analysis', {})
        legal_analysis = results.get('legal_analysis', {})
        document_analysis = results.get('document_analysis', {})
        image_analysis = results.get('image_analysis', {})

        quality_score = quality_assessment.get('score', 0)
        specs_score = specs_verification.get('verification_score', 0)
        location_score = location_analysis.get('completeness_score', 0)
        legal_score = legal_analysis.get('completeness_score', 0)
        pdf_count = document_analysis.get('pdf_count', 0)
        image_count = image_analysis.get('image_count', 0)

        # Per-component scores normalized to the 0-100 range.
        component_scores = {
            'trust': trust_score,
            'fraud': 100 - (fraud_classification.get('alert_score', 0) * 100),
            'quality': quality_score,
            'specs': specs_score,
            'location': location_score,
            'price': price_analysis.get('confidence', 0) * 100 if price_analysis.get('has_price') else 0,
            'legal': legal_score,
            'documents': min(100, (pdf_count / 3) * 100) if document_analysis.get('pdf_count') else 0,
            'images': min(100, (image_count / 5) * 100) if image_analysis.get('image_count') else 0,
        }

        # Relative importance of each component; fraud carries the most weight.
        weights = {
            'trust': 0.20,
            'fraud': 0.25,
            'quality': 0.15,
            'specs': 0.10,
            'location': 0.10,
            'price': 0.05,
            'legal': 0.05,
            'documents': 0.05,
            'images': 0.05,
        }
        final_score = sum(weights.get(name, 0) * value for name, value in component_scores.items())

        fraud_level = fraud_classification.get('alert_level', 'minimal')
        high_risk_count = len(fraud_classification.get('high_risk', []))

        # Hard failures that on their own mark the listing as fraudulent.
        critical_issues = []
        if fraud_level in ('critical', 'high'):
            critical_issues.append(f"High fraud risk detected: {fraud_level} alert level")
        if trust_score < 40:
            critical_issues.append(f"Very low trust score: {trust_score}%")
        if quality_score < 30:
            critical_issues.append(f"Very low content quality: {quality_score}%")
        if specs_score < 40:
            critical_issues.append(f"Property specifications verification failed: {specs_score}%")

        # Softer signals that merit caution rather than outright rejection.
        warnings = []
        if fraud_level == 'medium':
            warnings.append(f"Medium fraud risk detected: {fraud_level} alert level")
        if trust_score < 60:
            warnings.append(f"Low trust score: {trust_score}%")
        if quality_score < 60:
            warnings.append(f"Low content quality: {quality_score}%")
        if specs_score < 70:
            warnings.append(f"Property specifications have issues: {specs_score}%")
        for check in cross_validation:
            if check.get('status') in ('inconsistent', 'invalid', 'suspicious', 'no_match'):
                warnings.append(f"Cross-validation issue: {check.get('message', 'Unknown issue')}")

        # Inventory of information the listing should carry but does not.
        missing_critical = []
        if not location_score > 70:
            missing_critical.append("Location information is incomplete")
        if not price_analysis.get('has_price', False):
            missing_critical.append("Price information is missing")
        if not legal_score > 70:
            missing_critical.append("Legal information is incomplete")
        if pdf_count == 0:
            missing_critical.append("No supporting documents provided")
        if image_count == 0:
            missing_critical.append("No property images provided")
        if missing_critical:
            warnings.append(f"Missing critical information: {', '.join(missing_critical)}")

        # Classify: any critical issue or strong fraud signal => fraudulent;
        # any warning or moderate signal => suspicious; else legitimate.
        if critical_issues or high_risk_count > 0 or (fraud_level in ('critical', 'high') and trust_score < 50):
            status = 'fraudulent'
            confidence = min(100, max(70, 100 - (trust_score * 0.5)))
        elif warnings or (fraud_level == 'medium' and trust_score < 70) or specs_score < 60:
            status = 'suspicious'
            confidence = min(100, max(50, trust_score * 0.8))
        else:
            status = 'legitimate'
            confidence = min(100, max(70, trust_score * 0.9))

        # Action items tailored to how severe the findings are.
        if critical_issues:
            recommendations = [
                "Do not proceed with this property listing",
                "Report this listing to the platform",
            ]
        elif warnings:
            recommendations = [
                "Proceed with extreme caution",
                "Request additional verification documents",
                "Verify all information with independent sources",
            ]
        else:
            recommendations = [
                "Proceed with standard due diligence",
                "Verify final details before transaction",
            ]
        # One follow-up request per missing piece of information.
        recommendations.extend(f"Request {item.lower()}" for item in missing_critical)

        return {
            'status': status,
            'confidence': confidence,
            'score': final_score,
            'reasons': [],
            'critical_issues': critical_issues,
            'warnings': warnings,
            'recommendations': recommendations,
        }
    except Exception as e:
        logger.error(f"Error calculating final verdict: {str(e)}")
        return {
            'status': 'error',
            'confidence': 0.0,
            'score': 0.0,
            'reasons': [f"Error calculating verdict: {str(e)}"],
            'critical_issues': [],
            'warnings': [],
            'recommendations': ["Unable to determine property status due to an error"],
        }
@app.route('/verify', methods=['POST'])
def verify_property():
    """Run the full verification pipeline for a submitted property listing.

    Accepts multipart form data: listing fields plus optional 'images'
    (jpg/jpeg/png) and 'documents' (pdf) uploads. All analysis models are
    executed in parallel on a thread pool, and the combined report —
    including a final verdict — is returned as JSON.

    Returns:
        200 with the serialized report on success,
        400 when no data is sent or required fields are missing,
        500 on unexpected errors.
    """
    try:
        if not request.form and not request.files:
            logger.warning("No form data or files provided")
            return jsonify({
                'error': 'No data provided',
                'status': 'error'
            }), 400
        # Extract and normalize all listing fields from the form.
        data = {
            'property_name': request.form.get('property_name', '').strip(),
            'property_type': request.form.get('property_type', '').strip(),
            'status': request.form.get('status', '').strip(),
            'description': request.form.get('description', '').strip(),
            'address': request.form.get('address', '').strip(),
            'city': request.form.get('city', '').strip(),
            'state': request.form.get('state', '').strip(),
            'country': request.form.get('country', 'India').strip(),
            'zip': request.form.get('zip', '').strip(),
            'latitude': request.form.get('latitude', '').strip(),
            'longitude': request.form.get('longitude', '').strip(),
            'bedrooms': request.form.get('bedrooms', '').strip(),
            'bathrooms': request.form.get('bathrooms', '').strip(),
            'total_rooms': request.form.get('total_rooms', '').strip(),
            'year_built': request.form.get('year_built', '').strip(),
            'parking': request.form.get('parking', '').strip(),
            'sq_ft': request.form.get('sq_ft', '').strip(),
            'market_value': request.form.get('market_value', '').strip(),
            'amenities': request.form.get('amenities', '').strip(),
            'nearby_landmarks': request.form.get('nearby_landmarks', '').strip(),
            'legal_details': request.form.get('legal_details', '').strip()
        }
        # Reject submissions missing the minimum identifying fields.
        required_fields = ['property_name', 'property_type', 'address', 'city', 'state']
        missing_fields = [field for field in required_fields if not data[field]]
        if missing_fields:
            logger.warning(f"Missing required fields: {', '.join(missing_fields)}")
            return jsonify({
                'error': f"Missing required fields: {', '.join(missing_fields)}",
                'status': 'error'
            }), 400
        # Process images, deduplicated by filename to avoid double analysis.
        images = []
        image_analysis = []
        if 'images' in request.files:
            image_files = {}
            for img_file in request.files.getlist('images'):
                if img_file.filename and img_file.filename.lower().endswith(('.jpg', '.jpeg', '.png')):
                    image_files[img_file.filename] = img_file
            for img_file in image_files.values():
                try:
                    img = Image.open(img_file)
                    # JPEG cannot store alpha or palette data, so convert
                    # RGBA/P images (e.g. transparent PNGs) before saving —
                    # otherwise img.save(..., format="JPEG") raises.
                    if img.mode not in ('RGB', 'L'):
                        img = img.convert('RGB')
                    buffered = io.BytesIO()
                    img.save(buffered, format="JPEG")
                    img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
                    images.append(img_str)
                    image_analysis.append(analyze_image(img))
                except Exception as e:
                    logger.error(f"Error processing image {img_file.filename}: {str(e)}")
                    image_analysis.append({'error': str(e), 'is_property_related': False})
        # Process PDFs, deduplicated by filename.
        pdf_texts = []
        pdf_analysis = []
        if 'documents' in request.files:
            pdf_files = {}
            for pdf_file in request.files.getlist('documents'):
                if pdf_file.filename and pdf_file.filename.lower().endswith('.pdf'):
                    pdf_files[pdf_file.filename] = pdf_file
            for pdf_file in pdf_files.values():
                try:
                    pdf_text = extract_pdf_text(pdf_file)
                    pdf_texts.append({
                        'filename': pdf_file.filename,
                        'text': pdf_text
                    })
                    pdf_analysis.append(analyze_pdf_content(pdf_text, data))
                except Exception as e:
                    logger.error(f"Error processing PDF {pdf_file.filename}: {str(e)}")
                    pdf_analysis.append({'error': str(e)})
        # Single free-text view of the listing for the text-based models.
        consolidated_text = f"""
Property Name: {data['property_name']}
Property Type: {data['property_type']}
Status: {data['status']}
Description: {data['description']}
Location: {data['address']}, {data['city']}, {data['state']}, {data['country']}, {data['zip']}
Coordinates: Lat {data['latitude']}, Long {data['longitude']}
Specifications: {data['bedrooms']} bedrooms, {data['bathrooms']} bathrooms, {data['total_rooms']} total rooms
Year Built: {data['year_built']}
Parking: {data['parking']}
Size: {data['sq_ft']} sq. ft.
Market Value: ₹{data['market_value']}
Amenities: {data['amenities']}
Nearby Landmarks: {data['nearby_landmarks']}
Legal Details: {data['legal_details']}
"""
        # Translate the description to English when needed.
        # NOTE(review): `detect` and `GoogleTranslator` are never imported in
        # this file, so this block currently always raises NameError and
        # falls through to the untranslated fallback below. Import
        # langdetect's `detect` and deep_translator's `GoogleTranslator`
        # to make translation actually run.
        try:
            description = data['description']
            if description and len(description) > 10:
                text_language = detect(description)
                if text_language != 'en':
                    data['description_translated'] = GoogleTranslator(
                        source=text_language, target='en'
                    ).translate(description)
                else:
                    data['description_translated'] = description
            else:
                data['description_translated'] = description
        except Exception as e:
            logger.error(f"Error in language detection/translation: {str(e)}")
            data['description_translated'] = data['description']

        # Fan all (synchronous) analysis models out onto a thread pool and
        # await them together; order here fixes the unpack order below.
        async def run_analyses():
            with concurrent.futures.ThreadPoolExecutor() as executor:
                loop = asyncio.get_running_loop()
                tasks = [
                    loop.run_in_executor(executor, generate_property_summary, data),
                    loop.run_in_executor(executor, classify_fraud, consolidated_text, data),
                    loop.run_in_executor(executor, generate_trust_score, consolidated_text, image_analysis, pdf_analysis),
                    loop.run_in_executor(executor, generate_suggestions, consolidated_text, data),
                    loop.run_in_executor(executor, assess_text_quality, data['description_translated']),
                    loop.run_in_executor(executor, verify_address, data),
                    loop.run_in_executor(executor, perform_cross_validation, data),
                    loop.run_in_executor(executor, analyze_location, data),
                    loop.run_in_executor(executor, analyze_price, data),
                    loop.run_in_executor(executor, analyze_legal_details, data['legal_details']),
                    loop.run_in_executor(executor, verify_property_specs, data),
                    loop.run_in_executor(executor, analyze_market_value, data)
                ]
                return await asyncio.gather(*tasks)

        # asyncio.run() creates, runs, and reliably closes a fresh event
        # loop; the previous manual new_event_loop()/close() pair leaked
        # the loop whenever an analysis raised.
        analysis_results = asyncio.run(run_analyses())

        # Unpack in the exact order the tasks were scheduled.
        summary, fraud_classification, (trust_score, trust_reasoning), suggestions, quality_assessment, \
            address_verification, cross_validation, location_analysis, price_analysis, legal_analysis, \
            specs_verification, market_analysis = analysis_results

        document_analysis = {
            'pdf_count': len(pdf_texts),
            'pdf_texts': pdf_texts,
            'pdf_analysis': pdf_analysis
        }
        image_results = {
            'image_count': len(images),
            'image_analysis': image_analysis
        }
        # Assemble the full report with a unique id for traceability.
        results = {
            'report_id': str(uuid.uuid4()),
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'summary': summary,
            'fraud_classification': fraud_classification,
            'trust_score': {
                'score': trust_score,
                'reasoning': trust_reasoning
            },
            'suggestions': suggestions,
            'quality_assessment': quality_assessment,
            'address_verification': address_verification,
            'cross_validation': cross_validation,
            'location_analysis': location_analysis,
            'price_analysis': price_analysis,
            'legal_analysis': legal_analysis,
            'document_analysis': document_analysis,
            'image_analysis': image_results,
            'specs_verification': specs_verification,
            'market_analysis': market_analysis,
            'images': images
        }
        # Combine every signal into the overall verdict.
        results['final_verdict'] = calculate_final_verdict(results)
        return jsonify(make_json_serializable(results))
    except Exception as e:
        logger.error(f"Error in verify_property: {str(e)}")
        return jsonify({
            'error': 'Server error occurred. Please try again later.',
            'status': 'error',
            'details': str(e)
        }), 500
if __name__ == '__main__':
    # Run Flask app, binding all interfaces for containerized deployment.
    # The Werkzeug interactive debugger is a remote-code-execution risk when
    # exposed on 0.0.0.0, so debug mode is now opt-in via FLASK_DEBUG
    # (previously it was hard-coded to True).
    import os
    debug_mode = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=8000, debug=debug_mode, use_reloader=False)