# src/vectorstores/optimized_vectorstore.py
import asyncio
from typing import Tuple, Optional, List, Dict, Any, Callable
import concurrent.futures
from functools import lru_cache

from .base_vectorstore import BaseVectorStore
from .chroma_vectorstore import ChromaVectorStore
from src.embeddings.huggingface_embedding import HuggingFaceEmbedding
from src.utils.logger import logger
from config.config import settings


class OptimizedVectorStore(ChromaVectorStore):
    """
    Optimized vector store that maintains ChromaVectorStore compatibility
    while adding caching and async initialization.

    The class is a singleton: ``__new__`` always hands back the same object.
    Constructing it only records the configuration; the embedding model is
    loaded and ``ChromaVectorStore.__init__`` is run later by
    ``_initialize()``, which ``create()`` awaits. Until then, accessing any
    callable attribute defined directly on ``ChromaVectorStore`` raises
    ``RuntimeError``.
    """

    # The one shared instance, created on first construction.
    _instance: Optional['OptimizedVectorStore'] = None
    # Serializes concurrent ``create()`` calls.
    # NOTE(review): the lock is built at class-definition time; on
    # Python < 3.10 asyncio.Lock binds to the event loop current at
    # construction -- confirm the target runtime if several loops are used.
    _lock = asyncio.Lock()
    # Class-level default; ``_initialize()`` sets the flag on the instance.
    _initialized = False
    # True only while ``_initialize()`` is running, so that the guard in
    # ``__getattribute__`` does not reject calls made during setup.
    _initializing = False
    _embedding_model: Optional[HuggingFaceEmbedding] = None
    # Single worker used to load the embedding model off the event loop.
    _executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, allocating it on first use."""
        # Identity check: the decision must not depend on how the store
        # object evaluates in a boolean context.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(
        self,
        embedding_function: Optional[Callable] = None,
        persist_directory: str = settings.CHROMA_PATH,
        collection_name: str = "documents",
        client_settings: Optional[Dict[str, Any]] = None
    ):
        """
        Record the configuration for the optimized vector store.

        Note: The actual initialization is deferred until needed. Once the
        singleton has been initialized, further constructor calls leave the
        stored configuration untouched.

        Args:
            embedding_function: Stored as ``_embedding_function``.
            persist_directory: Forwarded to ``ChromaVectorStore.__init__``
                by ``_initialize()``.
            collection_name: Forwarded to ``ChromaVectorStore.__init__``
                by ``_initialize()``.
            client_settings: Forwarded to ``ChromaVectorStore.__init__``
                by ``_initialize()``.
        """
        if not self._initialized:
            self._persist_directory = persist_directory
            self._collection_name = collection_name
            self._client_settings = client_settings
            self._embedding_function = embedding_function
            # Don't call super().__init__() here - we'll do it in _initialize()

    @classmethod
    async def create(
        cls,
        persist_directory: str = settings.CHROMA_PATH,
        collection_name: str = "documents",
        client_settings: Optional[Dict[str, Any]] = None
    ) -> Tuple['OptimizedVectorStore', HuggingFaceEmbedding]:
        """
        Asynchronously create or get the shared instance.

        Args:
            persist_directory: Used only when the instance still has to be
                initialized.
            collection_name: Used only when the instance still has to be
                initialized.
            client_settings: Used only when the instance still has to be
                initialized.

        Returns:
            Tuple[OptimizedVectorStore, HuggingFaceEmbedding]: The vector store
            instance and embedding model

        Raises:
            Exception: Whatever ``_initialize()`` raises is propagated.
        """
        async with cls._lock:
            instance = cls._instance
            # The initialized flag lives on the instance (``_initialize``
            # assigns ``self._initialized``), so it is read from there; the
            # class-level attribute never changes from False.
            if instance is None or not instance._initialized:
                instance = cls(
                    persist_directory=persist_directory,
                    collection_name=collection_name,
                    client_settings=client_settings
                )
                await instance._initialize()
                cls._instance = instance
            return instance, instance._embedding_model

    async def _initialize(self) -> None:
        """
        Initialize the vector store and embedding model.

        Does nothing when the instance is already initialized.

        Raises:
            Exception: Any error from loading the model or from
                ``ChromaVectorStore.__init__`` is logged and re-raised.
        """
        if self._initialized:
            return

        # Lift the __getattribute__ guard for the duration of the setup.
        # NOTE(review): ChromaVectorStore.__init__ is not visible here; if it
        # calls its own methods on ``self`` the guard would otherwise raise.
        self._initializing = True
        try:
            # Load embedding model in background thread
            self._embedding_model = await self._load_embedding_model()

            # Initialize ChromaVectorStore with the loaded model
            super().__init__(
                embedding_function=self._embedding_model.embed_documents,
                persist_directory=self._persist_directory,
                collection_name=self._collection_name,
                client_settings=self._client_settings
            )
            self._initialized = True
        except Exception as e:
            logger.error(f"Error initializing vector store: {str(e)}")
            raise
        finally:
            self._initializing = False

    async def _load_embedding_model(self) -> HuggingFaceEmbedding:
        """
        Load embedding model in background thread.

        Returns:
            HuggingFaceEmbedding: The model built by
            ``_create_embedding_model``.

        Raises:
            Exception: Any error raised while loading is logged and re-raised.
        """
        try:
            # This is a coroutine, so a loop is running by definition.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                self._executor,
                self._create_embedding_model
            )
        except Exception as e:
            logger.error(f"Error loading embedding model: {str(e)}")
            raise

    @staticmethod
    @lru_cache(maxsize=1)
    def _create_embedding_model() -> HuggingFaceEmbedding:
        """Create and cache embedding model"""
        return HuggingFaceEmbedding(model_name=settings.EMBEDDING_MODEL)

    def __getattribute__(self, name):
        """
        Ensure initialization before accessing any ChromaVectorStore methods.

        Raises:
            RuntimeError: If ``name`` resolves to a callable that is defined
                directly on ``ChromaVectorStore`` and the store is neither
                initialized nor in the middle of ``_initialize()``.
        """
        # Get the attribute from the class
        attr = super().__getattribute__(name)

        # If it's a method from ChromaVectorStore, ensure initialization
        if callable(attr) and name in ChromaVectorStore.__dict__:
            # Read the flags through the parent implementation to avoid
            # re-entering this override.
            ready = (
                super().__getattribute__("_initialized")
                or super().__getattribute__("_initializing")
            )
            if not ready:
                raise RuntimeError(
                    "Vector store not initialized. "
                    "Please use 'await OptimizedVectorStore.create()'"
                )

        return attr


# Factory function for getting optimized vector store
async def get_optimized_vector_store() -> Tuple[ChromaVectorStore, HuggingFaceEmbedding]:
    """
    Get or create an optimized vector store instance.

    Returns:
        Tuple[ChromaVectorStore, HuggingFaceEmbedding]: The vector store and
        embedding model instances
    """
    return await OptimizedVectorStore.create()