Workflow execution time with simple intermediate nodes

Hi nipype experts,

Sorry in advance if this question is too trivial. I am currently working on building a nipype workflow that performs small steps for manipulating neuroimaging related data. Each step is encoded in its own defined function. What I’ve done is to embed each of these functions as nodes using the Function utility interface. This works OK, but I am worried about the execution time of the workflow.

For instance, let’s say that my workflow consists of the following steps:

read_input --> operation1 --> operation2 --> export_data

Here is a simple example doing this:

import numpy as np
import nipype.pipeline.engine as pe
import nipype.interfaces.utility as util

wf = pe.Workflow(name="My-workflow")

# generate some data
data=np.random.normal(size=(200000, 10))

# input node of the workflow
node_input = pe.Node(util.IdentityInterface(fields = ['input_data']),
                     name = "inputNode")

def square(input_data):
    square_data = input_data**2
    return square_data

# node performing the square of each element of the data
node_square = pe.Node(util.Function(input_names = ['input_data'],
                                    output_names = ['square_data'],
                                    function = square),
                      name = "square")

def sq_root(input_data):
    sqroot_data = np.sqrt(input_data)
    return sqroot_data

# node performing the square root of each element of the data
node_sqroot = pe.Node(util.Function(input_names = ['input_data'],
                                    output_names = ['sqroot_data'],
                                    function = sq_root,
                                    imports = ["import numpy as np"]),
                      name = "squareRoot")

# output node of the workflow
output_node = pe.Node(util.IdentityInterface(fields = ['output_data']),
                      name = "outputNode")

wf.connect([(node_input, node_square, [('input_data','input_data')])])
wf.connect([(node_square, node_sqroot, [('square_data','input_data')])])
wf.connect([(node_sqroot, output_node, [('sqroot_data','output_data')])])

wf.inputs.inputNode.input_data = data

wf.run()

The problem is that running this workflow is much slower than using the functions on the data directly (here only about 3 secs slower, but imagine using more complex operations). I get this and it makes sense, because each node writes to and reads from the hard disk, whereas if you use the functions directly on the data, everything takes place in RAM and is therefore much faster.
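To put a rough number on that disk overhead outside of nipype, here is a minimal sketch that times the two functions called directly against the same calls with a pickle round trip through a temporary directory after each step. The helper `roundtrip_via_disk` is hypothetical, and plain `pickle` is only a stand-in: nipype's own caching does more work per node, so this is not a model of the engine itself.

```python
import os
import pickle
import tempfile
import time

import numpy as np


def square(input_data):
    return input_data ** 2


def sq_root(input_data):
    return np.sqrt(input_data)


def roundtrip_via_disk(array, directory):
    """Write an array to disk and read it back (hypothetical stand-in for node I/O)."""
    path = os.path.join(directory, "intermediate.pkl")
    with open(path, "wb") as fh:
        pickle.dump(array, fh)
    with open(path, "rb") as fh:
        return pickle.load(fh)


data = np.random.normal(size=(200000, 10))

# Baseline: both steps chained in memory.
start = time.perf_counter()
in_memory = sq_root(square(data))
t_memory = time.perf_counter() - start

# Same steps, but every intermediate result goes through the file system.
with tempfile.TemporaryDirectory() as tmp:
    start = time.perf_counter()
    squared = roundtrip_via_disk(square(data), tmp)
    via_disk = roundtrip_via_disk(sq_root(squared), tmp)
    t_disk = time.perf_counter() - start

print(f"in memory: {t_memory:.4f} s, with disk round trips: {t_disk:.4f} s")
```

The absolute timings depend on the machine and the file system, so no expected output is given here.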

I am just wondering whether there is a way of speeding things up in this kind of situation in nipype. I guess I could always gather all the small operations into one big function and use this as the only intermediate node in the workflow, or even create an interface incorporating all the intermediate operations. The problem with doing this is that you lose traceability of what’s going on in the middle.

Have you ever encountered a similar problem? Any thoughts/recommendations?

Thanks a bunch!

Nipype is definitely not an efficient workflow engine when each task is a simple operation. In such cases calling these operations in a single function node is a better approach.
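As a minimal sketch of the single-function-node approach, the two steps from the question can be chained in one function that also returns the intermediate result, so some of the traceability is kept as an extra node output. The names `square_then_sqroot`, `build_workflow` and `combined_workflow` are made up for this illustration. The nipype imports sit inside `build_workflow` only so that the plain function can be tried without nipype installed.

```python
import numpy as np


def square_then_sqroot(input_data):
    """Run both small steps in memory and return each result."""
    square_data = input_data ** 2
    sqroot_data = np.sqrt(square_data)
    return square_data, sqroot_data


def build_workflow(data, base_dir=None):
    """Wrap the combined function in a single Function node (illustrative helper)."""
    import nipype.pipeline.engine as pe
    import nipype.interfaces.utility as util

    wf = pe.Workflow(name="combined_workflow", base_dir=base_dir)

    node_input = pe.Node(util.IdentityInterface(fields=["input_data"]),
                         name="inputNode")

    # One node replaces the separate "square" and "squareRoot" nodes.
    node_combined = pe.Node(
        util.Function(input_names=["input_data"],
                      output_names=["square_data", "sqroot_data"],
                      function=square_then_sqroot,
                      imports=["import numpy as np"]),
        name="squareThenSqroot")

    # The intermediate result is exposed alongside the final one.
    output_node = pe.Node(
        util.IdentityInterface(fields=["square_data", "output_data"]),
        name="outputNode")

    wf.connect([
        (node_input, node_combined, [("input_data", "input_data")]),
        (node_combined, output_node, [("square_data", "square_data"),
                                      ("sqroot_data", "output_data")]),
    ])
    wf.inputs.inputNode.input_data = data
    return wf
```

With nipype installed, `build_workflow(data).run()` would run it the same way as the original example. The trade-off is that the intermediate array is still written out as a node output, just once instead of once per step.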

One possibility is to use /dev/shm as your working directory if you have enough RAM to support this. Obviously this can only be done for some subset of use cases, but it would at least alleviate the file I/O part of nipype.
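A minimal sketch of the /dev/shm idea, assuming a Linux machine where /dev/shm is a RAM-backed tmpfs: the hypothetical helper `pick_working_dir` creates a fresh scratch directory there and falls back to the normal temp directory when /dev/shm is missing or not writable. The result is then meant to be assigned to the workflow's `base_dir`.

```python
import os
import tempfile


def pick_working_dir(prefix="nipype_work_", shm_root="/dev/shm"):
    """Return a fresh scratch directory, RAM-backed when shm_root is usable."""
    if os.path.isdir(shm_root) and os.access(shm_root, os.W_OK):
        root = shm_root
    else:
        # Fallback for systems without a usable /dev/shm.
        root = tempfile.gettempdir()
    return tempfile.mkdtemp(prefix=prefix, dir=root)


work_dir = pick_working_dir()
print("working directory:", work_dir)

# With the workflow from the question, this would be used as:
#   wf.base_dir = work_dir
#   wf.run()
```

Everything written under /dev/shm counts against available memory, so the directory should be removed (for example with `shutil.rmtree(work_dir)`) once the outputs have been copied somewhere persistent.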

We have considered chaining operations in memory, but then we would need to have different models for different schedulers. Also, from a nipype perspective, until we “understood” the computations better, it would be really hard to optimize while tracking things like provenance. At present there is no automation to optimize this, although a future version of the engine may have some support for it. Not there yet though.