|
|
|
|
|
import re |
|
from .model_loader import load_model |
|
from .logging_config import logger |
|
|
|
# Candidate labels handed to the zero-shot classifier. Every label except
# _LEGITIMATE_LABEL contributes to the aggregated fraud score.
_RISK_CATEGORIES = (
    "fraudulent listing",
    "misleading information",
    "fake property",
    "scam attempt",
    "legitimate listing",
)
_LEGITIMATE_LABEL = "legitimate listing"

# Keyword heuristics, keyed by the result bucket a match is filed under.
_FRAUD_INDICATOR_PATTERNS = {
    'high_risk': [
        r'urgent|immediate|hurry|limited time|special offer',
        r'bank|transfer|wire|payment|money',
        r'fake|scam|fraud|illegal|unauthorized',
        r'guaranteed|promised|assured|certain',
        r'contact.*whatsapp|whatsapp.*contact',
        r'price.*negotiable|negotiable.*price',
        r'no.*documents|documents.*not.*required',
        r'cash.*only|only.*cash',
        r'off.*market|market.*off',
        r'under.*table|table.*under',
    ],
    'medium_risk': [
        r'unverified|unconfirmed|unchecked',
        r'partial|incomplete|missing',
        r'different.*location|location.*different',
        r'price.*increased|increased.*price',
        r'no.*photos|photos.*not.*available',
        r'contact.*email|email.*contact',
        r'agent.*not.*available|not.*available.*agent',
        r'property.*not.*viewable|not.*viewable.*property',
        r'price.*changed|changed.*price',
        r'details.*updated|updated.*details',
    ],
    'low_risk': [
        r'new.*listing|listing.*new',
        r'recent.*update|update.*recent',
        r'price.*reduced|reduced.*price',
        r'contact.*phone|phone.*contact',
        r'agent.*available|available.*agent',
        r'property.*viewable|viewable.*property',
        r'photos.*available|available.*photos',
        r'documents.*available|available.*documents',
        r'price.*fixed|fixed.*price',
        r'details.*complete|complete.*details',
    ],
}

# Contextual heuristics: (pattern, bucket, fixed message appended on a match).
_CONTEXT_INDICATORS = (
    (r'price.*too.*good|too.*good.*price', 'high_risk',
     "Unrealistically low price"),
    (r'no.*inspection|inspection.*not.*allowed', 'high_risk',
     "No property inspection allowed"),
    (r'owner.*abroad|abroad.*owner', 'medium_risk',
     "Owner claims to be abroad"),
    (r'agent.*unavailable|unavailable.*agent', 'medium_risk',
     "Agent unavailable for verification"),
)

# First run of digits, with optional thousands separators and decimals.
_AMOUNT_PATTERN = r'\d+(?:,\d+)*(?:\.\d+)?'


def _collect_keyword_indicators(text, classification):
    """Append each distinct keyword match in *text* to its risk bucket."""
    for risk_level, patterns in _FRAUD_INDICATOR_PATTERNS.items():
        bucket = classification[risk_level]
        for pattern in patterns:
            for match in re.finditer(pattern, text, re.IGNORECASE):
                indicator = match.group(0)
                # Matches keep their original casing, so this duplicate
                # check is case-sensitive.
                if indicator not in bucket:
                    bucket.append(indicator)


def _collect_context_indicators(text, classification):
    """Append the fixed message of every contextual pattern found in *text*."""
    for pattern, risk_level, message in _CONTEXT_INDICATORS:
        if re.search(pattern, text, re.IGNORECASE):
            classification[risk_level].append(message)


def _parse_amount(value):
    """Return the first number found in *value* as a float, or None."""
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value)
    match = re.search(_AMOUNT_PATTERN, str(value))
    if match is None:
        return None
    return float(match.group().replace(',', ''))


def _is_priced_below_market(property_details):
    """Return True when the listed price is under half of the market value.

    Anything that is not a mapping providing both a parsable 'price' and a
    parsable 'market_value' yields False instead of raising.
    """
    # For strings, ``'price' in s`` is a substring test, not a key lookup,
    # so a free-text description must never reach the subscript below.
    if isinstance(property_details, (str, bytes)):
        return False
    try:
        price = _parse_amount(property_details['price'])
        market_value = _parse_amount(property_details['market_value'])
    except (KeyError, IndexError, TypeError):
        return False
    if price is None or market_value is None:
        return False
    return price < market_value * 0.5


def _determine_alert_level(fraud_score, classification):
    """Map the fraud score and collected indicators to an alert level."""
    medium_count = len(classification['medium_risk'])
    if fraud_score > 0.7 or classification['high_risk']:
        return 'critical'
    if fraud_score > 0.5 or medium_count > 2:
        return 'high'
    if fraud_score > 0.3 or medium_count > 0:
        return 'medium'
    if fraud_score > 0.1 or classification['low_risk']:
        return 'low'
    return 'minimal'


def classify_fraud(property_details, description):
    """Classify the fraud risk of a property listing.

    The listing text (``property_details`` and ``description`` joined by a
    newline) is scored by a zero-shot classifier against a fixed set of
    labels and scanned with keyword heuristics. The alert level is chosen
    only after every indicator has been collected, so contextual and
    price-based findings influence it as well.

    Args:
        property_details: Listing details. It is interpolated into the
            analyzed text as-is. When it is a mapping with 'price' and
            'market_value' entries (numbers or strings containing a number),
            the price is additionally compared against the market value.
        description: Free-text description of the listing.

    Returns:
        dict with the keys:
            'alert_level': 'minimal', 'low', 'medium', 'high', 'critical',
                or 'error' when classification failed.
            'alert_score': mean classifier score of the non-legitimate
                labels, capped at 1.0 (1.0 on error).
            'high_risk', 'medium_risk', 'low_risk': lists of matched text
                fragments and fixed indicator messages.
            'confidence_scores': classifier score per label.

    Any exception is logged and converted into the 'error' result rather
    than propagated to the caller.
    """
    try:
        fraud_classification = {
            'alert_level': 'minimal',
            'alert_score': 0.0,
            'high_risk': [],
            'medium_risk': [],
            'low_risk': [],
            'confidence_scores': {},
        }

        text_to_analyze = f"{property_details}\n{description}"

        # NOTE(review): load_model is assumed to return a callable that
        # behaves like a Hugging Face zero-shot pipeline, returning
        # 'labels' and 'scores' lists -- confirm against model_loader.
        classifier = load_model("zero-shot-classification", "typeform/mobilebert-uncased-mnli")
        result = classifier(text_to_analyze, list(_RISK_CATEGORIES), multi_label=True)

        fraud_score = 0.0
        for label, score in zip(result['labels'], result['scores']):
            if label != _LEGITIMATE_LABEL:
                fraud_score += score
            fraud_classification['confidence_scores'][label] = score

        # Average over the non-legitimate labels only.
        fraud_score = min(1.0, fraud_score / (len(_RISK_CATEGORIES) - 1))
        fraud_classification['alert_score'] = fraud_score

        _collect_keyword_indicators(text_to_analyze, fraud_classification)
        _collect_context_indicators(text_to_analyze, fraud_classification)

        if _is_priced_below_market(property_details):
            fraud_classification['high_risk'].append("Price significantly below market value")

        # Decide the level last so that every indicator above is considered.
        fraud_classification['alert_level'] = _determine_alert_level(
            fraud_score, fraud_classification
        )

        return fraud_classification
    except Exception as e:
        logger.error(f"Error in fraud classification: {str(e)}")
        return {
            'alert_level': 'error',
            'alert_score': 1.0,
            'high_risk': [f"Error in fraud classification: {str(e)}"],
            'medium_risk': [],
            'low_risk': [],
            'confidence_scores': {},
        }
|
|