From 0e95d77fd04e0e9c0e85aed0eae1ec29426297d1 Mon Sep 17 00:00:00 2001 From: Tiago Mota Date: Wed, 13 Oct 2021 15:05:32 +0100 Subject: [PATCH] Keep highestSequenceNrForPersistenceId journal query and comment the new one which uses journal_persistence_ids table --- .../postgres/journal/dao/BaseByteArrayJournalDao.scala | 2 +- .../akka/persistence/postgres/journal/dao/JournalQueries.scala | 3 ++- .../persistence/postgres/journal/dao/JournalQueriesTest.scala | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala b/core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala index ed0ae9f6..e776619a 100644 --- a/core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala +++ b/core/src/main/scala/akka/persistence/postgres/journal/dao/BaseByteArrayJournalDao.scala @@ -139,7 +139,7 @@ trait BaseByteArrayJournalDao extends JournalDaoWithUpdates with BaseJournalDaoW override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = for { - maybeHighestSeqNo <- db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result.headOption) + maybeHighestSeqNo <- db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result) //.headOption) } yield maybeHighestSeqNo.getOrElse(0L) override def messages( diff --git a/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala b/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala index 5359ce5a..f7a90e95 100644 --- a/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala +++ b/core/src/main/scala/akka/persistence/postgres/journal/dao/JournalQueries.scala @@ -48,7 +48,8 @@ class JournalQueries( .update(true) private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]) = { - journalPersistenceIdsTable.filter(_.persistenceId === persistenceId).map(_.maxSequenceNumber).take(1) + journalTable.filter(_.persistenceId === persistenceId).map(_.sequenceNumber).max + // journalPersistenceIdsTable.filter(_.persistenceId === persistenceId).map(_.maxSequenceNumber).take(1) } private def _highestMarkedSequenceNrForPersistenceId(persistenceId: Rep[String]): Rep[Option[Long]] = diff --git a/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalQueriesTest.scala b/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalQueriesTest.scala index c3cc7d64..7a334223 100644 --- a/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalQueriesTest.scala +++ b/core/src/test/scala/akka/persistence/postgres/journal/dao/JournalQueriesTest.scala @@ -13,7 +13,8 @@ class JournalQueriesTest extends BaseQueryTest { it should "create SQL query for highestSequenceNrForPersistenceId" in withJournalQueries { queries => queries.highestSequenceNrForPersistenceId( - "aaa") shouldBeSQL """select "max_sequence_number" from "journal_persistence_ids" where "persistence_id" = ? limit 1""" + "aaa") shouldBeSQL """select max("sequence_number") from "journal" where "persistence_id" = ?""" + // queries.highestSequenceNrForPersistenceId("aaa") shouldBeSQL """select "max_sequence_number" from "journal_persistence_ids" where "persistence_id" = ? limit 1""" } it should "create SQL query for messagesQuery" in withJournalQueries { queries =>