In [5]:
# See README for more info on how the FeaturePipeline works
# The Ingestion pipeline is part of the FeaturePipeline
# Make sure to ollama serve before running!
from langchain_text_splitters import RecursiveCharacterTextSplitter
from qdrant_client.http.models import Distance, VectorParams, PointStruct
from shared import getMongoClient, getQdrantClient, getEmbeddingsModel

# Create a mongoDB connection
mongoHost = getMongoClient()

# Create a qdrant connection
qClient = getQdrantClient()

# Create qdrant collections to store embeddings
if not qClient.collection_exists("Github"):
 qClient.create_collection(
 collection_name="Github",
 vectors_config=VectorParams(size=3072, distance=Distance.COSINE),
 )
if not qClient.collection_exists("Document"):
 qClient.create_collection(
 collection_name="Document",
 vectors_config=VectorParams(size=3072, distance=Distance.COSINE),
 )

# Ingestion Pipeline Setup
# Define a text cleaner
def cleanText(text):
 return ''.join(char for char in text if 32 <= ord(char) <= 126)

# Setup the text chunker
text_splitter = RecursiveCharacterTextSplitter(
 chunk_size=500,
 chunk_overlap=20,
 length_function=len,
 is_separator_regex=False,
)

# Setup the text embedder
embeddingsModel = getEmbeddingsModel()

# Running the ingestion pipeline
# Store all documents from each MongoDB collection into qdrant
mongoDatabase = mongoHost["twin"]
collections = mongoDatabase.list_collection_names()
for collection in collections:
 mongoCollection = mongoDatabase[collection]

 documents = mongoCollection.find(no_cursor_timeout=True)
 id = 0
 try:
 for document in documents:
 # For each document, split it into chunks
 link = document["link"]
 resultType = document["type"]
 text = document["content"]
 text = cleanText(text)
 chunks = text_splitter.split_text(text)
 chunkNum = 0
 embeddings = embeddingsModel.embed_documents(chunks)
 for chunk in chunks:
 # Create embeddings for each chunk, of length 3072 using the embedding model
 # Store the embedding along with some metadata into the Qdrant vector database
 qClient.upsert(collection_name=resultType, wait=True, points=[PointStruct(id=id, vector=embeddings[chunkNum], payload={"link": link, "type": resultType, "chunk": chunkNum, "text": chunk})])
 chunkNum += 1
 id += 1
 except:
 print("Stopping document loop")
 


 return Cursor(self, *args, **kwargs)


Stopping document loop
Stopping document loop
