From a37e9a544eee35d4f6aeb95d8373037f21636621 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Fri, 21 Jul 2023 17:15:09 -0700 Subject: [PATCH 01/24] add Ray Decorator --- metaflow/plugins/__init__.py | 1 + metaflow/plugins/frameworks/ray.py | 44 ++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+) create mode 100644 metaflow/plugins/frameworks/ray.py diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index 06572f6776..75c2e3540e 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -44,6 +44,7 @@ ("conda", ".conda.conda_step_decorator.CondaStepDecorator"), ("card", ".cards.card_decorator.CardDecorator"), ("pytorch_parallel", ".frameworks.pytorch.PytorchParallelDecorator"), + ("ray_parallel", ".frameworks.ray.RayParallelDecorator"), ("airflow_internal", ".airflow.airflow_decorator.AirflowInternalDecorator"), ] diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py new file mode 100644 index 0000000000..833929bbe5 --- /dev/null +++ b/metaflow/plugins/frameworks/ray.py @@ -0,0 +1,44 @@ +import inspect +import subprocess +import pickle +import tempfile +import os +import sys +from metaflow import current +from metaflow.plugins.parallel_decorator import ParallelDecorator + + +class RayParallelDecorator(ParallelDecorator): + name = "ray_parallel" + defaults = {"master_port": None} + IS_PARALLEL = True + + def task_decorate( + self, step_func, flow, graph, retry_count, max_user_code_retries, ubf_context + ): + return super().task_decorate( + step_func, flow, graph, retry_count, max_user_code_retries, ubf_context + ) + + def setup_distributed_env(self, flow): + setup_ray_distributed(self.attributes["master_port"]) + + +def setup_ray_distributed(master_port=None): + """ + Manually set up Ray cluster + """ + # Choose port depending on run id to reduce probability of collisions, unless + # provided by the user. + try: + master_port = master_port or (9001 + abs(int(current.run_id)) % 1000) + except: + # if `int()` fails, i.e. `run_id` is not an `int`, use just a constant port. Can't use `hash()`, + # as that is not constant. + master_port = 9001 + + if current.parallel.node_index == 0: + subprocess.run([sys.executable, "-m", "ray", "start", "--head", f"--port={master_port}"]) + else: + address = f"{current.parallel.main_ip}:{master_port}" + subprocess.run([sys.executable, "-m", "ray", "start", "--address", address]) \ No newline at end of file From 64321171b601b886e2038a24893c2e036a9060e6 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Fri, 21 Jul 2023 17:25:59 -0700 Subject: [PATCH 02/24] install ray default --- metaflow/plugins/frameworks/ray.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index 833929bbe5..c39fa7a498 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -14,7 +14,7 @@ class RayParallelDecorator(ParallelDecorator): IS_PARALLEL = True def task_decorate( - self, step_func, flow, graph, retry_count, max_user_code_retries, ubf_context + self, step_func, flow, graph, retry_count, max_user_code_retries, ubf_context ): return super().task_decorate( step_func, flow, graph, retry_count, max_user_code_retries, ubf_context @@ -30,6 +30,10 @@ def setup_ray_distributed(master_port=None): """ # Choose port depending on run id to reduce probability of collisions, unless # provided by the user. 
+ subprocess.run( + [sys.executable, "-m", "pip", "install", "ray[default]"] + ) + try: master_port = master_port or (9001 + abs(int(current.run_id)) % 1000) except: @@ -38,7 +42,9 @@ def setup_ray_distributed(master_port=None): master_port = 9001 if current.parallel.node_index == 0: - subprocess.run([sys.executable, "-m", "ray", "start", "--head", f"--port={master_port}"]) + subprocess.run( + [sys.executable, "-m", "ray", "start", "--head", f"--port={master_port}"] + ) else: address = f"{current.parallel.main_ip}:{master_port}" - subprocess.run([sys.executable, "-m", "ray", "start", "--address", address]) \ No newline at end of file + subprocess.run([sys.executable, "-m", "ray", "start", "--address", address]) From 6a7000e0aaf75770aec6deb57acc31685f141692 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Thu, 27 Jul 2023 23:18:16 -1000 Subject: [PATCH 03/24] fix ray head node / node connectivity --- metaflow/plugins/frameworks/ray.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index c39fa7a498..0e7a462e77 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -30,21 +30,21 @@ def setup_ray_distributed(master_port=None): """ # Choose port depending on run id to reduce probability of collisions, unless # provided by the user. - subprocess.run( - [sys.executable, "-m", "pip", "install", "ray[default]"] - ) + subprocess.Popen( + [sys.executable, "-m", "pip", "install", "-U", "ray[air]"] + ).wait() try: - master_port = master_port or (9001 + abs(int(current.run_id)) % 1000) + master_port = master_port or (6379 + abs(int(current.run_id)) % 1000) except: # if `int()` fails, i.e. `run_id` is not an `int`, use just a constant port. Can't use `hash()`, # as that is not constant. - master_port = 9001 + master_port = 6379 if current.parallel.node_index == 0: - subprocess.run( - [sys.executable, "-m", "ray", "start", "--head", f"--port={master_port}"] - ) + subprocess.Popen(f"ray start --head --node-ip-address {current.parallel.main_ip} --port {master_port} --block", shell=True).wait() else: - address = f"{current.parallel.main_ip}:{master_port}" - subprocess.run([sys.executable, "-m", "ray", "start", "--address", address]) + import ray + node_ip_address = ray._private.services.get_node_ip_address() + subprocess.Popen(f"ray start --node-ip-address {node_ip_address} --address {current.parallel.main_ip}:{master_port} --block", shell=True).wait() + From d7bcae517ddc3bbadde777124078a2fcb6115da4 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Mon, 31 Jul 2023 15:47:06 -0700 Subject: [PATCH 04/24] squashed commit --- metaflow/plugins/frameworks/ray.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index 0e7a462e77..27e4352879 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -28,10 +28,11 @@ def setup_ray_distributed(master_port=None): """ Manually set up Ray cluster """ + import time # Choose port depending on run id to reduce probability of collisions, unless # provided by the user. 
subprocess.Popen( - [sys.executable, "-m", "pip", "install", "-U", "ray[air]"] + [sys.executable, "-m", "pip", "install", "-U", "ray[air]==2.5.0", "pydantic==1.10.12"] ).wait() try: @@ -42,9 +43,14 @@ def setup_ray_distributed(master_port=None): master_port = 6379 if current.parallel.node_index == 0: - subprocess.Popen(f"ray start --head --node-ip-address {current.parallel.main_ip} --port {master_port} --block", shell=True).wait() + print(f"The Master Node IP address is: {current.parallel.main_ip}") + subprocess.Popen(f"RAY_BACKEND_LOG_LEVEL=debug ray start --head --node-ip-address {current.parallel.main_ip} --port {master_port}", shell=True).wait() else: import ray node_ip_address = ray._private.services.get_node_ip_address() - subprocess.Popen(f"ray start --node-ip-address {node_ip_address} --address {current.parallel.main_ip}:{master_port} --block", shell=True).wait() + print(f"The Master Node IP address is: {current.parallel.main_ip}") + print(f"The Node IP address is: {node_ip_address}") + subprocess.Popen(f"RAY_BACKEND_LOG_LEVEL=debug ray start --node-ip-address {node_ip_address} --address {current.parallel.main_ip}:{master_port}", shell=True).wait() + + time.sleep(5*int(current.parallel.num_nodes)) From f799e070fc842bfe9ad8f503831d2b461051bc25 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Sun, 6 Aug 2023 00:18:55 -0700 Subject: [PATCH 05/24] iterating --- metaflow/plugins/frameworks/ray.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index 27e4352879..0568bd2fab 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -52,5 +52,3 @@ def setup_ray_distributed(master_port=None): print(f"The Node IP address is: {node_ip_address}") subprocess.Popen(f"RAY_BACKEND_LOG_LEVEL=debug ray start --node-ip-address {node_ip_address} --address {current.parallel.main_ip}:{master_port}", shell=True).wait() - time.sleep(5*int(current.parallel.num_nodes)) - From 29b90970bd045d2342fb0662faad2b335e078767 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Sun, 6 Aug 2023 00:21:05 -0700 Subject: [PATCH 06/24] iterating --- metaflow/plugins/frameworks/ray.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index 0568bd2fab..afd0ae9cc2 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -44,11 +44,11 @@ def setup_ray_distributed(master_port=None): if current.parallel.node_index == 0: print(f"The Master Node IP address is: {current.parallel.main_ip}") - subprocess.Popen(f"RAY_BACKEND_LOG_LEVEL=debug ray start --head --node-ip-address {current.parallel.main_ip} --port {master_port}", shell=True).wait() + subprocess.Popen(f"ray start --head --node-ip-address {current.parallel.main_ip} --port {master_port}", shell=True).wait() else: import ray node_ip_address = ray._private.services.get_node_ip_address() print(f"The Master Node IP address is: {current.parallel.main_ip}") print(f"The Node IP address is: {node_ip_address}") - subprocess.Popen(f"RAY_BACKEND_LOG_LEVEL=debug ray start --node-ip-address {node_ip_address} --address {current.parallel.main_ip}:{master_port}", shell=True).wait() + subprocess.Popen(f"ray start --node-ip-address {node_ip_address} --address {current.parallel.main_ip}:{master_port}", shell=True).wait() From db5321fcbe95c71a4326e4ce2604589542086705 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Tue, 8 Aug 2023 23:57:14 -0700 Subject: [PATCH 07/24] add 
efa compatibility --- metaflow/plugins/aws/batch/batch.py | 5 ++++ metaflow/plugins/aws/batch/batch_cli.py | 3 +++ metaflow/plugins/aws/batch/batch_client.py | 27 +++++++++++++++++++ metaflow/plugins/aws/batch/batch_decorator.py | 3 +++ 4 files changed, 38 insertions(+) diff --git a/metaflow/plugins/aws/batch/batch.py b/metaflow/plugins/aws/batch/batch.py index aeb6f5c773..68f6533f6d 100644 --- a/metaflow/plugins/aws/batch/batch.py +++ b/metaflow/plugins/aws/batch/batch.py @@ -178,6 +178,7 @@ def create_job( max_swap=None, swappiness=None, inferentia=None, + efa=None, env={}, attrs={}, host_volumes=None, @@ -214,6 +215,7 @@ def create_job( .max_swap(max_swap) .swappiness(swappiness) .inferentia(inferentia) + .efa(efa) .timeout_in_secs(run_time_limit) .job_def( image, @@ -224,6 +226,7 @@ def create_job( max_swap, swappiness, inferentia, + efa, memory=memory, host_volumes=host_volumes, use_tmpfs=use_tmpfs, @@ -326,6 +329,7 @@ def launch_job( max_swap=None, swappiness=None, inferentia=None, + efa=None, host_volumes=None, use_tmpfs=None, tmpfs_tempdir=None, @@ -361,6 +365,7 @@ def launch_job( max_swap, swappiness, inferentia, + efa, env=env, attrs=attrs, host_volumes=host_volumes, diff --git a/metaflow/plugins/aws/batch/batch_cli.py b/metaflow/plugins/aws/batch/batch_cli.py index e82a647a3d..50a4df6f26 100644 --- a/metaflow/plugins/aws/batch/batch_cli.py +++ b/metaflow/plugins/aws/batch/batch_cli.py @@ -141,6 +141,7 @@ def kill(ctx, run_id, user, my_runs): @click.option("--max-swap", help="Max Swap requirement for AWS Batch.") @click.option("--swappiness", help="Swappiness requirement for AWS Batch.") @click.option("--inferentia", help="Inferentia requirement for AWS Batch.") +@click.option("--efa", is_flag=True, help="Activate Elastic Fabric Adapter. EFA driver must be installed and Instance Type compatible with EFA") @click.option("--use-tmpfs", is_flag=True, help="tmpfs requirement for AWS Batch.") @click.option("--tmpfs-tempdir", is_flag=True, help="tmpfs requirement for AWS Batch.") @click.option("--tmpfs-size", help="tmpfs requirement for AWS Batch.") @@ -173,6 +174,7 @@ def step( max_swap=None, swappiness=None, inferentia=None, + efa=None, use_tmpfs=None, tmpfs_tempdir=None, tmpfs_size=None, @@ -301,6 +303,7 @@ def _sync_metadata(): max_swap=max_swap, swappiness=swappiness, inferentia=inferentia, + efa=efa, env=env, attrs=attrs, host_volumes=host_volumes, diff --git a/metaflow/plugins/aws/batch/batch_client.py b/metaflow/plugins/aws/batch/batch_client.py index 8ef26070f6..7f63b91390 100644 --- a/metaflow/plugins/aws/batch/batch_client.py +++ b/metaflow/plugins/aws/batch/batch_client.py @@ -149,6 +149,7 @@ def _register_job_definition( max_swap, swappiness, inferentia, + efa, memory, host_volumes, use_tmpfs, @@ -332,6 +333,26 @@ def _register_job_definition( "container": job_definition["containerProperties"], } ) + + if efa: + if not (isinstance(efa, bool)): + raise BatchJobException( + "Invalid efa value: ({}) (should be True or False)".format( + efa + ) + ) + else: + job_definition["nodeProperties"]["nodeRangeProperties"]["container"]["linuxParameters"] = [] + job_definition["nodeProperties"]["nodeRangeProperties"]["container"]["linuxParameters"].append( + { + "hostPath": "/dev/infiniband/uverbs0", + "containerPath": "/dev/infiniband/uverbs0", + "permissions": [ + "READ", "WRITE", "MKNOD" + ] + } + ) + del job_definition["containerProperties"] # not used for multi-node # check if job definition already exists @@ -371,6 +392,7 @@ def job_def( max_swap, swappiness, inferentia, + efa, 
memory, host_volumes, use_tmpfs, @@ -388,6 +410,7 @@ def job_def( max_swap, swappiness, inferentia, + efa, memory, host_volumes, use_tmpfs, @@ -438,6 +461,10 @@ def inferentia(self, inferentia): self._inferentia = inferentia return self + def efa(self, efa): + self._efa = efa + return self + def command(self, command): if "command" not in self.payload["containerOverrides"]: self.payload["containerOverrides"]["command"] = [] diff --git a/metaflow/plugins/aws/batch/batch_decorator.py b/metaflow/plugins/aws/batch/batch_decorator.py index 6dcb6e8e99..1a5be6c82d 100644 --- a/metaflow/plugins/aws/batch/batch_decorator.py +++ b/metaflow/plugins/aws/batch/batch_decorator.py @@ -83,6 +83,8 @@ class BatchDecorator(StepDecorator): Path to tmpfs mount for this step. Defaults to /metaflow_temp. inferentia : int, default: 0 Number of Inferentia chips required for this step. + efa: bool, default: False + This enables elastic fabric adapter """ name = "batch" @@ -98,6 +100,7 @@ class BatchDecorator(StepDecorator): "max_swap": None, "swappiness": None, "inferentia": None, + "efa": False, "host_volumes": None, "use_tmpfs": False, "tmpfs_tempdir": True, From 7e4b338187dc9bed58853c9b6abb3d8cca2146f5 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Tue, 8 Aug 2023 23:59:18 -0700 Subject: [PATCH 08/24] formatting --- metaflow/plugins/aws/batch/batch_client.py | 34 +++++++++++----------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/metaflow/plugins/aws/batch/batch_client.py b/metaflow/plugins/aws/batch/batch_client.py index 7f63b91390..e8cb5ca861 100644 --- a/metaflow/plugins/aws/batch/batch_client.py +++ b/metaflow/plugins/aws/batch/batch_client.py @@ -334,24 +334,24 @@ def _register_job_definition( } ) - if efa: - if not (isinstance(efa, bool)): - raise BatchJobException( - "Invalid efa value: ({}) (should be True or False)".format( - efa - ) - ) - else: - job_definition["nodeProperties"]["nodeRangeProperties"]["container"]["linuxParameters"] = [] - job_definition["nodeProperties"]["nodeRangeProperties"]["container"]["linuxParameters"].append( - { - "hostPath": "/dev/infiniband/uverbs0", - "containerPath": "/dev/infiniband/uverbs0", - "permissions": [ - "READ", "WRITE", "MKNOD" - ] - } + if efa: + if not (isinstance(efa, bool)): + raise BatchJobException( + "Invalid efa value: ({}) (should be True or False)".format( + efa ) + ) + else: + job_definition["nodeProperties"]["nodeRangeProperties"]["container"]["linuxParameters"] = [] + job_definition["nodeProperties"]["nodeRangeProperties"]["container"]["linuxParameters"].append( + { + "hostPath": "/dev/infiniband/uverbs0", + "containerPath": "/dev/infiniband/uverbs0", + "permissions": [ + "READ", "WRITE", "MKNOD" + ] + } + ) del job_definition["containerProperties"] # not used for multi-node From cb3919c0ab16b1ae84b50e62de4e36e1439d63a0 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Wed, 9 Aug 2023 00:15:14 -0700 Subject: [PATCH 09/24] formatting --- metaflow/plugins/aws/batch/batch_client.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/metaflow/plugins/aws/batch/batch_client.py b/metaflow/plugins/aws/batch/batch_client.py index e8cb5ca861..19eab87aa5 100644 --- a/metaflow/plugins/aws/batch/batch_client.py +++ b/metaflow/plugins/aws/batch/batch_client.py @@ -342,8 +342,7 @@ def _register_job_definition( ) ) else: - job_definition["nodeProperties"]["nodeRangeProperties"]["container"]["linuxParameters"] = [] - job_definition["nodeProperties"]["nodeRangeProperties"]["container"]["linuxParameters"].append( + 
job_definition["nodeProperties"]["nodeRangeProperties"]["container"]["linuxParameters"] = [ { "hostPath": "/dev/infiniband/uverbs0", "containerPath": "/dev/infiniband/uverbs0", @@ -351,7 +350,7 @@ def _register_job_definition( "READ", "WRITE", "MKNOD" ] } - ) + ] del job_definition["containerProperties"] # not used for multi-node From 998e9b732ae63a0eae8ac490b3aea40fc246be4d Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Wed, 9 Aug 2023 00:20:11 -0700 Subject: [PATCH 10/24] formatting --- metaflow/plugins/aws/batch/batch_client.py | 36 +++++++++++----------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/metaflow/plugins/aws/batch/batch_client.py b/metaflow/plugins/aws/batch/batch_client.py index 19eab87aa5..1f12538936 100644 --- a/metaflow/plugins/aws/batch/batch_client.py +++ b/metaflow/plugins/aws/batch/batch_client.py @@ -311,6 +311,24 @@ def _register_job_definition( } ] + if efa: + if not (isinstance(efa, bool)): + raise BatchJobException( + "Invalid efa value: ({}) (should be True or False)".format( + efa + ) + ) + else: + job_definition["containerProperties"]["linuxParameters"]["devices"] = [ + { + "hostPath": "/dev/infiniband/uverbs0", + "containerPath": "/dev/infiniband/uverbs0", + "permissions": [ + "READ", "WRITE", "MKNOD" + ] + } + ] + self.num_parallel = num_parallel or 0 if self.num_parallel >= 1: job_definition["type"] = "multinode" @@ -334,24 +352,6 @@ def _register_job_definition( } ) - if efa: - if not (isinstance(efa, bool)): - raise BatchJobException( - "Invalid efa value: ({}) (should be True or False)".format( - efa - ) - ) - else: - job_definition["nodeProperties"]["nodeRangeProperties"]["container"]["linuxParameters"] = [ - { - "hostPath": "/dev/infiniband/uverbs0", - "containerPath": "/dev/infiniband/uverbs0", - "permissions": [ - "READ", "WRITE", "MKNOD" - ] - } - ] - del job_definition["containerProperties"] # not used for multi-node # check if job definition already exists From b052115cd1c15ddfb35394c6427cb6f0bdc5f92d Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Sun, 13 Aug 2023 16:45:30 -0700 Subject: [PATCH 11/24] allow multiple efa devices --- metaflow/plugins/aws/batch/batch_cli.py | 8 ++++++- metaflow/plugins/aws/batch/batch_client.py | 22 +++++++++---------- metaflow/plugins/aws/batch/batch_decorator.py | 6 ++--- 3 files changed, 21 insertions(+), 15 deletions(-) diff --git a/metaflow/plugins/aws/batch/batch_cli.py b/metaflow/plugins/aws/batch/batch_cli.py index 5fc31530a0..22aaa0643f 100644 --- a/metaflow/plugins/aws/batch/batch_cli.py +++ b/metaflow/plugins/aws/batch/batch_cli.py @@ -141,7 +141,13 @@ def kill(ctx, run_id, user, my_runs): @click.option("--max-swap", help="Max Swap requirement for AWS Batch.") @click.option("--swappiness", help="Swappiness requirement for AWS Batch.") @click.option("--inferentia", help="Inferentia requirement for AWS Batch.") -@click.option("--efa", is_flag=True, help="Activate Elastic Fabric Adapter. EFA driver must be installed and Instance Type compatible with EFA") +@click.option( + "--efa", + default=0, + type=int, + help="Activate designated number of elastic fabric adapter devices. 
" + "EFA driver must be installed and instance type compatible with EFA" +) @click.option("--use-tmpfs", is_flag=True, help="tmpfs requirement for AWS Batch.") @click.option("--tmpfs-tempdir", is_flag=True, help="tmpfs requirement for AWS Batch.") @click.option("--tmpfs-size", help="tmpfs requirement for AWS Batch.") diff --git a/metaflow/plugins/aws/batch/batch_client.py b/metaflow/plugins/aws/batch/batch_client.py index 1f12538936..fba19bae7a 100644 --- a/metaflow/plugins/aws/batch/batch_client.py +++ b/metaflow/plugins/aws/batch/batch_client.py @@ -312,22 +312,22 @@ def _register_job_definition( ] if efa: - if not (isinstance(efa, bool)): + if not (isinstance(efa, (int, unicode, basestring))): raise BatchJobException( - "Invalid efa value: ({}) (should be True or False)".format( + "Invalid efa value: ({}) (should be 0 or greater)".format( efa ) ) else: - job_definition["containerProperties"]["linuxParameters"]["devices"] = [ - { - "hostPath": "/dev/infiniband/uverbs0", - "containerPath": "/dev/infiniband/uverbs0", - "permissions": [ - "READ", "WRITE", "MKNOD" - ] - } - ] + job_definition["containerProperties"]["linuxParameters"]["devices"] = [] + for i in range(int(efa)): + job_definition["containerProperties"]["linuxParameters"]["devices"] = [ + { + "hostPath": "/dev/infiniband/uverbs{}".format(i), + "containerPath": "/dev/infiniband/uverbs{}".format(i), + "permissions": ["READ", "WRITE", "MKNOD"] + } + ] self.num_parallel = num_parallel or 0 if self.num_parallel >= 1: diff --git a/metaflow/plugins/aws/batch/batch_decorator.py b/metaflow/plugins/aws/batch/batch_decorator.py index 1a5be6c82d..51d3ca55e0 100644 --- a/metaflow/plugins/aws/batch/batch_decorator.py +++ b/metaflow/plugins/aws/batch/batch_decorator.py @@ -83,8 +83,8 @@ class BatchDecorator(StepDecorator): Path to tmpfs mount for this step. Defaults to /metaflow_temp. inferentia : int, default: 0 Number of Inferentia chips required for this step. 
- efa: bool, default: False - This enables elastic fabric adapter + efa: bool, default: 0 + Number of elastic fabric adapter network devices to attach to container """ name = "batch" @@ -100,7 +100,7 @@ class BatchDecorator(StepDecorator): "max_swap": None, "swappiness": None, "inferentia": None, - "efa": False, + "efa": None, "host_volumes": None, "use_tmpfs": False, "tmpfs_tempdir": True, From 1cd950141aba0bf16a7fcc261a0e37eae7ffdb65 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Sun, 13 Aug 2023 17:11:44 -0700 Subject: [PATCH 12/24] update @ray_parallel with improvements from Outerbounds team --- metaflow/plugins/frameworks/ray.py | 61 ++++++++++++++++++------------ 1 file changed, 37 insertions(+), 24 deletions(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index afd0ae9cc2..8fc4577310 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -4,51 +4,64 @@ import tempfile import os import sys +import time +from functools import partial from metaflow import current +from metaflow.unbounded_foreach import UBF_CONTROL from metaflow.plugins.parallel_decorator import ParallelDecorator class RayParallelDecorator(ParallelDecorator): + name = "ray_parallel" - defaults = {"master_port": None} + defaults = {"main_port": None} IS_PARALLEL = True + def _mapper_heartbeat(self, graph_info): + print('HEARTBEAT') + from metaflow import Task # avoid circular import + from metaflow.plugins.parallel_decorator import identify_control_task_pathspec + control_task_pathspec = identify_control_task_pathspec(graph_info, current) + while not Task(control_task_pathspec).finished: + time.sleep(10) + def task_decorate( self, step_func, flow, graph, retry_count, max_user_code_retries, ubf_context ): - return super().task_decorate( - step_func, flow, graph, retry_count, max_user_code_retries, ubf_context - ) + if ubf_context == UBF_CONTROL: + # print('CONTROL') + return super().task_decorate( + step_func, flow, graph, retry_count, max_user_code_retries, ubf_context + ) + else: + print('MAPPER') + # doesn't do @pip + return partial(self._mapper_heartbeat, graph_info=flow._graph_info) def setup_distributed_env(self, flow): - setup_ray_distributed(self.attributes["master_port"]) + ray_cli_path = sys.executable.replace('python', 'ray') + print("RAY PATH: ", ray_cli_path) + setup_ray_distributed(self.attributes["main_port"], ray_cli_path) -def setup_ray_distributed(master_port=None): - """ - Manually set up Ray cluster - """ - import time - # Choose port depending on run id to reduce probability of collisions, unless - # provided by the user. - subprocess.Popen( - [sys.executable, "-m", "pip", "install", "-U", "ray[air]==2.5.0", "pydantic==1.10.12"] - ).wait() +def setup_ray_distributed(main_port=None, ray_cli_path=None): try: - master_port = master_port or (6379 + abs(int(current.run_id)) % 1000) + main_port = main_port or (6379 + abs(int(current.run_id)) % 1000) except: # if `int()` fails, i.e. `run_id` is not an `int`, use just a constant port. Can't use `hash()`, # as that is not constant. 
- master_port = 6379 + main_port = 6379 + + main_ip = current.parallel.main_ip + print('MAIN IP', main_ip) if current.parallel.node_index == 0: - print(f"The Master Node IP address is: {current.parallel.main_ip}") - subprocess.Popen(f"ray start --head --node-ip-address {current.parallel.main_ip} --port {master_port}", shell=True).wait() + print(f"main: The main Node IP address is: {main_ip}") + subprocess.run([ray_cli_path, "start", "--head", "--node-ip-address", main_ip, "--port", main_port], check=True) else: - import ray + import ray # put here, so ray import is only imported after environment exists in the task container node_ip_address = ray._private.services.get_node_ip_address() - print(f"The Master Node IP address is: {current.parallel.main_ip}") - print(f"The Node IP address is: {node_ip_address}") - subprocess.Popen(f"ray start --node-ip-address {node_ip_address} --address {current.parallel.main_ip}:{master_port}", shell=True).wait() - + print(f"MAPPER: The main Node IP address is: {main_ip}") + print(f"MAPPER: The Node IP address is: {node_ip_address}") + subprocess.run([ray_cli_path, "start", "--node-ip-address", node_ip_address, "--address", f"{main_ip}:{main_port}"], check=True) \ No newline at end of file From 270251aed57e2453bdd1fa524acc9861c010ec9b Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Sun, 13 Aug 2023 17:16:08 -0700 Subject: [PATCH 13/24] fix typo --- metaflow/plugins/aws/batch/batch_decorator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/plugins/aws/batch/batch_decorator.py b/metaflow/plugins/aws/batch/batch_decorator.py index 51d3ca55e0..c62c70c538 100644 --- a/metaflow/plugins/aws/batch/batch_decorator.py +++ b/metaflow/plugins/aws/batch/batch_decorator.py @@ -83,7 +83,7 @@ class BatchDecorator(StepDecorator): Path to tmpfs mount for this step. Defaults to /metaflow_temp. inferentia : int, default: 0 Number of Inferentia chips required for this step. 
- efa: bool, default: 0 + efa: int, default: 0 Number of elastic fabric adapter network devices to attach to container """ From 76f01734e92443becfc3dbab4de4d8e6a0e881d9 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Mon, 14 Aug 2023 10:37:00 -0700 Subject: [PATCH 14/24] formatting --- metaflow/plugins/frameworks/ray.py | 39 ++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index 8fc4577310..f71f00fb31 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -18,9 +18,10 @@ class RayParallelDecorator(ParallelDecorator): IS_PARALLEL = True def _mapper_heartbeat(self, graph_info): - print('HEARTBEAT') - from metaflow import Task # avoid circular import + print("HEARTBEAT") + from metaflow import Task # avoid circular import from metaflow.plugins.parallel_decorator import identify_control_task_pathspec + control_task_pathspec = identify_control_task_pathspec(graph_info, current) while not Task(control_task_pathspec).finished: time.sleep(10) @@ -34,12 +35,12 @@ def task_decorate( step_func, flow, graph, retry_count, max_user_code_retries, ubf_context ) else: - print('MAPPER') + print("MAPPER") # doesn't do @pip return partial(self._mapper_heartbeat, graph_info=flow._graph_info) def setup_distributed_env(self, flow): - ray_cli_path = sys.executable.replace('python', 'ray') + ray_cli_path = sys.executable.replace("python", "ray") print("RAY PATH: ", ray_cli_path) setup_ray_distributed(self.attributes["main_port"], ray_cli_path) @@ -54,14 +55,36 @@ def setup_ray_distributed(main_port=None, ray_cli_path=None): main_port = 6379 main_ip = current.parallel.main_ip - print('MAIN IP', main_ip) + print("MAIN IP", main_ip) if current.parallel.node_index == 0: print(f"main: The main Node IP address is: {main_ip}") - subprocess.run([ray_cli_path, "start", "--head", "--node-ip-address", main_ip, "--port", main_port], check=True) + subprocess.run( + [ + ray_cli_path, + "start", + "--head", + "--node-ip-address", + main_ip, + "--port", + main_port, + ], + check=True, + ) else: - import ray # put here, so ray import is only imported after environment exists in the task container + import ray # put here, so ray import is only imported after environment exists in the task container + node_ip_address = ray._private.services.get_node_ip_address() print(f"MAPPER: The main Node IP address is: {main_ip}") print(f"MAPPER: The Node IP address is: {node_ip_address}") - subprocess.run([ray_cli_path, "start", "--node-ip-address", node_ip_address, "--address", f"{main_ip}:{main_port}"], check=True) \ No newline at end of file + subprocess.run( + [ + ray_cli_path, + "start", + "--node-ip-address", + node_ip_address, + "--address", + f"{main_ip}:{main_port}", + ], + check=True, + ) From c97117a21fc5b5ab3891639c0d21cb320c4b9556 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Wed, 16 Aug 2023 15:37:03 -0700 Subject: [PATCH 15/24] formatting --- metaflow/plugins/frameworks/ray.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index f71f00fb31..20d4fd3fc2 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -67,7 +67,7 @@ def setup_ray_distributed(main_port=None, ray_cli_path=None): "--node-ip-address", main_ip, "--port", - main_port, + str(main_port), ], check=True, ) From 1557c1a8be711b68a3a24ca976f137fa4c6050fa Mon Sep 17 00:00:00 2001 From: Riley Hun 
Date: Wed, 16 Aug 2023 23:04:28 -0700 Subject: [PATCH 16/24] formatting --- metaflow/plugins/frameworks/ray.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index 20d4fd3fc2..2f2281d89f 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -42,8 +42,15 @@ def task_decorate( def setup_distributed_env(self, flow): ray_cli_path = sys.executable.replace("python", "ray") print("RAY PATH: ", ray_cli_path) + self.ensure_ray_air_installed() setup_ray_distributed(self.attributes["main_port"], ray_cli_path) + def ensure_ray_air_installed(self): + try: + import ray + except ImportError: + print("Installing latest version of ray-air package") + subprocess.run([sys.executable, "-m", "pip", "install", "-U", "ray-air"]) def setup_ray_distributed(main_port=None, ray_cli_path=None): From ca4f90912f30627dc561a611fa0ae120229d2396 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Thu, 17 Aug 2023 13:00:20 -0700 Subject: [PATCH 17/24] adding cool new polished changes --- metaflow/plugins/frameworks/ray.py | 170 +++++++++++++++++++++-------- 1 file changed, 124 insertions(+), 46 deletions(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index 2f2281d89f..229dd82a94 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -1,58 +1,95 @@ -import inspect import subprocess -import pickle -import tempfile import os import sys import time -from functools import partial -from metaflow import current from metaflow.unbounded_foreach import UBF_CONTROL -from metaflow.plugins.parallel_decorator import ParallelDecorator +from metaflow.plugins.parallel_decorator import ParallelDecorator, _local_multinode_control_task_step_func class RayParallelDecorator(ParallelDecorator): - name = "ray_parallel" defaults = {"main_port": None} IS_PARALLEL = True - def _mapper_heartbeat(self, graph_info): - print("HEARTBEAT") - from metaflow import Task # avoid circular import - from metaflow.plugins.parallel_decorator import identify_control_task_pathspec - - control_task_pathspec = identify_control_task_pathspec(graph_info, current) - while not Task(control_task_pathspec).finished: - time.sleep(10) - def task_decorate( - self, step_func, flow, graph, retry_count, max_user_code_retries, ubf_context + self, step_func, flow, graph, retry_count, max_user_code_retries, ubf_context ): - if ubf_context == UBF_CONTROL: - # print('CONTROL') - return super().task_decorate( - step_func, flow, graph, retry_count, max_user_code_retries, ubf_context - ) + + from functools import partial + + def _worker_heartbeat(graph_info=flow._graph_info): + from metaflow import Task, current + control = get_previous_task_pathspec(graph_info, current) + while not Task(control).finished: + time.sleep(3) + + def _empty_worker_task(): + pass + + if os.environ.get("METAFLOW_RUNTIME_ENVIRONMENT", "local") == "local": + if ubf_context == UBF_CONTROL: + env_to_use = getattr(self.environment, "base_env", self.environment) + return partial( + _local_multinode_control_task_step_func, + # assigns the flow._control_mapper_tasks variables & runs worker subprocesses. + flow, + env_to_use, + step_func, + # run user code and let ray.init() auto-detect available resources. could swap this for an entrypoint.py file to match ray job submission API. + retry_count, + ) + return partial( + _empty_worker_task) # don't need to run code on worker task. 
ray.init() in control attaches driver to the cluster. else: - print("MAPPER") - # doesn't do @pip - return partial(self._mapper_heartbeat, graph_info=flow._graph_info) + self.setup_distributed_env(flow, ubf_context) + if ubf_context == UBF_CONTROL: + return step_func + return partial( + _worker_heartbeat) # don't need to run user code on worker task. ray.init() in control attaches driver to the cluster. - def setup_distributed_env(self, flow): - ray_cli_path = sys.executable.replace("python", "ray") - print("RAY PATH: ", ray_cli_path) + def setup_distributed_env(self, flow, ubf_context): self.ensure_ray_air_installed() - setup_ray_distributed(self.attributes["main_port"], ray_cli_path) + ray_cli_path = sys.executable.replace("python", "ray") + setup_ray_distributed(self.attributes["main_port"], ray_cli_path, flow, ubf_context) def ensure_ray_air_installed(self): try: import ray except ImportError: - print("Installing latest version of ray-air package") + print("Ray is not installed. Installing latest version of ray-air package.") subprocess.run([sys.executable, "-m", "pip", "install", "-U", "ray-air"]) -def setup_ray_distributed(main_port=None, ray_cli_path=None): +def setup_ray_distributed( + main_port=None, + ray_cli_path=None, + run=None, + ubf_context=None +): + import ray + import json + import socket + from metaflow import S3, current + + # Why are deco.task_pre_step and deco.task_decorate calls in the same loop? + # https://github.com/Netflix/metaflow/blob/76eee802cba1983dffe7e7731dd8e31e2992e59b/metaflow/task.py#L553 + # this causes these current.parallel variables to be defaults on all nodes, + # since AWS Batch decorator's task_pre_step hasn't run yet. + # num_nodes = current.parallel.num_nodes + # node_index = current.parallel.node_index + + # AWS Batch-specific workaround. + num_nodes = int(os.environ["AWS_BATCH_JOB_NUM_NODES"]) + node_index = os.environ["AWS_BATCH_JOB_NODE_INDEX"] + node_key = os.path.join("ray_nodes", "node_%s.json" % node_index) + + # Similar to above comment, + # better to use current.parallel.main_ip instead of this conditional block, + # but this seems to require a change to the main loop in metaflow.task. + if ubf_context == UBF_CONTROL: + local_ips = socket.gethostbyname_ex(socket.gethostname())[-1] + main_ip = local_ips[0] + else: + main_ip = os.environ['AWS_BATCH_JOB_MAIN_NODE_PRIVATE_IPV4_ADDRESS'] try: main_port = main_port or (6379 + abs(int(current.run_id)) % 1000) @@ -61,12 +98,10 @@ def setup_ray_distributed(main_port=None, ray_cli_path=None): # as that is not constant. 
main_port = 6379 - main_ip = current.parallel.main_ip - print("MAIN IP", main_ip) + s3 = S3(run=run) - if current.parallel.node_index == 0: - print(f"main: The main Node IP address is: {main_ip}") - subprocess.run( + if ubf_context == UBF_CONTROL: + runtime_start_result = subprocess.run( [ ray_cli_path, "start", @@ -75,23 +110,66 @@ def setup_ray_distributed(main_port=None, ray_cli_path=None): main_ip, "--port", str(main_port), - ], - check=True, + ] ) else: - import ray # put here, so ray import is only imported after environment exists in the task container - node_ip_address = ray._private.services.get_node_ip_address() - print(f"MAPPER: The main Node IP address is: {main_ip}") - print(f"MAPPER: The Node IP address is: {node_ip_address}") - subprocess.run( + runtime_start_result = subprocess.run( [ ray_cli_path, "start", "--node-ip-address", node_ip_address, "--address", - f"{main_ip}:{main_port}", - ], - check=True, + "%s:%s" % (main_ip, main_port), + ] ) + + if runtime_start_result.returncode != 0: + raise Exception("Ray runtime failed to start on node %s" % node_index) + else: + s3.put(node_key, json.dumps({'node_started': True})) + + def _num_nodes_started(path="ray_nodes"): + objs = s3.get_recursive([path]) + num_started = 0 + for obj in objs: + obj = json.loads(obj.text) + if obj['node_started']: + num_started += 1 + else: + raise Exception("Node {} failed to start Ray runtime".format(node_index)) + return num_started + + # poll until all workers have joined the cluster + if ubf_context == UBF_CONTROL: + while _num_nodes_started() < num_nodes: + time.sleep(10) + + s3.close() + + +def get_previous_task_pathspec(graph_info, current): + """ + Find the pathspec of the control task that a worker task is coupled to. + """ + + from metaflow import Step + + steps_info = graph_info['steps'] + for step_name, step_info in steps_info.items(): + if current.step_name == step_name: + previous_step_name = step_name + step_pathspec = "{flow_name}/{run_id}/{step_name}".format( + flow_name=current.flow_name, + run_id=current.run_id, + step_name=previous_step_name + ) + step = Step(step_pathspec) + for task in step: + if task.id.startswith("control"): + control_task_pathspec = "{step_pathspec}/{task_id}".format( + step_pathspec=step.pathspec, + task_id=task.id + ) + return control_task_pathspec From f94763d494c305d3f89c4c777966ab546489c7fb Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Thu, 17 Aug 2023 13:12:43 -0700 Subject: [PATCH 18/24] ray-air -> ray[air] --- metaflow/plugins/frameworks/ray.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index 229dd82a94..129d789604 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -57,7 +57,7 @@ def ensure_ray_air_installed(self): import ray except ImportError: print("Ray is not installed. 
Installing latest version of ray-air package.") - subprocess.run([sys.executable, "-m", "pip", "install", "-U", "ray-air"]) + subprocess.run([sys.executable, "-m", "pip", "install", "-U", "ray[air]"]) def setup_ray_distributed( main_port=None, From c7d94a8ac2761c0b060efe4a510efd350bbef54f Mon Sep 17 00:00:00 2001 From: Eddie Mattia Date: Wed, 23 Aug 2023 11:00:44 -0700 Subject: [PATCH 19/24] add error handling, node watcher, and auto checkpoint_path var --- metaflow/plugins/frameworks/ray.py | 219 ++++++++++++++++++++--------- 1 file changed, 150 insertions(+), 69 deletions(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index 129d789604..0623d5d064 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -1,70 +1,94 @@ -import subprocess import os import sys import time +import json +import signal +import subprocess +from pathlib import Path +from threading import Thread +from metaflow.exception import MetaflowException from metaflow.unbounded_foreach import UBF_CONTROL from metaflow.plugins.parallel_decorator import ParallelDecorator, _local_multinode_control_task_step_func +RAY_CHECKPOINT_VAR_NAME = 'checkpoint_path' +RAY_JOB_COMPLETE_VAR = 'ray_job_completed' +RAY_NODE_STARTED_VAR = 'node_started' +CONTROL_TASK_S3_ID = 'control' class RayParallelDecorator(ParallelDecorator): + name = "ray_parallel" - defaults = {"main_port": None} + defaults = {"main_port": None, "worker_polling_freq": 10, "all_nodes_started_timeout": 90} IS_PARALLEL = True def task_decorate( - self, step_func, flow, graph, retry_count, max_user_code_retries, ubf_context + self, step_func, flow, graph, retry_count, max_user_code_retries, ubf_context ): from functools import partial - - def _worker_heartbeat(graph_info=flow._graph_info): - from metaflow import Task, current - control = get_previous_task_pathspec(graph_info, current) - while not Task(control).finished: - time.sleep(3) + from metaflow import S3, current + from metaflow.metaflow_config import DATATOOLS_S3ROOT def _empty_worker_task(): - pass + pass # local case + + def _worker_heartbeat(polling_freq=self.attributes["worker_polling_freq"], var=RAY_JOB_COMPLETE_VAR): + while not json.loads(s3.get(CONTROL_TASK_S3_ID).blob)[var]: + time.sleep(polling_freq) + + def _control_wrapper(step_func, flow, var=RAY_JOB_COMPLETE_VAR): + watcher = NodeParticipationWatcher(expected_num_nodes=current.num_nodes, polling_freq=10) + try: + step_func() + except Exception as e: + raise ControlTaskException(e) + finally: + watcher.end() + s3.put(CONTROL_TASK_S3_ID, json.dumps({var: True})) + + s3 = S3(run=flow) + ensure_ray_installed() + if os.environ.get("METAFLOW_RUNTIME_ENVIRONMENT", "local") == "local": + checkpoint_path = os.path.join(os.getcwd(), "ray_checkpoints") + else: + checkpoint_path = os.path.join( + DATATOOLS_S3ROOT, current.flow_name, current.run_id, "ray_checkpoints" + ) + setattr(flow, RAY_CHECKPOINT_VAR_NAME, checkpoint_path) if os.environ.get("METAFLOW_RUNTIME_ENVIRONMENT", "local") == "local": if ubf_context == UBF_CONTROL: env_to_use = getattr(self.environment, "base_env", self.environment) return partial( _local_multinode_control_task_step_func, - # assigns the flow._control_mapper_tasks variables & runs worker subprocesses. flow, - env_to_use, + env_to_use, step_func, - # run user code and let ray.init() auto-detect available resources. could swap this for an entrypoint.py file to match ray job submission API. 
retry_count, - ) - return partial( - _empty_worker_task) # don't need to run code on worker task. ray.init() in control attaches driver to the cluster. + ) + return partial(_empty_worker_task) else: self.setup_distributed_env(flow, ubf_context) if ubf_context == UBF_CONTROL: - return step_func - return partial( - _worker_heartbeat) # don't need to run user code on worker task. ray.init() in control attaches driver to the cluster. + return partial(_control_wrapper, step_func=step_func, flow=flow) + return partial(_worker_heartbeat) def setup_distributed_env(self, flow, ubf_context): - self.ensure_ray_air_installed() - ray_cli_path = sys.executable.replace("python", "ray") - setup_ray_distributed(self.attributes["main_port"], ray_cli_path, flow, ubf_context) + py_cli_path = Path(sys.executable) + if py_cli_path.is_symlink(): + py_cli_path = os.readlink(py_cli_path) + ray_cli_path = os.path.join(py_cli_path.split('python')[0], 'ray') + setup_ray_distributed(self.attributes["main_port"], self.attributes["all_nodes_started_timeout"], ray_cli_path, flow, ubf_context) - def ensure_ray_air_installed(self): - try: - import ray - except ImportError: - print("Ray is not installed. Installing latest version of ray-air package.") - subprocess.run([sys.executable, "-m", "pip", "install", "-U", "ray[air]"]) def setup_ray_distributed( - main_port=None, - ray_cli_path=None, - run=None, - ubf_context=None + main_port, + all_nodes_started_timeout, + ray_cli_path, + run, + ubf_context ): + import ray import json import socket @@ -72,30 +96,29 @@ def setup_ray_distributed( # Why are deco.task_pre_step and deco.task_decorate calls in the same loop? # https://github.com/Netflix/metaflow/blob/76eee802cba1983dffe7e7731dd8e31e2992e59b/metaflow/task.py#L553 - # this causes these current.parallel variables to be defaults on all nodes, - # since AWS Batch decorator's task_pre_step hasn't run yet. + # The way this runs now causes these current.parallel variables to be defaults on all nodes, + # since AWS Batch decorator task_pre_step hasn't run prior to the above task_decorate call. # num_nodes = current.parallel.num_nodes # node_index = current.parallel.node_index # AWS Batch-specific workaround. num_nodes = int(os.environ["AWS_BATCH_JOB_NUM_NODES"]) node_index = os.environ["AWS_BATCH_JOB_NODE_INDEX"] - node_key = os.path.join("ray_nodes", "node_%s.json" % node_index) + node_key = os.path.join(RAY_NODE_STARTED_VAR, "node_%s.json" % node_index) + current._update_env({'num_nodes': num_nodes}) - # Similar to above comment, - # better to use current.parallel.main_ip instead of this conditional block, - # but this seems to require a change to the main loop in metaflow.task. + # Similar to above comment, + # better to use current.parallel.main_ip instead of this conditional block, + # but this seems to require a change to the main loop in metaflow.task. if ubf_context == UBF_CONTROL: local_ips = socket.gethostbyname_ex(socket.gethostname())[-1] main_ip = local_ips[0] - else: + else: main_ip = os.environ['AWS_BATCH_JOB_MAIN_NODE_PRIVATE_IPV4_ADDRESS'] - + try: main_port = main_port or (6379 + abs(int(current.run_id)) % 1000) except: - # if `int()` fails, i.e. `run_id` is not an `int`, use just a constant port. Can't use `hash()`, - # as that is not constant. 
main_port = 6379 s3 = S3(run=run) @@ -112,6 +135,7 @@ def setup_ray_distributed( str(main_port), ] ) + s3.put('control', json.dumps({RAY_JOB_COMPLETE_VAR: False})) else: node_ip_address = ray._private.services.get_node_ip_address() runtime_start_result = subprocess.run( @@ -124,13 +148,12 @@ def setup_ray_distributed( "%s:%s" % (main_ip, main_port), ] ) - if runtime_start_result.returncode != 0: - raise Exception("Ray runtime failed to start on node %s" % node_index) + raise RayWorkerFailedStartException(node_index) else: s3.put(node_key, json.dumps({'node_started': True})) - def _num_nodes_started(path="ray_nodes"): + def _num_nodes_started(path=RAY_NODE_STARTED_VAR): objs = s3.get_recursive([path]) num_started = 0 for obj in objs: @@ -138,38 +161,96 @@ def _num_nodes_started(path="ray_nodes"): if obj['node_started']: num_started += 1 else: - raise Exception("Node {} failed to start Ray runtime".format(node_index)) + raise RayWorkerFailedStartException(node_index) return num_started - + # poll until all workers have joined the cluster if ubf_context == UBF_CONTROL: + t0 = time.time() while _num_nodes_started() < num_nodes: + if all_nodes_started_timeout <= time.time() - t0: + raise AllNodesStartupTimeoutException() time.sleep(10) s3.close() -def get_previous_task_pathspec(graph_info, current): - """ - Find the pathspec of the control task that a worker task is coupled to. - """ +def ensure_ray_installed(): + while True: + try: + import ray + break + except ImportError: + print("Ray is not installed. Installing latest version of ray-air package.") + subprocess.run([sys.executable, "-m", "pip", "install", "-U", "ray[air]"]) + + +class NodeParticipationWatcher(object): - from metaflow import Step + def __init__(self, expected_num_nodes, polling_freq=10, t_user_code_start_buffer=30): + self.t_user_code_start_buffer = t_user_code_start_buffer + self.expected_num_nodes = expected_num_nodes + self.polling_freq = polling_freq + self._thread = Thread(target = self._enforce_participation) + self.is_alive = True + self._thread.start() - steps_info = graph_info['steps'] - for step_name, step_info in steps_info.items(): - if current.step_name == step_name: - previous_step_name = step_name - step_pathspec = "{flow_name}/{run_id}/{step_name}".format( - flow_name=current.flow_name, - run_id=current.run_id, - step_name=previous_step_name - ) - step = Step(step_pathspec) - for task in step: - if task.id.startswith("control"): - control_task_pathspec = "{step_pathspec}/{task_id}".format( - step_pathspec=step.pathspec, - task_id=task.id - ) - return control_task_pathspec + def end(self): + self.is_alive = False + + def _enforce_participation(self): + + import ray + + # Why this sleep? + time.sleep(self.t_user_code_start_buffer) + # The user code is expected to run ray.init(), in line with ergonomic Ray workflows. + # To run self._num_nodes_started() in following loop, ray.init() needs to already run. + # If we don't wait for user code to run ray.init(), + # then we need to do it before this loop, + # which causes the user code ray.init() to throw error like: + # `Maybe you called ray.init twice by accident?` + # and will ask user to put 'ignore_reinit_error=True' in 'ray.init()', which is annoying UX. + # So we wait for user code to run ray.init() before we run self._num_nodes_started() in following loop. 
+
+        while self.is_alive:
+            n = self._num_nodes(ray)
+            if n < self.expected_num_nodes:
+                self.is_alive = False
+                self._kill_run(n)
+            time.sleep(self.polling_freq)
+
+    def _num_nodes(self, ray):
+        return len(ray._private.state.state._live_node_ids())  # Should this use ray._private.state.node_ids()?
+
+    def _kill_run(self, n):
+        msg = "Only {} of {} expected nodes are still participating.".format(n, self.expected_num_nodes)
+        print(msg)
+        os.kill(os.getpid(), signal.SIGINT)
+
+
+class ControlTaskException(MetaflowException):
+    headline = "Control task error"
+
+    def __init__(self, e):
+        msg = """
+Spinning down all workers because of the following exception running the @step code on the control task:
+    {exception_str}
+    """.format(exception_str=str(e))
+        super(ControlTaskException, self).__init__(msg)
+
+
+class RayWorkerFailedStartException(MetaflowException):
+    headline = "Worker task startup error"
+
+    def __init__(self, node_index):
+        msg = "Worker task failed to start on node {}".format(node_index)
+        super(RayWorkerFailedStartException, self).__init__(msg)
+
+
+class AllNodesStartupTimeoutException(MetaflowException):
+    headline = "All workers did not join cluster error"
+
+    def __init__(self):
+        msg = "Exiting job after timing out while waiting for all workers to join the cluster. You can set the timeout with @ray_parallel(all_nodes_started_timeout=X)"
+        super(AllNodesStartupTimeoutException, self).__init__(msg)
\ No newline at end of file

From e2fd245f66650504067d1763bfda3e99189e8017 Mon Sep 17 00:00:00 2001
From: Riley Hun
Date: Wed, 23 Aug 2023 18:06:02 -0700
Subject: [PATCH 20/24] iterating

---
 metaflow/plugins/frameworks/ray.py | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py
index 0623d5d064..4c3d7645e0 100644
--- a/metaflow/plugins/frameworks/ray.py
+++ b/metaflow/plugins/frameworks/ray.py
@@ -48,6 +48,15 @@ def _control_wrapper(step_func, flow, var=RAY_JOB_COMPLETE_VAR):
         s3 = S3(run=flow)
         ensure_ray_installed()
+
+        try:
+            print("Locating Ray Cli Path ...")
+            command = "which ray"
+            output = subprocess.check_output(command, shell=True, text=True)
+            print("Ray cli path: ", output)
+        except subprocess.CalledProcessError as e:
+            print("Error:", e.output)
+
         if os.environ.get("METAFLOW_RUNTIME_ENVIRONMENT", "local") == "local":
             checkpoint_path = os.path.join(os.getcwd(), "ray_checkpoints")
         else:
@@ -74,11 +83,16 @@ def _control_wrapper(step_func, flow, var=RAY_JOB_COMPLETE_VAR):
             return partial(_worker_heartbeat)

     def setup_distributed_env(self, flow, ubf_context):
-        py_cli_path = Path(sys.executable)
-        if py_cli_path.is_symlink():
-            py_cli_path = os.readlink(py_cli_path)
-        ray_cli_path = os.path.join(py_cli_path.split('python')[0], 'ray')
-        setup_ray_distributed(self.attributes["main_port"], self.attributes["all_nodes_started_timeout"], ray_cli_path, flow, ubf_context)
+        py_cli_path = Path(sys.executable).resolve()
+        py_exec_dir = py_cli_path.parent
+        ray_cli_path = py_exec_dir / "ray"
+
+        if ray_cli_path.is_file():
+            ray_cli_path = ray_cli_path.resolve()
+            setup_ray_distributed(self.attributes["main_port"], self.attributes["all_nodes_started_timeout"],
+                                  str(ray_cli_path), flow, ubf_context)
+        else:
+            print("'ray' executable not found in:", ray_cli_path)

From 214c0dafdb1ab14ec66b011fe9d70c895ffc089d Mon Sep 17 00:00:00 2001
From: Riley Hun
Date: Wed, 23 Aug 2023 18:14:21 -0700
Subject: [PATCH 21/24] remove ray cli path locating

---
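Note for reviewers (not part of the applied diff): the control/worker handshake that patches
19-21 converge on is easiest to see end to end in one place. A simplified sketch, using the
same S3 keys as the constants in ray.py (RAY_NODE_STARTED_VAR, RAY_JOB_COMPLETE_VAR,
CONTROL_TASK_S3_ID) and omitting error handling and timeouts:

    # worker task: announce this node once `ray start --address head:port` succeeds,
    # then idle until the control task reports that the Ray job is done.
    s3.put("node_started/node_%s.json" % node_index, json.dumps({"node_started": True}))
    while not json.loads(s3.get("control").blob)["ray_job_completed"]:
        time.sleep(polling_freq)

    # control task: mark the job incomplete, wait for every worker's announcement,
    # run the user step (which calls ray.init() and drives the job), then release workers.
    s3.put("control", json.dumps({"ray_job_completed": False}))
    while _num_nodes_started() < num_nodes:
        time.sleep(10)
    step_func()
    s3.put("control", json.dumps({"ray_job_completed": True}))
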
metaflow/plugins/frameworks/ray.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index 4c3d7645e0..dd0c202711 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -49,14 +49,6 @@ def _control_wrapper(step_func, flow, var=RAY_JOB_COMPLETE_VAR): s3 = S3(run=flow) ensure_ray_installed() - try: - print("Locating Ray Cli Path ...") - command = "which ray" - output = subprocess.check_output(command, shell=True, text=True) - print("Ray cli path: ", output) - except subprocess.CalledProcessError as e: - print("Error:", e.output) - if os.environ.get("METAFLOW_RUNTIME_ENVIRONMENT", "local") == "local": checkpoint_path = os.path.join(os.getcwd(), "ray_checkpoints") else: From 1a58ada4fc06bb530718a80f82461948aa7b6a3d Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Wed, 23 Aug 2023 21:16:01 -0700 Subject: [PATCH 22/24] remove hpc-related changes --- metaflow/plugins/aws/batch/batch.py | 5 ---- metaflow/plugins/aws/batch/batch_cli.py | 9 ------- metaflow/plugins/aws/batch/batch_client.py | 25 ------------------- metaflow/plugins/aws/batch/batch_decorator.py | 3 --- 4 files changed, 42 deletions(-) diff --git a/metaflow/plugins/aws/batch/batch.py b/metaflow/plugins/aws/batch/batch.py index 68f6533f6d..aeb6f5c773 100644 --- a/metaflow/plugins/aws/batch/batch.py +++ b/metaflow/plugins/aws/batch/batch.py @@ -178,7 +178,6 @@ def create_job( max_swap=None, swappiness=None, inferentia=None, - efa=None, env={}, attrs={}, host_volumes=None, @@ -215,7 +214,6 @@ def create_job( .max_swap(max_swap) .swappiness(swappiness) .inferentia(inferentia) - .efa(efa) .timeout_in_secs(run_time_limit) .job_def( image, @@ -226,7 +224,6 @@ def create_job( max_swap, swappiness, inferentia, - efa, memory=memory, host_volumes=host_volumes, use_tmpfs=use_tmpfs, @@ -329,7 +326,6 @@ def launch_job( max_swap=None, swappiness=None, inferentia=None, - efa=None, host_volumes=None, use_tmpfs=None, tmpfs_tempdir=None, @@ -365,7 +361,6 @@ def launch_job( max_swap, swappiness, inferentia, - efa, env=env, attrs=attrs, host_volumes=host_volumes, diff --git a/metaflow/plugins/aws/batch/batch_cli.py b/metaflow/plugins/aws/batch/batch_cli.py index 22aaa0643f..a4ead59858 100644 --- a/metaflow/plugins/aws/batch/batch_cli.py +++ b/metaflow/plugins/aws/batch/batch_cli.py @@ -141,13 +141,6 @@ def kill(ctx, run_id, user, my_runs): @click.option("--max-swap", help="Max Swap requirement for AWS Batch.") @click.option("--swappiness", help="Swappiness requirement for AWS Batch.") @click.option("--inferentia", help="Inferentia requirement for AWS Batch.") -@click.option( - "--efa", - default=0, - type=int, - help="Activate designated number of elastic fabric adapter devices. 
" - "EFA driver must be installed and instance type compatible with EFA" -) @click.option("--use-tmpfs", is_flag=True, help="tmpfs requirement for AWS Batch.") @click.option("--tmpfs-tempdir", is_flag=True, help="tmpfs requirement for AWS Batch.") @click.option("--tmpfs-size", help="tmpfs requirement for AWS Batch.") @@ -180,7 +173,6 @@ def step( max_swap=None, swappiness=None, inferentia=None, - efa=None, use_tmpfs=None, tmpfs_tempdir=None, tmpfs_size=None, @@ -308,7 +300,6 @@ def _sync_metadata(): max_swap=max_swap, swappiness=swappiness, inferentia=inferentia, - efa=efa, env=env, attrs=attrs, host_volumes=host_volumes, diff --git a/metaflow/plugins/aws/batch/batch_client.py b/metaflow/plugins/aws/batch/batch_client.py index fba19bae7a..9a37efb3b2 100644 --- a/metaflow/plugins/aws/batch/batch_client.py +++ b/metaflow/plugins/aws/batch/batch_client.py @@ -149,7 +149,6 @@ def _register_job_definition( max_swap, swappiness, inferentia, - efa, memory, host_volumes, use_tmpfs, @@ -311,24 +310,6 @@ def _register_job_definition( } ] - if efa: - if not (isinstance(efa, (int, unicode, basestring))): - raise BatchJobException( - "Invalid efa value: ({}) (should be 0 or greater)".format( - efa - ) - ) - else: - job_definition["containerProperties"]["linuxParameters"]["devices"] = [] - for i in range(int(efa)): - job_definition["containerProperties"]["linuxParameters"]["devices"] = [ - { - "hostPath": "/dev/infiniband/uverbs{}".format(i), - "containerPath": "/dev/infiniband/uverbs{}".format(i), - "permissions": ["READ", "WRITE", "MKNOD"] - } - ] - self.num_parallel = num_parallel or 0 if self.num_parallel >= 1: job_definition["type"] = "multinode" @@ -391,7 +372,6 @@ def job_def( max_swap, swappiness, inferentia, - efa, memory, host_volumes, use_tmpfs, @@ -409,7 +389,6 @@ def job_def( max_swap, swappiness, inferentia, - efa, memory, host_volumes, use_tmpfs, @@ -460,10 +439,6 @@ def inferentia(self, inferentia): self._inferentia = inferentia return self - def efa(self, efa): - self._efa = efa - return self - def command(self, command): if "command" not in self.payload["containerOverrides"]: self.payload["containerOverrides"]["command"] = [] diff --git a/metaflow/plugins/aws/batch/batch_decorator.py b/metaflow/plugins/aws/batch/batch_decorator.py index c62c70c538..6dcb6e8e99 100644 --- a/metaflow/plugins/aws/batch/batch_decorator.py +++ b/metaflow/plugins/aws/batch/batch_decorator.py @@ -83,8 +83,6 @@ class BatchDecorator(StepDecorator): Path to tmpfs mount for this step. Defaults to /metaflow_temp. inferentia : int, default: 0 Number of Inferentia chips required for this step. 
- efa: int, default: 0 - Number of elastic fabric adapter network devices to attach to container """ name = "batch" @@ -100,7 +98,6 @@ class BatchDecorator(StepDecorator): "max_swap": None, "swappiness": None, "inferentia": None, - "efa": None, "host_volumes": None, "use_tmpfs": False, "tmpfs_tempdir": True, From 9ee1b0ef8d56f6d375439ef7df7e5c076f911f87 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Wed, 23 Aug 2023 21:16:33 -0700 Subject: [PATCH 23/24] remove hpc-related changes --- metaflow/plugins/aws/batch/batch_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metaflow/plugins/aws/batch/batch_client.py b/metaflow/plugins/aws/batch/batch_client.py index 9a37efb3b2..8ef26070f6 100644 --- a/metaflow/plugins/aws/batch/batch_client.py +++ b/metaflow/plugins/aws/batch/batch_client.py @@ -332,7 +332,6 @@ def _register_job_definition( "container": job_definition["containerProperties"], } ) - del job_definition["containerProperties"] # not used for multi-node # check if job definition already exists From de58d11e3fdd109b7f28706d69447a797d31c724 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Fri, 25 Aug 2023 11:55:28 -0700 Subject: [PATCH 24/24] wait until Ray installed --- metaflow/plugins/frameworks/ray.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index dd0c202711..367150631f 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -188,7 +188,7 @@ def ensure_ray_installed(): break except ImportError: print("Ray is not installed. Installing latest version of ray-air package.") - subprocess.run([sys.executable, "-m", "pip", "install", "-U", "ray[air]"]) + subprocess.run([sys.executable, "-m", "pip", "install", "-U", "ray[air]"], check=True) class NodeParticipationWatcher(object):
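
For reference, a minimal flow exercising @ray_parallel end to end might look like the sketch
below. This is a hypothetical usage example, not part of the patches: the resource numbers are
made up, and it assumes the AWS Batch multi-node setup above plus user code calling ray.init(),
as the decorator expects.

    from metaflow import FlowSpec, step, batch, ray_parallel

    class RayDemoFlow(FlowSpec):

        @step
        def start(self):
            # num_parallel fans out one control task plus N-1 worker tasks (UBF).
            self.next(self.train, num_parallel=2)

        @batch(cpu=8, memory=16000)  # hypothetical resource values
        @ray_parallel(all_nodes_started_timeout=90)
        @step
        def train(self):
            import ray

            ray.init()  # attaches the driver to the cluster the decorator started
            # self.checkpoint_path is set by the decorator: a local directory when
            # running locally, s3://.../<flow>/<run>/ray_checkpoints on Batch.
            print(ray.cluster_resources())
            self.next(self.join)

        @step
        def join(self, inputs):
            self.next(self.end)

        @step
        def end(self):
            pass

    if __name__ == "__main__":
        RayDemoFlow()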