Source code for AFQ.tasks.data

import logging

import dipy.core.gradients as dpg
import dipy.reconst.dki as dpy_dki
import dipy.reconst.dti as dpy_dti
import dipy.reconst.fwdti as dpy_fwdti
import dipy.reconst.msdki as dpy_msdki
import immlib
import nibabel as nib
import numpy as np
from dipy.align import resample
from dipy.data import get_sphere
from dipy.io.gradients import read_bvals_bvecs
from dipy.reconst import shm
from dipy.reconst.dki_micro import axonal_water_fraction
from dipy.reconst.gqi import GeneralizedQSamplingModel
from dipy.reconst.rumba import RumbaSDModel

import AFQ.api.bundle_dict as abd
import AFQ.data.fetch as afd
from AFQ._fixes import gwi_odf
from AFQ.definitions.utils import Definition
from AFQ.models.asym_filtering import (
    compute_asymmetry_index,
    compute_odd_power_map,
    unified_filtering,
)
from AFQ.models.csd import CsdNanResponseError
from AFQ.models.csd import _fit as csd_fit_model
from AFQ.models.dki import _fit as dki_fit_model
from AFQ.models.dti import _fit as dti_fit_model
from AFQ.models.dti import noise_from_b0
from AFQ.models.fwdti import _fit as fwdti_fit_model
from AFQ.models.QBallTP import anisotropic_index, anisotropic_power
from AFQ.tasks.decorators import as_file, as_fit_deriv, as_img
from AFQ.tasks.utils import with_name

[docs] logger = logging.getLogger("AFQ")
[docs] DIPY_GH = "https://github.com/dipy/dipy/blob/master/dipy/"
[docs] def _or_to_text(outlier_rejection_method): if outlier_rejection_method is True: return "RT" elif outlier_rejection_method is False: return "None" else: return str(outlier_rejection_method)
@immlib.calc("data", "gtab", "dwi", "dwi_affine")
[docs] def get_data_gtab( dwi_data_file, bval_file, bvec_file, min_bval=-np.inf, max_bval=np.inf, b0_threshold=50, ): """ DWI data as an ndarray for selected b values, A DIPY GradientTable with all the gradient information, DWI data in a Nifti1Image, and the affine transformation of the DWI data Parameters ---------- min_bval : float, optional Minimum b value you want to use from the dataset (other than b0), inclusive. If None, there is no minimum limit. Default: -np.inf max_bval : float, optional Maximum b value you want to use from the dataset (other than b0), inclusive. If None, there is no maximum limit. Default: np.inf b0_threshold : int, optional The value of b under which it is considered to be b0. Default: 50 """ img = nib.load(dwi_data_file) bvals, bvecs = read_bvals_bvecs(bval_file, bvec_file) data = img.get_fdata() valid_b = np.logical_or( np.logical_and(bvals >= min_bval, bvals <= max_bval), bvals <= b0_threshold ) data = data[..., valid_b] bvals = bvals[valid_b] bvecs = bvecs[valid_b] gtab = dpg.gradient_table(bvals=bvals, bvecs=bvecs, b0_threshold=b0_threshold) img = nib.Nifti1Image(data, img.affine) return data, gtab, img, img.affine
@immlib.calc("b0") @as_file("_b0ref.nii.gz") @as_img
[docs] def b0(dwi, gtab): """ full path to a nifti file containing the mean b0 """ mean_b0 = np.mean(dwi.get_fdata()[..., gtab.b0s_mask], -1) meta = dict(b0_threshold=gtab.b0_threshold) return mean_b0, meta
@immlib.calc("t1w_over_b0") @as_file("_desc-T1wOverB0.nii.gz") @as_img
[docs] def t1w_over_b0(structural_imap, b0, citations, min_b0_for_r1_approximation=1e-2): """ full path to a nifti file containing the T1w over mean b0 which is a proxy for R1 [1]_ Parameters ---------- min_b0_for_r1_approximation : float, optional The minimum value of b0 to consider when doing the division. This is to avoid dividing by small numbers. Default: 1e-2 References ---------- .. [1] Moskovich, S., Shtangel, O., & Mezer, A. (2024). Approximating R1 and R2: A quantitative approach to clinical weighted MRI. Hum. Brain Mapp., 45(18), e70102. """ citations.add("Moskovich2024-qd") t1_img = nib.load(structural_imap["t1_masked"]) b0_img = nib.load(b0) resampled_t1 = resample(t1_img, b0_img) data = np.zeros_like(resampled_t1.get_fdata(), dtype=float) data = np.divide( resampled_t1.get_fdata(), b0_img.get_fdata(), out=data, where=b0_img.get_fdata() >= min_b0_for_r1_approximation, ) meta = dict(T1w=structural_imap["t1_masked"], b0=b0) return data, meta
@immlib.calc("masked_b0") @as_file("_desc-masked_b0ref.nii.gz") @as_img
[docs] def b0_mask(b0, brain_mask): """ full path to a nifti file containing the mean b0 after applying the brain mask """ img = nib.load(b0) brain_mask = nib.load(brain_mask).get_fdata().astype(bool) masked_data = img.get_fdata() masked_data[~brain_mask] = 0 meta = dict(source=b0, masked=True) return masked_data, meta
@immlib.calc("dti_tf")
[docs] def dti_fit(dti_params, gtab): """DTI TensorFit object""" dti_params = nib.load(dti_params).get_fdata() tm = dpy_dti.TensorModel(gtab) evals, evecs = dpy_dti.decompose_tensor(dpy_dti.from_lower_triangular(dti_params)) evecs = np.reshape(evecs, (evecs.shape[0], evecs.shape[1], evecs.shape[2], -1)) return dpy_dti.TensorFit(tm, np.concatenate([evals, evecs], -1))
@immlib.calc("dti_params", "dti_s0") @as_file( suffix=[ "_model-tensor_param-diffusivity_dwimap.nii.gz", "_model-tensor_param-s0_dwimap.nii.gz", ], subfolder="models", ) @as_img
[docs] def dti_params( brain_mask, data, gtab, bval_file, bvec_file, citations, b0_threshold=50, robust_tensor_fitting=False, ): """ full path to a nifti file containing parameters for the DTI fit, s0 values of DTI fit Parameters ---------- robust_tensor_fitting : bool, optional Whether to use robust_tensor_fitting when doing dti. Only applies to dti. Default: False b0_threshold : int, optional The value of b under which it is considered to be b0. Default: 50. """ citations.add("basser1994mr") mask = nib.load(brain_mask).get_fdata() if robust_tensor_fitting: bvals, _ = read_bvals_bvecs(bval_file, bvec_file) sigma = noise_from_b0(data, gtab, bvals, mask=mask, b0_threshold=b0_threshold) fit_method = "RT" else: sigma = None fit_method = "WLS" dtf = dti_fit_model(gtab, data, mask=mask, sigma=sigma, return_S0_hat=True) meta = dict( Description="Diffusion Coefficient, encoded as a tensor representation", Units="mm^2/s", Model=dict( Parameters=dict( FitMethod=fit_method, OutlierRejectionMethod=_or_to_text(robust_tensor_fitting), ), ModelURL=f"{DIPY_GH}reconst/dti.py", ), OrientationEncoding=dict( EncodingAxis=3, Reference="ijk", TensorRank=2, Type="tensor" ), ) meta_s0 = dict( Description="Estimated signal intensity with no diffusion weighting, ie. S0", Model=dict( Description="Diffusion Tensor", Parameters=dict( FitMethod=fit_method, OutlierRejectionMethod=_or_to_text(robust_tensor_fitting), ), ), ) return [(dtf.lower_triangular(), meta), (dtf.model_S0, meta_s0)]
@immlib.calc("fwdti_tf")
[docs] def fwdti_fit(fwdti_params, gtab): """Free-water DTI TensorFit object""" fwdti_params = nib.load(fwdti_params).get_fdata() fwtm = dpy_fwdti.FreeWaterTensorModel(gtab) return dpy_fwdti.FreeWaterTensorFit(fwtm, fwdti_params)
@immlib.calc("fwdti_params") @as_file(suffix="_model-fwtensor_param-diffusivity_dwimap.nii.gz", subfolder="models") @as_img
[docs] def fwdti_params(brain_mask, data, gtab, citations): """ Full path to a nifti file containing parameters for the free-water DTI fit. """ citations.add("henriques2017re") mask = nib.load(brain_mask).get_fdata() dtf = fwdti_fit_model(data, gtab, mask=mask) meta = dict( Description=("Diffusion Coefficient, with free-water elimination"), Units="mm^2/s", Model=dict( Parameters=dict(FitMethod="NLS"), ModelURL=f"{DIPY_GH}reconst/fwdti.py", ), OrientationEncoding=dict( EncodingAxis=3, Reference="ijk", TensorRank=2, Type="tensor" ), ) return dtf.model_params, meta
@immlib.calc("dki_tf")
[docs] def dki_fit(dki_params, gtab): """DKI DiffusionKurtosisFit object""" dki_params = nib.load(dki_params).get_fdata() tm = dpy_dki.DiffusionKurtosisModel(gtab) return dpy_dki.DiffusionKurtosisFit(tm, dki_params)
@immlib.calc("dki_params", "dki_s0") @as_file( suffix=[ "_model-kurtosis_param-diffusivity_dwimap.nii.gz", "_model-kurtosis_param-s0_dwimap.nii.gz", ], subfolder="models", ) @as_img
[docs] def dki_params(brain_mask, gtab, data, citations): """ full path to a nifti file containing parameters for the DKI fit, s0 values of DKI fit """ citations.add("henriques2021diffusional") if len(dpg.unique_bvals_magnitude(gtab.bvals)) < 3: raise ValueError( ( "The DKI model requires at least 2 non-zero b-values, " f"but you provided {len(dpg.unique_bvals_magnitude(gtab.bvals))}" " b-values (including b=0)." ) ) mask = nib.load(brain_mask).get_fdata() dkf = dki_fit_model(gtab, data, mask=mask, return_S0_hat=True) meta = dict( Description=( "Diffusion Coefficient, encoded as a kurtosis tensor representation" ), Units="mm^2/s", Model=dict( Parameters=dict(FitMethod="wls", OutlierRejectionMethod=_or_to_text(False)), ModelURL=f"{DIPY_GH}reconst/dki.py", ), OrientationEncoding=dict( EncodingAxis=3, Reference="ijk", TensorRank=4, Type="tensor" ), ) meta_s0 = dict( Description="Estimated signal intensity with no diffusion weighting, ie. S0", Model=dict( Description="Diffusion Kurtosis Tensor", Parameters=dict( FitMethod="wls", OutlierRejectionMethod=_or_to_text(False), ), ), ) return [(dkf.model_params, meta), (dkf.model_S0, meta_s0)]
@immlib.calc("msdki_tf")
[docs] def msdki_fit(msdki_params, gtab): """Mean Signal DKI DiffusionKurtosisFit object""" msdki_params = nib.load(msdki_params).get_fdata() tm = dpy_msdki.MeanDiffusionKurtosisModel(gtab) return dpy_msdki.MeanDiffusionKurtosisFit(tm, msdki_params)
@immlib.calc("msdki_params", "msdki_s0") @as_file( suffix=[ "_model-mskurtosis_param-diffusivity_dwimap.nii.gz", "_model-mskurtosis_param-s0_dwimap.nii.gz", ], subfolder="models", ) @as_img
[docs] def msdki_params(brain_mask, gtab, data, citations): """ full path to a nifti file containing parameters for the Mean Signal DKI fit, s0 values of Mean Signal DKI fit """ citations.add("neto2018advanced") mask = nib.load(brain_mask).get_fdata() msdki_model = dpy_msdki.MeanDiffusionKurtosisModel(gtab, return_S0_hat=True) msdki_fit = msdki_model.fit(data, mask=mask) meta = dict( Description=( "Diffusion Coefficient, encoded as a kurtosis tensor representation" ), Units="mm^2/s", Model=dict( Description="Mean Signal Diffusion Kurtosis Tensor", Parameters=dict(FitMethod="wls", OutlierRejectionMethod=_or_to_text(False)), ModelURL=f"{DIPY_GH}reconst/msdki.py", ), OrientationEncoding=dict( EncodingAxis=3, Reference="ijk", TensorRank=4, Type="tensor" ), ) meta_s0 = dict( Description="Estimated signal intensity with no diffusion weighting, ie. S0", Model=dict( Description="Mean Signal Diffusion Kurtosis Tensor", Parameters=dict( FitMethod="wls", OutlierRejectionMethod=_or_to_text(False), ), ), ) return [(msdki_fit.model_params, meta), (msdki_fit.model_S0, meta_s0)]
@immlib.calc("msdki_msd") @as_file("_model-mskurtosis_param-msd_dwimap.nii.gz", subfolder="models") @as_fit_deriv("MSDKI")
[docs] def msdki_msd(msdki_tf): """ full path to a nifti file containing the MSDKI mean signal diffusivity """ return msdki_tf.msd, {"Description": "Mean Signal Diffusivity"}
@immlib.calc("msdki_msk") @as_file("_model-mskurtosis_param-msk_dwimap.nii.gz", subfolder="models") @as_fit_deriv("MSDKI")
[docs] def msdki_msk(msdki_tf): """ full path to a nifti file containing the MSDKI mean signal kurtosis """ return msdki_tf.msk, {"Description": "Mean Signal Kurtosis"}
@immlib.calc("csd_params") @as_file(suffix="_model-csd_param-wm_dwimap.nii.gz", subfolder="models") @as_img
[docs] def csd_params( dwi, brain_mask, gtab, data, citations, csd_response=None, csd_sh_order_max=None, csd_lambda_=1, csd_tau=0.1, csd_fa_thr=0.7, ): """ full path to a nifti file containing parameters for the CSD fit Parameters ---------- csd_response : tuple or None, optional. The response function to be used by CSD, as a tuple with two elements. The first is the eigen-values as an (3,) ndarray and the second is the signal value for the response function without diffusion-weighting (i.e. S0). If not provided, auto_response will be used to calculate these values. Default: None csd_sh_order_max : int or None, optional. If None, infer the number of parameters from the number of data volumes, but no larger than 8. Default: None csd_lambda_ : float, optional. weight given to the constrained-positivity regularization part of the deconvolution equation. Default: 1 csd_tau : float, optional. threshold controlling the amplitude below which the corresponding fODF is assumed to be zero. Ideally, tau should be set to zero. However, to improve the stability of the algorithm, tau is set to tau*100 percent of the mean fODF amplitude (here, 10 percent by default) (see [1]_). Default: 0.1 csd_fa_thr : float, optional. The threshold on the FA used to calculate the single shell auto response. Can be useful to reduce for baby subjects. Default: 0.7 References ---------- .. [1] Tournier, J.D., et al. NeuroImage 2007. Robust determination of the fibre orientation distribution in diffusion MRI: Non-negativity constrained super-resolved spherical deconvolution """ citations.add("tournier2007robust") mask = nib.load(brain_mask).get_fdata() try: csdm, csdf, sh_order_max = csd_fit_model( gtab, data, mask=mask, response=csd_response, sh_order_max=csd_sh_order_max, lambda_=csd_lambda_, tau=csd_tau, csd_fa_thr=csd_fa_thr, ) except CsdNanResponseError as e: raise CsdNanResponseError( f"Could not compute CSD response function for file: {dwi}." ) from e meta_wm = dict( Model=dict( Description="Constrained Spherical Deconvolution (CSD)", URL="https://mrtrix.readthedocs.io/en/latest/constrained_spherical_deconvolution/constrained_spherical_deconvolution.html", ), Description="White matter", NonNegativity="constrained", OrientationEncoding=dict( EncodingAxis=3, Reference="xyz", SphericalHarmonicBasis="descoteaux", SphericalHarmonicDegree=sh_order_max, Type="sh", ), ResponseFunction=dict( Coefficients=csdm.response[0].tolist() + [csdm.response[1]], Type="eigen" ), ) return csdf.shm_coeff, meta_wm
@immlib.calc("csd_aodf_params") @as_file(suffix="_model-csd_param-aodf_dwimap.nii.gz", subfolder="models") @as_img
[docs] def csd_aodf(structural_imap, csd_params, tracking_params, citations): """ full path to a nifti file containing SSST CSD ODFs filtered by unified filtering [1] References ---------- [1] Poirier and Descoteaux, 2024, "A Unified Filtering Method for Estimating Asymmetric Orientation Distribution Functions", Neuroimage, https://doi.org/10.1016/j.neuroimage.2024.120516 """ citations.add("poirier2024unified") citations.add("renauld2026tractography") sh_coeff = nib.load(csd_params).get_fdata() logger.info("Applying unified filtering to generate asymmetric CSD ODFs...") aodf = unified_filtering( sh_coeff, get_sphere(tracking_params["sphere"]), n_threads=structural_imap["n_threads"], low_mem=structural_imap["low_mem"], ) return aodf, dict( Model=dict( Description=( "ODFs for Asymmetric Constrained Spherical Deconvolution (aCSD)" ), URL="https://doi.org/10.1016/j.neuroimage.2024.120516", ), Description="White matter", OrientationEncoding=dict( EncodingAxis=3, Reference="xyz", AntipodalSymmetry=False, Type="odf", Sphere=tracking_params["sphere"], ), Source=csd_params, )
@immlib.calc("csd_aodf_asi") @as_file(suffix="_model-csd_param-asi_dwimap.nii.gz", subfolder="models") @as_fit_deriv("CSD_AODF")
[docs] def csd_aodf_asi(csd_aodf_params, brain_mask): """ full path to a nifti file containing the CSD Asymmetric Index (ASI) [1] References ---------- [1] S. Cetin Karayumak, E. Özarslan, and G. Unal, "Asymmetric Orientation Distribution Functions (AODFs) revealing intravoxel geometry in diffusion MRI" Magnetic Resonance Imaging, vol. 49, pp. 145-158, Jun. 2018, doi: https://doi.org/10.1016/j.mri.2018.03.006. """ aodf = nib.load(csd_aodf_params).get_fdata() brain_mask = nib.load(brain_mask).get_fdata().astype(bool) asi = compute_asymmetry_index(aodf, brain_mask) return asi, {"Description": "Asymmetry Index"}
@immlib.calc("csd_aodf_opm") @as_file(suffix="_model-csd_param-opm_dwimap.nii.gz", subfolder="models") @as_fit_deriv("CSD_AODF")
[docs] def csd_aodf_opm(csd_aodf_params, brain_mask): """ full path to a nifti file containing the CSD odd-power map [1] References ---------- [1] C. Poirier, E. St-Onge, and M. Descoteaux, "Investigating the Occurrence of Asymmetric Patterns in White Matter Fiber Orientation Distribution Functions" [Abstract], In: Proc. Intl. Soc. Mag. Reson. Med. 29 (2021), 2021 May 15-20, Vancouver, BC, Abstract number 0865. """ aodf = nib.load(csd_aodf_params).get_fdata() brain_mask = nib.load(brain_mask).get_fdata().astype(bool) opm = compute_odd_power_map(aodf, brain_mask) return opm, {"Description": "Odd Power Map"}
@immlib.calc("csd_pmap") @as_file(suffix="_model-csd_param-apm_dwimap.nii.gz", subfolder="models") @as_fit_deriv("CSD")
[docs] def anisotropic_power_map(csd_params): """ full path to a nifti file containing the anisotropic power map """ sh_coeff = nib.load(csd_params).get_fdata() pmap = anisotropic_power(sh_coeff) return pmap, {"Description": "Anisotropic Power Map"}
@immlib.calc("csd_ai") @as_file(suffix="_model-csd_param-ai_dwimap.nii.gz", subfolder="models") @as_fit_deriv("CSD")
[docs] def csd_anisotropic_index(csd_params): """ full path to a nifti file containing the anisotropic index """ sh_coeff = nib.load(csd_params).get_fdata() AI = anisotropic_index(sh_coeff) return AI, {"Description": "Anisotropic Index"}
@immlib.calc("gq_params", "gq_iso") @as_file( suffix=[ "_model-gq_param-odf_dwimap.nii.gz", "_model-gq_param-iso_dwimap.nii.gz", ], subfolder="models", ) @as_img
[docs] def gq(gtab, data, tracking_params, citations, gq_sampling_length=1.2): """ full path to a nifti file containing ODF for the Generalized Q-Sampling, full path to a nifti file containing isotropic diffusion component Parameters ---------- gq_sampling_length : float Diffusion sampling length. Default: 1.2 """ citations.add("yeh2010generalized") gqmodel = GeneralizedQSamplingModel(gtab, sampling_length=gq_sampling_length) sphere = get_sphere(tracking_params["sphere"]) odf = gwi_odf(gqmodel, data, sphere) odf_norm = odf / odf.max() ISO = odf_norm.min(axis=-1) GQ_meta = dict( Model=dict( Description="Generalized Q-Sampling Imaging Model", URL="https://dsi-studio.labsolver.org/ref/GQI.pdf", ), Parameters=dict(SamplingLength=gq_sampling_length), ) odf_meta = GQ_meta.copy() odf_meta["Description"] = "ODF from Generalized Q-Sampling" iso_meta = GQ_meta.copy() iso_meta["Description"] = ( "Isotropic Diffusion Component from Generalized Q-Sampling" ) return [(odf, odf_meta), (ISO, iso_meta)]
@immlib.calc("rumba_params", "rumba_f_csf", "rumba_f_gm", "rumba_f_wm") @as_file( suffix=[ "_model-rumba_param-odf_dwimap.nii.gz", "_model-rumba_param-csf_probseg.nii.gz", "_model-rumba_param-gm_probseg.nii.gz", "_model-rumba_param-wm_probseg.nii.gz", ], subfolder="models", ) @as_img
[docs] def rumba_params( gtab, data, brain_mask, tracking_params, citations, rumba_wm_response=None, rumba_gm_response=0.0008, rumba_csf_response=0.003, rumba_n_iter=600, ): """ ODF for the RUMBA-SD model, full path to a nifti file containing the CSF volume fraction for each voxel, full path to a nifti file containing the GM volume fraction for each voxel, full path to a nifti file containing the white matter volume fraction for each voxel Parameters ---------- rumba_wm_response: 1D or 2D ndarray or AxSymShResponse. Able to take response[0] from auto_response_ssst. default: array([0.0017, 0.0002, 0.0002]) rumba_gm_response: float, optional Mean diffusivity for GM compartment. If None, then grey matter volume fraction is not computed. Default: 0.8e-3 rumba_csf_response: float, optional Mean diffusivity for CSF compartment. If None, then CSF volume fraction is not computed. Default: 3.0e-3 rumba_n_iter: int, optional Number of iterations for fODF estimation. Must be a positive int. Default: 600 """ citations.add("canales2015spherical") if rumba_wm_response is None: rumba_wm_response = np.asarray([0.0017, 0.0002, 0.0002]) else: rumba_wm_response = np.asarray(rumba_wm_response) rumba_model = RumbaSDModel( gtab, wm_response=rumba_wm_response, gm_response=rumba_gm_response, csf_response=rumba_csf_response, n_iter=rumba_n_iter, recon_type="smf", n_coils=1, R=1, voxelwise=False, use_tv=False, sphere=get_sphere(tracking_params["sphere"]), verbose=True, ) rumba_fit = rumba_model.fit(data, mask=nib.load(brain_mask).get_fdata()) odf = rumba_fit.odf(sphere=get_sphere(tracking_params["sphere"])) model_meta = dict( Description=( "Robust and Unbiased Model-Based Spherical Deconvolution (RUMBA-SD)" ), URL="https://doi.org/10.1371/journal.pone.0138910", ) odf_meta = dict( Model=model_meta, Description="ODF for the RUMBA-SD model", OrientationEncoding=dict( EncodingAxis=3, Reference="xyz", Type="odf", Sphere=tracking_params["sphere"], ), ResponseFunction=dict(Coefficients=rumba_wm_response.tolist(), Type="eigen"), ) meta_wm = dict( Model=model_meta, Description="White matter volume fraction", ) meta_gm = dict( Model=model_meta, Description="Gray matter volume fraction", ) meta_csf = dict( Model=model_meta, Description="Cerebro-spinal fluid volume fraction", ) return [ (odf, odf_meta), (rumba_fit.f_csf, meta_csf), (rumba_fit.f_gm, meta_gm), (rumba_fit.f_wm, meta_wm), ]
@immlib.calc("opdt_params", "opdt_gfa") @as_file( suffix=[ "_model-opdt_param-wm_dwimap.nii.gz", "_model-opdt_param-gfa_dwimap.nii.gz", ], subfolder="models", ) @as_img
[docs] def opdt_params(data, gtab, brain_mask, citations, opdt_sh_order_max=8): """ full path to a nifti file containing parameters for the Orientation Probability Density Transform shm_coeff, full path to a nifti file containing GFA Parameters ---------- opdt_sh_order_max : int Spherical harmonics order for OPDT model. Must be even. Default: 8 """ citations.add("tristan2009estimation") opdt_model = shm.OpdtModel(gtab, opdt_sh_order_max) opdt_fit = opdt_model.fit(data, mask=brain_mask) OPDT_meta = dict( Model=dict( Description="Orientation Probability Density Transform (OPDT)", URL="https://www.lpi.tel.uva.es/node/103", ), OrientationEncoding=dict( EncodingAxis=3, Reference="xyz", SphericalHarmonicBasis="descoteaux", SphericalHarmonicDegree=opdt_sh_order_max, Type="sh", ), ) shm_meta = OPDT_meta.copy() shm_meta["Description"] = "White Matter" gfa_meta = OPDT_meta.copy() gfa_meta["Description"] = "Generalized Fractional Anisotropy" return [(opdt_fit._shm_coef, shm_meta), (opdt_fit.gfa, gfa_meta)]
@immlib.calc("opdt_pmap") @as_file(suffix="_model-opdt_param-apm_dwimap.nii.gz", subfolder="models") @as_fit_deriv("OPDT")
[docs] def opdt_pmap(opdt_params): """ full path to a nifti file containing the anisotropic power map from OPDT """ sh_coeff = nib.load(opdt_params).get_fdata() pmap = anisotropic_power(sh_coeff) return pmap, {"Description": "Anisotropic Power Map"}
@immlib.calc("opdt_ai") @as_file(suffix="_model-opdt_param-ai_dwimap.nii.gz", subfolder="models") @as_fit_deriv("OPDT")
[docs] def opdt_ai(opdt_params): """ full path to a nifti file containing the anisotropic index from OPDT """ sh_coeff = nib.load(opdt_params).get_fdata() AI = anisotropic_index(sh_coeff) return AI, {"Description": "Anisotropic Index"}
@immlib.calc("csa_params", "csa_gfa") @as_file( suffix=["_model-csa_param-wm_dwimap.nii.gz", "_model-csa_param-gfa_dwimap.nii.gz"], subfolder="models", ) @as_img
[docs] def csa_params(data, gtab, brain_mask, citations, csa_sh_order_max=8): """ full path to a nifti file containing parameters for the Constant Solid Angle shm_coeff, full path to a nifti file containing GFA Parameters ---------- csa_sh_order_max : int Spherical harmonics order for CSA model. Must be even. Default: 8 """ citations.add("aganj2010reconstruction") csa_model = shm.CsaOdfModel(gtab, csa_sh_order_max) csa_fit = csa_model.fit(data, mask=brain_mask) CSA_meta = dict( Model=dict( Description="Constant Solid Angle (CSA)", URL="https://doi.org/10.1016/j.neuroimage.2009.04.049", ), OrientationEncoding=dict( EncodingAxis=3, Reference="xyz", SphericalHarmonicBasis="descoteaux", SphericalHarmonicDegree=csa_sh_order_max, Type="sh", ), ) shm_meta = CSA_meta.copy() shm_meta["Description"] = "White Matter" gfa_meta = CSA_meta.copy() gfa_meta["Description"] = "Generalized Fractional Anisotropy" return [(csa_fit._shm_coef, shm_meta), (csa_fit.gfa, gfa_meta)]
@immlib.calc("csa_pmap") @as_file(suffix="_model-csa_param-apm_dwimap.nii.gz", subfolder="models") @as_fit_deriv("CSA")
[docs] def csa_pmap(csa_params): """ full path to a nifti file containing the anisotropic power map from CSA """ sh_coeff = nib.load(csa_params).get_fdata() pmap = anisotropic_power(sh_coeff) return pmap, {"Description": "Anisotropic Power Map"}
@immlib.calc("csa_ai") @as_file(suffix="_model-csa_param-ai_dwimap.nii.gz", subfolder="models") @as_fit_deriv("CSA")
[docs] def csa_ai(csa_params): """ full path to a nifti file containing the anisotropic index from CSA """ sh_coeff = nib.load(csa_params).get_fdata() AI = anisotropic_index(sh_coeff) return AI, {"Description": "Anisotropic Index"}
@immlib.calc("fwdti_fa") @as_file(suffix="_model-fwtensor_param-fa_dwimap.nii.gz", subfolder="models") @as_fit_deriv("FWDTI")
[docs] def fwdti_fa(fwdti_tf): """ full path to a nifti file containing the Free-water DTI fractional anisotropy """ return fwdti_tf.fa, {"Description": "Free Water Fractional Anisotropy"}
@immlib.calc("fwdti_md") @as_file(suffix="_model-fwtensor_param-md_dwimap.nii.gz", subfolder="models") @as_fit_deriv("FWDTI")
[docs] def fwdti_md(fwdti_tf): """ full path to a nifti file containing the Free-water DTI mean diffusivity """ return fwdti_tf.md, {"Description": "Free Water Mean Diffusivity"}
@immlib.calc("fwdti_fwf") @as_file(suffix="_model-fwtensor_param-fwf_dwimap.nii.gz", subfolder="models") @as_fit_deriv("FWDTI")
[docs] def fwdti_fwf(fwdti_tf): """ full path to a nifti file containing the Free-water DTI free water fraction """ return fwdti_tf.f, {"Description": "Free Water Fraction"}
@immlib.calc("dti_fa") @as_file(suffix="_model-tensor_param-fa_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DTI")
[docs] def dti_fa(dti_tf): """ full path to a nifti file containing the DTI fractional anisotropy """ return dti_tf.fa, {"Description": "Fractional Anisotropy"}
@immlib.calc("dti_lt0", "dti_lt1", "dti_lt2", "dti_lt3", "dti_lt4", "dti_lt5")
[docs] def dti_lt(dti_tf, dwi_affine): """ Image of first element in the DTI tensor according to DIPY convention i.e. Dxx (rate of diffusion from the left to right side of the brain), Image of second element in the DTI tensor according to DIPY convention i.e. Dyy (rate of diffusion from the posterior to anterior part of the brain), Image of third element in the DTI tensor according to DIPY convention i.e. Dzz (rate of diffusion from the inferior to superior part of the brain), Image of fourth element in the DTI tensor according to DIPY convention i.e. Dxy (rate of diffusion in the xy plane indicating the relationship between the x and y directions), Image of fifth element in the DTI tensor according to DIPY convention i.e. Dxz (rate of diffusion in the xz plane indicating the relationship between the x and z directions), Image of sixth element in the DTI tensor according to DIPY convention i.e. Dyz (rate of diffusion in the yz plane indicating the relationship between the y and z directions) """ dti_lt_dict = {} for ii in range(6): dti_lt_dict[f"dti_lt{ii}"] = nib.Nifti1Image( dti_tf.lower_triangular()[..., ii], dwi_affine ) return dti_lt_dict
@immlib.calc("dti_cfa") @as_file(suffix="_model-tensor_param-cfa_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DTI")
[docs] def dti_cfa(dti_tf): """ full path to a nifti file containing the DTI color fractional anisotropy """ return dti_tf.color_fa, {"Description": "Color Fractional Anisotropy"}
@immlib.calc("dti_pdd") @as_file(suffix="_model-tensor_param-pdd_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DTI")
[docs] def dti_pdd(dti_tf): """ full path to a nifti file containing the DTI principal diffusion direction """ pdd = dti_tf.directions.squeeze() # Invert the x coordinates: pdd[..., 0] = pdd[..., 0] * -1 return pdd, {"Description": "Principal Diffusion Direction"}
@immlib.calc("dti_md") @as_file("_model-tensor_param-md_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DTI")
[docs] def dti_md(dti_tf): """ full path to a nifti file containing the DTI mean diffusivity """ return dti_tf.md, {"Description": "Mean Diffusivity"}
@immlib.calc("dti_ga") @as_file(suffix="_model-tensor_param-ga_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DTI")
[docs] def dti_ga(dti_tf): """ full path to a nifti file containing the DTI geodesic anisotropy """ return dti_tf.ga, {"Description": "Geodesic Anisotropy"}
@immlib.calc("dti_rd") @as_file(suffix="_model-tensor_param-rd_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DTI")
[docs] def dti_rd(dti_tf): """ full path to a nifti file containing the DTI radial diffusivity """ return dti_tf.rd, {"Description": "Radial Diffusivity"}
@immlib.calc("dti_ad") @as_file(suffix="_model-tensor_param-ad_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DTI")
[docs] def dti_ad(dti_tf): """ full path to a nifti file containing the DTI axial diffusivity """ return dti_tf.ad, {"Description": "Axial Diffusivity"}
@immlib.calc( "dki_kt0", "dki_kt1", "dki_kt2", "dki_kt3", "dki_kt4", "dki_kt5", "dki_kt6", "dki_kt7", "dki_kt8", "dki_kt9", "dki_kt10", "dki_kt11", "dki_kt12", "dki_kt13", "dki_kt14", )
[docs] def dki_kt(dki_tf, dwi_affine): """ Image of first element in the DKI kurtosis model, Image of second element in the DKI kurtosis model, Image of third element in the DKI kurtosis model, Image of fourth element in the DKI kurtosis model, Image of fifth element in the DKI kurtosis model, Image of sixth element in the DKI kurtosis model, Image of seventh element in the DKI kurtosis model, Image of eighth element in the DKI kurtosis model, Image of ninth element in the DKI kurtosis model, Image of tenth element in the DKI kurtosis model, Image of eleventh element in the DKI kurtosis model, Image of twelfth element in the DKI kurtosis model, Image of thirteenth element in the DKI kurtosis model, Image of fourteenth element in the DKI kurtosis model, Image of fifteenth element in the DKI kurtosis model """ dki_kt_dict = {} for ii in range(15): dki_kt_dict[f"dki_kt{ii}"] = nib.Nifti1Image(dki_tf.kt[..., ii], dwi_affine) return dki_kt_dict
@immlib.calc("dki_lt0", "dki_lt1", "dki_lt2", "dki_lt3", "dki_lt4", "dki_lt5")
[docs] def dki_lt(dki_tf, dwi_affine): """ Image of first element in the DTI tensor from DKI, Image of second element in the DTI tensor from DKI, Image of third element in the DTI tensor from DKI, Image of fourth element in the DTI tensor from DKI, Image of fifth element in the DTI tensor from DKI, Image of sixth element in the DTI tensor from DKI """ dki_lt_dict = {} for ii in range(6): dki_lt_dict[f"dki_lt{ii}"] = nib.Nifti1Image( dki_tf.lower_triangular()[..., ii], dwi_affine ) return dki_lt_dict
@immlib.calc("dki_cfa") @as_file(suffix="_model-kurtosis_param-cfa_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_cfa(dki_tf): """ full path to a nifti file containing the DKI color fractional anisotropy """ return dki_tf.color_fa, {"Description": "Color Fractional Anisotropy"}
@immlib.calc("dki_fa") @as_file("_model-kurtosis_param-fa_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_fa(dki_tf): """ full path to a nifti file containing the DKI fractional anisotropy """ return dki_tf.fa, {"Description": "Fractional Anisotropy"}
@immlib.calc("dki_md") @as_file("_model-kurtosis_param-md_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_md(dki_tf): """ full path to a nifti file containing the DKI mean diffusivity """ return dki_tf.md, {"Description": "Mean Diffusivity"}
@immlib.calc("dki_awf") @as_file("_model-kurtosis_param-awf_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_awf(dki_params, tracking_params, gtol=1e-2): """ full path to a nifti file containing the DKI axonal water fraction Parameters ---------- gtol : float, optional This input is to refine kurtosis maxima under the precision of the directions sampled on the sphere class instance. The gradient of the convergence procedure must be less than gtol before successful termination. If gtol is None, fiber direction is directly taken from the initial sampled directions of the given sphere object. Default: 1e-2 """ dki_params = nib.load(dki_params).get_fdata() return axonal_water_fraction( dki_params, sphere=tracking_params["sphere"], gtol=gtol ), {"Description": "Axonal Water Fraction"}
@immlib.calc("dki_mk") @as_file("_model-kurtosis_param-mk_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_mk(dki_tf): """ full path to a nifti file containing the DKI mean kurtosis file """ return dki_tf.mk(), {"Description": "Mean Kurtosis"}
@immlib.calc("dki_kfa") @as_file("_model-kurtosis_param-kfa_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_kfa(dki_tf): """ full path to a nifti file containing the DKI kurtosis FA file References ---------- .. [Hansen2019] Hansen B. An Introduction to Kurtosis Fractional Anisotropy. AJNR Am J Neuroradiol. 2019 Oct;40(10):1638-1641. doi: 10.3174/ajnr.A6235. Epub 2019 Sep 26. PMID: 31558496; PMCID: PMC7028548. """ return dki_tf.kfa, {"Description": "Kurtosis Fractional Anisotropy"}
@immlib.calc("dki_cl") @as_file("_model-kurtosis_param-cl_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_cl(dki_tf): """ full path to a nifti file containing the DKI linearity file """ return dki_tf.linearity, {"Description": "Linearity"}
@immlib.calc("dki_cp") @as_file("_model-kurtosis_param-cp_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_cp(dki_tf): """ full path to a nifti file containing the DKI planarity file """ return dki_tf.planarity, {"Description": "Planarity"}
@immlib.calc("dki_cs") @as_file("_model-kurtosis_param-cs_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_cs(dki_tf): """ full path to a nifti file containing the DKI sphericity file """ return dki_tf.sphericity, {"Description": "Sphericity"}
@immlib.calc("dki_ga") @as_file(suffix="_model-kurtosis_param-ga_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_ga(dki_tf): """ full path to a nifti file containing the DKI geodesic anisotropy """ return dki_tf.ga, {"Description": "Geodesic Anisotropy"}
@immlib.calc("dki_rd") @as_file(suffix="_model-kurtosis_param-rd_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_rd(dki_tf): """ full path to a nifti file containing the DKI radial diffusivity """ return dki_tf.rd, {"Description": "Radial Diffusivity"}
@immlib.calc("dki_ad") @as_file(suffix="_model-kurtosis_param-ad_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_ad(dki_tf): """ full path to a nifti file containing the DKI axial diffusivity """ return dki_tf.ad, {"Description": "Axial Diffusivity"}
@immlib.calc("dki_rk") @as_file(suffix="_model-kurtosis_param-rk_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_rk(dki_tf): """ full path to a nifti file containing the DKI radial kurtosis """ return dki_tf.rk(), {"Description": "Radial Kurtosis"}
@immlib.calc("dki_ak") @as_file(suffix="_model-kurtosis_param-ak_dwimap.nii.gz", subfolder="models") @as_fit_deriv("DKI")
[docs] def dki_ak(dki_tf): """ full path to a nifti file containing the DKI axial kurtosis file """ return dki_tf.ak(), {"Description": "Axial Kurtosis"}
@immlib.calc("brain_mask") @as_file("_desc-brain_mask.nii.gz")
[docs] def brain_mask(structural_imap, b0): """ full path to a nifti file containing the brain mask """ return resample(structural_imap["t1w_brain_mask"], b0), dict( BrainMaskinT1w=structural_imap["t1w_brain_mask"] )
@immlib.calc("bundle_dict", "reg_template", "tmpl_name")
[docs] def get_bundle_dict( b0, citations, bundle_info=None, reg_template_spec="mni_T1", reg_template_space_name="mni", ): """ Dictionary defining the different bundles to be segmented, and a Nifti1Image containing the template for registration, and the name of the template space for file outputs Parameters ---------- bundle_info : dict or BundleDict, optional A dictionary or BundleDict for use in segmentation. See `Defining Custom Bundle Dictionaries` in the `usage` section of pyAFQ's documentation for details. If None, will get all appropriate bundles for the chosen segmentation algorithm. Default: None reg_template_spec : str, or Nifti1Image, optional The target image data for registration. Can either be a Nifti1Image, a path to a Nifti1Image, or if "mni_T2", "dti_fa_template", "hcp_atlas", or "mni_T1", image data will be loaded automatically. If "hcp_atlas" is used, slr registration will be used and reg_subject should be "subject_sls". Default: "mni_T1" reg_template_space_name : str, optional Name to use in file names for the template space. Default: "mni" """ if not isinstance(reg_template_spec, str) and not isinstance( reg_template_spec, nib.Nifti1Image ): raise TypeError("reg_template must be a str or Nifti1Image") if bundle_info is not None and not ( (isinstance(bundle_info, dict)) or (isinstance(bundle_info, abd.BundleDict)) ): raise TypeError(("bundle_info must be a dict, or a BundleDict")) if bundle_info is None: bundle_info = abd.default_bd() + abd.slf_bd() + abd.callosal_bd() if isinstance(reg_template_spec, nib.Nifti1Image): reg_template = reg_template_spec else: img_l = reg_template_spec.lower() if img_l == "mni_t2": reg_template = afd.read_mni_template(mask=True, weight="T2w") elif img_l == "mni_t1": reg_template = afd.read_mni_template(mask=True, weight="T1w") elif img_l == "dti_fa_template": reg_template = afd.read_ukbb_fa_template(mask=True) elif img_l == "hcp_atlas": reg_template = afd.read_mni_template(mask=True) elif img_l == "pediatric": reg_template = afd.read_pediatric_templates()[ "UNCNeo-withCerebellum-for-babyAFQ" ] else: reg_template = nib.load(reg_template_spec) if isinstance(bundle_info, abd.BundleDict): bundle_dict = bundle_info.copy() else: bundle_dict = abd.BundleDict(bundle_info, resample_to=reg_template) if bundle_dict.resample_subject_to: bundle_dict.resample_subject_to = b0 citations.update(bundle_dict.citations) return bundle_dict, reg_template, reg_template_space_name
[docs] def get_data_plan(kwargs): if "scalars" in kwargs and not ( isinstance(kwargs["scalars"], list) and isinstance(kwargs["scalars"][0], (str, Definition)) ): raise TypeError("scalars must be a list of strings/scalar definitions") data_tasks = with_name( [ get_data_gtab, b0, b0_mask, t1w_over_b0, brain_mask, dti_fit, dki_fit, fwdti_fit, anisotropic_power_map, csd_anisotropic_index, csd_aodf, csd_aodf_asi, csd_aodf_opm, dti_fa, dti_lt, dti_cfa, dti_pdd, dti_md, dki_kt, dki_lt, dki_fa, gq, opdt_params, opdt_pmap, opdt_ai, csa_params, csa_pmap, csa_ai, fwdti_fa, fwdti_md, fwdti_fwf, msdki_fit, msdki_params, msdki_msd, msdki_msk, dki_md, dki_awf, dki_mk, dki_kfa, dki_cfa, dki_ga, dki_rd, dti_ga, dti_rd, dti_ad, dki_ad, dki_rk, dki_ak, dti_params, dki_params, fwdti_params, dki_cl, dki_cp, dki_cs, rumba_params, csd_params, get_bundle_dict, ] ) if "scalars" not in kwargs: bvals, _ = read_bvals_bvecs(kwargs["bval_file"], kwargs["bvec_file"]) if len(dpg.unique_bvals_magnitude(bvals)) > 2: kwargs["scalars"] = ["dki_fa", "dki_md", "dki_kfa", "dki_mk", "t1w_over_b0"] else: kwargs["scalars"] = ["dti_fa", "dti_md", "t1w_over_b0"] else: scalars = [] for scalar in kwargs["scalars"]: if isinstance(scalar, str): scalars.append(scalar.lower()) else: scalars.append(scalar) kwargs["scalars"] = scalars return immlib.plan(**data_tasks)