Spaces:
Runtime error
Runtime error
import requests | |
import os | |
import gradio as gr | |
from huggingface_hub import update_repo_visibility, whoami, upload_folder, create_repo, upload_file, update_repo_visibility, file_exists | |
import subprocess | |
import gradio as gr | |
import re | |
import uuid | |
from typing import Optional | |
import json | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from huggingface_hub import Repository, HfApi | |
# Module-level Hub client, used by restart_space() below.
api = HfApi()


def restart_space():
    """Ask the Hub to restart the ``civitaiarchive/civitai-to-hf-uploader`` Space.

    Authenticates with the token stored in the ``HF_TOKEN`` environment
    variable.

    Raises:
        KeyError: if ``HF_TOKEN`` is not set in the environment.
    """
    hub_token = os.environ["HF_TOKEN"]
    api.restart_space(
        repo_id="civitaiarchive/civitai-to-hf-uploader",
        token=hub_token,
    )
def download_file(url, file_path, folder, api_key=None):
    """Download ``url`` into ``folder/file_path`` by shelling out to ``curl``.

    Parent directories of the destination are created as needed. When no
    ``api_key`` is passed and curl reports a 401/403, the download is retried
    once with the key from the ``CIVITAI_API_KEY`` environment variable, if
    that variable is set.

    Args:
        url: Download URL.
        file_path: Destination path relative to ``folder``.
        folder: Local base directory for the download.
        api_key: Optional bearer token sent as an ``Authorization`` header.

    Raises:
        gr.Error: if curl fails (including after the authorized retry) or
            cannot be started at all.
    """
    full_path = os.path.join(folder, file_path)
    os.makedirs(os.path.dirname(full_path), exist_ok=True)

    curl_cmd = ['curl', '--fail', '-L', '-o', full_path, url]
    # Always use API key if provided
    # NOTE(review): the key is passed on curl's command line, so it is visible
    # to other local processes while the download runs.
    if api_key:
        curl_cmd.extend(['-H', f'Authorization: Bearer {api_key}'])

    try:
        subprocess.run(curl_cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        stderr = e.stderr or ""
        auth_rejected = '401' in stderr or '403' in stderr
        fallback_key = os.environ.get("CIVITAI_API_KEY")
        # Only retry when we did not already authenticate and a fallback key
        # really exists; otherwise we would send a literal "Bearer None".
        if not auth_rejected or api_key or not fallback_key:
            raise gr.Error(f"Error downloading file: {stderr}") from e

        # Try again with authorization
        retry_cmd = curl_cmd + ['-H', f'Authorization: Bearer {fallback_key}']
        try:
            subprocess.run(retry_cmd, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as retry_error:
            raise gr.Error(
                f"Error downloading file with authorization: {retry_error.stderr}"
            ) from retry_error
    except Exception as e:
        raise gr.Error(f"Error downloading file: {str(e)}") from e
def get_files_by_username(username, api_key=None):
    """List every downloadable file of every model published by a CivitAI user.

    Pages through ``https://civitai.com/api/v1/models`` (100 models per page,
    NSFW included), following ``metadata.nextPage`` until it is absent.

    Args:
        username: CivitAI username to query.
        api_key: Optional bearer token sent as an ``Authorization`` header.

    Returns:
        dict mapping ``"<model id>/<version id>/<file name>"`` to a dict with
        the keys ``downloadUrl``, ``modelId``, ``modelUrl``, ``author``,
        ``authorUrl`` and ``mirrorUrl``.

    Raises:
        gr.Error: if a request fails, returns an HTTP error status, or the
            response body is not valid JSON.
    """
    headers = {}
    if api_key:
        headers['Authorization'] = f'Bearer {api_key}'

    url = "https://civitai.com/api/v1/models"
    # Let requests build (and escape) the query string for the first page;
    # the ``nextPage`` URLs returned by the API are used as-is afterwards.
    params = {"username": username, "limit": 100, "nsfw": "true"}
    output = {}

    while url:
        try:
            response = requests.get(url, headers=headers, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            raise gr.Error("Something went wrong in fetching CivitAI API") from e
        params = None

        # Add current page items to the list
        for model in data.get('items', []):
            for version in model['modelVersions']:
                for file in version['files']:
                    key = f"{model['id']}/{version['id']}/{file['name']}"
                    output[key] = {
                        'downloadUrl': file['downloadUrl'],
                        'modelId': model['name'] + ' - ' + version['name'],
                        'modelUrl': f"https://civitai.com/models/{model['id']}?modelVersionId={version['id']}",
                        'author': model['creator']['username'],
                        'authorUrl': f"https://civitai.com/user/{model['creator']['username']}",
                        'mirrorUrl': f"https://civitaiarchive.com/models/{model['id']}?modelVersionId={version['id']}",
                    }

        metadata = data.get('metadata', {})
        url = metadata.get('nextPage', None)

    return output
def get_files_by_model_id(model_id, api_key=None):
    """List every downloadable file of every version of one CivitAI model.

    Args:
        model_id: Model identifier, placed in the
            ``https://civitai.com/api/v1/models/<model_id>`` API path.
        api_key: Optional bearer token sent as an ``Authorization`` header.

    Returns:
        dict mapping ``"<model id>/<version id>/<file name>"`` to a dict with
        the keys ``downloadUrl``, ``modelId``, ``modelUrl``, ``author``,
        ``authorUrl`` and ``mirrorUrl``.

    Raises:
        gr.Error: if the request fails, returns an HTTP error status, or the
            response body is not valid JSON.
    """
    api_url = f"https://civitai.com/api/v1/models/{model_id}"
    headers = {}
    if api_key:
        headers['Authorization'] = f'Bearer {api_key}'

    try:
        # Same 30 s timeout as the username lookup, so a stalled connection
        # cannot block the worker forever.
        response = requests.get(api_url, headers=headers, timeout=30)
        response.raise_for_status()
        model = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        raise gr.Error("Something went wrong in fetching CivitAI API") from e

    output = {}
    for version in model['modelVersions']:
        for file in version['files']:
            key = f"{model['id']}/{version['id']}/{file['name']}"
            output[key] = {
                'downloadUrl': file['downloadUrl'],
                'modelId': model['name'] + ' - ' + version['name'],
                'modelUrl': f"https://civitai.com/models/{model['id']}?modelVersionId={version['id']}",
                'author': model['creator']['username'],
                'authorUrl': f"https://civitai.com/user/{model['creator']['username']}",
                'mirrorUrl': f"https://civitaiarchive.com/models/{model['id']}?modelVersionId={version['id']}",
            }
    return output
def process_url(url, profile, user_repo_id, oauth_token, folder, api_key=None):
    """Copy the files behind a CivitAI model or user URL into a Hub repo.

    For each file found, the file is skipped when it already exists in the
    repository; otherwise it is downloaded under ``folder``, a small
    ``README.md`` crediting the author is written next to it, and the
    containing directory is uploaded. A failure on one file is reported with
    ``gr.Warning`` and does not stop the remaining files.

    Args:
        url: ``https://civitai.com/models/<id>...`` or
            ``https://civitai.com/user/<name>...``.
        profile: Unused here; kept for interface compatibility.
        user_repo_id: Target Hub repository (``"<user>/<repo>"``).
        oauth_token: Hub token string used for the existence check and upload.
        folder: Local scratch directory for downloads.
        api_key: Optional CivitAI API key.

    Returns:
        The dict of all files discovered for the URL (including files that
        were skipped or failed), keyed by their path in the repository.

    Raises:
        gr.Error: if the URL is not a recognised CivitAI model/user URL.
    """
    unknown_format = "Unknown CivitAI URL format, please provide model URL or user profile URL"

    url = url.strip()
    if url.startswith("https://civitai.com/models/"):
        lookup = get_files_by_model_id
    elif url.startswith("https://civitai.com/user/"):
        lookup = get_files_by_username
    else:
        raise gr.Error(unknown_format)

    # The id / username is the path segment right after the prefix; drop any
    # query string or fragment so it does not leak into the API request.
    identifier = url.split('/')[4].split('?')[0].split('#')[0]
    if not identifier:
        raise gr.Error(unknown_format)
    files = lookup(identifier, api_key)

    gr.Info(f"Found {len(files)} files to download")

    total_files = len(files)
    for current_file, (dl_path, data) in enumerate(files.items(), start=1):
        try:
            download_url = data['downloadUrl']
            filename = dl_path.split('/')[-1]

            if file_exists(repo_id=user_repo_id, filename=dl_path, token=oauth_token):
                gr.Info(f"Skipping {filename}, folder exists {dl_path}")
                continue

            gr.Info(f"Downloading {filename} ({current_file}/{total_files})")
            download_file(download_url, dl_path, folder, api_key)

            # Upload the model and card
            gr.Info(f"Uploading {filename} ({current_file}/{total_files})")
            repo_dir = os.path.dirname(dl_path)
            base_folder = os.path.join(folder, repo_dir)

            # Create README.md file
            readme = (
                f"\nAuthor: [{data['author']}]({data['authorUrl']})\n"
                f"Model: [{data['modelUrl']}]({data['modelUrl']})\n"
                f"Mirror: [{data['mirrorUrl']}]({data['mirrorUrl']})\n"
            )
            # Explicit UTF-8: author names may contain non-ASCII characters.
            with open(os.path.join(base_folder, "README.md"), "w", encoding="utf-8") as f:
                f.write(readme)

            upload_folder(
                folder_path=base_folder,
                repo_id=user_repo_id,
                repo_type="model",
                path_in_repo=repo_dir,
                token=oauth_token,
            )
        except Exception as e:
            # Best effort: report the failure and carry on with the next file.
            gr.Warning(f"Failed to download {dl_path}: {str(e)}")

    return files
def add_mirror(repo_id):
    """Register a Hub repository as a mirror on CivitaiArchive.com.

    POSTs ``{"type": "huggingface", "url": repo_id}`` to the mirrors API,
    authenticating with the ``CIVITAIARCHIVE_API_KEY`` environment variable.
    The outcome is reported to the user through a Gradio toast; a failed
    registration is surfaced as a warning rather than an exception.

    Args:
        repo_id: Hub repository id (``"<user>/<repo>"``) to register.

    Raises:
        KeyError: if ``CIVITAIARCHIVE_API_KEY`` is not set.
    """
    failure_message = "Failed to add mirror to CivitaiArchive.com"
    request_headers = {
        "Authorization": f"Bearer {os.environ['CIVITAIARCHIVE_API_KEY']}",
        "Content-Type": "application/json",
    }
    payload = {
        "type": "huggingface",
        "url": repo_id,
    }

    try:
        response = requests.post(
            "https://civitaiarchive.com/api/mirrors",
            headers=request_headers,
            json=payload,
            timeout=30,
        )
    except requests.exceptions.RequestException:
        gr.Warning(failure_message)
        return

    if response.status_code == 200:
        gr.Info("Added mirror to CivitaiArchive.com")
    else:
        # gr.Error only has an effect when raised; the original constructed it
        # and dropped it, so the failure was silent. Use a warning toast so the
        # user sees it without failing the already-completed upload.
        gr.Warning(failure_message)
def upload_civit_to_hf(profile: Optional[gr.OAuthProfile], oauth_token: gr.OAuthToken, url, destination_repo, civitai_api_key=None):
    """Gradio handler: mirror a CivitAI model/user into the caller's Hub repo.

    Creates ``<profile.username>/<destination_repo>`` (or reuses it), makes it
    public, copies the files found at ``url`` into it, then registers the
    repository as a mirror on CivitaiArchive.com. A uniquely named scratch
    directory is used for downloads and removed afterwards.

    Args:
        profile: OAuth profile of the logged-in user, or ``None``.
            (Presumably injected by Gradio from the login session - confirm.)
        oauth_token: OAuth token of the logged-in user.
        url: CivitAI model or user-profile URL.
        destination_repo: Repository name (letters, digits, ``_`` and ``-``).
        civitai_api_key: Optional CivitAI API key supplied by the user.

    Returns:
        Markdown text linking to the target repository.

    Raises:
        gr.Error: if the user is not logged in, the repository name is missing
            or invalid, no files were found, or any step of the upload fails.
    """
    import shutil

    # ``profile`` is Optional, so guard against None before touching attributes.
    if profile is None or oauth_token is None or not profile.name:
        raise gr.Error("Are you sure you are logged in?")

    if not destination_repo:
        raise gr.Error("Please provide a destination repository name")

    # validate destination repo is alphanumeric
    if not re.match(r'^[a-zA-Z0-9_-]+$', destination_repo):
        raise gr.Error("Destination repository name must contain only alphanumeric characters, underscores, and hyphens")

    folder = str(uuid.uuid4())
    os.makedirs(folder, exist_ok=False)
    gr.Info(f"Starting download from {url}")

    try:
        user_repo_id = f"{profile.username}/{destination_repo}"

        # Try to create repo only if it doesn't exist
        try:
            create_repo(repo_id=user_repo_id, private=True, exist_ok=False, token=oauth_token.token)
            gr.Info(f"Created new repository {user_repo_id}")
        except Exception:
            # NOTE(review): any creation failure is treated as "already
            # exists"; a real permission problem will surface on the
            # visibility update just below instead.
            gr.Info(f"Repository {user_repo_id} already exists, will update it")

        update_repo_visibility(repo_id=user_repo_id, private=False, token=oauth_token.token)

        files = process_url(url, profile, user_repo_id, oauth_token.token, folder, civitai_api_key)
        if not files:
            raise gr.Error("No files were copied. Something went wrong.")

        gr.Info(f"Copied {len(files)} files")

        add_mirror(user_repo_id)

        return (
            "# Models uploaded to 🤗!\n"
            f"## [{user_repo_id}](https://huggingface.co/{user_repo_id})"
        )
    except Exception as e:
        print(e)
        raise gr.Error(f"Error during upload process: {str(e)}") from e
    finally:
        # Cleanup; ignore_errors so a cleanup problem never masks the real error.
        shutil.rmtree(folder, ignore_errors=True)
# Custom stylesheet passed to gr.Blocks: full-width login button and a dimmed,
# non-clickable style for the "disabled_upload" element id.
css = """
#login {
width: 100% !important;
margin: 0 auto;
}
#disabled_upload{
opacity: 0.5;
pointer-events:none;
}
"""

# UI definition: intro text, login button, three input boxes, a submit button
# and a Markdown area that receives the handler's result.
with gr.Blocks(css=css) as demo:
    gr.Markdown(
        """# Upload CivitAI models to HuggingFace
You can upload either:
- A single model by providing a CivitAI model URL (e.g., https://civitai.com/models/144684)
- All models from a user by providing their profile URL (e.g., https://civitai.com/user/username)
This will create a new HuggingFace repository under your username if it doesn't exist.
Once uploaded, it will add this repository to CivitaiArchive.com as a mirror.
"""
    )
    gr.LoginButton(elem_id="login")

    with gr.Column():
        submit_source_civit = gr.Textbox(
            label="CivitAI URL",
            info="Enter either a model URL or user profile URL",
            placeholder="https://civitai.com/models/144684 or https://civitai.com/user/username",
        )
        destination_repo = gr.Textbox(
            label="HF Repo Name",
            info="Name for the HuggingFace repository (a new one will be created if it doesn't exist)",
            placeholder="my-awesome-model",
        )
        civitai_api_key = gr.Textbox(
            label="CivitAI API Key",
            info="Optional: Provide your own CivitAI API key to avoid rate limits. If not provided, a default key will be used.",
            placeholder="Your CivitAI API key (optional)",
        )
        instructions = gr.HTML("")
        submit_button_civit = gr.Button("Upload to Hugging Face", interactive=True)
        output = gr.Markdown(label="Upload Progress")

    # Only the three text inputs are wired explicitly; the handler's
    # profile/oauth_token parameters are not listed here.
    submit_button_civit.click(
        fn=upload_civit_to_hf,
        inputs=[submit_source_civit, destination_repo, civitai_api_key],
        outputs=[output],
    )
# Background job: call restart_space() every 3600 seconds.
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, trigger='interval', seconds=3600)
scheduler.start()

# Enable the request queue (default concurrency limit 50), then start the app.
demo.queue(default_concurrency_limit=50)
demo.launch()