import gradio as gr
import torch
import os
from huggingface_hub import hf_hub_download
from transformers import AutoTokenizer
import torch.nn as nn
import torch.nn.functional as F
"""
For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference
"""
# ================ Step 1: Define the model architecture ================
class GELU(nn.Module):
def __init__(self):
super().__init__()
def forward(self, x):
return 0.5 * x * (1 + torch.tanh(
torch.sqrt(torch.tensor(2.0 / torch.pi)) *
(x + 0.044715 * torch.pow(x, 3))
))
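# Illustrative sketch (not part of the original app): the GELU above implements the
# tanh approximation used by GPT-2. On PyTorch >= 1.12 it should closely match the
# built-in F.gelu with approximate="tanh"; this hypothetical helper, never called by
# the app, can be used to sanity-check that numerically.
def _check_gelu_matches_builtin(atol=1e-6):
    x = torch.linspace(-3.0, 3.0, steps=101)
    ours = GELU()(x)
    builtin = F.gelu(x, approximate="tanh")
    return torch.allclose(ours, builtin, atol=atol)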
class FeedForward(nn.Module):
def __init__(self, cfg):
super().__init__()
self.layers = nn.Sequential(
nn.Linear(cfg["emb_dim"], 4 * cfg["emb_dim"]),
GELU(),
nn.Linear(4 * cfg["emb_dim"], cfg["emb_dim"]),
)
def forward(self, x):
return self.layers(x)
class MultiHeadAttention(nn.Module):
def __init__(self, d_in, d_out,
context_length, dropout, num_heads, qkv_bias=False):
super().__init__()
assert (d_out % num_heads == 0), \
"d_out must be divisible by num_heads"
self.d_out = d_out
self.num_heads = num_heads
self.head_dim = d_out // num_heads
self.W_query = nn.Linear(d_in, d_out, bias=qkv_bias)
self.W_key = nn.Linear(d_in, d_out, bias=qkv_bias)
self.W_value = nn.Linear(d_in, d_out, bias=qkv_bias)
self.out_proj = nn.Linear(d_out, d_out)
self.dropout_p = dropout
def forward(self, x):
b, num_tokens, d_in = x.shape
keys = self.W_key(x)
queries = self.W_query(x)
values = self.W_value(x)
# Transpose into [B, num_heads, num_tokens, head_dim] for SDPA
keys = keys.view(b, num_tokens, self.num_heads, self.head_dim).transpose(1, 2)
values = values.view(b, num_tokens, self.num_heads, self.head_dim).transpose(1, 2)
queries = queries.view(b, num_tokens, self.num_heads, self.head_dim).transpose(1, 2)
# Use F.scaled_dot_product_attention
context_vec = F.scaled_dot_product_attention(
queries, keys, values,
attn_mask=None,
dropout_p=self.dropout_p if self.training else 0.0,
is_causal=True
)
# Transpose back to [B, num_tokens, num_heads * head_dim] = [B, T, d_out]
context_vec = context_vec.transpose(1, 2).contiguous().view(b, num_tokens, self.d_out)
# Apply output projection
context_vec = self.out_proj(context_vec)
return context_vec
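# Illustrative sketch (not part of the original app): with is_causal=True,
# F.scaled_dot_product_attention applies the causal mask internally, which is why the
# class above never materializes a context_length-sized mask buffer. This hypothetical
# helper, never called by the app, just demonstrates the expected input/output shapes.
def _demo_attention_shapes():
    attn = MultiHeadAttention(d_in=768, d_out=768, context_length=512,
                              dropout=0.1, num_heads=12)
    x = torch.randn(2, 16, 768)   # [batch, num_tokens, d_in]
    out = attn(x)                 # [batch, num_tokens, d_out]
    return out.shape              # torch.Size([2, 16, 768])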
class LayerNorm(nn.Module):
def __init__(self, emb_dim):
super().__init__()
self.eps = 1e-5
self.scale = nn.Parameter(torch.ones(emb_dim))
self.shift = nn.Parameter(torch.zeros(emb_dim))
def forward(self, x):
mean = x.mean(dim=-1, keepdim=True)
var = x.var(dim=-1, keepdim=True, unbiased=False)
norm_x = (x - mean) / torch.sqrt(var + self.eps)
return self.scale * norm_x + self.shift
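# Illustrative sketch (not part of the original app): because the LayerNorm above uses
# the biased variance (unbiased=False) and ones/zeros affine parameters, at
# initialization it should numerically match torch.nn.LayerNorm with the same eps.
# This hypothetical helper, never called by the app, is a quick comparison.
def _check_layernorm_matches_builtin(emb_dim=8, atol=1e-5):
    x = torch.randn(2, 4, emb_dim)
    ours = LayerNorm(emb_dim)(x)
    builtin = nn.LayerNorm(emb_dim, eps=1e-5)(x)
    return torch.allclose(ours, builtin, atol=atol)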
class TransformerBlock(nn.Module):
def __init__(self, cfg):
super().__init__()
self.att = MultiHeadAttention(
d_in=cfg["emb_dim"],
d_out=cfg["emb_dim"],
context_length=cfg["context_length"],
num_heads=cfg["n_heads"],
dropout=cfg["drop_rate"],
qkv_bias=cfg["qkv_bias"])
self.ff = FeedForward(cfg)
self.norm1 = LayerNorm(cfg["emb_dim"])
self.norm2 = LayerNorm(cfg["emb_dim"])
self.drop_shortcut = nn.Dropout(cfg["drop_rate"])
def forward(self, x):
shortcut = x
x = self.norm1(x)
x = self.att(x)
x = self.drop_shortcut(x)
x = x + shortcut
shortcut = x
x = self.norm2(x)
x = self.ff(x)
x = self.drop_shortcut(x)
x = x + shortcut
return x
class GPTModel(nn.Module):
def __init__(self, cfg):
super().__init__()
self.tok_emb = nn.Embedding(cfg["vocab_size"], cfg["emb_dim"])
self.pos_emb = nn.Embedding(cfg["context_length"], cfg["emb_dim"])
self.drop_emb = nn.Dropout(cfg["drop_rate"])
self.trf_blocks = nn.Sequential(
*[TransformerBlock(cfg) for _ in range(cfg["n_layers"])])
self.final_norm = LayerNorm(cfg["emb_dim"])
self.out_head = nn.Linear(
cfg["emb_dim"], cfg["vocab_size"], bias=False
)
def forward(self, in_idx):
batch_size, seq_len = in_idx.shape
tok_embeds = self.tok_emb(in_idx)
pos_embeds = self.pos_emb(
torch.arange(seq_len, device=in_idx.device)
)
x = tok_embeds + pos_embeds
x = self.drop_emb(x)
x = self.trf_blocks(x)
x = self.final_norm(x)
logits = self.out_head(x)
return logits
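# Illustrative sketch (not part of the original app): a minimal forward pass with a
# deliberately tiny, hypothetical config, showing that the model maps token IDs of
# shape [batch, seq_len] to logits of shape [batch, seq_len, vocab_size].
def _demo_gpt_forward():
    tiny_cfg = {
        "vocab_size": 100, "context_length": 32, "emb_dim": 16,
        "n_heads": 2, "n_layers": 1, "drop_rate": 0.0, "qkv_bias": False,
    }
    tiny_model = GPTModel(tiny_cfg)
    tiny_model.eval()
    idx = torch.randint(0, 100, (2, 8))   # [batch=2, seq_len=8]
    with torch.no_grad():
        logits = tiny_model(idx)
    return logits.shape                   # torch.Size([2, 8, 100])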
# ================ Step 2: Define the text generation function ================
def generate_text_simple(model, idx, max_new_tokens, context_size, temperature=1.0, top_k=None):
    """
    Text generation with top-k sampling and temperature scaling.
    Args:
        model: the language model
        idx: token IDs of the input sequence
        max_new_tokens: maximum number of new tokens to generate
        context_size: size of the context window
        temperature: temperature parameter controlling sampling randomness (higher = more random)
        top_k: consider only the top_k most probable tokens; None or 0 means consider all tokens
    Returns:
        The extended sequence of token IDs.
    """
    device = idx.device
    for _ in range(max_new_tokens):
        # Crop the current context
        idx_cond = idx[:, -context_size:]
        with torch.no_grad():
            # Get the model's logits for the next token
            logits = model(idx_cond)
        # Only the prediction at the last position matters
        logits = logits[:, -1, :]
        # Apply temperature scaling
        if temperature > 0:
            logits = logits / temperature
        # Apply top-k filtering
        if top_k is not None and top_k > 0:
            # Get the k largest logits
            v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
            # The threshold is the k-th largest value
            threshold = v[..., [-1]]
            # Push everything below the threshold to -inf
            logits = torch.where(logits < threshold,
                                 torch.full_like(logits, float('-inf')),
                                 logits)
        # Convert logits to probabilities with softmax
        probs = torch.softmax(logits, dim=-1)
        # Sample from the probability distribution
        if temperature > 0:
            # Random sampling
            idx_next = torch.multinomial(probs, num_samples=1)
        else:
            # With temperature 0, pick the most probable token (equivalent to argmax)
            idx_next = torch.argmax(probs, dim=-1, keepdim=True)
        # Append the newly generated token to the sequence
        idx = torch.cat((idx, idx_next), dim=1)
    return idx
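# Illustrative sketch (not part of the original app): the top-k filtering step above,
# isolated on a toy logits vector so the thresholding is easy to inspect. With k=2,
# everything below the 2nd-largest logit is pushed to -inf before softmax.
def _demo_top_k_filtering(k=2):
    logits = torch.tensor([[1.0, 3.0, 0.5, 2.0]])
    v, _ = torch.topk(logits, k)
    threshold = v[..., [-1]]                  # the k-th largest value, here 2.0
    filtered = torch.where(logits < threshold,
                           torch.full_like(logits, float('-inf')),
                           logits)
    return torch.softmax(filtered, dim=-1)    # only indices 1 and 3 get nonzero mass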
def text_to_token_ids(text, tokenizer):
encoded = tokenizer.encode(text)
encoded_tensor = torch.tensor(encoded).unsqueeze(0)
return encoded_tensor
def token_ids_to_text(token_ids, tokenizer):
flat = token_ids.squeeze(0)
return tokenizer.decode(flat.tolist(), skip_special_tokens=True)
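# Illustrative sketch (not part of the original app): round-tripping a prompt through
# the two helpers above. For typical text the GPT-2 byte-level BPE tokenizer should
# reproduce the input exactly; the example string and this hypothetical helper (never
# called by the app) are for inspection only.
def _demo_tokenizer_roundtrip(tok, text="Hello world"):
    ids = text_to_token_ids(text, tok)        # shape [1, num_tokens]
    return token_ids_to_text(ids, tok) == text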
# ================ Step 3: Model loading and inference setup ================
# Model ID on the Hugging Face Hub
model_id = "xingyu1996/tiger-gpt2"
# Download the model weights from the Hugging Face Hub
def load_model_from_hub():
    print("Downloading model weights from the Hugging Face Hub...")
    # Download the pytorch_model.bin file
    model_file = hf_hub_download(model_id, "pytorch_model.bin")
    print(f"Model weights downloaded: {model_file}")
    # Download the config.json file
    config_file = hf_hub_download(model_id, "config.json")
    print(f"Config file downloaded: {config_file}")
    # Load the weights
    state_dict = torch.load(model_file, map_location="cpu")
    # Load the configuration
    import json
    with open(config_file, 'r') as f:
        config = json.load(f)
    # Map the Hugging Face config keys to our own config format
    my_config = {
        "vocab_size": config.get("vocab_size", 50257),
        "context_length": config.get("n_positions", 512),
        "emb_dim": config.get("n_embd", 768),
        "n_heads": config.get("n_head", 12),
        "n_layers": config.get("n_layer", 12),
        "drop_rate": config.get("resid_pdrop", 0.1),
        "qkv_bias": config.get("qkv_bias", False),
    }
    # Build the model
    model = GPTModel(my_config)
    # Strip the _orig_mod. prefix (added by torch.compile) if present in the state dict
    if any(k.startswith('_orig_mod.') for k in state_dict.keys()):
        state_dict = {k.replace('_orig_mod.', ''): v for k, v in state_dict.items()}
        print("Removed the _orig_mod. prefix from the weight keys")
    # Load the weights into the model
    try:
        model.load_state_dict(state_dict)
        print("Model weights loaded successfully!")
    except Exception as e:
        print(f"Failed to load model weights: {e}")
        # Fall back to loading whatever weights match
        model.load_state_dict(state_dict, strict=False)
        print("Weights loaded in non-strict mode; some parameters may not have been loaded.")
    model.eval()  # Set the model to evaluation mode
    return model, my_config
# Load the model and the tokenizer
print("Initializing...")
model, config = load_model_from_hub()
tokenizer = AutoTokenizer.from_pretrained("gpt2")
print("Model and tokenizer loaded!")
# ================ Step 4: Set up the Gradio interface ================
def respond(message, history, max_tokens, temperature, top_k):
    input_ids = text_to_token_ids(message, tokenizer).to("cpu")  # the Hugging Face Space may not have a GPU
    context_size = config["context_length"]
    try:
        # Generate text
        output_ids = generate_text_simple(
            model=model,
            idx=input_ids,
            max_new_tokens=max_tokens,
            context_size=context_size,
            temperature=temperature,
            top_k=top_k
        )
        # Decode the generated tokens
        full_text = token_ids_to_text(output_ids, tokenizer)
        # Strip the prompt, keeping only the generated continuation
        if full_text.startswith(message):
            generated = full_text[len(message):]
        else:
            generated = full_text
        return generated
    except Exception as e:
        print(f"Error during generation: {type(e).__name__} - {e}")
        return f"Sorry, an error occurred while generating text: {type(e).__name__}"
# Create the Gradio interface
demo = gr.ChatInterface(
    respond,
    additional_inputs=[
        gr.Slider(minimum=1, maximum=100, value=30, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.0, maximum=2.0, value=0.7, step=0.1, label="Temperature (0.0 means no randomness)"),
        gr.Slider(minimum=0, maximum=100, value=50, step=1, label="Top-K (0 means no limit)"),
    ],
    title="Tiger-GPT2 Inference Test",
    description="""Enter Chinese text and the model will generate a continuation. This demo loads the original model weights directly, so its behavior matches local inference.
**Parameter notes**:
- **Max new tokens**: the maximum number of tokens to generate
- **Temperature**: controls generation randomness; higher values are more random, and 0 always picks the most likely token
- **Top-K**: sample the next token only from the K most probable tokens; 0 means all tokens are considered
""",
)
if __name__ == "__main__":
    demo.launch()