# File size: 5,192 Bytes
# 5cf9383
import os
import sys
import numpy as np
import h5py
import scipy.io as spio
import nibabel as nib
import argparse
# Command-line interface: the only option is the subject number.
# Validation is done by argparse (type + choices) rather than by ``assert``,
# because assertions are stripped when Python runs with ``-O`` and would then
# let an unsupported subject through silently.
parser = argparse.ArgumentParser(description='Argument Parser')
parser.add_argument(
    "-sub", "--sub",
    help="Subject Number",
    default=1,
    type=int,
    choices=[1, 2, 5, 7],
)
args = parser.parse_args()
# Kept as an explicit int so the rest of the script can format/index with it.
sub = int(args.sub)
def loadmat(filename):
    '''
    Load a MATLAB ``.mat`` file and return it as a plain Python dictionary.

    this function should be called instead of direct spio.loadmat
    as it cures the problem of not properly recovering python dictionaries
    from mat files. It calls the function check keys to cure all entries
    which are still mat-objects

    Parameters
    ----------
    filename : path of the ``.mat`` file, passed straight to ``spio.loadmat``.

    Returns
    -------
    dict
        The dictionary returned by ``spio.loadmat`` in which every top-level
        mat-struct has been replaced by a nested dictionary. Arrays found
        *inside* a struct are converted to (nested) lists; top-level arrays
        are returned untouched.
    '''
    # ``mat_struct`` lives in the public ``scipy.io.matlab`` namespace in
    # current SciPy; the private ``mio5_params`` module is deprecated. Fall
    # back to the old location only when the public name is missing.
    mat_struct = getattr(spio.matlab, 'mat_struct', None)
    if mat_struct is None:
        mat_struct = spio.matlab.mio5_params.mat_struct

    def _check_keys(d):
        '''
        checks if entries in dictionary are mat-objects. If yes
        todict is called to change them to nested dictionaries
        '''
        for key in d:
            if isinstance(d[key], mat_struct):
                d[key] = _todict(d[key])
        return d

    def _todict(matobj):
        '''
        A recursive function which constructs from matobjects nested dictionaries
        '''
        d = {}
        for strg in matobj._fieldnames:
            elem = matobj.__dict__[strg]
            if isinstance(elem, mat_struct):
                d[strg] = _todict(elem)
            elif isinstance(elem, np.ndarray):
                d[strg] = _tolist(elem)
            else:
                d[strg] = elem
        return d

    def _tolist(ndarray):
        '''
        A recursive function which constructs lists from cellarrays
        (which are loaded as numpy ndarrays), recursing into the elements
        if they contain matobjects.
        '''
        elem_list = []
        for sub_elem in ndarray:
            if isinstance(sub_elem, mat_struct):
                elem_list.append(_todict(sub_elem))
            elif isinstance(sub_elem, np.ndarray):
                elem_list.append(_tolist(sub_elem))
            else:
                elem_list.append(sub_elem)
        return elem_list

    # struct_as_record=False yields mat_struct objects (handled above);
    # squeeze_me=True drops unit dimensions from the loaded arrays.
    data = spio.loadmat(filename, struct_as_record=False, squeeze_me=True)
    return _check_keys(data)
# Experiment design file: read through the loadmat wrapper so that any
# MATLAB structs come back as dictionaries.
stim_order_f = 'nsddata/experiments/nsd/nsd_expdesign.mat'
stim_order = loadmat(stim_order_f)

## Selecting ids for training and test data
# Both dictionaries map an image id to the list of trial indices (0-based,
# in presentation order) at which that image was shown to this subject.
sig_train = {}
sig_test = {}
num_trials = 37*750
for trial in range(num_trials):
    master_id = stim_order['masterordering'][trial]
    # nsdId as in design csv files (converted from 1-based to 0-based)
    image_id = stim_order['subjectim'][sub-1, master_id - 1] - 1
    # Master ids above 1000 go to the training split, the rest to the test
    # split (presumably the first 1000 are the images shared across
    # subjects -- confirm against the NSD documentation).
    bucket = sig_train if master_id > 1000 else sig_test
    bucket.setdefault(image_id, []).append(trial)

# Image ids in first-seen order; this order defines the rows of every saved array.
train_im_idx = list(sig_train.keys())
test_im_idx = list(sig_test.keys())
# Per-subject input locations for the ROI mask and the single-trial betas.
roi_dir = 'nsddata/ppdata/subj{:02d}/func1pt8mm/roi/'.format(sub)
betas_dir = 'nsddata_betas/ppdata/subj{:02d}/func1pt8mm/betas_fithrf_GLMdenoise_RR/'.format(sub)

mask_filename = 'nsdgeneral.nii.gz'
mask = nib.load(roi_dir+mask_filename).get_fdata()
# Boolean selector of the voxels kept (mask value > 0). Computed once and
# reused for every session instead of being rebuilt inside the loop.
voxel_sel = mask > 0
num_voxel = mask[voxel_sel].shape[0]

# Allocate directly as float32: np.zeros(...).astype(np.float32) would first
# build a float64 array twice the size and then copy it.
fmri = np.zeros((num_trials, num_voxel), dtype=np.float32)
for i in range(37):
    beta_filename = "betas_session{0:02d}.nii.gz".format(i+1)
    beta_f = nib.load(betas_dir+beta_filename).get_fdata().astype(np.float32)
    # Masking collapses the leading axes into one voxel axis; the transpose
    # puts trials on rows. Assumes each session volume holds 750 trials on
    # its last axis -- TODO confirm against the beta files.
    fmri[i*750:(i+1)*750] = beta_f[voxel_sel].transpose()
    # Drop the session volume before loading the next one to limit peak memory.
    del beta_f
    print(i)

print("fMRI Data are loaded.")
# Read the complete 'imgBrick' dataset into memory. The slice ``[:]`` returns
# an in-memory copy, so the HDF5 file can be closed as soon as the read is
# done; the context manager guarantees that even if the read fails.
with h5py.File('nsddata_stimuli/stimuli/nsd/nsd_stimuli.hdf5', 'r') as f_stim:
    stim = f_stim['imgBrick'][:]

print("Stimuli are loaded.")
num_train, num_test = len(train_im_idx), len(test_im_idx)
vox_dim, im_dim, im_c = num_voxel, 425, 3

# np.save does not create missing directories, so make sure the per-subject
# output folder exists before the (long) array construction starts.
os.makedirs('processed_data/subj{:02d}'.format(sub), exist_ok=True)

# --- Training split: one row per image, fMRI averaged over its repetitions.
fmri_array = np.zeros((num_train,vox_dim))
stim_array = np.zeros((num_train,im_dim,im_dim,im_c))
for i,idx in enumerate(train_im_idx):
    stim_array[i] = stim[idx]
    # Mean over all trials on which this image was shown.
    fmri_array[i] = fmri[sorted(sig_train[idx])].mean(0)
    print(i)

np.save('processed_data/subj{:02d}/nsd_train_fmriavg_nsdgeneral_sub{}.npy'.format(sub,sub),fmri_array )
np.save('processed_data/subj{:02d}/nsd_train_stim_sub{}.npy'.format(sub,sub),stim_array )

print("Training data is saved.")

# Release the training arrays before allocating the test arrays so both
# sets are never held in memory at the same time.
del fmri_array, stim_array

# --- Test split: same procedure with the test image ids.
fmri_array = np.zeros((num_test,vox_dim))
stim_array = np.zeros((num_test,im_dim,im_dim,im_c))
for i,idx in enumerate(test_im_idx):
    stim_array[i] = stim[idx]
    fmri_array[i] = fmri[sorted(sig_test[idx])].mean(0)
    print(i)

np.save('processed_data/subj{:02d}/nsd_test_fmriavg_nsdgeneral_sub{}.npy'.format(sub,sub),fmri_array )
np.save('processed_data/subj{:02d}/nsd_test_stim_sub{}.npy'.format(sub,sub),stim_array )

print("Test data is saved.")
# Caption table indexed by image id; five caption slots are copied per image.
annots_cur = np.load('annots/COCO_73k_annots_curated.npy')

# Training captions, in the same row order as train_im_idx.
captions_array = np.empty((num_train, 5), dtype=annots_cur.dtype)
for row, image_id in enumerate(train_im_idx):
    captions_array[row, :] = annots_cur[image_id, :]
    print(row)
train_cap_path = 'processed_data/subj{:02d}/nsd_train_cap_sub{}.npy'.format(sub,sub)
np.save(train_cap_path, captions_array)

# Test captions, in the same row order as test_im_idx.
captions_array = np.empty((num_test, 5), dtype=annots_cur.dtype)
for row, image_id in enumerate(test_im_idx):
    captions_array[row, :] = annots_cur[image_id, :]
    print(row)
test_cap_path = 'processed_data/subj{:02d}/nsd_test_cap_sub{}.npy'.format(sub,sub)
np.save(test_cap_path, captions_array)

print("Caption data are saved.")