kebeliu's picture
Update app.py
2c6b761 verified
import gradio as gr
from transformers import AutoConfig
from typing import Dict, Any, Tuple, Optional
import math
def get_model_config(model_id: str) -> AutoConfig:
"""获取模型配置信息"""
try:
# 使用transformers的AutoConfig,更加可靠
config = AutoConfig.from_pretrained(
model_id,
trust_remote_code=True, # 支持自定义模型
revision="main"
)
return config
except Exception as e:
raise Exception(f"无法获取模型配置: {str(e)}")
def analyze_attention_mechanism(config: AutoConfig) -> Dict[str, Any]:
"""分析注意力机制类型"""
model_type = getattr(config, "model_type", "").lower()
architecture = getattr(config, "architectures", [])
# 检测各种优化技术
attention_info = {
"uses_gqa": False,
"uses_mla": False,
"uses_sliding_window": False,
"attention_type": "Multi-Head Attention (MHA)"
}
# 检测GQA (Grouped Query Attention)
num_attention_heads = getattr(config, "num_attention_heads", getattr(config, "n_head", 0))
num_key_value_heads = getattr(config, "num_key_value_heads", num_attention_heads)
if num_key_value_heads < num_attention_heads and num_key_value_heads > 0:
attention_info["uses_gqa"] = True
attention_info["attention_type"] = "Grouped Query Attention (GQA)"
# 检测MLA (Multi-head Latent Attention) - 主要在DeepSeek-V2等模型中
if "deepseek" in model_type or any("deepseek" in str(arch).lower() for arch in architecture):
if hasattr(config, "kv_lora_rank") or hasattr(config, "q_lora_rank"):
attention_info["uses_mla"] = True
attention_info["attention_type"] = "Multi-head Latent Attention (MLA)"
# 检测滑动窗口注意力
if hasattr(config, "sliding_window") or hasattr(config, "attention_window_size"):
attention_info["uses_sliding_window"] = True
# 特殊模型类型检测
if "llama" in model_type:
attention_info["attention_type"] = "RoPE + GQA" if attention_info["uses_gqa"] else "RoPE + MHA"
elif "mistral" in model_type:
attention_info["attention_type"] = "Sliding Window + GQA" if attention_info["uses_gqa"] else "Sliding Window + MHA"
elif "qwen" in model_type:
attention_info["attention_type"] = "QWen Attention (GQA)" if attention_info["uses_gqa"] else "QWen Attention"
return attention_info
def calculate_kv_cache_size(config: AutoConfig, sequence_length: int = 2048, batch_size: int = 1) -> Dict[str, Any]:
"""计算KV cache大小"""
# 获取基本参数,兼容不同的参数名
num_layers = getattr(config, "num_hidden_layers", getattr(config, "n_layer", getattr(config, "num_layers", 0)))
num_attention_heads = getattr(config, "num_attention_heads", getattr(config, "n_head", 0))
num_key_value_heads = getattr(config, "num_key_value_heads", num_attention_heads)
hidden_size = getattr(config, "hidden_size", getattr(config, "n_embd", getattr(config, "d_model", 0)))
# 计算head dimension
head_dim = hidden_size // num_attention_heads if num_attention_heads > 0 else 0
# 如果是MLA,需要特殊处理
kv_lora_rank = getattr(config, "kv_lora_rank", 0)
if kv_lora_rank > 0: # MLA架构
# MLA中KV的维度被压缩
effective_kv_dim = kv_lora_rank
else:
effective_kv_dim = head_dim * num_key_value_heads
# 计算每个token的KV cache大小 (Key + Value)
# 使用FP16 (2 bytes per element)
bytes_per_element = 2
kv_size_per_token_per_layer = 2 * effective_kv_dim * bytes_per_element # K + V
# 总的KV cache大小
total_kv_cache_bytes = kv_size_per_token_per_layer * num_layers * sequence_length * batch_size
# 转换为更友好的单位
def format_bytes(bytes_val):
if bytes_val < 1024:
return f"{bytes_val} B"
elif bytes_val < 1024**2:
return f"{bytes_val/1024:.2f} KB"
elif bytes_val < 1024**3:
return f"{bytes_val/(1024**2):.2f} MB"
else:
return f"{bytes_val/(1024**3):.2f} GB"
return {
"num_layers": num_layers,
"num_attention_heads": num_attention_heads,
"num_key_value_heads": num_key_value_heads,
"head_dim": head_dim,
"hidden_size": hidden_size,
"effective_kv_dim": effective_kv_dim,
"kv_size_per_token": format_bytes(kv_size_per_token_per_layer * num_layers),
"total_kv_cache": format_bytes(total_kv_cache_bytes),
"total_kv_cache_bytes": total_kv_cache_bytes,
"kv_lora_rank": kv_lora_rank
}
def analyze_model(model_id: str, sequence_length: int = 2048, batch_size: int = 1) -> str:
"""分析模型并返回结果"""
try:
# 获取模型配置
config = get_model_config(model_id)
# 分析注意力机制
attention_info = analyze_attention_mechanism(config)
# 计算KV cache大小
kv_info = calculate_kv_cache_size(config, sequence_length, batch_size)
# 格式化输出
result = f"""
## 模型信息分析 - {model_id}
### 基本参数
- **模型类型**: {getattr(config, 'model_type', 'Unknown')}
- **层数**: {kv_info['num_layers']}
- **隐藏层大小**: {kv_info['hidden_size']}
- **注意力头数**: {kv_info['num_attention_heads']}
- **KV头数**: {kv_info['num_key_value_heads']}
- **每个头的维度**: {kv_info['head_dim']}
### 注意力机制优化
- **注意力类型**: {attention_info['attention_type']}
- **使用GQA**: {'✅ 是' if attention_info['uses_gqa'] else '❌ 否'}
- **使用MLA**: {'✅ 是' if attention_info['uses_mla'] else '❌ 否'}
- **滑动窗口**: {'✅ 是' if attention_info['uses_sliding_window'] else '❌ 否'}
### KV Cache 存储分析
- **序列长度**: {sequence_length}
- **批量大小**: {batch_size}
- **有效KV维度**: {kv_info['effective_kv_dim']}
- **每个token的KV存储**: {kv_info['kv_size_per_token']}
- **总KV Cache大小**: {kv_info['total_kv_cache']}
### 优化效果分析
"""
# 计算GQA的内存节省
if attention_info['uses_gqa']:
original_kv_heads = kv_info['num_attention_heads']
actual_kv_heads = kv_info['num_key_value_heads']
memory_reduction = (1 - actual_kv_heads / original_kv_heads) * 100
result += f"- **GQA内存节省**: {memory_reduction:.1f}% (KV头数从{original_kv_heads}减少到{actual_kv_heads})\n"
# MLA的特殊说明
if attention_info['uses_mla']:
result += f"- **MLA压缩**: KV维度被压缩到{kv_info['kv_lora_rank']}维\n"
# 内存使用建议
total_gb = kv_info['total_kv_cache_bytes'] / (1024**3)
if total_gb > 8:
result += f"\n⚠️ **内存警告**: KV Cache需要{total_gb:.2f}GB内存,建议使用高端GPU"
elif total_gb > 4:
result += f"\n💡 **内存提示**: KV Cache需要{total_gb:.2f}GB内存,中等配置可运行"
else:
result += f"\n✅ **内存友好**: KV Cache仅需{total_gb:.2f}GB内存"
return result
except Exception as e:
return f"❌ 分析失败: {str(e)}"
# 创建Gradio界面
def create_interface():
with gr.Blocks(title="Hugging Face模型KV Cache分析器", theme=gr.themes.Soft()) as iface:
gr.Markdown("# 🤗 Hugging Face模型KV Cache分析器")
gr.Markdown("输入模型ID来分析其KV cache大小和注意力机制优化情况")
with gr.Row():
with gr.Column(scale=3):
model_input = gr.Textbox(
label="模型ID",
placeholder="例如: microsoft/DialoGPT-medium, meta-llama/Llama-2-7b-hf",
value="microsoft/DialoGPT-medium"
)
with gr.Column(scale=1):
seq_len_input = gr.Number(
label="序列长度",
value=2048,
minimum=1,
maximum=131072
)
with gr.Column(scale=1):
batch_size_input = gr.Number(
label="批量大小",
value=1,
minimum=1,
maximum=128
)
analyze_btn = gr.Button("🔍 分析模型", variant="primary", size="lg")
output = gr.Markdown(label="分析结果")
# 添加一些示例模型
gr.Markdown("### 💡 热门模型示例")
example_models = [
["deepseek-ai/DeepSeek-V3-0324", 32768, 1],
["Qwen/Qwen3-8B", 32768, 1],
]
gr.Examples(
examples=example_models,
inputs=[model_input, seq_len_input, batch_size_input],
outputs=output,
fn=analyze_model,
cache_examples=False
)
analyze_btn.click(
fn=analyze_model,
inputs=[model_input, seq_len_input, batch_size_input],
outputs=output
)
gr.Markdown("""
### 📖 说明
- **GQA**: Grouped Query Attention,通过减少KV头数来节省内存
- **MLA**: Multi-head Latent Attention,通过低秩分解压缩KV cache
- **滑动窗口**: 限制注意力范围来减少计算和内存使用
- KV Cache大小计算基于FP16精度 (每个元素2字节)
- 使用 `transformers.AutoConfig` 获取配置,支持自定义模型
### 🛠️ 安装依赖
```bash
pip install gradio transformers torch
```
""")
return iface
if __name__ == "__main__":
app = create_interface()
app.launch(share=True)