Saving files within a workflow

I have a function in my workflow that saves a file, and I’d like to save the file in each subject specific folder.
I tried something like this:

def fun(A):
    import numpy as np
    text = np.loadtxt(A)
    np.savetxt('text_save', text)

subject_list = ['01', '02']

infosource = Node(IdentityInterface(fields=["subject_id"]), name="infosource")
infosource.iterables = [("subject_id", subject_list)]

template = {'textdata': os.path.join("sub-{subject_id}", 'text.txt')}
select_files = Node(SelectFiles(template, base_directory="/data"), name='selectfiles')

wf = Workflow(name="wf", base_dir="/data")

savetext = Node(Function(input_names=["A"], output_names=["out"], function=fun), name='savetext')

wf.connect(infosource, "subject_id", select_files, "subject_id")
wf.connect(select_files, "textdata", savetext, 'A')

wf.run()

But this doesn’t save the files. What am I doing wrong?

“I’d like to save the file in each subject specific folder.”

You may want to use DataSink in that case. Think of wf.base_dir as the working directory of your whole workflow: it holds the inputs and outputs of every step, and most of the time you will only want to keep specific outputs.

“What am I doing wrong?”

By default, Nipype removes any extra files generated within a Node that are not used elsewhere in your Workflow. You can avoid this by connecting the output to a DataSink. Alternatively, you can change the configuration of your workflow. More information on workflow configuration can be found in the Nipype documentation.
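
As a rough sketch of the configuration route: Nipype's execution settings include a remove_unnecessary_outputs option, and I am assuming that is the setting meant here. The helper name keep_all_outputs is made up for illustration, and the stand-in object below only mimics the config dictionary that a Workflow carries, so the snippet runs without Nipype installed.

```python
from types import SimpleNamespace


def keep_all_outputs(wf):
    """Set the execution option that stops Nipype from pruning unused outputs.

    `wf` is expected to expose a `config` dictionary, as a Nipype Workflow does.
    Other execution settings that are already present are left untouched.
    """
    execution = dict(wf.config.get("execution", {}))
    # Assumption: this is the option that controls removal of unused outputs.
    execution["remove_unnecessary_outputs"] = "false"
    wf.config["execution"] = execution
    return wf


if __name__ == "__main__":
    # Stand-in for a real workflow, used only to show the effect on the config.
    stand_in = SimpleNamespace(config={"execution": {"hash_method": "timestamp"}})
    keep_all_outputs(stand_in)
    print(stand_in.config["execution"])
```

With a real workflow you would call keep_all_outputs(wf) before wf.run(). Connecting the output to a DataSink is still the cleaner option when you only need a few files.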


Thanks again @mgxd. I tried to use DataSink, but haven’t managed to save the file. I’m not exactly sure how to specify the output_names field and pass it to the sinker, since the function isn’t returning anything (file should be saved to disk). Could you please help me modify the script below?

def fun(A):
    import numpy as np
    import os
    text = np.loadtxt(A)
    np.savetxt(os.path.abspath("savedfile.txt"), text)
    return

subject_list = ["01", "02"]

infosource = Node(IdentityInterface(fields=["subject_id"]), name="infosource")
infosource.iterables = [("subject_id", subject_list)]

template = {'textdata': os.path.join("sub-{subject_id}", 'text.txt')}
selectfiles = Node(SelectFiles(template, base_directory="/data"), name='selectfiles')

wf = Workflow(name="wf", base_dir="/data")

readtext = Node(Function(input_names=["A"], output_names=["out"], function=fun),
                name='readtext')

sinker = Node(DataSink(), name="sinker")
sinker.inputs.base_directory = "/data/wf/sinker"

wf.connect(infosource, "subject_id", selectfiles, "subject_id")
wf.connect(selectfiles, "textdata", readtext, "A")
wf.connect(readtext, "out", sinker, "saved_file")

wf.run()

The idea is to create subject specific folders each containing the corresponding saved file, for example:
/data/wf/sinker/01/savedfile.txt
/data/wf/sinker/02/savedfile.txt

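@mri could you give this a try? The function has to return the path of the file it writes, so that the output can be connected to the DataSink. Connecting subject_id to the sinker's container input puts each file in its own subject folder.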

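# Imports needed by the snippet
import os
from nipype import Node, Workflow
from nipype.interfaces.utility import IdentityInterface, Function
from nipype.interfaces.io import SelectFiles, DataSink

def fun(A):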
    import numpy as np
    import os
    text = np.loadtxt(A)
    outfile = os.path.abspath("savedfile.txt")
    np.savetxt(outfile, text)
    return outfile

subject_list = ["01","02"]

infosource = Node(IdentityInterface(fields=["subject_id"]), name="infosource")
infosource.iterables = [("subject_id", subject_list)]

template = {"textdata": os.path.join("sub-{subject_id}", "text.txt")}
selectfiles = Node(SelectFiles(template, base_directory="/data"), name="selectfiles")

wf = Workflow(name="wf", base_dir="/data")

readtext = Node(Function(input_names=["A"], output_names=["outfile"], function=fun),
                name="readtext")

sinker = Node(DataSink(), name="sinker")
sinker.inputs.base_directory = "/data/output"

wf.connect(infosource, "subject_id", selectfiles, "subject_id")
wf.connect(selectfiles, "textdata", readtext, "A")
wf.connect(infosource, "subject_id", sinker, "container")
wf.connect(readtext, "outfile", sinker, "@savedfile")

wf.run()
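
To check where the sinker actually put the files after the run, a small helper like the one below can list everything under the DataSink base directory. This is only a sketch: list_sunk_files is a hypothetical name, not part of Nipype. Depending on the DataSink settings, there may be an extra folder level named after the iterable inside each subject folder, so listing the tree is a quick way to see the real layout.

```python
import os


def list_sunk_files(base_directory):
    """Return the files below base_directory as sorted relative paths."""
    found = []
    for root, _dirs, files in os.walk(base_directory):
        for name in files:
            full_path = os.path.join(root, name)
            found.append(os.path.relpath(full_path, base_directory))
    return sorted(found)


if __name__ == "__main__":
    # "/data/output" is the base_directory given to the sinker above.
    for path in list_sunk_files("/data/output"):
        print(path)
```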

@mgxd thank you, it’s working perfectly.