import os
import re
import json
import hashlib
from pathlib import Path
from dotenv import load_dotenv
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_nvidia_ai_endpoints import ChatNVIDIA

# === UTILS ===
def hash_text(text):
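    """Return a short stable ID: the first 8 hex chars of the text's MD5."""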
    return hashlib.md5(text.encode()).hexdigest()[:8]

def fix_json_text(text):
    # Normalize curly quotes and extract the outermost JSON object
    text = text.replace("“", '"').replace("”", '"').replace("‘", "'").replace("’", "'")
    match = re.search(r'\{.*\}', text, re.DOTALL)
    return match.group(0) if match else text

def enrich_chunk_with_llm(text, llm):
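    """Ask the LLM for a summary and synthetic retrieval queries for one chunk."""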
    prompt = f"""You're a helpful assistant optimizing document retrieval.
Every document you see is about Krishna Vamsi Dhulipalla.
Here's a document chunk:
{text}
1. Summarize the key content of this chunk in 1–2 sentences, assuming the overall context is about Krishna.
2. Generate 3 natural-language questions that a user might ask to which this chunk would be a relevant answer, focusing on Krishna-related topics.
Respond in JSON:
{{
  "summary": "...",
  "synthetic_queries": ["...", "...", "..."]
}}"""
    response = llm.invoke(prompt)
    content = getattr(response, "content", "").strip()
    if not content:
        raise ValueError("⚠️ LLM returned empty response")
    fixed = fix_json_text(content)
    try:
        return json.loads(fixed)
    except Exception as e:
        raise ValueError(f"Invalid JSON from LLM: {e}\n--- Raw Output ---\n{content}")

# === MAIN FUNCTION ===
def create_faiss_store(
    md_dir="./personal_data",
    chunk_size=600,
    chunk_overlap=150,
    persist_dir="./faiss_store",
    chunk_save_path="all_chunks.json",
    llm=None
):
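    """Split Markdown files into overlapping chunks, enrich each chunk with an
    LLM-generated summary and synthetic queries, then persist a versioned FAISS
    index alongside a JSON dump of the enriched chunks."""
    if llm is None:
        raise ValueError("An LLM instance is required for chunk enrichment")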
    # Separators are tried in order: prefer heading boundaries, then paragraphs,
    # list items, sentences, and finally single spaces
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n# ", "\n## ", "\n### ", "\n#### ", "\n\n", "\n- ", "\n", ". ", " "],
        keep_separator=True,
        length_function=len,  # consider switching to a tokenizer-based length later
        is_separator_regex=False
    )
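    # Sketch of the tokenizer-based length function hinted at above (assumes
    # `transformers` is installed; mirrors the commented-out variant at the
    # end of this file):
    #
    #   from transformers import AutoTokenizer
    #   tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
    #   splitter = RecursiveCharacterTextSplitter(
    #       chunk_size=chunk_size,
    #       chunk_overlap=chunk_overlap,
    #       length_function=lambda text: len(tokenizer.encode(text)),
    #   )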
    docs, all_chunks, failed_chunks = [], [], []
    for md_file in Path(md_dir).glob("*.md"):
        with open(md_file, "r", encoding="utf-8") as f:
            content = f.read().strip()
        if not content:
            continue
        # Ensure a space after heading markers (e.g. "##Title" -> "## Title")
        # without flattening heading levels
        content = re.sub(r'\n(#+)(\w)', r'\n\1 \2', content)
        docs.append({
            "content": content,
            "metadata": {
                "source": md_file.name,
                "header": content.split('\n')[0]
            }
        })
    for doc in docs:
        try:
            chunks = splitter.split_text(doc["content"])
        except Exception as e:
            print(f"❌ Error splitting {doc['metadata']['source']}: {e}")
            continue
        for i, chunk in enumerate(chunks):
            chunk = chunk.strip()
            if len(chunk) < 50:  # skip fragments too short to be useful
                continue
            chunk_id = f"{doc['metadata']['source']}_#{i}_{hash_text(chunk)}"
            metadata = {
                **doc["metadata"],
                "chunk_id": chunk_id,
                "has_header": chunk.startswith("#"),
                "word_count": len(chunk.split())
            }
            try:
                print("🔄 Processing chunk:", chunk_id)
                enriched = enrich_chunk_with_llm(chunk, llm)
                summary = enriched.get("summary", "")
                questions = enriched.get("synthetic_queries", [])
                metadata.update({
                    "summary": summary,
                    "synthetic_queries": questions
                })
                enriched_text = (
                    f"{chunk}\n\n"
                    f"---\n"
                    f"🔹 Summary:\n{summary}\n\n"
                    f"🔸 Related Questions:\n" + "\n".join(f"- {q}" for q in questions)
                )
                all_chunks.append({
                    "text": enriched_text,
                    "metadata": metadata
                })
            except Exception as e:
                print(f"⚠️ LLM failed for {chunk_id}: {e}")
                failed_chunks.append(f"{chunk_id} → {str(e)}")
    print(f"✅ Markdown files processed: {len(docs)}")
    print(f"✅ Chunks created: {len(all_chunks)} | ⚠️ Failed: {len(failed_chunks)}")
    # Save enriched chunks
    with open(chunk_save_path, "w", encoding="utf-8") as f:
        json.dump(all_chunks, f, indent=2, ensure_ascii=False)
    print(f"💾 Saved enriched chunks → {chunk_save_path}")
    os.makedirs(persist_dir, exist_ok=True)
    # Version the index directory by chunk count and splitter settings
    version_tag = f"v{len(all_chunks)}_{chunk_size}_{chunk_overlap}"
    save_path = os.path.join(persist_dir, version_tag)
    os.makedirs(save_path, exist_ok=True)
    embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2",
        model_kwargs={"device": "cpu"},
        encode_kwargs={"normalize_embeddings": True}
    )
    vector_store = FAISS.from_texts(
        texts=[chunk["text"] for chunk in all_chunks],
        embedding=embeddings,
        metadatas=[chunk["metadata"] for chunk in all_chunks]
    )
    vector_store.save_local(save_path)
    print(f"✅ FAISS index saved at: {save_path}")
    avg_len = sum(len(c['text']) for c in all_chunks) / len(all_chunks) if all_chunks else 0
    print(f"📊 Stats → Chunks: {len(all_chunks)} | Avg length: {avg_len:.1f} characters")
    if failed_chunks:
        with open("failed_chunks.txt", "w", encoding="utf-8") as f:
            for line in failed_chunks:
                f.write(line + "\n")
        print("📝 Failed chunk IDs saved to failed_chunks.txt")

dotenv_path = os.path.join(os.getcwd(), ".env")
load_dotenv(dotenv_path)
api_key = os.getenv("NVIDIA_API_KEY")
if not api_key:
    raise EnvironmentError("NVIDIA_API_KEY not found; set it in your .env file")
os.environ["NVIDIA_API_KEY"] = api_key

# Initialize the model
llm = ChatNVIDIA(model="nvidia/llama-3.1-nemotron-70b-instruct")
create_faiss_store(
    md_dir="./personal_data",
    chunk_size=600,
    chunk_overlap=150,
    persist_dir="./faiss_store",
    llm=llm
)
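
# Example of reloading the saved index for retrieval. A minimal sketch; the
# versioned folder name and query below are hypothetical (use the path printed
# by create_faiss_store above):
#
# embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
# store = FAISS.load_local(
#     "./faiss_store/v42_600_150",           # hypothetical version tag
#     embeddings,
#     allow_dangerous_deserialization=True,  # index metadata is pickled
# )
# results = store.similarity_search("What does Krishna work on?", k=3)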
# === Alternative implementation kept for reference (commented out) ===
# from langchain.text_splitter import (
#     RecursiveCharacterTextSplitter,
#     MarkdownHeaderTextSplitter
# )
# from langchain.embeddings import HuggingFaceEmbeddings
# from langchain.vectorstores import FAISS
# from langchain.docstore.document import Document
# from transformers import AutoTokenizer
# from pathlib import Path
# import os
# from typing import List

# def prepare_vectorstore(
#     base_path: str,
#     faiss_path: str,
#     use_markdown_headers: bool = True,
#     chunk_size: int = 600,
#     chunk_overlap: int = 150,
#     model_name: str = "sentence-transformers/all-MiniLM-L6-v2",
#     verbose: bool = True
# ) -> FAISS:
#     docs = []
#     for md_file in Path(base_path).glob("*.md"):
#         with open(md_file, "r", encoding="utf-8") as f:
#             content = f.read()
#         metadata = {
#             "source": md_file.name,
#             "file_type": "markdown",
#             "created_at": md_file.stat().st_ctime
#         }
#         docs.append(Document(page_content=content, metadata=metadata))
#     # Optional Markdown-aware splitting
#     if use_markdown_headers:
#         header_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=[
#             ("#", "h1"), ("##", "h2"), ("###", "h3")
#         ])
#         structured_chunks = []
#         for doc in docs:
#             splits = header_splitter.split_text(doc.page_content)
#             for chunk in splits:
#                 chunk.metadata.update(doc.metadata)
#                 structured_chunks.append(chunk)
#     else:
#         structured_chunks = docs
#     # Tokenizer-based recursive splitting
#     tokenizer = AutoTokenizer.from_pretrained(model_name)
#     recursive_splitter = RecursiveCharacterTextSplitter(
#         chunk_size=chunk_size,
#         chunk_overlap=chunk_overlap,
#         length_function=lambda text: len(tokenizer.encode(text)),
#         separators=["\n## ", "\n### ", "\n\n", "\n", ". "]
#     )
#     final_chunks: List[Document] = []
#     for chunk in structured_chunks:
#         sub_chunks = recursive_splitter.split_text(chunk.page_content)
#         for i, sub in enumerate(sub_chunks):
#             final_chunks.append(Document(
#                 page_content=sub,
#                 metadata={**chunk.metadata, "sub_chunk": i}
#             ))
#     if verbose:
#         print(f"✅ Total chunks after splitting: {len(final_chunks)}")
#         print(f"📁 Storing to: {faiss_path}")
#     embedding_model = HuggingFaceEmbeddings(model_name=model_name)
#     vectorstore = FAISS.from_documents(final_chunks, embedding_model)
#     vectorstore.save_local(faiss_path)
#     if verbose:
#         print(f"✅ FAISS vectorstore saved at: {os.path.abspath(faiss_path)}")
#     return vectorstore

# vectorstore = prepare_vectorstore(
#     base_path="./personal_data",
#     faiss_path="krishna_vectorstore_hybrid",
#     use_markdown_headers=True,
#     chunk_size=600,
#     chunk_overlap=150,
#     verbose=True
# )