diff --git a/libsql-server/src/connection/connection_core.rs b/libsql-server/src/connection/connection_core.rs index f94f897c07..4dc31c2e5d 100644 --- a/libsql-server/src/connection/connection_core.rs +++ b/libsql-server/src/connection/connection_core.rs @@ -659,9 +659,15 @@ mod test { let ctx = RequestContext::new( Authenticated::FullAccess, NamespaceName::default(), - MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager) - .await - .unwrap(), + MetaStore::new( + Default::default(), + tmp.path(), + maker().unwrap(), + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(), ); conn.execute_program( Program::seq(&["CREATE TABLE test (x)"]), diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs index 971879bfd6..940126d209 100644 --- a/libsql-server/src/lib.rs +++ b/libsql-server/src/lib.rs @@ -570,6 +570,7 @@ where &self.path, meta_conn, meta_store_wal_manager, + db_kind, ) .await?; diff --git a/libsql-server/src/namespace/configurator/replica.rs b/libsql-server/src/namespace/configurator/replica.rs index 5a56313c3e..2215a55c34 100644 --- a/libsql-server/src/namespace/configurator/replica.rs +++ b/libsql-server/src/namespace/configurator/replica.rs @@ -49,6 +49,7 @@ impl ReplicaConfigurator { } impl ConfigureNamespace for ReplicaConfigurator { + #[tracing::instrument(skip_all, fields(name))] fn setup<'a>( &'a self, meta_store_handle: MetaStoreHandle, diff --git a/libsql-server/src/namespace/meta_store.rs b/libsql-server/src/namespace/meta_store.rs index 2c7a104ae2..b9180f31f4 100644 --- a/libsql-server/src/namespace/meta_store.rs +++ b/libsql-server/src/namespace/meta_store.rs @@ -23,6 +23,7 @@ use tokio::sync::{ use crate::config::BottomlessConfig; use crate::connection::config::DatabaseConfig; +use crate::database::DatabaseKind; use crate::schema::{MigrationDetails, MigrationSummary}; use crate::{ config::MetaStoreConfig, connection::legacy::open_conn_active_checkpoint, error::Error, Result, @@ -74,6 +75,7 @@ struct MetaStoreInner { configs: tokio::sync::Mutex>>, conn: tokio::sync::Mutex, wal_manager: MetaStoreWalManager, + db_kind: DatabaseKind, } fn setup_connection(conn: &rusqlite::Connection) -> Result<()> { @@ -182,6 +184,7 @@ impl MetaStoreInner { conn: MetaStoreConnection, wal_manager: MetaStoreWalManager, config: MetaStoreConfig, + db_kind: DatabaseKind, ) -> Result { setup_connection(&conn)?; @@ -189,6 +192,7 @@ impl MetaStoreInner { configs: Default::default(), conn: conn.into(), wal_manager, + db_kind, }; if config.allow_recover_from_fs { @@ -350,9 +354,11 @@ fn try_process( let mut conn = inner.conn.blocking_lock(); if let Some(schema) = config.shared_schema_name.as_ref() { let tx = conn.transaction()?; - if let Some(ref schema) = config.shared_schema_name { - if crate::schema::db::has_pending_migration_jobs(&tx, schema)? { - return Err(crate::Error::PendingMigrationOnSchema(schema.clone())); + if inner.db_kind.is_primary() { + if let Some(ref schema) = config.shared_schema_name { + if crate::schema::db::has_pending_migration_jobs(&tx, schema)? { + return Err(crate::Error::PendingMigrationOnSchema(schema.clone())); + } } } tx.execute( @@ -394,6 +400,7 @@ impl MetaStore { base_path: &Path, conn: MetaStoreConnection, wal_manager: MetaStoreWalManager, + db_kind: DatabaseKind, ) -> Result { let (changes_tx, mut changes_rx) = mpsc::channel(256); @@ -402,7 +409,7 @@ impl MetaStore { let maybe_inner = tokio::task::spawn_blocking({ let base_path = base_path.to_owned(); let config = config.clone(); - move || MetaStoreInner::new(&base_path, conn, wal_manager, config.clone()) + move || MetaStoreInner::new(&base_path, conn, wal_manager, config.clone(), db_kind) }) .await .unwrap(); @@ -446,7 +453,7 @@ impl MetaStore { let inner = tokio::task::spawn_blocking({ let base_path = base_path.to_owned(); - move || MetaStoreInner::new(&base_path, conn, wal, config) + move || MetaStoreInner::new(&base_path, conn, wal, config, db_kind) }) .await .unwrap()?; diff --git a/libsql-server/src/replication/replicator_client.rs b/libsql-server/src/replication/replicator_client.rs index 753baac996..70cdc58a49 100644 --- a/libsql-server/src/replication/replicator_client.rs +++ b/libsql-server/src/replication/replicator_client.rs @@ -121,6 +121,7 @@ impl ReplicatorClient for Client { .await .map_err(|e| Status::new(Code::Internal, e.to_string()))?; } + self.meta_store_handle .store(DatabaseConfig::from(config)) .await diff --git a/libsql-server/src/rpc/streaming_exec.rs b/libsql-server/src/rpc/streaming_exec.rs index 87a24b851a..5c969ddf4b 100644 --- a/libsql-server/src/rpc/streaming_exec.rs +++ b/libsql-server/src/rpc/streaming_exec.rs @@ -396,9 +396,15 @@ pub mod test { let ctx = RequestContext::new( Authenticated::Anonymous, NamespaceName::default(), - MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager) - .await - .unwrap(), + MetaStore::new( + Default::default(), + tmp.path(), + maker().unwrap(), + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(), ); let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv)); pin!(stream); @@ -422,9 +428,15 @@ pub mod test { let ctx = RequestContext::new( Authenticated::FullAccess, NamespaceName::default(), - MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager) - .await - .unwrap(), + MetaStore::new( + Default::default(), + tmp.path(), + maker().unwrap(), + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(), ); let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv)); @@ -444,9 +456,15 @@ pub mod test { let ctx = RequestContext::new( Authenticated::FullAccess, NamespaceName::default(), - MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager) - .await - .unwrap(), + MetaStore::new( + Default::default(), + tmp.path(), + maker().unwrap(), + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(), ); let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv)); @@ -468,9 +486,15 @@ pub mod test { let ctx = RequestContext::new( Authenticated::FullAccess, NamespaceName::default(), - MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager) - .await - .unwrap(), + MetaStore::new( + Default::default(), + tmp.path(), + maker().unwrap(), + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(), ); // limit the size of the response to force a split let stream = make_proxy_stream_inner(conn, ctx, ReceiverStream::new(rcv), 500); @@ -525,9 +549,15 @@ pub mod test { let ctx = RequestContext::new( Authenticated::FullAccess, NamespaceName::default(), - MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager) - .await - .unwrap(), + MetaStore::new( + Default::default(), + tmp.path(), + maker().unwrap(), + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(), ); let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv)); @@ -552,9 +582,15 @@ pub mod test { let ctx = RequestContext::new( Authenticated::FullAccess, NamespaceName::default(), - MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager) - .await - .unwrap(), + MetaStore::new( + Default::default(), + tmp.path(), + maker().unwrap(), + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(), ); let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv)); @@ -579,9 +615,15 @@ pub mod test { let ctx = RequestContext::new( Authenticated::FullAccess, NamespaceName::default(), - MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager) - .await - .unwrap(), + MetaStore::new( + Default::default(), + tmp.path(), + maker().unwrap(), + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(), ); let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv)); @@ -608,9 +650,15 @@ pub mod test { let ctx = RequestContext::new( Authenticated::FullAccess, NamespaceName::default(), - MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager) - .await - .unwrap(), + MetaStore::new( + Default::default(), + tmp.path(), + maker().unwrap(), + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(), ); let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv)); diff --git a/libsql-server/src/schema/db.rs b/libsql-server/src/schema/db.rs index 7e20fe7822..ec8dcad840 100644 --- a/libsql-server/src/schema/db.rs +++ b/libsql-server/src/schema/db.rs @@ -514,9 +514,15 @@ mod test { let tmp = tempdir().unwrap(); let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap(); let conn = maker().unwrap(); - let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager) - .await - .unwrap(); + let meta_store = MetaStore::new( + Default::default(), + tmp.path(), + conn, + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(); let mut conn = maker().unwrap(); setup_schema(&mut conn).unwrap(); @@ -558,9 +564,15 @@ mod test { let tmp = tempdir().unwrap(); let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap(); let conn = maker().unwrap(); - let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager) - .await - .unwrap(); + let meta_store = MetaStore::new( + Default::default(), + tmp.path(), + conn, + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(); // FIXME: the actual error reported here is a shitty constraint error, we should make the // necessary checks beforehand, and return a nice error message. @@ -580,9 +592,15 @@ mod test { let tmp = tempdir().unwrap(); let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap(); let conn = maker().unwrap(); - let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager) - .await - .unwrap(); + let meta_store = MetaStore::new( + Default::default(), + tmp.path(), + conn, + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(); let mut conn = maker().unwrap(); setup_schema(&mut conn).unwrap(); @@ -627,9 +645,15 @@ mod test { let tmp = tempdir().unwrap(); let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap(); let conn = maker().unwrap(); - let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager) - .await - .unwrap(); + let meta_store = MetaStore::new( + Default::default(), + tmp.path(), + conn, + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(); let mut conn = maker().unwrap(); setup_schema(&mut conn).unwrap(); @@ -677,9 +701,15 @@ mod test { let tmp = tempdir().unwrap(); let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap(); let conn = maker().unwrap(); - let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager) - .await - .unwrap(); + let meta_store = MetaStore::new( + Default::default(), + tmp.path(), + conn, + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(); let mut conn = maker().unwrap(); setup_schema(&mut conn).unwrap(); @@ -728,9 +758,15 @@ mod test { let tmp = tempdir().unwrap(); let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap(); let conn = maker().unwrap(); - let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager) - .await - .unwrap(); + let meta_store = MetaStore::new( + Default::default(), + tmp.path(), + conn, + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(); let mut conn = maker().unwrap(); setup_schema(&mut conn).unwrap(); @@ -754,9 +790,15 @@ mod test { let tmp = tempdir().unwrap(); let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap(); let conn = maker().unwrap(); - let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager) - .await - .unwrap(); + let meta_store = MetaStore::new( + Default::default(), + tmp.path(), + conn, + manager, + crate::database::DatabaseKind::Primary, + ) + .await + .unwrap(); let mut conn = maker().unwrap(); setup_schema(&mut conn).unwrap(); diff --git a/libsql-server/src/schema/scheduler.rs b/libsql-server/src/schema/scheduler.rs index a88fa4f739..5385acf802 100644 --- a/libsql-server/src/schema/scheduler.rs +++ b/libsql-server/src/schema/scheduler.rs @@ -845,9 +845,15 @@ mod test { let tmp = tempdir().unwrap(); let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap(); let conn = maker().unwrap(); - let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager) - .await - .unwrap(); + let meta_store = MetaStore::new( + Default::default(), + tmp.path(), + conn, + manager, + DatabaseKind::Primary, + ) + .await + .unwrap(); let (sender, mut receiver) = mpsc::channel(100); let config = make_config(sender.clone().into(), tmp.path()); let store = @@ -969,9 +975,15 @@ mod test { { let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap(); let conn = maker().unwrap(); - let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager) - .await - .unwrap(); + let meta_store = MetaStore::new( + Default::default(), + tmp.path(), + conn, + manager, + DatabaseKind::Primary, + ) + .await + .unwrap(); let (sender, mut receiver) = mpsc::channel(100); let config = make_config(sender.clone().into(), tmp.path()); let store = @@ -1044,9 +1056,15 @@ mod test { let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap(); let conn = maker().unwrap(); - let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager) - .await - .unwrap(); + let meta_store = MetaStore::new( + Default::default(), + tmp.path(), + conn, + manager, + DatabaseKind::Primary, + ) + .await + .unwrap(); let (sender, _receiver) = mpsc::channel(100); let config = make_config(sender.clone().into(), tmp.path()); let store = @@ -1072,9 +1090,15 @@ mod test { let tmp = tempdir().unwrap(); let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap(); let conn = maker().unwrap(); - let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager) - .await - .unwrap(); + let meta_store = MetaStore::new( + Default::default(), + tmp.path(), + conn, + manager, + DatabaseKind::Primary, + ) + .await + .unwrap(); let (sender, mut receiver) = mpsc::channel(100); let config = make_config(sender.clone().into(), tmp.path()); let store = @@ -1146,9 +1170,15 @@ mod test { let tmp = tempdir().unwrap(); let (maker, manager) = metastore_connection_maker(None, tmp.path()).await.unwrap(); let conn = maker().unwrap(); - let meta_store = MetaStore::new(Default::default(), tmp.path(), conn, manager) - .await - .unwrap(); + let meta_store = MetaStore::new( + Default::default(), + tmp.path(), + conn, + manager, + DatabaseKind::Primary, + ) + .await + .unwrap(); let (sender, _receiver) = mpsc::channel(100); let config = make_config(sender.clone().into(), tmp.path()); let store = diff --git a/libsql-server/tests/cluster/mod.rs b/libsql-server/tests/cluster/mod.rs index 1171d4a5d0..46f9801dff 100644 --- a/libsql-server/tests/cluster/mod.rs +++ b/libsql-server/tests/cluster/mod.rs @@ -17,6 +17,7 @@ use crate::common::{http::Client, net::SimServer, snapshot_metrics}; mod replica_restart; mod replication; +mod schema_dbs; pub fn make_cluster(sim: &mut Sim, num_replica: usize, disable_namespaces: bool) { init_tracing(); diff --git a/libsql-server/tests/cluster/schema_dbs.rs b/libsql-server/tests/cluster/schema_dbs.rs new file mode 100644 index 0000000000..a5318a5af2 --- /dev/null +++ b/libsql-server/tests/cluster/schema_dbs.rs @@ -0,0 +1,131 @@ +use std::time::Duration; + +use libsql::Database; +use serde_json::json; +use turmoil::Builder; + +use crate::common::http::Client; +use crate::common::net::TurmoilConnector; + +use super::make_cluster; + +#[test] +fn schema_migration_basics() { + let mut sim = Builder::new() + .simulation_duration(Duration::from_secs(1000)) + .build(); + make_cluster(&mut sim, 1, true); + + sim.client("client", async { + let http = Client::new(); + + assert!(http + .post( + "http://primary:9090/v1/namespaces/schema/create", + json!({ "shared_schema": true }) + ) + .await + .unwrap() + .status() + .is_success()); + assert!(http + .post( + "http://primary:9090/v1/namespaces/foo/create", + json!({ "shared_schema_name": "schema" }) + ) + .await + .unwrap() + .status() + .is_success()); + + { + let db = Database::open_remote_with_connector( + "http://schema.primary:8080", + "", + TurmoilConnector, + ) + .unwrap(); + let conn = db.connect().unwrap(); + conn.execute("create table test (x)", ()).await.unwrap(); + } + + { + let db = Database::open_remote_with_connector( + "http://foo.primary:8080", + "", + TurmoilConnector, + ) + .unwrap(); + let conn = db.connect().unwrap(); + conn.execute("insert into test values (42)", ()) + .await + .unwrap(); + + assert_eq!( + conn.query("select count(*) from test", ()) + .await + .unwrap() + .next() + .await + .unwrap() + .unwrap() + .get::(0) + .unwrap(), + 1 + ); + } + + { + let db = Database::open_remote_with_connector( + "http://schema.replica0:8080", + "", + TurmoilConnector, + ) + .unwrap(); + let conn = db.connect().unwrap(); + conn.execute("create table test2 (x)", ()).await.unwrap(); + } + + { + let db = Database::open_remote_with_connector( + "http://foo.replica0:8080", + "", + TurmoilConnector, + ) + .unwrap(); + let conn = db.connect().unwrap(); + conn.execute("insert into test values (42)", ()) + .await + .unwrap(); + + assert_eq!( + conn.query("select count(*) from test", ()) + .await + .unwrap() + .next() + .await + .unwrap() + .unwrap() + .get::(0) + .unwrap(), + 2 + ); + assert_eq!( + conn.query("select count(*) from test2", ()) + .await + .unwrap() + .next() + .await + .unwrap() + .unwrap() + .get::(0) + .unwrap(), + 0 + ); + } + + Ok(()) + }); + + sim.run().unwrap(); +}