{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n# Using cloudknot to run pyAFQ on AWS Batch\nOne of the purposes of ``pyAFQ`` is to analyze large-scale, openly available\ndatasets, such as those in the\n[Human Connectome Project](https://www.humanconnectome.org/).\n\nTo analyze these datasets, large amounts of compute are needed.\nOne way to gain access to massive computational power is by using\ncloud computing. Here, we will demonstrate\nhow to use ``pyAFQ`` in the Amazon Web Services cloud.\n\nWe will rely on the [AWS Batch Service](https://aws.amazon.com/batch/),\nand we will submit work into AWS Batch using software that our group\ndeveloped called [Cloudknot](https://nrdg.github.io/cloudknot/).\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Import cloudknot and set the AWS region within which computations will take\nplace. Setting a region is important, because if the data that you are\nanalyzing is stored in [AWS S3](https://aws.amazon.com/s3/) in a\nparticular region, it is best to run the computation in that region as well.\nThat is because AWS charges for inter-region transfer of data.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "import cloudknot as ck\nck.set_region('us-east-1')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Define the function to use\n``Cloudknot`` uses the single program, multiple data (SPMD) paradigm of computing.\nThis means that the same function will be run on multiple different inputs.\nFor example, a ``pyAFQ`` processing function can be run\non many different subjects in a dataset.\nBelow, we define the function that we will use. Notice that\n``Cloudknot`` functions include the import statements of the dependencies\nused. 
This is necessary so that ``Cloudknot`` knows\nwhat dependencies to install into AWS Batch to run this function.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def afq_process_subject(subject):\n    # define a function that each job will run\n    # In this case, each process does a single subject\n    # all imports must be at the top of the function\n    # cloudknot installs the appropriate packages from pip\n    import s3fs\n    from s3bids.utils import S3BIDSStudy\n    from AFQ.api.group import GroupAFQ\n    import AFQ.definitions.image as afm\n\n    # Download the given subject to your local machine from s3\n    # Can find subjects more easily if they are specified in a\n    # BIDS participants.tsv file, even if it is sparse\n    study_ixi = S3BIDSStudy(\n        \"my_study\",\n        \"my_study_bucket\",\n        \"my_study_prefix\",\n        subjects=[subject],\n        use_participants_tsv=True,\n        anon=False)\n    study_ixi.download(\n        \"local_bids_dir\",\n        include_derivs=[\"pipeline_name\"])\n\n    # define the api AFQ object\n    myafq = GroupAFQ(\n        \"local_bids_dir\",\n        dwi_preproc_pipeline=\"pipeline_name\",\n        viz_backend_spec='plotly',  # this will generate both interactive html and GIFs # noqa\n        scalars=[\"dki_fa\", \"dki_md\"])\n\n    # export_all runs the entire pipeline and creates many useful derivatives\n    myafq.export_all()\n\n    # upload the results to some location on s3\n    myafq.upload_to_s3(\n        s3fs.S3FileSystem(),\n        \"my_study_bucket/my_study_prefix/derivatives/afq\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we provide a list of subjects that we have selected to process.\nTo randomly select 3 subjects without replacement, instead do:\n`subjects = [[1], [2], [3]]`.\nSee the docstring for `S3BIDSStudy.__init__` for more information.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "subjects = [\"123456\", \"123457\", \"123458\"]" ] }, { "cell_type": "markdown", 
"metadata": {}, "source": [ "## Defining a ``Knot`` instance\nWe create an instance of the :class:`ck.Knot` class.\nThis object will be used to run your jobs.\nThe object is instantiated with the `'AmazonS3FullAccess'` policy,\nso that it can write the results\nout to S3, into a bucket that you have write permissions on.\nSetting the `bid_percentage` keyword argument makes AWS Batch use\n[spot EC2 instances](https://aws.amazon.com/ec2/spot/) for the\ncomputation. This can result in substantial cost savings, as spot compute\ninstances can cost much less than on-demand instances.\nHowever, note that spot instances can also\nbe evicted, so if completing all of the work is very time-sensitive,\ndo not set this keyword argument. Using the `image_github_installs`\nkeyword argument will install pyAFQ from GitHub.\nYou can also specify other forks and branches to install from.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "knot = ck.Knot(\n    name='afq-process-subject-201009-0',\n    func=afq_process_subject,\n    base_image='python:3.11',\n    image_github_installs=\"https://github.com/tractometry/pyAFQ.git\",\n    pars_policies=('AmazonS3FullAccess',),\n    bid_percentage=100)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Launching the computation\nThe :meth:`map` method of the :class:`Knot` object maps each of the inputs\nprovided as a sequence onto the function and executes the function on each\none of them in parallel.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "result_futures = knot.map(subjects)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once computations have started, you can call the following\nfunction to view the progress of jobs:\n\n    knot.view_jobs()\n\nYou can also view the status of a specific job:\n\n    knot.jobs[0].status\n\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": 
[ "When all jobs are finished, remember to use the :meth:`clobber` method to\ndestroy all of the AWS resources created by the :class:`Knot`.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "result_futures.result()\nknot.clobber(clobber_pars=True, clobber_repo=True, clobber_image=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In a second :class:`Knot` object, we use a function that takes the\nresulting profiles of each subject and combines them into one CSV file.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "def afq_combine_profiles(dummy_argument):\n    from AFQ.api import download_and_combine_afq_profiles\n    download_and_combine_afq_profiles(\n        \"my_study_bucket\", \"my_study_prefix\")\n\n\nknot2 = ck.Knot(\n    name='afq_combine_subjects-201009-0',\n    func=afq_combine_profiles,\n    base_image='python:3.11',\n    image_github_installs=\"https://github.com/tractometry/pyAFQ.git\",\n    pars_policies=('AmazonS3FullAccess',),\n    bid_percentage=100)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This knot is called with a dummy argument, which is not used within the\nfunction itself. 
The `job_type` keyword argument is used to signal to\n``Cloudknot`` that only one job is submitted rather than the default\narray of jobs.\n\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false }, "outputs": [], "source": [ "result_futures2 = knot2.map([\"dummy_argument\"], job_type=\"independent\")\nresult_futures2.result()\nknot2.clobber(clobber_pars=True, clobber_repo=True, clobber_image=True)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.13.13" } }, "nbformat": 4, "nbformat_minor": 0 }