From e2fd245f66650504067d1763bfda3e99189e8017 Mon Sep 17 00:00:00 2001 From: Riley Hun Date: Wed, 23 Aug 2023 18:06:02 -0700 Subject: [PATCH] iterating --- metaflow/plugins/frameworks/ray.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/metaflow/plugins/frameworks/ray.py b/metaflow/plugins/frameworks/ray.py index 0623d5d064..4c3d7645e0 100644 --- a/metaflow/plugins/frameworks/ray.py +++ b/metaflow/plugins/frameworks/ray.py @@ -48,6 +48,15 @@ def _control_wrapper(step_func, flow, var=RAY_JOB_COMPLETE_VAR): s3 = S3(run=flow) ensure_ray_installed() + + try: + print("Locating Ray Cli Path ...") + command = "which ray" + output = subprocess.check_output(command, shell=True, text=True) + print("Ray cli path: ", output) + except subprocess.CalledProcessError as e: + print("Error:", e.output) + if os.environ.get("METAFLOW_RUNTIME_ENVIRONMENT", "local") == "local": checkpoint_path = os.path.join(os.getcwd(), "ray_checkpoints") else: @@ -74,11 +83,16 @@ def _control_wrapper(step_func, flow, var=RAY_JOB_COMPLETE_VAR): return partial(_worker_heartbeat) def setup_distributed_env(self, flow, ubf_context): - py_cli_path = Path(sys.executable) - if py_cli_path.is_symlink(): - py_cli_path = os.readlink(py_cli_path) - ray_cli_path = os.path.join(py_cli_path.split('python')[0], 'ray') - setup_ray_distributed(self.attributes["main_port"], self.attributes["all_nodes_started_timeout"], ray_cli_path, flow, ubf_context) + py_cli_path = Path(sys.executable).resolve() + py_exec_dir = py_cli_path.parent + ray_cli_path = py_exec_dir / "ray" + + if ray_cli_path.is_file(): + ray_cli_path = ray_cli_path.resolve() + setup_ray_distributed(self.attributes["main_port"], self.attributes["all_nodes_started_timeout"], + str(ray_cli_path), flow, ubf_context) + else: + print("'ray' executable not found in:", ray_cli_path) def setup_ray_distributed(