"""Create a semantic index of document content using embeddings."""
from typing import Dict, Any, List
import logging
import numpy as np
from .base_agent import BaseAgent
from services.embedding_client import EmbeddingClient

class IndexAgent(BaseAgent):
    def __init__(self):
        super().__init__()  # run any BaseAgent setup before our own
        self.logger = logging.getLogger(__name__)
        self.embedding_client = EmbeddingClient()
        self.logger.info("IndexAgent initialized")

    def execute(self, ctx: Dict[str, Any]) -> Dict[str, Any]:
        """Create a semantic index of document content."""
        try:
            self.logger.info("Starting index creation")
            
            # Get text from PDF agent
            text = ctx.get("text", "")
            if not text:
                self.logger.warning("No text content found in context")
                return {}
            self.logger.info(f"Found text content of length {len(text)}")
            
            # Get tables from Table agent
            tables = ctx.get("tables", [])
            self.logger.info(f"Found {len(tables)} tables in context")
            
            # Combine all content
            all_content = text
            if tables:
                # Separate the main text from the table content with a newline;
                # tables are assumed to be (or stringify cleanly into) plain text.
                all_content += "\n" + "\n".join(str(t) for t in tables)
                self.logger.info(f"Combined content length: {len(all_content)}")
            
            # Create chunks with metadata
            chunks = self._create_chunks(all_content)
            self.logger.info(f"Created {len(chunks)} content chunks")
            for i, chunk in enumerate(chunks):
                self.logger.debug(f"Chunk {i}: {chunk['text'][:100]}...")
            
            # Get embeddings for chunks
            chunk_texts = [chunk["text"] for chunk in chunks]
            self.logger.info(f"Getting embeddings for {len(chunk_texts)} chunks")
            embeddings = self.embedding_client.embed(chunk_texts)
            self.logger.info(f"Generated {len(embeddings)} embeddings")
            
            # Create semantic index
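            # "chunks" and "embeddings" are parallel lists (same order);
            # "text" keeps the combined content for keyword-style lookups.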
            index = {
                "chunks": chunks,
                "embeddings": embeddings,
                "text": all_content,  # Keep full text for non-semantic search
            }
            
            # Store in context
            ctx["index"] = index
            self.logger.info(f"Created semantic index with {len(chunks)} chunks")
            return index
            
        except Exception as e:
            self.logger.error(f"Error in IndexAgent: {str(e)}", exc_info=True)
            return {}

    def _create_chunks(self, text: str, chunk_size: int = 1000) -> List[Dict[str, Any]]:
        """Split text into sentence-aligned chunks of roughly chunk_size characters.

        The start/end offsets refer to the normalized (re-joined) text, not
        the original input, since sentence delimiters are re-inserted.
        """
        self.logger.info(f"Creating chunks from text of length {len(text)}")
        chunks = []
        # Naive sentence split: fine for plain prose, but abbreviations like
        # "e.g." will be mis-split.
        sentences = text.split(". ")
        self.logger.info(f"Split into {len(sentences)} sentences")
        current_chunk = []
        current_size = 0
        total_length = 0
        
        for sentence in sentences:
            sentence = sentence.strip()
            if not sentence:
                continue  # skip empty fragments from trailing delimiters
            # Restore the ". " delimiter that split() removed, without
            # doubling the period on the final sentence.
            sentence += ". " if not sentence.endswith(".") else " "
            sentence_size = len(sentence)
            
            if current_size + sentence_size > chunk_size and current_chunk:
                # Save current chunk
                chunk_text = "".join(current_chunk)
                chunks.append({
                    "text": chunk_text,
                    "start": total_length,
                    "end": total_length + len(chunk_text),
                    "type": "text"
                })
                total_length += len(chunk_text)
                self.logger.debug(f"Created chunk of size {len(chunk_text)}")
                current_chunk = []
                current_size = 0
            
            current_chunk.append(sentence)
            current_size += sentence_size
        
        # Add last chunk if any
        if current_chunk:
            chunk_text = "".join(current_chunk)
            chunks.append({
                "text": chunk_text,
                "start": total_length,
                "end": total_length + len(chunk_text),
                "type": "text"
            })
            self.logger.debug(f"Created final chunk of size {len(chunk_text)}")
        
        self.logger.info(f"Created {len(chunks)} total chunks")
        return chunks

    def find_similar_chunks(self, query: str, index: Dict[str, Any], top_k: int = 3) -> List[Dict[str, Any]]:
        """Find chunks semantically similar to the query."""
        try:
            self.logger.info(f"Finding similar chunks for query: {query}")
            # Get query embedding
            query_embedding = self.embedding_client.embed([query])[0]
            
            # Calculate similarities
            similarities = []
            for chunk, embedding in zip(index["chunks"], index["embeddings"]):
                similarity = self._cosine_similarity(query_embedding, embedding)
                similarities.append((similarity, chunk))
                self.logger.debug(f"Chunk similarity: {similarity:.3f}")
            
            # Sort by similarity score only and return the top k; sorting the
            # raw tuples would compare the chunk dicts on tied scores and
            # raise a TypeError.
            similarities.sort(key=lambda item: item[0], reverse=True)
            results = [chunk for _, chunk in similarities[:top_k]]
            self.logger.info(f"Found {len(results)} similar chunks")
            return results
            
        except Exception as e:
            self.logger.error(f"Error finding similar chunks: {str(e)}", exc_info=True)
            return []
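
    # Vectorized alternative to the per-chunk loop above, offered as a sketch:
    # stacking the embeddings into one numpy matrix computes every cosine
    # similarity in a single pass. Not called by execute(); reference only.
    def _similarities_vectorized(self, query_embedding: List[float],
                                 embeddings: List[List[float]]) -> np.ndarray:
        matrix = np.asarray(embeddings, dtype=float)      # shape (n_chunks, dim)
        query = np.asarray(query_embedding, dtype=float)  # shape (dim,)
        norms = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query)
        norms[norms == 0] = 1.0  # guard against all-zero vectors
        return matrix @ query / norms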

    def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
        """Calculate cosine similarity between two vectors (0.0 for zero vectors)."""
        denom = np.linalg.norm(a) * np.linalg.norm(b)
        if denom == 0:
            return 0.0
        return float(np.dot(a, b) / denom)
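
# Minimal usage sketch, not part of the agent itself: assumes an upstream
# step has populated ctx["text"] (and optionally ctx["tables"]) and that
# EmbeddingClient.embed(texts) returns one vector per input string. The
# sample text and query below are purely illustrative.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    agent = IndexAgent()
    ctx = {"text": "Alpha comes first. Beta follows alpha. Gamma closes the set."}
    index = agent.execute(ctx)
    if index:
        for hit in agent.find_similar_chunks("what follows alpha?", index, top_k=2):
            print(hit["text"])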