Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transform_file task uses ack_late=True #881

Merged
merged 4 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import logging
import os
import signal
from multiprocessing import Process
from pathlib import Path
from queue import Queue
Expand Down Expand Up @@ -58,12 +59,23 @@ class ObjectStoreUploader(Process):
def __init__(self, request_id: str, input_queue: Queue,
logger: logging.Logger,
convert_root_to_parquet: bool):

super().__init__(target=self.service_work_queue)
self.request_id = request_id
self.input_queue = input_queue
self.logger = logger
self.convert_root_to_parquet = convert_root_to_parquet

signal.signal(signal.SIGTERM, self.handle_sigterm)

def handle_sigterm(self, signum, frame):
# This method will be called when SIGTERM is received
self.logger.debug(
"SIGTERM received, but ignored",
extra={'requestId': self.request_id,
"place": PLACE,
"qsize": self.input_queue.qsize()})

def service_work_queue(self):
import time
self.logger.debug("Object store uploader starting.",
Expand Down
11 changes: 9 additions & 2 deletions transformer_sidecar/src/transformer_sidecar/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import json
import os
import shutil
import sys
import timeit
from argparse import Namespace
from hashlib import sha1, sha256
Expand All @@ -42,7 +43,8 @@
import time
from celery import Celery, shared_task

from transformer_sidecar.science_container_command import ScienceContainerCommand
from transformer_sidecar.science_container_command import ScienceContainerCommand, \
ScienceContainerException
from transformer_sidecar.transformer_logging import initialize_logging
from transformer_sidecar.transformer_stats import TransformerStats
from transformer_sidecar.transformer_stats.aod_stats import AODStats # NOQA: 401
Expand Down Expand Up @@ -82,7 +84,7 @@
logger = initialize_logging()


@shared_task
@shared_task(acks_late=True)
def transform_file(
request_id,
file_id,
Expand Down Expand Up @@ -276,6 +278,9 @@ def transform_file(
},
)

except ScienceContainerException:
logger.exception("Science container not responding. Shutting down this transformer.")
sys.exit(0)
except Exception as error:
logger.exception(f"Received exception doing transform: {error}")
rec = FileCompleteRecord(
Expand Down Expand Up @@ -503,6 +508,8 @@ def prepend_xcache(file_paths: list[str]) -> list[str]:
app.conf.task_create_missing_queues = False
app.conf.worker_hijack_root_logger = False
app.conf.worker_redirect_stdouts_level = 'DEBUG'
app.conf.worker_prefetch_multiplier = 1
app.conf.broker_connection_retry_on_startup = True
init(_args, app)

logger.debug(
Expand Down
Loading