Update main.py
Browse files
main.py
CHANGED
@@ -1,48 +1,36 @@
|
|
1 |
import json, os
|
2 |
import gradio as gr
|
3 |
from fastapi.middleware.cors import CORSMiddleware
|
4 |
-
from fastapi import FastAPI, Request,
|
5 |
import google.generativeai as genai
|
6 |
import base64
|
7 |
from collections import defaultdict
|
8 |
from linebot import LineBotApi, WebhookHandler
|
9 |
from linebot.exceptions import InvalidSignatureError
|
10 |
-
from linebot.models import
|
11 |
-
MessageEvent,
|
12 |
-
TextMessage,
|
13 |
-
TextSendMessage,
|
14 |
-
ImageSendMessage,
|
15 |
-
AudioMessage,
|
16 |
-
ImageMessage,
|
17 |
-
)
|
18 |
import PIL.Image
|
19 |
import tempfile
|
20 |
-
import httpx
|
21 |
-
import uvicorn
|
22 |
|
23 |
# 設定 Google AI API 金鑰
|
24 |
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
|
25 |
|
26 |
# 設定生成文字的參數
|
27 |
-
generation_config = genai.types.GenerationConfig(
|
28 |
-
max_output_tokens=1000, temperature=0.2, top_p=0.5, top_k=16
|
29 |
-
)
|
30 |
|
31 |
-
# 使用 Gemini 模型
|
32 |
-
model = genai.GenerativeModel(
|
33 |
-
"gemini-2.0-flash-exp",
|
34 |
-
system_instruction="主要用繁體中文回答,但如果用戶使用詢問英文問題,就用英文回應。你現在是個專業助理,職稱為OPEN小助理,個性活潑、樂觀,願意回答所有問題",
|
35 |
-
generation_config=generation_config,
|
36 |
-
)
|
37 |
|
38 |
-
#
|
39 |
line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"])
|
40 |
line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"])
|
41 |
|
|
|
|
|
|
|
42 |
# 建立 FastAPI 應用程式
|
43 |
app = FastAPI()
|
44 |
|
45 |
-
# 設定 CORS
|
46 |
app.add_middleware(
|
47 |
CORSMiddleware,
|
48 |
allow_origins=["*"],
|
@@ -51,29 +39,48 @@ app.add_middleware(
|
|
51 |
allow_headers=["*"],
|
52 |
)
|
53 |
|
|
|
54 |
@app.get("/")
|
55 |
def root():
|
56 |
return {"title": "Line Bot"}
|
57 |
|
|
|
58 |
@app.post("/webhook")
|
59 |
-
async def webhook(
|
|
|
|
|
|
|
|
|
|
|
60 |
body = await request.body()
|
61 |
try:
|
|
|
62 |
background_tasks.add_task(
|
63 |
line_handler.handle, body.decode("utf-8"), x_line_signature
|
64 |
)
|
65 |
except InvalidSignatureError:
|
|
|
66 |
raise HTTPException(status_code=400, detail="Invalid signature")
|
67 |
return "ok"
|
68 |
-
|
69 |
-
|
70 |
-
|
|
|
|
|
|
|
|
|
|
|
71 |
try:
|
|
|
|
|
|
|
72 |
headers = {"Authorization": f"Bearer {access_token}"}
|
|
|
73 |
data = {
|
74 |
"image": base64.b64encode(image_binary).decode("utf-8"),
|
75 |
"type": "base64",
|
76 |
}
|
|
|
77 |
response = httpx.post("https://api.imgur.com/3/image", headers=headers, data=data)
|
78 |
if response.status_code == 200:
|
79 |
return response.json()["data"]["link"]
|
@@ -84,7 +91,10 @@ def upload_image_to_imgur_with_token(image_binary, access_token):
|
|
84 |
print("圖片上傳例外:", e)
|
85 |
return None
|
86 |
|
87 |
-
|
|
|
|
|
|
|
88 |
def get_image_url(message_id):
|
89 |
try:
|
90 |
message_content = line_bot_api.get_message_content(message_id)
|
@@ -97,31 +107,48 @@ def get_image_url(message_id):
|
|
97 |
print(f"Error getting image: {e}")
|
98 |
return None
|
99 |
|
100 |
-
#
|
101 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
102 |
try:
|
|
|
103 |
if not os.path.exists(image_path):
|
104 |
raise FileNotFoundError(f"圖片路徑無效:{image_path}")
|
|
|
105 |
organ = PIL.Image.open(image_path)
|
106 |
response = chat.send_message([user_text, organ])
|
|
|
|
|
107 |
return response.text
|
|
|
108 |
except Exception as e:
|
109 |
return f"發生錯誤: {e}"
|
110 |
-
|
111 |
-
# 儲存使用者訊息
|
112 |
-
user_message_history = defaultdict(list)
|
113 |
-
def store_user_message(user_id, message_type, message_content):
|
114 |
-
user_message_history[user_id].append({
|
115 |
-
"type": message_type,
|
116 |
-
"content": message_content
|
117 |
-
})
|
118 |
|
119 |
def get_previous_message(user_id):
|
|
|
|
|
|
|
120 |
if user_id in user_message_history and len(user_message_history[user_id]) > 0:
|
|
|
121 |
return user_message_history[user_id][-1]
|
122 |
return None
|
123 |
|
124 |
-
|
|
|
|
|
|
|
125 |
chat_sessions = {}
|
126 |
@line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
|
127 |
def handle_image_message(event):
|
@@ -133,24 +160,23 @@ def handle_image_message(event):
|
|
133 |
|
134 |
if user_text and user_text.startswith("請幫我生成圖片"):
|
135 |
prompt = user_text.replace("請幫我生成圖片", "").strip()
|
|
|
136 |
image_model = genai.GenerativeModel("gemini-2.0-flash-exp-image-generation")
|
137 |
response = image_model.generate_content(prompt)
|
138 |
|
139 |
if response.parts and hasattr(response.parts[0], "inline_data"):
|
140 |
image_data = response.parts[0].inline_data.data
|
141 |
access_token = os.environ.get("IMGUR_ACCESS_TOKEN")
|
142 |
-
image_url = upload_image_to_imgur_with_token(image_data,
|
143 |
-
|
144 |
-
|
145 |
-
|
146 |
-
|
147 |
-
|
148 |
-
|
149 |
-
|
150 |
-
|
151 |
-
|
152 |
-
else:
|
153 |
-
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="圖片上傳失敗,請稍後再試~"))
|
154 |
else:
|
155 |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="圖片生成失敗,請稍後再試~"))
|
156 |
return
|
@@ -169,7 +195,14 @@ def handle_image_message(event):
|
|
169 |
image_path = previous_message["content"]
|
170 |
user_text = event.message.text
|
171 |
store_user_message(user_id, "text", user_text)
|
172 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
173 |
else:
|
174 |
if event.message.type != "text":
|
175 |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="請輸入文字或圖片~"))
|
@@ -188,8 +221,9 @@ def handle_image_message(event):
|
|
188 |
|
189 |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))
|
190 |
|
191 |
-
|
192 |
if __name__ == "__main__":
|
|
|
193 |
uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)
|
194 |
|
195 |
# 註解說明:
|
|
|
1 |
import json, os
|
2 |
import gradio as gr
|
3 |
from fastapi.middleware.cors import CORSMiddleware
|
4 |
+
from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException, status
|
5 |
import google.generativeai as genai
|
6 |
import base64
|
7 |
from collections import defaultdict
|
8 |
from linebot import LineBotApi, WebhookHandler
|
9 |
from linebot.exceptions import InvalidSignatureError
|
10 |
+
from linebot.models import MessageEvent, TextMessage, TextSendMessage, ImageSendMessage, AudioMessage, ImageMessage
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
11 |
import PIL.Image
|
12 |
import tempfile
|
|
|
|
|
13 |
|
14 |
# 設定 Google AI API 金鑰
|
15 |
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
|
16 |
|
17 |
# 設定生成文字的參數
|
18 |
+
generation_config = genai.types.GenerationConfig(max_output_tokens=1000, temperature=0.2, top_p=0.5, top_k=16) #2048
|
|
|
|
|
19 |
|
20 |
+
# 使用 Gemini-1.5-flash 模型
|
21 |
+
model = genai.GenerativeModel('gemini-2.0-flash-exp', system_instruction="主要用繁體中文回答,但如果用戶使用詢問英文問題,就用英文回應。你現在是個專業助理,職稱為OPEN小助理,個性活潑、樂觀,願意回答所有問題", generation_config=generation_config)
|
|
|
|
|
|
|
|
|
22 |
|
23 |
+
# 設定 Line Bot 的 API 金鑰和秘密金鑰
|
24 |
line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"])
|
25 |
line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"])
|
26 |
|
27 |
+
# 設定是否正在與使用者交談
|
28 |
+
working_status = os.getenv("DEFALUT_TALKING", default = "true").lower() == "true"
|
29 |
+
|
30 |
# 建立 FastAPI 應用程式
|
31 |
app = FastAPI()
|
32 |
|
33 |
+
# 設定 CORS,允許跨域請求
|
34 |
app.add_middleware(
|
35 |
CORSMiddleware,
|
36 |
allow_origins=["*"],
|
|
|
39 |
allow_headers=["*"],
|
40 |
)
|
41 |
|
42 |
+
# 處理根路徑請求
|
43 |
@app.get("/")
|
44 |
def root():
|
45 |
return {"title": "Line Bot"}
|
46 |
|
47 |
+
# 處理 Line Webhook 請求
|
48 |
@app.post("/webhook")
|
49 |
+
async def webhook(
|
50 |
+
request: Request,
|
51 |
+
background_tasks: BackgroundTasks,
|
52 |
+
x_line_signature=Header(None),
|
53 |
+
):
|
54 |
+
# 取得請求內容
|
55 |
body = await request.body()
|
56 |
try:
|
57 |
+
# 將處理 Line 事件的任務加入背景工作
|
58 |
background_tasks.add_task(
|
59 |
line_handler.handle, body.decode("utf-8"), x_line_signature
|
60 |
)
|
61 |
except InvalidSignatureError:
|
62 |
+
# 處理無效的簽章錯誤
|
63 |
raise HTTPException(status_code=400, detail="Invalid signature")
|
64 |
return "ok"
|
65 |
+
|
66 |
+
#==========================
|
67 |
+
# 使用者請求生成圖片
|
68 |
+
#==========================
|
69 |
+
def upload_image_to_imgur_with_token(image_path, access_token):
|
70 |
+
"""
|
71 |
+
使用 Imgur OAuth2 Access Token 上傳圖片
|
72 |
+
"""
|
73 |
try:
|
74 |
+
with open(image_path, "rb") as img_file:
|
75 |
+
image_binary = img_file.read()
|
76 |
+
|
77 |
headers = {"Authorization": f"Bearer {access_token}"}
|
78 |
+
|
79 |
data = {
|
80 |
"image": base64.b64encode(image_binary).decode("utf-8"),
|
81 |
"type": "base64",
|
82 |
}
|
83 |
+
|
84 |
response = httpx.post("https://api.imgur.com/3/image", headers=headers, data=data)
|
85 |
if response.status_code == 200:
|
86 |
return response.json()["data"]["link"]
|
|
|
91 |
print("圖片上傳例外:", e)
|
92 |
return None
|
93 |
|
94 |
+
|
95 |
+
#==========================
|
96 |
+
# 使用者上傳圖片
|
97 |
+
#==========================
|
98 |
def get_image_url(message_id):
|
99 |
try:
|
100 |
message_content = line_bot_api.get_message_content(message_id)
|
|
|
107 |
print(f"Error getting image: {e}")
|
108 |
return None
|
109 |
|
110 |
+
# 使用字典模擬用戶訊息歷史存儲
|
111 |
+
user_message_history = defaultdict(list)
|
112 |
+
def store_user_message(user_id, message_type, message_content):
|
113 |
+
"""
|
114 |
+
儲存用戶的訊息
|
115 |
+
"""
|
116 |
+
user_message_history[user_id].append({
|
117 |
+
"type": message_type,
|
118 |
+
"content": message_content})
|
119 |
+
|
120 |
+
def analyze_with_gemini(image_path, user_text):
|
121 |
+
"""
|
122 |
+
分析用戶問題和圖片,並返回 Gemini 的回應
|
123 |
+
"""
|
124 |
try:
|
125 |
+
# ���保圖片存在
|
126 |
if not os.path.exists(image_path):
|
127 |
raise FileNotFoundError(f"圖片路徑無效:{image_path}")
|
128 |
+
|
129 |
organ = PIL.Image.open(image_path)
|
130 |
response = chat.send_message([user_text, organ])
|
131 |
+
|
132 |
+
# 提取回應內容
|
133 |
return response.text
|
134 |
+
|
135 |
except Exception as e:
|
136 |
return f"發生錯誤: {e}"
|
137 |
+
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
138 |
|
139 |
def get_previous_message(user_id):
|
140 |
+
"""
|
141 |
+
獲取用戶的上一則訊息
|
142 |
+
"""
|
143 |
if user_id in user_message_history and len(user_message_history[user_id]) > 0:
|
144 |
+
# 返回最後一則訊息
|
145 |
return user_message_history[user_id][-1]
|
146 |
return None
|
147 |
|
148 |
+
#==========================
|
149 |
+
# 主程式(圖片與文字)
|
150 |
+
#==========================
|
151 |
+
# 建立 chat_sessions 字典
|
152 |
chat_sessions = {}
|
153 |
@line_handler.add(MessageEvent, message=(ImageMessage, TextMessage))
|
154 |
def handle_image_message(event):
|
|
|
160 |
|
161 |
if user_text and user_text.startswith("請幫我生成圖片"):
|
162 |
prompt = user_text.replace("請幫我生成圖片", "").strip()
|
163 |
+
|
164 |
image_model = genai.GenerativeModel("gemini-2.0-flash-exp-image-generation")
|
165 |
response = image_model.generate_content(prompt)
|
166 |
|
167 |
if response.parts and hasattr(response.parts[0], "inline_data"):
|
168 |
image_data = response.parts[0].inline_data.data
|
169 |
access_token = os.environ.get("IMGUR_ACCESS_TOKEN")
|
170 |
+
image_url = upload_image_to_imgur_with_token(image_data,access_token)
|
171 |
+
|
172 |
+
if image_url:
|
173 |
+
line_bot_api.reply_message(
|
174 |
+
event.reply_token,
|
175 |
+
[
|
176 |
+
TextSendMessage(text="這是我為你生成的圖片喔~ ✨"),
|
177 |
+
ImageSendMessage(original_content_url=image_url, preview_image_url=image_url)
|
178 |
+
]
|
179 |
+
)
|
|
|
|
|
180 |
else:
|
181 |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="圖片生成失敗,請稍後再試~"))
|
182 |
return
|
|
|
195 |
image_path = previous_message["content"]
|
196 |
user_text = event.message.text
|
197 |
store_user_message(user_id, "text", user_text)
|
198 |
+
try:
|
199 |
+
if not os.path.exists(image_path):
|
200 |
+
raise FileNotFoundError(f"圖片路徑無效:{image_path}")
|
201 |
+
organ = PIL.Image.open(image_path)
|
202 |
+
completion = chat.send_message([user_text, organ])
|
203 |
+
out = completion.text
|
204 |
+
except Exception as e:
|
205 |
+
out = f"發生錯誤: {e}"
|
206 |
else:
|
207 |
if event.message.type != "text":
|
208 |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="請輸入文字或圖片~"))
|
|
|
221 |
|
222 |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out))
|
223 |
|
224 |
+
|
225 |
if __name__ == "__main__":
|
226 |
+
# 啟動 FastAPI 應用程式
|
227 |
uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True)
|
228 |
|
229 |
# 註解說明:
|