Source code for AFQ.recognition.other_bundles

import numpy as np
import nibabel as nib
import logging

import dipy.tracking.utils as dtu
import dipy.tracking.streamline as dts

from scipy.spatial.distance import cdist
[docs]logger = logging.getLogger('AFQ')
[docs]def clean_by_other_density_map(this_bundle_sls, other_bundle_sls, node_thresh, img): """ Cleans a set of streamlines by removing those with significant overlap with another set of streamlines. Parameters ---------- this_bundle_sls : array-like A list or array of streamlines to be cleaned. other_bundle_sls : array-like A reference list or array of streamlines to determine overlapping regions. node_thresh : int The maximum number of nodes allowed to overlap between `this_bundle_sls` and `other_bundle_sls`. Streamlines with overlaps beyond this threshold are removed. img : nibabel.Nifti1Image or ndarray A reference 3D image that defines the spatial dimensions for the density map. Returns ------- cleaned_idx : ndarray of bool An array of boolean values indicating which streamlines from `this_bundle_sls` pass the overlap threshold (True for streamlines to keep, False for streamlines to discard). Notes ----- This function computes a density map from `other_bundle_sls` to represent the spatial occupancy of the streamlines. It then calculates the probability of each streamline in `this_bundle_sls` overlapping with this map. Streamlines that overlap in more than `node_thresh` nodes are flagged for removal. Examples -------- >>> clean_idx = clean_by_other_density_map(bundle1, bundle2, 5, img) >>> cleaned_bundle = [s for i, s in enumerate(bundle1) if clean_idx[i]] """ other_bundle_density_map = dtu.density_map( other_bundle_sls, np.eye(4), img.shape[:3]) fiber_probabilities = dts.values_from_volume( other_bundle_density_map, this_bundle_sls, np.eye(4)) cleaned_idx = np.zeros(len(this_bundle_sls), dtype=np.bool8) for ii, fp in enumerate(fiber_probabilities): cleaned_idx[ii] = np.sum(np.asarray(fp) >= 1) <= node_thresh return cleaned_idx
[docs]def clean_relative_to_other_core(core, this_fgarray, other_fgarray, affine): """ Removes streamlines from a set that lie on the opposite side of a specified core axis compared to another set of streamlines. Parameters ---------- core : {'anterior', 'posterior', 'superior', 'inferior', 'right', 'left'} The anatomical axis used to define the core direction. This determines the side of the core from which streamlines in `this_fgarray` are retained. this_fgarray : ndarray An array of streamlines to be cleaned. other_fgarray : ndarray An array of reference streamlines to define the core. affine : ndarray The affine transformation matrix. Returns ------- cleaned_idx_core : ndarray of bool An array of boolean values indicating which streamlines in `this_fgarray` lie on the correct side of the core (True for streamlines to keep, False for streamlines to discard). Notes ----- This function first calculates the median streamline of `other_fgarray`, which acts as the core line. It then determines whether each streamline in `this_fgarray` is on the specified side of this core, based on the specified anatomical axis (`core`). Streamlines on the opposite side are flagged for removal. Examples -------- >>> cleaned_core_idx = clean_relative_to_other_core('anterior', ... streamlines1, ... streamlines2, ... np.eye(4)) >>> cleaned_streamlines = [s for i, s in enumerate(streamlines1) ... if cleaned_core_idx[i]] """ if len(other_fgarray) == 0: logger.warning("Cleaning relative to core skipped, no core found.") return np.ones(this_fgarray.shape[0], dtype=np.bool8) # find dimension of core axis orientation = nib.orientations.aff2axcodes(affine) core_axis = None core_upper = core[0].upper() axis_groups = { 'L': ('L', 'R'), 'R': ('L', 'R'), 'P': ('P', 'A'), 'A': ('P', 'A'), 'I': ('I', 'S'), 'S': ('I', 'S'), } direction_signs = { 'L': 1, 'R': -1, 'P': 1, 'A': -1, 'I': 1, 'S': -1, } core_axis = None for idx, axis_label in enumerate(orientation): if core_upper in axis_groups[axis_label]: core_axis = idx core_direc = direction_signs[core_upper] break if core_axis is None: raise ValueError(f"Invalid core axis: {core}") core_bundle = np.median(other_fgarray, axis=0) cleaned_idx_core = np.zeros(this_fgarray.shape[0], dtype=np.bool8) for ii, sl in enumerate(this_fgarray): dist_matrix = cdist(core_bundle, sl, 'sqeuclidean') min_dist_indices = np.unravel_index(np.argmin(dist_matrix), dist_matrix.shape) closest_core = core_bundle[min_dist_indices[0], core_axis] closest_sl = sl[min_dist_indices[1], core_axis] cleaned_idx_core[ii] = closest_sl < closest_core if core_direc == -1 \ else closest_sl > closest_core return cleaned_idx_core