Skip to content

Commit

Permalink
force checkpoint explicitly
Browse files Browse the repository at this point in the history
  • Loading branch information
sivukhin committed Aug 26, 2024
1 parent 6c1ea12 commit 0514c4b
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 22 deletions.
24 changes: 3 additions & 21 deletions bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1677,28 +1677,10 @@ impl Replicator {
}
// drop of injector will cause drop&close of last DB connection which will perform final
// WAL checkpoint of the DB
injector
.checkpoint()
.map_err(|e| anyhow!("unable to apply WAL after restore procedure: {e}"))?;
drop(injector);

let db_path_str = db_path
.to_str()
.ok_or(anyhow!("failed to convert db path to string"))?;
let db_wal_file_path = format!("{}-wal", &db_path_str);
let db_wal_index_path = format!("{}-shm", &db_path_str);
let has_wal_file = tokio::fs::try_exists(&db_wal_file_path).await?;
let has_wal_index = tokio::fs::try_exists(&db_wal_index_path).await?;
if has_wal_file || has_wal_index {
// restore process was not finished successfully as WAL wasn't transferred completely
tracing::error!(
"WAL wasn't transferred completely during restoration: db_name={}, generation={}",
&self.db_name,
&generation
);
let _ = self
.remove_wal_files(&db_path_str)
.await
.inspect_err(|e| tracing::error!("unable to remove wal files: {}", e));
return Err(anyhow!("WAL wasn't transferred completely"));
}
Ok(applied_wal_frame)
}

Expand Down
15 changes: 14 additions & 1 deletion libsql-replication/src/injector/sqlite_injector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use std::{collections::VecDeque, path::PathBuf};

use parking_lot::Mutex;
use rusqlite::OpenFlags;
use rusqlite::{params, OpenFlags};
use tokio::task::spawn_blocking;

use crate::frame::{Frame, FrameNo};
Expand All @@ -25,6 +25,13 @@ pub struct SqliteInjector {
pub(in super::super) inner: Arc<Mutex<SqliteInjectorInner>>,
}

impl SqliteInjector {
pub fn checkpoint(&mut self) -> Result<()> {
let inner = self.inner.clone();
return inner.lock().checkpoint();
}
}

impl Injector for SqliteInjector {
async fn inject_frame(&mut self, frame: RpcFrame) -> Result<Option<FrameNo>> {
let inner = self.inner.clone();
Expand Down Expand Up @@ -124,6 +131,12 @@ impl SqliteInjectorInner {
})
}

pub fn checkpoint(&mut self) -> Result<()> {
let conn = self.connection.lock();
let _ = conn.query_row("PRAGMA wal_checkpoint(TRUNCATE)", params![], |_| Ok(()))?;
Ok(())
}

/// Inject a frame into the log. If this was a commit frame, returns Ok(Some(FrameNo)).
pub fn inject_frame(&mut self, frame: Frame) -> Result<Option<FrameNo>, Error> {
let frame_close_txn = frame.header().size_after.get() != 0;
Expand Down

0 comments on commit 0514c4b

Please sign in to comment.