Spaces:
Sleeping
Sleeping
File size: 15,006 Bytes
327b68f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2020 Imperial College London (Pingchuan Ma)
# Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
""" Crop Mouth ROIs from videos for lipreading"""
# from msilib.schema import File
from ast import Pass
import os
import cv2 # OpenCV library
import glob # Lists folders/files matching a Unix-style path pattern
import argparse # Module that parses command-line arguments
import numpy as np
from collections import deque # deque from the collections module: a structure combining a stack and a queue
from utils import * # Import every function from utils.py
from transform import * # Import every function from transform.py
import dlib # Library for finding face landmarks
import face_alignment # Library for finding face landmarks
from PIL import Image
# Function that receives and processes the command-line argument values
def load_args(default_config=None):
    """Parse the command-line options of the mouth-ROI pre-processing script.

    :param default_config: accepted for interface compatibility; this function
        does not read it.
    :return: the ``argparse.Namespace`` produced by parsing ``sys.argv``.
    """
    parser = argparse.ArgumentParser(description='Lipreading Pre-processing')

    # (flag, keyword options) pairs, registered in this exact order.
    option_table = [
        # -- utils
        ('--video-direc', dict(default=None, help='raw video directory')),
        ('--video-format', dict(default='.mp4', help='raw video format')),
        ('--landmark-direc', dict(default=None, help='landmark directory')),
        ('--filename-path', dict(default='./vietnamese_detected_face_30.csv',
                                 help='list of detected video and its subject ID')),
        ('--save-direc', dict(default=None, help='the directory of saving mouth ROIs')),
        # -- mean face utils
        ('--mean-face', dict(default='./20words_mean_face.npy', help='mean face pathname')),
        # -- mouthROIs utils
        ('--crop-width', dict(default=96, type=int, help='the width of mouth ROIs')),
        ('--crop-height', dict(default=96, type=int, help='the height of mouth ROIs')),
        ('--start-idx', dict(default=48, type=int, help='the start of landmark index')),
        ('--stop-idx', dict(default=68, type=int, help='the end of landmark index')),
        ('--window-margin', dict(default=12, type=int,
                                 help='window margin for smoothed_landmarks')),
        # -- convert to gray scale
        ('--convert-gray', dict(default=False, action='store_true', help='convert2grayscale')),
        # -- test set only
        ('--testset-only', dict(default=False, action='store_true',
                                help='process testing set only')),
    ]
    for flag, options in option_table:
        parser.add_argument(flag, **options)

    # The parsed values come back as an argparse.Namespace.
    return parser.parse_args()
# Module-level configuration, evaluated once when the file is executed.
args = load_args() # parse and load the command-line arguments
# -- mean face utils
# Size argument handed to warp_img / apply_transform in crop_patch.
STD_SIZE = (256, 256)
# Reference landmarks loaded from the --mean-face file (default: 20words_mean_face.npy).
mean_face_landmarks = np.load(args.mean_face)
# Landmark rows selected from both the smoothed and the mean-face landmarks
# when the alignment transform is estimated in crop_patch.
stablePntsIDs = [33, 36, 39, 42, 45]
# Take the landmarks for a video and crop the mouth region out of it
def _cut_mouth_patch(trans_frame, trans_landmarks):
    """Crop the mouth ROI from an already-aligned frame using the mouth landmark slice."""
    return cut_patch(trans_frame,
                     trans_landmarks[args.start_idx:args.stop_idx],
                     args.crop_height//2,
                     args.crop_width//2,)


def crop_patch( video_pathname, landmarks):
    """Crop mouth patch

    Frames and their landmarks are buffered in a sliding window of
    ``args.window_margin`` entries. Whenever the window is full, the landmarks
    in it are averaged, the oldest frame is aligned to the mean face with
    ``warp_img`` and its mouth region is cropped. When the frame matching the
    last landmark entry is reached, the frames still buffered are aligned with
    the most recent transform and cropped as well.

    :param str video_pathname: pathname for the video
    :param list landmarks: interpolated landmarks, one entry per frame
    :return: ``np.array`` of the cropped patches, or ``None`` when the video
        ends before the frame at index ``len(landmarks) - 1`` is reached.
    """
    q_frame, q_landmarks = deque(), deque()
    sequence = []
    trans = None  # most recent alignment transform; None until the window fills once
    last_idx = len(landmarks) - 1

    # The original comment marks the frames yielded by read_video as BGR.
    for frame_idx, frame in enumerate(read_video(video_pathname)):
        q_landmarks.append(landmarks[frame_idx])
        q_frame.append(frame)

        if len(q_frame) == args.window_margin:
            smoothed_landmarks = np.mean(q_landmarks, axis=0)  # element-wise mean over the window
            cur_landmarks = q_landmarks.popleft()
            cur_frame = q_frame.popleft()
            # -- affine transformation
            trans_frame, trans = warp_img( smoothed_landmarks[stablePntsIDs, :],
                                           mean_face_landmarks[stablePntsIDs, :],
                                           cur_frame,
                                           STD_SIZE)
            # -- crop mouth patch
            sequence.append(_cut_mouth_patch(trans_frame, trans(cur_landmarks)))

        if frame_idx == last_idx:
            if trans is None:
                # The clip is shorter than the smoothing window, so no transform
                # was estimated yet. Estimate one from everything buffered
                # instead of failing on an unbound `trans`.
                smoothed_landmarks = np.mean(q_landmarks, axis=0)
                _, trans = warp_img( smoothed_landmarks[stablePntsIDs, :],
                                     mean_face_landmarks[stablePntsIDs, :],
                                     q_frame[0],
                                     STD_SIZE)
            # Flush the remaining buffered frames with the latest transform.
            while q_frame:
                cur_frame = q_frame.popleft()
                # -- transform frame
                trans_frame = apply_transform( trans, cur_frame, STD_SIZE)
                # -- transform landmarks
                trans_landmarks = trans(q_landmarks.popleft())
                # -- crop mouth patch
                sequence.append(_cut_mouth_patch(trans_frame, trans_landmarks))
            return np.array(sequence)

    return None
# Landmark interpolation
def landmarks_interpolate(landmarks):
    """Interpolate landmarks

    Fills every ``None`` entry: gaps between two detected frames are handed to
    ``linear_interpolate``, while leading and trailing gaps are filled with the
    first and last detected landmarks respectively.

    :param list landmarks: landmarks detected in raw videos, ``None`` where
        detection failed
    :return: the filled landmark list, or ``None`` if no frame has landmarks
    """
    detected = [pos for pos, item in enumerate(landmarks) if item is not None]
    if len(detected) == 0:
        return None

    # Bridge each interior gap between two neighbouring detections.
    for prev_pos, next_pos in zip(detected, detected[1:]):
        if next_pos - prev_pos != 1:
            landmarks = linear_interpolate(landmarks, prev_pos, next_pos)

    detected = [pos for pos, item in enumerate(landmarks) if item is not None]
    # -- Corner case: frames at the beginning or at the end that failed to be detected.
    if detected:
        first, last = detected[0], detected[-1]
        landmarks[:first] = [landmarks[first]] * first
        landmarks[last:] = [landmarks[last]] * (len(landmarks) - last)

    # Sanity check: after filling, every frame must carry landmarks.
    filled = [pos for pos, item in enumerate(landmarks) if item is not None]
    assert len(filled) == len(landmarks), "not every frame has landmark"
    return landmarks
def get_yield(output_video):
    """Return a generator that yields the items of ``output_video`` one by one, in order."""
    yield from output_video
# -- dlib fallback: used when a video has no pre-computed landmark file
_DLIB_MODELS = {}  # filled on first use so the models are loaded at most once per run


def _get_dlib_models():
    """Return the cached dlib face detector and 68-point shape predictor, loading them on first use."""
    if not _DLIB_MODELS:
        model_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                  'shape_predictor_68_face_landmarks.dat')
        _DLIB_MODELS['detector'] = dlib.get_frontal_face_detector()
        _DLIB_MODELS['predictor'] = dlib.shape_predictor(model_path)
    return _DLIB_MODELS['detector'], _DLIB_MODELS['predictor']


def get_face_landmark(img):
    """Find the face landmarks of the first face dlib detects in ``img``.

    :param img: a single video frame
    :return: ``(face_landmark, eye_landmark)`` where ``face_landmark`` holds the
        68 (x, y) points and ``eye_landmark`` is its ``[36:48]`` slice. Both are
        all zeros when no face is detected.
    """
    detector_hog, landmark_predictor = _get_dlib_models()
    dlib_rects = detector_hog(img, 1)

    # Only the first detection is used, as before.
    first_rect = next(iter(dlib_rects), None)
    if first_rect is None:
        landmark = [(0.0, 0.0)] * 68  # sentinel for "no face detected"
    else:
        points = landmark_predictor(img, first_rect)
        landmark = [(p.x, p.y) for p in points.parts()]

    face_landmark = np.array(landmark)        # face landmark
    eye_landmark = np.array(landmark[36:48])  # eye landmark
    return face_landmark, eye_landmark


def _detect_and_save_landmarks(video_pathname, landmarks_pathname, target_frames):
    """Run dlib on the frame-adjusted video and save one landmark record per frame as npz."""
    video = videoToArray(video_pathname, is_gray=args.convert_gray)
    output_video = frameAdjust(video, target_frames)  # frame sampling to target_frames
    # NOTE(review): landmarks are detected on the frame-adjusted video while
    # crop_patch reads the raw video; confirm the frame counts agree.

    multi_sub_landmarks = []
    for frame_idx, frame in enumerate(get_yield(output_video)):
        print(f'\n ------------frame {frame_idx}------------ ')
        facial_landmarks, eye_landmarks = get_face_landmark(frame)
        person_landmarks = {
            'id': 0,
            'most_recent_fitting_scores': np.array([2.0,2.0,2.0]),
            'facial_landmarks': facial_landmarks,
            'roll': 7,
            'yaw': 3.5,
            'eye_landmarks': eye_landmarks,
            'fitting_scores_updated': True,
            'pitch': -0.05
        }
        # One entry per frame holding only this frame's person. The list is
        # built fresh each time, so index [frame][person] reads this frame's
        # landmarks and every frame entry has the same length.
        multi_sub_landmarks.append(np.array([person_landmarks], dtype=object))

    save2npz(landmarks_pathname, data=np.array(multi_sub_landmarks, dtype=object))


def _has_detected_face(landmarks):
    """Return True if at least one frame carries landmarks that are not all zero."""
    return any(lm is not None and np.any(lm) for lm in landmarks)


# Read the list of "<filename>,<person_id>" entries; close the file right away.
with open(args.filename_path) as list_file:
    lines = list_file.read().splitlines()
if args.testset_only:
    # Keep only entries whose parent folder is 'test'.
    lines = [x for x in lines if x.split('/')[-2] == 'test']

target_frames = 29  # number of frames each video is sampled to before dlib detection

for filename_idx, line in enumerate(lines):
    if not line.strip():
        continue  # tolerate blank lines in the list file

    # file name, person id
    filename, person_id = line.split(',')
    print('idx: {} \tProcessing.\t{}'.format(filename_idx, filename))

    video_pathname = os.path.join(args.video_direc, filename+args.video_format)
    landmarks_pathname = os.path.join(args.landmark_direc, filename+'.npz')
    dst_pathname = os.path.join( args.save_direc, filename+'.npz')

    assert os.path.isfile(video_pathname), "File does not exist. Path input: {}".format(video_pathname)

    # No landmark file for an mp4 video: create it with dlib.
    if not os.path.exists(landmarks_pathname) and video_pathname.split('.')[-1] == 'mp4':
        _detect_and_save_landmarks(video_pathname, landmarks_pathname, target_frames)
        print('\n ------------ save npz ------------ \n')
    else:
        assert os.path.isfile(landmarks_pathname), "File does not exist. Path input: {}".format(landmarks_pathname)

    # Output already exists: nothing to do for this entry.
    if os.path.exists(dst_pathname):
        continue

    # NOTE(security): allow_pickle=True unpickles objects stored in the file;
    # only load landmark files from trusted sources.
    multi_sub_landmarks = np.load( landmarks_pathname, allow_pickle=True)['data']
    landmarks = [None] * len( multi_sub_landmarks)
    for frame_idx, frame_people in enumerate(multi_sub_landmarks):
        try:
            landmarks[frame_idx] = frame_people[int(person_id)]['facial_landmarks'].astype(np.float64)
        except IndexError:
            # This person is absent from the frame; leave None for interpolation.
            continue

    # Skip videos whose landmarks are all zero, i.e. no face was detected in
    # any frame. Testing the values directly avoids comparing lists of arrays.
    if not _has_detected_face(landmarks):
        continue

    # -- pre-process landmarks: interpolate frames not being detected.
    preprocessed_landmarks = landmarks_interpolate(landmarks)
    if not preprocessed_landmarks:
        continue

    # -- crop
    sequence = crop_patch(video_pathname, preprocessed_landmarks)
    assert sequence is not None, "cannot crop from {}.".format(filename)

    # -- save (grayscale, or reverse the channel order of the last axis)
    data = convert_bgr2gray(sequence) if args.convert_gray else sequence[...,::-1]
    save2npz(dst_pathname, data=data)

print('Done.')