diff --git a/transformer_sidecar/src/transformer_sidecar/object_store_uploader.py b/transformer_sidecar/src/transformer_sidecar/object_store_uploader.py index 25944e52..11389826 100644 --- a/transformer_sidecar/src/transformer_sidecar/object_store_uploader.py +++ b/transformer_sidecar/src/transformer_sidecar/object_store_uploader.py @@ -27,6 +27,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import logging import os +import signal from multiprocessing import Process from pathlib import Path from queue import Queue @@ -58,12 +59,23 @@ class ObjectStoreUploader(Process): def __init__(self, request_id: str, input_queue: Queue, logger: logging.Logger, convert_root_to_parquet: bool): + super().__init__(target=self.service_work_queue) self.request_id = request_id self.input_queue = input_queue self.logger = logger self.convert_root_to_parquet = convert_root_to_parquet + signal.signal(signal.SIGTERM, self.handle_sigterm) + + def handle_sigterm(self, signum, frame): + # This method will be called when SIGTERM is received + self.logger.debug( + "SIGTERM received, but ignored", + extra={'requestId': self.request_id, + "place": PLACE, + "qsize": self.input_queue.qsize()}) + def service_work_queue(self): import time self.logger.debug("Object store uploader starting.", diff --git a/transformer_sidecar/src/transformer_sidecar/transformer.py b/transformer_sidecar/src/transformer_sidecar/transformer.py index 1fc64187..1350fff2 100644 --- a/transformer_sidecar/src/transformer_sidecar/transformer.py +++ b/transformer_sidecar/src/transformer_sidecar/transformer.py @@ -29,6 +29,7 @@ import json import os import shutil +import sys import timeit from argparse import Namespace from hashlib import sha1, sha256 @@ -42,7 +43,8 @@ import time from celery import Celery, shared_task -from transformer_sidecar.science_container_command import ScienceContainerCommand +from transformer_sidecar.science_container_command import ScienceContainerCommand, \ + ScienceContainerException from transformer_sidecar.transformer_logging import initialize_logging from transformer_sidecar.transformer_stats import TransformerStats from transformer_sidecar.transformer_stats.aod_stats import AODStats # NOQA: 401 @@ -82,7 +84,7 @@ logger = initialize_logging() -@shared_task +@shared_task(acks_late=True) def transform_file( request_id, file_id, @@ -276,6 +278,9 @@ def transform_file( }, ) + except ScienceContainerException: + logger.exception("Science container not responding. Shutting down this transformer.") + sys.exit(0) except Exception as error: logger.exception(f"Received exception doing transform: {error}") rec = FileCompleteRecord( @@ -503,6 +508,8 @@ def prepend_xcache(file_paths: list[str]) -> list[str]: app.conf.task_create_missing_queues = False app.conf.worker_hijack_root_logger = False app.conf.worker_redirect_stdouts_level = 'DEBUG' + app.conf.worker_prefetch_multiplier = 1 + app.conf.broker_connection_retry_on_startup = True init(_args, app) logger.debug(