Tspec-RAG / app.py
rasoul-nikbakht's picture
ADD MCP server
353cae9 verified
import os
import re
import json
import logging
import hashlib
from pathlib import Path
from typing import List, Tuple, Dict, Any, Optional
import gradio as gr
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download
from huggingface_hub.utils import EntryNotFoundError
from requests.exceptions import HTTPError
from langchain.text_splitter import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
# -----------------------------------------------------------------------------
# Configuration & Environment Variables
# -----------------------------------------------------------------------------
# NOTE: everything in this section runs at import time (logging setup, .env
# loading, cache directory creation).
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
load_dotenv() # Load .env file for local development
# --- API Keys ---
# BASE_OPENAI_API_KEY is read from the OPENAI_API_KEY environment variable. It is
# used to build/load the base knowledge index and as the fallback key for chat
# requests that specify no dynamic files.
# A user-supplied key is required whenever dynamic files are requested.
BASE_OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") # Used for pre-processing base files if needed
# Token passed to hf_hub_download when fetching files from DATASET_ID.
HF_TOKEN = os.getenv("HF_TOKEN")
# --- Constants ---
DATASET_ID = "rasoul-nikbakht/TSpec-LLM"  # Hugging Face dataset repository id
DATA_SUBDIR = "3GPP-clean"  # Prefix prepended to every repo-relative path before download
EMBEDDING_MODEL = "text-embedding-3-small"  # Model name handed to OpenAIEmbeddings
LLM_MODEL = "gpt-4o-mini"  # Model name handed to ChatOpenAI
MAX_DYNAMIC_FILES = 3  # Upper bound on dynamic files accepted per chat request
ESTIMATED_COST_PER_FILE_CENTS = 2 # Rough estimate
# --- File Paths ---
# All persistent state lives next to this script.
SCRIPT_DIR = Path(__file__).parent
CACHE_DIR = SCRIPT_DIR / "cached_embeddings"
BASE_KNOWLEDGE_INDEX_PATH = CACHE_DIR / "base_knowledge.faiss"
USER_DATA_PATH = SCRIPT_DIR / "user_data.json"
CACHE_MANIFEST_PATH = SCRIPT_DIR / "cache_manifest.json"
# Ensure cache directory exists
CACHE_DIR.mkdir(exist_ok=True)
# --- Fixed Base Knowledge Files ---
# Relative paths within the dataset repo (without DATA_SUBDIR)
FIXED_FILES = [
    "Rel-16/38_series/38901-g10.md",
    "Rel-16/38_series/38821-g20.md",
    "Rel-15/36_series/36777-f00_1.md",
    "Rel-15/36_series/36777-f00_2.md",
]
# -----------------------------------------------------------------------------
# Global Variables & In-Memory Stores (Load at startup)
# -----------------------------------------------------------------------------
# These start empty and are populated by preprocess_base_knowledge(),
# load_user_data() and load_cache_manifest() respectively.
base_knowledge_index: Optional[FAISS] = None
user_data: Dict[str, List[str]] = {} # {email: [list_of_processed_files]}
cache_manifest: Dict[str, str] = {} # {repo_relative_path: local_faiss_path}
# -----------------------------------------------------------------------------
# Helper Functions
# -----------------------------------------------------------------------------
def sanitize_path_for_filename(repo_path: str) -> str:
    """Derive a filesystem-safe ``.faiss`` file name from a repository path.

    A leading ``DATA_SUBDIR/`` is dropped, then each backslash, slash, ``*``,
    ``?``, ``:``, double quote, ``<``, ``>`` and ``|`` is replaced by ``_``.
    Names longer than 100 characters are cut to 90 characters and suffixed
    with the first 8 hex digits of the MD5 of the prefix-stripped path.

    Args:
        repo_path: Path of the file inside the dataset repository.

    Returns:
        The sanitized name with ``.faiss`` appended.
    """
    subdir_prefix = f"{DATA_SUBDIR}/"
    if repo_path.startswith(subdir_prefix):
        relative = repo_path[len(subdir_prefix):]
    else:
        relative = repo_path

    # One translation pass instead of a regex substitution; same character set.
    unsafe_to_underscore = str.maketrans({ch: "_" for ch in '\\/*?:"<>|'})
    safe_name = relative.translate(unsafe_to_underscore)

    if len(safe_name) > 100:
        digest = hashlib.md5(relative.encode()).hexdigest()[:8]
        safe_name = f"{safe_name[:90]}_{digest}"

    return f"{safe_name}.faiss"
def is_valid_email(email: str) -> bool:
    """Return True when *email* looks like ``local@domain.tld``.

    This is a deliberately simple syntactic check, not full RFC validation.
    The whole string must match: ``re.fullmatch`` is used because ``$`` in
    ``re.match`` also matches just before a trailing newline, which would let
    ``"user@example.com\\n"`` through.

    Args:
        email: Candidate address; non-string values are rejected.

    Returns:
        True if the entire string matches the pattern, otherwise False.
    """
    if not isinstance(email, str):
        return False
    pattern = r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}"
    return re.fullmatch(pattern, email) is not None
def load_user_data():
    """Populate the global ``user_data`` mapping from ``USER_DATA_PATH``.

    The mapping is reset to ``{}`` when the file is missing, is not valid JSON,
    cannot be read, or holds something other than a JSON object. The last case
    matters because later code indexes ``user_data`` by email address.
    Failures are logged and never raised.
    """
    global user_data
    if not USER_DATA_PATH.exists():
        logging.info("User data file not found. Starting fresh.")
        user_data = {}
        return
    try:
        with open(USER_DATA_PATH, 'r', encoding="utf-8") as f:
            loaded = json.load(f)
        if not isinstance(loaded, dict):
            # Valid JSON of the wrong shape (e.g. a list) would break later lookups.
            raise ValueError(f"expected a JSON object, got {type(loaded).__name__}")
        user_data = loaded
        logging.info(f"Loaded user data for {len(user_data)} users from {USER_DATA_PATH}")
    except json.JSONDecodeError:
        logging.error(f"Error decoding JSON from {USER_DATA_PATH}. Starting with empty user data.")
        user_data = {}
    except Exception as e:
        logging.error(f"Failed to load user data: {e}", exc_info=True)
        user_data = {}
def save_user_data():
    """Persist the global ``user_data`` mapping to ``USER_DATA_PATH`` as JSON.

    The data is serialized before any file is opened and written to a sibling
    temporary file that is then moved over the target with ``os.replace``.
    A serialization or write failure therefore leaves the previous file
    intact instead of truncated. Failures are logged and never raised.
    """
    try:
        payload = json.dumps(user_data, indent=4)
        tmp_path = USER_DATA_PATH.with_name(USER_DATA_PATH.name + ".tmp")
        with open(tmp_path, 'w') as f:
            f.write(payload)
        os.replace(tmp_path, USER_DATA_PATH)
    except Exception as e:
        logging.error(f"Failed to save user data: {e}", exc_info=True)
def load_cache_manifest():
    """Populate the global ``cache_manifest`` mapping from ``CACHE_MANIFEST_PATH``.

    The manifest maps repo-relative file paths to local ``.faiss`` paths. It is
    reset to ``{}`` when the file is missing, is not valid JSON, cannot be
    read, or holds something other than a JSON object. Failures are logged and
    never raised.

    Entries whose index file no longer exists are not pruned here;
    ``get_or_create_dynamic_index`` removes them when it meets one.
    """
    global cache_manifest
    if not CACHE_MANIFEST_PATH.exists():
        logging.info("Cache manifest file not found. Starting fresh.")
        cache_manifest = {}
        return
    try:
        with open(CACHE_MANIFEST_PATH, 'r', encoding="utf-8") as f:
            loaded = json.load(f)
        if not isinstance(loaded, dict):
            # Valid JSON of the wrong shape would break key lookups later on.
            raise ValueError(f"expected a JSON object, got {type(loaded).__name__}")
        cache_manifest = loaded
        logging.info(f"Loaded cache manifest with {len(cache_manifest)} entries from {CACHE_MANIFEST_PATH}")
    except json.JSONDecodeError:
        logging.error(f"Error decoding JSON from {CACHE_MANIFEST_PATH}. Starting with empty manifest.")
        cache_manifest = {}
    except Exception as e:
        logging.error(f"Failed to load cache manifest: {e}", exc_info=True)
        cache_manifest = {}
def save_cache_manifest():
    """Persist the global ``cache_manifest`` mapping to ``CACHE_MANIFEST_PATH``.

    The data is serialized before any file is opened and written to a sibling
    temporary file that is then moved over the target with ``os.replace``.
    A failure part-way through therefore cannot leave a truncated manifest
    behind. Failures are logged and never raised.
    """
    try:
        payload = json.dumps(cache_manifest, indent=4)
        tmp_path = CACHE_MANIFEST_PATH.with_name(CACHE_MANIFEST_PATH.name + ".tmp")
        with open(tmp_path, 'w') as f:
            f.write(payload)
        os.replace(tmp_path, CACHE_MANIFEST_PATH)
    except Exception as e:
        logging.error(f"Failed to save cache manifest: {e}", exc_info=True)
def download_and_process_file(repo_relative_path: str, api_key_for_embedding: str) -> Optional[FAISS]:
    """Download one dataset file, split it into chunks, embed it and index it.

    Args:
        repo_relative_path: Path inside the dataset repo, without ``DATA_SUBDIR``.
        api_key_for_embedding: OpenAI API key used to create the embeddings.

    Returns:
        A FAISS vector store for the file, or ``None`` when ``HF_TOKEN`` or the
        API key is missing, or when splitting produced no documents.

    Raises:
        gr.Error: If the download, the read/split step or the embedding step
            fails. The original exception is chained as ``__cause__``.
    """
    if not HF_TOKEN:
        logging.error("HF_TOKEN is missing. Cannot download from gated dataset.")
        # Don't raise gr.Error here, handle return value in caller
        return None
    if not api_key_for_embedding:
        logging.error("OpenAI API Key is missing. Cannot create embeddings.")
        return None

    full_repo_path = f"{DATA_SUBDIR}/{repo_relative_path}"
    logging.info(f"Processing file: {repo_relative_path}")

    # --- Download ---
    try:
        local_path_str = hf_hub_download(
            repo_id=DATASET_ID,
            filename=full_repo_path,
            repo_type="dataset",
            token=HF_TOKEN,
            cache_dir="./hf_cache"
        )
        local_path = Path(local_path_str)
        logging.info(f"Downloaded {repo_relative_path} to: {local_path}")
    except EntryNotFoundError as e:
        logging.error(f"File not found in repository: {full_repo_path}")
        raise gr.Error(f"File not found in repository: '{repo_relative_path}'. Please check the path.") from e
    except HTTPError as e:
        if e.response is not None and e.response.status_code in {401, 403}:
            logging.error(f"Hugging Face authentication/authorization failed (Status {e.response.status_code}).")
            raise gr.Error("Hugging Face authentication failed. Check HF_TOKEN and dataset license acceptance.") from e
        logging.error(f"HTTP error during download: {e}")
        raise gr.Error(f"Failed to download file due to an HTTP error: {e}") from e
    except Exception as e:
        logging.error(f"An unexpected error occurred during download for {repo_relative_path}: {e}", exc_info=True)
        raise gr.Error(f"Download error for {repo_relative_path}: {e}") from e

    # --- Load and Chunk ---
    try:
        text = local_path.read_text(encoding="utf-8", errors="replace")
        headers_to_split_on = [("#", "H1"), ("##", "H2"), ("###", "H3"), ("####", "H4")]
        splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on, strip_headers=False)
        docs = splitter.split_text(text)
        # Fall back to size-based splitting when header splitting found no
        # structure (nothing at all, or one chunk longer than 5000 characters).
        if not docs or (len(docs) == 1 and len(docs[0].page_content) > 5000):
            logging.warning(f"MarkdownHeaderTextSplitter yielded few/large chunks for {repo_relative_path}, using RecursiveCharacterTextSplitter.")
            fallback_splitter = RecursiveCharacterTextSplitter(
                chunk_size=1000, chunk_overlap=150, separators=["\n\n", "\n", ". ", ", ", " ", ""]
            )
            docs = fallback_splitter.create_documents([text])
        if not docs:
            logging.warning(f"File '{repo_relative_path}' resulted in zero documents after splitting.")
            return None # Cannot create index from no documents
        logging.info(f"Split {repo_relative_path} into {len(docs)} documents.")
    except Exception as e:
        logging.error(f"Failed to read/split file {local_path}: {e}", exc_info=True)
        raise gr.Error(f"Error processing content of {repo_relative_path}: {e}") from e

    # --- Embed and Create Vector Store ---
    try:
        embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL, api_key=api_key_for_embedding)
        vectordb = FAISS.from_documents(docs, embeddings)
        logging.info(f"Created FAISS index for {repo_relative_path}.")
        return vectordb
    except Exception as e:
        logging.error(f"Failed during embedding/vector store creation for {repo_relative_path}: {e}", exc_info=True)
        # Classify by exception class name as well as message text: the message
        # of an API error does not necessarily contain its class name, so
        # text matching alone can miss rate-limit errors.
        error_name = type(e).__name__
        error_text = str(e)
        if (
            error_name == "AuthenticationError"
            or "AuthenticationError" in error_text
            or "Incorrect API key" in error_text
        ):
            raise gr.Error(f"OpenAI Authentication Error for {repo_relative_path}. Check your API Key. Details: {e}") from e
        if error_name == "RateLimitError" or "RateLimitError" in error_text:
            raise gr.Error(f"OpenAI Rate Limit Error for {repo_relative_path}. Details: {e}") from e
        raise gr.Error(f"Embedding/VectorStore Error for {repo_relative_path}: {e}") from e
def get_or_create_dynamic_index(repo_relative_path: str, user_api_key: str) -> Optional[FAISS]:
    """Return the FAISS index for a dynamic file, using the on-disk cache if possible.

    A manifest entry whose file is missing or fails to load is removed from the
    manifest and the file is processed from scratch. A freshly built index is
    saved under ``CACHE_DIR`` and recorded in the manifest. If that save
    fails, the in-memory index is still returned.

    Args:
        repo_relative_path: Path inside the dataset repo, without ``DATA_SUBDIR``.
        user_api_key: OpenAI API key used for the embeddings object.

    Returns:
        The loaded or newly created index, or ``None`` when
        ``download_and_process_file`` produced nothing.

    Raises:
        gr.Error: If the file must be processed but no API key was given, or
            if ``download_and_process_file`` raises.
    """
    global cache_manifest # Allow modification

    # --- Try the cache first ---
    if repo_relative_path in cache_manifest:
        cached_file_str = cache_manifest[repo_relative_path]
        cached_file = Path(cached_file_str)
        if not cached_file.exists():
            logging.warning(f"Cache manifest points to non-existent file: {cached_file_str}. Removing entry.")
            del cache_manifest[repo_relative_path]
            save_cache_manifest()
        else:
            try:
                # load_local needs an embeddings object; the caller's key is used for it.
                embedder = OpenAIEmbeddings(model=EMBEDDING_MODEL, api_key=user_api_key)
                cached_index = FAISS.load_local(
                    str(cached_file.parent),
                    embedder,
                    index_name=cached_file.stem,
                    allow_dangerous_deserialization=True,
                )
                logging.info(f"Loaded cached index for {repo_relative_path} from {cached_file_str}")
                return cached_index
            except Exception as load_err:
                logging.error(f"Failed to load cached index {cached_file_str}: {load_err}. Will try to re-create.", exc_info=True)
                # Drop the unusable entry so the rebuild below can re-register it.
                del cache_manifest[repo_relative_path]
                save_cache_manifest()

    # --- Cache miss or load failure: build the index ---
    logging.info(f"Cache miss or load failure for {repo_relative_path}. Processing anew.")
    if not user_api_key:
        raise gr.Error(f"Cannot process new file '{repo_relative_path}' without an OpenAI API Key.")

    fresh_index = download_and_process_file(repo_relative_path, user_api_key)
    if not fresh_index:
        # Nothing was produced; any error was already raised or logged there.
        return None

    try:
        target = CACHE_DIR / sanitize_path_for_filename(repo_relative_path)
        # save_local writes <index_name>.faiss and <index_name>.pkl into the folder.
        fresh_index.save_local(folder_path=str(CACHE_DIR), index_name=target.stem)
        faiss_file = str(CACHE_DIR / (target.stem + ".faiss"))
        cache_manifest[repo_relative_path] = faiss_file
        save_cache_manifest()
        logging.info(f"Saved new index for {repo_relative_path} to {faiss_file} and updated manifest.")
    except Exception as save_err:
        # Saving is best-effort: the in-memory index remains usable.
        logging.error(f"Failed to save new index for {repo_relative_path}: {save_err}", exc_info=True)
    return fresh_index
# -----------------------------------------------------------------------------
# Pre-processing Base Knowledge (Run once at startup if needed)
# -----------------------------------------------------------------------------
def preprocess_base_knowledge():
    """Load the base knowledge FAISS index from disk, or build and save it.

    If ``BASE_KNOWLEDGE_INDEX_PATH`` exists, the index is loaded from it. When
    the base API key is missing in that case, an error is logged and the
    function returns without loading anything. If loading fails, or no index
    is available, every entry of ``FIXED_FILES`` is processed on its own, the
    per-file indices are merged, and the result is stored in the global
    ``base_knowledge_index`` and saved under ``CACHE_DIR``.

    Raises:
        RuntimeError: If a required credential is missing for the rebuild, a
            base file fails to process, no file yields an index, or the
            merge/save step fails.
    """
    global base_knowledge_index

    # --- Fast path: reuse the index saved by a previous run ---
    if BASE_KNOWLEDGE_INDEX_PATH.exists():
        try:
            if not BASE_OPENAI_API_KEY:
                logging.error("Base OpenAI API Key missing. Cannot load base knowledge index.")
                return
            embedder = OpenAIEmbeddings(model=EMBEDDING_MODEL, api_key=BASE_OPENAI_API_KEY)
            base_knowledge_index = FAISS.load_local(
                str(BASE_KNOWLEDGE_INDEX_PATH.parent),
                embedder,
                index_name=BASE_KNOWLEDGE_INDEX_PATH.stem,
                allow_dangerous_deserialization=True,
            )
            logging.info(f"Successfully loaded base knowledge index from {BASE_KNOWLEDGE_INDEX_PATH}")
            return
        except Exception as load_err:
            logging.error(f"Failed to load base knowledge index from {BASE_KNOWLEDGE_INDEX_PATH}: {load_err}. Will attempt to rebuild.", exc_info=True)
            # The files on disk are left in place; the rebuild overwrites them.
            base_knowledge_index = None

    if base_knowledge_index is not None:
        return

    # --- Slow path: rebuild from the fixed files ---
    logging.info("Base knowledge index not found or failed to load. Starting pre-processing...")
    if not BASE_OPENAI_API_KEY:
        logging.error("Cannot pre-process base knowledge: BASE_OPENAI_API_KEY is not set.")
        raise RuntimeError("OpenAI API Key needed for initial base knowledge processing is not configured.")
    if not HF_TOKEN:
        logging.error("Cannot pre-process base knowledge: HF_TOKEN is not set.")
        raise RuntimeError("Hugging Face Token needed for initial base knowledge processing is not configured.")

    # One index per file keeps each embedding request scoped to a single file.
    per_file_indices: List[FAISS] = []
    for fixed_file in FIXED_FILES:
        try:
            built = download_and_process_file(fixed_file, BASE_OPENAI_API_KEY)
            if not built:
                logging.warning(f"Skipping base file {fixed_file} due to processing error (returned None index).")
                continue
            per_file_indices.append(built)
            logging.info(f"Successfully processed base file: {fixed_file}")
        except Exception as proc_err:
            logging.error(f"Failed processing base file {fixed_file}: {proc_err}", exc_info=True)
            # Abort rather than continue with a partial base index.
            raise RuntimeError(f"Failed to process base file {fixed_file}. Cannot create complete base knowledge index.") from proc_err

    if not per_file_indices:
        logging.error("No individual indices were successfully created for the base knowledge. Cannot proceed.")
        raise RuntimeError("Failed to process any base files successfully.")

    try:
        logging.info(f"Merging {len(per_file_indices)} individual indices into the final base knowledge index...")
        # The first index becomes the base; the remaining ones are folded into it.
        base_knowledge_index = per_file_indices[0]
        for extra_index in per_file_indices[1:]:
            base_knowledge_index.merge_from(extra_index)
        vector_count = base_knowledge_index.index.ntotal
        logging.info(f"Final base knowledge index created with {vector_count} total vectors.")
        base_knowledge_index.save_local(folder_path=str(CACHE_DIR), index_name=BASE_KNOWLEDGE_INDEX_PATH.stem)
        logging.info(f"Successfully saved merged base knowledge index to {BASE_KNOWLEDGE_INDEX_PATH}")
    except Exception as merge_err:
        logging.error(f"Failed to merge individual indices or save the final base knowledge index: {merge_err}", exc_info=True)
        # Reset the global so the rest of the app can see that setup failed.
        base_knowledge_index = None
        raise RuntimeError("Failed to merge or save the final base knowledge index.") from merge_err
# -----------------------------------------------------------------------------
# Gradio Chat Function
# -----------------------------------------------------------------------------
GradioChatMessages = List[Dict[str, str]] # [{'role': 'user', 'content': 'hi'}, ...]


def chat_llm(
    user_email: str,
    user_openai_key: str,
    dynamic_files_str: str,
    question: str,
    history: GradioChatMessages
) -> Tuple[GradioChatMessages, str, str]: # History, Clear Question Box, Status Update
    """
    Gradio callback function. Performs RAG QA for one turn.
    Uses base knowledge + dynamically loaded/cached files.

    Args:
        user_email: Email address of the user; validated and used as the key
            under which processed files are recorded.
        user_openai_key: The user's OpenAI API key. Required when dynamic
            files are given; otherwise the pre-configured key is the fallback.
        dynamic_files_str: Comma-separated repo-relative file paths (at most
            ``MAX_DYNAMIC_FILES``). May be empty.
        question: The question to answer.
        history: Chat history as a list of ``{"role", "content"}`` messages.

    Returns:
        A tuple ``(history, question_box_value, status_message)``. On success
        the question box is cleared; if the RAG chain fails, the error is
        appended to the history and the question is kept.

    Raises:
        gr.Error: On invalid input, a missing API key, an unavailable base
            index, or a failure while preparing the session indices.
    """
    status_update = ""
    if not history:
        history = [] # Initialize history

    # Tolerate None and stray whitespace (e.g. a pasted key with a trailing space).
    user_email = (user_email or "").strip()
    user_openai_key = (user_openai_key or "").strip()

    # --- Input Validation ---
    if not user_email or not is_valid_email(user_email):
        raise gr.Error("Please enter a valid email address.")
    if not question or not question.strip():
        raise gr.Error("Please enter a question.")

    # Parse dynamic file paths. Duplicates are dropped (order preserved) because
    # merging the same index twice would collide on document ids.
    stripped_paths = (f.strip() for f in (dynamic_files_str or "").split(','))
    dynamic_files = list(dict.fromkeys(f for f in stripped_paths if f))
    if len(dynamic_files) > MAX_DYNAMIC_FILES:
        raise gr.Error(f"Please select a maximum of {MAX_DYNAMIC_FILES} dynamic files per session.")
    if dynamic_files and not user_openai_key:
        raise gr.Error("Please provide your OpenAI API Key to process dynamic files.")

    # Log user interaction
    logging.info(f"Chat request from: {user_email}, Dynamic files: {dynamic_files}, Question: '{question[:50]}...'")

    # With dynamic files the user's key is guaranteed non-empty by the check
    # above, so the base key can only be selected for base-knowledge-only requests.
    api_key_to_use = user_openai_key or BASE_OPENAI_API_KEY
    if not api_key_to_use:
        raise gr.Error("An OpenAI API Key is required for this operation (either user-provided or pre-configured).")

    session_indices: List[FAISS] = []
    processed_dynamic_files_this_session: List[str] = []
    newly_cached_files: List[str] = []

    # --- Retriever Setup ---
    # 1. Add Base Knowledge
    if base_knowledge_index:
        session_indices.append(base_knowledge_index)
        logging.debug("Added base knowledge index to session.")
    else:
        logging.error("Base knowledge index is not loaded. Cannot proceed.")
        raise gr.Error("Base knowledge index is unavailable. Please check logs.")

    # 2. Process Dynamic Files
    for file_path in dynamic_files:
        try:
            was_cached = file_path in cache_manifest
            dynamic_index = get_or_create_dynamic_index(file_path, api_key_to_use)
            if dynamic_index:
                session_indices.append(dynamic_index)
                processed_dynamic_files_this_session.append(file_path)
                if not was_cached: # Not in the manifest before get_or_create ran
                    newly_cached_files.append(file_path)
        except gr.Error:
            # Propagate Gradio errors to UI
            raise
        except Exception as e:
            logging.error(f"Unexpected error processing dynamic file {file_path}: {e}", exc_info=True)
            raise gr.Error(f"Failed to process dynamic file {file_path}: {e}") from e

    # --- Combine Indices for Session (if dynamic files were added) ---
    if len(session_indices) > 1:
        try:
            logging.info(f"Merging {len(session_indices)} indices for the session...")
            # Merge into a private copy of the base index. Wrapping the base
            # index's own index/docstore objects in a new FAISS instance would
            # let merge_from() modify the shared global base index, so dynamic
            # documents would leak into every later request.
            # The bytes are produced in-process just above, so enabling
            # deserialization here does not load untrusted data.
            base_index = session_indices[0]
            session_master_index = FAISS.deserialize_from_bytes(
                base_index.serialize_to_bytes(),
                base_index.embeddings,
                allow_dangerous_deserialization=True,
            )
            for index_to_merge in session_indices[1:]:
                session_master_index.merge_from(index_to_merge)
            logging.info(f"Session index created with {session_master_index.index.ntotal} total vectors.")
            session_retriever = session_master_index.as_retriever(search_kwargs={"k": 5})
        except Exception as e:
            logging.error(f"Failed to merge session indices: {e}", exc_info=True)
            raise gr.Error(f"Error creating session knowledge base: {e}") from e
    elif session_indices: # Only base knowledge was used
        session_retriever = session_indices[0].as_retriever(search_kwargs={"k": 5})
    else:
        # Should have been caught earlier if base_knowledge_index was None
        raise gr.Error("No knowledge base available for retrieval.")

    # --- Setup LLM and RAG Chain ---
    try:
        llm = ChatOpenAI(model=LLM_MODEL, temperature=0.1, api_key=api_key_to_use, max_retries=1)
        template = """You are an assistant specializing in 3GPP technical specifications.
Answer the following question based *only* on the provided context document snippets from the specified files.
The context comes from the base knowledge files and potentially these user-provided files: {dynamic_files_list_str}
If the answer is not found in the context, state that you cannot answer based on the provided information. Be concise and accurate.
Context:
{context}
Question:
{question}
Answer:"""
        prompt = ChatPromptTemplate.from_template(template)

        # Function to format retrieved documents
        def format_docs(docs):
            return "\n\n".join(doc.page_content for doc in docs)

        # RAG Chain
        rag_chain = (
            {"context": session_retriever | format_docs,
             "question": RunnablePassthrough(),
             "dynamic_files_list_str": lambda x: ", ".join(dynamic_files) or "None"} # Pass dynamic files for context
            | prompt
            | llm
            | StrOutputParser()
        )
        logging.info(f"Invoking RAG chain for question: '{question[:50]}...'")
        answer = rag_chain.invoke(question)
        logging.info(f"Received answer: '{answer[:100]}...'")

        # Update user data
        if user_email not in user_data:
            user_data[user_email] = []
        updated_files_for_user = set(user_data[user_email]) | set(processed_dynamic_files_this_session)
        user_data[user_email] = sorted(updated_files_for_user)
        save_user_data() # Save after successful interaction

        # Prepare status update message
        if newly_cached_files:
            status_update = f"Info: The following new files were processed and cached for future use: {', '.join(newly_cached_files)}."
    except Exception as e:
        logging.error(f"Error during RAG chain execution or user data update: {e}", exc_info=True)
        # Append error to chat instead of crashing
        history.append({"role": "user", "content": question})
        history.append({"role": "assistant", "content": f"An error occurred: {e}"})
        return history, question, "Error occurred. Check logs." # Keep question in box

    # --- Update History and Return ---
    history.append({"role": "user", "content": question})
    history.append({"role": "assistant", "content": answer})
    return history, "", status_update # Clear question box, provide status
# -----------------------------------------------------------------------------
# Gradio UI Definition
# -----------------------------------------------------------------------------
# --- UI Text Blocks ---
# The UI text below is built at import time, so the manifest must be read
# first; otherwise the cached-files list is always rendered from the empty
# default and shows "None yet" regardless of what is on disk.
load_cache_manifest()
sorted_keys = sorted(cache_manifest)
if sorted_keys:
    # One markdown bullet per cached path, with the path in backticks.
    formatted_items = [f"* `{key}`" for key in sorted_keys]
    file_list_str = "\n".join(formatted_items)
else:
    file_list_str = "* None yet." # Message when no files are cached
cached_files_info = f"""
**Available Cached Files:**
The following dynamically added files have already been processed and cached:
{file_list_str}
"""
# The API-key note mentions answer generation because the same key is passed
# to the chat model as well as to the embedding model.
disclaimer_text = f"""
**Disclaimer & Usage Notes:**
* **Research Preview:** This is a demonstration application for research purposes. Accuracy is not guaranteed.
* **License:** By using this application, you agree to the terms and license of the underlying dataset (`{DATASET_ID}`). Please review the dataset's license terms on Hugging Face Hub.
* **API Keys:** Your OpenAI API key is required to process *new* documents you specify. It is used for embedding generation and for answering your questions during your session and is not stored persistently by this application.
* **Caching:** Processed dynamic files are cached locally (the vector index and its text chunks) to speed up future sessions.
* **Estimated Cost:** Processing *new* files incurs OpenAI API costs (approx. ${ESTIMATED_COST_PER_FILE_CENTS / 100:.2f} per file for `{EMBEDDING_MODEL}`). Already cached files incur no further embedding cost; questions answered with your key are billed to it as normal `{LLM_MODEL}` usage.
* **Data:** Your email is logged along with the files you process for usage tracking. See `{USER_DATA_PATH.name}`.
"""
# Generate the bullet list from FIXED_FILES so it stays correct when the list
# changes length.
fixed_files_list_str = "\n".join(f"* `{fixed_file}`" for fixed_file in FIXED_FILES)
base_knowledge_info = f"""
**Base Knowledge:**
The chatbot always has access to the following pre-processed 3GPP specification files:
{fixed_files_list_str}
"""
# --- Build UI ---
# Two-column layout: chat on the left, session configuration and reference
# information on the right. `demo` is the object launched by the entry point.
with gr.Blocks(theme=gr.themes.Soft(), title="3GPP TSpec RAG Assistant") as demo:
    # The emoji was mis-decoded ("πŸ“„") in the original title string.
    gr.Markdown("# 📄 3GPP TSpec RAG Assistant")
    with gr.Row():
        # --- Left Column (Chat Interface) ---
        with gr.Column(scale=7): # 70% width
            chatbot = gr.Chatbot(
                label="Chat Session",
                height=600,
                type="messages",
                show_copy_button=True,
            )
            question_inp = gr.Textbox(
                label="Your Question",
                placeholder="Ask a question about the selected documents...",
                lines=3
            )
            status_out = gr.Textbox(label="Status Updates", interactive=False)
        # --- Right Column (Controls & Info) ---
        with gr.Column(scale=3): # 30% width
            gr.Markdown("### Session Configuration")
            email_inp = gr.Textbox(label="Your Email Address", placeholder="Enter your email...")
            openai_key_inp = gr.Textbox(
                label="Your OpenAI API Key (Required for new files)",
                placeholder="Enter your OpenAI API key (sk-...)",
                type="password"
            )
            dynamic_files_inp = gr.Textbox(
                label=f"Dynamic Files (Optional, max {MAX_DYNAMIC_FILES}, comma-separated)",
                placeholder="e.g., Rel-17/23_series/23501-h50.md, Rel-18/...",
                lines=3
            )
            ask_btn = gr.Button("Ask Question", variant="primary")
            with gr.Accordion("Usage Information & Disclaimers", open=False):
                gr.Markdown(disclaimer_text)
            with gr.Accordion("Base Knowledge Files", open=False):
                gr.Markdown(base_knowledge_info)
            with gr.Accordion("Cached Dynamic Files", open=True):
                # Static snapshot of the cached-file list taken when the UI is built.
                cached_list_md = gr.Markdown(cached_files_info)
    # --- Event Handling ---
    # Input order must match the chat_llm signature; outputs are
    # (chat history, question box, status line).
    ask_btn.click(
        fn=chat_llm,
        inputs=[email_inp, openai_key_inp, dynamic_files_inp, question_inp, chatbot],
        outputs=[chatbot, question_inp, status_out]
    )
# -----------------------------------------------------------------------------
# Application Entry Point & Initial Setup
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    import sys

    print("Starting application setup...")
    # 1. Load user data and cache manifest
    print("Loading user data...")
    load_user_data()
    print("Loading cache manifest...")
    load_cache_manifest()
    print(f"Found {len(cache_manifest)} cached files.")
    # 2. Ensure base knowledge index is ready
    print("Checking base knowledge index...")
    try:
        preprocess_base_knowledge()
        # preprocess_base_knowledge() can return without loading anything (it
        # only logs when the index file exists but the API key is missing).
        # Treat that as a startup failure instead of reporting "ready" and
        # launching an app in which every chat request fails.
        if base_knowledge_index is None:
            raise RuntimeError("Base knowledge index was not loaded (see the log output above).")
        print("Base knowledge index is ready.")
    except Exception as e:
        print(f"\n!!! CRITICAL ERROR during base knowledge setup: {e} !!!")
        print("The application cannot start without the base knowledge index.")
        # The key is read from the OPENAI_API_KEY environment variable.
        print("Please ensure OPENAI_API_KEY and HF_TOKEN are correctly set in your environment or .env file and you have accepted the dataset license.")
        sys.exit(1)
    # 3. Launch Gradio App
    print("Launching Gradio interface...")
    demo.launch(debug=True, mcp_server=True) # debug=True for detailed logs locally