#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2020 Imperial College London (Pingchuan Ma)
# Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0)

""" Crop Mouth ROIs from videos for lipreading"""

import os
import cv2                      # OpenCV
import glob                     # list files/folders via shell-style path patterns
import argparse                 # command-line argument parsing
import numpy as np
from collections import deque   # deque: double-ended queue (stack and queue in one structure)
from utils import *             # all helpers from utils.py
from transform import *         # all helpers from transform.py
import dlib                     # face landmark detection library
import face_alignment           # face landmark detection library
from PIL import Image

# Parse command-line arguments.
def load_args(default_config=None):
    parser = argparse.ArgumentParser(description='Lipreading Pre-processing')
    # -- utils
    parser.add_argument('--video-direc', default=None, help='raw video directory')
    parser.add_argument('--video-format', default='.mp4', help='raw video format')
    parser.add_argument('--landmark-direc', default=None, help='landmark directory')
    parser.add_argument('--filename-path', default='./vietnamese_detected_face_30.csv', help='list of detected video and its subject ID')
    parser.add_argument('--save-direc', default=None, help='the directory of saving mouth ROIs')
    # -- mean face utils
    parser.add_argument('--mean-face', default='./20words_mean_face.npy', help='mean face pathname')
    # -- mouthROIs utils
    parser.add_argument('--crop-width', default=96, type=int, help='the width of mouth ROIs')
    parser.add_argument('--crop-height', default=96, type=int, help='the height of mouth ROIs')
    parser.add_argument('--start-idx', default=48, type=int, help='the start of landmark index')
    parser.add_argument('--stop-idx', default=68, type=int, help='the end of landmark index')
    parser.add_argument('--window-margin', default=12, type=int, help='window margin for smoothed_landmarks')
    # -- convert to gray scale
    parser.add_argument('--convert-gray', default=False, action='store_true', help='convert2grayscale')
    # -- test set only
    parser.add_argument('--testset-only', default=False, action='store_true', help='process testing set only')

    # parsed arguments are returned as an argparse.Namespace
    args = parser.parse_args()
    return args
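
# Example invocation (a sketch only: the script name and all paths below are
# illustrative assumptions, adjust them to your own data layout):
#   python crop_mouth_from_video.py --video-direc ./videos \
#       --landmark-direc ./landmarks --save-direc ./mouth_rois \
#       --filename-path ./vietnamese_detected_face_30.csv --convert-gray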

args = load_args()  # parse and load command-line arguments

# -- mean face utils
STD_SIZE = (256, 256)
mean_face_landmarks = np.load(args.mean_face)  # 20words_mean_face.npy
stablePntsIDs = [33, 36, 39, 42, 45]
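# Note: the indices above follow the standard 68-point facial landmark scheme
# (as used by dlib / iBUG 300-W): point 33 sits at the base of the nose and
# 36/39/42/45 are the eye corners, which are relatively rigid and therefore
# good anchors for the affine alignment to the mean face. Points 48-67 cover
# the mouth, which is why --start-idx/--stop-idx default to 48/68.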

# Crop the mouth region from a video using its landmarks.
def crop_patch( video_pathname, landmarks):
    """Crop mouth patch
    :param str video_pathname: pathname for the video
    :param list landmarks: interpolated landmarks
    """
    frame_idx = 0
    frame_gen = read_video(video_pathname)  # generator over the video's frames
    while True:
        try:
            frame = frame_gen.__next__() ## -- BGR  # fetch the next frame
        except StopIteration:  # generator exhausted: no more frames
            break
        if frame_idx == 0:
            # buffers for the sliding smoothing window and the output sequence
            q_frame, q_landmarks = deque(), deque()
            sequence = []

        q_landmarks.append(landmarks[frame_idx])  # landmarks for this frame
        q_frame.append(frame)                     # the frame itself
        if len(q_frame) == args.window_margin:
            # average the landmarks over the window to stabilise the alignment
            smoothed_landmarks = np.mean(q_landmarks, axis=0)
            cur_landmarks = q_landmarks.popleft()  # oldest landmarks in the window
            cur_frame = q_frame.popleft()          # oldest frame in the window
            # -- affine transformation to the mean face
            trans_frame, trans = warp_img( smoothed_landmarks[stablePntsIDs, :],
                                           mean_face_landmarks[stablePntsIDs, :],
                                           cur_frame,
                                           STD_SIZE)
            trans_landmarks = trans(cur_landmarks)
            # -- crop mouth patch
            sequence.append( cut_patch( trans_frame,
                                        trans_landmarks[args.start_idx:args.stop_idx],
                                        args.crop_height//2,
                                        args.crop_width//2,))
        if frame_idx == len(landmarks)-1:
            # last frame reached: flush the frames still buffered in the window
            while q_frame:
                cur_frame = q_frame.popleft()
                # -- transform frame with the last estimated transform
                trans_frame = apply_transform( trans, cur_frame, STD_SIZE)
                # -- transform landmarks
                trans_landmarks = trans(q_landmarks.popleft())
                # -- crop mouth patch
                sequence.append( cut_patch( trans_frame,
                                            trans_landmarks[args.start_idx:args.stop_idx],
                                            args.crop_height//2,
                                            args.crop_width//2,))
            return np.array(sequence)  # cropped mouth patches as a numpy array
        frame_idx += 1
    return None

# Interpolate landmarks for frames where detection failed.
def landmarks_interpolate(landmarks):
    """Interpolate landmarks
    :param list landmarks: landmarks detected in raw videos
    """
    valid_frames_idx = [idx for idx, _ in enumerate(landmarks) if _ is not None]  # frames with valid landmarks
    # no frame has landmarks at all
    if not valid_frames_idx:
        return None
    # walk through consecutive pairs of valid frames
    for idx in range(1, len(valid_frames_idx)):
        if valid_frames_idx[idx] - valid_frames_idx[idx-1] == 1:  # adjacent valid frames: nothing to fill
            continue
        else:
            # fill the gap between the two valid frames by linear interpolation
            landmarks = linear_interpolate(landmarks, valid_frames_idx[idx-1], valid_frames_idx[idx])
    valid_frames_idx = [idx for idx, _ in enumerate(landmarks) if _ is not None]
    # -- Corner case: frames at the beginning or the end that failed to be detected
    #    are padded with the nearest valid landmarks.
    if valid_frames_idx:
        landmarks[:valid_frames_idx[0]] = [landmarks[valid_frames_idx[0]]] * valid_frames_idx[0]
        landmarks[valid_frames_idx[-1]:] = [landmarks[valid_frames_idx[-1]]] * (len(landmarks) - valid_frames_idx[-1])
    valid_frames_idx = [idx for idx, _ in enumerate(landmarks) if _ is not None]
    # every frame must now have a landmark
    assert len(valid_frames_idx) == len(landmarks), "not every frame has landmark"
    return landmarks
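
# A minimal usage sketch for the two helpers above (paths and file names are
# illustrative; assumes a landmark .npz with the data[frame][person]['facial_landmarks']
# layout produced further below):
#   lmks = np.load('./landmarks/clip_0001.npz', allow_pickle=True)['data']
#   lmks = [f[0]['facial_landmarks'].astype(np.float64) for f in lmks]
#   rois = crop_patch('./videos/clip_0001.mp4', landmarks_interpolate(lmks))
#   # rois is roughly (num_frames, crop_height, crop_width, channels), BGR order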

# Yield frames one by one from an already-loaded video array.
def get_yield(output_video):
    for frame in output_video:
        yield frame

lines = open(args.filename_path).read().splitlines()  # one "filename,person_id" entry per line
# with --testset-only, keep only files whose parent folder is 'test'; otherwise keep every line
lines = list(filter(lambda x: 'test' == x.split('/')[-2], lines)) if args.testset_only else lines

for filename_idx, line in enumerate(lines):
    filename, person_id = line.split(',')  # file name and subject ID
    print('idx: {} \tProcessing.\t{}'.format(filename_idx, filename))

    video_pathname = os.path.join(args.video_direc, filename+args.video_format)  # raw video path
    landmarks_pathname = os.path.join(args.landmark_direc, filename+'.npz')      # landmark .npz path
    dst_pathname = os.path.join( args.save_direc, filename+'.npz')               # output mouth-ROI .npz path

    # the raw video must exist
    assert os.path.isfile(video_pathname), "File does not exist. Path input: {}".format(video_pathname)
    # If no landmark .npz exists for this video and the extension is mp4,
    # detect the face landmarks directly with dlib and write the .npz ourselves.
    if not os.path.exists(landmarks_pathname) and video_pathname.split('.')[-1] == 'mp4':

        # Find face landmarks in a single frame with dlib.
        def get_face_landmark(img):
            detector_hog = dlib.get_frontal_face_detector()  # HOG-based face detector
            dlib_rects = detector_hog(img, 1)                # detect faces (1 upsampling step)
            # 68-point shape predictor; expects shape_predictor_68_face_landmarks.dat next to this script
            model_path = os.path.dirname(os.path.abspath(__file__)) + '/shape_predictor_68_face_landmarks.dat'
            landmark_predictor = dlib.shape_predictor(model_path)

            # collect the 68 (x, y) points for every detected face
            list_landmarks = []
            for dlib_rect in dlib_rects:
                points = landmark_predictor(img, dlib_rect)
                list_points = list(map(lambda p: (p.x, p.y), points.parts()))
                list_landmarks.append(list_points)

            # frame-to-(256, 256) scale factors (computed but not used downstream)
            input_height, input_width = img.shape[:2]
            output_width, output_height = (256, 256)
            width_rate = input_width / output_width
            height_rate = input_height / output_height
            img_rate = [(width_rate, height_rate)]*68
            face_rate = np.array(img_rate)
            eye_rate = np.array(img_rate[36:48])

            # at least one face was detected
            if list_landmarks:
                for dlib_rect, landmark in zip(dlib_rects, list_landmarks):
                    face_landmark = np.array(landmark)        # all 68 points
                    eye_landmark = np.array(landmark[36:48])  # eye points only
                    return face_landmark, eye_landmark        # use the first detected face
            # no face detected: return zero-filled landmarks
            else:
                landmark = [(0.0, 0.0)] * 68
                face_landmark = np.array(landmark)
                eye_landmark = np.array(landmark[36:48])
                return face_landmark, eye_landmark
        target_frames = 29  # desired number of frames per clip
        video = videoToArray(video_pathname, is_gray=args.convert_gray)  # load the video as a numpy array
        output_video = frameAdjust(video, target_frames)                 # frame sampling: resample to target_frames frames

        multi_sub_landmarks = []
        person_landmarks = []
        frame_landmarks = []
        for frame_idx, frame in enumerate(get_yield(output_video)):
            print(f'\n ------------frame {frame_idx}------------ ')
            facial_landmarks, eye_landmarks = get_face_landmark(frame)  # detect face landmarks with dlib
            # per-person landmark record; pose and fitting-score fields are placeholders
            person_landmarks = {
                'id': 0,
                'most_recent_fitting_scores': np.array([2.0, 2.0, 2.0]),
                'facial_landmarks': facial_landmarks,
                'roll': 7,
                'yaw': 3.5,
                'eye_landmarks': eye_landmarks,
                'fitting_scores_updated': True,
                'pitch': -0.05
            }
            frame_landmarks.append(person_landmarks)
            multi_sub_landmarks.append(np.array(frame_landmarks.copy(), dtype=object))

        multi_sub_landmarks = np.array(multi_sub_landmarks)     # list to numpy
        save2npz(landmarks_pathname, data=multi_sub_landmarks)  # write the face landmark .npz
        print('\n ------------ save npz ------------ \n')
    # a landmark .npz already exists for this video
    else:
        # the landmark file must exist
        assert os.path.isfile(landmarks_pathname), "File does not exist. Path input: {}".format(landmarks_pathname)

    # skip clips whose mouth ROIs were already saved
    if os.path.exists(dst_pathname):
        continue

    multi_sub_landmarks = np.load( landmarks_pathname, allow_pickle=True)['data']  # load the landmark .npz
    landmarks = [None] * len( multi_sub_landmarks)
    for frame_idx in range(len(landmarks)):
        try:
            # facial landmarks of the requested subject in this frame
            landmarks[frame_idx] = multi_sub_landmarks[frame_idx][int(person_id)]['facial_landmarks'].astype(np.float64)
        except IndexError:  # the subject is missing in this frame
            continue
    # If the landmarks are not the all-zero placeholder, interpolate them,
    # crop the mouth ROIs and save the result as an .npz file.
    landmarks_empty_list = []
    landmarks_empty = [(0, 0)]*68
    landmarks_empty = np.array(landmarks_empty, dtype=object)
    for i in range(len(landmarks_empty)):
        landmarks_empty_list.append(landmarks_empty.copy())
    condition = landmarks != landmarks_empty_list
    if condition:
        # -- pre-process landmarks: interpolate frames not being detected.
        preprocessed_landmarks = landmarks_interpolate(landmarks)
        # nothing could be interpolated (no valid frame at all)
        if not preprocessed_landmarks:
            continue
        # -- crop
        sequence = crop_patch(video_pathname, preprocessed_landmarks)  # crop the mouth from the video using the landmarks
        assert sequence is not None, "cannot crop from {}.".format(filename)
        # -- save
        data = convert_bgr2gray(sequence) if args.convert_gray else sequence[...,::-1]  # grayscale, or BGR -> RGB
        save2npz(dst_pathname, data=data)  # save the mouth ROIs as .npz

print('Done.')
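
# To inspect a saved clip afterwards, the ROIs can be read back with numpy
# (the path is illustrative; save2npz stores the array under the 'data' key,
# as the landmark-loading code above relies on):
#   rois = np.load('./mouth_rois/clip_0001.npz', allow_pickle=True)['data']
#   print(rois.shape)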