import gradio as gr
import torch
from torch.utils.data import DataLoader  # needed for the QA inference DataLoader below
from transformers import (
    AutoTokenizer,
    AutoModelForMultipleChoice,
    AutoModelForQuestionAnswering,
    default_data_collator,  # used by the QA DataLoader in two_stage_qa
)
import json
import collections
import numpy as np
from datasets import Dataset
from utils_qa import postprocess_qa_predictions
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%m/%d/%Y %H:%M:%S",
    level=logging.INFO,  # or logging.DEBUG for more verbose output
)

# utils_qa.py is assumed to live in the same directory (or otherwise be importable);
# its functions could also be copied into this file. Make sure any extra
# dependencies are listed in the Space's requirements.txt.
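
# A minimal requirements.txt for this Space might look like the following
# (an illustrative sketch based on the imports above, not a pinned spec):
#     gradio
#     torch
#     transformers
#     datasets
#     numpy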

# --- Load the models and tokenizer ---
# It is recommended to load the models you have already uploaded to the
# Hugging Face Hub, so the Space does not need to bundle the model files
# itself and stays lightweight.
TOKENIZER_PATH = "bert-base-chinese"  # or the path of the tokenizer you uploaded
SELECTOR_MODEL_PATH = "TheWeeeed/chinese-paragraph-selector"  # replace with your uploaded paragraph-selection model ID
QA_MODEL_PATH = "TheWeeeed/chinese-extractive-qa"  # replace with your uploaded answer-extraction model ID

try:
    tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_PATH)
    selector_model = AutoModelForMultipleChoice.from_pretrained(SELECTOR_MODEL_PATH)
    qa_model = AutoModelForQuestionAnswering.from_pretrained(QA_MODEL_PATH)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    selector_model.to(device)
    selector_model.eval()
    qa_model.to(device)
    qa_model.eval()
    models_loaded_successfully = True
    print(f"Models and tokenizer loaded successfully, using device: {device}")
except Exception as e:
    models_loaded_successfully = False
    error_message = f"Error while loading the models or tokenizer: {e}"
    print(error_message)
    # This error message is surfaced in the Gradio interface.

# --- Functions extracted and adapted from inference_pipeline.py ---
def select_relevant_paragraph_gradio(question_text, candidate_paragraph_texts_str, model, tokenizer, device, max_seq_len):
    # candidate_paragraph_texts_str is a newline-separated string of candidate paragraphs
    candidate_paragraph_texts = [p.strip() for p in candidate_paragraph_texts_str.split('\n') if p.strip()]
    if not candidate_paragraph_texts:
        return "請至少提供一個候選段落。", -1

    model.eval()
    inputs_mc = []
    for p_text in candidate_paragraph_texts:
        inputs_mc.append(
            tokenizer(
                question_text, p_text, add_special_tokens=True, max_length=max_seq_len,
                padding="max_length", truncation=True, return_tensors="pt"
            )
        )

    # Stack the per-paragraph encodings into a single (1, num_choices, seq_len) batch
    input_ids = torch.stack([inp["input_ids"].squeeze(0) for inp in inputs_mc]).unsqueeze(0).to(device)
    attention_mask = torch.stack([inp["attention_mask"].squeeze(0) for inp in inputs_mc]).unsqueeze(0).to(device)
    token_type_ids = None
    if "token_type_ids" in inputs_mc[0]:
        token_type_ids = torch.stack([inp["token_type_ids"].squeeze(0) for inp in inputs_mc]).unsqueeze(0).to(device)

    with torch.no_grad():
        if token_type_ids is not None:
            outputs = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        else:
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)

    predicted_index = torch.argmax(outputs.logits, dim=1).item()
    if predicted_index < len(candidate_paragraph_texts):
        return candidate_paragraph_texts[predicted_index], predicted_index
    else:
        return "段落選擇索引錯誤。", -1

def prepare_features_for_qa_inference_gradio(question_id, question_text, selected_context, tokenizer, max_seq_len, doc_stride):
    # Extracted and adapted from inference_pipeline.py: wraps the single
    # (question, context) pair in a Dataset and maps it through the QA
    # feature-preparation function, returning features the QA model can consume.
    qa_example_for_processing = {"id": [question_id], "question": [question_text], "context": [selected_context]}
    temp_dataset = Dataset.from_dict(qa_example_for_processing)
    pad_on_right = tokenizer.padding_side == "right"
    qa_features = temp_dataset.map(
        lambda examples: prepare_features_for_qa_inference(  # the function from inference_pipeline.py, copied below
            examples, tokenizer, pad_on_right, max_seq_len, doc_stride
        ),
        batched=True,
        remove_columns=temp_dataset.column_names
    )
    return qa_features  # a datasets.Dataset of features


# The prepare_features_for_qa_inference function from inference_pipeline.py is
# copied below; alternatively, make sure it can be imported.
def prepare_features_for_qa_inference(examples, tokenizer, pad_on_right, max_seq_len, doc_stride):
    # Initial stripping and assignment
    examples["question"] = [q.lstrip() if isinstance(q, str) else "" for q in examples["question"]]
    questions_to_tokenize = examples["question" if pad_on_right else "context"]
    contexts_to_tokenize = examples["context" if pad_on_right else "question"]
    questions_to_tokenize = [q if isinstance(q, str) else "" for q in questions_to_tokenize]
    contexts_to_tokenize = [c if isinstance(c, str) else "" for c in contexts_to_tokenize]

    # Handle cases where either the question or the context might be empty after processing.
    # The tokenizer may accept empty strings, but be explicit when one of them is vital.
    valid_inputs_for_tokenizer_q = []
    valid_inputs_for_tokenizer_c = []
    original_indices_for_valid_inputs = []
    for i in range(len(questions_to_tokenize)):
        q_str = questions_to_tokenize[i]
        c_str = contexts_to_tokenize[i]
        # Basic check: if the context is empty, tokenization is problematic for QA
        if q_str.strip() and c_str.strip():  # ensure both have content after stripping
            valid_inputs_for_tokenizer_q.append(q_str)
            valid_inputs_for_tokenizer_c.append(c_str)
            original_indices_for_valid_inputs.append(i)
        else:
            logger.warning(f"Skipping tokenization for example index {i} due to empty question or context. Q: '{q_str}', C: '{c_str}'")

    if not valid_inputs_for_tokenizer_q:  # no valid (question, context) pairs to tokenize
        logger.error(f"No valid question/context pairs to tokenize for examples with IDs: {examples.get('id', ['N/A'])}. Returning empty features.")
        # Return the structure .map() expects: a dictionary of empty lists for all expected keys
        return {key: [] for key in ["input_ids", "attention_mask", "token_type_ids", "example_id", "offset_mapping"]}

    tokenized_output = tokenizer(
        valid_inputs_for_tokenizer_q,
        valid_inputs_for_tokenizer_c,
        truncation="only_second" if pad_on_right else "only_first",
        max_length=max_seq_len,
        stride=doc_stride,
        return_overflowing_tokens=True,
        return_offsets_mapping=True,
        padding="max_length",
    )

    # Robustness check and fix for tokenizer outputs
    keys_to_fix = ["input_ids", "attention_mask", "token_type_ids"]
    pad_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0
    cls_id = tokenizer.cls_token_id if tokenizer.cls_token_id is not None else 101  # common default
    sep_id = tokenizer.sep_token_id if tokenizer.sep_token_id is not None else 102  # common default
    for key in keys_to_fix:
        if key in tokenized_output:
            for i in range(len(tokenized_output[key])):  # iterate over each feature's list for this key
                feature_list = tokenized_output[key][i]
                if feature_list is None:  # the entire list for this feature is None
                    logger.warning(f"Tokenizer produced None for '{key}' at feature index {i}. Replacing with default.")
                    if key == "input_ids":
                        default_seq = [cls_id, sep_id] + [pad_id] * (max_seq_len - 2)
                        tokenized_output[key][i] = default_seq[:max_seq_len]
                    elif key == "attention_mask":
                        default_mask = [1, 1] + [0] * (max_seq_len - 2)
                        tokenized_output[key][i] = default_mask[:max_seq_len]
                    elif key == "token_type_ids":
                        tokenized_output[key][i] = [0] * max_seq_len
                elif not all(isinstance(x, int) for x in feature_list):  # check for non-integers (like None)
                    logger.warning(f"Tokenizer produced non-integers in '{key}' at feature index {i}: {str(feature_list)[:100]}... Fixing.")
                    default_val = pad_id if key == "input_ids" else 0
                    tokenized_output[key][i] = [default_val if not isinstance(x, int) else x for x in feature_list]

    processed_features = []
    num_generated_features = len(tokenized_output["input_ids"])

    # The inputs passed to the tokenizer were valid_inputs_for_tokenizer_q/c, which may be a
    # filtered subset of the batch. `overflow_to_sample_mapping` maps each generated feature
    # back to an index in that filtered input list; original_indices_for_valid_inputs then maps
    # that index back to the position in the original `examples` batch.
    overflow_mapping = tokenized_output.pop("overflow_to_sample_mapping")

    for i in range(num_generated_features):
        feature = {}
        # Map the index from the tokenizer's output (which is based on the valid inputs)
        # back to the index in the original `examples` batch.
        idx_in_valid_inputs = overflow_mapping[i]
        original_example_batch_index = original_indices_for_valid_inputs[idx_in_valid_inputs]

        feature["input_ids"] = tokenized_output["input_ids"][i]
        if "attention_mask" in tokenized_output:
            feature["attention_mask"] = tokenized_output["attention_mask"][i]
        if "token_type_ids" in tokenized_output:
            feature["token_type_ids"] = tokenized_output["token_type_ids"][i]
        feature["example_id"] = examples["id"][original_example_batch_index]

        # Keep offsets only for tokens that belong to the context; the rest become None
        # so post-processing can ignore them.
        current_offset_mapping = tokenized_output["offset_mapping"][i]
        sequence_ids = tokenized_output.sequence_ids(i)
        context_idx_in_pair = 1 if pad_on_right else 0
        feature["offset_mapping"] = [
            offset if sequence_ids is not None and k < len(sequence_ids) and sequence_ids[k] == context_idx_in_pair else None
            for k, offset in enumerate(current_offset_mapping)
        ]
        processed_features.append(feature)

    final_batch = {}
    if not processed_features:
        logger.warning(f"No features generated for example IDs: {examples.get('id', ['N/A'])}. Returning empty structure.")
        # Return the structure .map() expects: a dict whose keys are column names and whose values are empty lists
        for key_to_ensure in ['input_ids', 'attention_mask', 'token_type_ids', 'example_id', 'offset_mapping']:
            final_batch[key_to_ensure] = []
        return final_batch

    # 1. Convert processed_features (a list of dicts) into final_batch (a dict of lists)
    for key in processed_features[0].keys():  # assume all feature dicts share the same keys
        final_batch[key] = [feature[key] for feature in processed_features]

    # 2. Robustness check and fix for the fields that will be converted to tensors
    keys_to_fix_for_tensor_conversion = ["input_ids", "attention_mask", "token_type_ids"]
    pad_token_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0
    cls_token_id = tokenizer.cls_token_id if tokenizer.cls_token_id is not None else 101
    sep_token_id = tokenizer.sep_token_id if tokenizer.sep_token_id is not None else 102
    for key in keys_to_fix_for_tensor_conversion:
        if key in final_batch:
            # final_batch[key] is a list of lists, e.g. [[ids_for_feature1], [ids_for_feature2], ...]
            corrected_list_of_lists = []
            for i, single_feature_list in enumerate(final_batch[key]):
                if single_feature_list is None:
                    logger.warning(f"Feature list for '{key}' at index {i} is None. Replacing with default for max_seq_len {max_seq_len}.")
                    if key == "input_ids":
                        default_seq = [cls_token_id, sep_token_id] + [pad_token_id] * (max_seq_len - 2)
                        corrected_list_of_lists.append(default_seq[:max_seq_len])
                    elif key == "attention_mask":
                        default_mask = [1, 1] + [0] * (max_seq_len - 2)
                        corrected_list_of_lists.append(default_mask[:max_seq_len])
                    elif key == "token_type_ids":
                        corrected_list_of_lists.append([0] * max_seq_len)
                elif not all(isinstance(x, int) for x in single_feature_list):
                    logger.warning(f"Feature list for '{key}' at index {i} contains non-integers: {str(single_feature_list)[:50]}... Fixing Nones.")
                    default_val = pad_token_id if key == "input_ids" else 0
                    fixed_list = [default_val if not isinstance(x, int) else x for x in single_feature_list]
                    corrected_list_of_lists.append(fixed_list)
                else:
                    corrected_list_of_lists.append(single_feature_list)  # the list is already valid
            final_batch[key] = corrected_list_of_lists

    # Optionally log the corrected final_batch structure before returning:
    # logger.debug(f"Returning final_batch from prepare_features: { {k: str(v)[:200] + '...' for k, v in final_batch.items()} }")
    return final_batch
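
# The final_batch returned above is a dict of parallel lists, one entry per generated
# feature (values below are illustrative, not produced by this repo):
#     {
#         "input_ids":      [[101, ..., 102, 0, ...], ...],   # each padded to max_seq_len
#         "attention_mask": [[1, 1, ..., 0], ...],
#         "token_type_ids": [[0, ..., 1, ..., 0], ...],
#         "example_id":     ["temp_id", ...],
#         "offset_mapping": [[None, (0, 1), (1, 2), ..., None], ...],  # None outside the context
#     }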

# postprocess_qa_predictions is imported from utils_qa.py at the top of this file;
# make sure utils_qa.py is available in the Space's environment.


# --- Gradio interface function ---
def two_stage_qa(question, candidate_paragraphs_str, max_seq_len_mc=512, max_seq_len_qa=384, doc_stride_qa=128, n_best_size=20, max_answer_length=100):
    if not models_loaded_successfully:
        return f"錯誤: {error_message}", "N/A", "N/A"
    if not question.strip() or not candidate_paragraphs_str.strip():
        return "錯誤: 問題和候選段落不能為空。", "N/A", "N/A"

    # Stage 1: select the most relevant paragraph
    selected_paragraph, selected_idx = select_relevant_paragraph_gradio(
        question, candidate_paragraphs_str, selector_model, tokenizer, device, max_seq_len_mc
    )
    if selected_idx == -1:  # paragraph selection failed
        return f"段落選擇出錯: {selected_paragraph}", "N/A", selected_paragraph

    # Stage 2: extract the answer from the selected paragraph
    # Prepare the QA features
    qa_features_dataset = prepare_features_for_qa_inference_gradio(
        "temp_id", question, selected_paragraph, tokenizer, max_seq_len_qa, doc_stride_qa
    )
    if len(qa_features_dataset) == 0:
        return "錯誤: 無法為選定段落生成QA特徵 (可能段落太短或內容問題)。", f"選中的段落 (索引 {selected_idx}):\n{selected_paragraph}", "N/A"

    # Build a DataLoader over the model inputs only: default_data_collator cannot
    # batch the string example_id or offset_mapping (which contains None), so those
    # columns stay on qa_features_dataset for post-processing and are dropped here.
    qa_features_for_model = qa_features_dataset.remove_columns(
        [c for c in ["example_id", "offset_mapping"] if c in qa_features_dataset.column_names]
    )
    qa_dataloader = DataLoader(
        qa_features_for_model, collate_fn=default_data_collator, batch_size=8  # a small batch size is enough
    )

    all_start_logits = []
    all_end_logits = []
    for batch in qa_dataloader:
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.no_grad():
            outputs_qa = qa_model(**batch)
        all_start_logits.append(outputs_qa.start_logits.cpu().numpy())
        all_end_logits.append(outputs_qa.end_logits.cpu().numpy())

    if not all_start_logits:
        return "錯誤: QA模型沒有產生logits。", f"選中的段落 (索引 {selected_idx}):\n{selected_paragraph}", "N/A"

    start_logits_np = np.concatenate(all_start_logits, axis=0)
    end_logits_np = np.concatenate(all_end_logits, axis=0)

    # postprocess_qa_predictions needs the original example data:
    # it expects a Dataset that contains an "answers" field.
    def add_empty_answers(example):
        example["answers"] = {"text": [], "answer_start": []}
        return example

    # Temporary dataset used only for post-processing
    original_example_for_postproc = {"id": ["temp_id"], "question": [question], "context": [selected_paragraph]}
    original_dataset_for_postproc = Dataset.from_dict(original_example_for_postproc).map(add_empty_answers)

    # Post-processing (make sure postprocess_qa_predictions is available)
    predictions_dict = postprocess_qa_predictions(
        examples=original_dataset_for_postproc,  # the original Dataset with context and empty answers
        features=qa_features_dataset,  # the Dataset with offset_mapping and example_id
        predictions=(start_logits_np, end_logits_np),
        version_2_with_negative=False,
        n_best_size=n_best_size,
        max_answer_length=max_answer_length,
        null_score_diff_threshold=0.0,
        output_dir=None,
        prefix="gradio_predict",
        is_world_process_zero=True
    )
    final_answer = predictions_dict.get("temp_id", "未能提取答案。")
    return final_answer, f"選中的段落 (索引 {selected_idx}):\n{selected_paragraph}", predictions_dict
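
# Example of calling the pipeline directly, outside Gradio (a minimal sketch;
# assumes the models above loaded successfully):
#     answer, selected, raw_predictions = two_stage_qa(
#         "世界最高峰是什麼?",
#         "珠穆朗瑪峰是世界海拔最高的山峰。\n喬戈里峰是世界第二高峰。",
#     )
#     print(answer)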

# --- Build the Gradio interface ---
# Default question and paragraph content
DEFAULT_QUESTION = "世界最高峰是什麼?"
DEFAULT_PARAGRAPHS = (
    "珠穆朗瑪峰是喜馬拉雅山脈的主峰,位於中國與尼泊爾邊界上,是世界海拔最高的山峰。\n"
    "喬戈里峰,又稱K2,是喀喇崑崙山脈的主峰,海拔8611米,是世界第二高峰,位於中國與巴基斯坦邊界。\n"
    "干城章嘉峰位於喜馬拉雅山脈中段尼泊爾和印度邊界線上,海拔8586米,為世界第三高峰。\n"
    "洛子峰,海拔8516米,為世界第四高峰,位於珠穆朗瑪峰以南約3公里處,同屬喜馬拉雅山脈。"
)

iface = gr.Interface(
    fn=two_stage_qa,  # the two-stage QA function defined above
    inputs=[
        gr.Textbox(
            lines=2,
            placeholder="輸入您的問題...",
            label="問題 (Question)",
            value=DEFAULT_QUESTION  # default value for the question
        ),
        gr.Textbox(
            lines=10,
            placeholder="在此處輸入候選段落,每段一行...",
            label="候選段落 (Candidate Paragraphs - One per line)",
            value=DEFAULT_PARAGRAPHS  # default value for the paragraphs
        )
    ],
    outputs=[
        gr.Textbox(label="預測答案 (Predicted Answer)"),
        gr.Textbox(label="選中的相關段落 (Selected Relevant Paragraph)"),
        gr.JSON(label="原始預測字典 (Raw Predictions Dict - for debugging)")
    ],
    title="兩階段中文抽取式問答系統",
    description="輸入一個問題和多個候選段落(每行一個段落)。系統會先選擇最相關的段落,然後從中抽取答案。",
    allow_flagging="never"  # or your preferred flagging setting
)

if __name__ == "__main__":
    if models_loaded_successfully:  # only launch once the models have loaded
        iface.launch()
    else:
        print(f"The Gradio app could not start because model loading failed: {error_message if 'error_message' in locals() else 'unknown error'}")