Skip to content

Commit

Permalink
[parallel] fix tests for ubf based on new changes to core.
Browse files Browse the repository at this point in the history
  • Loading branch information
valayDave committed Jun 27, 2024
1 parent 389db1a commit eaf5d9f
Showing 1 changed file with 19 additions and 2 deletions.
21 changes: 19 additions & 2 deletions metaflow/plugins/test_unbounded_foreach_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,30 @@ def step_init(
):
self.environment = environment

def task_pre_step(
self,
step_name,
task_datastore,
metadata,
run_id,
task_id,
flow,
graph,
retry_count,
max_user_code_retries,
ubf_context,
inputs,
):
self.input_paths = [obj.pathspec for obj in inputs]

def control_task_step_func(self, flow, graph, retry_count):
from metaflow import current

graph
run_id = current.run_id
step_name = current.step_name
control_task_id = current.task_id
(_, split_step_name, split_task_id) = control_task_id.split("-")[1:]
# (_, split_step_name, split_task_id) = control_task_id.split("-")[1:]
# If we are running inside Conda, we use the base executable FIRST;
# the conda environment will then be used when runtime_step_cli is
# called. This is so that it can properly set up all the metaflow
Expand Down Expand Up @@ -97,7 +114,7 @@ def control_task_step_func(self, flow, graph, retry_count):
task_id = "%s-%d" % (control_task_id.replace("control-", "test-ubf-"), i)
pathspec = "%s/%s/%s" % (run_id, step_name, task_id)
mapper_tasks.append(to_unicode(pathspec))
input_paths = "%s/%s/%s" % (run_id, split_step_name, split_task_id)
input_paths = ",".join(self.input_paths)

# Override specific `step` kwargs.
kwargs = cli_args.step_kwargs
Expand Down

0 comments on commit eaf5d9f

Please sign in to comment.