From fa3cbd2b6f3ec501f435082a2caa3efcc65ff260 Mon Sep 17 00:00:00 2001 From: Ben Galewsky Date: Fri, 11 Oct 2024 09:57:18 -0600 Subject: [PATCH 1/4] transform_file task uses acks_late=True We found that when the autoscaler decides to scale down the number of transformers, the science image dies immediately, which causes the "sync" call to fail. When this happens we need to put the transform request back onto the rabbit queue and die gracefully. Add the acks_late property to the transform_file request; the failure to receive the sync command from the science container is now a fatal error. --- .../src/transformer_sidecar/transformer.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/transformer_sidecar/src/transformer_sidecar/transformer.py b/transformer_sidecar/src/transformer_sidecar/transformer.py index 1fc64187..d1e6dd56 100644 --- a/transformer_sidecar/src/transformer_sidecar/transformer.py +++ b/transformer_sidecar/src/transformer_sidecar/transformer.py @@ -29,6 +29,7 @@ import json import os import shutil +import sys import timeit from argparse import Namespace from hashlib import sha1, sha256 @@ -42,7 +43,8 @@ import time from celery import Celery, shared_task -from transformer_sidecar.science_container_command import ScienceContainerCommand +from transformer_sidecar.science_container_command import ScienceContainerCommand, \ + ScienceContainerException from transformer_sidecar.transformer_logging import initialize_logging from transformer_sidecar.transformer_stats import TransformerStats from transformer_sidecar.transformer_stats.aod_stats import AODStats # NOQA: 401 @@ -82,7 +84,7 @@ logger = initialize_logging() -@shared_task +@shared_task(acks_late=True) def transform_file( request_id, file_id, @@ -276,6 +278,9 @@ def transform_file( }, ) + except ScienceContainerException: + logger.exception("Science container not responding. 
Shutting down this transformer.") + sys.exit(0) except Exception as error: logger.exception(f"Received exception doing transform: {error}") rec = FileCompleteRecord( From d5d98dd5e84062d04ed561da0c9f30da48f49c25 Mon Sep 17 00:00:00 2001 From: Ben Galewsky Date: Fri, 11 Oct 2024 15:26:43 -0600 Subject: [PATCH 2/4] Add some more resilience to the transformer 1. Add a no-op SIGTERM handler to the object store uploader so it doesn't terminate when the pod shuts down. It stays up and relies on the main Celery worker to send the poison pill to the queue when it is time to go. 2. Set the Celery pre-fetch multiplier to one so we don't get greedy and grab more transform requests than we can digest. --- .../src/transformer_sidecar/object_store_uploader.py | 8 ++++++++ .../src/transformer_sidecar/transformer.py | 1 + 2 files changed, 9 insertions(+) diff --git a/transformer_sidecar/src/transformer_sidecar/object_store_uploader.py b/transformer_sidecar/src/transformer_sidecar/object_store_uploader.py index 25944e52..bbde3a69 100644 --- a/transformer_sidecar/src/transformer_sidecar/object_store_uploader.py +++ b/transformer_sidecar/src/transformer_sidecar/object_store_uploader.py @@ -27,6 +27,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
import logging import os +import signal from multiprocessing import Process from pathlib import Path from queue import Queue @@ -58,12 +59,19 @@ class ObjectStoreUploader(Process): def __init__(self, request_id: str, input_queue: Queue, logger: logging.Logger, convert_root_to_parquet: bool): + super().__init__(target=self.service_work_queue) self.request_id = request_id self.input_queue = input_queue self.logger = logger self.convert_root_to_parquet = convert_root_to_parquet + signal.signal(signal.SIGTERM, self.handle_sigterm) + + def handle_sigterm(self, signum, frame): + # This method will be called when SIGTERM is received + print("SIGTERM received, but ignored.") + def service_work_queue(self): import time self.logger.debug("Object store uploader starting.", diff --git a/transformer_sidecar/src/transformer_sidecar/transformer.py b/transformer_sidecar/src/transformer_sidecar/transformer.py index d1e6dd56..55a99a59 100644 --- a/transformer_sidecar/src/transformer_sidecar/transformer.py +++ b/transformer_sidecar/src/transformer_sidecar/transformer.py @@ -508,6 +508,7 @@ def prepend_xcache(file_paths: list[str]) -> list[str]: app.conf.task_create_missing_queues = False app.conf.worker_hijack_root_logger = False app.conf.worker_redirect_stdouts_level = 'DEBUG' + app.conf.worker_prefetch_multiplier = 1 init(_args, app) logger.debug( From a9c74d659583035a05d240602463a7eb945a3211 Mon Sep 17 00:00:00 2001 From: Ben Galewsky Date: Tue, 15 Oct 2024 14:09:27 -0600 Subject: [PATCH 3/4] Log debug output on object store uploader SIGTERM message --- .../src/transformer_sidecar/object_store_uploader.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/transformer_sidecar/src/transformer_sidecar/object_store_uploader.py b/transformer_sidecar/src/transformer_sidecar/object_store_uploader.py index bbde3a69..11389826 100644 --- a/transformer_sidecar/src/transformer_sidecar/object_store_uploader.py +++ 
b/transformer_sidecar/src/transformer_sidecar/object_store_uploader.py @@ -70,7 +70,11 @@ def __init__(self, request_id: str, input_queue: Queue, def handle_sigterm(self, signum, frame): # This method will be called when SIGTERM is received - print("SIGTERM received, but ignored.") + self.logger.debug( + "SIGTERM received, but ignored", + extra={'requestId': self.request_id, + "place": PLACE, + "qsize": self.input_queue.qsize()}) def service_work_queue(self): import time From 5b3a3aff40bbd1212f36a5b243e3ce31f40ceea4 Mon Sep 17 00:00:00 2001 From: Ben Galewsky Date: Tue, 15 Oct 2024 14:18:11 -0600 Subject: [PATCH 4/4] Eliminate broker_connection_retry_on_startup warning --- transformer_sidecar/src/transformer_sidecar/transformer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/transformer_sidecar/src/transformer_sidecar/transformer.py b/transformer_sidecar/src/transformer_sidecar/transformer.py index 55a99a59..1350fff2 100644 --- a/transformer_sidecar/src/transformer_sidecar/transformer.py +++ b/transformer_sidecar/src/transformer_sidecar/transformer.py @@ -509,6 +509,7 @@ def prepend_xcache(file_paths: list[str]) -> list[str]: app.conf.worker_hijack_root_logger = False app.conf.worker_redirect_stdouts_level = 'DEBUG' app.conf.worker_prefetch_multiplier = 1 + app.conf.broker_connection_retry_on_startup = True init(_args, app) logger.debug(