mohanz's picture
big refactor to include original model card
c8ce925
raw
history blame
19.6 kB
import os
import subprocess
import signal
import tempfile
from pathlib import Path
import logging
import gradio as gr
from huggingface_hub import HfApi, ModelCard, whoami
from gradio_huggingfacehub_search import HuggingfaceHubSearch
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import numpy as np
import shutil
# Token passed to the scheduled Space restart; None when the variable is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
# Set the Gradio analytics flag to "False" for this process.
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
# llama.cpp HF -> GGUF conversion script, relative to the working directory.
CONVERSION_SCRIPT = "./llama.cpp/convert_hf_to_gguf.py"
# Working directories: log files, temporary model downloads, temporary outputs.
log_dir = "/data/logs"
downloads_dir = "/data/downloads"
outputs_dir = "/data/outputs"
# The log directory has to exist before basicConfig opens app.log inside it.
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
    filename=os.path.join(log_dir, "app.log"),
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
def get_llama_cpp_version() -> str | None:
    """Return the version tag of the local ``./llama.cpp`` checkout.

    Runs ``git describe --tags --always`` inside ``./llama.cpp`` and keeps the
    text before the first ``-`` of its output (``b1234-5-gabcdef`` -> ``b1234``).

    Returns:
        The version string, or ``None`` when git exits with an error or cannot
        be executed at all. Failures are logged, never raised.
    """
    try:
        result = subprocess.run(
            ["git", "-C", "./llama.cpp", "describe", "--tags", "--always"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        logger.error("Error getting llama.cpp version: %s", e.stderr.strip())
        return None
    except OSError as e:
        # The git executable itself is missing or not runnable; treat it like
        # any other lookup failure instead of crashing the caller.
        logger.error("Error getting llama.cpp version: %s", e)
        return None
    return result.stdout.strip().split("-")[0]
def get_repo_namespace(repo_owner: str, username: str, user_orgs: list) -> str:
    """Resolve the Hub namespace that the new repository is created under.

    Args:
        repo_owner: ``"self"`` for the user's own namespace, otherwise the name
            of an organization.
        username: Namespace returned when ``repo_owner`` is ``"self"``.
        user_orgs: Organization records; each is indexed by ``"name"``.

    Returns:
        ``username`` for ``"self"``, else the matching organization name.

    Raises:
        ValueError: If ``repo_owner`` matches none of ``user_orgs``.
    """
    if repo_owner == "self":
        return username

    # Sentinel distinguishes "no match" from any value a record could hold.
    not_found = object()
    matched = next(
        (org["name"] for org in user_orgs if org["name"] == repo_owner),
        not_found,
    )
    if matched is not_found:
        raise ValueError(f"Invalid repo_owner: {repo_owner}")
    return matched
def escape(s: str) -> str:
    """Escape text for embedding in HTML and turn newlines into ``<br/>``.

    Replaces ``&``, ``<``, ``>`` and ``"`` with their HTML entities and each
    newline with ``<br/>``.

    Args:
        s: Raw text, e.g. an exception message.

    Returns:
        The escaped string.
    """
    return (
        # The ampersand must be handled first, otherwise the ampersands of the
        # entities produced by the later replacements would be escaped again.
        s.replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
        .replace("\n", "<br/>")
    )
def toggle_repo_owner(export_to_org: bool, oauth_token: gr.OAuthToken | None) -> tuple:
    """Build the updates for the owner dropdown and the org-token textbox.

    Args:
        export_to_org: Current state of the "export to organization" checkbox.
        oauth_token: OAuth token of the logged-in user.

    Returns:
        A pair of ``gr.update`` values: the first for the repository-owner
        dropdown, the second for the org access-token textbox. Both are hidden
        and reset when ``export_to_org`` is false; otherwise both are shown and
        the dropdown offers ``"self"`` plus the user's organization names.

    Raises:
        gr.Error: If there is no OAuth token.
    """
    logged_in = oauth_token is not None and oauth_token.token is not None
    if not logged_in:
        raise gr.Error("You must be logged in to use quantize-my-repo")

    if export_to_org:
        # Organizations are looked up only when they are actually needed.
        account = whoami(oauth_token.token)
        org_names = [entry["name"] for entry in account.get("orgs", [])]
        owner_update = gr.update(
            visible=True, choices=["self"] + org_names, value="self"
        )
        token_update = gr.update(visible=True)
        return owner_update, token_update

    owner_update = gr.update(visible=False, choices=["self"], value="self")
    token_update = gr.update(visible=False, value="")
    return owner_update, token_update
def generate_importance_matrix(
    model_path: str, train_data_path: str, output_path: str
) -> None:
    """Run ``llama-imatrix`` on a model to produce an importance matrix.

    The process is given 60 seconds. On timeout it receives SIGINT and 5 more
    seconds to exit; if it is still running after that it is killed.

    Args:
        model_path: GGUF model file passed with ``-m``.
        train_data_path: Training text file passed with ``-f``.
        output_path: Destination passed with ``-o``.

    Raises:
        FileNotFoundError: If ``model_path`` is not an existing file.
    """
    # Validate first so nothing else happens for a missing model.
    if not os.path.isfile(model_path):
        raise FileNotFoundError(f"Model file not found: {model_path}")

    imatrix_command = [
        "./llama.cpp/llama-imatrix",
        "-m",
        model_path,
        "-f",
        train_data_path,
        "-ngl",
        "99",
        "--output-frequency",
        "10",
        "-o",
        output_path,
    ]
    logger.info("Running imatrix command...")
    process = subprocess.Popen(imatrix_command, shell=False)
    try:
        process.wait(timeout=60)
    except subprocess.TimeoutExpired:
        logger.warning(
            "Imatrix computation timed out. Sending SIGINT to allow graceful termination..."
        )
        process.send_signal(signal.SIGINT)
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            logger.error(
                "Imatrix proc still didn't term. Forcefully terming process..."
            )
            process.kill()
            # Reap the killed child so it does not linger as a zombie process.
            process.wait()
    logger.info("Importance matrix generation completed.")
def split_upload_model(
    model_path: str,
    outdir: str,
    repo_id: str,
    oauth_token: gr.OAuthToken | None,
    split_max_tensors: int = 256,
    split_max_size: str | None = None,
    org_token: str | None = None,
    export_to_org: bool = False,
) -> None:
    """Shard a GGUF file with ``llama-gguf-split`` and upload every shard.

    Args:
        model_path: GGUF file to split; it is deleted after a successful split.
        outdir: Directory that is scanned for the resulting shard files.
        repo_id: Target repository for the uploads.
        oauth_token: OAuth token of the logged-in user.
        split_max_tensors: Value for ``--split-max-tensors``; used only when
            ``split_max_size`` is empty.
        split_max_size: Value for ``--split-max-size``; takes precedence.
        org_token: Token used for the upload when ``export_to_org`` is set.
        export_to_org: Whether to upload with ``org_token`` (if provided).

    Raises:
        ValueError: If there is no OAuth token.
        RuntimeError: If the split command fails, no shard is found, or an
            upload fails.
    """
    logger.info("Model path: %s", model_path)
    logger.info("Output dir: %s", outdir)
    if oauth_token is None or oauth_token.token is None:
        raise ValueError("You have to be logged in.")

    # A size limit, when given, replaces the tensor-count limit.
    if split_max_size:
        limit_args = ["--split-max-size", split_max_size]
    else:
        limit_args = ["--split-max-tensors", str(split_max_tensors)]

    # Output prefix: the model path minus its last dot-separated component.
    model_path_prefix = ".".join(model_path.split(".")[:-1])
    split_cmd = [
        "./llama.cpp/llama-gguf-split",
        "--split",
        *limit_args,
        model_path,
        model_path_prefix,
    ]
    logger.info("Split command: %s", split_cmd)

    result = subprocess.run(split_cmd, shell=False, capture_output=True, text=True)
    logger.info("Split command stdout: %s", result.stdout)
    logger.info("Split command stderr: %s", result.stderr)
    if result.returncode != 0:
        raise RuntimeError(f"Error splitting the model: {result.stderr}")
    logger.info("Model split successfully!")

    # The unsplit file is removed before the shard scan below.
    if os.path.exists(model_path):
        os.remove(model_path)

    model_file_prefix = model_path_prefix.split("/")[-1]
    logger.info("Model file name prefix: %s", model_file_prefix)

    sharded_model_files = []
    for entry in os.listdir(outdir):
        if entry.startswith(model_file_prefix) and entry.endswith(".gguf"):
            sharded_model_files.append(entry)
    if not sharded_model_files:
        raise RuntimeError("No sharded files found.")
    logger.info("Sharded model files: %s", sharded_model_files)

    use_org_token = export_to_org and org_token
    api = HfApi(token=org_token if use_org_token else oauth_token.token)
    for shard_name in sharded_model_files:
        shard_path = os.path.join(outdir, shard_name)
        logger.info("Uploading file: %s", shard_path)
        try:
            api.upload_file(
                path_or_fileobj=shard_path,
                path_in_repo=shard_name,
                repo_id=repo_id,
            )
        except Exception as e:
            raise RuntimeError(f"Error uploading file {shard_path}: {e}") from e
    logger.info("Sharded model has been uploaded successfully!")
def get_new_model_card(
    original_card: ModelCard,
    original_model_id: str,
    gguf_files: list,
    new_repo_url: str,
    split_model: bool,
) -> ModelCard:
    """Build the README model card for the quantized repository.

    The returned card carries the original card's metadata plus the tags
    ``antigma`` and ``quantize-my-repo``. Its text contains a table of the
    quantized files, the original card text and download instructions.

    Args:
        original_card: Card of the source model; it is not modified.
        original_model_id: Hub id of the source model.
        gguf_files: Non-empty list of ``(name, path, size_gb, method)`` tuples.
        new_repo_url: URL of the new repository. If the object exposes a
            ``repo_id`` attribute it is used for the CLI instructions.
        split_model: Whether the files were uploaded as shards.

    Returns:
        The new model card.

    Raises:
        ValueError: If ``gguf_files`` is empty.
    """
    import copy  # local import: only needed here

    if not gguf_files:
        raise ValueError("gguf_files must contain at least one quantized file")

    version = get_llama_cpp_version()
    # Deep-copy so the tag change below cannot leak into the caller's card, and
    # so this does not depend on the card class offering a copy() method.
    model_card = copy.deepcopy(original_card)
    model_card.data.tags = (model_card.data.tags or []) + [
        "antigma",
        "quantize-my-repo",
    ]

    # The version lookup returns None on failure; avoid a ".../tag/None" link.
    if version:
        release_link = (
            f'<a href="https://github.com/ggml-org/llama.cpp/releases/tag/{version}">'
            f"{version}</a>"
        )
    else:
        release_link = (
            '<a href="https://github.com/ggml-org/llama.cpp/releases">unknown</a>'
        )

    # One Markdown table row per file, each on its own line.
    table_rows = []
    for name, _, size, method in gguf_files:
        if split_model:
            display_name = name.removesuffix(".gguf")
        else:
            display_name = f"[{name}]({new_repo_url}/blob/main/{name})"
        table_rows.append(
            f"| {display_name} | {method} | {size:.2f} GB | {split_model} |"
        )
    table = "\n".join(table_rows)

    # huggingface-cli expects a repo id ("owner/name"), not a full URL.
    repo_id = getattr(new_repo_url, "repo_id", None) or str(new_repo_url).split(
        "huggingface.co/", 1
    )[-1]
    first_file = gguf_files[0][0]
    # Shards are uploaded to the repo root with the file stem as name prefix.
    shard_pattern = f"{first_file.removesuffix('.gguf')}-*.gguf"

    model_card.text = f"""
*Produced by [Antigma Labs](https://antigma.ai), [Antigma Quantize Space](https://huggingface.co/spaces/Antigma/quantize-my-repo)*

*Follow Antigma Labs in X [https://x.com/antigma_labs](https://x.com/antigma_labs)*

*Antigma's GitHub Homepage [https://github.com/AntigmaLabs](https://github.com/AntigmaLabs)*

## Quantization Format (GGUF)
We use <a href="https://github.com/ggml-org/llama.cpp">llama.cpp</a> release {release_link} for quantization.

Original model: https://huggingface.co/{original_model_id}

## Download a file (not the whole branch) from below:
| Filename | Quant type | File Size | Split |
| -------- | ---------- | --------- | ----- |
{table}

## Original Model Card
{original_card.text}

## Downloading using huggingface-cli
<details>
<summary>Click to view download instructions</summary>

First, make sure you have huggingface-cli installed:

```
pip install -U "huggingface_hub[cli]"
```

Then, you can target the specific file you want:

```
huggingface-cli download {repo_id} --include "{first_file}" --local-dir ./
```

If the model has been split into multiple files, download all of its parts to a local folder with:

```
huggingface-cli download {repo_id} --include "{shard_pattern}" --local-dir ./
```

You can either specify a new local-dir (e.g. deepseek-ai_DeepSeek-V3-0324-Q8_0) or it will be in default hugging face cache

</details>
"""
    return model_card
def process_model(
    model_id: str,
    q_method: str | list,
    use_imatrix: bool,
    imatrix_q_method: str,
    private_repo: bool,
    train_data_file: gr.File | None,
    split_model: bool,
    split_max_tensors: int,
    split_max_size: str | None,
    export_to_org: bool,
    repo_owner: str,
    org_token: str | None,
    oauth_token: gr.OAuthToken | None,
) -> tuple[str, str]:
    """Download a Hub model, convert it to GGUF, quantize it and publish it.

    Steps: download the model snapshot, convert it to an f16 GGUF, optionally
    compute an importance matrix, quantize once per selected method, create
    the ``<namespace>/<model>-GGUF`` repository, then upload the quantized
    files (optionally sharded), the importance matrix and the README.

    Args:
        model_id: Hub id of the source model.
        q_method: One quantization method or a list of them; ignored when
            ``use_imatrix`` is set.
        use_imatrix: Quantize with an importance matrix.
        imatrix_q_method: Quantization method used together with the imatrix.
        private_repo: Create the target repository as private.
        train_data_file: Optional training data for the imatrix; the bundled
            ``llama.cpp/groups_merged.txt`` is used when absent.
        split_model: Upload shards instead of single files.
        split_max_tensors: Shard limit forwarded to ``split_upload_model``.
        split_max_size: Shard size limit forwarded to ``split_upload_model``.
        export_to_org: Publish under an organization instead of the user.
        repo_owner: ``"self"`` or an organization name; forced to ``"self"``
            when ``export_to_org`` is false.
        org_token: Token used for Hub calls when exporting to an organization.
        oauth_token: OAuth token of the logged-in user.

    Returns:
        ``(html, image_name)``: a success or error HTML snippet and the file
        name of the picture shown next to it. Failures inside the pipeline
        are reported through this value rather than raised.

    Raises:
        gr.Error: If the user is not logged in or the token is rejected.
        ValueError: If ``repo_owner`` is not one of the user's organizations.
    """
    if oauth_token is None or oauth_token.token is None:
        raise gr.Error("You must be logged in to use quantize-my-repo")
    # A single whoami call both validates the token and yields the account data.
    try:
        user_info = whoami(oauth_token.token)
    except Exception as e:
        raise gr.Error("You must be logged in to use quantize-my-repo") from e
    username = user_info["name"]
    user_orgs = user_info.get("orgs", [])
    if not export_to_org:
        repo_owner = "self"
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    logger.info(
        "Time %s, Username %s, Model_ID %s, q_method %s",
        current_time,
        username,
        model_id,
        ",".join(q_method) if isinstance(q_method, list) else q_method,
    )
    repo_namespace = get_repo_namespace(repo_owner, username, user_orgs)
    model_name = model_id.split("/")[-1]
    try:
        if use_imatrix:
            quant_methods = [imatrix_q_method]
        elif isinstance(q_method, list):
            quant_methods = q_method
        else:
            quant_methods = [q_method]
        # Fail before the expensive download when nothing was selected.
        quant_methods = [method for method in quant_methods if method]
        if not quant_methods:
            raise ValueError("No quantization method selected.")

        api_token = org_token if (export_to_org and org_token) else oauth_token.token
        api = HfApi(token=api_token)
        # Weights: safetensors if the repo has any, otherwise *.bin.
        has_safetensors = any(
            f.path.endswith(".safetensors")
            for f in api.list_repo_tree(repo_id=model_id, recursive=True)
        )
        dl_pattern = ["*.md", "*.json", "*.model"]
        dl_pattern.append("*.safetensors" if has_safetensors else "*.bin")

        os.makedirs(downloads_dir, exist_ok=True)
        os.makedirs(outputs_dir, exist_ok=True)
        with tempfile.TemporaryDirectory(dir=outputs_dir) as outdir:
            fp16 = str(Path(outdir) / f"{model_name}.fp16.gguf")
            with tempfile.TemporaryDirectory(dir=downloads_dir) as tmpdir:
                logger.info("Start download")
                local_dir = Path(tmpdir) / model_name
                api.snapshot_download(
                    repo_id=model_id,
                    local_dir=local_dir,
                    local_dir_use_symlinks=False,
                    allow_patterns=dl_pattern,
                )
                config_dir = local_dir / "config.json"
                adapter_config_dir = local_dir / "adapter_config.json"
                if os.path.exists(adapter_config_dir) and not os.path.exists(
                    config_dir
                ):
                    raise RuntimeError(
                        "adapter_config.json is present. If converting LoRA, use GGUF-my-lora."
                    )
                logger.info("Download successfully")
                result = subprocess.run(
                    [
                        "python",
                        CONVERSION_SCRIPT,
                        local_dir,
                        "--outtype",
                        "f16",
                        "--outfile",
                        fp16,
                    ],
                    shell=False,
                    capture_output=True,
                )
                if result.returncode != 0:
                    raise RuntimeError(
                        f"Error converting to fp16: {result.stderr.decode()}"
                    )
                # Logged only after the return code confirmed success.
                logger.info("Converted to f16")
            # Drop the whole downloads directory; it is recreated on the next run.
            shutil.rmtree(downloads_dir)

            imatrix_path = Path(outdir) / "imatrix.dat"
            if use_imatrix:
                train_data_path = (
                    train_data_file.name
                    if train_data_file
                    else "llama.cpp/groups_merged.txt"
                )
                if not os.path.isfile(train_data_path):
                    raise FileNotFoundError(
                        f"Training data not found: {train_data_path}"
                    )
                generate_importance_matrix(fp16, train_data_path, imatrix_path)

            suffix = "imat" if use_imatrix else None
            gguf_files = []
            for method in quant_methods:
                logger.info("Begin quantize")
                name = (
                    f"{model_name.lower()}-{method.lower()}-{suffix}.gguf"
                    if suffix
                    else f"{model_name.lower()}-{method.lower()}.gguf"
                )
                path = str(Path(outdir) / name)
                quant_cmd = (
                    [
                        "./llama.cpp/llama-quantize",
                        "--imatrix",
                        imatrix_path,
                        fp16,
                        path,
                        method,
                    ]
                    if use_imatrix
                    else ["./llama.cpp/llama-quantize", fp16, path, method]
                )
                result = subprocess.run(quant_cmd, shell=False, capture_output=True)
                if result.returncode != 0:
                    raise RuntimeError(
                        f"Quantization failed ({method}): {result.stderr.decode()}"
                    )
                # File size in GiB, shown in the model card table.
                size = os.path.getsize(path) / 1024 / 1024 / 1024
                gguf_files.append((name, path, size, method))
            logger.info("Quantize successfully!")

            repo_id = f"{repo_namespace}/{model_name}-GGUF"
            new_repo_url = api.create_repo(
                repo_id=repo_id, exist_ok=True, private=private_repo
            )
            # A missing or unreadable source card must not abort the run.
            try:
                original_card = ModelCard.load(model_id, token=oauth_token.token)
            except Exception:
                original_card = ModelCard("")
            card = get_new_model_card(
                original_card, model_id, gguf_files, new_repo_url, split_model
            )
            readme_path = Path(outdir) / "README.md"
            card.save(readme_path)

            for name, path, _, _ in gguf_files:
                if split_model:
                    split_upload_model(
                        path,
                        outdir,
                        repo_id,
                        oauth_token,
                        split_max_tensors,
                        split_max_size,
                        org_token,
                        export_to_org,
                    )
                else:
                    api.upload_file(
                        path_or_fileobj=path, path_in_repo=name, repo_id=repo_id
                    )
            if use_imatrix and os.path.isfile(imatrix_path):
                api.upload_file(
                    path_or_fileobj=imatrix_path,
                    path_in_repo="imatrix.dat",
                    repo_id=repo_id,
                )
            api.upload_file(
                path_or_fileobj=readme_path, path_in_repo="README.md", repo_id=repo_id
            )
        return (
            f'<h1>✅ DONE</h1><br/>Repo: <a href="{new_repo_url}" target="_blank" style="text-decoration:underline">{repo_id}</a>',
            f"llama{np.random.randint(9)}.png",
        )
    except Exception as e:
        # Top-level boundary: keep the traceback in the log and show the
        # message to the user instead of crashing the request.
        logger.exception("Processing of %s failed", model_id)
        return (
            f'<h1>❌ ERROR</h1><br/><pre style="white-space:pre-wrap;">{escape(str(e))}</pre>',
            "error.png",
        )
# Stylesheet string. NOTE(review): not referenced anywhere else in the visible
# source; gr.Blocks further down is given its own inline CSS string.
css = """/* Custom CSS to allow scrolling */
.gradio-container {overflow-y: auto;}
"""
# --- Input components, passed to gr.Interface below in process_model order ---
# Searchable Hub model picker.
model_id = HuggingfaceHubSearch(
    label="Hub Model ID",
    placeholder="Search for model id on Huggingface",
    search_type="model",
)
# Toggles the two organization-related inputs that follow.
export_to_org = gr.Checkbox(
    label="Export to Organization Repository",
    value=False,
    info="If checked, you can select an organization to export to.",
)
# Hidden initially; its choices are filled in by toggle_repo_owner.
repo_owner = gr.Dropdown(
    choices=["self"], value="self", label="Repository Owner", visible=False
)
org_token = gr.Textbox(label="Org Access Token", type="password", visible=False)
# Regular quantization methods; several may be selected at once.
q_method = gr.Dropdown(
    [
        "Q2_K",
        "Q3_K_S",
        "Q3_K_M",
        "Q3_K_L",
        "Q4_0",
        "Q4_K_S",
        "Q4_K_M",
        "Q5_0",
        "Q5_K_S",
        "Q5_K_M",
        "Q6_K",
        "Q8_0",
    ],
    label="Quantization Method",
    info="GGML quantization type",
    value="Q4_K_M",
    filterable=False,
    visible=True,
    multiselect=True,
)
# Single-choice method list used with an importance matrix; hidden initially.
imatrix_q_method = gr.Dropdown(
    ["IQ3_M", "IQ3_XXS", "Q4_K_M", "Q4_K_S", "IQ4_NL", "IQ4_XS", "Q5_K_M", "Q5_K_S"],
    label="Imatrix Quantization Method",
    info="GGML imatrix quants type",
    value="IQ4_NL",
    filterable=False,
    visible=False,
)
use_imatrix = gr.Checkbox(
    value=False,
    label="Use Imatrix Quantization",
    info="Use importance matrix for quantization.",
)
private_repo = gr.Checkbox(
    value=False, label="Private Repo", info="Create a private repo under your username."
)
# Optional imatrix training data; hidden initially.
train_data_file = gr.File(label="Training Data File", file_types=["txt"], visible=False)
split_model = gr.Checkbox(
    value=False, label="Split Model", info="Shard the model using gguf-split."
)
# The two split limits are hidden initially.
split_max_tensors = gr.Number(
    value=256,
    label="Max Tensors per File",
    info="Maximum number of tensors per file when splitting model.",
    visible=False,
)
split_max_size = gr.Textbox(
    label="Max File Size",
    info="Maximum file size when splitting model (--split-max-size). May leave empty to use the default. Accepted suffixes: M, G. Example: 256M, 5G",
    visible=False,
)
# Form bound to process_model. The twelve inputs match its first twelve
# parameters in order; oauth_token is not listed here — presumably supplied by
# Gradio from the login session (TODO confirm).
iface = gr.Interface(
    fn=process_model,
    inputs=[
        model_id,
        q_method,
        use_imatrix,
        imatrix_q_method,
        private_repo,
        train_data_file,
        split_model,
        split_max_tensors,
        split_max_size,
        export_to_org,
        repo_owner,
        org_token,
    ],
    # process_model returns (html string, image file name).
    outputs=[gr.Markdown(label="Output"), gr.Image(show_label=False)],
    title="Make your own GGUF Quants — faster than ever before, believe me.",
    description="We take your Hugging Face repo — a terrific repo — we quantize it, we package it beautifully, and we give you your very own repo. It's smart. It's efficient. It's huge. You're gonna love it.",
    api_name=False,
)
# Page layout: login prompt, visibility wiring for the optional inputs, then
# the form itself.
# NOTE(review): the source lost its indentation; every statement up to and
# including iface.render() is assumed to belong to this Blocks context.
with gr.Blocks(css=".gradio-container {overflow-y: auto;}") as demo:
    gr.Markdown("Logged in, you must be. Classy, secure, and victorious, it keeps us.")
    gr.LoginButton(min_width=250)
    # Show or hide the owner dropdown and the org-token box with the checkbox.
    export_to_org.change(
        fn=toggle_repo_owner, inputs=[export_to_org], outputs=[repo_owner, org_token]
    )
    # Both split limits are visible only while "Split Model" is checked.
    split_model.change(
        fn=lambda sm: (gr.update(visible=sm), gr.update(visible=sm)),
        inputs=split_model,
        outputs=[split_max_tensors, split_max_size],
    )
    # Imatrix mode swaps the regular method list for the imatrix one and
    # reveals the training-data upload.
    use_imatrix.change(
        fn=lambda use: (
            gr.update(visible=not use),
            gr.update(visible=use),
            gr.update(visible=use),
        ),
        inputs=use_imatrix,
        outputs=[q_method, imatrix_q_method, train_data_file],
    )
    # Place the Interface defined earlier into this page.
    iface.render()
def restart_space():
    """Request a factory reboot of the ``Antigma/quantize-my-repo`` Space.

    The request is authenticated with the module-level ``HF_TOKEN``.
    """
    hub_client = HfApi()
    hub_client.restart_space(
        repo_id="Antigma/quantize-my-repo",
        token=HF_TOKEN,
        factory_reboot=True,
    )
# Background job that calls restart_space every 86400 seconds (24 hours).
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=86400)
scheduler.start()
# Start the app with its queue configured for a default concurrency limit of 1
# and a maximum size of 5.
demo.queue(default_concurrency_limit=1, max_size=5).launch(debug=True, show_api=False)