Skip to content

Commit

Permalink
Set metadata properly in runner (#2113)
Browse files Browse the repository at this point in the history
This also makes metadata local to each object instead of always being a global concept.
This should allow objects backed by different metadata services to work together.
  • Loading branch information
romain-intel authored Oct 23, 2024
1 parent e316600 commit 24a112a
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 57 deletions.
8 changes: 4 additions & 4 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,12 +701,12 @@ def resume(
runtime.persist_constants()

if runner_attribute_file:
with open(runner_attribute_file, "w") as f:
with open(runner_attribute_file, "w", encoding="utf-8") as f:
json.dump(
{
"run_id": runtime.run_id,
"flow_name": obj.flow.name,
"metadata": get_metadata(),
"metadata": obj.metadata.metadata_str(),
},
f,
)
Expand Down Expand Up @@ -779,12 +779,12 @@ def run(
runtime.persist_constants()

if runner_attribute_file:
with open(runner_attribute_file, "w") as f:
with open(runner_attribute_file, "w", encoding="utf-8") as f:
json.dump(
{
"run_id": runtime.run_id,
"flow_name": obj.flow.name,
"metadata": get_metadata(),
"metadata": obj.metadata.metadata_str(),
},
f,
)
Expand Down
116 changes: 77 additions & 39 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
List,
NamedTuple,
Optional,
TYPE_CHECKING,
Tuple,
)

Expand All @@ -37,6 +38,9 @@
from ..info_file import INFO_FILE
from .filecache import FileCache

if TYPE_CHECKING:
from metaflow.metadata import MetadataProvider

try:
# python2
import cPickle as pickle
Expand Down Expand Up @@ -82,28 +86,16 @@ def metadata(ms: str) -> str:
get_metadata()).
"""
global current_metadata
infos = ms.split("@", 1)
types = [m.TYPE for m in METADATA_PROVIDERS]
if infos[0] in types:
current_metadata = [m for m in METADATA_PROVIDERS if m.TYPE == infos[0]][0]
if len(infos) > 1:
current_metadata.INFO = infos[1]
else:
# Deduce from ms; if starts with http, use service or else use local
if ms.startswith("http"):
metadata_type = "service"
else:
metadata_type = "local"
res = [m for m in METADATA_PROVIDERS if m.TYPE == metadata_type]
if not res:
print(
"Cannot find a '%s' metadata provider -- "
"try specifying one explicitly using <type>@<info>",
metadata_type,
)
return get_metadata()
current_metadata = res[0]
current_metadata.INFO = ms
provider, info = _metadata(ms)
if provider is None:
print(
"Cannot find a metadata provider -- "
"try specifying one explicitly using <type>@<info>",
)
return get_metadata()
current_metadata = provider
if info:
current_metadata.INFO = info
return get_metadata()


Expand All @@ -127,7 +119,7 @@ def get_metadata() -> str:
"""
if current_metadata is False:
default_metadata()
return "%s@%s" % (current_metadata.TYPE, current_metadata.INFO)
return current_metadata.metadata_str()


def default_metadata() -> str:
Expand Down Expand Up @@ -268,9 +260,16 @@ def __init__(
_object: Optional["MetaflowObject"] = None,
_parent: Optional["MetaflowObject"] = None,
_namespace_check: bool = True,
_metaflow: Optional["Metaflow"] = None,
_current_namespace: Optional[str] = None,
_current_metadata: Optional[str] = None,
):
self._metaflow = Metaflow()
# the default namespace is activated lazily at the first
# get_namespace(). The other option of activating
# the namespace at the import time is problematic, since there
# may be other modules that alter environment variables etc.
# which may affect the namespace setting.
self._metaflow = Metaflow(_current_metadata) or _metaflow
self._parent = _parent
self._path_components = None
self._attempt = attempt
Expand Down Expand Up @@ -390,6 +389,7 @@ def __iter__(self) -> Iterator["MetaflowObject"]:
attempt=self._attempt,
_object=obj,
_parent=self,
_metaflow=self._metaflow,
_namespace_check=self._namespace_check,
_current_namespace=(
self._current_namespace if self._namespace_check else None
Expand Down Expand Up @@ -505,6 +505,7 @@ def __getitem__(self, id: str) -> "MetaflowObject":
attempt=self._attempt,
_object=obj,
_parent=self,
_metaflow=self._metaflow,
_namespace_check=self._namespace_check,
_current_namespace=(
self._current_namespace if self._namespace_check else None
Expand Down Expand Up @@ -552,7 +553,25 @@ def _unpickle_2124(self, data):
_current_namespace=ns,
)

_UNPICKLE_FUNC = {"2.8.4": _unpickle_284, "2.12.4": _unpickle_2124}
def _unpickle_21226(self, data):
    """
    Re-initialize this object from the state layout registered under
    version "2.12.26" in `_UNPICKLE_FUNC`.

    Parameters
    ----------
    data : sequence
        Exactly five items, in order: pathspec, attempt, metadata string,
        namespace and the namespace-check flag.

    Raises
    ------
    MetaflowInternalError
        If `data` does not hold exactly five items.
    """
    item_count = len(data)
    if item_count == 5:
        path, attempt_id, metadata_string, namespace, check_namespace = data
        # Rebuild the object in place by re-running the regular constructor.
        self.__init__(
            pathspec=path,
            attempt=attempt_id,
            _namespace_check=check_namespace,
            _current_metadata=metadata_string,
            _current_namespace=namespace,
        )
        return
    raise MetaflowInternalError(
        "Unexpected size of array: {}".format(item_count)
    )

_UNPICKLE_FUNC = {
"2.8.4": _unpickle_284,
"2.12.4": _unpickle_2124,
"2.12.26": _unpickle_21226,
}

def __setstate__(self, state):
"""
Expand Down Expand Up @@ -595,10 +614,11 @@ def __getstate__(self):
# checking for the namespace even after unpickling since we will know which
# namespace to check.
return {
"version": "2.12.4",
"version": "2.12.26",
"data": [
self.pathspec,
self._attempt,
self._metaflow.metadata.metadata_str(),
self._current_namespace,
self._namespace_check,
],
Expand Down Expand Up @@ -2288,17 +2308,16 @@ class Metaflow(object):
if it has at least one run in the namespace.
"""

def __init__(self):
# the default namespace is activated lazily at the first object
# invocation or get_namespace(). The other option of activating
# the namespace at the import time is problematic, since there
# may be other modules that alter environment variables etc.
# which may affect the namespace setting.
if current_namespace is False:
default_namespace()
if current_metadata is False:
default_metadata()
self.metadata = current_metadata
def __init__(self, _current_metadata: Optional[str] = None):
    """
    Bind this entry point to a metadata provider.

    Parameters
    ----------
    _current_metadata : str, optional
        Metadata string to resolve through `_metadata()`. When it is None
        or empty, the module-level `current_metadata` is used instead,
        after `default_metadata()` has initialized it if it is still unset.

    Raises
    ------
    MetaflowInternalError
        If `_current_metadata` is given but `_metadata()` cannot resolve
        it to any provider.
    """
    if not _current_metadata:
        # No explicit metadata requested: fall back to the module-level
        # provider, initializing it lazily on first use.
        if current_metadata is False:
            default_metadata()
        self.metadata = current_metadata
        return

    provider, info = _metadata(_current_metadata)
    if provider is None:
        # _metadata() returns (None, None) when nothing matches. Fail here
        # with a clear message instead of storing None and hitting an
        # AttributeError on first use of self.metadata.
        raise MetaflowInternalError(
            "Cannot find a metadata provider for '%s' -- "
            "try specifying one explicitly using <type>@<info>"
            % _current_metadata
        )
    self.metadata = provider
    if info:
        # NOTE(review): the provider comes straight from METADATA_PROVIDERS,
        # so this assignment is visible to every other user of that
        # provider -- confirm that this sharing is intended.
        self.metadata.INFO = info

@property
def flows(self) -> List[Flow]:
Expand Down Expand Up @@ -2335,7 +2354,7 @@ def __iter__(self) -> Iterator[Flow]:
all_flows = all_flows if all_flows else []
for flow in all_flows:
try:
v = Flow(_object=flow)
v = Flow(_object=flow, _metaflow=self)
yield v
except MetaflowNamespaceMismatch:
continue
Expand All @@ -2359,7 +2378,26 @@ def __getitem__(self, name: str) -> Flow:
Flow
Flow with the given name.
"""
return Flow(name)
return Flow(name, _metaflow=self)


def _metadata(ms: str) -> Tuple[Optional["MetadataProvider"], Optional[str]]:
    """
    Resolve a metadata string to a provider and its info part.

    Parameters
    ----------
    ms : str
        Either "<type>@<info>", a bare "<type>", or a bare location. For a
        bare location the type is deduced: "service" if it starts with
        "http", otherwise "local".

    Returns
    -------
    Tuple[Optional[MetadataProvider], Optional[str]]
        - (provider, info) when the part before the first "@" names a
          provider type; info is None if `ms` contains no "@".
        - (provider, ms) when the type had to be deduced.
        - (None, None) when no provider of the deduced type exists.
    """
    # First provider registered for each TYPE wins.
    first_by_type = {}
    for candidate in METADATA_PROVIDERS:
        first_by_type.setdefault(candidate.TYPE, candidate)

    type_name, separator, remainder = ms.partition("@")
    if type_name in first_by_type:
        return first_by_type[type_name], (remainder if separator else None)

    # Deduce from ms; if starts with http, use service or else use local
    deduced_type = "service" if ms.startswith("http") else "local"
    if deduced_type not in first_by_type:
        return None, None
    return first_by_type[deduced_type], ms


_CLASSES["flow"] = Flow
Expand Down
6 changes: 6 additions & 0 deletions metaflow/metadata/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ def type_to_order(obj_type):

@with_metaclass(MetadataProviderMeta)
class MetadataProvider(object):
TYPE = None

@classmethod
def metadata_str(cls):
    """
    Return this provider's identifier in the form "<TYPE>@<INFO>".

    Both parts are read from the class attributes `TYPE` and `INFO`.
    """
    provider_type = cls.TYPE
    provider_info = cls.INFO
    return "%s@%s" % (provider_type, provider_info)

@classmethod
def compute_info(cls, val):
"""
Expand Down
6 changes: 3 additions & 3 deletions metaflow/plugins/argo/argo_workflows_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,12 @@ def create(
validate_tags(tags)

if deployer_attribute_file:
with open(deployer_attribute_file, "w") as f:
with open(deployer_attribute_file, "w", encoding="utf-8") as f:
json.dump(
{
"name": obj.workflow_name,
"flow_name": obj.flow.name,
"metadata": get_metadata(),
"metadata": obj.metadata.metadata_str(),
},
f,
)
Expand Down Expand Up @@ -657,7 +657,7 @@ def _convert_value(param):
json.dump(
{
"name": obj.workflow_name,
"metadata": get_metadata(),
"metadata": obj.metadata.metadata_str(),
"pathspec": "/".join((obj.flow.name, run_id)),
},
f,
Expand Down
4 changes: 2 additions & 2 deletions metaflow/plugins/aws/step_functions/step_functions_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def create(
{
"name": obj.state_machine_name,
"flow_name": obj.flow.name,
"metadata": get_metadata(),
"metadata": obj.metadata.metadata_str(),
},
f,
)
Expand Down Expand Up @@ -502,7 +502,7 @@ def _convert_value(param):
json.dump(
{
"name": obj.state_machine_name,
"metadata": get_metadata(),
"metadata": obj.metadata.metadata_str(),
"pathspec": "/".join((obj.flow.name, run_id)),
},
f,
Expand Down
5 changes: 1 addition & 4 deletions metaflow/plugins/tag_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,7 @@ def _get_client_run_obj(obj, run_id, user_namespace):


def _set_current(obj):
current._set_env(
metadata_str="%s@%s"
% (obj.metadata.__class__.TYPE, obj.metadata.__class__.INFO)
)
current._set_env(metadata_str=obj.metadata.metadata_str())


@click.group()
Expand Down
7 changes: 4 additions & 3 deletions metaflow/runner/metaflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from typing import Dict, Iterator, Optional, Tuple

from metaflow import Run, metadata
from metaflow import Run

from .utils import handle_timeout
from .subprocess_manager import CommandManager, SubprocessManager
Expand Down Expand Up @@ -275,9 +275,10 @@ def __get_executing_run(self, tfp_runner_attribute, command_obj):

# Set the correct metadata from the runner_attribute file corresponding to this run.
metadata_for_flow = content.get("metadata")
metadata(metadata_for_flow)

run_object = Run(pathspec, _namespace_check=False)
run_object = Run(
pathspec, _namespace_check=False, _current_metadata=metadata_for_flow
)
return ExecutingRun(self, command_obj, run_object)

def run(self, **kwargs) -> ExecutingRun:
Expand Down
3 changes: 1 addition & 2 deletions metaflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,7 @@ def run_step(
origin_run_id=origin_run_id,
namespace=resolve_identity(),
username=get_username(),
metadata_str="%s@%s"
% (self.metadata.__class__.TYPE, self.metadata.__class__.INFO),
metadata_str=self.metadata.metadata_str(),
is_running=True,
tags=self.metadata.sticky_tags,
)
Expand Down

0 comments on commit 24a112a

Please sign in to comment.