Parallelization when using Nilearn / scikit-learn?

Hi everyone,

I am using Nilearn for different purposes: tSNR and BOLD HRF exploration, GLMs, functional connectivity, etc. For MVPA, I sometimes call Nilearn directly, or use scikit-learn to access additional information such as confusion matrices or leave-X-runs-out cross-validation.
In both cases I run them in Jupyter notebooks in VS Code on an Ubuntu 24 workstation.

I thought my GLM-based MVPA (L2O) was taking extremely long. Looking at the "Activity Monitor", I noticed that only one CPU core was used at a time. So I tried to use joblib with n_jobs=-1 for some parallelization, but I am still using only one core. What can I do to force multicore processing?

Thank you.
Matthieu

Thank you for posting.
Can you clarify to which function(s) you passed n_jobs=-1?
Also, note that you can use coarse-grained parallelism by calling joblib's Parallel in your script, for instance as in the sketch below.
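For example, a rough sketch of what I mean (fit_one_split and cv_splits are just placeholders for whatever you currently do inside your own cross-validation loop):

from joblib import Parallel, delayed

def fit_one_split(train_idx, test_idx):
    # placeholder: fit your pipeline on this split and return its score
    ...

# each CV split is dispatched to its own worker process
results = Parallel(n_jobs=-1)(
    delayed(fit_one_split)(train_idx, test_idx)
    for train_idx, test_idx in cv_splits
)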
Best,
Bertrand

Dear @bthirion, thank you very much for your answer.

I have tried to pass n_jobs=-1 to both my fit_and_evaluate_decoder and main functions, with no success.

(...) 

def fit_and_evaluate_decoder(z_maps, mask_img, conditions_label, run_label,
                             screening_percentile=screening_percentile, C=C, n_leave_out=n_leave_out):
    """Fit and evaluate a decoder with feature screening and linear SVM.

    Parameters
    ----------
    z_maps : list or array
        List of z-maps per condition/run.
    mask_img : Niimg-like
        Brain mask.
    conditions_label : array
        Condition labels (y).
    run_label : array
        Run IDs (used as groups in CV).
    screening_percentile : int, default=5
        Percentile of most informative voxels selected.
    C : float, default=1.0
        Regularization strength for SVC (smaller = stronger regularization).
    n_leave_out : int, default=1
        Number of groups to leave out during cross-validation.

    Returns
    -------
    mean_accuracy : float
    chance_level : float
    accuracy_scores : list of float
    cm : ndarray
        Confusion matrix.
    model : sklearn.pipeline.Pipeline
        Trained pipeline.
    """
    print(f"\nFitting scikit-learn decoder with L{n_leave_out}O, screening_percentile={screening_percentile}, C={C}...")

    # Convert to numpy array if needed
    z_maps = np.array(z_maps)
    conditions_label = np.array(conditions_label)
    run_label = np.array(run_label)

    # Flatten for masking
    z_maps_reshaped = z_maps.reshape(-1, z_maps.shape[-1])
    masked_data = apply_mask(z_maps_reshaped, mask_img)

    # Pipeline: Scaling -> Feature Selection -> Classification
    model = make_pipeline(
        StandardScaler(),
        SelectPercentile(f_classif, percentile=screening_percentile),
        SVC(kernel='linear', C=C)
    )

    # Cross-validation using Leave-P-Groups-Out
    logo = LeavePGroupsOut(n_groups=n_leave_out)

    all_y_true, all_y_pred, accuracy_scores = [], [], []

    for train_idx, test_idx in logo.split(masked_data, conditions_label, groups=run_label):
        model.fit(masked_data[train_idx], conditions_label[train_idx])
        y_pred = model.predict(masked_data[test_idx])
        all_y_pred.extend(y_pred)
        all_y_true.extend(conditions_label[test_idx])
        accuracy_scores.append(accuracy_score(conditions_label[test_idx], y_pred))

    # Accuracy stats
    mean_accuracy = np.mean(accuracy_scores)
    std_accuracy = np.std(accuracy_scores)
    chance_level = 1.0 / len(np.unique(conditions_label))
    cm = confusion_matrix(all_y_true, all_y_pred)

    print(f"    - Classification mean accuracy (CV):  {mean_accuracy:.4f} ± {std_accuracy:.4f}")
    print(f"    - Chance level:                     {chance_level:.4f}")
    print(f"    - Confusion Matrix (from CV):\n{cm}")

    return mean_accuracy, chance_level, accuracy_scores, cm, model

(...)
def main():
    visual_events_all = {}
    conditions_all = {}
    bold_images_all = {}
    contrasts_all = {}
    decoding_results = {}
    weight_maps_all = {}

    roi_masks = {}
    for roi_name in roi_names:
        try:
            mask_img = find_nii_or_niigz(os.path.join(roi_dir, roi_name))
            roi_masks[roi_name] = mask_img
        except FileNotFoundError as e:
            print(f"ROI not found: {roi_name}\n  {e}")

    for subject_id in subject_list:
        print(f"\n{'-'*50}")
        bids_func_dir = os.path.join(bids_dir, f"sub-{subject_id}", "func")
        fmriprep_func_dir = os.path.join(fmriprep_dir, f"sub-{subject_id}", "func")

        if not os.path.isdir(bids_func_dir):
            print(f"Skipping sub-{subject_id}, directory not found: {bids_func_dir}")
            continue

        visual_events, conditions = load_visual_events_by_run(bids_func_dir, subject_id, task_name)
        bold_images = load_bold_by_run(fmriprep_func_dir, subject_id, task_name)

        visual_events_all[subject_id] = visual_events
        conditions_all[subject_id] = conditions
        bold_images_all[subject_id] = bold_images
        contrasts_all[subject_id] = {}

        for roi_name, mask_img in roi_masks.items():
            print(f"\nComputing contrasts for sub-{subject_id} in ROI={roi_name}...")
            try:
                # GLM
                z_maps, cond_labels, run_labels = compute_contrasts_by_run(
                    subject_id, roi_name, mask_img, bold_images, visual_events, unique_runs
                )

                contrasts_all[subject_id][roi_name] = {
                    'z_maps': z_maps,
                    'condition_labels': cond_labels,
                    'run_labels': run_labels,
                }

                if debug:
                    print(f"--DEBUG Conditions: {cond_labels}")
                    print(f"--DEBUG Runs: {run_labels}")

                # Decoder
                accuracy, chance, accuracy_scores, cm, model = fit_and_evaluate_decoder(
                    z_maps, mask_img, cond_labels, run_labels
                )

                if debug:
                    print(f"--DEBUG fit_and_evaluate_decoder: {(accuracy, chance, accuracy_scores, cm, model)}")

                decoding_results[(subject_id, roi_name)] = {
                    'accuracy': accuracy,
                    'chance_level': chance,
                    'accuracy_scores': accuracy_scores,
                    'confusion_matrix': cm,
                    'conditions': np.unique(cond_labels).tolist()  # keep the order
                }

                # saving weight maps
                if save_weight_maps:
                    try:
                        output_prefix = os.path.join(
                            output_dir,
                            f"sub-{subject_id}_roi-{roi_name}"
                        )
                        save_decoder_weights(model, mask_img, output_prefix, class_labels=np.unique(cond_labels).tolist())
                    except Exception as e:
                        print(f"Failed to save weights for sub-{subject_id}, ROI={roi_name}\n  {e}")

            except Exception as e:
                print(f"Failed for sub-{subject_id}, ROI={roi_name}\n  {e}")

        # cleaning memory to avoid heat
        del visual_events, conditions, bold_images
        del z_maps, cond_labels, run_labels
        gc.collect()

    return (
        visual_events_all,
        conditions_all,
        bold_images_all,
        contrasts_all,
        decoding_results,
        weight_maps_all
    )

(the script is quite long, but I can provide it in full if needed)

Do you know how to call joblib correctly in my specific case? Thank you.

Best,
Matthieu

Sorry, but I don't find any call to joblib's Parallel/delayed functions. You need to call them explicitly to use joblib parallelization.
Sorry if I misunderstood.
Best,
Bertrand

Thanks for your answer. Sorry, I wasn't clear: I deliberately presented a part of my script that doesn't include the joblib call. Here is how I tried to call it:

def fit_and_evaluate_decoder(
    z_maps, mask_img, conditions_label, run_label,
    screening_percentile=5, C=1.0, n_leave_out=1, n_jobs_cv=1
):
    """Fit and evaluate a decoder with feature screening and linear SVM.

    Parameters
    ----------
    z_maps : array
        List or array of z-maps per condition/run.
    mask_img : Niimg-like
        Brain mask.
    conditions_label : array
        Condition labels (y).
    run_label : array
        Run IDs (used as groups in CV).
    screening_percentile : int
        Percentile of most informative voxels selected.
    C : float
        Regularization strength for SVC.
    n_leave_out : int
        Number of groups to leave out during cross-validation.
    n_jobs_cv : int
        Number of parallel jobs for CV (-1 = all cores).

    Returns
    -------
    mean_accuracy : float
    chance_level : float
    accuracy_scores : list of float
    cm : ndarray
    best_model : sklearn.pipeline.Pipeline
        Last model from CV (not retrained on full set).
    """
    print(f"\nFitting scikit-learn decoder with L{n_leave_out}O, screening={screening_percentile}%, C={C}...")

    z_maps = np.array(z_maps)
    conditions_label = np.array(conditions_label)
    run_label = np.array(run_label)

    z_maps_reshaped = z_maps.reshape(-1, z_maps.shape[-1])
    masked_data = apply_mask(z_maps_reshaped, mask_img)

    logo = LeavePGroupsOut(n_groups=n_leave_out)
    cv_splits = list(logo.split(masked_data, conditions_label, groups=run_label))

    def fit_and_predict(train_idx, test_idx):
        model = make_pipeline(
            StandardScaler(),
            SelectPercentile(f_classif, percentile=screening_percentile),
            SVC(kernel='linear', C=C)
        )
        model.fit(masked_data[train_idx], conditions_label[train_idx])
        y_pred = model.predict(masked_data[test_idx])
        y_true = conditions_label[test_idx]
        score = accuracy_score(y_true, y_pred)
        return y_true, y_pred, score, model

    with parallel_backend("loky"):
        results = Parallel(n_jobs=n_jobs_cv)(
            delayed(fit_and_predict)(train_idx, test_idx)
            for train_idx, test_idx in cv_splits
        )

    all_y_true, all_y_pred, accuracy_scores = [], [], []
    best_model = None
    for y_true, y_pred, score, model in results:
        all_y_true.extend(y_true)
        all_y_pred.extend(y_pred)
        accuracy_scores.append(score)
        best_model = model  # last one

    mean_accuracy = np.mean(accuracy_scores)
    std_accuracy = np.std(accuracy_scores)
    chance_level = 1.0 / len(np.unique(conditions_label))
    cm = confusion_matrix(all_y_true, all_y_pred)

    print(f"    - Accuracy: {mean_accuracy:.4f} ± {std_accuracy:.4f}")
    print(f"    - Chance level: {chance_level:.4f}")
    print(f"    - Confusion Matrix:\n{cm}")

    return mean_accuracy, chance_level, accuracy_scores, cm, best_model

(…)

def main():
    visual_events_all = {}
    conditions_all = {}
    bold_images_all = {}
    contrasts_all = {}
    decoding_results = {}
    weight_maps_all = {}

    roi_masks = {}
    for roi_name in roi_names:
        try:
            mask_img = find_nii_or_niigz(os.path.join(roi_dir, roi_name))
            roi_masks[roi_name] = mask_img
        except FileNotFoundError as e:
            print(f"ROI not found: {roi_name}\n  {e}")

    for subject_id in subject_list:
        print(f"\n{'-'*50}\nSubject: {subject_id}")
        bids_func_dir = os.path.join(bids_dir, f"sub-{subject_id}", "func")
        fmriprep_func_dir = os.path.join(fmriprep_dir, f"sub-{subject_id}", "func")

        if not os.path.isdir(bids_func_dir):
            print(f"Skipping sub-{subject_id}, directory not found: {bids_func_dir}")
            continue

        visual_events, conditions = load_visual_events_by_run(bids_func_dir, subject_id, task_name)
        bold_images = load_bold_by_run(fmriprep_func_dir, subject_id, task_name)

        visual_events_all[subject_id] = visual_events
        conditions_all[subject_id] = conditions
        bold_images_all[subject_id] = bold_images
        contrasts_all[subject_id] = {}

        for roi_name, mask_img in roi_masks.items():
            print(f"\nComputing contrasts for sub-{subject_id}, ROI={roi_name}...")
            try:
                z_maps, cond_labels, run_labels = compute_contrasts_by_run(
                    subject_id, roi_name, mask_img, bold_images, visual_events, unique_runs
                )

                contrasts_all[subject_id][roi_name] = {
                    'z_maps': z_maps,
                    'condition_labels': cond_labels,
                    'run_labels': run_labels,
                }

                accuracy, chance, accuracy_scores, cm, model = fit_and_evaluate_decoder(
                    z_maps, mask_img, cond_labels, run_labels,
                    screening_percentile=screening_percentile,
                    C=C, n_leave_out=n_leave_out, n_jobs_cv=n_jobs_decoder
                )

                decoding_results[(subject_id, roi_name)] = {
                    'accuracy': accuracy,
                    'chance_level': chance,
                    'accuracy_scores': accuracy_scores,
                    'confusion_matrix': cm,
                    'conditions': np.unique(cond_labels).tolist()
                }

                if save_weight_maps:
                    try:
                        output_prefix = os.path.join(output_dir, f"sub-{subject_id}_roi-{roi_name}")
                        save_decoder_weights(model, mask_img, output_prefix, class_labels=np.unique(cond_labels).tolist())
                    except Exception as e:
                        print(f"Failed to save weights for sub-{subject_id}, ROI={roi_name}\n  {e}")

                del z_maps, cond_labels, run_labels, model, cm
                gc.collect()

            except Exception as e:
                print(f"Failed for sub-{subject_id}, ROI={roi_name}\n  {e}")

        del visual_events, conditions, bold_images
        gc.collect()

    return (
        visual_events_all,
        conditions_all,
        bold_images_all,
        contrasts_all,
        decoding_results,
        weight_maps_all
    )

And here is some output I obtain. Based on other scripts, this message seems very verbose but harmless, and suggests that n_jobs is being taken into account:

Exception ignored in: <function ResourceTracker.__del__ at 0x7fa1dd5c39c0>
Traceback (most recent call last):
  File "/home/touhara/anaconda3/envs/nilearn/lib/python3.13/multiprocessing/resource_tracker.py", line 82, in __del__
  File "/home/touhara/anaconda3/envs/nilearn/lib/python3.13/multiprocessing/resource_tracker.py", line 91, in _stop
  File "/home/touhara/anaconda3/envs/nilearn/lib/python3.13/multiprocessing/resource_tracker.py", line 116, in _stop_locked
ChildProcessError: [Errno 10] No child processes

But my "Activity Monitor" still shows that only one core is used at 100% at a time.
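If useful, here is a minimal standalone check (just joblib and the standard library, independent of my script) that I could run to see whether joblib actually spawns several worker processes:

import os
from joblib import Parallel, delayed

# if several distinct PIDs come back, joblib really is using multiple processes
pids = Parallel(n_jobs=-1)(delayed(os.getpid)() for _ in range(16))
print(sorted(set(pids)))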

Thank you.
Matthieu