Spaces:
Runtime error
Runtime error
File size: 3,857 Bytes
1969750 3432c0b 1969750 3432c0b 1969750 f1cb8b4 1969750 7fc693f 1969750 464f7ab 3432c0b 1969750 e4a98dc 1969750 e4a98dc 1969750 3432c0b 1969750 3432c0b 1969750 7fc693f 3432c0b 1969750 3432c0b 1969750 4b97fee 1969750 4b97fee 1969750 e4a98dc 1969750 3432c0b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
import os
import time
from typing import Optional, Tuple
import gradio as gr
from query_data import get_chain
from threading import Lock
import pinecone
from langchain.vectorstores import Pinecone
from langchain.embeddings.openai import OpenAIEmbeddings
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_API_ENV = os.getenv("PINECONE_API_ENV")
PINECONE_INDEX = os.getenv("PINECONE_INDEX")
# initialize pinecone
pinecone.init(
api_key=PINECONE_API_KEY, # find at app.pinecone.io
environment=PINECONE_API_ENV # next to api key in console
)
index_name = PINECONE_INDEX
embeddings = OpenAIEmbeddings()
# with open("posts.pkl", "rb") as f:
# vectorstore = pickle.load(f)
vectorstore = Pinecone.from_existing_index(index_name=index_name, embedding=embeddings)
chain = get_chain(vectorstore)
class ChatWrapper:
def __init__(self):
self.lock = Lock()
def __call__(
self, inp: str, history: Optional[Tuple[str, str]]
):
"""Execute the chat functionality."""
self.lock.acquire()
try:
history = history or []
# If chain is None, that is because no API key was provided.
# if chain is None:
# history.append((inp, "Please paste your OpenAI key to use"))
# return history, history
# Set OpenAI key
import openai
openai.api_key = os.getenv("OPENAI_API_KEY")
start_time = time.time()
chain_obj = chain({"question": inp, "chat_history": history})
print('=======time===== : ' + str(time.time() - start_time))
output = chain_obj["answer"]
sources = chain_obj['source_documents']
source_link = []
sources_str = 'Sources:'
for source in sources:
if source.metadata['source'] not in source_link:
source_link.append(source.metadata['source'])
sources_str += '\n' + source.metadata['source']
if 'Unfortunately, our current pool of insights does not have an answer to this.' not in output:
output += '\n\n' + sources_str
history.append((inp, output))
# print(chain_obj)
except Exception as e:
raise e
finally:
self.lock.release()
return history, history
chat = ChatWrapper()
block = gr.Blocks(css=".gradio-container {background-color: #111827};footer "
"{visibility: hidden};")
with block:
# with gr.Row():
# openai_api_key_textbox = gr.Textbox(
# placeholder="",
# show_label=False,
# lines=1,
# type="password",
# value=""
# )
chatbot = gr.Chatbot().style(height=500)
with gr.Row():
message = gr.Textbox(
label="What's your question?",
placeholder="Ask questions about reports",
lines=1,
)
submit = gr.Button(value="Send", variant="secondary").style(full_width=False)
# gr.Examples(
# examples=[
# "What did the president say about Kentaji Brown Jackson",
# "Did he mention Stephen Breyer?",
# "What was his stance on Ukraine",
# ],
# inputs=message,
# )
state = gr.State()
agent_state = gr.State()
submit.click(chat, inputs=[message, state], outputs=[chatbot, state])
message.submit(chat, inputs=[message, state], outputs=[chatbot, state])
# openai_api_key_textbox.change(
# set_openai_api_key,
# inputs=[openai_api_key_textbox],
# outputs=[agent_state],
# )
# block.launch(debug=True)
# block.launch(debug=True, auth=('admin', 'Twimbit@2019'), auth_message='enter username password to proceed further')
|