# models/pdf_analysis.py
import fitz # PyMuPDF
import re
from .model_loader import load_model
from .logging_config import logger
from sentence_transformers import SentenceTransformer, util
from .property_relation import check_if_property_related
from .utils import summarize_text
# Initialize sentence transformer
try:
sentence_model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
logger.info("Sentence transformer loaded successfully in pdf_analysis.py")
except Exception as e:
logger.error(f"Error loading sentence transformer in pdf_analysis.py: {str(e)}")
sentence_model = None
def extract_pdf_text(pdf_file):
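    """Extract plain text from an uploaded PDF file-like object via PyMuPDF."""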
try:
        # Read the upload into memory and open it with PyMuPDF; the context
        # manager guarantees the document is closed even if extraction fails.
        with fitz.open(stream=pdf_file.read(), filetype="pdf") as pdf_document:
            return "".join(page.get_text() for page in pdf_document)
except Exception as e:
logger.error(f"Error extracting PDF text: {str(e)}")
return ""
def analyze_pdf_content(document_text, property_data):
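    """Classify, verify, and summarize a property document.

    Returns a dict with the predicted document type, an authenticity
    assessment, regex-extracted key fields, a consistency score against
    property_data, and a weighted overall verification score.
    """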
try:
if not document_text:
return {
'document_type': {'classification': 'unknown', 'confidence': 0.0},
'authenticity': {'assessment': 'could not verify', 'confidence': 0.0},
'key_info': {},
'consistency_score': 0.0,
'is_property_related': False,
'summary': 'Empty document',
'has_signatures': False,
'has_dates': False,
'verification_score': 0.0
}
        # Zero-shot classifier (MobileBERT fine-tuned on MNLI) for document-type classification
classifier = load_model("zero-shot-classification", "typeform/mobilebert-uncased-mnli")
# Enhanced document types with more specific categories
doc_types = [
"property deed", "sales agreement", "mortgage document",
"property tax record", "title document", "khata certificate",
"encumbrance certificate", "lease agreement", "rental agreement",
"property registration document", "building permit", "other document"
]
# Analyze document type with context
doc_context = f"{document_text[:1000]} property_type:{property_data.get('property_type', '')} location:{property_data.get('city', '')}"
doc_result = classifier(doc_context, doc_types)
doc_type = doc_result['labels'][0]
doc_confidence = doc_result['scores'][0]
# Enhanced authenticity check with multiple aspects
authenticity_aspects = [
"authentic legal document",
"questionable document",
"forged document",
"template document",
"official document"
]
authenticity_result = classifier(document_text[:1000], authenticity_aspects)
authenticity = "likely authentic" if authenticity_result['labels'][0] == "authentic legal document" else "questionable"
authenticity_confidence = authenticity_result['scores'][0]
        # Extract key fields (parties, price, dates, registration number) via regex
key_info = extract_document_key_info(document_text)
# Enhanced consistency check
consistency_score = check_document_consistency(document_text, property_data)
# Property relation check with context
property_context = f"{document_text[:1000]} property:{property_data.get('property_name', '')} type:{property_data.get('property_type', '')}"
is_property_related = check_if_property_related(property_context)['is_related']
# Generate summary using BART
summary = summarize_text(document_text[:2000])
# Enhanced signature and date detection
has_signatures = bool(re.search(r'(?:sign|signature|signed|witness|notary|authorized).{0,50}(?:by|of|for)', document_text.lower()))
has_dates = bool(re.search(r'\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|\d{4}[/-]\d{1,2}[/-]\d{1,2}', document_text))
# Calculate verification score with weighted components
verification_weights = {
'doc_type': 0.3,
'authenticity': 0.3,
'consistency': 0.2,
'property_relation': 0.1,
'signatures_dates': 0.1
}
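        # Every component below is in [0, 1] and the weights sum to 1.0,
        # so the resulting verification_score is also bounded to [0, 1].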
verification_score = (
doc_confidence * verification_weights['doc_type'] +
authenticity_confidence * verification_weights['authenticity'] +
consistency_score * verification_weights['consistency'] +
float(is_property_related) * verification_weights['property_relation'] +
float(has_signatures and has_dates) * verification_weights['signatures_dates']
)
return {
'document_type': {'classification': doc_type, 'confidence': float(doc_confidence)},
'authenticity': {'assessment': authenticity, 'confidence': float(authenticity_confidence)},
'key_info': key_info,
'consistency_score': float(consistency_score),
'is_property_related': is_property_related,
'summary': summary,
'has_signatures': has_signatures,
'has_dates': has_dates,
'verification_score': float(verification_score)
}
except Exception as e:
logger.error(f"Error analyzing PDF content: {str(e)}")
return {
'document_type': {'classification': 'unknown', 'confidence': 0.0},
'authenticity': {'assessment': 'could not verify', 'confidence': 0.0},
'key_info': {},
'consistency_score': 0.0,
'is_property_related': False,
'summary': 'Could not analyze document',
'has_signatures': False,
'has_dates': False,
'verification_score': 0.0,
'error': str(e)
}
def check_document_consistency(document_text, property_data):
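    """Score semantic similarity between the listing fields and the document.

    Returns a cosine-similarity-based score clamped to [0, 1]; falls back
    to a neutral 0.5 when the sentence model failed to load.
    """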
try:
        if not sentence_model:
            logger.warning("Sentence model unavailable; returning neutral consistency score")
            return 0.5
        # Cast each field to str: values such as market_value, sq_ft, or
        # bedrooms may be numeric, and ' '.join() requires strings.
        property_text = ' '.join([
            str(property_data.get(key) or '') for key in [
                'property_name', 'property_type', 'address', 'city',
                'state', 'market_value', 'sq_ft', 'bedrooms'
            ]
        ])
property_embedding = sentence_model.encode(property_text)
document_embedding = sentence_model.encode(document_text[:1000])
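        # Cosine similarity lies in [-1, 1]; clamp to [0, 1] so the value
        # plugs directly into the weighted verification formula.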
similarity = util.cos_sim(property_embedding, document_embedding)[0][0].item()
return max(0.0, min(1.0, float(similarity)))
except Exception as e:
logger.error(f"Error checking document consistency: {str(e)}")
return 0.0
def extract_document_key_info(text):
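    """Pull common deed/agreement fields out of raw text with regex patterns."""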
try:
info = {}
patterns = {
'property_address': r'(?:property|premises|located at)[:\s]+([^\n.]+)',
'price': r'(?:price|value|amount)[:\s]+(?:Rs\.?|₹)?[\s]*([0-9,.]+)',
'date': r'(?:date|dated|executed on)[:\s]+([^\n.]+\d{4})',
'seller': r'(?:seller|grantor|owner)[:\s]+([^\n.]+)',
'buyer': r'(?:buyer|grantee|purchaser)[:\s]+([^\n.]+)',
'size': r'(?:area|size|extent)[:\s]+([0-9,.]+)[\s]*(?:sq\.?[\s]*(?:ft|feet))',
'registration_number': r'(?:registration|reg\.?|document)[\s]*(?:no\.?|number|#)[:\s]*([A-Za-z0-9\-/]+)'
}
for key, pattern in patterns.items():
match = re.search(pattern, text, re.IGNORECASE)
if match:
info[key] = match.group(1).strip()
return info
except Exception as e:
logger.error(f"Error extracting document key info: {str(e)}")
return {}
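
# Minimal smoke-test sketch. The file name and property fields below are
# illustrative assumptions, not part of the application's call flow.
if __name__ == "__main__":
    with open("sample_deed.pdf", "rb") as f:  # hypothetical input file
        pdf_text = extract_pdf_text(f)
    result = analyze_pdf_content(pdf_text, {
        'property_name': 'Sunrise Villa',  # hypothetical listing data
        'property_type': 'villa',
        'city': 'Hyderabad',
    })
    print(result['document_type'], result['verification_score'])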