diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 7850dd4efc..7aa20200c7 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -1677,28 +1677,10 @@ impl Replicator { } // drop of injector will cause drop&close of last DB connection which will perform final // WAL checkpoint of the DB + injector + .checkpoint() + .map_err(|e| anyhow!("unable to apply WAL after restore procedure: {e}"))?; drop(injector); - - let db_path_str = db_path - .to_str() - .ok_or(anyhow!("failed to convert db path to string"))?; - let db_wal_file_path = format!("{}-wal", &db_path_str); - let db_wal_index_path = format!("{}-shm", &db_path_str); - let has_wal_file = tokio::fs::try_exists(&db_wal_file_path).await?; - let has_wal_index = tokio::fs::try_exists(&db_wal_index_path).await?; - if has_wal_file || has_wal_index { - // restore process was not finished successfully as WAL wasn't transferred completely - tracing::error!( - "WAL wasn't transferred completely during restoration: db_name={}, generation={}", - &self.db_name, - &generation - ); - let _ = self - .remove_wal_files(&db_path_str) - .await - .inspect_err(|e| tracing::error!("unable to remove wal files: {}", e)); - return Err(anyhow!("WAL wasn't transferred completely")); - } Ok(applied_wal_frame) } diff --git a/libsql-replication/src/injector/sqlite_injector/mod.rs b/libsql-replication/src/injector/sqlite_injector/mod.rs index f6ce2aa89f..25669218fd 100644 --- a/libsql-replication/src/injector/sqlite_injector/mod.rs +++ b/libsql-replication/src/injector/sqlite_injector/mod.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use std::{collections::VecDeque, path::PathBuf}; use parking_lot::Mutex; -use rusqlite::OpenFlags; +use rusqlite::{params, OpenFlags}; use tokio::task::spawn_blocking; use crate::frame::{Frame, FrameNo}; @@ -25,6 +25,13 @@ pub struct SqliteInjector { pub(in super::super) inner: Arc>, } +impl SqliteInjector { + pub fn checkpoint(&mut self) -> Result<()> { + let inner = self.inner.clone(); + return inner.lock().checkpoint(); + } +} + impl Injector for SqliteInjector { async fn inject_frame(&mut self, frame: RpcFrame) -> Result> { let inner = self.inner.clone(); @@ -124,6 +131,12 @@ impl SqliteInjectorInner { }) } + pub fn checkpoint(&mut self) -> Result<()> { + let conn = self.connection.lock(); + let _ = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", params![], |_| Ok(()))?; + Ok(()) + } + /// Inject a frame into the log. If this was a commit frame, returns Ok(Some(FrameNo)). pub fn inject_frame(&mut self, frame: Frame) -> Result, Error> { let frame_close_txn = frame.header().size_after.get() != 0;