diff --git "a/custom_nodes/comfyui-reactor-node/nodes.py" "b/custom_nodes/comfyui-reactor-node/nodes.py" --- "a/custom_nodes/comfyui-reactor-node/nodes.py" +++ "b/custom_nodes/comfyui-reactor-node/nodes.py" @@ -1,2290 +1,1396 @@ -from __future__ import annotations -import torch - -import os -import sys -import json -import hashlib -import traceback -import math -import time -import random +import os, glob, sys import logging -from PIL import Image, ImageOps, ImageSequence -from PIL.PngImagePlugin import PngInfo +import torch +import torch.nn.functional as torchfn +from torchvision.transforms.functional import normalize +from torchvision.ops import masks_to_boxes import numpy as np -import safetensors.torch - -sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy")) - -import comfy.diffusers_load -import comfy.samplers -import comfy.sample -import comfy.sd +import cv2 +import math +from typing import List +from PIL import Image +from scipy import stats +from insightface.app.common import Face +from segment_anything import sam_model_registry + +from modules.processing import StableDiffusionProcessingImg2Img +from modules.shared import state +# from comfy_extras.chainner_models import model_loading +import comfy.model_management as model_management import comfy.utils -import comfy.controlnet -from comfy.comfy_types import IO, ComfyNodeABC, InputTypeDict, FileLocator - -import comfy.clip_vision - -import comfy.model_management -from comfy.cli_args import args - -import importlib - import folder_paths -import latent_preview -import node_helpers - -def before_node_execution(): - comfy.model_management.throw_exception_if_processing_interrupted() - -def interrupt_processing(value=True): - comfy.model_management.interrupt_current_processing(value) - -MAX_RESOLUTION=16384 - -class CLIPTextEncode(ComfyNodeABC): +from folder_paths import add_folder_path_and_extensions # ← добавили + +import scripts.reactor_version +from r_chainner import model_loading +from scripts.reactor_faceswap import ( + FaceSwapScript, + get_models, + get_current_faces_model, + analyze_faces, + half_det_size, + providers +) +from scripts.reactor_swapper import ( + unload_all_models, +) +from scripts.reactor_logger import logger +from reactor_utils import ( + batch_tensor_to_pil, + batched_pil_to_tensor, + tensor_to_pil, + img2tensor, + tensor2img, + save_face_model, + load_face_model, + download, + set_ort_session, + prepare_cropped_face, + normalize_cropped_face, + add_folder_path_and_extensions, + rgba2rgb_tensor +) +from reactor_patcher import apply_patch +from r_facelib.utils.face_restoration_helper import FaceRestoreHelper +from r_basicsr.utils.registry import ARCH_REGISTRY +import scripts.r_archs.codeformer_arch +import scripts.r_masking.subcore as subcore +import scripts.r_masking.core as core +import scripts.r_masking.segs as masking_segs + +# … сервисные константы … +models_dir = folder_paths.models_dir +REACTOR_MODELS_PATH = os.path.join(models_dir, "reactor") +FACE_MODELS_PATH = os.path.join(REACTOR_MODELS_PATH, "faces") + +os.makedirs(REACTOR_MODELS_PATH, exist_ok=True) +os.makedirs(FACE_MODELS_PATH, exist_ok=True) + +if not os.path.exists(REACTOR_MODELS_PATH): + os.makedirs(REACTOR_MODELS_PATH) + if not os.path.exists(FACE_MODELS_PATH): + os.makedirs(FACE_MODELS_PATH) + +# —————————————————————————————————————————————————————————————— +# Настраиваем папку facerestore_models, чтобы туда «виделось» расширение .onnx +# Вместо старого куска: +# dir_facerestore_models = 
os.path.join(models_dir, "facerestore_models") +# os.makedirs(dir_facerestore_models, exist_ok=True) +# folder_paths.folder_names_and_paths["facerestore_models"] = ( +# [dir_facerestore_models], +# folder_paths.supported_pt_extensions +# ) +# +# Делаем так: +dir_facerestore_models = os.path.join(models_dir, "facerestore_models") +os.makedirs(dir_facerestore_models, exist_ok=True) + +add_folder_path_and_extensions( + "facerestore_models", # ключ, как мы потом будем доставать + [dir_facerestore_models], # список директорий + [".pth", ".pt", ".onnx"] # ← явно указываем .onnx +) +# —————————————————————————————————————————————————————————————— + +BLENDED_FACE_MODEL = None +FACE_SIZE: int = 512 +FACE_HELPER = None + +if "ultralytics" not in folder_paths.folder_names_and_paths: + add_folder_path_and_extensions("ultralytics_bbox", [os.path.join(models_dir, "ultralytics", "bbox")], folder_paths.supported_pt_extensions) + add_folder_path_and_extensions("ultralytics_segm", [os.path.join(models_dir, "ultralytics", "segm")], folder_paths.supported_pt_extensions) + add_folder_path_and_extensions("ultralytics", [os.path.join(models_dir, "ultralytics")], folder_paths.supported_pt_extensions) +if "sams" not in folder_paths.folder_names_and_paths: + add_folder_path_and_extensions("sams", [os.path.join(models_dir, "sams")], folder_paths.supported_pt_extensions) + +def get_facemodels(): + models_path = os.path.join(FACE_MODELS_PATH, "*") + models = glob.glob(models_path) + models = [x for x in models if x.endswith(".safetensors")] + return models + +def get_restorers(): + models_path = os.path.join(models_dir, "facerestore_models/*") + models = glob.glob(models_path) + models = [x for x in models if (x.endswith(".pth") or x.endswith(".onnx"))] + if len(models) == 0: + fr_urls = [ + "https://huggingface.co/datasets/Gourieff/ReActor/resolve/main/models/facerestore_models/GFPGANv1.3.pth", + "https://huggingface.co/datasets/Gourieff/ReActor/resolve/main/models/facerestore_models/GFPGANv1.4.pth", + "https://huggingface.co/datasets/Gourieff/ReActor/resolve/main/models/facerestore_models/codeformer-v0.1.0.pth", + "https://huggingface.co/datasets/Gourieff/ReActor/resolve/main/models/facerestore_models/GPEN-BFR-512.onnx", + "https://huggingface.co/datasets/Gourieff/ReActor/resolve/main/models/facerestore_models/GPEN-BFR-1024.onnx", + "https://huggingface.co/datasets/Gourieff/ReActor/resolve/main/models/facerestore_models/GPEN-BFR-2048.onnx", + ] + for model_url in fr_urls: + model_name = os.path.basename(model_url) + model_path = os.path.join(dir_facerestore_models, model_name) + download(model_url, model_path, model_name) + models = glob.glob(models_path) + models = [x for x in models if (x.endswith(".pth") or x.endswith(".onnx"))] + return models + +def get_model_names(get_models): + models = get_models() + names = [] + for x in models: + names.append(os.path.basename(x)) + names.sort(key=str.lower) + names.insert(0, "none") + return names + +def model_names(): + models = get_models() + return {os.path.basename(x): x for x in models} + + +class reactor: @classmethod - def INPUT_TYPES(s) -> InputTypeDict: + def INPUT_TYPES(s): return { "required": { - "text": (IO.STRING, {"multiline": True, "dynamicPrompts": True, "tooltip": "The text to be encoded."}), - "clip": (IO.CLIP, {"tooltip": "The CLIP model used for encoding the text."}) - } + "enabled": ("BOOLEAN", {"default": True, "label_off": "OFF", "label_on": "ON"}), + "input_image": ("IMAGE",), + "swap_model": (list(model_names().keys()),), + "facedetection": 
(["retinaface_resnet50", "retinaface_mobile0.25", "YOLOv5l", "YOLOv5n"],), + "face_restore_model": (get_model_names(get_restorers),), + "face_restore_visibility": ("FLOAT", {"default": 1, "min": 0.1, "max": 1, "step": 0.05}), + "codeformer_weight": ("FLOAT", {"default": 0.5, "min": 0.0, "max": 1, "step": 0.05}), + "detect_gender_input": (["no","female","male"], {"default": "no"}), + "detect_gender_source": (["no","female","male"], {"default": "no"}), + "input_faces_index": ("STRING", {"default": "0"}), + "source_faces_index": ("STRING", {"default": "0"}), + "console_log_level": ([0, 1, 2], {"default": 1}), + }, + "optional": { + "source_image": ("IMAGE",), + "face_model": ("FACE_MODEL",), + "face_boost": ("FACE_BOOST",), + }, + "hidden": {"faces_order": "FACES_ORDER"}, } - RETURN_TYPES = (IO.CONDITIONING,) - OUTPUT_TOOLTIPS = ("A conditioning containing the embedded text used to guide the diffusion model.",) - FUNCTION = "encode" - - CATEGORY = "conditioning" - DESCRIPTION = "Encodes a text prompt using a CLIP model into an embedding that can be used to guide the diffusion model towards generating specific images." - - def encode(self, clip, text): - if clip is None: - raise RuntimeError("ERROR: clip input is invalid: None\n\nIf the clip is from a checkpoint loader node your checkpoint does not contain a valid clip or text encoder model.") - tokens = clip.tokenize(text) - return (clip.encode_from_tokens_scheduled(tokens), ) - - -class ConditioningCombine: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning_1": ("CONDITIONING", ), "conditioning_2": ("CONDITIONING", )}} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "combine" - - CATEGORY = "conditioning" - - def combine(self, conditioning_1, conditioning_2): - return (conditioning_1 + conditioning_2, ) - -class ConditioningAverage : - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning_to": ("CONDITIONING", ), "conditioning_from": ("CONDITIONING", ), - "conditioning_to_strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01}) - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "addWeighted" - - CATEGORY = "conditioning" - - def addWeighted(self, conditioning_to, conditioning_from, conditioning_to_strength): - out = [] - - if len(conditioning_from) > 1: - logging.warning("Warning: ConditioningAverage conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.") - - cond_from = conditioning_from[0][0] - pooled_output_from = conditioning_from[0][1].get("pooled_output", None) - - for i in range(len(conditioning_to)): - t1 = conditioning_to[i][0] - pooled_output_to = conditioning_to[i][1].get("pooled_output", pooled_output_from) - t0 = cond_from[:,:t1.shape[1]] - if t0.shape[1] < t1.shape[1]: - t0 = torch.cat([t0] + [torch.zeros((1, (t1.shape[1] - t0.shape[1]), t1.shape[2]))], dim=1) - - tw = torch.mul(t1, conditioning_to_strength) + torch.mul(t0, (1.0 - conditioning_to_strength)) - t_to = conditioning_to[i][1].copy() - if pooled_output_from is not None and pooled_output_to is not None: - t_to["pooled_output"] = torch.mul(pooled_output_to, conditioning_to_strength) + torch.mul(pooled_output_from, (1.0 - conditioning_to_strength)) - elif pooled_output_from is not None: - t_to["pooled_output"] = pooled_output_from - - n = [tw, t_to] - out.append(n) - return (out, ) - -class ConditioningConcat: - @classmethod - def INPUT_TYPES(s): - return {"required": { - "conditioning_to": ("CONDITIONING",), - "conditioning_from": ("CONDITIONING",), - 
}} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "concat" - - CATEGORY = "conditioning" - def concat(self, conditioning_to, conditioning_from): - out = [] + RETURN_TYPES = ("IMAGE","FACE_MODEL") + FUNCTION = "execute" + CATEGORY = "🌌 ReActor" - if len(conditioning_from) > 1: - logging.warning("Warning: ConditioningConcat conditioning_from contains more than 1 cond, only the first one will actually be applied to conditioning_to.") - - cond_from = conditioning_from[0][0] - - for i in range(len(conditioning_to)): - t1 = conditioning_to[i][0] - tw = torch.cat((t1, cond_from),1) - n = [tw, conditioning_to[i][1].copy()] - out.append(n) - - return (out, ) - -class ConditioningSetArea: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "width": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), - "height": ("INT", {"default": 64, "min": 64, "max": MAX_RESOLUTION, "step": 8}), - "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "append" - - CATEGORY = "conditioning" - - def append(self, conditioning, width, height, x, y, strength): - c = node_helpers.conditioning_set_values(conditioning, {"area": (height // 8, width // 8, y // 8, x // 8), - "strength": strength, - "set_area_to_bounds": False}) - return (c, ) - -class ConditioningSetAreaPercentage: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "width": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}), - "height": ("FLOAT", {"default": 1.0, "min": 0, "max": 1.0, "step": 0.01}), - "x": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}), - "y": ("FLOAT", {"default": 0, "min": 0, "max": 1.0, "step": 0.01}), - "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "append" - - CATEGORY = "conditioning" - - def append(self, conditioning, width, height, x, y, strength): - c = node_helpers.conditioning_set_values(conditioning, {"area": ("percentage", height, width, y, x), - "strength": strength, - "set_area_to_bounds": False}) - return (c, ) - -class ConditioningSetAreaStrength: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "append" + def __init__(self): + # self.face_helper = None + self.faces_order = ["large-small", "large-small"] + # self.face_size = FACE_SIZE + self.face_boost_enabled = False + self.restore = True + self.boost_model = None + self.interpolation = "Bicubic" + self.boost_model_visibility = 1 + self.boost_cf_weight = 0.5 + + def restore_face( + self, + input_image, + face_restore_model, + face_restore_visibility, + codeformer_weight, + facedetection, + ): + result = input_image + + if face_restore_model != "none" and not model_management.processing_interrupted(): + global FACE_SIZE, FACE_HELPER + + self.face_helper = FACE_HELPER + + # Подготавливаем размер лица + faceSize = 512 + if "1024" in face_restore_model.lower(): + faceSize = 1024 + elif "2048" in face_restore_model.lower(): + faceSize = 2048 + + logger.status(f"Restoring with {face_restore_model} | Face Size is set to {faceSize}") + + # пытаемся 
+            # try to resolve the model path via the registered "facerestore_models" folder first
+            model_path = folder_paths.get_full_path("facerestore_models", face_restore_model)
+
+            # if the lookup fails for any reason, fall back to joining the path manually
+            if model_path is None or not os.path.isfile(model_path):
+                model_path = os.path.join(dir_facerestore_models, face_restore_model)
+
+            # if model_path is still not a valid file, fail early with a clear error
+            if not os.path.isfile(model_path):
+                raise FileNotFoundError(f"Face restoration model not found: {model_path}")
+
+            device = model_management.get_torch_device()
+
+            # one branch per model type; model_path is guaranteed to be a valid path string here
+            if ".onnx" in face_restore_model.lower():
+                # ONNX restorers (GPEN) run through onnxruntime
+                ort_session = set_ort_session(model_path, providers=providers)
+                ort_session_inputs = {}
+                facerestore_model = ort_session
+
+            elif "codeformer" in face_restore_model.lower():
+                codeformer_net = ARCH_REGISTRY.get("CodeFormer")(
+                    dim_embd=512,
+                    codebook_size=1024,
+                    n_head=8,
+                    n_layers=9,
+                    connect_list=["32", "64", "128", "256"],
+                ).to(device)
+                checkpoint = torch.load(model_path)["params_ema"]
+                codeformer_net.load_state_dict(checkpoint)
+                facerestore_model = codeformer_net.eval()
+
-    CATEGORY = "conditioning"
+            else:
-    def append(self, conditioning, strength):
-        c = node_helpers.conditioning_set_values(conditioning, {"strength": strength})
-        return (c, )
+                # any other .pth restorer (GFPGAN and similar)
+                sd = comfy.utils.load_torch_file(model_path, safe_load=True)
+                facerestore_model = model_loading.load_state_dict(sd).eval()
+                facerestore_model.to(device)
+
+            if faceSize != FACE_SIZE or self.face_helper is None:
+                self.face_helper = FaceRestoreHelper(1, face_size=faceSize, crop_ratio=(1, 1), det_model=facedetection, save_ext='png', use_parse=True, device=device)
+                FACE_SIZE = faceSize
+                FACE_HELPER = self.face_helper
+
+            image_np = 255. * result.numpy()
+
+            total_images = image_np.shape[0]
+
+            out_images = []
+
+            for i in range(total_images):
+
+                if total_images > 1:
+                    logger.status(f"Restoring {i+1}")
+
+                cur_image_np = image_np[i,:, :, ::-1]
+
+                original_resolution = cur_image_np.shape[0:2]
+
+                if facerestore_model is None or self.face_helper is None:
+                    return result
+
+                self.face_helper.clean_all()
+                self.face_helper.read_image(cur_image_np)
+                self.face_helper.get_face_landmarks_5(only_center_face=False, resize=640, eye_dist_threshold=5)
+                self.face_helper.align_warp_face()
+
+                restored_face = None
+
+                # once face_helper is ready and the faces are aligned, loop over every cropped face:
+                for idx, cropped_face in enumerate(self.face_helper.cropped_faces):
+
+                    # if there are several faces, log the progress
+                    if len(self.face_helper.cropped_faces) > 1:
+                        logger.status(f"Restoring face {idx+1}")
+
+                    # convert the crop to a tensor and normalize it (needed for the PTH models):
+                    cropped_face_t = img2tensor(cropped_face / 255., bgr2rgb=True, float32=True)
+                    normalize(cropped_face_t, (0.5, 0.5, 0.5), (0.5, 0.5, 0.5), inplace=True)
+                    cropped_face_t = cropped_face_t.unsqueeze(0).to(device)
+
+                    try:
+
+                        with torch.no_grad():
+
+                            # ────── ONNX (GPEN) inference ──────
+                            if face_restore_model.lower().endswith(".onnx"):
+                                # prepare the inputs for onnxruntime
+                                for inp in ort_session.get_inputs():
+                                    if inp.name == "input":
+                                        ort_session_inputs[inp.name] = prepare_cropped_face(cropped_face)
+                                    elif inp.name == "weight":
+                                        ort_session_inputs[inp.name] = np.array([1], dtype=np.double)
+
+                                # run the inference
+                                output = ort_session.run(None, ort_session_inputs)[0][0]
+                                # convert the result back into an image
+                                restored_face = normalize_cropped_face(output)
+
+                            else:
+                                # ────── PTH / CodeFormer branch ──────
+                                if "codeformer" in face_restore_model.lower():
+                                    output = facerestore_model(cropped_face_t, w=codeformer_weight)[0]
+                                else:
+                                    # a regular PTH model (GFPGAN and similar)
+                                    output = facerestore_model(cropped_face_t)[0]
+
+                                restored_face = tensor2img(output, rgb2bgr=True, min_max=(-1, 1))
+                            # ─────────────────────────────────────────────────
+
+                            # free the memory
+                            del output
+                            torch.cuda.empty_cache()
+
+                    except Exception as error:
+                        # if inference crashes, fall back to the raw crop
+                        print(f"\tFailed inference: {error}", file=sys.stderr)
+                        restored_face = tensor2img(cropped_face_t, rgb2bgr=True, min_max=(-1, 1))
+
+                    # blend with the original crop according to the visibility setting and collect the frame:
+                    if face_restore_visibility < 1:
+                        restored_face = cropped_face * (1 - face_restore_visibility) + restored_face * face_restore_visibility
+
+                    restored_face = restored_face.astype("uint8")
+                    self.face_helper.add_restored_face(restored_face)
+
+                self.face_helper.get_inverse_affine(None)
+
+                restored_img = self.face_helper.paste_faces_to_input_image()
+                restored_img = restored_img[:, :, ::-1]
+
+                if original_resolution != restored_img.shape[0:2]:
+                    restored_img = cv2.resize(restored_img, (0, 0), fx=original_resolution[1]/restored_img.shape[1], fy=original_resolution[0]/restored_img.shape[0], interpolation=cv2.INTER_AREA)
+
+                self.face_helper.clean_all()
+
+                # out_images[i] = restored_img
+                out_images.append(restored_img)
+
+            if state.interrupted or model_management.processing_interrupted():
+                logger.status("Interrupted by User")
+                return input_image
+
+            restored_img_np = np.array(out_images).astype(np.float32) / 255.0
+            restored_img_tensor = torch.from_numpy(restored_img_np)
+
+            result = restored_img_tensor
+
+        return result
+
+    def execute(self, enabled, input_image, swap_model, detect_gender_source, detect_gender_input, source_faces_index, input_faces_index, console_log_level, face_restore_model, face_restore_visibility, codeformer_weight, facedetection, source_image=None, face_model=None, faces_order=None, face_boost=None):
+
+        if face_boost is not None:
+            self.face_boost_enabled = face_boost["enabled"]
+            self.boost_model = face_boost["boost_model"]
+            self.interpolation = face_boost["interpolation"]
+            self.boost_model_visibility = face_boost["visibility"]
+            self.boost_cf_weight = face_boost["codeformer_weight"]
+            self.restore = face_boost["restore_with_main_after"]
+        else:
+            self.face_boost_enabled = False
+
+        if faces_order is None:
+            faces_order = self.faces_order
+
+        apply_patch(console_log_level)
+
+        if not enabled:
+            return (input_image,face_model)
+        elif source_image is None and face_model is None:
+            logger.error("Please provide 'source_image' or 'face_model'")
+            return (input_image,face_model)
+
+        if face_model == "none":
+            face_model = None
+
+        script = FaceSwapScript()
+        pil_images = batch_tensor_to_pil(input_image)
+        if source_image is not None:
+            source = tensor_to_pil(source_image)
+        else:
+            source = None
+        p = StableDiffusionProcessingImg2Img(pil_images)
+        script.process(
+            p=p,
+            img=source,
+            enable=True,
+            source_faces_index=source_faces_index,
+            faces_index=input_faces_index,
+            model=swap_model,
+            swap_in_source=True,
+            swap_in_generated=True,
+            gender_source=detect_gender_source,
+            gender_target=detect_gender_input,
+            face_model=face_model,
+            faces_order=faces_order,
+            # face boost:
+            face_boost_enabled=self.face_boost_enabled,
+            face_restore_model=self.boost_model,
+            face_restore_visibility=self.boost_model_visibility,
+            codeformer_weight=self.boost_cf_weight,
+            interpolation=self.interpolation,
+        )
+        result = batched_pil_to_tensor(p.init_images)
+        if face_model is None:
+            current_face_model = get_current_faces_model()
+            face_model_to_provide = 
current_face_model[0] if (current_face_model is not None and len(current_face_model) > 0) else face_model + else: + face_model_to_provide = face_model -class ConditioningSetMask: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "mask": ("MASK", ), - "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), - "set_cond_area": (["default", "mask bounds"],), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "append" - - CATEGORY = "conditioning" - - def append(self, conditioning, mask, set_cond_area, strength): - set_area_to_bounds = False - if set_cond_area != "default": - set_area_to_bounds = True - if len(mask.shape) < 3: - mask = mask.unsqueeze(0) - - c = node_helpers.conditioning_set_values(conditioning, {"mask": mask, - "set_area_to_bounds": set_area_to_bounds, - "mask_strength": strength}) - return (c, ) - -class ConditioningZeroOut: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", )}} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "zero_out" - - CATEGORY = "advanced/conditioning" - - def zero_out(self, conditioning): - c = [] - for t in conditioning: - d = t[1].copy() - pooled_output = d.get("pooled_output", None) - if pooled_output is not None: - d["pooled_output"] = torch.zeros_like(pooled_output) - n = [torch.zeros_like(t[0]), d] - c.append(n) - return (c, ) - -class ConditioningSetTimestepRange: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "start": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}), - "end": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}) - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "set_range" + if self.restore or not self.face_boost_enabled: + result = reactor.restore_face(self,result,face_restore_model,face_restore_visibility,codeformer_weight,facedetection) - CATEGORY = "advanced/conditioning" + return (result,face_model_to_provide) - def set_range(self, conditioning, start, end): - c = node_helpers.conditioning_set_values(conditioning, {"start_percent": start, - "end_percent": end}) - return (c, ) -class VAEDecode: +class ReActorPlusOpt: @classmethod def INPUT_TYPES(s): return { "required": { - "samples": ("LATENT", {"tooltip": "The latent to be decoded."}), - "vae": ("VAE", {"tooltip": "The VAE model used for decoding the latent."}) + "enabled": ("BOOLEAN", {"default": True, "label_off": "OFF", "label_on": "ON"}), + "input_image": ("IMAGE",), + "swap_model": (list(model_names().keys()),), + "facedetection": (["retinaface_resnet50", "retinaface_mobile0.25", "YOLOv5l", "YOLOv5n"],), + "face_restore_model": (get_model_names(get_restorers),), + "face_restore_visibility": ("FLOAT", {"default": 1, "min": 0.1, "max": 1, "step": 0.05}), + "codeformer_weight": ("FLOAT", {"default": 0.5, "min": 0.0, "max": 1, "step": 0.05}), + }, + "optional": { + "source_image": ("IMAGE",), + "face_model": ("FACE_MODEL",), + "options": ("OPTIONS",), + "face_boost": ("FACE_BOOST",), } } - RETURN_TYPES = ("IMAGE",) - OUTPUT_TOOLTIPS = ("The decoded image.",) - FUNCTION = "decode" - - CATEGORY = "latent" - DESCRIPTION = "Decodes latent images back into pixel space images." 
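For reference, the ONNX branch of `restore_face` above boils down to a plain onnxruntime call on each aligned crop. The sketch below reproduces that flow outside the node, assuming onnxruntime is installed and a square BGR uint8 face crop; the node's exact pre/post-processing lives in `reactor_utils.prepare_cropped_face` / `normalize_cropped_face`, so the [-1, 1] normalization here is only a plausible stand-in.

```python
import numpy as np
import onnxruntime as ort

def restore_crop_onnx(model_path: str, face_bgr: np.ndarray) -> np.ndarray:
    """Run a GPEN-style ONNX face restorer on one aligned crop (sketch)."""
    session = ort.InferenceSession(model_path, providers=["CPUExecutionProvider"])

    # HWC uint8 -> NCHW float32 in [-1, 1] (assumed GPEN-style input layout)
    x = face_bgr.astype(np.float32) / 255.0
    x = (x - 0.5) / 0.5
    x = np.transpose(x, (2, 0, 1))[None, ...]

    # feed every declared input; GPEN-BFR models expose "input" (and some builds "weight")
    feed = {}
    for inp in session.get_inputs():
        if inp.name == "input":
            feed[inp.name] = x
        elif inp.name == "weight":
            feed[inp.name] = np.array([1], dtype=np.double)

    out = session.run(None, feed)[0][0]            # CHW float, assumed to be in [-1, 1]

    # back to HWC uint8
    out = np.clip((out + 1.0) / 2.0, 0.0, 1.0)
    return (np.transpose(out, (1, 2, 0)) * 255.0).round().astype(np.uint8)
```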
- def decode(self, vae, samples): - images = vae.decode(samples["samples"]) - if len(images.shape) == 5: #Combine batches - images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1]) - return (images, ) + RETURN_TYPES = ("IMAGE","FACE_MODEL") + FUNCTION = "execute" + CATEGORY = "🌌 ReActor" -class VAEDecodeTiled: - @classmethod - def INPUT_TYPES(s): - return {"required": {"samples": ("LATENT", ), "vae": ("VAE", ), - "tile_size": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 32}), - "overlap": ("INT", {"default": 64, "min": 0, "max": 4096, "step": 32}), - "temporal_size": ("INT", {"default": 64, "min": 8, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to decode at a time."}), - "temporal_overlap": ("INT", {"default": 8, "min": 4, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to overlap."}), - }} - RETURN_TYPES = ("IMAGE",) - FUNCTION = "decode" - - CATEGORY = "_for_testing" - - def decode(self, vae, samples, tile_size, overlap=64, temporal_size=64, temporal_overlap=8): - if tile_size < overlap * 4: - overlap = tile_size // 4 - if temporal_size < temporal_overlap * 2: - temporal_overlap = temporal_overlap // 2 - temporal_compression = vae.temporal_compression_decode() - if temporal_compression is not None: - temporal_size = max(2, temporal_size // temporal_compression) - temporal_overlap = max(1, min(temporal_size // 2, temporal_overlap // temporal_compression)) - else: - temporal_size = None - temporal_overlap = None - - compression = vae.spacial_compression_decode() - images = vae.decode_tiled(samples["samples"], tile_x=tile_size // compression, tile_y=tile_size // compression, overlap=overlap // compression, tile_t=temporal_size, overlap_t=temporal_overlap) - if len(images.shape) == 5: #Combine batches - images = images.reshape(-1, images.shape[-3], images.shape[-2], images.shape[-1]) - return (images, ) - -class VAEEncode: - @classmethod - def INPUT_TYPES(s): - return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", )}} - RETURN_TYPES = ("LATENT",) - FUNCTION = "encode" - - CATEGORY = "latent" - - def encode(self, vae, pixels): - t = vae.encode(pixels[:,:,:,:3]) - return ({"samples":t}, ) - -class VAEEncodeTiled: - @classmethod - def INPUT_TYPES(s): - return {"required": {"pixels": ("IMAGE", ), "vae": ("VAE", ), - "tile_size": ("INT", {"default": 512, "min": 64, "max": 4096, "step": 64}), - "overlap": ("INT", {"default": 64, "min": 0, "max": 4096, "step": 32}), - "temporal_size": ("INT", {"default": 64, "min": 8, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to encode at a time."}), - "temporal_overlap": ("INT", {"default": 8, "min": 4, "max": 4096, "step": 4, "tooltip": "Only used for video VAEs: Amount of frames to overlap."}), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "encode" - - CATEGORY = "_for_testing" - - def encode(self, vae, pixels, tile_size, overlap, temporal_size=64, temporal_overlap=8): - t = vae.encode_tiled(pixels[:,:,:,:3], tile_x=tile_size, tile_y=tile_size, overlap=overlap, tile_t=temporal_size, overlap_t=temporal_overlap) - return ({"samples": t}, ) - -class VAEEncodeForInpaint: - @classmethod - def INPUT_TYPES(s): - return {"required": { "pixels": ("IMAGE", ), "vae": ("VAE", ), "mask": ("MASK", ), "grow_mask_by": ("INT", {"default": 6, "min": 0, "max": 64, "step": 1}),}} - RETURN_TYPES = ("LATENT",) - FUNCTION = "encode" - - CATEGORY = "latent/inpaint" - - def encode(self, vae, pixels, mask, grow_mask_by=6): - x = 
(pixels.shape[1] // vae.downscale_ratio) * vae.downscale_ratio - y = (pixels.shape[2] // vae.downscale_ratio) * vae.downscale_ratio - mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear") - - pixels = pixels.clone() - if pixels.shape[1] != x or pixels.shape[2] != y: - x_offset = (pixels.shape[1] % vae.downscale_ratio) // 2 - y_offset = (pixels.shape[2] % vae.downscale_ratio) // 2 - pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:] - mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset] - - #grow mask by a few pixels to keep things seamless in latent space - if grow_mask_by == 0: - mask_erosion = mask - else: - kernel_tensor = torch.ones((1, 1, grow_mask_by, grow_mask_by)) - padding = math.ceil((grow_mask_by - 1) / 2) - - mask_erosion = torch.clamp(torch.nn.functional.conv2d(mask.round(), kernel_tensor, padding=padding), 0, 1) - - m = (1.0 - mask.round()).squeeze(1) - for i in range(3): - pixels[:,:,:,i] -= 0.5 - pixels[:,:,:,i] *= m - pixels[:,:,:,i] += 0.5 - t = vae.encode(pixels) - - return ({"samples":t, "noise_mask": (mask_erosion[:,:,:x,:y].round())}, ) - - -class InpaintModelConditioning: - @classmethod - def INPUT_TYPES(s): - return {"required": {"positive": ("CONDITIONING", ), - "negative": ("CONDITIONING", ), - "vae": ("VAE", ), - "pixels": ("IMAGE", ), - "mask": ("MASK", ), - "noise_mask": ("BOOLEAN", {"default": True, "tooltip": "Add a noise mask to the latent so sampling will only happen within the mask. Might improve results or completely break things depending on the model."}), - }} - - RETURN_TYPES = ("CONDITIONING","CONDITIONING","LATENT") - RETURN_NAMES = ("positive", "negative", "latent") - FUNCTION = "encode" - - CATEGORY = "conditioning/inpaint" - - def encode(self, positive, negative, pixels, vae, mask, noise_mask=True): - x = (pixels.shape[1] // 8) * 8 - y = (pixels.shape[2] // 8) * 8 - mask = torch.nn.functional.interpolate(mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])), size=(pixels.shape[1], pixels.shape[2]), mode="bilinear") - - orig_pixels = pixels - pixels = orig_pixels.clone() - if pixels.shape[1] != x or pixels.shape[2] != y: - x_offset = (pixels.shape[1] % 8) // 2 - y_offset = (pixels.shape[2] % 8) // 2 - pixels = pixels[:,x_offset:x + x_offset, y_offset:y + y_offset,:] - mask = mask[:,:,x_offset:x + x_offset, y_offset:y + y_offset] - - m = (1.0 - mask.round()).squeeze(1) - for i in range(3): - pixels[:,:,:,i] -= 0.5 - pixels[:,:,:,i] *= m - pixels[:,:,:,i] += 0.5 - concat_latent = vae.encode(pixels) - orig_latent = vae.encode(orig_pixels) - - out_latent = {} - - out_latent["samples"] = orig_latent - if noise_mask: - out_latent["noise_mask"] = mask - - out = [] - for conditioning in [positive, negative]: - c = node_helpers.conditioning_set_values(conditioning, {"concat_latent_image": concat_latent, - "concat_mask": mask}) - out.append(c) - return (out[0], out[1], out_latent) - - -class SaveLatent: def __init__(self): - self.output_dir = folder_paths.get_output_directory() - - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT", ), - "filename_prefix": ("STRING", {"default": "latents/ComfyUI"})}, - "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, - } - RETURN_TYPES = () - FUNCTION = "save" - - OUTPUT_NODE = True - - CATEGORY = "_for_testing" - - def save(self, samples, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): - full_output_folder, filename, counter, subfolder, 
filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir) - - # support save metadata for latent sharing - prompt_info = "" - if prompt is not None: - prompt_info = json.dumps(prompt) - - metadata = None - if not args.disable_metadata: - metadata = {"prompt": prompt_info} - if extra_pnginfo is not None: - for x in extra_pnginfo: - metadata[x] = json.dumps(extra_pnginfo[x]) - - file = f"{filename}_{counter:05}_.latent" - - results: list[FileLocator] = [] - results.append({ - "filename": file, - "subfolder": subfolder, - "type": "output" - }) - - file = os.path.join(full_output_folder, file) - - output = {} - output["latent_tensor"] = samples["samples"].contiguous() - output["latent_format_version_0"] = torch.tensor([]) - - comfy.utils.save_torch_file(output, file, metadata=metadata) - return { "ui": { "latents": results } } - - -class LoadLatent: - @classmethod - def INPUT_TYPES(s): - input_dir = folder_paths.get_input_directory() - files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f)) and f.endswith(".latent")] - return {"required": {"latent": [sorted(files), ]}, } - - CATEGORY = "_for_testing" - - RETURN_TYPES = ("LATENT", ) - FUNCTION = "load" - - def load(self, latent): - latent_path = folder_paths.get_annotated_filepath(latent) - latent = safetensors.torch.load_file(latent_path, device="cpu") - multiplier = 1.0 - if "latent_format_version_0" not in latent: - multiplier = 1.0 / 0.18215 - samples = {"samples": latent["latent_tensor"].float() * multiplier} - return (samples, ) - - @classmethod - def IS_CHANGED(s, latent): - image_path = folder_paths.get_annotated_filepath(latent) - m = hashlib.sha256() - with open(image_path, 'rb') as f: - m.update(f.read()) - return m.digest().hex() - - @classmethod - def VALIDATE_INPUTS(s, latent): - if not folder_paths.exists_annotated_filepath(latent): - return "Invalid latent file: {}".format(latent) - return True - - -class CheckpointLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "config_name": (folder_paths.get_filename_list("configs"), ), - "ckpt_name": (folder_paths.get_filename_list("checkpoints"), )}} - RETURN_TYPES = ("MODEL", "CLIP", "VAE") - FUNCTION = "load_checkpoint" + # self.face_helper = None + self.faces_order = ["large-small", "large-small"] + self.detect_gender_input = "no" + self.detect_gender_source = "no" + self.input_faces_index = "0" + self.source_faces_index = "0" + self.console_log_level = 1 + # self.face_size = 512 + self.face_boost_enabled = False + self.restore = True + self.boost_model = None + self.interpolation = "Bicubic" + self.boost_model_visibility = 1 + self.boost_cf_weight = 0.5 + + def execute(self, enabled, input_image, swap_model, facedetection, face_restore_model, face_restore_visibility, codeformer_weight, source_image=None, face_model=None, options=None, face_boost=None): + + if options is not None: + self.faces_order = [options["input_faces_order"], options["source_faces_order"]] + self.console_log_level = options["console_log_level"] + self.detect_gender_input = options["detect_gender_input"] + self.detect_gender_source = options["detect_gender_source"] + self.input_faces_index = options["input_faces_index"] + self.source_faces_index = options["source_faces_index"] + + if face_boost is not None: + self.face_boost_enabled = face_boost["enabled"] + self.restore = face_boost["restore_with_main_after"] + else: + self.face_boost_enabled = False + + result = reactor.execute( + 
self,enabled,input_image,swap_model,self.detect_gender_source,self.detect_gender_input,self.source_faces_index,self.input_faces_index,self.console_log_level,face_restore_model,face_restore_visibility,codeformer_weight,facedetection,source_image,face_model,self.faces_order, face_boost=face_boost + ) - CATEGORY = "advanced/loaders" - DEPRECATED = True + return result - def load_checkpoint(self, config_name, ckpt_name): - config_path = folder_paths.get_full_path("configs", config_name) - ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name) - return comfy.sd.load_checkpoint(config_path, ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) -class CheckpointLoaderSimple: +class LoadFaceModel: @classmethod def INPUT_TYPES(s): return { "required": { - "ckpt_name": (folder_paths.get_filename_list("checkpoints"), {"tooltip": "The name of the checkpoint (model) to load."}), + "face_model": (get_model_names(get_facemodels),), } } - RETURN_TYPES = ("MODEL", "CLIP", "VAE") - OUTPUT_TOOLTIPS = ("The model used for denoising latents.", - "The CLIP model used for encoding text prompts.", - "The VAE model used for encoding and decoding images to and from latent space.") - FUNCTION = "load_checkpoint" - - CATEGORY = "loaders" - DESCRIPTION = "Loads a diffusion model checkpoint, diffusion models are used to denoise latents." - - def load_checkpoint(self, ckpt_name): - ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name) - out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) - return out[:3] - -class DiffusersLoader: - @classmethod - def INPUT_TYPES(cls): - paths = [] - for search_path in folder_paths.get_folder_paths("diffusers"): - if os.path.exists(search_path): - for root, subdir, files in os.walk(search_path, followlinks=True): - if "model_index.json" in files: - paths.append(os.path.relpath(root, start=search_path)) - - return {"required": {"model_path": (paths,), }} - RETURN_TYPES = ("MODEL", "CLIP", "VAE") - FUNCTION = "load_checkpoint" - - CATEGORY = "advanced/loaders/deprecated" - - def load_checkpoint(self, model_path, output_vae=True, output_clip=True): - for search_path in folder_paths.get_folder_paths("diffusers"): - if os.path.exists(search_path): - path = os.path.join(search_path, model_path) - if os.path.exists(path): - model_path = path - break - - return comfy.diffusers_load.load_diffusers(model_path, output_vae=output_vae, output_clip=output_clip, embedding_directory=folder_paths.get_folder_paths("embeddings")) - - -class unCLIPCheckpointLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "ckpt_name": (folder_paths.get_filename_list("checkpoints"), ), - }} - RETURN_TYPES = ("MODEL", "CLIP", "VAE", "CLIP_VISION") - FUNCTION = "load_checkpoint" - - CATEGORY = "loaders" - - def load_checkpoint(self, ckpt_name, output_vae=True, output_clip=True): - ckpt_path = folder_paths.get_full_path_or_raise("checkpoints", ckpt_name) - out = comfy.sd.load_checkpoint_guess_config(ckpt_path, output_vae=True, output_clip=True, output_clipvision=True, embedding_directory=folder_paths.get_folder_paths("embeddings")) - return out - -class CLIPSetLastLayer: - @classmethod - def INPUT_TYPES(s): - return {"required": { "clip": ("CLIP", ), - "stop_at_clip_layer": ("INT", {"default": -1, "min": -24, "max": -1, "step": 1}), - }} - RETURN_TYPES = ("CLIP",) - FUNCTION = "set_last_layer" - - CATEGORY = 
"conditioning" - - def set_last_layer(self, clip, stop_at_clip_layer): - clip = clip.clone() - clip.clip_layer(stop_at_clip_layer) - return (clip,) + + RETURN_TYPES = ("FACE_MODEL",) + FUNCTION = "load_model" + CATEGORY = "🌌 ReActor" + + def load_model(self, face_model): + self.face_model = face_model + self.face_models_path = FACE_MODELS_PATH + if self.face_model != "none": + face_model_path = os.path.join(self.face_models_path, self.face_model) + out = load_face_model(face_model_path) + else: + out = None + return (out, ) -class LoraLoader: - def __init__(self): - self.loaded_lora = None +class ReActorWeight: @classmethod def INPUT_TYPES(s): return { "required": { - "model": ("MODEL", {"tooltip": "The diffusion model the LoRA will be applied to."}), - "clip": ("CLIP", {"tooltip": "The CLIP model the LoRA will be applied to."}), - "lora_name": (folder_paths.get_filename_list("loras"), {"tooltip": "The name of the LoRA."}), - "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the diffusion model. This value can be negative."}), - "strength_clip": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01, "tooltip": "How strongly to modify the CLIP model. This value can be negative."}), + "input_image": ("IMAGE",), + "faceswap_weight": (["0%", "12.5%", "25%", "37.5%", "50%", "62.5%", "75%", "87.5%", "100%"], {"default": "50%"}), + }, + "optional": { + "source_image": ("IMAGE",), + "face_model": ("FACE_MODEL",), } } + + RETURN_TYPES = ("IMAGE","FACE_MODEL") + RETURN_NAMES = ("INPUT_IMAGE","FACE_MODEL") + FUNCTION = "set_weight" - RETURN_TYPES = ("MODEL", "CLIP") - OUTPUT_TOOLTIPS = ("The modified diffusion model.", "The modified CLIP model.") - FUNCTION = "load_lora" - - CATEGORY = "loaders" - DESCRIPTION = "LoRAs are used to modify diffusion and CLIP models, altering the way in which latents are denoised such as applying styles. Multiple LoRA nodes can be linked together." 
- - def load_lora(self, model, clip, lora_name, strength_model, strength_clip): - if strength_model == 0 and strength_clip == 0: - return (model, clip) - - lora_path = folder_paths.get_full_path_or_raise("loras", lora_name) - lora = None - if self.loaded_lora is not None: - if self.loaded_lora[0] == lora_path: - lora = self.loaded_lora[1] - else: - self.loaded_lora = None + OUTPUT_NODE = True - if lora is None: - lora = comfy.utils.load_torch_file(lora_path, safe_load=True) - self.loaded_lora = (lora_path, lora) + CATEGORY = "🌌 ReActor" - model_lora, clip_lora = comfy.sd.load_lora_for_models(model, clip, lora, strength_model, strength_clip) - return (model_lora, clip_lora) + def set_weight(self, input_image, faceswap_weight, face_model=None, source_image=None): -class LoraLoaderModelOnly(LoraLoader): - @classmethod - def INPUT_TYPES(s): - return {"required": { "model": ("MODEL",), - "lora_name": (folder_paths.get_filename_list("loras"), ), - "strength_model": ("FLOAT", {"default": 1.0, "min": -100.0, "max": 100.0, "step": 0.01}), - }} - RETURN_TYPES = ("MODEL",) - FUNCTION = "load_lora_model_only" - - def load_lora_model_only(self, model, lora_name, strength_model): - return (self.load_lora(model, None, lora_name, strength_model, 0)[0],) - -class VAELoader: - @staticmethod - def vae_list(): - vaes = folder_paths.get_filename_list("vae") - approx_vaes = folder_paths.get_filename_list("vae_approx") - sdxl_taesd_enc = False - sdxl_taesd_dec = False - sd1_taesd_enc = False - sd1_taesd_dec = False - sd3_taesd_enc = False - sd3_taesd_dec = False - f1_taesd_enc = False - f1_taesd_dec = False - - for v in approx_vaes: - if v.startswith("taesd_decoder."): - sd1_taesd_dec = True - elif v.startswith("taesd_encoder."): - sd1_taesd_enc = True - elif v.startswith("taesdxl_decoder."): - sdxl_taesd_dec = True - elif v.startswith("taesdxl_encoder."): - sdxl_taesd_enc = True - elif v.startswith("taesd3_decoder."): - sd3_taesd_dec = True - elif v.startswith("taesd3_encoder."): - sd3_taesd_enc = True - elif v.startswith("taef1_encoder."): - f1_taesd_dec = True - elif v.startswith("taef1_decoder."): - f1_taesd_enc = True - if sd1_taesd_dec and sd1_taesd_enc: - vaes.append("taesd") - if sdxl_taesd_dec and sdxl_taesd_enc: - vaes.append("taesdxl") - if sd3_taesd_dec and sd3_taesd_enc: - vaes.append("taesd3") - if f1_taesd_dec and f1_taesd_enc: - vaes.append("taef1") - return vaes - - @staticmethod - def load_taesd(name): - sd = {} - approx_vaes = folder_paths.get_filename_list("vae_approx") - - encoder = next(filter(lambda a: a.startswith("{}_encoder.".format(name)), approx_vaes)) - decoder = next(filter(lambda a: a.startswith("{}_decoder.".format(name)), approx_vaes)) - - enc = comfy.utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", encoder)) - for k in enc: - sd["taesd_encoder.{}".format(k)] = enc[k] - - dec = comfy.utils.load_torch_file(folder_paths.get_full_path_or_raise("vae_approx", decoder)) - for k in dec: - sd["taesd_decoder.{}".format(k)] = dec[k] - - if name == "taesd": - sd["vae_scale"] = torch.tensor(0.18215) - sd["vae_shift"] = torch.tensor(0.0) - elif name == "taesdxl": - sd["vae_scale"] = torch.tensor(0.13025) - sd["vae_shift"] = torch.tensor(0.0) - elif name == "taesd3": - sd["vae_scale"] = torch.tensor(1.5305) - sd["vae_shift"] = torch.tensor(0.0609) - elif name == "taef1": - sd["vae_scale"] = torch.tensor(0.3611) - sd["vae_shift"] = torch.tensor(0.1159) - return sd + if input_image is None: + logger.error("Please provide `input_image`") + return (input_image,None) + + if 
source_image is None and face_model is None: + logger.error("Please provide `source_image` or `face_model`") + return (input_image,None) - @classmethod - def INPUT_TYPES(s): - return {"required": { "vae_name": (s.vae_list(), )}} - RETURN_TYPES = ("VAE",) - FUNCTION = "load_vae" + weight = float(faceswap_weight.split("%")[0]) - CATEGORY = "loaders" + images = [] + faces = [] if face_model is None else [face_model] + embeddings = [] if face_model is None else [face_model.embedding] - #TODO: scale factor? - def load_vae(self, vae_name): - if vae_name in ["taesd", "taesdxl", "taesd3", "taef1"]: - sd = self.load_taesd(vae_name) + if weight == 0: + images = [input_image] + faces = [] + embeddings = [] + elif weight == 100: + if face_model is None: + images = [source_image] else: - vae_path = folder_paths.get_full_path_or_raise("vae", vae_name) - sd = comfy.utils.load_torch_file(vae_path) - vae = comfy.sd.VAE(sd=sd) - vae.throw_exception_if_invalid() - return (vae,) - -class ControlNetLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "control_net_name": (folder_paths.get_filename_list("controlnet"), )}} - - RETURN_TYPES = ("CONTROL_NET",) - FUNCTION = "load_controlnet" - - CATEGORY = "loaders" - - def load_controlnet(self, control_net_name): - controlnet_path = folder_paths.get_full_path_or_raise("controlnet", control_net_name) - controlnet = comfy.controlnet.load_controlnet(controlnet_path) - if controlnet is None: - raise RuntimeError("ERROR: controlnet file is invalid and does not contain a valid controlnet model.") - return (controlnet,) - -class DiffControlNetLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "model": ("MODEL",), - "control_net_name": (folder_paths.get_filename_list("controlnet"), )}} - - RETURN_TYPES = ("CONTROL_NET",) - FUNCTION = "load_controlnet" - - CATEGORY = "loaders" - - def load_controlnet(self, model, control_net_name): - controlnet_path = folder_paths.get_full_path_or_raise("controlnet", control_net_name) - controlnet = comfy.controlnet.load_controlnet(controlnet_path, model) - return (controlnet,) - - -class ControlNetApply: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "control_net": ("CONTROL_NET", ), - "image": ("IMAGE", ), - "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}) - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "apply_controlnet" - - DEPRECATED = True - CATEGORY = "conditioning/controlnet" - - def apply_controlnet(self, conditioning, control_net, image, strength): - if strength == 0: - return (conditioning, ) - - c = [] - control_hint = image.movedim(-1,1) - for t in conditioning: - n = [t[0], t[1].copy()] - c_net = control_net.copy().set_cond_hint(control_hint, strength) - if 'control' in t[1]: - c_net.set_previous_controlnet(t[1]['control']) - n[1]['control'] = c_net - n[1]['control_apply_to_uncond'] = True - c.append(n) - return (c, ) - - -class ControlNetApplyAdvanced: - @classmethod - def INPUT_TYPES(s): - return {"required": {"positive": ("CONDITIONING", ), - "negative": ("CONDITIONING", ), - "control_net": ("CONTROL_NET", ), - "image": ("IMAGE", ), - "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.01}), - "start_percent": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.001}), - "end_percent": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.001}) - }, - "optional": {"vae": ("VAE", ), - } - } - - RETURN_TYPES = ("CONDITIONING","CONDITIONING") - RETURN_NAMES = ("positive", 
"negative") - FUNCTION = "apply_controlnet" - - CATEGORY = "conditioning/controlnet" - - def apply_controlnet(self, positive, negative, control_net, image, strength, start_percent, end_percent, vae=None, extra_concat=[]): - if strength == 0: - return (positive, negative) - - control_hint = image.movedim(-1,1) - cnets = {} - - out = [] - for conditioning in [positive, negative]: - c = [] - for t in conditioning: - d = t[1].copy() - - prev_cnet = d.get('control', None) - if prev_cnet in cnets: - c_net = cnets[prev_cnet] - else: - c_net = control_net.copy().set_cond_hint(control_hint, strength, (start_percent, end_percent), vae=vae, extra_concat=extra_concat) - c_net.set_previous_controlnet(prev_cnet) - cnets[prev_cnet] = c_net - - d['control'] = c_net - d['control_apply_to_uncond'] = False - n = [t[0], d] - c.append(n) - out.append(c) - return (out[0], out[1]) - - -class UNETLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "unet_name": (folder_paths.get_filename_list("diffusion_models"), ), - "weight_dtype": (["default", "fp8_e4m3fn", "fp8_e4m3fn_fast", "fp8_e5m2"],) - }} - RETURN_TYPES = ("MODEL",) - FUNCTION = "load_unet" - - CATEGORY = "advanced/loaders" - - def load_unet(self, unet_name, weight_dtype): - model_options = {} - if weight_dtype == "fp8_e4m3fn": - model_options["dtype"] = torch.float8_e4m3fn - elif weight_dtype == "fp8_e4m3fn_fast": - model_options["dtype"] = torch.float8_e4m3fn - model_options["fp8_optimizations"] = True - elif weight_dtype == "fp8_e5m2": - model_options["dtype"] = torch.float8_e5m2 - - unet_path = folder_paths.get_full_path_or_raise("diffusion_models", unet_name) - model = comfy.sd.load_diffusion_model(unet_path, model_options=model_options) - return (model,) - -class CLIPLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "clip_name": (folder_paths.get_filename_list("text_encoders"), ), - "type": (["stable_diffusion", "stable_cascade", "sd3", "stable_audio", "mochi", "ltxv", "pixart", "cosmos", "lumina2", "wan", "hidream"], ), - }, - "optional": { - "device": (["default", "cpu"], {"advanced": True}), - }} - RETURN_TYPES = ("CLIP",) - FUNCTION = "load_clip" - - CATEGORY = "advanced/loaders" - - DESCRIPTION = "[Recipes]\n\nstable_diffusion: clip-l\nstable_cascade: clip-g\nsd3: t5 xxl/ clip-g / clip-l\nstable_audio: t5 base\nmochi: t5 xxl\ncosmos: old t5 xxl\nlumina2: gemma 2 2B\nwan: umt5 xxl\n hidream: llama-3.1 (Recommend) or t5" - - def load_clip(self, clip_name, type="stable_diffusion", device="default"): - clip_type = getattr(comfy.sd.CLIPType, type.upper(), comfy.sd.CLIPType.STABLE_DIFFUSION) - - model_options = {} - if device == "cpu": - model_options["load_device"] = model_options["offload_device"] = torch.device("cpu") - - clip_path = folder_paths.get_full_path_or_raise("text_encoders", clip_name) - clip = comfy.sd.load_clip(ckpt_paths=[clip_path], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type, model_options=model_options) - return (clip,) - -class DualCLIPLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "clip_name1": (folder_paths.get_filename_list("text_encoders"), ), - "clip_name2": (folder_paths.get_filename_list("text_encoders"), ), - "type": (["sdxl", "sd3", "flux", "hunyuan_video", "hidream"], ), - }, - "optional": { - "device": (["default", "cpu"], {"advanced": True}), - }} - RETURN_TYPES = ("CLIP",) - FUNCTION = "load_clip" - - CATEGORY = "advanced/loaders" - - DESCRIPTION = "[Recipes]\n\nsdxl: clip-l, clip-g\nsd3: clip-l, clip-g / clip-l, t5 / 
clip-g, t5\nflux: clip-l, t5\nhidream: at least one of t5 or llama, recommended t5 and llama" - - def load_clip(self, clip_name1, clip_name2, type, device="default"): - clip_type = getattr(comfy.sd.CLIPType, type.upper(), comfy.sd.CLIPType.STABLE_DIFFUSION) - - clip_path1 = folder_paths.get_full_path_or_raise("text_encoders", clip_name1) - clip_path2 = folder_paths.get_full_path_or_raise("text_encoders", clip_name2) - - model_options = {} - if device == "cpu": - model_options["load_device"] = model_options["offload_device"] = torch.device("cpu") - - clip = comfy.sd.load_clip(ckpt_paths=[clip_path1, clip_path2], embedding_directory=folder_paths.get_folder_paths("embeddings"), clip_type=clip_type, model_options=model_options) - return (clip,) - -class CLIPVisionLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "clip_name": (folder_paths.get_filename_list("clip_vision"), ), - }} - RETURN_TYPES = ("CLIP_VISION",) - FUNCTION = "load_clip" - - CATEGORY = "loaders" - - def load_clip(self, clip_name): - clip_path = folder_paths.get_full_path_or_raise("clip_vision", clip_name) - clip_vision = comfy.clip_vision.load(clip_path) - if clip_vision is None: - raise RuntimeError("ERROR: clip vision file is invalid and does not contain a valid vision model.") - return (clip_vision,) - -class CLIPVisionEncode: - @classmethod - def INPUT_TYPES(s): - return {"required": { "clip_vision": ("CLIP_VISION",), - "image": ("IMAGE",), - "crop": (["center", "none"],) - }} - RETURN_TYPES = ("CLIP_VISION_OUTPUT",) - FUNCTION = "encode" - - CATEGORY = "conditioning" - - def encode(self, clip_vision, image, crop): - crop_image = True - if crop != "center": - crop_image = False - output = clip_vision.encode_image(image, crop=crop_image) - return (output,) - -class StyleModelLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "style_model_name": (folder_paths.get_filename_list("style_models"), )}} - - RETURN_TYPES = ("STYLE_MODEL",) - FUNCTION = "load_style_model" - - CATEGORY = "loaders" - - def load_style_model(self, style_model_name): - style_model_path = folder_paths.get_full_path_or_raise("style_models", style_model_name) - style_model = comfy.sd.load_style_model(style_model_path) - return (style_model,) - - -class StyleModelApply: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "style_model": ("STYLE_MODEL", ), - "clip_vision_output": ("CLIP_VISION_OUTPUT", ), - "strength": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 10.0, "step": 0.001}), - "strength_type": (["multiply", "attn_bias"], ), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "apply_stylemodel" - - CATEGORY = "conditioning/style_model" - - def apply_stylemodel(self, conditioning, style_model, clip_vision_output, strength, strength_type): - cond = style_model.get_cond(clip_vision_output).flatten(start_dim=0, end_dim=1).unsqueeze(dim=0) - if strength_type == "multiply": - cond *= strength - - n = cond.shape[1] - c_out = [] - for t in conditioning: - (txt, keys) = t - keys = keys.copy() - # even if the strength is 1.0 (i.e, no change), if there's already a mask, we have to add to it - if "attention_mask" in keys or (strength_type == "attn_bias" and strength != 1.0): - # math.log raises an error if the argument is zero - # torch.log returns -inf, which is what we want - attn_bias = torch.log(torch.Tensor([strength if strength_type == "attn_bias" else 1.0])) - # get the size of the mask image - mask_ref_size = keys.get("attention_mask_img_shape", (1, 1)) - n_ref = 
mask_ref_size[0] * mask_ref_size[1] - n_txt = txt.shape[1] - # grab the existing mask - mask = keys.get("attention_mask", None) - # create a default mask if it doesn't exist - if mask is None: - mask = torch.zeros((txt.shape[0], n_txt + n_ref, n_txt + n_ref), dtype=torch.float16) - # convert the mask dtype, because it might be boolean - # we want it to be interpreted as a bias - if mask.dtype == torch.bool: - # log(True) = log(1) = 0 - # log(False) = log(0) = -inf - mask = torch.log(mask.to(dtype=torch.float16)) - # now we make the mask bigger to add space for our new tokens - new_mask = torch.zeros((txt.shape[0], n_txt + n + n_ref, n_txt + n + n_ref), dtype=torch.float16) - # copy over the old mask, in quandrants - new_mask[:, :n_txt, :n_txt] = mask[:, :n_txt, :n_txt] - new_mask[:, :n_txt, n_txt+n:] = mask[:, :n_txt, n_txt:] - new_mask[:, n_txt+n:, :n_txt] = mask[:, n_txt:, :n_txt] - new_mask[:, n_txt+n:, n_txt+n:] = mask[:, n_txt:, n_txt:] - # now fill in the attention bias to our redux tokens - new_mask[:, :n_txt, n_txt:n_txt+n] = attn_bias - new_mask[:, n_txt+n:, n_txt:n_txt+n] = attn_bias - keys["attention_mask"] = new_mask.to(txt.device) - keys["attention_mask_img_shape"] = mask_ref_size - - c_out.append([torch.cat((txt, cond), dim=1), keys]) - - return (c_out,) - -class unCLIPConditioning: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning": ("CONDITIONING", ), - "clip_vision_output": ("CLIP_VISION_OUTPUT", ), - "strength": ("FLOAT", {"default": 1.0, "min": -10.0, "max": 10.0, "step": 0.01}), - "noise_augmentation": ("FLOAT", {"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01}), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "apply_adm" - - CATEGORY = "conditioning" - - def apply_adm(self, conditioning, clip_vision_output, strength, noise_augmentation): - if strength == 0: - return (conditioning, ) - - c = [] - for t in conditioning: - o = t[1].copy() - x = {"clip_vision_output": clip_vision_output, "strength": strength, "noise_augmentation": noise_augmentation} - if "unclip_conditioning" in o: - o["unclip_conditioning"] = o["unclip_conditioning"][:] + [x] + if weight > 50: + images = [input_image] + count = round(100/(100-weight)) else: - o["unclip_conditioning"] = [x] - n = [t[0], o] - c.append(n) - return (c, ) - -class GLIGENLoader: - @classmethod - def INPUT_TYPES(s): - return {"required": { "gligen_name": (folder_paths.get_filename_list("gligen"), )}} - - RETURN_TYPES = ("GLIGEN",) - FUNCTION = "load_gligen" - - CATEGORY = "loaders" - - def load_gligen(self, gligen_name): - gligen_path = folder_paths.get_full_path_or_raise("gligen", gligen_name) - gligen = comfy.sd.load_gligen(gligen_path) - return (gligen,) - -class GLIGENTextBoxApply: - @classmethod - def INPUT_TYPES(s): - return {"required": {"conditioning_to": ("CONDITIONING", ), - "clip": ("CLIP", ), - "gligen_textbox_model": ("GLIGEN", ), - "text": ("STRING", {"multiline": True, "dynamicPrompts": True}), - "width": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}), - "height": ("INT", {"default": 64, "min": 8, "max": MAX_RESOLUTION, "step": 8}), - "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - }} - RETURN_TYPES = ("CONDITIONING",) - FUNCTION = "append" - - CATEGORY = "conditioning/gligen" - - def append(self, conditioning_to, clip, gligen_textbox_model, text, width, height, x, y): - c = [] - cond, cond_pooled = clip.encode_from_tokens(clip.tokenize(text), 
return_pooled="unprojected") - for t in conditioning_to: - n = [t[0], t[1].copy()] - position_params = [(cond_pooled, height // 8, width // 8, y // 8, x // 8)] - prev = [] - if "gligen" in n[1]: - prev = n[1]['gligen'][2] - - n[1]['gligen'] = ("position", gligen_textbox_model, prev + position_params) - c.append(n) - return (c, ) - -class EmptyLatentImage: + if face_model is None: + images = [source_image] + count = round(100/(weight)) + for i in range(count-1): + if weight > 50: + if face_model is None: + images.append(source_image) + else: + faces.append(face_model) + embeddings.append(face_model.embedding) + else: + images.append(input_image) + + images_list: List[Image.Image] = [] + + apply_patch(1) + + if len(images) > 0: + + for image in images: + img = tensor_to_pil(image) + images_list.append(img) + + for image in images_list: + face = BuildFaceModel.build_face_model(self,image) + if isinstance(face, str): + continue + faces.append(face) + embeddings.append(face.embedding) + + if len(faces) > 0: + blended_embedding = np.mean(embeddings, axis=0) + blended_face = Face( + bbox=faces[0].bbox, + kps=faces[0].kps, + det_score=faces[0].det_score, + landmark_3d_68=faces[0].landmark_3d_68, + pose=faces[0].pose, + landmark_2d_106=faces[0].landmark_2d_106, + embedding=blended_embedding, + gender=faces[0].gender, + age=faces[0].age + ) + if blended_face is None: + no_face_msg = "Something went wrong, please try another set of images" + logger.error(no_face_msg) + + return (input_image,blended_face) + + +class BuildFaceModel: def __init__(self): - self.device = comfy.model_management.intermediate_device() - + self.output_dir = FACE_MODELS_PATH + @classmethod def INPUT_TYPES(s): return { "required": { - "width": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The width of the latent images in pixels."}), - "height": ("INT", {"default": 512, "min": 16, "max": MAX_RESOLUTION, "step": 8, "tooltip": "The height of the latent images in pixels."}), - "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096, "tooltip": "The number of latent images in the batch."}) + "save_mode": ("BOOLEAN", {"default": True, "label_off": "OFF", "label_on": "ON"}), + "send_only": ("BOOLEAN", {"default": False, "label_off": "NO", "label_on": "YES"}), + "face_model_name": ("STRING", {"default": "default"}), + "compute_method": (["Mean", "Median", "Mode"], {"default": "Mean"}), + }, + "optional": { + "images": ("IMAGE",), + "face_models": ("FACE_MODEL",), } } - RETURN_TYPES = ("LATENT",) - OUTPUT_TOOLTIPS = ("The empty latent image batch.",) - FUNCTION = "generate" - CATEGORY = "latent" - DESCRIPTION = "Create a new batch of empty latent images to be denoised via sampling." 
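+    # BuildFaceModel returns a single blended FACE_MODEL built from the optional `images` and/or `face_models` inputs.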
+ RETURN_TYPES = ("FACE_MODEL",) + FUNCTION = "blend_faces" - def generate(self, width, height, batch_size=1): - latent = torch.zeros([batch_size, 4, height // 8, width // 8], device=self.device) - return ({"samples":latent}, ) - - -class LatentFromBatch: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), - "batch_index": ("INT", {"default": 0, "min": 0, "max": 63}), - "length": ("INT", {"default": 1, "min": 1, "max": 64}), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "frombatch" - - CATEGORY = "latent/batch" - - def frombatch(self, samples, batch_index, length): - s = samples.copy() - s_in = samples["samples"] - batch_index = min(s_in.shape[0] - 1, batch_index) - length = min(s_in.shape[0] - batch_index, length) - s["samples"] = s_in[batch_index:batch_index + length].clone() - if "noise_mask" in samples: - masks = samples["noise_mask"] - if masks.shape[0] == 1: - s["noise_mask"] = masks.clone() - else: - if masks.shape[0] < s_in.shape[0]: - masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]] - s["noise_mask"] = masks[batch_index:batch_index + length].clone() - if "batch_index" not in s: - s["batch_index"] = [x for x in range(batch_index, batch_index+length)] - else: - s["batch_index"] = samples["batch_index"][batch_index:batch_index + length] - return (s,) - -class RepeatLatentBatch: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), - "amount": ("INT", {"default": 1, "min": 1, "max": 64}), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "repeat" - - CATEGORY = "latent/batch" - - def repeat(self, samples, amount): - s = samples.copy() - s_in = samples["samples"] - - s["samples"] = s_in.repeat((amount, 1,1,1)) - if "noise_mask" in samples and samples["noise_mask"].shape[0] > 1: - masks = samples["noise_mask"] - if masks.shape[0] < s_in.shape[0]: - masks = masks.repeat(math.ceil(s_in.shape[0] / masks.shape[0]), 1, 1, 1)[:s_in.shape[0]] - s["noise_mask"] = samples["noise_mask"].repeat((amount, 1,1,1)) - if "batch_index" in s: - offset = max(s["batch_index"]) - min(s["batch_index"]) + 1 - s["batch_index"] = s["batch_index"] + [x + (i * offset) for i in range(1, amount) for x in s["batch_index"]] - return (s,) - -class LatentUpscale: - upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"] - crop_methods = ["disabled", "center"] + OUTPUT_NODE = True - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,), - "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "crop": (s.crop_methods,)}} - RETURN_TYPES = ("LATENT",) - FUNCTION = "upscale" - - CATEGORY = "latent" - - def upscale(self, samples, upscale_method, width, height, crop): - if width == 0 and height == 0: - s = samples + CATEGORY = "🌌 ReActor" + + def build_face_model(self, image: Image.Image, det_size=(640, 640)): + logging.StreamHandler.terminator = "\n" + if image is None: + error_msg = "Please load an Image" + logger.error(error_msg) + return error_msg + image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) + face_model = analyze_faces(image, det_size) + + if len(face_model) == 0: + print("") + det_size_half = half_det_size(det_size) + face_model = analyze_faces(image, det_size_half) + if face_model is not None and len(face_model) > 0: + print("...........................................................", end=" ") + + if 
face_model is not None and len(face_model) > 0: + return face_model[0] else: - s = samples.copy() - - if width == 0: - height = max(64, height) - width = max(64, round(samples["samples"].shape[-1] * height / samples["samples"].shape[-2])) - elif height == 0: - width = max(64, width) - height = max(64, round(samples["samples"].shape[-2] * width / samples["samples"].shape[-1])) - else: - width = max(64, width) - height = max(64, height) - - s["samples"] = comfy.utils.common_upscale(samples["samples"], width // 8, height // 8, upscale_method, crop) - return (s,) + no_face_msg = "No face found, please try another image" + # logger.error(no_face_msg) + return no_face_msg + + def blend_faces(self, save_mode, send_only, face_model_name, compute_method, images=None, face_models=None): + global BLENDED_FACE_MODEL + blended_face: Face = BLENDED_FACE_MODEL -class LatentUpscaleBy: - upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "bislerp"] + if send_only and blended_face is None: + send_only = False - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), "upscale_method": (s.upscale_methods,), - "scale_by": ("FLOAT", {"default": 1.5, "min": 0.01, "max": 8.0, "step": 0.01}),}} - RETURN_TYPES = ("LATENT",) - FUNCTION = "upscale" + if (images is not None or face_models is not None) and not send_only: - CATEGORY = "latent" + faces = [] + embeddings = [] - def upscale(self, samples, upscale_method, scale_by): - s = samples.copy() - width = round(samples["samples"].shape[-1] * scale_by) - height = round(samples["samples"].shape[-2] * scale_by) - s["samples"] = comfy.utils.common_upscale(samples["samples"], width, height, upscale_method, "disabled") - return (s,) + apply_patch(1) -class LatentRotate: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), - "rotation": (["none", "90 degrees", "180 degrees", "270 degrees"],), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "rotate" - - CATEGORY = "latent/transform" - - def rotate(self, samples, rotation): - s = samples.copy() - rotate_by = 0 - if rotation.startswith("90"): - rotate_by = 1 - elif rotation.startswith("180"): - rotate_by = 2 - elif rotation.startswith("270"): - rotate_by = 3 - - s["samples"] = torch.rot90(samples["samples"], k=rotate_by, dims=[3, 2]) - return (s,) - -class LatentFlip: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), - "flip_method": (["x-axis: vertically", "y-axis: horizontally"],), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "flip" + if images is not None: + images_list: List[Image.Image] = batch_tensor_to_pil(images) - CATEGORY = "latent/transform" + n = len(images_list) - def flip(self, samples, flip_method): - s = samples.copy() - if flip_method.startswith("x"): - s["samples"] = torch.flip(samples["samples"], dims=[2]) - elif flip_method.startswith("y"): - s["samples"] = torch.flip(samples["samples"], dims=[3]) + for i,image in enumerate(images_list): + logging.StreamHandler.terminator = " " + logger.status(f"Building Face Model {i+1} of {n}...") + face = self.build_face_model(image) + if isinstance(face, str): + logger.error(f"No faces found in image {i+1}, skipping") + continue + else: + print(f"{int(((i+1)/n)*100)}%") + faces.append(face) + embeddings.append(face.embedding) + + elif face_models is not None: + + n = len(face_models) + + for i,face_model in enumerate(face_models): + logging.StreamHandler.terminator = " " + logger.status(f"Extracting Face Model {i+1} of {n}...") + face = face_model + if 
isinstance(face, str): + logger.error(f"No faces found for face_model {i+1}, skipping") + continue + else: + print(f"{int(((i+1)/n)*100)}%") + faces.append(face) + embeddings.append(face.embedding) + + logging.StreamHandler.terminator = "\n" + if len(faces) > 0: + # compute_method_name = "Mean" if compute_method == 0 else "Median" if compute_method == 1 else "Mode" + logger.status(f"Blending with Compute Method '{compute_method}'...") + blended_embedding = np.mean(embeddings, axis=0) if compute_method == "Mean" else np.median(embeddings, axis=0) if compute_method == "Median" else stats.mode(embeddings, axis=0)[0].astype(np.float32) + blended_face = Face( + bbox=faces[0].bbox, + kps=faces[0].kps, + det_score=faces[0].det_score, + landmark_3d_68=faces[0].landmark_3d_68, + pose=faces[0].pose, + landmark_2d_106=faces[0].landmark_2d_106, + embedding=blended_embedding, + gender=faces[0].gender, + age=faces[0].age + ) + if blended_face is not None: + BLENDED_FACE_MODEL = blended_face + if save_mode: + face_model_path = os.path.join(FACE_MODELS_PATH, face_model_name + ".safetensors") + save_face_model(blended_face,face_model_path) + # done_msg = f"Face model has been saved to '{face_model_path}'" + # logger.status(done_msg) + logger.status("--Done!--") + # return (blended_face,) + else: + no_face_msg = "Something went wrong, please try another set of images" + logger.error(no_face_msg) + # return (blended_face,) + # logger.status("--Done!--") + if images is None and face_models is None: + logger.error("Please provide `images` or `face_models`") + return (blended_face,) - return (s,) -class LatentComposite: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples_to": ("LATENT",), - "samples_from": ("LATENT",), - "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "feather": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "composite" - - CATEGORY = "latent" - - def composite(self, samples_to, samples_from, x, y, composite_method="normal", feather=0): - x = x // 8 - y = y // 8 - feather = feather // 8 - samples_out = samples_to.copy() - s = samples_to["samples"].clone() - samples_to = samples_to["samples"] - samples_from = samples_from["samples"] - if feather == 0: - s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] - else: - samples_from = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] - mask = torch.ones_like(samples_from) - for t in range(feather): - if y != 0: - mask[:,:,t:1+t,:] *= ((1.0/feather) * (t + 1)) - - if y + samples_from.shape[2] < samples_to.shape[2]: - mask[:,:,mask.shape[2] -1 -t: mask.shape[2]-t,:] *= ((1.0/feather) * (t + 1)) - if x != 0: - mask[:,:,:,t:1+t] *= ((1.0/feather) * (t + 1)) - if x + samples_from.shape[3] < samples_to.shape[3]: - mask[:,:,:,mask.shape[3]- 1 - t: mask.shape[3]- t] *= ((1.0/feather) * (t + 1)) - rev_mask = torch.ones_like(mask) - mask - s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] = samples_from[:,:,:samples_to.shape[2] - y, :samples_to.shape[3] - x] * mask + s[:,:,y:y+samples_from.shape[2],x:x+samples_from.shape[3]] * rev_mask - samples_out["samples"] = s - return (samples_out,) - -class LatentBlend: - @classmethod - def INPUT_TYPES(s): - return {"required": { - "samples1": ("LATENT",), - "samples2": ("LATENT",), - "blend_factor": ("FLOAT", { - "default": 0.5, 
- "min": 0, - "max": 1, - "step": 0.01 - }), - }} - - RETURN_TYPES = ("LATENT",) - FUNCTION = "blend" - - CATEGORY = "_for_testing" - - def blend(self, samples1, samples2, blend_factor:float, blend_mode: str="normal"): - - samples_out = samples1.copy() - samples1 = samples1["samples"] - samples2 = samples2["samples"] - - if samples1.shape != samples2.shape: - samples2.permute(0, 3, 1, 2) - samples2 = comfy.utils.common_upscale(samples2, samples1.shape[3], samples1.shape[2], 'bicubic', crop='center') - samples2.permute(0, 2, 3, 1) - - samples_blended = self.blend_mode(samples1, samples2, blend_mode) - samples_blended = samples1 * blend_factor + samples_blended * (1 - blend_factor) - samples_out["samples"] = samples_blended - return (samples_out,) - - def blend_mode(self, img1, img2, mode): - if mode == "normal": - return img2 - else: - raise ValueError(f"Unsupported blend mode: {mode}") +class SaveFaceModel: + def __init__(self): + self.output_dir = FACE_MODELS_PATH -class LatentCrop: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), - "width": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}), - "height": ("INT", {"default": 512, "min": 64, "max": MAX_RESOLUTION, "step": 8}), - "x": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "y": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "crop" - - CATEGORY = "latent/transform" - - def crop(self, samples, width, height, x, y): - s = samples.copy() - samples = samples['samples'] - x = x // 8 - y = y // 8 - - #enfonce minimum size of 64 - if x > (samples.shape[3] - 8): - x = samples.shape[3] - 8 - if y > (samples.shape[2] - 8): - y = samples.shape[2] - 8 - - new_height = height // 8 - new_width = width // 8 - to_x = new_width + x - to_y = new_height + y - s['samples'] = samples[:,:,y:to_y, x:to_x] - return (s,) - -class SetLatentNoiseMask: - @classmethod - def INPUT_TYPES(s): - return {"required": { "samples": ("LATENT",), - "mask": ("MASK",), - }} - RETURN_TYPES = ("LATENT",) - FUNCTION = "set_mask" - - CATEGORY = "latent/inpaint" - - def set_mask(self, samples, mask): - s = samples.copy() - s["noise_mask"] = mask.reshape((-1, 1, mask.shape[-2], mask.shape[-1])) - return (s,) - -def common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent, denoise=1.0, disable_noise=False, start_step=None, last_step=None, force_full_denoise=False): - latent_image = latent["samples"] - latent_image = comfy.sample.fix_empty_latent_channels(model, latent_image) - - if disable_noise: - noise = torch.zeros(latent_image.size(), dtype=latent_image.dtype, layout=latent_image.layout, device="cpu") - else: - batch_inds = latent["batch_index"] if "batch_index" in latent else None - noise = comfy.sample.prepare_noise(latent_image, seed, batch_inds) - - noise_mask = None - if "noise_mask" in latent: - noise_mask = latent["noise_mask"] - - callback = latent_preview.prepare_callback(model, steps) - disable_pbar = not comfy.utils.PROGRESS_BAR_ENABLED - samples = comfy.sample.sample(model, noise, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, - denoise=denoise, disable_noise=disable_noise, start_step=start_step, last_step=last_step, - force_full_denoise=force_full_denoise, noise_mask=noise_mask, callback=callback, disable_pbar=disable_pbar, seed=seed) - out = latent.copy() - out["samples"] = samples - return (out, ) - -class KSampler: @classmethod def INPUT_TYPES(s): return { 
"required": { - "model": ("MODEL", {"tooltip": "The model used for denoising the input latent."}), - "seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "control_after_generate": True, "tooltip": "The random seed used for creating the noise."}), - "steps": ("INT", {"default": 20, "min": 1, "max": 10000, "tooltip": "The number of steps used in the denoising process."}), - "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01, "tooltip": "The Classifier-Free Guidance scale balances creativity and adherence to the prompt. Higher values result in images more closely matching the prompt however too high values will negatively impact quality."}), - "sampler_name": (comfy.samplers.KSampler.SAMPLERS, {"tooltip": "The algorithm used when sampling, this can affect the quality, speed, and style of the generated output."}), - "scheduler": (comfy.samplers.KSampler.SCHEDULERS, {"tooltip": "The scheduler controls how noise is gradually removed to form the image."}), - "positive": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to include in the image."}), - "negative": ("CONDITIONING", {"tooltip": "The conditioning describing the attributes you want to exclude from the image."}), - "latent_image": ("LATENT", {"tooltip": "The latent image to denoise."}), - "denoise": ("FLOAT", {"default": 1.0, "min": 0.0, "max": 1.0, "step": 0.01, "tooltip": "The amount of denoising applied, lower values will maintain the structure of the initial image allowing for image to image sampling."}), + "save_mode": ("BOOLEAN", {"default": True, "label_off": "OFF", "label_on": "ON"}), + "face_model_name": ("STRING", {"default": "default"}), + "select_face_index": ("INT", {"default": 0, "min": 0}), + }, + "optional": { + "image": ("IMAGE",), + "face_model": ("FACE_MODEL",), } } - RETURN_TYPES = ("LATENT",) - OUTPUT_TOOLTIPS = ("The denoised latent.",) - FUNCTION = "sample" - - CATEGORY = "sampling" - DESCRIPTION = "Uses the provided model, positive and negative conditioning to denoise the latent image." 
- - def sample(self, model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=1.0): - return common_ksampler(model, seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise) + RETURN_TYPES = () + FUNCTION = "save_model" -class KSamplerAdvanced: - @classmethod - def INPUT_TYPES(s): - return {"required": - {"model": ("MODEL",), - "add_noise": (["enable", "disable"], ), - "noise_seed": ("INT", {"default": 0, "min": 0, "max": 0xffffffffffffffff, "control_after_generate": True}), - "steps": ("INT", {"default": 20, "min": 1, "max": 10000}), - "cfg": ("FLOAT", {"default": 8.0, "min": 0.0, "max": 100.0, "step":0.1, "round": 0.01}), - "sampler_name": (comfy.samplers.KSampler.SAMPLERS, ), - "scheduler": (comfy.samplers.KSampler.SCHEDULERS, ), - "positive": ("CONDITIONING", ), - "negative": ("CONDITIONING", ), - "latent_image": ("LATENT", ), - "start_at_step": ("INT", {"default": 0, "min": 0, "max": 10000}), - "end_at_step": ("INT", {"default": 10000, "min": 0, "max": 10000}), - "return_with_leftover_noise": (["disable", "enable"], ), - } - } - - RETURN_TYPES = ("LATENT",) - FUNCTION = "sample" - - CATEGORY = "sampling" - - def sample(self, model, add_noise, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, start_at_step, end_at_step, return_with_leftover_noise, denoise=1.0): - force_full_denoise = True - if return_with_leftover_noise == "enable": - force_full_denoise = False - disable_noise = False - if add_noise == "disable": - disable_noise = True - return common_ksampler(model, noise_seed, steps, cfg, sampler_name, scheduler, positive, negative, latent_image, denoise=denoise, disable_noise=disable_noise, start_step=start_at_step, last_step=end_at_step, force_full_denoise=force_full_denoise) - -class SaveImage: - def __init__(self): - self.output_dir = folder_paths.get_output_directory() - self.type = "output" - self.prefix_append = "" - self.compress_level = 4 + OUTPUT_NODE = True + CATEGORY = "🌌 ReActor" + + def save_model(self, save_mode, face_model_name, select_face_index, image=None, face_model=None, det_size=(640, 640)): + if save_mode and image is not None: + source = tensor_to_pil(image) + source = cv2.cvtColor(np.array(source), cv2.COLOR_RGB2BGR) + apply_patch(1) + logger.status("Building Face Model...") + face_model_raw = analyze_faces(source, det_size) + if len(face_model_raw) == 0: + det_size_half = half_det_size(det_size) + face_model_raw = analyze_faces(source, det_size_half) + try: + face_model = face_model_raw[select_face_index] + except: + logger.error("No face(s) found") + return face_model_name + logger.status("--Done!--") + if save_mode and (face_model != "none" or face_model is not None): + face_model_path = os.path.join(self.output_dir, face_model_name + ".safetensors") + save_face_model(face_model,face_model_path) + if image is None and face_model is None: + logger.error("Please provide `face_model` or `image`") + return face_model_name + + +class RestoreFace: @classmethod def INPUT_TYPES(s): return { "required": { - "images": ("IMAGE", {"tooltip": "The images to save."}), - "filename_prefix": ("STRING", {"default": "ComfyUI", "tooltip": "The prefix for the file to save. 
This may include formatting information such as %date:yyyy-MM-dd% or %Empty Latent Image.width% to include values from nodes."}) - }, - "hidden": { - "prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO" + "image": ("IMAGE",), + "facedetection": (["retinaface_resnet50", "retinaface_mobile0.25", "YOLOv5l", "YOLOv5n"],), + "model": (get_model_names(get_restorers),), + "visibility": ("FLOAT", {"default": 1, "min": 0.0, "max": 1, "step": 0.05}), + "codeformer_weight": ("FLOAT", {"default": 0.5, "min": 0.0, "max": 1, "step": 0.05}), }, } - RETURN_TYPES = () - FUNCTION = "save_images" + RETURN_TYPES = ("IMAGE",) + FUNCTION = "execute" + CATEGORY = "🌌 ReActor" - OUTPUT_NODE = True + # def __init__(self): + # self.face_helper = None + # self.face_size = 512 - CATEGORY = "image" - DESCRIPTION = "Saves the input images to your ComfyUI output directory." - - def save_images(self, images, filename_prefix="ComfyUI", prompt=None, extra_pnginfo=None): - filename_prefix += self.prefix_append - full_output_folder, filename, counter, subfolder, filename_prefix = folder_paths.get_save_image_path(filename_prefix, self.output_dir, images[0].shape[1], images[0].shape[0]) - results = list() - for (batch_number, image) in enumerate(images): - i = 255. * image.cpu().numpy() - img = Image.fromarray(np.clip(i, 0, 255).astype(np.uint8)) - metadata = None - if not args.disable_metadata: - metadata = PngInfo() - if prompt is not None: - metadata.add_text("prompt", json.dumps(prompt)) - if extra_pnginfo is not None: - for x in extra_pnginfo: - metadata.add_text(x, json.dumps(extra_pnginfo[x])) - - filename_with_batch_num = filename.replace("%batch_num%", str(batch_number)) - file = f"{filename_with_batch_num}_{counter:05}_.png" - img.save(os.path.join(full_output_folder, file), pnginfo=metadata, compress_level=self.compress_level) - results.append({ - "filename": file, - "subfolder": subfolder, - "type": self.type - }) - counter += 1 - - return { "ui": { "images": results } } - -class PreviewImage(SaveImage): - def __init__(self): - self.output_dir = folder_paths.get_temp_directory() - self.type = "temp" - self.prefix_append = "_temp_" + ''.join(random.choice("abcdefghijklmnopqrstupvxyz") for x in range(5)) - self.compress_level = 1 + def execute(self, image, model, visibility, codeformer_weight, facedetection): + result = reactor.restore_face(self,image,model,visibility,codeformer_weight,facedetection) + return (result,) - @classmethod - def INPUT_TYPES(s): - return {"required": - {"images": ("IMAGE", ), }, - "hidden": {"prompt": "PROMPT", "extra_pnginfo": "EXTRA_PNGINFO"}, - } -class LoadImage: +class MaskHelper: + def __init__(self): + # self.threshold = 0.5 + # self.dilation = 10 + # self.crop_factor = 3.0 + # self.drop_size = 1 + self.labels = "all" + self.detailer_hook = None + self.device_mode = "AUTO" + self.detection_hint = "center-1" + # self.sam_dilation = 0 + # self.sam_threshold = 0.93 + # self.bbox_expansion = 0 + # self.mask_hint_threshold = 0.7 + # self.mask_hint_use_negative = "False" + # self.force_resize_width = 0 + # self.force_resize_height = 0 + # self.resize_behavior = "source_size" + @classmethod def INPUT_TYPES(s): - input_dir = folder_paths.get_input_directory() - files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] - files = folder_paths.filter_files_content_types(files, ["image"]) - return {"required": - {"image": (sorted(files), {"image_upload": True})}, - } - - CATEGORY = "image" + bboxs = ["bbox/"+x for x in 
folder_paths.get_filename_list("ultralytics_bbox")] + segms = ["segm/"+x for x in folder_paths.get_filename_list("ultralytics_segm")] + sam_models = [x for x in folder_paths.get_filename_list("sams") if 'hq' not in x] + return { + "required": { + "image": ("IMAGE",), + "swapped_image": ("IMAGE",), + "bbox_model_name": (bboxs + segms, ), + "bbox_threshold": ("FLOAT", {"default": 0.5, "min": 0.0, "max": 1.0, "step": 0.01}), + "bbox_dilation": ("INT", {"default": 10, "min": -512, "max": 512, "step": 1}), + "bbox_crop_factor": ("FLOAT", {"default": 3.0, "min": 1.0, "max": 100, "step": 0.1}), + "bbox_drop_size": ("INT", {"min": 1, "max": 8192, "step": 1, "default": 10}), + "sam_model_name": (sam_models, ), + "sam_dilation": ("INT", {"default": 0, "min": -512, "max": 512, "step": 1}), + "sam_threshold": ("FLOAT", {"default": 0.93, "min": 0.0, "max": 1.0, "step": 0.01}), + "bbox_expansion": ("INT", {"default": 0, "min": 0, "max": 1000, "step": 1}), + "mask_hint_threshold": ("FLOAT", {"default": 0.7, "min": 0.0, "max": 1.0, "step": 0.01}), + "mask_hint_use_negative": (["False", "Small", "Outter"], ), + "morphology_operation": (["dilate", "erode", "open", "close"],), + "morphology_distance": ("INT", {"default": 0, "min": 0, "max": 128, "step": 1}), + "blur_radius": ("INT", {"default": 9, "min": 0, "max": 48, "step": 1}), + "sigma_factor": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 3., "step": 0.01}), + }, + "optional": { + "mask_optional": ("MASK",), + } + } + + RETURN_TYPES = ("IMAGE","MASK","IMAGE","IMAGE") + RETURN_NAMES = ("IMAGE","MASK","MASK_PREVIEW","SWAPPED_FACE") + FUNCTION = "execute" + CATEGORY = "🌌 ReActor" - RETURN_TYPES = ("IMAGE", "MASK") - FUNCTION = "load_image" - def load_image(self, image): - image_path = folder_paths.get_annotated_filepath(image) + def execute(self, image, swapped_image, bbox_model_name, bbox_threshold, bbox_dilation, bbox_crop_factor, bbox_drop_size, sam_model_name, sam_dilation, sam_threshold, bbox_expansion, mask_hint_threshold, mask_hint_use_negative, morphology_operation, morphology_distance, blur_radius, sigma_factor, mask_optional=None): - img = node_helpers.pillow(Image.open, image_path) + # images = [image[i:i + 1, ...] for i in range(image.shape[0])] - output_images = [] - output_masks = [] - w, h = None, None + images = image - excluded_formats = ['MPO'] + if mask_optional is None: - for i in ImageSequence.Iterator(img): - i = node_helpers.pillow(ImageOps.exif_transpose, i) + bbox_model_path = folder_paths.get_full_path("ultralytics", bbox_model_name) + bbox_model = subcore.load_yolo(bbox_model_path) + bbox_detector = subcore.UltraBBoxDetector(bbox_model) - if i.mode == 'I': - i = i.point(lambda i: i * (1 / 255)) - image = i.convert("RGB") + segs = bbox_detector.detect(images, bbox_threshold, bbox_dilation, bbox_crop_factor, bbox_drop_size, self.detailer_hook) - if len(output_images) == 0: - w = image.size[0] - h = image.size[1] + if isinstance(self.labels, list): + self.labels = str(self.labels[0]) - if image.size[0] != w or image.size[1] != h: - continue + if self.labels is not None and self.labels != '': + self.labels = self.labels.split(',') + if len(self.labels) > 0: + segs, _ = masking_segs.filter(segs, self.labels) + # segs, _ = masking_segs.filter(segs, "all") + + sam_modelname = folder_paths.get_full_path("sams", sam_model_name) - image = np.array(image).astype(np.float32) / 255.0 - image = torch.from_numpy(image)[None,] - if 'A' in i.getbands(): - mask = np.array(i.getchannel('A')).astype(np.float32) / 255.0 - mask = 1. 
- torch.from_numpy(mask) - elif i.mode == 'P' and 'transparency' in i.info: - mask = np.array(i.convert('RGBA').getchannel('A')).astype(np.float32) / 255.0 - mask = 1. - torch.from_numpy(mask) + if 'vit_h' in sam_model_name: + model_kind = 'vit_h' + elif 'vit_l' in sam_model_name: + model_kind = 'vit_l' else: - mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") - output_images.append(image) - output_masks.append(mask.unsqueeze(0)) + model_kind = 'vit_b' - if len(output_images) > 1 and img.format not in excluded_formats: - output_image = torch.cat(output_images, dim=0) - output_mask = torch.cat(output_masks, dim=0) - else: - output_image = output_images[0] - output_mask = output_masks[0] - - return (output_image, output_mask) + sam = sam_model_registry[model_kind](checkpoint=sam_modelname) + size = os.path.getsize(sam_modelname) + sam.safe_to = core.SafeToGPU(size) - @classmethod - def IS_CHANGED(s, image): - image_path = folder_paths.get_annotated_filepath(image) - m = hashlib.sha256() - with open(image_path, 'rb') as f: - m.update(f.read()) - return m.digest().hex() + device = model_management.get_torch_device() - @classmethod - def VALIDATE_INPUTS(s, image): - if not folder_paths.exists_annotated_filepath(image): - return "Invalid image file: {}".format(image) + sam.safe_to.to_device(sam, device) - return True + sam.is_auto_mode = self.device_mode == "AUTO" -class LoadImageMask: - _color_channels = ["alpha", "red", "green", "blue"] - @classmethod - def INPUT_TYPES(s): - input_dir = folder_paths.get_input_directory() - files = [f for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))] - return {"required": - {"image": (sorted(files), {"image_upload": True}), - "channel": (s._color_channels, ), } - } - - CATEGORY = "mask" - - RETURN_TYPES = ("MASK",) - FUNCTION = "load_image" - def load_image(self, image, channel): - image_path = folder_paths.get_annotated_filepath(image) - i = node_helpers.pillow(Image.open, image_path) - i = node_helpers.pillow(ImageOps.exif_transpose, i) - if i.getbands() != ("R", "G", "B", "A"): - if i.mode == 'I': - i = i.point(lambda i: i * (1 / 255)) - i = i.convert("RGBA") - mask = None - c = channel[0].upper() - if c in i.getbands(): - mask = np.array(i.getchannel(c)).astype(np.float32) / 255.0 - mask = torch.from_numpy(mask) - if c == 'A': - mask = 1. 
- mask + combined_mask, _ = core.make_sam_mask_segmented(sam, segs, images, self.detection_hint, sam_dilation, sam_threshold, bbox_expansion, mask_hint_threshold, mask_hint_use_negative) + else: - mask = torch.zeros((64,64), dtype=torch.float32, device="cpu") - return (mask.unsqueeze(0),) - - @classmethod - def IS_CHANGED(s, image, channel): - image_path = folder_paths.get_annotated_filepath(image) - m = hashlib.sha256() - with open(image_path, 'rb') as f: - m.update(f.read()) - return m.digest().hex() - - @classmethod - def VALIDATE_INPUTS(s, image): - if not folder_paths.exists_annotated_filepath(image): - return "Invalid image file: {}".format(image) - - return True + combined_mask = mask_optional + + # *** MASK TO IMAGE ***: + + mask_image = combined_mask.reshape((-1, 1, combined_mask.shape[-2], combined_mask.shape[-1])).movedim(1, -1).expand(-1, -1, -1, 3) + + # *** MASK MORPH ***: + + mask_image = core.tensor2mask(mask_image) + + if morphology_operation == "dilate": + mask_image = self.dilate(mask_image, morphology_distance) + elif morphology_operation == "erode": + mask_image = self.erode(mask_image, morphology_distance) + elif morphology_operation == "open": + mask_image = self.erode(mask_image, morphology_distance) + mask_image = self.dilate(mask_image, morphology_distance) + elif morphology_operation == "close": + mask_image = self.dilate(mask_image, morphology_distance) + mask_image = self.erode(mask_image, morphology_distance) + + # *** MASK BLUR ***: + + if len(mask_image.size()) == 3: + mask_image = mask_image.unsqueeze(3) + + mask_image = mask_image.permute(0, 3, 1, 2) + kernel_size = blur_radius * 2 + 1 + sigma = sigma_factor * (0.6 * blur_radius - 0.3) + mask_image_final = self.gaussian_blur(mask_image, kernel_size, sigma).permute(0, 2, 3, 1) + if mask_image_final.size()[3] == 1: + mask_image_final = mask_image_final[:, :, :, 0] + + # *** CUT BY MASK ***: + + if len(swapped_image.shape) < 4: + C = 1 + else: + C = swapped_image.shape[3] + + # We operate on RGBA to keep the code clean and then convert back after + swapped_image = core.tensor2rgba(swapped_image) + mask = core.tensor2mask(mask_image_final) + + # Scale the mask to be a matching size if it isn't + B, H, W, _ = swapped_image.shape + mask = torch.nn.functional.interpolate(mask.unsqueeze(1), size=(H, W), mode='nearest')[:,0,:,:] + MB, _, _ = mask.shape + + if MB < B: + assert(B % MB == 0) + mask = mask.repeat(B // MB, 1, 1) + + # masks_to_boxes errors if the tensor is all zeros, so we'll add a single pixel and zero it out at the end + is_empty = ~torch.gt(torch.max(torch.reshape(mask,[MB, H * W]), dim=1).values, 0.) + mask[is_empty,0,0] = 1. + boxes = masks_to_boxes(mask) + mask[is_empty,0,0] = 0. 
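+        # Cut-by-mask: take each mask's bounding box, crop that region out of the swapped image and resize every crop to a common (use_height, use_width) canvas.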
+ + min_x = boxes[:,0] + min_y = boxes[:,1] + max_x = boxes[:,2] + max_y = boxes[:,3] + + width = max_x - min_x + 1 + height = max_y - min_y + 1 + + use_width = int(torch.max(width).item()) + use_height = int(torch.max(height).item()) + + # if self.force_resize_width > 0: + # use_width = self.force_resize_width + + # if self.force_resize_height > 0: + # use_height = self.force_resize_height + + alpha_mask = torch.ones((B, H, W, 4)) + alpha_mask[:,:,:,3] = mask + + swapped_image = swapped_image * alpha_mask + + cutted_image = torch.zeros((B, use_height, use_width, 4)) + for i in range(0, B): + if not is_empty[i]: + ymin = int(min_y[i].item()) + ymax = int(max_y[i].item()) + xmin = int(min_x[i].item()) + xmax = int(max_x[i].item()) + single = (swapped_image[i, ymin:ymax+1, xmin:xmax+1,:]).unsqueeze(0) + resized = torch.nn.functional.interpolate(single.permute(0, 3, 1, 2), size=(use_height, use_width), mode='bicubic').permute(0, 2, 3, 1) + cutted_image[i] = resized[0] + + # Preserve our type unless we were previously RGB and added non-opaque alpha due to the mask size + if C == 1: + cutted_image = core.tensor2mask(cutted_image) + elif C == 3 and torch.min(cutted_image[:,:,:,3]) == 1: + cutted_image = core.tensor2rgb(cutted_image) + + # *** PASTE BY MASK ***: + + image_base = core.tensor2rgba(images) + image_to_paste = core.tensor2rgba(cutted_image) + mask = core.tensor2mask(mask_image_final) + + # Scale the mask to be a matching size if it isn't + B, H, W, C = image_base.shape + MB = mask.shape[0] + PB = image_to_paste.shape[0] + + if B < PB: + assert(PB % B == 0) + image_base = image_base.repeat(PB // B, 1, 1, 1) + B, H, W, C = image_base.shape + if MB < B: + assert(B % MB == 0) + mask = mask.repeat(B // MB, 1, 1) + elif B < MB: + assert(MB % B == 0) + image_base = image_base.repeat(MB // B, 1, 1, 1) + if PB < B: + assert(B % PB == 0) + image_to_paste = image_to_paste.repeat(B // PB, 1, 1, 1) + + mask = torch.nn.functional.interpolate(mask.unsqueeze(1), size=(H, W), mode='nearest')[:,0,:,:] + MB, MH, MW = mask.shape + + # masks_to_boxes errors if the tensor is all zeros, so we'll add a single pixel and zero it out at the end + is_empty = ~torch.gt(torch.max(torch.reshape(mask,[MB, MH * MW]), dim=1).values, 0.) + mask[is_empty,0,0] = 1. + boxes = masks_to_boxes(mask) + mask[is_empty,0,0] = 0. 
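+        # Paste-by-mask: locate each mask's bounding box in the base image and paste the swapped-face crop back, centered on the box midpoint and blended through paste_mask.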
+ + min_x = boxes[:,0] + min_y = boxes[:,1] + max_x = boxes[:,2] + max_y = boxes[:,3] + mid_x = (min_x + max_x) / 2 + mid_y = (min_y + max_y) / 2 + + target_width = max_x - min_x + 1 + target_height = max_y - min_y + 1 + + result = image_base.detach().clone() + face_segment = mask_image_final + + for i in range(0, MB): + if is_empty[i]: + continue + else: + image_index = i + source_size = image_to_paste.size() + SB, SH, SW, _ = image_to_paste.shape + + # Figure out the desired size + width = int(target_width[i].item()) + height = int(target_height[i].item()) + # if self.resize_behavior == "keep_ratio_fill": + # target_ratio = width / height + # actual_ratio = SW / SH + # if actual_ratio > target_ratio: + # width = int(height * actual_ratio) + # elif actual_ratio < target_ratio: + # height = int(width / actual_ratio) + # elif self.resize_behavior == "keep_ratio_fit": + # target_ratio = width / height + # actual_ratio = SW / SH + # if actual_ratio > target_ratio: + # height = int(width / actual_ratio) + # elif actual_ratio < target_ratio: + # width = int(height * actual_ratio) + # elif self.resize_behavior == "source_size" or self.resize_behavior == "source_size_unmasked": + + width = SW + height = SH + + # Resize the image we're pasting if needed + resized_image = image_to_paste[i].unsqueeze(0) + # if SH != height or SW != width: + # resized_image = torch.nn.functional.interpolate(resized_image.permute(0, 3, 1, 2), size=(height,width), mode='bicubic').permute(0, 2, 3, 1) + + pasting = torch.ones([H, W, C]) + ymid = float(mid_y[i].item()) + ymin = int(math.floor(ymid - height / 2)) + 1 + ymax = int(math.floor(ymid + height / 2)) + 1 + xmid = float(mid_x[i].item()) + xmin = int(math.floor(xmid - width / 2)) + 1 + xmax = int(math.floor(xmid + width / 2)) + 1 + + _, source_ymax, source_xmax, _ = resized_image.shape + source_ymin, source_xmin = 0, 0 + + if xmin < 0: + source_xmin = abs(xmin) + xmin = 0 + if ymin < 0: + source_ymin = abs(ymin) + ymin = 0 + if xmax > W: + source_xmax -= (xmax - W) + xmax = W + if ymax > H: + source_ymax -= (ymax - H) + ymax = H + + pasting[ymin:ymax, xmin:xmax, :] = resized_image[0, source_ymin:source_ymax, source_xmin:source_xmax, :] + pasting[:, :, 3] = 1. + + pasting_alpha = torch.zeros([H, W]) + pasting_alpha[ymin:ymax, xmin:xmax] = resized_image[0, source_ymin:source_ymax, source_xmin:source_xmax, 3] + + # if self.resize_behavior == "keep_ratio_fill" or self.resize_behavior == "source_size_unmasked": + # # If we explicitly want to fill the area, we are ok with extending outside + # paste_mask = pasting_alpha.unsqueeze(2).repeat(1, 1, 4) + # else: + # paste_mask = torch.min(pasting_alpha, mask[i]).unsqueeze(2).repeat(1, 1, 4) + paste_mask = torch.min(pasting_alpha, mask[i]).unsqueeze(2).repeat(1, 1, 4) + result[image_index] = pasting * paste_mask + result[image_index] * (1. 
- paste_mask) + + face_segment = result + + face_segment[...,3] = mask[i] + + result = rgba2rgb_tensor(result) + + return (result,combined_mask,mask_image_final,face_segment,) + + def gaussian_blur(self, image, kernel_size, sigma): + kernel = torch.Tensor(kernel_size, kernel_size).to(device=image.device) + center = kernel_size // 2 + variance = sigma**2 + for i in range(kernel_size): + for j in range(kernel_size): + x = i - center + y = j - center + kernel[i, j] = math.exp(-(x**2 + y**2)/(2*variance)) + kernel /= kernel.sum() + + # Pad the input tensor + padding = (kernel_size - 1) // 2 + input_pad = torch.nn.functional.pad(image, (padding, padding, padding, padding), mode='reflect') + + # Reshape the padded input tensor for batched convolution + batch_size, num_channels, height, width = image.shape + input_reshaped = input_pad.reshape(batch_size*num_channels, 1, height+padding*2, width+padding*2) + + # Perform batched convolution with the Gaussian kernel + output_reshaped = torch.nn.functional.conv2d(input_reshaped, kernel.unsqueeze(0).unsqueeze(0)) + + # Reshape the output tensor to its original shape + output_tensor = output_reshaped.reshape(batch_size, num_channels, height, width) + + return output_tensor + + def erode(self, image, distance): + return 1. - self.dilate(1. - image, distance) + + def dilate(self, image, distance): + kernel_size = 1 + distance * 2 + # Add the channels dimension + image = image.unsqueeze(1) + out = torchfn.max_pool2d(image, kernel_size=kernel_size, stride=1, padding=kernel_size // 2).squeeze(1) + return out -class LoadImageOutput(LoadImage): +class ImageDublicator: @classmethod def INPUT_TYPES(s): return { "required": { - "image": ("COMBO", { - "image_upload": True, - "image_folder": "output", - "remote": { - "route": "/internal/files/output", - "refresh_button": True, - "control_after_refresh": "first", - }, - }), - } + "image": ("IMAGE",), + "count": ("INT", {"default": 1, "min": 0}), + }, } - DESCRIPTION = "Load an image from the output folder. When the refresh button is clicked, the node will update the image list and automatically select the first image, allowing for easy iteration." 
- EXPERIMENTAL = True - FUNCTION = "load_image" - - -class ImageScale: - upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] - crop_methods = ["disabled", "center"] - - @classmethod - def INPUT_TYPES(s): - return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), - "width": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), - "height": ("INT", {"default": 512, "min": 0, "max": MAX_RESOLUTION, "step": 1}), - "crop": (s.crop_methods,)}} - RETURN_TYPES = ("IMAGE",) - FUNCTION = "upscale" - - CATEGORY = "image/upscaling" - - def upscale(self, image, upscale_method, width, height, crop): - if width == 0 and height == 0: - s = image - else: - samples = image.movedim(-1,1) - - if width == 0: - width = max(1, round(samples.shape[3] * height / samples.shape[2])) - elif height == 0: - height = max(1, round(samples.shape[2] * width / samples.shape[3])) - - s = comfy.utils.common_upscale(samples, width, height, upscale_method, crop) - s = s.movedim(1,-1) - return (s,) - -class ImageScaleBy: - upscale_methods = ["nearest-exact", "bilinear", "area", "bicubic", "lanczos"] - - @classmethod - def INPUT_TYPES(s): - return {"required": { "image": ("IMAGE",), "upscale_method": (s.upscale_methods,), - "scale_by": ("FLOAT", {"default": 1.0, "min": 0.01, "max": 8.0, "step": 0.01}),}} RETURN_TYPES = ("IMAGE",) - FUNCTION = "upscale" - - CATEGORY = "image/upscaling" + RETURN_NAMES = ("IMAGES",) + OUTPUT_IS_LIST = (True,) + FUNCTION = "execute" + CATEGORY = "🌌 ReActor" - def upscale(self, image, upscale_method, scale_by): - samples = image.movedim(-1,1) - width = round(samples.shape[3] * scale_by) - height = round(samples.shape[2] * scale_by) - s = comfy.utils.common_upscale(samples, width, height, upscale_method, "disabled") - s = s.movedim(1,-1) - return (s,) + def execute(self, image, count): + images = [image for i in range(count)] + return (images,) -class ImageInvert: +class ImageRGBA2RGB: @classmethod def INPUT_TYPES(s): - return {"required": { "image": ("IMAGE",)}} + return { + "required": { + "image": ("IMAGE",), + }, + } RETURN_TYPES = ("IMAGE",) - FUNCTION = "invert" - - CATEGORY = "image" + FUNCTION = "execute" + CATEGORY = "🌌 ReActor" - def invert(self, image): - s = 1.0 - image - return (s,) + def execute(self, image): + out = rgba2rgb_tensor(image) + return (out,) -class ImageBatch: +class MakeFaceModelBatch: @classmethod def INPUT_TYPES(s): - return {"required": { "image1": ("IMAGE",), "image2": ("IMAGE",)}} + return { + "required": { + "face_model1": ("FACE_MODEL",), + }, + "optional": { + "face_model2": ("FACE_MODEL",), + "face_model3": ("FACE_MODEL",), + "face_model4": ("FACE_MODEL",), + "face_model5": ("FACE_MODEL",), + "face_model6": ("FACE_MODEL",), + "face_model7": ("FACE_MODEL",), + "face_model8": ("FACE_MODEL",), + "face_model9": ("FACE_MODEL",), + "face_model10": ("FACE_MODEL",), + }, + } - RETURN_TYPES = ("IMAGE",) - FUNCTION = "batch" + RETURN_TYPES = ("FACE_MODEL",) + RETURN_NAMES = ("FACE_MODELS",) + FUNCTION = "execute" - CATEGORY = "image" + CATEGORY = "🌌 ReActor" - def batch(self, image1, image2): - if image1.shape[1:] != image2.shape[1:]: - image2 = comfy.utils.common_upscale(image2.movedim(-1,1), image1.shape[2], image1.shape[1], "bilinear", "center").movedim(1,-1) - s = torch.cat((image1, image2), dim=0) - return (s,) + def execute(self, **kwargs): + if len(kwargs) > 0: + face_models = [value for value in kwargs.values()] + return (face_models,) + else: + logger.error("Please provide at least 1 `face_model`") 
+ return (None,) -class EmptyImage: - def __init__(self, device="cpu"): - self.device = device +class ReActorOptions: @classmethod def INPUT_TYPES(s): - return {"required": { "width": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), - "height": ("INT", {"default": 512, "min": 1, "max": MAX_RESOLUTION, "step": 1}), - "batch_size": ("INT", {"default": 1, "min": 1, "max": 4096}), - "color": ("INT", {"default": 0, "min": 0, "max": 0xFFFFFF, "step": 1, "display": "color"}), - }} - RETURN_TYPES = ("IMAGE",) - FUNCTION = "generate" - - CATEGORY = "image" + return { + "required": { + "input_faces_order": ( + ["left-right","right-left","top-bottom","bottom-top","small-large","large-small"], {"default": "large-small"} + ), + "input_faces_index": ("STRING", {"default": "0"}), + "detect_gender_input": (["no","female","male"], {"default": "no"}), + "source_faces_order": ( + ["left-right","right-left","top-bottom","bottom-top","small-large","large-small"], {"default": "large-small"} + ), + "source_faces_index": ("STRING", {"default": "0"}), + "detect_gender_source": (["no","female","male"], {"default": "no"}), + "console_log_level": ([0, 1, 2], {"default": 1}), + } + } - def generate(self, width, height, batch_size=1, color=0): - r = torch.full([batch_size, height, width, 1], ((color >> 16) & 0xFF) / 0xFF) - g = torch.full([batch_size, height, width, 1], ((color >> 8) & 0xFF) / 0xFF) - b = torch.full([batch_size, height, width, 1], ((color) & 0xFF) / 0xFF) - return (torch.cat((r, g, b), dim=-1), ) + RETURN_TYPES = ("OPTIONS",) + FUNCTION = "execute" + CATEGORY = "🌌 ReActor" + + def execute(self,input_faces_order, input_faces_index, detect_gender_input, source_faces_order, source_faces_index, detect_gender_source, console_log_level): + options: dict = { + "input_faces_order": input_faces_order, + "input_faces_index": input_faces_index, + "detect_gender_input": detect_gender_input, + "source_faces_order": source_faces_order, + "source_faces_index": source_faces_index, + "detect_gender_source": detect_gender_source, + "console_log_level": console_log_level, + } + return (options, ) -class ImagePadForOutpaint: +class ReActorFaceBoost: @classmethod def INPUT_TYPES(s): return { "required": { - "image": ("IMAGE",), - "left": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "top": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "right": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "bottom": ("INT", {"default": 0, "min": 0, "max": MAX_RESOLUTION, "step": 8}), - "feathering": ("INT", {"default": 40, "min": 0, "max": MAX_RESOLUTION, "step": 1}), + "enabled": ("BOOLEAN", {"default": True, "label_off": "OFF", "label_on": "ON"}), + "boost_model": (get_model_names(get_restorers),), + "interpolation": (["Nearest","Bilinear","Bicubic","Lanczos"], {"default": "Bicubic"}), + "visibility": ("FLOAT", {"default": 1, "min": 0.1, "max": 1, "step": 0.05}), + "codeformer_weight": ("FLOAT", {"default": 0.5, "min": 0.0, "max": 1, "step": 0.05}), + "restore_with_main_after": ("BOOLEAN", {"default": False}), } } - RETURN_TYPES = ("IMAGE", "MASK") - FUNCTION = "expand_image" - - CATEGORY = "image" - - def expand_image(self, image, left, top, right, bottom, feathering): - d1, d2, d3, d4 = image.size() - - new_image = torch.ones( - (d1, d2 + top + bottom, d3 + left + right, d4), - dtype=torch.float32, - ) * 0.5 - - new_image[:, top:top + d2, left:left + d3, :] = image - - mask = torch.ones( - (d2 + top + bottom, d3 + left + right), - 
dtype=torch.float32, - ) - - t = torch.zeros( - (d2, d3), - dtype=torch.float32 - ) - - if feathering > 0 and feathering * 2 < d2 and feathering * 2 < d3: - - for i in range(d2): - for j in range(d3): - dt = i if top != 0 else d2 - db = d2 - i if bottom != 0 else d2 - - dl = j if left != 0 else d3 - dr = d3 - j if right != 0 else d3 - - d = min(dt, db, dl, dr) - - if d >= feathering: - continue - - v = (feathering - d) / feathering - - t[i, j] = v * v + RETURN_TYPES = ("FACE_BOOST",) + FUNCTION = "execute" + CATEGORY = "🌌 ReActor" + + def execute(self,enabled,boost_model,interpolation,visibility,codeformer_weight,restore_with_main_after): + face_boost: dict = { + "enabled": enabled, + "boost_model": boost_model, + "interpolation": interpolation, + "visibility": visibility, + "codeformer_weight": codeformer_weight, + "restore_with_main_after": restore_with_main_after, + } + return (face_boost, ) + +class ReActorUnload: + @classmethod + def INPUT_TYPES(s): + return { + "required": { + "trigger": ("IMAGE", ), + }, + } - mask[top:top + d2, left:left + d3] = t + RETURN_TYPES = ("IMAGE",) + FUNCTION = "execute" + CATEGORY = "🌌 ReActor" - return (new_image, mask) + def execute(self, trigger): + unload_all_models() + return (trigger,) NODE_CLASS_MAPPINGS = { - "KSampler": KSampler, - "CheckpointLoaderSimple": CheckpointLoaderSimple, - "CLIPTextEncode": CLIPTextEncode, - "CLIPSetLastLayer": CLIPSetLastLayer, - "VAEDecode": VAEDecode, - "VAEEncode": VAEEncode, - "VAEEncodeForInpaint": VAEEncodeForInpaint, - "VAELoader": VAELoader, - "EmptyLatentImage": EmptyLatentImage, - "LatentUpscale": LatentUpscale, - "LatentUpscaleBy": LatentUpscaleBy, - "LatentFromBatch": LatentFromBatch, - "RepeatLatentBatch": RepeatLatentBatch, - "SaveImage": SaveImage, - "PreviewImage": PreviewImage, - "LoadImage": LoadImage, - "LoadImageMask": LoadImageMask, - "LoadImageOutput": LoadImageOutput, - "ImageScale": ImageScale, - "ImageScaleBy": ImageScaleBy, - "ImageInvert": ImageInvert, - "ImageBatch": ImageBatch, - "ImagePadForOutpaint": ImagePadForOutpaint, - "EmptyImage": EmptyImage, - "ConditioningAverage": ConditioningAverage , - "ConditioningCombine": ConditioningCombine, - "ConditioningConcat": ConditioningConcat, - "ConditioningSetArea": ConditioningSetArea, - "ConditioningSetAreaPercentage": ConditioningSetAreaPercentage, - "ConditioningSetAreaStrength": ConditioningSetAreaStrength, - "ConditioningSetMask": ConditioningSetMask, - "KSamplerAdvanced": KSamplerAdvanced, - "SetLatentNoiseMask": SetLatentNoiseMask, - "LatentComposite": LatentComposite, - "LatentBlend": LatentBlend, - "LatentRotate": LatentRotate, - "LatentFlip": LatentFlip, - "LatentCrop": LatentCrop, - "LoraLoader": LoraLoader, - "CLIPLoader": CLIPLoader, - "UNETLoader": UNETLoader, - "DualCLIPLoader": DualCLIPLoader, - "CLIPVisionEncode": CLIPVisionEncode, - "StyleModelApply": StyleModelApply, - "unCLIPConditioning": unCLIPConditioning, - "ControlNetApply": ControlNetApply, - "ControlNetApplyAdvanced": ControlNetApplyAdvanced, - "ControlNetLoader": ControlNetLoader, - "DiffControlNetLoader": DiffControlNetLoader, - "StyleModelLoader": StyleModelLoader, - "CLIPVisionLoader": CLIPVisionLoader, - "VAEDecodeTiled": VAEDecodeTiled, - "VAEEncodeTiled": VAEEncodeTiled, - "unCLIPCheckpointLoader": unCLIPCheckpointLoader, - "GLIGENLoader": GLIGENLoader, - "GLIGENTextBoxApply": GLIGENTextBoxApply, - "InpaintModelConditioning": InpaintModelConditioning, - - "CheckpointLoader": CheckpointLoader, - "DiffusersLoader": DiffusersLoader, - - "LoadLatent": LoadLatent, - 
"SaveLatent": SaveLatent, - - "ConditioningZeroOut": ConditioningZeroOut, - "ConditioningSetTimestepRange": ConditioningSetTimestepRange, - "LoraLoaderModelOnly": LoraLoaderModelOnly, + # --- MAIN NODES --- + "ReActorFaceSwap": reactor, + "ReActorFaceSwapOpt": ReActorPlusOpt, + "ReActorOptions": ReActorOptions, + "ReActorFaceBoost": ReActorFaceBoost, + "ReActorMaskHelper": MaskHelper, + "ReActorSetWeight": ReActorWeight, + # --- Operations with Face Models --- + "ReActorSaveFaceModel": SaveFaceModel, + "ReActorLoadFaceModel": LoadFaceModel, + "ReActorBuildFaceModel": BuildFaceModel, + "ReActorMakeFaceModelBatch": MakeFaceModelBatch, + # --- Additional Nodes --- + "ReActorRestoreFace": RestoreFace, + "ReActorImageDublicator": ImageDublicator, + "ImageRGBA2RGB": ImageRGBA2RGB, + "ReActorUnload": ReActorUnload, } NODE_DISPLAY_NAME_MAPPINGS = { - # Sampling - "KSampler": "KSampler", - "KSamplerAdvanced": "KSampler (Advanced)", - # Loaders - "CheckpointLoader": "Load Checkpoint With Config (DEPRECATED)", - "CheckpointLoaderSimple": "Load Checkpoint", - "VAELoader": "Load VAE", - "LoraLoader": "Load LoRA", - "CLIPLoader": "Load CLIP", - "ControlNetLoader": "Load ControlNet Model", - "DiffControlNetLoader": "Load ControlNet Model (diff)", - "StyleModelLoader": "Load Style Model", - "CLIPVisionLoader": "Load CLIP Vision", - "UpscaleModelLoader": "Load Upscale Model", - "UNETLoader": "Load Diffusion Model", - # Conditioning - "CLIPVisionEncode": "CLIP Vision Encode", - "StyleModelApply": "Apply Style Model", - "CLIPTextEncode": "CLIP Text Encode (Prompt)", - "CLIPSetLastLayer": "CLIP Set Last Layer", - "ConditioningCombine": "Conditioning (Combine)", - "ConditioningAverage ": "Conditioning (Average)", - "ConditioningConcat": "Conditioning (Concat)", - "ConditioningSetArea": "Conditioning (Set Area)", - "ConditioningSetAreaPercentage": "Conditioning (Set Area with Percentage)", - "ConditioningSetMask": "Conditioning (Set Mask)", - "ControlNetApply": "Apply ControlNet (OLD)", - "ControlNetApplyAdvanced": "Apply ControlNet", - # Latent - "VAEEncodeForInpaint": "VAE Encode (for Inpainting)", - "SetLatentNoiseMask": "Set Latent Noise Mask", - "VAEDecode": "VAE Decode", - "VAEEncode": "VAE Encode", - "LatentRotate": "Rotate Latent", - "LatentFlip": "Flip Latent", - "LatentCrop": "Crop Latent", - "EmptyLatentImage": "Empty Latent Image", - "LatentUpscale": "Upscale Latent", - "LatentUpscaleBy": "Upscale Latent By", - "LatentComposite": "Latent Composite", - "LatentBlend": "Latent Blend", - "LatentFromBatch" : "Latent From Batch", - "RepeatLatentBatch": "Repeat Latent Batch", - # Image - "SaveImage": "Save Image", - "PreviewImage": "Preview Image", - "LoadImage": "Load Image", - "LoadImageMask": "Load Image (as Mask)", - "LoadImageOutput": "Load Image (from Outputs)", - "ImageScale": "Upscale Image", - "ImageScaleBy": "Upscale Image By", - "ImageUpscaleWithModel": "Upscale Image (using Model)", - "ImageInvert": "Invert Image", - "ImagePadForOutpaint": "Pad Image for Outpainting", - "ImageBatch": "Batch Images", - "ImageCrop": "Image Crop", - "ImageBlend": "Image Blend", - "ImageBlur": "Image Blur", - "ImageQuantize": "Image Quantize", - "ImageSharpen": "Image Sharpen", - "ImageScaleToTotalPixels": "Scale Image to Total Pixels", - # _for_testing - "VAEDecodeTiled": "VAE Decode (Tiled)", - "VAEEncodeTiled": "VAE Encode (Tiled)", + # --- MAIN NODES --- + "ReActorFaceSwap": "ReActor 🌌 Fast Face Swap", + "ReActorFaceSwapOpt": "ReActor 🌌 Fast Face Swap [OPTIONS]", + "ReActorOptions": "ReActor 🌌 Options", + 
"ReActorFaceBoost": "ReActor 🌌 Face Booster", + "ReActorMaskHelper": "ReActor 🌌 Masking Helper", + "ReActorSetWeight": "ReActor 🌌 Set Face Swap Weight", + # --- Operations with Face Models --- + "ReActorSaveFaceModel": "Save Face Model 🌌 ReActor", + "ReActorLoadFaceModel": "Load Face Model 🌌 ReActor", + "ReActorBuildFaceModel": "Build Blended Face Model 🌌 ReActor", + "ReActorMakeFaceModelBatch": "Make Face Model Batch 🌌 ReActor", + # --- Additional Nodes --- + "ReActorRestoreFace": "Restore Face 🌌 ReActor", + "ReActorImageDublicator": "Image Dublicator (List) 🌌 ReActor", + "ImageRGBA2RGB": "Convert RGBA to RGB 🌌 ReActor", + "ReActorUnload": "Unload ReActor Models 🌌 ReActor", } - -EXTENSION_WEB_DIRS = {} - -# Dictionary of successfully loaded module names and associated directories. -LOADED_MODULE_DIRS = {} - - -def get_module_name(module_path: str) -> str: - """ - Returns the module name based on the given module path. - Examples: - get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node.py") -> "my_custom_node" - get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node") -> "my_custom_node" - get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/") -> "my_custom_node" - get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__.py") -> "my_custom_node" - get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__") -> "my_custom_node" - get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node/__init__/") -> "my_custom_node" - get_module_name("C:/Users/username/ComfyUI/custom_nodes/my_custom_node.disabled") -> "custom_nodes - Args: - module_path (str): The path of the module. - Returns: - str: The module name. - """ - base_path = os.path.basename(module_path) - if os.path.isfile(module_path): - base_path = os.path.splitext(base_path)[0] - return base_path - - -def load_custom_node(module_path: str, ignore=set(), module_parent="custom_nodes") -> bool: - module_name = get_module_name(module_path) - if os.path.isfile(module_path): - sp = os.path.splitext(module_path) - module_name = sp[0] - sys_module_name = module_name - elif os.path.isdir(module_path): - sys_module_name = module_path.replace(".", "_x_") - - try: - logging.debug("Trying to load custom node {}".format(module_path)) - if os.path.isfile(module_path): - module_spec = importlib.util.spec_from_file_location(sys_module_name, module_path) - module_dir = os.path.split(module_path)[0] - else: - module_spec = importlib.util.spec_from_file_location(sys_module_name, os.path.join(module_path, "__init__.py")) - module_dir = module_path - - module = importlib.util.module_from_spec(module_spec) - sys.modules[sys_module_name] = module - module_spec.loader.exec_module(module) - - LOADED_MODULE_DIRS[module_name] = os.path.abspath(module_dir) - - if hasattr(module, "WEB_DIRECTORY") and getattr(module, "WEB_DIRECTORY") is not None: - web_dir = os.path.abspath(os.path.join(module_dir, getattr(module, "WEB_DIRECTORY"))) - if os.path.isdir(web_dir): - EXTENSION_WEB_DIRS[module_name] = web_dir - - if hasattr(module, "NODE_CLASS_MAPPINGS") and getattr(module, "NODE_CLASS_MAPPINGS") is not None: - for name, node_cls in module.NODE_CLASS_MAPPINGS.items(): - if name not in ignore: - NODE_CLASS_MAPPINGS[name] = node_cls - node_cls.RELATIVE_PYTHON_MODULE = "{}.{}".format(module_parent, get_module_name(module_path)) - if hasattr(module, "NODE_DISPLAY_NAME_MAPPINGS") and getattr(module, "NODE_DISPLAY_NAME_MAPPINGS") is not None: - 
-                NODE_DISPLAY_NAME_MAPPINGS.update(module.NODE_DISPLAY_NAME_MAPPINGS)
-            return True
-        else:
-            logging.warning(f"Skip {module_path} module for custom nodes due to the lack of NODE_CLASS_MAPPINGS.")
-            return False
-    except Exception as e:
-        logging.warning(traceback.format_exc())
-        logging.warning(f"Cannot import {module_path} module for custom nodes: {e}")
-        return False
-
-def init_external_custom_nodes():
-    """
-    Initializes the external custom nodes.
-
-    This function loads custom nodes from the specified folder paths and imports them into the application.
-    It measures the import times for each custom node and logs the results.
-
-    Returns:
-        None
-    """
-    base_node_names = set(NODE_CLASS_MAPPINGS.keys())
-    node_paths = folder_paths.get_folder_paths("custom_nodes")
-    node_import_times = []
-    for custom_node_path in node_paths:
-        possible_modules = os.listdir(os.path.realpath(custom_node_path))
-        if "__pycache__" in possible_modules:
-            possible_modules.remove("__pycache__")
-
-        for possible_module in possible_modules:
-            module_path = os.path.join(custom_node_path, possible_module)
-            if os.path.isfile(module_path) and os.path.splitext(module_path)[1] != ".py": continue
-            if module_path.endswith(".disabled"): continue
-            time_before = time.perf_counter()
-            success = load_custom_node(module_path, base_node_names, module_parent="custom_nodes")
-            node_import_times.append((time.perf_counter() - time_before, module_path, success))
-
-    if len(node_import_times) > 0:
-        logging.info("\nImport times for custom nodes:")
-        for n in sorted(node_import_times):
-            if n[2]:
-                import_message = ""
-            else:
-                import_message = " (IMPORT FAILED)"
-            logging.info("{:6.1f} seconds{}: {}".format(n[0], import_message, n[1]))
-        logging.info("")
-
-def init_builtin_extra_nodes():
-    """
-    Initializes the built-in extra nodes in ComfyUI.
-
-    This function loads the extra node files located in the "comfy_extras" directory and imports them into ComfyUI.
-    If any of the extra node files fail to import, a warning message is logged.
- - Returns: - None - """ - extras_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "comfy_extras") - extras_files = [ - "nodes_latent.py", - "nodes_hypernetwork.py", - "nodes_upscale_model.py", - "nodes_post_processing.py", - "nodes_mask.py", - "nodes_compositing.py", - "nodes_rebatch.py", - "nodes_model_merging.py", - "nodes_tomesd.py", - "nodes_clip_sdxl.py", - "nodes_canny.py", - "nodes_freelunch.py", - "nodes_custom_sampler.py", - "nodes_hypertile.py", - "nodes_model_advanced.py", - "nodes_model_downscale.py", - "nodes_images.py", - "nodes_video_model.py", - "nodes_sag.py", - "nodes_perpneg.py", - "nodes_stable3d.py", - "nodes_sdupscale.py", - "nodes_photomaker.py", - "nodes_pixart.py", - "nodes_cond.py", - "nodes_morphology.py", - "nodes_stable_cascade.py", - "nodes_differential_diffusion.py", - "nodes_ip2p.py", - "nodes_model_merging_model_specific.py", - "nodes_pag.py", - "nodes_align_your_steps.py", - "nodes_attention_multiply.py", - "nodes_advanced_samplers.py", - "nodes_webcam.py", - "nodes_audio.py", - "nodes_sd3.py", - "nodes_gits.py", - "nodes_controlnet.py", - "nodes_hunyuan.py", - "nodes_flux.py", - "nodes_lora_extract.py", - "nodes_torch_compile.py", - "nodes_mochi.py", - "nodes_slg.py", - "nodes_mahiro.py", - "nodes_lt.py", - "nodes_hooks.py", - "nodes_load_3d.py", - "nodes_cosmos.py", - "nodes_video.py", - "nodes_lumina2.py", - "nodes_wan.py", - "nodes_lotus.py", - "nodes_hunyuan3d.py", - "nodes_primitive.py", - "nodes_cfg.py", - "nodes_optimalsteps.py", - "nodes_hidream.py", - "nodes_fresca.py", - ] - - import_failed = [] - for node_file in extras_files: - if not load_custom_node(os.path.join(extras_dir, node_file), module_parent="comfy_extras"): - import_failed.append(node_file) - - return import_failed - - -def init_extra_nodes(init_custom_nodes=True): - import_failed = init_builtin_extra_nodes() - - if init_custom_nodes: - init_external_custom_nodes() - else: - logging.info("Skipping loading of custom nodes") - - if len(import_failed) > 0: - logging.warning("WARNING: some comfy_extras/ nodes did not import correctly. This may be because they are missing some dependencies.\n") - for node in import_failed: - logging.warning("IMPORT FAILED: {}".format(node)) - logging.warning("\nThis issue might be caused by new missing dependencies added the last time you updated ComfyUI.") - if args.windows_standalone_build: - logging.warning("Please run the update script: update/update_comfyui.bat") - else: - logging.warning("Please do a: pip install -r requirements.txt") - logging.warning("") - - return import_failed
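The hunk above registers the ReActor node classes in NODE_CLASS_MAPPINGS and NODE_DISPLAY_NAME_MAPPINGS while deleting the stock loader helpers (get_module_name, load_custom_node, init_external_custom_nodes, init_builtin_extra_nodes, init_extra_nodes) that normally consume those two dictionaries from every custom-node package. For reference, a minimal sketch of the module-level contract that the removed load_custom_node inspects might look like the following; ExampleNode, its category, and the "./js" directory are hypothetical placeholders, not part of this patch.

# Illustrative sketch only, not part of the patch: the attributes that the
# removed load_custom_node() looks for in a custom-node module.
# "ExampleNode", its CATEGORY, and "./js" are hypothetical names.

class ExampleNode:
    @classmethod
    def INPUT_TYPES(cls):
        # Declare one required image input, using the same ("IMAGE",) tuple
        # convention seen in the node definitions above.
        return {"required": {"image": ("IMAGE",)}}

    RETURN_TYPES = ("IMAGE",)
    FUNCTION = "run"
    CATEGORY = "examples"

    def run(self, image):
        # Pass the tensor through unchanged; a real node would transform it.
        return (image,)

# load_custom_node() copies these entries into the global node registry,
# skipping any key already present in the built-in node set.
NODE_CLASS_MAPPINGS = {"ExampleNode": ExampleNode}

# Optional display names, merged via NODE_DISPLAY_NAME_MAPPINGS.update().
NODE_DISPLAY_NAME_MAPPINGS = {"ExampleNode": "Example Node"}

# Optional frontend assets, resolved relative to the module directory and
# registered in EXTENSION_WEB_DIRS when the directory exists.
WEB_DIRECTORY = "./js"

ReActor's own NODE_CLASS_MAPPINGS and NODE_DISPLAY_NAME_MAPPINGS added in this hunk follow the same shape, with the node classes defined earlier in the file.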