Retain output of custom function using datasink

Hi everyone,
I have happily integrated a custom function of mine in a nipype workflow for the first time. Although generally speaking it works fine, in the sense that the input of the previous node get correctly fed to the function and the output of the function get correctly fed to the following node, I am left wondering about what i should do if I wanted to collect the output of my custom function using a datasink.
Below the code that I have up until now:

    from nipype.interfaces.dcm2nii import Dcm2niix
    from nipype.interfaces.spm.preprocess import Normalize
    from nipype.pipeline import Node
    from nipype.pipeline import Workflow
    from nipype.interfaces.io import DataSink
    from nipype.interfaces.utility import Function
    from glob import glob
    from os import chdir

def center_volumes(to_be_centered_file, out_files = None):
    """
This function center the volume(s) given as input by setting
the offset of the volume so that the center is in the middle of the box.
If no out_files name is given than the "centered_" will be added to the 
original name. 
:in_files: name of the volume as a string
:out_files (optional): the name of the centered volumes, default 
to None
:return: centered image as a nifti file
    """
    from nipy import load_image
    import nibabel as nib
    import os
    import numpy as np

    img_name = os.path.basename(to_be_centered_file)
    img = load_image(to_be_centered_file)
    img_centered_header = img.metadata["header"]
    img_dim = img_centered_header["dim"][1:4]
    img_pixdim = img_centered_header["pixdim"][1:4]
    img_centered_header["qoffset_z"] = (img_dim[2]* img_pixdim[2]*
                        np.sign(img.metadata["header"]["srow_z"][2])*-1)/2
    img_centered_header["srow_z"][3] = img_centered_header["qoffset_z"]
    img_centered_header["qoffset_y"] = (img_dim[1]* img_pixdim[1]*
                        np.sign(img.metadata["header"]["srow_y"][1])*-1)/2
    img_centered_header["srow_y"][3] = img_centered_header["qoffset_y"]
    img_centered_header["qoffset_x"] = (img_dim[0]* img_pixdim[0]*
                        np.sign(img.metadata["header"]["srow_x"][0])*-1)/2
    img_centered_header["srow_x"][3] = img_centered_header["qoffset_x"]
    img_original_affine = img.affine
    img_centered_affine = img_original_affine[~(img_original_affine==0).all(0)]
    img_centered_affine = img_centered_affine[:,~(img_centered_affine==0).all(0)]
    img_centered_affine[[0,1,2],3] = np.array([img_centered_header["qoffset_x"],
                         img_centered_header["qoffset_y"],
                         img_centered_header["qoffset_z"]])
    img_centered = nib.Nifti1Image(img.get_data(), img_centered_affine, header = img_centered_header)
    if out_files is None:
        centered_filename = "centered_{}".format(img_name)
        img_centered.to_filename(centered_filename)
    else:
        img_centered.to_filename(out_files)
    return(os.path.abspath(centered_filename))


dcm2nii = Node(Dcm2niix(compress = "n"), "dcm2nii")
norm = Node(Normalize(template = "/mnt/mydir/TEMPLATE_FDGPET_100.nii"), "norm")
func = Node(Function(input_names=["to_be_centered_file"],
                             output_names=["centered_filename"],
                             function=center_volumes), "func")
dsk = Node(DataSink(base_directory="/mnt/mydir/", container="spm_preproc"), "dsk")

app_spm = Workflow(name = "app_spm")
app_spm.base_dir = "/mnt/mydir/"

app_spm.connect([
        (dcm2nii, func, [("converted_files", "to_be_centered_file")]),
        (func, norm, [("centered_filename", "source")]),
        (norm, dsk, [("normalized_source", "norm")]),
        ])



chdir("/mnt/mydir/")    

ss = glob("*")[0:70]

for s in ss:    
    app_spm.inputs.dcm2nii.source_dir = "/mnt/mydir/" + s
    app_spm.run()

My point is that I do not know which output of my custom function use in the connection with datasink, since in this case the output I am interested in is not something that the function return, but a side effect of the function.

Any suggestion or tip will be greatly appreciated

Ahoi hoi @fednem,

could you maybe provide a bit more information on what exactly you want to do, that
is what result/side effect of your function should be place into the datasink?
In general, it should be as easy as adjusting your function to return whatever you need and connect
this output to the datasink (only posting relevant lines below):

def my_function(input):

     a = input +3
     b = input + 4

     return a,b

my_function_node = Node(Function(input_names=['input'],
                                 output_names=['a','b'],
                                 function=my_function), 
                                 name='my_function_node')

my_workflow.connect([
            (my_function_node, datasink, [('a', 'output_a')]),
            (my_function_node, datasink, [('b', 'output_b')])])

HTH, best, Peer

Hi,
thanks for your answer. I’ll give more details below.

Basically the custom function is meant to put the origin of any input image roughly in the center of the brain in order to roughly align it with a template for further normalization.

So what the function does is writing a nifti file identical to the input nifti file except for where the origin is and returning the name of the centered nifti file as a string. The name (or to be more precise the path) is returned so that I can use it as input (source) for a Normalize node in the workflow.

So, if I got the semantic correctly, the nifti file that is written is a side effect of my function, in that it is not something that the function is returning but a modification that happen outside the function. I guess that my point is that since I cannot return the written file, I do not know how to link this written file with the dataskin node in order to collect the centered file as an intermediate step that can be useful later on.

I really hope that I have been more clear now, even if I am not so sure.

F.

Hi, anyone has any take on this ?

call pattern: function A --> calls some function/tool --> produces nifti file --> returns to function A --> function A returns

this is the pattern for all nipype interfaces. it is expected that one knows where the file is produced and what it’s name is. so in theory one should be able to return this. essentially one needs to use the information of what the other function or tool will do to determine where this file is and what its name will be.

for any deterministic tool this should be doable. if, however, the tool randomly creates a file, for example in some random temporary directory, then it gets more complicated. in such a scenario you may have to control the temporary directory location via setting some environment variable or some other approach.

also if in the future the tool changes behavior, function A would need to be updated to reflect such changes.

Hi Satra, thanks a lot for your answer. I see what you mean, and indeed I do know where the output file of the custom function is written, and I do know its name. However, I still don’t see how this can be combined with the datasink node.

When I use an spm.Normalize node, I can link the output “normalized_files” as input to a datasink node and I know that the normalized files (but not other output that I am not interested in) will be written (or moved) to the container specified in the datasink instance. I would like to be able to do the same with the output (i.e. the written centred nifti file) of my custom function.

you can name multiple outputs for a function, and then connect that output the way you normally would to a datasink node.

def my_func(in1, in2):
   return in2 - in1, in2 + in1

Node(Function(my_func, output_names=['out1', 'out2']), ...)
...
wf.connect(funcnode, 'out2', sinker, 'filesinc')

Hi Satra, thanks again for your inputs and sorry for bothering you so much. The whole point of my confusion is that a written file cannot be something a function return. I mean, I could return a Nifti object using nibabel.Nifti1Image or any other format, but AFAIK, I cannot return a file. Hence, I cannot see how your example can extend to my case. I start feeling that there’s something extremely obvious that I’m missing.

one can always return a pointer to a file (e.g., a path).

# Create a small example function
def add_two(x_input):
    import os
    filename = os.path.join(os.getcwd(), 'test.txt')
    with open(filename, 'w') as fp:
        fp.write('test')
    return x_input + 2, filename

# Import Node and Function module
from nipype import Node, Function

# Create Node
addtwo = Node(Function(input_names=["x_input"],
                       output_names=["val_output", "filename"],
                       function=add_two),
              name='add_node')

addtwo.inputs.x_input = 4
addtwo.run()

addtwo.result.outputs
   filename = /tmp/tmp9cfxtbdf/add_node/test.txt
   val_output = 6