|
import base64
import json
import os
from collections import defaultdict

import google.generativeai as genai
import gradio as gr
import PIL.Image
from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory
from langchain_google_genai import ChatGoogleGenerativeAI
from linebot import LineBotApi, WebhookHandler
from linebot.exceptions import InvalidSignatureError
from linebot.models import MessageEvent, TextMessage, TextSendMessage, ImageSendMessage, AudioMessage, ImageMessage
|
|
|
|
|
# --- Gemini / LangChain setup ----------------------------------------------
# Two clients are built from the same settings:
#   * a native google-generativeai model, whose chat session accepts
#     [text, image] input (used by `chat`), and
#   * a LangChain chat model (`model`), which is what ConversationChain
#     requires for its `llm` argument.
# `configure` and `types.GenerationConfig` belong to the google-generativeai
# module (`genai`), not to LangChain's ChatGoogleGenerativeAI class.
_GEMINI_MODEL_NAME = 'gemini-2.0-flash-exp'

# Sampling settings shared by both clients so they cannot drift apart.
_SAMPLING_OPTIONS = {
    "max_output_tokens": 2048,
    "temperature": 0.2,
    "top_p": 0.5,
    "top_k": 16,
}

# Required environment variable: a missing key raises KeyError at import.
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])

generation_config = genai.types.GenerationConfig(**_SAMPLING_OPTIONS)

# Native SDK model. The system instruction is attached only here; chains
# built on `model` below use ConversationChain's own prompt.
_gemini_model = genai.GenerativeModel(
    _GEMINI_MODEL_NAME,
    system_instruction="請用繁體中文回答。你現在是個專業助理,職稱為OPEN小助理,個性活潑、樂觀,願意回答所有問題",
    generation_config=generation_config,
)

# Single module-level chat session used for image + text questions.
chat = _gemini_model.start_chat(history=[])

# LangChain-compatible chat model used wherever a ConversationChain is built.
model = ChatGoogleGenerativeAI(
    model=_GEMINI_MODEL_NAME,
    google_api_key=os.environ["GOOGLE_API_KEY"],
    **_SAMPLING_OPTIONS,
)

memory = ConversationBufferMemory()

conversation = ConversationChain(llm=model, memory=memory)
|
|
|
|
|
# LINE credentials are required environment variables; a missing one raises
# KeyError while the module is being imported.
_channel_access_token = os.environ["CHANNEL_ACCESS_TOKEN"]
_channel_secret = os.environ["CHANNEL_SECRET"]

# Client used to send replies and download message content.
line_bot_api = LineBotApi(_channel_access_token)

# Dispatcher for incoming webhook events.
line_handler = WebhookHandler(_channel_secret)

# Whether the bot answers free-form text messages. Enabled unless the
# environment variable is set to something other than "true"
# (case-insensitive).
# NOTE(review): the variable name is spelled "DEFALUT_TALKING"; it is kept
# as-is here because existing deployments may already set it under that name.
_talking_flag = os.getenv("DEFALUT_TALKING", "true")
working_status = _talking_flag.lower() == "true"
|
|
|
|
|
# The ASGI application that serves the routes defined below.
app = FastAPI()

# CORS settings: every origin, method and header is allowed, and credentials
# are permitted.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}

app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)
|
|
|
|
|
@app.get("/")
def root():
    """Return a small JSON payload that identifies the service."""
    payload = {"title": "Line Bot"}
    return payload
|
|
|
|
|
@app.post("/webhook")
async def webhook(
    request: Request,
    background_tasks: BackgroundTasks,
    x_line_signature=Header(None),
):
    """Receive a LINE webhook call and queue its events for processing.

    Args:
        request: The incoming HTTP request; its raw body is the webhook
            payload.
        background_tasks: FastAPI task queue; event handling is scheduled on
            it so the HTTP response is not delayed by the handlers.
        x_line_signature: Value of the ``X-Line-Signature`` header, or None
            when the header is absent.

    Returns:
        The string "ok" once the payload has been accepted.

    Raises:
        HTTPException: 400 when the signature header is missing or does not
            match the request body.
    """
    body = (await request.body()).decode("utf-8")

    # `line_handler.handle` runs only after the response has been sent, so an
    # InvalidSignatureError raised there can never become a 400. Check the
    # signature here, before scheduling any work.
    validator = line_handler.parser.signature_validator
    if not x_line_signature or not validator.validate(body, x_line_signature):
        raise HTTPException(status_code=400, detail="Invalid signature")

    background_tasks.add_task(line_handler.handle, body, x_line_signature)
    return "ok"
|
|
|
|
|
def get_image_url(message_id):
    """Download the content of a LINE message into a local file.

    Despite the name, the value returned is a filesystem path, not a URL.

    Args:
        message_id: Id of the LINE message whose content is fetched.

    Returns:
        The path ``/tmp/<message_id>.png`` the content was written to, or
        None when anything goes wrong (the error is printed).
    """
    try:
        content = line_bot_api.get_message_content(message_id)
        target = f"/tmp/{message_id}.png"
        with open(target, "wb") as out_file:
            # Stream the payload to disk chunk by chunk.
            out_file.writelines(content.iter_content())
    except Exception as e:
        print(f"Error getting image: {e}")
        return None
    return target
|
|
|
|
|
# In-memory message log: maps a user id to the list of records stored for
# that user, oldest first.
user_message_history = defaultdict(list)


def store_user_message(user_id, message_type, message_content):
    """Append one message record to a user's history.

    Args:
        user_id: Key under which the record is stored.
        message_type: Stored under the record's "type" key (callers in this
            file pass "image" or "text").
        message_content: Stored under the record's "content" key.
    """
    record = {"type": message_type, "content": message_content}
    user_message_history[user_id].append(record)
|
|
|
def analyze_with_gemini(image_path, user_text):
    """Send an image and the user's question to Gemini and return the answer.

    Args:
        image_path: Path of the image file on local disk.
        user_text: The question the user asked about the image.

    Returns:
        The text of the model's response. If anything fails, the exception
        is caught and a string starting with "發生錯誤: " followed by the
        error description is returned instead.
    """
    try:
        # An empty or missing path is reported as an invalid image path
        # rather than as a low-level error from the filesystem call.
        if not image_path or not os.path.exists(image_path):
            raise FileNotFoundError(f"圖片路徑無效:{image_path}")

        # Open the image in a context manager so its file handle is released
        # once the request has been sent.
        with PIL.Image.open(image_path) as organ:
            response = chat.send_message([user_text, organ])

        return response.text

    except Exception as e:
        return f"發生錯誤: {e}"
|
|
|
|
|
def get_previous_message(user_id):
    """Return the most recently stored message record for a user.

    Args:
        user_id: Key to look up in ``user_message_history``.

    Returns:
        The last record in the user's history, or None when the user has no
        stored messages. The lookup does not create an entry for unknown
        users.
    """
    history = user_message_history.get(user_id)
    if not history:
        return None
    return history[-1]
|
|
|
|
|
# Cache of conversation chains, one per user id. Each chain owns its own
# ConversationBufferMemory.
chat_memories = {}


def get_or_create_chain(user_id):
    """Return the conversation chain for a user, creating it on first use.

    Args:
        user_id: Key identifying the user in ``chat_memories``.

    Returns:
        The cached chain for ``user_id``; a new ConversationChain with a
        fresh ConversationBufferMemory is built and cached when none exists.
    """
    # NOTE(review): ConversationChain expects a LangChain language model for
    # `llm`; confirm the module-level `model` satisfies that.
    if user_id in chat_memories:
        return chat_memories[user_id]

    buffer = ConversationBufferMemory()
    chat_memories[user_id] = ConversationChain(llm=model, memory=buffer)
    return chat_memories[user_id]
|
|
|
@line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
def handle_image_message(event):
    """Handle an incoming LINE image or text message and send one reply.

    Flow:
      * image message: download it, remember its path for the user and ask
        the user to type a question about it;
      * text message sent right after an image: ask Gemini about that image
        using the text as the question;
      * the text "再見": reply with a goodbye;
      * any other text: answer through the user's conversation chain, but
        only while ``working_status`` is true.

    Every path sends at most one reply and then returns, so the event's
    reply token is never reused.

    Args:
        event: The LINE webhook event being handled.
    """
    user_id = event.source.user_id

    def reply(text):
        # Single place where the reply is sent for this event.
        line_bot_api.reply_message(event.reply_token, TextSendMessage(text=text))

    # Defensive check: anything that is neither an image nor a text message
    # gets the explanatory error reply.
    if event.type != "message" or event.message.type not in ("image", "text"):
        reply("Event type error:[No message or the message does not contain text]")
        return

    if event.message.type == "image":
        # Content is downloaded only for image messages.
        image_path = get_image_url(event.message.id)
        if image_path:
            print(f"圖片已保存至: {image_path}")
            store_user_message(user_id, "image", image_path)
            reply("圖片已接收成功囉,幫我輸入你想詢問的問題喔~")
        else:
            reply("沒有接收到圖片~")
        return

    user_text = event.message.text
    previous_message = get_previous_message(user_id)

    # A text that directly follows an image is treated as a question about
    # that image.
    if previous_message and previous_message["type"] == "image":
        # Storing the text first makes the next text message go through the
        # normal conversation path instead of re-analysing the same image.
        store_user_message(user_id, "text", user_text)
        reply(analyze_with_gemini(previous_message["content"], user_text))
        return

    if user_text == "再見":
        reply("Bye!")
        return

    # Bot is switched off: stay silent instead of failing on an unset reply.
    if not working_status:
        return

    store_user_message(user_id, "text", user_text)
    try:
        # ConversationChain.run returns the model's answer as a plain string.
        completion = get_or_create_chain(user_id).run(user_text)
        out = completion or "我不太懂什麼意思也~"
    except Exception as e:
        # Keep the user-facing fallback, but leave a trace of the cause.
        print(f"Error generating reply: {e}")
        out = "執行出錯!請換個說法!"

    reply(out)
|
|
|
|
|
if __name__ == "__main__":
    # uvicorn is needed only when this file is run directly as a script, so
    # it is imported here rather than at module import time.
    import uvicorn

    # Serve the app on all interfaces, port 7860, with auto-reload enabled.
    uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|