# Copyright (c) 2021, InterDigital R&D France. All rights reserved.
#
# This source code is made available under the license found in the
# LICENSE.txt in the root directory of this source tree.
import glob
import os

import cv2
import face_alignment
import numpy as np
import torch
from PIL import Image, ImageFilter
from scipy.ndimage import gaussian_filter1d
from skimage import io
from torchvision import transforms


def pil_to_cv2(pil_image):
    """Convert a PIL image (RGB) to an OpenCV array (BGR)."""
    open_cv_image = np.array(pil_image)
    return open_cv_image[:, :, ::-1].copy()


def cv2_to_pil(open_cv_image):
    """Convert an OpenCV array (BGR) back to a PIL image (RGB)."""
    return Image.fromarray(open_cv_image[:, :, ::-1].copy())


def put_text(img, text):
    """Draw a text label near the top-left corner of an image."""
    font = cv2.FONT_HERSHEY_SIMPLEX
    bottom_left_corner = (10, 50)
    font_scale = 1.5
    font_color = (255, 255, 0)
    # Note: this value is passed as the positional `thickness` argument of
    # cv2.putText, not as `lineType`, so it is named accordingly here.
    thickness = 2
    return cv2.putText(img, text,
                       bottom_left_corner,
                       font,
                       font_scale,
                       font_color,
                       thickness)


# Compare corresponding frames across two or more directories
def compare_frames(save_dir, origin_dir, target_dir, strs='Original,Projected,Manipulated', dim=None):
    os.makedirs(save_dir, exist_ok=True)
    try:
        if not isinstance(target_dir, list):
            target_dir = [target_dir]
        image_list = glob.glob1(origin_dir, 'frame*')
        image_list.sort()
        for name in image_list:
            img_l = []
            for idx, dir_path in enumerate([origin_dir] + list(target_dir)):
                img_i = cv2.imread(dir_path + name)
                img_i = put_text(img_i, strs.split(',')[idx])
                img_l.append(img_i)
            # Tile the labelled frames horizontally into one comparison image
            img = np.concatenate(img_l, axis=1)
            cv2.imwrite(save_dir + name, img)
    except FileNotFoundError:
        pass
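
# Usage sketch (hypothetical paths, following the repo convention that
# directory arguments end with '/'):
#
#   compare_frames('./comparison/', './original/', ['./projected/', './manipulated/'])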


# Save frames into a video
def create_video(image_folder, fps=24, video_format='.mp4', resize_ratio=1):
    video_name = os.path.dirname(image_folder) + video_format
    img_list = glob.glob1(image_folder, 'frame*')
    img_list.sort()
    # Use the first frame to determine the output resolution
    frame = cv2.imread(os.path.join(image_folder, img_list[0]))
    frame = cv2.resize(frame, (0, 0), fx=resize_ratio, fy=resize_ratio)
    height, width, layers = frame.shape
    if video_format == '.mp4':
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    elif video_format == '.avi':
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
    else:
        raise ValueError('Unsupported video format: ' + video_format)
    video = cv2.VideoWriter(video_name, fourcc, fps, (width, height))
    for image_name in img_list:
        frame = cv2.imread(os.path.join(image_folder, image_name))
        frame = cv2.resize(frame, (0, 0), fx=resize_ratio, fy=resize_ratio)
        video.write(frame)
    # Release the writer so the container is finalized on disk
    video.release()
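
# Usage sketch (hypothetical path). The trailing slash matters: the output
# name is os.path.dirname(image_folder) + video_format, so './results/'
# produces './results.mp4'.
#
#   create_video('./results/', fps=24)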


# Split a video into frames
def video_to_frames(video_path, frame_path, img_format='.jpg', count_num=1000, resize=False):
    os.makedirs(frame_path, exist_ok=True)
    vidcap = cv2.VideoCapture(video_path)
    success, image = vidcap.read()
    count = 0
    while success:
        if resize:
            image = cv2.resize(image, (0, 0), fx=0.5, fy=0.5)
        cv2.imwrite(frame_path + '/frame%04d' % count + img_format, image)
        success, image = vidcap.read()
        count += 1
        # Stop once count_num frames have been written
        if count >= count_num:
            break
    vidcap.release()
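
# Usage sketch (hypothetical paths): writes at most 1000 frames as
# ./frames/frame0000.jpg, ./frames/frame0001.jpg, ...
#
#   video_to_frames('./input.mp4', './frames')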


# Align faces across a directory of frames (FFHQ-style), with optional
# optical-flow and Gaussian temporal smoothing of landmarks and crops
def align_frames(img_dir, save_dir, output_size=1024, transform_size=1024, optical_flow=True, gaussian=True, filter_size=3):
    os.makedirs(save_dir, exist_ok=True)
    # Load the face landmark detector
    # (newer face_alignment releases rename LandmarksType._2D to TWO_D)
    fa = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D, flip_input=False, device='cuda')
    # List images in the directory
    img_list = glob.glob1(img_dir, 'frame*')
    img_list.sort()
    # Alignment statistics, saved for later reprojection
    stat_dict = {'quad': [], 'qsize': [], 'coord': [], 'crop': []}
    lms = []
    for idx, img_name in enumerate(img_list):
        img_path = os.path.join(img_dir, img_name)
        img = io.imread(img_path)
        lm = []
        # Every frame is assumed to contain a detectable face
        preds = fa.get_landmarks(img)
        for kk in range(68):
            lm.append((preds[0][kk][0], preds[0][kk][1]))
        # Eye distance
        lm_eye_left = lm[36:42]   # left-clockwise
        lm_eye_right = lm[42:48]  # left-clockwise
        eye_left = np.mean(lm_eye_left, axis=0)
        eye_right = np.mean(lm_eye_right, axis=0)
        eye_to_eye = eye_right - eye_left
        if optical_flow:
            if idx > 0:
                # Track the previous frame's landmarks with pyramidal
                # Lucas-Kanade optical flow, then blend detection and
                # tracking: small displacements favour the tracked points
                s = int(np.hypot(*eye_to_eye) / 4)
                lk_params = dict(winSize=(s, s), maxLevel=5,
                                 criteria=(cv2.TERM_CRITERIA_COUNT | cv2.TERM_CRITERIA_EPS, 10, 0.03))
                points_arr = np.array(lm, np.float32)
                points_prevarr = np.array(prev_lm, np.float32)
                points_arr, status, err = cv2.calcOpticalFlowPyrLK(prev_img, img, points_prevarr, points_arr, **lk_params)
                sigma = 100
                points = np.array(points_arr, np.float32).tolist()
                for k in range(0, len(lm)):
                    d = cv2.norm(np.array(prev_lm[k]) - np.array(lm[k]))
                    alpha = np.exp(-d * d / sigma)
                    lm[k] = (1 - alpha) * np.array(lm[k]) + alpha * np.array(points[k])
            prev_img = img
            prev_lm = lm
        lms.append(lm)
    # Apply a Gaussian filter to the landmark trajectories over time
    if gaussian:
        lm_filtered = np.array(lms)
        for kk in range(68):
            lm_filtered[:, kk, 0] = gaussian_filter1d(lm_filtered[:, kk, 0], filter_size)
            lm_filtered[:, kk, 1] = gaussian_filter1d(lm_filtered[:, kk, 1], filter_size)
        lms = lm_filtered.tolist()
    # Save landmark visualizations
    landmark_out_dir = os.path.dirname(img_dir) + '_landmark/'
    os.makedirs(landmark_out_dir, exist_ok=True)
    for idx, img_name in enumerate(img_list):
        img_path = os.path.join(img_dir, img_name)
        img = io.imread(img_path)
        lm = lms[idx]
        img_lm = img.copy()
        for kk in range(68):
            img_lm = cv2.circle(img_lm, (int(lm[kk][0]), int(lm[kk][1])), radius=3, color=(255, 0, 255), thickness=-1)
        # Save landmark images (skimage loads RGB, cv2 writes BGR)
        cv2.imwrite(landmark_out_dir + img_name, img_lm[:, :, ::-1])
        # Save mask images
        """
        seg_mask = np.zeros(img.shape, img.dtype)
        poly = np.array(lm[0:17] + lm[17:27][::-1], np.int32)
        cv2.fillPoly(seg_mask, [poly], (255, 255, 255))
        cv2.imwrite(img_dir + "mask%04d.jpg" % idx, seg_mask)
        """
        # Parse landmarks.
        lm_eye_left = lm[36:42]     # left-clockwise
        lm_eye_right = lm[42:48]    # left-clockwise
        lm_mouth_outer = lm[48:60]  # left-clockwise
        # Calculate auxiliary vectors.
        eye_left = np.mean([lm_eye_left[0], lm_eye_left[3]], axis=0)
        eye_right = np.mean([lm_eye_right[0], lm_eye_right[3]], axis=0)
        eye_avg = (eye_left + eye_right) * 0.5
        eye_to_eye = eye_right - eye_left
        mouth_left = np.array(lm_mouth_outer[0])
        mouth_right = np.array(lm_mouth_outer[6])
        mouth_avg = (mouth_left + mouth_right) * 0.5
        eye_to_mouth = mouth_avg - eye_avg
        # Choose oriented crop rectangle.
        x = eye_to_eye - np.flipud(eye_to_mouth) * [-1, 1]
        x /= np.hypot(*x)
        x *= max(np.hypot(*eye_to_eye) * 2.0, np.hypot(*eye_to_mouth) * 1.8)
        y = np.flipud(x) * [-1, 1]
        c = eye_avg + eye_to_mouth * 0.1
        quad = np.stack([c - x - y, c - x + y, c + x + y, c + x - y])
        qsize = np.hypot(*x) * 2
        stat_dict['coord'].append(quad)
        stat_dict['qsize'].append(qsize)
    # Apply a Gaussian filter to the crop rectangles over time
    if gaussian:
        quads = np.array(stat_dict['coord'])
        quads = gaussian_filter1d(quads, 2 * filter_size, axis=0)
        stat_dict['coord'] = quads.tolist()
        qsize = np.array(stat_dict['qsize'])
        qsize = gaussian_filter1d(qsize, 2 * filter_size, axis=0)
        stat_dict['qsize'] = qsize.tolist()
    for idx, img_name in enumerate(img_list):
        img_path = os.path.join(img_dir, img_name)
        img = Image.open(img_path)
        qsize = stat_dict['qsize'][idx]
        quad = np.array(stat_dict['coord'][idx])
        # Crop.
        border = max(int(np.rint(qsize * 0.1)), 3)
        crop = (int(np.floor(min(quad[:, 0]))), int(np.floor(min(quad[:, 1]))),
                int(np.ceil(max(quad[:, 0]))), int(np.ceil(max(quad[:, 1]))))
        crop = (max(crop[0] - border, 0), max(crop[1] - border, 0),
                min(crop[2] + border, img.size[0]), min(crop[3] + border, img.size[1]))
        if crop[2] - crop[0] < img.size[0] or crop[3] - crop[1] < img.size[1]:
            img = img.crop(crop)
            quad -= crop[0:2]
        stat_dict['crop'].append(crop)
        stat_dict['quad'].append((quad + 0.5).flatten())
        # Pad.
        pad = (int(np.floor(min(quad[:, 0]))), int(np.floor(min(quad[:, 1]))),
               int(np.ceil(max(quad[:, 0]))), int(np.ceil(max(quad[:, 1]))))
        pad = (max(-pad[0] + border, 0), max(-pad[1] + border, 0),
               max(pad[2] - img.size[0] + border, 0), max(pad[3] - img.size[1] + border, 0))
        if max(pad) > border - 4:
            pad = np.maximum(pad, int(np.rint(qsize * 0.3)))
            img = np.pad(np.float32(img), ((pad[1], pad[3]), (pad[0], pad[2]), (0, 0)), 'reflect')
            img = Image.fromarray(np.uint8(np.clip(np.rint(img), 0, 255)), 'RGB')
            quad += pad[:2]
        # Transform.
        img = img.transform((transform_size, transform_size), Image.QUAD, (quad + 0.5).flatten(), Image.BILINEAR)
        # Resize to the output resolution
        img_pil = img.resize((output_size, output_size), Image.LANCZOS)
        img_pil.save(save_dir + img_name)
    create_video(landmark_out_dir)
    np.save(save_dir + 'stat_dict.npy', stat_dict)
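
# Usage sketch (hypothetical paths): aligns every frame, writes landmark
# visualizations next to img_dir, and saves stat_dict.npy for reprojection.
#
#   align_frames('./frames/', './frames_aligned/')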


# Find the 8 coefficients of the perspective transform mapping the four
# points pa onto the four points pb (for use with Image.PERSPECTIVE)
def find_coeffs(pa, pb):
    matrix = []
    for p1, p2 in zip(pa, pb):
        matrix.append([p1[0], p1[1], 1, 0, 0, 0, -p2[0] * p1[0], -p2[0] * p1[1]])
        matrix.append([0, 0, 0, p1[0], p1[1], 1, -p2[1] * p1[0], -p2[1] * p1[1]])
    A = np.array(matrix, dtype=np.float64)  # np.float was removed in NumPy 1.24
    B = np.array(pb).reshape(8)
    # Solve the least-squares normal equations (A^T A) x = A^T B
    res = np.linalg.solve(A.T @ A, A.T @ B)
    return np.array(res).reshape(8)
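
# Sanity check (hypothetical values): mapping a quad onto itself recovers
# the identity transform, coefficients ~ (1, 0, 0, 0, 1, 0, 0, 0).
#
#   pts = [(0, 0), (0, 1024), (1024, 1024), (1024, 0)]
#   find_coeffs(pts, pts)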


# Reproject aligned frames into the original video frames
def video_reproject(orig_dir_path, recon_dir_path, save_dir_path, state_dir_path, mask_dir_path, seamless=False):
    os.makedirs(save_dir_path, exist_ok=True)
    img_list_0 = glob.glob1(orig_dir_path, 'frame*')
    img_list_2 = glob.glob1(recon_dir_path, 'frame*')
    img_list_0.sort()
    img_list_2.sort()
    # Alignment statistics saved by align_frames / align_image
    stat_dict = np.load(state_dir_path + 'stat_dict.npy', allow_pickle=True).item()
    counter = len(img_list_2)
    for idx in range(counter):
        img_0 = Image.open(orig_dir_path + img_list_0[idx])
        img_2 = Image.open(recon_dir_path + img_list_2[idx])
        quad_f = stat_dict['quad'][idx]
        quad_0 = stat_dict['crop'][idx]
        # Perspective transform from the 1024x1024 aligned frame back to the crop quad
        coeffs = find_coeffs(
            [(quad_f[0], quad_f[1]), (quad_f[2], quad_f[3]), (quad_f[4], quad_f[5]), (quad_f[6], quad_f[7])],
            [(0, 0), (0, 1024), (1024, 1024), (1024, 0)])
        crop_size = (quad_0[2] - quad_0[0], quad_0[3] - quad_0[1])
        img_2 = img_2.transform(crop_size, Image.PERSPECTIVE, coeffs, Image.BICUBIC)
        output = img_0.copy()
        output.paste(img_2, (int(quad_0[0]), int(quad_0[1])))
        """
        mask = cv2.imread(orig_dir_path + 'mask%04d.jpg' % idx)
        kernel = np.ones((10, 10), np.uint8)
        mask = cv2.dilate(mask, kernel, iterations=5)
        """
        # Warp the face mask into the coordinate frame of the original image
        crop_mask = Image.open(mask_dir_path + img_list_0[idx])
        crop_mask = crop_mask.transform(crop_size, Image.PERSPECTIVE, coeffs, Image.BICUBIC)
        mask = Image.fromarray(np.zeros(np.array(img_0).shape, np.array(img_0).dtype))
        mask.paste(crop_mask, (int(quad_0[0]), int(quad_0[1])))
        mask = pil_to_cv2(mask)
        if not seamless:
            # Alpha-blend the pasted face with a blurred mask
            mask = cv2_to_pil(mask).filter(ImageFilter.GaussianBlur(radius=10)).convert('L')
            mask = np.array(mask)[:, :, np.newaxis] / 255.
            output = np.array(img_0) * (1 - mask) + np.array(output) * mask
            output = Image.fromarray(output.astype(np.uint8))
            output.save(save_dir_path + img_list_2[idx])
        else:
            # Poisson blending via OpenCV's seamless cloning
            src = pil_to_cv2(output)
            dst = pil_to_cv2(img_0)
            br = cv2.boundingRect(cv2.split(mask)[0])  # bounding rect (x, y, width, height)
            center = (br[0] + br[2] // 2, br[1] + br[3] // 2)
            output = cv2.seamlessClone(src, dst, mask, center, cv2.NORMAL_CLONE)
            cv2.imwrite(save_dir_path + img_list_2[idx], output)
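
# Usage sketch (hypothetical paths): paste manipulated aligned frames back
# into the originals, using the stats saved by align_frames and the masks
# produced by generate_mask.
#
#   video_reproject('./frames/', './frames_manipulated/', './frames_out/',
#                   './frames_aligned/', './frames_mask/')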


# Align faces in a directory of still images (FFHQ-style)
def align_image(img_dir, save_dir, output_size=1024, transform_size=1024, format='*.png'):
    os.makedirs(save_dir, exist_ok=True)
    # Load the face landmark detector
    fa = face_alignment.FaceAlignment(face_alignment.LandmarksType._2D, flip_input=False, device='cuda')
    # List images in the directory
    img_list = glob.glob1(img_dir, format)
    img_list.sort()
    # Alignment statistics, saved for later reprojection
    stat_dict = {'quad': [], 'qsize': [], 'coord': [], 'crop': []}
    for idx, img_name in enumerate(img_list):
        img_path = os.path.join(img_dir, img_name)
        img = Image.open(img_path).convert('RGB')
        img_np = np.array(img)
        preds = fa.get_landmarks(img_np)
        if preds is None:
            # Skip images with no detected face (the original checked
            # len(lm) == 0 only after indexing preds, which would have
            # raised before the check could trigger)
            continue
        lm = [(preds[0][kk][0], preds[0][kk][1]) for kk in range(68)]
        # Parse landmarks. Code extracted from ffhq-dataset
        # pylint: disable=unused-variable
        lm_chin = lm[0:17]            # left-right
        lm_eyebrow_left = lm[17:22]   # left-right
        lm_eyebrow_right = lm[22:27]  # left-right
        lm_nose = lm[27:31]           # top-down
        lm_nostrils = lm[31:36]       # top-down
        lm_eye_left = lm[36:42]       # left-clockwise
        lm_eye_right = lm[42:48]      # left-clockwise
        lm_mouth_outer = lm[48:60]    # left-clockwise
        lm_mouth_inner = lm[60:68]    # left-clockwise
        # Calculate auxiliary vectors.
        eye_left = np.mean([lm_eye_left[0], lm_eye_left[3]], axis=0)
        eye_right = np.mean([lm_eye_right[0], lm_eye_right[3]], axis=0)
        eye_avg = (eye_left + eye_right) * 0.5
        eye_to_eye = eye_right - eye_left
        mouth_left = np.array(lm_mouth_outer[0])
        mouth_right = np.array(lm_mouth_outer[6])
        mouth_avg = (mouth_left + mouth_right) * 0.5
        eye_to_mouth = mouth_avg - eye_avg
        # Choose oriented crop rectangle.
        x = eye_to_eye - np.flipud(eye_to_mouth) * [-1, 1]
        x /= np.hypot(*x)
        x *= np.hypot(*eye_to_eye) * 2.0  # max(np.hypot(*eye_to_eye) * 2.0, np.hypot(*eye_to_mouth) * 1.8)
        y = np.flipud(x) * [-1, 1]
        c = eye_avg + eye_to_mouth * 0.1
        quad = np.stack([c - x - y, c - x + y, c + x + y, c + x - y])
        qsize = np.hypot(*x) * 2
        stat_dict['coord'].append(quad)
        stat_dict['qsize'].append(qsize)
        # Work on a copy so the in-place edits below do not modify the saved
        # statistics; the original re-read stat_dict[...][idx] here, which
        # desynchronizes as soon as one image is skipped
        quad = np.array(stat_dict['coord'][-1])
        qsize = stat_dict['qsize'][-1]
        """
        # Shrink.
        shrink = int(np.floor(qsize / output_size * 0.5))
        if shrink > 1:
            print('shrink!')
            rsize = (int(np.rint(float(img.size[0]) / shrink)), int(np.rint(float(img.size[1]) / shrink)))
            img = img.resize(rsize, Image.ANTIALIAS)
            quad /= shrink
            qsize /= shrink
        """
        # Crop.
        border = max(int(np.rint(qsize * 0.1)), 3)
        crop = (int(np.floor(min(quad[:, 0]))), int(np.floor(min(quad[:, 1]))),
                int(np.ceil(max(quad[:, 0]))), int(np.ceil(max(quad[:, 1]))))
        crop = (max(crop[0] - border, 0), max(crop[1] - border, 0),
                min(crop[2] + border, img.size[0]), min(crop[3] + border, img.size[1]))
        if crop[2] - crop[0] < img.size[0] or crop[3] - crop[1] < img.size[1]:
            img = img.crop(crop)
            quad -= crop[0:2]
        stat_dict['crop'].append(crop)
        stat_dict['quad'].append((quad + 0.5).flatten())
        # Pad.
        pad = (int(np.floor(min(quad[:, 0]))), int(np.floor(min(quad[:, 1]))),
               int(np.ceil(max(quad[:, 0]))), int(np.ceil(max(quad[:, 1]))))
        pad = (max(-pad[0] + border, 0), max(-pad[1] + border, 0),
               max(pad[2] - img.size[0] + border, 0), max(pad[3] - img.size[1] + border, 0))
        if max(pad) > border - 4:
            pad = np.maximum(pad, int(np.rint(qsize * 0.3)))
            img = np.pad(np.float32(img), ((pad[1], pad[3]), (pad[0], pad[2]), (0, 0)), 'edge')
            img = Image.fromarray(np.uint8(np.clip(np.rint(img), 0, 255)), 'RGB')
            quad += pad[:2]
        # Transform.
        img = img.transform((transform_size, transform_size), Image.QUAD, (quad + 0.5).flatten(), Image.BILINEAR)
        # Resize to the output resolution
        img_pil = img.resize((output_size, output_size), Image.LANCZOS)
        img_pil.save(save_dir + img_name)
    np.save(save_dir + 'stat_dict.npy', stat_dict)
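
# Usage sketch (hypothetical paths): align all PNG stills in a folder.
#
#   align_image('./images/', './images_aligned/', format='*.png')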


# Tensor conversion with ImageNet normalization statistics
img_to_tensor = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


# Generate face-region masks with a pretrained face-parsing network
def generate_mask(img_dir, save_dir, parsing_net, labels=[1, 2, 3, 4, 5, 6, 9, 10, 11, 12, 13], output_size=(1024, 1024), device=torch.device('cuda')):
    os.makedirs(save_dir, exist_ok=True)
    img_list = glob.glob1(img_dir, 'frame*')
    img_list.sort()
    for img_name in img_list:
        img_path = os.path.join(img_dir, img_name)
        img = Image.open(img_path).resize((512, 512), Image.LANCZOS)
        x_1 = img_to_tensor(img).unsqueeze(0).to(device)
        # Inference only, so skip gradient bookkeeping
        with torch.no_grad():
            out_1 = parsing_net(x_1)
        # Per-pixel class labels from the parsing output
        parsing = out_1[0].squeeze(0).detach().cpu().numpy().argmax(0)
        mask = np.uint8(parsing)
        # Keep only the selected facial-component labels as white (255)
        for j in labels:
            mask = np.where(mask == j, 255, mask)
        mask = np.where(mask == 255, 255, 0)
        mask_pil = Image.fromarray(np.uint8(mask)).resize(output_size, Image.LANCZOS)
        save_path = os.path.join(save_dir, img_name)
        mask_pil.save(save_path)
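

if __name__ == '__main__':
    # A minimal end-to-end sketch of the intended pipeline. All paths are
    # hypothetical, and the face-parsing network for generate_mask must be
    # loaded separately (e.g. a BiSeNet-style parser), so the later steps
    # are only outlined in comments.
    video_to_frames('./input.mp4', './input')      # 1. split video into frames
    align_frames('./input/', './input_aligned/')   # 2. FFHQ-style alignment + stats
    # 3. edit the aligned frames with a model of choice -> './input_manipulated/'
    # 4. generate_mask('./input_aligned/', './input_mask/', parsing_net)
    # 5. video_reproject('./input/', './input_manipulated/', './output/',
    #                    './input_aligned/', './input_mask/')
    # 6. create_video('./output/')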