Joining over two iterators

I was hoping someone could let me know why the following toy example throws an error when running the first join node:

from nipype.pipeline.engine import Workflow, Node, JoinNode
from nipype.interfaces.utility import IdentityInterface


workflow = Workflow('test_workflow')

# Iterated over the subject IDs listed below.
subjects = Node(
    name='subjects',
    interface=IdentityInterface(
        fields=['subject_id']))

subjects.iterables = ('subject_id', ['subject1', 'subject2', 'subject3'])

# Iterated over the visit IDs listed below.
visits = Node(
    name='visits',
    interface=IdentityInterface(
        fields=['visit_id']))

visits.iterables = ('visit_id', ['visit1', 'visit2'])

# Receives one subject ID (from ``subjects``) and one visit ID (from ``visits``).
sessions = Node(
    name='sessions',
    interface=IdentityInterface(
        fields=['subject_id', 'visit_id']))

# Joins both fields over the ``subjects`` iterable.
join_subjects = JoinNode(
    name='join_subjects',
    interface=IdentityInterface(
        fields=['subject_ids', 'visit_ids']),
    joinsource='subjects',
    joinfield=['subject_ids', 'visit_ids'])

# Joins the per-subject lists over the ``visits`` iterable.
join_visits = JoinNode(
    name='join_visits',
    interface=IdentityInterface(
        fields=['subject_ids', 'visit_ids']),
    joinsource='visits',
    joinfield=['subject_ids', 'visit_ids'])

workflow.connect(subjects, 'subject_id', sessions, 'subject_id')
workflow.connect(visits, 'visit_id', sessions, 'visit_id')
workflow.connect(sessions, 'subject_id', join_subjects, 'subject_ids')
workflow.connect(sessions, 'visit_id', join_subjects, 'visit_ids')
# join_subjects is an IdentityInterface with fields ``subject_ids`` and
# ``visit_ids``; those plural names are its outputs (there is no
# ``subject_id``/``visit_id`` output on this node).
workflow.connect(join_subjects, 'subject_ids', join_visits, 'subject_ids')
workflow.connect(join_subjects, 'visit_ids', join_visits, 'visit_ids')

result = workflow.run()

This gives the error:

Traceback (most recent call last):
  File "/Users/tclose/git/ni/nipype/nipype/pipeline/plugins/linear.py", line 48, in run
    node.run(updatehash=updatehash)
  File "/Users/tclose/git/ni/nipype/nipype/pipeline/engine/nodes.py", line 472, in run
    result = self._run_interface(execute=True)
  File "/Users/tclose/git/ni/nipype/nipype/pipeline/engine/nodes.py", line 563, in _run_interface
    return self._run_command(execute)
  File "/Users/tclose/git/ni/nipype/nipype/pipeline/engine/nodes.py", line 888, in _run_command
    self._collate_join_field_inputs()
  File "/Users/tclose/git/ni/nipype/nipype/pipeline/engine/nodes.py", line 898, in _collate_join_field_inputs
    val = self._collate_input_value(field)
  File "/Users/tclose/git/ni/nipype/nipype/pipeline/engine/nodes.py", line 928, in _collate_input_value
    for idx in range(self._next_slot_index)
  File "/Users/tclose/git/ni/nipype/nipype/pipeline/engine/nodes.py", line 928, in <listcomp>
    for idx in range(self._next_slot_index)
  File "/Users/tclose/git/ni/nipype/nipype/pipeline/engine/nodes.py", line 947, in _slot_value
    field, index, e))
AttributeError: The join node test_workflow.join_subjects does not have a slot field visit_idsJ2 to hold the visit_ids value at index 1: 'DynamicTraitedSpec' object has no attribute 'visit_idsJ2'

I was expecting that combining the subjects and visits iterables in the sessions node would result in 6 nodes when the workflow is run (one for each subject/visit ID pair), and that these pairs would then be combined into a list of lists by the two join nodes.

Hi @tclose, I think what you want to do is join over `subject_ids` in `join_subjects` and over `visit_ids` in `join_visits`; is that right?

You can try `joinfield='subject_ids'` in `join_subjects` and `joinfield='visit_ids'` in `join_visits`.

I am basically attempting to get all subject/visit ID pairs combined into a single list. In the real code, the last join node chains the inner lists together.

The reason I am looking to do this is that I have a tool that normalises a group of images, so I need to provide the input images as a list. For downstream processing, however, I need to expand the outputs into separate nodes again (I basically need the inverse of a MapNode). The way I have attempted to do this is with a special “selector” node. It takes the list of normalised images, the lists of matching subject and visit IDs, and a copy of the subject and visit ID from the iterator node. It then selects the normalised image from the list that matches the given subject/visit ID pair. Something like:

from itertools import chain
from nipype.pipeline.engine import Workflow, Node, JoinNode
from nipype.interfaces.utility import IdentityInterface
import nipype.interfaces.io as nio
from nipype.interfaces.io import IOBase
from nipype.interfaces.base import (
    InputMultiPath, BaseInterfaceInputSpec, traits, TraitedSpec)


class Chain(IdentityInterface):
    """IdentityInterface whose outputs are flattened by one level.

    Each output value produced by the parent ``_list_outputs`` is treated as
    an iterable of iterables and replaced by a single list containing all
    of the inner items, in order.
    """

    def _list_outputs(self):
        nested = super(Chain, self)._list_outputs()
        # Concatenate the inner sequences of every field into one flat list.
        return {
            field: list(chain.from_iterable(value))
            for field, value in nested.items()
        }


class SelectSessionInputSpec(BaseInterfaceInputSpec):
    """Input spec for the session selector.

    ``inlist``, ``subject_ids`` and ``visit_ids`` are read as parallel lists
    (entry i of each belongs together); ``subject_id`` and ``visit_id`` name
    the single session whose item should be picked.
    """

    # Candidate items, one per session.
    inlist = InputMultiPath(
        traits.Any, mandatory=True, desc='List of items to select from')
    # Subject ID of each entry in ``inlist``, in the same order.
    subject_ids = traits.List(traits.Str, mandatory=True,
                              desc=('List of subject IDs corresponding to the '
                                    'provided items'))
    # Visit ID of each entry in ``inlist``, in the same order.
    visit_ids = traits.List(traits.Str, mandatory=True,
                            desc=('List of visit IDs corresponding to the '
                                  'provided items'))
    # The subject/visit pair to look up.
    subject_id = traits.Str(mandatory=True, desc='Subject ID')
    visit_id = traits.Str(mandatory=True, desc='Visit ID')


class SelectSessionOutputSpec(TraitedSpec):
    """Output spec for the session selector."""
    # The entry of ``inlist`` that matched the requested subject/visit pair.
    out = traits.Any(desc='selected value')


class SelectSession(IOBase):
    """Basic interface class to select session from a list.

    ``inlist``, ``subject_ids`` and ``visit_ids`` are parallel lists: item i
    of ``inlist`` belongs to the session ``(subject_ids[i], visit_ids[i])``.
    The output ``out`` is the item whose session equals
    ``(subject_id, visit_id)``; if several match, the first one is used.
    """

    input_spec = SelectSessionInputSpec
    output_spec = SelectSessionOutputSpec

    def _list_outputs(self):
        """Return the outputs dict with ``out`` set to the matching item.

        Raises:
            ValueError: if ``subject_ids`` or ``visit_ids`` differ in length
                from ``inlist``, or if the requested subject/visit pair is
                not present in them.
        """
        outputs = self._outputs().get()
        inlist = self.inputs.inlist
        subject_ids = self.inputs.subject_ids
        visit_ids = self.inputs.visit_ids
        # Explicit checks rather than ``assert`` so validation still runs
        # under ``python -O``.
        if len(subject_ids) != len(inlist) or len(visit_ids) != len(inlist):
            raise ValueError(
                "Mismatched input lengths: inlist has {} items, subject_ids "
                "has {}, visit_ids has {}".format(
                    len(inlist), len(subject_ids), len(visit_ids)))
        session_ids = list(zip(subject_ids, visit_ids))
        target = (self.inputs.subject_id, self.inputs.visit_id)
        try:
            index = session_ids.index(target)
        except ValueError:
            raise ValueError(
                "Session {!r} not found among {!r}".format(
                    target, session_ids))
        outputs['out'] = inlist[index]
        return outputs


workflow = Workflow('test_workflow')

# Iterated over the subject IDs listed below.
subjects = Node(
    name='subjects',
    interface=IdentityInterface(
        fields=['subject_id']))

# Third entry was the typo 'visit2'; it is a subject ID.
subjects.iterables = ('subject_id', ['subject1', 'subject2', 'subject3'])

# Iterated over the visit IDs listed below.
visits = Node(
    name='visits',
    interface=IdentityInterface(
        fields=['visit_id']))

visits.iterables = ('visit_id', ['visit1', 'visit2'])

# Holds one subject/visit pair (plus an ``image`` field not connected in
# the visible part of this snippet).
sessions = Node(
    name='sessions',
    interface=IdentityInterface(
        fields=['image', 'subject_id', 'visit_id']))

# Fetches the image for the current subject/visit pair.
# NOTE(review): with subject_id='subject1' this template yields
# 'subjectsubject1/visit_visit1/...'; confirm it matches the dataset layout.
datasource = Node(
    nio.DataGrabber(
        base_directory='/my-dataset',
        template='subject%s/visit_%s/image.nii.gz',
        infields=['subject_id', 'visit_id'],
        sort_filelist=True),
    name='datasource')

# NOTE(review): ``Normaliser`` is not defined in this snippet; presumably it
# is the user's own group-normalisation interface.
normaliser = Node(
    Normaliser(),
    name='normaliser')


# Joins the session IDs over the ``subjects`` iterable.
join_subjects = JoinNode(
    name='join_subjects',
    interface=IdentityInterface(
        fields=['subject_ids', 'visit_ids']),
    joinsource='subjects',
    joinfield=['subject_ids', 'visit_ids'])

# Joins over the ``visits`` iterable; ``Chain`` flattens the joined lists.
join_visits = JoinNode(
    name='join_visits',
    interface=Chain(
        fields=['subject_ids', 'visit_ids']),
    joinsource='visits',
    joinfield=['subject_ids', 'visit_ids'])

# Picks the item for the current session out of a joined list.
selector = Node(
    SelectSession(),
    name='selector')
...

Session-specific workflow to follow

...

workflow.connect(subjects, 'subject_id', sessions, 'subject_id')
workflow.connect(visits, 'visit_id', sessions, 'visit_id')
workflow.connect(sessions, 'subject_id', datasource, 'subject_id')
workflow.connect(sessions, 'visit_id', datasource, 'visit_id')
workflow.connect(datasource, 'outfiles', normaliser, 'infiles')
workflow.connect(sessions, 'subject_id', join_subjects, 'subject_ids')
workflow.connect(sessions, 'visit_id', join_subjects, 'visit_ids')
# join_subjects is an IdentityInterface with fields ``subject_ids`` and
# ``visit_ids``; those plural names are its outputs.
workflow.connect(join_subjects, 'subject_ids', join_visits, 'subject_ids')
workflow.connect(join_subjects, 'visit_ids', join_visits, 'visit_ids')
# NOTE(review): ``normaliser`` is fed from the per-session ``datasource``,
# so ``inlist`` comes from a single session rather than a joined group of
# images; presumably the images should be joined before normalisation -
# confirm against the intended design.
workflow.connect(normaliser, 'outfiles', selector, 'inlist')
workflow.connect(join_visits, 'subject_ids', selector, 'subject_ids')
workflow.connect(join_visits, 'visit_ids', selector, 'visit_ids')
workflow.connect(sessions, 'subject_id', selector, 'subject_id')
workflow.connect(sessions, 'visit_id', selector, 'visit_id')


result = workflow.run()

If you want a list of all the subject/visit pairs, you can try this:

# This snippet uses ``Function`` in addition to the names imported earlier
# in the thread, so import everything it needs here.
from nipype.pipeline.engine import Workflow, Node, JoinNode
from nipype.interfaces.utility import IdentityInterface, Function

workflow = Workflow('test_workflow')
# Was ``wf_sub.base_dir`` - ``wf_sub`` is undefined; the workflow is
# ``workflow``.
workflow.base_dir = "tmp"

# Iterated over the subject IDs listed below.
subjects = Node(
    name='subjects',
    interface=IdentityInterface(
        fields=['subject_id']))

subjects.iterables = ('subject_id', ['subject1', 'subject2', 'subject3'])

# Iterated over the visit IDs listed below.
visits = Node(
    name='visits',
    interface=IdentityInterface(
        fields=['visit_id']))

visits.iterables = ('visit_id', ['visit1', 'visit2'])

# Combine one subject ID and one visit ID into a single pair.
def merge(subject_id, visit_id):
    """Return ``(subject_id, visit_id)`` as a 2-tuple."""
    pair = subject_id, visit_id
    return pair

# Pairs each subject ID with each visit ID via ``merge``.
# A stray ``name='sessions'`` keyword was previously passed to ``Function``
# itself; it was forwarded as an extra interface input rather than naming
# anything, so it has been removed. The node's name is set on ``Node``.
sessions = Node(Function(input_names=["subject_id", "visit_id"],
                         output_names=["pair"],
                         function=merge),
                name="session")

# JoinNode helper: turn the joined ``pair`` values into a plain list.
def create_list(pair):
    """Return a new list containing the items of *pair*, in order."""
    return [*pair]

# Collects the ``pair`` outputs across the ``subjects`` iterable.
join_list = JoinNode(
    Function(
        input_names=['pair'],
        output_names=['pairs_list'],
        function=create_list,
    ),
    name='list',
    joinsource='subjects',
    joinfield=['pair'],
)


# JoinNode helper: flatten the lists produced by the previous join node.
def concatenate(pairs_list):
    """Return one list holding every item of every sub-sequence, in order."""
    return [item for sublist in pairs_list for item in sublist]

# Collects the per-visit lists across the ``visits`` iterable and flattens
# them into a single list of pairs.
join_concate = JoinNode(
    Function(
        input_names=['pairs_list'],
        output_names=['all_pairs'],
        function=concatenate,
    ),
    name='con',
    joinsource='visits',
    joinfield=['pairs_list'],
)

# Wire the graph using the list form of ``connect``:
# (source, destination, [(source_output, destination_input), ...]).
workflow.connect([
    (subjects, sessions, [('subject_id', 'subject_id')]),
    (visits, sessions, [('visit_id', 'visit_id')]),
    (sessions, join_list, [('pair', 'pair')]),
    (join_list, join_concate, [('pairs_list', 'pairs_list')]),
])

result = workflow.run()

Let me know if this solves your problem!