File size: 15,006 Bytes
327b68f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
#! /usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright 2020 Imperial College London (Pingchuan Ma)
# Apache 2.0  (http://www.apache.org/licenses/LICENSE-2.0)

""" Crop Mouth ROIs from videos for lipreading"""

# from msilib.schema import File
from ast import Pass
import os
import cv2  # OpenCV 라이브러리
import glob  # λ¦¬λˆ…μŠ€μ‹ 경둜 ν‘œκΈ°λ²•μ„ μ‚¬μš©ν•˜μ—¬ μ›ν•˜λŠ” 폴더/파일 리슀트 μ–»μŒ
import argparse  # λͺ…λ Ήν–‰ 인자λ₯Ό νŒŒμ‹±ν•΄μ£ΌλŠ” λͺ¨λ“ˆ
import numpy as np
from collections import deque  # collections λͺ¨λ“ˆμ— μžˆλŠ” 데크 뢈러였기 # 데크: μŠ€νƒκ³Ό 큐λ₯Ό ν•©μΉœ 자료ꡬ쑰

from utils import *  # utils.py λͺ¨λ“ˆμ— μžˆλŠ” λͺ¨λ“  ν•¨μˆ˜ 뢈러였기
from transform import *  # transform.py λͺ¨λ“ˆμ— μžˆλŠ” λͺ¨λ“  ν•¨μˆ˜ 뢈러였기

import dlib  # face landmark μ°ΎλŠ” 라이브러리
import face_alignment  # face landmark μ°ΎλŠ” 라이브러리
from PIL import Image


# μΈμžκ°’μ„ λ°›μ•„μ„œ μ²˜λ¦¬ν•˜λŠ” ν•¨μˆ˜
def load_args(default_config=None):
    """Build the pre-processing command-line parser and parse ``sys.argv``.

    :param default_config: accepted for interface compatibility; not used.
    :return: ``argparse.Namespace`` holding the parsed options.
    """
    cli = argparse.ArgumentParser(description='Lipreading Pre-processing')

    # -- utils: plain string options (input/output locations and the video list)
    for flag, fallback, text in (
        ('--video-direc', None, 'raw video directory'),
        ('--video-format', '.mp4', 'raw video format'),
        ('--landmark-direc', None, 'landmark directory'),
        ('--filename-path', './vietnamese_detected_face_30.csv', 'list of detected video and its subject ID'),
        ('--save-direc', None, 'the directory of saving mouth ROIs'),
    ):
        cli.add_argument(flag, default=fallback, help=text)

    # -- mean face utils
    cli.add_argument('--mean-face', default='./20words_mean_face.npy', help='mean face pathname')

    # -- mouthROIs utils: integer options
    for flag, fallback, text in (
        ('--crop-width', 96, 'the width of mouth ROIs'),
        ('--crop-height', 96, 'the height of mouth ROIs'),
        ('--start-idx', 48, 'the start of landmark index'),
        ('--stop-idx', 68, 'the end of landmark index'),
        ('--window-margin', 12, 'window margin for smoothed_landmarks'),
    ):
        cli.add_argument(flag, default=fallback, type=int, help=text)

    # -- boolean switches: grayscale conversion and test-set-only processing
    for flag, text in (
        ('--convert-gray', 'convert2grayscale'),
        ('--testset-only', 'process testing set only'),
    ):
        cli.add_argument(flag, default=False, action='store_true', help=text)

    # The parsed values come back as an argparse.Namespace.
    return cli.parse_args()

# Parse the command line once at import time; the functions and the script
# body below read this module-level namespace.
args = load_args()

# -- mean face utils
# Output size passed to warp_img / apply_transform when aligning frames.
STD_SIZE = (256, 256)
# Reference landmarks loaded from --mean-face (default: ./20words_mean_face.npy).
mean_face_landmarks = np.load(args.mean_face)
# Landmark indices used as anchor points when warping a frame onto the mean face.
# NOTE(review): presumably the nose tip and eye corners of the 68-point scheme - confirm.
stablePntsIDs = [33, 36, 39, 42, 45]


# μ˜μƒμ—μ„œ λžœλ“œλ§ˆν¬ λ°›μ•„μ„œ μž…μˆ  μž˜λΌλ‚΄κΈ°
def crop_patch(video_pathname, landmarks):
    """Crop the mouth patch from every frame of a video.

    Frames are buffered in a sliding window of ``args.window_margin`` frames.
    Whenever the window is full, the landmarks in it are averaged, the oldest
    frame is warped onto the mean face with ``warp_img`` and its mouth region
    is cut out. Once the last landmark has been consumed, the frames still in
    the window are warped with the most recent transform.

    :param str video_pathname: pathname of the video
    :param list landmarks: interpolated landmarks, one entry per frame
    :return: ``np.ndarray`` with one mouth patch per frame, or ``None`` when
        the video ends before ``len(landmarks)`` frames have been read
    """
    last_idx = len(landmarks) - 1
    q_frame, q_landmarks = deque(), deque()
    sequence = []
    # Stays None until the window has been full at least once.
    trans = None

    def _mouth_patch(aligned_frame, aligned_landmarks):
        # Cut the region around the landmarks [start_idx:stop_idx] of an aligned frame.
        return cut_patch(aligned_frame,
                         aligned_landmarks[args.start_idx:args.stop_idx],
                         args.crop_height // 2,
                         args.crop_width // 2)

    # Frames come from read_video (BGR per the original inline note).
    for frame_idx, frame in enumerate(read_video(video_pathname)):
        q_landmarks.append(landmarks[frame_idx])
        q_frame.append(frame)

        if len(q_frame) == args.window_margin:
            # Element-wise mean of the landmarks currently in the window.
            smoothed_landmarks = np.mean(q_landmarks, axis=0)
            cur_landmarks = q_landmarks.popleft()
            cur_frame = q_frame.popleft()
            # -- affine transformation
            trans_frame, trans = warp_img(smoothed_landmarks[stablePntsIDs, :],
                                          mean_face_landmarks[stablePntsIDs, :],
                                          cur_frame,
                                          STD_SIZE)
            # -- crop mouth patch
            sequence.append(_mouth_patch(trans_frame, trans(cur_landmarks)))

        if frame_idx == last_idx:
            if trans is None:
                # The clip is shorter than the window, so no transform was ever
                # estimated. Estimate one from the frames we do have instead of
                # failing on an unassigned ``trans``.
                smoothed_landmarks = np.mean(q_landmarks, axis=0)
                _, trans = warp_img(smoothed_landmarks[stablePntsIDs, :],
                                    mean_face_landmarks[stablePntsIDs, :],
                                    q_frame[0],
                                    STD_SIZE)
            # Flush the frames still buffered, reusing the latest transform.
            while q_frame:
                cur_frame = q_frame.popleft()
                # -- transform frame
                trans_frame = apply_transform(trans, cur_frame, STD_SIZE)
                # -- transform landmarks
                trans_landmarks = trans(q_landmarks.popleft())
                # -- crop mouth patch
                sequence.append(_mouth_patch(trans_frame, trans_landmarks))
            return np.array(sequence)

    # The video ran out of frames before the last landmark was reached.
    return None


# λžœλ“œλ§ˆν¬ 보간
def landmarks_interpolate(landmarks):
    """Fill in the frames whose landmarks are missing (``None``).

    Gaps between two detected frames are filled by ``linear_interpolate``;
    missing frames before the first / after the last detection are filled
    with a copy of the nearest detected entry. The list is updated in place.

    :param list landmarks: landmarks detected in raw videos, ``None`` where
        detection failed
    :return: the completed landmark list, or ``None`` if no frame has landmarks
    """
    detected = [pos for pos, item in enumerate(landmarks) if item is not None]

    # Nothing was detected in any frame: there is nothing to interpolate from.
    if len(detected) == 0:
        return None

    # Walk over neighbouring detections and interpolate wherever they are not adjacent.
    for left, right in zip(detected, detected[1:]):
        if right - left != 1:
            landmarks = linear_interpolate(landmarks, left, right)

    detected = [pos for pos, item in enumerate(landmarks) if item is not None]
    # -- Corner case: keep frames at the beginning or at the end failed to be detected.
    if detected:
        first, last = detected[0], detected[-1]
        # Pad the head with the first detection ...
        landmarks[:first] = [landmarks[first]] * first
        # ... and the tail with the last one.
        landmarks[last:] = [landmarks[last]] * (len(landmarks) - last)

    # Every frame must carry landmarks by now.
    detected = [pos for pos, item in enumerate(landmarks) if item is not None]
    assert len(detected) == len(landmarks), "not every frame has landmark"

    return landmarks


def get_yield(output_video):
    """Return a generator that yields the items of ``output_video`` in order."""
    yield from output_video


# ---------------------------------------------------------------------------
# Script body: for every "<filename>,<person_id>" entry of the list file either
#   * create the landmark .npz with dlib when it does not exist yet, or
#   * crop the mouth ROIs using the existing landmark .npz and save them.
# NOTE(review): a video whose landmarks are created in this run is not cropped
# in the same run (original behaviour, kept); run the script again to crop it.
# ---------------------------------------------------------------------------

# Lazily filled cache so the dlib models are built once, not once per frame.
_DLIB_MODELS = {}


def _dlib_models():
    """Return the (face detector, 68-point shape predictor) pair, creating it on first use."""
    if not _DLIB_MODELS:
        model_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                  'shape_predictor_68_face_landmarks.dat')
        _DLIB_MODELS['detector'] = dlib.get_frontal_face_detector()
        _DLIB_MODELS['predictor'] = dlib.shape_predictor(model_path)
    return _DLIB_MODELS['detector'], _DLIB_MODELS['predictor']


def get_face_landmark(img):
    """Find the face landmarks of the first face dlib detects in ``img``.

    :param img: image passed to the dlib detector and shape predictor
    :return: ``(face_landmark, eye_landmark)`` as numpy arrays; the eye array
        holds points 36..47 of the face array. When no face is detected both
        arrays are filled with ``(0.0, 0.0)`` placeholders.
    """
    detector, predictor = _dlib_models()
    first_face = next(iter(detector(img, 1)), None)
    if first_face is None:
        points = [(0.0, 0.0)] * 68
    else:
        points = [(p.x, p.y) for p in predictor(img, first_face).parts()]
    return np.array(points), np.array(points[36:48])


# Read the list of videos; the context manager closes the file handle.
with open(args.filename_path) as list_file:
    lines = [entry for entry in list_file.read().splitlines() if entry.strip()]
if args.testset_only:
    # Keep only entries whose parent folder is named 'test'.
    lines = [entry for entry in lines if entry.split('/')[-2:-1] == ['test']]

target_frames = 29  # number of frames the clip is resampled to before landmark detection

for filename_idx, line in enumerate(lines):

    # Each entry is "<filename>,<person_id>".
    filename, person_id = line.split(',')
    print('idx: {} \tProcessing.\t{}'.format(filename_idx, filename))

    video_pathname = os.path.join(args.video_direc, filename+args.video_format)
    landmarks_pathname = os.path.join(args.landmark_direc, filename+'.npz')
    dst_pathname = os.path.join(args.save_direc, filename+'.npz')

    # The raw video must exist.
    assert os.path.isfile(video_pathname), "File does not exist. Path input: {}".format(video_pathname)

    # No landmark file yet and the video is an mp4: detect landmarks with dlib and save them.
    if not os.path.exists(landmarks_pathname) and video_pathname.split('.')[-1] == 'mp4':
        video = videoToArray(video_pathname, is_gray=args.convert_gray)
        output_video = frameAdjust(video, target_frames)  # frame sampling to target_frames

        multi_sub_landmarks = []
        for frame_idx, frame in enumerate(get_yield(output_video)):
            print(f'\n ------------frame {frame_idx}------------ ')

            facial_landmarks, eye_landmarks = get_face_landmark(frame)

            # Only 'facial_landmarks' is read back by this script; the other
            # fields are fixed placeholder values.
            person_landmarks = {
                'id': 0,
                'most_recent_fitting_scores': np.array([2.0,2.0,2.0]),
                'facial_landmarks': facial_landmarks,
                'roll': 7,
                'yaw': 3.5,
                'eye_landmarks': eye_landmarks,
                'fitting_scores_updated': True,
                'pitch': -0.05
            }
            # One entry per person *for this frame*. The previous code kept
            # appending to a single list shared by all frames, so index 0 of
            # every frame pointed at the landmarks of the first frame.
            multi_sub_landmarks.append(np.array([person_landmarks], dtype=object))

        multi_sub_landmarks = np.array(multi_sub_landmarks, dtype=object)
        save2npz(landmarks_pathname, data=multi_sub_landmarks)
        print('\n ------------ save npz ------------ \n')
        continue

    # From here on a landmark file is required.
    assert os.path.isfile(landmarks_pathname), "File does not exist. Path input: {}".format(landmarks_pathname)

    # Output already exists: nothing to do for this video.
    if os.path.exists(dst_pathname):
        continue

    # NOTE(security): allow_pickle=True unpickles the file content; only load
    # landmark files that come from a trusted source.
    multi_sub_landmarks = np.load(landmarks_pathname, allow_pickle=True)['data']
    person_idx = int(person_id)
    landmarks = [None] * len(multi_sub_landmarks)
    for frame_idx, frame_people in enumerate(multi_sub_landmarks):
        try:
            landmarks[frame_idx] = frame_people[person_idx]['facial_landmarks'].astype(np.float64)
        except IndexError:
            # This person has no entry in this frame; leave it as None.
            continue

    # Skip the video when no frame carries real landmarks, i.e. every frame is
    # missing or is the all-zero placeholder written when dlib finds no face.
    # (The previous list-vs-list comparison was always True because the two
    # lists had different lengths.)
    if not any(item is not None and np.any(item) for item in landmarks):
        continue

    # -- pre-process landmarks: interpolate frames not being detected.
    preprocessed_landmarks = landmarks_interpolate(landmarks)
    if not preprocessed_landmarks:
        continue

    # -- crop
    sequence = crop_patch(video_pathname, preprocessed_landmarks)
    assert sequence is not None, "cannot crop from {}.".format(filename)

    # -- save (grayscale when requested, otherwise reverse the channel order)
    data = convert_bgr2gray(sequence) if args.convert_gray else sequence[...,::-1]
    save2npz(dst_pathname, data=data)

print('Done.')