import requests
import hashlib
import shutil
import os
import cv2
import json
import threading
import time
from pathlib import Path
import numpy as np


class Models:
    """Index local model files and enrich them with CivitAI metadata.

    For each model type ("checkpoints", "loras", "inbox") a background
    worker scans the configured folders, publishes the list of model names
    and, unless running offline, caches CivitAI metadata (JSON), a preview
    thumbnail and, for LoRAs, the trained keywords.  Models found in the
    inbox are moved to the checkpoint or LoRA folder matching their
    reported type.
    """

    # Model types that currently have an update worker running.  This is a
    # class attribute, so the list is shared by every instance.
    civit_workers = []

    # A cache file with one of these suffixes counts as an existing preview.
    _PREVIEW_SUFFIXES = (".jpeg", ".jpg", ".png", ".gif")
    # Cache files that follow a model when it is moved out of the inbox.
    _CACHE_SUFFIXES = (".json", ".txt") + _PREVIEW_SUFFIXES
    # Seconds to wait on CivitAI before giving up, so a stalled connection
    # cannot block a worker thread forever.
    _REQUEST_TIMEOUT = 30
    # Maximum width or height, in pixels, of a zoomed thumbnail.
    _THUMBNAIL_MAX_SIDE = 166

    def civit_update_worker(self, model_type, folder_paths):
        """Refresh the name list for *model_type*, then update its cache.

        Intended to run in a background thread (see ``update_all_models``).

        Args:
            model_type: One of the keys of ``self.model_dirs``.
            folder_paths: Folders to scan recursively for model files.

        The name list is always refreshed and ``self.ready[model_type]`` is
        always set afterwards, even when the CivitAI part is skipped
        (offline mode, missing ``imageio`` or missing cache folder), so
        ``get_names`` can never wait forever.
        """
        worker_id = str(model_type)
        if worker_id in self.civit_workers:
            # Already working on this folder
            print(f"Skip CivitAI check. Update for {model_type} already running.")
            return

        self.civit_workers.append(worker_id)
        try:
            self._refresh_names(model_type, folder_paths)

            try:
                import imageio.v3  # noqa: F401 - availability check only
            except Exception:
                # Skip updates if we are missing imageio
                print(f"Can't find imageio.v3 module: Skip CivitAI update")
                return

            if not Path(self.cache_paths[model_type]).is_dir():
                print(f"WARNING: Can't find {self.cache_paths[model_type]} Will not update thumbnails.")
                return

            if self.offline:
                return

            is_inbox = model_type == "inbox"
            targets = self._inbox_targets() if is_inbox else {}

            updated = 0
            # Go through and check previews
            for folder in folder_paths:
                root = Path(folder)
                # Materialise the listing first: inbox files are moved away
                # while we iterate.
                for path in list(root.rglob("*")):
                    if path.suffix.lower() not in self.EXTENSIONS:
                        continue
                    model, changes = self._update_model_cache(model_type, path)
                    updated += changes
                    if is_inbox:
                        self._sort_inbox_model(path, root, model, targets)

            if updated > 0:
                print(f"CivitAI update for {model_type} done.")
        finally:
            # Always unregister, otherwise a failed run would block every
            # later update for this model type.
            self.civit_workers.remove(worker_id)

    def _refresh_names(self, model_type, folder_paths):
        """Rebuild ``self.names[model_type]`` from the files on disk."""
        self.ready[model_type] = False
        try:
            names = []
            for folder in folder_paths:
                root = Path(folder)
                names.extend(
                    str(path.relative_to(root))
                    for path in root.rglob("*")
                    if path.suffix.lower() in self.EXTENSIONS
                )
            # Names inside a folder sort above plain files (False < True),
            # each group ordered case-insensitively.
            self.names[model_type] = sorted(
                names,
                key=lambda name: (
                    str(Path(name).parent) == ".",
                    name.casefold(),
                ),
            )
        finally:
            self.ready[model_type] = True

    def _inbox_targets(self):
        """Map a CivitAI model type to its (model folder, cache folder)."""
        from shared import path_manager

        checkpoints = path_manager.model_paths["modelfile_path"]
        checkpoints = checkpoints[0] if isinstance(checkpoints, list) else checkpoints
        loras = path_manager.model_paths["lorafile_path"]
        loras = loras[0] if isinstance(loras, list) else loras
        return {
            "LORA": (loras, self.cache_paths["loras"]),
            "LoCon": (loras, self.cache_paths["loras"]),
            "Checkpoint": (checkpoints, self.cache_paths["checkpoints"]),
        }

    def _update_model_cache(self, model_type, path):
        """Ensure metadata, preview and keywords exist for one model file.

        Returns:
            A ``(model, changes)`` tuple: the metadata dict and the number
            of cache entries (preview, keywords) that were created.
        """
        # Cache files are stored flat, keyed on the model's file name.
        cache_file = Path(self.cache_paths[model_type] / path.name)
        model = self.get_models_by_path(model_type, str(path))
        changes = 0

        has_preview = any(
            cache_file.with_suffix(suffix).is_file()
            for suffix in self._PREVIEW_SUFFIXES
        )
        if not has_preview:
            self.get_image(model, cache_file.with_suffix(".jpeg"))
            changes += 1
            # Pause between downloads
            time.sleep(1)

        txtcheck = cache_file.with_suffix(".txt")
        if model_type == "loras" and not txtcheck.exists():
            print(f"Get LoRA keywords for {Path(path).name} ({self.get_model_base(model)} - {self.get_model_type(model)})")
            keywords = self.get_keywords(model)
            with open(txtcheck, "w") as f:
                f.write(", ".join(keywords))
            changes += 1

        return model, changes

    def _sort_inbox_model(self, path, inbox_root, model, targets):
        """Move an inbox model and its cache files to their final folders.

        Args:
            path: Full path of the model file inside the inbox.
            inbox_root: The inbox folder *path* was found in.
            model: Metadata dict for the model.
            targets: Mapping produced by ``_inbox_targets``.
        """
        if model is None:
            return
        name = str(path.relative_to(inbox_root))
        base_model = self.get_model_base(model)
        kind = self.get_model_type(model)
        target_dir, target_cache = targets.get(kind, (None, None))
        if target_dir is None or base_model is None:
            print(f"Skipping {name} not sure what {kind} is.")
            return

        # Move model to correct folder
        dest = Path(target_dir) / base_model
        destination = dest / name
        # ``name`` may contain sub-folders, so create the full parent path.
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(path, destination)

        # Move cache-files.  They are written under the bare file name, so
        # look them up the same way.
        cache_file = Path(self.cache_paths["inbox"]) / path.name
        cache_dest = Path(target_cache)
        cache_dest.mkdir(parents=True, exist_ok=True)
        for suffix in self._CACHE_SUFFIXES:
            cachefile = cache_file.with_suffix(suffix)
            if cachefile.is_file():
                shutil.move(cachefile, cache_dest / cachefile.name)
        print(f"Moved {name} to {dest}")

    def get_names(self, model_type):
        """Return the sorted model names, blocking until the scan is done."""
        while not self.ready[model_type]:
            # Wait until we have read all the filenames
            time.sleep(0.2)
        return self.names[model_type]

    def get_file(self, model_type, name):
        """Return the first existing ``folder / name`` or ``None``."""
        # Search the folders for the model
        for folder in self.model_dirs[model_type]:
            candidate = Path(folder) / name
            if candidate.is_file():
                return candidate
        return None

    def update_all_models(self):
        """Start one daemon update worker per model type."""
        for model_type in ["checkpoints", "loras", "inbox"]:
            threading.Thread(
                target=self.civit_update_worker,
                args=(
                    model_type,
                    self.model_dirs[model_type],
                ),
                daemon=True,
            ).start()

    def __init__(self, offline=False):
        """Read the configured paths and start the background scan.

        Args:
            offline: When true, only the local name lists are built and no
                CivitAI data is fetched.
        """
        from shared import path_manager, settings

        def as_list(value):
            # Path settings may hold a single entry or a list of entries.
            return value if isinstance(value, list) else [value]

        self.offline = offline
        self.ready = {
            "checkpoints": False,
            "loras": False,
            "inbox": False,
        }
        self.names = {
            "checkpoints": [],
            "loras": [],
            "inbox": [],
        }

        self.model_dirs = {
            "checkpoints": as_list(path_manager.model_paths["modelfile_path"]),
            "loras": as_list(path_manager.model_paths["lorafile_path"]),
            "inbox": as_list(path_manager.model_paths["inbox_path"]),
        }
        self.cache_paths = {
            "checkpoints": Path(path_manager.model_paths["cache_path"] / "checkpoints"),
            "loras": Path(path_manager.model_paths["cache_path"] / "loras"),
            "inbox": Path(path_manager.model_paths["cache_path"] / "inbox"),
        }

        self.base_url = "https://civitai.com/api/v1/"
        self.headers = {"Content-Type": "application/json"}
        self.session = requests.Session()
        self.EXTENSIONS = [".pth", ".ckpt", ".bin", ".safetensors", ".gguf"]

        self.update_all_models()

    def get_file_from_name(self, model_type, model_name):
        """Return the path of *model_name*, or ``None`` if it is not found.

        Same lookup as ``get_file``; kept as a separate public name.
        """
        return self.get_file(model_type, model_name)

    def model_sha256(self, filename):
        """Return the upper-case SHA256 hex digest of *filename*.

        Returns ``None`` (after printing the error) if the file cannot be
        read.
        """
        print(f"Hashing {filename}")
        blksize = 1024 * 1024
        hash_sha256 = hashlib.sha256()
        try:
            with open(filename, "rb") as f:
                for chunk in iter(lambda: f.read(blksize), b""):
                    hash_sha256.update(chunk)
        except (OSError, ValueError) as e:
            print(f"model_sha256(): Failed reading {filename}")
            print(f"Error: {e}")
            return None
        return hash_sha256.hexdigest().upper()

    def get_models_by_path(self, model_type, path):
        """Return the CivitAI metadata dict for the model file at *path*.

        The cached ``<name>.json`` in the cache folder of *model_type* is
        used when it can be read.  Otherwise the file is hashed and looked
        up on CivitAI; if that yields nothing, a placeholder containing only
        the hash is used.  The result is written to the cache.

        Returns:
            ``{}`` if the cache entry for the name is a directory,
            ``{"baseModel": "Merge"}`` for ``.merge`` files, otherwise the
            metadata (or placeholder) dict.
        """
        cache_path = Path(self.cache_paths[model_type]) / Path(path).name
        if cache_path.is_dir():
            # Give up
            return {}

        json_path = cache_path.with_suffix(".json")
        if json_path.exists():
            try:
                with open(json_path) as f:
                    data = json.load(f)
            except (OSError, ValueError):
                # Unreadable or corrupt cache entry: fetch it again.
                data = None
            if data is not None:
                return data

        if Path(path).suffix == ".merge":
            return {"baseModel": "Merge"}

        sha256 = self.model_sha256(path)
        if sha256 is None:
            # The file could not be read.  Do not query "by-hash/None" and
            # do not cache the failure, so a later run can try again.
            return {"files": [{"hashes": {"SHA256": None}}]}

        url = f"{self.base_url}model-versions/by-hash/{sha256}"
        data = None
        try:
            response = requests.get(
                url, headers=self.headers, timeout=self._REQUEST_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.HTTPError as e:
            if response.status_code == 404:
                print(f"Warning: Could not find {Path(path).name} on civit.ai")
            elif response.status_code == 503:
                print("Error: Civit.ai Service Currently Unavailable")
            else:
                print(f"HTTP Error: {e}")
        except requests.exceptions.RequestException as e:
            print(f"Error: {e}")
        except ValueError as e:
            # Body was not valid JSON (older requests versions raise a
            # plain ValueError here).
            print(f"Error: {e}")

        if data is None:
            # Create our own data
            data = {
                "files": [
                    {
                        "hashes": {
                            "SHA256": sha256,
                        }
                    }
                ]
            }

        print(f"Update model data: {json_path}")
        with open(json_path, "w") as f:
            json.dump(data, f, indent=2)

        return data

    def get_keywords(self, model):
        """Return the model's ``trainedWords`` list (``[""]`` if absent)."""
        keywords = model.get("trainedWords", [""])
        # A JSON null would break the ", ".join() done by the caller.
        return [""] if keywords is None else keywords

    def get_model_base(self, model):
        """Return the model's ``baseModel`` or ``"Unknown"``."""
        return model.get("baseModel", "Unknown")

    def get_model_type(self, model):
        """Return ``model["model"]["type"]`` or ``"Unknown"``."""
        info = model.get("model", None)
        if info is None:
            return "Unknown"
        return info.get("type", "Unknown")

    @staticmethod
    def _make_thumbnail(image, text, zoom=False, caption=False):
        """Optionally scale *image* and draw *text* in its top-left corner.

        Args:
            image: Decoded image array, or ``None``.
            text: Caption to draw when *caption* is true.
            zoom: Scale so the longest side is ``_THUMBNAIL_MAX_SIDE``.
            caption: Draw *text* (dark outline under a light fill).

        Returns:
            The processed image, or ``None`` if *image* is ``None``.
        """
        if image is None:
            return None

        if zoom:
            oh = image.shape[0]
            ow = image.shape[1]
            longest = oh if oh > ow else ow
            scale = Models._THUMBNAIL_MAX_SIDE / longest
            image = cv2.resize(
                image,
                dsize=(int(ow * scale), int(oh * scale)),
                interpolation=cv2.INTER_LANCZOS4,
            )

        if caption:
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.35
            thickness = 1
            org = (3, 10)
            # Thick dark pass first, thin light pass on top.
            image = cv2.putText(
                image, text, org, font, font_scale, (25, 15, 11), thickness * 2, cv2.LINE_AA
            )
            image = cv2.putText(
                image, text, org, font, font_scale, (255, 215, 185), thickness, cv2.LINE_AA
            )

        return image

    def get_image(self, model, path):
        """Download a preview for *model* and store it next to *path*.

        The first entry of ``model["images"]`` that has a URL is used.  It
        is saved as ``<path stem>.jpeg``; entries of type ``"video"`` are
        converted to ``<path stem>.gif`` instead.  If no entry has a URL,
        ``html/warning.jpeg`` is copied in place.  Nothing is written when
        the download fails, so it can be retried later.

        The ``model_preview`` setting is a comma separated list that may
        contain ``caption``, ``zoom`` and ``nogifzoom``.
        """
        from shared import settings

        if model.get("baseModel") == "Merge":
            return

        import imageio.v3 as iio

        opts = []
        if "model_preview" in settings.default_settings:
            raw = settings.default_settings["model_preview"]
            opts = [opt.strip() for opt in raw.split(",")]
        # Every flag gets a value; options that are not listed are off.
        caption = "caption" in opts
        nogifzoom = "nogifzoom" in opts
        zoom = "zoom" in opts

        path = path.with_suffix(".jpeg")
        caption_text = f"{path.with_suffix('').name}"

        image_url = None
        for preview in model.get("images", [{}]):
            url = preview.get("url")
            media_type = preview.get("type")
            if not url:
                continue

            print(f"Updating preview for {caption_text}.")
            image_url = url
            try:
                response = self.session.get(image_url, timeout=self._REQUEST_TIMEOUT)
            except requests.exceptions.RequestException as e:
                print(f"WARNING: get_image() for {caption_text} - {e}")
                break
            if response.status_code != 200:
                print(f"WARNING: get_image() for {caption_text} - {response.status_code} : {response.reason}")
                break

            buffer = np.frombuffer(response.content, dtype=np.uint8)
            thumb = self._make_thumbnail(
                cv2.imdecode(buffer, cv2.IMREAD_COLOR),
                caption_text,
                caption=caption,
                zoom=zoom,
            )
            if thumb is not None:
                payload = cv2.imencode(".jpg", thumb)[1].tobytes()
            else:
                # Not decodable as a still image (e.g. a video): keep the
                # raw download.
                payload = response.content
            with open(path, "wb") as file:
                file.write(payload)

            if media_type == "video":
                tmp_path = f"{path}.tmp"
                shutil.move(path, tmp_path)
                try:
                    fps = iio.immeta(tmp_path)["fps"]
                    frames = []
                    for frame in iio.imiter(tmp_path):
                        out = self._make_thumbnail(
                            frame, caption_text, caption=caption, zoom=not nogifzoom
                        )
                        frames.append(frame if out is None else out)
                    iio.imwrite(
                        str(path.with_suffix(".gif")), frames, fps=fps, loop=0
                    )
                finally:
                    # Never leave the temporary download behind.
                    os.remove(tmp_path)
            break

        if image_url is None:
            shutil.copyfile("html/warning.jpeg", path)