File size: 3,294 Bytes
42cd5f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from .interface import Ingest
import weaviate
from llama_index.core import StorageContext, SimpleDirectoryReader, Settings, VectorStoreIndex
from llama_index.vector_stores.weaviate import WeaviateVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
import box
import yaml
from rich.progress import Progress, SpinnerColumn, TextColumn
import timeit
from rich import print
import warnings


warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=UserWarning)


class LlamaIndexIngest(Ingest):
    def run_ingest(self,
                   payload: str,
                   file_path: str,
                   index_name: str) -> None:
        print(f"\nRunning ingest with {payload}\n")

        # Import config vars
        with open('config.yml', 'r', encoding='utf8') as ymlfile:
            cfg = box.Box(yaml.safe_load(ymlfile))

        start = timeit.default_timer()

        client = self.invoke_pipeline_step(lambda: weaviate.Client(cfg.WEAVIATE_URL),
                                           "Connecting to Weaviate...")

        documents = self.invoke_pipeline_step(lambda: self.load_documents(file_path),
                                         "Loading documents...")

        embeddings = self.invoke_pipeline_step(lambda: self.load_embedding_model(cfg.EMBEDDINGS),
                                          "Loading embedding model...")

        index = self.invoke_pipeline_step(lambda: self.build_index(client, embeddings, documents, index_name,
                                                                   cfg.CHUNK_SIZE),
                                          "Building index...")

        end = timeit.default_timer()
        print(f"\nTime to ingest data: {end - start}\n")

    def load_documents(self, file_path):
        documents = SimpleDirectoryReader(input_files=[file_path], required_exts=[".pdf", ".PDF"]).load_data()
        print(f"\nLoaded {len(documents)} documents")
        print(f"\nFirst document: {documents[0]}")
        print("\nFirst document content:\n")
        print(documents[0])
        print()
        return documents

    def load_embedding_model(self, model_name):
        return HuggingFaceEmbedding(model_name=model_name)

    def build_index(self, weaviate_client, embed_model, documents, index_name, chunk_size):
        # Delete index if it already exists, to avoid data corruption
        weaviate_client.schema.delete_class(index_name)

        Settings.chunk_size = chunk_size
        Settings.llm = None
        Settings.embed_model = embed_model

        vector_store = WeaviateVectorStore(weaviate_client=weaviate_client, index_name=index_name)
        storage_context = StorageContext.from_defaults(vector_store=vector_store)

        index = VectorStoreIndex.from_documents(
            documents,
            storage_context=storage_context
        )

        return index

    def invoke_pipeline_step(self, task_call, task_description):
        with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                transient=False,
        ) as progress:
            progress.add_task(description=task_description, total=None)
            ret = task_call()
        return ret