Spaces:
Runtime error
Runtime error
import os | |
import logging | |
import logging.config | |
from typing import Any | |
from uuid import uuid4, UUID | |
import json | |
import sys | |
import gradio as gr | |
from dotenv import load_dotenv | |
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage | |
from langgraph.types import RunnableConfig | |
from pydantic import BaseModel | |
from pathlib import Path | |
import subprocess | |
# def update_repo(): | |
# try: | |
# subprocess.run(["git", "fetch", "origin"], check=True) | |
# subprocess.run(["git", "reset", "--hard", "origin/main"], check=True) | |
# subprocess.run([sys.executable, "-m", "pip", "install", "-r", "requirements.txt"], check=True) | |
# subprocess.run([sys.executable, "app.py"], check=True) | |
# except Exception as e: | |
# print(f"Git update failed: {e}") | |
# update_repo() | |
load_dotenv() | |
# There are tools set here dependent on environment variables | |
from graph import graph, weak_model, search_enabled # noqa | |
FOLLOWUP_QUESTION_NUMBER = 3 | |
TRIM_MESSAGE_LENGTH = 16 # Includes tool messages | |
USER_INPUT_MAX_LENGTH = 10000 # Characters | |
# We need the same secret for data persistance | |
# If you store sensitive data, you should store your secret in .env | |
BROWSER_STORAGE_SECRET = "itsnosecret" | |
with open('logging-config.json', 'r') as fh: | |
config = json.load(fh) | |
logging.config.dictConfig(config) | |
logger = logging.getLogger(__name__) | |
def load_initial_greeting(filepath="greeting_prompt.txt") -> str: | |
""" | |
Loads the initial greeting message from a specified text file. | |
""" | |
try: | |
with open(filepath, "r", encoding="utf-8") as f: | |
return f.read().strip() | |
except FileNotFoundError: | |
# Use a logger if you have one configured, otherwise print | |
# logger.warning(f"Warning: Prompt file '{filepath}' not found.") | |
print(f"Warning: Prompt file '{filepath}' not found. Using default.") | |
return "Welcome to the application! (Default Greeting)" | |
async def chat_fn(user_input: str, history: dict, input_graph_state: dict, uuid: UUID, prompt: str, search_enabled: bool, download_website_text_enabled: bool): | |
""" | |
Args: | |
user_input (str): The user's input message | |
history (dict): The history of the conversation in gradio | |
input_graph_state (dict): The current state of the graph. This includes tool call history | |
uuid (UUID): The unique identifier for the current conversation. This can be used in conjunction with langgraph or for memory | |
prompt (str): The system prompt | |
Yields: | |
str: The output message | |
dict|Any: The final state of the graph | |
bool|Any: Whether to trigger follow up questions | |
We do not use gradio history in the graph since we want the ToolMessage in the history | |
ordered properly. GraphProcessingState.messages is used as history instead | |
""" | |
try: | |
logger.info(f"Prompt: {prompt}") | |
input_graph_state["tools_enabled"] = { | |
"download_website_text": download_website_text_enabled, | |
"tavily_search_results_json": search_enabled, | |
} | |
if prompt: | |
input_graph_state["prompt"] = prompt | |
if input_graph_state.get("awaiting_human_input"): | |
input_graph_state["messages"].append( | |
ToolMessage( | |
tool_call_id=input_graph_state.pop("human_assistance_tool_id"), | |
content=user_input | |
) | |
) | |
input_graph_state["awaiting_human_input"] = False | |
else: | |
# New user message | |
if "messages" not in input_graph_state: | |
input_graph_state["messages"] = [] | |
input_graph_state["messages"].append( | |
HumanMessage(user_input[:USER_INPUT_MAX_LENGTH]) | |
) | |
input_graph_state["messages"] = input_graph_state["messages"][-TRIM_MESSAGE_LENGTH:] | |
config = RunnableConfig( | |
recursion_limit=20, | |
run_name="user_chat", | |
configurable={"thread_id": uuid} | |
) | |
output: str = "" | |
final_state: dict | Any = {} | |
waiting_output_seq: list[str] = [] | |
async for stream_mode, chunk in graph.astream( | |
input_graph_state, | |
config=config, | |
stream_mode=["values", "messages"], | |
): | |
if stream_mode == "values": | |
final_state = chunk | |
last_message = chunk["messages"][-1] | |
if hasattr(last_message, "tool_calls"): | |
for msg_tool_call in last_message.tool_calls: | |
tool_name: str = msg_tool_call['name'] | |
if tool_name == "tavily_search_results_json": | |
query = msg_tool_call['args']['query'] | |
waiting_output_seq.append(f"Searching for '{query}'...") | |
yield "\n".join(waiting_output_seq), gr.skip(), gr.skip() | |
# download_website_text is the name of the function defined in graph.py | |
elif tool_name == "download_website_text": | |
url = msg_tool_call['args']['url'] | |
waiting_output_seq.append(f"Downloading text from '{url}'...") | |
yield "\n".join(waiting_output_seq), gr.skip(), gr.skip() | |
elif tool_name == "human_assistance": | |
query = msg_tool_call["args"]["query"] | |
waiting_output_seq.append(f"🤖: {query}") | |
# Save state to resume after user provides input | |
input_graph_state["awaiting_human_input"] = True | |
input_graph_state["human_assistance_tool_id"] = msg_tool_call["id"] | |
# Indicate that human input is needed | |
yield "\n".join(waiting_output_seq), input_graph_state, gr.skip(), True | |
return # Pause execution, resume in next call | |
else: | |
waiting_output_seq.append(f"Running {tool_name}...") | |
yield "\n".join(waiting_output_seq), gr.skip(), gr.skip() | |
elif stream_mode == "messages": | |
msg, metadata = chunk | |
# print("output: ", msg, metadata) | |
# assistant_node is the name we defined in the langgraph graph | |
if metadata.get('langgraph_node') == "assistant_node": # Use .get for safety | |
current_chunk_text = "" | |
if isinstance(msg.content, str): | |
current_chunk_text = msg.content | |
elif isinstance(msg.content, list): | |
for block in msg.content: | |
if isinstance(block, dict) and block.get("type") == "text": | |
current_chunk_text += block.get("text", "") | |
elif isinstance(block, str): # Fallback if content is list of strings | |
current_chunk_text += block | |
if current_chunk_text: # Only add and yield if there's actually text | |
output += current_chunk_text | |
yield output, gr.skip(), gr.skip() | |
# Trigger for asking follow up questions | |
# + store the graph state for next iteration | |
# yield output, dict(final_state), gr.skip() | |
yield output + " ", dict(final_state), True | |
except Exception: | |
logger.exception("Exception occurred") | |
user_error_message = "There was an error processing your request. Please try again." | |
yield user_error_message, gr.skip(), False | |
def clear(): | |
return dict(), uuid4() | |
class FollowupQuestions(BaseModel): | |
"""Model for langchain to use for structured output for followup questions""" | |
questions: list[str] | |
async def populate_followup_questions(end_of_chat_response: bool, messages: dict[str, str], uuid: UUID): | |
""" | |
This function gets called a lot due to the asynchronous nature of streaming | |
Only populate followup questions if streaming has completed and the message is coming from the assistant | |
""" | |
if not end_of_chat_response or not messages or messages[-1]["role"] != "assistant": | |
return *[gr.skip() for _ in range(FOLLOWUP_QUESTION_NUMBER)], False | |
config = RunnableConfig( | |
run_name="populate_followup_questions", | |
configurable={"thread_id": uuid} | |
) | |
weak_model_with_config = weak_model.with_config(config) | |
follow_up_questions = await weak_model_with_config.with_structured_output(FollowupQuestions).ainvoke([ | |
("system", f"suggest {FOLLOWUP_QUESTION_NUMBER} followup questions for the user to ask the assistant. Refrain from asking personal questions."), | |
*messages, | |
]) | |
if len(follow_up_questions.questions) != FOLLOWUP_QUESTION_NUMBER: | |
raise ValueError("Invalid value of followup questions") | |
buttons = [] | |
for i in range(FOLLOWUP_QUESTION_NUMBER): | |
buttons.append( | |
gr.Button(follow_up_questions.questions[i], visible=True, elem_classes="chat-tab"), | |
) | |
return *buttons, False | |
async def summarize_chat(end_of_chat_response: bool, messages: dict, sidebar_summaries: dict, uuid: UUID): | |
"""Summarize chat for tab names""" | |
# print("\n------------------------") | |
# print("not end_of_chat_response", not end_of_chat_response) | |
# print("not messages", not messages) | |
# if messages: | |
# print("messages[-1][role] != assistant", messages[-1]["role"] != "assistant") | |
# print("isinstance(sidebar_summaries, type(lambda x: x))", isinstance(sidebar_summaries, type(lambda x: x))) | |
# print("uuid in sidebar_summaries", uuid in sidebar_summaries) | |
should_return = ( | |
not end_of_chat_response or | |
not messages or | |
messages[-1]["role"] != "assistant" or | |
# This is a bug with gradio | |
isinstance(sidebar_summaries, type(lambda x: x)) or | |
# Already created summary | |
uuid in sidebar_summaries | |
) | |
if should_return: | |
return gr.skip(), gr.skip() | |
filtered_messages = [] | |
for msg in messages: | |
if isinstance(msg, dict) and msg.get("content") and msg["content"].strip(): | |
filtered_messages.append(msg) | |
# If we don't have any valid messages after filtering, provide a default summary | |
if not filtered_messages: | |
if uuid not in sidebar_summaries: | |
sidebar_summaries[uuid] = "Chat History" | |
return sidebar_summaries, False | |
config = RunnableConfig( | |
run_name="summarize_chat", | |
configurable={"thread_id": uuid} | |
) | |
try: | |
weak_model_with_config = weak_model.with_config(config) | |
summary_response = await weak_model_with_config.ainvoke([ | |
("system", "summarize this chat in 7 tokens or less. Refrain from using periods"), | |
*filtered_messages, | |
]) | |
if uuid not in sidebar_summaries: | |
sidebar_summaries[uuid] = summary_response.content | |
except Exception as e: | |
logger.error(f"Error summarizing chat: {e}") | |
# Provide a fallback summary if an error occurs | |
if uuid not in sidebar_summaries: | |
sidebar_summaries[uuid] = "Previous Chat" | |
return sidebar_summaries, False | |
async def new_tab(uuid, gradio_graph, messages, tabs, prompt, sidebar_summaries): | |
new_uuid = uuid4() | |
new_graph = {} | |
if uuid not in sidebar_summaries: | |
sidebar_summaries, _ = await summarize_chat(True, messages, sidebar_summaries, uuid) | |
tabs[uuid] = { | |
"graph": gradio_graph, | |
"messages": messages, | |
"prompt": prompt, | |
} | |
suggestion_buttons = [] | |
for _ in range(FOLLOWUP_QUESTION_NUMBER): | |
suggestion_buttons.append(gr.Button(visible=False)) | |
new_messages = {} | |
# --- MODIFICATION FOR GREETING IN EVERY NEW CHAT --- | |
greeting_text = load_initial_greeting() # Get the greeting | |
# `gr.Chatbot` expects a list of tuples or list of dicts. | |
# For `type="messages"`, it's list of dicts: [{"role": "assistant", "content": "Hello"}] | |
# Or list of tuples: [(None, "Hello")] | |
# Let's assume your chatbot is configured for list of tuples (None, bot_message) for initial messages | |
new_chat_messages_for_display = [{"role": "assistant", "content": greeting_text}] | |
# If your chat_interface.chatbot_value expects list of dicts: | |
# new_messages_history = [{"role": "assistant", "content": greeting_text}] | |
# --- END MODIFICATION --- | |
new_prompt = "You are a helpful assistant." | |
return new_uuid, new_graph, new_chat_messages_for_display, tabs, new_prompt, sidebar_summaries, *suggestion_buttons | |
def switch_tab(selected_uuid, tabs, gradio_graph, uuid, messages, prompt): | |
# I don't know of another way to lookup uuid other than | |
# by the button value | |
# Save current state | |
if messages: | |
tabs[uuid] = { | |
"graph": gradio_graph, | |
"messages": messages, | |
"prompt": prompt | |
} | |
if selected_uuid not in tabs: | |
logger.error(f"Could not find the selected tab in offloaded_tabs_data_storage {selected_uuid}") | |
return gr.skip(), gr.skip(), gr.skip(), gr.skip() | |
selected_tab_state = tabs[selected_uuid] | |
selected_graph = selected_tab_state["graph"] | |
selected_messages = selected_tab_state["messages"] | |
selected_prompt = selected_tab_state.get("prompt", "") | |
suggestion_buttons = [] | |
for _ in range(FOLLOWUP_QUESTION_NUMBER): | |
suggestion_buttons.append(gr.Button(visible=False)) | |
return selected_graph, selected_uuid, selected_messages, tabs, selected_prompt, *suggestion_buttons | |
def delete_tab(current_chat_uuid, selected_uuid, sidebar_summaries, tabs): | |
output_messages = gr.skip() | |
if current_chat_uuid == selected_uuid: | |
output_messages = dict() | |
if selected_uuid in tabs: | |
del tabs[selected_uuid] | |
if selected_uuid in sidebar_summaries: | |
del sidebar_summaries[selected_uuid] | |
return sidebar_summaries, tabs, output_messages | |
def submit_edit_tab(selected_uuid, sidebar_summaries, text): | |
sidebar_summaries[selected_uuid] = text | |
return sidebar_summaries, "" | |
def load_mesh(mesh_file_name): | |
return mesh_file_name | |
def display_initial_greeting(is_new_user_state_value: bool): | |
""" | |
Determines if a greeting should be displayed and returns the UI updates. | |
It also returns the new state for 'is_new_user_for_greeting'. | |
""" | |
if is_new_user_state_value: | |
greeting_message_text = load_initial_greeting() | |
# For a chatbot, the history is a list of tuples: [(user_msg, bot_msg)] | |
# For an initial message from the bot, user_msg is None. | |
initial_chat_history = [(None, greeting_message_text)] | |
updated_is_new_user_flag = False # Greeting shown, so set to False | |
return initial_chat_history, updated_is_new_user_flag | |
else: | |
# Not a new user (or already greeted), so no initial message in chat history | |
# and the flag remains False. | |
return [], False | |
def get_sorted_3d_model_examples(): | |
examples_dir = Path("./generated_3d_models") | |
if not examples_dir.exists(): | |
return [] | |
# Get all 3D model files with desired extensions | |
model_files = [ | |
file for file in examples_dir.glob("*") | |
if file.suffix.lower() in {".obj", ".glb", ".gltf"} | |
] | |
# Sort files by creation time (latest first) | |
sorted_files = sorted( | |
model_files, | |
key=lambda x: x.stat().st_ctime, | |
reverse=True | |
) | |
# Convert to format [[path1], [path2], ...] | |
return [[str(file)] for file in sorted_files] | |
CSS = """ | |
footer {visibility: hidden} | |
.followup-question-button {font-size: 12px } | |
.chat-tab { | |
font-size: 12px; | |
padding-inline: 0; | |
} | |
.chat-tab.active { | |
background-color: #654343; | |
} | |
#new-chat-button { background-color: #0f0f11; color: white; } | |
.tab-button-control { | |
min-width: 0; | |
padding-left: 0; | |
padding-right: 0; | |
} | |
.sidebar-collapsed { | |
display: none !important; | |
} | |
.wrap.sidebar-parent { | |
min-height: 2400px !important; | |
height: 2400px !important; | |
} | |
#main-app { | |
height: 4600px; /* or 800px, or 100% */ | |
overflow-y: auto; /* optional if you want it scrollable */\ | |
padding-top:2000px; | |
} | |
""" | |
# We set the ChatInterface textbox id to chat-textbox for this to work | |
TRIGGER_CHATINTERFACE_BUTTON = """ | |
function triggerChatButtonClick() { | |
// Find the div with id "chat-textbox" | |
const chatTextbox = document.getElementById("chat-textbox"); | |
if (!chatTextbox) { | |
console.error("Error: Could not find element with id 'chat-textbox'"); | |
return; | |
} | |
// Find the button that is a descendant of the div | |
const button = chatTextbox.querySelector("button"); | |
if (!button) { | |
console.error("Error: No button found inside the chat-textbox element"); | |
return; | |
} | |
// Trigger the click event | |
button.click(); | |
}""" | |
TOGGLE_SIDEBAR_JS = """ | |
function toggleSidebarVisibility() { | |
console.log("Called the side bar funnction"); | |
const sidebar = document.querySelector(".sidebar svelte-7y53u7 open"); | |
if (!sidebar) { | |
console.error("Error: Could not find the sidebar element"); | |
return; | |
} | |
sidebar.classList.toggle("sidebar-collapsed"); | |
} | |
""" | |
if __name__ == "__main__": | |
logger.info("Starting the interface") | |
with gr.Blocks(title="DIYO is here", fill_height=True, css=CSS, elem_id="main-app") as demo: | |
is_new_user_for_greeting = gr.State(True) | |
chatbot_message_storage = gr.State([]) | |
current_prompt_state = gr.BrowserState( | |
storage_key="current_prompt_state", | |
secret=BROWSER_STORAGE_SECRET, | |
) | |
current_uuid_state = gr.BrowserState( | |
uuid4, | |
storage_key="current_uuid_state", | |
secret=BROWSER_STORAGE_SECRET, | |
) | |
current_langgraph_state = gr.BrowserState( | |
dict(), | |
storage_key="current_langgraph_state", | |
secret=BROWSER_STORAGE_SECRET, | |
) | |
end_of_assistant_response_state = gr.State( | |
bool(), | |
) | |
# [uuid] -> summary of chat | |
sidebar_names_state = gr.BrowserState( | |
dict(), | |
storage_key="sidebar_names_state", | |
secret=BROWSER_STORAGE_SECRET, | |
) | |
# [uuid] -> {"graph": gradio_graph, "messages": messages} | |
offloaded_tabs_data_storage = gr.BrowserState( | |
dict(), | |
storage_key="offloaded_tabs_data_storage", | |
secret=BROWSER_STORAGE_SECRET, | |
) | |
chatbot_message_storage = gr.BrowserState( | |
[], | |
storage_key="chatbot_message_storage", | |
secret=BROWSER_STORAGE_SECRET, | |
) | |
with gr.Row(elem_classes="header-margin"): | |
# Add the decorated header with ASCII art | |
gr.Markdown(""" | |
<div style="display: flex; align-items: center; justify-content: center; text-align: center; padding: 20px; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); border-radius: 15px; margin-bottom: 20px; color: white; box-shadow: 0 4px 15px rgba(0,0,0,0.2);"> | |
╔══════════════════════════════════════════════════════════════════════════════════════════════╗ | |
║ ║ | |
║ █████╗ ██████╗ ███████╗███╗ ██╗████████╗ ██████╗ ██╗██╗ ██╗ ██████╗ ║ | |
║ ██╔══██╗██╔════╝ ██╔════╝████╗ ██║╚══██╔══╝ ██╔══██╗██║╚██╗ ██╔╝██╔═══██╗ ║ | |
║ ███████║██║ ███╗█████╗ ██╔██╗ ██║ ██║ ██║ ██║██║ ╚████╔╝ ██║ ██║ ║ | |
║ ██╔══██║██║ ██║██╔══╝ ██║╚██╗██║ ██║ ██║ ██║██║ ╚██╔╝ ██║ ██║ ║ | |
║ ██║ ██║╚██████╔╝███████╗██║ ╚████║ ██║ ██████╔╝██║ ██║ ╚██████╔╝ ║ | |
║ ╚═╝ ╚═╝ ╚═════╝ ╚══════╝╚═╝ ╚═══╝ ╚═╝ ╚═════╝ ╚═╝ ╚═╝ ╚═════╝ ║ | |
║ ║ | |
╚══════════════════════════════════════════════════════════════════════════════════════════════╝ | |
Let's build things, break boundaries with the help of AI! | |
</div> | |
""") | |
with gr.Row(): | |
prompt_textbox = gr.Textbox(show_label=False, interactive=True) | |
with gr.Row(): | |
checkbox_search_enabled = gr.Checkbox( | |
value=True, | |
label="Enable search", | |
show_label=True, | |
visible=search_enabled, | |
scale=1, | |
) | |
checkbox_download_website_text = gr.Checkbox( | |
value=True, | |
show_label=True, | |
label="Enable downloading text from urls", | |
scale=1, | |
) | |
with gr.Row(): | |
with gr.Column(scale=2): | |
model_3d_output = gr.Model3D( | |
clear_color=[0.0, 0.0, 0.0, 0.0], | |
label="3D Model", | |
height=400 # Adjust height to align better with chatbot | |
) | |
with gr.Column(scale=1): | |
# Input for the 3D model | |
# Using UploadButton is often clearer for users than a clickable Model3D input | |
model_3d_upload_button = gr.UploadButton( | |
"Upload 3D Model (.obj, .glb, .gltf)", | |
file_types=[".obj", ".glb", ".gltf"], | |
# scale=0 # make it take less space if needed | |
) | |
model_3d_upload_button.upload( | |
fn=load_mesh, | |
inputs=model_3d_upload_button, | |
outputs=model_3d_output | |
) | |
gr.Examples( | |
label="Example 3D Models", | |
examples=get_sorted_3d_model_examples(), | |
inputs=model_3d_upload_button, # Dummy input for examples to load into Model3D | |
outputs=model_3d_output, | |
fn=load_mesh, | |
cache_examples=True # Caching might be useful | |
) | |
with gr.Row(): | |
multimodal = False | |
textbox_component = ( | |
gr.MultimodalTextbox if multimodal else gr.Textbox | |
) | |
textbox = textbox_component( | |
show_label=False, | |
label="Message", | |
placeholder="Type a message...", | |
scale=1, | |
autofocus=True, | |
submit_btn=True, | |
stop_btn=True, | |
elem_id="chat-textbox", | |
lines=1, | |
) | |
chatbot = gr.Chatbot( | |
type="messages", | |
scale=0, | |
show_copy_button=True, | |
height=400, | |
editable="all", | |
elem_classes="main-chatbox" | |
) | |
with gr.Row(): | |
followup_question_buttons = [] | |
for i in range(FOLLOWUP_QUESTION_NUMBER): | |
btn = gr.Button(f"Button {i+1}", visible=False) | |
followup_question_buttons.append(btn) | |
tab_edit_uuid_state = gr.State( | |
str() | |
) | |
prompt_textbox.change(lambda prompt: prompt, inputs=[prompt_textbox], outputs=[current_prompt_state]) | |
with gr.Sidebar() as sidebar: | |
def render_chats(tab_uuid_edit, end_of_chat_response, sidebar_summaries, active_uuid, messages, tabs): | |
current_tab_button_text = "" | |
if active_uuid not in sidebar_summaries: | |
current_tab_button_text = "Current Chat" | |
elif active_uuid not in tabs: | |
current_tab_button_text = sidebar_summaries[active_uuid] | |
if current_tab_button_text: | |
unique_id = f"current-tab-{active_uuid}-{uuid4()}" | |
gr.Button( | |
current_tab_button_text, | |
elem_classes=["chat-tab", "active"], | |
elem_id=unique_id # Add unique elem_id | |
) | |
for chat_uuid, tab in reversed(tabs.items()): | |
elem_classes = ["chat-tab"] | |
if chat_uuid == active_uuid: | |
elem_classes.append("active") | |
button_uuid_state = gr.State(chat_uuid) | |
with gr.Row(): | |
clear_tab_button = gr.Button( | |
"🗑", | |
scale=0, | |
elem_classes=["tab-button-control"], | |
elem_id=f"delete-btn-{chat_uuid}-{uuid4()}" # Add unique ID | |
) | |
clear_tab_button.click( | |
fn=delete_tab, | |
inputs=[ | |
current_uuid_state, | |
button_uuid_state, | |
sidebar_names_state, | |
offloaded_tabs_data_storage | |
], | |
outputs=[ | |
sidebar_names_state, | |
offloaded_tabs_data_storage, | |
chat_interface.chatbot_value | |
] | |
) | |
chat_button_text = sidebar_summaries.get(chat_uuid) | |
if not chat_button_text: | |
chat_button_text = str(chat_uuid) | |
if chat_uuid != tab_uuid_edit: | |
set_edit_tab_button = gr.Button( | |
"✎", | |
scale=0, | |
elem_classes=["tab-button-control"], | |
elem_id=f"edit-btn-{chat_uuid}-{uuid4()}" # Add unique ID | |
) | |
set_edit_tab_button.click( | |
fn=lambda x: x, | |
inputs=[button_uuid_state], | |
outputs=[tab_edit_uuid_state] | |
) | |
chat_tab_button = gr.Button( | |
chat_button_text, | |
elem_id=f"chat-{chat_uuid}-{uuid4()}", # Add truly unique ID | |
elem_classes=elem_classes, | |
scale=2 | |
) | |
chat_tab_button.click( | |
fn=switch_tab, | |
inputs=[ | |
button_uuid_state, | |
offloaded_tabs_data_storage, | |
current_langgraph_state, | |
current_uuid_state, | |
chatbot, | |
prompt_textbox | |
], | |
outputs=[ | |
current_langgraph_state, | |
current_uuid_state, | |
chat_interface.chatbot_value, | |
offloaded_tabs_data_storage, | |
prompt_textbox, | |
*followup_question_buttons | |
] | |
) | |
else: | |
chat_tab_text = gr.Textbox( | |
chat_button_text, | |
scale=2, | |
interactive=True, | |
show_label=False, | |
elem_id=f"edit-text-{chat_uuid}-{uuid4()}" # Add unique ID | |
) | |
chat_tab_text.submit( | |
fn=submit_edit_tab, | |
inputs=[ | |
button_uuid_state, | |
sidebar_names_state, | |
chat_tab_text | |
], | |
outputs=[ | |
sidebar_names_state, | |
tab_edit_uuid_state | |
] | |
) | |
# ) | |
# return chat_tabs, sidebar_summaries | |
new_chat_button = gr.Button("New Chat", elem_id="new-chat-button") | |
chatbot.clear(fn=clear, outputs=[current_langgraph_state, current_uuid_state]) | |
chat_interface = gr.ChatInterface( | |
chatbot=chatbot, | |
fn=chat_fn, | |
additional_inputs=[ | |
current_langgraph_state, | |
current_uuid_state, | |
prompt_textbox, | |
checkbox_search_enabled, | |
checkbox_download_website_text, | |
], | |
additional_outputs=[ | |
current_langgraph_state, | |
end_of_assistant_response_state | |
], | |
type="messages", | |
multimodal=multimodal, | |
textbox=textbox, | |
) | |
new_chat_button.click( | |
new_tab, | |
inputs=[ | |
current_uuid_state, | |
current_langgraph_state, | |
chatbot, | |
offloaded_tabs_data_storage, | |
prompt_textbox, | |
sidebar_names_state, | |
], | |
outputs=[ | |
current_uuid_state, | |
current_langgraph_state, | |
chat_interface.chatbot_value, | |
offloaded_tabs_data_storage, | |
prompt_textbox, | |
sidebar_names_state, | |
*followup_question_buttons, | |
] | |
) | |
def click_followup_button(btn): | |
buttons = [gr.Button(visible=False) for _ in range(len(followup_question_buttons))] | |
return btn, *buttons | |
for btn in followup_question_buttons: | |
btn.click( | |
fn=click_followup_button, | |
inputs=[btn], | |
outputs=[ | |
chat_interface.textbox, | |
*followup_question_buttons | |
] | |
).success(lambda: None, js=TRIGGER_CHATINTERFACE_BUTTON) | |
chatbot.change( | |
fn=populate_followup_questions, | |
inputs=[ | |
end_of_assistant_response_state, | |
chatbot, | |
current_uuid_state | |
], | |
outputs=[ | |
*followup_question_buttons, | |
end_of_assistant_response_state | |
], | |
trigger_mode="multiple" | |
) | |
chatbot.change( | |
fn=summarize_chat, | |
inputs=[ | |
end_of_assistant_response_state, | |
chatbot, | |
sidebar_names_state, | |
current_uuid_state | |
], | |
outputs=[ | |
sidebar_names_state, | |
end_of_assistant_response_state | |
], | |
trigger_mode="multiple" | |
) | |
chatbot.change( | |
fn=lambda x: x, | |
inputs=[chatbot], | |
outputs=[chatbot_message_storage], | |
trigger_mode="always_last" | |
) | |
def handle_initial_greeting_load(current_is_new_user_flag: bool, existing_chat_history: list): | |
""" | |
This function is called by the @app.load decorator above. | |
It decides whether to add a greeting to the chat history. | |
""" | |
# You can either put the logic directly here, or call the globally defined one. | |
# Option 1: Call the globally defined function (cleaner if it's complex) | |
# Make sure 'display_initial_greeting_on_load' is defined globally in your app.py | |
# For this example, I'm assuming 'display_initial_greeting_on_load' is the one we defined earlier: | |
# def display_initial_greeting_on_load(current_is_new_user_flag: bool, existing_chat_history: list): | |
# if current_is_new_user_flag: | |
# greeting_message_text = load_initial_greeting() # from graph.py | |
# greeting_entry = (None, greeting_message_text) | |
# if not isinstance(existing_chat_history, list): existing_chat_history = [] | |
# updated_chat_history = [greeting_entry] + existing_chat_history | |
# updated_is_new_user_flag = False | |
# logger.info("Greeting added for new user.") | |
# return updated_chat_history, updated_is_new_user_flag | |
# else: | |
# logger.info("Not a new user or already greeted, no greeting added.") | |
# return existing_chat_history, False | |
# | |
# return display_initial_greeting_on_load(current_is_new_user_flag, existing_chat_history) | |
# Option 2: Put logic directly here (if simple enough) | |
if current_is_new_user_flag: | |
greeting_message_text = load_initial_greeting() # Make sure load_initial_greeting is imported | |
greeting_entry = {"role": "assistant", "content": greeting_message_text} | |
# Ensure existing_chat_history is a list before concatenation | |
if not isinstance(existing_chat_history, list): | |
existing_chat_history = [] | |
updated_chat_history = [greeting_entry] + existing_chat_history | |
updated_is_new_user_flag = False | |
logger.info("Greeting added for new user via handle_initial_greeting_load.") | |
return updated_chat_history, updated_is_new_user_flag | |
else: | |
logger.info("Not a new user or already greeted (handle_initial_greeting_load path).") | |
return existing_chat_history, False | |
def load_messages(messages): | |
return messages | |
def load_prompt(current_prompt): | |
return current_prompt | |
# demo.launch(server_name="127.0.0.1", server_port=8080, share=True) | |
demo.launch(server_name="0.0.0.0", server_port=7860, share=True) | |