diff --git a/README.md b/README.md
index 436c5f4c..5612bd52 100644
--- a/README.md
+++ b/README.md
@@ -21,7 +21,7 @@ You can read more about DAOs and schema variants in [the official documentation]
To use `akka-persistence-postgres` in your SBT project, add the following to your `build.sbt`:
```scala
-libraryDependencies += "com.swissborg" %% "akka-persistence-postgres" % "0.5.0"
+libraryDependencies += "com.swissborg" %% "akka-persistence-postgres" % "0.6.0-RC1"
```
For a Maven project add:
@@ -29,7 +29,7 @@ For a Maven project add:
```xml
<dependency>
  <groupId>com.swissborg</groupId>
  <artifactId>akka-persistence-postgres_2.13</artifactId>
-  <version>0.5.0</version>
+  <version>0.6.0-RC1</version>
</dependency>
```
to your `pom.xml`.
@@ -113,62 +113,10 @@
Example partition names: `j_myActor_0`, `j_myActor_1`, `j_worker_0` etc.
Keep in mind that the default maximum length for a table name in Postgres is 63 bytes, so you should avoid any non-ascii characters in your `persistenceId`s and keep the `prefix` reasonably short.

> :warning: Once any of the partitioning settings under the `postgres-journal.tables.journal.partitions` branch is settled, you should never change it. Otherwise you might end up with PostgresExceptions caused by table name or range conflicts.

-## Migration
-
-### Migration from akka-persistence-jdbc 4.0.0
-It is possible to migrate existing journals from Akka Persistence JDBC 4.0.0.
-Since we decided to extract metadata from the serialized payload and store it in a separate column, it is not possible to migrate an existing journal and snapshot store using plain SQL scripts.
-
-#### How migration works
-Each journal event and snapshot has to be read and deserialized, its metadata and tags must be extracted, and then everything has to be stored in the new table.
-
-We provide an optional artifact, `akka-persistence-postgres-migration`, that brings to your project the necessary classes to automate the above process.
-
-**Important**: Our util classes neither drop nor update any old data. The original tables will still be there, but renamed with an `old_` prefix. It's up to you when to drop them.
-
-#### How to use plugin provided migrations
-##### Add akka-persistence-migration to your project
-Add the following to your `build.sbt`:
-```
-libraryDependencies += "com.swissborg" %% "akka-persistence-postgres-migration" % "0.5.0"
-```
-For a Maven project add:
-```xml
-<dependency>
-  <groupId>com.swissborg</groupId>
-  <artifactId>akka-persistence-postgres-migration_2.13</artifactId>
-  <version>0.5.0</version>
-</dependency>
-```
-to your `pom.xml`.
-
-##### Create and run migrations:
-```scala
-import akka.persistence.postgres.migration.journal.Jdbc4JournalMigration
-import akka.persistence.postgres.migration.snapshot.Jdbc4SnapshotStoreMigration
-
-for {
-  _ <- new Jdbc4JournalMigration(config).run()
-  _ <- new Jdbc4SnapshotStoreMigration(config).run()
-} yield ()
-```
-**Very important note**: The migration has to be finished before your application starts any persistent actors!
-
-It's your choice whether you want to trigger the migration manually or (recommended) leverage a database version control system of your choice (e.g. Flyway).
-
-#### Examples
-An example Flyway-based migration can be found in the demo app: https://github.com/mkubala/demo-akka-persistence-postgres/blob/master/src/main/scala/com/github/mkubala/FlywayMigrationExample.scala
-
-### Migration from akka-persistence-postgres 0.4.0 to 0.5.0
-New indices need to be created on each partition. To avoid locking production databases for too long, this should be done in 2 steps:
-1. manually create the indices CONCURRENTLY,
-2. deploy the new release with migration scripts.
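For orientation, step 1 boils down to DDL of the following shape (an illustrative sketch only: the index and partition names here are invented, and the real statements come from the linked script, parameterised by your journal configuration):

```sql
-- CONCURRENTLY builds the index without holding the long write lock that a
-- plain CREATE INDEX would take, so it can run against a live journal.
-- Repeat for every journal partition.
CREATE INDEX CONCURRENTLY IF NOT EXISTS j_myActor_0_idx
    ON public.j_myActor_0 (persistence_id, sequence_number);
```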
-
-#### Manually create indices CONCURRENTLY
-Execute the DDL statements produced by the [sample migration script](scripts/migration-0.5.0/partitioned/1-add-indices-manually.sql), adapting the top-level variables to match your journal configuration before executing.
+## Migration

-#### Deploy new release with migration scripts
-See the [sample flyway migration script](scripts/migration-0.5.0/partitioned/2-add-indices-flyway.sql) and adapt the top-level variables to match your journal configuration.
+Please see the migration documentation [here](https://swissborg.github.io/akka-persistence-postgres/migration).

## Contributing
We are also always looking for contributions and new ideas, so if you’d like to join the project, check out the [open issues](https://github.com/SwissBorg/akka-persistence-postgres/issues), or post your own suggestions!
diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index 191e68d0..1db61189 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -126,7 +126,17 @@ postgres-journal {
      metadata = "metadata"
    }
  }
-
+  # Used to hold journal information that can be used to speed up queries
+  journalMetadata {
+    tableName = "journal_metadata"
+    schemaName = ""
+    columnNames = {
+      persistenceId = "persistence_id"
+      maxSequenceNumber = "max_sequence_number"
+      maxOrdering = "max_ordering"
+      minOrdering = "min_ordering"
+    }
+  }
  tags {
    tableName = "tags"
    schemaName = ""
@@ -176,6 +186,14 @@ postgres-journal {
  # to the same value for these other journals.
  use-shared-db = null

+  # This setting can be used to enable the use of the data stored
+  # in the journal_metadata table, in order to speed up some queries that would
+  # otherwise rely solely on the journal table.
+  # In case the metadata table does not yet hold the required information,
+  # the logic falls back to the journal-only queries.
+  # This setting is disabled by default.
+  use-journal-metadata = false
+
  slick {

    db {
@@ -358,7 +376,18 @@ postgres-read-journal {
  # to the same value for these other journals.
  use-shared-db = null

-  dao = "akka.persistence.postgres.query.dao.ByteArrayReadJournalDao"
+  # This setting can be used to enable the use of the data stored
+  # in the journal_metadata table, in order to speed up some queries that would
+  # otherwise rely solely on the journal table.
+  # In case the metadata table does not yet hold the required information,
+  # the logic falls back to the journal-only queries.
+  # This setting is disabled by default.
+  use-journal-metadata = false
+
+  # Replace with "akka.persistence.postgres.query.dao.PartitionedReadJournalDao" in order to leverage dedicated queries to
+  # the partitioned journal.
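+  # For example, an application.conf override that opts in to metadata-backed
+  # queries with the partitioned read journal could look like this (an
+  # illustrative sketch; these are overrides, not the defaults set in this file):
+  #
+  #   postgres-read-journal {
+  #     use-journal-metadata = true
+  #     dao = "akka.persistence.postgres.query.dao.PartitionedReadJournalDao"
+  #   }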
+ dao = "akka.persistence.postgres.query.dao.FlatReadJournalDao" # Confguration for akka.persistence.postgres.tag.TagIdResolver tags { @@ -402,7 +431,17 @@ postgres-read-journal { message = "message" } } - + # Used to hold journal information that can be used to speed up queries + journalMetadata { + tableName = "journal_metadata" + schemaName = "" + columnNames = { + persistenceId = "persistence_id" + maxSequenceNumber = "max_sequence_number" + maxOrdering = "max_ordering" + minOrdering = "min_ordering" + } + } tags { tableName = "tags" schameName = "" diff --git a/core/src/main/scala/akka/persistence/postgres/config/AkkaPersistenceConfig.scala b/core/src/main/scala/akka/persistence/postgres/config/AkkaPersistenceConfig.scala index 7273886b..73e3c079 100644 --- a/core/src/main/scala/akka/persistence/postgres/config/AkkaPersistenceConfig.scala +++ b/core/src/main/scala/akka/persistence/postgres/config/AkkaPersistenceConfig.scala @@ -12,6 +12,7 @@ import scala.concurrent.duration._ object ConfigKeys { val useSharedDb = "use-shared-db" + val useJournalMetadata = "use-journal-metadata" } class SlickConfiguration(config: Config) { @@ -49,6 +50,26 @@ class JournalTableConfiguration(config: Config) { override def toString: String = s"JournalTableConfiguration($tableName,$schemaName,$columnNames)" } +class JournalMetadataTableColumnNames(config: Config) { + private val cfg = config.asConfig("tables.journalMetadata.columnNames") + val id: String = cfg.as[String]("id", "id") + val persistenceId: String = cfg.as[String]("persistenceId", "persistence_id") + val maxSequenceNumber: String = cfg.as[String]("maxSequenceNumber", "max_sequence_number") + val maxOrdering: String = cfg.as[String]("maxOrdering", "max_ordering") + val minOrdering: String = cfg.as[String]("minOrdering", "min_ordering") + + override def toString: String = + s"JournalMetadataTableColumnNames($id,$persistenceId,$maxSequenceNumber,$maxOrdering,$minOrdering)" +} + +class JournalMetadataTableConfiguration(config: Config) { + private val cfg = config.asConfig("tables.journalMetadata") + val tableName: String = cfg.as[String]("tableName", "journal_metadata") + val schemaName: Option[String] = cfg.as[String]("schemaName").trim + val columnNames: JournalMetadataTableColumnNames = new JournalMetadataTableColumnNames(config) + override def toString: String = s"JournalMetadataTableConfiguration($tableName,$schemaName,$columnNames)" +} + class SnapshotTableColumnNames(config: Config) { private val cfg = config.asConfig("tables.snapshot.columnNames") val persistenceId: String = cfg.as[String]("persistenceId", "persistence_id") @@ -86,7 +107,7 @@ class TagsTableConfiguration(config: Config) { } class JournalPluginConfig(config: Config) { - val dao: String = config.asString("dao", "akka.persistence.postgres.dao.bytea.journal.FlatJournalDao") + val dao: String = config.asString("dao", "akka.persistence.postgres.journal.dao.FlatJournalDao") override def toString: String = s"JournalPluginConfig($dao)" } @@ -101,12 +122,12 @@ class BaseByteArrayJournalDaoConfig(config: Config) { } class ReadJournalPluginConfig(config: Config) { - val dao: String = config.as[String]("dao", "akka.persistence.postgres.dao.bytea.readjournal.ByteArrayReadJournalDao") + val dao: String = config.as[String]("dao", "akka.persistence.postgres.query.dao.FlatReadJournalDao") override def toString: String = s"ReadJournalPluginConfig($dao)" } class SnapshotPluginConfig(config: Config) { - val dao: String = config.as[String]("dao", 
"akka.persistence.postgres.dao.bytea.snapshot.ByteArraySnapshotDao") + val dao: String = config.as[String]("dao", "akka.persistence.postgres.snapshot.dao.ByteArraySnapshotDao") override def toString: String = s"SnapshotPluginConfig($dao)" } @@ -122,13 +143,16 @@ class TagsConfig(config: Config) { class JournalConfig(config: Config) { val partitionsConfig = new JournalPartitionsConfiguration(config) val journalTableConfiguration = new JournalTableConfiguration(config) + val journalMetadataTableConfiguration = new JournalMetadataTableConfiguration(config) val pluginConfig = new JournalPluginConfig(config) val daoConfig = new BaseByteArrayJournalDaoConfig(config) val tagsConfig = new TagsConfig(config) val tagsTableConfiguration = new TagsTableConfiguration(config) val useSharedDb: Option[String] = config.asOptionalNonEmptyString(ConfigKeys.useSharedDb) + val useJournalMetadata: Boolean = config.asBoolean(ConfigKeys.useJournalMetadata, false) + override def toString: String = - s"JournalConfig($journalTableConfiguration,$pluginConfig,$tagsConfig,$partitionsConfig,$useSharedDb)" + s"JournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$tagsConfig,$partitionsConfig,$useSharedDb,$useJournalMetadata)" } class SnapshotConfig(config: Config) { @@ -156,6 +180,7 @@ case class JournalSequenceRetrievalConfig( class ReadJournalConfig(config: Config) { val journalTableConfiguration = new JournalTableConfiguration(config) + val journalMetadataTableConfiguration = new JournalMetadataTableConfiguration(config) val journalSequenceRetrievalConfiguration = JournalSequenceRetrievalConfig(config) val pluginConfig = new ReadJournalPluginConfig(config) val tagsConfig = new TagsConfig(config) @@ -164,7 +189,8 @@ class ReadJournalConfig(config: Config) { val maxBufferSize: Int = config.as[String]("max-buffer-size", "500").toInt val addShutdownHook: Boolean = config.asBoolean("add-shutdown-hook", true) val includeDeleted: Boolean = config.as[Boolean]("includeLogicallyDeleted", true) + val useJournalMetadata: Boolean = config.asBoolean(ConfigKeys.useJournalMetadata, false) override def toString: String = - s"ReadJournalConfig($journalTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted)" + s"ReadJournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted,$useJournalMetadata)" } diff --git a/core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala b/core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala index 41a61845..27b76493 100644 --- a/core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala +++ b/core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala @@ -15,6 +15,7 @@ import akka.stream.scaladsl.{ Keep, Sink, Source } import akka.stream.{ Materializer, OverflowStrategy, QueueOfferResult } import akka.{ Done, NotUsed } import org.slf4j.{ Logger, LoggerFactory } +import slick.dbio.DBIOAction import slick.jdbc.JdbcBackend._ import scala.collection.immutable._ @@ -39,6 +40,9 @@ trait BaseByteArrayJournalDao extends JournalDaoWithUpdates with BaseJournalDaoW val logger: Logger = LoggerFactory.getLogger(this.getClass) + lazy val metadataQueries: JournalMetadataQueries = new JournalMetadataQueries( + JournalMetadataTable(journalConfig.journalMetadataTableConfiguration)) + // This logging may block since we don't control 
how the user will configure logback
  // We can't use Akka logging either, because we don't have an ActorSystem in scope and
  // we should not introduce another dependency here.
@@ -137,10 +141,22 @@ trait BaseByteArrayJournalDao extends JournalDaoWithUpdates with BaseJournalDaoW
  private def highestMarkedSequenceNr(persistenceId: String) =
    queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result

-  override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
-    for {
-      maybeHighestSeqNo <- db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result)
-    } yield maybeHighestSeqNo.getOrElse(0L)
+  override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
+    val query = if (journalConfig.useJournalMetadata) {
+      metadataQueries.highestSequenceNrForPersistenceId(persistenceId).result.headOption.flatMap {
+        case Some(maxSequenceNr) =>
+          // return the max sequence nr stored in the journal metadata table
+          DBIOAction.successful(Some(maxSequenceNr))
+        case None =>
+          // journal metadata does not have information for this persistenceId -> fall back to the standard behaviour
+          queries.highestSequenceNrForPersistenceId(persistenceId).result
+      }
+    } else
+      queries.highestSequenceNrForPersistenceId(persistenceId).result
+
+    // Default to 0L when nothing is found for this persistenceId
+    db.run(query).map(_.getOrElse(0L))
+  }

  override def messages(
      persistenceId: String,
diff --git a/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueries.scala b/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueries.scala
new file mode 100644
index 00000000..9544c449
--- /dev/null
+++ b/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueries.scala
@@ -0,0 +1,19 @@
+package akka.persistence.postgres.journal.dao
+
+import slick.lifted.TableQuery
+
+class JournalMetadataQueries(journalMetadataTable: TableQuery[JournalMetadataTable]) {
+  import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
+
+  private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Query[Rep[Long], Long, Seq] = {
+    journalMetadataTable.filter(_.persistenceId === persistenceId).map(_.maxSequenceNumber).take(1)
+  }
+
+  val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)
+
+  private def _minAndMaxOrderingForPersistenceId(
+      persistenceId: Rep[String]): Query[(Rep[Long], Rep[Long]), (Long, Long), Seq] =
+    journalMetadataTable.filter(_.persistenceId === persistenceId).take(1).map(r => (r.minOrdering, r.maxOrdering))
+
+  val minAndMaxOrderingForPersistenceId = Compiled(_minAndMaxOrderingForPersistenceId _)
+}
diff --git a/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala b/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala
index 345127d7..9465020d 100644
--- a/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala
+++ b/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala
@@ -19,9 +19,6 @@ class JournalQueries(journalTable: TableQuery[JournalTable]) {
  def writeJournalRows(xs: Seq[JournalRow]): FixedSqlAction[Option[Int], NoStream, slick.dbio.Effect.Write] =
    compiledJournalTable ++= xs.sortBy(_.sequenceNumber)

-  private def selectAllJournalForPersistenceId(persistenceId: Rep[String]) =
-    journalTable.filter(_.persistenceId === persistenceId).sortBy(_.sequenceNumber.desc)
-
  def delete(persistenceId: String, toSequenceNr: Long): FixedSqlAction[Int,
NoStream, slick.dbio.Effect.Write] = { journalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber <= toSequenceNr).delete } @@ -58,16 +55,6 @@ class JournalQueries(journalTable: TableQuery[JournalTable]) { val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _) - private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) = - selectAllJournalForPersistenceId(persistenceId).filter(_.sequenceNumber <= maxSequenceNr) - - val selectByPersistenceIdAndMaxSequenceNumber = Compiled(_selectByPersistenceIdAndMaxSequenceNumber _) - - private def _allPersistenceIdsDistinct: Query[Rep[String], String, Seq] = - journalTable.map(_.persistenceId).distinct - - val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct) - private def _messagesQuery( persistenceId: Rep[String], fromSequenceNr: Rep[Long], @@ -81,6 +68,24 @@ class JournalQueries(journalTable: TableQuery[JournalTable]) { .sortBy(_.sequenceNumber.asc) .take(max) + private def _messagesOrderingBoundedQuery( + persistenceId: Rep[String], + fromSequenceNr: Rep[Long], + toSequenceNr: Rep[Long], + max: ConstColumn[Long], + minOrdering: Rep[Long], + maxOrdering: Rep[Long]): Query[JournalTable, JournalRow, Seq] = + journalTable + .filter(_.persistenceId === persistenceId) + .filter(_.deleted === false) + .filter(_.sequenceNumber >= fromSequenceNr) + .filter(_.sequenceNumber <= toSequenceNr) + .filter(_.ordering >= minOrdering) + .filter(_.ordering <= maxOrdering) + .sortBy(_.sequenceNumber.asc) + .take(max) + val messagesQuery = Compiled(_messagesQuery _) + val messagesOrderingBoundedQuery = Compiled(_messagesOrderingBoundedQuery _) } diff --git a/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalTables.scala b/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalTables.scala index 38735468..dc80cf79 100644 --- a/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalTables.scala +++ b/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalTables.scala @@ -6,7 +6,7 @@ package akka.persistence.postgres package journal.dao -import akka.persistence.postgres.config.JournalTableConfiguration +import akka.persistence.postgres.config.{ JournalMetadataTableConfiguration, JournalTableConfiguration } import akka.persistence.postgres.db.ExtendedPostgresProfile.api._ import io.circe.Json @@ -90,3 +90,31 @@ object NestedPartitionsJournalTable { def apply(journalTableCfg: JournalTableConfiguration): TableQuery[JournalTable] = FlatJournalTable.apply(journalTableCfg) } + +class JournalMetadataTable(_tableTag: Tag, journalMetadataTableCfg: JournalMetadataTableConfiguration) + extends Table[JournalMetadataRow]( + _tableTag, + _schemaName = journalMetadataTableCfg.schemaName, + _tableName = journalMetadataTableCfg.tableName) { + override def * = ( + id, + persistenceId, + maxSequenceNumber, + minOrdering, + maxOrdering) <> (JournalMetadataRow.tupled, JournalMetadataRow.unapply) + + val id: Rep[Long] = column[Long](journalMetadataTableCfg.columnNames.id) + val persistenceId: Rep[String] = + column[String](journalMetadataTableCfg.columnNames.persistenceId, O.Length(255, varying = true)) + val maxSequenceNumber: Rep[Long] = column[Long](journalMetadataTableCfg.columnNames.maxSequenceNumber) + val minOrdering: Rep[Long] = column[Long](journalMetadataTableCfg.columnNames.minOrdering) + val maxOrdering: Rep[Long] = column[Long](journalMetadataTableCfg.columnNames.maxOrdering) + + val pk = 
primaryKey(s"${tableName}_pk", persistenceId) +} + +object JournalMetadataTable { + def apply( + journalMetadataTableCfg: JournalMetadataTableConfiguration): TableQuery[JournalMetadataTable] = + TableQuery(tag => new JournalMetadataTable(tag, journalMetadataTableCfg)) +} diff --git a/core/src/main/scala/akka/persistence/postgres/journal/dao/PartitionedJournalDao.scala b/core/src/main/scala/akka/persistence/postgres/journal/dao/PartitionedJournalDao.scala index 68792bf6..0016c24c 100644 --- a/core/src/main/scala/akka/persistence/postgres/journal/dao/PartitionedJournalDao.scala +++ b/core/src/main/scala/akka/persistence/postgres/journal/dao/PartitionedJournalDao.scala @@ -1,16 +1,19 @@ package akka.persistence.postgres.journal.dao -import java.util.concurrent.atomic.AtomicReference - +import akka.NotUsed +import akka.persistence.PersistentRepr import akka.persistence.postgres.JournalRow import akka.persistence.postgres.config.JournalConfig import akka.persistence.postgres.db.DbErrors.{ withHandledIndexErrors, withHandledPartitionErrors } import akka.serialization.Serialization import akka.stream.Materializer +import akka.stream.scaladsl.Source import slick.jdbc.JdbcBackend.Database +import java.util.concurrent.atomic.AtomicReference import scala.collection.immutable.{ Nil, Seq } import scala.concurrent.{ ExecutionContext, Future } +import scala.util.Try class PartitionedJournalDao(db: Database, journalConfig: JournalConfig, serialization: Serialization)( implicit ec: ExecutionContext, @@ -85,4 +88,29 @@ class PartitionedJournalDao(db: Database, journalConfig: JournalConfig, serializ DBIO.successful(()) } } + + override def messages( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long, + max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = { + + // This behaviour override is only applied here, because it is only useful on the PartitionedJournal strategy. + val query = if (journalConfig.useJournalMetadata) { + metadataQueries.minAndMaxOrderingForPersistenceId(persistenceId).result.headOption.flatMap { + case Some((minOrdering, maxOrdering)) => + // if journal_metadata knows the min and max ordering of a persistenceId, + // use them to help the query planner to avoid scanning unnecessary partitions. 
+          queries
+            .messagesOrderingBoundedQuery(persistenceId, fromSequenceNr, toSequenceNr, max, minOrdering, maxOrdering)
+            .result
+        case None =>
+          // fall back to the standard behaviour
+          queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result
+      }
+    } else
+      queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result
+
+    Source.fromPublisher(db.stream(query)).via(serializer.deserializeFlow)
+  }
}
diff --git a/core/src/main/scala/akka/persistence/postgres/package.scala b/core/src/main/scala/akka/persistence/postgres/package.scala
index 979a44e2..145d8c9e 100644
--- a/core/src/main/scala/akka/persistence/postgres/package.scala
+++ b/core/src/main/scala/akka/persistence/postgres/package.scala
@@ -16,4 +16,11 @@ package object postgres {
      message: Array[Byte],
      tags: List[Int],
      metadata: Json)
+
+  final case class JournalMetadataRow(
+      id: Long,
+      persistenceId: String,
+      maxSequenceNumber: Long,
+      minOrdering: Long,
+      maxOrdering: Long)
}
diff --git a/core/src/main/scala/akka/persistence/postgres/query/dao/ByteArrayReadJournalDao.scala b/core/src/main/scala/akka/persistence/postgres/query/dao/BaseByteArrayReadJournalDao.scala
similarity index 78%
rename from core/src/main/scala/akka/persistence/postgres/query/dao/ByteArrayReadJournalDao.scala
rename to core/src/main/scala/akka/persistence/postgres/query/dao/BaseByteArrayReadJournalDao.scala
index 778ba038..0ee55c17 100644
--- a/core/src/main/scala/akka/persistence/postgres/query/dao/ByteArrayReadJournalDao.scala
+++ b/core/src/main/scala/akka/persistence/postgres/query/dao/BaseByteArrayReadJournalDao.scala
@@ -9,7 +9,11 @@ package query.dao
import akka.NotUsed
import akka.persistence.PersistentRepr
import akka.persistence.postgres.config.ReadJournalConfig
-import akka.persistence.postgres.journal.dao.{ BaseJournalDaoWithReadMessages, ByteArrayJournalSerializer }
+import akka.persistence.postgres.journal.dao.{
+  BaseJournalDaoWithReadMessages,
+  ByteArrayJournalSerializer,
+  JournalMetadataTable
+}
import akka.persistence.postgres.serialization.FlowPersistentReprSerializer
import akka.persistence.postgres.tag.{ CachedTagIdResolver, SimpleTagDao, TagIdResolver }
import akka.serialization.Serialization
@@ -51,11 +55,10 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
      persistenceId: String,
      fromSequenceNr: Long,
      toSequenceNr: Long,
-      max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = {
+      max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] =
    Source
      .fromPublisher(db.stream(queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result))
      .via(serializer.deserializeFlow)
-  }

  override def journalSequence(offset: Long, limit: Long): Source[Long, NotUsed] =
    Source.fromPublisher(db.stream(queries.orderingByOrdering(offset, limit).result))
@@ -64,17 +67,3 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
    db.run(queries.maxOrdering.result)
  }
}
-
-class ByteArrayReadJournalDao(
-    val db: Database,
-    val readJournalConfig: ReadJournalConfig,
-    serialization: Serialization,
-    val tagIdResolver: TagIdResolver)(implicit val ec: ExecutionContext, val mat: Materializer)
-    extends BaseByteArrayReadJournalDao {
-  val queries = new ReadJournalQueries(readJournalConfig)
-  val serializer = new ByteArrayJournalSerializer(
-    serialization,
-    new CachedTagIdResolver(
-      new SimpleTagDao(db, readJournalConfig.tagsTableConfiguration),
-      readJournalConfig.tagsConfig))
-}
diff --git
a/core/src/main/scala/akka/persistence/postgres/query/dao/FlatReadJournalDao.scala b/core/src/main/scala/akka/persistence/postgres/query/dao/FlatReadJournalDao.scala new file mode 100644 index 00000000..515cf423 --- /dev/null +++ b/core/src/main/scala/akka/persistence/postgres/query/dao/FlatReadJournalDao.scala @@ -0,0 +1,26 @@ +package akka.persistence.postgres.query.dao + +import akka.persistence.postgres.config.ReadJournalConfig +import akka.persistence.postgres.journal.dao.{ ByteArrayJournalSerializer, FlatJournalTable } +import akka.persistence.postgres.tag.{ CachedTagIdResolver, SimpleTagDao, TagIdResolver } +import akka.serialization.Serialization +import akka.stream.Materializer +import slick.jdbc.JdbcBackend.Database + +import scala.concurrent.ExecutionContext + +class FlatReadJournalDao( + val db: Database, + val readJournalConfig: ReadJournalConfig, + serialization: Serialization, + val tagIdResolver: TagIdResolver)(implicit val ec: ExecutionContext, val mat: Materializer) + extends BaseByteArrayReadJournalDao { + val queries = new ReadJournalQueries( + FlatJournalTable(readJournalConfig.journalTableConfiguration), + readJournalConfig.includeDeleted) + val serializer = new ByteArrayJournalSerializer( + serialization, + new CachedTagIdResolver( + new SimpleTagDao(db, readJournalConfig.tagsTableConfiguration), + readJournalConfig.tagsConfig)) +} diff --git a/core/src/main/scala/akka/persistence/postgres/query/dao/PartitionedReadJournalDao.scala b/core/src/main/scala/akka/persistence/postgres/query/dao/PartitionedReadJournalDao.scala new file mode 100644 index 00000000..82da7647 --- /dev/null +++ b/core/src/main/scala/akka/persistence/postgres/query/dao/PartitionedReadJournalDao.scala @@ -0,0 +1,64 @@ +package akka.persistence.postgres.query.dao + +import akka.NotUsed +import akka.persistence.PersistentRepr +import akka.persistence.postgres.config.ReadJournalConfig +import akka.persistence.postgres.journal.dao.{ + ByteArrayJournalSerializer, + JournalMetadataTable, + PartitionedJournalTable +} +import akka.persistence.postgres.tag.{ CachedTagIdResolver, SimpleTagDao, TagIdResolver } +import akka.serialization.Serialization +import akka.stream.Materializer +import akka.stream.scaladsl.Source +import slick.jdbc.JdbcBackend.Database + +import scala.concurrent.ExecutionContext +import scala.util.Try + +class PartitionedReadJournalDao( + val db: Database, + val readJournalConfig: ReadJournalConfig, + serialization: Serialization, + val tagIdResolver: TagIdResolver)(implicit val ec: ExecutionContext, val mat: Materializer) + extends BaseByteArrayReadJournalDao { + + import akka.persistence.postgres.db.ExtendedPostgresProfile.api._ + + val queries = new ReadJournalQueries( + PartitionedJournalTable(readJournalConfig.journalTableConfiguration), + readJournalConfig.includeDeleted) + private val metadataQueries: ReadJournalMetadataQueries = new ReadJournalMetadataQueries( + JournalMetadataTable(readJournalConfig.journalMetadataTableConfiguration)) + + val serializer = new ByteArrayJournalSerializer( + serialization, + new CachedTagIdResolver( + new SimpleTagDao(db, readJournalConfig.tagsTableConfiguration), + readJournalConfig.tagsConfig)) + + override def messages( + persistenceId: String, + fromSequenceNr: Long, + toSequenceNr: Long, + max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = { + // This behaviour override is only applied here, because it is only useful on the PartitionedJournal strategy. 
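+    // (a flat journal lives in one table, so there is nothing to prune there;
+    // only the partitioned layout benefits from bounding the scan to the
+    // [minOrdering, maxOrdering] range recorded in journal_metadata)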
+    val query = if (readJournalConfig.useJournalMetadata) {
+      metadataQueries.minAndMaxOrderingForPersistenceId(persistenceId).result.headOption.flatMap {
+        case Some((minOrdering, maxOrdering)) =>
+          // if journal_metadata knows the min and max ordering of a persistenceId,
+          // use them to help the query planner to avoid scanning unnecessary partitions.
+          queries
+            .messagesOrderingBoundedQuery(persistenceId, fromSequenceNr, toSequenceNr, max, minOrdering, maxOrdering)
+            .result
+        case None =>
+          // fall back to the standard behaviour
+          queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result
+      }
+    } else
+      queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result
+
+    Source.fromPublisher(db.stream(query)).via(serializer.deserializeFlow)
+  }
+}
diff --git a/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueries.scala b/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueries.scala
new file mode 100644
index 00000000..eceea435
--- /dev/null
+++ b/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueries.scala
@@ -0,0 +1,15 @@
+package akka.persistence.postgres.query.dao
+
+import akka.persistence.postgres.journal.dao.JournalMetadataTable
+import slick.lifted.TableQuery
+
+class ReadJournalMetadataQueries(journalMetadataTable: TableQuery[JournalMetadataTable]) {
+
+  import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
+
+  private def _minAndMaxOrderingForPersistenceId(
+      persistenceId: Rep[String]): Query[(Rep[Long], Rep[Long]), (Long, Long), Seq] =
+    journalMetadataTable.filter(_.persistenceId === persistenceId).take(1).map(r => (r.minOrdering, r.maxOrdering))
+
+  val minAndMaxOrderingForPersistenceId = Compiled(_minAndMaxOrderingForPersistenceId _)
+}
diff --git a/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalQueries.scala b/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalQueries.scala
index 380b9b71..76962b1b 100644
--- a/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalQueries.scala
+++ b/core/src/main/scala/akka/persistence/postgres/query/dao/ReadJournalQueries.scala
@@ -6,19 +6,17 @@
package akka.persistence.postgres
package query.dao

-import akka.persistence.postgres.config.ReadJournalConfig
-import akka.persistence.postgres.journal.dao.{ FlatJournalTable, JournalTable }
+import akka.persistence.postgres.journal.dao.JournalTable
+import slick.lifted.TableQuery

-class ReadJournalQueries(val readJournalConfig: ReadJournalConfig) {
+class ReadJournalQueries(journalTable: TableQuery[JournalTable], includeDeleted: Boolean) {
  import akka.persistence.postgres.db.ExtendedPostgresProfile.api._

-  private val journalTable: TableQuery[JournalTable] = FlatJournalTable(readJournalConfig.journalTableConfiguration)
-
  private def _allPersistenceIdsDistinct(max: ConstColumn[Long]): Query[Rep[String], String, Seq] =
    baseTableQuery().map(_.persistenceId).distinct.take(max)

  private def baseTableQuery() =
-    if (readJournalConfig.includeDeleted) journalTable
+    if (includeDeleted) journalTable
    else journalTable.filter(_.deleted === false)

  val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct _)
@@ -27,16 +25,34 @@ class ReadJournalQueries(val readJournalConfig: ReadJournalConfig) {
      persistenceId: Rep[String],
      fromSequenceNr: Rep[Long],
      toSequenceNr: Rep[Long],
-      max: ConstColumn[Long]) =
+      max: ConstColumn[Long]): Query[JournalTable, JournalRow, Seq] =
+    baseTableQuery()
+      .filter(_.persistenceId ===
persistenceId) + .filter(_.sequenceNumber >= fromSequenceNr) + .filter(_.sequenceNumber <= toSequenceNr) + .sortBy(_.sequenceNumber.asc) + .take(max) + + private def _messagesOrderingBoundedQuery( + persistenceId: Rep[String], + fromSequenceNr: Rep[Long], + toSequenceNr: Rep[Long], + max: ConstColumn[Long], + minOrdering: Rep[Long], + maxOrdering: Rep[Long]): Query[JournalTable, JournalRow, Seq] = baseTableQuery() .filter(_.persistenceId === persistenceId) .filter(_.sequenceNumber >= fromSequenceNr) .filter(_.sequenceNumber <= toSequenceNr) + .filter(_.ordering >= minOrdering) + .filter(_.ordering <= maxOrdering) .sortBy(_.sequenceNumber.asc) .take(max) val messagesQuery = Compiled(_messagesQuery _) + val messagesOrderingBoundedQuery = Compiled(_messagesOrderingBoundedQuery _) + protected def _eventsByTag( tag: Rep[List[Int]], offset: ConstColumn[Long], diff --git a/core/src/test/resources/nested-partitions-application-with-use-journal-metadata.conf b/core/src/test/resources/nested-partitions-application-with-use-journal-metadata.conf new file mode 100644 index 00000000..8ca28304 --- /dev/null +++ b/core/src/test/resources/nested-partitions-application-with-use-journal-metadata.conf @@ -0,0 +1,2 @@ +include "plain-application-with-use-journal-metadata.conf" +include "nested-partitions-journal.conf" diff --git a/core/src/test/resources/partitioned-application-with-use-journal-metadata.conf b/core/src/test/resources/partitioned-application-with-use-journal-metadata.conf new file mode 100644 index 00000000..ee77c852 --- /dev/null +++ b/core/src/test/resources/partitioned-application-with-use-journal-metadata.conf @@ -0,0 +1,2 @@ +include "plain-application-with-use-journal-metadata.conf" +include "partitioned-journal.conf" diff --git a/core/src/test/resources/plain-application-with-use-journal-metadata.conf b/core/src/test/resources/plain-application-with-use-journal-metadata.conf new file mode 100644 index 00000000..fd9b103e --- /dev/null +++ b/core/src/test/resources/plain-application-with-use-journal-metadata.conf @@ -0,0 +1,4 @@ +include "general.conf" +include "plain-application.conf" + +postgres-journal.use-journal-metadata = true diff --git a/core/src/test/resources/schema/postgres/nested-partitions-schema.sql b/core/src/test/resources/schema/postgres/nested-partitions-schema.sql index fc31fdcd..affb6c4d 100644 --- a/core/src/test/resources/schema/postgres/nested-partitions-schema.sql +++ b/core/src/test/resources/schema/postgres/nested-partitions-schema.sql @@ -64,3 +64,48 @@ CREATE TABLE IF NOT EXISTS public.snapshot metadata jsonb NOT NULL, PRIMARY KEY (persistence_id, sequence_number) ); + +DROP TRIGGER IF EXISTS trig_update_journal_metadata ON public.journal; +DROP FUNCTION IF EXISTS public.update_journal_metadata(); +DROP TABLE IF EXISTS public.journal_metadata; + +CREATE TABLE public.journal_metadata( + id BIGINT GENERATED ALWAYS AS IDENTITY, + max_sequence_number BIGINT NOT NULL, + min_ordering BIGINT NOT NULL, + max_ordering BIGINT NOT NULL, + persistence_id TEXT NOT NULL, + PRIMARY KEY (persistence_id) +) PARTITION BY HASH(persistence_id); + +CREATE TABLE public.journal_metadata_0 PARTITION OF public.journal_metadata FOR VALUES WITH (MODULUS 2, REMAINDER 0); +CREATE TABLE public.journal_metadata_1 PARTITION OF public.journal_metadata FOR VALUES WITH (MODULUS 2, REMAINDER 1); + +CREATE OR REPLACE FUNCTION public.update_journal_metadata() RETURNS TRIGGER AS +$$ +DECLARE +BEGIN + INSERT INTO public.journal_metadata (persistence_id, max_sequence_number, max_ordering, 
min_ordering) + VALUES ( + NEW.persistence_id, + NEW.sequence_number, + NEW.ordering, + CASE + WHEN NEW.sequence_number = 1 THEN NEW.ordering + ELSE -1 + END + ) + ON CONFLICT (persistence_id) DO UPDATE + SET + max_sequence_number = GREATEST(public.journal_metadata.max_sequence_number, NEW.sequence_number), + max_ordering = GREATEST(public.journal_metadata.max_ordering, NEW.ordering); + + RETURN NEW; +END; +$$ +LANGUAGE plpgsql; + +CREATE TRIGGER trig_update_journal_metadata + AFTER INSERT ON public.journal + FOR EACH ROW + EXECUTE PROCEDURE public.update_journal_metadata(); \ No newline at end of file diff --git a/core/src/test/resources/schema/postgres/partitioned-schema.sql b/core/src/test/resources/schema/postgres/partitioned-schema.sql index dc9f20cf..621dc00b 100644 --- a/core/src/test/resources/schema/postgres/partitioned-schema.sql +++ b/core/src/test/resources/schema/postgres/partitioned-schema.sql @@ -65,3 +65,48 @@ CREATE TABLE IF NOT EXISTS public.snapshot metadata jsonb NOT NULL, PRIMARY KEY (persistence_id, sequence_number) ); + +DROP TRIGGER IF EXISTS trig_update_journal_metadata ON public.journal; +DROP FUNCTION IF EXISTS public.update_journal_metadata(); +DROP TABLE IF EXISTS public.journal_metadata; + +CREATE TABLE public.journal_metadata( + id BIGINT GENERATED ALWAYS AS IDENTITY, + max_sequence_number BIGINT NOT NULL, + min_ordering BIGINT NOT NULL, + max_ordering BIGINT NOT NULL, + persistence_id TEXT NOT NULL, + PRIMARY KEY (persistence_id) +) PARTITION BY HASH(persistence_id); + +CREATE TABLE public.journal_metadata_0 PARTITION OF public.journal_metadata FOR VALUES WITH (MODULUS 2, REMAINDER 0); +CREATE TABLE public.journal_metadata_1 PARTITION OF public.journal_metadata FOR VALUES WITH (MODULUS 2, REMAINDER 1); + +CREATE OR REPLACE FUNCTION public.update_journal_metadata() RETURNS TRIGGER AS +$$ +DECLARE +BEGIN + INSERT INTO public.journal_metadata (persistence_id, max_sequence_number, max_ordering, min_ordering) + VALUES ( + NEW.persistence_id, + NEW.sequence_number, + NEW.ordering, + CASE + WHEN NEW.sequence_number = 1 THEN NEW.ordering + ELSE -1 + END + ) + ON CONFLICT (persistence_id) DO UPDATE + SET + max_sequence_number = GREATEST(public.journal_metadata.max_sequence_number, NEW.sequence_number), + max_ordering = GREATEST(public.journal_metadata.max_ordering, NEW.ordering); + + RETURN NEW; +END; +$$ +LANGUAGE plpgsql; + +CREATE TRIGGER trig_update_journal_metadata + AFTER INSERT ON public.journal + FOR EACH ROW + EXECUTE PROCEDURE public.update_journal_metadata(); \ No newline at end of file diff --git a/core/src/test/resources/schema/postgres/plain-schema.sql b/core/src/test/resources/schema/postgres/plain-schema.sql index d38aec6f..cd1105a6 100644 --- a/core/src/test/resources/schema/postgres/plain-schema.sql +++ b/core/src/test/resources/schema/postgres/plain-schema.sql @@ -38,3 +38,48 @@ CREATE TABLE IF NOT EXISTS public.snapshot metadata jsonb NOT NULL, PRIMARY KEY (persistence_id, sequence_number) ); + +DROP TRIGGER IF EXISTS trig_update_journal_metadata ON public.journal; +DROP FUNCTION IF EXISTS public.update_journal_metadata(); +DROP TABLE IF EXISTS public.journal_metadata; + +CREATE TABLE public.journal_metadata( + id BIGINT GENERATED ALWAYS AS IDENTITY, + max_sequence_number BIGINT NOT NULL, + min_ordering BIGINT NOT NULL, + max_ordering BIGINT NOT NULL, + persistence_id TEXT NOT NULL, + PRIMARY KEY (persistence_id) +) PARTITION BY HASH(persistence_id); + +CREATE TABLE public.journal_metadata_0 PARTITION OF public.journal_metadata FOR VALUES WITH 
(MODULUS 2, REMAINDER 0); +CREATE TABLE public.journal_metadata_1 PARTITION OF public.journal_metadata FOR VALUES WITH (MODULUS 2, REMAINDER 1); + +CREATE OR REPLACE FUNCTION public.update_journal_metadata() RETURNS TRIGGER AS +$$ +DECLARE +BEGIN + INSERT INTO public.journal_metadata (persistence_id, max_sequence_number, max_ordering, min_ordering) + VALUES ( + NEW.persistence_id, + NEW.sequence_number, + NEW.ordering, + CASE + WHEN NEW.sequence_number = 1 THEN NEW.ordering + ELSE -1 + END + ) + ON CONFLICT (persistence_id) DO UPDATE + SET + max_sequence_number = GREATEST(public.journal_metadata.max_sequence_number, NEW.sequence_number), + max_ordering = GREATEST(public.journal_metadata.max_ordering, NEW.ordering); + + RETURN NEW; +END; +$$ +LANGUAGE plpgsql; + +CREATE TRIGGER trig_update_journal_metadata + AFTER INSERT ON public.journal + FOR EACH ROW + EXECUTE PROCEDURE public.update_journal_metadata(); \ No newline at end of file diff --git a/core/src/test/scala/akka/persistence/postgres/SharedActorSystemTestSpec.scala b/core/src/test/scala/akka/persistence/postgres/SharedActorSystemTestSpec.scala index b4de7585..41b346b9 100644 --- a/core/src/test/scala/akka/persistence/postgres/SharedActorSystemTestSpec.scala +++ b/core/src/test/scala/akka/persistence/postgres/SharedActorSystemTestSpec.scala @@ -29,7 +29,7 @@ abstract class SharedActorSystemTestSpec(val config: Config) extends SimpleSpec implicit lazy val mat: Materializer = SystemMaterializer(system).materializer implicit lazy val ec: ExecutionContext = system.dispatcher - implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute) + implicit val pc: PatienceConfig = PatienceConfig(timeout = 2.minutes) implicit val timeout: Timeout = Timeout(1.minute) lazy val serialization = SerializationExtension(system) diff --git a/core/src/test/scala/akka/persistence/postgres/SingleActorSystemPerTestSpec.scala b/core/src/test/scala/akka/persistence/postgres/SingleActorSystemPerTestSpec.scala index d3287e68..2232cd9a 100644 --- a/core/src/test/scala/akka/persistence/postgres/SingleActorSystemPerTestSpec.scala +++ b/core/src/test/scala/akka/persistence/postgres/SingleActorSystemPerTestSpec.scala @@ -26,7 +26,7 @@ abstract class SingleActorSystemPerTestSpec(val config: Config) conf.withValue(path, configValue) }) - implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute) + implicit val pc: PatienceConfig = PatienceConfig(timeout = 2.minutes) implicit val timeout: Timeout = Timeout(1.minute) val cfg: Config = config.getConfig("postgres-journal") diff --git a/core/src/test/scala/akka/persistence/postgres/configuration/AkkaPersistenceConfigTest.scala b/core/src/test/scala/akka/persistence/postgres/configuration/AkkaPersistenceConfigTest.scala index 808d2c62..269bf6c9 100644 --- a/core/src/test/scala/akka/persistence/postgres/configuration/AkkaPersistenceConfigTest.scala +++ b/core/src/test/scala/akka/persistence/postgres/configuration/AkkaPersistenceConfigTest.scala @@ -46,7 +46,7 @@ class AkkaPersistenceConfigTest extends AnyFlatSpec with Matchers with OptionVal | } | } | - | dao = "akka.persistence.postgres.dao.bytea.journal.FlatJournalDao" + | dao = "akka.persistence.postgres.journal.dao.FlatJournalDao" | | logicalDelete = true | @@ -111,7 +111,7 @@ class AkkaPersistenceConfigTest extends AnyFlatSpec with Matchers with OptionVal | } | } | - | dao = "akka.persistence.postgres.dao.bytea.snapshot.ByteArraySnapshotDao" + | dao = "akka.persistence.postgres.snapshot.dao.ByteArraySnapshotDao" | | slick { | profile = 
"slick.jdbc.MySQLProfile$" @@ -163,7 +163,7 @@ class AkkaPersistenceConfigTest extends AnyFlatSpec with Matchers with OptionVal | # are delivered downstreams. | max-buffer-size = "10" | - | dao = "akka.persistence.postgres.dao.bytea.readjournal.ByteArrayReadJournalDao" + | dao = "akka.persistence.postgres.query.dao.FlatReadJournalDao" | | tags { | cacheTtl = 12 hours @@ -238,7 +238,7 @@ class AkkaPersistenceConfigTest extends AnyFlatSpec with Matchers with OptionVal slickConfiguration.jndiName shouldBe None slickConfiguration.jndiDbName shouldBe None - cfg.pluginConfig.dao shouldBe "akka.persistence.postgres.dao.bytea.journal.FlatJournalDao" + cfg.pluginConfig.dao shouldBe "akka.persistence.postgres.journal.dao.FlatJournalDao" cfg.journalTableConfiguration.tableName shouldBe "journal" cfg.journalTableConfiguration.schemaName shouldBe None @@ -267,7 +267,7 @@ class AkkaPersistenceConfigTest extends AnyFlatSpec with Matchers with OptionVal slickConfiguration.jndiName shouldBe None slickConfiguration.jndiDbName shouldBe None - cfg.pluginConfig.dao shouldBe "akka.persistence.postgres.dao.bytea.snapshot.ByteArraySnapshotDao" + cfg.pluginConfig.dao shouldBe "akka.persistence.postgres.snapshot.dao.ByteArraySnapshotDao" cfg.snapshotTableConfiguration.tableName shouldBe "snapshot" cfg.snapshotTableConfiguration.schemaName shouldBe None @@ -284,7 +284,7 @@ class AkkaPersistenceConfigTest extends AnyFlatSpec with Matchers with OptionVal slickConfiguration.jndiName shouldBe None slickConfiguration.jndiDbName shouldBe None - cfg.pluginConfig.dao shouldBe "akka.persistence.postgres.dao.bytea.readjournal.ByteArrayReadJournalDao" + cfg.pluginConfig.dao shouldBe "akka.persistence.postgres.query.dao.FlatReadJournalDao" cfg.refreshInterval shouldBe 1.second cfg.maxBufferSize shouldBe 500 @@ -313,7 +313,7 @@ class AkkaPersistenceConfigTest extends AnyFlatSpec with Matchers with OptionVal slickConfiguration.jndiName shouldBe None slickConfiguration.jndiDbName shouldBe None - cfg.pluginConfig.dao shouldBe "akka.persistence.postgres.dao.bytea.journal.FlatJournalDao" + cfg.pluginConfig.dao shouldBe "akka.persistence.postgres.journal.dao.FlatJournalDao" cfg.journalTableConfiguration.tableName shouldBe "journal" cfg.journalTableConfiguration.schemaName shouldBe None @@ -343,7 +343,7 @@ class AkkaPersistenceConfigTest extends AnyFlatSpec with Matchers with OptionVal slickConfiguration.jndiName shouldBe None slickConfiguration.jndiDbName shouldBe None - cfg.pluginConfig.dao shouldBe "akka.persistence.postgres.dao.bytea.snapshot.ByteArraySnapshotDao" + cfg.pluginConfig.dao shouldBe "akka.persistence.postgres.snapshot.dao.ByteArraySnapshotDao" cfg.snapshotTableConfiguration.tableName shouldBe "snapshot" cfg.snapshotTableConfiguration.schemaName shouldBe None @@ -360,7 +360,7 @@ class AkkaPersistenceConfigTest extends AnyFlatSpec with Matchers with OptionVal slickConfiguration.jndiName shouldBe None slickConfiguration.jndiDbName shouldBe None - cfg.pluginConfig.dao shouldBe "akka.persistence.postgres.dao.bytea.readjournal.ByteArrayReadJournalDao" + cfg.pluginConfig.dao shouldBe "akka.persistence.postgres.query.dao.FlatReadJournalDao" cfg.refreshInterval shouldBe 300.millis cfg.maxBufferSize shouldBe 10 diff --git a/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalPerfSpec.scala b/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalPerfSpec.scala index 4a053391..c7d8d2c7 100644 --- a/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalPerfSpec.scala +++ 
b/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalPerfSpec.scala @@ -8,15 +8,15 @@ package akka.persistence.postgres.journal import akka.actor.Props import akka.persistence.CapabilityFlag import akka.persistence.journal.JournalPerfSpec -import akka.persistence.journal.JournalPerfSpec.{BenchActor, Cmd, ResetCounter} +import akka.persistence.journal.JournalPerfSpec.{ BenchActor, Cmd, ResetCounter } import akka.persistence.postgres.config._ import akka.persistence.postgres.db.SlickExtension import akka.persistence.postgres.util.Schema._ -import akka.persistence.postgres.util.{ClasspathResources, DropCreate} +import akka.persistence.postgres.util.{ ClasspathResources, DropCreate } import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.ScalaFutures -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ @@ -114,6 +114,9 @@ class NestedPartitionsJournalPerfSpecSharedDb class NestedPartitionsJournalPerfSpecPhysicalDelete extends PostgresJournalPerfSpec("nested-partitions-application-with-hard-delete.conf", NestedPartitions) +class NestedPartitionsJournalPerfSpecUseJournalMetadata + extends PostgresJournalPerfSpec("nested-partitions-application-with-use-journal-metadata.conf", NestedPartitions) + class PartitionedJournalPerfSpec extends PostgresJournalPerfSpec("partitioned-application.conf", Partitioned) class PartitionedJournalPerfSpecSharedDb @@ -122,9 +125,15 @@ class PartitionedJournalPerfSpecSharedDb class PartitionedJournalPerfSpecPhysicalDelete extends PostgresJournalPerfSpec("partitioned-application-with-hard-delete.conf", Partitioned) +class PartitionedJournalPerfSpecUseJournalMetadata + extends PostgresJournalPerfSpec("partitioned-application-with-use-journal-metadata.conf", Partitioned) + class PlainJournalPerfSpec extends PostgresJournalPerfSpec("plain-application.conf", Plain) class PlainJournalPerfSpecSharedDb extends PostgresJournalPerfSpec("plain-shared-db-application.conf", Plain) class PlainJournalPerfSpecPhysicalDelete extends PostgresJournalPerfSpec("plain-application-with-hard-delete.conf", Plain) + +class PlainJournalPerfSpecUseJournalMetadata + extends PostgresJournalPerfSpec("plain-application-with-use-journal-metadata.conf", Plain) diff --git a/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalSpec.scala b/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalSpec.scala index c4126879..a707380e 100644 --- a/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalSpec.scala +++ b/core/src/test/scala/akka/persistence/postgres/journal/PostgresJournalSpec.scala @@ -5,16 +5,17 @@ package akka.persistence.postgres.journal -import akka.actor.Actor -import akka.persistence.JournalProtocol.ReplayedMessage +import akka.actor.{ Actor, ActorRef } +import akka.persistence.JournalProtocol.{ ReplayedMessage, WriteMessages, WriteMessagesFailed, WriteMessagesSuccessful } import akka.persistence.journal.JournalSpec import akka.persistence.postgres.config._ import akka.persistence.postgres.db.SlickExtension +import akka.persistence.postgres.journal.dao.JournalMetadataTable import akka.persistence.postgres.query.ScalaPostgresReadJournalOperations import akka.persistence.postgres.util.Schema._ import akka.persistence.postgres.util.{ ClasspathResources, DropCreate } import akka.persistence.query.Sequence -import 
akka.persistence.{ CapabilityFlag, PersistentImpl }
+import akka.persistence.{ AtomicWrite, CapabilityFlag, PersistentImpl, PersistentRepr }
import akka.testkit.TestProbe
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -54,6 +55,131 @@ abstract class PostgresJournalSpec(config: String, schemaType: SchemaType)
    super.afterAll()
  }

+  private def writeSingleMessage(seqNr: Int, pid: String, sender: ActorRef, writerUuid: String) = {
+    val msg = AtomicWrite(
+      PersistentRepr(
+        payload = s"a-$seqNr",
+        sequenceNr = seqNr,
+        persistenceId = pid,
+        sender = sender,
+        writerUuid = writerUuid))
+    val probe = TestProbe()
+    journal ! WriteMessages(List(msg), probe.ref, actorInstanceId)
+    probe.expectMsg(WriteMessagesSuccessful)
+  }
+
+  "A journal" must {
+    "not allow storing events with a sequence number lower than what is already stored for the same persistence id" in {
+      // given
+      val perId = "perId"
+      val sender = TestProbe()
+      val repeatedSnr = 5
+
+      // when
+      writeMessages(1, repeatedSnr + 1, perId, sender.ref, writerUuid)
+
+      // then
+      val msg = AtomicWrite(
+        PersistentRepr(
+          payload = s"a-$repeatedSnr",
+          sequenceNr = repeatedSnr,
+          persistenceId = perId,
+          sender = sender.ref,
+          writerUuid = writerUuid))
+
+      val probe = TestProbe()
+      journal ! WriteMessages(Seq(msg), probe.ref, actorInstanceId)
+      probe.expectMsgType[WriteMessagesFailed]
+    }
+  }
+
+  "An insert on the journal" must {
+    import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
+
+    val metadataTable = JournalMetadataTable(journalConfig.journalMetadataTableConfiguration)
+    val UNSET_MIN_ORDERING = -1
+
+    "automatically insert journal metadata" in {
+      // given
+      val perId = "perId-meta-1"
+      val sender = TestProbe()
+      val prevMetadataExists = db.run(metadataTable.filter(_.persistenceId === perId).exists.result).futureValue
+
+      // when
+      writeSingleMessage(1, perId, sender.ref, writerUuid)
+
+      // then
+      val newMetadataExists = db.run(metadataTable.filter(_.persistenceId === perId).exists.result).futureValue
+      val maxOrdering =
+        db.run(metadataTable.filter(_.persistenceId === perId).map(_.maxOrdering).result.head).futureValue
+      val minOrdering =
+        db.run(metadataTable.filter(_.persistenceId === perId).map(_.minOrdering).result.head).futureValue
+
+      prevMetadataExists shouldBe false
+      newMetadataExists shouldBe true
+      // for the first event, the insert should use the event's ordering as both the min_ordering and max_ordering values
+      maxOrdering shouldBe minOrdering
+      minOrdering > 0 shouldBe true
+    }
+
+    "upsert only max_sequence_number and max_ordering if metadata already exists" in {
+      // given
+      val perId = "perId-meta-2"
+      val sender = TestProbe()
+      writeSingleMessage(1, perId, sender.ref, writerUuid)
+      val prevMaxSeqNr =
+        db.run(metadataTable.filter(_.persistenceId === perId).map(_.maxSequenceNumber).result.head).futureValue
+      val prevMaxOrdering =
+        db.run(metadataTable.filter(_.persistenceId === perId).map(_.maxOrdering).result.head).futureValue
+      val prevMinOrdering =
+        db.run(metadataTable.filter(_.persistenceId === perId).map(_.minOrdering).result.head).futureValue
+
+      // when
+      writeSingleMessage(2, perId, sender.ref, writerUuid)
+
+      // then
+      val newMaxSeqNr =
+        db.run(metadataTable.filter(_.persistenceId === perId).map(_.maxSequenceNumber).result.head).futureValue
+      val newMaxOrdering =
+        db.run(metadataTable.filter(_.persistenceId === perId).map(_.maxOrdering).result.head).futureValue
+      val newMinOrdering =
db.run(metadataTable.filter(_.persistenceId === perId).map(_.minOrdering).result.head).futureValue + + newMaxSeqNr shouldBe prevMaxSeqNr + 1 + newMaxOrdering shouldBe prevMaxOrdering + 1 + newMinOrdering shouldBe prevMinOrdering + newMaxOrdering > 0 shouldBe true + } + + "set min_ordering to UNSET_MIN_ORDERING when no metadata entry exists but the event being inserted is not the first one for the persistenceId (sequence_number > 1)" in { + // given + val perId = "perId-meta-3" + val sender = TestProbe() + writeSingleMessage(1, perId, sender.ref, writerUuid) + val prevMaxSeqNr = + db.run(metadataTable.filter(_.persistenceId === perId).map(_.maxSequenceNumber).result.head).futureValue + val prevMaxOrdering = + db.run(metadataTable.filter(_.persistenceId === perId).map(_.maxOrdering).result.head).futureValue + + // when + // simulate case where metadata does not exist, but persistenceId already has events + db.run(metadataTable.filter(_.persistenceId === perId).delete).futureValue + // write new event of same persistenceId + writeSingleMessage(2, perId, sender.ref, writerUuid) + + // then + val newMaxSeqNr = + db.run(metadataTable.filter(_.persistenceId === perId).map(_.maxSequenceNumber).result.head).futureValue + val newMaxOrdering = + db.run(metadataTable.filter(_.persistenceId === perId).map(_.maxOrdering).result.head).futureValue + val newMinOrdering = + db.run(metadataTable.filter(_.persistenceId === perId).map(_.minOrdering).result.head).futureValue + + newMaxSeqNr shouldBe prevMaxSeqNr + 1 + newMaxOrdering shouldBe prevMaxOrdering + 1 + newMinOrdering shouldBe UNSET_MIN_ORDERING + } + } } trait PartitionedJournalSpecTestCases { @@ -109,6 +235,9 @@ class NestedPartitionsJournalSpecSharedDb class NestedPartitionsJournalSpecPhysicalDelete extends PostgresJournalSpec("nested-partitions-application-with-hard-delete.conf", NestedPartitions) +class NestedPartitionsJournalSpecUseJournalMetadata + extends PostgresJournalSpec("nested-partitions-application-with-use-journal-metadata.conf", NestedPartitions) + class PartitionedJournalSpec extends PostgresJournalSpec("partitioned-application.conf", Partitioned) with PartitionedJournalSpecTestCases @@ -119,6 +248,12 @@ class PartitionedJournalSpecPhysicalDelete extends PostgresJournalSpec("partitioned-application-with-hard-delete.conf", Partitioned) with PartitionedJournalSpecTestCases +class PartitionedJournalSpecUseJournalMetadata + extends PostgresJournalSpec("partitioned-application-with-use-journal-metadata.conf", Partitioned) + with PartitionedJournalSpecTestCases + class PlainJournalSpec extends PostgresJournalSpec("plain-application.conf", Plain) class PlainJournalSpecSharedDb extends PostgresJournalSpec("plain-shared-db-application.conf", Plain) class PlainJournalSpecPhysicalDelete extends PostgresJournalSpec("plain-application-with-hard-delete.conf", Plain) +class PlainJournalSpecUseJournalMetadata + extends PostgresJournalSpec("plain-application-with-use-journal-metadata.conf", Plain) diff --git a/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueriesTest.scala b/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueriesTest.scala new file mode 100644 index 00000000..5c0c9378 --- /dev/null +++ b/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalMetadataQueriesTest.scala @@ -0,0 +1,22 @@ +package akka.persistence.postgres.journal.dao + +import akka.persistence.postgres.util.BaseQueryTest + +class JournalMetadataQueriesTest extends BaseQueryTest { + + it should "create SQL 
query for highestSequenceNrForPersistenceId" in withJournalMetadataQueries { queries => + queries.highestSequenceNrForPersistenceId( + "aaa") shouldBeSQL """select "max_sequence_number" from "journal_metadata" where "persistence_id" = ? limit 1""" + } + + it should "create SQL query for minAndMaxOrderingForPersistenceId" in withJournalMetadataQueries { queries => + queries.minAndMaxOrderingForPersistenceId( + "aaa") shouldBeSQL """select "min_ordering", "max_ordering" from "journal_metadata" where "persistence_id" = ? limit 1""" + } + + private def withJournalMetadataQueries(f: JournalMetadataQueries => Unit): Unit = { + withActorSystem { implicit system => + f(new JournalMetadataQueries(JournalMetadataTable(journalConfig.journalMetadataTableConfiguration))) + } + } +} diff --git a/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalQueriesTest.scala b/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalQueriesTest.scala index 53964b00..0bd9d6a9 100644 --- a/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalQueriesTest.scala +++ b/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalQueriesTest.scala @@ -6,10 +6,6 @@ import io.circe.{ Json, JsonObject } class JournalQueriesTest extends BaseQueryTest { - it should "produce SQL query for distinct persistenceID" in withJournalQueries { queries => - queries.allPersistenceIdsDistinct shouldBeSQL """select distinct "persistence_id" from "journal"""" - } - it should "create SQL query for highestMarkedSequenceNrForPersistenceId" in withJournalQueries { queries => queries.highestMarkedSequenceNrForPersistenceId( "aaa") shouldBeSQL """select max("sequence_number") from "journal" where ("deleted" = true) and ("persistence_id" = ?)""" @@ -20,12 +16,6 @@ class JournalQueriesTest extends BaseQueryTest { "aaa") shouldBeSQL """select max("sequence_number") from "journal" where "persistence_id" = ?""" } - it should "create SQL query for selectByPersistenceIdAndMaxSequenceNumber" in withJournalQueries { queries => - queries.selectByPersistenceIdAndMaxSequenceNumber( - "aaa", - 11L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags", "metadata" from "journal" where ("persistence_id" = ?) and ("sequence_number" <= ?) order by "sequence_number" desc""" - } - it should "create SQL query for messagesQuery" in withJournalQueries { queries => queries.messagesQuery( "aaa", @@ -34,6 +24,16 @@ class JournalQueriesTest extends BaseQueryTest { 11L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags", "metadata" from "journal" where ((("persistence_id" = ?) and ("deleted" = false)) and ("sequence_number" >= ?)) and ("sequence_number" <= ?) order by "sequence_number" limit ?""" } + it should "create SQL query for messagesOrderingBoundedQuery" in withJournalQueries { queries => + queries.messagesOrderingBoundedQuery( + "aaa", + 11L, + 11L, + 11L, + 11L, + 11L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags", "metadata" from "journal" where ((((("persistence_id" = ?) and ("deleted" = false)) and ("sequence_number" >= ?)) and ("sequence_number" <= ?)) and ("ordering" >= ?)) and ("ordering" <= ?) 
order by "sequence_number" limit ?""" + } + it should "create SQL query for markJournalMessagesAsDeleted" in withJournalQueries { queries => queries.markJournalMessagesAsDeleted( "aaa", diff --git a/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalTablesTest.scala b/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalTablesTest.scala index 16652847..d57ed78f 100644 --- a/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalTablesTest.scala +++ b/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalTablesTest.scala @@ -18,6 +18,7 @@ class JournalTablesTest extends TablesTestSpec { } { journalName should "be configured with a schema name" in { journalTable.baseTableRow.schemaName shouldBe journalTableConfiguration.schemaName + } it should "be configured with a table name" in { @@ -34,4 +35,27 @@ class JournalTablesTest extends TablesTestSpec { journalTable.baseTableRow.tags.toString shouldBe colName(journalTableConfiguration.columnNames.tags) } } + + val journalMetadataTableConfiguration = journalConfig.journalMetadataTableConfiguration + val journalMetadataTable = JournalMetadataTable(journalMetadataTableConfiguration) + + "JournalMetadataTable" should "be configured with a schema name" in { + journalMetadataTable.baseTableRow.schemaName shouldBe journalMetadataTableConfiguration.schemaName + } + + it should "be configured with a table name" in { + journalMetadataTable.baseTableRow.tableName shouldBe journalMetadataTableConfiguration.tableName + } + + it should "be configured with column names" in { + val colName = toColumnName(journalMetadataTableConfiguration.tableName)(_) + journalMetadataTable.baseTableRow.persistenceId.toString shouldBe colName( + journalMetadataTableConfiguration.columnNames.persistenceId) + journalMetadataTable.baseTableRow.maxSequenceNumber.toString shouldBe colName( + journalMetadataTableConfiguration.columnNames.maxSequenceNumber) + journalMetadataTable.baseTableRow.maxOrdering.toString shouldBe colName( + journalMetadataTableConfiguration.columnNames.maxOrdering) + journalMetadataTable.baseTableRow.minOrdering.toString shouldBe colName( + journalMetadataTableConfiguration.columnNames.minOrdering) + } } diff --git a/core/src/test/scala/akka/persistence/postgres/query/JournalSequenceActorTest.scala b/core/src/test/scala/akka/persistence/postgres/query/JournalSequenceActorTest.scala index be8babf4..201d8437 100644 --- a/core/src/test/scala/akka/persistence/postgres/query/JournalSequenceActorTest.scala +++ b/core/src/test/scala/akka/persistence/postgres/query/JournalSequenceActorTest.scala @@ -6,13 +6,12 @@ package akka.persistence.postgres.query import java.util.concurrent.atomic.AtomicLong - import akka.actor.{ ActorRef, ActorSystem } import akka.pattern.ask import akka.persistence.postgres.config.JournalSequenceRetrievalConfig import akka.persistence.postgres.db.ExtendedPostgresProfile import akka.persistence.postgres.query.JournalSequenceActor.{ GetMaxOrderingId, MaxOrderingId } -import akka.persistence.postgres.query.dao.{ ByteArrayReadJournalDao, TestProbeReadJournalDao } +import akka.persistence.postgres.query.dao.{ FlatReadJournalDao, PartitionedReadJournalDao, TestProbeReadJournalDao } import akka.persistence.postgres.tag.{ CachedTagIdResolver, SimpleTagDao } import akka.persistence.postgres.util.Schema.{ NestedPartitions, Partitioned, Plain, SchemaType } import akka.persistence.postgres.{ JournalRow, SharedActorSystemTestSpec } @@ -23,6 +22,7 @@ import akka.testkit.TestProbe import io.circe.{ Json, 
JsonObject } import org.scalatest.time.Span import org.slf4j.LoggerFactory +import slick.jdbc import slick.jdbc.{ JdbcBackend, JdbcCapabilities } import scala.concurrent.Future @@ -176,7 +176,7 @@ abstract class JournalSequenceActorTest(val schemaType: SchemaType) extends Quer import system.dispatcher implicit val mat: Materializer = SystemMaterializer(system).materializer val readJournalDao = - new ByteArrayReadJournalDao( + new FlatReadJournalDao( db, readJournalConfig, SerializationExtension(system), @@ -316,6 +316,26 @@ class PartitionedJournalSequenceActorTest extends JournalSequenceActorTest(Parti } } } + + override def withJournalSequenceActor(db: jdbc.JdbcBackend.Database, maxTries: Int)(f: ActorRef => Unit)( + implicit system: ActorSystem): Unit = { + import system.dispatcher + implicit val mat: Materializer = SystemMaterializer(system).materializer + val readJournalDao = + new PartitionedReadJournalDao( + db, + readJournalConfig, + SerializationExtension(system), + new CachedTagIdResolver( + new SimpleTagDao(db, readJournalConfig.tagsTableConfiguration), + readJournalConfig.tagsConfig)) + val actor = + system.actorOf( + JournalSequenceActor + .props(readJournalDao, readJournalConfig.journalSequenceRetrievalConfiguration.copy(maxTries = maxTries))) + try f(actor) + finally system.stop(actor) + } } class PlainJournalSequenceActorTest extends JournalSequenceActorTest(Plain) diff --git a/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueriesTest.scala b/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueriesTest.scala new file mode 100644 index 00000000..bef12342 --- /dev/null +++ b/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalMetadataQueriesTest.scala @@ -0,0 +1,17 @@ +package akka.persistence.postgres.query.dao + +import akka.persistence.postgres.journal.dao.JournalMetadataTable +import akka.persistence.postgres.util.BaseQueryTest + +class ReadJournalMetadataQueriesTest extends BaseQueryTest { + it should "create SQL query for minAndMaxOrderingForPersistenceId" in withReadJournalMetadataQueries { queries => + queries.minAndMaxOrderingForPersistenceId( + "aaa") shouldBeSQL """select "min_ordering", "max_ordering" from "journal_metadata" where "persistence_id" = ? limit 1""" + } + + private def withReadJournalMetadataQueries(f: ReadJournalMetadataQueries => Unit): Unit = { + withActorSystem { implicit system => + f(new ReadJournalMetadataQueries(JournalMetadataTable(readJournalConfig.journalMetadataTableConfiguration))) + } + } +} diff --git a/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalQueriesTest.scala b/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalQueriesTest.scala index 54c27721..b5f58687 100644 --- a/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalQueriesTest.scala +++ b/core/src/test/scala/akka/persistence/postgres/query/dao/ReadJournalQueriesTest.scala @@ -1,5 +1,6 @@ package akka.persistence.postgres.query.dao +import akka.persistence.postgres.journal.dao.FlatJournalTable import akka.persistence.postgres.util.BaseQueryTest class ReadJournalQueriesTest extends BaseQueryTest { @@ -16,6 +17,16 @@ class ReadJournalQueriesTest extends BaseQueryTest { 5L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags", "metadata" from "journal" where (("persistence_id" = ?) and ("sequence_number" >= ?)) and ("sequence_number" <= ?) 
order by "sequence_number" limit ?"""
   }
 
+  it should "create SQL query for messagesOrderingBoundedQuery" in withReadJournalQueries { queries =>
+    queries.messagesOrderingBoundedQuery(
+      "aaa",
+      1L,
+      4L,
+      5L,
+      1L,
+      10L) shouldBeSQL """select "ordering", "deleted", "persistence_id", "sequence_number", "message", "tags", "metadata" from "journal" where (((("persistence_id" = ?) and ("sequence_number" >= ?)) and ("sequence_number" <= ?)) and ("ordering" >= ?)) and ("ordering" <= ?) order by "sequence_number" limit ?"""
+  }
+
   it should "create SQL query for eventsByTag" in withReadJournalQueries { queries =>
     queries.eventsByTag(
       List(11),
@@ -35,7 +46,10 @@ class ReadJournalQueriesTest extends BaseQueryTest {
 
   private def withReadJournalQueries(f: ReadJournalQueries => Unit): Unit = {
     withActorSystem { implicit system =>
-      f(new ReadJournalQueries(readJournalConfig))
+      f(
+        new ReadJournalQueries(
+          FlatJournalTable(readJournalConfig.journalTableConfiguration),
+          readJournalConfig.includeDeleted))
     }
   }
 }
diff --git a/core/src/test/scala/akka/persistence/postgres/util/DropCreate.scala b/core/src/test/scala/akka/persistence/postgres/util/DropCreate.scala
index 928e2f70..84fa1024 100644
--- a/core/src/test/scala/akka/persistence/postgres/util/DropCreate.scala
+++ b/core/src/test/scala/akka/persistence/postgres/util/DropCreate.scala
@@ -6,10 +6,10 @@ package akka.persistence.postgres.util
 
 import java.sql.Statement
-
-import akka.persistence.postgres.config.JournalTableConfiguration
+import akka.persistence.postgres.config.{ JournalMetadataTableConfiguration, JournalTableConfiguration }
 import akka.persistence.postgres.journal.dao.{
   FlatJournalTable,
+  JournalMetadataTable,
   JournalTable,
   NestedPartitionsJournalTable,
   PartitionedJournalTable
@@ -25,6 +25,8 @@ object Schema {
     lazy val schema: String = s"schema/postgres/$resourceNamePrefix-schema.sql"
     lazy val configName: String = s"${resourceNamePrefix}-application.conf"
     def table(journalTableCfg: JournalTableConfiguration): TableQuery[JournalTable]
+    def metadataTable(journalMetadataTableCfg: JournalMetadataTableConfiguration)
+        : TableQuery[JournalMetadataTable] = JournalMetadataTable.apply(journalMetadataTableCfg)
   }
 
   case object Plain extends SchemaType {
diff --git a/docs/custom-dao.md b/docs/custom-dao.md
index 3a4d8c72..2f980a40 100644
--- a/docs/custom-dao.md
+++ b/docs/custom-dao.md
@@ -23,7 +23,7 @@ postgres-snapshot-store {
 }
 
 postgres-read-journal {
-  dao = "akka.persistence.postgres.query.dao.ByteArrayReadJournalDao"
+  dao = "akka.persistence.postgres.query.dao.FlatReadJournalDao"
 }
 ```
diff --git a/docs/index.md b/docs/index.md
index 6e8c3ebb..9f9d17a6 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -18,7 +18,7 @@ The main goal is to keep index size and memory consumption on a moderate level w
 To use `akka-persistence-postgres` in your SBT project, add the following to your `build.sbt`:
 
 ```scala
-libraryDependencies += "com.swisborg" %% "akka-persistence-postgres" % "0.5.0"
+libraryDependencies += "com.swissborg" %% "akka-persistence-postgres" % "0.6.0-RC1"
 ```
 
 For a maven project add:
 ```xml
 <dependency>
   <groupId>com.swissborg</groupId>
   <artifactId>akka-persistence-postgres_2.13</artifactId>
-  <version>0.5.0</version>
+  <version>0.6.0-RC1</version>
 </dependency>
 ```
 to your `pom.xml`.
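+
+Version 0.6.0-RC1 also ships an optional `journal_metadata` table that speeds up the hottest queries. A minimal sketch of opting in (the `use-journal-metadata` flag and its `false` default come from this release's configuration; everything else is left at its defaults):
+
+```hocon
+postgres-journal {
+  use-journal-metadata = true
+}
+
+postgres-read-journal {
+  use-journal-metadata = true
+}
+```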
diff --git a/docs/migration.md b/docs/migration.md
index 137d5d3f..2886efe6 100644
--- a/docs/migration.md
+++ b/docs/migration.md
@@ -22,14 +22,14 @@ We provide you with an optional artifact, `akka-persistence-postgres-migration`
 #### Add akka-persistence-migration to your project
 Add the following to your `build.sbt`
 ```
-libraryDependencies += "com.swissborg" %% "akka-persistence-postgres-migration" % "0.5.0"
+libraryDependencies += "com.swissborg" %% "akka-persistence-postgres-migration" % "0.6.0-RC1"
 ```
 For a maven project add:
 ```xml
 <dependency>
   <groupId>com.swissborg</groupId>
   <artifactId>akka-persistence-postgres-migration_2.13</artifactId>
-  <version>0.5.0</version>
+  <version>0.6.0-RC1</version>
 </dependency>
 ```
 to your `pom.xml`.
@@ -61,3 +61,65 @@ Execute DDL statements produced by the [sample migration script](https://github.
 
 ### Deploy new release with migration scripts
 See [sample flyway migration script](https://github.com/SwissBorg/akka-persistence-postgres/blob/master/scripts/migration-0.5.0/partitioned/2-add-indices-flyway.sql) and adapt top level variables to match your journal configuration.
+
+## Migration from akka-persistence-postgres 0.5.0 to 0.6.0
+
+Version 0.6.0 aims to improve the performance of the query that causes the most DB I/O when using this plugin:
+```sql
+select max("sequence_number") from "journal" where "persistence_id" = ?
+```
+
+We introduced a new `journal_metadata` table that holds key data per persistence id and is used to speed up the above query and others (such as the one used to replay events). To achieve this, we trade off a bit of performance at event write time: a DB trigger runs every time an insert on the journal happens and keeps the metadata up to date.
+For now, this table holds the following information per persistence id:
+- the max sequence number among all the associated events;
+- the min and max ordering interval within which the events are located in the journal.
+
+We believe the trade-off is worth it, since the impact on write performance is much lower than the gain at read time observed on the queries that account for most of the DB I/O.
+
+Below is the list of sample Flyway migration scripts you can use to add the new table and the associated trigger.
+⚠️ The last one is a simplistic data migration that populates the new table. If your data set is large, consider a lazier ad-hoc alternative that reads the journal in batches and inserts the missing data. The trigger you will be adding is idempotent, so it is safe to re-process some events while the ad-hoc job catches up to present-date events.
+
+1. [create journal_metadata table](https://github.com/SwissBorg/akka-persistence-postgres/blob/master/scripts/migration-0.6.0/1-create-journal-metadata-table.sql)
+2. [create function to update journal_metadata](https://github.com/SwissBorg/akka-persistence-postgres/blob/master/scripts/migration-0.6.0/2-create-function-update-journal-metadata.sql)
+3. [create trigger to update journal_metadata](https://github.com/SwissBorg/akka-persistence-postgres/blob/master/scripts/migration-0.6.0/3-create-trigger-update-journal-metadata.sql)
+4. [populate journal_metadata with past data](https://github.com/SwissBorg/akka-persistence-postgres/blob/master/scripts/migration-0.6.0/4-populate-journal-metadata.sql)
+
+⚠️ Make sure to adapt the top-level variables of the scripts to values that match your journal configuration/setup.
+
+Keep in mind that use of the new table by the queries is disabled by default, so the previous (v0.5.0) behaviour is kept.
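+
+For intuition: when the new table is enabled, the plugin can answer the hot query shown above with a single-row, primary-key lookup instead of an aggregate over all of a persistence id's events. The query shape below is taken verbatim from this changeset's query tests:
+
+```sql
+select "max_sequence_number" from "journal_metadata" where "persistence_id" = ? limit 1
+```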
+To make use of it, you need to enable it in your journal configuration:
+
+```hocon
+{
+  postgres-journal {
+    ...
+
+    use-journal-metadata = true # Default is false
+  }
+
+  # Same applies to the read journal
+  postgres-read-journal {
+    ...
+
+    use-journal-metadata = true # Default is false
+  }
+}
+```
+
+Another important change is the introduction of two read journal DAOs: `FlatReadJournalDao` and `PartitionedReadJournalDao`.
+The former is the direct replacement of the previous `ByteArrayReadJournalDao` and is the default.
+However, with the addition of `journal_metadata`, if you are using the partitioned journal you should switch to `PartitionedReadJournalDao`,
+as some of the queries in use benefit from it.
+
+```hocon
+{
+  postgres-read-journal {
+    ...
+
+    dao = "akka.persistence.postgres.query.dao.PartitionedReadJournalDao"
+    use-journal-metadata = true # Default is false
+  }
+}
+```
+
+⚠️ Also, since a new table is being added, you may need to adapt your `postgres-journal.tables` configuration accordingly.
\ No newline at end of file
diff --git a/migration/src/main/scala/akka/persistence/postgres/migration/journal/Jdbc4JournalMigration.scala b/migration/src/main/scala/akka/persistence/postgres/migration/journal/Jdbc4JournalMigration.scala
index ed8e9808..5ebd7831 100644
--- a/migration/src/main/scala/akka/persistence/postgres/migration/journal/Jdbc4JournalMigration.scala
+++ b/migration/src/main/scala/akka/persistence/postgres/migration/journal/Jdbc4JournalMigration.scala
@@ -69,6 +69,7 @@ class Jdbc4JournalMigration(globalConfig: Config, tempTableName: String = "tmp_j
     for {
       _ <- journalSchema.createTable
       _ <- journalSchema.createTagsTable
+      _ <- journalSchema.createJournalMetadataTable
     } yield ()
   }
@@ -106,6 +107,7 @@ class Jdbc4JournalMigration(globalConfig: Config, tempTableName: String = "tmp_j
     val fut = for {
       _ <- db.run(createTables.transactionally)
+      _ <- db.run(journalSchema.createTriggers.transactionally)
       cnt <- dml.runReduce(_ + _)
       _ <- db.run(journalSchema.createSequence.transactionally)
       _ <- db.run(journalSchema.createIndexes.transactionally)
diff --git a/migration/src/main/scala/akka/persistence/postgres/migration/journal/JournalSchema.scala b/migration/src/main/scala/akka/persistence/postgres/migration/journal/JournalSchema.scala
index c59b91df..aa8f617d 100644
--- a/migration/src/main/scala/akka/persistence/postgres/migration/journal/JournalSchema.scala
+++ b/migration/src/main/scala/akka/persistence/postgres/migration/journal/JournalSchema.scala
@@ -24,6 +24,28 @@ private[journal] trait JournalSchema {
   def getTable: TableQuery[TempJournalTable]
   def createTable: DBIOAction[Unit, NoStream, Effect.Write]
 
+  def createJournalMetadataTable: DBIOAction[Unit, NoStream, Effect.Write] = {
+    val journalMetadataTableCfg = journalCfg.journalMetadataTableConfiguration
+    val fullTableName =
+      s"${journalMetadataTableCfg.schemaName.getOrElse("public")}.${journalMetadataTableCfg.tableName}"
+
+    import journalMetadataTableCfg.columnNames._
+    for {
+      _ <- sqlu"""CREATE TABLE #$fullTableName (
+          #$id BIGINT GENERATED ALWAYS AS IDENTITY,
+          #$maxSequenceNumber BIGINT NOT NULL,
+          #$maxOrdering BIGINT NOT NULL,
+          #$minOrdering BIGINT NOT NULL,
+          #$persistenceId TEXT NOT NULL,
+          PRIMARY KEY (#$persistenceId)
+        ) PARTITION BY HASH(#$persistenceId)"""
+      _ <-
+        sqlu"""CREATE TABLE #${fullTableName}_0 PARTITION OF #$fullTableName FOR VALUES WITH (MODULUS 2, REMAINDER 0)"""
+      _ <-
+        sqlu"""CREATE TABLE #${fullTableName}_1 PARTITION OF #$fullTableName
FOR VALUES WITH (MODULUS 2, REMAINDER 1)""" + } yield () + } + def createTagsTable: DBIOAction[Unit, NoStream, Effect.Write] = { val tagsTableConfig = journalCfg.tagsTableConfiguration import tagsTableConfig.columnNames._ @@ -71,6 +93,57 @@ private[journal] trait JournalSchema { _ <- sqlu"""ALTER INDEX #${fullTmpTableName}_#${tags}_idx RENAME TO #${journalTableCfg.tableName}_#${tags}_idx""" _ <- sqlu"""ALTER INDEX #${fullTmpTableName}_pkey RENAME TO #${journalTableCfg.tableName}_pkey""" } yield () + + def createTriggers: DBIOAction[Unit, NoStream, Effect.Write] = { + val journalTableCfg = journalCfg.journalTableConfiguration + val journalMetadataTableCfg = journalCfg.journalMetadataTableConfiguration + val schema = journalMetadataTableCfg.schemaName.getOrElse("public") + val fullTableName = s"$schema.${journalMetadataTableCfg.tableName}" + val journalFullTableName = s"$schema.${journalTableCfg.tableName}" + + import journalMetadataTableCfg.columnNames._ + import journalTableCfg.columnNames.{ persistenceId => jPersistenceId, _ } + + for { + _ <- sqlu""" + CREATE OR REPLACE FUNCTION #$schema.update_journal_metadata() RETURNS TRIGGER AS $$$$ + DECLARE + BEGIN + INSERT INTO #$fullTableName (#$persistenceId, #$maxSequenceNumber, #$maxOrdering, #$minOrdering) + VALUES ( + NEW.#$jPersistenceId, + NEW.#$sequenceNumber, + NEW.#$ordering, + CASE + WHEN NEW.#$sequenceNumber = 1 THEN NEW.#$ordering + ELSE -1 + END + ) + ON CONFLICT (#$persistenceId) DO UPDATE + SET + #$maxSequenceNumber = GREATEST(#$fullTableName.#$maxSequenceNumber, NEW.#$sequenceNumber), + #$maxOrdering = GREATEST(#$fullTableName.#$maxOrdering, NEW.#$ordering); + + RETURN NEW; + END; + $$$$ LANGUAGE plpgsql; + """ + + _ <- sqlu""" + CREATE TRIGGER trig_update_journal_metadata + AFTER INSERT ON #$journalFullTableName + FOR EACH ROW + EXECUTE PROCEDURE #$schema.update_journal_metadata(); + """ + + _ <- sqlu""" + CREATE TRIGGER trig_update_journal_metadata + AFTER INSERT ON #$fullTmpTableName + FOR EACH ROW + EXECUTE PROCEDURE #$schema.update_journal_metadata(); + """ + } yield () + } } private[journal] object JournalSchema { diff --git a/migration/src/test/resources/base-migration.conf b/migration/src/test/resources/base-migration.conf index 13380519..f156738e 100644 --- a/migration/src/test/resources/base-migration.conf +++ b/migration/src/test/resources/base-migration.conf @@ -63,6 +63,16 @@ postgres-journal { size = 50 } } + journalMetadata { + schemaName = "migration" + tableName = "fancy_journal_metadata" + columnNames = { + persistenceId = "jm_per_id" + maxSequenceNumber = "jm_max_seq_num" + maxOrdering = "jm_max_ord" + minOrdering = "jm_min_ord" + } + } tags { schemaName = "migration" tableName = "fancy_tags" diff --git a/migration/src/test/scala/akka/persistence/postgres/migration/MigrationTest.scala b/migration/src/test/scala/akka/persistence/postgres/migration/MigrationTest.scala index fe36fd11..b2e5e5bf 100644 --- a/migration/src/test/scala/akka/persistence/postgres/migration/MigrationTest.scala +++ b/migration/src/test/scala/akka/persistence/postgres/migration/MigrationTest.scala @@ -177,6 +177,9 @@ trait PrepareDatabase extends BeforeAndAfterEach with BeforeAndAfterAll with Sca val journalTableConfig = journalConfig.journalTableConfiguration val journalTableName = journalTableConfig.tableName + val journalMetadataTableConfig = journalConfig.journalMetadataTableConfiguration + val journalMetadataTableName = journalMetadataTableConfig.tableName + val tagsTableConfig = journalConfig.tagsTableConfiguration import 
journalTableConfig.columnNames.{ tags => tagsCol, _ }
 
     for {
@@ -185,6 +188,9 @@ trait PrepareDatabase extends BeforeAndAfterEach with BeforeAndAfterAll with Sca
       _ <- sqlu"""DROP TABLE IF EXISTS migration.old_#$journalTableName"""
       _ <- sqlu"""DROP TABLE IF EXISTS migration.#$tempJournalTableName"""
       _ <- sqlu"""DROP TABLE IF EXISTS migration.#$journalTableName"""
+      _ <- sqlu"""DROP TRIGGER IF EXISTS trig_update_journal_metadata ON migration.#$journalTableName"""
+      _ <- sqlu"""DROP FUNCTION IF EXISTS migration.update_journal_metadata()"""
+      _ <- sqlu"""DROP TABLE IF EXISTS migration.#$journalMetadataTableName"""
       _ <- sqlu"""CREATE TABLE IF NOT EXISTS migration.#$journalTableName (
             #$ordering BIGSERIAL,
diff --git a/scripts/migration-0.6.0/1-create-journal-metadata-table.sql b/scripts/migration-0.6.0/1-create-journal-metadata-table.sql
new file mode 100644
index 00000000..84777220
--- /dev/null
+++ b/scripts/migration-0.6.0/1-create-journal-metadata-table.sql
@@ -0,0 +1,41 @@
+-- Creates the table and the number of partitions defined by jm_partitions_number. Default is 10.
+DO $$
+DECLARE
+  -- replace with appropriate values
+  schema CONSTANT TEXT := 'public';
+  jm_table_name CONSTANT TEXT := 'journal_metadata';
+  jm_id_column CONSTANT TEXT := 'id';
+  jm_persistence_id_column CONSTANT TEXT := 'persistence_id';
+  jm_max_sequence_number_column CONSTANT TEXT := 'max_sequence_number';
+  jm_max_ordering_column CONSTANT TEXT := 'max_ordering';
+  jm_min_ordering_column CONSTANT TEXT := 'min_ordering';
+  jm_partitions_table_name_prefix CONSTANT TEXT := 'journal_metadata_';
+  jm_partitions_number CONSTANT INTEGER := 10;
+
+  -- variables
+  jm_table TEXT;
+  jm_partition_table TEXT;
+  sql TEXT;
+BEGIN
+  jm_table := schema || '.' || jm_table_name;
+  jm_partition_table := schema || '.'
|| jm_partitions_table_name_prefix; + + sql := 'CREATE TABLE IF NOT EXISTS ' || jm_table || + '(' || + jm_id_column || ' BIGINT GENERATED ALWAYS AS IDENTITY, ' || + jm_max_sequence_number_column || ' BIGINT NOT NULL, ' || + jm_max_ordering_column || ' BIGINT NOT NULL, ' || + jm_min_ordering_column || ' BIGINT NOT NULL, ' || + jm_persistence_id_column || ' TEXT NOT NULL, ' || + 'PRIMARY KEY (' || jm_persistence_id_column || ')' || + ') PARTITION BY HASH(' || jm_persistence_id_column || ')'; + + EXECUTE sql; + + FOR i IN 0..(jm_partitions_number - 1) LOOP + EXECUTE 'CREATE TABLE IF NOT EXISTS ' || jm_partition_table || i || + ' PARTITION OF ' || jm_table || + ' FOR VALUES WITH (MODULUS ' || jm_partitions_number || ', REMAINDER ' || i || ')'; + END LOOP; +END; +$$ LANGUAGE plpgsql; diff --git a/scripts/migration-0.6.0/2-create-function-update-journal-metadata.sql b/scripts/migration-0.6.0/2-create-function-update-journal-metadata.sql new file mode 100644 index 00000000..754219f0 --- /dev/null +++ b/scripts/migration-0.6.0/2-create-function-update-journal-metadata.sql @@ -0,0 +1,41 @@ +-- replace schema value if required +CREATE OR REPLACE FUNCTION public.update_journal_metadata() RETURNS TRIGGER AS +$$ +DECLARE + -- replace with appropriate values + schema CONSTANT TEXT := 'public'; + j_table_name CONSTANT TEXT := 'journal'; + j_persistence_id_column CONSTANT TEXT := 'persistence_id'; + j_sequence_number_column CONSTANT TEXT := 'sequence_number'; + j_ordering_column CONSTANT TEXT := 'ordering'; + jm_table_name CONSTANT TEXT := 'journal_metadata'; + jm_persistence_id_column CONSTANT TEXT := 'persistence_id'; + jm_max_sequence_number_column CONSTANT TEXT := 'max_sequence_number'; + jm_max_ordering_column CONSTANT TEXT := 'max_ordering'; + jm_min_ordering_column CONSTANT TEXT := 'min_ordering'; + first_sequence_number_value CONSTANT INTEGER := 1; + unset_min_ordering_value CONSTANT INTEGER := -1; + + -- variables + j_table TEXT; + jm_table TEXT; + cols TEXT; + vals TEXT; + upds TEXT; + sql TEXT; +BEGIN + j_table := schema || '.' || j_table_name; + jm_table := schema || '.' || jm_table_name; + cols := jm_persistence_id_column || ', ' || jm_max_sequence_number_column || ', ' || jm_max_ordering_column || ', ' || jm_min_ordering_column; + vals := '($1).' || j_persistence_id_column || ', ($1).' || j_sequence_number_column || ', ($1).' || j_ordering_column || + ', CASE WHEN ($1).' || j_sequence_number_column || ' = ' || first_sequence_number_value || ' THEN ($1).' || j_ordering_column || ' ELSE ' || unset_min_ordering_value || ' END'; + upds := jm_max_sequence_number_column || ' = GREATEST(' || jm_table || '.' || jm_max_sequence_number_column || ', ($1).' || j_sequence_number_column || '), ' || + jm_max_ordering_column || ' = GREATEST(' || jm_table || '.' || jm_max_ordering_column || ', ($1).' 
|| j_ordering_column || ')';
+
+  sql := 'INSERT INTO ' || jm_table || ' (' || cols || ') VALUES (' || vals || ') ' ||
+         'ON CONFLICT (' || jm_persistence_id_column || ') DO UPDATE SET ' || upds;
+
+  EXECUTE sql USING NEW;
+  RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
diff --git a/scripts/migration-0.6.0/3-create-trigger-update-journal-metadata.sql b/scripts/migration-0.6.0/3-create-trigger-update-journal-metadata.sql
new file mode 100644
index 00000000..23bee52a
--- /dev/null
+++ b/scripts/migration-0.6.0/3-create-trigger-update-journal-metadata.sql
@@ -0,0 +1,19 @@
+DO $$
+DECLARE
+  -- replace with appropriate values
+  schema CONSTANT TEXT := 'public';
+  j_table_name CONSTANT TEXT := 'journal';
+
+  -- variables
+  j_table TEXT;
+  sql TEXT;
+BEGIN
+  j_table := schema || '.' || j_table_name;
+
+  sql := 'CREATE TRIGGER trig_update_journal_metadata
+          AFTER INSERT ON ' || j_table || ' FOR EACH ROW
+          EXECUTE PROCEDURE ' || schema || '.update_journal_metadata()';
+
+  EXECUTE sql;
+END;
+$$ LANGUAGE plpgsql;
diff --git a/scripts/migration-0.6.0/4-populate-journal-metadata.sql b/scripts/migration-0.6.0/4-populate-journal-metadata.sql
new file mode 100644
index 00000000..35e6d61f
--- /dev/null
+++ b/scripts/migration-0.6.0/4-populate-journal-metadata.sql
@@ -0,0 +1,39 @@
+/*
+ATTENTION: This is a simplistic migration that is not prepared to handle a large number of rows.
+If that is your situation, please consider running some kind of batched ad-hoc program that reads the journal,
+computes the necessary values and then inserts them into the journal metadata table.
+
+When you upgrade to the 0.6.x series, the crucial part is adding the metadata insert trigger, which will take care of all new events,
+meaning that it is totally safe to solve the backfilling of data in an ad-hoc manner.
+*/
+DO $$
+DECLARE
+  -- replace with appropriate values
+  schema CONSTANT TEXT := 'public';
+  j_table_name CONSTANT TEXT := 'journal';
+  j_persistence_id_column CONSTANT TEXT := 'persistence_id';
+  j_sequence_number_column CONSTANT TEXT := 'sequence_number';
+  j_ordering_column CONSTANT TEXT := 'ordering';
+  jm_table_name CONSTANT TEXT := 'journal_metadata';
+  jm_persistence_id_column CONSTANT TEXT := 'persistence_id';
+  jm_max_sequence_number_column CONSTANT TEXT := 'max_sequence_number';
+  jm_max_ordering_column CONSTANT TEXT := 'max_ordering';
+  jm_min_ordering_column CONSTANT TEXT := 'min_ordering';
+
+  -- variables
+  j_table TEXT;
+  jm_table TEXT;
+  sql TEXT;
+BEGIN
+  j_table := schema || '.' || j_table_name;
+  jm_table := schema || '.' || jm_table_name;
+  -- the explicit column list matters: the metadata table's first column is a
+  -- GENERATED ALWAYS AS IDENTITY id, so a bare INSERT ... SELECT would target it;
+  -- the ON CONFLICT clause corrects min_ordering for rows the trigger created with -1
+  sql := 'INSERT INTO ' || jm_table ||
+         ' (' || jm_persistence_id_column || ', ' || jm_max_sequence_number_column || ', ' ||
+         jm_max_ordering_column || ', ' || jm_min_ordering_column || ')' ||
+         ' SELECT ' ||
+         j_persistence_id_column || ', ' ||
+         'max(' || j_sequence_number_column || '), ' ||
+         'max(' || j_ordering_column || '), ' ||
+         'min(' || j_ordering_column || ')' ||
+         ' FROM ' || j_table || ' GROUP BY ' || j_persistence_id_column ||
+         ' ON CONFLICT (' || jm_persistence_id_column || ') DO UPDATE SET ' ||
+         jm_min_ordering_column || ' = EXCLUDED.' || jm_min_ordering_column;
+
+  EXECUTE sql;
+END;
+$$ LANGUAGE plpgsql;
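+
+-- Optional sanity check (a sketch, assuming the default schema and table names used above):
+-- after the backfill, every distinct persistence id in the journal should have exactly one
+-- metadata row.
+--
+--   SELECT (SELECT count(DISTINCT persistence_id) FROM public.journal) =
+--          (SELECT count(*) FROM public.journal_metadata) AS backfill_complete;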