civitaiarchive's picture
Added target file existence check for incremental upload. (#4)
4885fba verified
import requests
import os
import gradio as gr
from huggingface_hub import update_repo_visibility, whoami, upload_folder, create_repo, upload_file, update_repo_visibility, file_exists
import subprocess
import gradio as gr
import re
import uuid
from typing import Optional
import json
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import Repository, HfApi
# Module-level Hugging Face Hub client; restart_space() calls it to restart the Space.
api = HfApi()
def restart_space():
    """Restart the uploader Space through the module-level Hub client.

    The token is read from the ``HF_TOKEN`` environment variable; a missing
    variable raises ``KeyError``.
    """
    space_id = "civitaiarchive/civitai-to-hf-uploader"
    hub_token = os.environ["HF_TOKEN"]
    api.restart_space(repo_id=space_id, token=hub_token)
def download_file(url, file_path, folder, api_key=None):
    """Download ``url`` to ``folder/file_path`` by shelling out to curl.

    Args:
        url: Address to fetch (redirects are followed).
        file_path: Destination path relative to ``folder``; missing parent
            directories are created.
        folder: Local root directory for the download.
        api_key: Optional key sent as an ``Authorization: Bearer`` header.

    When no ``api_key`` is given and curl reports a 401 or 403, the download
    is retried once with the key from the ``CIVITAI_API_KEY`` environment
    variable, provided that variable is set.

    Raises:
        gr.Error: If the download fails.
    """
    full_path = os.path.join(folder, file_path)
    os.makedirs(os.path.dirname(full_path), exist_ok=True)

    # -sS silences the progress meter but keeps error text, so the captured
    # stderr holds only curl's diagnostics. The 401/403 probe below reads it,
    # and progress-meter digits could otherwise produce a false match.
    base_cmd = ['curl', '--fail', '-sS', '-L', '-o', full_path, url]

    def auth_header(key):
        return ['-H', f'Authorization: Bearer {key}']

    first_cmd = base_cmd + (auth_header(api_key) if api_key else [])
    fallback_key = None
    try:
        subprocess.run(first_cmd, check=True, capture_output=True, text=True)
        return
    except subprocess.CalledProcessError as e:
        stderr = e.stderr or ""
        unauthorized = '401' in stderr or '403' in stderr
        if not api_key:
            fallback_key = os.environ.get("CIVITAI_API_KEY")
        # Retry only when there is a real key to add; sending "Bearer None"
        # cannot succeed and would just hide the original error.
        if not (unauthorized and fallback_key):
            raise gr.Error(f"Error downloading file: {stderr}") from e
    except Exception as e:
        raise gr.Error(f"Error downloading file: {str(e)}") from e

    # Second attempt, this time authenticated with the environment's key.
    try:
        subprocess.run(
            base_cmd + auth_header(fallback_key),
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        raise gr.Error(f"Error downloading file with authorization: {e.stderr}") from e
def get_files_by_username(username, api_key=None):
    """List every downloadable file of every model published by ``username``.

    Pages through the CivitAI models endpoint by following
    ``metadata.nextPage`` until no further page is reported.

    Args:
        username: CivitAI user name, inserted into the query string as given.
        api_key: Optional key sent as an ``Authorization: Bearer`` header.

    Returns:
        dict mapping ``"<model id>/<version id>/<file name>"`` to a dict with
        the keys ``downloadUrl``, ``modelId``, ``modelUrl``, ``author``,
        ``authorUrl`` and ``mirrorUrl``.

    Raises:
        gr.Error: If a request fails, returns an HTTP error status, or the
            response body is not JSON.
    """
    next_url = f"https://civitai.com/api/v1/models?username={username}&limit=100&nsfw=true"
    headers = {'Authorization': f'Bearer {api_key}'} if api_key else {}
    output = {}

    while next_url:
        try:
            response = requests.get(next_url, headers=headers, timeout=30)
            # Without this an error payload would surface later as an opaque
            # KeyError instead of a readable message.
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            raise gr.Error("Something went wrong in fetching CivitAI API") from e

        for model in data.get('items', []):
            for version in model['modelVersions']:
                page_ref = f"{model['id']}?modelVersionId={version['id']}"
                for file in version['files']:
                    key = f"{model['id']}/{version['id']}/{file['name']}"
                    output[key] = {
                        'downloadUrl': file['downloadUrl'],
                        'modelId': model['name'] + ' - ' + version['name'],
                        'modelUrl': f"https://civitai.com/models/{page_ref}",
                        'author': model['creator']['username'],
                        'authorUrl': f"https://civitai.com/user/{model['creator']['username']}",
                        'mirrorUrl': f"https://civitaiarchive.com/models/{page_ref}",
                    }

        next_url = data.get('metadata', {}).get('nextPage')

    return output
def get_files_by_model_id(model_id, api_key=None):
    """List every downloadable file of every version of one CivitAI model.

    Args:
        model_id: Model identifier, inserted into the API URL as given.
        api_key: Optional key sent as an ``Authorization: Bearer`` header.

    Returns:
        dict mapping ``"<model id>/<version id>/<file name>"`` to a dict with
        the keys ``downloadUrl``, ``modelId``, ``modelUrl``, ``author``,
        ``authorUrl`` and ``mirrorUrl``.

    Raises:
        gr.Error: If the request fails, returns an HTTP error status, or the
            response body is not JSON.
    """
    api_url = f"https://civitai.com/api/v1/models/{model_id}"
    headers = {'Authorization': f'Bearer {api_key}'} if api_key else {}

    try:
        # Same 30 s timeout as the per-user listing, so a stalled connection
        # cannot hang the request indefinitely.
        response = requests.get(api_url, headers=headers, timeout=30)
        response.raise_for_status()
        model = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        raise gr.Error("Something went wrong in fetching CivitAI API") from e

    output = {}
    for version in model['modelVersions']:
        page_ref = f"{model['id']}?modelVersionId={version['id']}"
        for file in version['files']:
            key = f"{model['id']}/{version['id']}/{file['name']}"
            output[key] = {
                'downloadUrl': file['downloadUrl'],
                'modelId': model['name'] + ' - ' + version['name'],
                'modelUrl': f"https://civitai.com/models/{page_ref}",
                'author': model['creator']['username'],
                'authorUrl': f"https://civitai.com/user/{model['creator']['username']}",
                'mirrorUrl': f"https://civitaiarchive.com/models/{page_ref}",
            }
    return output
def process_url(url, profile, user_repo_id, oauth_token, folder, api_key=None):
    """Mirror the files behind a CivitAI model or user URL into a Hub repo.

    Each file not yet present in ``user_repo_id`` is downloaded into
    ``folder``, given a small README.md naming author, model and mirror, and
    uploaded together with that README. A failure on one file is reported as
    a warning and the loop continues with the next file.

    Args:
        url: ``https://civitai.com/models/<id>...`` or
            ``https://civitai.com/user/<name>...``.
        profile: Unused here; kept so existing callers keep working.
        user_repo_id: Target Hub repository (``"<user>/<repo>"``).
        oauth_token: Hub token string used for the existence check and upload.
        folder: Local scratch directory for downloads.
        api_key: Optional CivitAI API key.

    Returns:
        The full listing found for the URL (including files that were skipped
        or failed), keyed by ``"<model id>/<version id>/<file name>"``.

    Raises:
        gr.Error: If the URL is not a recognised model or user URL.
    """
    model_prefix = "https://civitai.com/models/"
    user_prefix = "https://civitai.com/user/"
    if url.startswith(model_prefix):
        fetch_listing, prefix = get_files_by_model_id, model_prefix
    elif url.startswith(user_prefix):
        fetch_listing, prefix = get_files_by_username, user_prefix
    else:
        raise gr.Error("Unknown CivitAI URL format, please provide model URL or user profile URL")

    # Keep only the first path segment after the prefix. Cutting at '?' and
    # '#' as well stops a pasted "...?modelVersionId=..." from leaking into
    # the API request.
    target = re.split(r'[/?#]', url[len(prefix):], maxsplit=1)[0]
    if not target:
        raise gr.Error("Unknown CivitAI URL format, please provide model URL or user profile URL")
    files = fetch_listing(target, api_key)

    gr.Info(f"Found {len(files)} files to download")
    total_files = len(files)

    for current_file, (dl_path, data) in enumerate(files.items(), start=1):
        filename = dl_path.split('/')[-1]
        repo_subdir = os.path.dirname(dl_path)
        base_folder = os.path.join(folder, repo_subdir)
        local_file = os.path.join(folder, dl_path)
        try:
            # Incremental upload: skip anything the repo already holds.
            if file_exists(repo_id=user_repo_id, filename=dl_path, token=oauth_token):
                gr.Info(f"Skipping {filename}, folder exists {dl_path}")
                continue

            gr.Info(f"Downloading {filename} ({current_file}/{total_files})")
            download_file(data['downloadUrl'], dl_path, folder, api_key)

            # Upload the model and card
            gr.Info(f"Uploading {filename} ({current_file}/{total_files})")
            readme = (
                "\n"
                f"Author: [{data['author']}]({data['authorUrl']})\n"
                f"Model: [{data['modelUrl']}]({data['modelUrl']})\n"
                f"Mirror: [{data['mirrorUrl']}]({data['mirrorUrl']})\n"
            )
            # Explicit UTF-8: author names may contain non-ASCII characters
            # and the locale default encoding is not guaranteed to handle them.
            with open(os.path.join(base_folder, "README.md"), "w", encoding="utf-8") as f:
                f.write(readme)

            upload_folder(
                folder_path=base_folder,
                repo_id=user_repo_id,
                repo_type="model",
                path_in_repo=repo_subdir,
                token=oauth_token,
            )
        except Exception as e:
            gr.Warning(f"Failed to download {dl_path}: {str(e)}")
        finally:
            # Drop the payload as soon as it has been handled so a large
            # listing does not pile up on local disk for the whole run.
            try:
                if os.path.isfile(local_file):
                    os.remove(local_file)
            except OSError:
                # Best effort only; a leftover file must not abort the loop.
                pass

    # The complete listing is returned, not just the files transferred in
    # this call, so a run where everything was already uploaded still counts
    # as non-empty.
    return files
def add_mirror(repo_id):
    """Register ``repo_id`` as a Hugging Face mirror on CivitaiArchive.com.

    Authenticates with the ``CIVITAIARCHIVE_API_KEY`` environment variable.
    Registration is best effort: the outcome is shown to the user as an info
    or warning toast and the function never raises for a failed registration.

    Args:
        repo_id: Hub repository id sent as the mirror ``url``.
    """
    failure_message = "Failed to add mirror to CivitaiArchive.com"

    archive_key = os.environ.get('CIVITAIARCHIVE_API_KEY')
    if not archive_key:
        gr.Warning(failure_message)
        return

    try:
        response = requests.post(
            "https://civitaiarchive.com/api/mirrors",
            headers={
                "Authorization": f"Bearer {archive_key}",
                "Content-Type": "application/json",
            },
            json={
                "type": "huggingface",
                "url": repo_id,
            },
            timeout=30,
        )
    except requests.exceptions.RequestException:
        gr.Warning(failure_message)
        return

    if response.status_code == 200:
        gr.Info("Added mirror to CivitaiArchive.com")
    else:
        # gr.Error is an exception class: building it without `raise` shows
        # nothing. A warning toast reports the failure without turning an
        # upload that already succeeded into an error.
        gr.Warning(failure_message)
def upload_civit_to_hf(profile: Optional[gr.OAuthProfile], oauth_token: gr.OAuthToken, url, destination_repo, civitai_api_key=None):
    """Gradio handler: copy CivitAI files into ``<username>/<destination_repo>``.

    Creates the repository if needed, makes it public, mirrors the files
    behind ``url`` into it and registers the repo with CivitaiArchive.com.
    A uniquely named scratch directory is created for the run and removed
    afterwards, whether or not the run succeeded.

    Args:
        profile: OAuth profile of the logged-in user, or ``None``.
        oauth_token: OAuth token object; its ``.token`` is used for Hub calls.
        url: CivitAI model URL or user profile URL.
        destination_repo: Repository name (letters, digits, ``_`` and ``-``).
        civitai_api_key: Optional CivitAI API key.

    Returns:
        Markdown text linking to the repository.

    Raises:
        gr.Error: On invalid input or any failure during the run.
    """
    import shutil

    if profile is None or not profile.name:
        raise gr.Error("Are you sure you are logged in?")

    if not destination_repo:
        raise gr.Error("Please provide a destination repository name")

    # validate destination repo is alphanumeric
    # (fullmatch: `$` in re.match would also accept a trailing newline)
    if not re.fullmatch(r'[a-zA-Z0-9_-]+', destination_repo):
        raise gr.Error("Destination repository name must contain only alphanumeric characters, underscores, and hyphens")

    # Reject unusable URLs before anything is created, so a typo does not
    # leave behind an empty public repository.
    url = (url or "").strip()
    if not url.startswith(("https://civitai.com/models/", "https://civitai.com/user/")):
        raise gr.Error("Unknown CivitAI URL format, please provide model URL or user profile URL")

    folder = str(uuid.uuid4())
    os.makedirs(folder, exist_ok=False)
    gr.Info(f"Starting download from {url}")

    try:
        user_repo_id = f"{profile.username}/{destination_repo}"

        # Try to create repo only if it doesn't exist
        try:
            create_repo(repo_id=user_repo_id, private=True, exist_ok=False, token=oauth_token.token)
            gr.Info(f"Created new repository {user_repo_id}")
        except Exception:
            # Any creation failure is treated as "already exists"; a genuine
            # problem surfaces in the Hub calls that follow.
            gr.Info(f"Repository {user_repo_id} already exists, will update it")

        update_repo_visibility(repo_id=user_repo_id, private=False, token=oauth_token.token)

        files = process_url(url, profile, user_repo_id, oauth_token.token, folder, civitai_api_key)
        if not files:
            raise gr.Error("No files were copied. Something went wrong.")

        gr.Info(f"Copied {len(files)} files")

        add_mirror(user_repo_id)

        repo_link = f"## [{user_repo_id}](https://huggingface.co/{user_repo_id})"
        return "# Models uploaded to 🤗!\n" + repo_link
    except gr.Error as e:
        # Already a user-facing message; wrapping it again would only nest
        # one error text inside another.
        print(e)
        raise
    except Exception as e:
        print(e)
        raise gr.Error(f"Error during upload process: {str(e)}") from e
    finally:
        # Cleanup. ignore_errors keeps a failed removal from masking the
        # real outcome of the run.
        shutil.rmtree(folder, ignore_errors=True)
# Custom CSS handed to gr.Blocks(css=css) below. "#login" targets the login
# button (elem_id="login"); no component visible in this file uses
# "#disabled_upload".
css = '''
#login {
width: 100% !important;
margin: 0 auto;
}
#disabled_upload{
opacity: 0.5;
pointer-events:none;
}
'''
# UI definition: intro text, login button, three inputs, a submit button and
# a Markdown area for the result.
with gr.Blocks(css=css) as demo:
    gr.Markdown('''# Upload CivitAI models to HuggingFace
You can upload either:
- A single model by providing a CivitAI model URL (e.g., https://civitai.com/models/144684)
- All models from a user by providing their profile URL (e.g., https://civitai.com/user/username)
This will create a new HuggingFace repository under your username if it doesn't exist.
Once uploaded, it will add this repository to CivitaiArchive.com as a mirror.
''')
    gr.LoginButton(elem_id="login")

    with gr.Column():
        submit_source_civit = gr.Textbox(
            placeholder="https://civitai.com/models/144684 or https://civitai.com/user/username",
            label="CivitAI URL",
            info="Enter either a model URL or user profile URL",
        )
        destination_repo = gr.Textbox(
            placeholder="my-awesome-model",
            label="HF Repo Name",
            info="Name for the HuggingFace repository (a new one will be created if it doesn't exist)",
        )
        # The key is a secret: mask it so it is not shown in clear text on
        # screen.
        civitai_api_key = gr.Textbox(
            placeholder="Your CivitAI API key (optional)",
            label="CivitAI API Key",
            info="Optional: Provide your own CivitAI API key to avoid rate limits. If not provided, a default key will be used.",
            type="password",
        )
        instructions = gr.HTML("")
        submit_button_civit = gr.Button("Upload to Hugging Face", interactive=True)
        output = gr.Markdown(label="Upload Progress")

    # Only the three textboxes are listed as inputs; the handler's `profile`
    # and `oauth_token` parameters are not wired here (presumably injected by
    # Gradio from their OAuth type annotations — confirm against Gradio docs).
    submit_button_civit.click(
        fn=upload_civit_to_hf,
        inputs=[submit_source_civit, destination_repo, civitai_api_key],
        outputs=[output],
    )
# Schedule restart_space() to run every 3600 seconds on a background
# scheduler. It is started before demo.launch() below.
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, 'interval', seconds=3600)
scheduler.start()
# Enable the request queue with a default concurrency limit of 50, then start
# the app.
demo.queue(default_concurrency_limit=50)
demo.launch()