"""Create a semantic index of document content using embeddings.""" from typing import Dict, Any, List, Tuple import logging import numpy as np from .base_agent import BaseAgent from services.embedding_client import EmbeddingClient class IndexAgent(BaseAgent): def __init__(self): self.logger = logging.getLogger(__name__) self.embedding_client = EmbeddingClient() self.logger.info("IndexAgent initialized") def execute(self, ctx: Dict[str, Any]): """Create a semantic index of document content.""" try: self.logger.info("Starting index creation") # Get text from PDF agent text = ctx.get("text", "") if not text: self.logger.warning("No text content found in context") return {} self.logger.info(f"Found text content of length {len(text)}") # Get tables from Table agent tables = ctx.get("tables", []) self.logger.info(f"Found {len(tables)} tables in context") # Combine all content all_content = text if tables: all_content += "\n".join(tables) self.logger.info(f"Combined content length: {len(all_content)}") # Create chunks with metadata chunks = self._create_chunks(all_content) self.logger.info(f"Created {len(chunks)} content chunks") for i, chunk in enumerate(chunks): self.logger.debug(f"Chunk {i}: {chunk['text'][:100]}...") # Get embeddings for chunks chunk_texts = [chunk["text"] for chunk in chunks] self.logger.info(f"Getting embeddings for {len(chunk_texts)} chunks") embeddings = self.embedding_client.embed(chunk_texts) self.logger.info(f"Generated {len(embeddings)} embeddings") # Create semantic index index = { "chunks": chunks, "embeddings": embeddings, "text": all_content, # Keep full text for non-semantic search } # Store in context ctx["index"] = index self.logger.info(f"Created semantic index with {len(chunks)} chunks") return index except Exception as e: self.logger.error(f"Error in IndexAgent: {str(e)}", exc_info=True) return {} def _create_chunks(self, text: str, chunk_size: int = 1000) -> List[Dict[str, Any]]: """Split text into chunks with metadata.""" self.logger.info(f"Creating chunks from text of length {len(text)}") chunks = [] sentences = text.split(". ") self.logger.info(f"Split into {len(sentences)} sentences") current_chunk = [] current_size = 0 total_length = 0 for sentence in sentences: sentence = sentence.strip() + ". " sentence_size = len(sentence) if current_size + sentence_size > chunk_size and current_chunk: # Save current chunk chunk_text = "".join(current_chunk) chunks.append({ "text": chunk_text, "start": total_length, "end": total_length + len(chunk_text), "type": "text" }) total_length += len(chunk_text) self.logger.debug(f"Created chunk of size {len(chunk_text)}") current_chunk = [] current_size = 0 current_chunk.append(sentence) current_size += sentence_size # Add last chunk if any if current_chunk: chunk_text = "".join(current_chunk) chunks.append({ "text": chunk_text, "start": total_length, "end": total_length + len(chunk_text), "type": "text" }) self.logger.debug(f"Created final chunk of size {len(chunk_text)}") self.logger.info(f"Created {len(chunks)} total chunks") return chunks def find_similar_chunks(self, query: str, index: Dict[str, Any], top_k: int = 3) -> List[Dict[str, Any]]: """Find chunks semantically similar to the query.""" try: self.logger.info(f"Finding similar chunks for query: {query}") # Get query embedding query_embedding = self.embedding_client.embed([query])[0] # Calculate similarities similarities = [] for chunk, embedding in zip(index["chunks"], index["embeddings"]): similarity = self._cosine_similarity(query_embedding, embedding) similarities.append((similarity, chunk)) self.logger.debug(f"Chunk similarity: {similarity:.3f}") # Sort by similarity and return top k similarities.sort(reverse=True) results = [chunk for _, chunk in similarities[:top_k]] self.logger.info(f"Found {len(results)} similar chunks") return results except Exception as e: self.logger.error(f"Error finding similar chunks: {str(e)}", exc_info=True) return [] def _cosine_similarity(self, a: List[float], b: List[float]) -> float: """Calculate cosine similarity between two vectors.""" return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))