# -*- coding: utf-8 -*-
import warnings
import tempfile
from AFQ.definitions.mapping import SynMap
from AFQ.definitions.utils import Definition
import AFQ.api.bundle_dict as abd
warnings.simplefilter(action='ignore', category=FutureWarning) # noqa
import logging
from AFQ.api.participant import ParticipantAFQ
from AFQ.api.utils import (
check_attribute, AFQclass_doc,
export_all_helper, valid_exports_string)
import AFQ.utils.streamlines as aus
from AFQ.viz.utils import get_eye
from dipy.utils.parallel import paramap
from dipy.io.stateful_tractogram import StatefulTractogram, Space
import dipy.tracking.streamlinespeed as dps
import dipy.tracking.streamline as dts
from dipy.io.streamline import save_tractogram
from AFQ.version import version as pyafq_version
from AFQ.viz.utils import trim
import pandas as pd
import pydra
import numpy as np
import os
import os.path as op
from tqdm import tqdm
import json
import s3fs
from time import time
import nibabel as nib
from PIL import Image
from s3bids.utils import S3BIDSStudy
import glob
from bids.layout import BIDSLayout, BIDSLayoutIndexer
try:
import afqbrowser as afqb
using_afqb = True
except ImportError:
using_afqb = False
__all__ = ["GroupAFQ", "get_afq_bids_entities_fname"]
logger = logging.getLogger('AFQ')
logger.setLevel(logging.INFO)
# clean up dataframe: reset the index and drop unnamed index columns
def clean_pandas_df(df):
df = df.reset_index(drop=True)
df = df.loc[:, ~df.columns.str.contains('^Unnamed')]
return df
def get_afq_bids_entities_fname():
return op.dirname(op.dirname(op.abspath(
aus.__file__))) + "/afq_bids_entities.json"
class _ParticipantAFQInputs:
def __init__(
self, dwi_data_file, bval_file, bvec_file, results_dir,
kwargs):
self.dwi_data_file = dwi_data_file
self.bval_file = bval_file
self.bvec_file = bvec_file
self.results_dir = results_dir
self.kwargs = kwargs
class GroupAFQ(object):
f"""{AFQclass_doc}"""
def __init__(self,
bids_path,
bids_filters={"suffix": "dwi"},
preproc_pipeline="all",
participant_labels=None,
output_dir=None,
parallel_params={"engine": "serial"},
bids_layout_kwargs={},
**kwargs):
'''
Initialize a GroupAFQ object from a BIDS dataset.
Parameters
----------
bids_path : str
The path to preprocessed diffusion data organized in a BIDS
dataset. This should contain a BIDS derivative dataset with
preprocessed dwi/bvals/bvecs.
bids_filters : dict
Filter to pass to bids_layout.get when finding DWI files.
Default: {"suffix": "dwi"}
preproc_pipeline : str, optional.
The name of the pipeline used to preprocess the DWI data.
Default: "all".
participant_labels : list or None, optional
List of participant labels (subject IDs) to perform
processing on. If None, all subjects are used.
Default: None
output_dir : str or None, optional
Path to output directory. If None, outputs are put
in an AFQ pipeline folder in the derivatives folder of
the BIDS directory. pyAFQ will use existing derivatives
from the output directory if they exist, instead of recalculating
them (this means you need to clear the output folder if you want
to recalculate a derivative).
Default: None
parallel_params : dict, optional
Parameters to pass to paramap in dipy.utils.parallel,
used to parallelize computations across subjects and sessions.
Set "n_jobs" to -1 to parallelize across all available CPUs.
For example, to use multiprocessing with 4 CPUs:
{"n_jobs": 4, "engine": "joblib", "backend": "loky"}
Default: {"engine": "serial"}
bids_layout_kwargs: dict, optional
Additional arguments to give to BIDSLayout from pybids.
For large datasets, try:
{"validate": False, "index_metadata": False}
Default: {}
kwargs : additional optional parameters
You can set additional parameters for any step
of the process. See :ref:`usage/kwargs` for more details.
Examples
--------
api.GroupAFQ(my_path, csd_sh_order=4)
api.GroupAFQ(
my_path,
reg_template_spec="mni_t2", reg_subject_spec="b0")
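A sketch of parallelizing across subjects and sessions with joblib
(here ``my_path`` stands in for the path to a BIDS dataset):
api.GroupAFQ(
my_path,
parallel_params={"n_jobs": -1, "engine": "joblib"})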
'''
if not isinstance(bids_path, str):
raise TypeError("bids_path must be a string")
if not op.exists(bids_path):
raise ValueError("bids_path not found")
if not op.exists(op.join(bids_path, "dataset_description.json")):
raise ValueError("There must be a dataset_description.json"
+ " in bids_path")
if not isinstance(bids_filters, dict):
raise TypeError("bids_filters must be a dict")
# preproc_pipeline typechecking handled by pyBIDS
if participant_labels is not None\
and not isinstance(participant_labels, list):
raise TypeError(
"participant_labels must be either a list or None")
if output_dir is not None\
and not isinstance(output_dir, str):
raise TypeError(
"output_dir must be either a str or None")
if not isinstance(parallel_params, dict):
raise TypeError("parallel_params must be a dict")
if not isinstance(bids_layout_kwargs, dict):
raise TypeError("bids_layout_kwargs must be a dict")
parallel_params["engine"] = parallel_params.get("engine", "serial")
self.logger = logger
self.parallel_params = parallel_params
self.wf_dict = {}
# validate input and fail early
if not op.exists(bids_path):
raise ValueError(f'Unable to locate BIDS dataset in: {bids_path}')
# This is where all the outputs will go:
if output_dir is None:
self.afq_path = op.join(bids_path, 'derivatives', 'afq')
self.afqb_path = op.join(bids_path, 'derivatives', 'afq_browser')
else:
self.afq_path = output_dir
self.afqb_path = op.join(output_dir, 'afq_browser')
# Create it as needed:
os.makedirs(self.afq_path, exist_ok=True)
bids_indexer = BIDSLayoutIndexer(**bids_layout_kwargs)
bids_layout = BIDSLayout(
bids_path, derivatives=True, indexer=bids_indexer)
bids_description = bids_layout.description
# check that at least one non-JSON file exists in the derivatives
# folder (not counting dataset_description.json files); the second,
# pipeline-scoped check is particularly useful for verifying that
# the preprocessing derivatives folder is well-defined
if len(bids_layout.get())\
- len(bids_layout.get(extension="json")) < 1:
raise ValueError(
f"No non-json files recognized by pyBIDS in {bids_path}")
if len(bids_layout.get(scope=preproc_pipeline))\
- len(bids_layout.get(
scope=preproc_pipeline,
extension="json")) < 1:
raise ValueError((
f"No non-json files recognized by "
f"pyBIDS in the pipeline: {preproc_pipeline}"))
# Add required metadata file at top level (inheriting as needed):
pipeline_description = {
"Name": bids_description["Name"],
"BIDSVersion": bids_description["BIDSVersion"],
"PipelineDescription": {"Name": "pyAFQ",
"Version": pyafq_version},
"GeneratedBy": [{"Name": op.basename(self.afq_path),
"Version": pyafq_version}]}
pl_desc_file = op.join(self.afq_path, 'dataset_description.json')
with open(pl_desc_file, 'w') as outfile:
json.dump(pipeline_description, outfile)
self.subjects = bids_layout.get(return_type='id', target='subject')
if not len(self.subjects):
raise ValueError(
"`bids_path` contains no subjects in derivatives folders."
+ " This could be caused by derivatives folders not following"
+ " the BIDS format.")
if participant_labels is not None:
filtered_subjects = []
subjects_found_printed = False
for subjectID in participant_labels:
subjectID = str(subjectID)
if subjectID not in self.subjects:
self.logger.warning((
f"Subject {subjectID} specified in "
f"`participant_labels` but not found "
f"in BIDS derivatives folders"))
if not subjects_found_printed:
subjects_found_printed = True
self.logger.warning((
f"Only these subjects found in BIDS "
f"derivatives folders: {self.subjects}"))
else:
filtered_subjects.append(subjectID)
self.subjects = filtered_subjects
if not len(self.subjects):
raise ValueError(
"No subjects specified in `participant_labels` "
+ " found in BIDS derivatives folders."
+ " See above warnings.")
sessions = bids_layout.get(return_type='id', target='session')
self.sessions = sessions if len(sessions) else [None]
# do not bother to parallelize if there are fewer than 2 subject-sessions
if len(self.sessions) * len(self.subjects) < 2:
self.parallel_params["engine"] = "serial"
# do not parallelize segmentation if parallelizing across
# subject-sessions
if self.parallel_params["engine"] != "serial":
if "segmentation_params" not in kwargs:
kwargs["segmentation_params"] = {}
if "parallel_segmentation" not in kwargs["segmentation_params"]:
kwargs["segmentation_params"]["parallel_segmentation"] = {}
kwargs["segmentation_params"]["parallel_segmentation"]["engine"] =\
"serial"
self.valid_sub_list = []
self.valid_ses_list = []
self.pAFQ_list = []
self.pAFQ_inputs_list = []
for subject in self.subjects:
self.wf_dict[subject] = {}
for session in self.sessions:
this_kwargs = kwargs.copy()
results_dir = op.join(self.afq_path, 'sub-' + subject)
if session is not None:
results_dir = op.join(results_dir, 'ses-' + session)
dwi_bids_filters = {
"subject": subject,
"session": session,
"return_type": "filename",
"scope": preproc_pipeline,
"extension": "nii.gz",
"suffix": "dwi",
}
dwi_bids_filters.update(bids_filters)
dwi_files = bids_layout.get(**dwi_bids_filters)
if (not len(dwi_files)):
self.logger.warning(
f"No dwi found for subject {subject} and session "
f"{session}. Skipping.")
continue
os.makedirs(results_dir, exist_ok=True)
dwi_data_file = dwi_files[0]
# For bvals and bvecs, use ``get_bval()`` and ``get_bvec()`` to
# walk up the file tree and inherit the closest bval and bvec
# files. Maintain input ``bids_filters`` in case user wants to
# specify acquisition labels, but pop suffix since it is
# already specified inside ``get_bvec()`` and ``get_bval()``
suffix = bids_filters.pop("suffix", None)
bvec_file = bids_layout.get_bvec(
dwi_data_file,
**bids_filters)
bval_file = bids_layout.get_bval(
dwi_data_file,
**bids_filters)
if suffix is not None:
bids_filters["suffix"] = suffix
# Call find path for all definitions
for key, value in this_kwargs.items():
if key == "scalars":
for scalar in this_kwargs["scalars"]:
if isinstance(scalar, Definition):
scalar_found = scalar.find_path(
bids_layout,
dwi_data_file,
subject,
session,
required=False)
if scalar_found is False:
this_kwargs["scalars"].remove(scalar)
elif key == "import_tract":
if isinstance(this_kwargs["import_tract"], dict):
it_res = \
bids_layout.get(
subject=subject,
session=session,
extension=[
'.trk',
'.tck',
'.vtk',
'.fib',
'.dpy'],
return_type='filename',
**this_kwargs["import_tract"])
if len(it_res) < 1:
raise ValueError((
"No custom tractography found for"
f" subject {subject}"
" and session "
f"{session}."))
elif len(it_res) > 1:
this_kwargs["import_tract"] = it_res[0]
logger.warning((
f"Multiple viable custom tractographies found for"
f" subject "
f"{subject} and session "
f"{session}. Will use: {it_res[0]}"))
else:
this_kwargs["import_tract"] = it_res[0]
elif isinstance(value, dict):
for _, subvalue in value.items():
if isinstance(subvalue, Definition):
subvalue.find_path(
bids_layout,
dwi_data_file,
subject,
session)
elif isinstance(value, Definition):
value.find_path(
bids_layout,
dwi_data_file,
subject,
session)
# call find path for all ROIs
if "bundle_info" in this_kwargs and isinstance(
this_kwargs["bundle_info"], abd.BundleDict):
for b_name in this_kwargs["bundle_info"].bundle_names:
this_kwargs["bundle_info"].apply_to_rois(
b_name,
this_kwargs["bundle_info"]._use_bids_info,
bids_layout, dwi_data_file, subject, session,
dry_run=False)
self.valid_sub_list.append(subject)
self.valid_ses_list.append(str(session))
this_pAFQ_inputs = _ParticipantAFQInputs(
dwi_data_file,
bval_file, bvec_file,
results_dir,
this_kwargs)
this_pAFQ = ParticipantAFQ(
this_pAFQ_inputs.dwi_data_file,
this_pAFQ_inputs.bval_file,
this_pAFQ_inputs.bvec_file,
this_pAFQ_inputs.results_dir,
**this_pAFQ_inputs.kwargs)
self.wf_dict[subject][str(session)] = this_pAFQ.wf_dict
self.pAFQ_list.append(this_pAFQ)
self.pAFQ_inputs_list.append(this_pAFQ_inputs)
def combine_profiles(self):
tract_profiles_dict = self.export("profiles")
if len(self.sessions) > 1:
tract_profiles_list = []
for _, subject_dict in tract_profiles_dict.items():
tract_profiles_list.extend(subject_dict.values())
else:
tract_profiles_list = list(tract_profiles_dict.values())
_df = combine_list_of_profiles(tract_profiles_list)
out_file = op.abspath(op.join(
self.afq_path, "tract_profiles.csv"))
os.makedirs(op.dirname(out_file), exist_ok=True)
_df = clean_pandas_df(_df)
_df.to_csv(out_file, index=False)
return _df
def get_streamlines_json(self):
sls_json_fname = op.abspath(op.join(
self.afq_path, "afqb_streamlines.json"))
if not op.exists(sls_json_fname):
subses_info = []
def load_next_subject():
subses_idx = len(subses_info)
sub = self.valid_sub_list[subses_idx]
ses = self.valid_ses_list[subses_idx]
this_bundles_file = self.export(
"bundles", collapse=False)[sub][ses]
this_mapping = self.export("mapping", collapse=False)[sub][ses]
this_img = self.export(
"dwi", collapse=False)[sub][ses]
seg_sft = aus.SegmentedSFT.fromfile(
this_bundles_file,
this_img)
seg_sft.sft.to_rasmm()
subses_info.append((seg_sft, this_mapping))
bundle_dict = self.export("bundle_dict", collapse=False)[
self.valid_sub_list[0]][self.valid_ses_list[0]]
sls_dict = {}
load_next_subject() # load first subject
for b in bundle_dict.keys():
for i in range(len(self.valid_sub_list)):
seg_sft, mapping = subses_info[i]
idx = seg_sft.bundle_idxs[b]
# use the first subses that works
# otherwise try each successive subses
if len(idx) == 0:
# break if we run out of subses
if i + 1 >= len(self.valid_sub_list):
break
# load subses if not already loaded
if i + 1 >= len(subses_info):
load_next_subject()
continue
if len(idx) > 100:
idx = np.random.choice(
idx, size=100, replace=False)
these_sls = seg_sft.sft.streamlines[idx]
these_sls = dps.set_number_of_points(these_sls, 100)
tg = StatefulTractogram(
these_sls,
seg_sft.sft,
Space.RASMM)
delta = dts.values_from_volume(
mapping.forward,
tg.streamlines, np.eye(4))
moved_sl = dts.Streamlines(
[d + s for d, s in zip(delta, tg.streamlines)])
moved_sl = np.asarray(moved_sl)
median_sl = np.median(moved_sl, axis=0)
sls_dict[b] = {"coreFiber": median_sl.tolist()}
for ii, sl_idx in enumerate(idx):
sls_dict[b][str(sl_idx)] = moved_sl[ii].tolist()
break
with open(sls_json_fname, 'w') as fp:
json.dump(sls_dict, fp)
return sls_json_fname
def export(self, attr_name="help", collapse=True):
f"""
Export a specific output. To print a list of available outputs,
call export without arguments.
{valid_exports_string}
Parameters
----------
attr_name : str
Name of the output to export. Default: "help"
collapse : bool
Whether to collapse session dimension if there is only 1 session.
Default: True
Returns
-------
output : dict
The specific output as a dictionary. Keys are subjects.
Values are dictionaries with keys of sessions
if multiple sessions are used. Otherwise, values are
the output.
None if called without arguments.
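Examples
--------
A sketch, assuming ``myafq`` is an initialized GroupAFQ instance:
profiles = myafq.export("profiles")
bundles = myafq.export("bundles", collapse=False)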
"""
section = check_attribute(attr_name)
# iterate over subjects / sessions,
# decide if they need to be calculated or not
in_list = []
to_calc_list = []
results = {}
for ii, subject in enumerate(self.valid_sub_list):
if subject not in results:
results[subject] = {}
session = self.valid_ses_list[ii]
wf_dict = self.wf_dict[subject][str(session)]
if section is not None:
wf_dict = wf_dict[section]
if ((self.parallel_params.get("engine", False) != "serial")
and (hasattr(wf_dict, "efferents"))
and (attr_name not in wf_dict.efferents)):
in_list.append((wf_dict))
to_calc_list.append((subject, session))
else:
results[subject][session] = wf_dict[attr_name]
# if some need to be calculated, do those in parallel
if to_calc_list:
par_results = paramap(
lambda wf, attr: wf[attr], in_list,
func_args=[attr_name],
**self.parallel_params)
for i, subses in enumerate(to_calc_list):
subject, session = subses
results[subject][session] = par_results[i]
# If only one session, collapse session dimension
if len(self.sessions) == 1 and collapse:
for subject in self.valid_sub_list:
results[subject] = results[subject][self.valid_ses_list[0]]
return results
def export_up_to(self, attr_name="help"):
f"""
Export all derivatives necessary for a specific output.
To print a list of available outputs,
call export_up_to without arguments.
{valid_exports_string}
Parameters
----------
attr_name : str
Name of the output to export up to. Default: "help"
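Examples
--------
A sketch, assuming ``myafq`` is an initialized GroupAFQ instance;
this computes everything the tract profiles depend on, without
computing the profiles themselves:
myafq.export_up_to("profiles")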
"""
section = check_attribute(attr_name)
wf_dict = self.wf_dict[
self.valid_sub_list[0]][self.valid_ses_list[0]]
if section is not None:
wf_dict = wf_dict[section]
for dependent in wf_dict.plan.dependencies[attr_name]:
self.export(dependent)
def export_all(self, viz=True, afqbrowser=True, xforms=True,
indiv=True):
""" Exports all the possible outputs
Parameters
----------
viz : bool
Whether to output visualizations. This includes tract profile
plots, a figure containing all bundles, and, if using the AFQ
segmentation algorithm, individual bundle figures.
Default: True
afqbrowser : bool
Whether to output an AFQ-Browser from this AFQ instance.
Default: True
xforms : bool
Whether to output the reg_template image in subject space and,
depending on if it is possible based on the mapping used, to
output the b0 in template space.
Default: True
indiv : bool
Whether to output individual bundles in their own files, in
addition to the one file containing all bundles. If using
the AFQ segmentation algorithm, individual ROIs are also
output.
Default: True
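Examples
--------
A sketch, assuming ``myafq`` is an initialized GroupAFQ instance;
this exports everything except the AFQ-Browser instance:
myafq.export_all(afqbrowser=False)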
"""
start_time = time()
export_all_helper(self, xforms, indiv, viz)
self.combine_profiles()
if afqbrowser:
self.assemble_AFQ_browser()
self.logger.info(
f"Time taken for export all: {str(time() - start_time)}")
def cmd_outputs(self, cmd="rm", dependent_on=None, exceptions=[],
suffix=""):
"""
Perform a command on some or all outputs of pyAFQ.
This is useful if you change a parameter and need
to recalculate derivatives that depend on it.
Some example commands: cp, mv, rm.
-r will be automatically added when necessary.
Parameters
----------
cmd : str
Command to run on outputs. Default: 'rm'
dependent_on : str or None
Which derivatives to perform the command on.
If None, perform on all.
If "track", perform on all derivatives that depend on the
tractography.
If "recog", perform on all derivatives that depend on the
bundle recognition.
Default: None
exceptions : list of str
Name outputs that the command should not be applied to.
Default: []
suffix : str
Part of the command that comes after the filename.
Default: ""
Example
-------
# This command would copy all derivatives that are
# dependent on the tractography into 'my_other_folder'
myafq.cmd_outputs(
"cp",
dependent_on="track",
suffix="~/my_other_folder/")
"""
for pAFQ in self.pAFQ_list:
pAFQ.cmd_outputs(cmd, dependent_on, exceptions, suffix=suffix)
clobber = cmd_outputs # alias for default of cmd_outputs
def make_all_participant_montages(self, images_per_row=2):
"""
Generate a montage of all bundles for all subjects.
Parameters
----------
images_per_row : int
Number of bundle images per row in output file.
Default: 2
Returns
-------
filename of montage images
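Example
-------
A sketch, assuming ``myafq`` is an initialized GroupAFQ instance:
myafq.make_all_participant_montages(images_per_row=3)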
"""
for pAFQ in self.pAFQ_list:
pAFQ.participant_montage(images_per_row=images_per_row)
def group_montage(self, bundle_name, size, view, direc, slice_pos=None):
"""
Generate montage file(s) of a given bundle at a given angle.
Parameters
----------
bundle_name : str
Name of bundle to visualize, should be the same as in the
bundle dictionary.
size : tuple of int
The number of columns and rows for each file.
view : str
Which view to display. Can be one of sagittal, coronal, or axial.
direc : str
Which direction to view from. Can be one of: left, right, top,
bottom, front, back.
slice_pos : float, or None
If float, indicates the fractional position along the
perpendicular axis to the slice. Currently only works with plotly.
If None, no slice is displayed.
Returns
-------
list of filenames of montage images
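Example
-------
A sketch, where "CST_L" stands in for a bundle name from your
bundle dictionary:
myafq.group_montage(
"CST_L", (3, 2), "sagittal", "left", slice_pos=0.5)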
"""
tdir = tempfile.gettempdir()
best_scalar = self.export("best_scalar", collapse=False)[
self.valid_sub_list[0]][self.valid_ses_list[0]]
viz_backend_dict = self.export("viz_backend", collapse=False)
b0_backend_dict = self.export("b0", collapse=False)
dwi_affine_dict = self.export("dwi_affine", collapse=False)
bundles_dict = self.export("bundles", collapse=False)
best_scalar_dict = self.export(best_scalar, collapse=False)
all_fnames = []
self.logger.info("Generating Montage...")
for ii in tqdm(range(len(self.valid_ses_list))):
this_sub = self.valid_sub_list[ii]
this_ses = self.valid_ses_list[ii]
viz_backend = viz_backend_dict[this_sub][this_ses]
b0 = b0_backend_dict[this_sub][this_ses]
dwi_affine = dwi_affine_dict[this_sub][this_ses]
bundles = bundles_dict[this_sub][this_ses]
best_scalar = best_scalar_dict[this_sub][this_ses]
flip_axes = [False, False, False]
for i in range(3):
flip_axes[i] = (dwi_affine[i, i] < 0)
if slice_pos is not None:
slice_kwargs = {}
if view == "sagittal":
slice_kwargs["x_pos"] = slice_pos
slice_kwargs["y_pos"] = None
slice_kwargs["z_pos"] = None
elif view == "coronal":
slice_kwargs["x_pos"] = None
slice_kwargs["y_pos"] = slice_pos
slice_kwargs["z_pos"] = None
elif view == "axial":
slice_kwargs["x_pos"] = None
slice_kwargs["y_pos"] = None
slice_kwargs["z_pos"] = slice_pos
figure = viz_backend.visualize_volume(
best_scalar,
opacity=1.0,
flip_axes=flip_axes,
interact=False,
inline=False,
**slice_kwargs)
else:
figure = None
figure = viz_backend.visualize_bundles(
bundles,
shade_by_volume=best_scalar,
flip_axes=flip_axes,
bundle=bundle_name,
interact=False,
inline=False,
figure=figure)
eye = get_eye(view, direc)
this_fname = tdir + f"/t{ii}.png"
if "plotly" in viz_backend.backend:
figure.update_layout(scene_camera=dict(
projection=dict(type="orthographic"),
up={"x": 0, "y": 0, "z": 1},
eye=eye,
center=dict(x=0, y=0, z=0)))
figure.write_image(this_fname)
# temporary fix for memory leak
import plotly.io as pio
pio.kaleido.scope._shutdown_kaleido()
else:
from dipy.viz import window
direc = np.fromiter(eye.values(), dtype=int)
data_shape = np.asarray(nib.load(b0).get_fdata().shape)
figure.set_camera(
position=direc * data_shape,
focal_point=data_shape // 2,
view_up=(0, 0, 1))
figure.zoom(0.5)
window.snapshot(figure, fname=this_fname, size=(600, 600))
def _save_file(curr_img, curr_file_num):
save_path = op.abspath(op.join(
self.afq_path,
(f"bundle-{bundle_name}_view-{view}"
f"_idx-{curr_file_num}_montage.png")))
curr_img.save(save_path)
all_fnames.append(save_path)
this_img_trimmed = {}
max_height = 0
max_width = 0
for ii in range(len(self.valid_ses_list)):
this_img = Image.open(tdir + f"/t{ii}.png")
try:
this_img_trimmed[ii] = trim(trim(this_img))
except IndexError: # this_img is a picture of nothing
this_img_trimmed[ii] = this_img
if this_img_trimmed[ii].size[0] > max_width:
max_width = this_img_trimmed[ii].size[0]
if this_img_trimmed[ii].size[1] > max_height:
max_height = this_img_trimmed[ii].size[1]
curr_img = Image.new(
'RGB',
(max_width * size[0], max_height * size[1]),
color="white")
curr_file_num = 0
for ii in range(len(self.valid_ses_list)):
x_pos = ii % size[0]
_ii = ii // size[0]
y_pos = _ii % size[1]
_ii = _ii // size[1]
file_num = _ii
if file_num != curr_file_num:
_save_file(curr_img, curr_file_num)
curr_img = Image.new(
'RGB',
(max_width * size[0], max_height * size[1]),
color="white")
curr_file_num = file_num
curr_img.paste(
this_img_trimmed[ii],
(x_pos * max_width, y_pos * max_height))
_save_file(curr_img, curr_file_num)
return all_fnames
def combine_bundle(self, bundle_name):
"""
Transform a given bundle to reg_template space for all subjects,
then merge them into one TRK file.
Useful for visualizing the variability in the bundle across subjects.
Note: currently only implemented for mappings defined with the
built-in SynMap.
Parameters
----------
bundle_name : str
Name of the bundle to transform, should be one of the bundles in
bundle_dict.
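Example
-------
A sketch, where "CST_L" stands in for a bundle name from your
bundle dictionary:
myafq.combine_bundle("CST_L")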
"""
reference_wf_dict = self.wf_dict[
self.valid_sub_list[0]][self.valid_ses_list[0]]
if "mapping_definition" in reference_wf_dict:
mapping_definition = reference_wf_dict["mapping_definition"]
if mapping_definition is not None and not isinstance(
mapping_definition, SynMap):
raise NotImplementedError((
"combine_bundle not implemented for mapping_definition "
"other than SynMap"))
reg_template = self.export("reg_template", collapse=False)[
self.valid_sub_list[0]][self.valid_ses_list[0]]
bundles_dict = self.export("bundles", collapse=False)
mapping_dict = self.export("mapping", collapse=False)
sls_mni = []
self.logger.info("Combining Bundles...")
for ii in tqdm(range(len(self.valid_ses_list))):
this_sub = self.valid_sub_list[ii]
this_ses = self.valid_ses_list[ii]
seg_sft = aus.SegmentedSFT.fromfile(bundles_dict[
this_sub][this_ses])
seg_sft.sft.to_vox()
sls = seg_sft.get_bundle(bundle_name).streamlines
mapping = mapping_dict[this_sub][this_ses]
if len(sls) > 0:
delta = dts.values_from_volume(
mapping.forward,
sls, np.eye(4))
sls_mni.extend([d + s for d, s in zip(delta, sls)])
moved_sft = StatefulTractogram(
sls_mni,
reg_template,
Space.VOX)
save_tractogram(
moved_sft,
op.abspath(op.join(
self.afq_path,
f"bundle-{bundle_name}_subjects-all_MNI.trk")),
bbox_valid_check=False)
def upload_to_s3(self, s3fs, remote_path):
""" Upload entire AFQ derivatives folder to S3"""
s3fs.put(self.afq_path, remote_path, recursive=True)
if op.exists(self.afqb_path):
s3fs.put(self.afqb_path, remote_path, recursive=True)
def export_group_density(self, boolify=True):
"""
Generate a group density map by combining single subject density maps.
Parameters
----------
boolify : bool
Whether to turn subject streamline count images into booleans
before adding them into the group density map.
Returns
-------
Path to the group density NIfTI file.
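Example
-------
A sketch, assuming ``myafq`` is an initialized GroupAFQ instance:
density_file = myafq.export_group_density(boolify=True)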
"""
densities = self.export("density_maps", collapse=False)
ex_density_init = nib.load(densities[
self.valid_sub_list[0]][
self.valid_ses_list[0]]) # for shape and header
group_density = np.zeros_like(ex_density_init.get_fdata())
self.logger.info("Generating Group Density...")
for ii in tqdm(range(len(self.valid_ses_list))):
this_sub = self.valid_sub_list[ii]
this_ses = self.valid_ses_list[ii]
this_density = nib.load(densities[this_sub][this_ses]).get_fdata()
if boolify:
this_density = this_density.astype(bool)
group_density = group_density + this_density
group_density = group_density / len(self.valid_sub_list)
group_density = nib.Nifti1Image(
group_density,
ex_density_init.affine,
header=ex_density_init.header
)
out_fname = op.abspath(op.join(
self.afq_path,
f"desc-density_subjects-all_space-MNI_dwi.nii.gz"))
nib.save(group_density, out_fname)
return out_fname
def assemble_AFQ_browser(self, output_path=None, metadata=None,
page_title="AFQ Browser", page_subtitle="",
page_title_link="", page_subtitle_link=""):
"""
Assembles an instance of the AFQ-Browser from this AFQ instance.
First, we generate the combined tract profile if it is not already
generated. This includes running the full AFQ pipeline if it has not
already run. The combined tract profile is one of the outputs of
export_all.
Second, we generate a streamlines.json file from the bundles
recognized in the first subject's first session.
Third, we call AFQ-Browser's assemble to assemble an AFQ-Browser
instance in output_path.
Parameters
----------
output_path : str
Path to location to create this instance of the browser in.
Called "target" in AFQ Browser API. If None,
bids_path/derivatives/afq_browser is used.
Default: None
metadata : str
Path to a subject metadata csv file. If None, a metadata file
containing only the subject IDs is created. This file requires a
"subjectID" column to work.
Default: None
page_title : str
Page title. If None, prompt is sent to command line.
Default: "AFQ Browser"
page_subtitle : str
Page subtitle. If None, prompt is sent to command line.
Default: ""
page_title_link : str
Title hyperlink (including http(s)://).
If None, prompt is sent to command line.
Default: ""
page_subtitle_link : str
Subtitle hyperlink (including http(s)://).
If None, prompt is sent to command line.
Default: ""
"""
if not using_afqb:
self.logger.warning((
"AFQ Browser is not installed, so AFQ Browser instance "
"cannot be assembled. AFQ Browser can be installed with: "
"`pip install pyAFQ[afqbrowser]` or "
"`pip install AFQ-Browser>=0.3`"))
return
n_points_profile = self.export("n_points_profile", collapse=False)[
self.valid_sub_list[0]][
self.valid_ses_list[0]]
if n_points_profile != 100:
self.logger.warning((
"AFQ Browser requires 100 points per tract profile, "
"so AFQ Browser instance cannot be assembled."))
return
if output_path is None:
output_path = self.afqb_path
os.makedirs(self.afqb_path, exist_ok=True)
# generate combined profiles csv
self.combine_profiles()
# generate streamlines.json file
sls_json_fname = self.get_streamlines_json()
afqb.assemble(
op.abspath(op.join(self.afq_path, "tract_profiles.csv")),
target=output_path,
metadata=metadata,
streamlines=sls_json_fname,
title=page_title,
subtitle=page_subtitle,
link=page_title_link,
sublink=page_subtitle_link)
class ParallelGroupAFQ():
def __init__(self, *args, **kwargs):
orig = GroupAFQ(*args, **kwargs)
orig.parallel_params["submitter_params"] = \
orig.parallel_params.get("submitter_params", {"plugin": "cf"})
orig.parallel_params["cache_dir"] = \
orig.parallel_params.get("cache_dir", None)
self.parallel_params = orig.parallel_params
self.pAFQ_kwargs = orig.pAFQ_inputs_list
self.finishing_params = dict()
self.finishing_params["args"] = args
self.finishing_params["kwargs"] = kwargs
self.finishing_params["output_dirs"] = [pAFQ.kwargs["output_dir"]
for pAFQ in orig.pAFQ_list]
def _submit_pydra(self, runnable):
try:
with pydra.Submitter(
**self.parallel_params["submitter_params"],
) as sub:
sub(runnable=runnable)
# Addresses https://github.com/nipype/pydra/issues/630
except AttributeError as e:
if "'NoneType' object has no attribute 'replace'" not in str(e):
raise
def export(self, attr_name="help", collapse=True):
f"""
Export a specific output. To print a list of available outputs,
call export without arguments.
{valid_exports_string}
Parameters
----------
attr_name : str
Name of the output to export. Default: "help"
collapse : bool
Whether to collapse session dimension if there is only 1 session.
Default: True
Notes
-----
Unlike GroupAFQ.export, this submits one pydra task per
subject-session and does not return the computed outputs;
derivatives are generated in each participant's output
directory instead.
"""
@pydra.mark.task
def export_sub(pAFQ_kwargs, attr_name):
pAFQ = ParticipantAFQ(
pAFQ_kwargs.dwi_data_file,
pAFQ_kwargs.bval_file,
pAFQ_kwargs.bvec_file,
pAFQ_kwargs.results_dir,
**pAFQ_kwargs.kwargs)
pAFQ.export(attr_name)
# Submit to pydra
export_sub_task = export_sub(
attr_name=attr_name,
cache_dir=self.parallel_params["cache_dir"]
).split("pAFQ_kwargs", pAFQ_kwargs=self.pAFQ_kwargs)
self._submit_pydra(export_sub_task)
def export_all(self, viz=True, afqbrowser=True, xforms=True, indiv=True):
""" Exports all the possible outputs
Parameters
----------
viz : bool
Whether to output visualizations. This includes tract profile
plots, a figure containing all bundles, and, if using the AFQ
segmentation algorithm, individual bundle figures.
Default: True
afqbrowser : bool
Whether to output an AFQ-Browser from this AFQ instance.
Default: True
xforms : bool
Whether to output the reg_template image in subject space and,
depending on if it is possible based on the mapping used, to
output the b0 in template space.
Default: True
indiv : bool
Whether to output individual bundles in their own files, in
addition to the one file containing all bundles. If using
the AFQ segmentation algorithm, individual ROIs are also
output.
Default: True
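Example
-------
A sketch, assuming pydra is installed and ``my_path`` stands in
for the path to a BIDS dataset:
pgafq = ParallelGroupAFQ(my_path)
pgafq.export_all(afqbrowser=False)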
"""
@pydra.mark.task
def export_sub(
pAFQ_kwargs,
finishing_params,
viz,
afqbrowser,
xforms,
indiv
):
pAFQ = ParticipantAFQ(
pAFQ_kwargs.dwi_data_file,
pAFQ_kwargs.bval_file,
pAFQ_kwargs.bvec_file,
pAFQ_kwargs.results_dir,
**pAFQ_kwargs.kwargs)
pAFQ.export_all(viz, xforms, indiv)
for out_dir in finishing_params["output_dirs"]:
if not glob.glob(op.join(out_dir, "*_desc-profiles_dwi.csv")):
return
gAFQ = GroupAFQ(*finishing_params["args"],
**finishing_params["kwargs"])
gAFQ.export_all(viz, afqbrowser, xforms, indiv)
# Submit to pydra
export_sub_task = export_sub(
finishing_params=self.finishing_params,
viz=viz,
afqbrowser=afqbrowser,
xforms=xforms,
indiv=indiv,
cache_dir=self.parallel_params["cache_dir"]
).split("pAFQ_kwargs", pAFQ_kwargs=self.pAFQ_kwargs)
self._submit_pydra(export_sub_task)
def download_and_combine_afq_profiles(bucket,
study_s3_prefix="", deriv_name=None,
out_file=None,
upload=False, session=None,
**kwargs):
"""
Download and combine tract profiles from different subjects / sessions
on an s3 bucket into one CSV.
Parameters
----------
bucket : str
The S3 bucket that contains the study data.
study_s3_prefix : str
The S3 prefix common to all of the study objects on S3.
deriv_name : str, optional
If deriv_name is not None, it should be a string that specifies
which derivatives folder to download and combine profiles from.
out_file : str, optional
Filename for the combined output CSV.
upload : bool or str, optional
If True, upload the combined CSV to Amazon S3 at
bucket/study_s3_prefix/derivatives/afq. If a string,
assume string is an Amazon S3 URI and upload there.
Default: False
session : str, optional
Session to get CSVs from. If None, all sessions are used.
Default: None
kwargs : optional
Optional arguments to pass to S3BIDSStudy.
Returns
-------
The output CSV's pandas DataFrame.
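Examples
--------
A sketch with a hypothetical bucket and study prefix:
df = download_and_combine_afq_profiles(
"my-bucket", study_s3_prefix="my-study",
deriv_name="afq", out_file="all_profiles.csv")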
"""
if "subjects" not in kwargs:
kwargs["subjects"] = "all"
if "anon" not in kwargs:
kwargs["anon"] = False
if deriv_name is None:
deriv_name = True
with nib.tmpdirs.InTemporaryDirectory() as t_dir:
remote_study = S3BIDSStudy(
"get_profiles",
bucket,
study_s3_prefix,
**kwargs)
remote_study.download(
t_dir,
include_modality_agnostic=False,
include_derivs=deriv_name,
include_derivs_dataset_description=True,
suffix="profiles.csv")
temp_study = BIDSLayout(t_dir, validate=False, derivatives=True)
if session is None:
profiles = temp_study.get(
extension='csv',
suffix='profiles',
return_type='filename')
else:
profiles = temp_study.get(
session=session,
extension='csv',
suffix='profiles',
return_type='filename')
df = combine_list_of_profiles(profiles)
df.to_csv("tmp.csv", index=False)
if upload is True:
bids_prefix = "/".join([bucket, study_s3_prefix]).rstrip("/")
fs = s3fs.S3FileSystem()
fs.put(
"tmp.csv",
"/".join([
bids_prefix,
"derivatives",
"afq",
"combined_tract_profiles.csv"
]))
elif isinstance(upload, str):
fs = s3fs.S3FileSystem()
fs.put("tmp.csv", upload.replace("s3://", ""))
if out_file is not None:
out_file = op.abspath(out_file)
os.makedirs(op.dirname(out_file), exist_ok=True)
df = clean_pandas_df(df)
df.to_csv(out_file, index=False)
return df
def combine_list_of_profiles(profile_fnames):
"""
Combine tract profiles from different subjects / sessions
into one CSV.
Parameters
----------
profile_fnames : list of str
List of csv filenames.
Returns
-------
The output CSV's pandas DataFrame.
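Examples
--------
A sketch with hypothetical BIDS-style profile filenames:
df = combine_list_of_profiles([
"afq/sub-01/ses-01/sub-01_ses-01_desc-profiles_dwi.csv",
"afq/sub-02/ses-01/sub-02_ses-01_desc-profiles_dwi.csv"])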
"""
dfs = []
for fname in profile_fnames:
profiles = pd.read_csv(fname)
profiles['subjectID'] = fname.split('sub-')[1].split('/')[0]
if 'ses-' in fname:
session_name = fname.split('ses-')[1].split('/')[0]
else:
session_name = 'unknown'
profiles['sessionID'] = session_name
dfs.append(profiles)
return clean_pandas_df(pd.concat(dfs))