fix: reduce postgres notify messaging (#409)
* rework notify.py: drive task and run status updates from attempt_ok metadata instead of _task_ok artifacts

* remove triggers from artifacts, restrict triggers to INSERT for metadata

* add conditions to metadata triggers (see the DDL sketch below)

* restrict tasks and runs to insert triggers only

* test: switch to eager polling for heartbeat tracking
saikonen authored Aug 26, 2024
1 parent bcd036c commit 85032e6
Showing 8 changed files with 50 additions and 220 deletions.
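
The net effect of this commit is fewer NOTIFY round-trips: artifact triggers are dropped entirely, the remaining triggers fire only on INSERT, and the metadata trigger additionally fires only for the handful of field names the UI backend reacts to. Below is a minimal sketch of the DDL this plausibly generates, assuming the conditions are OR-joined into a WHEN clause; the trigger name follows TRIGGER_NAME_PREFIX and TRIGGER_VERSION from postgres_async_db.py, while the metadata_v2 table name and the notify_ui() procedure are illustrative assumptions, not the generated names.

# Hypothetical rendering of the new trigger attributes into Postgres DDL.
trigger_operations = ["INSERT"]
trigger_conditions = [
    "NEW.field_name = 'attempt'",
    "NEW.field_name = 'attempt_ok'",
    "NEW.field_name = 'code-package'",
    "NEW.field_name = 'code-package-url'",
]

ddl = (
    "CREATE TRIGGER notify_ui_metadata_v2_23082024\n"
    "AFTER {ops} ON metadata_v2\n"   # INSERT only: no UPDATE/DELETE noise
    "FOR EACH ROW\n"
    "WHEN ({cond})\n"                # skip rows the UI does not care about
    "EXECUTE PROCEDURE notify_ui();"
).format(
    ops=" OR ".join(trigger_operations),
    cond=" OR ".join(trigger_conditions),
)
print(ddl)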
6 changes: 4 additions & 2 deletions services/data/postgres_async_db.py
@@ -39,7 +39,7 @@
operator_match = re.compile('([^:]*):([=><]+)$')

# use a ddmmyyyy timestamp as the version for triggers
TRIGGER_VERSION = "18012024"
TRIGGER_VERSION = "23082024"
TRIGGER_NAME_PREFIX = "notify_ui"


@@ -759,7 +759,8 @@ class AsyncMetadataTablePostgres(AsyncPostgresTable):
primary_keys = ["flow_id", "run_number",
"step_name", "task_id", "field_name"]
trigger_keys = ["flow_id", "run_number",
"step_name", "task_id", "field_name", "value"]
"step_name", "task_id", "field_name", "value", "tags"]
trigger_operations = ["INSERT"]
select_columns = keys

async def add_metadata(
@@ -827,6 +828,7 @@ class AsyncArtifactTablePostgres(AsyncPostgresTable):
primary_keys = ["flow_id", "run_number",
"step_name", "task_id", "attempt_id", "name"]
trigger_keys = primary_keys
trigger_operations = ["INSERT"]
select_columns = keys

async def add_artifact(
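
Note that trigger_keys for metadata now also carries value and tags: those columns land in the NOTIFY payload, which is exactly what notify.py (below) needs to read attempt_ok results and recover the attempt id. An illustrative payload for an attempt_ok insert follows; the field set comes from trigger_keys, but the exact envelope (table/operation wrapper) and sample values are assumptions.

# Hypothetical NOTIFY payload for an attempt_ok metadata INSERT.
payload = {
    "table": "metadata_v2",   # assumed table name
    "operation": "INSERT",
    "data": {
        "flow_id": "HelloFlow",
        "run_number": 42,
        "step_name": "end",
        "task_id": 1234,
        "field_name": "attempt_ok",   # only whitelisted field names fire the trigger
        "value": "True",
        "tags": ["attempt_id:0"],     # notify.py parses the attempt id from here
    },
}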
30 changes: 21 additions & 9 deletions services/ui_backend_service/api/heartbeat_monitor.py
@@ -70,7 +70,7 @@ async def check_heartbeats(self):
and triggering handlers in case the heartbeat is too old.
"""
while True:
time_now = int(datetime.datetime.utcnow().timestamp()) # same format as the metadata heartbeat uses
time_now = heartbeat_time_now()
for key, hb in list(self.watched.items()):
if time_now - hb > HEARTBEAT_INTERVAL:
self.loop.create_task(self.load_and_broadcast(key))
@@ -123,9 +123,8 @@ async def heartbeat_handler(self, action: str, data: Dict):
async def add_to_watch(self, run: Dict):
if "last_heartbeat_ts" in run and "run_number" in run:
run_number = run["run_number"]
heartbeat_ts = run["last_heartbeat_ts"]
if heartbeat_ts is not None: # only start monitoring on runs that have a heartbeat
self.watched[run_number] = heartbeat_ts
heartbeat_ts = run["last_heartbeat_ts"] or heartbeat_time_now()
self.watched[run_number] = heartbeat_ts

async def get_run(self, run_key: str) -> Optional[Dict]:
"Fetch run with a given id or number from the DB"
@@ -136,7 +135,11 @@ async def get_run(self, run_key: str) -> Optional[Dict]:
async def load_and_broadcast(self, key):
run = await self.get_run(key)
resources = resource_list(self._run_table.table_name, run) if run else None
if resources and run['status'] == "failed":
if not resources:
return
if run['status'] == "running":
await self.add_to_watch(run)
if run['status'] == "failed":
# The purpose of the monitor is to emit otherwise unnoticed failed attempts.
# Do not unnecessarily broadcast other statuses that already get propagated by Notify.
self.event_emitter.emit('notify', 'UPDATE', resources, run)
@@ -199,9 +202,8 @@ async def add_to_watch(self, data):

key = self.generate_dict_key(task)
if key and "last_heartbeat_ts" in task:
heartbeat_ts = task["last_heartbeat_ts"]
if heartbeat_ts is not None: # only start monitoring on runs that have a heartbeat
self.watched[key] = heartbeat_ts
heartbeat_ts = task["last_heartbeat_ts"] or heartbeat_time_now()
self.watched[key] = heartbeat_ts

async def _get_task(self, flow_id: str, run_key: str, step_name: str, task_key: str, attempt_id: int = None, postprocess=None) -> Optional[Dict]:
"""
@@ -225,7 +227,12 @@ async def load_and_broadcast(self, key):
flow_id, run_number, step_name, task_id, attempt_id = self.decode_key_ids(key)
task = await self._get_task(flow_id, run_number, step_name, task_id, attempt_id, postprocess=self.refiner.postprocess)
resources = resource_list(self._task_table.table_name, task) if task else None
if resources and task['status'] == "failed":
if not resources:
return

if task['status'] == "running":
await self.add_to_watch(task)
if task['status'] == "failed":
# The purpose of the monitor is to emit otherwise unnoticed failed attempts.
# Do not unnecessarily broadcast other statuses that already get propagated by Notify.
self.event_emitter.emit('notify', 'UPDATE', resources, task)
@@ -240,3 +247,8 @@ def generate_dict_key(self, data):
def decode_key_ids(self, key):
flow, run, step, task, attempt = key.split("/")
return flow, run, step, task, attempt


def heartbeat_time_now():
# same format as the metadata heartbeat uses
return int(datetime.datetime.utcnow().timestamp())
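
The eager-polling change above is subtle: previously a run or task whose last_heartbeat_ts was still NULL was never added to the watch list, so a failure before the first heartbeat could go unnoticed; now the monitor seeds the entry with the current time and starts checking immediately. A condensed, self-contained sketch of that behavior, with an illustrative interval value:

import datetime

HEARTBEAT_INTERVAL = 60  # seconds; illustrative value

def heartbeat_time_now():
    # same integer timestamp format as the metadata heartbeat uses
    return int(datetime.datetime.utcnow().timestamp())

watched = {}

def add_to_watch(run):
    if "last_heartbeat_ts" in run and "run_number" in run:
        # fall back to "now" so heartbeat-less runs are polled eagerly
        watched[run["run_number"]] = run["last_heartbeat_ts"] or heartbeat_time_now()

add_to_watch({"run_number": 1, "last_heartbeat_ts": None})
assert 1 in watched  # before this commit, the run would have been skipped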
45 changes: 10 additions & 35 deletions services/ui_backend_service/api/notify.py
@@ -124,7 +124,15 @@ async def handle_trigger_msg(self, msg: str):
if operation == "INSERT" and \
table.table_name == self.db.metadata_table_postgres.table_name and \
data["field_name"] == "attempt_ok":
attempt_id = None
try:
attempt_tag = [t for t in data['tags'] if t.startswith('attempt_id')][0]
attempt_id = attempt_tag.split(":")[1]
except Exception:
self.logger.exception("Failed to load attempt_id from attempt_ok metadata")

# remove heartbeat watcher for completed task
self.event_emitter.emit("task-heartbeat", "complete", data)

# broadcast task status as it has either completed or failed.
@@ -133,40 +141,13 @@ async def handle_trigger_msg(self, msg: str):
event_emitter=self.event_emitter,
operation="UPDATE",
table=self.db.task_table_postgres,
data=data
data=data,
filter_dict={"attempt_id": attempt_id} if attempt_id else {}
)

# Notify updated Run status once attempt_ok metadata for end step has been received
if data["step_name"] == "end":
await _broadcast(self.event_emitter, "UPDATE", self.db.run_table_postgres, data)
# And remove possible heartbeat watchers for completed runs
self.event_emitter.emit("run-heartbeat", "complete", data)

# Notify related resources once new `_task_ok` artifact has been created
if operation == "INSERT" and \
table.table_name == self.db.artifact_table_postgres.table_name and \
data["name"] == "_task_ok":

# remove heartbeat watcher for completed task
self.event_emitter.emit("task-heartbeat", "complete", data)

# Always mark task finished if '_task_ok' artifact is created
# Include 'attempt_id' so we can identify which attempt this artifact related to
_attempt_id = data.get("attempt_id", 0)
await _broadcast(
event_emitter=self.event_emitter,
operation="UPDATE",
table=self.db.task_table_postgres,
data=data,
filter_dict={"attempt_id": _attempt_id}
)

# Last step is always called 'end' and only one '_task_ok' should be present
# Run is considered finished once 'end' step has '_task_ok' artifact
if data["step_name"] == "end":
await _broadcast(
self.event_emitter, "UPDATE", self.db.run_table_postgres,
data)
# Also trigger preload of artifacts after a run finishes.
self.event_emitter.emit("preload-task-statuses", data['flow_id'], data['run_number'])
# And remove possible heartbeat watchers for completed runs
@@ -179,12 +160,6 @@ async def handle_trigger_msg(self, msg: str):
data["field_name"] in ["code-package-url", "code-package"]:
self.event_emitter.emit("preload-dag", data['flow_id'], data['run_number'])

if operation == "INSERT" and \
table.table_name == self.db.artifact_table_postgres.table_name and \
data["step_name"] == "_parameters" and \
data["name"] == "_graph_info":
self.event_emitter.emit("preload-dag", data['flow_id'], data['run_number'])

except Exception:
self.logger.exception("Exception occurred")

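
With artifact triggers gone, the attempt id travels on the metadata row's tags (e.g. attempt_id:1) instead of on an artifact column. A quick illustration of the parsing used above, with made-up sample tags:

data = {"tags": ["user:saikonen", "attempt_id:1"]}  # sample metadata row

attempt_id = None
try:
    # pick the first attempt_id:<n> tag; IndexError if none is present
    attempt_tag = [t for t in data["tags"] if t.startswith("attempt_id")][0]
    attempt_id = attempt_tag.split(":")[1]
except Exception:
    attempt_id = None  # missing/malformed tag: broadcast without an attempt filter

assert attempt_id == "1"  # a string, passed as filter_dict={"attempt_id": ...}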
3 changes: 2 additions & 1 deletion services/ui_backend_service/data/db/tables/artifact.py
@@ -14,7 +14,8 @@ class AsyncArtifactTablePostgres(AsyncPostgresTable):
ordering = ["attempt_id DESC"]
keys = MetadataArtifactTable.keys
primary_keys = MetadataArtifactTable.primary_keys
trigger_keys = MetadataArtifactTable.trigger_keys
trigger_keys = None
trigger_operations = None
select_columns = keys

async def get_run_parameter_artifacts(self, flow_name, run_number, postprocess=None, invalidate_cache=False):
7 changes: 7 additions & 0 deletions services/ui_backend_service/data/db/tables/metadata.py
@@ -13,6 +13,13 @@ class AsyncMetadataTablePostgres(AsyncPostgresTable):
keys = MetaserviceMetadataTable.keys
primary_keys = MetaserviceMetadataTable.primary_keys
trigger_keys = MetaserviceMetadataTable.trigger_keys
trigger_operations = ["INSERT"]
trigger_conditions = [
"NEW.field_name = 'attempt'",
"NEW.field_name = 'attempt_ok'",
"NEW.field_name = 'code-package'",
"NEW.field_name = 'code-package-url'",
]

@property
def select_columns(self):
1 change: 1 addition & 0 deletions services/ui_backend_service/data/db/tables/run.py
@@ -31,6 +31,7 @@ class AsyncRunTablePostgres(AsyncPostgresTable):
keys = MetadataRunTable.keys
primary_keys = MetadataRunTable.primary_keys
trigger_keys = MetadataRunTable.trigger_keys
trigger_operations = ["INSERT"]

# NOTE: OSS Schema has metadata value column as TEXT, but for the time being we also need to support
# value columns of type jsonb, which is why there is additional logic when dealing with 'value'
1 change: 1 addition & 0 deletions services/ui_backend_service/data/db/tables/task.py
@@ -21,6 +21,7 @@ class AsyncTaskTablePostgres(AsyncPostgresTable):
keys = MetadataTaskTable.keys
primary_keys = MetadataTaskTable.primary_keys
trigger_keys = MetadataTaskTable.trigger_keys
trigger_operations = ["INSERT"]
# NOTE: There is a lot of unfortunate backwards compatibility logic for cases where task metadata,
# or artifacts have not been stored correctly.
# NOTE: OSS Schema has metadata value column as TEXT, but for the time being we also need to support
177 changes: 4 additions & 173 deletions services/ui_backend_service/tests/integration_tests/notify_test.py
@@ -134,182 +134,13 @@ async def test_pg_notify_trigger_updates_on_task(cli, db):
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/attempts".format(**_task_end)]
assert result == assertable_task(_task_end)

# Add artifact (Task will be done)
_should_call_task_done = Future()
_should_call_artifact_insert = Future()

async def _event_handler_task_done(operation: str, resources: List[str], result: Dict, table, filter_dict):
if operation == "UPDATE":
if not _should_call_task_done.done():
_should_call_task_done.set_result([operation, resources, result])
else:
if not _should_call_artifact_insert.done():
_should_call_artifact_insert.set_result([operation, resources, result])
cli.server.app.event_emitter.on('notify', _event_handler_task_done)

_artifact_step = (await add_artifact(db,
flow_id=_task_step.get("flow_id"),
run_number=_task_step.get(
"run_number"),
run_id=_task_step.get("run_id"),
step_name=_task_step.get("step_name"),
task_id=_task_step.get("task_id"),
task_name=_task_step.get("task_name"),
artifact={"name": "_task_ok"})).body

operation, resources, result = await wait_for(_should_call_task_done, TIMEOUT_FUTURE)
assert operation == "UPDATE"
assert result == assertable_artifact(_artifact_step)

operation, resources, result = await wait_for(_should_call_artifact_insert, TIMEOUT_FUTURE)
assert operation == "INSERT"
assert result == assertable_artifact(_artifact_step)

cli.server.app.event_emitter.remove_all_listeners()
# NOTE: We used to test for legacy cases here where adding a task_ok artifact
# would broadcast an updated status for a task and possibly run.

# Add artifact (Run will be done)
_should_call_task_done = Future()
_should_call_run_done = Future()

async def _event_handler_task_done(operation: str, resources: List[str], result: Dict, table, filter_dict):
if operation == "UPDATE":
if "/runs" in resources:
_should_call_run_done.set_result(
[operation, resources, result])
else:
_should_call_task_done.set_result(
[operation, resources, result])
cli.server.app.event_emitter.on('notify', _event_handler_task_done)

_artifact_end = (await add_artifact(db,
flow_id=_task_end.get("flow_id"),
run_number=_task_end.get(
"run_number"),
run_id=_task_end.get("run_id"),
step_name=_task_end.get("step_name"),
task_id=_task_end.get("task_id"),
task_name=_task_end.get("task_name"),
artifact={"name": "_task_ok"})).body

operation, resources, result = await wait_for(_should_call_task_done, TIMEOUT_FUTURE)
assert operation == "UPDATE"
assert result == assertable_artifact(_artifact_end)

operation, resources, result = await wait_for(_should_call_run_done, TIMEOUT_FUTURE)
assert operation == "UPDATE"
assert result == assertable_artifact(_artifact_end)

cli.server.app.event_emitter.remove_all_listeners()


# Test INSERT and UPDATE pg_notify triggers
# Resource insert order is important here due to foreign keys
# Test artifact attempt_id and task updates related to it
# Task finished_at and attempt_id should always reflect artifact values
async def test_pg_notify_trigger_updates_on_attempt_id(cli, db):
# Add new Flow
_should_call = _set_notify_handler(cli)
_flow = (await add_flow(db, flow_id="HelloFlow")).body

# Wait for results
await wait_for(_should_call, TIMEOUT_FUTURE)

# Add new Run
_should_call = _set_notify_handler(cli)
_run = (await add_run(db, flow_id=_flow.get("flow_id"))).body

# Wait for results
await wait_for(_should_call, TIMEOUT_FUTURE)

# Add normal Step
_should_call = _set_notify_handler(cli)
_step = (await add_step(db, flow_id=_run.get("flow_id"), step_name="step", run_number=_run.get("run_number"), run_id=_run.get("run_id"))).body

# Wait for results
await wait_for(_should_call, TIMEOUT_FUTURE)

# Add new Task
_should_call = _set_notify_handler(cli)
_task_step = (await add_task(db,
flow_id=_step.get("flow_id"),
step_name=_step.get("step_name"),
run_number=_step.get("run_number"),
run_id=_step.get("run_id"))).body

# Wait for results
await wait_for(_should_call, TIMEOUT_FUTURE)

# Add artifact with attempt_id = 0 (Task will be done)

_should_call_task_done = Future()
_should_call_artifact_insert = Future()

async def _event_handler_task_done(operation: str, resources: List[str], result: Dict, table, filter_dict):
if operation == 'UPDATE':
if not _should_call_task_done.done():
_should_call_task_done.set_result([operation, resources, result])
else:
if not _should_call_artifact_insert.done():
_should_call_artifact_insert.set_result([operation, resources, result])

cli.server.app.event_emitter.on('notify', _event_handler_task_done)

_artifact_step = (await add_artifact(db,
flow_id=_task_step.get("flow_id"),
run_number=_task_step.get(
"run_number"),
run_id=_task_step.get("run_id"),
step_name=_task_step.get("step_name"),
task_id=_task_step.get("task_id"),
task_name=_task_step.get("task_name"),
artifact={"name": "_task_ok", "attempt_id": 0})).body
# Wait for results

operation, _, result = await wait_for(_should_call_task_done, TIMEOUT_FUTURE)
assert operation == "UPDATE"
assert result == assertable_artifact(_artifact_step)

operation, _, result = await wait_for(_should_call_artifact_insert, TIMEOUT_FUTURE)
assert operation == "INSERT"
assert result == assertable_artifact(_artifact_step)

cli.server.app.event_emitter.remove_all_listeners()

# Add artifact with attempt_id = 1 (Task will be done)
_should_call_task_done = Future()
_should_call_artifact_insert = Future()

async def _event_handler_task_done(operation: str, resources: List[str], result: Dict, table, filter_dict):
if operation == 'UPDATE':
if not _should_call_task_done.done():
_should_call_task_done.set_result([operation, resources, result])
else:
if not _should_call_artifact_insert.done():
_should_call_artifact_insert.set_result([operation, resources, result])
cli.server.app.event_emitter.on('notify', _event_handler_task_done)

_artifact_step = (await add_artifact(db,
flow_id=_task_step.get("flow_id"),
run_number=_task_step.get(
"run_number"),
run_id=_task_step.get("run_id"),
step_name=_task_step.get("step_name"),
task_id=_task_step.get("task_id"),
task_name=_task_step.get("task_name"),
artifact={"name": "_task_ok", "attempt_id": 1})).body

# Wait for results

operation, _, result = await wait_for(_should_call_task_done, TIMEOUT_FUTURE)
assert operation == "UPDATE"
assert result == assertable_artifact(_artifact_step)

operation, _, result = await wait_for(_should_call_artifact_insert, TIMEOUT_FUTURE)
assert operation == "INSERT"
assert result == assertable_artifact(_artifact_step)

cli.server.app.event_emitter.remove_all_listeners()

# NOTE: task_ok artifacts used to be able to drive task attempt updates.
# We have since gotten rid of triggers on artifacts so all task attempt updates are metadata driven.

async def test_pg_notify_dag_code_package_url(cli, db):
_flow = (await add_flow(db, flow_id="HelloFlow")).body