"""PDF analysis helpers for property document verification."""

import re

import fitz  # PyMuPDF

from sentence_transformers import SentenceTransformer, util

from .logging_config import logger
from .model_loader import load_model
from .property_relation import check_if_property_related
from .utils import summarize_text

# Load the sentence-embedding model once at import time. If loading fails,
# check_document_consistency degrades to a neutral score instead of raising.
try:
    sentence_model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
    logger.info("Sentence transformer loaded successfully in pdf_analysis.py")
except Exception as e:
    logger.error(f"Error loading sentence transformer in pdf_analysis.py: {str(e)}")
    sentence_model = None


def extract_pdf_text(pdf_file):
    """Extract plain text from an uploaded PDF file-like object."""
    try:
        pdf_document = fitz.Document(stream=pdf_file.read(), filetype="pdf")
        text = ""
        for page in pdf_document:
            text += page.get_text()
        pdf_document.close()
        return text
    except Exception as e:
        logger.error(f"Error extracting PDF text: {str(e)}")
        return ""


def analyze_pdf_content(document_text, property_data):
    """Analyze extracted PDF text against the supplied property record."""
    try:
        if not document_text:
            return {
                'document_type': {'classification': 'unknown', 'confidence': 0.0},
                'authenticity': {'assessment': 'could not verify', 'confidence': 0.0},
                'key_info': {},
                'consistency_score': 0.0,
                'is_property_related': False,
                'summary': 'Empty document',
                'has_signatures': False,
                'has_dates': False,
                'verification_score': 0.0
            }

        classifier = load_model("zero-shot-classification", "typeform/mobilebert-uncased-mnli")

        # Candidate labels for zero-shot document-type classification.
        doc_types = [
            "property deed", "sales agreement", "mortgage document",
            "property tax record", "title document", "khata certificate",
            "encumbrance certificate", "lease agreement", "rental agreement",
            "property registration document", "building permit", "other document"
        ]

        # Classify the first 1000 characters, enriched with property context.
        doc_context = f"{document_text[:1000]} property_type:{property_data.get('property_type', '')} location:{property_data.get('city', '')}"
        doc_result = classifier(doc_context, doc_types)
        doc_type = doc_result['labels'][0]
        doc_confidence = doc_result['scores'][0]
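
        # The zero-shot pipeline returns 'labels' sorted by descending 'scores',
        # so index 0 is the top prediction. Example shape (scores illustrative):
        #   {'labels': ['property deed', ...], 'scores': [0.62, ...], ...}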

        # Rough authenticity screen using the same zero-shot classifier.
        authenticity_aspects = [
            "authentic legal document",
            "questionable document",
            "forged document",
            "template document",
            "official document"
        ]
        authenticity_result = classifier(document_text[:1000], authenticity_aspects)
        authenticity = ("likely authentic"
                        if authenticity_result['labels'][0] == "authentic legal document"
                        else "questionable")
        authenticity_confidence = authenticity_result['scores'][0]

        key_info = extract_document_key_info(document_text)

        consistency_score = check_document_consistency(document_text, property_data)

        property_context = f"{document_text[:1000]} property:{property_data.get('property_name', '')} type:{property_data.get('property_type', '')}"
        is_property_related = check_if_property_related(property_context)['is_related']

        summary = summarize_text(document_text[:2000])

        # Cheap lexical checks for signature blocks and date strings.
        has_signatures = bool(re.search(r'(?:sign|signature|signed|witness|notary|authorized).{0,50}(?:by|of|for)', document_text.lower()))
        has_dates = bool(re.search(r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|\d{4}[/-]\d{1,2}[/-]\d{1,2}', document_text))

        # Combine the individual signals into one weighted score in [0, 1].
        verification_weights = {
            'doc_type': 0.3,
            'authenticity': 0.3,
            'consistency': 0.2,
            'property_relation': 0.1,
            'signatures_dates': 0.1
        }

        verification_score = (
            doc_confidence * verification_weights['doc_type'] +
            authenticity_confidence * verification_weights['authenticity'] +
            consistency_score * verification_weights['consistency'] +
            float(is_property_related) * verification_weights['property_relation'] +
            float(has_signatures and has_dates) * verification_weights['signatures_dates']
        )
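
        # Worked example (illustrative numbers): doc_confidence=0.8,
        # authenticity_confidence=0.7, consistency_score=0.6 for a
        # property-related document with signatures and dates gives
        #   0.8*0.3 + 0.7*0.3 + 0.6*0.2 + 1.0*0.1 + 1.0*0.1 = 0.77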

        return {
            'document_type': {'classification': doc_type, 'confidence': float(doc_confidence)},
            'authenticity': {'assessment': authenticity, 'confidence': float(authenticity_confidence)},
            'key_info': key_info,
            'consistency_score': float(consistency_score),
            'is_property_related': is_property_related,
            'summary': summary,
            'has_signatures': has_signatures,
            'has_dates': has_dates,
            'verification_score': float(verification_score)
        }
    except Exception as e:
        logger.error(f"Error analyzing PDF content: {str(e)}")
        return {
            'document_type': {'classification': 'unknown', 'confidence': 0.0},
            'authenticity': {'assessment': 'could not verify', 'confidence': 0.0},
            'key_info': {},
            'consistency_score': 0.0,
            'is_property_related': False,
            'summary': 'Could not analyze document',
            'has_signatures': False,
            'has_dates': False,
            'verification_score': 0.0,
            'error': str(e)
        }
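
# Usage sketch (illustrative; the dict values are made up, and the keys mirror
# what the function reads via property_data.get):
#
#   result = analyze_pdf_content(raw_text, {
#       "property_name": "Green Acres", "property_type": "apartment",
#       "city": "Bengaluru",
#   })
#   print(result['verification_score'])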


def check_document_consistency(document_text, property_data):
    """Score semantic similarity between the document and the property record."""
    try:
        if not sentence_model:
            logger.warning("Sentence model unavailable; returning neutral consistency score")
            return 0.5
        # str() guards against non-string fields such as market_value or bedrooms.
        property_text = ' '.join([
            str(property_data.get(key, '')) for key in [
                'property_name', 'property_type', 'address', 'city',
                'state', 'market_value', 'sq_ft', 'bedrooms'
            ]
        ])
        property_embedding = sentence_model.encode(property_text)
        document_embedding = sentence_model.encode(document_text[:1000])
        similarity = util.cos_sim(property_embedding, document_embedding)[0][0].item()
        # Cosine similarity lies in [-1, 1]; clamp to [0, 1] for scoring.
        return max(0.0, min(1.0, float(similarity)))
    except Exception as e:
        logger.error(f"Error checking document consistency: {str(e)}")
        return 0.0
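
# Usage sketch (illustrative values): a deed that mentions the same address
# and city as the listing should score well above an unrelated document:
#
#   score = check_document_consistency(raw_text,
#                                      {"address": "12 MG Road", "city": "Bengaluru"})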


def extract_document_key_info(text):
    """Pull key fields (parties, price, dates, etc.) from the text via regex."""
    try:
        info = {}
        patterns = {
            'property_address': r'(?:property|premises|located at)[:\s]+([^\n.]+)',
            'price': r'(?:price|value|amount)[:\s]+(?:Rs\.?|₹)?[\s]*([0-9,.]+)',
            'date': r'(?:date|dated|executed on)[:\s]+([^\n.]+\d{4})',
            'seller': r'(?:seller|grantor|owner)[:\s]+([^\n.]+)',
            'buyer': r'(?:buyer|grantee|purchaser)[:\s]+([^\n.]+)',
            'size': r'(?:area|size|extent)[:\s]+([0-9,.]+)[\s]*(?:sq\.?[\s]*(?:ft|feet))',
            'registration_number': r'(?:registration|reg\.?|document)[\s]*(?:no\.?|number|#)[:\s]*([A-Za-z0-9\-/]+)'
        }
        # Keep the first match per field; group(1) holds the captured value.
        for key, pattern in patterns.items():
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                info[key] = match.group(1).strip()
        return info
    except Exception as e:
        logger.error(f"Error extracting document key info: {str(e)}")
        return {}
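
# Usage sketch (made-up input, output traced from the patterns above):
#
#   extract_document_key_info("Seller: Anil Rao\nPrice: Rs. 50,00,000\nArea: 1,200 sq. ft.")
#   # -> {'price': '50,00,000', 'seller': 'Anil Rao', 'size': '1,200'}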