Skip to content

Commit

Permalink
add more tests for schema migration with replication
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Sep 2, 2024
1 parent 4b0fb7e commit f0c92ca
Show file tree
Hide file tree
Showing 9 changed files with 324 additions and 76 deletions.
12 changes: 9 additions & 3 deletions libsql-server/src/connection/connection_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,9 +659,15 @@ mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager, crate::database::DatabaseKind::Primary)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
conn.execute_program(
Program::seq(&["CREATE TABLE test (x)"]),
Expand Down
9 changes: 1 addition & 8 deletions libsql-server/src/namespace/configurator/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ impl ConfigureNamespace for ReplicaConfigurator {
let channel = self.channel.clone();
let uri = self.uri.clone();

dbg!(&name);
let rpc_client = ReplicationLogClient::with_origin(channel.clone(), uri.clone());
let client = crate::replication::replicator_client::Client::new(
name.clone(),
Expand All @@ -88,7 +87,7 @@ impl ConfigureNamespace for ReplicaConfigurator {

tracing::debug!("try perform handshake");
// force a handshake now, to retrieve the primary's current replication index
match replicator.try_perform_handshake().await.map_err(|e| dbg!(e)) {
match replicator.try_perform_handshake().await {
Err(libsql_replication::replicator::Error::Meta(
libsql_replication::meta::Error::LogIncompatible,
)) => {
Expand All @@ -112,7 +111,6 @@ impl ConfigureNamespace for ReplicaConfigurator {
Ok(_) => (),
}

dbg!(&name);
tracing::debug!("done performing handshake");

let primary_current_replicatio_index =
Expand Down Expand Up @@ -170,7 +168,6 @@ impl ConfigureNamespace for ReplicaConfigurator {
}
});

dbg!(&name);
let stats = make_stats(
&db_path,
&mut join_set,
Expand All @@ -181,7 +178,6 @@ impl ConfigureNamespace for ReplicaConfigurator {
)
.await?;

dbg!(&name);
let connection_maker = MakeLegacyConnection::new(
db_path.clone(),
PassthroughWalWrapper,
Expand All @@ -200,7 +196,6 @@ impl ConfigureNamespace for ReplicaConfigurator {
)
.await?;

dbg!(&name);
let connection_maker = Arc::new(
MakeWriteProxyConn::new(
channel.clone(),
Expand All @@ -221,13 +216,11 @@ impl ConfigureNamespace for ReplicaConfigurator {
),
);

dbg!(&name);
join_set.spawn(run_storage_monitor(
Arc::downgrade(&stats),
connection_maker.clone(),
));

dbg!(&name);
Ok(Namespace {
tasks: join_set,
db: Database::Replica(ReplicaDatabase { connection_maker }),
Expand Down
3 changes: 1 addition & 2 deletions libsql-server/src/namespace/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ impl NamespaceStore {
let init = async {
let ns = self
.make_namespace(namespace, db_config, restore_option)
.await.map_err(|e| dbg!(e))?;
.await?;
Ok(Some(ns))
};

Expand Down Expand Up @@ -516,7 +516,6 @@ impl NamespaceStore {
}

fn get_configurator(&self, db_config: &DatabaseConfig) -> &DynConfigurator {
dbg!(self.inner.db_kind);
match self.inner.db_kind {
DatabaseKind::Primary if db_config.is_shared_schema => {
self.inner.configurators.configure_schema().unwrap()
Expand Down
4 changes: 1 addition & 3 deletions libsql-server/src/replication/replicator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ impl ReplicatorClient for Client {
self.primary_replication_index = hello.current_replication_index;
self.session_token.replace(hello.session_token.clone());

dbg!();
if let Some(config) = &hello.config {
// HACK: if we load a shared schema db before the main schema is replicated,
// inserting the new database in the meta store will cause a foreign constraint Error
Expand All @@ -117,7 +116,6 @@ impl ReplicatorClient for Client {
if let Some(ref name) = config.shared_schema_name {
let name = NamespaceName::from_string(name.clone())
.map_err(|_| Status::new(Code::InvalidArgument, "invalid namespace name"))?;
dbg!(&name);
self.store
.with(name, |_| ())
.await
Expand All @@ -127,7 +125,7 @@ impl ReplicatorClient for Client {
self.meta_store_handle
.store(DatabaseConfig::from(config))
.await
.map_err(|e| dbg!(Error::Internal(e.into())))?;
.map_err(|e| Error::Internal(e.into()))?;

tracing::debug!("replica config has been updated");
} else {
Expand Down
96 changes: 72 additions & 24 deletions libsql-server/src/rpc/streaming_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::Anonymous,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager, crate::database::DatabaseKind::Primary)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));
pin!(stream);
Expand All @@ -422,9 +428,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager, crate::database::DatabaseKind::Primary)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));

Expand All @@ -444,9 +456,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager, crate::database::DatabaseKind::Primary)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));

Expand All @@ -468,9 +486,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager, crate::database::DatabaseKind::Primary)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
// limit the size of the response to force a split
let stream = make_proxy_stream_inner(conn, ctx, ReceiverStream::new(rcv), 500);
Expand Down Expand Up @@ -525,9 +549,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager, crate::database::DatabaseKind::Primary)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));

Expand All @@ -552,9 +582,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager, crate::database::DatabaseKind::Primary)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));

Expand All @@ -579,9 +615,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager, crate::database::DatabaseKind::Primary)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));

Expand All @@ -608,9 +650,15 @@ pub mod test {
let ctx = RequestContext::new(
Authenticated::FullAccess,
NamespaceName::default(),
MetaStore::new(Default::default(), tmp.path(), maker().unwrap(), manager, crate::database::DatabaseKind::Primary)
.await
.unwrap(),
MetaStore::new(
Default::default(),
tmp.path(),
maker().unwrap(),
manager,
crate::database::DatabaseKind::Primary,
)
.await
.unwrap(),
);
let stream = make_proxy_stream(conn, ctx, ReceiverStream::new(rcv));

Expand Down
Loading

0 comments on commit f0c92ca

Please sign in to comment.