diff --git a/cads_catalogue/entry_points.py b/cads_catalogue/entry_points.py index de69d1d..61b2003 100644 --- a/cads_catalogue/entry_points.py +++ b/cads_catalogue/entry_points.py @@ -29,6 +29,7 @@ maintenance, manager, messages, + object_storage, validations, ) @@ -210,6 +211,14 @@ def update_catalogue( raise ValueError("%r is not a folder" % licences_folder_path) if not os.path.isdir(messages_folder_path) and not exclude_messages: raise ValueError("%r is not a folder" % messages_folder_path) + + # test object storage connection, with a timeout (it may raise an error) + logger.info("testing connection to object storage") + storage_settings = config.ensure_storage_settings(config.storagesettings) + object_storage.test_connection_with_timeout( + 15, storage_settings.object_storage_url, storage_settings.storage_kws + ) + filter_is_active = bool( include or exclude or exclude_resources or exclude_licences or exclude_messages ) @@ -230,9 +239,6 @@ def update_catalogue( logger.info("checking database structure") database.init_database(connection_string) - # get storage parameters from environment - storage_settings = config.ensure_storage_settings(config.storagesettings) - paths_db_hash_map = [ (CATALOGUE_DIR, "catalogue_repo_commit"), (resources_folder_path, "metadata_repo_commit"), diff --git a/cads_catalogue/object_storage.py b/cads_catalogue/object_storage.py index e70f92b..48c88e5 100644 --- a/cads_catalogue/object_storage.py +++ b/cads_catalogue/object_storage.py @@ -134,6 +134,29 @@ def setup_bucket(client, bucket_name) -> None: logger.warning(f"unable to set CORS policy on bucket {bucket_name}") +def test_connection(object_storage_url, storage_kws): + try: + client = boto3.client("s3", endpoint_url=object_storage_url, **storage_kws) + client.list_buckets() + logger.debug("connection to object storage is ok.") + except Exception: + logger.error( + "connection to object storage currently doesn't work. " + "Check connection parameters or try again later." + ) + raise + + +def test_connection_with_timeout(timeout, object_storage_url, storage_kws): + timeout_msg = ( + "connection to object storage currently doesn't work. " + "Check connection parameters or try again later." + ) + utils.run_function_with_timeout( + timeout, timeout_msg, test_connection, args=(object_storage_url, storage_kws) + ) + + def store_file( file_path: str | pathlib.Path, object_storage_url: str, # type: ignore diff --git a/cads_catalogue/utils.py b/cads_catalogue/utils.py index b840e42..5fc9f1f 100644 --- a/cads_catalogue/utils.py +++ b/cads_catalogue/utils.py @@ -19,6 +19,7 @@ import html.parser import json import mimetypes +import multiprocessing as mp import pathlib import subprocess import urllib.parse @@ -35,6 +36,32 @@ def is_url(astring): return False +def run_function_with_timeout(timeout, timeout_msg, function, args=(), kwargs=None): + """Run a function with a timeout inside a child process. + + Raise an error in case of timeout or child process raises an error. + + Parameters + ---------- + :param timeout: timeout in seconds + :param timeout_msg: timeout message + :param function: function to run + :param args: args of the function + :param kwargs: kwargs of the function + """ + if kwargs is None: + kwargs = dict() + process = mp.Process(target=function, args=args, kwargs=kwargs) + process.start() + process.join(timeout=timeout) + if process.is_alive(): + process.terminate() + raise TimeoutError(timeout_msg) + if process.exitcode == 1: + # parent process raises as well + raise ValueError(f"error calling {function.__name__}") + + class TagReplacer(html.parser.HTMLParser): """Translate a html text replacing tag data by some functions.""" diff --git a/tests/test_01_utils.py b/tests/test_01_utils.py index 046c6b9..faabe20 100644 --- a/tests/test_01_utils.py +++ b/tests/test_01_utils.py @@ -1,5 +1,6 @@ import os.path import subprocess +import time import pytest import pytest_mock @@ -92,3 +93,39 @@ def test_str2bool() -> None: assert utils.str2bool(value, raise_if_unknown=False, default=default) == default for value in strange_values: assert utils.str2bool(value, raise_if_unknown=False) is False + + +def sleep_function(secs, ret_value=None, raises=False): + # used to test run_function_with_timeout + time.sleep(secs) + if raises: + raise ValueError("this is a value error") + return ret_value + + +def test_run_function_with_timeout() -> None: + # run without any problem + utils.run_function_with_timeout( + 2, + "timeout message", + sleep_function, + ( + 1, + 1, + ), + ) + # run with timeout error + with pytest.raises(TimeoutError): + utils.run_function_with_timeout(1, "timeout message", sleep_function, (2, 1)) + # run with a raise + with pytest.raises(ValueError): + utils.run_function_with_timeout( + 2, + "timeout message", + sleep_function, + ( + 1, + 1, + ), + {"raises": True}, + ) diff --git a/tests/test_90_entry_points.py b/tests/test_90_entry_points.py index a312a0c..0dcc19e 100644 --- a/tests/test_90_entry_points.py +++ b/tests/test_90_entry_points.py @@ -133,6 +133,7 @@ def test_update_catalogue( _store_file = mocker.patch( "cads_catalogue.object_storage.store_file", return_value="an url" ) + mocker.patch("cads_catalogue.object_storage.test_connection") folder_commit_hashes = ( "e5658fef07333700272e36a43df0628efacb5f04", "5f662d202e4084dd569567bab0957c8a56f79c0f",