import os import sys import cv2 import re from shared import path_manager import modules.async_worker as worker from tqdm import tqdm from modules.util import generate_temp_filename from PIL import Image import imageio.v3 as iio import numpy as np import torch import insightface from importlib.abc import MetaPathFinder, Loader from importlib.util import spec_from_loader, module_from_spec class ImportRedirector(MetaPathFinder): def __init__(self, redirect_map): self.redirect_map = redirect_map def find_spec(self, fullname, path, target=None): if fullname in self.redirect_map: return spec_from_loader(fullname, ImportLoader(self.redirect_map[fullname])) return None class ImportLoader(Loader): def __init__(self, redirect): self.redirect = redirect def create_module(self, spec): return None def exec_module(self, module): import importlib redirected = importlib.import_module(self.redirect) module.__dict__.update(redirected.__dict__) # Set up the redirection redirect_map = { 'torchvision.transforms.functional_tensor': 'torchvision.transforms.functional' } sys.meta_path.insert(0, ImportRedirector(redirect_map)) import gfpgan from facexlib.utils.face_restoration_helper import FaceRestoreHelper # Requirements: # insightface==0.7.3 # onnxruntime-gpu==1.16.1 # gfpgan==1.3.8 # # add to settings/powerup.json # # "Faceswap": { # "type": "faceswap" # } # # Models in models/faceswap/ # https://github.com/TencentARC/GFPGAN/releases/download/v1.3.0/GFPGANv1.4.pth # and inswapper_128.onnx from where you can find it class pipeline: pipeline_type = ["faceswap"] analyser_model = None analyser_hash = "" swapper_model = None swapper_hash = "" gfpgan_model = None def parse_gen_data(self, gen_data): gen_data["original_image_number"] = gen_data["image_number"] gen_data["image_number"] = 1 gen_data["show_preview"] = False return gen_data def load_base_model(self, name): model_name = "inswapper_128.onnx" if not self.swapper_hash == model_name: print(f"Loading swapper model: {model_name}") model_path = os.path.join(path_manager.model_paths["faceswap_path"], model_name) try: with open(os.devnull, "w") as sys.stdout: self.swapper_model = insightface.model_zoo.get_model( model_path, download=False, download_zip=False, ) self.swapper_hash = model_name sys.stdout = sys.__stdout__ except: print(f"Failed loading model! {model_path}") model_name = "buffalo_l" det_thresh = 0.5 if not self.analyser_hash == model_name: print(f"Loading analyser model: {model_name}") try: with open(os.devnull, "w") as sys.stdout: self.analyser_model = insightface.app.FaceAnalysis(name=model_name) self.analyser_model.prepare( ctx_id=0, det_thresh=det_thresh, det_size=(640, 640) ) self.analyser_hash = model_name sys.stdout = sys.__stdout__ except: print(f"Failed loading model! {model_name}") def load_gfpgan_model(self): if self.gfpgan_model is None: channel_multiplier = 2 model_name = "GFPGANv1.4.pth" model_path = os.path.join(path_manager.model_paths["faceswap_path"], model_name) # https://github.com/TencentARC/GFPGAN/blob/master/inference_gfpgan.py self.gfpgan_model = gfpgan.GFPGANer self.gfpgan_model.bg_upsampler = None # initialize model self.gfpgan_model.device = torch.device( "cuda" if torch.cuda.is_available() else "cpu" ) upscale = 2 self.gfpgan_model.face_helper = FaceRestoreHelper( upscale, det_model="retinaface_resnet50", model_rootpath=path_manager.model_paths["faceswap_path"], ) # face_size=512, # crop_ratio=(1, 1), # save_ext='png', # use_parse=True, # device=self.device, self.gfpgan_model.gfpgan = gfpgan.GFPGANv1Clean( out_size=512, num_style_feat=512, channel_multiplier=channel_multiplier, decoder_load_path=None, fix_decoder=False, num_mlp=8, input_is_latent=True, different_w=True, narrow=1, sft_half=True, ) loadnet = torch.load(model_path) if "params_ema" in loadnet: keyname = "params_ema" else: keyname = "params" self.gfpgan_model.gfpgan.load_state_dict(loadnet[keyname], strict=True) self.gfpgan_model.gfpgan.eval() self.gfpgan_model.gfpgan = self.gfpgan_model.gfpgan.to( self.gfpgan_model.device ) def load_keywords(self, lora): return "" def load_loras(self, loras): return def refresh_controlnet(self, name=None): return def clean_prompt_cond_caches(self): return def swap_faces(self, original_image, input_faces, out_faces): idx = 0 for out_face in out_faces: original_image = self.swapper_model.get( original_image, out_face, input_faces[idx % len(input_faces)], paste_back=True, ) idx += 1 return original_image def restore_faces(self, image): self.load_gfpgan_model() image_bgr = image[:, :, ::-1] _cropped_faces, _restored_faces, gfpgan_output_bgr = self.gfpgan_model.enhance( self.gfpgan_model, image_bgr, has_aligned=False, only_center_face=False, paste_back=True, weight=0.5, ) image = gfpgan_output_bgr[:, :, ::-1] return image def process( self, gen_data=None, callback=None, ): worker.add_result( gen_data["task_id"], "preview", (-1, f"Generating ...", None) ) input_image = gen_data["input_image"] input_image = cv2.cvtColor(np.asarray(input_image), cv2.COLOR_RGB2BGR) input_faces = sorted( self.analyser_model.get(input_image), key=lambda x: x.bbox[0] ) prompt = gen_data["prompt"].strip() if re.fullmatch("https?://.*\.gif", prompt, re.IGNORECASE) is not None: x = iio.immeta(prompt) duration = x["duration"] loop = x["loop"] gif = cv2.VideoCapture(prompt) # Swap in_imgs = [] out_imgs = [] while True: ret, frame = gif.read() if not ret: break in_imgs.append(frame) with tqdm(total=len(in_imgs), desc="Groop", unit="frames") as progress: i=0 steps=len(in_imgs) for frame in in_imgs: out_faces = sorted( self.analyser_model.get(frame), key=lambda x: x.bbox[0] ) frame = self.swap_faces(frame, input_faces, out_faces) out_imgs.append( Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) ) i+=1 callback(i, 0, 0, steps, out_imgs[-1]) progress.update(1) images = generate_temp_filename( folder=path_manager.model_paths["temp_outputs_path"], extension="gif" ) os.makedirs(os.path.dirname(images), exist_ok=True) out_imgs[0].save( images, save_all=True, append_images=out_imgs[1:], optimize=True, duration=duration, loop=loop, ) else: output_image = cv2.imread(gen_data["main_view"]) if output_image is None: images = "html/error.png" else: output_faces = sorted( self.analyser_model.get(output_image), key=lambda x: x.bbox[0] ) result_image = self.swap_faces(output_image, input_faces, output_faces) result_image = self.restore_faces(result_image) images = Image.fromarray(cv2.cvtColor(result_image, cv2.COLOR_BGR2RGB)) return [images]