import yaml
from yaml.loader import SafeLoader
from pathlib import Path

# Import configuration settings
with open('/blue/vabfmc/data/working/robert727/cda2/data_processing/scripts/spm_l1_and_l2_wf.yml') as config_file:
    my_config = yaml.load(config_file, Loader=SafeLoader)

# Per the nipype documentation, config and logging need to be imported and
# set before importing workflow components
from nipype import config

# Set up the base and results directories, i.e., where the workflow will look for files
# and where it will write output files.
l1_base_dir = Path(my_config['input_data_dir'])
results_dir = Path(my_config['output_data_dir']).joinpath(
    my_config['analysis_name'])
# Create results directory if it doesn't already exist
results_dir.mkdir(parents=True, exist_ok=True)

# Create crash and log file directories
crashdump_dir = results_dir.joinpath('crash')
crashdump_dir.mkdir(parents=True, exist_ok=True)
log_dir = results_dir.joinpath('logs')
log_dir.mkdir(parents=True, exist_ok=True)

# Set log and crash directories globally
config_dict = {'execution': {'crashdump_dir': str(crashdump_dir),
                             'crashfile_format': 'txt'},
               'logging': {'log_directory': str(log_dir),
                           'log_to_file': True}}
config.update_config(config_dict)

# If enable_debug_mode is set to true in the config, enable debug mode.
if my_config['enable_debug_mode']:
    config.enable_debug_mode()

import pandas as pd
import nipype.interfaces.fsl as fsl
from nipype import SelectFiles
import os  # system function
from nipype import IdentityInterface
from nipype.interfaces.utility import Function
from nipype.pipeline import Node, Workflow, MapNode
from bids.layout import BIDSLayout
from nipype.interfaces.io import DataGrabber, DataSink
import nipype.algorithms.modelgen as model  # model specification
import nipype.pipeline.engine as pe  # pypeline engine
import nipype.interfaces.spm as spm  # spm

# Set the matlab command, which really points at a standalone, compiled version of SPM12 run
# through the MCR. Both are version 2021a.
# Documentation: https://nipype.readthedocs.io/en/latest/api/generated/nipype.interfaces.spm.base.html#module-nipype.interfaces.spm.base
matlab_cmd = '/blue/vabfmc/data/working/robert727/Software/spm12_standalone/run_spm12.sh /blue/vabfmc/data/working/robert727/Software/MCR/v910/ script'
spm.SPMCommand.set_mlab_paths(matlab_cmd=matlab_cmd, use_mcr=True)
print(f"Running SPM version: {spm.SPMCommand().version}")

# Specify output type for FSL nodes
fsl.FSLCommand.set_default_output_type('NIFTI')
print(f'FSLDIR: {os.getenv("FSLDIR")}')
print(f'FSL version: {fsl.Info().version()}')

# Create overall workflow
l1_analyses = Workflow(name="l1_analyses")
l1_analyses.base_dir = str(results_dir)

# Note: we should be able to use a BIDS-aware grabber node here to get the files we
# need for analysis, but for some reason I couldn't get it to grab the
# specific files I needed.
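# A possible SelectFiles-based alternative (untested sketch; the template strings below are
# illustrative and would need to match the actual fmriprep filenames in l1_base_dir):
# file_templates = {
#     'func': 'sub-{subject_id}/func/sub-{subject_id}_task-{task}_space-MNI152NLin2009cAsym_res-2_desc-preproc_bold.nii.gz',
#     'events': 'sub-{subject_id}/func/sub-{subject_id}_task-{task}_events.tsv'}
# selectfiles = Node(SelectFiles(file_templates, base_directory=str(l1_base_dir)),
#                    name='selectfiles')
# selectfiles.inputs.task = my_config['task_name']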
# If use_test_subs is set to true, use the test subjects set inside config.yml,
# else use all subjects inside the input directory
if my_config['use_test_subs']:
    subject_ids = my_config['test_subs']
else:
    layout = BIDSLayout(l1_base_dir)
    subject_ids = layout.get_subjects()
print(
    f"{len(subject_ids)} subjects to be included in analyses: {subject_ids}")

# Set up IdentityInterface node to iterate over subject IDs
subject_iterator = Node(IdentityInterface(fields=['subject_id']),
                        name='subject_iterator')
subject_iterator.iterables = ('subject_id', subject_ids)
subject_iterator.inputs.subject_id = subject_ids

# Set up imagegrabber node to get the required file paths
imagegrabber = Node(DataGrabber(infields=['subject_id', 'task', 'desc'],
                                outfields=['anat', 'func', 'mask', 'events', 'confounds']),
                    name='imagegrabber')
imagegrabber.inputs.template = '*'
# Create patterns to be used to grab the different kinds of files
anat_pattern = l1_base_dir.joinpath(
    'sub-%s', 'anat', 'sub-%s_space-MNI152NLin2009cAsym_res-2_desc-%s_T1w.nii*')
func_pattern = l1_base_dir.joinpath(
    'sub-%s', 'func', 'sub-%s_task-%s_space-MNI152NLin2009cAsym_res-2_desc-%s_bold.nii*')
events_pattern = l1_base_dir.joinpath(
    'sub-%s', 'func', 'sub-%s_task-%s_events.tsv')
confounds_pattern = l1_base_dir.joinpath(
    'sub-%s', 'func', 'sub-%s_task-%s_desc-confounds_timeseries.tsv')
imagegrabber.inputs.template_args = {'anat': [['subject_id', 'subject_id', 'desc']],
                                     'func': [['subject_id', 'subject_id', 'task', 'desc']],
                                     'events': [['subject_id', 'subject_id', 'task']],
                                     'confounds': [['subject_id', 'subject_id', 'task']]}
imagegrabber.inputs.field_template = {'anat': str(anat_pattern),
                                      'func': str(func_pattern),
                                      'events': str(events_pattern),
                                      'confounds': str(confounds_pattern)}
imagegrabber.inputs.subject_id = subject_ids
imagegrabber.inputs.task = my_config['task_name']
imagegrabber.inputs.desc = 'preproc'
imagegrabber.inputs.base_directory = str(l1_base_dir)
imagegrabber.inputs.sort_filelist = True

# Connect subject_iterator node to imagegrabber node
l1_analyses.connect(subject_iterator, 'subject_id',
                    imagegrabber, 'subject_id')


# Create function node to unzip functional files
def unzip_functional_images(func: str):
    """The smooth module in SPM requires that functional files are unzipped.
    Unzip the functional image at the given path if it is gzipped.

    Args:
        func (str): path to a functional image

    Returns:
        unzipped_func (str): path to the unzipped functional image
        unzipped_func_list (list): the same path wrapped in a list (spm.Smooth expects a list)
    """
    from pathlib import Path
    import os

    # Convert the incoming path to a pathlib Path
    func_path = Path(func)
    if func_path.suffix == '.gz':
        print(f"Unzipping functional image with path {func_path}")
        os.system(f'gunzip -f {func_path}')
        # Build the new path without .gz. The parent attribute gives
        # the directory that contains the functional image; the stem attribute
        # gives the filename with the final extension removed.
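        # For example (illustrative path), /data/sub-01/func/sub-01_bold.nii.gz has
        # parent /data/sub-01/func and stem sub-01_bold.nii, so joining them yields
        # /data/sub-01/func/sub-01_bold.nii, the path gunzip leaves behind.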
        unzipped_func = func_path.parent.joinpath(func_path.stem)
    else:
        print(f"File {func_path} is already unzipped")
        unzipped_func = func_path
    unzipped_func_list = [str(unzipped_func.resolve())]
    # Return plain string paths; nipype File traits are happier with str than with Path objects
    return (str(unzipped_func.resolve()), unzipped_func_list)


gunzipFuncs = Node(Function(input_names=['func'],
                            output_names=['unzipped_func',
                                          'unzipped_func_list'],  # type: ignore
                            function=unzip_functional_images),
                   name='gunzipFuncs')

# Connect datagrabber node to gunzip func node
l1_analyses.connect(imagegrabber, 'func', gunzipFuncs, 'func')

# Create mean image from functional run to pass into BET
meanimg = Node(fsl.MeanImage(), name='average_images')
# Connect unzip node to mean image node
l1_analyses.connect(gunzipFuncs, 'unzipped_func', meanimg, 'in_file')

# Use BET to create a brain mask and strip the skull from the functional images
bet = Node(fsl.BET(mask=True,
                   frac=0.3,
                   no_output=False,
                   ignore_exception=True),
           name='bet')
# Pass mean image into BET to create mask
l1_analyses.connect(meanimg, 'out_file', bet, 'in_file')

# Spatially smooth functional data
smooth = pe.Node(interface=spm.Smooth(), name="smooth")
fwhm = my_config['fwhm']
smooth.iterables = ('fwhm', fwhm)
# Connect gunzipFuncs node to smooth node to pass in the unzipped functional images
l1_analyses.connect(gunzipFuncs, 'unzipped_func_list', smooth, 'in_files')

# Apply mask to smoothed functional images
apply_mask = Node(fsl.ApplyMask(), name="apply_mask")
# Pass smoothed functional image from smooth node to apply_mask node
l1_analyses.connect(smooth, "smoothed_files", apply_mask, 'in_file')
# Pass brain mask from bet node to apply_mask node
l1_analyses.connect(bet, 'mask_file', apply_mask, 'mask_file')


# Create custom function to get subject info needed to pass into SpecifyModel
def get_subject_info(events, confounds):
    """Creates the subject_info Bunch to be passed into SpecifyModel. Also creates
    the motion outlier text file required by SpecifyModel.

    Args:
        events (str): path to the subject's events tsv passed in from the imagegrabber
        confounds (str): path to the subject's confounds tsv passed in from the imagegrabber

    Returns:
        subject_info (list): list holding the subject's Bunch to be passed into model specification
        outlier_file_paths (list): list containing the path to the subject's motion outlier txt file
    """
    import pandas as pd
    from nipype.interfaces.base import Bunch
    from pathlib import Path
    import yaml
    from yaml.loader import SafeLoader

    # Import configuration settings
    with open('/blue/vabfmc/data/working/robert727/cda2/data_processing/scripts/spm_l1_and_l2_wf.yml') as config_file:
        my_config = yaml.load(config_file, Loader=SafeLoader)

    # Use the subject's events and confounds tsvs to create their subject_info Bunch.
    # Also create an outlier txt file and return its path in a list.
    subject_info = []
    outlier_file_paths = []
    my_confounds = my_config['confounds']

    # Read in events.tsv. Need to make this grab the events tsv output from workflow
    # get_subject_data_wf above
    print(f'Reading in events.tsv at {events}')
    events_tsv = pd.read_table(events)
    # Initialize lists containing conditions, onsets, and durations.
    conditions = []
    onsets = []
    durations = []
    # Group events tsv by trial type
    for group in events_tsv.groupby('trial_type'):
        # group[0] is the trial type (i.e., the condition name)
        conditions.append(group[0])
        onsets.append(group[1].onset.tolist())
        durations.append(group[1].duration.tolist())
    print(f'Conditions: {conditions}')
    print(f'Onsets: {onsets}')
    print(f'Durations: {durations}')
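    # For example (illustrative values, not real task timings), an events.tsv with rows
    #   onset  duration  trial_type
    #   0.0    2.0       Negative
    #   10.0   2.0       Positive
    #   20.0   2.0       Negative
    # would yield conditions=['Negative', 'Positive'], onsets=[[0.0, 20.0], [10.0]],
    # and durations=[[2.0, 2.0], [2.0]], since groupby sorts the trial types alphabetically.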
    # Get model confounds. Need to make this grab the confounds tsv output from workflow
    # get_subject_data_wf above. Use the confounds list to only read in desired confounds.
    print(f'Reading in confounds.tsv at {confounds}')
    confounds_tsv_df = pd.read_table(confounds)
    # Extract the motion outlier columns, collapse them into a single series, and write the
    # indices of outlier volumes to a txt file. Add the outlier txt file path to the outlier list.
    print('Creating outliers text file.')
    motion_outliers = confounds_tsv_df.filter(regex=r"motion_outlier\d+")
    outlier_file_path = Path(confounds).parent.joinpath("outlier_volumes.txt")
    motion_outliers_series = motion_outliers.sum(axis=1).astype(dtype=int)
    volumes_to_be_removed = motion_outliers_series.index[motion_outliers_series == 1].tolist()
    print(
        f'{len(volumes_to_be_removed)} volumes to be scrubbed from analyses.')
    print(f'Volumes to be scrubbed: {volumes_to_be_removed}')
    # SpecifyModel expects the outlier file to list the indices of outlier volumes, one per line
    pd.Series(volumes_to_be_removed).to_csv(
        outlier_file_path, index=False, header=False)
    outlier_file_paths.append(str(outlier_file_path))

    # Filter confounds df to the selected confounds to be used in analyses.
    print(
        f'Filtering confounds tsv for selected confounds: {my_confounds}')
    confounds_tsv_df = confounds_tsv_df.filter(items=my_confounds)

    # Initialize lists containing confound values for each volume and confound names
    regressors = []
    regressor_names = []
    # Loop through confounds, convert each to a list and append to regressors list
    for confound in confounds_tsv_df.columns:
        regressors.append(
            confounds_tsv_df.loc[:, confound].tolist())  # type: ignore
        regressor_names.append(confound)

    subject_info.append(Bunch(conditions=conditions,
                              onsets=onsets,
                              durations=durations,
                              regressors=regressors,
                              regressor_names=regressor_names))

    # These outputs are picked up by the getsubjectinfo Function node below
    return (subject_info, outlier_file_paths)


# Get subject info - get subject-specific condition information. Also create
# a motion outlier text file for each subject.
getsubjectinfo = Node(Function(input_names=['events', 'confounds'],
                               output_names=['subject_info',  # type: ignore
                                             'outlier_file_paths'],
                               function=get_subject_info),
                      name='getsubjectinfo')

# The imagegrabber node passes the necessary subject data into the get subject info node.
# This node returns the subject_info Bunch as well as a list with the path to the subject's
# outlier text file to be used for modeling.
l1_analyses.connect([(imagegrabber, getsubjectinfo, [("events", "events"),
                                                     ("confounds", "confounds")
                                                     ])
                     ])

# Create specify model node and set parameter values
l1_specify_model = pe.Node(
    interface=model.SpecifyModel(), name="l1_specify_model")
l1_specify_model.inputs.input_units = 'secs'
# Could be set up to read the TR from the functional file's json sidecar so it isn't hard coded.
l1_specify_model.inputs.time_repetition = 0.8
l1_specify_model.inputs.high_pass_filter_cutoff = 120

# Connect subject info node to specify model node, passing along the subject
# info Bunch and the outlier file paths
l1_analyses.connect([(getsubjectinfo, l1_specify_model, [("subject_info", "subject_info"),
                                                         ("outlier_file_paths", "outlier_files")
                                                         ])
                     ])
# Connect apply_mask node to specify model node to pass in the smoothed, masked functional images
l1_analyses.connect(apply_mask, 'out_file',
                    l1_specify_model, 'functional_runs')
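# Possible (untested) sketch for reading the TR from the BOLD sidecar JSON instead of
# hard-coding it above; bold_json_path is a hypothetical path to the sub-*_bold.json file:
#   import json
#   with open(bold_json_path) as sidecar:
#       l1_specify_model.inputs.time_repetition = json.load(sidecar)['RepetitionTime']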
# Create level 1 design node. The session_info input will be passed in from SpecifyModel
l1_design = pe.Node(interface=spm.Level1Design(), name="l1_design")
l1_design.inputs.bases = {'hrf': {'derivs': [0, 0]}}
l1_design.inputs.timing_units = 'secs'
l1_design.inputs.interscan_interval = l1_specify_model.inputs.time_repetition
l1_design.inputs.mask_image = my_config['roi_mask_path']
# Connect specify model node to l1_design node, passing in session_info
l1_analyses.connect(l1_specify_model, 'session_info',
                    l1_design, 'session_info')

# Create l1_model_estimate node. The output of this node is the spm_mat_file that
# will be passed into contrast estimation
l1_model_estimate = pe.Node(
    interface=spm.EstimateModel(), name="l1_model_estimate")
l1_model_estimate.inputs.estimation_method = {'Classical': 1}
l1_model_estimate.inputs.write_residuals = True
# Connect l1_design node to l1_model_estimate node. The output is an updated spm mat file
l1_analyses.connect(l1_design, 'spm_mat_file',
                    l1_model_estimate, 'spm_mat_file')

# Specify contrasts as (name, stat, conditions, weights) tuples
negative = ('Negative', 'T', ['Negative'], [1])
positive = ('Positive', 'T', ['Positive'], [1])
neutral = ('Neutral', 'T', ['Neutral'], [1])
neg_min_neutral = ('Negative > Neutral', 'T',
                   ['Negative', 'Neutral'], [1, -1])
pos_min_neutral = ('Positive > Neutral', 'T',
                   ['Positive', 'Neutral'], [1, -1])
# Contrasts need to be specified as a list
l1_contrasts = [negative, positive, neutral, neg_min_neutral, pos_min_neutral]

# Create contrast estimate node
l1_contrast_estimate = pe.Node(
    interface=spm.EstimateContrast(group_contrast=False),
    name="l1_contrast_estimate")
l1_contrast_estimate.inputs.contrasts = l1_contrasts

# Connect l1_model_estimate to contrast estimate. The l1_model_estimate outputs needed as inputs to
# the contrast estimate are beta_images, residual_image, and spm_mat_file.
# Contrast estimate outputs: con_images, ess_images, spmF_images, spmT_images, and an updated spm_mat_file.
l1_analyses.connect([(l1_model_estimate, l1_contrast_estimate, [('spm_mat_file', 'spm_mat_file'),
                                                                ('beta_images', 'beta_images'),
                                                                ('residual_image', 'residual_image')
                                                                ])
                     ])

# Create datasink to save contrast images. We will need these for the second level analyses
datasink = Node(DataSink(), name='datasink')
datasink.inputs.base_directory = str(
    results_dir.joinpath(l1_analyses.name))  # type: ignore
# Connect contrast estimate to datasink
l1_analyses.connect(l1_contrast_estimate, 'con_images',
                    datasink, 'con_images')
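# Assumption based on the SelectFiles template in the commented-out level-2 section below:
# the datasink should leave the contrast images laid out roughly as
#   <results_dir>/l1_analyses/con_images/_subject_id_<sub>/_fwhm_<k>/con_0001.nii, con_0002.nii, ...
# A quick post-run sanity check could be:
#   for con_img in sorted(results_dir.joinpath(l1_analyses.name, 'con_images').rglob('con_*.nii')):
#       print(con_img)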
# The contrast estimate node outputs a list, but thresholding
# only takes in one contrast at a time and requires a str
# or path-like object. MapNode will iteratively apply thresholding
# to each element of the spmT_images list. We also map this
# node over contrast_index. When mapping over two iterfields
# they must be the same length and they are paired element-wise,
# meaning the first stat_image is paired with contrast 1,
# the second stat_image with contrast 2, and so on, which
# is what we want.
l1_thresh = MapNode(spm.Threshold(use_topo_fdr=True,
                                  use_fwe_correction=False,
                                  extent_threshold=10,
                                  height_threshold=0.005,
                                  height_threshold_type='p-value',
                                  extent_fdr_p_threshold=0.05),
                    iterfield=['stat_image', 'contrast_index'],
                    name="l1_thresh")
l1_thresh.inputs.contrast_index = [
    con_id + 1 for con_id in range(len(l1_contrasts))]
l1_analyses.connect([(l1_contrast_estimate, l1_thresh, [('spm_mat_file', 'spm_mat_file'),
                                                        ('spmT_images', 'stat_image')
                                                        ])
                     ])

# Write workflow graphs
l1_analyses.write_graph(graph2use='hierarchical',
                        dotfilename=f'{results_dir}/{l1_analyses.name}/{l1_analyses.name}_graph_hierarchical.dot')
l1_analyses.write_graph(
    graph2use='exec', dotfilename=f'{results_dir}/{l1_analyses.name}/{l1_analyses.name}_graph_exec.dot')

plugin_args = {'max_jobs': len(subject_ids),
               'sbatch_args': '--time 72:00:00 --cpus-per-task 4 --account vabfmc --qos vabfmc-b --mem 16GB'}
# Run first-level analysis workflow
l1_analyses.run(plugin='SLURM', plugin_args=plugin_args)

# # Set up data structures for second level analyses
# l2_base_dir = results_dir.joinpath(l1_analyses.name)  # type: ignore
# print(f"Second-level base directory is: {l2_base_dir}")

# # Create workflow for 2nd level analyses
# l2_analyses = Workflow(name="l2_analyses")
# # Set base directory where outputs will be written to
# l2_analyses.base_dir = results_dir

# # Set up IdentityInterface node to iterate over contrast IDs and smoothing kernels
# l2_contrast_iterator = Node(IdentityInterface(fields=['contrast_idx', 'fwhm']),
#                             name='l2_contrast_iterator')
# # Each contrast set up in the level-1 analyses generates a contrast image named
# # con_XXXX.nii, where XXXX is the 4-digit contrast number.
# # Create the list of contrast indices to iterate over.
# # The emotional task has 5 contrasts, so this generates the list
# # [1, 2, 3, 4, 5]
# contrast_idxs = [con_id + 1 for con_id in range(len(l1_contrasts))]
# l2_contrast_iterator.iterables = [
#     ('contrast_idx', contrast_idxs), ('fwhm', fwhm)]
# l2_contrast_iterator.inputs.contrast_idx = contrast_idxs
# l2_contrast_iterator.inputs.fwhm = fwhm

# # Create contrastselector node to get contrasts
# template = {"contrast_niis": str(Path('con_images').joinpath(
#     '_subject_id_*', '_fwhm_{fwhm}', 'con_000{contrast_idx}.nii'))}  # type: ignore
# contrastselector = Node(SelectFiles(template),
#                         name='contrastselector')
# # Set the base directory relative to which the selector will search for files
# contrastselector.inputs.base_directory = l2_base_dir
# contrastselector.inputs.contrast_idx = contrast_idxs
# contrastselector.inputs.fwhm = fwhm
# # Connect contrast iterator to contrastselector
# l2_analyses.connect([(l2_contrast_iterator, contrastselector, [('contrast_idx', 'contrast_idx'),
#                                                                ('fwhm', 'fwhm')
#                                                                ])
#                      ])
# # Create multiple regression design to test for group differences
# l2_multiple_reg = Node(spm.MultipleRegressionDesign(),
#                        name="multiple_reg")


# # Define function to get group membership binary vectors
# def get_sub_group_binary_vecs(redcap_export: Path, included_subs: list):
#     # Read in record ID as str to keep the leading 0; the rest need to be int
#     subject_groups = pd.read_csv(redcap_export,
#                                  usecols=["record_id", "health_prob_category___1", "health_prob_category___2",
#                                           "health_prob_category___3", "health_prob_category___4"],
#                                  dtype={"record_id": str,
#                                         "health_prob_category___1": int,
#                                         "health_prob_category___2": int,
#                                         "health_prob_category___3": int,
#                                         "health_prob_category___4": int})
#     # Filter redcap data to subjects who have complete data in the fmriprep dataset.
#     # Take a copy so we don't get a SettingWithCopyWarning
#     included_subjects = subject_groups.loc[subject_groups.record_id.isin(
#         included_subs), :].copy()
#     # SPM regression models need variables such as group membership to be binary int vectors/lists
#     hc = included_subjects.health_prob_category___1.tolist()
#     ptsd = included_subjects.health_prob_category___2.tolist()
#     tbi = included_subjects.health_prob_category___3.tolist()
#     combo = included_subjects.health_prob_category___4.tolist()
#     # Create an all_subjects vector that has 1's for all subjects, in case it is needed
#     all_subs = [1] * len(hc)
#     return (hc, ptsd, tbi, combo, all_subs)


# # Get group membership binary vecs
# # Grab the redcap export csv. Note that only the most recent export should be present at the
# # top level of the redcap dir. If there is more than one csv this will break
# redcap_path = list(Path("..").joinpath("data", "redcap").glob("*.csv"))[0]
# hc_list, ptsd_list, tbi_list, combo_list, all_subjects = get_sub_group_binary_vecs(
#     redcap_path, subject_ids)
# l2_multiple_reg.inputs.covariates = [{'vector': hc_list,
#                                       'name': 'hc'},
#                                      {'vector': tbi_list,
#                                       'name': 'tbi'},
#                                      {'vector': ptsd_list,
#                                       'name': 'ptsd'},
#                                      {'vector': combo_list,
#                                       'name': 'combo'}]
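# # For example (illustrative, not real data): for included_subs ['001', '002', '003'] where
# # 001 is HC, 002 is PTSD, and 003 is TBI, the returned vectors would be
# # hc=[1, 0, 0], ptsd=[0, 1, 0], tbi=[0, 0, 1], combo=[0, 0, 0], all_subs=[1, 1, 1].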
# # Connect contrastselector node to the multiple regression design node
# l2_analyses.connect(contrastselector, 'contrast_niis',
#                     l2_multiple_reg, "in_files")

# # Create estimate model node
# l2_model_estimate = Node(spm.EstimateModel(estimation_method={'Classical': 1}),
#                          name='l2_model_estimate')
# # Connect the multiple regression design to estimate model, passing the created spm_mat_file
# l2_analyses.connect(l2_multiple_reg, 'spm_mat_file',
#                     l2_model_estimate, 'spm_mat_file')

# # EstimateContrast - estimates group contrasts
# l2_contrast_estimate = Node(spm.EstimateContrast(group_contrast=True),
#                             name="l2_contrast_estimate")
# # Contrasts to get mean activation across subjects within each group
# # ptsd_mean_act = ('PTSD', 'T', ['ptsd'], [1])
# # tbi_mean_act = ('TBI', 'T', ['tbi'], [1])
# # combo_mean_act = ('Combo', 'T', ['combo'], [1])
# # hc_mean_act = ('HC', 'T', ['hc'], [1])
# ptsd_vs_hc = ('PTSD > HC', 'T', ['ptsd', 'hc'], [1, -1])
# tbi_vs_hc = ('TBI > HC', 'T', ['tbi', 'hc'], [1, -1])
# combo_vs_hc = ('PTSD+TBI > HC', 'T', ['combo', 'hc'], [1, -1])
# all_mean_act = ('Group', 'T', ['mean'], [1])
# l2_contrasts = [ptsd_vs_hc, tbi_vs_hc, combo_vs_hc, all_mean_act]
# l2_contrast_estimate.inputs.contrasts = l2_contrasts

# # Connect second level model estimate to second level contrast estimate
# l2_analyses.connect([(l2_model_estimate, l2_contrast_estimate, [('spm_mat_file', 'spm_mat_file'),
#                                                                 ('beta_images', 'beta_images'),
#                                                                 ('residual_image', 'residual_image')
#                                                                 ])
#                      ])

# # Threshold - thresholds contrasts. As at the first level, stat_image and
# # contrast_index are mapped over pair-wise.
# l2_thresh = MapNode(spm.Threshold(use_topo_fdr=True,
#                                   use_fwe_correction=False,
#                                   extent_threshold=10,
#                                   height_threshold=0.005,
#                                   height_threshold_type='p-value',
#                                   extent_fdr_p_threshold=0.05),
#                     iterfield=['stat_image', 'contrast_index'],
#                     name="l2_thresh")
# l2_thresh.inputs.contrast_index = [
#     con_id + 1 for con_id in range(len(l2_contrasts))]

# # Connect second level contrast estimate to second level threshold
# l2_analyses.connect([(l2_contrast_estimate, l2_thresh, [('spm_mat_file', 'spm_mat_file'),
#                                                         ('spmT_images', 'stat_image')
#                                                         ])
#                      ])

# # Write workflow graphs
# l2_analyses.write_graph(graph2use='hierarchical',
#                         dotfilename=f'{results_dir}/{l2_analyses.name}/{l2_analyses.name}_graph_hierarchical.dot')
# l2_analyses.write_graph(
#     graph2use='exec', dotfilename=f'{results_dir}/{l2_analyses.name}/{l2_analyses.name}_graph_exec.dot')

# plugin_args = {'max_jobs': len(l2_contrasts),
#                'sbatch_args': '--time 72:00:00 --cpus-per-task 4 --account vabfmc --qos vabfmc-b --mem 16GB'}
# # Run second-level analysis workflow
# l2_analyses.run(plugin='SLURM', plugin_args=plugin_args)
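# For reference, the spm_l1_and_l2_wf.yml config read at the top of this script is expected to
# provide at least the keys used above. The values below are illustrative placeholders only:
#   input_data_dir: /path/to/fmriprep/derivatives
#   output_data_dir: /path/to/analysis/output
#   analysis_name: emotional_task_l1
#   enable_debug_mode: false
#   use_test_subs: false
#   test_subs: ['001', '002']
#   task_name: emotional
#   fwhm: [4, 6, 8]
#   confounds: [trans_x, trans_y, trans_z, rot_x, rot_y, rot_z]
#   roi_mask_path: /path/to/roi_mask.nii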