import json
import math
from typing import Dict, Optional, Tuple

import gradio as gr
import requests
from transformers import AutoConfig

# Number of bytes in one GiB; every size reported by this module is in GiB.
_GIB = 1024 ** 3


class LLMMemoryCalculator:
    """Estimate the GPU memory needed to serve a Hugging Face language model.

    The estimate is the sum of the model weights (looked up on the Hub, or
    estimated from the config), the KV cache for a given context length, and
    a flat 10% overhead on top of the weights.
    """

    # Sharded-checkpoint index files, in order of preference, with the
    # ``size_source`` tag reported when the size is taken from them.
    _INDEX_FILES = (
        ("model.safetensors.index.json", "safetensors_index"),
        ("pytorch_model.bin.index.json", "pytorch_index"),
    )

    # (file suffix, tag when sizes come from the API listing,
    #  tag when at least one size had to be fetched with a HEAD request)
    _WEIGHT_FILE_KINDS = (
        (".safetensors", "safetensors_files", "safetensors_head"),
        (".bin", "pytorch_files", "pytorch_head"),
    )

    def __init__(self):
        # Bytes occupied by a single value at each supported precision.
        self.precision_bytes = {
            'fp32': 4,
            'fp16': 2,
            'bf16': 2,
            'int8': 1,
            'int4': 0.5
        }

    # -------------------------------------------------
    # Basic helpers
    # -------------------------------------------------
    def get_model_config(self, model_id: str) -> Dict:
        """Load the model configuration for ``model_id`` via ``AutoConfig``.

        Raises:
            Exception: if the configuration cannot be loaded; the original
                error is attached as ``__cause__``.
        """
        try:
            # NOTE(security): trust_remote_code=True lets the repository's own
            # Python configuration code run in this process, and model_id is
            # user-supplied. Kept for compatibility with models that ship a
            # custom config class; review before exposing this app publicly.
            config = AutoConfig.from_pretrained(model_id, trust_remote_code=True)
            return config
        except Exception as e:
            raise Exception(f"无法获取模型配置: {str(e)}") from e

    def get_file_size_from_url(self, model_id: str, filename: str) -> int:
        """Return the size in bytes of a repo file using a HEAD request.

        This is a best-effort fallback: any network or parsing problem yields
        0 rather than an exception.
        """
        url = f"https://huggingface.co/{model_id}/resolve/main/{filename}"
        try:
            # requests.head() does not follow redirects. Large (LFS) files are
            # answered with a redirect to the CDN, and the Hub reports their
            # real size on that redirect response in ``X-Linked-Size``.
            response = requests.head(url, timeout=10, allow_redirects=False)
            if response.status_code >= 400:
                return 0
            linked_size = response.headers.get('X-Linked-Size')
            if linked_size:
                return int(linked_size)
            # Content-Length only describes the file itself on a direct 200;
            # on a redirect it would be the length of the redirect body.
            if response.status_code == 200:
                content_length = response.headers.get('Content-Length')
                if content_length:
                    return int(content_length)
            return 0
        except (requests.RequestException, ValueError):
            return 0

    @staticmethod
    def _is_number(value) -> bool:
        """Return True for real int/float values (bool is excluded)."""
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    # -------------------------------------------------
    # Model weight size lookup
    # -------------------------------------------------
    def _total_size_from_index(self, model_id: str, index_name: str) -> float:
        """Return ``metadata.total_size`` from a checkpoint index file, or 0."""
        url = f"https://huggingface.co/{model_id}/resolve/main/{index_name}"
        resp = requests.get(url, timeout=10)
        if resp.status_code != 200:
            return 0
        try:
            data = resp.json()
        except ValueError:
            # Not valid JSON; let the caller try the next source.
            return 0
        metadata = data.get("metadata") if isinstance(data, dict) else None
        total_bytes = metadata.get("total_size") if isinstance(metadata, dict) else None
        return total_bytes if self._is_number(total_bytes) else 0

    def _size_from_siblings(self, model_id: str, siblings) -> Optional[Tuple[float, str]]:
        """Sum weight-file sizes from the Hub file listing.

        Safetensors files are preferred over ``.bin`` files. A file whose size
        is missing from the listing is measured with a HEAD request.

        Returns:
            ``(size_gib, source_tag)`` or ``None`` when no size was found.
        """
        for suffix, listed_tag, head_tag in self._WEIGHT_FILE_KINDS:
            matching = [
                entry for entry in siblings
                if isinstance(entry, dict)
                and str(entry.get('rfilename', '')).endswith(suffix)
            ]
            if not matching:
                continue

            total_size = 0
            used_head = False
            for entry in matching:
                size = entry.get('size')
                if not self._is_number(size):
                    size = self.get_file_size_from_url(model_id, entry['rfilename'])
                    used_head = True
                total_size += size

            if total_size > 0:
                return total_size / _GIB, head_tag if used_head else listed_tag
        return None

    def get_model_size_from_hf(self, model_id: str) -> Tuple[float, str]:
        """Return ``(size_gib, source_tag)`` for the model's weights.

        Sources are tried in order: ``metadata.total_size`` of a checkpoint
        index file, the Hub API file listing (with HEAD requests for missing
        sizes), and finally an estimate computed from the model config.

        Raises:
            Exception: only if the config-based estimate also fails.
        """
        try:
            # 1. Sharded checkpoints publish their total size in index.json.
            for index_name, tag in self._INDEX_FILES:
                total_bytes = self._total_size_from_index(model_id, index_name)
                if total_bytes > 0:
                    return total_bytes / _GIB, tag

            # 2. Ask the Hub API for the file listing. ``blobs=true`` makes the
            #    API include each file's size in ``siblings``.
            api_url = f"https://huggingface.co/api/models/{model_id}"
            response = requests.get(api_url, params={"blobs": "true"}, timeout=10)
            if response.status_code != 200:
                raise Exception(f"API请求失败: {response.status_code}")

            model_info = response.json()
            siblings = model_info.get('siblings') or []
            found = self._size_from_siblings(model_id, siblings)
            if found is not None:
                return found

            # 3. Nothing usable was found; fall through to the estimate.
            raise Exception("未找到权重大小信息")

        except Exception:
            # Deliberate best-effort: any lookup failure falls back to an
            # estimate derived from the config.
            return self.estimate_model_size_from_config(model_id)

    # -------------------------------------------------
    # Size estimation from the config
    # -------------------------------------------------
    def estimate_model_size_from_config(self, model_id: str) -> Tuple[float, str]:
        """Estimate the FP16 weight size in GiB from the model config.

        The parameter count is approximated as embeddings plus, per layer,
        four ``hidden x hidden`` attention matrices, two
        ``hidden x intermediate`` feed-forward matrices and two layer norms.
        An output projection is added when the config explicitly sets
        ``tie_word_embeddings`` to a falsy value.

        Returns:
            ``(size_gib, "estimated")``.

        Raises:
            Exception: if the config cannot be loaded or evaluated.
        """
        try:
            config = self.get_model_config(model_id)

            vocab_size = getattr(config, 'vocab_size', 50000)
            hidden_size = getattr(config, 'hidden_size', getattr(config, 'd_model', 4096))
            num_layers = getattr(config, 'num_hidden_layers', getattr(config, 'num_layers', 32))
            intermediate_size = getattr(config, 'intermediate_size', hidden_size * 4)

            # Embedding table
            embedding_params = vocab_size * hidden_size

            # One transformer layer
            attention_params = 4 * hidden_size * hidden_size
            ffn_params = 2 * hidden_size * intermediate_size
            ln_params = 2 * hidden_size
            params_per_layer = attention_params + ffn_params + ln_params

            total_params = embedding_params + num_layers * params_per_layer
            if hasattr(config, 'tie_word_embeddings') and not config.tie_word_embeddings:
                # Untied output projection has its own vocab x hidden matrix.
                total_params += vocab_size * hidden_size

            model_size_gb = (total_params * 2) / _GIB  # 2 bytes per param (fp16)
            return model_size_gb, "estimated"
        except Exception as e:
            raise Exception(f"无法估算模型大小: {str(e)}") from e

    # -------------------------------------------------
    # KV cache size
    # -------------------------------------------------
    def calculate_kv_cache_size(self, config, context_length: int, batch_size: int = 1,
                                precision: str = 'fp16') -> Dict[str, float]:
        """Compute the KV cache size for a given context length.

        Args:
            config: model configuration object; attributes are read with
                ``getattr`` and fall back to defaults when absent.
            context_length: number of cached tokens per sequence.
            batch_size: number of concurrent sequences.
            precision: key of ``self.precision_bytes`` giving the cache
                element width; defaults to fp16 (2 bytes).

        Returns:
            Dict with ``size_gb``, ``attention_type`` ("MLA", "GQA" or "MHA"),
            ``num_kv_heads``, ``num_attention_heads`` and ``head_dim``.

        Raises:
            Exception: if the computation fails (e.g. unknown precision).
        """
        try:
            if precision not in self.precision_bytes:
                raise ValueError(f"不支持的精度: {precision}")
            bytes_per_value = self.precision_bytes[precision]

            num_layers = getattr(config, 'num_hidden_layers', getattr(config, 'num_layers', 32))
            hidden_size = getattr(config, 'hidden_size', getattr(config, 'd_model', 4096))
            num_attention_heads = getattr(config, 'num_attention_heads', getattr(config, 'num_heads', 32))
            # Some configs define the attribute but leave it as None.
            num_key_value_heads = getattr(config, 'num_key_value_heads', None) or num_attention_heads

            # Prefer an explicit head_dim: several architectures set it
            # independently of hidden_size / num_attention_heads.
            head_dim = getattr(config, 'head_dim', None) or hidden_size // num_attention_heads

            kv_lora_rank = getattr(config, 'kv_lora_rank', None)
            if kv_lora_rank is not None:
                # MLA caches one compressed latent per token (shared by keys
                # and values) plus the decoupled rotary key, not separate K/V.
                rope_dim = getattr(config, 'qk_rope_head_dim', None) or 0
                kv_cache_per_token = kv_lora_rank + rope_dim
                attention_type = "MLA"
            elif num_key_value_heads < num_attention_heads:
                kv_cache_per_token = num_key_value_heads * head_dim * 2  # K and V
                attention_type = "GQA"
            else:
                kv_cache_per_token = num_attention_heads * head_dim * 2  # K and V
                attention_type = "MHA"

            total_kv_cache = (
                kv_cache_per_token * context_length * num_layers * batch_size * bytes_per_value
            ) / _GIB

            return {
                'size_gb': total_kv_cache,
                'attention_type': attention_type,
                'num_kv_heads': num_key_value_heads,
                'num_attention_heads': num_attention_heads,
                'head_dim': head_dim
            }
        except Exception as e:
            raise Exception(f"计算KV Cache失败: {str(e)}") from e

    # -------------------------------------------------
    # Overall memory requirement
    # -------------------------------------------------
    def calculate_memory_requirements(self, model_id: str, gpu_memory_gb: float, num_gpus: int,
                                      context_length: int, utilization_rate: float = 0.9) -> Dict:
        """Check whether a model fits on the given GPUs.

        Args:
            model_id: Hugging Face repo ID.
            gpu_memory_gb: memory of a single GPU.
            num_gpus: number of GPUs.
            context_length: number of tokens to hold in the KV cache.
            utilization_rate: fraction (0-1] of GPU memory that may be used.

        Returns:
            A dict of rounded sizes, feasibility and config details, or
            ``{'error': message}`` on any failure (this method never raises).
        """
        try:
            # Validate up front so bad input yields a clear message instead of
            # a ZeroDivisionError after the network lookups.
            if num_gpus < 1:
                raise ValueError("GPU数量必须大于等于1")
            if gpu_memory_gb <= 0:
                raise ValueError("单张GPU显存必须大于0")
            if context_length < 1:
                raise ValueError("上下文长度必须大于等于1")
            if not 0 < utilization_rate <= 1:
                raise ValueError("显存利用率必须在0到1之间")

            config = self.get_model_config(model_id)
            model_size_gb, size_source = self.get_model_size_from_hf(model_id)
            kv_info = self.calculate_kv_cache_size(config, context_length)

            available_memory = gpu_memory_gb * num_gpus * utilization_rate
            other_overhead = model_size_gb * 0.1  # flat 10% of the weights
            total_memory_needed = model_size_gb + kv_info['size_gb'] + other_overhead

            is_feasible = total_memory_needed <= available_memory
            memory_margin = available_memory - total_memory_needed
            memory_per_gpu = total_memory_needed / num_gpus

            return {
                'model_id': model_id,
                'model_size_gb': round(model_size_gb, 2),
                'size_source': size_source,
                'kv_cache_gb': round(kv_info['size_gb'], 2),
                'attention_type': kv_info['attention_type'],
                'other_overhead_gb': round(other_overhead, 2),
                'total_memory_needed_gb': round(total_memory_needed, 2),
                'available_memory_gb': round(available_memory, 2),
                'memory_margin_gb': round(memory_margin, 2),
                'memory_per_gpu_gb': round(memory_per_gpu, 2),
                'is_feasible': is_feasible,
                'utilization_per_gpu': round((memory_per_gpu / gpu_memory_gb) * 100, 1),
                'config_info': {
                    'num_layers': getattr(config, 'num_hidden_layers', getattr(config, 'num_layers', 'N/A')),
                    'hidden_size': getattr(config, 'hidden_size', getattr(config, 'd_model', 'N/A')),
                    'num_attention_heads': kv_info['num_attention_heads'],
                    'num_kv_heads': kv_info['num_kv_heads'],
                    'head_dim': kv_info['head_dim']
                }
            }
        except Exception as e:
            return {'error': str(e)}


# -------------------------------------------------
# Gradio interface
# -------------------------------------------------
def _render_report(result: Dict) -> str:
    """Render a successful calculation result as a Markdown report."""
    status = "✅ 可以运行" if result['is_feasible'] else "❌ 显存不足"
    config_info = result['config_info']

    lines = [
        "",
        "## 模型分析结果",
        "",
        f"**模型**: {result['model_id']}",
        "",
        f"**状态**: {status}",
        "",
        "### 📊 内存分析",
        f"- **模型大小**: {result['model_size_gb']} GB ({result['size_source']})",
        f"- **KV Cache**: {result['kv_cache_gb']} GB",
        f"- **其他开销**: {result['other_overhead_gb']} GB",
        f"- **总需求**: {result['total_memory_needed_gb']} GB",
        f"- **可用显存**: {result['available_memory_gb']} GB",
        f"- **剩余显存**: {result['memory_margin_gb']} GB",
        "",
        "### 🔧 模型配置",
        f"- **注意力类型**: {result['attention_type']}",
        f"- **层数**: {config_info['num_layers']}",
        f"- **隐藏维度**: {config_info['hidden_size']}",
        f"- **注意力头数**: {config_info['num_attention_heads']}",
        f"- **KV头数**: {config_info['num_kv_heads']}",
        f"- **头维度**: {config_info['head_dim']}",
        "",
        "### 💾 GPU使用情况",
        f"- **每GPU内存**: {result['memory_per_gpu_gb']} GB",
        f"- **每GPU利用率**: {result['utilization_per_gpu']}%",
        "",
        "### 💡 建议",
        "",
    ]
    output = "\n".join(lines)

    if result['is_feasible']:
        output += f"✅ 当前配置可以成功运行该模型。剩余 {result['memory_margin_gb']} GB 显存。"
    else:
        needed_extra = abs(result['memory_margin_gb'])
        output += f"❌ 需要额外 {needed_extra} GB 显存才能运行。\n建议:\n- 增加GPU数量\n- 使用更大显存的GPU\n- 减少上下文长度\n- 使用模型量化(如int8/int4)"
    return output


def create_gradio_interface():
    """Build and return the Gradio ``Blocks`` app for the memory calculator."""
    calculator = LLMMemoryCalculator()

    def calculate_memory(model_id, gpu_memory, num_gpus, context_length, utilization_rate):
        """Button callback: validate the form values and return Markdown."""
        if not model_id or not model_id.strip():
            return "请输入模型ID"

        # A cleared numeric field arrives as None.
        if any(value is None for value in (gpu_memory, num_gpus, context_length, utilization_rate)):
            return "❌ 错误: 请填写所有数值参数"

        try:
            result = calculator.calculate_memory_requirements(
                model_id.strip(),
                float(gpu_memory),
                int(num_gpus),
                int(context_length),
                float(utilization_rate) / 100  # slider is in percent
            )

            if 'error' in result:
                return f"❌ 错误: {result['error']}"

            return _render_report(result)

        except Exception as e:
            return f"❌ 计算出错: {str(e)}"

    with gr.Blocks(title="LLM GPU内存计算器", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🚀 LLM GPU内存需求计算器")
        gr.Markdown("输入模型信息和硬件配置,计算是否能够成功运行大语言模型")

        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("## 📝 输入参数")
                model_id = gr.Textbox(
                    label="🤗 Hugging Face 模型ID",
                    placeholder="例如: deepseek-ai/DeepSeek-R1-0528-Qwen3-8B",
                    value="deepseek-ai/DeepSeek-R1-0528-Qwen3-8B"
                )
                with gr.Row():
                    gpu_memory = gr.Number(label="💾 单张GPU显存 (GB)", value=24, minimum=1, maximum=1000)
                    num_gpus = gr.Number(label="🔢 GPU数量", value=1, minimum=1, maximum=64, precision=0)
                with gr.Row():
                    context_length = gr.Number(label="📏 上下文长度", value=16384, minimum=512,
                                               maximum=1000000, precision=0)
                    utilization_rate = gr.Slider(label="⚡ 显存利用率 (%)", minimum=50, maximum=95,
                                                 value=90, step=5)
                calculate_btn = gr.Button("🔍 计算内存需求", variant="primary")

            with gr.Column(scale=2):
                gr.Markdown("## 📊 计算结果")
                output = gr.Markdown("点击计算按钮开始分析...")

        calculate_btn.click(
            fn=calculate_memory,
            inputs=[model_id, gpu_memory, num_gpus, context_length, utilization_rate],
            outputs=output
        )

        gr.Markdown("\n".join([
            "",
            "## 📚 使用示例",
            "",
            "**小型模型**: `microsoft/DialoGPT-medium`",
            "",
            "**中型模型**: `microsoft/DialoGPT-large`",
            "",
            "**大型模型**: `meta-llama/Llama-2-7b-hf`",
            "",
            "**超大模型**: `meta-llama/Llama-2-13b-hf`",
            "",
            "注意:某些模型可能需要申请访问权限。",
            "",
        ]))

    return demo


if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch(share=True, debug=True)