From fc6d49295e929893685f38c455cfa1d8db5cadf2 Mon Sep 17 00:00:00 2001 From: Tiago Mota Date: Wed, 5 Jul 2023 16:43:15 +0100 Subject: [PATCH] Make usage of journal metadata optional through configuration --- core/src/main/resources/reference.conf | 45 +++++++++++++- .../config/AkkaPersistenceConfig.scala | 8 ++- .../journal/dao/BaseByteArrayJournalDao.scala | 27 ++++++--- .../postgres/journal/dao/FlatJournalDao.scala | 4 +- .../journal/dao/JournalMetadataQueries.scala | 19 ++++++ .../postgres/journal/dao/JournalQueries.scala | 14 +---- .../dao/NestedPartitionsJournalDao.scala | 4 +- .../journal/dao/PartitionedJournalDao.scala | 32 +++++----- ...cala => BaseByteArrayReadJournalDao.scala} | 47 ++------------- .../query/dao/FlatReadJournalDao.scala | 24 ++++++++ .../query/dao/PartitionedReadJournalDao.scala | 58 +++++++++++++++++++ .../dao/ReadJournalMetadataQueries.scala | 15 +++++ .../query/dao/ReadJournalQueries.scala | 8 --- ...application-with-use-journal-metadata.conf | 2 + ...application-with-use-journal-metadata.conf | 2 + ...application-with-use-journal-metadata.conf | 4 ++ .../journal/PostgresJournalPerfSpec.scala | 15 ++++- .../journal/PostgresJournalSpec.scala | 37 +++++++----- .../dao/JournalMetadataQueriesTest.scala | 22 +++++++ .../journal/dao/JournalQueriesTest.scala | 15 +---- .../CurrentEventsByTagWithGapsTest.scala | 4 +- .../query/JournalSequenceActorTest.scala | 9 +-- .../dao/ReadJournalMetadataQueriesTest.scala | 17 ++++++ .../query/dao/ReadJournalQueriesTest.scala | 5 -- docs/custom-dao.md | 2 +- 25 files changed, 295 insertions(+), 144 deletions(-) create mode 100644 core/src/main/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueries.scala rename core/src/main/scala/akka/persistence/postgres/query/dao/{ByteArrayReadJournalDao.scala => BaseByteArrayReadJournalDao.scala} (58%) create mode 100644 core/src/main/scala/akka/persistence/postgres/query/dao/FlatReadJournalDao.scala create mode 100644 core/src/main/scala/akka/persistence/postgres/query/dao/PartitionedReadJournalDao.scala create mode 100644 core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueries.scala create mode 100644 core/src/test/resources/nested-partitions-application-with-use-journal-metadata.conf create mode 100644 core/src/test/resources/partitioned-application-with-use-journal-metadata.conf create mode 100644 core/src/test/resources/plain-application-with-use-journal-metadata.conf create mode 100644 core/src/test/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueriesTest.scala create mode 100644 core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueriesTest.scala diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 191e68d0..1bb83259 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -126,7 +126,17 @@ postgres-journal { metadata = "metadata" } } - + # Used to hold journal information that can be used to speed up queries + journalMetadata { + tableName = "journal_metadata" + schemaName = "" + columnNames = { + persistenceId = "persistence_id" + maxSequenceNumber = "max_sequence_number" + maxOrdering = "max_ordering" + minOrdering = "min_ordering" + } + } tags { tableName = "tags" schameName = "" @@ -176,6 +186,14 @@ postgres-journal { # to the same value for these other journals. use-shared-db = null + # This setting can be used to enable the usage of the data being stored + # at the journal_metadata table, in order to speed up some queries that would + # solely use the journal table. + # In case the metadata table does not hold the required information (not available yet), + # the logic fallback to the journal-only queries. + # This setting is disabled by default. + use-journal-metadata = false + slick { db { @@ -358,7 +376,18 @@ postgres-read-journal { # to the same value for these other journals. use-shared-db = null - dao = "akka.persistence.postgres.query.dao.ByteArrayReadJournalDao" + # This setting can be used to enable the usage of the data being stored + # at the journal_metadata table, in order to speed up some queries that would + # solely use the journal table. + # In case the metadata table does not hold the required information (not available yet), + # the logic fallback to the journal-only queries. + # This setting is disabled by default. + use-journal-metadata = false + + + # Replace with "akka.persistence.postgres.query.dao.PartitionedJournalDao" in order to leverage dedicated queries to + # partitioned journal. + dao = "akka.persistence.postgres.query.dao.FlatReadJournalDao" # Confguration for akka.persistence.postgres.tag.TagIdResolver tags { @@ -402,7 +431,17 @@ postgres-read-journal { message = "message" } } - + # Used to hold journal information that can be used to speed up queries + journalMetadata { + tableName = "journal_metadata" + schemaName = "" + columnNames = { + persistenceId = "persistence_id" + maxSequenceNumber = "max_sequence_number" + maxOrdering = "max_ordering" + minOrdering = "min_ordering" + } + } tags { tableName = "tags" schameName = "" diff --git a/core/src/main/scala/akka/persistence/postgres/config/AkkaPersistenceConfig.scala b/core/src/main/scala/akka/persistence/postgres/config/AkkaPersistenceConfig.scala index 6d471427..78fece43 100644 --- a/core/src/main/scala/akka/persistence/postgres/config/AkkaPersistenceConfig.scala +++ b/core/src/main/scala/akka/persistence/postgres/config/AkkaPersistenceConfig.scala @@ -12,6 +12,7 @@ import scala.concurrent.duration._ object ConfigKeys { val useSharedDb = "use-shared-db" + val useJournalMetadata = "use-journal-metadata" } class SlickConfiguration(config: Config) { @@ -148,8 +149,10 @@ class JournalConfig(config: Config) { val tagsConfig = new TagsConfig(config) val tagsTableConfiguration = new TagsTableConfiguration(config) val useSharedDb: Option[String] = config.asOptionalNonEmptyString(ConfigKeys.useSharedDb) + val useJournalMetadata: Boolean = config.asBoolean(ConfigKeys.useJournalMetadata, false) + override def toString: String = - s"JournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$tagsConfig,$partitionsConfig,$useSharedDb)" + s"JournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$tagsConfig,$partitionsConfig,$useSharedDb,$useJournalMetadata)" } class SnapshotConfig(config: Config) { @@ -186,7 +189,8 @@ class ReadJournalConfig(config: Config) { val maxBufferSize: Int = config.as[String]("max-buffer-size", "500").toInt val addShutdownHook: Boolean = config.asBoolean("add-shutdown-hook", true) val includeDeleted: Boolean = config.as[Boolean]("includeLogicallyDeleted", true) + val useJournalMetadata: Boolean = config.asBoolean(ConfigKeys.useJournalMetadata, false) override def toString: String = - s"ReadJournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted)" + s"ReadJournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted,$useJournalMetadata)" } diff --git a/core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala b/core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala index e432d5b4..27b76493 100644 --- a/core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala +++ b/core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala @@ -15,6 +15,7 @@ import akka.stream.scaladsl.{ Keep, Sink, Source } import akka.stream.{ Materializer, OverflowStrategy, QueueOfferResult } import akka.{ Done, NotUsed } import org.slf4j.{ Logger, LoggerFactory } +import slick.dbio.DBIOAction import slick.jdbc.JdbcBackend._ import scala.collection.immutable._ @@ -39,6 +40,9 @@ trait BaseByteArrayJournalDao extends JournalDaoWithUpdates with BaseJournalDaoW val logger: Logger = LoggerFactory.getLogger(this.getClass) + lazy val metadataQueries: JournalMetadataQueries = new JournalMetadataQueries( + JournalMetadataTable(journalConfig.journalMetadataTableConfiguration)) + // This logging may block since we don't control how the user will configure logback // We can't use a Akka logging neither because we don't have an ActorSystem in scope and // we should not introduce another dependency here. @@ -138,15 +142,20 @@ trait BaseByteArrayJournalDao extends JournalDaoWithUpdates with BaseJournalDaoW queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = { - db.run(queries.highestStoredSequenceNrForPersistenceId(persistenceId).result.headOption).flatMap { - case Some(maxSequenceNr) => - // journal_metadata has the max sequence nr stored - Future.successful(maxSequenceNr) - case None => - // journal_metadata has yet to store the max sequence number to this persistenceId - db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result) - .map(_.getOrElse(0L)) // Default to 0L when nothing is found for this persistenceId - } + val query = if (journalConfig.useJournalMetadata) { + metadataQueries.highestSequenceNrForPersistenceId(persistenceId).result.headOption.flatMap { + case Some(maxSequenceNr) => + // return the stored max sequence nr on journal metadata table + DBIOAction.successful(Some(maxSequenceNr)) + case None => + // journal metadata do not have information for this persistenceId -> fallback to standard behaviour + queries.highestSequenceNrForPersistenceId(persistenceId).result + } + } else + queries.highestSequenceNrForPersistenceId(persistenceId).result + + // Default to 0L when nothing is found for this persistenceId + db.run(query).map(_.getOrElse(0L)) } override def messages( diff --git a/core/src/main/scala/akka/persistence/postgres/journal/dao/FlatJournalDao.scala b/core/src/main/scala/akka/persistence/postgres/journal/dao/FlatJournalDao.scala index 502eda7b..12feb4c1 100644 --- a/core/src/main/scala/akka/persistence/postgres/journal/dao/FlatJournalDao.scala +++ b/core/src/main/scala/akka/persistence/postgres/journal/dao/FlatJournalDao.scala @@ -13,9 +13,7 @@ class FlatJournalDao(val db: Database, val journalConfig: JournalConfig, seriali implicit val ec: ExecutionContext, val mat: Materializer) extends BaseByteArrayJournalDao { - val queries = new JournalQueries( - FlatJournalTable(journalConfig.journalTableConfiguration), - JournalMetadataTable(journalConfig.journalMetadataTableConfiguration)) + val queries = new JournalQueries(FlatJournalTable(journalConfig.journalTableConfiguration)) val tagDao = new SimpleTagDao(db, journalConfig.tagsTableConfiguration) val eventTagConverter = new CachedTagIdResolver(tagDao, journalConfig.tagsConfig) val serializer = new ByteArrayJournalSerializer(serialization, eventTagConverter) diff --git a/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueries.scala b/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueries.scala new file mode 100644 index 00000000..9544c449 --- /dev/null +++ b/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueries.scala @@ -0,0 +1,19 @@ +package akka.persistence.postgres.journal.dao + +import slick.lifted.TableQuery + +class JournalMetadataQueries(journalMetadataTable: TableQuery[JournalMetadataTable]) { + import akka.persistence.postgres.db.ExtendedPostgresProfile.api._ + + private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Query[Rep[Long], Long, Seq] = { + journalMetadataTable.filter(_.persistenceId === persistenceId).map(_.maxSequenceNumber).take(1) + } + + val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _) + + private def _minAndMaxOrderingForPersistenceId( + persistenceId: Rep[String]): Query[(Rep[Long], Rep[Long]), (Long, Long), Seq] = + journalMetadataTable.filter(_.persistenceId === persistenceId).take(1).map(r => (r.minOrdering, r.maxOrdering)) + + val minAndMaxOrderingForPersistenceId = Compiled(_minAndMaxOrderingForPersistenceId _) +} diff --git a/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala b/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala index d98feaaa..a968eb70 100644 --- a/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala +++ b/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala @@ -10,7 +10,7 @@ import io.circe.Json import slick.lifted.TableQuery import slick.sql.FixedSqlAction -class JournalQueries(journalTable: TableQuery[JournalTable], journalMetadataTable: TableQuery[JournalMetadataTable]) { +class JournalQueries(journalTable: TableQuery[JournalTable]) { import akka.persistence.postgres.db.ExtendedPostgresProfile.api._ @@ -51,17 +51,11 @@ class JournalQueries(journalTable: TableQuery[JournalTable], journalMetadataTabl private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] = journalTable.filter(_.persistenceId === persistenceId).map(_.sequenceNumber).max - private def _highestStoredSequenceNrForPersistenceId(persistenceId: Rep[String]): Query[Rep[Long], Long, Seq] = { - journalMetadataTable.filter(_.persistenceId === persistenceId).map(_.maxSequenceNumber).take(1) - } - private def _highestMarkedSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] = journalTable.filter(_.deleted === true).filter(_.persistenceId === persistenceId).map(_.sequenceNumber).max val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _) - val highestStoredSequenceNrForPersistenceId = Compiled(_highestStoredSequenceNrForPersistenceId _) - val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _) private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) = @@ -74,12 +68,6 @@ class JournalQueries(journalTable: TableQuery[JournalTable], journalMetadataTabl val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct) - private def _minAndMaxOrderingStoredForPersistenceId( - persistenceId: Rep[String]): Query[(Rep[Long], Rep[Long]), (Long, Long), Seq] = - journalMetadataTable.filter(_.persistenceId === persistenceId).take(1).map(r => (r.minOrdering, r.maxOrdering)) - - val minAndMaxOrderingStoredForPersistenceId = Compiled(_minAndMaxOrderingStoredForPersistenceId _) - private def _messagesQuery( persistenceId: Rep[String], fromSequenceNr: Rep[Long], diff --git a/core/src/main/scala/akka/persistence/postgres/journal/dao/NestedPartitionsJournalDao.scala b/core/src/main/scala/akka/persistence/postgres/journal/dao/NestedPartitionsJournalDao.scala index bea9f471..30c34d66 100644 --- a/core/src/main/scala/akka/persistence/postgres/journal/dao/NestedPartitionsJournalDao.scala +++ b/core/src/main/scala/akka/persistence/postgres/journal/dao/NestedPartitionsJournalDao.scala @@ -17,9 +17,7 @@ class NestedPartitionsJournalDao(db: Database, journalConfig: JournalConfig, ser implicit ec: ExecutionContext, mat: Materializer) extends FlatJournalDao(db, journalConfig, serialization) { - override val queries = new JournalQueries( - NestedPartitionsJournalTable(journalConfig.journalTableConfiguration), - JournalMetadataTable(journalConfig.journalMetadataTableConfiguration)) + override val queries = new JournalQueries(NestedPartitionsJournalTable(journalConfig.journalTableConfiguration)) private val journalTableCfg = journalConfig.journalTableConfiguration private val partitionSize = journalConfig.partitionsConfig.size private val partitionPrefix = journalConfig.partitionsConfig.prefix diff --git a/core/src/main/scala/akka/persistence/postgres/journal/dao/PartitionedJournalDao.scala b/core/src/main/scala/akka/persistence/postgres/journal/dao/PartitionedJournalDao.scala index 8c7faa61..0016c24c 100644 --- a/core/src/main/scala/akka/persistence/postgres/journal/dao/PartitionedJournalDao.scala +++ b/core/src/main/scala/akka/persistence/postgres/journal/dao/PartitionedJournalDao.scala @@ -19,9 +19,7 @@ class PartitionedJournalDao(db: Database, journalConfig: JournalConfig, serializ implicit ec: ExecutionContext, mat: Materializer) extends FlatJournalDao(db, journalConfig, serialization) { - override val queries = new JournalQueries( - PartitionedJournalTable(journalConfig.journalTableConfiguration), - JournalMetadataTable(journalConfig.journalMetadataTableConfiguration)) + override val queries = new JournalQueries(PartitionedJournalTable(journalConfig.journalTableConfiguration)) private val journalTableCfg = journalConfig.journalTableConfiguration private val partitionSize = journalConfig.partitionsConfig.size private val partitionPrefix = journalConfig.partitionsConfig.prefix @@ -96,17 +94,23 @@ class PartitionedJournalDao(db: Database, journalConfig: JournalConfig, serializ fromSequenceNr: Long, toSequenceNr: Long, max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = { - // Query the metadata table to get the known min and max ordering a persistence_id has, - // so that the postgres query planner might immediately discard scanning unnecessary partitions - val messagesQuery = queries.minAndMaxOrderingStoredForPersistenceId(persistenceId).result.headOption.flatMap { - case Some((minOrdering, maxOrdering)) => - queries - .messagesOrderingBoundedQuery(persistenceId, fromSequenceNr, toSequenceNr, max, minOrdering, maxOrdering) - .result - case None => - queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result - } - Source.fromPublisher(db.stream(messagesQuery)).via(serializer.deserializeFlow) + // This behaviour override is only applied here, because it is only useful on the PartitionedJournal strategy. + val query = if (journalConfig.useJournalMetadata) { + metadataQueries.minAndMaxOrderingForPersistenceId(persistenceId).result.headOption.flatMap { + case Some((minOrdering, maxOrdering)) => + // if journal_metadata knows the min and max ordering of a persistenceId, + // use them to help the query planner to avoid scanning unnecessary partitions. + queries + .messagesOrderingBoundedQuery(persistenceId, fromSequenceNr, toSequenceNr, max, minOrdering, maxOrdering) + .result + case None => + // fallback to standard behaviour + queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result + } + } else + queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result + + Source.fromPublisher(db.stream(query)).via(serializer.deserializeFlow) } } diff --git a/core/src/main/scala/akka/persistence/postgres/query/dao/ByteArrayReadJournalDao.scala b/core/src/main/scala/akka/persistence/postgres/query/dao/BaseByteArrayReadJournalDao.scala similarity index 58% rename from core/src/main/scala/akka/persistence/postgres/query/dao/ByteArrayReadJournalDao.scala rename to core/src/main/scala/akka/persistence/postgres/query/dao/BaseByteArrayReadJournalDao.scala index 24d80484..0ee55c17 100644 --- a/core/src/main/scala/akka/persistence/postgres/query/dao/ByteArrayReadJournalDao.scala +++ b/core/src/main/scala/akka/persistence/postgres/query/dao/BaseByteArrayReadJournalDao.scala @@ -9,7 +9,11 @@ package query.dao import akka.NotUsed import akka.persistence.PersistentRepr import akka.persistence.postgres.config.ReadJournalConfig -import akka.persistence.postgres.journal.dao.{ BaseJournalDaoWithReadMessages, ByteArrayJournalSerializer } +import akka.persistence.postgres.journal.dao.{ + BaseJournalDaoWithReadMessages, + ByteArrayJournalSerializer, + JournalMetadataTable +} import akka.persistence.postgres.serialization.FlowPersistentReprSerializer import akka.persistence.postgres.tag.{ CachedTagIdResolver, SimpleTagDao, TagIdResolver } import akka.serialization.Serialization @@ -63,44 +67,3 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith db.run(queries.maxOrdering.result) } } - -class ByteArrayReadJournalDao( - val db: Database, - val readJournalConfig: ReadJournalConfig, - serialization: Serialization, - val tagIdResolver: TagIdResolver)(implicit val ec: ExecutionContext, val mat: Materializer) - extends BaseByteArrayReadJournalDao { - val queries = new ReadJournalQueries(readJournalConfig) - val serializer = new ByteArrayJournalSerializer( - serialization, - new CachedTagIdResolver( - new SimpleTagDao(db, readJournalConfig.tagsTableConfiguration), - readJournalConfig.tagsConfig)) -} - -class PartitionedReadJournalDao( - db: Database, - readJournalConfig: ReadJournalConfig, - serialization: Serialization, - tagIdResolver: TagIdResolver)(implicit ec: ExecutionContext, mat: Materializer) - extends ByteArrayReadJournalDao(db, readJournalConfig, serialization, tagIdResolver) { - - import akka.persistence.postgres.db.ExtendedPostgresProfile.api._ - - override def messages( - persistenceId: String, - fromSequenceNr: Long, - toSequenceNr: Long, - max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = { - val messagesQuery = queries.minAndMaxOrderingStoredForPersistenceId(persistenceId).result.headOption.flatMap { - case Some((minOrdering, maxOrdering)) => - queries - .messagesOrderingBoundedQuery(persistenceId, fromSequenceNr, toSequenceNr, max, minOrdering, maxOrdering) - .result - case None => - queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result - } - - Source.fromPublisher(db.stream(messagesQuery)).via(serializer.deserializeFlow) - } -} diff --git a/core/src/main/scala/akka/persistence/postgres/query/dao/FlatReadJournalDao.scala b/core/src/main/scala/akka/persistence/postgres/query/dao/FlatReadJournalDao.scala new file mode 100644 index 00000000..3ed462af --- /dev/null +++ b/core/src/main/scala/akka/persistence/postgres/query/dao/FlatReadJournalDao.scala @@ -0,0 +1,24 @@ +package akka.persistence.postgres.query.dao + +import akka.persistence.postgres.config.ReadJournalConfig +import akka.persistence.postgres.journal.dao.ByteArrayJournalSerializer +import akka.persistence.postgres.tag.{ CachedTagIdResolver, SimpleTagDao, TagIdResolver } +import akka.serialization.Serialization +import akka.stream.Materializer +import slick.jdbc.JdbcBackend.Database + +import scala.concurrent.ExecutionContext + +class FlatReadJournalDao( + val db: Database, + val readJournalConfig: ReadJournalConfig, + serialization: Serialization, + val tagIdResolver: TagIdResolver)(implicit val ec: ExecutionContext, val mat: Materializer) + extends BaseByteArrayReadJournalDao { + val queries = new ReadJournalQueries(readJournalConfig) + val serializer = new ByteArrayJournalSerializer( + serialization, + new CachedTagIdResolver( + new SimpleTagDao(db, readJournalConfig.tagsTableConfiguration), + readJournalConfig.tagsConfig)) +} diff --git a/core/src/main/scala/akka/persistence/postgres/query/dao/PartitionedReadJournalDao.scala b/core/src/main/scala/akka/persistence/postgres/query/dao/PartitionedReadJournalDao.scala new file mode 100644 index 00000000..2cb90e98 --- /dev/null +++ b/core/src/main/scala/akka/persistence/postgres/query/dao/PartitionedReadJournalDao.scala @@ -0,0 +1,58 @@ +package akka.persistence.postgres.query.dao + +import akka.NotUsed +import akka.persistence.PersistentRepr +import akka.persistence.postgres.config.ReadJournalConfig +import akka.persistence.postgres.journal.dao.{ ByteArrayJournalSerializer, JournalMetadataTable } +import akka.persistence.postgres.tag.{ CachedTagIdResolver, SimpleTagDao, TagIdResolver } +import akka.serialization.Serialization +import akka.stream.Materializer +import akka.stream.scaladsl.Source +import slick.jdbc.JdbcBackend.Database + +import scala.concurrent.ExecutionContext +import scala.util.Try + +class PartitionedReadJournalDao( + val db: Database, + val readJournalConfig: ReadJournalConfig, + serialization: Serialization, + val tagIdResolver: TagIdResolver)(implicit val ec: ExecutionContext, val mat: Materializer) + extends BaseByteArrayReadJournalDao { + + import akka.persistence.postgres.db.ExtendedPostgresProfile.api._ + + val queries = new ReadJournalQueries(readJournalConfig) + private val metadataQueries: ReadJournalMetadataQueries = new ReadJournalMetadataQueries( + JournalMetadataTable(readJournalConfig.journalMetadataTableConfiguration)) + + val serializer = new ByteArrayJournalSerializer( + serialization, + new CachedTagIdResolver( + new SimpleTagDao(db, readJournalConfig.tagsTableConfiguration), + readJournalConfig.tagsConfig)) + + override def messages( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long, + max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = { + // This behaviour override is only applied here, because it is only useful on the PartitionedJournal strategy. + val query = if (readJournalConfig.useJournalMetadata) { + metadataQueries.minAndMaxOrderingForPersistenceId(persistenceId).result.headOption.flatMap { + case Some((minOrdering, maxOrdering)) => + // if journal_metadata knows the min and max ordering of a persistenceId, + // use them to help the query planner to avoid scanning unnecessary partitions. + queries + .messagesOrderingBoundedQuery(persistenceId, fromSequenceNr, toSequenceNr, max, minOrdering, maxOrdering) + .result + case None => + // fallback to standard behaviour + queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result + } + } else + queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result + + Source.fromPublisher(db.stream(query)).via(serializer.deserializeFlow) + } +} diff --git a/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueries.scala b/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueries.scala new file mode 100644 index 00000000..eceea435 --- /dev/null +++ b/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueries.scala @@ -0,0 +1,15 @@ +package akka.persistence.postgres.query.dao + +import akka.persistence.postgres.journal.dao.JournalMetadataTable +import slick.lifted.TableQuery + +class ReadJournalMetadataQueries(journalMetadataTable: TableQuery[JournalMetadataTable]) { + + import akka.persistence.postgres.db.ExtendedPostgresProfile.api._ + + private def _minAndMaxOrderingForPersistenceId( + persistenceId: Rep[String]): Query[(Rep[Long], Rep[Long]), (Long, Long), Seq] = + journalMetadataTable.filter(_.persistenceId === persistenceId).take(1).map(r => (r.minOrdering, r.maxOrdering)) + + val minAndMaxOrderingForPersistenceId = Compiled(_minAndMaxOrderingForPersistenceId _) +} diff --git a/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalQueries.scala b/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalQueries.scala index 1c842575..ffe045c3 100644 --- a/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalQueries.scala +++ b/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalQueries.scala @@ -13,8 +13,6 @@ class ReadJournalQueries(val readJournalConfig: ReadJournalConfig) { import akka.persistence.postgres.db.ExtendedPostgresProfile.api._ private val journalTable: TableQuery[JournalTable] = FlatJournalTable(readJournalConfig.journalTableConfiguration) - private val journalMetadataTable: TableQuery[JournalMetadataTable] = - JournalMetadataTable.apply(readJournalConfig.journalMetadataTableConfiguration) private def _allPersistenceIdsDistinct(max: ConstColumn[Long]): Query[Rep[String], String, Seq] = baseTableQuery().map(_.persistenceId).distinct.take(max) @@ -25,12 +23,6 @@ class ReadJournalQueries(val readJournalConfig: ReadJournalConfig) { val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct _) - private def _minAndMaxOrderingStoredForPersistenceId( - persistenceId: Rep[String]): Query[(Rep[Long], Rep[Long]), (Long, Long), Seq] = - journalMetadataTable.filter(_.persistenceId === persistenceId).take(1).map(r => (r.minOrdering, r.maxOrdering)) - - val minAndMaxOrderingStoredForPersistenceId = Compiled(_minAndMaxOrderingStoredForPersistenceId _) - private def _messagesQuery( persistenceId: Rep[String], fromSequenceNr: Rep[Long], diff --git a/core/src/test/resources/nested-partitions-application-with-use-journal-metadata.conf b/core/src/test/resources/nested-partitions-application-with-use-journal-metadata.conf new file mode 100644 index 00000000..8ca28304 --- /dev/null +++ b/core/src/test/resources/nested-partitions-application-with-use-journal-metadata.conf @@ -0,0 +1,2 @@ +include "plain-application-with-use-journal-metadata.conf" +include "nested-partitions-journal.conf" diff --git a/core/src/test/resources/partitioned-application-with-use-journal-metadata.conf b/core/src/test/resources/partitioned-application-with-use-journal-metadata.conf new file mode 100644 index 00000000..ee77c852 --- /dev/null +++ b/core/src/test/resources/partitioned-application-with-use-journal-metadata.conf @@ -0,0 +1,2 @@ +include "plain-application-with-use-journal-metadata.conf" +include "partitioned-journal.conf" diff --git a/core/src/test/resources/plain-application-with-use-journal-metadata.conf b/core/src/test/resources/plain-application-with-use-journal-metadata.conf new file mode 100644 index 00000000..fd9b103e --- /dev/null +++ b/core/src/test/resources/plain-application-with-use-journal-metadata.conf @@ -0,0 +1,4 @@ +include "general.conf" +include "plain-application.conf" + +postgres-journal.use-journal-metadata = true diff --git a/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalPerfSpec.scala b/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalPerfSpec.scala index 4a053391..c7d8d2c7 100644 --- a/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalPerfSpec.scala +++ b/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalPerfSpec.scala @@ -8,15 +8,15 @@ package akka.persistence.postgres.journal import akka.actor.Props import akka.persistence.CapabilityFlag import akka.persistence.journal.JournalPerfSpec -import akka.persistence.journal.JournalPerfSpec.{BenchActor, Cmd, ResetCounter} +import akka.persistence.journal.JournalPerfSpec.{ BenchActor, Cmd, ResetCounter } import akka.persistence.postgres.config._ import akka.persistence.postgres.db.SlickExtension import akka.persistence.postgres.util.Schema._ -import akka.persistence.postgres.util.{ClasspathResources, DropCreate} +import akka.persistence.postgres.util.{ ClasspathResources, DropCreate } import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.ScalaFutures -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ @@ -114,6 +114,9 @@ class NestedPartitionsJournalPerfSpecSharedDb class NestedPartitionsJournalPerfSpecPhysicalDelete extends PostgresJournalPerfSpec("nested-partitions-application-with-hard-delete.conf", NestedPartitions) +class NestedPartitionsJournalPerfSpecUseJournalMetadata + extends PostgresJournalPerfSpec("nested-partitions-application-with-use-journal-metadata.conf", NestedPartitions) + class PartitionedJournalPerfSpec extends PostgresJournalPerfSpec("partitioned-application.conf", Partitioned) class PartitionedJournalPerfSpecSharedDb @@ -122,9 +125,15 @@ class PartitionedJournalPerfSpecSharedDb class PartitionedJournalPerfSpecPhysicalDelete extends PostgresJournalPerfSpec("partitioned-application-with-hard-delete.conf", Partitioned) +class PartitionedJournalPerfSpecUseJournalMetadata + extends PostgresJournalPerfSpec("partitioned-application-with-use-journal-metadata.conf", Partitioned) + class PlainJournalPerfSpec extends PostgresJournalPerfSpec("plain-application.conf", Plain) class PlainJournalPerfSpecSharedDb extends PostgresJournalPerfSpec("plain-shared-db-application.conf", Plain) class PlainJournalPerfSpecPhysicalDelete extends PostgresJournalPerfSpec("plain-application-with-hard-delete.conf", Plain) + +class PlainJournalPerfSpecUseJournalMetadata + extends PostgresJournalPerfSpec("plain-application-with-use-journal-metadata.conf", Plain) diff --git a/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalSpec.scala b/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalSpec.scala index fb584a0c..6c881c75 100644 --- a/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalSpec.scala +++ b/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalSpec.scala @@ -6,24 +6,24 @@ package akka.persistence.postgres.journal import akka.actor.Actor -import akka.persistence.JournalProtocol.{ReplayedMessage, WriteMessages, WriteMessagesFailed} +import akka.persistence.JournalProtocol.{ ReplayedMessage, WriteMessages, WriteMessagesFailed } import akka.persistence.journal.JournalSpec import akka.persistence.postgres.config._ import akka.persistence.postgres.db.SlickExtension import akka.persistence.postgres.query.ScalaPostgresReadJournalOperations import akka.persistence.postgres.util.Schema._ -import akka.persistence.postgres.util.{ClasspathResources, DropCreate} +import akka.persistence.postgres.util.{ ClasspathResources, DropCreate } import akka.persistence.query.Sequence -import akka.persistence.{AtomicWrite, CapabilityFlag, PersistentImpl, PersistentRepr} +import akka.persistence.{ AtomicWrite, CapabilityFlag, PersistentImpl, PersistentRepr } import akka.testkit.TestProbe -import com.typesafe.config.{Config, ConfigFactory} +import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.ScalaFutures -import org.scalatest.time.{Minute, Span} -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.time.{ Minute, Span } +import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.{ ExecutionContext, Future } abstract class PostgresJournalSpec(config: String, schemaType: SchemaType) extends JournalSpec(ConfigFactory.load(config)) @@ -65,13 +65,13 @@ abstract class PostgresJournalSpec(config: String, schemaType: SchemaType) writeMessages(1, repeatedSnr + 1, perId, sender.ref, writerUuid) // then - val msg = AtomicWrite(PersistentRepr( - payload = s"a-$repeatedSnr", - sequenceNr = repeatedSnr, - persistenceId = pid, - sender = sender.ref, - writerUuid = writerUuid - )) + val msg = AtomicWrite( + PersistentRepr( + payload = s"a-$repeatedSnr", + sequenceNr = repeatedSnr, + persistenceId = pid, + sender = sender.ref, + writerUuid = writerUuid)) val probe = TestProbe() journal ! WriteMessages(Seq(msg), probe.ref, actorInstanceId) @@ -137,6 +137,9 @@ class NestedPartitionsJournalSpecSharedDb class NestedPartitionsJournalSpecPhysicalDelete extends PostgresJournalSpec("nested-partitions-application-with-hard-delete.conf", NestedPartitions) +class NestedPartitionsJournalSpecUseJournalMetadata + extends PostgresJournalSpec("nested-partitions-application-with-use-journal-metadata.conf", NestedPartitions) + class PartitionedJournalSpec extends PostgresJournalSpec("partitioned-application.conf", Partitioned) with PartitionedJournalSpecTestCases @@ -147,6 +150,12 @@ class PartitionedJournalSpecPhysicalDelete extends PostgresJournalSpec("partitioned-application-with-hard-delete.conf", Partitioned) with PartitionedJournalSpecTestCases +class PartitionedJournalSpecUseJournalMetadata + extends PostgresJournalSpec("partitioned-application-with-use-journal-metadata.conf", Partitioned) + with PartitionedJournalSpecTestCases + class PlainJournalSpec extends PostgresJournalSpec("plain-application.conf", Plain) class PlainJournalSpecSharedDb extends PostgresJournalSpec("plain-shared-db-application.conf", Plain) class PlainJournalSpecPhysicalDelete extends PostgresJournalSpec("plain-application-with-hard-delete.conf", Plain) +class PlainJournalSpecUseJournalMetadata + extends PostgresJournalSpec("plain-application-with-use-journal-metadata.conf", Plain) diff --git a/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueriesTest.scala b/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueriesTest.scala new file mode 100644 index 00000000..5c0c9378 --- /dev/null +++ b/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueriesTest.scala @@ -0,0 +1,22 @@ +package akka.persistence.postgres.journal.dao + +import akka.persistence.postgres.util.BaseQueryTest + +class JournalMetadataQueriesTest extends BaseQueryTest { + + it should "create SQL query for highestSequenceNrForPersistenceId" in withJournalMetadataQueries { queries => + queries.highestSequenceNrForPersistenceId( + "aaa") shouldBeSQL """select "max_sequence_number" from "journal_metadata" where "persistence_id" = ? limit 1""" + } + + it should "create SQL query for minAndMaxOrderingForPersistenceId" in withJournalMetadataQueries { queries => + queries.minAndMaxOrderingForPersistenceId( + "aaa") shouldBeSQL """select "min_ordering", "max_ordering" from "journal_metadata" where "persistence_id" = ? limit 1""" + } + + private def withJournalMetadataQueries(f: JournalMetadataQueries => Unit): Unit = { + withActorSystem { implicit system => + f(new JournalMetadataQueries(JournalMetadataTable(journalConfig.journalMetadataTableConfiguration))) + } + } +} diff --git a/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalQueriesTest.scala b/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalQueriesTest.scala index 682e89ea..abd25ccc 100644 --- a/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalQueriesTest.scala +++ b/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalQueriesTest.scala @@ -20,22 +20,12 @@ class JournalQueriesTest extends BaseQueryTest { "aaa") shouldBeSQL """select max("sequence_number") from "journal" where "persistence_id" = ?""" } - it should "create SQL query for highestStoredSequenceNrForPersistenceId" in withJournalQueries { queries => - queries.highestStoredSequenceNrForPersistenceId( - "aaa") shouldBeSQL """select "max_sequence_number" from "journal_metadata" where "persistence_id" = ? limit 1""" - } - it should "create SQL query for selectByPersistenceIdAndMaxSequenceNumber" in withJournalQueries { queries => queries.selectByPersistenceIdAndMaxSequenceNumber( "aaa", 11L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags", "metadata" from "journal" where ("persistence_id" = ?) and ("sequence_number" <= ?) order by "sequence_number" desc""" } - it should "create SQL query for minAndMaxOrderingStoredForPersistenceId" in withJournalQueries { queries => - queries.minAndMaxOrderingStoredForPersistenceId( - "aaa") shouldBeSQL """select "min_ordering", "max_ordering" from "journal_metadata" where "persistence_id" = ? limit 1""" - } - it should "create SQL query for messagesQuery" in withJournalQueries { queries => queries.messagesQuery( "aaa", @@ -84,10 +74,7 @@ class JournalQueriesTest extends BaseQueryTest { private def withJournalQueries(f: JournalQueries => Unit): Unit = { withActorSystem { implicit system => - f( - new JournalQueries( - FlatJournalTable.apply(journalConfig.journalTableConfiguration), - JournalMetadataTable.apply(journalConfig.journalMetadataTableConfiguration))) + f(new JournalQueries(FlatJournalTable.apply(journalConfig.journalTableConfiguration))) } } } diff --git a/core/src/test/scala/akka/persistence/postgres/query/CurrentEventsByTagWithGapsTest.scala b/core/src/test/scala/akka/persistence/postgres/query/CurrentEventsByTagWithGapsTest.scala index 19f9869e..539c36b5 100644 --- a/core/src/test/scala/akka/persistence/postgres/query/CurrentEventsByTagWithGapsTest.scala +++ b/core/src/test/scala/akka/persistence/postgres/query/CurrentEventsByTagWithGapsTest.scala @@ -46,9 +46,7 @@ class CurrentEventsByTagWithGapsTest }.futureValue val journalTable = schemaType.table(journalConfig.journalTableConfiguration) - val journalMetadataTable = - schemaType.metadataTable(journalConfig.journalMetadataTableConfiguration) - val journalQueries = new JournalQueries(journalTable, journalMetadataTable) + val journalQueries = new JournalQueries(journalTable) val journalOps = new JavaDslPostgresReadJournalOperations(system) val tag = "testTag" diff --git a/core/src/test/scala/akka/persistence/postgres/query/JournalSequenceActorTest.scala b/core/src/test/scala/akka/persistence/postgres/query/JournalSequenceActorTest.scala index 96d4da5c..201813d5 100644 --- a/core/src/test/scala/akka/persistence/postgres/query/JournalSequenceActorTest.scala +++ b/core/src/test/scala/akka/persistence/postgres/query/JournalSequenceActorTest.scala @@ -11,11 +11,7 @@ import akka.pattern.ask import akka.persistence.postgres.config.JournalSequenceRetrievalConfig import akka.persistence.postgres.db.ExtendedPostgresProfile import akka.persistence.postgres.query.JournalSequenceActor.{ GetMaxOrderingId, MaxOrderingId } -import akka.persistence.postgres.query.dao.{ - ByteArrayReadJournalDao, - PartitionedReadJournalDao, - TestProbeReadJournalDao -} +import akka.persistence.postgres.query.dao.{ FlatReadJournalDao, PartitionedReadJournalDao, TestProbeReadJournalDao } import akka.persistence.postgres.tag.{ CachedTagIdResolver, SimpleTagDao } import akka.persistence.postgres.util.Schema.{ NestedPartitions, Partitioned, Plain, SchemaType } import akka.persistence.postgres.{ JournalRow, SharedActorSystemTestSpec } @@ -31,7 +27,6 @@ import slick.jdbc.{ JdbcBackend, JdbcCapabilities } import scala.concurrent.Future import scala.concurrent.duration._ -import scala.util.Random abstract class JournalSequenceActorTest(val schemaType: SchemaType) extends QueryTestSpec(schemaType.configName) { private val log = LoggerFactory.getLogger(classOf[JournalSequenceActorTest]) @@ -181,7 +176,7 @@ abstract class JournalSequenceActorTest(val schemaType: SchemaType) extends Quer import system.dispatcher implicit val mat: Materializer = SystemMaterializer(system).materializer val readJournalDao = - new ByteArrayReadJournalDao( + new FlatReadJournalDao( db, readJournalConfig, SerializationExtension(system), diff --git a/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueriesTest.scala b/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueriesTest.scala new file mode 100644 index 00000000..bef12342 --- /dev/null +++ b/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueriesTest.scala @@ -0,0 +1,17 @@ +package akka.persistence.postgres.query.dao + +import akka.persistence.postgres.journal.dao.JournalMetadataTable +import akka.persistence.postgres.util.BaseQueryTest + +class ReadJournalMetadataQueriesTest extends BaseQueryTest { + it should "create SQL query for minAndMaxOrderingForPersistenceId" in withReadJournalMetadataQueries { queries => + queries.minAndMaxOrderingForPersistenceId( + "aaa") shouldBeSQL """select "min_ordering", "max_ordering" from "journal_metadata" where "persistence_id" = ? limit 1""" + } + + private def withReadJournalMetadataQueries(f: ReadJournalMetadataQueries => Unit): Unit = { + withActorSystem { implicit system => + f(new ReadJournalMetadataQueries(JournalMetadataTable(readJournalConfig.journalMetadataTableConfiguration))) + } + } +} diff --git a/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalQueriesTest.scala b/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalQueriesTest.scala index 90a179cf..086623c6 100644 --- a/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalQueriesTest.scala +++ b/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalQueriesTest.scala @@ -8,11 +8,6 @@ class ReadJournalQueriesTest extends BaseQueryTest { queries.allPersistenceIdsDistinct(23L) shouldBeSQL """select distinct "persistence_id" from "journal" limit ?""" } - it should "create SQL query for minAndMaxOrderingStoredForPersistenceId" in withReadJournalQueries { queries => - queries.minAndMaxOrderingStoredForPersistenceId( - "aaa") shouldBeSQL """select "min_ordering", "max_ordering" from "journal_metadata" where "persistence_id" = ? limit 1""" - } - it should "create SQL query for messagesQuery" in withReadJournalQueries { queries => queries.messagesQuery( "p1", diff --git a/docs/custom-dao.md b/docs/custom-dao.md index 3a4d8c72..2f980a40 100644 --- a/docs/custom-dao.md +++ b/docs/custom-dao.md @@ -23,7 +23,7 @@ postgres-snapshot-store { } postgres-read-journal { - dao = "akka.persistence.postgres.query.dao.ByteArrayReadJournalDao" + dao = "akka.persistence.postgres.query.dao.FlatReadJournalDao" } ```