import os
import subprocess
import signal
import tempfile
from pathlib import Path
import logging
import gradio as gr
from huggingface_hub import HfApi, ModelCard, whoami
from gradio_huggingfacehub_search import HuggingfaceHubSearch
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import numpy as np
import shutil
from copy import deepcopy
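# HF_TOKEN is only used by the scheduled Space restart at the bottom of this
# file; model uploads use the OAuth token from the Gradio login flow (or an
# org token). The /data paths are assumed to be the Space's persistent storage.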
HF_TOKEN = os.environ.get("HF_TOKEN")
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
CONVERSION_SCRIPT = "./llama.cpp/convert_hf_to_gguf.py"
log_dir = "/data/logs"
downloads_dir = "/data/downloads"
outputs_dir = "/data/outputs"
os.makedirs(log_dir, exist_ok=True)
logging.basicConfig(
filename=os.path.join(log_dir, "app.log"),
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
def get_llama_cpp_version():
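    """Return the llama.cpp version tag from `git describe` (the part before
    the first '-'), or None if the git command fails."""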
try:
result = subprocess.run(
["git", "-C", "./llama.cpp", "describe", "--tags", "--always"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
text=True,
)
version = result.stdout.strip().split("-")[0]
return version
except subprocess.CalledProcessError as e:
logger.error("Error getting llama.cpp version: %s", e.stderr.strip())
return None
def get_repo_namespace(repo_owner: str, username: str, user_orgs: list) -> str:
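    """Resolve the namespace to publish under: the user's own account when
    repo_owner is "self", otherwise the matching organization name."""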
if repo_owner == "self":
return username
for org in user_orgs:
if org["name"] == repo_owner:
return org["name"]
raise ValueError(f"Invalid repo_owner: {repo_owner}")
def escape(s: str) -> str:
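    """HTML-escape text (and turn newlines into <br/>) before embedding it in
    the Gradio Markdown/HTML output."""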
    return (
        s.replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
        .replace('"', "&quot;")
        .replace("\n", "<br/>")
    )
def toggle_repo_owner(export_to_org: bool, oauth_token: gr.OAuthToken | None) -> tuple:
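    """Show or hide the organization picker and org-token field depending on
    whether "Export to Organization Repository" is checked. Returns a pair of
    gr.update() objects for the repo_owner dropdown and the org_token textbox."""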
if oauth_token is None or oauth_token.token is None:
raise gr.Error("You must be logged in to use quantize-my-repo")
if not export_to_org:
return gr.update(visible=False, choices=["self"], value="self"), gr.update(
visible=False, value=""
)
info = whoami(oauth_token.token)
orgs = [org["name"] for org in info.get("orgs", [])]
return gr.update(visible=True, choices=["self"] + orgs, value="self"), gr.update(
visible=True
)
def generate_importance_matrix(
model_path: str, train_data_path: str, output_path: str
) -> None:
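    """Run llama-imatrix on model_path with train_data_path as calibration
    data, writing the importance matrix to output_path. If the subprocess
    exceeds the timeout it is sent SIGINT and killed if it still does not exit."""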
imatrix_command = [
"./llama.cpp/llama-imatrix",
"-m",
model_path,
"-f",
train_data_path,
"-ngl",
"99",
"--output-frequency",
"10",
"-o",
output_path,
]
if not os.path.isfile(model_path):
raise FileNotFoundError(f"Model file not found: {model_path}")
logger.info("Running imatrix command...")
process = subprocess.Popen(imatrix_command, shell=False)
try:
process.wait(timeout=60)
except subprocess.TimeoutExpired:
logger.warning(
"Imatrix computation timed out. Sending SIGINT to allow graceful termination..."
)
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
            logger.error(
                "Imatrix process still didn't terminate. Forcefully killing the process..."
            )
process.kill()
logger.info("Importance matrix generation completed.")
def split_upload_model(
model_path: str,
outdir: str,
repo_id: str,
oauth_token: gr.OAuthToken | None,
split_max_tensors: int = 256,
split_max_size: str | None = None,
org_token: str | None = None,
export_to_org: bool = False,
) -> None:
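    """Shard a GGUF file with llama-gguf-split and upload every resulting
    shard to repo_id. Splitting uses --split-max-size when given, otherwise
    --split-max-tensors; uploads use the org token when exporting to an
    organization, otherwise the user's OAuth token. The original unsplit file
    is deleted after a successful split."""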
logger.info("Model path: %s", model_path)
logger.info("Output dir: %s", outdir)
if oauth_token is None or oauth_token.token is None:
raise ValueError("You have to be logged in.")
split_cmd = ["./llama.cpp/llama-gguf-split", "--split"]
if split_max_size:
split_cmd.extend(["--split-max-size", split_max_size])
else:
split_cmd.extend(["--split-max-tensors", str(split_max_tensors)])
model_path_prefix = ".".join(model_path.split(".")[:-1])
split_cmd.extend([model_path, model_path_prefix])
logger.info("Split command: %s", split_cmd)
result = subprocess.run(split_cmd, shell=False, capture_output=True, text=True)
logger.info("Split command stdout: %s", result.stdout)
logger.info("Split command stderr: %s", result.stderr)
if result.returncode != 0:
raise RuntimeError(f"Error splitting the model: {result.stderr}")
logger.info("Model split successfully!")
if os.path.exists(model_path):
os.remove(model_path)
model_file_prefix = model_path_prefix.split("/")[-1]
logger.info("Model file name prefix: %s", model_file_prefix)
sharded_model_files = [
f
for f in os.listdir(outdir)
if f.startswith(model_file_prefix) and f.endswith(".gguf")
]
if not sharded_model_files:
raise RuntimeError("No sharded files found.")
logger.info("Sharded model files: %s", sharded_model_files)
api = HfApi(token=org_token if (export_to_org and org_token) else oauth_token.token)
for file in sharded_model_files:
file_path = os.path.join(outdir, file)
logger.info("Uploading file: %s", file_path)
try:
api.upload_file(
path_or_fileobj=file_path,
path_in_repo=file,
repo_id=repo_id,
)
except Exception as e:
raise RuntimeError(f"Error uploading file {file_path}: {e}") from e
logger.info("Sharded model has been uploaded successfully!")
def get_new_model_card(
original_card: ModelCard,
original_model_id: str,
gguf_files: list,
new_repo_url: str,
split_model: bool,
) -> ModelCard:
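    """Build the model card for the quantized repo: copy the original card,
    add the quantize-my-repo tags, and prepend a header with the llama.cpp
    version, a table of the generated GGUF files, and download instructions."""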
version = get_llama_cpp_version()
model_card = deepcopy(original_card)
model_card.data.tags = (model_card.data.tags or []) + [
"antigma",
"quantize-my-repo",
]
model_card.data.base_model = original_model_id
# Format the table rows
table_rows = []
for file_info in gguf_files:
name, _, size, method = file_info
if split_model:
display_name = name[:-5]
else:
display_name = f"[{name}]({new_repo_url}/blob/main/{name})"
table_rows.append(f"{display_name}|{method}|{size:.2f} GB|{split_model}|\n")
model_card.text = f"""
*Produced by [Antigma Labs](https://antigma.ai), [Antigma Quantize Space](https://huggingface.co/spaces/Antigma/quantize-my-repo)*
*Follow Antigma Labs on X: [https://x.com/antigma_labs](https://x.com/antigma_labs)*
*Antigma's GitHub Homepage [https://github.com/AntigmaLabs](https://github.com/AntigmaLabs)*
## Quantization Format (GGUF)
We use llama.cpp release {version} for quantization.
Original model: https://huggingface.co/{original_model_id}
## Download a file (not the whole branch) from below:
| Filename | Quant type | File Size | Split |
| -------- | ---------- | --------- | ----- |
| {'|'.join(table_rows)}
## Original Model Card
{original_card.text}
## Downloading using huggingface-cli
Click to view download instructions
First, make sure you have huggingface-cli installed:
```
pip install -U "huggingface_hub[cli]"
```
Then, you can target the specific file you want:
```
huggingface-cli download {new_repo_url} --include "{gguf_files[0][0]}" --local-dir ./
```
If the model is bigger than 50GB, it will have been split into multiple files. In order to download them all to a local folder, run:
```
huggingface-cli download {new_repo_url} --include "{gguf_files[0][0]}/*" --local-dir ./
```
You can either specify a new local-dir (e.g. deepseek-ai_DeepSeek-V3-0324-Q8_0) or the files will be downloaded to the default Hugging Face cache.
{escape(str(e))}', "error.png", )

css = """/* Custom CSS to allow scrolling */
.gradio-container {overflow-y: auto;}
"""

model_id = HuggingfaceHubSearch(
    label="Hub Model ID",
    placeholder="Search for model id on Huggingface",
    search_type="model",
)
export_to_org = gr.Checkbox(
    label="Export to Organization Repository",
    value=False,
    info="If checked, you can select an organization to export to.",
)
repo_owner = gr.Dropdown(
    choices=["self"], value="self", label="Repository Owner", visible=False
)
org_token = gr.Textbox(label="Org Access Token", type="password", visible=False)
q_method = gr.Dropdown(
    [
        "Q2_K",
        "Q3_K_S",
        "Q3_K_M",
        "Q3_K_L",
        "Q4_0",
        "Q4_K_S",
        "Q4_K_M",
        "Q5_0",
        "Q5_K_S",
        "Q5_K_M",
        "Q6_K",
        "Q8_0",
    ],
    label="Quantization Method",
    info="GGML quantization type",
    value="Q4_K_M",
    filterable=False,
    visible=True,
    multiselect=True,
)
imatrix_q_method = gr.Dropdown(
    ["IQ3_M", "IQ3_XXS", "Q4_K_M", "Q4_K_S", "IQ4_NL", "IQ4_XS", "Q5_K_M", "Q5_K_S"],
    label="Imatrix Quantization Method",
    info="GGML imatrix quants type",
    value="IQ4_NL",
    filterable=False,
    visible=False,
)
use_imatrix = gr.Checkbox(
    value=False,
    label="Use Imatrix Quantization",
    info="Use importance matrix for quantization.",
)
private_repo = gr.Checkbox(
    value=False, label="Private Repo", info="Create a private repo under your username."
)
train_data_file = gr.File(label="Training Data File", file_types=["txt"], visible=False)
split_model = gr.Checkbox(
    value=False, label="Split Model", info="Shard the model using gguf-split."
)
split_max_tensors = gr.Number(
    value=256,
    label="Max Tensors per File",
    info="Maximum number of tensors per file when splitting model.",
    visible=False,
)
split_max_size = gr.Textbox(
    label="Max File Size",
    info="Maximum file size when splitting model (--split-max-size). May leave empty to use the default. Accepted suffixes: M, G. Example: 256M, 5G",
    visible=False,
)

iface = gr.Interface(
    fn=process_model,
    inputs=[
        model_id,
        q_method,
        use_imatrix,
        imatrix_q_method,
        private_repo,
        train_data_file,
        split_model,
        split_max_tensors,
        split_max_size,
        export_to_org,
        repo_owner,
        org_token,
    ],
    outputs=[gr.Markdown(label="Output"), gr.Image(show_label=False)],
    title="Make your own GGUF Quants — faster than ever before, believe me.",
    description="We take your Hugging Face repo — a terrific repo — we quantize it, we package it beautifully, and we give you your very own repo. It's smart. It's efficient. It's huge. You're gonna love it.",
    api_name=False,
)

with gr.Blocks(css=".gradio-container {overflow-y: auto;}") as demo:
    gr.Markdown("Logged in, you must be. Classy, secure, and victorious, it keeps us.")
    gr.LoginButton(min_width=250)
    export_to_org.change(
        fn=toggle_repo_owner, inputs=[export_to_org], outputs=[repo_owner, org_token]
    )
    split_model.change(
        fn=lambda sm: (gr.update(visible=sm), gr.update(visible=sm)),
        inputs=split_model,
        outputs=[split_max_tensors, split_max_size],
    )
    use_imatrix.change(
        fn=lambda use: (
            gr.update(visible=not use),
            gr.update(visible=use),
            gr.update(visible=use),
        ),
        inputs=use_imatrix,
        outputs=[q_method, imatrix_q_method, train_data_file],
    )
    iface.render()

def restart_space():
    HfApi().restart_space(
        repo_id="Antigma/quantize-my-repo", token=HF_TOKEN, factory_reboot=True
    )

scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=86400)
scheduler.start()

demo.queue(default_concurrency_limit=1, max_size=5).launch(debug=True, show_api=False)