See the whole decoding script below, I ran it on the first-level output. As a side note, I am still learning the various details behind each Nilearn function, and I put some comments in the code to document my learning. Please do let me know if my interpretation is incorrect.
## Decode goal states based on LSS-N outputs on the subject surface
import nibabel as nib
import nilearn as nil
from nilearn.plotting import view_surf
import os
from sklearn.model_selection import LeaveOneGroupOut
import numpy as np
import pickle as pkl
import glob
import re
from nilearn.decoding import Decoder,FREMClassifier
import matplotlib.pyplot as plt
def decode_goal_GLM_surf_ROI(derivative_dir,label_filename, subID,event,GLM_type,fs_percent,visualize=False):
sub_dir=os.path.join(derivative_dir, 'sub-' + subID,'fMRI')
# .label to np arrays
label_files = [os.path.join(sub_dir, f) for f in label_filename]
label_vertices = [nib.freesurfer.io.read_label(f) for f in label_files]
# these should be the same surface space as the LSS-N outputs
# left_surf_file=os.path.join(derivative_dir, 'fmriprep', f'sub-{subID}', 'anat', f'sub-{subID}_hemi-L_midthickness.surf.gii')
# right_surf_file=os.path.join(derivative_dir, 'fmriprep', f'sub-{subID}', 'anat', f'sub-{subID}_hemi-R_midthickness.surf.gii')
left_surf_file=os.path.join(sub_dir,'lh.inflated')
right_surf_file=os.path.join(sub_dir,'rh.inflated')
surf=nil.surface.PolyMesh(left=left_surf_file, right=right_surf_file)
# load sulcal depth map
sulc_left_file=os.path.join(sub_dir,'lh.sulc')
sulc_right_file=os.path.join(sub_dir,'rh.sulc')
sulc_left=nib.freesurfer.io.read_morph_data(sulc_left_file)
sulc_right=nib.freesurfer.io.read_morph_data(sulc_right_file)
sulc_data = nil.surface.PolyData(
left=sulc_left,
right=sulc_right)
sulc_map = nil.surface.SurfaceImage(mesh=surf, data=sulc_data)
# put vertices index (np arrays) into polyData
mask_left = np.zeros(surf.parts['left'].n_vertices, dtype=bool)#int won't work and will return a noninformative numpy cast error
mask_left[label_vertices[0]] = True
mask_right = np.zeros(surf.parts['right'].n_vertices, dtype=bool)
mask_right[label_vertices[1]] = True
mask=nil.surface.PolyData(
left=mask_left,
right=mask_right)
# surface masker (PolyData + PolyMesh - > SurfaceImage instance)
mask_surfImg=nil.surface.SurfaceImage(mesh=surf, data=mask)
# SurfaceImage -> SurfaceMasker, standardize the data across time
mask_masker = nil.maskers.SurfaceMasker(mask_img=mask_surfImg,standardize=True)
if visualize:
# visualize the ROI on both hemispheres
roi_mask = np.zeros(surf.parts['left'].n_vertices)
roi_mask[label_vertices[0]] = 1
view_surf(
surf,
surf_map=roi_mask,
bg_map=sulc_map,
hemi='left',
title='ROI on the left hemisphere',
colorbar=True,
cmap='gray',
symmetric_cmap=False,
vmin=0,
vmax=1,
bg_on_data=True
)
roi_mask = np.zeros(surf.parts['right'].n_vertices)
roi_mask[label_vertices[1]] = 1
view_surf(
surf,
surf_map=roi_mask,
bg_map=sulc_map,
hemi='right',
title='ROI on the right hemisphere',
colorbar=True,
cmap='gray',
symmetric_cmap=False,
vmin=0,
vmax=1,
bg_on_data=True
)
# load the GLM outputs
if GLM_type == 'LSSN':
GLM_output_file = glob.glob(os.path.join(sub_dir,'1stlvl_native_surface','sub-'+subID+'_task-study_acq-blk*_event-'+event+'_native_surface_LSSN_output.pkl'))
elif GLM_type == 'LSA':
GLM_output_file = glob.glob(os.path.join(sub_dir,'1stlvl_native_surface','sub-'+subID+'_task-study_event-'+event+'_native_surface_LSA_output.pkl'))
else:
raise ValueError("GLM_type must be either 'LSSN' or 'LSA'")
feature_maps = []
conditions_label = []
run_label = []
# loop over runs
for file in GLM_output_file:
with open(file, 'rb') as f:
data = pkl.load(f)
# find indices for the goal and stim onsets
event_idx=[i for i, s in enumerate(data['conditions']) if event+'_' in s]
# stim_idx=[i for i, s in enumerate(data['conditions']) if "study_stim_" in s]
# fill the lists to be used for decoding (feature_maps, conditions_label, run_label)
for i in event_idx:
fmap = data['beta_maps'][i]['effect_size']
feature_maps.append(fmap)
match = re.search(r'(size|liv)', data['conditions'][i])
if match:
extracted = match.group(1)
else:
raise ValueError("trial neither size nor liv")
conditions_label.append(extracted)
run_label.append(data['runs'][i])
"""
the Nilearn decoder does not return trial-level accuracy, furthermore, it is unclear
whether the hyperparameter tuning is done across cross-validation folds, and whether the
dicoder.predict() method double-dip the data. Therefore, it is safer to manually separate
out a run to test the decoder performance and only fit the decoder on the rest of the runs (still with cross-validation).
In decoder.py in the fit() method, the following comments are made:
coef_ : numpy.ndarray, shape=(n_classes, n_features)
Contains the mean of the models weight vector across
fold for each class. Returns None for Dummy estimators.
Therefore, to truely avoid double-dipping, we need to leave one run out of the CV iterations
"""
test_accuracy = []
test_logits = []
inner_CV_accuracy = []
training_accuracy = []
null_distributions = []
for r in np.unique(run_label):
feature_maps_train = [z for z, run in zip(feature_maps, run_label) if run != r]
conditions_label_train = [c for c, run in zip(conditions_label, run_label) if run != r]
run_label_train = [run for run in run_label if run != r]
feature_maps_test = [z for z, run in zip(feature_maps, run_label) if run == r]
conditions_label_test = [c for c, run in zip(conditions_label, run_label) if run == r]
run_label_test = [run for run in run_label if run == r]
# decode
# decoder=FREMClassifier(
# estimator="logistic_l2",
# mask=mask_masker,
# standardize=True,
# screening_percentile=fs_percent,
# scoring='accuracy',
# cv=LeaveOneGroupOut(),
# )
decoder = Decoder(
estimator="logistic_l2",
mask=mask_masker,
standardize=True,
screening_percentile=fs_percent,
scoring='accuracy',
cv=LeaveOneGroupOut(),
)
decoder.fit(feature_maps_train, conditions_label_train, groups=run_label_train)
# get the training accuracy
train_predictions = decoder.predict(feature_maps_train)
train_accuracy = np.mean(train_predictions == conditions_label_train)
training_accuracy.append(train_accuracy)
# get the inner CV accuracy
inner_CV_accuracy.append(decoder.cv_scores_)
# get the predictions for the test set
predictions = decoder.predict(feature_maps_test)
# calculate the accuracy
accuracy = np.mean(predictions == conditions_label_test)
test_accuracy.append(accuracy)
"""
generate a surrogate null. The dummy decoder in Nilearn decoder fix the random state
of the sklearn dummy classifier to 0, and that parameter is only changeable after the
.fit() call, so we cannot use the dummy decoder to generate a surrogate null.
"""
# shuffle the test labels and compare that to the predicted labels to get a baseline accuracy
null_distribution = []
for i in range(1000):
shuffled_label_test = np.random.permutation(conditions_label_test)
null_accuracy = np.mean(predictions == shuffled_label_test)
null_distribution.append(null_accuracy)
null_distributions.append(null_distribution)
# get trial-level logit
logits = decoder.decision_function(feature_maps_test)
test_logits.append(logits)
"""
The logits need to be processed before we use them for hypothesis testing.
Specifically, since we are doing binary classification, the magnitude of the logits
basically reflect the amount of evidence for one class over the other. However, we
want to incorporate accuracy information into the logits. In other words, we want a
trial-level metric to reflect "how accurate" the classification is. Therefore we will flip
the signs for trials that are actually "liv". This way, all negative
values will reflect (the degree of) misclassification, while all positive values will reflect
(the degree of) correct classification.
"""
# concatenate across runs
test_logits = np.concatenate(test_logits)
test_logits[conditions_label == 'liv'] *= -1
# save the results
results = {
'subID': subID,
'fs_percent': fs_percent,
'decoding_event': event,
'test_accuracy': test_accuracy,
'test_logits': test_logits,
'conditions_label': conditions_label_test,
'run_label': run_label_test
}
results_file = os.path.join(sub_dir, f'sub-{subID}_pres-{event}_GLM-{GLM_type}_GoalDecoding_results.pkl')
with open(results_file, 'wb') as f:
pkl.dump(results, f)
# also save the logits as csv files for R
logits_file = os.path.join(sub_dir, f'sub-{subID}_pres-{event}_GLM-{GLM_type}_GoalDecoding_logits.csv')
np.savetxt(logits_file, test_logits, delimiter=',', header='decoding_logits')
# plot a time trace of the decoding logits for both goal and stim
plt.figure(figsize=(24, 6))
plt.plot(test_logits, label='Goal Decoding Logits', color='blue')
plt.title(f'Decoding Logits during {event}')
plt.xlabel('Trial')
plt.ylabel('Accuracy Logit Value (positive = correct, negative = incorrect)')
plt.axhline(0, color='black', linestyle='--', linewidth=0.5)
plt.legend()
plt.savefig(os.path.join(sub_dir, f'sub-{subID}_pres-{event}_GLM-{GLM_type}_GoalDecoding_logits.png'))
if visualize:
plt.show()
"""
In the LSS-N, the fixations are modelled as baseline therefore there is no betas associated with them.
But it is possible that the goal is maintained during the fixation, and that these goal maintenance
may be more susceptible to attention fluctuations. So here we use the BOLD time series to decode the
goal states TR by TR (enabled by the fact that each trial starts at a TR onset), though we need to account
for the jitter in the fixations.
"""
def decode_goal_BOLD_surf_ROI(label_files, subID):
"""
TODO : figure out how to decode on both hemispheres at the same time
plot ROI on the surface to visually check
leave one run out cross-validation
"""
print('not implemented yet')
if __name__ == "__main__":
subID = 'AE034Y'
derivative_dir='C:/Users/haozi/OneDrive/Desktop/Postdoc/Wagner/attention_goal_memory/AMASS_encoding_data/BIDS/derivatives'
label_filename = ['lh.yeo17_ifs.label','rh.yeo17_ifs.label']
fs_percent=100 # not sure if this is implemented correctly as it does not change the results at all
visualize=False
event='goal'
GLM_type='LSA'
decode_goal_GLM_surf_ROI(derivative_dir,label_filename, subID,event,GLM_type,fs_percent,visualize)