Nipype gets stuck at gunzip node for branches with invalid input instead of crashing

Summary of what happened:

I have a BIDS-like dataset that looks like this:

test_gunzip/
└── sub-INDI1W934
    └── ses-01
        └── func
            ├── sub-INDI1W934_ses-01_task-faces_space-MNI152NLin2009cAsym_res-2_desc-preproc_bold.nii.gz
            ├── sub-INDI1W934_ses-01_task-mid_space-MNI152NLin2009cAsym_res-2_desc-preproc_bold.nii.gz
            └── sub-INDI1W934_ses-01_task-nback_space-MNI152NLin2009cAsym_res-2_desc-preproc_bold.nii.gz

I already know that in the future, there will be more sessions for each subject and that for some subjects the second session could be missing (due to drop outs). I want to implement a nipype script that runs a first-level analysis (using SPM) for each subject, each task and each session. It makes sense to write my code in a fashion that I iterate over each combination of subjects, task, and sessions knowing that this will lead to some branches failing (in this case, for all ses-02-files because right now, these files don’t exist, and I also cannot guarantee that each subject will participate in each session). The problem is that the gunzip-node (I need to unzip the .nii.gz files for SPM) never exits. I would expect that all gunzip nodes for session 02 should fail because I provide an invalid None value to them.

Command used (and if a helper script was used, a link to the helper script or the command generated):

from bids import BIDSLayout
from nipype import Workflow,Node
from nipype.interfaces.utility import Function
from nipype.algorithms.misc import Gunzip

# set task specific data directory
data_directory = '/home/johannes.wiesner/work/testing/test_gunzip/'

# get functional images as dataframe
data_df = BIDSLayout(data_directory,validate=False).to_df()

# define a node that gets data 
# (if you cannot find a file for this subject, task, session, return None)
def get_files(data_df,subject,task,session):
    
    # query dataframe for a specific subject, specific task and specific session
    subject_df = data_df.query("subject==@subject & task==@task & session==@session")
    subject_df = subject_df.set_index('suffix')
    
    if not subject_df.empty:
        func = subject_df.at['bold','path']
    else:
        func = None
        
    return func

file_getter = Node(Function(input_names=['data_df','subject','task','session'],
                            output_names='func',
                            function=get_files),
                    name='file_getter')

file_getter.inputs.data_df = data_df
file_getter.iterables = [('subject',data_df['subject'].unique()),
                         ('task',['nback','faces','mid']),
                         ('session',['01','02'])]

# unzip the images
gunzipper = Node(Gunzip(),name='gunzipper')

# run the workflow
wf = Workflow(name='debug_gunzip')
wf.base_dir = './'
wf.config['execution']['crashfile_format'] = 'txt'
wf.connect(file_getter,'func',gunzipper,'in_file')
wf_results = wf.run('MultiProc',plugin_args={'n_procs':3})

Version:

nipype version: 1.8.6

Environment (Docker, Singularity, custom installation):

Nipype in a conda environment on a Ubuntu machine (also tested inside a docker container)

Relevant log outputs (up to 20 lines):

Nipype seems to have struggle setting up the gunzipper directory for all ses-02 folders. For the ses-01 folders everything works (see the directory tree of my workflow directory below).

FileNotFoundError: /home/johannes.wiesner/work/projects/project_indicate/code/debug_gunzip/_session_02_subject_INDI1W934_task_mid/gunzipper/result_gunzipper.pklz
230628-16:56:47,637 nipype.workflow INFO:
	 [MultiProc] Running 3 tasks, and 12 jobs ready. Free memory (GB): 55.68/56.28, Free processors: 0/3.
                     Currently running:
                       * debug_gunzip.gunzipper
                       * debug_gunzip.gunzipper
                       * debug_gunzip.gunzipper

Screenshots / relevant information:

Output from the workflow directory:

debug_gunzip
├── d3.js
├── graph1.json
├── graph.json
├── index.html
├── _session_01_subject_INDI1W934_task_faces
│   ├── file_getter
│   │   ├── _0xb0d52d4c470d86121cceb72f3ee346bc.json
│   │   ├── _inputs.pklz
│   │   ├── _node.pklz
│   │   ├── _report
│   │   │   └── report.rst
│   │   └── result_file_getter.pklz
│   └── gunzipper
│       ├── _0xd14c9e25a031bbfb9fb2033aa0890176.json
│       ├── _inputs.pklz
│       ├── _node.pklz
│       ├── _report
│       │   └── report.rst
│       ├── result_gunzipper.pklz
│       └── sub-INDI1W934_ses-01_task-faces_space-MNI152NLin2009cAsym_res-2_desc-preproc_bold.nii
├── _session_01_subject_INDI1W934_task_mid
│   ├── file_getter
│   │   ├── _0x98b1fd56fee60b072781e9833e6b7443.json
│   │   ├── _inputs.pklz
│   │   ├── _node.pklz
│   │   ├── _report
│   │   │   └── report.rst
│   │   └── result_file_getter.pklz
│   └── gunzipper
│       ├── _0x6ed69c476e0a6186ff1f554797d72ff6.json
│       ├── _inputs.pklz
│       ├── _node.pklz
│       ├── _report
│       │   └── report.rst
│       ├── result_gunzipper.pklz
│       └── sub-INDI1W934_ses-01_task-mid_space-MNI152NLin2009cAsym_res-2_desc-preproc_bold.nii
├── _session_01_subject_INDI1W934_task_nback
│   ├── file_getter
│   │   ├── _0x5ad9a982e728af9d5edd169f53c065e8.json
│   │   ├── _inputs.pklz
│   │   ├── _node.pklz
│   │   ├── _report
│   │   │   └── report.rst
│   │   └── result_file_getter.pklz
│   └── gunzipper
│       ├── _0x48c76f3c4131b759a51b226c0f25a661.json
│       ├── _inputs.pklz
│       ├── _node.pklz
│       ├── _report
│       │   └── report.rst
│       ├── result_gunzipper.pklz
│       └── sub-INDI1W934_ses-01_task-nback_space-MNI152NLin2009cAsym_res-2_desc-preproc_bold.nii
├── _session_02_subject_INDI1W934_task_faces
│   └── file_getter
│       ├── _0xf30317cbe64f29788e0aaa0734d9f96c.json
│       ├── _inputs.pklz
│       ├── _node.pklz
│       ├── _report
│       │   └── report.rst
│       └── result_file_getter.pklz
├── _session_02_subject_INDI1W934_task_mid
│   └── file_getter
│       ├── _0xeee937a981c3415ad4039d1ec13e9ab5.json
│       ├── _inputs.pklz
│       ├── _node.pklz
│       ├── _report
│       │   └── report.rst
│       └── result_file_getter.pklz
└── _session_02_subject_INDI1W934_task_nback
    └── file_getter
        ├── _0x6657d86580f0594179158cccad03220f.json
        ├── _inputs.pklz
        ├── _node.pklz
        ├── _report
        │   └── report.rst
        └── result_file_getter.pklz

Okay, the only workaround so far that I’ve found is to raise an Error inside the file_getter node like this:

# define a node that gets data
def get_files(data_df,subject,task,session):
    
    from nipype.interfaces.base.support import NipypeInterfaceError
    
    # query dataframe for a specific subject, specific task and specific session
    subject_df = data_df.query("subject==@subject & task==@task & session==@session")
    subject_df = subject_df.set_index('suffix')
    
    if not subject_df.empty:
        func = subject_df.at['bold','path']
        return func
    else:
        raise NipypeInterfaceError('No files found for this subject')
        return 

file_getter = Node(Function(input_names=['data_df','subject','task','session'],
                            output_names='func',
                            function=get_files),
                    name='file_getter')

Only then, then workflow doesn’t get stuck at the gunzipper node. This will produce this output after the workflow has run:

The above exception was the direct cause of the following exception:

Traceback (most recent call last):

  File ~/.conda/envs/csp_wiesner_johannes_new/lib/python3.8/site-packages/spyder_kernels/py3compat.py:356 in compat_exec
    exec(code, globals, locals)

  File ~/work/projects/project_indicate/code/debug_gunzip.py:79
    wf_results = wf.run('MultiProc',plugin_args={'n_procs':6})

  File ~/.conda/envs/csp_wiesner_johannes_new/lib/python3.8/site-packages/nipype/pipeline/engine/workflows.py:638 in run
    runner.run(execgraph, updatehash=updatehash, config=self.config)

  File ~/.conda/envs/csp_wiesner_johannes_new/lib/python3.8/site-packages/nipype/pipeline/plugins/base.py:224 in run
    raise error from cause

RuntimeError: 3 raised. Re-raising first.

Which is a little bit misleading, because it was to be expected that the workflow coulnd’t run for all second sessions (they simply don’t exist). Would be nice if nipype had a feature to properly catch these exceptions without having to raise Errrors. But I think there will be such a feature, as @satra already mentioned in this comment? However, I still don’t really get why the gunzip nodes get stuck. Shouldn’t they check the input (aka. output from file_getter) and then fail because the input is None?