Spaces:
Build error
Build error
# Copyright (c) 2021, InterDigital R&D France. All rights reserved. | |
# | |
# This source code is made available under the license found in the | |
# LICENSE.txt in the root directory of this source tree. | |
import cv2 | |
import glob | |
import numpy as np | |
import os | |
import face_alignment | |
import torch | |
from PIL import Image, ImageFilter | |
from scipy import ndimage | |
from scipy.ndimage import gaussian_filter1d | |
from skimage import io | |
from torchvision import transforms, utils | |
def pil_to_cv2(pil_image): | |
open_cv_image = np.array(pil_image) | |
return open_cv_image[:, :, ::-1].copy() | |
def cv2_to_pil(open_cv_image): | |
return Image.fromarray(open_cv_image[:, :, ::-1].copy()) | |
def put_text(img, text): | |
font = cv2.FONT_HERSHEY_SIMPLEX | |
bottomLeftCornerOfText = (10,50) | |
fontScale = 1.5 | |
fontColor = (255,255,0) | |
lineType = 2 | |
return cv2.putText(img, text, | |
bottomLeftCornerOfText, | |
font, | |
fontScale, | |
fontColor, | |
lineType) | |
# Compare frames in two directory | |
def compare_frames(save_dir, origin_dir, target_dir, strs='Original,Projected,Manipulated', dim=None): | |
os.makedirs(save_dir, exist_ok=True) | |
try: | |
if not isinstance(target_dir, list): | |
target_dir = [target_dir] | |
image_list = glob.glob1(origin_dir,'frame*') | |
image_list.sort() | |
for name in image_list: | |
img_l = [] | |
for idx, dir_path in enumerate([origin_dir] + list(target_dir)): | |
img_1 = cv2.imread(dir_path + name) | |
img_1 = put_text(img_1, strs.split(',')[idx]) | |
img_l.append(img_1) | |
img = np.concatenate(img_l, axis=1) | |
cv2.imwrite(save_dir + name, img) | |
except FileNotFoundError: | |
pass | |
# Save frames into video | |
def create_video(image_folder, fps=24, video_format='.mp4', resize_ratio=1): | |
video_name = os.path.dirname(image_folder) + video_format | |
img_list = glob.glob1(image_folder,'frame*') | |
img_list.sort() | |
frame = cv2.imread(os.path.join(image_folder, img_list[0])) | |
frame = cv2.resize(frame, (0,0), fx=resize_ratio, fy=resize_ratio) | |
height, width, layers = frame.shape | |
if video_format == '.mp4': | |
fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
elif video_format == '.avi': | |
fourcc = cv2.VideoWriter_fourcc(*'XVID') | |
video = cv2.VideoWriter(video_name, fourcc, fps, (width,height)) | |
for image_name in img_list: | |
frame = cv2.imread(os.path.join(image_folder, image_name)) | |
frame = cv2.resize(frame, (0,0), fx=resize_ratio, fy=resize_ratio) | |
video.write(frame) | |
# Split video into frames | |
def video_to_frames(video_path, frame_path, img_format='.jpg', count_num=1000, resize=False): | |
os.makedirs(frame_path, exist_ok=True) | |
vidcap = cv2.VideoCapture(video_path) | |
success,image = vidcap.read() | |
count = 0 | |
while success: | |
if resize: | |
image = cv2.resize(image, (0,0), fx=0.5, fy=0.5) | |
cv2.imwrite(frame_path + '/frame%04d' % count + img_format, image) | |
success,image = vidcap.read() | |
count += 1 | |
if count >= count_num: | |
break | |
# Align faces | |
def align_frames(img_dir, save_dir, output_size=1024, transform_size=1024, optical_flow=True, gaussian=True, filter_size=3): | |
os.makedirs(save_dir, exist_ok=True) | |
# load face landmark detector | |
fa = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D, flip_input=False, device='cuda') | |
# list images in the directory | |
img_list = glob.glob1(img_dir, 'frame*') | |
img_list.sort() | |
# save align statistics | |
stat_dict = {'quad':[], 'qsize':[], 'coord':[], 'crop':[]} | |
lms = [] | |
for idx, img_name in enumerate(img_list): | |
img_path = os.path.join(img_dir, img_name) | |
img = io.imread(img_path) | |
lm = [] | |
preds = fa.get_landmarks(img) | |
for kk in range(68): | |
lm.append((preds[0][kk][0], preds[0][kk][1])) | |
# Eye distance | |
lm_eye_left = lm[36 : 42] # left-clockwise | |
lm_eye_right = lm[42 : 48] # left-clockwise | |
eye_left = np.mean(lm_eye_left, axis=0) | |
eye_right = np.mean(lm_eye_right, axis=0) | |
eye_to_eye = eye_right - eye_left | |
if optical_flow: | |
if idx > 0: | |
s = int(np.hypot(*eye_to_eye)/4) | |
lk_params = dict(winSize=(s, s), maxLevel=5, criteria = (cv2.TERM_CRITERIA_COUNT | cv2.TERM_CRITERIA_EPS, 10, 0.03)) | |
points_arr = np.array(lm, np.float32) | |
points_prevarr = np.array(prev_lm, np.float32) | |
points_arr,status, err = cv2.calcOpticalFlowPyrLK(prev_img, img, points_prevarr, points_arr, **lk_params) | |
sigma =100 | |
points_arr_float = np.array(points_arr,np.float32) | |
points = points_arr_float.tolist() | |
for k in range(0, len(lm)): | |
d = cv2.norm(np.array(prev_lm[k]) - np.array(lm[k])) | |
alpha = np.exp(-d*d/sigma) | |
lm[k] = (1 - alpha) * np.array(lm[k]) + alpha * np.array(points[k]) | |
prev_img = img | |
prev_lm = lm | |
lms.append(lm) | |
# Apply gaussian filter on landmarks | |
if gaussian: | |
lm_filtered = np.array(lms) | |
for kk in range(68): | |
lm_filtered[:, kk, 0] = gaussian_filter1d(lm_filtered[:, kk, 0], filter_size) | |
lm_filtered[:, kk, 1] = gaussian_filter1d(lm_filtered[:, kk, 1], filter_size) | |
lms = lm_filtered.tolist() | |
# save landmarks | |
landmark_out_dir = os.path.dirname(img_dir) + '_landmark/' | |
os.makedirs(landmark_out_dir, exist_ok=True) | |
for idx, img_name in enumerate(img_list): | |
img_path = os.path.join(img_dir, img_name) | |
img = io.imread(img_path) | |
lm = lms[idx] | |
img_lm = img.copy() | |
for kk in range(68): | |
img_lm = cv2.circle(img_lm, (int(lm[kk][0]),int(lm[kk][1])), radius=3, color=(255, 0, 255), thickness=-1) | |
# Save landmark images | |
cv2.imwrite(landmark_out_dir + img_name, img_lm[:,:,::-1]) | |
# Save mask images | |
""" | |
seg_mask = np.zeros(img.shape, img.dtype) | |
poly = np.array(lm[0:17] + lm[17:27][::-1], np.int32) | |
cv2.fillPoly(seg_mask, [poly], (255, 255, 255)) | |
cv2.imwrite(img_dir + "mask%04d.jpg"%idx, seg_mask); | |
""" | |
# Parse landmarks. | |
lm_eye_left = lm[36 : 42] # left-clockwise | |
lm_eye_right = lm[42 : 48] # left-clockwise | |
lm_mouth_outer = lm[48 : 60] # left-clockwise | |
# Calculate auxiliary vectors. | |
eye_left = np.mean([lm_eye_left[0], lm_eye_left[3]], axis=0) | |
eye_right = np.mean([lm_eye_right[0], lm_eye_right[3]], axis=0) | |
eye_avg = (eye_left + eye_right) * 0.5 | |
eye_to_eye = eye_right - eye_left | |
mouth_left = np.array(lm_mouth_outer[0]) | |
mouth_right = np.array(lm_mouth_outer[6]) | |
mouth_avg = (mouth_left + mouth_right) * 0.5 | |
eye_to_mouth = mouth_avg - eye_avg | |
# Choose oriented crop rectangle. | |
x = eye_to_eye - np.flipud(eye_to_mouth) * [-1, 1] | |
x /= np.hypot(*x) | |
x *= max(np.hypot(*eye_to_eye) * 2.0, np.hypot(*eye_to_mouth) * 1.8) | |
y = np.flipud(x) * [-1, 1] | |
c = eye_avg + eye_to_mouth * 0.1 | |
quad = np.stack([c - x - y, c - x + y, c + x + y, c + x - y]) | |
qsize = np.hypot(*x) * 2 | |
stat_dict['coord'].append(quad) | |
stat_dict['qsize'].append(qsize) | |
# Apply gaussian filter on crops | |
if gaussian: | |
quads = np.array(stat_dict['coord']) | |
quads = gaussian_filter1d(quads, 2*filter_size, axis=0) | |
stat_dict['coord'] = quads.tolist() | |
qsize = np.array(stat_dict['qsize']) | |
qsize = gaussian_filter1d(qsize, 2*filter_size, axis=0) | |
stat_dict['qsize'] = qsize.tolist() | |
for idx, img_name in enumerate(img_list): | |
img_path = os.path.join(img_dir, img_name) | |
img = Image.open(img_path) | |
qsize = stat_dict['qsize'][idx] | |
quad = np.array(stat_dict['coord'][idx]) | |
# Crop. | |
border = max(int(np.rint(qsize * 0.1)), 3) | |
crop = (int(np.floor(min(quad[:,0]))), int(np.floor(min(quad[:,1]))), int(np.ceil(max(quad[:,0]))), int(np.ceil(max(quad[:,1])))) | |
crop = (max(crop[0] - border, 0), max(crop[1] - border, 0), min(crop[2] + border, img.size[0]), min(crop[3] + border, img.size[1])) | |
if crop[2] - crop[0] < img.size[0] or crop[3] - crop[1] < img.size[1]: | |
img = img.crop(crop) | |
quad -= crop[0:2] | |
stat_dict['crop'].append(crop) | |
stat_dict['quad'].append((quad + 0.5).flatten()) | |
# Pad. | |
pad = (int(np.floor(min(quad[:,0]))), int(np.floor(min(quad[:,1]))), int(np.ceil(max(quad[:,0]))), int(np.ceil(max(quad[:,1])))) | |
pad = (max(-pad[0] + border, 0), max(-pad[1] + border, 0), max(pad[2] - img.size[0] + border, 0), max(pad[3] - img.size[1] + border, 0)) | |
if max(pad) > border - 4: | |
pad = np.maximum(pad, int(np.rint(qsize * 0.3))) | |
img = np.pad(np.float32(img), ((pad[1], pad[3]), (pad[0], pad[2]), (0, 0)), 'reflect') | |
h, w, _ = img.shape | |
y, x, _ = np.ogrid[:h, :w, :1] | |
img = Image.fromarray(np.uint8(np.clip(np.rint(img), 0, 255)), 'RGB') | |
quad += pad[:2] | |
# Transform. | |
img = img.transform((transform_size, transform_size), Image.QUAD, (quad + 0.5).flatten(), Image.BILINEAR) | |
# resizing | |
img_pil = img.resize((output_size, output_size), Image.LANCZOS) | |
img_pil.save(save_dir+img_name) | |
create_video(landmark_out_dir) | |
np.save(save_dir+'stat_dict.npy', stat_dict) | |
def find_coeffs(pa, pb): | |
matrix = [] | |
for p1, p2 in zip(pa, pb): | |
matrix.append([p1[0], p1[1], 1, 0, 0, 0, -p2[0]*p1[0], -p2[0]*p1[1]]) | |
matrix.append([0, 0, 0, p1[0], p1[1], 1, -p2[1]*p1[0], -p2[1]*p1[1]]) | |
A = np.matrix(matrix, dtype=np.float) | |
B = np.array(pb).reshape(8) | |
res = np.dot(np.linalg.inv(A.T * A) * A.T, B) | |
return np.array(res).reshape(8) | |
# reproject aligned frames to the original video | |
def video_reproject(orig_dir_path, recon_dir_path, save_dir_path, state_dir_path, mask_dir_path, seamless=False): | |
if not os.path.exists(save_dir_path): | |
os.makedirs(save_dir_path) | |
img_list_0 = glob.glob1(orig_dir_path,'frame*') | |
img_list_2 = glob.glob1(recon_dir_path,'frame*') | |
img_list_0.sort() | |
img_list_2.sort() | |
stat_dict = np.load(state_dir_path + 'stat_dict.npy', allow_pickle=True).item() | |
counter = len(img_list_2) | |
for idx in range(counter): | |
img_0 = Image.open(orig_dir_path + img_list_0[idx]) | |
img_2 = Image.open(recon_dir_path + img_list_2[idx]) | |
quad_f = stat_dict['quad'][idx] | |
quad_0 = stat_dict['crop'][idx] | |
coeffs = find_coeffs( | |
[(quad_f[0], quad_f[1]), (quad_f[2] , quad_f[3]), (quad_f[4], quad_f[5]), (quad_f[6], quad_f[7])], | |
[(0, 0), (0, 1024), (1024, 1024), (1024, 0)]) | |
crop_size = (quad_0[2] - quad_0[0], quad_0[3] - quad_0[1]) | |
img_2 = img_2.transform(crop_size, Image.PERSPECTIVE, coeffs, Image.BICUBIC) | |
output = img_0.copy() | |
output.paste(img_2, (int(quad_0[0]), int(quad_0[1]))) | |
""" | |
mask = cv2.imread(orig_dir_path + 'mask%04d.jpg'%idx) | |
kernel = np.ones((10,10), np.uint8) | |
mask = cv2.dilate(mask, kernel, iterations=5) | |
""" | |
crop_mask = Image.open(mask_dir_path + img_list_0[idx]) | |
crop_mask = crop_mask.transform(crop_size, Image.PERSPECTIVE, coeffs, Image.BICUBIC) | |
mask = Image.fromarray(np.zeros(np.array(img_0).shape, np.array(img_0).dtype)) | |
mask.paste(crop_mask, (int(quad_0[0]), int(quad_0[1]))) | |
mask = pil_to_cv2(mask) | |
# Apply mask | |
if not seamless: | |
mask = cv2_to_pil(mask).filter(ImageFilter.GaussianBlur(radius=10)).convert('L') | |
mask = np.array(mask)[:, :, np.newaxis]/255. | |
output = np.array(img_0)*(1-mask) + np.array(output)*mask | |
output = Image.fromarray(output.astype(np.uint8)) | |
output.save(save_dir_path + img_list_2[idx]) | |
else: | |
src = pil_to_cv2(output) | |
dst = pil_to_cv2(img_0) | |
# clone | |
br = cv2.boundingRect(cv2.split(mask)[0]) # bounding rect (x,y,width,height) | |
center = (br[0] + br[2] // 2, br[1] + br[3] // 2) | |
output = cv2.seamlessClone(src, dst, mask, center, cv2.NORMAL_CLONE) | |
cv2.imwrite(save_dir_path + img_list_2[idx], output) | |
# Align faces | |
def align_image(img_dir, save_dir, output_size=1024, transform_size=1024, format='*.png'): | |
os.makedirs(save_dir, exist_ok=True) | |
# load face landmark detector | |
fa = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D, flip_input=False, device='cuda') | |
# list images in the directory | |
img_list = glob.glob1(img_dir, format) | |
#img_list = os.listdir(img_dir) | |
img_list.sort() | |
# save align statistics | |
stat_dict = {'quad':[], 'qsize':[], 'coord':[], 'crop':[]} | |
for idx, img_name in enumerate(img_list): | |
img_path = os.path.join(img_dir, img_name) | |
img = Image.open(img_path).convert('RGB') | |
img_np = np.array(img) | |
lm = [] | |
preds = fa.get_landmarks(img_np) | |
for kk in range(68): | |
lm.append((preds[0][kk][0], preds[0][kk][1])) | |
if len(lm)==0: | |
continue | |
# Parse landmarks. Code extracted from ffhq-dataset | |
# pylint: disable=unused-variable | |
lm_chin = lm[0 : 17] # left-right | |
lm_eyebrow_left = lm[17 : 22] # left-right | |
lm_eyebrow_right = lm[22 : 27] # left-right | |
lm_nose = lm[27 : 31] # top-down | |
lm_nostrils = lm[31 : 36] # top-down | |
lm_eye_left = lm[36 : 42] # left-clockwise | |
lm_eye_right = lm[42 : 48] # left-clockwise | |
lm_mouth_outer = lm[48 : 60] # left-clockwise | |
lm_mouth_inner = lm[60 : 68] # left-clockwise | |
# Calculate auxiliary vectors. | |
eye_left = np.mean([lm_eye_left[0], lm_eye_left[3]], axis=0) | |
eye_right = np.mean([lm_eye_right[0], lm_eye_right[3]], axis=0) | |
eye_avg = (eye_left + eye_right) * 0.5 | |
eye_to_eye = eye_right - eye_left | |
mouth_left = np.array(lm_mouth_outer[0]) | |
mouth_right = np.array(lm_mouth_outer[6]) | |
mouth_avg = (mouth_left + mouth_right) * 0.5 | |
eye_to_mouth = mouth_avg - eye_avg | |
# Choose oriented crop rectangle. | |
x = eye_to_eye - np.flipud(eye_to_mouth) * [-1, 1] | |
x /= np.hypot(*x) | |
x *= np.hypot(*eye_to_eye) * 2.0#max(np.hypot(*eye_to_eye) * 2.0, np.hypot(*eye_to_mouth) * 1.8) | |
y = np.flipud(x) * [-1, 1] | |
c = eye_avg + eye_to_mouth * 0.1 | |
quad = np.stack([c - x - y, c - x + y, c + x + y, c + x - y]) | |
qsize = np.hypot(*x) * 2 | |
stat_dict['coord'].append(quad) | |
stat_dict['qsize'].append(qsize) | |
qsize = stat_dict['qsize'][idx] | |
quad = np.array(stat_dict['coord'][idx]) | |
""" | |
# Shrink. | |
shrink = int(np.floor(qsize / output_size * 0.5)) | |
if shrink > 1: | |
print('shrink!') | |
rsize = (int(np.rint(float(img.size[0]) / shrink)), int(np.rint(float(img.size[1]) / shrink))) | |
img = img.resize(rsize, Image.ANTIALIAS) | |
quad /= shrink | |
qsize /= shrink | |
""" | |
# Crop. | |
border = max(int(np.rint(qsize * 0.1)), 3) | |
crop = (int(np.floor(min(quad[:,0]))), int(np.floor(min(quad[:,1]))), int(np.ceil(max(quad[:,0]))), int(np.ceil(max(quad[:,1])))) | |
crop = (max(crop[0] - border, 0), max(crop[1] - border, 0), min(crop[2] + border, img.size[0]), min(crop[3] + border, img.size[1])) | |
if crop[2] - crop[0] < img.size[0] or crop[3] - crop[1] < img.size[1]: | |
img = img.crop(crop) | |
quad -= crop[0:2] | |
stat_dict['crop'].append(crop) | |
stat_dict['quad'].append((quad + 0.5).flatten()) | |
#img = img.crop(crop) | |
# Pad. | |
pad = (int(np.floor(min(quad[:,0]))), int(np.floor(min(quad[:,1]))), int(np.ceil(max(quad[:,0]))), int(np.ceil(max(quad[:,1])))) | |
pad = (max(-pad[0] + border, 0), max(-pad[1] + border, 0), max(pad[2] - img.size[0] + border, 0), max(pad[3] - img.size[1] + border, 0)) | |
if max(pad) > border - 4: | |
pad = np.maximum(pad, int(np.rint(qsize * 0.3))) | |
img = np.pad(np.float32(img), ((pad[1], pad[3]), (pad[0], pad[2]), (0, 0)), 'edge') | |
h, w, _ = img.shape | |
y, x, _ = np.ogrid[:h, :w, :1] | |
img = Image.fromarray(np.uint8(np.clip(np.rint(img), 0, 255)), 'RGB') | |
quad += pad[:2] | |
# Transform. | |
img = img.transform((transform_size, transform_size), Image.QUAD, (quad + 0.5).flatten(), Image.BILINEAR) | |
img_pil = img.resize((output_size, output_size), Image.LANCZOS) | |
# resizing | |
img_pil.save(save_dir+img_name) | |
np.save(save_dir+'stat_dict.npy', stat_dict) | |
img_to_tensor = transforms.Compose([ | |
transforms.ToTensor(), | |
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) | |
]) | |
def generate_mask(img_dir, save_dir, parsing_net, labels=[1,2,3,4,5,6,9,10,11,12,13], output_size=(1024, 1024), device=torch.device('cuda')): | |
os.makedirs(save_dir, exist_ok=True) | |
img_list = glob.glob1(img_dir, 'frame*') | |
img_list.sort() | |
for img_name in img_list: | |
img_path = os.path.join(img_dir, img_name) | |
img = Image.open(img_path).resize((512, 512), Image.LANCZOS) | |
x_1 = img_to_tensor(img).unsqueeze(0).to(device) | |
out_1 = parsing_net(x_1) | |
parsing = out_1[0].squeeze(0).detach().cpu().numpy().argmax(0) | |
mask = np.uint8(parsing) | |
for j in labels: | |
mask = np.where(mask==j, 255, mask) | |
mask = np.where(mask==255, 255, 0) | |
mask_pil = Image.fromarray(np.uint8(mask)).resize(output_size, Image.LANCZOS) | |
save_path = os.path.join(save_dir, img_name) | |
mask_pil.save(save_path) |