Skip to content

Commit

Permalink
iterating
Browse files Browse the repository at this point in the history
  • Loading branch information
Riley Hun committed Aug 24, 2023
1 parent b800cb2 commit c2e23de
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions metaflow/plugins/frameworks/ray.py
Original file line number Diff line number Diff line change
@@ -48,6 +48,15 @@ def _control_wrapper(step_func, flow, var=RAY_JOB_COMPLETE_VAR):

s3 = S3(run=flow)
ensure_ray_installed()

try:
print("Locating Ray Cli Path ...")
command = "which ray"
output = subprocess.check_output(command, shell=True, text=True)
print("Ray cli path: ", output)
except subprocess.CalledProcessError as e:
print("Error:", e.output)

if os.environ.get("METAFLOW_RUNTIME_ENVIRONMENT", "local") == "local":
checkpoint_path = os.path.join(os.getcwd(), "ray_checkpoints")
else:
@@ -74,14 +83,16 @@ def _control_wrapper(step_func, flow, var=RAY_JOB_COMPLETE_VAR):
return partial(_worker_heartbeat)

def setup_distributed_env(self, flow, ubf_context):
py_cli_path = Path(sys.executable)
if py_cli_path.is_symlink():
py_cli_path = os.readlink(py_cli_path)
print("Locating Ray Cli Path ...")
output = subprocess.check_output("which ray", shell=True, stderr=subprocess.STDOUT, text=True)
print("Ray cli path: ", output)
ray_cli_path = os.path.join(py_cli_path.split('python')[0], 'ray')
setup_ray_distributed(self.attributes["main_port"], self.attributes["all_nodes_started_timeout"], ray_cli_path, flow, ubf_context)
py_cli_path = Path(sys.executable).resolve()
py_exec_dir = py_cli_path.parent
ray_cli_path = py_exec_dir / "ray"

if ray_cli_path.is_file():
ray_cli_path = ray_cli_path.resolve()
setup_ray_distributed(self.attributes["main_port"], self.attributes["all_nodes_started_timeout"],
str(ray_cli_path), flow, ubf_context)
else:
print("'ray' executable not found in:", ray_cli_path)


def setup_ray_distributed(
Expand Down

0 comments on commit c2e23de

Please sign in to comment.