Make argo events sensor namespace configurable #1463

Open: wants to merge 23 commits into base: master
Changes from 13 commits
Commits (23)
8cffd98
Make argo events sensor namespace configurable
dhpollack Jun 22, 2023
57c1f62
Merge remote-tracking branch 'dhpollack/master' into feature/configur…
dhpollack Jun 28, 2023
3510429
Add sensor_namespace to delete method
dhpollack Jun 28, 2023
6372e15
Refactor sensor cleanup
dhpollack Jul 6, 2023
b2071a3
Merge remote-tracking branch 'upstream/master' into feature/configure…
dhpollack Oct 9, 2023
b444768
Merge remote-tracking branch 'upstream/master' into feature/configure…
dhpollack Oct 11, 2023
f7406b9
Use client
dhpollack Oct 11, 2023
0899a45
Changes based on PR comments
dhpollack Oct 11, 2023
bab09bf
Import namespace
dhpollack Oct 17, 2023
18681bf
Merge remote-tracking branch 'upstream/master' into feature/configure…
dhpollack Oct 17, 2023
f29c08d
Merge remote-tracking branch 'upstream/master' into feature/configure…
dhpollack Oct 19, 2023
6f3ece8
Move annotations to avoid duplicate if-statements
dhpollack Oct 19, 2023
4d9bb49
Merge remote-tracking branch 'upstream/master' into feature/configure…
dhpollack Nov 9, 2023
dc00696
changes from code review
dhpollack Nov 13, 2023
5609270
Merge branch 'master' into feature/configure-sensor-namespace
saikonen Nov 17, 2023
a680b66
Do not delete old schedule
dhpollack Nov 17, 2023
4115a44
Merge remote-tracking branch 'upstream/master' into feature/configure…
dhpollack Mar 27, 2024
348c0b9
Merge remote-tracking branch 'upstream/master' into feature/configure…
dhpollack Jun 3, 2024
85785ee
Merge remote-tracking branch 'upstream/master' into feature/configure…
dhpollack Jul 21, 2024
333ee5c
Rename argo_client
dhpollack Jul 21, 2024
39284f2
Use sensor name method everywhere
dhpollack Jul 21, 2024
6df2c25
Merge remote-tracking branch 'upstream/master' into feature/configure…
dhpollack Aug 27, 2024
89bf94a
Fix sensor namespace issues
dhpollack Aug 27, 2024
3 changes: 3 additions & 0 deletions metaflow/metaflow_config.py
@@ -310,6 +310,9 @@
     "ARGO_EVENTS_INTERNAL_WEBHOOK_URL", ARGO_EVENTS_WEBHOOK_URL
 )
 ARGO_EVENTS_WEBHOOK_AUTH = from_conf("ARGO_EVENTS_WEBHOOK_AUTH", "none")
+ARGO_EVENTS_SENSOR_NAMESPACE = from_conf(
+    "ARGO_EVENTS_SENSOR_NAMESPACE", KUBERNETES_NAMESPACE
+)

 ARGO_WORKFLOWS_UI_URL = from_conf("ARGO_WORKFLOWS_UI_URL")

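The fallback in this hunk can be sketched in isolation. The block below is a minimal illustration, not Metaflow's implementation: `from_conf` here is a simplified stand-in that only reads `METAFLOW_`-prefixed environment variables, the `env` parameter and the `sensor_namespace` helper are hypothetical, and the `"default"` value for `KUBERNETES_NAMESPACE` is an assumption.

```python
import os


def from_conf(name, default=None, env=None):
    """Simplified stand-in for from_conf: environment lookup only (assumption)."""
    env = os.environ if env is None else env
    return env.get("METAFLOW_" + name, default)


def sensor_namespace(env):
    """Resolve the sensor namespace, falling back to the Kubernetes namespace."""
    # Assumed default; the real default lives in metaflow_config.py.
    kubernetes_namespace = from_conf("KUBERNETES_NAMESPACE", "default", env)
    return from_conf("ARGO_EVENTS_SENSOR_NAMESPACE", kubernetes_namespace, env)


# Nothing configured: both namespaces use the assumed default.
unset = sensor_namespace({})
# Only the Kubernetes namespace is set: sensors follow it.
inherited = sensor_namespace({"METAFLOW_KUBERNETES_NAMESPACE": "dev-A"})
# Both set: sensors go to their own namespace.
explicit = sensor_namespace(
    {
        "METAFLOW_KUBERNETES_NAMESPACE": "dev-A",
        "METAFLOW_ARGO_EVENTS_SENSOR_NAMESPACE": "dev-B",
    }
)
```

With this shape, existing deployments that never set the new variable keep placing sensors next to their workflows.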
16 changes: 10 additions & 6 deletions metaflow/plugins/argo/argo_client.py
@@ -2,6 +2,7 @@
 import os
 import sys

+from metaflow.metaflow_config import ARGO_EVENTS_SENSOR_NAMESPACE
 from metaflow.exception import MetaflowException
 from metaflow.plugins.kubernetes.kubernetes_client import KubernetesClient

@@ -349,12 +350,15 @@ def schedule_workflow_template(self, name, schedule=None, timezone=None):
                 json.loads(e.body)["message"] if e.body is not None else e.reason
             )

-    def register_sensor(self, name, sensor=None):
+    def register_sensor(
+        self, name, sensor=None, sensor_namespace=ARGO_EVENTS_SENSOR_NAMESPACE
+    ):
         if sensor is None:
             sensor = {}
         # Unfortunately, Kubernetes client does not handle optimistic
         # concurrency control by itself unlike kubectl
         client = self._client.get()
+
         if not sensor:
             sensor["metadata"] = {}

@@ -364,7 +368,7 @@ def register_sensor(self, name, sensor=None):
             ] = client.CustomObjectsApi().get_namespaced_custom_object(
                 group=self._group,
                 version=self._version,
-                namespace=self._namespace,
+                namespace=sensor_namespace,
                 plural="sensors",
                 name=name,
             )[
@@ -379,7 +383,7 @@ def register_sensor(self, name, sensor=None):
                     return client.CustomObjectsApi().create_namespaced_custom_object(
                         group=self._group,
                         version=self._version,
-                        namespace=self._namespace,
+                        namespace=sensor_namespace,
                         plural="sensors",
                         body=sensor,
                     )
@@ -397,7 +401,7 @@ def register_sensor(self, name, sensor=None):
             return client.CustomObjectsApi().replace_namespaced_custom_object(
                 group=self._group,
                 version=self._version,
-                namespace=self._namespace,
+                namespace=sensor_namespace,
                 plural="sensors",
                 body=sensor,
                 name=name,
@@ -407,7 +411,7 @@ def register_sensor(self, name, sensor=None):
                 json.loads(e.body)["message"] if e.body is not None else e.reason
             )

-    def delete_sensor(self, name):
+    def delete_sensor(self, name, sensor_namespace):
         """
         Issues an API call for deleting a sensor

@@ -419,7 +423,7 @@ def delete_sensor(self, name):
             return client.CustomObjectsApi().delete_namespaced_custom_object(
                 group=self._group,
                 version=self._version,
-                namespace=self._namespace,
+                namespace=sensor_namespace,
                 plural="sensors",
                 name=name,
             )
88 changes: 72 additions & 16 deletions metaflow/plugins/argo/argo_workflows.py
@@ -16,6 +16,7 @@
     ARGO_EVENTS_EVENT_BUS,
     ARGO_EVENTS_EVENT_SOURCE,
     ARGO_EVENTS_INTERNAL_WEBHOOK_URL,
+    ARGO_EVENTS_SENSOR_NAMESPACE,
     ARGO_EVENTS_SERVICE_ACCOUNT,
     ARGO_EVENTS_WEBHOOK_AUTH,
     ARGO_WORKFLOWS_ENV_VARS_TO_SKIP,
@@ -66,6 +67,10 @@ class ArgoWorkflowsException(MetaflowException):
     headline = "Argo Workflows error"


+class ArgoWorkflowsCleanupException(MetaflowException):
+    headline = "Argo Workflows clean up error"
+
+
 class ArgoWorkflowsSchedulingException(MetaflowException):
     headline = "Argo Workflows scheduling error"

@@ -175,6 +180,44 @@ def deploy(self):
         except Exception as e:
             raise ArgoWorkflowsException(str(e))

+    def cleanup_previous(self):
+        try:
+            client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
+            # Check for existing deployment and do cleanup
+            old_template = client.get_workflow_template(self.name)
+            if not old_template:
+                return None
+            # Clean up old sensors
+            old_has_sensor = old_template["metadata"]["annotations"].get(
+                "metaflow/has_sensor"
+            )
+            if old_has_sensor is None:
+                # This workflow was created before sensor annotations
+                # and may have a sensor in the default namespace
+                # we will delete it and it'll get recreated if need be
+                client.delete_sensor(self.name.replace(".", "-"), client._namespace)
+            elif old_has_sensor == "True":
+                # delete old sensor only if it was somewhere else, otherwise it'll get replaced
+                old_sensor_name = old_template["metadata"]["annotations"][
+                    "metaflow/sensor_name"
+                ]
+                old_sensor_namespace = old_template["metadata"]["annotations"][
+                    "metaflow/sensor_namespace"
+                ]
+                if (
+                    not self._sensor
+                    or old_sensor_namespace != ARGO_EVENTS_SENSOR_NAMESPACE
+                ):
+                    client.delete_sensor(old_sensor_name, old_sensor_namespace)
+            # Clean up old schedules/cronworkflow
+            old_has_schedule = old_template["metadata"]["annotations"].get(
+                "metaflow/has_schedule"
+            )
+            if not self._schedule and (old_has_schedule is None or old_has_schedule):
+                client.delete_cronworkflow(self.name)
+        except Exception as e:
+            raise ArgoWorkflowsCleanupException(str(e))
+
     @staticmethod
     def _sanitize(name):
         # Metaflow allows underscores in node names, which are disallowed in Argo
@@ -205,6 +248,14 @@ def list_templates(flow_name, all=False):
     def delete(name):
         client = ArgoClient(namespace=KUBERNETES_NAMESPACE)

+        workflow_template = client.get_workflow_template(name)
+        sensor_name = workflow_template["metadata"]["annotations"].get(
+            "metaflow/sensor_name", name.replace(".", "-")
+        )
+        sensor_namespace = workflow_template["metadata"]["annotations"].get(
+            "metaflow/sensor_namespace", ARGO_EVENTS_SENSOR_NAMESPACE
+        )
+
         # Always try to delete the schedule. Failure in deleting the schedule should not
         # be treated as an error, due to any of the following reasons
         # - there might not have been a schedule, or it was deleted by some other means
@@ -214,7 +265,7 @@ def delete(name):

         # The workflow might have sensors attached to it, which consume actual resources.
         # Try to delete these as well.
-        sensor_deleted = client.delete_sensor(name)
+        sensor_deleted = client.delete_sensor(sensor_name, sensor_namespace)

         # After cleaning up related resources, delete the workflow in question.
         # Failure in deleting is treated as critical and will be made visible to the user
@@ -327,20 +378,21 @@ def _get_schedule(self):

     def schedule(self):
         try:
-            argo_client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
-            argo_client.schedule_workflow_template(
-                self.name, self._schedule, self._timezone
-            )
-            # Register sensor. Unfortunately, Argo Events Sensor names don't allow for
-            # dots (sensors run into an error) which rules out self.name :(
-            # Metaflow will overwrite any existing sensor.
-            sensor_name = self.name.replace(".", "-")
+            client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
+
+            if self._schedule:
+                client.schedule_workflow_template(
+                    self.name, self._schedule, self._timezone
+                )
             if self._sensor:
-                argo_client.register_sensor(sensor_name, self._sensor.to_json())
-            else:
-                # Since sensors occupy real resources, delete existing sensor if needed
-                # Deregister sensors that might have existed before this deployment
-                argo_client.delete_sensor(sensor_name)
+                # Register sensor. Unfortunately, Argo Events Sensor names don't allow for
+                # dots (sensors run into an error) which rules out self.name :(
+                # Metaflow will overwrite any existing sensor.
+                sensor_name = self.name.replace(".", "-")
+                # The new sensor will go into the sensor namespace specified
+                client.register_sensor(
+                    sensor_name, self._sensor.to_json(), ARGO_EVENTS_SENSOR_NAMESPACE
+                )
         except Exception as e:
             raise ArgoWorkflowsSchedulingException(str(e))

@@ -620,6 +672,8 @@ def _compile_workflow_template(self):
             "metaflow/owner": self.username,
             "metaflow/user": "argo-workflows",
             "metaflow/flow_name": self.flow.name,
+            "metaflow/has_schedule": str(self._schedule is not None),
+            "metaflow/has_sensor": str(bool(self.triggers)),
         }

         if self.parameters:
@@ -645,7 +699,9 @@ def _compile_workflow_template(self):
                             {key: trigger.get(key) for key in ["name", "type"]}
                             for trigger in self.triggers
                         ]
-                    )
+                    ),
+                    "metaflow/sensor_name": self.name.replace(".", "-"),
+                    "metaflow/sensor_namespace": ARGO_EVENTS_SENSOR_NAMESPACE,
                 }
             )
         if self.notify_on_error:
@@ -1793,7 +1849,7 @@ def _compile_sensor(self):
                 # Sensor metadata.
                 ObjectMeta()
                 .name(self.name.replace(".", "-"))
-                .namespace(KUBERNETES_NAMESPACE)
+                .namespace(ARGO_EVENTS_SENSOR_NAMESPACE)
                 .label("app.kubernetes.io/name", "metaflow-sensor")
                 .label("app.kubernetes.io/part-of", "metaflow")
                 .labels(self.kubernetes_labels)
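Because Kubernetes annotation values are strings, the `metaflow/has_schedule` and `metaflow/has_sensor` flags written in this diff come back as `"True"` or `"False"`. The sketch below shows one way such a flag could be read back; `encode_flag` and `decode_flag` are hypothetical helpers, not part of the PR. It also illustrates that a non-empty string such as `"False"` is truthy in Python, so a bare truthiness check on the raw annotation value may not separate the two cases.

```python
def encode_flag(value):
    """Store a boolean the way the diff does: str(bool(...))."""
    return str(bool(value))


def decode_flag(annotations, key):
    """Return True/False for a stored flag, or None if the key is absent.

    None stands for a template written before the annotation existed.
    """
    raw = annotations.get(key)
    if raw is None:
        return None
    return raw == "True"


annotations = {"metaflow/has_schedule": encode_flag(None)}

raw_value = annotations["metaflow/has_schedule"]        # the string "False"
raw_is_truthy = bool(raw_value)                          # non-empty string
decoded = decode_flag(annotations, "metaflow/has_schedule")
legacy = decode_flag({}, "metaflow/has_schedule")
```

Comparing against the literal `"True"`, as the sensor branch of `cleanup_previous` does, avoids the ambiguity.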
1 change: 1 addition & 0 deletions metaflow/plugins/argo/argo_workflows_cli.py
@@ -224,6 +224,7 @@ def create(
         obj.echo_always(str(flow), err=False, no_bold=True)
         # TODO: Support echo-ing Argo Events Sensor template
     else:
+        flow.cleanup_previous()
Collaborator:

I believe there is an issue with running the cleanup only as part of the deploy command. Consider the following scenario:

  • user has deployed a sensor flow to namespace dev-A, sensor exists in the same namespace
  • this feature gets released and the Argo sensor namespace env_var gets configured to dev-B
  • the user decides that they want to get rid of their flow, and issues an argo-workflows delete directly.

Wouldn't this only check for the currently configured sensor namespace, leaving the original sensor in dev-A untouched?
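The scenario can be sketched with an in-memory stand-in for the cluster. Everything here is illustrative: `FakeCluster` and `resolve_sensor` are hypothetical names, and `resolve_sensor` only paraphrases the annotation lookup that `delete` performs in this diff. A template deployed before the feature carries no sensor annotations, so the lookup falls back to whatever namespace is passed in.

```python
class FakeCluster:
    """Hypothetical in-memory stand-in for the sensors in a cluster."""

    def __init__(self, sensors):
        # Each sensor is identified by a (namespace, name) pair.
        self._sensors = set(sensors)

    def has_sensor(self, name, namespace):
        return (namespace, name) in self._sensors

    def delete_sensor(self, name, namespace):
        # Returns True on success and None when nothing was found.
        if (namespace, name) in self._sensors:
            self._sensors.remove((namespace, name))
            return True
        return None


def resolve_sensor(annotations, name, fallback_namespace):
    """Pick the sensor name and namespace from template annotations."""
    sensor_name = annotations.get("metaflow/sensor_name", name.replace(".", "-"))
    sensor_namespace = annotations.get(
        "metaflow/sensor_namespace", fallback_namespace
    )
    return sensor_name, sensor_namespace


# Sensor deployed to dev-A before the feature existed: no annotations.
cluster = FakeCluster([("dev-A", "user-myflow")])
legacy_annotations = {}

# Falling back to the newly configured sensor namespace misses the sensor.
target = resolve_sensor(legacy_annotations, "user.myflow", "dev-B")
missed = cluster.delete_sensor(*target)
still_there = cluster.has_sensor("user-myflow", "dev-A")

# Falling back to the workflow's own namespace finds it.
legacy_target = resolve_sensor(legacy_annotations, "user.myflow", "dev-A")
deleted = cluster.delete_sensor(*legacy_target)
```

Under these assumptions, which fallback namespace `delete` uses decides whether the legacy sensor in dev-A is removed.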

Collaborator:

At least in the case of delete it seems sensible to me to keep the concern of cleaning up resources as part of the method.

Contributor Author:

For the legacy case, we need to do cleanup before creating a new argo workflow. This can only happen if we clean up before calling flow.deploy().

One reason I did this is that the API is somewhat inconsistent, and the current implementation already does a form of "cleanup". Schedules are created/suspended within the schedule_workflow_template method in argo_client.py, which is called from the schedule method in argo_workflows.py. Sensors, however, are created/deleted directly within the schedule method in argo_workflows.py. I wanted to move all of this into one place, but I couldn't put my code into the schedule method: the previous annotations for the triggers would be overwritten before the schedule method runs.

Currently, the flow.schedule function is used to create both the schedule and the sensor, on the assumption that the sensor will get overwritten because it always exists in the same namespace. So in your scenario, if you configured your local environment with dev-B, you would have to run argo-workflows create in order to create a sensor in dev-B. Once you did that, it would delete the sensor in dev-A due to this line:

https://github.com/dhpollack/metaflow/blob/feature/configure-sensor-namespace/metaflow/plugins/argo/argo_workflows.py#L195
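The sensor part of that cleanup can be paraphrased as a pure function, which makes the cases easier to follow. This is a sketch of the branch logic in `cleanup_previous` as shown in this diff; `sensor_to_delete` and its parameters are hypothetical names, and it returns the `(name, namespace)` pair to delete instead of calling a client.

```python
def sensor_to_delete(
    old_annotations,
    flow_name,
    workflow_namespace,
    new_has_sensor,
    new_sensor_namespace,
):
    """Return the (name, namespace) of the old sensor to delete, or None."""
    has_sensor = old_annotations.get("metaflow/has_sensor")
    if has_sensor is None:
        # Legacy template: the sensor, if any, sits next to the workflow.
        return (flow_name.replace(".", "-"), workflow_namespace)
    if has_sensor == "True":
        old_name = old_annotations["metaflow/sensor_name"]
        old_namespace = old_annotations["metaflow/sensor_namespace"]
        # Delete only if the sensor goes away or moves; otherwise it is replaced.
        if not new_has_sensor or old_namespace != new_sensor_namespace:
            return (old_name, old_namespace)
    return None


annotated = {
    "metaflow/has_sensor": "True",
    "metaflow/sensor_name": "user-myflow",
    "metaflow/sensor_namespace": "dev-A",
}

# Legacy template in dev-A, sensors now configured for dev-B.
legacy_case = sensor_to_delete({}, "user.myflow", "dev-A", True, "dev-B")
# Annotated template whose sensor moves from dev-A to dev-B.
moved_case = sensor_to_delete(annotated, "user.myflow", "dev-A", True, "dev-B")
# Annotated template whose sensor stays in dev-A: replaced in place.
same_case = sensor_to_delete(annotated, "user.myflow", "dev-A", True, "dev-A")
# Annotated template that no longer has triggers.
dropped_case = sensor_to_delete(annotated, "user.myflow", "dev-A", False, "dev-A")
```

Because the old annotations are read before `flow.deploy()` rewrites the template, this decision has to be made ahead of the deploy call.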

         flow.deploy()
         obj.echo(
             "Workflow *{workflow_name}* "