#! /usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2020 Imperial College London (Pingchuan Ma)
# Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
""" Crop Mouth ROIs from videos for lipreading"""
import os
import cv2  # OpenCV library
import glob  # list files/folders matching Unix-style path patterns
import argparse  # command-line argument parsing
import numpy as np
from collections import deque  # deque: a double-ended queue combining stack and queue behaviour
from utils import *  # helper functions from utils.py
from transform import *  # helper functions from transform.py
import dlib  # face landmark detection library
import face_alignment  # face landmark detection library
from PIL import Image
# parse and process command-line arguments
def load_args(default_config=None):
parser = argparse.ArgumentParser(description='Lipreading Pre-processing')
    # register the expected command-line arguments
# -- utils
parser.add_argument('--video-direc', default=None, help='raw video directory')
parser.add_argument('--video-format', default='.mp4', help='raw video format')
parser.add_argument('--landmark-direc', default=None, help='landmark directory')
    parser.add_argument('--filename-path', default='./vietnamese_detected_face_30.csv', help='list of detected videos and their subject IDs')
parser.add_argument('--save-direc', default=None, help='the directory of saving mouth ROIs')
# -- mean face utils
parser.add_argument('--mean-face', default='./20words_mean_face.npy', help='mean face pathname')
# -- mouthROIs utils
parser.add_argument('--crop-width', default=96, type=int, help='the width of mouth ROIs')
parser.add_argument('--crop-height', default=96, type=int, help='the height of mouth ROIs')
parser.add_argument('--start-idx', default=48, type=int, help='the start of landmark index')
parser.add_argument('--stop-idx', default=68, type=int, help='the end of landmark index')
parser.add_argument('--window-margin', default=12, type=int, help='window margin for smoothed_landmarks')
# -- convert to gray scale
parser.add_argument('--convert-gray', default=False, action='store_true', help='convert2grayscale')
# -- test set only
parser.add_argument('--testset-only', default=False, action='store_true', help='process testing set only')
    # store the parsed values in args (an argparse.Namespace)
args = parser.parse_args()
return args
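
# Example invocation (illustrative only; the directory paths below are placeholders,
# not part of the original script):
#   python crop_mouth_from_video.py --video-direc ./videos \
#                                   --landmark-direc ./landmarks \
#                                   --save-direc ./mouth_rois \
#                                   --filename-path ./vietnamese_detected_face_30.csv \
#                                   --convert-gray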
args = load_args()  # parse and load the command-line arguments
# -- mean face utils
STD_SIZE = (256, 256)
mean_face_landmarks = np.load(args.mean_face) # 20words_mean_face.npy
stablePntsIDs = [33, 36, 39, 42, 45]
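# In the standard 68-point landmark layout (dlib / iBUG), the stable points are the
# nose tip (33) and the eye corners (36, 39, 42, 45); they drive the affine alignment
# to the mean face, while --start-idx/--stop-idx (48 to 68 by default) select the
# mouth landmarks that are actually cropped.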
# crop the mouth region from a video, given its landmarks
def crop_patch(video_pathname, landmarks):
"""Crop mouth patch
:param str video_pathname: pathname for the video_dieo # ์˜์ƒ ์œ„์น˜
:param list landmarks: interpolated landmarks # ๋ณด๊ฐ„๋œ ๋žœ๋“œ๋งˆํฌ
"""
    frame_idx = 0  # start from the first frame
    frame_gen = read_video(video_pathname)  # generator yielding the video frames
    # iterate until the frame generator is exhausted
while True:
try:
            frame = next(frame_gen)  # -- BGR; fetch one frame at a time
        except StopIteration:  # raised when no frames are left
            break  # leave the loop
        if frame_idx == 0:  # first frame
            q_frame, q_landmarks = deque(), deque()  # sliding windows of frames and landmarks
sequence = []
        q_landmarks.append(landmarks[frame_idx])  # landmarks of the current frame
        q_frame.append(frame)  # current frame
if len(q_frame) == args.window_margin:
            smoothed_landmarks = np.mean(q_landmarks, axis=0)  # average the landmarks over the window
            cur_landmarks = q_landmarks.popleft()  # oldest landmarks in the window
            cur_frame = q_frame.popleft()  # oldest frame in the window
            # -- affine transformation: align the frame to the mean face
            trans_frame, trans = warp_img(smoothed_landmarks[stablePntsIDs, :],
                                          mean_face_landmarks[stablePntsIDs, :],
                                          cur_frame,
                                          STD_SIZE)
            trans_landmarks = trans(cur_landmarks)
            # -- crop mouth patch
            sequence.append(cut_patch(trans_frame,
                                      trans_landmarks[args.start_idx:args.stop_idx],
                                      args.crop_height//2,
                                      args.crop_width//2))
if frame_idx == len(landmarks)-1:
while q_frame:
                cur_frame = q_frame.popleft()  # oldest remaining frame
                # -- transform frame
                trans_frame = apply_transform(trans, cur_frame, STD_SIZE)
                # -- transform landmarks
                trans_landmarks = trans(q_landmarks.popleft())
                # -- crop mouth patch
                sequence.append(cut_patch(trans_frame,
                                          trans_landmarks[args.start_idx:args.stop_idx],
                                          args.crop_height//2,
                                          args.crop_width//2))
            return np.array(sequence)  # return the mouth ROI sequence as a numpy array
        frame_idx += 1  # move to the next frame
return None
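
# The helper below is NOT part of the original pipeline. cut_patch() is imported
# from utils.py (not shown in this file); this is a minimal sketch of the behaviour
# assumed by crop_patch(): take a window of 2*height x 2*width pixels centred on the
# mean of the selected mouth landmarks, clamped to the frame borders.
def _cut_patch_sketch(img, landmarks, height, width):
    center_x, center_y = np.mean(landmarks, axis=0)  # landmark centroid (x, y)
    center_x, center_y = int(round(center_x)), int(round(center_y))
    top = max(center_y - height, 0)
    bottom = min(center_y + height, img.shape[0])
    left = max(center_x - width, 0)
    right = min(center_x + width, img.shape[1])
    return img[top:bottom, left:right]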
# interpolate landmarks for frames where detection failed
def landmarks_interpolate(landmarks):
    """Interpolate landmarks
    :param list landmarks: landmarks detected in the raw videos
    """
    valid_frames_idx = [idx for idx, _ in enumerate(landmarks) if _ is not None]  # indices of frames with detected landmarks
    # if no frame has valid landmarks
if not valid_frames_idx:
return None
    # walk over consecutive pairs of detected-frame indices
    for idx in range(1, len(valid_frames_idx)):
        if valid_frames_idx[idx] - valid_frames_idx[idx-1] == 1:  # consecutive frames, nothing to fill
            continue
        else:  # there is a gap, so interpolate the missing frames
            landmarks = linear_interpolate(landmarks, valid_frames_idx[idx-1], valid_frames_idx[idx])
    valid_frames_idx = [idx for idx, _ in enumerate(landmarks) if _ is not None]  # recompute the indices of frames with landmarks
    # -- Corner case: frames at the beginning or the end that failed detection are padded with the nearest detected landmarks.
    if valid_frames_idx:
        landmarks[:valid_frames_idx[0]] = [landmarks[valid_frames_idx[0]]] * valid_frames_idx[0]  # pad the head with the first detected landmarks
        landmarks[valid_frames_idx[-1]:] = [landmarks[valid_frames_idx[-1]]] * (len(landmarks) - valid_frames_idx[-1])  # pad the tail with the last detected landmarks
    valid_frames_idx = [idx for idx, _ in enumerate(landmarks) if _ is not None]  # recompute the indices of frames with landmarks
    # every frame must now have landmarks, otherwise raise an AssertionError
    assert len(valid_frames_idx) == len(landmarks), "not every frame has landmark"
    return landmarks  # return the interpolated landmarks
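
# Sketch only: linear_interpolate() is imported from utils.py and not defined in this
# file. The helper below illustrates the behaviour assumed by landmarks_interpolate():
# frames between two detected frames are filled by linearly blending the two landmark
# arrays.
def _linear_interpolate_sketch(landmarks, start_idx, stop_idx):
    start_landmarks = landmarks[start_idx]
    stop_landmarks = landmarks[stop_idx]
    delta = stop_landmarks - start_landmarks
    for idx in range(1, stop_idx - start_idx):
        # the blending weight grows linearly from 0 (at start_idx) to 1 (at stop_idx)
        landmarks[start_idx + idx] = start_landmarks + idx / float(stop_idx - start_idx) * delta
    return landmarks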
def get_yield(output_video):
for frame in output_video:
yield frame
lines = open(args.filename_path).read().splitlines()  # read the CSV file and split it into lines
lines = list(filter(lambda x: 'test' == x.split('/')[-2], lines)) if args.testset_only else lines  # with --testset-only, keep only files whose parent folder is 'test'
# process every entry in the list
for filename_idx, line in enumerate(lines):
    # filename and subject (person) id
    filename, person_id = line.split(',')
    print('idx: {} \tProcessing.\t{}'.format(filename_idx, filename))  # print the file index and filename
    video_pathname = os.path.join(args.video_direc, filename+args.video_format)  # video directory + filename + video format
    landmarks_pathname = os.path.join(args.landmark_direc, filename+'.npz')  # landmark directory + filename.npz
    dst_pathname = os.path.join(args.save_direc, filename+'.npz')  # save directory + output filename.npz
    # the video file must exist, otherwise raise an AssertionError
    assert os.path.isfile(video_pathname), "File does not exist. Path input: {}".format(video_pathname)
    # if no landmark npz exists for this video and the extension is mp4, build the npz directly with dlib
if not os.path.exists(landmarks_pathname) and video_pathname.split('.')[-1] == 'mp4':
        # find face landmarks with dlib
def get_face_landmark(img):
detector_hog = dlib.get_frontal_face_detector()
dlib_rects = detector_hog(img, 1)
model_path = os.path.dirname(os.path.abspath(__file__)) + '/shape_predictor_68_face_landmarks.dat'
landmark_predictor = dlib.shape_predictor(model_path)
            # locate the 68 face landmarks with dlib
list_landmarks = []
for dlib_rect in dlib_rects:
points = landmark_predictor(img, dlib_rect)
list_points = list(map(lambda p: (p.x, p.y), points.parts()))
list_landmarks.append(list_points)
            input_height, input_width = img.shape[:2]  # frame size (height, width)
output_width, output_height = (256, 256)
width_rate = input_width / output_width
height_rate = input_height / output_height
img_rate = [(width_rate, height_rate)]*68
face_rate = np.array(img_rate)
eye_rate = np.array(img_rate[36:48])
            # if the face landmark list is not empty
if list_landmarks:
for dlib_rect, landmark in zip(dlib_rects, list_landmarks):
                    face_landmark = np.array(landmark)  # full face landmarks
                    eye_landmark = np.array(landmark[36:48])  # eye landmarks
return face_landmark, eye_landmark
            # if no face was detected, fall back to all-zero landmarks
            else:
                landmark = [(0.0, 0.0)] * 68
                face_landmark = np.array(landmark)  # full face landmarks
                eye_landmark = np.array(landmark[36:48])  # eye landmarks
return face_landmark, eye_landmark
        target_frames = 29  # desired number of frames
        video = videoToArray(video_pathname, is_gray=args.convert_gray)  # video as a numpy array, prefixed with the frame count
        output_video = frameAdjust(video, target_frames)  # frame sampling (adjust to the target frame count)
multi_sub_landmarks = []
person_landmarks = []
frame_landmarks = []
for frame_idx, frame in enumerate(get_yield(output_video)):
print(f'\n ------------frame {frame_idx}------------ ')
            facial_landmarks, eye_landmarks = get_face_landmark(frame)  # find face landmarks with dlib
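            # the pose and fitting-score fields below are hard-coded placeholder values;
            # only 'facial_landmarks' is read back when the npz is loaded later in this script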
person_landmarks = {
'id': 0,
'most_recent_fitting_scores': np.array([2.0,2.0,2.0]),
'facial_landmarks': facial_landmarks,
'roll': 7,
'yaw': 3.5,
'eye_landmarks': eye_landmarks,
'fitting_scores_updated': True,
'pitch': -0.05
}
frame_landmarks.append(person_landmarks)
multi_sub_landmarks.append(np.array(frame_landmarks.copy(), dtype=object))
        multi_sub_landmarks = np.array(multi_sub_landmarks)  # list to numpy array
        save2npz(landmarks_pathname, data=multi_sub_landmarks)  # save the face landmark npz
print('\n ------------ save npz ------------ \n')
    # if a face landmark npz file already exists for this video
else:
        # the landmark file must exist, otherwise raise an AssertionError
        assert os.path.isfile(landmarks_pathname), "File does not exist. Path input: {}".format(landmarks_pathname)
    # skip this video if the output file already exists
    if os.path.exists(dst_pathname):
        continue
    multi_sub_landmarks = np.load(landmarks_pathname, allow_pickle=True)['data']  # load the landmark npz
    landmarks = [None] * len(multi_sub_landmarks)  # per-frame landmark list, initialised with None
for frame_idx in range(len(landmarks)):
try:
            landmarks[frame_idx] = multi_sub_landmarks[frame_idx][int(person_id)]['facial_landmarks'].astype(np.float64)  # facial landmarks of this person id in this frame
        except IndexError:  # raised when this frame has no entry for the person id
            continue
    # if the landmarks are not just the all-zero dummy set, interpolate them and create the output npz
landmarks_empty_list = []
landmarks_empty = [(0, 0)]*68
landmarks_empty = np.array(landmarks_empty, dtype=object)
for i in range(len(landmarks_empty)):
landmarks_empty_list.append(landmarks_empty.copy())
condition = landmarks != landmarks_empty_list
if condition:
# -- pre-process landmarks: interpolate frames not being detected.
        preprocessed_landmarks = landmarks_interpolate(landmarks)  # interpolate missing landmarks
        # skip this video if interpolation failed (no valid landmarks)
        if not preprocessed_landmarks:
            continue
# -- crop
        sequence = crop_patch(video_pathname, preprocessed_landmarks)  # crop the mouth region from the video using the landmarks
        # the crop must have produced a sequence, otherwise raise an AssertionError
        assert sequence is not None, "cannot crop from {}.".format(filename)
# -- save
        data = convert_bgr2gray(sequence) if args.convert_gray else sequence[...,::-1]  # convert to grayscale, or flip the channel order from BGR to RGB
        save2npz(dst_pathname, data=data)  # save the data in npz format
print('Done.')
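
# Example (not part of the original script): a saved mouth-ROI file can be read back
# the same way the landmark npz files are loaded above, via the 'data' key.
# The path below is a placeholder.
#   rois = np.load('./mouth_rois/some_video.npz', allow_pickle=True)['data']
#   print(rois.shape)  # (num_frames, crop_height, crop_width) or (..., 3) for RGB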