#!/bin/env python
import os
import sys
import subprocess
import configparser
from nipype import DataGrabber, DataSink, IdentityInterface, Node, Workflow, MapNode, JoinNode, Merge
from niflow.nipype1.workflows.dmri.fsl.dti import bedpostx_parallel
from nipype.interfaces import utility
import nipype.interfaces.fsl as fsl
import nipype.interfaces.ants as ants
from nipype.interfaces.freesurfer import ReconAll, MRIsConvert, MRIConvert
from nipype.interfaces.utility import Function
def create_diffusion_prep_pipeline(name='dMRI_preprocessing', bet_frac=0.34):
    """Create the diffusion pre-processing workflow and return it.

    The function previously built its input node and then fell off the end,
    returning ``None``; the script entry point connects the result as a
    nipype workflow, so a ``Workflow`` is now constructed and returned.

    Args:
        name: Name given to the returned workflow.
        bet_frac: Not consumed by the code visible here.
            NOTE(review): presumably the brain-extraction fractional
            intensity threshold -- confirm once the extraction step exists.

    Returns:
        A ``Workflow`` called ``name`` that contains the ``input_subject``
        identity node exposing the ``dwi``, ``bval`` and ``bvec`` fields.
    """
    input_subject = Node(
        IdentityInterface(fields=['dwi', 'bval', 'bvec']),
        name='input_subject',
    )

    workflow = Workflow(name=name)
    # Register the node explicitly: nothing is connected to it yet, so it
    # would otherwise never become part of the workflow graph.
    workflow.add_nodes([input_subject])

    # NOTE(review): the caller also connects to 'input_template.T1/T2' and
    # reads 'output.bval', 'output.bvec_rotated',
    # 'output.dwi_rigid_registered' and 'output.mask'. Those nodes and the
    # processing steps between them are not defined here and still have to
    # be added before the outer workflow can be wired successfully.
    return workflow
def bvec_flip(bvecs_in, flip):
    """Scale the components of a b-vector table and save the result.

    The table read from ``bvecs_in`` is transposed, multiplied by ``flip``
    (broadcast along the last axis), and written to a file with the same
    base name inside the current working directory.

    Args:
        bvecs_in: Path to a whitespace-delimited numeric text file.
        flip: Per-component multipliers, e.g. ``(-1, 1, 1)``; must broadcast
            against the transposed table.

    Returns:
        Path (``str``) of the file that was written.
    """
    # Imports are kept local because this function is handed to a nipype
    # ``Function`` node (see ``flip_bvecs``), which runs it from its source
    # text without this module's globals.
    import os
    import numpy as np

    print(bvecs_in)

    # NOTE: the transposed layout is what gets saved, so the output has one
    # row per input column.
    scaled = np.loadtxt(bvecs_in).T * flip

    target_name = os.path.basename(bvecs_in)
    destination = str(os.path.join(os.getcwd(), target_name))
    np.savetxt(destination, scaled)
    return destination
def _split_config_values(raw):
    """Turn a ``visits``/``sessions`` config string into a list of tokens.

    Whitespace-separated values (``"1 2"``, ``"10 11"``) are split on
    whitespace. A value without inner whitespace keeps the historical
    behaviour of one entry per character (``"12"`` -> ``['1', '2']``), so
    existing configuration files continue to work unchanged.

    Args:
        raw: The raw string read from the configuration file.

    Returns:
        List of string tokens; empty when ``raw`` is empty or blank.
    """
    tokens = raw.split()
    if len(tokens) > 1:
        return tokens
    # Zero or one token: fall back to the original per-character split,
    # minus the surrounding whitespace that used to leak in as entries.
    return list(raw.strip())


if __name__ == '__main__':
    # The single command-line argument is the path of the INI config file.
    if len(sys.argv) < 2:
        sys.exit('usage: {} CONFIG_FILE'.format(sys.argv[0]))

    config = configparser.ConfigParser()
    # ConfigParser.read() skips files it cannot open and returns the list of
    # files actually parsed, so an empty result means nothing was loaded.
    if not config.read(sys.argv[1]):
        sys.exit('could not read configuration file: {}'.format(sys.argv[1]))
    settings = config['DEFAULT']

    dmri_preprocess_workflow = create_diffusion_prep_pipeline(
        'dmri_preprocess')

    # Working directory of the outer workflow (placeholder value).
    PATH = '/path/to/my/script'

    # split() without an argument tolerates repeated spaces and newlines,
    # which split(" ") turned into empty subject ids.
    subject_list = settings['id_list'].split()
    visits = _split_config_values(settings['visits'])
    subjects_dir = settings['subjects_dir']
    sessions = _split_config_values(settings['sessions'])

    # Fan the workflow out over every subject / visit / session combination.
    infosource = Node(
        IdentityInterface(fields=['subject_id', 'visit', 'session']),
        name='subjects',
    )
    infosource.iterables = [
        ('subject_id', subject_list),
        ('visit', visits),
        ('session', sessions),
    ]

    def make_composite_id(subject_id, visit):
        """Join subject and visit into the id handed to recon-all."""
        return '{}_{}'.format(subject_id, visit)

    # A named function is used instead of a lambda: nipype's Function
    # interface captures the callable by its source text, which is only
    # reliable for a lambda when it happens to sit alone on its line.
    # NOTE(review): the composite id omits 'session', so several sessions of
    # one visit map to the same recon-all subject id -- confirm intended.
    subject_id_visit = Node(
        interface=Function(
            input_names=['subject_id', 'visit'],
            output_names=['composite_id'],
            function=make_composite_id,
        ),
        name='subject_id_visit',
    )

    # Locate the raw inputs below base_directory.
    data_source = Node(
        DataGrabber(
            infields=['subject_id', 'visit', 'session'],
            outfields=['dwi', 'bval', 'bvec', 'T1'],
        ),
        name='data_grabber',
    )
    data_source.inputs.sort_filelist = True
    data_source.inputs.base_directory = settings['base_directory']
    data_source.inputs.template = ''
    # Alternative gradient file names previously used here:
    # dwi/dwepi.150.bvals and dwi/dwepi.150.grads.
    field_template = {
        'T1': '%s/visit%s/session%s/anat/T1w.nii',
        'dwi': '%s/visit%s/session%s/dwi/dwi_raw.nii.gz',
        'bval': '%s/visit%s/session%s/dwi/dti_raw.bvals',
        'bvec': '%s/visit%s/session%s/dwi/dti_raw.bvecs',
    }
    data_source.inputs.field_template = field_template
    # Every template is filled with the same (subject, visit, session) triple.
    data_source.inputs.template_args = {
        field: [['subject_id', 'visit', 'session']]
        for field in field_template
    }

    # Negate the first component of every gradient vector.
    flip_bvectors_node = Node(
        interface=Function(
            input_names=['bvecs_in', 'flip'],
            output_names=['bvecs_out'],
            function=bvec_flip,
        ),
        name='flip_bvecs',
    )
    flip_bvectors_node.inputs.flip = (-1, 1, 1)

    # FreeSurfer reconstruction of the T1 image.
    recon_all_threads = 20
    recon_all = Node(interface=ReconAll(), name='recon_all')
    recon_all.inputs.directive = 'all'
    recon_all.inputs.subjects_dir = subjects_dir
    recon_all.inputs.openmp = recon_all_threads
    recon_all.inputs.mprage = True
    recon_all.inputs.parallel = True
    recon_all.interface.num_threads = recon_all_threads
    recon_all.inputs.flags = "-no-isrunning"

    bedpostx_parallel_wf_params = dict(
        n_fibres=3,
        fudge=1,
        burn_in=1000,
        n_jumps=1250,
        sample_every=25,
    )
    bedpostx_parallel_wf = bedpostx_parallel(
        'nipype_bedpostx_parallel', params=bedpostx_parallel_wf_params)

    # Convert the recon-all brain volume to NIfTI for the diffusion pipeline.
    mri_convert = Node(interface=MRIConvert(), name='mri_convert')
    mri_convert.inputs.out_type = 'nii'
    mri_convert.inputs.subjects_dir = subjects_dir

    workflow = Workflow('diffusion_workflow_new_mgt', base_dir=PATH)
    workflow.connect([
        (infosource, data_source, [
            ('subject_id', 'subject_id'),
            ('visit', 'visit'),
            ('session', 'session'),
        ]),
        (infosource, subject_id_visit, [
            ('subject_id', 'subject_id'),
            ('visit', 'visit'),
        ]),
        (data_source, flip_bvectors_node, [('bvec', 'bvecs_in')]),
        (data_source, dmri_preprocess_workflow, [
            ('dwi', 'input_subject.dwi'),
            ('bval', 'input_subject.bval'),
        ]),
        (data_source, recon_all, [('T1', 'T1_files')]),
        (subject_id_visit, recon_all, [('composite_id', 'subject_id')]),
        (flip_bvectors_node, dmri_preprocess_workflow, [
            ('bvecs_out', 'input_subject.bvec'),
        ]),
        (recon_all, mri_convert, [('brain', 'in_file')]),
        # The same converted brain volume feeds both template inputs.
        (mri_convert, dmri_preprocess_workflow, [
            ('out_file', 'input_template.T1'),
            ('out_file', 'input_template.T2'),
        ]),
        (dmri_preprocess_workflow, bedpostx_parallel_wf, [
            ('output.bval', 'inputnode.bvals'),
            ('output.bvec_rotated', 'inputnode.bvecs'),
            ('output.dwi_rigid_registered', 'inputnode.dwi'),
            ('output.mask', 'inputnode.mask'),
        ]),
    ])
    workflow.write_graph(format='pdf', simple_form=False)

    # Execution only happens when the config names the 'margaret' server;
    # for any other value the graph is written but nothing is run.
    # For a parallel run, switch to plugin='MultiProc' with plugin_args such
    # as {'n_procs': 220, 'memory_gb': 320}.
    if settings.get('server', '').lower() == 'margaret':
        workflow.run(plugin='Linear')