import os import re import json import logging import hashlib from pathlib import Path from typing import List, Tuple, Dict, Any, Optional import gradio as gr from dotenv import load_dotenv from huggingface_hub import hf_hub_download from huggingface_hub.utils import EntryNotFoundError from requests.exceptions import HTTPError from langchain.text_splitter import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter from langchain_community.vectorstores import FAISS from langchain_core.documents import Document from langchain_core.output_parsers import StrOutputParser from langchain_core.prompts import ChatPromptTemplate from langchain_core.runnables import RunnableParallel, RunnablePassthrough from langchain_openai import ChatOpenAI, OpenAIEmbeddings # ----------------------------------------------------------------------------- # Configuration & Environment Variables # ----------------------------------------------------------------------------- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') load_dotenv() # Load .env file for local development # --- API Keys --- # Base key is for potentially pre-processing fixed files (if needed) # User key is required for processing *new* dynamic files BASE_OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") # Used for pre-processing base files if needed HF_TOKEN = os.getenv("HF_TOKEN") # --- Constants --- DATASET_ID = "rasoul-nikbakht/TSpec-LLM" DATA_SUBDIR = "3GPP-clean" EMBEDDING_MODEL = "text-embedding-3-small" LLM_MODEL = "gpt-4o-mini" MAX_DYNAMIC_FILES = 3 ESTIMATED_COST_PER_FILE_CENTS = 2 # Rough estimate # --- File Paths --- SCRIPT_DIR = Path(__file__).parent CACHE_DIR = SCRIPT_DIR / "cached_embeddings" BASE_KNOWLEDGE_INDEX_PATH = CACHE_DIR / "base_knowledge.faiss" USER_DATA_PATH = SCRIPT_DIR / "user_data.json" CACHE_MANIFEST_PATH = SCRIPT_DIR / "cache_manifest.json" # Ensure cache directory exists CACHE_DIR.mkdir(exist_ok=True) # --- Fixed Base Knowledge Files --- # Relative paths within the dataset repo (without DATA_SUBDIR) FIXED_FILES = [ "Rel-16/38_series/38901-g10.md", "Rel-16/38_series/38821-g20.md", "Rel-15/36_series/36777-f00_1.md", "Rel-15/36_series/36777-f00_2.md", ] # ----------------------------------------------------------------------------- # Global Variables & In-Memory Stores (Load at startup) # ----------------------------------------------------------------------------- base_knowledge_index: Optional[FAISS] = None user_data: Dict[str, List[str]] = {} # {email: [list_of_processed_files]} cache_manifest: Dict[str, str] = {} # {repo_relative_path: local_faiss_path} # ----------------------------------------------------------------------------- # Helper Functions # ----------------------------------------------------------------------------- def sanitize_path_for_filename(repo_path: str) -> str: """Creates a safe filename from a repository path.""" # Remove base dir prefix if present if repo_path.startswith(f"{DATA_SUBDIR}/"): repo_path = repo_path[len(f"{DATA_SUBDIR}/"):] # Replace slashes and invalid chars; use hashing for very long paths if needed sanitized = re.sub(r'[\\/*?:"<>|]', '_', repo_path) # Optional: Limit length and add hash if too long if len(sanitized) > 100: hash_suffix = hashlib.md5(repo_path.encode()).hexdigest()[:8] sanitized = sanitized[:90] + "_" + hash_suffix return sanitized + ".faiss" def is_valid_email(email: str) -> bool: """Basic regex check for email format.""" # This is a simple check, not foolproof validation pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$" return re.match(pattern, email) is not None def load_user_data(): """Loads user email and associated file data from JSON.""" global user_data if USER_DATA_PATH.exists(): try: with open(USER_DATA_PATH, 'r') as f: user_data = json.load(f) logging.info(f"Loaded user data for {len(user_data)} users from {USER_DATA_PATH}") except json.JSONDecodeError: logging.error(f"Error decoding JSON from {USER_DATA_PATH}. Starting with empty user data.") user_data = {} except Exception as e: logging.error(f"Failed to load user data: {e}", exc_info=True) user_data = {} else: logging.info("User data file not found. Starting fresh.") user_data = {} def save_user_data(): """Saves user email and associated file data to JSON.""" try: with open(USER_DATA_PATH, 'w') as f: json.dump(user_data, f, indent=4) # logging.info(f"Saved user data to {USER_DATA_PATH}") # Can be noisy except Exception as e: logging.error(f"Failed to save user data: {e}", exc_info=True) def load_cache_manifest(): """Loads the manifest of locally cached embeddings.""" global cache_manifest if CACHE_MANIFEST_PATH.exists(): try: with open(CACHE_MANIFEST_PATH, 'r') as f: cache_manifest = json.load(f) logging.info(f"Loaded cache manifest with {len(cache_manifest)} entries from {CACHE_MANIFEST_PATH}") # Optional: Verify that the referenced FAISS files actually exist # keys_to_remove = [k for k, v in cache_manifest.items() if not Path(v).exists()] # if keys_to_remove: # logging.warning(f"Removing {len(keys_to_remove)} stale entries from cache manifest.") # for k in keys_to_remove: del cache_manifest # save_cache_manifest() # Save cleaned manifest except json.JSONDecodeError: logging.error(f"Error decoding JSON from {CACHE_MANIFEST_PATH}. Starting with empty manifest.") cache_manifest = {} except Exception as e: logging.error(f"Failed to load cache manifest: {e}", exc_info=True) cache_manifest = {} else: logging.info("Cache manifest file not found. Starting fresh.") cache_manifest = {} def save_cache_manifest(): """Saves the manifest of locally cached embeddings.""" try: with open(CACHE_MANIFEST_PATH, 'w') as f: json.dump(cache_manifest, f, indent=4) # logging.info(f"Saved cache manifest to {CACHE_MANIFEST_PATH}") # Can be noisy except Exception as e: logging.error(f"Failed to save cache manifest: {e}", exc_info=True) def download_and_process_file(repo_relative_path: str, api_key_for_embedding: str) -> Optional[FAISS]: """Downloads, chunks, embeds a single file, returning a FAISS index.""" if not HF_TOKEN: logging.error("HF_TOKEN is missing. Cannot download from gated dataset.") # Don't raise gr.Error here, handle return value in caller return None if not api_key_for_embedding: logging.error("OpenAI API Key is missing. Cannot create embeddings.") return None full_repo_path = f"{DATA_SUBDIR}/{repo_relative_path}" logging.info(f"Processing file: {repo_relative_path}") # --- Download --- try: local_path_str = hf_hub_download( repo_id=DATASET_ID, filename=full_repo_path, repo_type="dataset", token=HF_TOKEN, cache_dir="./hf_cache" ) local_path = Path(local_path_str) logging.info(f"Downloaded {repo_relative_path} to: {local_path}") except EntryNotFoundError: logging.error(f"File not found in repository: {full_repo_path}") raise gr.Error(f"File not found in repository: '{repo_relative_path}'. Please check the path.") except HTTPError as e: if e.response is not None and e.response.status_code in {401, 403}: logging.error(f"Hugging Face authentication/authorization failed (Status {e.response.status_code}).") raise gr.Error("Hugging Face authentication failed. Check HF_TOKEN and dataset license acceptance.") else: logging.error(f"HTTP error during download: {e}") raise gr.Error(f"Failed to download file due to an HTTP error: {e}") except Exception as e: logging.error(f"An unexpected error occurred during download for {repo_relative_path}: {e}", exc_info=True) raise gr.Error(f"Download error for {repo_relative_path}: {e}") # --- Load and Chunk --- try: text = local_path.read_text(encoding="utf-8", errors="replace") headers_to_split_on = [("#", "H1"), ("##", "H2"), ("###", "H3"), ("####", "H4")] splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on, strip_headers=False) docs = splitter.split_text(text) if not docs or (len(docs) == 1 and len(docs[0].page_content) > 5000): logging.warning(f"MarkdownHeaderTextSplitter yielded few/large chunks for {repo_relative_path}, using RecursiveCharacterTextSplitter.") fallback_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, chunk_overlap=150, separators=["\n\n", "\n", ". ", ", ", " ", ""] ) docs = fallback_splitter.create_documents([text]) if not docs: logging.warning(f"File '{repo_relative_path}' resulted in zero documents after splitting.") return None # Cannot create index from no documents logging.info(f"Split {repo_relative_path} into {len(docs)} documents.") except Exception as e: logging.error(f"Failed to read/split file {local_path}: {e}", exc_info=True) raise gr.Error(f"Error processing content of {repo_relative_path}: {e}") # --- Embed and Create Vector Store --- try: embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL, api_key=api_key_for_embedding) vectordb = FAISS.from_documents(docs, embeddings) logging.info(f"Created FAISS index for {repo_relative_path}.") return vectordb except Exception as e: # Catch potential OpenAI API errors specifically if possible logging.error(f"Failed during embedding/vector store creation for {repo_relative_path}: {e}", exc_info=True) # Check for common errors based on string matching (less robust but helpful) if "AuthenticationError" in str(e) or "Incorrect API key" in str(e): raise gr.Error(f"OpenAI Authentication Error for {repo_relative_path}. Check your API Key. Details: {e}") elif "RateLimitError" in str(e): raise gr.Error(f"OpenAI Rate Limit Error for {repo_relative_path}. Details: {e}") else: raise gr.Error(f"Embedding/VectorStore Error for {repo_relative_path}: {e}") def get_or_create_dynamic_index(repo_relative_path: str, user_api_key: str) -> Optional[FAISS]: """Loads a dynamic index from cache or creates+caches it if new.""" global cache_manifest # Allow modification if repo_relative_path in cache_manifest: local_faiss_path_str = cache_manifest[repo_relative_path] local_faiss_path = Path(local_faiss_path_str) if local_faiss_path.exists(): try: # Need embeddings object to load; use user's key as they initiated the session embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL, api_key=user_api_key) index = FAISS.load_local(str(local_faiss_path.parent), embeddings, index_name=local_faiss_path.stem, allow_dangerous_deserialization=True) logging.info(f"Loaded cached index for {repo_relative_path} from {local_faiss_path_str}") return index except Exception as e: logging.error(f"Failed to load cached index {local_faiss_path_str}: {e}. Will try to re-create.", exc_info=True) # Remove potentially corrupted entry from manifest del cache_manifest[repo_relative_path] save_cache_manifest() else: logging.warning(f"Cache manifest points to non-existent file: {local_faiss_path_str}. Removing entry.") del cache_manifest[repo_relative_path] save_cache_manifest() # --- If not cached or loading failed, create it --- logging.info(f"Cache miss or load failure for {repo_relative_path}. Processing anew.") if not user_api_key: raise gr.Error(f"Cannot process new file '{repo_relative_path}' without an OpenAI API Key.") new_index = download_and_process_file(repo_relative_path, user_api_key) if new_index: # Save the newly created index try: sanitized_name = sanitize_path_for_filename(repo_relative_path) save_path = CACHE_DIR / sanitized_name # FAISS save_local saves folder and index_name.faiss/pkl inside it new_index.save_local(folder_path=str(CACHE_DIR), index_name=save_path.stem) full_saved_path = str(CACHE_DIR / (save_path.stem + ".faiss")) # Path to the actual .faiss file # Update manifest cache_manifest[repo_relative_path] = full_saved_path save_cache_manifest() logging.info(f"Saved new index for {repo_relative_path} to {full_saved_path} and updated manifest.") return new_index except Exception as e: logging.error(f"Failed to save new index for {repo_relative_path}: {e}", exc_info=True) # Don't raise here, maybe it works in memory for the session return new_index # Return in-memory index even if saving failed else: # download_and_process_file failed, error already raised or logged return None # ----------------------------------------------------------------------------- # Pre-processing Base Knowledge (Run once at startup if needed) # ----------------------------------------------------------------------------- def preprocess_base_knowledge(): """Creates and saves the base knowledge FAISS index if it doesn't exist by processing files individually and merging.""" global base_knowledge_index if BASE_KNOWLEDGE_INDEX_PATH.exists(): try: if not BASE_OPENAI_API_KEY: logging.error("Base OpenAI API Key missing. Cannot load base knowledge index.") return embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL, api_key=BASE_OPENAI_API_KEY) base_knowledge_index = FAISS.load_local( str(BASE_KNOWLEDGE_INDEX_PATH.parent), embeddings, index_name=BASE_KNOWLEDGE_INDEX_PATH.stem, allow_dangerous_deserialization=True ) logging.info(f"Successfully loaded base knowledge index from {BASE_KNOWLEDGE_INDEX_PATH}") return # Successfully loaded, no need to rebuild except Exception as e: logging.error(f"Failed to load base knowledge index from {BASE_KNOWLEDGE_INDEX_PATH}: {e}. Will attempt to rebuild.", exc_info=True) base_knowledge_index = None # Optionally delete corrupted files: # try: # if BASE_KNOWLEDGE_INDEX_PATH.exists(): BASE_KNOWLEDGE_INDEX_PATH.unlink() # pkl_path = BASE_KNOWLEDGE_INDEX_PATH.with_suffix(".pkl") # if pkl_path.exists(): pkl_path.unlink() # except OSError as rm_err: # logging.error(f"Failed to delete potentially corrupted index files: {rm_err}") if base_knowledge_index is None: logging.info("Base knowledge index not found or failed to load. Starting pre-processing...") if not BASE_OPENAI_API_KEY: logging.error("Cannot pre-process base knowledge: BASE_OPENAI_API_KEY is not set.") raise RuntimeError("OpenAI API Key needed for initial base knowledge processing is not configured.") if not HF_TOKEN: logging.error("Cannot pre-process base knowledge: HF_TOKEN is not set.") raise RuntimeError("Hugging Face Token needed for initial base knowledge processing is not configured.") individual_indices : List[FAISS] = [] # Store index for each base file for file_path in FIXED_FILES: try: # Process each file individually to get its FAISS index # This ensures embedding requests are per-file, not one giant batch index = download_and_process_file(file_path, BASE_OPENAI_API_KEY) if index: individual_indices.append(index) # Note: document count is now per-file in logs from download_and_process_file logging.info(f"Successfully processed base file: {file_path}") else: logging.warning(f"Skipping base file {file_path} due to processing error (returned None index).") except Exception as e: # If download_and_process_file raises an error (e.g., download failed, API key invalid) logging.error(f"Failed processing base file {file_path}: {e}", exc_info=True) # Decide whether to stop or continue; let's stop to avoid partial base index raise RuntimeError(f"Failed to process base file {file_path}. Cannot create complete base knowledge index.") from e if not individual_indices: logging.error("No individual indices were successfully created for the base knowledge. Cannot proceed.") raise RuntimeError("Failed to process any base files successfully.") try: logging.info(f"Merging {len(individual_indices)} individual indices into the final base knowledge index...") # Start with the first index base_knowledge_index = individual_indices[0] # Merge the rest if len(individual_indices) > 1: for index_to_merge in individual_indices[1:]: base_knowledge_index.merge_from(index_to_merge) total_vectors = base_knowledge_index.index.ntotal logging.info(f"Final base knowledge index created with {total_vectors} total vectors.") # Save the final merged index base_knowledge_index.save_local(folder_path=str(CACHE_DIR), index_name=BASE_KNOWLEDGE_INDEX_PATH.stem) logging.info(f"Successfully saved merged base knowledge index to {BASE_KNOWLEDGE_INDEX_PATH}") except Exception as e: logging.error(f"Failed to merge individual indices or save the final base knowledge index: {e}", exc_info=True) # Set base_knowledge_index back to None so app knows it failed base_knowledge_index = None raise RuntimeError("Failed to merge or save the final base knowledge index.") from e # ----------------------------------------------------------------------------- # Gradio Chat Function # ----------------------------------------------------------------------------- GradioChatMessages = List[Dict[str, str]] # [{'role': 'user', 'content': 'hi'}, ...] def chat_llm( user_email: str, user_openai_key: str, dynamic_files_str: str, question: str, history: GradioChatMessages ) -> Tuple[GradioChatMessages, str, str]: # History, Clear Question Box, Status Update """ Gradio callback function. Performs RAG QA for one turn. Uses base knowledge + dynamically loaded/cached files. """ status_update = "" if not history: history = [] # Initialize history # --- Input Validation --- if not user_email or not is_valid_email(user_email): raise gr.Error("Please enter a valid email address.") if not question or not question.strip(): raise gr.Error("Please enter a question.") # Parse and validate dynamic file paths dynamic_files = [f.strip() for f in dynamic_files_str.split(',') if f.strip()] if len(dynamic_files) > MAX_DYNAMIC_FILES: raise gr.Error(f"Please select a maximum of {MAX_DYNAMIC_FILES} dynamic files per session.") if dynamic_files and not user_openai_key: raise gr.Error("Please provide your OpenAI API Key to process dynamic files.") # Log user interaction logging.info(f"Chat request from: {user_email}, Dynamic files: {dynamic_files}, Question: '{question[:50]}...'") # Use provided key or fallback to base key if available (only if no dynamic files) # If dynamic files are present, user_openai_key MUST be used and validated api_key_to_use = user_openai_key if dynamic_files else (user_openai_key or BASE_OPENAI_API_KEY) if not api_key_to_use: raise gr.Error("An OpenAI API Key is required for this operation (either user-provided or pre-configured).") session_indices : List[FAISS] = [] processed_dynamic_files_this_session : List[str] = [] newly_cached_files: List[str] = [] # --- Retriever Setup --- # 1. Add Base Knowledge if base_knowledge_index: session_indices.append(base_knowledge_index) logging.debug("Added base knowledge index to session.") else: logging.error("Base knowledge index is not loaded. Cannot proceed.") raise gr.Error("Base knowledge index is unavailable. Please check logs.") # 2. Process Dynamic Files for file_path in dynamic_files: try: was_cached = file_path in cache_manifest dynamic_index = get_or_create_dynamic_index(file_path, api_key_to_use) # Use the determined API key if dynamic_index: session_indices.append(dynamic_index) processed_dynamic_files_this_session.append(file_path) if not was_cached: # If it wasn't in the manifest before get_or_create ran newly_cached_files.append(file_path) # else: Error handled within get_or_create_dynamic_index by raising gr.Error except gr.Error as e: # Propagate Gradio errors to UI raise e except Exception as e: logging.error(f"Unexpected error processing dynamic file {file_path}: {e}", exc_info=True) raise gr.Error(f"Failed to process dynamic file {file_path}: {e}") # --- Combine Indices for Session (if dynamic files were added) --- if len(session_indices) > 1 : # Need to merge if dynamic files were added try: logging.info(f"Merging {len(session_indices)} indices for the session...") # Create a temporary merged index for this session # Start with the first index (should be base knowledge) session_master_index = FAISS( embedding_function=session_indices[0].embeddings, # Use embeddings from first index index=session_indices[0].index, docstore=session_indices[0].docstore, index_to_docstore_id=session_indices[0].index_to_docstore_id ) # Merge subsequent indices for index_to_merge in session_indices[1:]: session_master_index.merge_from(index_to_merge) logging.info(f"Session index created with {session_master_index.index.ntotal} total vectors.") session_retriever = session_master_index.as_retriever(search_kwargs={"k": 5}) except Exception as e: logging.error(f"Failed to merge session indices: {e}", exc_info=True) raise gr.Error(f"Error creating session knowledge base: {e}") elif session_indices: # Only base knowledge was used session_retriever = session_indices[0].as_retriever(search_kwargs={"k": 5}) else: # Should have been caught earlier if base_knowledge_index was None raise gr.Error("No knowledge base available for retrieval.") # --- Setup LLM and RAG Chain --- try: llm = ChatOpenAI(model=LLM_MODEL, temperature=0.1, api_key=api_key_to_use, max_retries=1) template = """You are an assistant specializing in 3GPP technical specifications. Answer the following question based *only* on the provided context document snippets from the specified files. The context comes from the base knowledge files and potentially these user-provided files: {dynamic_files_list_str} If the answer is not found in the context, state that you cannot answer based on the provided information. Be concise and accurate. Context: {context} Question: {question} Answer:""" prompt = ChatPromptTemplate.from_template(template) # Function to format retrieved documents def format_docs(docs): return "\n\n".join(doc.page_content for doc in docs) # RAG Chain rag_chain = ( {"context": session_retriever | format_docs, "question": RunnablePassthrough(), "dynamic_files_list_str" : lambda x: ", ".join(dynamic_files) or "None"} # Pass dynamic files for context | prompt | llm | StrOutputParser() ) logging.info(f"Invoking RAG chain for question: '{question[:50]}...'") answer = rag_chain.invoke(question) logging.info(f"Received answer: '{answer[:100]}...'") # Update user data if user_email not in user_data: user_data[user_email] = [] updated_files_for_user = set(user_data[user_email]) | set(processed_dynamic_files_this_session) user_data[user_email] = sorted(list(updated_files_for_user)) save_user_data() # Save after successful interaction # Prepare status update message if newly_cached_files: status_update = f"Info: The following new files were processed and cached for future use: {', '.join(newly_cached_files)}." except Exception as e: logging.error(f"Error during RAG chain execution or user data update: {e}", exc_info=True) # Append error to chat instead of crashing history.append({"role": "user", "content": question}) history.append({"role": "assistant", "content": f"An error occurred: {e}"}) return history, question, "Error occurred. Check logs." # Keep question in box # --- Update History and Return --- history.append({"role": "user", "content": question}) history.append({"role": "assistant", "content": answer}) return history, "", status_update # Clear question box, provide status # ----------------------------------------------------------------------------- # Gradio UI Definition # ----------------------------------------------------------------------------- # --- UI Text Blocks --- # Construct the cached files list string separately sorted_keys = sorted(list(cache_manifest.keys())) if sorted_keys: # Format each key as a markdown bullet point with backticks formatted_items = [f"* `{key}`" for key in sorted_keys] # Join them with newlines file_list_str = "\n".join(formatted_items) else: file_list_str = "* None yet." # Message when no files are cached # Now define the info string using the pre-formatted list cached_files_info = f""" **Available Cached Files:** The following dynamically added files have already been processed and cached: {file_list_str} """ # --- The rest of the UI text blocks (disclaimer_text, base_knowledge_info) remain the same --- disclaimer_text = f""" **Disclaimer & Usage Notes:** * **Research Preview:** This is a demonstration application for research purposes. Accuracy is not guaranteed. * **License:** By using this application, you agree to the terms and license of the underlying dataset (`{DATASET_ID}`). Please review the dataset's license terms on Hugging Face Hub. * **API Keys:** Your OpenAI API key is required to process *new* documents you specify. It is used solely for embedding generation during your session and is not stored persistently by this application. * **Caching:** Processed dynamic files are cached locally (embeddings only) to speed up future sessions. * **Estimated Cost:** Processing *new* files incurs OpenAI API costs (approx. ${ESTIMATED_COST_PER_FILE_CENTS / 100:.2f} per file for `{EMBEDDING_MODEL}`). Using already cached files or only the base knowledge is free within this app. * **Data:** Your email is logged along with the files you process for usage tracking. See `{USER_DATA_PATH.name}`. """ base_knowledge_info = f""" **Base Knowledge:** The chatbot always has access to the following pre-processed 3GPP specification files: * `{FIXED_FILES[0]}` * `{FIXED_FILES[1]}` * `{FIXED_FILES[2]}` * `{FIXED_FILES[3]}` """ # --- Build UI --- with gr.Blocks(theme=gr.themes.Soft(), title="3GPP TSpec RAG Assistant") as demo: gr.Markdown("# 📄 3GPP TSpec RAG Assistant") with gr.Row(): # --- Left Column (Chat Interface) --- with gr.Column(scale=7): # 70% width chatbot = gr.Chatbot( label="Chat Session", height=600, type="messages", show_copy_button=True, ) question_inp = gr.Textbox( label="Your Question", placeholder="Ask a question about the selected documents...", lines=3 ) status_out = gr.Textbox(label="Status Updates", interactive=False) # --- Right Column (Controls & Info) --- with gr.Column(scale=3): # 30% width gr.Markdown("### Session Configuration") email_inp = gr.Textbox(label="Your Email Address", placeholder="Enter your email...") openai_key_inp = gr.Textbox( label="Your OpenAI API Key (Required for new files)", placeholder="Enter your OpenAI API key (sk-...)", type="password" ) dynamic_files_inp = gr.Textbox( label=f"Dynamic Files (Optional, max {MAX_DYNAMIC_FILES}, comma-separated)", placeholder="e.g., Rel-17/23_series/23501-h50.md, Rel-18/...", lines=3 ) ask_btn = gr.Button("Ask Question", variant="primary") with gr.Accordion("Usage Information & Disclaimers", open=False): gr.Markdown(disclaimer_text) with gr.Accordion("Base Knowledge Files", open=False): gr.Markdown(base_knowledge_info) with gr.Accordion("Cached Dynamic Files", open=True): # Use an HTML component to allow dynamic updates if needed later # For now, just display the initial list # cached_list_html = gr.HTML(value=f"") # Simpler Markdown display: cached_list_md = gr.Markdown(cached_files_info) # --- Event Handling --- ask_btn.click( fn=chat_llm, inputs=[email_inp, openai_key_inp, dynamic_files_inp, question_inp, chatbot], outputs=[chatbot, question_inp, status_out] # Update chat, clear question, show status ) # Example Button (Optional - might be less useful with dynamic files) # gr.Examples(...) # ----------------------------------------------------------------------------- # Application Entry Point & Initial Setup # ----------------------------------------------------------------------------- if __name__ == "__main__": print("Starting application setup...") # 1. Load user data and cache manifest print("Loading user data...") load_user_data() print("Loading cache manifest...") load_cache_manifest() print(f"Found {len(cache_manifest)} cached files.") # 2. Ensure base knowledge index is ready print("Checking base knowledge index...") try: preprocess_base_knowledge() print("Base knowledge index is ready.") except Exception as e: print(f"\n!!! CRITICAL ERROR during base knowledge setup: {e} !!!") print("The application cannot start without the base knowledge index.") print("Please ensure BASE_OPENAI_API_KEY and HF_TOKEN are correctly set in your environment or .env file and you have accepted the dataset license.") # Exit if base knowledge failed critically import sys sys.exit(1) # 3. Launch Gradio App print("Launching Gradio interface...") demo.launch(debug=True, mcp_server=True) # debug=True for detailed logs locally