Skip to content

Commit

Permalink
grab run_tests.py changes from base branch
Browse files Browse the repository at this point in the history
  • Loading branch information
saikonen committed Oct 23, 2024
1 parent 992bced commit 8a83eba
Showing 1 changed file with 120 additions and 124 deletions.
244 changes: 120 additions & 124 deletions test/core/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,146 +200,142 @@ def construct_arg_dicts_from_click_api():
}
)

called_processes = []
if "pre_command" in context:
if context["pre_command"].get("metaflow_command"):
cmd = [context["python"], "test_flow.py"]
cmd.extend(context["top_options"])
cmd.extend(context["pre_command"]["command"])
else:
cmd = context["pre_command"]["command"]
called_processes.append(
subprocess.run(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
)
if called_processes[-1].returncode and not context["pre_command"].get(
"ignore_errors", False
):
log(
"pre-command failed", formatter, context, processes=called_processes
os.environ.clear()
os.environ.update(env)

called_processes = []
if "pre_command" in context:
if context["pre_command"].get("metaflow_command"):
cmd = [context["python"], "test_flow.py"]
cmd.extend(context["top_options"])
cmd.extend(context["pre_command"]["command"])
else:
cmd = context["pre_command"]["command"]
called_processes.append(
subprocess.run(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
)
return called_processes[-1].returncode, path

# run flow
if executor == "cli":
called_processes.append(
subprocess.run(
run_cmd("run"),
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
if called_processes[-1].returncode and not context["pre_command"].get(
"ignore_errors", False
):
log(
"pre-command failed",
formatter,
context,
processes=called_processes,
)
return called_processes[-1].returncode, path

# run flow
if executor == "cli":
called_processes.append(
subprocess.run(
run_cmd("run"),
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
)
)
elif executor == "api":
top_level_dict, run_level_dict = construct_arg_dicts_from_click_api()
runner = Runner(
"test_flow.py", show_output=False, env=env, **top_level_dict
)
result = runner.run(**run_level_dict)
with open(result.command_obj.log_files["stdout"], encoding="utf-8") as f:
stdout = f.read()
with open(result.command_obj.log_files["stderr"], encoding="utf-8") as f:
stderr = f.read()
called_processes.append(
subprocess.CompletedProcess(
result.command_obj.command,
result.command_obj.process.returncode,
stdout,
stderr,
elif executor == "api":
top_level_dict, run_level_dict = construct_arg_dicts_from_click_api()
runner = Runner(
"test_flow.py", show_output=False, env=env, **top_level_dict
)
)

if called_processes[-1].returncode:
if formatter.should_fail:
log("Flow failed as expected.")
elif formatter.should_resume:
log("Resuming flow as expected", formatter, context)
if executor == "cli":
called_processes.append(
subprocess.run(
run_cmd(
"resume",
(
[formatter.resume_step]
if formatter.resume_step
else []
),
),
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
)
elif executor == "api":
_, resume_level_dict = construct_arg_dicts_from_click_api()
if formatter.resume_step:
resume_level_dict["step_to_rerun"] = formatter.resume_step
result = runner.resume(**resume_level_dict)
# NOTE: This will include both logs from the original run and resume
# so we will remove the last process
with open(
result.command_obj.log_files["stdout"], encoding="utf-8"
) as f:
stdout = f.read()
with open(
result.command_obj.log_files["stderr"], encoding="utf-8"
) as f:
stderr = f.read()
called_processes[-1] = subprocess.CompletedProcess(
result = runner.run(**run_level_dict)
with open(
result.command_obj.log_files["stdout"], encoding="utf-8"
) as f:
stdout = f.read()
with open(
result.command_obj.log_files["stderr"], encoding="utf-8"
) as f:
stderr = f.read()
called_processes.append(
subprocess.CompletedProcess(
result.command_obj.command,
result.command_obj.process.returncode,
stdout,
stderr,
)
else:
log("flow failed", formatter, context, processes=called_processes)
return called_processes[-1].returncode, path
elif formatter.should_fail:
log(
"The flow should have failed but it didn't. Error!",
formatter,
context,
processes=called_processes,
)
return 1, path

# check results
run_id = open("run-id").read()
ret = 0
for check_name in context["checks"]:
check = checks[check_name]
python = check["python"]
cmd = [python, "check_flow.py", check["class"], run_id]
cmd.extend(context["top_options"])
called_processes.append(
subprocess.run(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
)

if called_processes[-1].returncode:
if formatter.should_fail:
log("Flow failed as expected.")
elif formatter.should_resume:
log("Resuming flow as expected", formatter, context)
if executor == "cli":
called_processes.append(
subprocess.run(
run_cmd(
"resume",
(
[formatter.resume_step]
if formatter.resume_step
else []
),
),
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
)
elif executor == "api":
_, resume_level_dict = construct_arg_dicts_from_click_api()
if formatter.resume_step:
resume_level_dict["step_to_rerun"] = formatter.resume_step
result = runner.resume(**resume_level_dict)
# NOTE: This will include both logs from the original run and resume
# so we will remove the last process
with open(
result.command_obj.log_files["stdout"], encoding="utf-8"
) as f:
stdout = f.read()
with open(
result.command_obj.log_files["stderr"], encoding="utf-8"
) as f:
stderr = f.read()
called_processes[-1] = subprocess.CompletedProcess(
result.command_obj.command,
result.command_obj.process.returncode,
stdout,
stderr,
)
else:
log("flow failed", formatter, context, processes=called_processes)
return called_processes[-1].returncode, path
elif formatter.should_fail:
log(
"The flow should have failed but it didn't. Error!",
formatter,
context,
processes=called_processes,
)
ret = called_processes[-1].returncode
else:
log(
"checker '%s' says that results are ok" % check_name,
formatter,
context,
return 1, path

# check results
run_id = open("run-id").read()
ret = 0
for check_name in context["checks"]:
check = checks[check_name]
python = check["python"]
cmd = [python, "check_flow.py", check["class"], run_id]
cmd.extend(context["top_options"])
called_processes.append(
subprocess.run(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=False,
)
)
if called_processes[-1].returncode:
log(
Expand Down

0 comments on commit 8a83eba

Please sign in to comment.