Skip to content

Commit

Permalink
feat(multitenant): use multitenant prefix ctl cmds
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
  • Loading branch information
brooksmtownsend committed Jul 11, 2023
1 parent 640515e commit 484adf1
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 33 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ in a lattice is forthcoming._

In advanced use cases, **wadm** is also capable of:

- Monitoring multiple lattices
- Monitoring multiple lattices.
- Running multiple replicas to distribute load among multiple processes, or for a high-availability
architecture.

Expand Down
38 changes: 32 additions & 6 deletions bin/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//! different credentials
use wasmcloud_control_interface::{Client, ClientBuilder};

// Copied from https://github.com/wasmCloud/control-interface-client/blob/main/src/broker.rs#L1, not public
const DEFAULT_TOPIC_PREFIX: &str = "wasmbus.ctl";

#[derive(Debug, Default, Clone)]
pub struct ControlClientConfig {
/// The jetstream domain to use for the clients
Expand Down Expand Up @@ -30,21 +33,44 @@ impl ControlClientConstructor {
}

/// Get the client for the given lattice ID
pub async fn get_connection(&self, id: &str) -> anyhow::Result<Client> {
pub async fn get_connection(
&self,
id: &str,
multitenant_prefix: Option<&str>,
) -> anyhow::Result<Client> {
let builder = ClientBuilder::new(self.client.clone()).lattice_prefix(id);
let builder = if let Some(domain) = self.config.js_domain.as_deref() {
builder.js_domain(domain)
} else {
builder
};
let builder = if let Some(prefix) = self.config.topic_prefix.as_deref() {
builder.topic_prefix(prefix)
} else {
builder
};

let builder = builder.topic_prefix(topic_prefix(
multitenant_prefix,
self.config.topic_prefix.as_deref(),
));

builder
.build()
.await
.map_err(|e| anyhow::anyhow!("Error building client for {id}: {e:?}"))
}
}

/// Returns the topic prefix to use for the given multitenant prefix and topic prefix. The
/// default prefix is `wasmbus.ctl`.
///
/// If running in multitenant mode, we listen to events on *.wasmbus.evt and need to send commands
/// back to the '*' account. This match takes into account custom prefixes as well to support
/// advanced use cases.
///
/// This function does _not_ take into account whether or not wadm is running in multitenant mode, it's assumed
/// that passing a Some() value for multitenant_prefix means that wadm is running in multitenant mode.
fn topic_prefix(multitenant_prefix: Option<&str>, topic_prefix: Option<&str>) -> String {
match (multitenant_prefix, topic_prefix) {
(Some(mt), Some(prefix)) => format!("{}.{}", mt, prefix),
(Some(mt), None) => format!("{}.{DEFAULT_TOPIC_PREFIX}", mt),
(None, Some(prefix)) => prefix.to_string(),
_ => DEFAULT_TOPIC_PREFIX.to_string(),
}
}
20 changes: 16 additions & 4 deletions bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,13 @@ struct CommandWorkerCreator {
impl WorkerCreator for CommandWorkerCreator {
type Output = CommandWorker;

async fn create(&self, lattice_id: &str) -> anyhow::Result<Self::Output> {
async fn create(
&self,
lattice_id: &str,
multitenant_prefix: Option<&str>,
) -> anyhow::Result<Self::Output> {
self.pool
.get_connection(lattice_id)
.get_connection(lattice_id, multitenant_prefix)
.await
.map(CommandWorker::new)
}
Expand All @@ -334,8 +338,16 @@ where
{
type Output = EventWorker<StateStore, wasmcloud_control_interface::Client, Context>;

async fn create(&self, lattice_id: &str) -> anyhow::Result<Self::Output> {
match self.pool.get_connection(lattice_id).await {
async fn create(
&self,
lattice_id: &str,
multitenant_prefix: Option<&str>,
) -> anyhow::Result<Self::Output> {
match self
.pool
.get_connection(lattice_id, multitenant_prefix)
.await
{
Ok(client) => {
let publisher = CommandPublisher::new(
self.publisher.clone(),
Expand Down
19 changes: 14 additions & 5 deletions bin/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ where
if !is_event_we_care_about(&msg.payload) {
continue;
}
let lattice_id = match self.parser.parse(&msg.subject) {
Some(id) => id,
None => {
let (lattice_id, multitenant_prefix) = match self.parser.parse(&msg.subject) {
(Some(lattice), Some(account)) => (lattice, Some(account)),
(Some(lattice), None) => (lattice, None),
(None, _) => {
trace!(subject = %msg.subject, "Found non-matching lattice subject");
continue;
}
Expand All @@ -72,7 +73,11 @@ where
let needs_event = !self.event_manager.has_consumer(&events_topic).await;
if needs_command {
debug!(%lattice_id, subject = %msg.subject, mapped_subject = %command_topic, "Found unmonitored lattice, adding command consumer");
let worker = match self.command_worker_creator.create(lattice_id).await {
let worker = match self
.command_worker_creator
.create(lattice_id, multitenant_prefix)
.await
{
Ok(w) => w,
Err(e) => {
error!(error = %e, %lattice_id, "Couldn't construct worker for command consumer. Will retry on next heartbeat");
Expand All @@ -88,7 +93,11 @@ where
}
if needs_event {
debug!(%lattice_id, subject = %msg.subject, mapped_subject = %events_topic, "Found unmonitored lattice, adding event consumer");
let worker = match self.event_worker_creator.create(lattice_id).await {
let worker = match self
.event_worker_creator
.create(lattice_id, multitenant_prefix)
.await
{
Ok(w) => w,
Err(e) => {
error!(error = %e, %lattice_id, "Couldn't construct worker for event consumer. Will retry on next heartbeat");
Expand Down
8 changes: 6 additions & 2 deletions src/consumers/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ pub trait Worker {
pub trait WorkerCreator {
type Output: Worker + Send + Sync + 'static;

async fn create(&self, lattice_id: &str) -> anyhow::Result<Self::Output>;
async fn create(
&self,
lattice_id: &str,
multitenant_prefix: Option<&str>,
) -> anyhow::Result<Self::Output>;
}

/// A manager of a specific type of Consumer that handles giving out permits to work and managing
Expand Down Expand Up @@ -147,7 +151,7 @@ impl<C> ConsumerManager<C> {
// friendly consumer manager name
trace!(%lattice_id, subject = %info.config.filter_subject, "Adding consumer for lattice");

let worker = match worker_generator.create(lattice_id).await {
let worker = match worker_generator.create(lattice_id, None).await {
Ok(w) => w,
Err(e) => {
error!(error = %e, %lattice_id, "Unable to add consumer for lattice. Error when generating worker");
Expand Down
39 changes: 24 additions & 15 deletions src/nats_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ impl LatticeIdParser {
}
}

/// Parses the given subject based on settings and then returns the lattice ID of the subject.
/// Parses the given subject based on settings and then returns the lattice ID of the subject and
/// the account ID if it is multitenant.
/// Returns None if it couldn't parse the topic
pub fn parse<'a>(&self, subject: &'a str) -> Option<&'a str> {
pub fn parse<'a>(&self, subject: &'a str) -> (Option<&'a str>, Option<&'a str>) {
let separated: Vec<&str> = subject.split('.').collect();
// For reference, topics look like the following:
//
Expand All @@ -32,17 +33,17 @@ impl LatticeIdParser {
// Note that the account ID should be prefaced with an `A`
match separated[..] {
[prefix, evt, lattice_id] if prefix == self.prefix && evt == EVENT_SUBJECT => {
Some(lattice_id)
(Some(lattice_id), None)
}
[account_id, prefix, evt, lattice_id]
if self.multitenant
&& prefix == self.prefix
&& evt == EVENT_SUBJECT
&& account_id.starts_with('A') =>
{
Some(lattice_id)
(Some(lattice_id), Some(&account_id))
}
_ => None,
_ => (None, None),
}
}
}
Expand All @@ -59,14 +60,15 @@ mod test {
assert_eq!(
parser
.parse("wasmbus.evt.blahblah")
.0
.expect("Should return lattice id"),
"blahblah",
"Should return the right ID"
);

// Shouldn't parse a multitenant
assert!(
parser.parse("ACCOUNTID.wasmbus.evt.default").is_none(),
parser.parse("ACCOUNTID.wasmbus.evt.default").0.is_none(),
"Shouldn't parse a multitenant topic"
);

Expand All @@ -76,18 +78,25 @@ mod test {
assert_eq!(
parser
.parse("wasmbus.evt.blahblah")
.0
.expect("Should return lattice id"),
"blahblah",
"Should return the right ID"
);

let res = parser.parse("ACCOUNTID.wasmbus.evt.blahblah");

assert_eq!(
parser
.parse("ACCOUNTID.wasmbus.evt.blahblah")
.expect("Should return lattice id"),
res.0.expect("Should return lattice id"),
"blahblah",
"Should return the right ID"
);

assert_eq!(
res.1.expect("Should return account id in multitenant mode"),
"ACCOUNTID",
"Should return the right ID"
);
}

#[test]
Expand All @@ -96,32 +105,32 @@ mod test {

// Test 3 and 4 part subjects to make sure they don't parse
assert!(
parser.parse("BLAH.wasmbus.notevt.default").is_none(),
parser.parse("BLAH.wasmbus.notevt.default").0.is_none(),
"Shouldn't parse 4 part invalid topic"
);

assert!(
parser.parse("wasmbus.notme.default").is_none(),
parser.parse("wasmbus.notme.default").0.is_none(),
"Shouldn't parse 3 part invalid topic"
);

assert!(
parser.parse("lebus.evt.default").is_none(),
parser.parse("lebus.evt.default").0.is_none(),
"Shouldn't parse an non-matching prefix"
);

assert!(
parser.parse("wasmbus.evt").is_none(),
parser.parse("wasmbus.evt").0.is_none(),
"Shouldn't parse a too short topic"
);

assert!(
parser.parse("BADACCOUNT.wasmbus.evt.default").is_none(),
parser.parse("BADACCOUNT.wasmbus.evt.default").0.is_none(),
"Shouldn't parse invalid account topic"
);

assert!(
parser.parse("wasmbus.notme.default.bar.baz").is_none(),
parser.parse("wasmbus.notme.default.bar.baz").0.is_none(),
"Shouldn't parse long topic"
);
}
Expand Down

0 comments on commit 484adf1

Please sign in to comment.