|
import json, os |
|
import gradio as gr |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException, status |
|
import google.generativeai as genai |
|
import openai |
|
from linebot import LineBotApi, WebhookHandler |
|
from linebot.exceptions import InvalidSignatureError |
|
from linebot.models import MessageEvent, TextMessage, TextSendMessage, ImageSendMessage, AudioMessage, ImageMessage |
|
|
|
|
|
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") |
|
|
|
|
|
genai.configure(api_key=os.environ["GOOGLE_API_KEY"]) |
|
|
|
|
|
generation_config = genai.types.GenerationConfig(max_output_tokens=2048, temperature=0.2, top_p=0.5, top_k=16) |
|
|
|
|
|
model = genai.GenerativeModel('gemini-1.5-flash', system_instruction="請用繁體中文回答。你現在是個專業助理,職稱為OPEN小助理,個性活潑、樂觀,願意回答所有問題") |
|
|
|
|
|
line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"]) |
|
line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"]) |
|
|
|
|
|
working_status = os.getenv("DEFALUT_TALKING", default = "true").lower() == "true" |
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
app.add_middleware( |
|
CORSMiddleware, |
|
allow_origins=["*"], |
|
allow_credentials=True, |
|
allow_methods=["*"], |
|
allow_headers=["*"], |
|
) |
|
|
|
|
|
@app.get("/") |
|
def root(): |
|
return {"title": "Line Bot"} |
|
|
|
|
|
@app.post("/webhook") |
|
async def webhook( |
|
request: Request, |
|
background_tasks: BackgroundTasks, |
|
x_line_signature=Header(None), |
|
): |
|
|
|
body = await request.body() |
|
try: |
|
|
|
background_tasks.add_task( |
|
line_handler.handle, body.decode("utf-8"), x_line_signature |
|
) |
|
except InvalidSignatureError: |
|
|
|
raise HTTPException(status_code=400, detail="Invalid signature") |
|
return "ok" |
|
|
|
|
|
@line_handler.add(MessageEvent, message=TextMessage) |
|
def handle_message(event): |
|
global working_status |
|
|
|
|
|
if event.type != "message" or event.message.type != "text": |
|
|
|
line_bot_api.reply_message( |
|
event.reply_token, |
|
TextSendMessage(text="Event type error:[No message or the message does not contain text]") |
|
) |
|
|
|
|
|
elif event.message.text == "再見": |
|
|
|
line_bot_api.reply_message( |
|
event.reply_token, |
|
TextSendMessage(text="Bye!") |
|
) |
|
return |
|
|
|
|
|
elif working_status: |
|
try: |
|
|
|
prompt = event.message.text |
|
|
|
completion = model.generate_content(prompt, generation_config=generation_config) |
|
|
|
if (completion.parts[0].text != None): |
|
|
|
out = completion.parts[0].text |
|
else: |
|
|
|
out = "Gemini沒答案!請換個說法!" |
|
except: |
|
|
|
out = "Gemini執行出錯!請換個說法!" |
|
|
|
|
|
line_bot_api.reply_message( |
|
event.reply_token, |
|
TextSendMessage(text=out)) |
|
|
|
|
|
def get_image_url(message_id): |
|
"""從 Line 取得圖片 URL。""" |
|
try: |
|
content = line_bot_api.get_message_content(message_id) |
|
|
|
|
|
response = requests.get(line_bot_api.config.http_client.line_endpoint + "/bot/message/" + message_id + "/content", headers=line_bot_api.config.http_client.default_headers, stream=True) |
|
if response.status_code == 200: |
|
return response.url |
|
else: |
|
print(f"Error getting image url: {response.status_code}") |
|
return None |
|
except Exception as e: |
|
print(f"Error getting image content: {e}") |
|
return None |
|
|
|
@line_handler.add(MessageEvent, message=ImageMessage) |
|
def handle_image_message(event): |
|
image_url = get_image_url(event.message.id) |
|
if not image_url: |
|
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="無法取得圖片。")) |
|
return |
|
|
|
|
|
line_bot_api.reply_message( |
|
event.reply_token, |
|
TextSendMessage(text="圖片已收到,請針對圖片提問。")) |
|
|
|
|
|
user_id = event.source.user_id |
|
user_sessions[user_id] = {"image_url": image_url} |
|
|
|
@line_handler.add(MessageEvent, message=TextMessage) |
|
def handle_text_message(event): |
|
user_id = event.source.user_id |
|
if user_id in user_sessions and "image_url" in user_sessions[user_id]: |
|
image_url = user_sessions[user_id]["image_url"] |
|
user_question = event.message.text |
|
del user_sessions[user_id] |
|
|
|
try: |
|
response = openai.ChatCompletion.create( |
|
model="gpt-4-vision-preview", |
|
messages=[ |
|
{ |
|
"role": "user", |
|
"content": [ |
|
{"type": "text", "text": user_question}, |
|
{"type": "image_url", "image_url": {"url": image_url}}, |
|
], |
|
} |
|
], |
|
max_tokens=300, |
|
) |
|
bot_reply = response.choices[0].message.content |
|
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=bot_reply)) |
|
except openai.error.OpenAIError as e: |
|
print(f"OpenAI API error: {e}") |
|
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=f"處理您的問題時發生錯誤:{e}")) |
|
except Exception as e: |
|
print(f"Other error: {e}") |
|
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=f"發生未預期的錯誤:{e}")) |
|
|
|
else: |
|
line_bot_api.reply_message(event.reply_token,TextSendMessage(text=event.message.text)) |
|
user_sessions = {} |
|
|
|
if __name__ == "__main__": |
|
|
|
uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|