import os
import re
import json
import logging
import hashlib
from pathlib import Path
from typing import List, Tuple, Dict, Any, Optional

import gradio as gr
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download
from huggingface_hub.utils import EntryNotFoundError
from requests.exceptions import HTTPError

from langchain.text_splitter import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
# -----------------------------------------------------------------------------
# Configuration & Environment Variables
# -----------------------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
load_dotenv()  # Load .env file for local development

# --- API Keys ---
# Base key is for potentially pre-processing fixed files (if needed)
# User key is required for processing *new* dynamic files
BASE_OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")  # Used for pre-processing base files if needed
HF_TOKEN = os.getenv("HF_TOKEN")

# --- Constants ---
DATASET_ID = "rasoul-nikbakht/TSpec-LLM"
DATA_SUBDIR = "3GPP-clean"
EMBEDDING_MODEL = "text-embedding-3-small"
LLM_MODEL = "gpt-4o-mini"
MAX_DYNAMIC_FILES = 3
ESTIMATED_COST_PER_FILE_CENTS = 2  # Rough estimate

# --- File Paths ---
SCRIPT_DIR = Path(__file__).parent
CACHE_DIR = SCRIPT_DIR / "cached_embeddings"
BASE_KNOWLEDGE_INDEX_PATH = CACHE_DIR / "base_knowledge.faiss"
USER_DATA_PATH = SCRIPT_DIR / "user_data.json"
CACHE_MANIFEST_PATH = SCRIPT_DIR / "cache_manifest.json"

# Ensure cache directory exists
CACHE_DIR.mkdir(exist_ok=True)
# --- Fixed Base Knowledge Files ---
# Relative paths within the dataset repo (without DATA_SUBDIR)
FIXED_FILES = [
    "Rel-16/38_series/38901-g10.md",
    "Rel-16/38_series/38821-g20.md",
    "Rel-15/36_series/36777-f00_1.md",
    "Rel-15/36_series/36777-f00_2.md",
]
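# For orientation (editor note, approximate titles): 38901 corresponds to TR 38.901
# (channel model for frequencies from 0.5 to 100 GHz), 38821 to TR 38.821 (solutions
# for NR to support non-terrestrial networks), and 36777 (two parts) to TR 36.777
# (enhanced LTE support for aerial vehicles).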
# -----------------------------------------------------------------------------
# Global Variables & In-Memory Stores (Load at startup)
# -----------------------------------------------------------------------------
base_knowledge_index: Optional[FAISS] = None
user_data: Dict[str, List[str]] = {}     # {email: [list_of_processed_files]}
cache_manifest: Dict[str, str] = {}      # {repo_relative_path: local_faiss_path}

# -----------------------------------------------------------------------------
# Helper Functions
# -----------------------------------------------------------------------------
def sanitize_path_for_filename(repo_path: str) -> str:
    """Creates a safe filename from a repository path."""
    # Remove base dir prefix if present
    if repo_path.startswith(f"{DATA_SUBDIR}/"):
        repo_path = repo_path[len(f"{DATA_SUBDIR}/"):]
    # Replace slashes and invalid chars; use hashing for very long paths if needed
    sanitized = re.sub(r'[\\/*?:"<>|]', '_', repo_path)
    # Optional: Limit length and add hash if too long
    if len(sanitized) > 100:
        hash_suffix = hashlib.md5(repo_path.encode()).hexdigest()[:8]
        sanitized = sanitized[:90] + "_" + hash_suffix
    return sanitized + ".faiss"
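# Illustrative example (not from the original source):
#   sanitize_path_for_filename("Rel-16/38_series/38901-g10.md")
#   -> "Rel-16_38_series_38901-g10.md.faiss"
# (slashes become underscores and a ".faiss" suffix is appended).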
def is_valid_email(email: str) -> bool:
    """Basic regex check for email format."""
    # This is a simple check, not foolproof validation
    pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    return re.match(pattern, email) is not None
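# Illustrative examples: is_valid_email("user@example.org") -> True,
# is_valid_email("not-an-email") -> False.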
def load_user_data():
    """Loads user email and associated file data from JSON."""
    global user_data
    if USER_DATA_PATH.exists():
        try:
            with open(USER_DATA_PATH, 'r') as f:
                user_data = json.load(f)
            logging.info(f"Loaded user data for {len(user_data)} users from {USER_DATA_PATH}")
        except json.JSONDecodeError:
            logging.error(f"Error decoding JSON from {USER_DATA_PATH}. Starting with empty user data.")
            user_data = {}
        except Exception as e:
            logging.error(f"Failed to load user data: {e}", exc_info=True)
            user_data = {}
    else:
        logging.info("User data file not found. Starting fresh.")
        user_data = {}

def save_user_data():
    """Saves user email and associated file data to JSON."""
    try:
        with open(USER_DATA_PATH, 'w') as f:
            json.dump(user_data, f, indent=4)
        # logging.info(f"Saved user data to {USER_DATA_PATH}")  # Can be noisy
    except Exception as e:
        logging.error(f"Failed to save user data: {e}", exc_info=True)
def load_cache_manifest():
    """Loads the manifest of locally cached embeddings."""
    global cache_manifest
    if CACHE_MANIFEST_PATH.exists():
        try:
            with open(CACHE_MANIFEST_PATH, 'r') as f:
                cache_manifest = json.load(f)
            logging.info(f"Loaded cache manifest with {len(cache_manifest)} entries from {CACHE_MANIFEST_PATH}")
            # Optional: Verify that the referenced FAISS files actually exist
            # keys_to_remove = [k for k, v in cache_manifest.items() if not Path(v).exists()]
            # if keys_to_remove:
            #     logging.warning(f"Removing {len(keys_to_remove)} stale entries from cache manifest.")
            #     for k in keys_to_remove: del cache_manifest[k]
            #     save_cache_manifest()  # Save cleaned manifest
        except json.JSONDecodeError:
            logging.error(f"Error decoding JSON from {CACHE_MANIFEST_PATH}. Starting with empty manifest.")
            cache_manifest = {}
        except Exception as e:
            logging.error(f"Failed to load cache manifest: {e}", exc_info=True)
            cache_manifest = {}
    else:
        logging.info("Cache manifest file not found. Starting fresh.")
        cache_manifest = {}

def save_cache_manifest():
    """Saves the manifest of locally cached embeddings."""
    try:
        with open(CACHE_MANIFEST_PATH, 'w') as f:
            json.dump(cache_manifest, f, indent=4)
        # logging.info(f"Saved cache manifest to {CACHE_MANIFEST_PATH}")  # Can be noisy
    except Exception as e:
        logging.error(f"Failed to save cache manifest: {e}", exc_info=True)
def download_and_process_file(repo_relative_path: str, api_key_for_embedding: str) -> Optional[FAISS]:
    """Downloads, chunks, embeds a single file, returning a FAISS index."""
    if not HF_TOKEN:
        logging.error("HF_TOKEN is missing. Cannot download from gated dataset.")
        # Don't raise gr.Error here, handle return value in caller
        return None
    if not api_key_for_embedding:
        logging.error("OpenAI API Key is missing. Cannot create embeddings.")
        return None

    full_repo_path = f"{DATA_SUBDIR}/{repo_relative_path}"
    logging.info(f"Processing file: {repo_relative_path}")

    # --- Download ---
    try:
        local_path_str = hf_hub_download(
            repo_id=DATASET_ID,
            filename=full_repo_path,
            repo_type="dataset",
            token=HF_TOKEN,
            cache_dir="./hf_cache"
        )
        local_path = Path(local_path_str)
        logging.info(f"Downloaded {repo_relative_path} to: {local_path}")
    except EntryNotFoundError:
        logging.error(f"File not found in repository: {full_repo_path}")
        raise gr.Error(f"File not found in repository: '{repo_relative_path}'. Please check the path.")
    except HTTPError as e:
        if e.response is not None and e.response.status_code in {401, 403}:
            logging.error(f"Hugging Face authentication/authorization failed (Status {e.response.status_code}).")
            raise gr.Error("Hugging Face authentication failed. Check HF_TOKEN and dataset license acceptance.")
        else:
            logging.error(f"HTTP error during download: {e}")
            raise gr.Error(f"Failed to download file due to an HTTP error: {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred during download for {repo_relative_path}: {e}", exc_info=True)
        raise gr.Error(f"Download error for {repo_relative_path}: {e}")
    # --- Load and Chunk ---
    try:
        text = local_path.read_text(encoding="utf-8", errors="replace")
        headers_to_split_on = [("#", "H1"), ("##", "H2"), ("###", "H3"), ("####", "H4")]
        splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on, strip_headers=False)
        docs = splitter.split_text(text)
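        # Note: MarkdownHeaderTextSplitter returns one Document per header-delimited
        # section, recording the matched headers in metadata under the labels above
        # ("H1".."H4"); with strip_headers=False the header lines stay in page_content.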
        if not docs or (len(docs) == 1 and len(docs[0].page_content) > 5000):
            logging.warning(f"MarkdownHeaderTextSplitter yielded few/large chunks for {repo_relative_path}, using RecursiveCharacterTextSplitter.")
            fallback_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000, chunk_overlap=150, separators=["\n\n", "\n", ". ", ", ", " ", ""]
            )
            docs = fallback_splitter.create_documents([text])
        if not docs:
            logging.warning(f"File '{repo_relative_path}' resulted in zero documents after splitting.")
            return None  # Cannot create index from no documents
        logging.info(f"Split {repo_relative_path} into {len(docs)} documents.")
    except Exception as e:
        logging.error(f"Failed to read/split file {local_path}: {e}", exc_info=True)
        raise gr.Error(f"Error processing content of {repo_relative_path}: {e}")
    # --- Embed and Create Vector Store ---
    try:
        embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL, api_key=api_key_for_embedding)
        vectordb = FAISS.from_documents(docs, embeddings)
        logging.info(f"Created FAISS index for {repo_relative_path}.")
        return vectordb
    except Exception as e:
        # Catch potential OpenAI API errors specifically if possible
        logging.error(f"Failed during embedding/vector store creation for {repo_relative_path}: {e}", exc_info=True)
        # Check for common errors based on string matching (less robust but helpful)
        if "AuthenticationError" in str(e) or "Incorrect API key" in str(e):
            raise gr.Error(f"OpenAI Authentication Error for {repo_relative_path}. Check your API Key. Details: {e}")
        elif "RateLimitError" in str(e):
            raise gr.Error(f"OpenAI Rate Limit Error for {repo_relative_path}. Details: {e}")
        else:
            raise gr.Error(f"Embedding/VectorStore Error for {repo_relative_path}: {e}")

def get_or_create_dynamic_index(repo_relative_path: str, user_api_key: str) -> Optional[FAISS]:
    """Loads a dynamic index from cache or creates+caches it if new."""
    global cache_manifest  # Allow modification

    if repo_relative_path in cache_manifest:
        local_faiss_path_str = cache_manifest[repo_relative_path]
        local_faiss_path = Path(local_faiss_path_str)
        if local_faiss_path.exists():
            try:
                # Need embeddings object to load; use user's key as they initiated the session
                embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL, api_key=user_api_key)
                index = FAISS.load_local(str(local_faiss_path.parent), embeddings, index_name=local_faiss_path.stem, allow_dangerous_deserialization=True)
                logging.info(f"Loaded cached index for {repo_relative_path} from {local_faiss_path_str}")
                return index
            except Exception as e:
                logging.error(f"Failed to load cached index {local_faiss_path_str}: {e}. Will try to re-create.", exc_info=True)
                # Remove potentially corrupted entry from manifest
                del cache_manifest[repo_relative_path]
                save_cache_manifest()
        else:
            logging.warning(f"Cache manifest points to non-existent file: {local_faiss_path_str}. Removing entry.")
            del cache_manifest[repo_relative_path]
            save_cache_manifest()
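    # Note: FAISS.load_local expects both "<index_name>.faiss" and "<index_name>.pkl"
    # in the given folder; save_local below writes exactly that pair, so the manifest
    # only needs to record the .faiss path.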
    # --- If not cached or loading failed, create it ---
    logging.info(f"Cache miss or load failure for {repo_relative_path}. Processing anew.")
    if not user_api_key:
        raise gr.Error(f"Cannot process new file '{repo_relative_path}' without an OpenAI API Key.")

    new_index = download_and_process_file(repo_relative_path, user_api_key)
    if new_index:
        # Save the newly created index
        try:
            sanitized_name = sanitize_path_for_filename(repo_relative_path)
            save_path = CACHE_DIR / sanitized_name
            # FAISS save_local writes index_name.faiss/.pkl inside the given folder
            new_index.save_local(folder_path=str(CACHE_DIR), index_name=save_path.stem)
            full_saved_path = str(CACHE_DIR / (save_path.stem + ".faiss"))  # Path to the actual .faiss file
            # Update manifest
            cache_manifest[repo_relative_path] = full_saved_path
            save_cache_manifest()
            logging.info(f"Saved new index for {repo_relative_path} to {full_saved_path} and updated manifest.")
            return new_index
        except Exception as e:
            logging.error(f"Failed to save new index for {repo_relative_path}: {e}", exc_info=True)
            # Don't raise here; the in-memory index can still serve this session
            return new_index  # Return in-memory index even if saving failed
    else:
        # download_and_process_file failed; error already raised or logged
        return None
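# Resulting cache layout (editor note): CACHE_DIR holds one "<name>.faiss"/"<name>.pkl"
# pair per cached dynamic file (and, once pre-processing has run, "base_knowledge.faiss"
# plus its .pkl), while cache_manifest.json maps each repo-relative path to its .faiss file.
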
# -----------------------------------------------------------------------------
# Pre-processing Base Knowledge (Run once at startup if needed)
# -----------------------------------------------------------------------------
def preprocess_base_knowledge():
    """Creates and saves the base knowledge FAISS index if it doesn't exist by processing files individually and merging."""
    global base_knowledge_index

    if BASE_KNOWLEDGE_INDEX_PATH.exists():
        try:
            if not BASE_OPENAI_API_KEY:
                logging.error("Base OpenAI API Key missing. Cannot load base knowledge index.")
                return
            embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL, api_key=BASE_OPENAI_API_KEY)
            base_knowledge_index = FAISS.load_local(
                str(BASE_KNOWLEDGE_INDEX_PATH.parent),
                embeddings,
                index_name=BASE_KNOWLEDGE_INDEX_PATH.stem,
                allow_dangerous_deserialization=True
            )
            logging.info(f"Successfully loaded base knowledge index from {BASE_KNOWLEDGE_INDEX_PATH}")
            return  # Successfully loaded, no need to rebuild
        except Exception as e:
            logging.error(f"Failed to load base knowledge index from {BASE_KNOWLEDGE_INDEX_PATH}: {e}. Will attempt to rebuild.", exc_info=True)
            base_knowledge_index = None
            # Optionally delete corrupted files:
            # try:
            #     if BASE_KNOWLEDGE_INDEX_PATH.exists(): BASE_KNOWLEDGE_INDEX_PATH.unlink()
            #     pkl_path = BASE_KNOWLEDGE_INDEX_PATH.with_suffix(".pkl")
            #     if pkl_path.exists(): pkl_path.unlink()
            # except OSError as rm_err:
            #     logging.error(f"Failed to delete potentially corrupted index files: {rm_err}")

    if base_knowledge_index is None:
        logging.info("Base knowledge index not found or failed to load. Starting pre-processing...")
        if not BASE_OPENAI_API_KEY:
            logging.error("Cannot pre-process base knowledge: BASE_OPENAI_API_KEY is not set.")
            raise RuntimeError("OpenAI API Key needed for initial base knowledge processing is not configured.")
        if not HF_TOKEN:
            logging.error("Cannot pre-process base knowledge: HF_TOKEN is not set.")
            raise RuntimeError("Hugging Face Token needed for initial base knowledge processing is not configured.")
        individual_indices: List[FAISS] = []  # Store index for each base file
        for file_path in FIXED_FILES:
            try:
                # Process each file individually to get its FAISS index
                # This ensures embedding requests are per-file, not one giant batch
                index = download_and_process_file(file_path, BASE_OPENAI_API_KEY)
                if index:
                    individual_indices.append(index)
                    # Note: document count is now per-file in logs from download_and_process_file
                    logging.info(f"Successfully processed base file: {file_path}")
                else:
                    logging.warning(f"Skipping base file {file_path} due to processing error (returned None index).")
            except Exception as e:
                # If download_and_process_file raises an error (e.g., download failed, API key invalid)
                logging.error(f"Failed processing base file {file_path}: {e}", exc_info=True)
                # Decide whether to stop or continue; let's stop to avoid a partial base index
                raise RuntimeError(f"Failed to process base file {file_path}. Cannot create complete base knowledge index.") from e

        if not individual_indices:
            logging.error("No individual indices were successfully created for the base knowledge. Cannot proceed.")
            raise RuntimeError("Failed to process any base files successfully.")
        try:
            logging.info(f"Merging {len(individual_indices)} individual indices into the final base knowledge index...")
            # Start with the first index
            base_knowledge_index = individual_indices[0]
            # Merge the rest
            if len(individual_indices) > 1:
                for index_to_merge in individual_indices[1:]:
                    base_knowledge_index.merge_from(index_to_merge)
            total_vectors = base_knowledge_index.index.ntotal
            logging.info(f"Final base knowledge index created with {total_vectors} total vectors.")
            # Save the final merged index
            base_knowledge_index.save_local(folder_path=str(CACHE_DIR), index_name=BASE_KNOWLEDGE_INDEX_PATH.stem)
            logging.info(f"Successfully saved merged base knowledge index to {BASE_KNOWLEDGE_INDEX_PATH}")
        except Exception as e:
            logging.error(f"Failed to merge individual indices or save the final base knowledge index: {e}", exc_info=True)
            # Set base_knowledge_index back to None so the app knows it failed
            base_knowledge_index = None
            raise RuntimeError("Failed to merge or save the final base knowledge index.") from e

# -----------------------------------------------------------------------------
# Gradio Chat Function
# -----------------------------------------------------------------------------
GradioChatMessages = List[Dict[str, str]]  # [{'role': 'user', 'content': 'hi'}, ...]

def chat_llm(
    user_email: str,
    user_openai_key: str,
    dynamic_files_str: str,
    question: str,
    history: GradioChatMessages
) -> Tuple[GradioChatMessages, str, str]:  # History, Clear Question Box, Status Update
    """
    Gradio callback function. Performs RAG QA for one turn.
    Uses base knowledge + dynamically loaded/cached files.
    """
    status_update = ""
    if not history: history = []  # Initialize history
    # --- Input Validation ---
    if not user_email or not is_valid_email(user_email):
        raise gr.Error("Please enter a valid email address.")
    if not question or not question.strip():
        raise gr.Error("Please enter a question.")

    # Parse and validate dynamic file paths
    dynamic_files = [f.strip() for f in dynamic_files_str.split(',') if f.strip()]
    if len(dynamic_files) > MAX_DYNAMIC_FILES:
        raise gr.Error(f"Please select a maximum of {MAX_DYNAMIC_FILES} dynamic files per session.")
    if dynamic_files and not user_openai_key:
        raise gr.Error("Please provide your OpenAI API Key to process dynamic files.")

    # Log user interaction
    logging.info(f"Chat request from: {user_email}, Dynamic files: {dynamic_files}, Question: '{question[:50]}...'")

    # Use provided key or fallback to base key if available (only if no dynamic files)
    # If dynamic files are present, user_openai_key MUST be used and validated
    api_key_to_use = user_openai_key if dynamic_files else (user_openai_key or BASE_OPENAI_API_KEY)
    if not api_key_to_use:
        raise gr.Error("An OpenAI API Key is required for this operation (either user-provided or pre-configured).")

    session_indices: List[FAISS] = []
    processed_dynamic_files_this_session: List[str] = []
    newly_cached_files: List[str] = []
    # --- Retriever Setup ---
    # 1. Add Base Knowledge
    if base_knowledge_index:
        session_indices.append(base_knowledge_index)
        logging.debug("Added base knowledge index to session.")
    else:
        logging.error("Base knowledge index is not loaded. Cannot proceed.")
        raise gr.Error("Base knowledge index is unavailable. Please check logs.")

    # 2. Process Dynamic Files
    for file_path in dynamic_files:
        try:
            was_cached = file_path in cache_manifest
            dynamic_index = get_or_create_dynamic_index(file_path, api_key_to_use)  # Use the determined API key
            if dynamic_index:
                session_indices.append(dynamic_index)
                processed_dynamic_files_this_session.append(file_path)
                if not was_cached:  # If it wasn't in the manifest before get_or_create ran
                    newly_cached_files.append(file_path)
            # else: Error handled within get_or_create_dynamic_index by raising gr.Error
        except gr.Error as e:
            # Propagate Gradio errors to UI
            raise e
        except Exception as e:
            logging.error(f"Unexpected error processing dynamic file {file_path}: {e}", exc_info=True)
            raise gr.Error(f"Failed to process dynamic file {file_path}: {e}")
    # --- Combine Indices for Session (if dynamic files were added) ---
    if len(session_indices) > 1:  # Need to merge if dynamic files were added
        try:
            logging.info(f"Merging {len(session_indices)} indices for the session...")
            # Create a temporary merged index for this session
            # Start with the first index (should be base knowledge)
            session_master_index = FAISS(
                embedding_function=session_indices[0].embeddings,  # Use embeddings from first index
                index=session_indices[0].index,
                docstore=session_indices[0].docstore,
                index_to_docstore_id=session_indices[0].index_to_docstore_id
            )
            # Merge subsequent indices
            for index_to_merge in session_indices[1:]:
                session_master_index.merge_from(index_to_merge)
            logging.info(f"Session index created with {session_master_index.index.ntotal} total vectors.")
            session_retriever = session_master_index.as_retriever(search_kwargs={"k": 5})
        except Exception as e:
            logging.error(f"Failed to merge session indices: {e}", exc_info=True)
            raise gr.Error(f"Error creating session knowledge base: {e}")
    elif session_indices:  # Only base knowledge was used
        session_retriever = session_indices[0].as_retriever(search_kwargs={"k": 5})
    else:
        # Should have been caught earlier if base_knowledge_index was None
        raise gr.Error("No knowledge base available for retrieval.")
    # --- Setup LLM and RAG Chain ---
    try:
        llm = ChatOpenAI(model=LLM_MODEL, temperature=0.1, api_key=api_key_to_use, max_retries=1)

        template = """You are an assistant specializing in 3GPP technical specifications.
Answer the following question based *only* on the provided context document snippets from the specified files.
The context comes from the base knowledge files and potentially these user-provided files: {dynamic_files_list_str}
If the answer is not found in the context, state that you cannot answer based on the provided information. Be concise and accurate.
Context:
{context}
Question:
{question}
Answer:"""
        prompt = ChatPromptTemplate.from_template(template)

        # Function to format retrieved documents
        def format_docs(docs):
            return "\n\n".join(doc.page_content for doc in docs)

        # RAG Chain
        rag_chain = (
            {"context": session_retriever | format_docs,
             "question": RunnablePassthrough(),
             "dynamic_files_list_str": lambda x: ", ".join(dynamic_files) or "None"}  # Pass dynamic files for context
            | prompt
            | llm
            | StrOutputParser()
        )
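        # How the chain flows (editor note): the dict above runs as parallel branches
        # over the input question string -- the retriever pulls the top k=5 chunks and
        # format_docs joins them into {context}, RunnablePassthrough() forwards the raw
        # question into {question}, and the lambda fills {dynamic_files_list_str};
        # the filled prompt then goes to the LLM and the output parser returns a string.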
logging.info(f"Invoking RAG chain for question: '{question[:50]}...'") | |
answer = rag_chain.invoke(question) | |
logging.info(f"Received answer: '{answer[:100]}...'") | |
# Update user data | |
if user_email not in user_data: user_data[user_email] = [] | |
updated_files_for_user = set(user_data[user_email]) | set(processed_dynamic_files_this_session) | |
user_data[user_email] = sorted(list(updated_files_for_user)) | |
save_user_data() # Save after successful interaction | |
# Prepare status update message | |
if newly_cached_files: | |
status_update = f"Info: The following new files were processed and cached for future use: {', '.join(newly_cached_files)}." | |
except Exception as e: | |
logging.error(f"Error during RAG chain execution or user data update: {e}", exc_info=True) | |
# Append error to chat instead of crashing | |
history.append({"role": "user", "content": question}) | |
history.append({"role": "assistant", "content": f"An error occurred: {e}"}) | |
return history, question, "Error occurred. Check logs." # Keep question in box | |
# --- Update History and Return --- | |
history.append({"role": "user", "content": question}) | |
history.append({"role": "assistant", "content": answer}) | |
return history, "", status_update # Clear question box, provide status | |
# -----------------------------------------------------------------------------
# Gradio UI Definition
# -----------------------------------------------------------------------------
# --- UI Text Blocks ---
# Load persisted state before building the UI text, so the cached-file list below
# reflects the manifest on disk (previously it was loaded only in __main__, i.e.
# after this block had already been evaluated against an empty manifest).
load_user_data()
load_cache_manifest()

# Construct the cached files list string separately
sorted_keys = sorted(list(cache_manifest.keys()))
if sorted_keys:
    # Format each key as a markdown bullet point with backticks
    formatted_items = [f"* `{key}`" for key in sorted_keys]
    # Join them with newlines
    file_list_str = "\n".join(formatted_items)
else:
    file_list_str = "* None yet."  # Message when no files are cached

# Now define the info string using the pre-formatted list
cached_files_info = f"""
**Available Cached Files:**

The following dynamically added files have already been processed and cached:

{file_list_str}
"""
# --- Disclaimer and base knowledge text blocks ---
disclaimer_text = f"""
**Disclaimer & Usage Notes:**

* **Research Preview:** This is a demonstration application for research purposes. Accuracy is not guaranteed.
* **License:** By using this application, you agree to the terms and license of the underlying dataset (`{DATASET_ID}`). Please review the dataset's license terms on Hugging Face Hub.
* **API Keys:** Your OpenAI API key is required to process *new* documents you specify. It is used solely for embedding generation during your session and is not stored persistently by this application.
* **Caching:** Processed dynamic files are cached locally (embeddings only) to speed up future sessions.
* **Estimated Cost:** Processing *new* files incurs OpenAI API costs (approx. ${ESTIMATED_COST_PER_FILE_CENTS / 100:.2f} per file for `{EMBEDDING_MODEL}`). Using already cached files or only the base knowledge is free within this app.
* **Data:** Your email is logged along with the files you process for usage tracking. See `{USER_DATA_PATH.name}`.
"""

base_knowledge_info = f"""
**Base Knowledge:**

The chatbot always has access to the following pre-processed 3GPP specification files:

* `{FIXED_FILES[0]}`
* `{FIXED_FILES[1]}`
* `{FIXED_FILES[2]}`
* `{FIXED_FILES[3]}`
"""
# --- Build UI ---
with gr.Blocks(theme=gr.themes.Soft(), title="3GPP TSpec RAG Assistant") as demo:
    gr.Markdown("# π 3GPP TSpec RAG Assistant")

    with gr.Row():
        # --- Left Column (Chat Interface) ---
        with gr.Column(scale=7):  # 70% width
            chatbot = gr.Chatbot(
                label="Chat Session",
                height=600,
                type="messages",
                show_copy_button=True,
            )
            question_inp = gr.Textbox(
                label="Your Question",
                placeholder="Ask a question about the selected documents...",
                lines=3
            )
            status_out = gr.Textbox(label="Status Updates", interactive=False)

        # --- Right Column (Controls & Info) ---
        with gr.Column(scale=3):  # 30% width
            gr.Markdown("### Session Configuration")
            email_inp = gr.Textbox(label="Your Email Address", placeholder="Enter your email...")
            openai_key_inp = gr.Textbox(
                label="Your OpenAI API Key (Required for new files)",
                placeholder="Enter your OpenAI API key (sk-...)",
                type="password"
            )
            dynamic_files_inp = gr.Textbox(
                label=f"Dynamic Files (Optional, max {MAX_DYNAMIC_FILES}, comma-separated)",
                placeholder="e.g., Rel-17/23_series/23501-h50.md, Rel-18/...",
                lines=3
            )
            ask_btn = gr.Button("Ask Question", variant="primary")

            with gr.Accordion("Usage Information & Disclaimers", open=False):
                gr.Markdown(disclaimer_text)
            with gr.Accordion("Base Knowledge Files", open=False):
                gr.Markdown(base_knowledge_info)
            with gr.Accordion("Cached Dynamic Files", open=True):
                # Use an HTML component to allow dynamic updates if needed later
                # For now, just display the initial list
                # cached_list_html = gr.HTML(value=f"<ul><li>{ '</li><li>'.join(sorted(list(cache_manifest.keys()))) or 'None' }</li></ul>")
                # Simpler Markdown display:
                cached_list_md = gr.Markdown(cached_files_info)

    # --- Event Handling ---
    ask_btn.click(
        fn=chat_llm,
        inputs=[email_inp, openai_key_inp, dynamic_files_inp, question_inp, chatbot],
        outputs=[chatbot, question_inp, status_out]  # Update chat, clear question, show status
    )

    # Example Button (Optional - might be less useful with dynamic files)
    # gr.Examples(...)

# -----------------------------------------------------------------------------
# Application Entry Point & Initial Setup
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    print("Starting application setup...")

    # 1. Refresh user data and cache manifest (already loaded once at import time;
    #    re-loading here is harmless and picks up any changes on disk)
    print("Loading user data...")
    load_user_data()
    print("Loading cache manifest...")
    load_cache_manifest()
    print(f"Found {len(cache_manifest)} cached files.")

    # 2. Ensure base knowledge index is ready
    print("Checking base knowledge index...")
    try:
        preprocess_base_knowledge()
        print("Base knowledge index is ready.")
    except Exception as e:
        print(f"\n!!! CRITICAL ERROR during base knowledge setup: {e} !!!")
        print("The application cannot start without the base knowledge index.")
        print("Please ensure OPENAI_API_KEY and HF_TOKEN are correctly set in your environment or .env file and that you have accepted the dataset license.")
        # Exit if base knowledge failed critically
        import sys
        sys.exit(1)

    # 3. Launch Gradio App
    print("Launching Gradio interface...")
    demo.launch(debug=True, mcp_server=True)  # debug=True for detailed logs locally
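# Local run sketch (editor note, assumed setup): create a .env next to this script with
# OPENAI_API_KEY=sk-... and HF_TOKEN=hf_... (after accepting the TSpec-LLM dataset
# license on the Hub), install the dependencies implied by the imports above (gradio,
# python-dotenv, huggingface_hub, langchain, langchain-community, langchain-openai,
# faiss-cpu), then run the script with `python` and open the printed local URL.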