# Hugging Face Spaces page header (status: Running)
import gradio as gr | |
from transformers import AutoConfig | |
from typing import Dict, Any, Tuple, Optional | |
import math | |
def get_model_config(model_id: str) -> AutoConfig:
    """Load the configuration object for a model via ``AutoConfig``.

    Args:
        model_id: Model identifier passed straight to
            ``AutoConfig.from_pretrained``.

    Returns:
        The configuration returned by ``AutoConfig.from_pretrained`` for the
        ``main`` revision.

    Raises:
        Exception: If the configuration cannot be loaded for any reason. The
            underlying error is attached as ``__cause__`` so the real failure
            (network error, missing repo, bad config, ...) stays visible in
            tracebacks.
    """
    try:
        # trust_remote_code=True is kept so custom model types can be loaded.
        # NOTE(review): model_id is user-supplied in this app and this flag
        # permits repository-provided code to run while loading the config;
        # confirm that is acceptable for the deployment environment.
        return AutoConfig.from_pretrained(
            model_id,
            trust_remote_code=True,
            revision="main",
        )
    except Exception as e:
        # Chain the original exception instead of discarding it.
        raise Exception(f"无法获取模型配置: {str(e)}") from e
def analyze_attention_mechanism(config: "AutoConfig") -> Dict[str, Any]:
    """Classify the attention variant described by a model configuration.

    Only attributes of ``config`` are inspected; missing or ``None``-valued
    attributes are treated as "feature not present".

    Args:
        config: Configuration object exposing (some of) ``model_type``,
            ``architectures``, ``num_attention_heads`` / ``n_head``,
            ``num_key_value_heads``, ``kv_lora_rank``, ``q_lora_rank``,
            ``sliding_window``, ``attention_window_size`` and
            ``use_sliding_window``.

    Returns:
        A dict with the boolean flags ``uses_gqa``, ``uses_mla`` and
        ``uses_sliding_window`` plus a human-readable ``attention_type``.
    """
    model_type = (getattr(config, "model_type", "") or "").lower()
    # ``architectures`` may be present but None; normalise to an empty list so
    # the iteration below cannot raise TypeError.
    architectures = getattr(config, "architectures", None) or []

    attention_info = {
        "uses_gqa": False,
        "uses_mla": False,
        "uses_sliding_window": False,
        "attention_type": "Multi-Head Attention (MHA)",
    }

    # --- GQA (Grouped Query Attention): fewer KV heads than query heads ---
    num_attention_heads = (
        getattr(config, "num_attention_heads", None)
        or getattr(config, "n_head", None)
        or 0
    )
    # A missing or None KV-head count means "same as the query heads" (MHA).
    num_key_value_heads = getattr(config, "num_key_value_heads", None) or num_attention_heads
    if 0 < num_key_value_heads < num_attention_heads:
        attention_info["uses_gqa"] = True
        attention_info["attention_type"] = "Grouped Query Attention (GQA)"

    # --- MLA (Multi-head Latent Attention): DeepSeek-style low-rank KV ---
    is_deepseek = "deepseek" in model_type or any(
        "deepseek" in str(arch).lower() for arch in architectures
    )
    has_lora_rank = (
        getattr(config, "kv_lora_rank", None) is not None
        or getattr(config, "q_lora_rank", None) is not None
    )
    if is_deepseek and has_lora_rank:
        attention_info["uses_mla"] = True
        attention_info["attention_type"] = "Multi-head Latent Attention (MLA)"

    # --- Sliding-window attention ---
    # The attribute can exist with value None (disabled), and some configs
    # carry an explicit ``use_sliding_window`` switch, so look at the values
    # rather than mere attribute presence.
    window = getattr(config, "sliding_window", None)
    if window is None:
        window = getattr(config, "attention_window_size", None)
    window_enabled = getattr(config, "use_sliding_window", True) is not False
    if window is not None and window_enabled:
        attention_info["uses_sliding_window"] = True

    # --- Model-family specific labels ---
    uses_gqa = attention_info["uses_gqa"]
    if "llama" in model_type:
        attention_info["attention_type"] = "RoPE + GQA" if uses_gqa else "RoPE + MHA"
    elif "mistral" in model_type and attention_info["uses_sliding_window"]:
        # Only advertise "Sliding Window" when the config actually enables it,
        # so the label never contradicts the uses_sliding_window flag.
        attention_info["attention_type"] = (
            "Sliding Window + GQA" if uses_gqa else "Sliding Window + MHA"
        )
    elif "qwen" in model_type:
        attention_info["attention_type"] = (
            "QWen Attention (GQA)" if uses_gqa else "QWen Attention"
        )

    return attention_info
def _format_bytes(num_bytes) -> str:
    """Render a byte count with binary units (B, KB, MB or GB)."""
    if num_bytes < 1024:
        return f"{num_bytes} B"
    if num_bytes < 1024**2:
        return f"{num_bytes / 1024:.2f} KB"
    if num_bytes < 1024**3:
        return f"{num_bytes / (1024**2):.2f} MB"
    return f"{num_bytes / (1024**3):.2f} GB"


def _first_config_value(config, names, default=0):
    """Return the first non-None attribute among ``names``, else ``default``."""
    for name in names:
        value = getattr(config, name, None)
        if value is not None:
            return value
    return default


def calculate_kv_cache_size(
    config: "AutoConfig", sequence_length: int = 2048, batch_size: int = 1
) -> Dict[str, Any]:
    """Estimate the KV-cache footprint described by a model configuration.

    The estimate assumes FP16 storage (2 bytes per element).

    * Standard attention (MHA/GQA): each layer stores a key and a value of
      ``head_dim * num_key_value_heads`` elements per token.
    * MLA (``kv_lora_rank > 0``): each layer stores the compressed KV latent
      (``kv_lora_rank`` elements) plus the decoupled RoPE key
      (``qk_rope_head_dim`` elements, 0 if the config does not define it)
      per token. There is no separate K/V doubling in this case.

    Args:
        config: Configuration object; attribute names of several model
            families are accepted (``num_hidden_layers``/``n_layer``/
            ``num_layers``, ``num_attention_heads``/``n_head``,
            ``hidden_size``/``n_embd``/``d_model``).
        sequence_length: Number of cached tokens per sequence. Coerced to
            ``int`` because UI widgets may deliver floats.
        batch_size: Number of sequences. Coerced to ``int``.

    Returns:
        A dict with the raw parameters (``num_layers``,
        ``num_attention_heads``, ``num_key_value_heads``, ``head_dim``,
        ``hidden_size``, ``effective_kv_dim``, ``kv_lora_rank``), the
        formatted sizes ``kv_size_per_token`` (all layers, one token) and
        ``total_kv_cache``, and the numeric ``total_kv_cache_bytes``.
    """
    sequence_length = int(sequence_length)
    batch_size = int(batch_size)

    # Basic dimensions, tolerating both alternative names and None values.
    num_layers = _first_config_value(config, ("num_hidden_layers", "n_layer", "num_layers"))
    num_attention_heads = _first_config_value(config, ("num_attention_heads", "n_head"))
    num_key_value_heads = _first_config_value(
        config, ("num_key_value_heads",), default=num_attention_heads
    )
    hidden_size = _first_config_value(config, ("hidden_size", "n_embd", "d_model"))

    # Prefer an explicit head_dim: some architectures decouple it from
    # hidden_size / num_attention_heads, so the quotient is only a fallback.
    head_dim = getattr(config, "head_dim", None)
    if not head_dim:
        head_dim = hidden_size // num_attention_heads if num_attention_heads > 0 else 0

    kv_lora_rank = getattr(config, "kv_lora_rank", None) or 0
    if kv_lora_rank > 0:
        # MLA: cache the compressed latent plus the shared RoPE key.
        qk_rope_head_dim = getattr(config, "qk_rope_head_dim", None) or 0
        effective_kv_dim = kv_lora_rank
        elements_per_token_per_layer = kv_lora_rank + qk_rope_head_dim
    else:
        # MHA / GQA: one key and one value vector per KV head.
        effective_kv_dim = head_dim * num_key_value_heads
        elements_per_token_per_layer = 2 * effective_kv_dim

    bytes_per_element = 2  # FP16
    kv_size_per_token_per_layer = elements_per_token_per_layer * bytes_per_element
    total_kv_cache_bytes = (
        kv_size_per_token_per_layer * num_layers * sequence_length * batch_size
    )

    return {
        "num_layers": num_layers,
        "num_attention_heads": num_attention_heads,
        "num_key_value_heads": num_key_value_heads,
        "head_dim": head_dim,
        "hidden_size": hidden_size,
        "effective_kv_dim": effective_kv_dim,
        "kv_size_per_token": _format_bytes(kv_size_per_token_per_layer * num_layers),
        "total_kv_cache": _format_bytes(total_kv_cache_bytes),
        "total_kv_cache_bytes": total_kv_cache_bytes,
        "kv_lora_rank": kv_lora_rank,
    }
def analyze_model(model_id: str, sequence_length: int = 2048, batch_size: int = 1) -> str:
    """Build a Markdown report on a model's attention layout and KV cache.

    Inputs are validated and normalised first, then the configuration is
    loaded with ``get_model_config`` and passed to
    ``analyze_attention_mechanism`` and ``calculate_kv_cache_size``.

    Args:
        model_id: Model identifier; surrounding whitespace is ignored.
        sequence_length: Tokens per sequence. Coerced to ``int`` (UI number
            widgets may deliver floats) and must be at least 1.
        batch_size: Number of sequences. Coerced to ``int`` and must be at
            least 1.

    Returns:
        The Markdown report, or a single line starting with ``❌ 分析失败``
        describing the problem. This function never raises: it is the UI
        boundary, so every failure is turned into a message.
    """
    try:
        # --- Input validation / normalisation ---
        model_id = (model_id or "").strip()
        if not model_id:
            raise ValueError("模型ID不能为空")
        sequence_length = int(sequence_length)
        batch_size = int(batch_size)
        if sequence_length < 1 or batch_size < 1:
            raise ValueError("序列长度和批量大小必须为正整数")

        # --- Gather data ---
        config = get_model_config(model_id)
        attention_info = analyze_attention_mechanism(config)
        kv_info = calculate_kv_cache_size(config, sequence_length, batch_size)

        def yes_no(flag) -> str:
            return "✅ 是" if flag else "❌ 否"

        # --- Fixed part of the report ---
        # Built from a list so the text does not depend on source indentation;
        # the empty first/last entries produce the leading/trailing newline.
        report_lines = [
            "",
            f"## 模型信息分析 - {model_id}",
            "### 基本参数",
            f"- **模型类型**: {getattr(config, 'model_type', 'Unknown')}",
            f"- **层数**: {kv_info['num_layers']}",
            f"- **隐藏层大小**: {kv_info['hidden_size']}",
            f"- **注意力头数**: {kv_info['num_attention_heads']}",
            f"- **KV头数**: {kv_info['num_key_value_heads']}",
            f"- **每个头的维度**: {kv_info['head_dim']}",
            "### 注意力机制优化",
            f"- **注意力类型**: {attention_info['attention_type']}",
            f"- **使用GQA**: {yes_no(attention_info['uses_gqa'])}",
            f"- **使用MLA**: {yes_no(attention_info['uses_mla'])}",
            f"- **滑动窗口**: {yes_no(attention_info['uses_sliding_window'])}",
            "### KV Cache 存储分析",
            f"- **序列长度**: {sequence_length}",
            f"- **批量大小**: {batch_size}",
            f"- **有效KV维度**: {kv_info['effective_kv_dim']}",
            f"- **每个token的KV存储**: {kv_info['kv_size_per_token']}",
            f"- **总KV Cache大小**: {kv_info['total_kv_cache']}",
            "### 优化效果分析",
            "",
        ]
        result = "\n".join(report_lines)

        # --- Optimisation notes ---
        if attention_info['uses_gqa']:
            # uses_gqa implies 0 < KV heads < query heads, so the division is safe.
            original_kv_heads = kv_info['num_attention_heads']
            actual_kv_heads = kv_info['num_key_value_heads']
            memory_reduction = (1 - actual_kv_heads / original_kv_heads) * 100
            result += f"- **GQA内存节省**: {memory_reduction:.1f}% (KV头数从{original_kv_heads}减少到{actual_kv_heads})\n"

        if attention_info['uses_mla']:
            result += f"- **MLA压缩**: KV维度被压缩到{kv_info['kv_lora_rank']}维\n"

        # --- Memory advice, by total cache size in GiB ---
        total_gb = kv_info['total_kv_cache_bytes'] / (1024**3)
        if total_gb > 8:
            result += f"\n⚠️ **内存警告**: KV Cache需要{total_gb:.2f}GB内存,建议使用高端GPU"
        elif total_gb > 4:
            result += f"\n💡 **内存提示**: KV Cache需要{total_gb:.2f}GB内存,中等配置可运行"
        else:
            result += f"\n✅ **内存友好**: KV Cache仅需{total_gb:.2f}GB内存"

        return result
    except Exception as e:
        return f"❌ 分析失败: {str(e)}"
# Build the Gradio interface
def create_interface():
    """Assemble the Gradio Blocks UI for the KV cache analyzer.

    The layout is: a model-ID textbox plus sequence-length and batch-size
    number inputs in one row, an analyze button, a Markdown output area, a
    list of example models, and a static explanation section. Both the
    button and the examples call ``analyze_model``.

    Returns:
        The constructed ``gr.Blocks`` object (not yet launched).
    """
    with gr.Blocks(title="Hugging Face模型KV Cache分析器", theme=gr.themes.Soft()) as iface:
        gr.Markdown("# 🤗 Hugging Face模型KV Cache分析器")
        gr.Markdown("输入模型ID来分析其KV cache大小和注意力机制优化情况")

        with gr.Row():
            with gr.Column(scale=3):
                model_input = gr.Textbox(
                    label="模型ID",
                    placeholder="例如: microsoft/DialoGPT-medium, meta-llama/Llama-2-7b-hf",
                    value="microsoft/DialoGPT-medium",
                )
            with gr.Column(scale=1):
                # precision=0: sequence length is an integer quantity, so have
                # the widget round and deliver an int instead of a float.
                seq_len_input = gr.Number(
                    label="序列长度",
                    value=2048,
                    minimum=1,
                    maximum=131072,
                    precision=0,
                )
            with gr.Column(scale=1):
                # precision=0 for the same reason as the sequence length.
                batch_size_input = gr.Number(
                    label="批量大小",
                    value=1,
                    minimum=1,
                    maximum=128,
                    precision=0,
                )

        analyze_btn = gr.Button("🔍 分析模型", variant="primary", size="lg")
        output = gr.Markdown(label="分析结果")

        # A few example models; each row is [model_id, sequence_length, batch_size].
        gr.Markdown("### 💡 热门模型示例")
        example_models = [
            ["deepseek-ai/DeepSeek-V3-0324", 32768, 1],
            ["Qwen/Qwen3-8B", 32768, 1],
        ]
        gr.Examples(
            examples=example_models,
            inputs=[model_input, seq_len_input, batch_size_input],
            outputs=output,
            fn=analyze_model,
            cache_examples=False,
        )

        analyze_btn.click(
            fn=analyze_model,
            inputs=[model_input, seq_len_input, batch_size_input],
            outputs=output,
        )

        # Static help text. Joined from lines so the Markdown never picks up
        # source indentation; the empty first/last entries keep the leading
        # and trailing newline of the text.
        help_lines = [
            "",
            "### 📖 说明",
            "- **GQA**: Grouped Query Attention,通过减少KV头数来节省内存",
            "- **MLA**: Multi-head Latent Attention,通过低秩分解压缩KV cache",
            "- **滑动窗口**: 限制注意力范围来减少计算和内存使用",
            "- KV Cache大小计算基于FP16精度 (每个元素2字节)",
            "- 使用 `transformers.AutoConfig` 获取配置,支持自定义模型",
            "### 🛠️ 安装依赖",
            "```bash",
            "pip install gradio transformers torch",
            "```",
            "",
        ]
        gr.Markdown("\n".join(help_lines))

    return iface
if __name__ == "__main__":
    # Script entry point: build the Blocks UI and start serving it.
    create_interface().launch(share=True)