|
|
|
|
|
from flask import Flask, render_template, request, jsonify |
|
from flask_cors import CORS |
|
import base64 |
|
import io |
|
import re |
|
import json |
|
import uuid |
|
import time |
|
import asyncio |
|
from geopy.geocoders import Nominatim |
|
from datetime import datetime |
|
from models.logging_config import logger |
|
from models.model_loader import load_model |
|
from models.image_analysis import analyze_image |
|
from models.pdf_analysis import extract_pdf_text, analyze_pdf_content |
|
from models.property_summary import generate_property_summary |
|
from models.fraud_classification import classify_fraud |
|
from models.trust_score import generate_trust_score |
|
from models.suggestions import generate_suggestions |
|
from models.text_quality import assess_text_quality |
|
from models.address_verification import verify_address |
|
from models.cross_validation import perform_cross_validation |
|
from models.location_analysis import analyze_location |
|
from models.price_analysis import analyze_price |
|
from models.legal_analysis import analyze_legal_details |
|
from models.property_specs import verify_property_specs |
|
from models.market_value import analyze_market_value |
|
from models.image_quality import assess_image_quality |
|
from models.property_relation import check_if_property_related |
|
import torch |
|
import numpy as np |
|
import concurrent.futures |
|
from PIL import Image |
|
|
|
# Flask application and CORS support for the browser front-end.
app = Flask(__name__)

CORS(app)


# Nominatim (OpenStreetMap) reverse geocoder; the 10 s timeout guards against
# slow upstream responses. NOTE(review): Nominatim's usage policy rate-limits
# clients to roughly one request per second — confirm acceptable for this load.
geocoder = Nominatim(user_agent="indian_property_verifier", timeout=10)
|
|
|
def make_json_serializable(obj):
    """Recursively convert *obj* into JSON-serializable Python primitives.

    Handles nested dicts/lists/tuples/sets, torch tensors, and numpy
    scalars/arrays. Anything unrecognized is stringified rather than
    raising, so a single odd value cannot break the whole JSON response.

    Args:
        obj: arbitrary value produced by the analysis pipeline.

    Returns:
        A structure composed only of dict/list/str/int/float/bool/None.
    """
    try:
        if isinstance(obj, (bool, int, float, str, type(None))):
            return obj
        elif isinstance(obj, (list, tuple, set, frozenset)):
            # Fix: sets previously fell through to str(obj) and were emitted
            # as a Python-repr string; serialize them as JSON lists instead.
            return [make_json_serializable(item) for item in obj]
        elif isinstance(obj, dict):
            # JSON object keys must be strings.
            return {str(key): make_json_serializable(value) for key, value in obj.items()}
        elif torch.is_tensor(obj):
            # Single-element tensors collapse to a plain Python scalar.
            return obj.item() if obj.numel() == 1 else obj.tolist()
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, np.generic):
            # numpy scalar types (np.float64, np.int32, ...) -> Python scalar.
            return obj.item()
        elif np.isscalar(obj):
            # Remaining scalar-like objects; coerce via item() when available.
            return obj.item() if hasattr(obj, 'item') else float(obj)
        else:
            # Fallback for datetimes, UUIDs, custom objects, ...
            return str(obj)
    except Exception as e:
        logger.error(f"Error serializing object: {str(e)}")
        return str(obj)
|
|
|
@app.route('/')
def index():
    """Serve the single-page front-end (templates/index.html)."""
    return render_template('index.html')
|
|
|
@app.route('/get-location', methods=['POST'])
def get_location():
    """Reverse-geocode a latitude/longitude pair into an Indian address.

    Expects JSON ``{"latitude": ..., "longitude": ...}``. Validates that the
    coordinates parse as floats and fall inside India's rough bounding box,
    then queries Nominatim with up to 3 retries (1 s back-off between tries).

    Returns:
        200 with address components on success; 400 on missing/invalid
        input; 500 when geocoding fails after all retries.
    """
    try:
        data = request.json or {}
        latitude = data.get('latitude')
        longitude = data.get('longitude')

        # Fix: explicit None check — a numeric 0 is falsy but is still a
        # syntactically valid coordinate and deserves the range error below,
        # not a "required" error.
        if latitude is None or longitude is None:
            logger.warning("Missing latitude or longitude")
            return jsonify({
                'status': 'error',
                'message': 'Latitude and longitude are required'
            }), 400

        try:
            lat, lng = float(latitude), float(longitude)
        except (TypeError, ValueError):
            # TypeError covers non-string/number JSON values (lists, dicts).
            return jsonify({
                'status': 'error',
                'message': 'Invalid coordinates format'
            }), 400

        # Rough bounding box for India.
        if not (6.5 <= lat <= 37.5 and 68.0 <= lng <= 97.5):
            return jsonify({
                'status': 'error',
                'message': 'Coordinates are outside India'
            }), 400

        for attempt in range(3):
            try:
                # Fix: use the validated floats, not the raw request values.
                location = geocoder.reverse((lat, lng), exactly_one=True)
                if location:
                    address_components = location.raw.get('address', {})

                    # Nominatim uses different keys depending on locality
                    # size; take the first non-empty candidate.
                    city = (address_components.get('city')
                            or address_components.get('town')
                            or address_components.get('village')
                            or address_components.get('suburb')
                            or '')
                    state = (address_components.get('state')
                             or address_components.get('state_district')
                             or '')

                    # Indian PIN codes are exactly six digits; drop anything else.
                    postal_code = address_components.get('postcode', '')
                    if postal_code and not re.match(r'^\d{6}$', postal_code):
                        postal_code = ''

                    road = (address_components.get('road')
                            or address_components.get('street')
                            or '')
                    area = (address_components.get('suburb')
                            or address_components.get('neighbourhood')
                            or '')

                    return jsonify({
                        'status': 'success',
                        'address': location.address,
                        'street': road,
                        'area': area,
                        'city': city,
                        'state': state,
                        'country': 'India',
                        'postal_code': postal_code,
                        'latitude': latitude,
                        'longitude': longitude,
                        'formatted_address': f"{road}, {area}, {city}, {state}, India - {postal_code}"
                    })
                logger.warning(f"Geocoding failed on attempt {attempt + 1}")
            except Exception as e:
                logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}")
            # Fix: back off between retries only — the original also slept
            # a pointless extra second after the final attempt.
            if attempt < 2:
                time.sleep(1)

        return jsonify({
            'status': 'error',
            'message': 'Could not determine location after retries'
        }), 500

    except Exception as e:
        logger.error(f"Error in get_location: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500
|
|
|
def calculate_final_verdict(results):
    """
    Calculate a comprehensive final verdict based on all analysis results.

    Combines all verification scores, fraud indicators, and quality
    assessments into a single weighted score, then classifies the listing
    as 'legitimate', 'suspicious', or 'fraudulent'.

    Args:
        results: dict of per-analysis result dicts (trust_score,
            fraud_classification, quality_assessment, ...). Missing keys are
            treated as empty, contributing a zero component score.

    Returns:
        dict with keys: status, confidence, score, reasons,
        critical_issues, warnings, recommendations. On internal error,
        status is 'error' and reasons carries the exception message.
    """
    try:
        verdict = {
            'status': 'unknown',
            'confidence': 0.0,
            'score': 0.0,
            'reasons': [],
            'critical_issues': [],
            'warnings': [],
            'recommendations': []
        }

        # Pull out each component, defaulting to empty so absent analyses
        # degrade the score instead of raising.
        trust_score = results.get('trust_score', {}).get('score', 0)
        fraud_classification = results.get('fraud_classification', {})
        quality_assessment = results.get('quality_assessment', {})
        specs_verification = results.get('specs_verification', {})
        cross_validation = results.get('cross_validation', [])
        location_analysis = results.get('location_analysis', {})
        price_analysis = results.get('price_analysis', {})
        legal_analysis = results.get('legal_analysis', {})
        document_analysis = results.get('document_analysis', {})
        image_analysis = results.get('image_analysis', {})

        # Normalize every component to a 0-100 scale. 'fraud' is inverted
        # (higher alert_score -> lower component score); documents and
        # images saturate at 3 PDFs / 5 images respectively.
        component_scores = {
            'trust': trust_score,
            'fraud': 100 - (fraud_classification.get('alert_score', 0) * 100),
            'quality': quality_assessment.get('score', 0),
            'specs': specs_verification.get('verification_score', 0),
            'location': location_analysis.get('completeness_score', 0),
            'price': price_analysis.get('confidence', 0) * 100 if price_analysis.get('has_price') else 0,
            'legal': legal_analysis.get('completeness_score', 0),
            'documents': min(100, (document_analysis.get('pdf_count', 0) / 3) * 100) if document_analysis.get('pdf_count') else 0,
            'images': min(100, (image_analysis.get('image_count', 0) / 5) * 100) if image_analysis.get('image_count') else 0
        }

        # Weights sum to 1.0; fraud and trust dominate the final score.
        weights = {
            'trust': 0.20,
            'fraud': 0.25,
            'quality': 0.15,
            'specs': 0.10,
            'location': 0.10,
            'price': 0.05,
            'legal': 0.05,
            'documents': 0.05,
            'images': 0.05
        }

        final_score = sum(score * weights.get(component, 0) for component, score in component_scores.items())
        verdict['score'] = final_score

        fraud_level = fraud_classification.get('alert_level', 'minimal')
        high_risk_indicators = len(fraud_classification.get('high_risk', []))
        critical_issues = []
        warnings = []

        # --- Critical (deal-breaker) issues ---
        if fraud_level in ['critical', 'high']:
            critical_issues.append(f"High fraud risk detected: {fraud_level} alert level")

        if trust_score < 40:
            critical_issues.append(f"Very low trust score: {trust_score}%")

        if quality_assessment.get('score', 0) < 30:
            critical_issues.append(f"Very low content quality: {quality_assessment.get('score', 0)}%")

        if specs_verification.get('verification_score', 0) < 40:
            critical_issues.append(f"Property specifications verification failed: {specs_verification.get('verification_score', 0)}%")

        # --- Softer warnings ---
        if fraud_level == 'medium':
            warnings.append(f"Medium fraud risk detected: {fraud_level} alert level")

        if trust_score < 60:
            warnings.append(f"Low trust score: {trust_score}%")

        if quality_assessment.get('score', 0) < 60:
            warnings.append(f"Low content quality: {quality_assessment.get('score', 0)}%")

        if specs_verification.get('verification_score', 0) < 70:
            warnings.append(f"Property specifications have issues: {specs_verification.get('verification_score', 0)}%")

        # Surface any failed cross-validation checks as warnings.
        for check in cross_validation:
            if check.get('status') in ['inconsistent', 'invalid', 'suspicious', 'no_match']:
                warnings.append(f"Cross-validation issue: {check.get('message', 'Unknown issue')}")

        # --- Missing critical information ---
        missing_critical = []
        if not location_analysis.get('completeness_score', 0) > 70:
            missing_critical.append("Location information is incomplete")

        if not price_analysis.get('has_price', False):
            missing_critical.append("Price information is missing")

        if not legal_analysis.get('completeness_score', 0) > 70:
            missing_critical.append("Legal information is incomplete")

        if document_analysis.get('pdf_count', 0) == 0:
            missing_critical.append("No supporting documents provided")

        if image_analysis.get('image_count', 0) == 0:
            missing_critical.append("No property images provided")

        if missing_critical:
            warnings.append(f"Missing critical information: {', '.join(missing_critical)}")

        # --- Final classification ---
        if critical_issues or (fraud_level in ['critical', 'high'] and trust_score < 50) or high_risk_indicators > 0:
            verdict['status'] = 'fraudulent'
            verdict['confidence'] = min(100, max(70, 100 - (trust_score * 0.5)))
        elif warnings or (fraud_level == 'medium' and trust_score < 70) or specs_verification.get('verification_score', 0) < 60:
            verdict['status'] = 'suspicious'
            verdict['confidence'] = min(100, max(50, trust_score * 0.8))
        else:
            verdict['status'] = 'legitimate'
            verdict['confidence'] = min(100, max(70, trust_score * 0.9))

        verdict['critical_issues'] = critical_issues
        verdict['warnings'] = warnings
        # Fix: 'reasons' was declared (and filled on the error path) but
        # never populated on the success path; surface the same issues
        # that drove the classification.
        verdict['reasons'] = critical_issues + warnings

        if critical_issues:
            verdict['recommendations'].append("Do not proceed with this property listing")
            verdict['recommendations'].append("Report this listing to the platform")
        elif warnings:
            verdict['recommendations'].append("Proceed with extreme caution")
            verdict['recommendations'].append("Request additional verification documents")
            verdict['recommendations'].append("Verify all information with independent sources")
        else:
            verdict['recommendations'].append("Proceed with standard due diligence")
            verdict['recommendations'].append("Verify final details before transaction")

        # One actionable "request X" recommendation per missing item.
        for missing in missing_critical:
            verdict['recommendations'].append(f"Request {missing.lower()}")

        return verdict
    except Exception as e:
        logger.error(f"Error calculating final verdict: {str(e)}")
        return {
            'status': 'error',
            'confidence': 0.0,
            'score': 0.0,
            'reasons': [f"Error calculating verdict: {str(e)}"],
            'critical_issues': [],
            'warnings': [],
            'recommendations': ["Unable to determine property status due to an error"]
        }
|
|
|
@app.route('/verify', methods=['POST'])
def verify_property():
    """Run the full verification pipeline on a submitted property listing.

    Accepts multipart form data (property fields plus optional 'images'
    and 'documents' file uploads), fans the independent analyses out over
    a thread pool, and returns a single consolidated JSON report that
    includes a final verdict.

    Returns:
        200 with the report JSON; 400 on missing data/required fields;
        500 on unexpected failure.
    """
    try:
        if not request.form and not request.files:
            logger.warning("No form data or files provided")
            return jsonify({
                'error': 'No data provided',
                'status': 'error'
            }), 400

        # Collect and normalize every expected form field.
        data = {
            'property_name': request.form.get('property_name', '').strip(),
            'property_type': request.form.get('property_type', '').strip(),
            'status': request.form.get('status', '').strip(),
            'description': request.form.get('description', '').strip(),
            'address': request.form.get('address', '').strip(),
            'city': request.form.get('city', '').strip(),
            'state': request.form.get('state', '').strip(),
            'country': request.form.get('country', 'India').strip(),
            'zip': request.form.get('zip', '').strip(),
            'latitude': request.form.get('latitude', '').strip(),
            'longitude': request.form.get('longitude', '').strip(),
            'bedrooms': request.form.get('bedrooms', '').strip(),
            'bathrooms': request.form.get('bathrooms', '').strip(),
            'total_rooms': request.form.get('total_rooms', '').strip(),
            'year_built': request.form.get('year_built', '').strip(),
            'parking': request.form.get('parking', '').strip(),
            'sq_ft': request.form.get('sq_ft', '').strip(),
            'market_value': request.form.get('market_value', '').strip(),
            'amenities': request.form.get('amenities', '').strip(),
            'nearby_landmarks': request.form.get('nearby_landmarks', '').strip(),
            'legal_details': request.form.get('legal_details', '').strip()
        }

        required_fields = ['property_name', 'property_type', 'address', 'city', 'state']
        missing_fields = [field for field in required_fields if not data[field]]
        if missing_fields:
            logger.warning(f"Missing required fields: {', '.join(missing_fields)}")
            return jsonify({
                'error': f"Missing required fields: {', '.join(missing_fields)}",
                'status': 'error'
            }), 400

        # --- Images: de-duplicate by filename, re-encode as base64 JPEG ---
        images = []
        image_analysis = []
        if 'images' in request.files:
            image_files = {}
            for img_file in request.files.getlist('images'):
                if img_file.filename and img_file.filename.lower().endswith(('.jpg', '.jpeg', '.png')):
                    image_files[img_file.filename] = img_file

            for img_file in image_files.values():
                try:
                    img = Image.open(img_file)
                    # Fix: JPEG cannot store an alpha channel/palette, so
                    # RGBA or P-mode PNG uploads made save() raise; convert
                    # to RGB first.
                    if img.mode not in ('RGB', 'L'):
                        img = img.convert('RGB')
                    buffered = io.BytesIO()
                    img.save(buffered, format="JPEG")
                    img_str = base64.b64encode(buffered.getvalue()).decode('utf-8')
                    images.append(img_str)
                    image_analysis.append(analyze_image(img))
                except Exception as e:
                    logger.error(f"Error processing image {img_file.filename}: {str(e)}")
                    image_analysis.append({'error': str(e), 'is_property_related': False})

        # --- PDFs: de-duplicate by filename, extract and analyze text ---
        pdf_texts = []
        pdf_analysis = []
        if 'documents' in request.files:
            pdf_files = {}
            for pdf_file in request.files.getlist('documents'):
                if pdf_file.filename and pdf_file.filename.lower().endswith('.pdf'):
                    pdf_files[pdf_file.filename] = pdf_file

            for pdf_file in pdf_files.values():
                try:
                    pdf_text = extract_pdf_text(pdf_file)
                    pdf_texts.append({
                        'filename': pdf_file.filename,
                        'text': pdf_text
                    })
                    pdf_analysis.append(analyze_pdf_content(pdf_text, data))
                except Exception as e:
                    logger.error(f"Error processing PDF {pdf_file.filename}: {str(e)}")
                    pdf_analysis.append({'error': str(e)})

        # Single text blob fed to the text-based analyzers.
        consolidated_text = f"""
        Property Name: {data['property_name']}
        Property Type: {data['property_type']}
        Status: {data['status']}
        Description: {data['description']}
        Location: {data['address']}, {data['city']}, {data['state']}, {data['country']}, {data['zip']}
        Coordinates: Lat {data['latitude']}, Long {data['longitude']}
        Specifications: {data['bedrooms']} bedrooms, {data['bathrooms']} bathrooms, {data['total_rooms']} total rooms
        Year Built: {data['year_built']}
        Parking: {data['parking']}
        Size: {data['sq_ft']} sq. ft.
        Market Value: ₹{data['market_value']}
        Amenities: {data['amenities']}
        Nearby Landmarks: {data['nearby_landmarks']}
        Legal Details: {data['legal_details']}
        """

        # Translate non-English descriptions to English for the text models.
        # NOTE(review): `detect` and `GoogleTranslator` are never imported in
        # this module (langdetect / deep_translator), so this branch always
        # raises NameError and falls back to the untranslated description —
        # add the imports to actually enable translation.
        try:
            description = data['description']
            if description and len(description) > 10:
                text_language = detect(description)
                if text_language != 'en':
                    translated_description = GoogleTranslator(source=text_language, target='en').translate(description)
                    data['description_translated'] = translated_description
                else:
                    data['description_translated'] = description
            else:
                data['description_translated'] = description
        except Exception as e:
            logger.error(f"Error in language detection/translation: {str(e)}")
            data['description_translated'] = data['description']

        async def run_analyses():
            """Fan the independent analyses out over a thread pool."""
            loop = asyncio.get_running_loop()
            with concurrent.futures.ThreadPoolExecutor() as executor:
                tasks = [
                    loop.run_in_executor(executor, generate_property_summary, data),
                    loop.run_in_executor(executor, classify_fraud, consolidated_text, data),
                    loop.run_in_executor(executor, generate_trust_score, consolidated_text, image_analysis, pdf_analysis),
                    loop.run_in_executor(executor, generate_suggestions, consolidated_text, data),
                    loop.run_in_executor(executor, assess_text_quality, data['description_translated']),
                    loop.run_in_executor(executor, verify_address, data),
                    loop.run_in_executor(executor, perform_cross_validation, data),
                    loop.run_in_executor(executor, analyze_location, data),
                    loop.run_in_executor(executor, analyze_price, data),
                    loop.run_in_executor(executor, analyze_legal_details, data['legal_details']),
                    loop.run_in_executor(executor, verify_property_specs, data),
                    loop.run_in_executor(executor, analyze_market_value, data)
                ]
                return await asyncio.gather(*tasks)

        # Fix: asyncio.run creates, runs, and reliably closes a fresh event
        # loop; the previous manual new_event_loop/run_until_complete/close
        # sequence leaked the loop whenever an analysis raised.
        analysis_results = asyncio.run(run_analyses())

        # Order here must match the task list above.
        summary, fraud_classification, (trust_score, trust_reasoning), suggestions, quality_assessment, \
        address_verification, cross_validation, location_analysis, price_analysis, legal_analysis, \
        specs_verification, market_analysis = analysis_results

        document_analysis = {
            'pdf_count': len(pdf_texts),
            'pdf_texts': pdf_texts,
            'pdf_analysis': pdf_analysis
        }
        image_results = {
            'image_count': len(images),
            'image_analysis': image_analysis
        }

        report_id = str(uuid.uuid4())

        results = {
            'report_id': report_id,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'summary': summary,
            'fraud_classification': fraud_classification,
            'trust_score': {
                'score': trust_score,
                'reasoning': trust_reasoning
            },
            'suggestions': suggestions,
            'quality_assessment': quality_assessment,
            'address_verification': address_verification,
            'cross_validation': cross_validation,
            'location_analysis': location_analysis,
            'price_analysis': price_analysis,
            'legal_analysis': legal_analysis,
            'document_analysis': document_analysis,
            'image_analysis': image_results,
            'specs_verification': specs_verification,
            'market_analysis': market_analysis,
            'images': images
        }

        # Derive the overall verdict from the assembled component results.
        final_verdict = calculate_final_verdict(results)
        results['final_verdict'] = final_verdict

        return jsonify(make_json_serializable(results))

    except Exception as e:
        logger.error(f"Error in verify_property: {str(e)}")
        return jsonify({
            'error': 'Server error occurred. Please try again later.',
            'status': 'error',
            'details': str(e)
        }), 500
|
|
|
if __name__ == '__main__':

    # NOTE(review): debug=True enables the Werkzeug interactive debugger
    # (arbitrary code execution) and host='0.0.0.0' listens on all
    # interfaces — disable debug before any non-local deployment.
    app.run(host='0.0.0.0', port=8000, debug=True, use_reloader=False)
|
|