import logging
import dipy.tracking.streamline as dts
import dipy.tracking.utils as dtu
import nibabel as nib
import numpy as np
from scipy.spatial.distance import cdist
import AFQ.recognition.utils as abu
[docs]
logger = logging.getLogger("AFQ")
[docs]
def clean_by_overlap(
this_bundle_sls,
other_bundle_sls,
overlap,
img,
remove=False,
project=None,
other_bundle_min_density=0.05,
):
"""
Cleans a set of streamlines by only keeping (or removing) those with
significant overlap with another set of streamlines.
Parameters
----------
this_bundle_sls : array-like
A list or array of streamlines to be cleaned.
Assumed to be in RASMM space.
other_bundle_sls : array-like
A reference list or array of streamlines to determine overlapping regions.
overlap : int
The minimum number of nodes allowed to overlap between `this_bundle_sls`
and `other_bundle_sls`. Streamlines with overlaps beyond this threshold
are removed.
img : nibabel.Nifti1Image
A reference 3D image that defines the spatial dimensions for the density
map.
remove : bool, optional
If True, streamlines that overlap in less than `overlap` nodes are
removed. If False, streamlines that overlap in more than `overlap` nodes
are removed.
Default: False.
project : {'A/P', 'I/S', 'L/R', None}, optional
If specified, the overlap calculation is projected along the given axis
before cleaning. For example, 'A/P' projects the streamlines along the
anterior-posterior axis.
Default: None.
other_bundle_min_density : float, optional
A threshold to binarize the density map of `other_bundle_sls`. Voxels
with density values above this threshold (as a fraction of the maximum
density) are considered occupied.
Default: 0.05.
Returns
-------
cleaned_idx : ndarray of bool
An array of boolean values indicating which streamlines from
`this_bundle_sls` pass the overlap threshold (True for streamlines to
keep, False for streamlines to discard).
Notes
-----
This function computes a density map from `other_bundle_sls` to represent
the spatial occupancy of the streamlines. It then calculates the probability
of each streamline in `this_bundle_sls` overlapping with this map.
Streamlines that overlap in less than `overlap` nodes are flagged for
removal (or more, if remove is True).
Examples
--------
>>> clean_idx = clean_by_overlap(bundle1, bundle2, 5, img, True)
>>> cleaned_bundle = [s for i, s in enumerate(bundle1) if clean_idx[i]]
"""
other_bundle_density_map = dtu.density_map(
other_bundle_sls, img.affine, img.shape[:3]
)
if remove:
max_val = other_bundle_density_map.max()
if max_val > 0:
other_bundle_density_map = (
other_bundle_density_map / max_val
) > other_bundle_min_density
else:
other_bundle_density_map = np.zeros_like(
other_bundle_density_map, dtype=bool
)
if project is not None:
orientation = nib.orientations.aff2axcodes(img.affine)
core_axis = next(
idx for idx, label in enumerate(orientation) if label in project.upper()
)
projection = np.sum(other_bundle_density_map, axis=core_axis)
other_bundle_density_map = np.broadcast_to(
np.expand_dims(projection, axis=core_axis), other_bundle_density_map.shape
)
fiber_probabilities = dts.values_from_volume(
other_bundle_density_map, this_bundle_sls, img.affine
)
cleaned_idx = np.zeros(len(this_bundle_sls), dtype=np.bool_)
for ii, fp in enumerate(fiber_probabilities):
if remove:
cleaned_idx[ii] = np.sum(np.asarray(fp) >= 1) <= overlap
else:
cleaned_idx[ii] = np.sum(np.asarray(fp) >= 1) > overlap
return cleaned_idx
[docs]
def clean_relative_to_other_core(
core,
this_fgarray,
other_fgarray,
consideration,
):
"""
Removes streamlines from a set that lie on the opposite side of a specified
core axis compared to another set of streamlines.
Parameters
----------
core : {'anterior', 'posterior', 'superior', 'inferior', 'right', 'left'}
The anatomical axis used to define the core direction. This determines
the side of the core from which streamlines in `this_fgarray` are
retained.
this_fgarray : ndarray
An array of streamlines to be cleaned.
Assumed to be in RASMM space.
other_fgarray : ndarray
An array of reference streamlines to define the core.
Assumed to be in RASMM space.
consideration : float or string, optional
If float, the distance threshold (in voxels) for considering a
streamline's position relative to the core. All points on
the streamline within distance from the core are considered
when determining if the streamline lies on the correct side.
If no points are within consideration distance, the closest point
on the streamline to the core is considered.
If string, must be one of 'entire' or 'closest'.
If 'entire', the entire streamline must lie on the correct
side of the core to be retained.
If 'closest', only the closest point on the streamline to
the core is considered.
Returns
-------
cleaned_idx_core : ndarray of bool
An array of boolean values indicating which streamlines in `this_fgarray`
lie on the correct side of the core (True for streamlines to keep, False
for streamlines to discard).
Notes
-----
This function first calculates the median streamline of `other_fgarray`,
which acts as the core line. It then determines whether each streamline in
`this_fgarray` is on the specified side of this core, based on the specified
anatomical axis (`core`). Streamlines on the opposite side are flagged for
removal.
Examples
--------
>>> cleaned_core_idx = clean_relative_to_other_core('anterior',
... streamlines1,
... streamlines2,
... np.eye(4))
>>> cleaned_streamlines = [s for i, s in enumerate(streamlines1)
... if cleaned_core_idx[i]]
"""
if len(other_fgarray) == 0:
logger.warning("Cleaning relative to core skipped, no core found.")
return np.ones(this_fgarray.shape[0], dtype=np.bool_)
# find dimension of core axis
core_axis = abu.axes_dict[core[0].upper()]
direction_signs = {
"L": 1,
"R": -1,
"P": 1,
"A": -1,
"I": 1,
"S": -1,
}
core_direc = direction_signs[core[0].upper()]
core_bundle = np.median(other_fgarray, axis=0)
cleaned_idx_core = np.zeros(this_fgarray.shape[0], dtype=np.bool_)
def _consideration_closest(sl):
dist_matrix = cdist(core_bundle, sl, "sqeuclidean")
min_dist_indices = np.unravel_index(np.argmin(dist_matrix), dist_matrix.shape)
closest_core = core_bundle[min_dist_indices[0], core_axis]
closest_sl = sl[min_dist_indices[1], core_axis]
return core_direc * (closest_sl - closest_core) > 0
for ii, sl in enumerate(this_fgarray):
if isinstance(consideration, float):
dist_matrix = cdist(core_bundle, sl, "sqeuclidean")
closest_core_indices = np.argmin(dist_matrix, axis=0)
min_dists_sq = np.min(dist_matrix, axis=0)
within_threshold = min_dists_sq < consideration**2
if np.any(within_threshold):
relevant_sl_pts = sl[within_threshold, core_axis]
relevant_core_pts = core_bundle[
closest_core_indices[within_threshold], core_axis
]
cleaned_idx_core[ii] = np.all(
core_direc * (relevant_sl_pts - relevant_core_pts) > 0
)
else:
cleaned_idx_core[ii] = _consideration_closest(sl)
elif consideration == "entire":
cleaned_idx_core[ii] = np.all(
core_direc * (sl[:, core_axis] - core_bundle[:, core_axis]) > 0
)
elif consideration == "closest":
cleaned_idx_core[ii] = _consideration_closest(sl)
else:
raise ValueError(
"Invalid value for consideration. Must be a "
"float or one of 'entire' or 'closest'. You have provided: "
f"{consideration}"
)
return cleaned_idx_core