"""Gradio app that mirrors CivitAI model files into a Hugging Face repository.

The user logs in with Hugging Face OAuth, pastes a CivitAI model URL or user
profile URL, and every file found through the CivitAI API is downloaded and
uploaded to ``<username>/<destination_repo>``. The repository is then
registered as a mirror on CivitaiArchive.com.
"""

import json
import os
import re
import shutil
import subprocess
import uuid
from typing import Optional
from urllib.parse import quote, unquote, urlparse

import gradio as gr
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import (
    HfApi,
    Repository,
    create_repo,
    file_exists,
    update_repo_visibility,
    upload_file,
    upload_folder,
    whoami,
)

api = HfApi()

# Seconds to wait on any single HTTP request before giving up.
REQUEST_TIMEOUT = 30


def restart_space():
    """Restart the hosting Space; scheduled at a fixed interval at the bottom of the file."""
    api.restart_space(repo_id="civitaiarchive/civitai-to-hf-uploader", token=os.environ["HF_TOKEN"])


def _run_curl(url, full_path, api_key=None):
    """Download *url* to *full_path* with curl, raising CalledProcessError on failure."""
    # --silent/--show-error keep the progress meter out of stderr so that the
    # caller's "401"/"403" check only ever sees curl's actual error message.
    curl_cmd = ['curl', '--fail', '--silent', '--show-error', '-L', '-o', full_path, url]
    if api_key:
        curl_cmd.extend(['-H', f'Authorization: Bearer {api_key}'])
    subprocess.run(curl_cmd, check=True, capture_output=True, text=True)


def download_file(url, file_path, folder, api_key=None):
    """Download *url* into ``folder/file_path``.

    Args:
        url: Direct download URL.
        file_path: Path relative to *folder*; parent directories are created.
        folder: Local working directory for this run.
        api_key: Optional CivitAI API key sent as a bearer token.

    Raises:
        gr.Error: If the download fails. When no *api_key* was given and the
            server answered 401/403, one retry is made with the key from the
            ``CIVITAI_API_KEY`` environment variable (if it is set).
    """
    full_path = os.path.join(folder, file_path)
    os.makedirs(os.path.dirname(full_path), exist_ok=True)

    try:
        _run_curl(url, full_path, api_key)
    except subprocess.CalledProcessError as e:
        unauthorized = '401' in e.stderr or '403' in e.stderr
        fallback_key = os.environ.get("CIVITAI_API_KEY")
        # Only retry when it can help: the caller sent no key and a fallback
        # key actually exists (avoids sending "Bearer None").
        if api_key or not unauthorized or not fallback_key:
            raise gr.Error(f"Error downloading file: {e.stderr}") from e
        try:
            _run_curl(url, full_path, fallback_key)
        except subprocess.CalledProcessError as retry_error:
            raise gr.Error(
                f"Error downloading file with authorization: {retry_error.stderr}"
            ) from retry_error
    except Exception as e:
        raise gr.Error(f"Error downloading file: {str(e)}") from e


def _auth_headers(api_key=None):
    """Return the request headers for the CivitAI API, with a bearer token if given."""
    return {'Authorization': f'Bearer {api_key}'} if api_key else {}


def _collect_model_files(model, output):
    """Add every file of every version of *model* to *output*, keyed by repo path."""
    for version in model['modelVersions']:
        for file in version['files']:
            # The name comes from a remote API and is later used as a local
            # path, so keep only its final component.
            safe_name = os.path.basename(file['name'])
            output[f"{model['id']}/{version['id']}/{safe_name}"] = {
                'downloadUrl': file['downloadUrl'],
                'modelId': model['name'] + ' - ' + version['name'],
                'modelUrl': f"https://civitai.com/models/{model['id']}?modelVersionId={version['id']}",
                'author': model['creator']['username'],
                'authorUrl': f"https://civitai.com/user/{model['creator']['username']}",
                'mirrorUrl': f"https://civitaiarchive.com/models/{model['id']}?modelVersionId={version['id']}",
            }


def get_files_by_username(username, api_key=None):
    """Return all files of all models published by *username*.

    Follows the API's ``metadata.nextPage`` links until there are no more pages.

    Returns:
        Dict mapping ``"<model id>/<version id>/<file name>"`` to a dict with
        the download URL and attribution links.

    Raises:
        gr.Error: If a request to the CivitAI API fails.
    """
    url = (
        "https://civitai.com/api/v1/models"
        f"?username={quote(str(username), safe='')}&limit=100&nsfw=true"
    )
    headers = _auth_headers(api_key)
    output = {}

    while url:
        try:
            response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            raise gr.Error("Something went wrong in fetching CivitAI API") from e

        for model in data.get('items', []):
            _collect_model_files(model, output)

        url = data.get('metadata', {}).get('nextPage')

    return output


def get_files_by_model_id(model_id, api_key=None):
    """Return all files of all versions of the model *model_id*.

    Returns:
        Dict in the same shape as :func:`get_files_by_username`.

    Raises:
        gr.Error: If the request to the CivitAI API fails.
    """
    api_url = f"https://civitai.com/api/v1/models/{quote(str(model_id), safe='')}"

    try:
        response = requests.get(api_url, headers=_auth_headers(api_key), timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        model = response.json()
    except requests.exceptions.RequestException as e:
        raise gr.Error("Something went wrong in fetching CivitAI API") from e

    output = {}
    _collect_model_files(model, output)
    return output


def _parse_civitai_url(url):
    """Split a CivitAI URL into ``(kind, identifier)``; kind is 'models' or 'user'.

    Query strings and fragments are ignored, so
    ``https://civitai.com/models/123?modelVersionId=4`` yields ``('models', '123')``.

    Raises:
        gr.Error: If the URL is not a civitai.com model or user URL.
    """
    parsed = urlparse((url or "").strip())
    segments = [unquote(part) for part in parsed.path.split('/') if part]
    recognised = (
        parsed.scheme == 'https'
        and parsed.netloc == 'civitai.com'
        and len(segments) >= 2
        and segments[0] in ('models', 'user')
    )
    if not recognised:
        raise gr.Error("Unknown CivitAI URL format, please provide model URL or user profile URL")
    return segments[0], segments[1]


def process_url(url, profile, user_repo_id, oauth_token, folder, api_key=None):
    """Download every file behind *url* and upload it to *user_repo_id*.

    Args:
        url: CivitAI model URL or user profile URL.
        profile: Unused; kept for interface compatibility.
        user_repo_id: Target Hugging Face repository (``user/name``).
        oauth_token: Hugging Face token string used for all Hub calls.
        folder: Local scratch directory for downloads.
        api_key: Optional CivitAI API key.

    Returns:
        Dict of the files that are present in the repository after this call
        (newly uploaded or already there), keyed by repo path. Files whose
        download or upload failed are left out and reported via ``gr.Warning``.

    Raises:
        gr.Error: If the URL format is not recognised or listing files fails.
    """
    kind, identifier = _parse_civitai_url(url)
    if kind == 'models':
        files = get_files_by_model_id(identifier, api_key)
    else:
        files = get_files_by_username(identifier, api_key)

    total_files = len(files)
    gr.Info(f"Found {total_files} files to download")

    mirrored = {}
    for current_file, (dl_path, data) in enumerate(files.items(), start=1):
        filename = dl_path.split('/')[-1]
        repo_dir = os.path.dirname(dl_path)
        base_folder = os.path.join(folder, repo_dir)
        try:
            if file_exists(repo_id=user_repo_id, filename=dl_path, token=oauth_token):
                gr.Info(f"Skipping {filename}, folder exists {dl_path}")
                mirrored[dl_path] = data
                continue

            gr.Info(f"Downloading {filename} ({current_file}/{total_files})")
            download_file(data['downloadUrl'], dl_path, folder, api_key)

            # Upload the model and card
            gr.Info(f"Uploading {filename} ({current_file}/{total_files})")

            readme = f"""
Author: [{data['author']}]({data['authorUrl']})

Model: [{data['modelUrl']}]({data['modelUrl']})

Mirror: [{data['mirrorUrl']}]({data['mirrorUrl']})
"""
            # Author names may be non-ASCII; don't depend on the locale encoding.
            with open(os.path.join(base_folder, "README.md"), "w", encoding="utf-8") as f:
                f.write(readme)

            upload_folder(
                folder_path=base_folder,
                repo_id=user_repo_id,
                repo_type="model",
                path_in_repo=repo_dir,
                token=oauth_token,
            )
            mirrored[dl_path] = data
        except Exception as e:
            gr.Warning(f"Failed to download {dl_path}: {str(e)}")
        finally:
            # Free disk space as we go; a user profile can hold many large files.
            shutil.rmtree(base_folder, ignore_errors=True)

    return mirrored


def add_mirror(repo_id):
    """Register *repo_id* as a Hugging Face mirror on CivitaiArchive.com.

    Best effort: any failure is shown to the user as a warning and never
    raises, because the upload itself has already succeeded at this point.
    """
    archive_key = os.environ.get('CIVITAIARCHIVE_API_KEY')
    if not archive_key:
        gr.Warning("Failed to add mirror to CivitaiArchive.com")
        return

    try:
        response = requests.post(
            "https://civitaiarchive.com/api/mirrors",
            headers={
                "Authorization": f"Bearer {archive_key}",
                "Content-Type": "application/json",
            },
            json={"type": "huggingface", "url": repo_id},
            timeout=REQUEST_TIMEOUT,
        )
    except requests.exceptions.RequestException:
        gr.Warning("Failed to add mirror to CivitaiArchive.com")
        return

    if response.ok:
        gr.Info("Added mirror to CivitaiArchive.com")
    else:
        gr.Warning("Failed to add mirror to CivitaiArchive.com")


def upload_civit_to_hf(profile: Optional[gr.OAuthProfile], oauth_token: gr.OAuthToken, url, destination_repo, civitai_api_key=None):
    """Gradio click handler: mirror the CivitAI *url* into the user's HF repo.

    Args:
        profile: OAuth profile of the logged-in user (``None`` when logged out).
        oauth_token: OAuth token of the logged-in user.
        url: CivitAI model URL or user profile URL.
        destination_repo: Repository name (without the username prefix).
        civitai_api_key: Optional CivitAI API key supplied by the user.

    Returns:
        Markdown text linking to the repository.

    Raises:
        gr.Error: On missing login, invalid input, or any failure during the
            download/upload process.
    """
    if profile is None or oauth_token is None or not profile.name:
        raise gr.Error("Are you sure you are logged in?")

    if not destination_repo:
        raise gr.Error("Please provide a destination repository name")

    # validate destination repo is alphanumeric (fullmatch: no trailing newline allowed)
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', destination_repo):
        raise gr.Error("Destination repository name must contain only alphanumeric characters, underscores, and hyphens")

    # Treat a blank or whitespace-only key as "not provided".
    civitai_api_key = (civitai_api_key or "").strip() or None

    folder = str(uuid.uuid4())
    os.makedirs(folder, exist_ok=False)

    gr.Info(f"Starting download from {url}")

    try:
        user_repo_id = f"{profile.username}/{destination_repo}"

        # Try to create repo only if it doesn't exist
        try:
            create_repo(repo_id=user_repo_id, private=True, exist_ok=False, token=oauth_token.token)
            gr.Info(f"Created new repository {user_repo_id}")
        except Exception:
            gr.Info(f"Repository {user_repo_id} already exists, will update it")

        update_repo_visibility(repo_id=user_repo_id, private=False, token=oauth_token.token)

        files = process_url(url, profile, user_repo_id, oauth_token.token, folder, civitai_api_key)
        if not files:
            raise gr.Error("No files were copied. Something went wrong.")

        gr.Info(f"Copied {len(files)} files")

        results = [f"## [{user_repo_id}](https://huggingface.co/{user_repo_id})"]

        add_mirror(user_repo_id)

        return "# Models uploaded to 🤗!\n" + "\n".join(results)

    except Exception as e:
        print(e)
        raise gr.Error(f"Error during upload process: {str(e)}") from e
    finally:
        # Cleanup
        if os.path.exists(folder):
            shutil.rmtree(folder)


css = '''
#login {
    width: 100% !important;
    margin: 0 auto;
}
#disabled_upload{
    opacity: 0.5;
    pointer-events:none;
}
'''

with gr.Blocks(css=css) as demo:
    gr.Markdown('''# Upload CivitAI models to HuggingFace

You can upload either:
- A single model by providing a CivitAI model URL (e.g., https://civitai.com/models/144684)
- All models from a user by providing their profile URL (e.g., https://civitai.com/user/username)

This will create a new HuggingFace repository under your username if it doesn't exist.
Once uploaded, it will add this repository to CivitaiArchive.com as a mirror.
''')

    gr.LoginButton(elem_id="login")

    with gr.Column():
        submit_source_civit = gr.Textbox(
            placeholder="https://civitai.com/models/144684 or https://civitai.com/user/username",
            label="CivitAI URL",
            info="Enter either a model URL or user profile URL",
        )
        destination_repo = gr.Textbox(
            placeholder="my-awesome-model",
            label="HF Repo Name",
            info="Name for the HuggingFace repository (a new one will be created if it doesn't exist)",
        )
        civitai_api_key = gr.Textbox(
            placeholder="Your CivitAI API key (optional)",
            label="CivitAI API Key",
            info="Optional: Provide your own CivitAI API key to avoid rate limits. If not provided, a default key will be used.",
            type="password",  # don't echo the secret on screen
        )

        instructions = gr.HTML("")
        submit_button_civit = gr.Button("Upload to Hugging Face", interactive=True)
        output = gr.Markdown(label="Upload Progress")

    # `profile` and `oauth_token` are not listed here: Gradio injects them
    # from the handler's type annotations.
    submit_button_civit.click(
        fn=upload_civit_to_hf,
        inputs=[submit_source_civit, destination_repo, civitai_api_key],
        outputs=[output],
    )

scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, 'interval', seconds=3600)
scheduler.start()

demo.queue(default_concurrency_limit=50)
demo.launch()