Skip to content

Commit

Permalink
cleanup
Browse files — browse the repository at this point in the history
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
  • Loading branch information
brooksmtownsend committed Aug 11, 2023
1 parent 2d221ae commit 16245ba
Show / hide file tree
Showing 7 changed files with 25 additions and 65 deletions.
1 change: 0 additions & 1 deletion src/scaler/spreadscaler/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ where
}

async fn status(&self) -> StatusInfo {
let _ = self.reconcile().await;
self.status.read().await.to_owned()
}

Expand Down
1 change: 0 additions & 1 deletion src/scaler/spreadscaler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ActorSpreadScaler<S> {
}

async fn status(&self) -> StatusInfo {
let _ = self.reconcile().await;
self.status.read().await.to_owned()
}

Expand Down
1 change: 0 additions & 1 deletion src/scaler/spreadscaler/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ProviderSpreadScaler<S> {
}

async fn status(&self) -> StatusInfo {
let _ = self.reconcile().await;
self.status.read().await.to_owned()
}

Expand Down
54 changes: 7 additions & 47 deletions src/workers/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,12 +774,6 @@ where
}
};

let previous_status = scaler_status(&scalers).await;

// There's a problem here where I am assuming that handling the event will always
// update the status. However, if the event is handled by a scaler that is
// not interested in that event, it will return no commands AND it won't update the status.

let (commands, res) = get_commands_and_result(
scalers.iter().map(|s| s.handle_event(event)),
"Errors occurred while handling event",
Expand All @@ -788,28 +782,6 @@ where

trace!(?commands, "Publishing commands");

// let status = if commands.is_empty() {
// let current_status = scaler_status(&scalers).await;
// // Only publish the status if it meaningfully differs from the previous status
// // if previous_status == current_status {
// // None
// // } else {
// Some(current_status)
// // }
// } else {
// Some(StatusInfo::compensating(&format!(
// "hint: Event {:?} modified scaler {} state, running compensating commands: {:?}",
// event,
// name.to_owned(),
// commands
// )))
// };
// if let Some(status) = status {
// if let Err(e) = self.status_publisher.publish_status(name, status).await {
// warn!(error = ?e, "Failed to set status for scaler");
// };
// }

self.command_publisher.publish_commands(commands).await?;

res
Expand All @@ -820,35 +792,23 @@ where
let scalers = self.scalers.get_all_scalers().await;

let futs = scalers.iter().map(|(name, scalers)| async {
let previous_status = scaler_status(scalers).await;

let (commands, res) = get_commands_and_result(
scalers.iter().map(|scaler| scaler.handle_event(event)),
"Errors occurred while handling event with all scalers",
)
.await;

let status = if commands.is_empty() {
let current_status = scaler_status(scalers).await;
// Only publish the status if it meaningfully differs from the previous status
// if previous_status == current_status {
// None
// } else {
Some(current_status)
// }
scaler_status(scalers).await
} else {
Some(StatusInfo::compensating(&format!(
"all: Event {:?} modified scaler {} state, running compensating commands: {:?}",
event,
StatusInfo::compensating(&format!(
"Event modified scaler \"{}\" state, running compensating commands",
name.to_owned(),
commands
)))
))
};
if let Err(e) = self.status_publisher.publish_status(name, status).await {
warn!(error = ?e, "Failed to set status for scaler");
};
if let Some(status) = status {
if let Err(e) = self.status_publisher.publish_status(name, status).await {
warn!(error = ?e, "Failed to set status for scaler");
};
}

(commands, res)
});
Expand Down
9 changes: 4 additions & 5 deletions tests/e2e_multiple_hosts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ async fn test_no_requirements(client_info: &ClientInfo) {
)
}

let inventory = client_info.get_all_inventory("default").await?;
// sleep for 2 seconds
// Cheating to force a heartbeat, ensuring status updates
let _ = client_info.get_all_inventory("default").await?;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
// SAFETY: we already know some status existed when we checked for compensating. If there's no status now, it means
// we borked our stream and this _should_ fail
Expand Down Expand Up @@ -203,9 +203,8 @@ async fn test_no_requirements(client_info: &ClientInfo) {
"wasmcloud.azurecr.io/httpserver:0.17.0",
ExpectedCount::Exactly(0),
)?;

let inventory = client_info.get_all_inventory("default").await?;
// sleep for 2 seconds
// Cheating to force a heartbeat, ensuring status updates
let _ = client_info.get_all_inventory("default").await?;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let status = get_manifest_status(&stream, "default", "echo-simple")
.await
Expand Down
15 changes: 9 additions & 6 deletions tests/e2e_multitenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,11 @@ async fn test_basic_separation(client_info: &ClientInfo) -> anyhow::Result<()> {
links
)
}
let east_inventory = client_info.get_all_inventory(LATTICE_EAST).await?;
let west_inventory = client_info.get_all_inventory(LATTICE_WEST).await?;

// Cheating to force a heartbeat, ensuring status updates
let _ = client_info.get_all_inventory(LATTICE_EAST).await?;
let _ = client_info.get_all_inventory(LATTICE_WEST).await?;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

match (
get_manifest_status(&stream, LATTICE_EAST, "echo-simple").await,
Expand Down Expand Up @@ -318,10 +321,10 @@ async fn test_basic_separation(client_info: &ClientInfo) -> anyhow::Result<()> {
"Shouldn't have errored when undeploying manifest: {resp:?}"
);

// Give wadm a literal second to process the manifest_unpublished event and update the status
let east_inventory = client_info.get_all_inventory(LATTICE_EAST).await?;
let west_inventory = client_info.get_all_inventory(LATTICE_WEST).await?;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Cheating to force a heartbeat, ensuring status updates
let _ = client_info.get_all_inventory(LATTICE_EAST).await?;
let _ = client_info.get_all_inventory(LATTICE_WEST).await?;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

match (
get_manifest_status(&stream, LATTICE_EAST, "echo-simple").await,
Expand Down
9 changes: 5 additions & 4 deletions tests/e2e_upgrades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ async fn test_upgrade(client_info: &ClientInfo) {
)
}

// Sleep for 2 seconds
let inventory = client_info.get_all_inventory("default").await?;
// Cheating to force a heartbeat, ensuring status updates
let _ = client_info.get_all_inventory("default").await?;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

// SAFETY: we already know some status existed when we checked for compensating. If there's no status now, it means
Expand Down Expand Up @@ -282,8 +282,9 @@ async fn test_upgrade(client_info: &ClientInfo) {
links
)
}
// sleep for 2 seconds
let inventory = client_info.get_all_inventory("default").await?;

// Cheating to force a heartbeat, ensuring status updates
let _ = client_info.get_all_inventory("default").await?;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

let status = get_manifest_status(&stream, "default", "updateapp")
Expand Down

0 comments on commit 16245ba

Please sign in to comment.