Skip to content

Commit

Permalink
Revert usage of journal_persistence_ids on other queries besides high…
Browse files Browse the repository at this point in the history
…estSequenceNrForPersistenceId
  • Loading branch information
tiagomota committed Oct 13, 2021
1 parent 7354754 commit b1e97ff
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWith
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._

override def allPersistenceIdsSource(max: Long): Source[String, NotUsed] =
Source.fromPublisher(db.stream(queries.allPersistenceIds(max).result))
Source.fromPublisher(db.stream(queries.allPersistenceIdsDistinct(max).result))

override def eventsByTag(
tag: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,11 @@ class ReadJournalQueries(val readJournalConfig: ReadJournalConfig) {
import akka.persistence.postgres.db.ExtendedPostgresProfile.api._

private val journalTable: TableQuery[JournalTable] = FlatJournalTable(readJournalConfig.journalTableConfiguration)
private val journalPersistenceIdsTable: TableQuery[JournalPersistenceIdsTable] = JournalPersistenceIdsTable(
readJournalConfig.journalPersistenceIdsTableConfiguration)

private def _allPersistenceIds(max: ConstColumn[Long]): Query[Rep[String], String, Seq] =
if (readJournalConfig.includeDeleted)
journalPersistenceIdsTable.map(_.persistenceId).take(max)
else
journalPersistenceIdsTable
.joinLeft(journalTable.filter(_.deleted === false))
.on(_.persistenceId === _.persistenceId)
.filter(_._2.isDefined)
.map(_._1.persistenceId)
.take(max)
private def _allPersistenceIdsDistinct(max: ConstColumn[Long]): Query[Rep[String], String, Seq] =
baseTableQuery().map(_.persistenceId).distinct.take(max)

val allPersistenceIds = Compiled(_allPersistenceIds _)
val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct _)

private def baseTableQuery() =
if (readJournalConfig.includeDeleted) journalTable
Expand Down Expand Up @@ -65,6 +55,6 @@ class ReadJournalQueries(val readJournalConfig: ReadJournalConfig) {
val orderingByOrdering = Compiled(_journalSequenceQuery _)

val maxOrdering = Compiled {
journalPersistenceIdsTable.map(_.maxOrdering).max.getOrElse(0L)
journalTable.map(_.ordering).max.getOrElse(0L)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import akka.persistence.postgres.util.BaseQueryTest
class ReadJournalQueriesTest extends BaseQueryTest {

it should "create SQL query for allPersistenceIdsDistinct" in withReadJournalQueries { queries =>
queries.allPersistenceIds(23L) shouldBeSQL """select "persistence_id" from "journal_persistence_ids" limit ?"""
queries.allPersistenceIdsDistinct(23L) shouldBeSQL """select distinct "persistence_id" from "journal" limit ?"""
}

it should "create SQL query for messagesQuery" in withReadJournalQueries { queries =>
Expand All @@ -30,7 +30,7 @@ class ReadJournalQueriesTest extends BaseQueryTest {
}

it should "create SQL query for maxJournalSequenceQuery" in withReadJournalQueries { queries =>
queries.maxOrdering shouldBeSQL """select max("max_ordering") from "journal_persistence_ids""""
queries.maxOrdering shouldBeSQL """select max("ordering") from "journal""""
}

private def withReadJournalQueries(f: ReadJournalQueries => Unit): Unit = {
Expand Down

0 comments on commit b1e97ff

Please sign in to comment.