{ "cells": [ { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/usr/local/lib/python3.12/site-packages/pymongo/synchronous/collection.py:1920: UserWarning: use an explicit session with no_cursor_timeout=True otherwise the cursor may still timeout after 30 minutes, for more info see https://mongodb.com/docs/v4.4/reference/method/cursor.noCursorTimeout/#session-idle-timeout-overrides-nocursortimeout\n", " return Cursor(self, *args, **kwargs)\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Stopping document loop\n", "Stopping document loop\n" ] } ], "source": [ "# See README for more info on how the FeaturePipeline works\n", "# The Ingestion pipeline is part of the FeaturePipeline\n", "# Make sure to ollama serve before running!\n", "from langchain_text_splitters import RecursiveCharacterTextSplitter\n", "from qdrant_client.http.models import Distance, VectorParams, PointStruct\n", "from shared import getMongoClient, getQdrantClient, getEmbeddingsModel\n", "\n", "# Create a mongoDB connection\n", "mongoHost = getMongoClient()\n", "\n", "# Create a qdrant connection\n", "qClient = getQdrantClient()\n", "\n", "# Create qdrant collections to store embeddings\n", "if not qClient.collection_exists(\"Github\"):\n", " qClient.create_collection(\n", " collection_name=\"Github\",\n", " vectors_config=VectorParams(size=3072, distance=Distance.COSINE),\n", " )\n", "if not qClient.collection_exists(\"Document\"):\n", " qClient.create_collection(\n", " collection_name=\"Document\",\n", " vectors_config=VectorParams(size=3072, distance=Distance.COSINE),\n", " )\n", "\n", "# Ingestion Pipeline Setup\n", "# Define a text cleaner\n", "def cleanText(text):\n", " return ''.join(char for char in text if 32 <= ord(char) <= 126)\n", "\n", "# Setup the text chunker\n", "text_splitter = RecursiveCharacterTextSplitter(\n", " chunk_size=500,\n", " chunk_overlap=20,\n", " length_function=len,\n", " is_separator_regex=False,\n", ")\n", "\n", "# Setup the text embedder\n", "embeddingsModel = getEmbeddingsModel()\n", "\n", "# Running the ingestion pipeline\n", "# Store all documents from each MongoDB collection into qdrant\n", "mongoDatabase = mongoHost[\"twin\"]\n", "collections = mongoDatabase.list_collection_names()\n", "for collection in collections:\n", " mongoCollection = mongoDatabase[collection]\n", "\n", " documents = mongoCollection.find(no_cursor_timeout=True)\n", " id = 0\n", " try:\n", " for document in documents:\n", " # For each document, split it into chunks\n", " link = document[\"link\"]\n", " resultType = document[\"type\"]\n", " text = document[\"content\"]\n", " text = cleanText(text)\n", " chunks = text_splitter.split_text(text)\n", " chunkNum = 0\n", " embeddings = embeddingsModel.embed_documents(chunks)\n", " for chunk in chunks:\n", " # Create embeddings for each chunk, of length 3072 using the embedding model\n", " # Store the embedding along with some metadata into the Qdrant vector database\n", " qClient.upsert(collection_name=resultType, wait=True, points=[PointStruct(id=id, vector=embeddings[chunkNum], payload={\"link\": link, \"type\": resultType, \"chunk\": chunkNum, \"text\": chunk})])\n", " chunkNum += 1\n", " id += 1\n", " except:\n", " print(\"Stopping document loop\")\n", " \n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", 
"mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.7" } }, "nbformat": 4, "nbformat_minor": 2 }