# Path: sparrow-ml/embeddings/agents/llamaindex.py
# Page-header text captured with this file: katanaml's picture / Sparrow Parse / 42cd5f6
from .interface import Ingest
import weaviate
from llama_index.core import StorageContext, SimpleDirectoryReader, Settings, VectorStoreIndex
from llama_index.vector_stores.weaviate import WeaviateVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
import box
import yaml
from rich.progress import Progress, SpinnerColumn, TextColumn
import timeit
from rich import print
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning)
class LlamaIndexIngest(Ingest):
    """Ingest agent that indexes a file into Weaviate through LlamaIndex.

    The pipeline connects to Weaviate, loads the input file, loads a
    HuggingFace embedding model and builds a vector index. Every stage is
    executed through :meth:`invoke_pipeline_step`, which shows a spinner
    while the stage runs.
    """

    def run_ingest(self,
                   payload: str,
                   file_path: str,
                   index_name: str) -> None:
        """Run the full ingest pipeline for one file.

        Settings are read from ``config.yml`` in the current working
        directory; the keys used are ``WEAVIATE_URL``, ``EMBEDDINGS`` and
        ``CHUNK_SIZE``.

        Args:
            payload: Label that is only echoed in the start-up message.
            file_path: Path of the file to load and index.
            index_name: Name of the Weaviate class that stores the index.
        """
        print(f"\nRunning ingest with {payload}\n")

        # Import config vars
        with open('config.yml', 'r', encoding='utf8') as ymlfile:
            cfg = box.Box(yaml.safe_load(ymlfile))

        start = timeit.default_timer()

        # Each stage is wrapped in a lambda so that the actual work happens
        # inside invoke_pipeline_step, i.e. while the spinner is displayed.
        client = self.invoke_pipeline_step(lambda: weaviate.Client(cfg.WEAVIATE_URL),
                                           "Connecting to Weaviate...")

        documents = self.invoke_pipeline_step(lambda: self.load_documents(file_path),
                                              "Loading documents...")

        embeddings = self.invoke_pipeline_step(lambda: self.load_embedding_model(cfg.EMBEDDINGS),
                                               "Loading embedding model...")

        # The returned index object is not needed afterwards; the step is
        # executed for its effect on the vector store.
        self.invoke_pipeline_step(lambda: self.build_index(client, embeddings, documents, index_name,
                                                           cfg.CHUNK_SIZE),
                                  "Building index...")

        end = timeit.default_timer()
        print(f"\nTime to ingest data: {end - start}\n")

    def load_documents(self, file_path: str):
        """Load ``file_path`` with ``SimpleDirectoryReader`` and print a preview.

        Args:
            file_path: Path of the single input file handed to the reader.

        Returns:
            The documents returned by ``SimpleDirectoryReader.load_data()``.

        Raises:
            IndexError: If the reader returns no documents, because the
                preview accesses ``documents[0]``.
        """
        documents = SimpleDirectoryReader(input_files=[file_path], required_exts=[".pdf", ".PDF"]).load_data()

        print(f"\nLoaded {len(documents)} documents")
        print(f"\nFirst document: {documents[0]}")
        print("\nFirst document content:\n")
        print(documents[0])
        print()

        return documents

    def load_embedding_model(self, model_name: str):
        """Return a ``HuggingFaceEmbedding`` created for ``model_name``."""
        return HuggingFaceEmbedding(model_name=model_name)

    def build_index(self, weaviate_client, embed_model, documents, index_name: str, chunk_size):
        """Rebuild the Weaviate-backed vector index for ``documents``.

        Args:
            weaviate_client: Weaviate client used both to delete the old
                class and to back the vector store.
            embed_model: Embedding model assigned to ``Settings.embed_model``.
            documents: Documents passed to ``VectorStoreIndex.from_documents``.
            index_name: Weaviate class name of the index.
            chunk_size: Value assigned to ``Settings.chunk_size``.

        Returns:
            The ``VectorStoreIndex`` built from ``documents``.
        """
        # Delete index if it already exists, to avoid data corruption
        weaviate_client.schema.delete_class(index_name)

        # These assignments mutate the shared ``Settings`` object imported
        # from llama_index.core, so they persist after this call returns.
        Settings.chunk_size = chunk_size
        Settings.llm = None
        Settings.embed_model = embed_model

        vector_store = WeaviateVectorStore(weaviate_client=weaviate_client, index_name=index_name)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)

        index = VectorStoreIndex.from_documents(
            documents,
            storage_context=storage_context
        )

        return index

    def invoke_pipeline_step(self, task_call, task_description: str):
        """Call ``task_call`` while a spinner with ``task_description`` is shown.

        Args:
            task_call: Zero-argument callable that performs the stage.
            task_description: Text displayed next to the spinner.

        Returns:
            Whatever ``task_call()`` returns.
        """
        with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                transient=False,
        ) as progress:
            # total=None: the task is added without a known total.
            progress.add_task(description=task_description, total=None)
            ret = task_call()
        return ret