From 85032e6083ced97e7e585f2a1486623ec043c4b5 Mon Sep 17 00:00:00 2001
From: Sakari Ikonen <64256562+saikonen@users.noreply.github.com>
Date: Mon, 26 Aug 2024 13:20:42 +0300
Subject: [PATCH] fix: reduce postgres notify messaging (#409)

* rework notify.py: drive task and run status updates from attempt_ok
  metadata instead of _task_ok artifacts
* remove triggers from artifacts, restrict triggers to INSERT for metadata
* add conditions to metadata triggers
* restrict tasks and runs to insert triggers only
* test: switch to eager polling for heartbeat tracking
---
 services/data/postgres_async_db.py            |   6 +-
 .../api/heartbeat_monitor.py                  |  30 ++-
 services/ui_backend_service/api/notify.py     |  44 +----
 .../data/db/tables/artifact.py                |   3 +-
 .../data/db/tables/metadata.py                |   7 +
 .../ui_backend_service/data/db/tables/run.py  |   1 +
 .../ui_backend_service/data/db/tables/task.py |   1 +
 .../tests/integration_tests/notify_test.py    | 177 +-----------------
 8 files changed, 49 insertions(+), 220 deletions(-)

diff --git a/services/data/postgres_async_db.py b/services/data/postgres_async_db.py
index 97ce93f3..1f1c66e3 100644
--- a/services/data/postgres_async_db.py
+++ b/services/data/postgres_async_db.py
@@ -39,7 +39,7 @@
 operator_match = re.compile('([^:]*):([=><]+)$')
 
 # use a ddmmyyyy timestamp as the version for triggers
-TRIGGER_VERSION = "18012024"
+TRIGGER_VERSION = "23082024"
 
 TRIGGER_NAME_PREFIX = "notify_ui"
 
@@ -759,7 +759,8 @@ class AsyncMetadataTablePostgres(AsyncPostgresTable):
     primary_keys = ["flow_id", "run_number", "step_name", "task_id", "field_name"]
     trigger_keys = ["flow_id", "run_number",
-                    "step_name", "task_id", "field_name", "value"]
+                    "step_name", "task_id", "field_name", "value", "tags"]
+    trigger_operations = ["INSERT"]
     select_columns = keys
 
     async def add_metadata(
@@ -827,6 +828,7 @@ class AsyncArtifactTablePostgres(AsyncPostgresTable):
     primary_keys = ["flow_id", "run_number", "step_name", "task_id", "attempt_id", "name"]
     trigger_keys = primary_keys
+    trigger_operations = ["INSERT"]
     select_columns = keys
 
     async def add_artifact(
diff --git a/services/ui_backend_service/api/heartbeat_monitor.py b/services/ui_backend_service/api/heartbeat_monitor.py
index 630a1f8d..17087280 100644
--- a/services/ui_backend_service/api/heartbeat_monitor.py
+++ b/services/ui_backend_service/api/heartbeat_monitor.py
@@ -70,7 +70,7 @@ async def check_heartbeats(self):
         and triggering handlers in case the heartbeat is too old.
""" while True: - time_now = int(datetime.datetime.utcnow().timestamp()) # same format as the metadata heartbeat uses + time_now = heartbeat_time_now() for key, hb in list(self.watched.items()): if time_now - hb > HEARTBEAT_INTERVAL: self.loop.create_task(self.load_and_broadcast(key)) @@ -123,9 +123,8 @@ async def heartbeat_handler(self, action: str, data: Dict): async def add_to_watch(self, run: Dict): if "last_heartbeat_ts" in run and "run_number" in run: run_number = run["run_number"] - heartbeat_ts = run["last_heartbeat_ts"] - if heartbeat_ts is not None: # only start monitoring on runs that have a heartbeat - self.watched[run_number] = heartbeat_ts + heartbeat_ts = run["last_heartbeat_ts"] or heartbeat_time_now() + self.watched[run_number] = heartbeat_ts async def get_run(self, run_key: str) -> Optional[Dict]: "Fetch run with a given id or number from the DB" @@ -136,7 +135,11 @@ async def get_run(self, run_key: str) -> Optional[Dict]: async def load_and_broadcast(self, key): run = await self.get_run(key) resources = resource_list(self._run_table.table_name, run) if run else None - if resources and run['status'] == "failed": + if not resources: + return + if run['status'] == "running": + await self.add_to_watch(run) + if run['status'] == "failed": # The purpose of the monitor is to emit otherwise unnoticed failed attempts. # Do not unnecessarily broadcast other statuses that already get propagated by Notify. self.event_emitter.emit('notify', 'UPDATE', resources, run) @@ -199,9 +202,8 @@ async def add_to_watch(self, data): key = self.generate_dict_key(task) if key and "last_heartbeat_ts" in task: - heartbeat_ts = task["last_heartbeat_ts"] - if heartbeat_ts is not None: # only start monitoring on runs that have a heartbeat - self.watched[key] = heartbeat_ts + heartbeat_ts = task["last_heartbeat_ts"] or heartbeat_time_now() + self.watched[key] = heartbeat_ts async def _get_task(self, flow_id: str, run_key: str, step_name: str, task_key: str, attempt_id: int = None, postprocess=None) -> Optional[Dict]: """ @@ -225,7 +227,12 @@ async def load_and_broadcast(self, key): flow_id, run_number, step_name, task_id, attempt_id = self.decode_key_ids(key) task = await self._get_task(flow_id, run_number, step_name, task_id, attempt_id, postprocess=self.refiner.postprocess) resources = resource_list(self._task_table.table_name, task) if task else None - if resources and task['status'] == "failed": + if not resources: + return + + if task['status'] == "running": + await self.add_to_watch(task) + if task['status'] == "failed": # The purpose of the monitor is to emit otherwise unnoticed failed attempts. # Do not unnecessarily broadcast other statuses that already get propagated by Notify. 
             self.event_emitter.emit('notify', 'UPDATE', resources, task)
@@ -240,3 +247,8 @@ def generate_dict_key(self, data):
     def decode_key_ids(self, key):
         flow, run, step, task, attempt = key.split("/")
         return flow, run, step, task, attempt
+
+
+def heartbeat_time_now():
+    # same format as the metadata heartbeat uses
+    return int(datetime.datetime.utcnow().timestamp())
diff --git a/services/ui_backend_service/api/notify.py b/services/ui_backend_service/api/notify.py
index ded628c1..31f7f622 100644
--- a/services/ui_backend_service/api/notify.py
+++ b/services/ui_backend_service/api/notify.py
@@ -124,7 +124,14 @@ async def handle_trigger_msg(self, msg: str):
             if operation == "INSERT" and \
                     table.table_name == self.db.metadata_table_postgres.table_name and \
                     data["field_name"] == "attempt_ok":
 
+                attempt_id = None
+                try:
+                    attempt_tag = [t for t in data['tags'] if t.startswith('attempt_id')][0]
+                    attempt_id = attempt_tag.split(":")[1]
+                except Exception:
+                    self.logger.exception("Failed to load attempt_id from attempt_ok metadata")
+
                 # remove heartbeat watcher for completed task
                 self.event_emitter.emit("task-heartbeat", "complete", data)
                 # broadcast task status as it has either completed or failed.
@@ -133,40 +140,13 @@ async def handle_trigger_msg(self, msg: str):
                 event_emitter=self.event_emitter,
                 operation="UPDATE",
                 table=self.db.task_table_postgres,
-                data=data
+                data=data,
+                filter_dict={"attempt_id": attempt_id} if attempt_id else {}
             )
             # Notify updated Run status once attempt_ok metadata for end step has been received
             if data["step_name"] == "end":
                 await _broadcast(self.event_emitter, "UPDATE", self.db.run_table_postgres, data)
-                # And remove possible heartbeat watchers for completed runs
-                self.event_emitter.emit("run-heartbeat", "complete", data)
-
-        # Notify related resources once new `_task_ok` artifact has been created
-        if operation == "INSERT" and \
-                table.table_name == self.db.artifact_table_postgres.table_name and \
-                data["name"] == "_task_ok":
-
-            # remove heartbeat watcher for completed task
-            self.event_emitter.emit("task-heartbeat", "complete", data)
-
-            # Always mark task finished if '_task_ok' artifact is created
-            # Include 'attempt_id' so we can identify which attempt this artifact related to
-            _attempt_id = data.get("attempt_id", 0)
-            await _broadcast(
-                event_emitter=self.event_emitter,
-                operation="UPDATE",
-                table=self.db.task_table_postgres,
-                data=data,
-                filter_dict={"attempt_id": _attempt_id}
-            )
-
-            # Last step is always called 'end' and only one '_task_ok' should be present
-            # Run is considered finished once 'end' step has '_task_ok' artifact
-            if data["step_name"] == "end":
-                await _broadcast(
-                    self.event_emitter, "UPDATE", self.db.run_table_postgres,
-                    data)
 
                 # Also trigger preload of artifacts after a run finishes.
                 self.event_emitter.emit("preload-task-statuses", data['flow_id'], data['run_number'])
 
                 # And remove possible heartbeat watchers for completed runs
@@ -179,12 +159,6 @@ async def handle_trigger_msg(self, msg: str):
                     data["field_name"] in ["code-package-url", "code-package"]:
                 self.event_emitter.emit("preload-dag", data['flow_id'], data['run_number'])
 
-            if operation == "INSERT" and \
-                    table.table_name == self.db.artifact_table_postgres.table_name and \
-                    data["step_name"] == "_parameters" and \
-                    data["name"] == "_graph_info":
-                self.event_emitter.emit("preload-dag", data['flow_id'], data['run_number'])
-
         except Exception:
             self.logger.exception("Exception occurred")
diff --git a/services/ui_backend_service/data/db/tables/artifact.py b/services/ui_backend_service/data/db/tables/artifact.py
index 43821755..2d416c93 100644
--- a/services/ui_backend_service/data/db/tables/artifact.py
+++ b/services/ui_backend_service/data/db/tables/artifact.py
@@ -14,7 +14,8 @@ class AsyncArtifactTablePostgres(AsyncPostgresTable):
     ordering = ["attempt_id DESC"]
     keys = MetadataArtifactTable.keys
     primary_keys = MetadataArtifactTable.primary_keys
-    trigger_keys = MetadataArtifactTable.trigger_keys
+    trigger_keys = None
+    trigger_operations = None
     select_columns = keys
 
     async def get_run_parameter_artifacts(self, flow_name, run_number, postprocess=None, invalidate_cache=False):
diff --git a/services/ui_backend_service/data/db/tables/metadata.py b/services/ui_backend_service/data/db/tables/metadata.py
index 3940d9c5..046a7eb5 100644
--- a/services/ui_backend_service/data/db/tables/metadata.py
+++ b/services/ui_backend_service/data/db/tables/metadata.py
@@ -13,6 +13,13 @@ class AsyncMetadataTablePostgres(AsyncPostgresTable):
     keys = MetaserviceMetadataTable.keys
     primary_keys = MetaserviceMetadataTable.primary_keys
     trigger_keys = MetaserviceMetadataTable.trigger_keys
+    trigger_operations = ["INSERT"]
+    trigger_conditions = [
+        "NEW.field_name = 'attempt'",
+        "NEW.field_name = 'attempt_ok'",
+        "NEW.field_name = 'code-package'",
+        "NEW.field_name = 'code-package-url'",
+    ]
 
     @property
     def select_columns(self):
diff --git a/services/ui_backend_service/data/db/tables/run.py b/services/ui_backend_service/data/db/tables/run.py
index 27c598fe..51f3cc36 100644
--- a/services/ui_backend_service/data/db/tables/run.py
+++ b/services/ui_backend_service/data/db/tables/run.py
@@ -31,6 +31,7 @@ class AsyncRunTablePostgres(AsyncPostgresTable):
     keys = MetadataRunTable.keys
     primary_keys = MetadataRunTable.primary_keys
     trigger_keys = MetadataRunTable.trigger_keys
+    trigger_operations = ["INSERT"]
 
     # NOTE: OSS Schema has metadata value column as TEXT, but for the time being we also need to support
     # value columns of type jsonb, which is why there is additional logic when dealing with 'value'
diff --git a/services/ui_backend_service/data/db/tables/task.py b/services/ui_backend_service/data/db/tables/task.py
index b4909577..996a177e 100644
--- a/services/ui_backend_service/data/db/tables/task.py
+++ b/services/ui_backend_service/data/db/tables/task.py
@@ -21,6 +21,7 @@ class AsyncTaskTablePostgres(AsyncPostgresTable):
     keys = MetadataTaskTable.keys
     primary_keys = MetadataTaskTable.primary_keys
     trigger_keys = MetadataTaskTable.trigger_keys
+    trigger_operations = ["INSERT"]
 
     # NOTE: There is a lot of unfortunate backwards compatibility logic for cases where task metadata,
     # or artifacts have not been stored correctly.
     # NOTE: OSS Schema has metadata value column as TEXT, but for the time being we also need to support
diff --git a/services/ui_backend_service/tests/integration_tests/notify_test.py b/services/ui_backend_service/tests/integration_tests/notify_test.py
index f0f43f90..23ecf9ab 100644
--- a/services/ui_backend_service/tests/integration_tests/notify_test.py
+++ b/services/ui_backend_service/tests/integration_tests/notify_test.py
@@ -134,182 +134,13 @@ async def test_pg_notify_trigger_updates_on_task(cli, db):
                          "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/attempts".format(**_task_end)]
     assert result == assertable_task(_task_end)
 
-    # Add artifact (Task will be done)
-    _should_call_task_done = Future()
-    _should_call_artifact_insert = Future()
-
-    async def _event_handler_task_done(operation: str, resources: List[str], result: Dict, table, filter_dict):
-        if operation == "UPDATE":
-            if not _should_call_task_done.done():
-                _should_call_task_done.set_result([operation, resources, result])
-        else:
-            if not _should_call_artifact_insert.done():
-                _should_call_artifact_insert.set_result([operation, resources, result])
-    cli.server.app.event_emitter.on('notify', _event_handler_task_done)
-
-    _artifact_step = (await add_artifact(db,
-                                         flow_id=_task_step.get("flow_id"),
-                                         run_number=_task_step.get(
-                                             "run_number"),
-                                         run_id=_task_step.get("run_id"),
-                                         step_name=_task_step.get("step_name"),
-                                         task_id=_task_step.get("task_id"),
-                                         task_name=_task_step.get("task_name"),
-                                         artifact={"name": "_task_ok"})).body
-
-    operation, resources, result = await wait_for(_should_call_task_done, TIMEOUT_FUTURE)
-    assert operation == "UPDATE"
-    assert result == assertable_artifact(_artifact_step)
-
-    operation, resources, result = await wait_for(_should_call_artifact_insert, TIMEOUT_FUTURE)
-    assert operation == "INSERT"
-    assert result == assertable_artifact(_artifact_step)
-
     cli.server.app.event_emitter.remove_all_listeners()
 
+    # NOTE: We used to test for legacy cases here where adding a _task_ok artifact
+    # would broadcast an updated status for a task and possibly a run.
-    # Add artifact (Run will be done)
-    _should_call_task_done = Future()
-    _should_call_run_done = Future()
-
-    async def _event_handler_task_done(operation: str, resources: List[str], result: Dict, table, filter_dict):
-        if operation == "UPDATE":
-            if "/runs" in resources:
-                _should_call_run_done.set_result(
-                    [operation, resources, result])
-            else:
-                _should_call_task_done.set_result(
-                    [operation, resources, result])
-    cli.server.app.event_emitter.on('notify', _event_handler_task_done)
-
-    _artifact_end = (await add_artifact(db,
-                                        flow_id=_task_end.get("flow_id"),
-                                        run_number=_task_end.get(
-                                            "run_number"),
-                                        run_id=_task_end.get("run_id"),
-                                        step_name=_task_end.get("step_name"),
-                                        task_id=_task_end.get("task_id"),
-                                        task_name=_task_end.get("task_name"),
-                                        artifact={"name": "_task_ok"})).body
-
-    operation, resources, result = await wait_for(_should_call_task_done, TIMEOUT_FUTURE)
-    assert operation == "UPDATE"
-    assert result == assertable_artifact(_artifact_end)
-
-    operation, resources, result = await wait_for(_should_call_run_done, TIMEOUT_FUTURE)
-    assert operation == "UPDATE"
-    assert result == assertable_artifact(_artifact_end)
-
-    cli.server.app.event_emitter.remove_all_listeners()
-
-
-# Test INSERT and UPDATE pg_notify triggers
-# Resource insert order is important here due to foreign keys
-# Test artifact attempt_id and task updates related to it
-# Task finished_at and attempt_id should always reflect artifact values
-async def test_pg_notify_trigger_updates_on_attempt_id(cli, db):
-    # Add new Flow
-    _should_call = _set_notify_handler(cli)
-    _flow = (await add_flow(db, flow_id="HelloFlow")).body
-
-    # Wait for results
-    await wait_for(_should_call, TIMEOUT_FUTURE)
-
-    # Add new Run
-    _should_call = _set_notify_handler(cli)
-    _run = (await add_run(db, flow_id=_flow.get("flow_id"))).body
-
-    # Wait for results
-    await wait_for(_should_call, TIMEOUT_FUTURE)
-
-    # Add normal Step
-    _should_call = _set_notify_handler(cli)
-    _step = (await add_step(db, flow_id=_run.get("flow_id"), step_name="step", run_number=_run.get("run_number"), run_id=_run.get("run_id"))).body
-
-    # Wait for results
-    await wait_for(_should_call, TIMEOUT_FUTURE)
-
-    # Add new Task
-    _should_call = _set_notify_handler(cli)
-    _task_step = (await add_task(db,
-                                 flow_id=_step.get("flow_id"),
-                                 step_name=_step.get("step_name"),
-                                 run_number=_step.get("run_number"),
-                                 run_id=_step.get("run_id"))).body
-
-    # Wait for results
-    await wait_for(_should_call, TIMEOUT_FUTURE)
-
-    # Add artifact with attempt_id = 0 (Task will be done)
-
-    _should_call_task_done = Future()
-    _should_call_artifact_insert = Future()
-
-    async def _event_handler_task_done(operation: str, resources: List[str], result: Dict, table, filter_dict):
-        if operation == 'UPDATE':
-            if not _should_call_task_done.done():
-                _should_call_task_done.set_result([operation, resources, result])
-        else:
-            if not _should_call_artifact_insert.done():
-                _should_call_artifact_insert.set_result([operation, resources, result])
-
-    cli.server.app.event_emitter.on('notify', _event_handler_task_done)
-
-    _artifact_step = (await add_artifact(db,
-                                         flow_id=_task_step.get("flow_id"),
-                                         run_number=_task_step.get(
-                                             "run_number"),
-                                         run_id=_task_step.get("run_id"),
-                                         step_name=_task_step.get("step_name"),
-                                         task_id=_task_step.get("task_id"),
-                                         task_name=_task_step.get("task_name"),
-                                         artifact={"name": "_task_ok", "attempt_id": 0})).body
-    # Wait for results
-
-    operation, _, result = await wait_for(_should_call_task_done, TIMEOUT_FUTURE)
-    assert operation == "UPDATE"
-    assert result == assertable_artifact(_artifact_step)
-
-    operation, _, result = await wait_for(_should_call_artifact_insert, TIMEOUT_FUTURE)
-    assert operation == "INSERT"
-    assert result == assertable_artifact(_artifact_step)
-
-    cli.server.app.event_emitter.remove_all_listeners()
-
-    # Add artifact with attempt_id = 1 (Task will be done)
-    _should_call_task_done = Future()
-    _should_call_artifact_insert = Future()
-
-    async def _event_handler_task_done(operation: str, resources: List[str], result: Dict, table, filter_dict):
-        if operation == 'UPDATE':
-            if not _should_call_task_done.done():
-                _should_call_task_done.set_result([operation, resources, result])
-        else:
-            if not _should_call_artifact_insert.done():
-                _should_call_artifact_insert.set_result([operation, resources, result])
-    cli.server.app.event_emitter.on('notify', _event_handler_task_done)
-
-    _artifact_step = (await add_artifact(db,
-                                         flow_id=_task_step.get("flow_id"),
-                                         run_number=_task_step.get(
-                                             "run_number"),
-                                         run_id=_task_step.get("run_id"),
-                                         step_name=_task_step.get("step_name"),
-                                         task_id=_task_step.get("task_id"),
-                                         task_name=_task_step.get("task_name"),
-                                         artifact={"name": "_task_ok", "attempt_id": 1})).body
-
-    # Wait for results
-
-    operation, _, result = await wait_for(_should_call_task_done, TIMEOUT_FUTURE)
-    assert operation == "UPDATE"
-    assert result == assertable_artifact(_artifact_step)
-
-    operation, _, result = await wait_for(_should_call_artifact_insert, TIMEOUT_FUTURE)
-    assert operation == "INSERT"
-    assert result == assertable_artifact(_artifact_step)
-
-    cli.server.app.event_emitter.remove_all_listeners()
+# NOTE: _task_ok artifacts used to drive task attempt updates. Triggers on the
+# artifact table have since been removed, so all task attempt updates are metadata-driven.
 
 
 async def test_pg_notify_dag_code_package_url(cli, db):
     _flow = (await add_flow(db, flow_id="HelloFlow")).body
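
The trigger_operations and trigger_conditions attributes introduced in this patch are consumed by the versioned trigger setup in services/data/postgres_async_db.py. The snippet below is a minimal sketch of their effect, not the service's actual DDL generator: the table and procedure names are illustrative assumptions, and only the attribute values come from the patch.

def build_trigger_ddl(table_name, operations, conditions=None,
                      prefix="notify_ui", version="23082024"):
    # Versioned trigger name, so bumping TRIGGER_VERSION replaces stale triggers.
    name = "{}_{}_{}".format(prefix, table_name, version)
    # e.g. just "INSERT" rather than "INSERT OR UPDATE OR DELETE"; this alone
    # drops every notification caused by row updates such as heartbeats.
    ops = " OR ".join(operations)
    # The WHEN clause narrows INSERT notifications to rows the UI reacts to.
    when = "WHEN ({}) ".format(" OR ".join(conditions)) if conditions else ""
    return ("CREATE TRIGGER {name} AFTER {ops} ON {table} "
            "FOR EACH ROW {when}EXECUTE PROCEDURE {name}()").format(
                name=name, ops=ops, table=table_name, when=when)


# Mirrors the metadata table settings introduced above (table name assumed):
print(build_trigger_ddl(
    "metadata_v3",
    operations=["INSERT"],
    conditions=[
        "NEW.field_name = 'attempt'",
        "NEW.field_name = 'attempt_ok'",
        "NEW.field_name = 'code-package'",
        "NEW.field_name = 'code-package-url'",
    ],
))

With artifact triggers removed entirely and the remaining tables restricted like this, pg_notify traffic is reduced to the handful of inserts the UI backend actually acts on.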
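On the heartbeat monitor side, runs and tasks without a recorded heartbeat are no longer skipped: add_to_watch now falls back to the current time, and load_and_broadcast re-watches anything still running, which is the eager polling the commit message refers to. A minimal, self-contained sketch of the fallback, with shapes taken from the diff above:

import datetime

watched = {}


def heartbeat_time_now():
    # same format as the metadata heartbeat uses
    return int(datetime.datetime.utcnow().timestamp())


def add_to_watch(run):
    if "last_heartbeat_ts" in run and "run_number" in run:
        # Old behavior skipped runs whose last_heartbeat_ts was None; the new
        # behavior starts the clock from "now" instead, so tasks that die
        # before writing their first heartbeat still get noticed and emitted.
        watched[run["run_number"]] = run["last_heartbeat_ts"] or heartbeat_time_now()


add_to_watch({"run_number": 42, "last_heartbeat_ts": None})
assert 42 in watched  # eagerly watched from the current timestamp onward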
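Finally, since artifact rows no longer notify at all, notify.py recovers the attempt number for an attempt_ok broadcast from the metadata row's tags rather than from an artifact's attempt_id column. A self-contained example of the expected shape; the payload values are fabricated for illustration, and only the attempt_id:<n> tag format is taken from the parsing logic in this patch:

data = {
    "flow_id": "HelloFlow",
    "run_number": 42,
    "step_name": "end",
    "task_id": 7,
    "field_name": "attempt_ok",
    "value": "True",
    "tags": ["attempt_id:1"],
}

attempt_id = None
try:
    # Same extraction as handle_trigger_msg: first tag starting with "attempt_id".
    attempt_tag = [t for t in data["tags"] if t.startswith("attempt_id")][0]
    attempt_id = attempt_tag.split(":")[1]
except Exception:
    # Without the tag, the broadcast falls back to an empty filter_dict.
    pass

assert attempt_id == "1"
# The handler then scopes the task UPDATE broadcast with
# filter_dict={"attempt_id": "1"}, so only that attempt is refreshed.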