diff --git a/async-nats/src/jetstream/consumer/mod.rs b/async-nats/src/jetstream/consumer/mod.rs index dc2ed42a2..8107080d7 100644 --- a/async-nats/src/jetstream/consumer/mod.rs +++ b/async-nats/src/jetstream/consumer/mod.rs @@ -271,7 +271,12 @@ pub struct Config { #[serde(default, skip_serializing_if = "is_default")] pub rate_limit: u64, /// What percentage of acknowledgments should be samples for observability, 0-100 - #[serde(default, skip_serializing_if = "is_default")] + #[serde( + rename = "sample_freq", + with = "from_str", + default, + skip_serializing_if = "is_default" + )] pub sample_frequency: u8, /// The maximum number of waiting consumers. #[serde(default, skip_serializing_if = "is_default")] @@ -428,6 +433,26 @@ fn is_default(t: &T) -> bool { t == &T::default() } +pub(crate) mod from_str { + pub(crate) fn deserialize<'de, T, D>(deserializer: D) -> Result + where + T: std::str::FromStr, + T::Err: std::fmt::Display, + D: serde::Deserializer<'de>, + { + let s = ::deserialize(deserializer)?; + T::from_str(&s).map_err(serde::de::Error::custom) + } + + pub(crate) fn serialize(value: &T, serializer: S) -> Result + where + T: std::fmt::Display, + S: serde::Serializer, + { + serializer.serialize_str(&value.to_string()) + } +} + #[derive(Clone, Copy, Debug, PartialEq)] pub enum StreamErrorKind { TimedOut, diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index cb65d0294..dffd9f50d 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -603,7 +603,12 @@ pub struct OrderedConfig { #[serde(default, skip_serializing_if = "is_default")] pub rate_limit: u64, /// What percentage of acknowledgments should be samples for observability, 0-100 - #[serde(default, skip_serializing_if = "is_default")] + #[serde( + rename = "sample_freq", + with = "super::from_str", + default, + skip_serializing_if = "is_default" + )] pub sample_frequency: u8, /// Only deliver headers without payloads. #[serde(default, skip_serializing_if = "is_default")] @@ -2044,7 +2049,12 @@ pub struct Config { #[serde(default, skip_serializing_if = "is_default")] pub rate_limit: u64, /// What percentage of acknowledgments should be samples for observability, 0-100 - #[serde(default, skip_serializing_if = "is_default")] + #[serde( + rename = "sample_freq", + with = "super::from_str", + default, + skip_serializing_if = "is_default" + )] pub sample_frequency: u8, /// The maximum number of waiting consumers. #[serde(default, skip_serializing_if = "is_default")] diff --git a/async-nats/src/jetstream/consumer/push.rs b/async-nats/src/jetstream/consumer/push.rs index 006b498db..9ab5a8724 100644 --- a/async-nats/src/jetstream/consumer/push.rs +++ b/async-nats/src/jetstream/consumer/push.rs @@ -234,7 +234,12 @@ pub struct Config { #[serde(default, skip_serializing_if = "is_default")] pub rate_limit: u64, /// What percentage of acknowledgments should be samples for observability, 0-100 - #[serde(default, skip_serializing_if = "is_default")] + #[serde( + rename = "sample_freq", + with = "super::from_str", + default, + skip_serializing_if = "is_default" + )] pub sample_frequency: u8, /// The maximum number of waiting consumers. #[serde(default, skip_serializing_if = "is_default")] @@ -382,7 +387,12 @@ pub struct OrderedConfig { #[serde(default, skip_serializing_if = "is_default")] pub rate_limit: u64, /// What percentage of acknowledgments should be samples for observability, 0-100 - #[serde(default, skip_serializing_if = "is_default")] + #[serde( + rename = "sample_freq", + with = "super::from_str", + default, + skip_serializing_if = "is_default" + )] pub sample_frequency: u8, /// Only deliver headers without payloads. #[serde(default, skip_serializing_if = "is_default")] diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 2ba66a251..0bcdb11ce 100755 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -2514,6 +2514,59 @@ mod jetstream { } } + #[tokio::test] + async fn consumer_configs_sample_frequency() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + + let client = ConnectOptions::new() + .event_callback(|err| async move { println!("error: {err:?}") }) + .connect(server.client_url()) + .await + .unwrap(); + + let js = async_nats::jetstream::new(client.clone()); + + let stream = js + .create_stream(stream::Config { + name: "StreamWithSampledConsumers".into(), + ..Default::default() + }) + .await + .unwrap(); + + { + let consumer = stream + .create_consumer(consumer::pull::Config { + name: Some("SampledPullConsumer".into()), + description: Some( + "See below to check that Ack Sampling has been set to 100%!".to_string(), + ), + sample_frequency: 100, // <--- sample all the messages + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(100, consumer.cached_info().config.sample_frequency); + } + + { + let consumer = stream + .create_consumer(consumer::pull::Config { + name: Some("SampledPushConsumer".into()), + description: Some( + "See below to check that Ack Sampling has been set to 100%!".to_string(), + ), + sample_frequency: 100, // <--- sample all the messages + ..Default::default() + }) + .await + .unwrap(); + + assert_eq!(100, consumer.cached_info().config.sample_frequency); + } + } + #[tokio::test] async fn timeout_out_request() { let server = nats_server::run_server("tests/configs/jetstream.conf");