# test / database /query_processor.py
# christopher
# removed async
# c708265
# raw
# history blame
# 4.83 kB
import datetime
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
from models.LexRank import degree_centrality_scores
import logging
from datetime import datetime as dt
logger = logging.getLogger(__name__)


class QueryProcessor:
    """Answer a free-text query with matching articles and an extractive summary.

    The processor only orchestrates; the real work is delegated to four
    injected collaborators, used through these calls:

    * ``embedding_model.encode(...)`` for the query and for article sentences.
    * ``summarization_model.summarize(text)`` for the final summary text.
    * ``nlp_model.extract_entities(query)`` and
      ``nlp_model.tokenize_sentences(content)``.
    * ``db_service.semantic_search(...)``, which is awaited.
    """

    # Class-level fallback so instances created without ``__init__`` still work.
    max_key_sentences = 10

    def __init__(
        self,
        embedding_model,
        summarization_model,
        nlp_model,
        db_service,
        max_key_sentences: int = 10,
    ):
        """Store the collaborators used by the processing pipeline.

        Args:
            embedding_model: Object exposing ``encode``.
            summarization_model: Object exposing ``summarize``.
            nlp_model: Object exposing ``extract_entities`` and
                ``tokenize_sentences``.
            db_service: Object exposing an awaitable ``semantic_search``.
            max_key_sentences: Upper bound on how many top-ranked sentences
                feed the summary. Defaults to 10.

        Raises:
            ValueError: If ``max_key_sentences`` is smaller than 1.
        """
        if max_key_sentences < 1:
            raise ValueError("max_key_sentences must be a positive integer")
        self.embedding_model = embedding_model
        self.summarization_model = summarization_model
        self.nlp_model = nlp_model
        self.db_service = db_service
        self.max_key_sentences = max_key_sentences
        logger.info("QueryProcessor initialized")

    async def process(
        self,
        query: str,
        topic: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run the full pipeline for one query.

        Args:
            query: Free-text user query.
            topic: Optional topic filter, forwarded to the database search.
            start_date: Optional lower date bound in ``YYYY-MM-DD`` form.
            end_date: Optional upper date bound in ``YYYY-MM-DD`` form.

        Returns:
            One of three dict shapes:

            * ``{"summary", "key_sentences", "articles", "entities"}`` when
              the search returned articles.
            * ``{"message": "No articles found", "articles": []}`` when the
              search returned nothing.
            * ``{"error": <message>}`` when any step raised; this method
              itself never raises for ``Exception`` subclasses.
        """
        try:
            # Date handling: empty strings are treated the same as None.
            start_dt = self._parse_date(start_date) if start_date else None
            end_dt = self._parse_date(end_date) if end_date else None

            # Query processing
            query_embedding = self.embedding_model.encode(query).tolist()
            logger.debug("Generated embedding for query: %s...", query[:50])

            # Entity extraction
            entities = self.nlp_model.extract_entities(query)
            logger.debug("Extracted entities: %s", entities)

            # Database search; only the first element of each entity is
            # forwarded (assumed to be the entity text -- confirm against
            # nlp_model.extract_entities).
            articles = await self._execute_semantic_search(
                query_embedding,
                start_dt,
                end_dt,
                topic,
                [ent[0] for ent in entities]
            )
            if not articles:
                logger.info("No articles found matching criteria")
                return {"message": "No articles found", "articles": []}

            # Summary generation
            summary_data = self._generate_summary(articles)
            return {
                "summary": summary_data["summary"],
                "key_sentences": summary_data["key_sentences"],
                "articles": articles,
                "entities": entities
            }
        except Exception as e:
            # Top-level boundary: report the failure to the caller as data
            # and keep the traceback in the log.
            logger.error("Processing failed: %s", e, exc_info=True)
            return {"error": str(e)}

    def _parse_date(self, date_str: str) -> dt:
        """Parse a ``YYYY-MM-DD`` string into a naive ``datetime``.

        Args:
            date_str: Date text to parse.

        Returns:
            The parsed ``datetime`` (time fields are zero).

        Raises:
            ValueError: If ``date_str`` does not match ``%Y-%m-%d``. The
                original ``strptime`` error is attached as ``__cause__``.
        """
        try:
            return dt.strptime(date_str, "%Y-%m-%d")
        except ValueError as e:
            logger.error("Invalid date format: %s", date_str)
            raise ValueError(
                f"Invalid date format. Expected YYYY-MM-DD, got {date_str}"
            ) from e

    async def _execute_semantic_search(
        self,
        query_embedding: List[float],
        start_date: Optional[dt],
        end_date: Optional[dt],
        topic: Optional[str],
        entities: List[str]
    ) -> List[Dict[str, Any]]:
        """Forward the search to ``db_service.semantic_search`` and await it.

        Args:
            query_embedding: Query embedding as a plain list of floats.
            start_date: Optional lower date bound.
            end_date: Optional upper date bound.
            topic: Optional topic filter.
            entities: Entity texts extracted from the query.

        Returns:
            Whatever ``db_service.semantic_search`` returns.

        Raises:
            Exception: Any error from the database service is logged here
                and re-raised unchanged.
        """
        try:
            return await self.db_service.semantic_search(
                query_embedding=query_embedding,
                start_date=start_date,
                end_date=end_date,
                topic=topic,
                entities=entities
            )
        except Exception as e:
            logger.error("Semantic search failed: %s", e)
            raise

    def _generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Build a summary from the most central sentences of the articles.

        Sentences are collected from every article with a truthy
        ``"content"`` value, embedded, and ranked by the scores returned from
        ``degree_centrality_scores``. The top ``max_key_sentences`` sentences
        are joined and handed to ``summarization_model.summarize``.

        Args:
            articles: Article dicts; only the ``"content"`` key is read.

        Returns:
            A dict with ``"summary"`` and ``"key_sentences"``. If no sentence
            is available, or if any step fails, a placeholder summary and an
            empty ``key_sentences`` list are returned instead of raising.
        """
        try:
            # Extract and process content
            sentences = []
            for article in articles:
                if content := article.get("content"):
                    sentences.extend(self.nlp_model.tokenize_sentences(content))

            if not sentences:
                logger.warning("No sentences available for summarization")
                return {
                    "summary": "No content available for summarization",
                    "key_sentences": []
                }

            # Pairwise inner products of sentence embeddings.
            # NOTE(review): this equals cosine similarity only if the
            # embeddings are unit-normalized -- confirm with the model config.
            embeddings = self.embedding_model.encode(sentences)
            similarity_matrix = np.inner(embeddings, embeddings)

            # asarray lets the negation below work even if the helper
            # returns a plain sequence rather than an ndarray.
            centrality_scores = np.asarray(
                degree_centrality_scores(similarity_matrix, threshold=None)
            )

            # Highest score first, capped at the configured sentence count.
            top_indices = np.argsort(-centrality_scores)[: self.max_key_sentences]
            key_sentences = [sentences[idx].strip() for idx in top_indices]

            return {
                "summary": self.summarization_model.summarize(' '.join(key_sentences)),
                "key_sentences": key_sentences
            }
        except Exception as e:
            # Deliberate best-effort: summarization must not fail the whole
            # request, but the traceback is kept for diagnosis.
            logger.exception("Summary generation failed: %s", e)
            return {
                "summary": "Summary generation failed",
                "key_sentences": []
            }