|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException, status |
|
from fastapi.staticfiles import StaticFiles |
|
from google import genai |
|
from linebot import LineBotApi, WebhookHandler |
|
from linebot.exceptions import InvalidSignatureError |
|
from linebot.models import MessageEvent, TextMessage, TextSendMessage, ImageSendMessage, AudioMessage, ImageMessage |
|
import json, os |
|
import io |
|
import PIL.Image |
|
from Image_text_generation import Image_text_Generator |
|
from Uploading_images_file import get_image_url, store_user_message, analyze_with_gemini, get_previous_message |
|
|
|
|
|
|
|
|
|
|
|
|
|
# Gemini API client. The key is read from the GOOGLE_API_KEY environment
# variable; None is passed through when the variable is unset.
client = genai.Client(api_key=os.environ.get("GOOGLE_API_KEY"))

# Generation settings passed as `config` when a Gemini chat session is
# created: short answers (256 output tokens) with conservative sampling.
generation_config = genai.types.GenerateContentConfig(
    temperature=0.5,
    top_p=0.5,
    top_k=16,
    max_output_tokens=256,
)
|
|
|
|
|
# LINE Messaging API client and webhook event dispatcher. Both credentials
# are mandatory: a missing variable raises KeyError while the module loads.
line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"])
line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"])

# True when DEFALUT_TALKING is unset or equals "true" (case-insensitive).
# NOTE(review): the variable name is spelled "DEFALUT"; it is kept as-is so
# deployments that already set it keep working.
working_status = os.environ.get("DEFALUT_TALKING", "true").lower() == "true"
|
|
|
|
|
# ASGI application; files in the local "static" directory are served
# under the /static URL prefix.
app = FastAPI()
app.mount(
    "/static",
    StaticFiles(directory="static"),
    name="static",
)

# Fully permissive CORS policy: any origin, method and header.
# NOTE(review): wildcard origins together with allow_credentials=True is
# very broad -- confirm this is intended before exposing cookie-based auth.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
@app.get("/")
def root():
    """Return a small JSON document identifying the service."""
    payload = {"title": "Line Bot"}
    return payload
|
|
|
|
|
@app.post("/webhook")
async def webhook(
    request: Request,
    background_tasks: BackgroundTasks,
    x_line_signature=Header(None),
):
    """Receive a LINE webhook call and queue its events for processing.

    The request signature is checked before anything is scheduled, so a
    forged or unsigned request is rejected with HTTP 400 instead of being
    acknowledged and failing later inside the background task.

    Args:
        request: Incoming HTTP request; its raw body is the webhook payload.
        background_tasks: FastAPI task queue used to run the event handlers
            after the response has been sent.
        x_line_signature: Value of the ``X-Line-Signature`` header, or
            ``None`` when the header is absent.

    Returns:
        The string ``"ok"`` once the payload has been accepted.

    Raises:
        HTTPException: 400 when the body is not valid UTF-8 or the
            signature is missing or does not match the body.
    """
    body = await request.body()
    try:
        payload = body.decode("utf-8")
    except UnicodeDecodeError:
        raise HTTPException(status_code=400, detail="Invalid request body")

    # add_task() only schedules the call, so InvalidSignatureError raised by
    # line_handler.handle() can never be caught here. Validate up front with
    # the same validator the handler uses internally.
    validator = line_handler.parser.signature_validator
    if x_line_signature is None or not validator.validate(payload, x_line_signature):
        raise HTTPException(status_code=400, detail="Invalid signature")

    background_tasks.add_task(line_handler.handle, payload, x_line_signature)
    return "ok"
|
|
|
|
|
|
|
|
|
|
|
|
|
# Gemini chat sessions kept in process memory, keyed by LINE user id.
chat_sessions = {}

# Fallback replies used when Gemini raises or returns no text.
_GEMINI_ERROR_REPLY = "Gemini執行出錯!請換個說法!"
_GEMINI_EMPTY_REPLY = "Gemini沒答案!請換個說法!"

# Text prefix that switches a message from chat to image generation.
_IMAGE_COMMAND = "生成圖片"


def _log_failure(message):
    """Log *message* with the traceback of the exception being handled."""
    import logging

    logging.getLogger(__name__).exception(message)


def _reply_text(event, text):
    """Answer *event* with one text message using its reply token."""
    line_bot_api.reply_message(event.reply_token, TextSendMessage(text=text))


def _push_text(user_id, text):
    """Push one text message to *user_id* (used after the reply token is spent)."""
    line_bot_api.push_message(user_id, TextSendMessage(text=text))


def _ask_gemini(user_id, contents):
    """Send *contents* to the user's Gemini chat session and return the reply.

    A session is created and cached on first use. Any failure is logged and
    turned into a fixed fallback message so the user always gets an answer.
    """
    try:
        chat = chat_sessions.get(user_id)
        if chat is None:
            chat = client.chats.create(
                model="gemini-2.0-flash", config=generation_config
            )
            chat_sessions[user_id] = chat
        answer = chat.send_message(contents).text
    except Exception:
        _log_failure("Gemini request failed")
        return _GEMINI_ERROR_REPLY
    return answer if answer is not None else _GEMINI_EMPTY_REPLY


def _answer_about_image(user_id, image_path, question):
    """Ask Gemini *question* about the image stored at *image_path*."""
    try:
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"圖片路徑無效:{image_path}")
        # The context manager closes the underlying file once the request
        # has been sent.
        with PIL.Image.open(image_path) as previous_img:
            return _ask_gemini(user_id, [previous_img, question])
    except Exception:
        _log_failure("Could not load the previously stored image")
        return _GEMINI_ERROR_REPLY


def _send_generated_image(event, prompt):
    """Generate an image for *prompt* and push the result to the sender."""
    user_id = event.source.user_id
    # The reply token is consumed by this acknowledgement, so every later
    # message in this flow has to be pushed.
    _reply_text(event, "圖片生成中~ 請稍候.....✨")

    image_generator = Image_text_Generator(user_id)
    image_binary = image_generator.generate_image_with_gemini(prompt)
    if not image_binary:
        _push_text(user_id, "⚠️ 圖片生成失敗,請稍後再試~")
        return

    # Check both URL parts before joining them: concatenating with None
    # would raise TypeError instead of reaching the failure message.
    base_url = os.getenv("HF_SPACE")
    uploaded_path = image_generator.upload_image_to_tmp(image_binary)
    if not base_url or not uploaded_path:
        _push_text(user_id, "⚠️ 圖片上傳失敗,請稍後再試~")
        return

    image_url = base_url + uploaded_path
    line_bot_api.push_message(
        user_id,
        [
            TextSendMessage(text="✨ 這是我為你生成的圖片喔~"),
            ImageSendMessage(
                original_content_url=image_url, preview_image_url=image_url
            ),
        ],
    )


@line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
def handle_image_message(event):
    """Handle an incoming LINE text or image message.

    Behaviour by message:
      * image: store it for the user and ask for a question about it.
      * text "再見": say goodbye.
      * text starting with "生成圖片": generate an image from the rest of
        the text and push it to the user.
      * text right after a stored image: ask Gemini the question together
        with that image.
      * any other text: continue the user's Gemini chat session.

    Args:
        event: LINE message event; ``event.source.user_id``,
            ``event.reply_token`` and ``event.message`` are read.
    """
    user_id = event.source.user_id
    message_type = event.message.type
    # Read as a mapping with "type" and "content" keys; treated as possibly
    # empty/None for users without stored history.
    previous_message = get_previous_message(user_id)

    if message_type not in ("text", "image"):
        _reply_text(event, "請輸入文字或圖片~")
        return

    if message_type == "image":
        image_path = get_image_url(event.message.id)
        if image_path:
            store_user_message(user_id, "image", image_path)
            _reply_text(event, "圖片已接收成功囉,幫我輸入你想詢問的問題喔~")
        else:
            _reply_text(event, "沒有接收到圖片~")
        return

    user_text = event.message.text

    if user_text == "再見":
        _reply_text(event, "Bye!")
        return

    if user_text and user_text.startswith(_IMAGE_COMMAND):
        # Drop only the leading command so the phrase may appear in the prompt.
        prompt = user_text[len(_IMAGE_COMMAND):].strip()
        _send_generated_image(event, prompt)
        return

    # A user with no stored message must fall through to plain chat instead
    # of failing on previous_message["type"].
    if previous_message and previous_message["type"] == "image":
        image_path = previous_message["content"]
        # Recording the text means the next message is no longer treated as
        # a follow-up to the image.
        store_user_message(user_id, "text", user_text)
        out = _answer_about_image(user_id, image_path, user_text)
    else:
        out = _ask_gemini(user_id, user_text)

    _reply_text(event, out)
|
|
|
|
|
if __name__ == "__main__":
    # uvicorn is only needed when the file is run directly, and it was never
    # imported at module level, so import it here to avoid a NameError.
    import uvicorn

    # Development server: all interfaces, port 7860, auto-reload on changes.
    uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)
|
|