Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set metadata properly in runner #2113

Merged
merged 1 commit into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,12 +701,12 @@ def resume(
runtime.persist_constants()

if runner_attribute_file:
with open(runner_attribute_file, "w") as f:
with open(runner_attribute_file, "w", encoding="utf-8") as f:
json.dump(
{
"run_id": runtime.run_id,
"flow_name": obj.flow.name,
"metadata": get_metadata(),
"metadata": obj.metadata.metadata_str(),
},
f,
)
Expand Down Expand Up @@ -779,12 +779,12 @@ def run(
runtime.persist_constants()

if runner_attribute_file:
with open(runner_attribute_file, "w") as f:
with open(runner_attribute_file, "w", encoding="utf-8") as f:
json.dump(
{
"run_id": runtime.run_id,
"flow_name": obj.flow.name,
"metadata": get_metadata(),
"metadata": obj.metadata.metadata_str(),
},
f,
)
Expand Down
116 changes: 77 additions & 39 deletions metaflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
List,
NamedTuple,
Optional,
TYPE_CHECKING,
Tuple,
)

Expand All @@ -37,6 +38,9 @@
from ..info_file import INFO_FILE
from .filecache import FileCache

if TYPE_CHECKING:
from metaflow.metadata import MetadataProvider

try:
# python2
import cPickle as pickle
Expand Down Expand Up @@ -82,28 +86,16 @@ def metadata(ms: str) -> str:
get_metadata()).
"""
global current_metadata
infos = ms.split("@", 1)
types = [m.TYPE for m in METADATA_PROVIDERS]
if infos[0] in types:
current_metadata = [m for m in METADATA_PROVIDERS if m.TYPE == infos[0]][0]
if len(infos) > 1:
current_metadata.INFO = infos[1]
else:
# Deduce from ms; if starts with http, use service or else use local
if ms.startswith("http"):
metadata_type = "service"
else:
metadata_type = "local"
res = [m for m in METADATA_PROVIDERS if m.TYPE == metadata_type]
if not res:
print(
"Cannot find a '%s' metadata provider -- "
"try specifying one explicitly using <type>@<info>",
metadata_type,
)
return get_metadata()
current_metadata = res[0]
current_metadata.INFO = ms
provider, info = _metadata(ms)
if provider is None:
print(
"Cannot find a metadata provider -- "
"try specifying one explicitly using <type>@<info>",
)
return get_metadata()
current_metadata = provider
if info:
current_metadata.INFO = info
return get_metadata()


Expand All @@ -127,7 +119,7 @@ def get_metadata() -> str:
"""
if current_metadata is False:
default_metadata()
return "%s@%s" % (current_metadata.TYPE, current_metadata.INFO)
return current_metadata.metadata_str()


def default_metadata() -> str:
Expand Down Expand Up @@ -268,9 +260,16 @@ def __init__(
_object: Optional["MetaflowObject"] = None,
_parent: Optional["MetaflowObject"] = None,
_namespace_check: bool = True,
_metaflow: Optional["Metaflow"] = None,
_current_namespace: Optional[str] = None,
_current_metadata: Optional[str] = None,
):
self._metaflow = Metaflow()
# the default namespace is activated lazily at the first
# get_namespace(). The other option of activating
# the namespace at the import time is problematic, since there
# may be other modules that alter environment variables etc.
# which may affect the namespace setting.
self._metaflow = Metaflow(_current_metadata) or _metaflow
self._parent = _parent
self._path_components = None
self._attempt = attempt
Expand Down Expand Up @@ -390,6 +389,7 @@ def __iter__(self) -> Iterator["MetaflowObject"]:
attempt=self._attempt,
_object=obj,
_parent=self,
_metaflow=self._metaflow,
_namespace_check=self._namespace_check,
_current_namespace=(
self._current_namespace if self._namespace_check else None
Expand Down Expand Up @@ -505,6 +505,7 @@ def __getitem__(self, id: str) -> "MetaflowObject":
attempt=self._attempt,
_object=obj,
_parent=self,
_metaflow=self._metaflow,
_namespace_check=self._namespace_check,
_current_namespace=(
self._current_namespace if self._namespace_check else None
Expand Down Expand Up @@ -552,7 +553,25 @@ def _unpickle_2124(self, data):
_current_namespace=ns,
)

_UNPICKLE_FUNC = {"2.8.4": _unpickle_284, "2.12.4": _unpickle_2124}
def _unpickle_21226(self, data):
if len(data) != 5:
raise MetaflowInternalError(
"Unexpected size of array: {}".format(len(data))
)
pathspec, attempt, md, ns, namespace_check = data
self.__init__(
pathspec=pathspec,
attempt=attempt,
_namespace_check=namespace_check,
_current_metadata=md,
_current_namespace=ns,
)

_UNPICKLE_FUNC = {
"2.8.4": _unpickle_284,
"2.12.4": _unpickle_2124,
"2.12.26": _unpickle_21226,
}

def __setstate__(self, state):
"""
Expand Down Expand Up @@ -595,10 +614,11 @@ def __getstate__(self):
# checking for the namespace even after unpickling since we will know which
# namespace to check.
return {
"version": "2.12.4",
"version": "2.12.26",
"data": [
self.pathspec,
self._attempt,
self._metaflow.metadata.metadata_str(),
self._current_namespace,
self._namespace_check,
],
Expand Down Expand Up @@ -2288,17 +2308,16 @@ class Metaflow(object):
if it has at least one run in the namespace.
"""

def __init__(self):
# the default namespace is activated lazily at the first object
# invocation or get_namespace(). The other option of activating
# the namespace at the import time is problematic, since there
# may be other modules that alter environment variables etc.
# which may affect the namespace setting.
if current_namespace is False:
default_namespace()
if current_metadata is False:
default_metadata()
self.metadata = current_metadata
def __init__(self, _current_metadata: Optional[str] = None):
if _current_metadata:
provider, info = _metadata(_current_metadata)
self.metadata = provider
if info:
self.metadata.INFO = info
else:
if current_metadata is False:
default_metadata()
self.metadata = current_metadata

@property
def flows(self) -> List[Flow]:
Expand Down Expand Up @@ -2335,7 +2354,7 @@ def __iter__(self) -> Iterator[Flow]:
all_flows = all_flows if all_flows else []
for flow in all_flows:
try:
v = Flow(_object=flow)
v = Flow(_object=flow, _metaflow=self)
yield v
except MetaflowNamespaceMismatch:
continue
Expand All @@ -2359,7 +2378,26 @@ def __getitem__(self, name: str) -> Flow:
Flow
Flow with the given name.
"""
return Flow(name)
return Flow(name, _metaflow=self)


def _metadata(ms: str) -> Tuple[Optional["MetadataProvider"], Optional[str]]:
    """
    Resolve a metadata string into a provider and its info component.

    The text before the first "@" is first matched against the TYPE of each
    entry in METADATA_PROVIDERS. If nothing matches, the provider type is
    deduced from the string itself: "service" when it starts with "http",
    "local" otherwise.

    Parameters
    ----------
    ms : str
        Either "<type>@<info>", a bare "<type>", or a bare info string.

    Returns
    -------
    Tuple[Optional[MetadataProvider], Optional[str]]
        The matched provider and the info string. The info is None when `ms`
        names a known type without an "@"; both values are None when no
        provider of the deduced type exists.
    """
    prefix, separator, remainder = ms.partition("@")

    explicit = next((m for m in METADATA_PROVIDERS if m.TYPE == prefix), None)
    if explicit is not None:
        # An info part exists only when an "@" was actually present.
        return explicit, (remainder if separator else None)

    # Deduce from ms; if starts with http, use service or else use local
    deduced_type = "service" if ms.startswith("http") else "local"
    fallback = next((m for m in METADATA_PROVIDERS if m.TYPE == deduced_type), None)
    if fallback is None:
        return None, None
    return fallback, ms


_CLASSES["flow"] = Flow
Expand Down
6 changes: 6 additions & 0 deletions metaflow/metadata/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ def type_to_order(obj_type):

@with_metaclass(MetadataProviderMeta)
class MetadataProvider(object):
TYPE = None

@classmethod
def metadata_str(cls):
    """
    Return this provider's identifier in the form "<TYPE>@<INFO>".

    Returns
    -------
    str
        The class-level TYPE and INFO values joined by "@".
    """
    provider_type, provider_info = cls.TYPE, cls.INFO
    return "%s@%s" % (provider_type, provider_info)

@classmethod
def compute_info(cls, val):
"""
Expand Down
6 changes: 3 additions & 3 deletions metaflow/plugins/argo/argo_workflows_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,12 @@ def create(
validate_tags(tags)

if deployer_attribute_file:
with open(deployer_attribute_file, "w") as f:
with open(deployer_attribute_file, "w", encoding="utf-8") as f:
json.dump(
{
"name": obj.workflow_name,
"flow_name": obj.flow.name,
"metadata": get_metadata(),
"metadata": obj.metadata.metadata_str(),
},
f,
)
Expand Down Expand Up @@ -657,7 +657,7 @@ def _convert_value(param):
json.dump(
{
"name": obj.workflow_name,
"metadata": get_metadata(),
"metadata": obj.metadata.metadata_str(),
"pathspec": "/".join((obj.flow.name, run_id)),
},
f,
Expand Down
4 changes: 2 additions & 2 deletions metaflow/plugins/aws/step_functions/step_functions_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def create(
{
"name": obj.state_machine_name,
"flow_name": obj.flow.name,
"metadata": get_metadata(),
"metadata": obj.metadata.metadata_str(),
},
f,
)
Expand Down Expand Up @@ -502,7 +502,7 @@ def _convert_value(param):
json.dump(
{
"name": obj.state_machine_name,
"metadata": get_metadata(),
"metadata": obj.metadata.metadata_str(),
"pathspec": "/".join((obj.flow.name, run_id)),
},
f,
Expand Down
5 changes: 1 addition & 4 deletions metaflow/plugins/tag_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,7 @@ def _get_client_run_obj(obj, run_id, user_namespace):


def _set_current(obj):
current._set_env(
metadata_str="%s@%s"
% (obj.metadata.__class__.TYPE, obj.metadata.__class__.INFO)
)
current._set_env(metadata_str=obj.metadata.metadata_str())


@click.group()
Expand Down
7 changes: 4 additions & 3 deletions metaflow/runner/metaflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from typing import Dict, Iterator, Optional, Tuple

from metaflow import Run, metadata
from metaflow import Run

from .utils import handle_timeout
from .subprocess_manager import CommandManager, SubprocessManager
Expand Down Expand Up @@ -275,9 +275,10 @@ def __get_executing_run(self, tfp_runner_attribute, command_obj):

# Set the correct metadata from the runner_attribute file corresponding to this run.
metadata_for_flow = content.get("metadata")
metadata(metadata_for_flow)

run_object = Run(pathspec, _namespace_check=False)
run_object = Run(
pathspec, _namespace_check=False, _current_metadata=metadata_for_flow
)
return ExecutingRun(self, command_obj, run_object)

def run(self, **kwargs) -> ExecutingRun:
Expand Down
3 changes: 1 addition & 2 deletions metaflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,7 @@ def run_step(
origin_run_id=origin_run_id,
namespace=resolve_identity(),
username=get_username(),
metadata_str="%s@%s"
% (self.metadata.__class__.TYPE, self.metadata.__class__.INFO),
metadata_str=self.metadata.metadata_str(),
is_running=True,
tags=self.metadata.sticky_tags,
)
Expand Down
Loading