Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libsql wal encryption #1751

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion libsql-server/src/namespace/configurator/libsql_primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub(super) async fn libsql_primary_common(
encryption_config: base_config.encryption_config.clone(),
block_writes: block_writes.clone(),
resolve_attach_path,
wal_manager: LibsqlWalManager::new(registry.clone(), namespace_resolver.clone()),
wal_manager: LibsqlWalManager::new(registry.clone(), namespace_resolver.clone(), Some("megasecret".to_string().into())),
}),
}
.throttled(
Expand Down
1 change: 1 addition & 0 deletions libsql-server/src/namespace/configurator/libsql_replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ impl ConfigureNamespace for LibsqlReplicaConfigurator {
wal_manager: LibsqlWalManager::new(
self.registry.clone(),
self.namespace_resolver.clone(),
Some("megasecret".to_string().into())
),
}),
};
Expand Down
6 changes: 6 additions & 0 deletions libsql-wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono"] }
petgraph = "0.6.5"
anyhow = { version = "1.0.86", optional = true }

# feat: encryption
aes = { version = "0.8.4" }
cbc = { version = "0.1.2" }
hmac = { version = "0.12.1" }
sha2 = { version = "0.10.8" }

[dev-dependencies]
criterion = "0.5.1"
hex = "0.4.3"
Expand Down
10 changes: 10 additions & 0 deletions libsql-wal/src/encryption.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use aes::Aes256;

use crate::LIBSQL_PAGE_SIZE;

#[derive(Debug)]
pub struct EncryptionConfig {
pub decryptor: cbc::Decryptor<Aes256>,
pub encryptor: cbc::Encryptor<Aes256>,
pub scratch: Box<[u8; LIBSQL_PAGE_SIZE as usize]>
}
3 changes: 2 additions & 1 deletion libsql-wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod shared_wal;
pub mod storage;
pub mod transaction;
pub mod wal;
mod encryption;

const LIBSQL_MAGIC: u64 = u64::from_be_bytes(*b"LIBSQL\0\0");
const LIBSQL_PAGE_SIZE: u16 = 4096;
Expand Down Expand Up @@ -124,7 +125,7 @@ pub mod test {
tokio::spawn(checkpointer.run());
}

let wal = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
let wal = LibsqlWalManager::new(registry.clone(), Arc::new(resolver), None);

Self { tmp, registry, wal }
}
Expand Down
14 changes: 13 additions & 1 deletion libsql-wal/src/segment/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::sync::{
Arc,
};

use aes::cipher::block_padding::NoPadding;
use aes::cipher::BlockEncryptMut;
use chrono::{DateTime, Utc};
use crossbeam_skiplist::SkipMap;
use fst::MapBuilder;
Expand All @@ -24,6 +26,7 @@ use crate::io::file::FileExt;
use crate::io::Inspect;
use crate::segment::{checked_frame_offset, SegmentFlags};
use crate::segment::{frame_offset, page_offset, sealed::SealedSegment};
use crate::shared_wal::Context;
use crate::transaction::{Transaction, TxGuardOwned, TxGuardShared};
use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION};

Expand Down Expand Up @@ -228,12 +231,13 @@ impl<F> CurrentSegment<F> {
}
}

#[tracing::instrument(skip(self, pages, tx))]
#[tracing::instrument(skip_all)]
pub fn insert_pages<'a>(
&self,
pages: impl Iterator<Item = (u32, &'a [u8])>,
size_after: Option<u32>,
tx: &mut TxGuardShared<F>,
ctx: &mut Context,
) -> Result<Option<u64>>
where
F: FileExt,
Expand All @@ -245,6 +249,14 @@ impl<F> CurrentSegment<F> {
// let mut commit_frame_written = false;
let current_savepoint = tx.savepoints.last_mut().expect("no savepoints initialized");
while let Some((page_no, page)) = pages.next() {
let page = match ctx.encryption {
Some(ref mut crypto) if page_no != 1 => {
crypto.encryptor.clone().encrypt_padded_b2b_mut::<NoPadding>(page, crypto.scratch.as_mut_slice()).unwrap();
crypto.scratch.as_slice()
},
_ => page,
};

// optim: if the page is already present, overwrite its content
if let Some(offset) = current_savepoint.index.get(&page_no) {
tracing::trace!(page_no, "recycling frame");
Expand Down
5 changes: 3 additions & 2 deletions libsql-wal/src/segment/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ mod test {
use tempfile::{tempfile, NamedTempFile};
use tokio_stream::StreamExt as _;

use crate::test::{seal_current_segment, TestEnv};
use crate::{shared_wal::Context, test::{seal_current_segment, TestEnv}};

use super::*;

Expand Down Expand Up @@ -442,11 +442,12 @@ mod test {

let mut file = NamedTempFile::new().unwrap();
let mut tx = shared.begin_read(999999).into();
let ctx = Context::default();
while let Some(frame) = stream.next().await {
let frame = frame.unwrap();
let mut buffer = [0; 4096];
shared
.read_page(&mut tx, frame.header.page_no(), &mut buffer)
.read_page(&mut tx, frame.header.page_no(), &mut buffer, &ctx)
.unwrap();
assert_eq!(buffer, frame.data());
file.write_all(frame.data()).unwrap();
Expand Down
21 changes: 19 additions & 2 deletions libsql-wal/src/shared_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;

use aes::cipher::block_padding::NoPadding;
use aes::cipher::BlockDecryptMut;
use arc_swap::ArcSwap;
use crossbeam::deque::Injector;
use crossbeam::sync::Unparker;
Expand All @@ -11,6 +13,7 @@ use tokio::sync::{mpsc, watch};
use uuid::Uuid;

use crate::checkpointer::CheckpointMessage;
use crate::encryption::EncryptionConfig;
use crate::error::{Error, Result};
use crate::io::file::FileExt;
use crate::io::Io;
Expand All @@ -20,6 +23,12 @@ use crate::segment_swap_strategy::SegmentSwapStrategy;
use crate::transaction::{ReadTransaction, Savepoint, Transaction, TxGuard, WriteTransaction};
use libsql_sys::name::NamespaceName;

/// addtional context passed during wal operation
#[derive(Debug, Default)]
pub struct Context {
pub encryption: Option<EncryptionConfig>,
}

#[derive(Default)]
pub struct WalLock {
pub(crate) tx_id: Arc<async_lock::Mutex<Option<u64>>>,
Expand Down Expand Up @@ -220,12 +229,13 @@ impl<IO: Io> SharedWal<IO> {
})
}

#[tracing::instrument(skip(self, tx, buffer))]
#[tracing::instrument(skip_all)]
pub fn read_page(
&self,
tx: &mut Transaction<IO::File>,
page_no: u32,
buffer: &mut [u8],
ctx: &Context,
) -> Result<()> {
match tx.current.find_frame(page_no, tx) {
Some(offset) => {
Expand Down Expand Up @@ -262,6 +272,12 @@ impl<IO: Io> SharedWal<IO> {
}
}

if let Some(ref enc) = ctx.encryption {
if page_no != 1 {
enc.decryptor.clone().decrypt_padded_mut::<NoPadding>(buffer).unwrap();
}
}

tx.pages_read += 1;

Ok(())
Expand All @@ -273,10 +289,11 @@ impl<IO: Io> SharedWal<IO> {
tx: &mut WriteTransaction<IO::File>,
pages: impl Iterator<Item = (u32, &'a [u8])>,
size_after: Option<u32>,
ctx: &mut Context,
) -> Result<()> {
let current = self.current.load();
let mut tx = tx.lock();
if let Some(last_committed) = current.insert_pages(pages, size_after, &mut tx)? {
if let Some(last_committed) = current.insert_pages(pages, size_after, &mut tx, ctx)? {
self.new_frame_notifier.send_replace(last_committed);
}

Expand Down
37 changes: 35 additions & 2 deletions libsql-wal/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,27 @@ use std::os::unix::prelude::OsStrExt;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use aes::cipher::KeyIvInit as _;
use aes::Aes256;
use hmac::{Hmac, Mac as _};
use libsql_sys::name::NamespaceResolver;
use libsql_sys::wal::{Wal, WalManager};
use sha2::Sha256;

use crate::encryption::EncryptionConfig;
use crate::io::Io;
use crate::registry::WalRegistry;
use crate::segment::sealed::SealedSegment;
use crate::shared_wal::SharedWal;
use crate::shared_wal::{Context, SharedWal};
use crate::storage::Storage;
use crate::transaction::Transaction;
use crate::LIBSQL_PAGE_SIZE;

pub struct LibsqlWalManager<IO: Io, S> {
registry: Arc<WalRegistry<IO, S>>,
next_conn_id: Arc<AtomicU64>,
namespace_resolver: Arc<dyn NamespaceResolver>,
secret: Option<Arc<str>>,
}

impl<IO: Io, S> Clone for LibsqlWalManager<IO, S> {
Expand All @@ -25,6 +32,7 @@ impl<IO: Io, S> Clone for LibsqlWalManager<IO, S> {
registry: self.registry.clone(),
next_conn_id: self.next_conn_id.clone(),
namespace_resolver: self.namespace_resolver.clone(),
secret: self.secret.clone(),
}
}
}
Expand All @@ -33,11 +41,13 @@ impl<FS: Io, S> LibsqlWalManager<FS, S> {
pub fn new(
registry: Arc<WalRegistry<FS, S>>,
namespace_resolver: Arc<dyn NamespaceResolver>,
secret: Option<Arc<str>>,
) -> Self {
Self {
registry,
next_conn_id: Default::default(),
namespace_resolver,
secret,
}
}
}
Expand All @@ -47,6 +57,7 @@ pub struct LibsqlWal<FS: Io> {
tx: Option<Transaction<FS::File>>,
shared: Arc<SharedWal<FS>>,
conn_id: u64,
context: Context,
}

impl<IO: Io, S: Storage<Segment = SealedSegment<IO::File>>> WalManager for LibsqlWalManager<IO, S> {
Expand Down Expand Up @@ -74,11 +85,32 @@ impl<IO: Io, S: Storage<Segment = SealedSegment<IO::File>>> WalManager for Libsq
let conn_id = self
.next_conn_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

let context = match self.secret {
Some(ref secret) => {
let mut mac = Hmac::<Sha256>::new_from_slice(b"secret").unwrap();
mac.update(secret.as_bytes());
let key_h = mac.finalize().into_bytes();
let iv = [42u8; 16];
let encryptor = cbc::Encryptor::<Aes256>::new(&key_h.into(), &iv.into());
let decryptor = cbc::Decryptor::<Aes256>::new(&key_h.into(), &iv.into());
Context {
encryption: Some(EncryptionConfig {
decryptor,
encryptor,
scratch: Box::new([0; LIBSQL_PAGE_SIZE as usize]),
})
}
}
_ => Default::default(),
};

Ok(LibsqlWal {
last_read_frame_no: None,
tx: None,
shared,
conn_id,
context,
})
}

Expand Down Expand Up @@ -161,7 +193,7 @@ impl<FS: Io> Wal for LibsqlWal<FS> {
tracing::trace!(page_no, "reading frame");
let tx = self.tx.as_mut().unwrap();
self.shared
.read_page(tx, page_no.get(), buffer)
.read_page(tx, page_no.get(), buffer, &mut self.context)
.map_err(Into::into)?;
Ok(())
}
Expand Down Expand Up @@ -279,6 +311,7 @@ impl<FS: Io> Wal for LibsqlWal<FS> {
tx,
page_headers.iter(),
(size_after != 0).then_some(size_after),
&mut self.context,
)
.map_err(Into::into)?;
}
Expand Down
Loading