Skip to content

Commit

Permalink
Merge pull request #1718 from tursodatabase/schmema-db-replica-regres…
Browse files Browse the repository at this point in the history
…sion

Fix schema db bug with replicas
  • Loading branch information
MarinPostma authored Sep 2, 2024
2 parents d8a4e40 + f0c92ca commit 73a2913
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 68 deletions.
12 changes: 9 additions & 3 deletions libsql-server/src/connection/connection_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,9 +659,15 @@ mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
conn.execute_program(
Program::seq(&["CREATE TABLE test (x)"]),
Expand Down
1 change: 1 addition & 0 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ where
&self.path,
meta_conn,
meta_store_wal_manager,
db_kind,
)
.await?;

Expand Down
1 change: 1 addition & 0 deletions libsql-server/src/namespace/configurator/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ impl ReplicaConfigurator {
}

impl ConfigureNamespace for ReplicaConfigurator {
#[tracing::instrument(skip_all, fields(name))]
fn setup<'a>(
&'a self,
meta_store_handle: MetaStoreHandle,
Expand Down
17 changes: 12 additions & 5 deletions libsql-server/src/namespace/meta_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use tokio::sync::{

use crate::config::BottomlessConfig;
use crate::connection::config::DatabaseConfig;
use crate::database::DatabaseKind;
use crate::schema::{MigrationDetails, MigrationSummary};
use crate::{
config::MetaStoreConfig, connection::legacy::open_conn_active_checkpoint, error::Error, Result,
Expand Down Expand Up @@ -74,6 +75,7 @@ struct MetaStoreInner {
configs: tokio::sync::Mutex<HashMap<NamespaceName, Sender<InnerConfig>>>,
conn: tokio::sync::Mutex<MetaStoreConnection>,
wal_manager: MetaStoreWalManager,
db_kind: DatabaseKind,
}

fn setup_connection(conn: &rusqlite::Connection) -> Result<()> {
Expand Down Expand Up @@ -182,13 +184,15 @@ impl MetaStoreInner {
conn: MetaStoreConnection,
wal_manager: MetaStoreWalManager,
config: MetaStoreConfig,
db_kind: DatabaseKind,
) -> Result<Self> {
setup_connection(&conn)?;

let mut this = MetaStoreInner {
configs: Default::default(),
conn: conn.into(),
wal_manager,
db_kind,
};

if config.allow_recover_from_fs {
Expand Down Expand Up @@ -350,9 +354,11 @@ fn try_process(
let mut conn = inner.conn.blocking_lock();
if let Some(schema) = config.shared_schema_name.as_ref() {
let tx = conn.transaction()?;
if let Some(ref schema) = config.shared_schema_name {
if crate::schema::db::has_pending_migration_jobs(&tx, schema)? {
return Err(crate::Error::PendingMigrationOnSchema(schema.clone()));
if inner.db_kind.is_primary() {
if let Some(ref schema) = config.shared_schema_name {
if crate::schema::db::has_pending_migration_jobs(&tx, schema)? {
return Err(crate::Error::PendingMigrationOnSchema(schema.clone()));
}
}
}
tx.execute(
Expand Down Expand Up @@ -394,6 +400,7 @@ impl MetaStore {
base_path: &Path,
conn: MetaStoreConnection,
wal_manager: MetaStoreWalManager,
db_kind: DatabaseKind,
) -> Result<Self> {
let (changes_tx, mut changes_rx) = mpsc::channel(256);

Expand All @@ -402,7 +409,7 @@ impl MetaStore {
let maybe_inner = tokio::task::spawn_blocking({
let base_path = base_path.to_owned();
let config = config.clone();
move || MetaStoreInner::new(&base_path, conn, wal_manager, config.clone())
move || MetaStoreInner::new(&base_path, conn, wal_manager, config.clone(), db_kind)
})
.await
.unwrap();
Expand Down Expand Up @@ -446,7 +453,7 @@ impl MetaStore {

let inner = tokio::task::spawn_blocking({
let base_path = base_path.to_owned();
move || MetaStoreInner::new(&base_path, conn, wal, config)
move || MetaStoreInner::new(&base_path, conn, wal, config, db_kind)
})
.await
.unwrap()?;
Expand Down
1 change: 1 addition & 0 deletions libsql-server/src/replication/replicator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl ReplicatorClient for Client {
.await
.map_err(|e| Status::new(Code::Internal, e.to_string()))?;
}

self.meta_store_handle
.store(DatabaseConfig::from(config))
.await
Expand Down
96 changes: 72 additions & 24 deletions libsql-server/src/rpc/streaming_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::Anonymous,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));
pin!(stream);
Expand All @@ -422,9 +428,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));

Expand All @@ -444,9 +456,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));

Expand All @@ -468,9 +486,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
// limit the size of the response to force a split
let stream = make_proxy_stream_inner(conn, ctx, ReceiverStream::new(rcv), 500);
Expand Down Expand Up @@ -525,9 +549,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));

Expand All @@ -552,9 +582,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));

Expand All @@ -579,9 +615,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));

Expand All @@ -608,9 +650,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));

Expand Down
Loading

0 comments on commit 73a2913

Please sign in to comment.