|
import os |
|
import sys |
|
import numpy as np |
|
import h5py |
|
import scipy.io as spio |
|
import nibabel as nib |
|
|
|
import argparse |
|
# Command-line interface: a single option selects the subject to process.
# Validation is delegated to argparse (`type` + `choices`) instead of an
# `assert`, so it still runs under `python -O` and an invalid value produces
# a usage message rather than a bare AssertionError/ValueError traceback.
parser = argparse.ArgumentParser(description='Argument Parser')
parser.add_argument(
    "-sub", "--sub",
    help="Subject Number",
    type=int,
    choices=[1, 2, 5, 7],
    default=1,
)
args = parser.parse_args()

# argparse has already converted the value; int() is kept so that `sub` is
# guaranteed to be a plain int for the index arithmetic and path formatting
# done further down in the script.
sub = int(args.sub)
|
|
|
def loadmat(filename):
    '''
    Load a MATLAB .mat file and return its variables as a dictionary.

    This function should be called instead of direct spio.loadmat
    as it cures the problem of not properly recovering python dictionaries
    from mat files. The file is read with struct_as_record=False and
    squeeze_me=True; every top-level entry that is still a mat-object is then
    converted into a nested dictionary. Top-level entries of any other type
    (e.g. numpy arrays) are returned exactly as spio.loadmat produced them.

    filename is passed unchanged to spio.loadmat.
    Returns the (converted) dictionary produced by spio.loadmat.
    '''
    # Resolve the struct class once. Recent SciPy exposes it as
    # scipy.io.matlab.mat_struct and deprecates the mio5_params namespace;
    # older releases only provide the mio5_params location.
    try:
        mat_struct = spio.matlab.mat_struct
    except AttributeError:
        mat_struct = spio.matlab.mio5_params.mat_struct

    def _check_keys(d):
        '''
        checks if entries in dictionary are mat-objects. If yes
        todict is called to change them to nested dictionaries
        '''
        # Only values of existing keys are replaced, so mutating the
        # dictionary while iterating over its keys is safe here.
        for key in d:
            if isinstance(d[key], mat_struct):
                d[key] = _todict(d[key])
        return d

    def _todict(matobj):
        '''
        A recursive function which constructs from matobjects nested dictionaries
        '''
        d = {}
        for strg in matobj._fieldnames:
            elem = matobj.__dict__[strg]
            if isinstance(elem, mat_struct):
                d[strg] = _todict(elem)
            elif isinstance(elem, np.ndarray):
                d[strg] = _tolist(elem)
            else:
                d[strg] = elem
        return d

    def _tolist(ndarray):
        '''
        A recursive function which constructs lists from cellarrays
        (which are loaded as numpy ndarrays), recursing into the elements
        if they contain matobjects.
        '''
        elem_list = []
        for sub_elem in ndarray:
            if isinstance(sub_elem, mat_struct):
                elem_list.append(_todict(sub_elem))
            elif isinstance(sub_elem, np.ndarray):
                elem_list.append(_tolist(sub_elem))
            else:
                elem_list.append(sub_elem)
        return elem_list

    data = spio.loadmat(filename, struct_as_record=False, squeeze_me=True)
    return _check_keys(data)
|
|
|
|
|
|
|
# Load the experiment design file through the struct-aware loader.
stim_order_f = 'nsddata/experiments/nsd/nsd_expdesign.mat'
stim_order = loadmat(stim_order_f)

# Group trial indices by the image shown on each trial. Both dictionaries map
# an image id to the list of trial indices (in increasing order) on which
# that image appeared for the selected subject.
sig_train = {}
sig_test = {}
num_trials = 37*750
for idx in range(num_trials):
    order_pos = stim_order['masterordering'][idx]
    # nsdId as in design csv files (the stored values are shifted down by 1)
    nsdId = stim_order['subjectim'][sub-1, order_pos - 1] - 1
    # masterordering entries above 1000 go to the training split, the rest
    # to the test split.
    split = sig_train if order_pos > 1000 else sig_test
    split.setdefault(nsdId, []).append(idx)

# Image ids of each split, in order of first appearance.
train_im_idx = list(sig_train)
test_im_idx = list(sig_test)
|
|
|
|
|
# Per-subject input directories for the ROI masks and the single-trial betas.
roi_dir = 'nsddata/ppdata/subj{:02d}/func1pt8mm/roi/'.format(sub)
betas_dir = 'nsddata_betas/ppdata/subj{:02d}/func1pt8mm/betas_fithrf_GLMdenoise_RR/'.format(sub)

# Voxels with a positive value in the nsdgeneral mask are kept. The boolean
# selector is computed once and reused for every session below.
mask_filename = 'nsdgeneral.nii.gz'
mask = nib.load(roi_dir+mask_filename).get_fdata()
voxel_sel = mask > 0
num_voxel = mask[voxel_sel].shape[0]

# One row per trial, one column per selected voxel. The buffer is allocated
# directly as float32 so no temporary float64 array of the same shape is
# created and then copied.
fmri = np.zeros((num_trials, num_voxel), dtype=np.float32)
for i in range(37):
    beta_filename = "betas_session{0:02d}.nii.gz".format(i+1)
    beta_f = nib.load(betas_dir+beta_filename).get_fdata().astype(np.float32)
    # Masking yields one row per selected voxel; the transpose puts the 750
    # trials of this session on the row axis (assumes each session volume
    # holds exactly 750 trials on its last axis).
    fmri[i*750:(i+1)*750] = beta_f[voxel_sel].transpose()
    # Release the session volume before the next one is loaded.
    del beta_f
    print(i)

print("fMRI Data are loaded.")
|
|
|
# Read the whole 'imgBrick' dataset into memory. The file is opened in a
# `with` block so the HDF5 handle is closed as soon as the copy is made
# (slicing with [:] returns an in-memory numpy array that outlives the file).
with h5py.File('nsddata_stimuli/stimuli/nsd/nsd_stimuli.hdf5', 'r') as f_stim:
    stim = f_stim['imgBrick'][:]

print("Stimuli are loaded.")
|
|
|
# Split sizes and array dimensions shared by the train and test exports.
num_train, num_test = len(train_im_idx), len(test_im_idx)
vox_dim, im_dim, im_c = num_voxel, 425, 3

# Make sure the output directory exists before doing the expensive work, so
# np.save below cannot fail with FileNotFoundError at the very end.
os.makedirs('processed_data/subj{:02d}'.format(sub), exist_ok=True)

# For every training image: copy the stimulus and average the fMRI rows of
# all trials on which that image was shown.
fmri_array = np.zeros((num_train,vox_dim))
stim_array = np.zeros((num_train,im_dim,im_dim,im_c))
for i,idx in enumerate(train_im_idx):
    stim_array[i] = stim[idx]
    fmri_array[i] = fmri[sorted(sig_train[idx])].mean(0)
    print(i)

np.save('processed_data/subj{:02d}/nsd_train_fmriavg_nsdgeneral_sub{}.npy'.format(sub,sub),fmri_array )
np.save('processed_data/subj{:02d}/nsd_train_stim_sub{}.npy'.format(sub,sub),stim_array )

print("Training data is saved.")
|
|
|
# Test split: one row per test image, holding the stimulus and the fMRI
# response averaged over all trials on which that image was shown.
fmri_array = np.zeros((num_test,vox_dim))
stim_array = np.zeros((num_test,im_dim,im_dim,im_c))
for row, image_id in enumerate(test_im_idx):
    trial_rows = sorted(sig_test[image_id])
    stim_array[row] = stim[image_id]
    fmri_array[row] = fmri[trial_rows].mean(0)
    print(row)

# Write both arrays next to the training files.
test_fmri_path = 'processed_data/subj{:02d}/nsd_test_fmriavg_nsdgeneral_sub{}.npy'.format(sub,sub)
test_stim_path = 'processed_data/subj{:02d}/nsd_test_stim_sub{}.npy'.format(sub,sub)
np.save(test_fmri_path, fmri_array)
np.save(test_stim_path, stim_array)

print("Test data is saved.")
|
|
|
# Caption table, indexed by image id on its first axis; five caption slots
# are copied per image.
annots_cur = np.load('annots/COCO_73k_annots_curated.npy')

# Export the captions of the training images first, then of the test images.
# Each pass fills a fresh (count, 5) array row by row and saves it.
for image_ids, count, out_template in (
    (train_im_idx, num_train, 'processed_data/subj{:02d}/nsd_train_cap_sub{}.npy'),
    (test_im_idx, num_test, 'processed_data/subj{:02d}/nsd_test_cap_sub{}.npy'),
):
    captions_array = np.empty((count,5),dtype=annots_cur.dtype)
    for i,idx in enumerate(image_ids):
        captions_array[i,:] = annots_cur[idx,:]
        print(i)
    np.save(out_template.format(sub,sub),captions_array )

print("Caption data are saved.")