# app.py
#
# Flask backend for an Indian property-listing verification service.
# Exposes three routes:
#   GET  /             -> renders the frontend template
#   POST /get-location -> reverse-geocodes coordinates (India-only)
#   POST /verify       -> runs the full multi-model verification pipeline
from flask import Flask, render_template, request, jsonify
from flask_cors import CORS
import base64
import io
import re
import json
import uuid
import time
import asyncio
from geopy.geocoders import Nominatim
from datetime import datetime
from models.logging_config import logger
from models.model_loader import load_model
from models.image_analysis import analyze_image
from models.pdf_analysis import extract_pdf_text, analyze_pdf_content
from models.property_summary import generate_property_summary
from models.fraud_classification import classify_fraud
from models.trust_score import generate_trust_score
from models.suggestions import generate_suggestions
from models.text_quality import assess_text_quality
from models.address_verification import verify_address
from models.cross_validation import perform_cross_validation
from models.location_analysis import analyze_location
from models.price_analysis import analyze_price
from models.legal_analysis import analyze_legal_details
from models.property_specs import verify_property_specs
from models.market_value import analyze_market_value
from models.image_quality import assess_image_quality
from models.property_relation import check_if_property_related
import torch
import numpy as np
import concurrent.futures
from PIL import Image

app = Flask(__name__)
CORS(app)  # Enable CORS for frontend

# Reverse geocoder used by /get-location. 10 s timeout per lookup;
# Nominatim requires a distinctive user_agent.
geocoder = Nominatim(user_agent="indian_property_verifier", timeout=10)


def make_json_serializable(obj):
    """Recursively convert *obj* into JSON-serializable builtins.

    Handles nested lists/tuples/dicts, single-element and multi-element
    torch tensors, numpy scalars and arrays.  Anything unrecognized (or
    anything that raises during conversion) falls back to ``str(obj)`` so
    the API response can always be serialized.
    """
    try:
        if isinstance(obj, (bool, int, float, str, type(None))):
            return obj
        elif isinstance(obj, (list, tuple)):
            return [make_json_serializable(item) for item in obj]
        elif isinstance(obj, dict):
            # Keys are coerced to str because JSON objects require string keys.
            return {str(key): make_json_serializable(value)
                    for key, value in obj.items()}
        elif torch.is_tensor(obj):
            return obj.item() if obj.numel() == 1 else obj.tolist()
        elif np.isscalar(obj):
            # numpy scalar types expose .item(); plain scalars fall through.
            return obj.item() if hasattr(obj, 'item') else float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        else:
            return str(obj)
    except Exception as e:
        logger.error(f"Error serializing object: {str(e)}")
        return str(obj)


@app.route('/')
def index():
    """Serve the single-page frontend."""
    return render_template('index.html')


@app.route('/get-location', methods=['POST'])
def get_location():
    """Reverse-geocode a latitude/longitude pair into an Indian address.

    Expects JSON ``{"latitude": ..., "longitude": ...}``.  Coordinates must
    fall inside India's bounding box.  Retries the Nominatim lookup up to
    three times before giving up.  Returns 400 for bad input, 500 when
    geocoding ultimately fails.
    """
    try:
        data = request.json or {}
        latitude = data.get('latitude')
        longitude = data.get('longitude')

        # NOTE(review): truthiness check also rejects 0/0.0, but (0, 0) is
        # outside India's bounding box anyway, so no valid input is lost.
        if not latitude or not longitude:
            logger.warning("Missing latitude or longitude")
            return jsonify({
                'status': 'error',
                'message': 'Latitude and longitude are required'
            }), 400

        # Validate coordinates are within India's approximate bounding box.
        try:
            lat, lng = float(latitude), float(longitude)
            if not (6.5 <= lat <= 37.5 and 68.0 <= lng <= 97.5):
                return jsonify({
                    'status': 'error',
                    'message': 'Coordinates are outside India'
                }), 400
        except ValueError:
            return jsonify({
                'status': 'error',
                'message': 'Invalid coordinates format'
            }), 400

        # Retry geocoding up to 3 times (Nominatim can be flaky / rate-limited).
        for attempt in range(3):
            try:
                location = geocoder.reverse((latitude, longitude), exactly_one=True)
                if location:
                    address_components = location.raw.get('address', {})

                    # Extract Indian-specific address components, falling back
                    # through progressively coarser OSM keys.
                    city = address_components.get('city', '')
                    if not city:
                        city = address_components.get('town', '')
                    if not city:
                        city = address_components.get('village', '')
                    if not city:
                        city = address_components.get('suburb', '')

                    state = address_components.get('state', '')
                    if not state:
                        state = address_components.get('state_district', '')

                    # Indian PIN codes are exactly six digits; discard anything else.
                    postal_code = address_components.get('postcode', '')
                    if postal_code and not re.match(r'^\d{6}$', postal_code):
                        postal_code = ''

                    road = address_components.get('road', '')
                    if not road:
                        road = address_components.get('street', '')

                    area = address_components.get('suburb', '')
                    if not area:
                        area = address_components.get('neighbourhood', '')

                    return jsonify({
                        'status': 'success',
                        'address': location.address,
                        'street': road,
                        'area': area,
                        'city': city,
                        'state': state,
                        'country': 'India',
                        'postal_code': postal_code,
                        'latitude': latitude,
                        'longitude': longitude,
                        'formatted_address': f"{road}, {area}, {city}, {state}, India - {postal_code}"
                    })
                logger.warning(f"Geocoding failed on attempt {attempt + 1}")
                time.sleep(1)  # Wait before retry
            except Exception as e:
                logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}")
                time.sleep(1)

        return jsonify({
            'status': 'error',
            'message': 'Could not determine location after retries'
        }), 500
    except Exception as e:
        logger.error(f"Error in get_location: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500


def calculate_final_verdict(results):
    """
    Calculate a comprehensive final verdict based on all analysis results.
    This function combines all verification scores, fraud indicators, and quality assessments
    to determine if a property listing is legitimate, suspicious, or fraudulent.
    """
    try:
        # Initialize verdict components.
        verdict = {
            'status': 'unknown',
            'confidence': 0.0,
            'score': 0.0,
            'reasons': [],
            'critical_issues': [],
            'warnings': [],
            'recommendations': []
        }

        # Extract key components from results.
        trust_score = results.get('trust_score', {}).get('score', 0)
        fraud_classification = results.get('fraud_classification', {})
        quality_assessment = results.get('quality_assessment', {})
        specs_verification = results.get('specs_verification', {})
        cross_validation = results.get('cross_validation', [])
        location_analysis = results.get('location_analysis', {})
        price_analysis = results.get('price_analysis', {})
        legal_analysis = results.get('legal_analysis', {})
        document_analysis = results.get('document_analysis', {})
        image_analysis = results.get('image_analysis', {})

        # Calculate component scores (0-100).  Documents cap at 3 PDFs,
        # images at 5 — more than that earns no extra credit.
        component_scores = {
            'trust': trust_score,
            'fraud': 100 - (fraud_classification.get('alert_score', 0) * 100),
            'quality': quality_assessment.get('score', 0),
            'specs': specs_verification.get('verification_score', 0),
            'location': location_analysis.get('completeness_score', 0),
            'price': price_analysis.get('confidence', 0) * 100 if price_analysis.get('has_price') else 0,
            'legal': legal_analysis.get('completeness_score', 0),
            'documents': min(100, (document_analysis.get('pdf_count', 0) / 3) * 100) if document_analysis.get('pdf_count') else 0,
            'images': min(100, (image_analysis.get('image_count', 0) / 5) * 100) if image_analysis.get('image_count') else 0
        }

        # Calculate weighted final score with adjusted weights.
        weights = {
            'trust': 0.20,
            'fraud': 0.25,  # Increased weight for fraud detection
            'quality': 0.15,
            'specs': 0.10,
            'location': 0.10,
            'price': 0.05,
            'legal': 0.05,
            'documents': 0.05,
            'images': 0.05
        }

        final_score = sum(score * weights.get(component, 0)
                          for component, score in component_scores.items())
        verdict['score'] = final_score

        # Determine verdict status based on multiple factors.
        fraud_level = fraud_classification.get('alert_level', 'minimal')
        high_risk_indicators = len(fraud_classification.get('high_risk', []))

        critical_issues = []
        warnings = []

        # Check for critical issues.
        if fraud_level in ['critical', 'high']:
            critical_issues.append(f"High fraud risk detected: {fraud_level} alert level")
        if trust_score < 40:
            critical_issues.append(f"Very low trust score: {trust_score}%")
        if quality_assessment.get('score', 0) < 30:
            critical_issues.append(f"Very low content quality: {quality_assessment.get('score', 0)}%")
        if specs_verification.get('verification_score', 0) < 40:
            critical_issues.append(f"Property specifications verification failed: {specs_verification.get('verification_score', 0)}%")

        # Check for warnings (less severe than critical issues).
        if fraud_level == 'medium':
            warnings.append(f"Medium fraud risk detected: {fraud_level} alert level")
        if trust_score < 60:
            warnings.append(f"Low trust score: {trust_score}%")
        if quality_assessment.get('score', 0) < 60:
            warnings.append(f"Low content quality: {quality_assessment.get('score', 0)}%")
        if specs_verification.get('verification_score', 0) < 70:
            warnings.append(f"Property specifications have issues: {specs_verification.get('verification_score', 0)}%")

        # Check cross-validation results.
        for check in cross_validation:
            if check.get('status') in ['inconsistent', 'invalid', 'suspicious', 'no_match']:
                warnings.append(f"Cross-validation issue: {check.get('message', 'Unknown issue')}")

        # Check for missing critical information.
        missing_critical = []
        if not location_analysis.get('completeness_score', 0) > 70:
            missing_critical.append("Location information is incomplete")
        if not price_analysis.get('has_price', False):
            missing_critical.append("Price information is missing")
        if not legal_analysis.get('completeness_score', 0) > 70:
            missing_critical.append("Legal information is incomplete")
        if document_analysis.get('pdf_count', 0) == 0:
            missing_critical.append("No supporting documents provided")
        if image_analysis.get('image_count', 0) == 0:
            missing_critical.append("No property images provided")

        if missing_critical:
            warnings.append(f"Missing critical information: {', '.join(missing_critical)}")

        # Enhanced verdict determination with more strict criteria.
        if critical_issues or (fraud_level in ['critical', 'high'] and trust_score < 50) or high_risk_indicators > 0:
            verdict['status'] = 'fraudulent'
            verdict['confidence'] = min(100, max(70, 100 - (trust_score * 0.5)))
        elif warnings or (fraud_level == 'medium' and trust_score < 70) or specs_verification.get('verification_score', 0) < 60:
            verdict['status'] = 'suspicious'
            verdict['confidence'] = min(100, max(50, trust_score * 0.8))
        else:
            verdict['status'] = 'legitimate'
            verdict['confidence'] = min(100, max(70, trust_score * 0.9))

        # Add reasons to verdict.
        verdict['critical_issues'] = critical_issues
        verdict['warnings'] = warnings

        # Add recommendations based on issues.
        if critical_issues:
            verdict['recommendations'].append("Do not proceed with this property listing")
            verdict['recommendations'].append("Report this listing to the platform")
        elif warnings:
            verdict['recommendations'].append("Proceed with extreme caution")
            verdict['recommendations'].append("Request additional verification documents")
            verdict['recommendations'].append("Verify all information with independent sources")
        else:
            verdict['recommendations'].append("Proceed with standard due diligence")
            verdict['recommendations'].append("Verify final details before transaction")

        # Add specific recommendations based on missing information.
        for missing in missing_critical:
            verdict['recommendations'].append(f"Request {missing.lower()}")

        return verdict
    except Exception as e:
        logger.error(f"Error calculating final verdict: {str(e)}")
        return {
            'status': 'error',
            'confidence': 0.0,
            'score': 0.0,
            'reasons': [f"Error calculating verdict: {str(e)}"],
            'critical_issues': [],
            'warnings': [],
            'recommendations': ["Unable to determine property status due to an error"]
        }


@app.route('/verify', methods=['POST'])
def verify_property():
    """Run the full verification pipeline on a submitted property listing.

    Accepts multipart form data (listing fields plus optional ``images`` and
    ``documents`` file uploads), fans the analyses out across a thread pool,
    and returns a JSON report including a final verdict.
    """
    try:
        if not request.form and not request.files:
            logger.warning("No form data or files provided")
            return jsonify({
                'error': 'No data provided',
                'status': 'error'
            }), 400

        # Extract form data (every field stripped; absent fields become '').
        data = {
            'property_name': request.form.get('property_name', '').strip(),
            'property_type': request.form.get('property_type', '').strip(),
            'status': request.form.get('status', '').strip(),
            'description': request.form.get('description', '').strip(),
            'address': request.form.get('address', '').strip(),
            'city': request.form.get('city', '').strip(),
            'state': request.form.get('state', '').strip(),
            'country': request.form.get('country', 'India').strip(),
            'zip': request.form.get('zip', '').strip(),
            'latitude': request.form.get('latitude', '').strip(),
            'longitude': request.form.get('longitude', '').strip(),
            'bedrooms': request.form.get('bedrooms', '').strip(),
            'bathrooms': request.form.get('bathrooms', '').strip(),
            'total_rooms': request.form.get('total_rooms', '').strip(),
            'year_built': request.form.get('year_built', '').strip(),
            'parking': request.form.get('parking', '').strip(),
            'sq_ft': request.form.get('sq_ft', '').strip(),
            'market_value': request.form.get('market_value', '').strip(),
            'amenities': request.form.get('amenities', '').strip(),
            'nearby_landmarks': request.form.get('nearby_landmarks', '').strip(),
            'legal_details': request.form.get('legal_details', '').strip()
        }

        # Validate required fields.
        required_fields = ['property_name', 'property_type', 'address', 'city', 'state']
        missing_fields = [field for field in required_fields if not data[field]]
        if missing_fields:
            logger.warning(f"Missing required fields: {', '.join(missing_fields)}")
            return jsonify({
                'error': f"Missing required fields: {', '.join(missing_fields)}",
                'status': 'error'
            }), 400

        # Process images.
        images = []
        image_analysis = []
        if 'images' in request.files:
            # Get unique image files by filename to prevent duplicates.
            image_files = {}
            for img_file in request.files.getlist('images'):
                if img_file.filename and img_file.filename.lower().endswith(('.jpg', '.jpeg', '.png')):
                    image_files[img_file.filename] = img_file

            # Process unique images.
            for img_file in image_files.values():
                try:
                    img = Image.open(img_file)
                    # FIX: JPEG cannot encode alpha/palette modes; RGBA/P PNG
                    # uploads used to raise on save and fall into the error
                    # path.  Convert to RGB first.
                    if img.mode != 'RGB':
                        img = img.convert('RGB')
                    buffered = io.BytesIO()
                    img.save(buffered, format="JPEG")
                    img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
                    images.append(img_str)
                    image_analysis.append(analyze_image(img))
                except Exception as e:
                    logger.error(f"Error processing image {img_file.filename}: {str(e)}")
                    image_analysis.append({'error': str(e), 'is_property_related': False})

        # Process PDFs.
        pdf_texts = []
        pdf_analysis = []
        if 'documents' in request.files:
            # Get unique PDF files by filename to prevent duplicates.
            pdf_files = {}
            for pdf_file in request.files.getlist('documents'):
                if pdf_file.filename and pdf_file.filename.lower().endswith('.pdf'):
                    pdf_files[pdf_file.filename] = pdf_file

            # Process unique PDFs.
            for pdf_file in pdf_files.values():
                try:
                    pdf_text = extract_pdf_text(pdf_file)
                    pdf_texts.append({
                        'filename': pdf_file.filename,
                        'text': pdf_text
                    })
                    pdf_analysis.append(analyze_pdf_content(pdf_text, data))
                except Exception as e:
                    logger.error(f"Error processing PDF {pdf_file.filename}: {str(e)}")
                    pdf_analysis.append({'error': str(e)})

        # Create consolidated text for analysis.
        consolidated_text = f"""
Property Name: {data['property_name']}
Property Type: {data['property_type']}
Status: {data['status']}
Description: {data['description']}
Location: {data['address']}, {data['city']}, {data['state']}, {data['country']}, {data['zip']}
Coordinates: Lat {data['latitude']}, Long {data['longitude']}
Specifications: {data['bedrooms']} bedrooms, {data['bathrooms']} bathrooms, {data['total_rooms']} total rooms
Year Built: {data['year_built']}
Parking: {data['parking']}
Size: {data['sq_ft']} sq. ft.
Market Value: ₹{data['market_value']}
Amenities: {data['amenities']}
Nearby Landmarks: {data['nearby_landmarks']}
Legal Details: {data['legal_details']}
"""

        # Translate the description to English when needed.
        # FIX: `detect` (langdetect) and `GoogleTranslator` (deep_translator)
        # were called but never imported, so this block always raised
        # NameError and silently fell back to the untranslated text.  Import
        # them locally so a missing package still degrades gracefully.
        try:
            description = data['description']
            if description and len(description) > 10:
                from langdetect import detect
                from deep_translator import GoogleTranslator
                text_language = detect(description)
                if text_language != 'en':
                    translated_description = GoogleTranslator(
                        source=text_language, target='en').translate(description)
                    data['description_translated'] = translated_description
                else:
                    data['description_translated'] = description
            else:
                data['description_translated'] = description
        except Exception as e:
            logger.error(f"Error in language detection/translation: {str(e)}")
            data['description_translated'] = data['description']

        # Run all analyses in parallel using asyncio over a thread pool.
        async def run_analyses():
            with concurrent.futures.ThreadPoolExecutor() as executor:
                loop = asyncio.get_event_loop()
                tasks = [
                    loop.run_in_executor(executor, generate_property_summary, data),
                    loop.run_in_executor(executor, classify_fraud, consolidated_text, data),
                    loop.run_in_executor(executor, generate_trust_score, consolidated_text, image_analysis, pdf_analysis),
                    loop.run_in_executor(executor, generate_suggestions, consolidated_text, data),
                    loop.run_in_executor(executor, assess_text_quality, data['description_translated']),
                    loop.run_in_executor(executor, verify_address, data),
                    loop.run_in_executor(executor, perform_cross_validation, data),
                    loop.run_in_executor(executor, analyze_location, data),
                    loop.run_in_executor(executor, analyze_price, data),
                    loop.run_in_executor(executor, analyze_legal_details, data['legal_details']),
                    loop.run_in_executor(executor, verify_property_specs, data),
                    loop.run_in_executor(executor, analyze_market_value, data)
                ]
                results = await asyncio.gather(*tasks)
                return results

        # Run analyses and get results on a fresh per-request event loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        analysis_results = loop.run_until_complete(run_analyses())
        loop.close()

        # Unpack results (generate_trust_score returns a (score, reasoning) pair).
        summary, fraud_classification, (trust_score, trust_reasoning), suggestions, quality_assessment, \
            address_verification, cross_validation, location_analysis, price_analysis, legal_analysis, \
            specs_verification, market_analysis = analysis_results

        # Prepare response.
        document_analysis = {
            'pdf_count': len(pdf_texts),
            'pdf_texts': pdf_texts,
            'pdf_analysis': pdf_analysis
        }

        image_results = {
            'image_count': len(images),
            'image_analysis': image_analysis
        }

        report_id = str(uuid.uuid4())

        # Create results dictionary.
        results = {
            'report_id': report_id,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'summary': summary,
            'fraud_classification': fraud_classification,
            'trust_score': {
                'score': trust_score,
                'reasoning': trust_reasoning
            },
            'suggestions': suggestions,
            'quality_assessment': quality_assessment,
            'address_verification': address_verification,
            'cross_validation': cross_validation,
            'location_analysis': location_analysis,
            'price_analysis': price_analysis,
            'legal_analysis': legal_analysis,
            'document_analysis': document_analysis,
            'image_analysis': image_results,
            'specs_verification': specs_verification,
            'market_analysis': market_analysis,
            'images': images
        }

        # Calculate final verdict.
        final_verdict = calculate_final_verdict(results)
        results['final_verdict'] = final_verdict

        return jsonify(make_json_serializable(results))
    except Exception as e:
        logger.error(f"Error in verify_property: {str(e)}")
        return jsonify({
            'error': 'Server error occurred. Please try again later.',
            'status': 'error',
            'details': str(e)
        }), 500


if __name__ == '__main__':
    # Run Flask app.
    # SECURITY NOTE(review): debug=True on 0.0.0.0 exposes the Werkzeug
    # interactive debugger to the network — disable debug in production.
    app.run(host='0.0.0.0', port=8000, debug=True, use_reloader=False)