Make usage of journal metadata optional through configuration
tiagomota committed Jul 5, 2023
1 parent 9cc898b commit fc6d492
Showing 25 changed files with 295 additions and 144 deletions.
45 changes: 42 additions & 3 deletions core/src/main/resources/reference.conf
@@ -126,7 +126,17 @@ postgres-journal {
metadata = "metadata"
}
}

# Holds per-persistence-id journal metadata (max sequence number, min/max ordering) that can be used to speed up queries
journalMetadata {
tableName = "journal_metadata"
schemaName = ""
columnNames = {
persistenceId = "persistence_id"
maxSequenceNumber = "max_sequence_number"
maxOrdering = "max_ordering"
minOrdering = "min_ordering"
}
}
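For orientation, a row in this table can be pictured roughly as follows. This is only an illustrative sketch with hypothetical values; the actual Slick row mapping is defined by JournalMetadataTable elsewhere in this commit, and the field names simply mirror the columnNames above.

// Illustrative sketch only: a journal_metadata row with hypothetical values.
final case class JournalMetadataRow(
  persistenceId: String,    // persistence_id
  maxSequenceNumber: Long,  // max_sequence_number
  minOrdering: Long,        // min_ordering
  maxOrdering: Long)        // max_ordering

// e.g. after events 1..250 of "user-42" were written at global orderings 1000..1249:
val example = JournalMetadataRow("user-42", maxSequenceNumber = 250L, minOrdering = 1000L, maxOrdering = 1249L)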
tags {
tableName = "tags"
schameName = ""
@@ -176,6 +186,14 @@ postgres-journal {
# to the same value for these other journals.
use-shared-db = null

# This setting can be used to enable the use of the data stored
# in the journal_metadata table, in order to speed up some queries that would
# otherwise rely solely on the journal table.
# If the metadata table does not yet hold the required information,
# the logic falls back to the journal-only queries.
# This setting is disabled by default.
use-journal-metadata = false
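A minimal sketch of opting in from application code, assuming the plugin keeps its default postgres-journal configuration path; the snippet is illustrative and not part of this commit.

import com.typesafe.config.ConfigFactory

// Sketch: enable journal metadata usage for the write-side plugin; all other settings keep their defaults.
val journalPluginConfig = ConfigFactory
  .parseString("postgres-journal.use-journal-metadata = true")
  .withFallback(ConfigFactory.load())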

slick {

db {
@@ -358,7 +376,18 @@ postgres-read-journal {
# to the same value for these other journals.
use-shared-db = null

dao = "akka.persistence.postgres.query.dao.ByteArrayReadJournalDao"
# This setting can be used to enable the use of the data stored
# in the journal_metadata table, in order to speed up some queries that would
# otherwise rely solely on the journal table.
# If the metadata table does not yet hold the required information,
# the logic falls back to the journal-only queries.
# This setting is disabled by default.
use-journal-metadata = false


# Replace with "akka.persistence.postgres.query.dao.PartitionedJournalDao" in order to leverage queries dedicated to
# the partitioned journal.
dao = "akka.persistence.postgres.query.dao.FlatReadJournalDao"

# Configuration for akka.persistence.postgres.tag.TagIdResolver
tags {
@@ -402,7 +431,17 @@ postgres-read-journal {
message = "message"
}
}

# Holds per-persistence-id journal metadata (max sequence number, min/max ordering) that can be used to speed up queries
journalMetadata {
tableName = "journal_metadata"
schemaName = ""
columnNames = {
persistenceId = "persistence_id"
maxSequenceNumber = "max_sequence_number"
maxOrdering = "max_ordering"
minOrdering = "min_ordering"
}
}
tags {
tableName = "tags"
schameName = ""
@@ -12,6 +12,7 @@ import scala.concurrent.duration._

object ConfigKeys {
val useSharedDb = "use-shared-db"
val useJournalMetadata = "use-journal-metadata"
}

class SlickConfiguration(config: Config) {
@@ -148,8 +149,10 @@ class JournalConfig(config: Config) {
val tagsConfig = new TagsConfig(config)
val tagsTableConfiguration = new TagsTableConfiguration(config)
val useSharedDb: Option[String] = config.asOptionalNonEmptyString(ConfigKeys.useSharedDb)
val useJournalMetadata: Boolean = config.asBoolean(ConfigKeys.useJournalMetadata, false)

override def toString: String =
s"JournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$tagsConfig,$partitionsConfig,$useSharedDb)"
s"JournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$tagsConfig,$partitionsConfig,$useSharedDb,$useJournalMetadata)"
}

class SnapshotConfig(config: Config) {
@@ -186,7 +189,8 @@ class ReadJournalConfig(config: Config) {
val maxBufferSize: Int = config.as[String]("max-buffer-size", "500").toInt
val addShutdownHook: Boolean = config.asBoolean("add-shutdown-hook", true)
val includeDeleted: Boolean = config.as[Boolean]("includeLogicallyDeleted", true)
val useJournalMetadata: Boolean = config.asBoolean(ConfigKeys.useJournalMetadata, false)

override def toString: String =
s"ReadJournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted)"
s"ReadJournalConfig($journalTableConfiguration,$journalMetadataTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted,$useJournalMetadata)"
}
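A small sketch of how the new flag surfaces on the typed config classes, assuming both classes live in akka.persistence.postgres.config (as the import in the read journal DAO below suggests) and that the plugin's reference.conf is on the classpath.

import akka.persistence.postgres.config.{ JournalConfig, ReadJournalConfig }
import com.typesafe.config.ConfigFactory

// Sketch: both config classes read "use-journal-metadata" and default to false.
val root = ConfigFactory.load()
val journalConfig = new JournalConfig(root.getConfig("postgres-journal"))
val readJournalConfig = new ReadJournalConfig(root.getConfig("postgres-read-journal"))

assert(!journalConfig.useJournalMetadata)     // disabled unless explicitly enabled
assert(!readJournalConfig.useJournalMetadata) // same default on the read side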
@@ -15,6 +15,7 @@ import akka.stream.scaladsl.{ Keep, Sink, Source }
import akka.stream.{ Materializer, OverflowStrategy, QueueOfferResult }
import akka.{ Done, NotUsed }
import org.slf4j.{ Logger, LoggerFactory }
import slick.dbio.DBIOAction
import slick.jdbc.JdbcBackend._

import scala.collection.immutable._
@@ -39,6 +40,9 @@ trait BaseByteArrayJournalDao extends JournalDaoWithUpdates with BaseJournalDaoW

val logger: Logger = LoggerFactory.getLogger(this.getClass)

lazy val metadataQueries: JournalMetadataQueries = new JournalMetadataQueries(
JournalMetadataTable(journalConfig.journalMetadataTableConfiguration))

// This logging may block since we don't control how the user will configure logback
// We can't use Akka logging either, because we don't have an ActorSystem in scope and
// we should not introduce another dependency here.
@@ -138,15 +142,20 @@
queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result

override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
db.run(queries.highestStoredSequenceNrForPersistenceId(persistenceId).result.headOption).flatMap {
case Some(maxSequenceNr) =>
// journal_metadata has the max sequence nr stored
Future.successful(maxSequenceNr)
case None =>
// journal_metadata has yet to store the max sequence number for this persistenceId
db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result)
.map(_.getOrElse(0L)) // Default to 0L when nothing is found for this persistenceId
}
val query = if (journalConfig.useJournalMetadata) {
metadataQueries.highestSequenceNrForPersistenceId(persistenceId).result.headOption.flatMap {
case Some(maxSequenceNr) =>
// return the max sequence nr stored in the journal metadata table
DBIOAction.successful(Some(maxSequenceNr))
case None =>
// journal metadata does not yet have information for this persistenceId -> fall back to standard behaviour
queries.highestSequenceNrForPersistenceId(persistenceId).result
}
} else
queries.highestSequenceNrForPersistenceId(persistenceId).result

// Default to 0L when nothing is found for this persistenceId
db.run(query).map(_.getOrElse(0L))
}

override def messages(
@@ -13,9 +13,7 @@ class FlatJournalDao(val db: Database, val journalConfig: JournalConfig, seriali
implicit val ec: ExecutionContext,
val mat: Materializer)
extends BaseByteArrayJournalDao {
val queries = new JournalQueries(
FlatJournalTable(journalConfig.journalTableConfiguration),
JournalMetadataTable(journalConfig.journalMetadataTableConfiguration))
val queries = new JournalQueries(FlatJournalTable(journalConfig.journalTableConfiguration))
val tagDao = new SimpleTagDao(db, journalConfig.tagsTableConfiguration)
val eventTagConverter = new CachedTagIdResolver(tagDao, journalConfig.tagsConfig)
val serializer = new ByteArrayJournalSerializer(serialization, eventTagConverter)
@@ -0,0 +1,19 @@
package akka.persistence.postgres.journal.dao

import slick.lifted.TableQuery

class JournalMetadataQueries(journalMetadataTable: TableQuery[JournalMetadataTable]) {
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._

private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Query[Rep[Long], Long, Seq] = {
journalMetadataTable.filter(_.persistenceId === persistenceId).map(_.maxSequenceNumber).take(1)
}

val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)

private def _minAndMaxOrderingForPersistenceId(
persistenceId: Rep[String]): Query[(Rep[Long], Rep[Long]), (Long, Long), Seq] =
journalMetadataTable.filter(_.persistenceId === persistenceId).take(1).map(r => (r.minOrdering, r.maxOrdering))

val minAndMaxOrderingForPersistenceId = Compiled(_minAndMaxOrderingForPersistenceId _)
}
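For illustration, a DAO that holds a JournalMetadataQueries instance (as BaseByteArrayJournalDao does above) would typically run the compiled queries like this; the standalone helper functions below are hypothetical and only show the call shape.

import akka.persistence.postgres.db.ExtendedPostgresProfile.api._
import akka.persistence.postgres.journal.dao.JournalMetadataQueries
import slick.jdbc.JdbcBackend.Database

import scala.concurrent.Future

// Sketch: executing the compiled metadata queries against a Slick database.
def storedMaxSeqNr(db: Database, metadataQueries: JournalMetadataQueries, persistenceId: String): Future[Option[Long]] =
  db.run(metadataQueries.highestSequenceNrForPersistenceId(persistenceId).result.headOption)

def storedOrderingBounds(db: Database, metadataQueries: JournalMetadataQueries, persistenceId: String): Future[Option[(Long, Long)]] =
  db.run(metadataQueries.minAndMaxOrderingForPersistenceId(persistenceId).result.headOption)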
@@ -10,7 +10,7 @@ import io.circe.Json
import slick.lifted.TableQuery
import slick.sql.FixedSqlAction

class JournalQueries(journalTable: TableQuery[JournalTable], journalMetadataTable: TableQuery[JournalMetadataTable]) {
class JournalQueries(journalTable: TableQuery[JournalTable]) {

import akka.persistence.postgres.db.ExtendedPostgresProfile.api._

@@ -51,17 +51,11 @@ class JournalQueries(journalTable: TableQuery[JournalTable], journalMetadataTabl
private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] =
journalTable.filter(_.persistenceId === persistenceId).map(_.sequenceNumber).max

private def _highestStoredSequenceNrForPersistenceId(persistenceId: Rep[String]): Query[Rep[Long], Long, Seq] = {
journalMetadataTable.filter(_.persistenceId === persistenceId).map(_.maxSequenceNumber).take(1)
}

private def _highestMarkedSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] =
journalTable.filter(_.deleted === true).filter(_.persistenceId === persistenceId).map(_.sequenceNumber).max

val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)

val highestStoredSequenceNrForPersistenceId = Compiled(_highestStoredSequenceNrForPersistenceId _)

val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _)

private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) =
@@ -74,12 +68,6 @@ class JournalQueries(journalTable: TableQuery[JournalTable], journalMetadataTabl

val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct)

private def _minAndMaxOrderingStoredForPersistenceId(
persistenceId: Rep[String]): Query[(Rep[Long], Rep[Long]), (Long, Long), Seq] =
journalMetadataTable.filter(_.persistenceId === persistenceId).take(1).map(r => (r.minOrdering, r.maxOrdering))

val minAndMaxOrderingStoredForPersistenceId = Compiled(_minAndMaxOrderingStoredForPersistenceId _)

private def _messagesQuery(
persistenceId: Rep[String],
fromSequenceNr: Rep[Long],
@@ -17,9 +17,7 @@ class NestedPartitionsJournalDao(db: Database, journalConfig: JournalConfig, ser
implicit ec: ExecutionContext,
mat: Materializer)
extends FlatJournalDao(db, journalConfig, serialization) {
override val queries = new JournalQueries(
NestedPartitionsJournalTable(journalConfig.journalTableConfiguration),
JournalMetadataTable(journalConfig.journalMetadataTableConfiguration))
override val queries = new JournalQueries(NestedPartitionsJournalTable(journalConfig.journalTableConfiguration))
private val journalTableCfg = journalConfig.journalTableConfiguration
private val partitionSize = journalConfig.partitionsConfig.size
private val partitionPrefix = journalConfig.partitionsConfig.prefix
@@ -19,9 +19,7 @@ class PartitionedJournalDao(db: Database, journalConfig: JournalConfig, serializ
implicit ec: ExecutionContext,
mat: Materializer)
extends FlatJournalDao(db, journalConfig, serialization) {
override val queries = new JournalQueries(
PartitionedJournalTable(journalConfig.journalTableConfiguration),
JournalMetadataTable(journalConfig.journalMetadataTableConfiguration))
override val queries = new JournalQueries(PartitionedJournalTable(journalConfig.journalTableConfiguration))
private val journalTableCfg = journalConfig.journalTableConfiguration
private val partitionSize = journalConfig.partitionsConfig.size
private val partitionPrefix = journalConfig.partitionsConfig.prefix
@@ -96,17 +94,23 @@ class PartitionedJournalDao(db: Database, journalConfig: JournalConfig, serializ
fromSequenceNr: Long,
toSequenceNr: Long,
max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = {
// Query the metadata table to get the known min and max ordering of a persistence_id,
// so that the Postgres query planner can immediately skip scanning unnecessary partitions
val messagesQuery = queries.minAndMaxOrderingStoredForPersistenceId(persistenceId).result.headOption.flatMap {
case Some((minOrdering, maxOrdering)) =>
queries
.messagesOrderingBoundedQuery(persistenceId, fromSequenceNr, toSequenceNr, max, minOrdering, maxOrdering)
.result
case None =>
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result
}

Source.fromPublisher(db.stream(messagesQuery)).via(serializer.deserializeFlow)
// This behaviour override is only applied here because it is only useful with the partitioned journal strategy.
val query = if (journalConfig.useJournalMetadata) {
metadataQueries.minAndMaxOrderingForPersistenceId(persistenceId).result.headOption.flatMap {
case Some((minOrdering, maxOrdering)) =>
// if journal_metadata knows the min and max ordering of a persistenceId,
// use them to help the query planner to avoid scanning unnecessary partitions.
queries
.messagesOrderingBoundedQuery(persistenceId, fromSequenceNr, toSequenceNr, max, minOrdering, maxOrdering)
.result
case None =>
// fallback to standard behaviour
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result
}
} else
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result

Source.fromPublisher(db.stream(query)).via(serializer.deserializeFlow)
}
}
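To show how this surfaces to a caller, here is a hedged sketch of replaying one persistence id through the DAO's public messages stream; the replay helper and its wiring are hypothetical, only the messages signature comes from this diff.

import akka.actor.ActorSystem
import akka.persistence.PersistentRepr
import akka.persistence.postgres.journal.dao.PartitionedJournalDao
import akka.stream.scaladsl.Sink

import scala.concurrent.Future
import scala.util.Success

// Sketch: stream all events of one persistence id and collect the successfully deserialized ones.
// The dao instance is assumed to be wired elsewhere with a real db, JournalConfig and serialization.
def replay(dao: PartitionedJournalDao, persistenceId: String)(
    implicit system: ActorSystem): Future[Seq[PersistentRepr]] =
  dao
    .messages(persistenceId, fromSequenceNr = 0L, toSequenceNr = Long.MaxValue, max = Long.MaxValue)
    .collect { case Success((repr, _)) => repr }
    .runWith(Sink.seq)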
@@ -9,7 +9,11 @@ package query.dao
import akka.NotUsed
import akka.persistence.PersistentRepr
import akka.persistence.postgres.config.ReadJournalConfig
import akka.persistence.postgres.journal.dao.{ BaseJournalDaoWithReadMessages, ByteArrayJournalSerializer }
import akka.persistence.postgres.journal.dao.{
BaseJournalDaoWithReadMessages,
ByteArrayJournalSerializer,
JournalMetadataTable
}
import akka.persistence.postgres.serialization.FlowPersistentReprSerializer
import akka.persistence.postgres.tag.{ CachedTagIdResolver, SimpleTagDao, TagIdResolver }
import akka.serialization.Serialization
@@ -63,44 +67,3 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
db.run(queries.maxOrdering.result)
}
}

class ByteArrayReadJournalDao(
val db: Database,
val readJournalConfig: ReadJournalConfig,
serialization: Serialization,
val tagIdResolver: TagIdResolver)(implicit val ec: ExecutionContext, val mat: Materializer)
extends BaseByteArrayReadJournalDao {
val queries = new ReadJournalQueries(readJournalConfig)
val serializer = new ByteArrayJournalSerializer(
serialization,
new CachedTagIdResolver(
new SimpleTagDao(db, readJournalConfig.tagsTableConfiguration),
readJournalConfig.tagsConfig))
}

class PartitionedReadJournalDao(
db: Database,
readJournalConfig: ReadJournalConfig,
serialization: Serialization,
tagIdResolver: TagIdResolver)(implicit ec: ExecutionContext, mat: Materializer)
extends ByteArrayReadJournalDao(db, readJournalConfig, serialization, tagIdResolver) {

import akka.persistence.postgres.db.ExtendedPostgresProfile.api._

override def messages(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long,
max: Long): Source[Try[(PersistentRepr, Long)], NotUsed] = {
val messagesQuery = queries.minAndMaxOrderingStoredForPersistenceId(persistenceId).result.headOption.flatMap {
case Some((minOrdering, maxOrdering)) =>
queries
.messagesOrderingBoundedQuery(persistenceId, fromSequenceNr, toSequenceNr, max, minOrdering, maxOrdering)
.result
case None =>
queries.messagesQuery(persistenceId, fromSequenceNr, toSequenceNr, max).result
}

Source.fromPublisher(db.stream(messagesQuery)).via(serializer.deserializeFlow)
}
}
@@ -0,0 +1,24 @@
package akka.persistence.postgres.query.dao

import akka.persistence.postgres.config.ReadJournalConfig
import akka.persistence.postgres.journal.dao.ByteArrayJournalSerializer
import akka.persistence.postgres.tag.{ CachedTagIdResolver, SimpleTagDao, TagIdResolver }
import akka.serialization.Serialization
import akka.stream.Materializer
import slick.jdbc.JdbcBackend.Database

import scala.concurrent.ExecutionContext

class FlatReadJournalDao(
val db: Database,
val readJournalConfig: ReadJournalConfig,
serialization: Serialization,
val tagIdResolver: TagIdResolver)(implicit val ec: ExecutionContext, val mat: Materializer)
extends BaseByteArrayReadJournalDao {
val queries = new ReadJournalQueries(readJournalConfig)
val serializer = new ByteArrayJournalSerializer(
serialization,
new CachedTagIdResolver(
new SimpleTagDao(db, readJournalConfig.tagsTableConfiguration),
readJournalConfig.tagsConfig))
}
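Because the default read journal DAO changed from ByteArrayReadJournalDao to FlatReadJournalDao in this commit, a deployment that pins the dao setting explicitly would point it at the new class; a minimal sketch, assuming the default postgres-read-journal path.

import com.typesafe.config.ConfigFactory

// Sketch: explicitly select the flat (non-partitioned) read journal DAO introduced by this commit.
val readJournalDaoConfig = ConfigFactory
  .parseString("""
    postgres-read-journal.dao = "akka.persistence.postgres.query.dao.FlatReadJournalDao"
  """)
  .withFallback(ConfigFactory.load())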