Skip to content

Commit

Permalink
Remove global nature of parameters and flow decorators (#1886)
Browse files Browse the repository at this point in the history
* Remove global nature of parameters and flow decorators

This scopes decorators and parameters to the flow itself, which
should improve compatibility with the Runner.

* Fix issue when no flow (in commands for example)

* Another typo

* Cleanup

* Typo

* Address comments; should fix R tests

* Fix bugs including #1885
  • Loading branch information
romain-intel authored Jun 18, 2024
1 parent ae2fee2 commit 86a6991
Show file tree
Hide file tree
Showing 13 changed files with 116 additions and 58 deletions.
17 changes: 10 additions & 7 deletions metaflow/R.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from importlib import util as imp_util, machinery as imp_machinery
from tempfile import NamedTemporaryFile

from . import parameters
from .util import to_bytes

R_FUNCTIONS = {}
Expand Down Expand Up @@ -125,15 +126,17 @@ def run(
flow = module.FLOW(use_cli=False)

from . import exception
from . import cli

try:
cli.main(
flow,
args=metaflow_args,
handle_exceptions=False,
entrypoint=full_cmdline[: -len(metaflow_args)],
)
with parameters.flow_context(flow.__class__) as _:
from . import cli

cli.main(
flow,
args=metaflow_args,
handle_exceptions=False,
entrypoint=full_cmdline[: -len(metaflow_args)],
)
except exception.MetaflowException as e:
cli.print_metaflow_exception(e)
os.remove(tmp.name)
Expand Down
15 changes: 8 additions & 7 deletions metaflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
InvalidDecoratorAttribute,
)

from .parameters import current_flow

from metaflow._vendor import click

try:
Expand Down Expand Up @@ -174,13 +176,9 @@ def __str__(self):


class FlowDecorator(Decorator):
_flow_decorators = []
options = {}

def __init__(self, *args, **kwargs):
# Note that this assumes we are executing one flow per process, so we have a global list of
# _flow_decorators. A similar setup is used in parameters.
self._flow_decorators.append(self)
super(FlowDecorator, self).__init__(*args, **kwargs)

def flow_init(
Expand All @@ -206,7 +204,10 @@ def get_top_level_options(self):
# compare this to parameters.add_custom_parameters
def add_decorator_options(cmd):
seen = {}
for deco in flow_decorators():
flow_cls = getattr(current_flow, "flow_cls", None)
if flow_cls is None:
return cmd
for deco in flow_decorators(flow_cls):
for option, kwargs in deco.options.items():
if option in seen:
msg = (
Expand All @@ -222,8 +223,8 @@ def add_decorator_options(cmd):
return cmd


def flow_decorators():
return FlowDecorator._flow_decorators
def flow_decorators(flow_cls):
    """
    Return a flat list of every flow-level decorator attached to `flow_cls`.

    `flow_cls._flow_decorators` is read as a mapping whose values are lists of
    decorators; the lists are concatenated in the mapping's iteration order.
    """
    collected = []
    for deco_list in flow_cls._flow_decorators.values():
        collected.extend(deco_list)
    return collected


class StepDecorator(Decorator):
Expand Down
36 changes: 21 additions & 15 deletions metaflow/flowspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,18 @@ def __getitem__(self, item):
return item or 0 # item is None for the control task, but it is also split 0


class FlowSpec(object):
class _FlowSpecMeta(type):
    """
    Metaclass that equips every class it creates with its own
    `_flow_decorators` dictionary.
    """

    def __new__(cls, name, bases, dct):
        """
        Create the class and attach a fresh, empty `_flow_decorators` to it.

        Assigning the dictionary on the newly created class (rather than
        declaring it once on a base class) means each child class gets its own
        instance and nothing is shared with the FlowSpec base class, so
        `_flow_decorators` never turns into a "global".
        """
        new_cls = super().__new__(cls, name, bases, dct)
        new_cls._flow_decorators = {}
        return new_cls


class FlowSpec(metaclass=_FlowSpecMeta):
"""
Main class from which all Flows should inherit.
Expand Down Expand Up @@ -83,8 +94,6 @@ class FlowSpec(object):
# names starting with `_` as those are already excluded from `_get_parameters`.
_NON_PARAMETERS = {"cmd", "foreach_stack", "index", "input", "script_name", "name"}

_flow_decorators = {}

def __init__(self, use_cli=True):
"""
Construct a FlowSpec
Expand All @@ -104,15 +113,11 @@ def __init__(self, use_cli=True):
self._graph = FlowGraph(self.__class__)
self._steps = [getattr(self, node.name) for node in self._graph]

# This must be set before calling cli.main() below (or specifically, add_custom_parameters)
parameters.parameters = [p for _, p in self._get_parameters()]

if use_cli:
# we import cli here to make sure custom parameters in
# args.py get fully evaluated before cli.py is imported.
from . import cli
with parameters.flow_context(self.__class__) as _:
from . import cli

cli.main(self)
cli.main(self)

@property
def script_name(self) -> str:
Expand Down Expand Up @@ -192,18 +197,19 @@ def _set_constants(self, graph, kwargs):
"attributes": deco.attributes,
"statically_defined": deco.statically_defined,
}
for deco in flow_decorators()
for deco in flow_decorators(self)
if not deco.name.startswith("_")
],
}
self._graph_info = graph_info

def _get_parameters(self):
for var in dir(self):
if var[0] == "_" or var in self._NON_PARAMETERS:
@classmethod
def _get_parameters(cls):
for var in dir(cls):
if var[0] == "_" or var in cls._NON_PARAMETERS:
continue
try:
val = getattr(self, var)
val = getattr(cls, var)
except:
continue
if isinstance(val, Parameter):
Expand Down
43 changes: 42 additions & 1 deletion metaflow/parameters.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import json

from contextlib import contextmanager
from threading import local

from typing import Any, Callable, Dict, NamedTuple, Optional, Type, Union

from metaflow._vendor import click
Expand Down Expand Up @@ -31,7 +35,40 @@
],
)

parameters = [] # Set by FlowSpec.__init__()

# Building the CLI requires the flow's parameters so that add_custom_parameters
# can attach them to commands. They used to be kept in a module-level global,
# which causes problems once more than one FlowSpec is loaded (as can happen
# with the Runner, or simply when several flows are defined and instantiated).
# To keep the impact on the code minimal, the FlowSpec being used to create the
# CLI is now recorded in a thread-local value, which lets us extract the
# parameters directly from the flow.
current_flow = local()


@contextmanager
def flow_context(flow_cls):
    """
    Context manager that makes `flow_cls` the current flow for this thread.

    Inside the `with` block, `current_flow.flow_cls` is `flow_cls`; the CLI
    creation code reads it to extract the parameters of the FlowSpec in use.
    On exit, including when the block raises, the previously active flow is
    restored; if there was none, the `flow_cls` and `flow_cls_stack` attributes
    are removed from `current_flow`.
    """
    # A stack is kept (most recent entry first) because, with the runner, this
    # can be entered several times in a nested fashion.
    try:
        stack = current_flow.flow_cls_stack
    except AttributeError:
        stack = current_flow.flow_cls_stack = []
    stack.insert(0, flow_cls)
    current_flow.flow_cls = flow_cls
    try:
        yield
    finally:
        remaining = current_flow.flow_cls_stack[1:]
        if remaining:
            current_flow.flow_cls_stack = remaining
            current_flow.flow_cls = remaining[0]
        else:
            del current_flow.flow_cls_stack
            del current_flow.flow_cls


context_proto = None


Expand Down Expand Up @@ -391,6 +428,10 @@ def wrapper(cmd):
cmd.has_flow_params = True
# Iterate over parameters in reverse order so cmd.params lists options
# in the order they are defined in the FlowSpec subclass
flow_cls = getattr(current_flow, "flow_cls", None)
if flow_cls is None:
return cmd
parameters = [p for _, p in flow_cls._get_parameters()]
for arg in parameters[::-1]:
kwargs = arg.option_kwargs(deploy_mode)
cmd.params.insert(0, click.Option(("--" + arg.name,), **kwargs))
Expand Down
2 changes: 1 addition & 1 deletion metaflow/plugins/airflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
# FlowDecorators can define their own top-level options. They are
# responsible for adding their own top-level options and values through
# the get_top_level_options() hook. See similar logic in runtime.py.
for deco in flow_decorators():
for deco in flow_decorators(self.flow):
top_opts_dict.update(deco.get_top_level_options())

top_opts = list(dict_to_cli_options(top_opts_dict))
Expand Down
8 changes: 4 additions & 4 deletions metaflow/plugins/airflow/sensors/base_sensor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import uuid
from metaflow.decorators import FlowDecorator
from metaflow.decorators import FlowDecorator, flow_decorators
from ..exception import AirflowException
from ..airflow_utils import AirflowTask, id_creator, TASK_ID_HASH_LEN

Expand Down Expand Up @@ -49,7 +49,7 @@ def create_task(self):
operator_type=self.operator_type,
).set_operator_args(**{k: v for k, v in task_args.items() if v is not None})

def validate(self):
def validate(self, flow):
"""
Validate if the arguments for the sensor are correct.
"""
Expand All @@ -58,7 +58,7 @@ def validate(self):
if self.attributes["name"] is None:
deco_index = [
d._id
for d in self._flow_decorators
for d in flow_decorators(flow)
if issubclass(d.__class__, AirflowSensorDecorator)
].index(self._id)
self._airflow_task_name = "%s-%s" % (
Expand All @@ -71,4 +71,4 @@ def validate(self):
def flow_init(
self, flow, graph, environment, flow_datastore, metadata, logger, echo, options
):
self.validate()
self.validate(flow)
4 changes: 2 additions & 2 deletions metaflow/plugins/airflow/sensors/external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def serialize_operator_args(self):
)
return task_args

def validate(self):
def validate(self, flow):
if self.attributes["external_dag_id"] is None:
raise AirflowException(
"`%s` argument of `@%s`cannot be `None`."
Expand Down Expand Up @@ -131,4 +131,4 @@ def validate(self):
self.name,
)
)
super().validate()
super().validate(flow)
4 changes: 2 additions & 2 deletions metaflow/plugins/airflow/sensors/s3_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ class S3KeySensorDecorator(AirflowSensorDecorator):
# `verify` is an Airflow variable.
)

def validate(self):
def validate(self, flow):
if self.attributes["bucket_key"] is None:
raise AirflowException(
"`bucket_key` for `@%s`cannot be empty." % (self.name)
)
super().validate()
super().validate(flow)
2 changes: 1 addition & 1 deletion metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -1237,7 +1237,7 @@ def _container_templates(self):
# FlowDecorators can define their own top-level options. They are
# responsible for adding their own top-level options and values through
# the get_top_level_options() hook. See similar logic in runtime.py.
for deco in flow_decorators():
for deco in flow_decorators(self.flow):
top_opts_dict.update(deco.get_top_level_options())

top_level = list(dict_to_cli_options(top_opts_dict)) + [
Expand Down
2 changes: 1 addition & 1 deletion metaflow/plugins/aws/step_functions/step_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
# FlowDecorators can define their own top-level options. They are
# responsible for adding their own top-level options and values through
# the get_top_level_options() hook. See similar logic in runtime.py.
for deco in flow_decorators():
for deco in flow_decorators(self.flow):
top_opts_dict.update(deco.get_top_level_options())

top_opts = list(dict_to_cli_options(top_opts_dict))
Expand Down
27 changes: 18 additions & 9 deletions metaflow/runner/click_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@
UUIDParameterType,
)
from metaflow._vendor.typeguard import TypeCheckError, check_type
from metaflow.cli import start
from metaflow.decorators import add_decorator_options
from metaflow.exception import MetaflowException
from metaflow.includefile import FilePathClass
from metaflow.parameters import JSONTypeClass
from metaflow.parameters import JSONTypeClass, flow_context

click_to_python_types = {
StringParamType: str,
Expand Down Expand Up @@ -140,7 +141,7 @@ def get_inspect_param_obj(p: Union[click.Argument, click.Option], kind: str):
loaded_modules = {}


def extract_flowspec_params(flow_file: str) -> List[Parameter]:
def extract_flow_class_from_file(flow_file: str) -> FlowSpec:
# Check if the module has already been loaded
if flow_file in loaded_modules:
module = loaded_modules[flow_file]
Expand All @@ -153,14 +154,16 @@ def extract_flowspec_params(flow_file: str) -> List[Parameter]:
loaded_modules[flow_file] = module
classes = inspect.getmembers(module, inspect.isclass)

parameters = []
flow_cls = None
for _, kls in classes:
if kls != FlowSpec and issubclass(kls, FlowSpec):
for _, obj in inspect.getmembers(kls):
if isinstance(obj, Parameter):
parameters.append(obj)
if flow_cls is not None:
raise MetaflowException(
"Multiple FlowSpec classes found in %s" % flow_file
)
flow_cls = kls

return parameters
return flow_cls


class MetaflowAPI(object):
Expand All @@ -180,7 +183,11 @@ def chain(self):

@classmethod
def from_cli(cls, flow_file: str, cli_collection: Callable) -> Callable:
flow_parameters = extract_flowspec_params(flow_file)
flow_cls = extract_flow_class_from_file(flow_file)
flow_parameters = [p for _, p in flow_cls._get_parameters()]
with flow_context(flow_cls) as _:
add_decorator_options(cli_collection)

class_dict = {"__module__": "metaflow", "_API_NAME": flow_file}
command_groups = cli_collection.sources
for each_group in command_groups:
Expand Down Expand Up @@ -377,6 +384,8 @@ def _method(_self, **kwargs):


if __name__ == "__main__":
from metaflow.cli import start

api = MetaflowAPI.from_cli("../try.py", start)

command = api(metadata="local").run(
Expand Down
4 changes: 4 additions & 0 deletions metaflow/runner/metaflow_runner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import importlib
import os
import sys
import tempfile
Expand Down Expand Up @@ -250,6 +251,9 @@ def __init__(
# from metaflow import Runner
# This ability is made possible by the statement:
# 'from .metaflow_runner import Runner' in '__init__.py'

if "metaflow.cli" in sys.modules:
importlib.reload(sys.modules["metaflow.cli"])
from metaflow.cli import start
from metaflow.runner.click_api import MetaflowAPI

Expand Down
Loading

0 comments on commit 86a6991

Please sign in to comment.