Skip to content

Commit

Permalink
feat(multitenant)!: store and query manifests by account
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
  • Loading branch information
brooksmtownsend committed Jul 18, 2023
1 parent e32ed51 commit ec1802a
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 107 deletions.
11 changes: 7 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ async fn main() -> anyhow::Result<()> {
manifest_storage,
client,
Some(&args.api_prefix),
args.multitenant,
ManifestNotifier::new(wadm_event_prefix, context),
)
.await?;
Expand Down Expand Up @@ -363,6 +364,7 @@ where
self.publisher.clone(),
self.notify_stream.clone(),
lattice_id,
multitenant_prefix,
self.state_store.clone(),
self.manifest_store.clone(),
publisher.clone(),
Expand Down
5 changes: 3 additions & 2 deletions src/scaler/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ where
client: P,
stream: JsStream,
lattice_id: &str,
multitenant_prefix: Option<&str>,
state_store: StateStore,
manifest_store: KvStore,
publisher: CommandPublisher<P>,
Expand Down Expand Up @@ -171,10 +172,10 @@ where
// Get current scalers set up
let manifest_store = crate::server::ModelStorage::new(manifest_store);
let futs = manifest_store
.list(lattice_id)
.list(multitenant_prefix, lattice_id)
.await?
.into_iter()
.map(|summary| manifest_store.get(lattice_id, summary.name));
.map(|summary| manifest_store.get(multitenant_prefix, lattice_id, summary.name));
let all_manifests = futures::future::join_all(futs)
.await
.into_iter()
Expand Down
171 changes: 107 additions & 64 deletions src/server/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(crate) struct Handler<P> {

impl<P: Publisher> Handler<P> {
#[instrument(level = "debug", skip(self, msg))]
pub async fn put_model(&self, msg: Message, lattice_id: &str) {
pub async fn put_model(&self, msg: Message, account_id: Option<&str>, lattice_id: &str) {
trace!("Parsing incoming manifest");
let manifest = match parse_manifest(msg.payload.into(), msg.headers.as_ref()) {
Ok(m) => m,
Expand Down Expand Up @@ -78,7 +78,7 @@ impl<P: Publisher> Handler<P> {
let manifest_name = manifest.metadata.name.clone();

let (mut current_manifests, current_revision) =
match self.store.get(lattice_id, &manifest_name).await {
match self.store.get(account_id, lattice_id, &manifest_name).await {
Ok(Some(data)) => data,
Ok(None) => (StoredManifest::default(), 0),
Err(e) => {
Expand Down Expand Up @@ -115,7 +115,12 @@ impl<P: Publisher> Handler<P> {
trace!(total_manifests = %resp.total_versions, "Storing manifests");
if let Err(e) = self
.store
.set(lattice_id, current_manifests, Some(current_revision))
.set(
account_id,
lattice_id,
current_manifests,
Some(current_revision),
)
.await
{
error!(error = %e, "Unable to store updated data");
Expand All @@ -135,7 +140,13 @@ impl<P: Publisher> Handler<P> {
}

#[instrument(level = "debug", skip(self, msg))]
pub async fn get_model(&self, msg: Message, lattice_id: &str, name: &str) {
pub async fn get_model(
&self,
msg: Message,
account_id: Option<&str>,
lattice_id: &str,
name: &str,
) {
// For empty payloads, just fetch the latest version
let req: GetModelRequest = if msg.payload.is_empty() {
GetModelRequest { version: None }
Expand All @@ -153,7 +164,7 @@ impl<P: Publisher> Handler<P> {
}
};

let (manifests, _) = match self.store.get(lattice_id, name).await {
let (manifests, _) = match self.store.get(account_id, lattice_id, name).await {
Ok(Some(m)) => m,
Ok(None) => {
self.send_reply(
Expand Down Expand Up @@ -214,8 +225,8 @@ impl<P: Publisher> Handler<P> {
}

#[instrument(level = "debug", skip(self, msg))]
pub async fn list_models(&self, msg: Message, lattice_id: &str) {
let data = match self.store.list(lattice_id).await {
pub async fn list_models(&self, msg: Message, account_id: Option<&str>, lattice_id: &str) {
let data = match self.store.list(account_id, lattice_id).await {
Ok(d) => d,
Err(e) => {
error!(error = %e, "Unable to fetch data");
Expand All @@ -234,8 +245,14 @@ impl<P: Publisher> Handler<P> {
// timestamp (at least for now). However, this is guaranteed to return the list of versions
// ordered by time of creation. When we document, we should change this to reflect that
#[instrument(level = "debug", skip(self, msg))]
pub async fn list_versions(&self, msg: Message, lattice_id: &str, name: &str) {
let data: VersionResponse = match self.store.get(lattice_id, name).await {
pub async fn list_versions(
&self,
msg: Message,
account_id: Option<&str>,
lattice_id: &str,
name: &str,
) {
let data: VersionResponse = match self.store.get(account_id, lattice_id, name).await {
Ok(Some((manifest, _))) => VersionResponse {
result: GetResult::Success,
message: "Successfully fetched versions".to_string(),
Expand Down Expand Up @@ -275,7 +292,13 @@ impl<P: Publisher> Handler<P> {
// that is deployed is deleted, it is automatically undeployed, and we indicate that to the
// user. This should be documented when we get to our documentation tasks
#[instrument(level = "debug", skip(self, msg))]
pub async fn delete_model(&self, msg: Message, lattice_id: &str, name: &str) {
pub async fn delete_model(
&self,
msg: Message,
account_id: Option<&str>,
lattice_id: &str,
name: &str,
) {
let req: DeleteModelRequest =
match serde_json::from_reader(std::io::Cursor::new(msg.payload)) {
Ok(r) => r,
Expand All @@ -289,7 +312,7 @@ impl<P: Publisher> Handler<P> {
}
};
let reply_data = if req.delete_all {
match self.store.delete(lattice_id, name).await {
match self.store.delete(account_id, lattice_id, name).await {
Ok(_) => {
DeleteModelResponse {
result: DeleteResult::Deleted,
Expand All @@ -308,7 +331,7 @@ impl<P: Publisher> Handler<P> {
}
}
} else {
match self.store.get(lattice_id, name).await {
match self.store.get(account_id, lattice_id, name).await {
Ok(Some((mut current, current_revision))) => {
let deleted = current.delete_version(&req.version);
if deleted && !current.is_empty() {
Expand All @@ -326,7 +349,7 @@ impl<P: Publisher> Handler<P> {
false
};
self.store
.set(lattice_id, current, Some(current_revision))
.set(account_id, lattice_id, current, Some(current_revision))
.await
.map(|_| DeleteModelResponse {
result: DeleteResult::Deleted,
Expand All @@ -344,7 +367,7 @@ impl<P: Publisher> Handler<P> {
} else if deleted && current.is_empty() {
// If we deleted the last one, delete the model from the store
self.store
.delete(lattice_id, name)
.delete(account_id, lattice_id, name)
.await
.map(|_| DeleteModelResponse {
result: DeleteResult::Deleted,
Expand Down Expand Up @@ -418,7 +441,13 @@ impl<P: Publisher> Handler<P> {
}

#[instrument(level = "debug", skip(self, msg))]
pub async fn deploy_model(&self, msg: Message, lattice_id: &str, name: &str) {
pub async fn deploy_model(
&self,
msg: Message,
account_id: Option<&str>,
lattice_id: &str,
name: &str,
) {
let req: DeployModelRequest = if msg.payload.is_empty() {
DeployModelRequest { version: None }
} else {
Expand All @@ -437,29 +466,30 @@ impl<P: Publisher> Handler<P> {
trace!(?req, "Got request");

trace!("Fetching current data from store");
let (mut manifests, current_revision) = match self.store.get(lattice_id, name).await {
Ok(Some(m)) => m,
Ok(None) => {
self.send_reply(
msg.reply,
// NOTE: We are constructing all data here, so this shouldn't fail, but just in
// case we unwrap to nothing
serde_json::to_vec(&DeployModelResponse {
result: DeployResult::NotFound,
message: format!("Model with the name {name} not found"),
})
.unwrap_or_default(),
)
.await;
return;
}
Err(e) => {
error!(error = %e, "Unable to fetch data");
self.send_error(msg.reply, "Internal storage error".to_string())
let (mut manifests, current_revision) =
match self.store.get(account_id, lattice_id, name).await {
Ok(Some(m)) => m,
Ok(None) => {
self.send_reply(
msg.reply,
// NOTE: We are constructing all data here, so this shouldn't fail, but just in
// case we unwrap to nothing
serde_json::to_vec(&DeployModelResponse {
result: DeployResult::NotFound,
message: format!("Model with the name {name} not found"),
})
.unwrap_or_default(),
)
.await;
return;
}
};
return;
}
Err(e) => {
error!(error = %e, "Unable to fetch data");
self.send_error(msg.reply, "Internal storage error".to_string())
.await;
return;
}
};

if !manifests.deploy(req.version) {
trace!("Requested version does not exist");
Expand All @@ -486,7 +516,7 @@ impl<P: Publisher> Handler<P> {

let reply = self
.store
.set(lattice_id, manifests, Some(current_revision))
.set(account_id, lattice_id, manifests, Some(current_revision))
.await
.map(|_| DeployModelResponse {
result: DeployResult::Acknowledged,
Expand Down Expand Up @@ -529,7 +559,13 @@ impl<P: Publisher> Handler<P> {
// unless specified in the request. We also have the exact same acknowledgement types as a
// deploy request
#[instrument(level = "debug", skip(self, msg))]
pub async fn undeploy_model(&self, msg: Message, lattice_id: &str, name: &str) {
pub async fn undeploy_model(
&self,
msg: Message,
account_id: Option<&str>,
lattice_id: &str,
name: &str,
) {
let req: UndeployModelRequest = if msg.payload.is_empty() {
UndeployModelRequest {
non_destructive: false,
Expand All @@ -550,34 +586,35 @@ impl<P: Publisher> Handler<P> {
trace!(?req, "Got request");

trace!("Fetching current data from store");
let (mut manifests, current_revision) = match self.store.get(lattice_id, name).await {
Ok(Some(m)) => m,
Ok(None) => {
self.send_reply(
msg.reply,
// NOTE: We are constructing all data here, so this shouldn't fail, but just in
// case we unwrap to nothing
serde_json::to_vec(&DeployModelResponse {
result: DeployResult::NotFound,
message: format!("Model with the name {name} not found"),
})
.unwrap_or_default(),
)
.await;
return;
}
Err(e) => {
error!(error = %e, "Unable to fetch data");
self.send_error(msg.reply, "Internal storage error".to_string())
let (mut manifests, current_revision) =
match self.store.get(account_id, lattice_id, name).await {
Ok(Some(m)) => m,
Ok(None) => {
self.send_reply(
msg.reply,
// NOTE: We are constructing all data here, so this shouldn't fail, but just in
// case we unwrap to nothing
serde_json::to_vec(&DeployModelResponse {
result: DeployResult::NotFound,
message: format!("Model with the name {name} not found"),
})
.unwrap_or_default(),
)
.await;
return;
}
};
return;
}
Err(e) => {
error!(error = %e, "Unable to fetch data");
self.send_error(msg.reply, "Internal storage error".to_string())
.await;
return;
}
};

let reply = if manifests.undeploy() {
trace!("Manifest undeployed. Storing updated manifest");
self.store
.set(lattice_id, manifests, Some(current_revision))
.set(account_id, lattice_id, manifests, Some(current_revision))
.await
.map(|_| DeployModelResponse {
result: DeployResult::Acknowledged,
Expand Down Expand Up @@ -627,9 +664,15 @@ impl<P: Publisher> Handler<P> {
}

#[instrument(level = "debug", skip(self, msg))]
pub async fn model_status(&self, msg: Message, lattice_id: &str, name: &str) {
pub async fn model_status(
&self,
msg: Message,
account_id: Option<&str>,
lattice_id: &str,
name: &str,
) {
trace!("Fetching current manifest from store");
let manifests: StoredManifest = match self.store.get(lattice_id, name).await {
let manifests: StoredManifest = match self.store.get(account_id, lattice_id, name).await {
Ok(Some((m, _))) => m,
Ok(None) => {
self.send_reply(
Expand Down
Loading

0 comments on commit ec1802a

Please sign in to comment.