import numpy as np
from datasets.corners import CornersDataset
import os
import skimage
import cv2
from torchvision import transforms
from PIL import Image
from datasets.data_utils import RandomBlur

class OutdoorBuildingDataset(CornersDataset):
    def __init__(self, data_path, det_path, phase='train', image_size=256, rand_aug=True,
                 inference=False):
        super(OutdoorBuildingDataset, self).__init__(image_size, inference)
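        # NOTE: helpers called below (resize_data, random_flip, process_data) are not
        # defined in this file; they are assumed to be provided by the CornersDataset
        # base class.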
        self.data_path = data_path
        self.det_path = det_path
        self.phase = phase
        self.rand_aug = rand_aug
        self.image_size = image_size
        self.inference = inference
        blur_transform = RandomBlur()
        self.train_transform = transforms.Compose([
            transforms.RandomApply([transforms.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8),
            transforms.RandomGrayscale(p=0.3),
            blur_transform])
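        # Photometric-only training augmentations: color jitter (p=0.8), random
        # grayscale (p=0.3), and RandomBlur from datasets.data_utils (assumed to take
        # and return a PIL image). None of these move pixels, so corner annotations
        # need no update when they are applied in __getitem__.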
        if phase == 'train':
            datalistfile = os.path.join(data_path, 'train_list.txt')
            self.training = True
        else:
            datalistfile = os.path.join(data_path, 'valid_list.txt')
            self.training = False
        with open(datalistfile, 'r') as f:
            _data_names = f.readlines()
        if phase == 'train':
            self._data_names = _data_names
        else:
            # based on the data split rule from previous works
            if phase == 'valid':
                self._data_names = _data_names[:50]
            elif phase == 'test':
                self._data_names = _data_names[50:]
            else:
                raise ValueError('Invalid phase {}'.format(phase))

    def __len__(self):
        return len(self._data_names)

    def __getitem__(self, idx):
        data_name = self._data_names[idx][:-1]  # strip the trailing newline from the list file
        annot_path = os.path.join(self.data_path, 'annot', data_name + '.npy')
        annot = np.load(annot_path, allow_pickle=True, encoding='latin1').tolist()
        det_path = os.path.join(self.det_path, data_name + '.npy')
        det_corners = np.array(np.load(det_path, allow_pickle=True))  # [N, 2]
        det_corners = det_corners[:, ::-1]  # turn into x,y format
        img_path = os.path.join(self.data_path, 'rgb', data_name + '.jpg')
        rgb = cv2.imread(img_path)
        if self.image_size != 256:
            rgb, annot, det_corners = self.resize_data(rgb, annot, det_corners)
        rec_mat = None  # defined up front so it exists whichever branch runs below
        if self.rand_aug:
            image, annot, corner_mapping, det_corners = self.random_aug_annot(rgb, annot, det_corners=det_corners)
        else:
            image = rgb
        corners = np.array(list(annot.keys()))[:, [1, 0]]  # annot keys are (x, y); reorder to (y, x)
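        # Scenes with an unusually large number of ground-truth corners are skipped by
        # resampling a different index; the threshold of 100 presumably keeps the
        # per-sample corner count bounded for the downstream model.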
        if not self.inference and len(corners) > 100:
            new_idx = np.random.randint(0, len(self))
            return self.__getitem__(new_idx)
        if self.training:
            # Add some randomness to the g.t. corners (the std is 0 here, so this is a
            # no-op as written; increase it to actually jitter the corners)
            corners += np.random.normal(0, 0, size=corners.shape)
            pil_img = Image.fromarray(image)
            image = self.train_transform(pil_img)
            image = np.array(image)
        image = skimage.img_as_float(image)
        # sort by the second value and then the first value; the corners are in (y, x) format here
        sort_idx = np.lexsort(corners.T)
        corners = corners[sort_idx]
        corner_list = []
        for corner_i in range(corners.shape[0]):
            corner_list.append((corners[corner_i][1], corners[corner_i][0]))  # to (x, y) format
        raw_data = {
            'name': data_name,
            'corners': corner_list,
            'annot': annot,
            'image': image,
            'rec_mat': rec_mat,
            'annot_path': annot_path,
            'det_path': det_path,
            'img_path': img_path,
        }
        return self.process_data(raw_data)

    def random_aug_annot(self, img, annot, det_corners=None):
        # do random flipping
        img, annot, det_corners = self.random_flip(img, annot, det_corners)
        # prepare random augmentation parameters (only do random rotation for now)
        theta = np.random.randint(0, 360) / 360 * np.pi * 2
        r = self.image_size / 256
        origin = [127 * r, 127 * r]
        p1_new = [127 * r + 100 * np.sin(theta) * r, 127 * r - 100 * np.cos(theta) * r]
        p2_new = [127 * r + 100 * np.cos(theta) * r, 127 * r + 100 * np.sin(theta) * r]
        p1_old = [127 * r, 127 * r - 100 * r]  # y_axis
        p2_old = [127 * r + 100 * r, 127 * r]  # x_axis
        pts1 = np.array([origin, p1_old, p2_old]).astype(np.float32)
        pts2 = np.array([origin, p1_new, p2_new]).astype(np.float32)
        M_rot = cv2.getAffineTransform(pts1, pts2)
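        # The three point pairs (the centre, a point on the y-axis, and a point on the
        # x-axis, each 100*r pixels from the centre (127*r, 127*r)) together with their
        # rotated images define a pure rotation by theta about the image centre, which
        # cv2.getAffineTransform recovers as the 2x3 matrix M_rot.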
        # Combine annotation corners and detection corners
        all_corners = list(annot.keys())
        if det_corners is not None:
            for i in range(det_corners.shape[0]):
                all_corners.append(tuple(det_corners[i]))
        all_corners_ = np.array(all_corners)
        # Do the corner transform within a single matrix multiplication
        corner_mapping = dict()
        ones = np.ones([all_corners_.shape[0], 1])
        all_corners_ = np.concatenate([all_corners_, ones], axis=-1)
        aug_corners = np.matmul(M_rot, all_corners_.T).T
        for idx, corner in enumerate(all_corners):
            corner_mapping[corner] = aug_corners[idx]
        # If the transformed geometry goes beyond the image boundary, fall back to the
        # un-augmented sample (re-doing the augmentation is left commented out below);
        # None is returned in place of corner_mapping to signal that nothing changed.
        new_corners = np.array(list(corner_mapping.values()))
        if new_corners.min() <= 0 or new_corners.max() >= (self.image_size - 1):
            # return self.random_aug_annot(img, annot, det_corners)
            return img, annot, None, det_corners
        # build the new annot dict
        aug_annot = dict()
        for corner, connections in annot.items():
            new_corner = corner_mapping[corner]
            tuple_new_corner = tuple(new_corner)
            aug_annot[tuple_new_corner] = list()
            for to_corner in connections:
                aug_annot[tuple_new_corner].append(corner_mapping[tuple(to_corner)])
        # Also transform the image correspondingly
        rows, cols, ch = img.shape
        new_img = cv2.warpAffine(img, M_rot, (cols, rows), borderValue=(255, 255, 255))
        y_start = (new_img.shape[0] - self.image_size) // 2
        x_start = (new_img.shape[1] - self.image_size) // 2
        aug_img = new_img[y_start:y_start + self.image_size, x_start:x_start + self.image_size, :]
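        # warpAffine keeps the original (cols, rows) size and fills uncovered pixels
        # with white; the centre crop just computed is a no-op when the input is
        # already image_size x image_size, and otherwise trims the warped image back
        # to the target size.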
        if det_corners is None:
            return aug_img, aug_annot, corner_mapping, None
        else:
            aug_det_corners = list()
            for corner in det_corners:
                new_corner = corner_mapping[tuple(corner)]
                aug_det_corners.append(new_corner)
            aug_det_corners = np.array(aug_det_corners)
            return aug_img, aug_annot, corner_mapping, aug_det_corners

if __name__ == '__main__':
    from torch.utils.data import DataLoader
    # collate_fn is not imported at the top of the file; it is assumed to live
    # alongside RandomBlur in datasets.data_utils (adjust the import if it is
    # defined elsewhere).
    from datasets.data_utils import collate_fn

    DATAPATH = './data/cities_dataset'
    DET_PATH = './data/det_final'
    train_dataset = OutdoorBuildingDataset(DATAPATH, DET_PATH, phase='train')
    train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=0,
                                  collate_fn=collate_fn)
    for i, item in enumerate(train_dataloader):
        import pdb; pdb.set_trace()
        print(item)