|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException, status |
|
import google.generativeai as genai |
|
|
|
import json |
|
import os |
|
|
|
from linebot import ( |
|
LineBotApi, WebhookHandler |
|
) |
|
from linebot.exceptions import ( |
|
InvalidSignatureError |
|
) |
|
from linebot.models import ( |
|
MessageEvent, TextMessage, TextSendMessage, ImageSendMessage, AudioMessage |
|
) |
|
|
|
# Gemini client setup. The API key is mandatory: a missing GOOGLE_API_KEY
# raises KeyError at import time instead of failing on the first request.
GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]
genai.configure(api_key=GOOGLE_API_KEY)

# Single shared model handle used by the message handler.
model = genai.GenerativeModel("gemini-pro")
|
|
|
# ASGI application object; routes below are registered on it.
app = FastAPI()

# CORS is left fully open: any origin, method and header is accepted.
# NOTE(review): a wildcard origin combined with allow_credentials=True is a
# permissive setup -- confirm it is intended for this deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
# LINE Messaging API objects. Both environment variables are required and a
# missing one raises KeyError while the module is being imported.
# `line_bot_api` sends replies; `line_handler` dispatches webhook events to
# the functions registered with `@line_handler.add`.
line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"])
line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"])
|
|
|
# Whether the bot answers messages. Defaults to enabled; only the
# case-insensitive value "true" turns it on.
# NOTE(review): the variable name is spelled "DEFALUT_TALKING" (sic). It is
# kept as-is because existing deployments may already set it under this name.
working_status = os.environ.get("DEFALUT_TALKING", "true").lower() == "true"
|
|
|
@app.get("/")
def root():
    """Serve a static JSON payload at the site root."""
    payload = {"title": "Echo Bot"}
    return payload
|
|
|
@app.post("/webhook")
async def webhook(
    request: Request,
    background_tasks: BackgroundTasks,
    x_line_signature=Header(None),
):
    """Verify a LINE webhook call and queue its events for handling.

    Args:
        request: Incoming HTTP request; its raw body is the signed payload.
        background_tasks: FastAPI task queue; the LINE handler runs from it
            after the response has been sent.
        x_line_signature: Value of the ``X-Line-Signature`` header, or
            ``None`` when the header is absent.

    Returns:
        The string ``"ok"`` once the payload has been queued.

    Raises:
        HTTPException: 400 when the body is not valid UTF-8, when the
            signature header is missing, or when the signature does not
            match the body.
    """
    raw_body = await request.body()

    try:
        body_text = raw_body.decode("utf-8")
    except UnicodeDecodeError:
        raise HTTPException(
            status_code=400, detail="Invalid request body"
        ) from None

    # The signature must be checked here, before responding. A check that
    # only happens inside the background task runs after the response is
    # sent, so an InvalidSignatureError raised there can never become a 400.
    validator = line_handler.parser.signature_validator
    if not x_line_signature or not validator.validate(body_text, x_line_signature):
        raise HTTPException(status_code=400, detail="Invalid signature")

    # Event handling (including the Gemini call) is deferred so the webhook
    # is acknowledged without waiting for it.
    background_tasks.add_task(line_handler.handle, body_text, x_line_signature)
    return "ok"
|
|
|
@line_handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
    """Reply to a LINE text message with Gemini's answer.

    Behaviour by case:
      * non-text events get the fallback reply;
      * the text "再見" ("goodbye") sets ``working_status`` to True and
        replies "Bye!";
      * any other text is sent to Gemini while ``working_status`` is true,
        and the first response part is replied (or the fallback text when
        the request fails or yields no text);
      * when ``working_status`` is false nothing is sent.

    Args:
        event: LINE webhook event; ``event.type``, ``event.message`` and
            ``event.reply_token`` are read.
    """
    import logging

    global working_status

    fallback = "Gemini: [No response or the response does not contain text]"

    if event.type != "message" or event.message.type != "text":
        # The fallback message has to be sent explicitly; building a
        # TextSendMessage without passing it to reply_message sends nothing.
        line_bot_api.reply_message(
            event.reply_token, TextSendMessage(text=fallback)
        )
        return

    if event.message.text == "再見":
        # NOTE(review): "goodbye" enables the bot rather than disabling it,
        # and nothing visible here ever sets the flag to False -- confirm
        # the intended value.
        working_status = True
        line_bot_api.reply_message(
            event.reply_token, TextSendMessage(text="Bye!")
        )
        return

    if not working_status:
        return

    try:
        response = model.generate_content(
            [{"role": "user", "parts": [event.message.text]}]
        )
        # Check for an empty part list before indexing into it.
        parts = response.parts
        out = parts[0].text if parts else ""
    except Exception:
        # Best effort: the user still gets a reply, but the failure is
        # logged instead of being swallowed silently.
        logging.getLogger(__name__).exception("Gemini request failed")
        out = ""

    line_bot_api.reply_message(
        event.reply_token, TextSendMessage(text=out or fallback)
    )
|
|
|
if __name__ == "__main__":
    # Imported here so the name is defined when the file is run as a script;
    # nothing else in the module uses uvicorn.
    import uvicorn

    # Development entry point: serves `app` from module `main` on all
    # interfaces, port 7860, with auto-reload enabled.
    uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)