Source code for AFQ.tasks.utils

import inspect
import os
import os.path as op

from AFQ.utils.path import drop_extension

# Names exported by a star-import of this module. Helpers defined in this
# file but not listed here must be imported explicitly by name.
__all__ = ["get_tp", "get_fname", "with_name", "get_base_fname"]


def get_tp(tp_name, structural_imap, data_imap, tissue_imap):
    """Look up ``tp_name`` in the available mappings.

    The mappings are searched in a fixed order: ``data_imap`` first, then
    ``structural_imap``, and finally ``tissue_imap`` (which may be ``None``
    and is then skipped).

    Parameters
    ----------
    tp_name : hashable
        Key to look up.
    structural_imap, data_imap : mapping
        Mappings that are always consulted.
    tissue_imap : mapping or None
        Optional mapping consulted last.

    Returns
    -------
    The value stored under ``tp_name`` in the first mapping containing it.

    Raises
    ------
    NotImplementedError
        If none of the mappings contain ``tp_name``.
    """
    # data_imap takes priority over structural_imap.
    for imap in (data_imap, structural_imap):
        if tp_name in imap:
            return imap[tp_name]

    if tissue_imap is not None and tp_name in tissue_imap:
        return tissue_imap[tp_name]

    raise NotImplementedError(f"tp_name {tp_name} not found")
def get_base_fname(output_dir, dwi_data_file):
    """Build the base output path (directory + file-name stem) for a scan.

    The basename of ``dwi_data_file`` is passed through ``drop_extension``,
    split on ``"_"``, and only the ``key-value`` pieces (those containing a
    ``"-"``) whose key is not one of ``desc``, ``space``, ``to`` or ``from``
    are kept. Pieces without a ``"-"`` (e.g. a trailing suffix) are dropped.

    Parameters
    ----------
    output_dir : str
        Directory in which outputs will be placed.
    dwi_data_file : str
        Path to the DWI data file; only its basename is used.

    Returns
    -------
    str
        ``output_dir`` joined with the kept pieces (re-joined with ``"_"``),
        or with the default stem ``"subject"`` when no piece is kept.
    """
    excluded_keys = ("desc", "space", "to", "from")

    stem_path = op.join(
        output_dir, drop_extension(op.basename(dwi_data_file)))
    prefix = op.dirname(stem_path) + "/"

    kept_entities = [
        entity
        for entity in op.basename(stem_path).split("_")
        if "-" in entity and entity.split("-")[0] not in excluded_keys
    ]

    if kept_entities:
        return prefix + "_".join(kept_entities)

    # if no key value pairs found,
    # have some default base file name
    return prefix + "subject"
def _split_path(path):
    """Break ``path`` into its components, outermost first.

    Repeatedly applies ``os.path.split``; whatever head remains once the
    tail comes back empty (for example a root such as ``"/"``) becomes the
    first element of the result.

    Parameters
    ----------
    path : str
        Path to decompose.

    Returns
    -------
    list of str
        Path components in order; empty for an empty ``path``.
    """
    innermost_first = []
    head, tail = os.path.split(path)
    while tail:
        innermost_first.append(tail)
        head, tail = os.path.split(head)
    if head:
        innermost_first.append(head)
    innermost_first.reverse()
    return innermost_first


def check_onnxruntime(model_name, alternative_text):
    """Import and return ``onnxruntime``, or fail with install instructions.

    Parameters
    ----------
    model_name : str
        Name of the model that needs onnxruntime; used in the error message.
    alternative_text : str
        Extra text inserted into the error message after the install hints.

    Returns
    -------
    module
        The imported ``onnxruntime`` module.

    Raises
    ------
    ImportError
        If ``onnxruntime`` cannot be imported. The original import error is
        chained as the cause and quoted in the message.
    """
    try:
        import onnxruntime as ort
    except ImportError as import_err:
        install_help = (
            f"onnxruntime is required to run the {model_name} model. "
            "When we tried to import onnxruntime, we got the "
            f"following error:\n{import_err}\n"
            "Please install onnxruntime to use this feature, "
            "by doing `pip install onnxruntime` or "
            "`pip install onnxruntime-gpu` if you have a compatible GPU or "
            "`pip install pyAFQ[nn]` or "
            "`pip install pyAFQ[gpu]` if you have a compatible GPU "
            "and want GPUStreamlines. "
            f"{alternative_text}\n"
            "If there are still issues, post an issue on "
            "https://github.com/tractometry/pyAFQ/issues"
        )
        raise ImportError(install_help) from import_err
    return ort
def get_fname(base_fname, suffix, subfolder=None):
    """Build an output file name from a base name and a suffix.

    Parameters
    ----------
    base_fname : str
        Base path (directory plus file-name stem) for outputs.
    suffix : str
        Text appended to the file-name stem (typically including the
        extension).
    subfolder : str or None, optional
        Controls where the file is placed:

        - ``None``: the result is simply ``base_fname + suffix``.
        - ``".."``: the file is placed above the directory of
          ``base_fname``. If that directory is named ``dwi`` and its parent
          contains ``"sub-"`` or ``"ses-"``, the file is named after that
          parent and placed beside it; if the directory is ``dwi`` without
          such a parent, the original stem is kept and the file is placed
          beside the ``dwi`` folder. Otherwise the file is named after the
          directory and placed beside it.
        - any other string: the file goes into that subfolder of the
          directory of ``base_fname``; the subfolder is created if needed.

    Returns
    -------
    str
        The resulting file path.

    Raises
    ------
    IndexError
        If ``subfolder == ".."`` and ``base_fname`` has no directory
        component.
    """
    if subfolder is None:
        return base_fname + suffix

    base_dir = op.dirname(base_fname)
    base_fname = op.basename(base_fname)

    if subfolder == "..":
        folders = _split_path(base_dir)
        if folders[-1] == "dwi":
            # The parentheses matter: without them ``and`` binds tighter
            # than ``or``, so ``folders[-2]`` was evaluated (and raised
            # IndexError) even when "dwi" was the only path component.
            if len(folders) > 1 and (
                "sub-" in folders[-2] or "ses-" in folders[-2]
            ):
                return op.join(*folders[:-2], folders[-2] + suffix)
            return op.join(*folders[:-1], base_fname + suffix)
        return op.join(*folders[:-1], folders[-1] + suffix)

    subfolder = op.join(base_dir, subfolder)
    os.makedirs(subfolder, exist_ok=True)
    return op.join(subfolder, base_fname + suffix)
# Turn list of tasks into dictionary with names for each task
def with_name(task_list):
    """Map each task to a key derived from its ``__name__``.

    Parameters
    ----------
    task_list : iterable
        Objects exposing a ``__name__`` attribute (e.g. functions).

    Returns
    -------
    dict
        ``{"<task.__name__>_res": task}`` for every task, in iteration
        order. Tasks sharing a name overwrite earlier entries.
    """
    named_tasks = {}
    for task in task_list:
        named_tasks[f"{task.__name__}_res"] = task
    return named_tasks
def get_default_args(func):
    """Collect the default values declared in ``func``'s signature.

    Parameters
    ----------
    func : callable
        Callable to inspect with ``inspect.signature``.

    Returns
    -------
    dict
        Maps each parameter name that has a default to that default.
        Parameters without a default are omitted.
    """
    defaults = {}
    for name, param in inspect.signature(func).parameters.items():
        if param.default is not inspect.Parameter.empty:
            defaults[name] = param.default
    return defaults


def str_to_desc(string):
    """Return ``string`` with every ``"-"``, ``"_"`` and space removed."""
    for unwanted in ("-", "_", " "):
        string = string.replace(unwanted, "")
    return string