Paralleling when using Nilearn / Scikit-learn?

Hi every one,

I am using Nilearn for different purposes: tSNR and BOLD HFR exploration, GLM, functional connectivity, etc. In the case of MVPA, I sometimes call Nilearn directly, or use Scikitlearn to access additional information such as confusion matrices or leave X runs out.
In both case I am running them through Jupyter Notebooks on VS Code on an Ubuntu 24 workstation.

I though my GLM based MVPA (L2O) was extremely long. Looking at the “Activity Monitor”, I noticed that only one CPU core was used at a time. So I have tried to implement joblib with n_job = -1 for some parelleling, but I am still using only 1 core. What can I do to force multicore processing?

Thank you.
Matthieu

Thank you for posting.
Can you clarify to each function you passed n_jobs=-1 ?
Also, note you can use coarse-grained parallelisme by calling joblib Parallel in your script.
Best,
Bertrand

Dear @bthirion , thank you very much for your answer.

I have tried to pass n_job=-1 in both my fit_and_evaluate_decoder and main functions, with no success.

(...) 

def fit_and_evaluate_decoder(z_maps, mask_img, conditions_label, run_label,
                             screening_percentile=screening_percentile, C=C, n_leave_out=n_leave_out):
    """Fit and evaluate a decoder with feature screening and linear SVM.

    Parameters
    ----------
    z_maps : list or array
        List of z-maps per condition/run.
    mask_img : Niimg-like
        Brain mask.
    conditions_label : array
        Condition labels (y).
    run_label : array
        Run IDs (used as groups in CV).
    screening_percentile : int, default=5
        Percentile of most informative voxels selected.
    C : float, default=1.0
        Regularization strength for SVC (smaller = stronger regularization).
    n_leave_out : int, default=1
        Number of groups to leave out during cross-validation.

    Returns
    -------
    mean_accuracy : float
    chance_level : float
    accuracy_scores : list of float
    cm : ndarray
        Confusion matrix.
    model : sklearn.pipeline.Pipeline
        Trained pipeline.
    """
    print(f"\nFitting scikit-learn decoder with L{n_leave_out}O, screening_percentile={screening_percentile}, C={C}...")

    # Convert to numpy array if needed
    z_maps = np.array(z_maps)
    conditions_label = np.array(conditions_label)
    run_label = np.array(run_label)

    # Flatten for masking
    z_maps_reshaped = z_maps.reshape(-1, z_maps.shape[-1])
    masked_data = apply_mask(z_maps_reshaped, mask_img)

    # Pipeline: Scaling -> Feature Selection -> Classification
    model = make_pipeline(
        StandardScaler(),
        SelectPercentile(f_classif, percentile=screening_percentile),
        SVC(kernel='linear', C=C)
    )

    # Cross-validation using Leave-P-Groups-Out
    logo = LeavePGroupsOut(n_groups=n_leave_out)

    all_y_true, all_y_pred, accuracy_scores = [], [], []

    for train_idx, test_idx in logo.split(masked_data, conditions_label, groups=run_label):
        model.fit(masked_data[train_idx], conditions_label[train_idx])
        y_pred = model.predict(masked_data[test_idx])
        all_y_pred.extend(y_pred)
        all_y_true.extend(conditions_label[test_idx])
        accuracy_scores.append(accuracy_score(conditions_label[test_idx], y_pred))

    # Accuracy stats
    mean_accuracy = np.mean(accuracy_scores)
    std_accuracy = np.std(accuracy_scores)
    chance_level = 1.0 / len(np.unique(conditions_label))
    cm = confusion_matrix(all_y_true, all_y_pred)

    print(f"    - Classification mean accuracy (CV):  {mean_accuracy:.4f} ± {std_accuracy:.4f}")
    print(f"    - Chance level:                     {chance_level:.4f}")
    print(f"    - Confusion Matrix (from CV):\n{cm}")

    return mean_accuracy, chance_level, accuracy_scores, cm, model

(...)
def main():
    visual_events_all = {}
    conditions_all = {}
    bold_images_all = {}
    contrasts_all = {}
    decoding_results = {}
    weight_maps_all = {}

    roi_masks = {}
    for roi_name in roi_names:
        try:
            mask_img = find_nii_or_niigz(os.path.join(roi_dir, roi_name))
            roi_masks[roi_name] = mask_img
        except FileNotFoundError as e:
            print(f"ROI not found: {roi_name}\n  {e}")

    for subject_id in subject_list:
        print(f"\n{'-'*50}")
        bids_func_dir = os.path.join(bids_dir, f"sub-{subject_id}", "func")
        fmriprep_func_dir = os.path.join(fmriprep_dir, f"sub-{subject_id}", "func")

        if not os.path.isdir(bids_func_dir):
            print(f"Skipping sub-{subject_id}, directory not found: {bids_func_dir}")
            continue

        visual_events, conditions = load_visual_events_by_run(bids_func_dir, subject_id, task_name)
        bold_images = load_bold_by_run(fmriprep_func_dir, subject_id, task_name)

        visual_events_all[subject_id] = visual_events
        conditions_all[subject_id] = conditions
        bold_images_all[subject_id] = bold_images
        contrasts_all[subject_id] = {}

        for roi_name, mask_img in roi_masks.items():
            print(f"\nComputing contrasts for sub-{subject_id} in ROI={roi_name}...")
            try:
                # GLM
                z_maps, cond_labels, run_labels = compute_contrasts_by_run(
                    subject_id, roi_name, mask_img, bold_images, visual_events, unique_runs
                )

                contrasts_all[subject_id][roi_name] = {
                    'z_maps': z_maps,
                    'condition_labels': cond_labels,
                    'run_labels': run_labels,
                }

                if debug:
                    print(f"--DEBUG Conditions: {cond_labels}")
                    print(f"--DEBUG Runs: {run_labels}")

                # Decoder
                accuracy, chance, accuracy_scores, cm, model = fit_and_evaluate_decoder(
                    z_maps, mask_img, cond_labels, run_labels
                )

                if debug:
                    print(f"--DEBUG fit_and_evaluate_decoder: {(accuracy, chance, accuracy_scores, cm, model)}")

                decoding_results[(subject_id, roi_name)] = {
                    'accuracy': accuracy,
                    'chance_level': chance,
                    'accuracy_scores': accuracy_scores,
                    'confusion_matrix': cm,
                    'conditions': np.unique(cond_labels).tolist()  # keep the order
                }

                # saving weight maps
                if save_weight_maps:
                    try:
                        output_prefix = os.path.join(
                            output_dir,
                            f"sub-{subject_id}_roi-{roi_name}"
                        )
                        save_decoder_weights(model, mask_img, output_prefix, class_labels=np.unique(cond_labels).tolist())
                    except Exception as e:
                        print(f"Failed to save weights for sub-{subject_id}, ROI={roi_name}\n  {e}")

            except Exception as e:
                print(f"Failed for sub-{subject_id}, ROI={roi_name}\n  {e}")

        # cleaning memory to avoid heat
        del visual_events, conditions, bold_images
        del z_maps, cond_labels, run_labels
        gc.collect()

    return (
        visual_events_all,
        conditions_all,
        bold_images_all,
        contrasts_all,
        decoding_results,
        weight_maps_all
    )

(the script is quite long but I can provide it full if needed)

Do you know how to call joblib correctly in my specific case? Thank you.

Best,
Matthieu

Sorry, but I don’t find any call to Parallel/Delayed function of joblib. You need to call them explicitly to use joblib parallelization.
Sorry if I misunderstood.
Best,
Bertrand

Thanks for your answer. Sorry, I wasn’t clear. I presented a part of my script which doesn’t include the joblib call, on purpose. Here is how I tried to call it:

def fit_and_evaluate_decoder(
    z_maps, mask_img, conditions_label, run_label,
    screening_percentile=5, C=1.0, n_leave_out=1, n_jobs_cv=1
):
    """Fit and evaluate a decoder with feature screening and linear SVM.

    Parameters
    ----------
    z_maps : array
        List or array of z-maps per condition/run.
    mask_img : Niimg-like
        Brain mask.
    conditions_label : array
        Condition labels (y).
    run_label : array
        Run IDs (used as groups in CV).
    screening_percentile : int
        Percentile of most informative voxels selected.
    C : float
        Regularization strength for SVC.
    n_leave_out : int
        Number of groups to leave out during cross-validation.
    n_jobs_cv : int
        Number of parallel jobs for CV (-1 = all cores).

    Returns
    -------
    mean_accuracy : float
    chance_level : float
    accuracy_scores : list of float
    cm : ndarray
    best_model : sklearn.pipeline.Pipeline
        Last model from CV (not retrained on full set).
    """
    print(f"\nFitting scikit-learn decoder with L{n_leave_out}O, screening={screening_percentile}%, C={C}...")

    z_maps = np.array(z_maps)
    conditions_label = np.array(conditions_label)
    run_label = np.array(run_label)

    z_maps_reshaped = z_maps.reshape(-1, z_maps.shape[-1])
    masked_data = apply_mask(z_maps_reshaped, mask_img)

    logo = LeavePGroupsOut(n_groups=n_leave_out)
    cv_splits = list(logo.split(masked_data, conditions_label, groups=run_label))

    def fit_and_predict(train_idx, test_idx):
        model = make_pipeline(
            StandardScaler(),
            SelectPercentile(f_classif, percentile=screening_percentile),
            SVC(kernel='linear', C=C)
        )
        model.fit(masked_data[train_idx], conditions_label[train_idx])
        y_pred = model.predict(masked_data[test_idx])
        y_true = conditions_label[test_idx]
        score = accuracy_score(y_true, y_pred)
        return y_true, y_pred, score, model

    with parallel_backend("loky"):
        results = Parallel(n_jobs=n_jobs_cv)(
            delayed(fit_and_predict)(train_idx, test_idx)
            for train_idx, test_idx in cv_splits
        )

    all_y_true, all_y_pred, accuracy_scores = [], [], []
    best_model = None
    for y_true, y_pred, score, model in results:
        all_y_true.extend(y_true)
        all_y_pred.extend(y_pred)
        accuracy_scores.append(score)
        best_model = model  # last one

    mean_accuracy = np.mean(accuracy_scores)
    std_accuracy = np.std(accuracy_scores)
    chance_level = 1.0 / len(np.unique(conditions_label))
    cm = confusion_matrix(all_y_true, all_y_pred)

    print(f"    - Accuracy: {mean_accuracy:.4f} ± {std_accuracy:.4f}")
    print(f"    - Chance level: {chance_level:.4f}")
    print(f"    - Confusion Matrix:\n{cm}")

    return mean_accuracy, chance_level, accuracy_scores, cm, best_model

(…)

def main():
    visual_events_all = {}
    conditions_all = {}
    bold_images_all = {}
    contrasts_all = {}
    decoding_results = {}
    weight_maps_all = {}

    roi_masks = {}
    for roi_name in roi_names:
        try:
            mask_img = find_nii_or_niigz(os.path.join(roi_dir, roi_name))
            roi_masks[roi_name] = mask_img
        except FileNotFoundError as e:
            print(f"ROI not found: {roi_name}\n  {e}")

    for subject_id in subject_list:
        print(f"\n{'-'*50}\nSubject: {subject_id}")
        bids_func_dir = os.path.join(bids_dir, f"sub-{subject_id}", "func")
        fmriprep_func_dir = os.path.join(fmriprep_dir, f"sub-{subject_id}", "func")

        if not os.path.isdir(bids_func_dir):
            print(f"Skipping sub-{subject_id}, directory not found: {bids_func_dir}")
            continue

        visual_events, conditions = load_visual_events_by_run(bids_func_dir, subject_id, task_name)
        bold_images = load_bold_by_run(fmriprep_func_dir, subject_id, task_name)

        visual_events_all[subject_id] = visual_events
        conditions_all[subject_id] = conditions
        bold_images_all[subject_id] = bold_images
        contrasts_all[subject_id] = {}

        for roi_name, mask_img in roi_masks.items():
            print(f"\nComputing contrasts for sub-{subject_id}, ROI={roi_name}...")
            try:
                z_maps, cond_labels, run_labels = compute_contrasts_by_run(
                    subject_id, roi_name, mask_img, bold_images, visual_events, unique_runs
                )

                contrasts_all[subject_id][roi_name] = {
                    'z_maps': z_maps,
                    'condition_labels': cond_labels,
                    'run_labels': run_labels,
                }

                accuracy, chance, accuracy_scores, cm, model = fit_and_evaluate_decoder(
                    z_maps, mask_img, cond_labels, run_labels,
                    screening_percentile=screening_percentile,
                    C=C, n_leave_out=n_leave_out, n_jobs_cv=n_jobs_decoder
                )

                decoding_results[(subject_id, roi_name)] = {
                    'accuracy': accuracy,
                    'chance_level': chance,
                    'accuracy_scores': accuracy_scores,
                    'confusion_matrix': cm,
                    'conditions': np.unique(cond_labels).tolist()
                }

                if save_weight_maps:
                    try:
                        output_prefix = os.path.join(output_dir, f"sub-{subject_id}_roi-{roi_name}")
                        save_decoder_weights(model, mask_img, output_prefix, class_labels=np.unique(cond_labels).tolist())
                    except Exception as e:
                        print(f"Failed to save weights for sub-{subject_id}, ROI={roi_name}\n  {e}")

                del z_maps, cond_labels, run_labels, model, cm
                gc.collect()

            except Exception as e:
                print(f"Failed for sub-{subject_id}, ROI={roi_name}\n  {e}")

        del visual_events, conditions, bold_images
        gc.collect()

    return (
        visual_events_all,
        conditions_all,
        bold_images_all,
        contrasts_all,
        decoding_results,
        weight_maps_all
    )

And here is some print I obtain. From other scripts, this specific part seems very verbose but fine, and a mark that n_jobs is called:

Exception ignored in: <function ResourceTracker.del at 0x7fa1dd5c39c0>
Traceback (most recent call last):
File “/home/touhara/anaconda3/envs/nilearn/lib/python3.13/multiprocessing/resource_tracker.py”, line 82, in del
File “/home/touhara/anaconda3/envs/nilearn/lib/python3.13/multiprocessing/resource_tracker.py”, line 91, in _stop
File “/home/touhara/anaconda3/envs/nilearn/lib/python3.13/multiprocessing/resource_tracker.py”, line 116, in _stop_locked
ChildProcessError: [Errno 10] No child processes

But my Activity Monitor shows that only 1 core is used 100% at a time.

Thank you.
Matthieu

What happens if you remove the del statements ?
Also, try and set a fix value for n_jobs_cv, e.g. 4.
HTH,
Bertrand

Dear @bthirion , thanks again for your help.

The solution was indeed to provide a fix value to n_jobs_cv and remove the del statement, but also to completely restructure the script, because paralleling had to be called at the beginning of each participants, not within the loop.

Great succes!

The results for 4 subjects, 18 ROI (L&R), 6 runs, 18 trials (140 images, 2x2x2 3T), leave 2 runs out are obtained in ~26 minutes.

For those interested, please find my (quite long) script bellow. In my case, I am looking at the difference in accuracy scores between 2 conditions (and so, types of runs, v and vo).

import pandas as pd 
import os 

nilearn_folder = "nilearn_se" 

current_directory = os.getcwd()
source_data_directory = os.path.abspath(os.path.join(current_directory, "..", nilearn_folder, "sourcedata"))

run_and_trial_types = pd.read_csv(os.path.join(source_data_directory, "run_and_trial_types.csv"))
run_and_trial_types.drop(["Unnamed: 0"], axis=1, inplace=True)
run_and_trial_types
import os
import re
import pandas as pd
import numpy as np 
import time
from datetime import datetime 
import matplotlib.pyplot as plt
from nilearn.image import load_img, mean_img
from nilearn.plotting import plot_epi
from nilearn.glm.first_level import FirstLevelModel
from nilearn.decoding import Decoder
from nilearn.reporting import make_glm_report
from nilearn.maskers import NiftiMasker
from sklearn.model_selection import LeaveOneGroupOut
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

#------------------Parameters------------------
data_location = "nilearn_se"
task_name = "odor"
t_r = 1.5
n_runs = 12
run_type="vo"
n_runs_to_train_test = 6

regroup_by_domain = True  # regroup by domain (ex: images 2,3 and 4 -> Food)
domain_mapping = {
        'img/obj2': 'Food',
        'img/obj3': 'Food',
        'img/obj4': 'Food',
        'img/obj5': 'Human',
        'img/obj6': 'Human',
        'img/obj7': 'Human',
    }

subject_list = [
    "01", "02", "03", "04"
    ]

roi_names = [
    # VISION
    # "MNI152_visual_all_cortex_LR", "MNI152_visual_all_cortex_L", "MNI152_visual_all_cortex_R", # created by combining all VC L & R ROI
    # "MNI152_visual_primary_cortex_LR", "MNI152_visual_primary_cortex_L", "MNI152_visual_primary_cortex_R",
    # "MNI152_visual_secondary_cortex_LR", "MNI152_visual_secondary_cortex_L", "MNI152_visual_secondary_cortex_R",
    # "MNI152_visual_ventral_cortex_LR", "MNI152_visual_ventral_cortex_L", "MNI152_visual_ventral_cortex_R",    
    # "MNI152_visual_dorsal_cortex_LR", "MNI152_visual_dorsal_cortex_L", "MNI152_visual_dorsal_cortex_R",

    "MNI152_Lingual_L_1mm", "MNI152_Lingual_R_1mm",
    "MNI152_Cuneus_L_1mm", "MNI152_Cuneus_R_1mm",
    "MNI152_Calcarine_L_1mm", "MNI152_Calcarine_R_1mm",
    "MNI152_Occipital_Sup_L_1mm", "MNI152_Occipital_Sup_R_1mm",
    "MNI152_Occipital_Mid_L_1mm", "MNI152_Occipital_Mid_R_1mm", 
    "MNI152_Occipital_Inf_L_1mm", "MNI152_Occipital_Inf_R_1mm",
    "MNI152_Fusiform_L_1mm", "MNI152_Fusiform_R_1mm",
    "MNI152_Temporal_Inf_L_1mm", "MNI152_Temporal_Inf_R_1mm",
    "MNI152_Temporal_Mid_L_1mm", "MNI152_Temporal_Mid_R_1mm",
    "MNI152_Parietal_Sup_L_1mm", "MNI152_Parietal_Sup_R_1mm",
    "MNI152_Precuneus_L_1mm", "MNI152_Precuneus_R_1mm",
    "MNI152_Angular_L_1mm", "MNI152_Angular_R_1mm",   

    # OLFACTION
    # "MNI152_olfactory_cortex_LR", "MNI152_olfactory_cortex_L", "MNI152_olfactory_cortex_R", # created by combining all OC L & R ROI

    "OLFDTI_R_frontal_piriform", "OLFDTI_L_frontal_piriform",
    "OLFDTI_R_temporal_piriform_", "OLFDTI_L_temporal_piriform_",
    "OLFDTI_R_amygdala_", "OLFDTI_L_amygdala_",
    "OLFDTI_R_AON_", "OLFDTI_L_AON_",
    "OLFDTI_R_entorhinal_", "OLFDTI_L_entorhinal_",
    "OLFDTI_R_olfactory_tubercle_", "OLFDTI_L_olfactory_tubercle_",
    "OFColf_R", "OFColf_L"
]

debug = True
mri_trigger = True          # onset = 0 for trial_type == MRI_trigger (realigne t0)
save_stat_maps = True       # heavy, will create a stat map for each sub-run-roi-condition
save_glm_report = True      # heavy, will create a stat map for each sub-run-roi-condition
save_weight_maps = True     # heavy, will create a stat map for each sub-roi-condition
save_confusion_matrix = True 
save_final_results = True
#------------------Parameters------------------

# paths
subjfolder = datetime.today().strftime("%Y%m%d")
current_directory = os.getcwd()  # Current working directory
bids_dir = os.path.abspath(os.path.join(current_directory, '..', f"{data_location}", 'bids'))
fmriprep_dir = os.path.abspath(os.path.join(current_directory, '..', f"{data_location}", 'fmriprep'))
roi_dir =  os.path.abspath(os.path.join(current_directory, '..', f"{data_location}", 'anatomy', 'mni'))
output_dir = os.path.abspath(os.path.join(current_directory, '..', "results", "mvpa", run_type, subjfolder, data_location))

def get_subject_runs(df, run_type="v"):
    filt = df["run_type"].str.lower() == run_type.lower()
    tmp = df.loc[filt, ["subject_id", "run_id"]].drop_duplicates()
    tmp["sub_num"] = tmp["subject_id"].str.extract(r"(\d+)").astype(str)
    tmp["run_num"] = tmp["run_id"].str.extract(r"(\d+)").astype(int)
    return (
        tmp.groupby("sub_num")["run_num"]
           .apply(lambda s: sorted(s.tolist()))
           .to_dict()
    )

def build_unique_runs_per_subject(
        df,
        run_type=run_type,
        n_runs_to_train_test=None
    ):
    runs_by_subject = get_subject_runs(df, run_type)

    if n_runs_to_train_test is not None:
        runs_by_subject = {
            sub: runs[:n_runs_to_train_test]
            for sub, runs in runs_by_subject.items()
        }

    return runs_by_subject

unique_runs = build_unique_runs_per_subject(
    run_and_trial_types,
    run_type=run_type,
    n_runs_to_train_test=n_runs_to_train_test
)

# check prints
print(
    f"Data: {data_location}\n"
    f"Task: {task_name}\n"
    f"Subjects: {subject_list}\n"
    f"ROI: {roi_names}\n"
    f"TR: {t_r} seconds\n"
    f"Number of runs: {len(unique_runs)}\n"
    f"Unique runs: {unique_runs}\n"
    f"bids: {bids_dir}\n"
    f"fmriprep: {fmriprep_dir}\n"
    f"roi: {roi_dir}\n"
    f"output: {output_dir}\n"    
    f"Debug: {debug}\n"
    f"Regroup by domain: {regroup_by_domain}\n"
    f"Save stat maps: {save_stat_maps}\n" 
    f"Save GLM report: {save_glm_report}\n"  
    f"Save weight maps: {save_weight_maps}\n" 
    f"Save confusion matrix: {save_confusion_matrix}\n"
    f"Save final results as unique df: {save_final_results}\n" 
)

from __future__ import annotations

import os
import re
import gc
import logging
import warnings
from datetime import datetime
from itertools import product
from pathlib import Path
from typing import Dict, Tuple, List, Any

import multiprocessing as _mp

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from nilearn.image import load_img
from nilearn.masking import apply_mask
from nilearn.glm.first_level import FirstLevelModel
from nilearn.maskers import NiftiMasker

from sklearn.svm import LinearSVC
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectPercentile, f_classif
from sklearn.pipeline import make_pipeline
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.model_selection import LeavePGroupsOut, cross_val_score, cross_val_predict
from sklearn.base import clone

from joblib import Parallel, delayed, parallel_backend

warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
logging.getLogger("nilearn").setLevel(logging.ERROR)

# --------------- Parameters --------------
# GLM
hrf_model: str = "spm"          # HRF function
high_pass: float = 0.008        # low cut filter
smoothing_fwhm: int = 4         # smoothing


# Decoder
n_leave_out: int = 2                #number of leave out
screening_percentile: int = 100     # purcentage of voxel kept
C: float = 1.0                      #?
n_jobs_total: int = 20              # CPUs to use
# --------------- Parameters --------------

print("\n" + "-" * 40)
print("          ANALYSIS PARAMETERS")
print("-" * 40)
print("GLM parameters:")
print(f"  • hrf_model        : {hrf_model}")
print(f"  • high_pass        : {high_pass}")
print(f"  • smoothing_fwhm   : {smoothing_fwhm}")
print("Decoder parameters:")
print(f"  • n_leave_out      : {n_leave_out}")
print(f"  • screening_pct    : {screening_percentile}")
print(f"  • Linear SVC  C    : {C}")
print(f"  • Total CPUs       : {n_jobs_total}")
print("-" * 40 + "\n")

def notify_save(description: str, path: os.PathLike | str) -> None:  # noqa: N802
    """
    prints a standardised message for each save
    """
    print(f"Saved {description} to {path}")


def load_visual_events_by_run(events_dir: str, subject_id: str, task_name: str):
    """
    load the events from presentation log for each run
    """
    print(f"\nLoading visual events for sub-{subject_id}, keeping only img/objX events...")

    events_dir = os.path.abspath(os.path.join(os.getcwd(), "..", data_location, "bids", f"sub-{subject_id}", "func"))
    events: Dict[int, pd.DataFrame] = {}
    pattern = re.compile(rf"sub-{subject_id}_task-{task_name}_run-(\d+)_events\.tsv")
    obj_pattern = re.compile(r"(img/obj\d+)")

    for filename in os.listdir(events_dir):
        match = pattern.match(filename)
        if match:
            run_num = int(match.group(1))
            filepath = os.path.join(events_dir, filename)
            df = pd.read_csv(filepath, sep="\t")

            if "trial_type" in df.columns:
                if mri_trigger:
                    trigger_mask = df["trial_type"] == "MRI_trigger"
                    if trigger_mask.any():
                        trigger_onset = df.loc[trigger_mask, "onset"].iloc[0]
                        df["onset"] -= trigger_onset

                df = df[df["trial_type"].str.startswith("img/obj", na=False)].copy()
                df["trial_type"] = df["trial_type"].str.extract(obj_pattern)

                if regroup_by_domain:
                    df["trial_type"] = df["trial_type"].replace(domain_mapping)

                events[run_num] = df

    conditions = events[1]["trial_type"].unique() if 1 in events else None
    print(f"sub-{subject_id}: {len(events)} events files for {len(events)} runs loaded from {events_dir}.")
    print(f"conditions: {conditions}")
    return events, conditions


def load_bold_by_run(func_dir: str, subject_id: str, task_name: str):
    """
    load bold data processed by fmriprep
    """
    print(f"Loading functional data for sub-{subject_id} ...")
    func: Dict[int, Any] = {}
    pattern = re.compile(rf"sub-{subject_id}_task-{task_name}_run-(\d+)_echo-1_space-MNI152NLin2009cAsym_desc-preproc_bold\.nii\.gz")

    for filename in os.listdir(func_dir):
        match = pattern.match(filename)
        if match:
            run_num = int(match.group(1))
            filepath = os.path.join(func_dir, filename)
            func[run_num] = load_img(filepath)

    print(f"sub-{subject_id}: {len(func)} functional files loaded from {func_dir}.")
    return func


def find_nii_or_niigz(path_without_ext: str) -> str:
    """
    returns existing .nii ou .nii.gz .
    """
    nii_path = f"{path_without_ext}.nii"
    niigz_path = f"{path_without_ext}.nii.gz"
    if os.path.exists(nii_path):
        return nii_path
    if os.path.exists(niigz_path):
        return niigz_path
    raise FileNotFoundError(f"Neither '{nii_path}' nor '{niigz_path}' found.")


def compute_contrasts_by_run(
    subject_id: str,
    roi_name: str,
    mask_img: str | os.PathLike,
    func: Dict[int, Any],
    events: Dict[int, pd.DataFrame],
    runs: List[int],
):
    z_maps, conditions_label, run_label = [], [], []

    for run in runs:
        try:
            glm = FirstLevelModel(
                t_r=t_r,
                mask_img=mask_img,
                hrf_model=hrf_model,
                high_pass=high_pass,
                smoothing_fwhm=smoothing_fwhm,
            )
            glm.fit(func[run], events=events[run])

            for cond in events[run].trial_type.unique():
                z_map = glm.compute_contrast(cond)
                z_maps.append(z_map)
                conditions_label.append(cond)
                run_label.append(int(run))

                safe_condition = cond.replace("/", "_")

                # stat maps
                if save_stat_maps:
                    outdir = Path(output_dir) / "stat_maps"
                    outdir.mkdir(parents=True, exist_ok=True)
                    filename = f"sub-{subject_id}_task-{task_name}_run-{run}_roi-{roi_name}_cond-{safe_condition}_zmap.nii.gz"
                    filepath = outdir / filename
                    z_map.to_filename(filepath)
                    notify_save("Z‑map", filepath)

                # GLM report
                if save_glm_report:
                    from nilearn.reporting import make_glm_report

                    report = make_glm_report(glm, contrasts=cond)
                    report_dir = Path(output_dir) / "glm_reports"
                    report_dir.mkdir(parents=True, exist_ok=True)
                    report_filename = (
                        f"sub-{subject_id}_task-{task_name}_run-{run}_roi-{roi_name}_cond-{safe_condition}_glmreport.html"
                    )
                    report_filepath = report_dir / report_filename
                    report.save_as_html(report_filepath)
                    notify_save("GLM report", report_filepath)

        except Exception as e:
            print(f"GLM failed sub-{subject_id} run-{run} ROI={roi_name}: {e}")

    return z_maps, conditions_label, run_label


def fit_and_evaluate_decoder(
    z_maps: List[Any],
    mask_img: str | os.PathLike,
    y: List[str],
    groups: List[int],
    screening_percentile: int = 100,
    C: float = 1.0,
    n_leave_out: int = 2,
    n_jobs: int = 1,
):
    # Vectorisation
    z_maps = np.array(z_maps)
    X = apply_mask(z_maps.reshape(-1, z_maps.shape[-1]), mask_img).astype("float32", copy=False)

    # Pipeline
    pipe = make_pipeline(
        StandardScaler(),
        SelectPercentile(f_classif, percentile=screening_percentile),
        LinearSVC(C=C, dual=False),
    )

    # parallelised Cross‑validation 
    logo = LeavePGroupsOut(n_groups=n_leave_out)
    with parallel_backend("loky", inner_max_num_threads=1):
        scores = cross_val_score(pipe, X, y, groups=groups, cv=logo, n_jobs=n_jobs, verbose=0)

    acc_mean, acc_std = scores.mean(), scores.std()
    print(f"    Accuracy L{n_leave_out}O: {acc_mean:.4f} ± {acc_std:.4f}")

    pipe.fit(X, y)

    # confusion matrix (no training)
    y_true, y_pred = [], []

    for train_idx, test_idx in logo.split(X, y, groups):
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = np.array(y)[train_idx], np.array(y)[test_idx]

        clf = clone(pipe)
        clf.fit(X_train, y_train)
        y_pred.extend(clf.predict(X_test))
        y_true.extend(y_test)

    cm = confusion_matrix(y_true, y_pred)

    return acc_mean, 1 / len(np.unique(y)), scores.tolist(), cm, pipe

def process_subject_roi(
        subject_id: str,
        roi_name: str,
        mask_path: str | os.PathLike,
        unique_runs_by_sub: dict[str, list[int]]
    ):

    try:
        # runs specific to that sub
        try:
            runs_subset = unique_runs_by_sub[subject_id]
        except KeyError:
            raise ValueError(f"No runs of the expected type for sub-{subject_id}")

        bids_func    = os.path.join(bids_dir,     f"sub-{subject_id}", "func")
        preproc_func = os.path.join(fmriprep_dir, f"sub-{subject_id}", "func")

        events, _ = load_visual_events_by_run(bids_func, subject_id, task_name)
        func       = load_bold_by_run(preproc_func, subject_id, task_name)


        z_maps, labels, runs = compute_contrasts_by_run(
            subject_id,
            roi_name,
            mask_path,
            func,
            events,
            runs_subset
        )

        acc, chance, scores, cm, model = fit_and_evaluate_decoder(
            z_maps,
            mask_path,
            labels,
            runs,
            screening_percentile=screening_percentile,
            C=C,
            n_leave_out=n_leave_out,
            n_jobs=1,  # here, keep 1 !
        )


        if save_stat_maps:
            out_prefix = os.path.join(
                output_dir,
                "weight_maps",
                f"sub-{subject_id}_roi-{roi_name}_task-{task_name}",
            )
            Path(out_prefix).parent.mkdir(parents=True, exist_ok=True)

            save_decoder_weights(
                model,
                mask_path,
                out_prefix,
                class_labels=np.unique(labels).tolist(),
            )

        return (subject_id, roi_name), {
            "accuracy": acc,
            "chance_level": chance,
            "accuracy_scores": scores,
            "confusion_matrix": cm,
            "conditions": np.unique(labels).tolist(),
            "model": model,
        }

    except Exception as e:
        print(f"Failed pipeline sub-{subject_id} ROI={roi_name}: {e}")
        return (subject_id, roi_name), {}

def save_decoder_weights(
    model,
    mask_img: str | os.PathLike,
    output_path_prefix: str,
    class_labels: List[str] | None = None,
):

    from nilearn.image import load_img

    if isinstance(mask_img, (str, os.PathLike)):
        mask_img = load_img(mask_img)

    try:
        svc = model.named_steps["linearsvc"]
        selector = model.named_steps["selectpercentile"]
    except Exception:
        print("Could not extract 'linearsvc' or 'selectpercentile' from model — skipping weight maps.")
        return

    if not hasattr(svc, "coef_"):
        print("Model SVC has no attribute 'coef_' — ensure it is trained and linear.")
        return

    coef = svc.coef_
    if coef.ndim == 1:
        coef = coef.reshape(1, -1)

    n_classes = coef.shape[0]
    support_mask = selector.get_support()

    masker = NiftiMasker(mask_img=mask_img)
    masker.fit()

    for i in range(n_classes):
        class_name = class_labels[i] if class_labels else f"class_{i}"
        selected_weights = coef[i]

        full_weights = np.zeros(support_mask.shape[0])
        full_weights[support_mask] = selected_weights

        # as nifiti
        weights_img = masker.inverse_transform(full_weights)
        nii_path = f"{output_path_prefix}_weights_{class_name}.nii.gz"
        weights_img.to_filename(nii_path)
        notify_save("weight map (NIfTI)", nii_path)

        # as csv
        mask_data = masker.mask_img_.get_fdata().astype(bool)
        coords = np.column_stack(np.where(mask_data))
        df = pd.DataFrame({"x": coords[:, 0], "y": coords[:, 1], "z": coords[:, 2], "weight": full_weights})

        csv_path = f"{output_path_prefix}_weights_{class_name}.csv"
        df.to_csv(csv_path, index=False)
        notify_save("weights table (CSV)", csv_path)

if __name__ == "__main__":
    os.makedirs(output_dir, exist_ok=True)


    unique_runs = build_unique_runs_per_subject(
        run_and_trial_types,
        run_type=run_type, 
        n_runs_to_train_test=n_runs_to_train_test
    )

    roi_masks = {
        roi: find_nii_or_niigz(os.path.join(roi_dir, roi))
        for roi in roi_names
    }


    all_pairs = list(product(subject_list, roi_names))

    with parallel_backend("loky", inner_max_num_threads=1):
        results = Parallel(n_jobs=n_jobs_total, verbose=10)(
            delayed(process_subject_roi)(
                sub,
                roi,
                roi_masks[roi],
                unique_runs         
            )
            for sub, roi in all_pairs
        )

    decoding_results = dict(results)

    group_suffix = "domains" if regroup_by_domain else "categories"
    rows = []
    for (sub, roi), res in decoding_results.items():
        if not res:
            continue
        row = {
            "subject": sub,
            "roi": roi,
            "accuracy_mean": res["accuracy"],
            "chance_level": res["chance_level"],
        }
        for i, s in enumerate(res["accuracy_scores"]):
            row[f"fold{i+1}"] = s
        rows.append(row)

    df_results = pd.DataFrame(rows)


    runs_tag = f"{n_runs_to_train_test or 'all'}runs"
    csv_name = (
        f"decoding_accuracy_summary_{group_suffix}_{runs_tag}_L{n_leave_out}O_"
        f"{datetime.today().strftime('%Y%m%d%H%M%S')}.csv"
    )
    csv_path = os.path.join(output_dir, csv_name)
    df_results.to_csv(csv_path, index=False)
    notify_save("decoding summary CSV", csv_path)


    if save_confusion_matrix:
        conf_dir = os.path.join(
            output_dir,
            f"conf_mat_{group_suffix}_{runs_tag}_L{n_leave_out}O"
        )
        os.makedirs(conf_dir, exist_ok=True)

        for (sub, roi), res in decoding_results.items():
            if not res:
                continue

            cm     = res["confusion_matrix"]
            conds  = res["conditions"]
            df_cm  = pd.DataFrame(cm, index=conds, columns=conds)

            cm_csv = os.path.join(conf_dir, f"sub-{sub}_roi-{roi}_cm.csv")
            df_cm.to_csv(cm_csv)
            notify_save("confusion matrix CSV", cm_csv)

            plt.figure(figsize=(6, 5))
            sns.heatmap(df_cm, annot=True, fmt="d", cmap="Blues")
            plt.title(f"CM sub-{sub} ROI-{roi}")
            plt.tight_layout()

            cm_png = os.path.join(conf_dir, f"sub-{sub}_roi-{roi}_cm.png")
            plt.savefig(cm_png)
            plt.close()
            notify_save("confusion matrix PNG", cm_png)

    print("\nDONE !")

It is still very verbose with a lot of UserWaning and others in the print:

Exception ignored in: <function ResourceTracker.del at 0x7f2e5c75f9c0>
Traceback (most recent call last):
File “/home/touhara/anaconda3/envs/nilearn/lib/python3.13/multiprocessing/resource_tracker.py”, line 82, in del
File “/home/touhara/anaconda3/envs/nilearn/lib/python3.13/multiprocessing/resource_tracker.py”, line 91, in _stop
File “/home/touhara/anaconda3/envs/nilearn/lib/python3.13/multiprocessing/resource_tracker.py”, line 116, in _stop_locked
ChildProcessError: [Errno 10] No child processes

Also if I made any mistake in the script, please do not hesitate to notify me !

Matthieu

1 Like

Glad to see this work !
Best,
Bertrand