DerivativesDataSink corrupting files and bus error

Hi,

I’m using nipype 1.6.0 to run a short workflow that includes DerivativesDataSink (ds_ nodes) from niworkflows.interfaces.bids (niflow-nipype1-workflows 0.0.4). I’m having trouble getting my outputs written as uncompressed NIfTI (.nii) instead of gzipped (.nii.gz).

If I work with the original data types, my final output is readable but it’s a .nii.gz file. So I tried to i) set the ds_ node to compress=False; ii) have the topup node output a .nii file; iii) add a gunzip node between topup and ds_. However, with any of these I get two general issues: 1) running the ds_ nodes corrupts both their input and output (they become 0-byte files); 2) if I run the workflow with the SLURMGraph plugin I get an AttributeError: Can't get attribute 'DerivativesDataSink' in a crashfile, and if I run it with MultiProc I get a Fatal Python error: Bus error.

Code and errors below! Any suggestions are welcome!

CODE

####################  PLUGINS   ##############################
#MULTIPROC
plugin_settings_MultiProc = {'plugin': 'MultiProc',
		'plugin_args': {'raise_insufficient': False,
		'maxtasksperchild': 1}}
plugin_settings_MultiProc['plugin_args']['n_procs'] = avail_cpu #cpu_count()

#SLURM
plugin_settings_SLURMGraph = {'plugin': 'SLURMGraph',
		'plugin_args': {'dont_resubmit_completed_jobs': True,
			'sbatch_args': '--mem=2G'}}

####################  RUN TOPUP  ##############################
def fmaps_topup_fsl_wf(indir, matched_files, outdir, name='wf_topup_fmaps'):
	workflow = Workflow(name=name)

	###############  DEFINE NODES ################    
	# datasource
	datasource = Node(Function(
		function=_dict_ds,
		input_names = ['in_dict', 'ID'],
		output_names=['IntendedForFile1','IntendedForFile2', 'fmaps']
	),name='datasource')
	datasource.inputs.in_dict = matched_files
	datasource.iterables = ('ID', sorted(matched_files.keys(),reverse=True))

	# fslmerge
	merge_epi = Node(interface=fsl.Merge(dimension='t'),name="merge_epi")

	# fsl topup
	est_topup = Node(interface=TOPUP(
		#output_type = 'NIFTI',
		fwhm= 6),name="est_topup")

	# gunzip
	gunzip = Node(Gunzip(), name='gunzip')  
	#gunzip = MapNode(Gunzip(), name='gunzip', iterfield=['in_file'])

	# Datasink
	ds_topup_fout1 = Node(DerivativesDataSink(
		base_directory=str(outdir), 
		out_path_base = 'topup_fmaps',
		#keep_dtype=False, 
		#data_dtype='NIFTI',
		#compress=False,
		suffix='topup_fmap1', 
	), name='ds_topup_fout1', run_without_submitting=True) #, iterfield=["in_file", "source_file"]

	################  CONNECT NODES ################
	workflow.connect([   
		(datasource, merge_epi, [('fmaps', 'in_files')]),
		(merge_epi, est_topup, [('merged_file', 'in_file')]),
		(est_topup, gunzip, [('out_field', 'in_file')]),

		# datasink
		(gunzip, ds_topup_fout1, [('out_file', 'in_file')]),
		(datasource, ds_topup_fout1, [('IntendedForFile1', 'source_file')]),
	])       
	return workflow

################  CREATE WORKFLOW ################
wf_topup_fmaps = fmaps_topup_fsl_wf(indir, matched_files, outdir)
wf_topup_fmaps.base_dir = working_files
wf_topup_fmaps.inputs.est_topup.encoding_file = acq_file

################  RUN  ################
wf_topup_fmaps.run(**plugin_settings_MultiProc)
# wf_topup_fmaps.run(**plugin_settings_SLURMGraph)
# wf_topup_fmaps.run()

CRASH FILE from SLURMGraph

(postfMRIprep) [f.morfini@login-00 batch]$ nipypecli crash crashdump_20210512_080414_wf_topup_fmaps_ds_topup_fout1.a0.pklz 


File: /work/swglab/data/BANDA_U01/working_files/wf_topup_fmaps/batch/crashdump_20210512_080414_wf_topup_fmaps_ds_topup_fout1.a0.pklz
Traceback: 
Traceback (most recent call last):
  File "/work/swglab/data/BANDA_U01/working_files/wf_topup_fmaps/batch/pyscript_20210512_080414_wf_topup_fmaps_ds_topup_fout1.a0.py", line 37, in <module>
	info = loadpkl(pklfile)
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/nipype/utils/filemanip.py", line 672, in loadpkl
	raise e
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/nipype/utils/filemanip.py", line 649, in loadpkl
	unpkl = pickle.loads(pkl_contents)
AttributeError: Can't get attribute 'DerivativesDataSink' on <module '__main__' from '/work/swglab/data/BANDA_U01/working_files/wf_topup_fmaps/batch/pyscript_20210512_080414_wf_topup_fmaps_ds_topup_fout1.a0.py'>


(postfMRIprep) [f.morfini@login-00 batch]$ 

BUS ERROR from MultiProc

(postfMRIprep) [f.morfini@d1008 script]$ Fatal Python error: Bus error

Thread 0x00002ae1c4ef8700 (most recent call first):
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/ipykernel/parentpoller.py", line 39 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/threading.py", line 916 in _bootstrap_inner
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/threading.py", line 884 in _bootstrap

Thread 0x00002ae1c4cb7700 (most recent call first):
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/threading.py", line 295 in wait
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/threading.py", line 551 in wait
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/IPython/core/history.py", line 829 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/IPython/core/history.py", line 58 in needs_sqlite
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/decorator.py", line 231 in fun
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/threading.py", line 916 in _bootstrap_inner
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/threading.py", line 884 in _bootstrap

Thread 0x00002ae1c4674700 (most recent call first):
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/ipykernel/heartbeat.py", line 100 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/threading.py", line 916 in _bootstrap_inner
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/threading.py", line 884 in _bootstrap

Thread 0x00002ae1c4473700 (most recent call first):
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/selectors.py", line 445 in select
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/asyncio/base_events.py", line 1426 in _run_once
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/asyncio/base_events.py", line 442 in run_forever
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 199 in start
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/ipykernel/iostream.py", line 78 in _thread_main
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/threading.py", line 864 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/threading.py", line 916 in _bootstrap_inner
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/threading.py", line 884 in _bootstrap

Current thread 0x00002ae1ba3729c0 (most recent call first):
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/nibabel/volumeutils.py", line 827 in _write_data
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/nibabel/volumeutils.py", line 655 in array_to_file
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/nibabel/arraywriters.py", line 561 in to_fileobj
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/nibabel/analyze.py", line 1044 in to_file_map
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/nibabel/filebasedimages.py", line 333 in to_filename
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/niworkflows/interfaces/bids.py", line 431 in _run_interface
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/nipype/interfaces/base/core.py", line 434 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/nipype/pipeline/engine/nodes.py", line 741 in _run_command
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/nipype/pipeline/engine/nodes.py", line 635 in _run_interface
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/nipype/pipeline/engine/nodes.py", line 516 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/nipype/pipeline/plugins/linear.py", line 46 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/nipype/pipeline/engine/workflows.py", line 632 in run
  File "<ipython-input-13-af69e76f51de>", line 1 in <module>
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 3343 in run_code
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 3263 in run_ast_nodes
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 3072 in run_cell_async
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/IPython/core/async_helpers.py", line 68 in _pseudo_sync_runner
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2895 in _run_cell
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/IPython/core/interactiveshell.py", line 2867 in run_cell
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/ipykernel/zmqshell.py", line 536 in run_cell
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/ipykernel/ipkernel.py", line 306 in do_execute
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/contextvars/__init__.py", line 38 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/tornado/gen.py", line 234 in wrapper
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/ipykernel/kernelbase.py", line 545 in execute_request
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/contextvars/__init__.py", line 38 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/tornado/gen.py", line 234 in wrapper
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/ipykernel/kernelbase.py", line 268 in dispatch_shell
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/contextvars/__init__.py", line 38 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/tornado/gen.py", line 234 in wrapper
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/ipykernel/kernelbase.py", line 365 in process_one
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/tornado/gen.py", line 775 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/contextvars/__init__.py", line 38 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/tornado/gen.py", line 741 in __init__
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/tornado/gen.py", line 250 in wrapper
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/ipykernel/kernelbase.py", line 381 in dispatch_queue
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/tornado/gen.py", line 775 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/contextvars/__init__.py", line 38 in run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/tornado/gen.py", line 814 in inner
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/tornado/ioloop.py", line 741 in _run_callback
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/tornado/ioloop.py", line 688 in <lambda>
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/asyncio/events.py", line 145 in _run
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/asyncio/base_events.py", line 1462 in _run_once
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/asyncio/base_events.py", line 442 in run_forever
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/tornado/platform/asyncio.py", line 199 in start
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/ipykernel/kernelapp.py", line 612 in start
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/traitlets/config/application.py", line 664 in launch_instance
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/site-packages/ipykernel_launcher.py", line 16 in <module>
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/runpy.py", line 85 in _run_code
  File "/home/f.morfini/softwares/miniconda3/envs/postfMRIprep/lib/python3.6/runpy.py", line 193 in _run_module_as_main
[I 2021-05-12 10:14:17.598 ServerApp] AsyncIOLoopKernelRestarter: restarting kernel (1/5), keep random ports
kernel e71e3367-0ddc-4f1a-8d45-11b05466c383 restarted
kernel e71e3367-0ddc-4f1a-8d45-11b05466c383 restarted
kernel e71e3367-0ddc-4f1a-8d45-11b05466c383 restarted
[I 2021-05-12 10:14:17.660 ServerApp] Starting buffering for e71e3367-0ddc-4f1a-8d45-11b05466c383:ee06d2d6-9054-495c-a701-e54969c38c1d

Thanks!
Francesca

Have you tried running with Linear? If I had to guess, the situation is that you have multiple nodes writing to the same output location, and hitting a race condition when one truncates the file while another is still writing. If you run with Linear (and it completes), then you’ll be able to look at the output filenames and check whether there’s a collision.
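
A rough way to look for such a collision before running anything is to check whether two iterations share the same source file name, on the assumption that the output name is derived from it. In the sketch below, the layout of `matched_files` (each ID mapping to a dict with an `IntendedForFile1` path), the example paths, and the helper name `find_source_collisions` are all assumptions for illustration; none of them come from nipype or niworkflows.

```python
from collections import defaultdict
from pathlib import Path


def find_source_collisions(matched_files, key="IntendedForFile1"):
    """Return source file names that are shared by more than one iteration ID.

    Assumes (hypothetically) that matched_files maps each ID to a dict holding
    the source file path under `key`.
    """
    by_source = defaultdict(list)
    for run_id, entry in matched_files.items():
        # Only the file name matters here, not the directory it lives in.
        by_source[Path(entry[key]).name].append(run_id)
    return {name: sorted(ids) for name, ids in by_source.items() if len(ids) > 1}


# Made-up example: IDs "a" and "c" point at files with the same name.
example = {
    "a": {"IntendedForFile1": "/data/sub-01_task-rest_run-01_bold.nii.gz"},
    "b": {"IntendedForFile1": "/data/sub-01_task-rest_run-03_bold.nii.gz"},
    "c": {"IntendedForFile1": "/other/sub-01_task-rest_run-01_bold.nii.gz"},
}
print(find_source_collisions(example))
# {'sub-01_task-rest_run-01_bold.nii.gz': ['a', 'c']}
```

An empty result only says that the source names differ; it does not rule out other ways for two nodes to end up writing to the same path.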

Thanks! I tried using Linear, passing files to datasource so that only one iteration runs, but I still get the bus error: the output is unreadable, and the input (i.e. the merged_file of the gunzip node) becomes unreadable after ds_topup has run.
But to clarify, I do get the output files labelled as I would expect, although empty:
sub-BANDA001_task-rest_run-01_topup_fmap1.nii

and if I were to re-run the workflow I would get a second output
sub-BANDA001_task-rest_run-03_topup_fmap1.nii
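
To see exactly which files end up empty after a run, in both the working directory and the output directory, a small standard-library scan may help. This is only a sketch: the helper name `find_empty_files` and the `*.nii*` pattern are assumptions, and the paths in the usage comment are placeholders.

```python
from pathlib import Path


def find_empty_files(*roots, pattern="*.nii*"):
    """Recursively list files matching `pattern` whose size is 0 bytes."""
    empty = []
    for root in roots:
        for path in Path(root).rglob(pattern):
            # is_file() and stat() follow symlinks, so a link to an emptied
            # file in the working directory is reported as well.
            if path.is_file() and path.stat().st_size == 0:
                empty.append(path)
    return sorted(empty)


# Usage with placeholder paths:
# for path in find_empty_files("/path/to/working_files", "/path/to/outdir"):
#     print(path)
```

Running it before and after the ds_ node executes would show whether the input is already empty beforehand or only becomes empty once the node writes its output.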

Because the fmaps are IntendedFor different functional runs, I’m passing a unique source_file to ds_topup, so at least the run entity should always change. Could this be creating collisions?

Ah, well, that’s good news then, because the issue should be reproducible. Could you post the code, a description of your environment (Python version, libraries installed, etc.) and some example data (from OpenNeuro would be ideal) that causes it to fail, as an issue on the nipreps/niworkflows GitHub repository?
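
For the environment description, something along these lines could collect the basics. It is a minimal sketch: the function name `describe_environment` and the package list are assumptions, and `importlib.metadata` is only in the standard library from Python 3.8, so on 3.6 the `importlib_metadata` backport would have to be installed for the fallback import to work.

```python
import platform

try:
    from importlib import metadata  # standard library on Python 3.8+
except ImportError:
    import importlib_metadata as metadata  # backport for older interpreters


def describe_environment(packages):
    """Return the Python version, platform string and installed package versions."""
    report = {
        "python": platform.python_version(),
        "platform": platform.platform(),
    }
    for name in packages:
        try:
            report[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            report[name] = "not installed"
    return report


# The package names below are a guess at what is relevant for this report.
for key, value in describe_environment(["nipype", "niworkflows", "nibabel", "numpy"]).items():
    print(f"{key}: {value}")
```

The output of `conda list` or `pip freeze` from the same environment gives a fuller picture if more detail is needed.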

Thanks so much @effigies! I’ve opened issue #639 on GitHub about it!