Spaces:
Running
Running
import json, os | |
from io import BytesIO | |
import io | |
import gradio as gr | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi import FastAPI, Request, Header, BackgroundTasks, HTTPException, status | |
import google.generativeai as genai_gen | |
from google import genai | |
from google.genai import types | |
import base64 | |
from collections import defaultdict | |
from linebot import LineBotApi, WebhookHandler | |
from linebot.exceptions import InvalidSignatureError | |
from linebot.models import MessageEvent, TextMessage, TextSendMessage, ImageSendMessage, AudioMessage, ImageMessage | |
import PIL.Image | |
import tempfile | |
import httpx | |
from imgurpython import ImgurClient | |
import requests | |
import tempfile | |
from datetime import datetime | |
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM | |
from transformers import pipeline | |
# 設定 Google AI API 金鑰 | |
genai_gen.configure(api_key=os.environ["GOOGLE_API_KEY"]) | |
# 設定生成文字的參數 | |
generation_config = genai_gen.types.GenerationConfig(max_output_tokens=1000, temperature=0.2, top_p=0.5, top_k=16) #2048 | |
# 使用 Gemini-1.5-flash 模型 | |
model = genai_gen.GenerativeModel('gemini-2.0-flash-exp', system_instruction="主要用繁體中文回答,但如果用戶使用詢問英文問題,就用英文回應。你現在是個專業助理,職稱為OPEN小助理,個性活潑、樂觀,願意回答所有問題", generation_config=generation_config) | |
# 設定 Line Bot 的 API 金鑰和秘密金鑰 | |
line_bot_api = LineBotApi(os.environ["CHANNEL_ACCESS_TOKEN"]) | |
line_handler = WebhookHandler(os.environ["CHANNEL_SECRET"]) | |
# 設定 Imgur 的 API | |
client_id = os.environ.get("IMGUR_CLIENT_ID") | |
client_secret = os.environ.get("IMGUR_CLIENT_SECRET") | |
access_token = os.environ.get("IMGUR_ACCESS_TOKEN") | |
refresh_token = os.environ.get("IMGUR_REFRESH_TOKEN") | |
# 設定是否正在與使用者交談 | |
working_status = os.getenv("DEFALUT_TALKING", default = "true").lower() == "true" | |
# 建立 FastAPI 應用程式 | |
app = FastAPI() | |
# 設定 CORS,允許跨域請求 | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# 處理根路徑請求 | |
def root(): | |
return {"title": "Line Bot"} | |
# 處理 Line Webhook 請求 | |
async def webhook( | |
request: Request, | |
background_tasks: BackgroundTasks, | |
x_line_signature=Header(None), | |
): | |
# 取得請求內容 | |
body = await request.body() | |
try: | |
# 將處理 Line 事件的任務加入背景工作 | |
background_tasks.add_task( | |
line_handler.handle, body.decode("utf-8"), x_line_signature | |
) | |
except InvalidSignatureError: | |
# 處理無效的簽章錯誤 | |
raise HTTPException(status_code=400, detail="Invalid signature") | |
return "ok" | |
#========================== | |
# 使用者請求生成圖片 | |
#========================== | |
def upload_image_to_imgur(client, image_binary, album=None, name="Hugging Face-image", title="Hugging Face Generated Image"): | |
# 將 binary 資料轉為 PIL Image | |
image = PIL.Image.open(io.BytesIO(image_binary)) | |
# 建立暫存檔案來上傳 (因為 ImgurClient 需要檔案路徑) | |
with tempfile.NamedTemporaryFile(suffix=".png", delete=True) as tmp: | |
image.save(tmp.name, format='PNG') | |
# 準備上傳資訊 | |
config = { | |
'album': album, | |
'name': name, | |
'title': title, | |
'description': f'Generated by Hugging Face - {datetime.now()}' | |
} | |
# 使用 client 進行圖片上傳 | |
uploaded_image = client.upload_from_path(tmp.name, config=config, anon=False) | |
# 回傳圖片網址 | |
return uploaded_image['link'] | |
# 使用 Gemini 生成圖片 | |
def generate_image_with_gemini(prompt): | |
client = genai.Client(api_key=os.environ["GOOGLE_API_KEY"]) | |
response = client.models.generate_content( | |
model="gemini-2.0-flash-exp-image-generation", | |
contents=prompt, | |
config=types.GenerateContentConfig( | |
response_modalities=['Text', 'Image'] | |
) | |
) | |
for part in response.candidates[0].content.parts: | |
if part.text is not None: | |
print(part.text) | |
elif part.inline_data is not None: | |
#image = PIL.Image.open(io.BytesIO(part.inline_data.data)) | |
return part.inline_data.data | |
# 使用 Hugging Face 生成圖片 | |
def generate_image_hf(prompt): | |
API_URL = "https://api-inference.huggingface.co/models/stabilityai/stable-diffusion-2" | |
headers = {"Authorization": f"Bearer {os.environ['HF_TOKEN']}"} | |
response = requests.post(API_URL, headers=headers, json={"inputs": prompt}) | |
if response.status_code == 200: | |
return response.content # 圖片binary資料 | |
else: | |
print("圖片生成失敗:", response.text) | |
return None | |
def translate_zh_to_en(text): | |
model_name = "Helsinki-NLP/opus-mt-zh-en" | |
tokenizer = AutoTokenizer.from_pretrained(model_name) | |
model = AutoModelForSeq2SeqLM.from_pretrained(model_name) | |
inputs = tokenizer(text, return_tensors="pt") | |
outputs = model.generate(**inputs) | |
translated_text = tokenizer.decode(outputs[0], skip_special_tokens=True) | |
return translated_text | |
#========================== | |
# 使用者上傳圖片 | |
#========================== | |
def get_image_url(message_id): | |
try: | |
message_content = line_bot_api.get_message_content(message_id) | |
file_path = f"/tmp/{message_id}.png" | |
with open(file_path, "wb") as f: | |
for chunk in message_content.iter_content(): | |
f.write(chunk) | |
return file_path | |
except Exception as e: | |
print(f"Error getting image: {e}") | |
return None | |
# 使用字典模擬用戶訊息歷史存儲 | |
user_message_history = defaultdict(list) | |
def store_user_message(user_id, message_type, message_content): | |
""" | |
儲存用戶的訊息 | |
""" | |
user_message_history[user_id].append({ | |
"type": message_type, | |
"content": message_content}) | |
def analyze_with_gemini(image_path, user_text): | |
""" | |
分析用戶問題和圖片,並返回 Gemini 的回應 | |
""" | |
try: | |
# 確保圖片存在 | |
if not os.path.exists(image_path): | |
raise FileNotFoundError(f"圖片路徑無效:{image_path}") | |
organ = PIL.Image.open(image_path) | |
response = chat.send_message([user_text, organ]) | |
# 提取回應內容 | |
return response.text | |
except Exception as e: | |
return f"發生錯誤: {e}" | |
def get_previous_message(user_id): | |
""" | |
獲取用戶的上一則訊息 | |
""" | |
if user_id in user_message_history and len(user_message_history[user_id]) > 0: | |
# 返回最後一則訊息 | |
return user_message_history[user_id][-1] | |
return None | |
#========================== | |
# 主程式(圖片與文字) | |
#========================== | |
# 建立 chat_sessions 字典 | |
chat_sessions = {} | |
def handle_image_message(event): | |
user_id = event.source.user_id | |
chat = chat_sessions.get(user_id) or model.start_chat(history=[]) | |
chat_sessions[user_id] = chat | |
user_text = event.message.text if event.message.type == "text" else None | |
image_url = None | |
if user_text and user_text.startswith("生成圖片"): | |
prompt = user_text.replace("生成圖片", "").strip() | |
# 先立即回覆避免token過期 | |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="圖片生成中~ 請稍候.....✨")) | |
client = ImgurClient(client_id, client_secret, access_token, refresh_token) | |
#prompt_en = translate_zh_to_en(prompt) | |
# 生成圖片 | |
image_binary = generate_image_with_gemini(prompt) | |
if image_binary: | |
album = "nvsYwgq" | |
image_url = upload_image_to_imgur(client, image_binary, album) | |
if image_url: | |
# 使用 push message 發送圖片,避免 reply token 超時 | |
line_bot_api.push_message( | |
event.source.user_id, | |
[ | |
TextSendMessage(text="✨ 這是我為你生成的圖片喔~"), | |
ImageSendMessage(original_content_url=image_url, preview_image_url=image_url) | |
] | |
) | |
else: | |
line_bot_api.push_message( | |
event.source.user_id, | |
TextSendMessage(text="⚠️ 圖片上傳失敗,請稍後再試~") | |
) | |
else: | |
line_bot_api.push_message( | |
event.source.user_id, | |
TextSendMessage(text="⚠️ 圖片生成失敗,請稍後再試~") | |
) | |
return | |
if event.message.type == "image": | |
image_path = get_image_url(event.message.id) | |
if image_path: | |
store_user_message(user_id, "image", image_path) | |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="圖片已接收成功囉,幫我輸入你想詢問的問題喔~")) | |
else: | |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="沒有接收到圖片~")) | |
return | |
previous_message = get_previous_message(user_id) | |
if previous_message and previous_message["type"] == "image" and event.message.type == "text": | |
image_path = previous_message["content"] | |
user_text = event.message.text | |
store_user_message(user_id, "text", user_text) | |
try: | |
if not os.path.exists(image_path): | |
raise FileNotFoundError(f"圖片路徑無效:{image_path}") | |
organ = PIL.Image.open(image_path) | |
completion = chat.send_message([user_text, organ]) | |
out = completion.text | |
except Exception as e: | |
out = f"發生錯誤: {e}" | |
else: | |
if event.message.type != "text": | |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="請輸入文字或圖片~")) | |
return | |
if event.message.text == "再見": | |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text="Bye!")) | |
return | |
if working_status: | |
try: | |
prompt = event.message.text | |
store_user_message(user_id, "text", prompt) | |
completion = chat.send_message(prompt) | |
out = completion.text if completion.text else "我不太懂什麼意思也~" | |
except: | |
out = "執行出錯!請換個說法!" | |
line_bot_api.reply_message(event.reply_token, TextSendMessage(text=out)) | |
if __name__ == "__main__": | |
# 啟動 FastAPI 應用程式 | |
uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=True) | |
# 註解說明: | |
# import 導入必要的套件 | |
# genai.configure 設定 Google AI API 金鑰 | |
# generation_config 設定文字生成參數 | |
# model 設定使用的 Gemini 模型 | |
# line_bot_api 和 line_handler 設定 Line Bot API 和 webhook 處理器 | |
# working_status 設定是否正在與使用者交談 | |
# app 建立 FastAPI 應用程式 | |
# app.add_middleware 設定 CORS | |
# @app.get("/") 處理根路徑請求 | |
# @app.post("/webhook") 處理 Line Webhook 請求 | |
# @line_handler.add(MessageEvent, message=TextMessage) 處理文字訊息事件 | |
# if __name__ == "__main__": 啟動 FastAPI 應用程式 | |
# 程式碼功能說明: | |
# 程式碼首先會導入必要的套件,並設定 Google AI API 金鑰、文字生成參數、Gemini 模型以及 Line Bot API。 | |
# 接著會建立 FastAPI 應用程式,並設定 CORS。 | |
# 程式碼會定義兩個函數: | |
# root() 處理根路徑請求,返回一個簡單的 JSON 訊息。 | |
# webhook() 處理 Line Webhook 請求,將處理 Line 事件的任務加入背景工作,並處理無效的簽章錯誤。 | |
# 程式碼還定義一個函數 handle_message() 來處理文字訊息事件,它會檢查事件類型和訊息類型,並根據使用者輸入執行不同的動作: | |
# 如果使用者輸入 "再見",回覆 "Bye!"。 | |
# 如果正在與使用者交談,則會使用 Gemini 模型生成文字,並將結果回覆給使用者。 | |
# 最後,程式碼會啟動 FastAPI 應用程式,開始監聽 HTTP 請求。 | |
# 程式碼運行方式: | |
# 將程式碼存為 main.py 文件。 | |
# 在環境變數中設定 GOOGLE_API_KEY、CHANNEL_ACCESS_TOKEN 和 CHANNEL_SECRET。 | |
# 執行 uvicorn main:app --host 0.0.0.0 --port 7860 --reload 命令啟動 FastAPI 應用程式。 | |
# 使用 Line 帳戶與 Line Bot 進行對話。 | |
# 注意: | |
# 程式碼中使用 os.environ["GOOGLE_API_KEY"]、os.environ["CHANNEL_ACCESS_TOKEN"] 和 os.environ["CHANNEL_SECRET"] 來存取環境變數,需要先在環境變數中設定這些值。 | |
# 程式碼中使用 uvicorn 執行 FastAPI 應用程式,需要先安裝 uvicorn 套件。 | |
# 程式碼中使用 google.generativeai 套件,需要先安裝 google-generativeai 套件。 | |
# 程式碼中使用 linebot 套件,需要先安裝 linebot 套件。 |