Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minor nbrun fixes #1860

Merged
merged 4 commits into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 25 additions & 17 deletions metaflow/runner/metaflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,10 @@ class Runner(object):
def __init__(
self,
flow_file: str,
show_output: bool = False,
profile: Optional[str] = None,
env: Optional[Dict] = None,
cwd: Optional[str] = None,
**kwargs
):
"""
Expand All @@ -212,12 +214,18 @@ def __init__(
----------
flow_file : str
Path to the flow file to run
show_output : bool, default False
Suppress the 'stdout' and 'stderr' to the console by default,
Only applicable for synchronous 'run' and 'resume' functions.
profile : Optional[str], default None
Metaflow profile to use to run this run. If not specified, the default
profile is used (or the one already set using `METAFLOW_PROFILE`)
env : Optional[Dict], default None
Additional environment variables to set for the Run. This overrides the
environment set for this process.
cwd : Optional[str], default None
The directory to run the subprocess in; if not specified, the current
directory is used.
**kwargs : Any
Additional arguments that you would pass to `python ./myflow.py` before
the `run` command.
Expand All @@ -234,11 +242,15 @@ def __init__(
from metaflow.runner.click_api import MetaflowAPI

self.flow_file = flow_file
self.show_output = show_output

self.old_env = os.environ.copy()
self.env_vars = self.old_env.copy()
self.env_vars.update(env or {})
if profile:
self.env_vars["METAFLOW_PROFILE"] = profile

self.cwd = cwd
self.spm = SubprocessManager()
self.top_level_kwargs = kwargs
self.api = MetaflowAPI.from_cli(self.flow_file, start)
Expand All @@ -262,7 +274,7 @@ def __get_executing_run(self, tfp_runner_attribute, command_obj):

# Set the correct metadata from the runner_attribute file corresponding to this run.
content = read_from_file_when_ready(tfp_runner_attribute.name, timeout=10)
metadata_for_flow, pathspec = content.split(":", maxsplit=1)
metadata_for_flow, pathspec = content.rsplit(":", maxsplit=1)
metadata(metadata_for_flow)
run_object = Run(pathspec, _namespace_check=False)
return ExecutingRun(self, command_obj, run_object)
Expand All @@ -277,19 +289,13 @@ def __get_executing_run(self, tfp_runner_attribute, command_obj):
error_message += "\nStderr:\n%s\n" % stderr_log
raise RuntimeError(error_message) from e

def run(self, show_output: bool = False, **kwargs) -> ExecutingRun:
def run(self, **kwargs) -> ExecutingRun:
"""
Synchronous execution of the run. This method will *block* until
the run has completed execution.

Parameters
----------
show_output : bool, default False
Suppress the 'stdout' and 'stderr' to the console by default.
They can be accessed later by reading the files present in the
ExecutingRun object (referenced as 'result' below) returned:
- result.stdout
- result.stderr
**kwargs : Any
Additional arguments that you would pass to `python ./myflow.py` after
the `run` command.
Expand All @@ -308,25 +314,22 @@ def run(self, show_output: bool = False, **kwargs) -> ExecutingRun:
)

pid = self.spm.run_command(
[sys.executable, *command], env=self.env_vars, show_output=show_output
[sys.executable, *command],
env=self.env_vars,
cwd=self.cwd,
show_output=self.show_output,
)
command_obj = self.spm.get(pid)

return self.__get_executing_run(tfp_runner_attribute, command_obj)

def resume(self, show_output: bool = False, **kwargs):
def resume(self, **kwargs):
"""
Synchronous resume execution of the run.
This method will *block* until the resumed run has completed execution.

Parameters
----------
show_output : bool, default False
Suppress the 'stdout' and 'stderr' to the console by default.
They can be accessed later by reading the files present in the
ExecutingRun object (referenced as 'result' below) returned:
- result.stdout
- result.stderr
**kwargs : Any
Additional arguments that you would pass to `python ./myflow.py` after
the `resume` command.
Expand All @@ -345,7 +348,10 @@ def resume(self, show_output: bool = False, **kwargs):
)

pid = self.spm.run_command(
[sys.executable, *command], env=self.env_vars, show_output=show_output
[sys.executable, *command],
env=self.env_vars,
cwd=self.cwd,
show_output=self.show_output,
)
command_obj = self.spm.get(pid)

Expand Down Expand Up @@ -378,6 +384,7 @@ async def async_run(self, **kwargs) -> ExecutingRun:
pid = await self.spm.async_run_command(
[sys.executable, *command],
env=self.env_vars,
cwd=self.cwd,
)
command_obj = self.spm.get(pid)

Expand Down Expand Up @@ -410,6 +417,7 @@ async def async_resume(self, **kwargs):
pid = await self.spm.async_run_command(
[sys.executable, *command],
env=self.env_vars,
cwd=self.cwd,
)
command_obj = self.spm.get(pid)

Expand Down
31 changes: 16 additions & 15 deletions metaflow/runner/nbrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
except ModuleNotFoundError:
print("'nbrun' requires an interactive python environment (such as Jupyter)")

DEFAULT_DIR = tempfile.gettempdir()


def get_current_cell():
if ipython:
Expand Down Expand Up @@ -48,13 +50,15 @@ class NBRunner(object):
def __init__(
self,
flow,
show_output: bool = False,
profile: Optional[str] = None,
env: Optional[Dict] = None,
base_dir: Optional[str] = None,
base_dir: str = DEFAULT_DIR,
**kwargs,
):
self.cell = get_current_cell()
self.flow = flow
self.show_output = show_output

self.env_vars = os.environ.copy()
self.env_vars.update(env or {})
Expand All @@ -67,17 +71,11 @@ def __init__(
if not self.cell:
raise ValueError("Couldn't find a cell.")

if self.base_dir is None:
# for some reason, using this is much faster
self.tempdir = tempfile.mkdtemp()
else:
self.tempdir = self.base_dir

self.tmp_flow_file = tempfile.NamedTemporaryFile(
prefix=self.flow.__name__,
suffix=".py",
mode="w",
dir=self.tempdir,
dir=self.base_dir,
delete=False,
)

Expand All @@ -87,26 +85,29 @@ def __init__(

self.runner = Runner(
flow_file=self.tmp_flow_file.name,
show_output=self.show_output,
profile=profile,
env=self.env_vars,
cwd=self.base_dir,
**kwargs,
)

def nbrun(self, **kwargs):
result = self.runner.run(show_output=True, **kwargs)
self.old_val_show_output = self.show_output
self.runner.show_output = True
result = self.runner.run(**kwargs)
self.runner.show_output = self.old_val_show_output
self.runner.spm.cleanup()
return result.run

def nbresume(self, **kwargs):
result = self.runner.resume(show_output=True, **kwargs)
self.old_val_show_output = self.show_output
self.runner.show_output = True
result = self.runner.resume(**kwargs)
self.runner.show_output = self.old_val_show_output
self.runner.spm.cleanup()
return result.run

def cleanup(self):
"""Cleans up the temporary directory used to store the flow script"""
if self.base_dir is None:
shutil.rmtree(self.tempdir, ignore_errors=True)

def run(self, **kwargs):
"""
Runs the flow.
Expand Down
6 changes: 3 additions & 3 deletions test/core/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ def construct_arg_dicts_from_click_api():
flow_ret = subprocess.call(run_cmd("run"), env=env)
elif executor == "api":
top_level_dict, run_level_dict = construct_arg_dicts_from_click_api()
runner = Runner("test_flow.py", env=env, **top_level_dict)
result = runner.run(show_output=True, **run_level_dict)
runner = Runner("test_flow.py", show_output=True, env=env, **top_level_dict)
result = runner.run(**run_level_dict)
flow_ret = result.command_obj.process.returncode

if flow_ret:
Expand All @@ -207,7 +207,7 @@ def construct_arg_dicts_from_click_api():
flow_ret = subprocess.call(run_cmd("resume"), env=env)
elif executor == "api":
_, resume_level_dict = construct_arg_dicts_from_click_api()
result = runner.resume(show_output=True, **resume_level_dict)
result = runner.resume(**resume_level_dict)
flow_ret = result.command_obj.process.returncode
else:
log("flow failed", formatter, context)
Expand Down
Loading