Skip to content

Commit

Permalink
Merge pull request #116 from ecmwf-projects/copds-1598
Browse files Browse the repository at this point in the history
test stored connection before run catalogue manager
  • Loading branch information
alex75 authored Apr 3, 2024
2 parents 445798f + ec6770f commit 52354a3
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 3 deletions.
12 changes: 9 additions & 3 deletions cads_catalogue/entry_points.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
maintenance,
manager,
messages,
object_storage,
validations,
)

Expand Down Expand Up @@ -210,6 +211,14 @@ def update_catalogue(
raise ValueError("%r is not a folder" % licences_folder_path)
if not os.path.isdir(messages_folder_path) and not exclude_messages:
raise ValueError("%r is not a folder" % messages_folder_path)

# test object storage connection, with a timeout (it may raise an error)
logger.info("testing connection to object storage")
storage_settings = config.ensure_storage_settings(config.storagesettings)
object_storage.test_connection_with_timeout(
15, storage_settings.object_storage_url, storage_settings.storage_kws
)

filter_is_active = bool(
include or exclude or exclude_resources or exclude_licences or exclude_messages
)
Expand All @@ -230,9 +239,6 @@ def update_catalogue(
logger.info("checking database structure")
database.init_database(connection_string)

# get storage parameters from environment
storage_settings = config.ensure_storage_settings(config.storagesettings)

paths_db_hash_map = [
(CATALOGUE_DIR, "catalogue_repo_commit"),
(resources_folder_path, "metadata_repo_commit"),
Expand Down
23 changes: 23 additions & 0 deletions cads_catalogue/object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,29 @@ def setup_bucket(client, bucket_name) -> None:
logger.warning(f"unable to set CORS policy on bucket {bucket_name}")


def test_connection(object_storage_url, storage_kws):
try:
client = boto3.client("s3", endpoint_url=object_storage_url, **storage_kws)
client.list_buckets()
logger.debug("connection to object storage is ok.")
except Exception:
logger.error(
"connection to object storage currently doesn't work. "
"Check connection parameters or try again later."
)
raise


def test_connection_with_timeout(timeout, object_storage_url, storage_kws):
timeout_msg = (
"connection to object storage currently doesn't work. "
"Check connection parameters or try again later."
)
utils.run_function_with_timeout(
timeout, timeout_msg, test_connection, args=(object_storage_url, storage_kws)
)


def store_file(
file_path: str | pathlib.Path,
object_storage_url: str, # type: ignore
Expand Down
27 changes: 27 additions & 0 deletions cads_catalogue/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import html.parser
import json
import mimetypes
import multiprocessing as mp
import pathlib
import subprocess
import urllib.parse
Expand All @@ -35,6 +36,32 @@ def is_url(astring):
return False


def run_function_with_timeout(timeout, timeout_msg, function, args=(), kwargs=None):
"""Run a function with a timeout inside a child process.
Raise an error in case of timeout or child process raises an error.
Parameters
----------
:param timeout: timeout in seconds
:param timeout_msg: timeout message
:param function: function to run
:param args: args of the function
:param kwargs: kwargs of the function
"""
if kwargs is None:
kwargs = dict()
process = mp.Process(target=function, args=args, kwargs=kwargs)
process.start()
process.join(timeout=timeout)
if process.is_alive():
process.terminate()
raise TimeoutError(timeout_msg)
if process.exitcode == 1:
# parent process raises as well
raise ValueError(f"error calling {function.__name__}")


class TagReplacer(html.parser.HTMLParser):
"""Translate a html text replacing tag data by some functions."""

Expand Down
37 changes: 37 additions & 0 deletions tests/test_01_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os.path
import subprocess
import time

import pytest
import pytest_mock
Expand Down Expand Up @@ -92,3 +93,39 @@ def test_str2bool() -> None:
assert utils.str2bool(value, raise_if_unknown=False, default=default) == default
for value in strange_values:
assert utils.str2bool(value, raise_if_unknown=False) is False


def sleep_function(secs, ret_value=None, raises=False):
# used to test run_function_with_timeout
time.sleep(secs)
if raises:
raise ValueError("this is a value error")
return ret_value


def test_run_function_with_timeout() -> None:
# run without any problem
utils.run_function_with_timeout(
2,
"timeout message",
sleep_function,
(
1,
1,
),
)
# run with timeout error
with pytest.raises(TimeoutError):
utils.run_function_with_timeout(1, "timeout message", sleep_function, (2, 1))
# run with a raise
with pytest.raises(ValueError):
utils.run_function_with_timeout(
2,
"timeout message",
sleep_function,
(
1,
1,
),
{"raises": True},
)
1 change: 1 addition & 0 deletions tests/test_90_entry_points.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def test_update_catalogue(
_store_file = mocker.patch(
"cads_catalogue.object_storage.store_file", return_value="an url"
)
mocker.patch("cads_catalogue.object_storage.test_connection")
folder_commit_hashes = (
"e5658fef07333700272e36a43df0628efacb5f04",
"5f662d202e4084dd569567bab0957c8a56f79c0f",
Expand Down

0 comments on commit 52354a3

Please sign in to comment.