feat(daemonscaler)!: add provider daemonscaler
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

cleanup
brooksmtownsend committed Sep 1, 2023
1 parent 1e03244 commit 697d391
Showing 3 changed files with 279 additions and 7 deletions.
9 changes: 3 additions & 6 deletions src/scaler/daemonscaler/mod.rs
@@ -17,7 +17,7 @@ use crate::{
     storage::{Actor, Host, ReadStore},
 };
 
-// pub mod provider;
+pub mod provider;
 
 // Annotation constants
 pub const ACTOR_DAEMON_SCALER_TYPE: &str = "actordaemonscaler";
@@ -181,9 +181,6 @@ impl<S: ReadStore + Send + Sync + Clone> Scaler for ActorDaemonScaler<S> {
             actors_per_host
                 .iter()
                 .filter_map(|(host_id, current_count)| {
-                    println!(
-                        "Calculated running actors, reconciling with expected count"
-                    );
                     // Here we'll generate commands for the proper host depending on where they are running
                     match current_count.cmp(&self.config.spread_config.replicas) {
                         Ordering::Equal => None,
@@ -708,10 +705,10 @@ mod test {
         for cmd in cmds.iter() {
             match cmd {
                 Command::StartActor(start) => {
-                    if start.host_id == host_id_one.to_string() {
+                    if start.host_id == *host_id_one {
                         assert_eq!(start.count, 411);
                         assert_eq!(start.reference, echo_ref);
-                    } else if start.host_id == host_id_two.to_string() {
+                    } else if start.host_id == *host_id_two {
                         assert_eq!(start.count, 309);
                         assert_eq!(start.reference, echo_ref);
                     } else {
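The test assertions above now compare against the dereferenced host id (start.host_id == *host_id_one) instead of allocating a new String with .to_string() for every comparison. A minimal illustration of the two styles follows; it is not part of this commit and the variable names are hypothetical.

fn main() {
    let host_id = String::from("NHOSTXKEY");
    let host_id_ref: &String = &host_id;
    let command_host_id = String::from("NHOSTXKEY");

    // Old style: allocates a fresh String just to perform the comparison.
    assert!(command_host_id == host_id_ref.to_string());

    // New style: dereference the borrowed id and compare directly, no allocation.
    assert!(command_host_id == *host_id_ref);
}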
248 changes: 248 additions & 0 deletions src/scaler/daemonscaler/provider.rs
@@ -0,0 +1,248 @@
use std::collections::HashMap;

use anyhow::Result;
use async_trait::async_trait;
use tokio::sync::{OnceCell, RwLock};
use tracing::{instrument, trace};

use crate::events::{HostHeartbeat, ProviderInfo};
use crate::model::Spread;
use crate::scaler::spreadscaler::provider::ProviderSpreadConfig;
use crate::scaler::spreadscaler::{eligible_hosts, spreadscaler_annotations};
use crate::server::StatusInfo;
use crate::storage::Provider;
use crate::{
    commands::{Command, StartProvider},
    events::{Event, HostStarted, HostStopped},
    model::{SpreadScalerProperty, TraitProperty},
    scaler::Scaler,
    storage::{Host, ReadStore},
};

// Annotation constants
pub const PROVIDER_DAEMON_SCALER_TYPE: &str = "providerdaemonscaler";

/// The ProviderDaemonScaler ensures that a provider is running on every host, according to a
/// [SpreadScalerProperty](crate::model::SpreadScalerProperty)
///
/// If no [Spreads](crate::model::Spread) are specified, this Scaler simply maintains the number of replicas
/// on every available host.
pub struct ProviderDaemonScaler<S> {
    config: ProviderSpreadConfig,
    provider_id: OnceCell<String>,
    store: S,
    id: String,
    status: RwLock<StatusInfo>,
}

#[async_trait]
impl<S: ReadStore + Send + Sync + Clone> Scaler for ProviderDaemonScaler<S> {
    fn id(&self) -> &str {
        &self.id
    }

    async fn status(&self) -> StatusInfo {
        let _ = self.reconcile().await;
        self.status.read().await.to_owned()
    }

    async fn update_config(&mut self, config: TraitProperty) -> Result<Vec<Command>> {
        let spread_config = match config {
            TraitProperty::SpreadScaler(prop) => prop,
            _ => anyhow::bail!("Given config was not a daemon scaler config object"),
        };
        // If no spreads are specified, an empty spread is sufficient to match _every_ host
        // in a lattice
        let spread_config = if spread_config.spread.is_empty() {
            SpreadScalerProperty {
                replicas: spread_config.replicas,
                spread: vec![Spread::default()],
            }
        } else {
            spread_config
        };
        self.config.spread_config = spread_config;
        self.reconcile().await
    }

    #[instrument(level = "trace", skip_all, fields(scaler_id = %self.id))]
    async fn handle_event(&self, event: &Event) -> Result<Vec<Command>> {
        // NOTE(brooksmtownsend): We could be more efficient here and instead of running
        // the entire reconcile, smart compute exactly what needs to change, but it just
        // requires more code branches and would be fine as a future improvement
        match event {
            Event::ProviderStarted(provider_started)
                if provider_started.contract_id == self.config.provider_contract_id
                    && provider_started.image_ref == self.config.provider_reference
                    && provider_started.link_name == self.config.provider_link_name =>
            {
                self.reconcile().await
            }
            Event::ProviderStopped(provider_stopped)
                if provider_stopped.contract_id == self.config.provider_contract_id
                    && provider_stopped.link_name == self.config.provider_link_name
                    // If this is None, provider hasn't been started in the lattice yet, so we don't need to reconcile
                    && self.provider_id().await.map(|id| id == provider_stopped.public_key).unwrap_or(false) =>
            {
                self.reconcile().await
            }
            // If the host labels match any spread requirement, perform reconcile
            Event::HostStopped(HostStopped { labels, .. })
            | Event::HostStarted(HostStarted { labels, .. })
            | Event::HostHeartbeat(HostHeartbeat { labels, .. })
                if self.config.spread_config.spread.iter().any(|spread| {
                    spread.requirements.iter().all(|(key, value)| {
                        labels.get(key).map(|val| val == value).unwrap_or(false)
                    })
                }) =>
            {
                self.reconcile().await
            }
            // No other event impacts the job of this scaler so we can ignore it
            _ => Ok(Vec::new()),
        }
    }

    #[instrument(level = "trace", skip_all, fields(name = %self.config.model_name, scaler_id = %self.id))]
    async fn reconcile(&self) -> Result<Vec<Command>> {
        let hosts = self.store.list::<Host>(&self.config.lattice_id).await?;

        let provider_id = self.provider_id().await.unwrap_or_default();
        let contract_id = &self.config.provider_contract_id;
        let link_name = &self.config.provider_link_name;
        let provider_ref = &self.config.provider_reference;

        let mut spread_status = vec![];

        trace!(spread = ?self.config.spread_config.spread, ?provider_id, "Computing commands");
        let commands = self
            .config
            .spread_config
            .spread
            .iter()
            .flat_map(|spread| {
                let eligible_hosts = eligible_hosts(&hosts, spread);
                if !eligible_hosts.is_empty() {
                    eligible_hosts
                        .iter()
                        // Filter out hosts that are already running this provider
                        .filter(|(_host_id, host)| {
                            host.providers
                                .get(&ProviderInfo {
                                    contract_id: contract_id.to_string(),
                                    link_name: link_name.to_string(),
                                    public_key: provider_id.to_string(),
                                    annotations: HashMap::default(),
                                })
                                .is_none()
                        })
                        .map(|(_host_id, host)| {
                            Command::StartProvider(StartProvider {
                                reference: provider_ref.to_owned(),
                                host_id: host.id.to_string(),
                                link_name: Some(link_name.to_owned()),
                                model_name: self.config.model_name.to_owned(),
                                annotations: spreadscaler_annotations(&spread.name, &self.id),
                                config: self.config.provider_config.clone(),
                            })
                        })
                        .collect::<Vec<Command>>()
                } else {
                    // No hosts were eligible, so we can't attempt to add or remove providers
                    trace!(?spread.name, "Found no eligible hosts for daemon scaler");
                    spread_status.push(StatusInfo::failed(&format!(
                        "Could not satisfy daemonscaler {} for {}, 0 eligible hosts found.",
                        spread.name, self.config.provider_reference
                    )));
                    vec![]
                }
            })
            .collect::<Vec<Command>>();

        trace!(?commands, "Calculated commands for provider daemonscaler");

        let status = match (spread_status.is_empty(), commands.is_empty()) {
            (true, true) => StatusInfo::ready(""),
            (_, false) => StatusInfo::compensating(""),
            (false, true) => StatusInfo::failed(
                &spread_status
                    .into_iter()
                    .map(|s| s.message)
                    .collect::<Vec<String>>()
                    .join(" "),
            ),
        };
        trace!(?status, "Updating scaler status");
        *self.status.write().await = status;

        Ok(commands)
    }

    #[instrument(level = "trace", skip_all, fields(name = %self.config.model_name))]
    async fn cleanup(&self) -> Result<Vec<Command>> {
        let mut config_clone = self.config.clone();
        config_clone.spread_config.replicas = 0;

        let cleanerupper = ProviderDaemonScaler {
            config: config_clone,
            store: self.store.clone(),
            provider_id: self.provider_id.clone(),
            id: self.id.clone(),
            status: RwLock::new(StatusInfo::compensating("")),
        };

        cleanerupper.reconcile().await
    }
}

impl<S: ReadStore + Send + Sync> ProviderDaemonScaler<S> {
    /// Construct a new ProviderDaemonScaler with specified configuration values
    pub fn new(store: S, config: ProviderSpreadConfig, component_name: &str) -> Self {
        let id = format!(
            "{PROVIDER_DAEMON_SCALER_TYPE}-{}-{component_name}-{}-{}",
            config.model_name, config.provider_reference, config.provider_link_name,
        );
        // If no spreads are specified, an empty spread is sufficient to match _every_ host
        // in a lattice
        let spread_config = if config.spread_config.spread.is_empty() {
            SpreadScalerProperty {
                replicas: config.spread_config.replicas,
                spread: vec![Spread::default()],
            }
        } else {
            config.spread_config
        };
        Self {
            store,
            provider_id: OnceCell::new(),
            config: ProviderSpreadConfig {
                spread_config,
                ..config
            },
            id,
            status: RwLock::new(StatusInfo::compensating("")),
        }
    }

    /// Helper function to retrieve the provider ID for the configured provider
    async fn provider_id(&self) -> Result<&str> {
        self.provider_id
            .get_or_try_init(|| async {
                self.store
                    .list::<Provider>(&self.config.lattice_id)
                    .await
                    .unwrap_or_default()
                    .iter()
                    .find(|(_id, provider)| provider.reference == self.config.provider_reference)
                    .map(|(_id, provider)| provider.id.to_owned())
                    .ok_or_else(|| {
                        anyhow::anyhow!(
                            "Couldn't find a provider id for the provider reference {}",
                            self.config.provider_reference
                        )
                    })
            })
            .await
            .map(|id| id.as_str())
    }
}
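For context on the new scaler's reconcile logic: for each configured Spread it emits one StartProvider command per eligible host that is not already running the provider, rather than spreading a fixed replica count across hosts. The sketch below is illustrative only and is not part of this commit; the StartProvider struct and the host map are simplified stand-ins for the crate's Host and Command types, and the IDs and reference are made up.

use std::collections::{HashMap, HashSet};

#[derive(Debug)]
struct StartProvider {
    host_id: String,
    reference: String,
}

// For one spread's eligible hosts, emit a StartProvider for every host that is
// not already running the provider. This mirrors the daemonscaler rule of
// "run on every matching host".
fn commands_for_spread(
    eligible_hosts: &HashMap<String, HashSet<String>>, // host id -> provider ids running on that host
    provider_id: &str,
    provider_ref: &str,
) -> Vec<StartProvider> {
    eligible_hosts
        .iter()
        .filter(|(_host, providers)| !providers.contains(provider_id))
        .map(|(host, _)| StartProvider {
            host_id: host.clone(),
            reference: provider_ref.to_string(),
        })
        .collect()
}

fn main() {
    let mut hosts = HashMap::new();
    hosts.insert("host-a".to_string(), HashSet::from(["VPROVIDER".to_string()]));
    hosts.insert("host-b".to_string(), HashSet::new());

    // Only host-b gets a StartProvider command; host-a already runs the provider.
    let cmds = commands_for_spread(&hosts, "VPROVIDER", "registry.example.com/provider:0.1.0");
    assert_eq!(cmds.len(), 1);
    assert_eq!(cmds[0].host_id, "host-b");
    assert_eq!(cmds[0].reference, "registry.example.com/provider:0.1.0");
    println!("{cmds:?}");
}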
29 changes: 28 additions & 1 deletion src/scaler/manager.rs
@@ -32,7 +32,7 @@ use crate::{
 };
 
 use super::{
-    daemonscaler::ActorDaemonScaler,
+    daemonscaler::{provider::ProviderDaemonScaler, ActorDaemonScaler},
     spreadscaler::{
         link::LinkScaler,
         provider::{ProviderSpreadConfig, ProviderSpreadScaler},
@@ -632,6 +632,33 @@ where
                     Some(Duration::from_secs(60)),
                 )) as BoxedScaler)
             }
+            (DAEMONSCALER_TRAIT, TraitProperty::SpreadScaler(p)) => {
+                Some(Box::new(BackoffAwareScaler::new(
+                    ProviderDaemonScaler::new(
+                        store.clone(),
+                        ProviderSpreadConfig {
+                            lattice_id: lattice_id.to_owned(),
+                            provider_reference: props.image.to_owned(),
+                            spread_config: p.to_owned(),
+                            provider_contract_id: props.contract.to_owned(),
+                            provider_link_name: props
+                                .link_name
+                                .as_deref()
+                                .unwrap_or(DEFAULT_LINK_NAME)
+                                .to_owned(),
+                            model_name: name.to_owned(),
+                            provider_config: props.config.to_owned(),
+                        },
+                        &component.name,
+                    ),
+                    notifier.to_owned(),
+                    notifier_subject,
+                    name,
+                    // Providers are a bit longer because it can take a bit to download
+                    Some(Duration::from_secs(60)),
+                )) as BoxedScaler)
+            }
+
             _ => None,
         }
     }))
Expand Down
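One behavior worth noting from the new provider.rs above: both new and update_config normalize an empty spread list into a single default Spread, which has no requirements and therefore matches every host in the lattice. The sketch below is illustrative only and is not part of this commit; the Spread and SpreadScalerProperty types are simplified stand-ins for the crate's models.

use std::collections::BTreeMap;

#[derive(Debug, Default)]
struct Spread {
    name: String,
    requirements: BTreeMap<String, String>,
}

#[derive(Debug)]
struct SpreadScalerProperty {
    replicas: usize,
    spread: Vec<Spread>,
}

// An empty spread list is replaced with a single default (requirement-free)
// spread so the daemonscaler targets every host in the lattice.
fn normalize(config: SpreadScalerProperty) -> SpreadScalerProperty {
    if config.spread.is_empty() {
        SpreadScalerProperty {
            replicas: config.replicas,
            spread: vec![Spread::default()],
        }
    } else {
        config
    }
}

fn main() {
    let normalized = normalize(SpreadScalerProperty { replicas: 1, spread: vec![] });
    assert_eq!(normalized.spread.len(), 1);
    assert!(normalized.spread[0].name.is_empty());
    assert!(normalized.spread[0].requirements.is_empty());
}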
