Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

argo-workflows runner #1868

Closed
wants to merge 2 commits
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 52 additions & 3 deletions metaflow/plugins/argo/argo_workflows_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import sys
from hashlib import sha1

from metaflow import JSONType, current, decorators, parameters
from metaflow import Run, JSONType, current, decorators, parameters
from metaflow.client.core import get_metadata
from metaflow.exception import MetaflowNotFound
from metaflow._vendor import click
from metaflow.exception import MetaflowException, MetaflowInternalError
from metaflow.metaflow_config import (
Expand Down Expand Up @@ -165,6 +167,13 @@ def argo_workflows(obj, name=None):
default="",
help="PagerDuty Events API V2 Integration key for workflow success/failure notifications.",
)
@click.option(
"--runner-attribute-file",
default=None,
show_default=True,
type=str,
help="Write the workflow name to the file specified. Used internally for Metaflow's Runner API.",
)
@click.pass_obj
def create(
obj,
Expand All @@ -182,9 +191,14 @@ def create(
notify_on_success=False,
notify_slack_webhook_url=None,
notify_pager_duty_integration_key=None,
runner_attribute_file=None,
):
validate_tags(tags)

if runner_attribute_file:
with open(runner_attribute_file, "w") as f:
json.dump({"name": obj.workflow_name}, f)

obj.echo("Deploying *%s* to Argo Workflows..." % obj.workflow_name, bold=True)

if SERVICE_VERSION_CHECK:
Expand Down Expand Up @@ -563,8 +577,15 @@ def resolve_token(
type=str,
help="Write the ID of this run to the file specified.",
)
@click.option(
"--runner-attribute-file",
default=None,
show_default=True,
type=str,
help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
)
@click.pass_obj
def trigger(obj, run_id_file=None, **kwargs):
def trigger(obj, run_id_file=None, runner_attribute_file=None, **kwargs):
def _convert_value(param):
# Swap `-` with `_` in parameter name to match click's behavior
val = kwargs.get(param.name.replace("-", "_").lower())
Expand All @@ -587,6 +608,17 @@ def _convert_value(param):
with open(run_id_file, "w") as f:
f.write(str(run_id))

if runner_attribute_file:
with open(runner_attribute_file, "w") as f:
json.dump(
{
"name": obj.workflow_name,
"metadata": get_metadata(),
"pathspec": "/".join((obj.flow.name, run_id)),
},
f,
)

obj.echo(
"Workflow *{name}* triggered on Argo Workflows "
"(run-id *{run_id}*).".format(name=obj.workflow_name, run_id=run_id),
Expand Down Expand Up @@ -786,6 +818,20 @@ def validate_token(name, token_prefix, authorize, instructions_fn=None):
return True


def get_run_object(pathspec: str):
    """Look up the Run for *pathspec*, ignoring the current namespace.

    Returns None when the run is not (yet) known to the metadata service,
    instead of letting MetaflowNotFound propagate to the caller.
    """
    try:
        run = Run(pathspec, _namespace_check=False)
    except MetaflowNotFound:
        return None
    return run


def get_status_considering_run_object(status, run_obj):
    """Remap the raw Argo workflow *status*, downgrading to "Pending" when
    Argo reports "Running" but the run object is not yet visible (*run_obj*
    is None)."""
    mapped = remap_status(status)
    if run_obj is None and mapped == "Running":
        return "Pending"
    return mapped


@argo_workflows.command(help="Fetch flow execution status on Argo Workflows.")
@click.argument("run-id", required=True, type=str)
@click.pass_obj
Expand All @@ -803,8 +849,11 @@ def status(obj, run_id):
# Trim prefix from run_id
name = run_id[5:]
status = ArgoWorkflows.get_workflow_status(obj.flow.name, name)
run_obj = get_run_object("/".join((obj.flow.name, run_id)))

if status is not None:
obj.echo_always(remap_status(status))
status = get_status_considering_run_object(status, run_obj)
obj.echo_always(status)


@argo_workflows.command(help="Terminate flow execution on Argo Workflows.")
Expand Down
123 changes: 123 additions & 0 deletions metaflow/plugins/argo/argo_workflows_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import sys
from typing import Optional, ClassVar
from metaflow.plugins.argo.argo_workflows import ArgoWorkflows
from metaflow.plugins.argo.argo_workflows_cli import get_status_considering_run_object
from metaflow.plugins.deployer import (
Deployer,
DeployedFlow,
TriggeredRun,
get_lower_level_group,
)


class ArgoExecutingRun(TriggeredRun):
    """A run triggered on Argo Workflows.

    Extends the generic TriggeredRun with an Argo-aware ``status`` property
    and ``suspend``/``unsuspend`` controls that shell out to the Metaflow
    CLI through the deployer's subprocess manager.
    """

    @property
    def status(self):
        """Return the workflow status as a string, or None if Argo Workflows
        has no record of this workflow."""
        flow_name, run_id = self.pathspec.split("/")
        # Strip the 5-character run-id prefix to recover the Argo workflow
        # name (same transformation as the `status` CLI command's run_id[5:];
        # presumably the "argo-" prefix — confirm against the CLI).
        name = run_id[5:]
        status = ArgoWorkflows.get_workflow_status(flow_name, name)
        if status is None:
            return None
        # Report "Pending" rather than "Running" while the run object is not
        # yet visible in the metadata service.
        return get_status_considering_run_object(status, self.run)

    def _execute_cli_command(self, command):
        # Shared launcher for suspend/unsuspend: run the CLI subcommand via
        # the deployer's subprocess manager and report success by exit code.
        # (Previously duplicated verbatim in both methods.)
        pid = self.deployer.spm.run_command(
            [sys.executable, *command],
            env=self.deployer.env_vars,
            cwd=self.deployer.cwd,
            show_output=self.deployer.show_output,
        )
        command_obj = self.deployer.spm.get(pid)
        return command_obj.process.returncode == 0

    def suspend(self, **kwargs):
        """Suspend this run on Argo Workflows; return True on success."""
        _, run_id = self.pathspec.split("/")
        command = get_lower_level_group(
            self.deployer.api,
            self.deployer.top_level_kwargs,
            self.deployer.type,
            self.deployer.name,
        ).suspend(run_id=run_id, **kwargs)
        return self._execute_cli_command(command)

    def unsuspend(self, **kwargs):
        """Resume a previously suspended run; return True on success."""
        _, run_id = self.pathspec.split("/")
        command = get_lower_level_group(
            self.deployer.api,
            self.deployer.top_level_kwargs,
            self.deployer.type,
            self.deployer.name,
        ).unsuspend(run_id=run_id, **kwargs)
        return self._execute_cli_command(command)


class ArgoWorkflowsTemplate(DeployedFlow):
    """Handle to a flow deployed as an Argo Workflows template."""

    @property
    def production_token(self):
        """Production token of the existing Argo deployment for this flow."""
        deployment = ArgoWorkflows.get_existing_deployment(self.deployer.name)
        _, token = deployment
        return token


class ArgoWorkflowsDeployer(Deployer):
    """Deployer implementation targeting Argo Workflows."""

    type: ClassVar[Optional[str]] = "argo-workflows"

    def create(self, **kwargs) -> DeployedFlow:
        """Deploy the flow to Argo Workflows.

        Returns an ArgoWorkflowsTemplate on success; raises Exception when
        the underlying CLI command exits non-zero.
        """
        command_obj = super().create(**kwargs)
        if command_obj.process.returncode != 0:
            raise Exception("Error deploying %s to %s" % (self.flow_file, self.type))
        return ArgoWorkflowsTemplate(deployer=self)

    def trigger(self, **kwargs) -> TriggeredRun:
        """Trigger a run of the deployed flow.

        Returns an ArgoExecutingRun on success; raises Exception when the
        underlying CLI command exits non-zero.
        """
        content, command_obj = super().trigger(**kwargs)
        if command_obj.process.returncode != 0:
            raise Exception(
                "Error triggering %s on %s for %s" % (self.name, self.type, self.flow_file)
            )
        return ArgoExecutingRun(self, content)


if __name__ == "__main__":
import time

ar = ArgoWorkflowsDeployer("../try.py")
print(ar.name)
ar_obj = ar.deploy()
print(ar.name)
print(type(ar))
print(type(ar_obj))
print(ar_obj.production_token)
result = ar_obj.trigger(alpha=300)
print(result.status)
run = result.run
while run is None:
print("trying again...")
run = result.run
print(result.run)
print(result.status)
time.sleep(120)
print(result.terminate())

print("triggering from deployer..")
result = ar.trigger(alpha=600)
print(result.name)
print(result.status)
run = result.run
while run is None:
print("trying again...")
run = result.run
print(result.run)
print(result.status)
time.sleep(120)
print(result.terminate())
Loading
Loading