From bc19dc12bd7be9c98bc8ed4257e09dad140303c1 Mon Sep 17 00:00:00 2001 From: Uditha Atukorala Date: Fri, 28 Jul 2023 11:26:06 +0100 Subject: [PATCH 1/3] (datastore) thread-safe maybe fault tolerant pg connections --- src/datastore/CMakeLists.txt | 1 + src/datastore/config.h | 1 + src/datastore/pg.cpp | 39 +++++++++++++++++++++------- src/datastore/pg.h | 40 ++++++++++++++++++++++++----- src/datastore/pg_test.cpp | 50 ++++++++++++++++++++++++++++++++++++ src/err/errors.h | 3 +++ 6 files changed, 119 insertions(+), 15 deletions(-) create mode 100644 src/datastore/pg_test.cpp diff --git a/src/datastore/CMakeLists.txt b/src/datastore/CMakeLists.txt index a0631dfe..1c9c7cae 100644 --- a/src/datastore/CMakeLists.txt +++ b/src/datastore/CMakeLists.txt @@ -47,6 +47,7 @@ if (GATEKEEPER_ENABLE_TESTING) access-policies_test.cpp collections_test.cpp identities_test.cpp + pg_test.cpp rbac-policies_test.cpp redis_test.cpp roles_test.cpp diff --git a/src/datastore/config.h b/src/datastore/config.h index 03cc00ce..d54221c9 100644 --- a/src/datastore/config.h +++ b/src/datastore/config.h @@ -27,6 +27,7 @@ struct config { struct pg_t { std::string opts; + duration_t timeout = 1000ms; }; struct redis_t { diff --git a/src/datastore/pg.cpp b/src/datastore/pg.cpp index 556ba404..4a799f6d 100644 --- a/src/datastore/pg.cpp +++ b/src/datastore/pg.cpp @@ -1,22 +1,43 @@ #include "pg.h" -static std::shared_ptr _conn = nullptr; +#include "err/errors.h" + +static datastore::config::pg_t _conf; +static datastore::pg::conn_t _conn = nullptr; namespace datastore { namespace pg { -std::shared_ptr conn() { - // FIXME: check if initialised - return _conn; +conn_t::element_type &connection::conn() const { + return *_conn; } -result_t exec(std::string_view qry) { - nontxn_t tx(*conn()); - return tx.exec(qry); +conn_t::element_type &connection::reconnect() { + _conn = connect(); + return *_conn; } -void init(const config::pg_t &c) { +connection conn() { + if (!_conn) { + throw err::DatastorePgConnectionUnavailable(); + } + + static std::timed_mutex mutex; + if (!mutex.try_lock_for(_conf.timeout)) { + throw err::DatastorePgTimeout(); + } + + return connection(_conn, connection::lock_t(mutex, std::adopt_lock)); +} + +conn_t connect() { // Ref: https://www.postgresql.org/docs/current/libpq-envars.html - _conn = std::make_shared(c.opts); + _conn = std::make_shared(_conf.opts); + return _conn; +} + +void init(const config::pg_t &c) { + _conf = c; + connect(); } } // namespace pg } // namespace datastore diff --git a/src/datastore/pg.h b/src/datastore/pg.h index 96cf1114..5eaba2ac 100644 --- a/src/datastore/pg.h +++ b/src/datastore/pg.h @@ -6,7 +6,7 @@ namespace datastore { namespace pg { -using conn_t = pqxx::connection; +using conn_t = std::shared_ptr; using row_t = pqxx::row; using result_t = pqxx::result; using nontxn_t = pqxx::nontransaction; @@ -14,13 +14,41 @@ using nontxn_t = pqxx::nontransaction; using fkey_violation_t = pqxx::foreign_key_violation; using unique_violation_t = pqxx::unique_violation; -std::shared_ptr conn(); +class connection { +public: + using lock_t = std::unique_lock; -result_t exec(std::string_view qry); + connection(const conn_t &conn, lock_t &&lock) noexcept : _conn(conn), _lock(std::move(lock)) {} -template inline result_t exec(std::string_view qry, Args &&...args) { - nontxn_t tx(*conn()); - return tx.exec_params(pqxx::zview(qry), args...); + auto exec(std::string_view qry, auto &&...args) { + try { + return nontxn_exec(qry, std::forward(args)...); + } catch (const pqxx::broken_connection &e) { + // Try to reconnect, if it fails will throw an error + reconnect(); + } + + return nontxn_exec(qry, std::forward(args)...); + } + +private: + result_t nontxn_exec(std::string_view qry, auto &&...args) const { + nontxn_t tx(conn()); + return tx.exec_params(pqxx::zview(qry), std::forward(args)...); + } + + conn_t::element_type &conn() const; + conn_t::element_type &reconnect(); + + conn_t _conn; + lock_t _lock; +}; + +connection conn(); +conn_t connect(); + +inline auto exec(std::string_view qry, auto &&...args) { + return conn().exec(qry, std::forward(args)...); } void init(const config::pg_t &c); diff --git a/src/datastore/pg_test.cpp b/src/datastore/pg_test.cpp new file mode 100644 index 00000000..3cfdbbb4 --- /dev/null +++ b/src/datastore/pg_test.cpp @@ -0,0 +1,50 @@ +#include + +#include + +#include "pg.h" +#include "testing.h" + +TEST(pg, concurrency) { + if (std::thread::hardware_concurrency() < 2) { + GTEST_SKIP() << "Not enough hardware support to run concurrency tests"; + } + + auto conf = datastore::testing::conf(); + conf.pg.timeout = 50ms; + ASSERT_NO_THROW(datastore::pg::init(conf.pg)); + + // Success: timeout while waiting for connection lock + { + std::thread t1([conf]() { + auto conn = datastore::pg::conn(); + std::this_thread::sleep_for(conf.pg.timeout * 5); + }); + + std::thread t2([]() { + // Connection is locked into t1 scope, expect a timeout + EXPECT_THROW(datastore::pg::conn(), err::DatastorePgTimeout); + }); + + t1.join(); + t2.join(); + } +} + +TEST(pg, conn) { + // Error: connection unavailable + { EXPECT_THROW(datastore::pg::conn(), err::DatastorePgConnectionUnavailable); } +} + +TEST(pg, reconnect) { + auto conf = datastore::testing::conf(); + ASSERT_NO_THROW(datastore::pg::init(conf.pg)); + + // Success: reconnect + { + auto c = datastore::pg::connect(); + c->close(); + + EXPECT_NO_THROW(datastore::pg::exec("select 'ping';")); + } +} diff --git a/src/err/errors.h b/src/err/errors.h index cfb14d4f..e2878815 100644 --- a/src/err/errors.h +++ b/src/err/errors.h @@ -3,6 +3,9 @@ #include "basic_error.h" namespace err { +using DatastorePgConnectionUnavailable = basic_error<"gk:1.0.5.503", "Unavailable">; +using DatastorePgTimeout = basic_error<"gk:1.0.6.503", "Operation timed out">; + using DatastoreRedisCommandError = basic_error<"gk:1.0.4.503", "Unavailable">; using DatastoreRedisConnectionFailure = basic_error<"gk:1.0.1.503", "Unavailable">; using DatastoreRedisConnectionUnavailable = basic_error<"gk:1.0.3.503", "Unavailable">; From 8ed56711ff044bbd98c1f2458450744db1e3a424 Mon Sep 17 00:00:00 2001 From: Uditha Atukorala Date: Fri, 28 Jul 2023 17:08:31 +0100 Subject: [PATCH 2/3] tidy-up the use of inline --- src/datastore/redis.h | 2 +- src/err/basic_error.h | 2 +- src/err/fixed_string.h | 6 +++--- src/logger/logger.h | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/datastore/redis.h b/src/datastore/redis.h index 828e5698..47bbf943 100644 --- a/src/datastore/redis.h +++ b/src/datastore/redis.h @@ -20,7 +20,7 @@ class connection { connection(const context_t &ptr, lock_t &&lock) noexcept : _ctx(ptr), _lock(std::move(lock)) {} - template inline reply_t cmd(const std::string_view str, Args &&...args) { + template reply_t cmd(const std::string_view str, Args &&...args) { reply_t reply( static_cast(redisCommand(ctx(), str.data(), args...)), freeReplyObject); diff --git a/src/err/basic_error.h b/src/err/basic_error.h index ec7647ea..d349bed9 100644 --- a/src/err/basic_error.h +++ b/src/err/basic_error.h @@ -16,7 +16,7 @@ template struct basic_error : public std::runti std::strcat(_err, M.c_str()); } - inline std::string_view str() const noexcept { return _err; } + std::string_view str() const noexcept { return _err; } friend std::ostream &operator<<(std::ostream &os, const basic_error &err) { return os << err.str(); diff --git a/src/err/fixed_string.h b/src/err/fixed_string.h index a7ca46dc..48ae46ab 100644 --- a/src/err/fixed_string.h +++ b/src/err/fixed_string.h @@ -7,11 +7,11 @@ namespace err { template struct fixed_string { constexpr fixed_string(const char (&str)[N]) { std::copy_n(str, N, value); } - inline const char *c_str() const noexcept { return value; } + const char *c_str() const noexcept { return value; } - constexpr inline std::size_t size() const noexcept { return N; } + constexpr std::size_t size() const noexcept { return N; } - inline std::string_view str() const noexcept { return value; } + std::string_view str() const noexcept { return value; } char value[N]; }; diff --git a/src/logger/logger.h b/src/logger/logger.h index fe9570fd..90985886 100644 --- a/src/logger/logger.h +++ b/src/logger/logger.h @@ -20,7 +20,7 @@ inline void log(std::string_view severity, std::string_view source, Args &&...ar std::cout << glz::write_json(obj) << std::endl; } -void critical(std::string_view source, auto &&...args) { +inline void critical(std::string_view source, auto &&...args) { log("critical", source, std::forward(args)...); std::exit(EXIT_FAILURE); } From 80e69c12bb8a16bd190f55a8d10e231ceab3f606 Mon Sep 17 00:00:00 2001 From: Uditha Atukorala Date: Fri, 28 Jul 2023 17:15:44 +0100 Subject: [PATCH 3/3] (datastore) tidy-up includes --- src/datastore/pg.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/datastore/pg.h b/src/datastore/pg.h index 5eaba2ac..8a5b61bc 100644 --- a/src/datastore/pg.h +++ b/src/datastore/pg.h @@ -1,5 +1,8 @@ #pragma once +#include +#include + #include #include "config.h"