diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index 9f48668e0..5c51212dc 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -600,6 +600,8 @@ impl Store { _ => WatchError::with_source(WatchErrorKind::Other, err), })?; + let no_messages = consumer.cached_info().num_pending == 0; + Ok(Watch { subscription: consumer.messages().await.map_err(|err| match err.kind() { crate::jetstream::consumer::StreamErrorKind::TimedOut => { @@ -612,6 +614,7 @@ impl Store { prefix: self.prefix.clone(), bucket: self.name.clone(), seen_current: false, + no_messages, }) } @@ -1072,6 +1075,7 @@ impl Store { /// A structure representing a watch on a key-value bucket, yielding values whenever there are changes. pub struct Watch { + no_messages: bool, seen_current: bool, subscription: super::consumer::push::Ordered, prefix: String, @@ -1085,6 +1089,9 @@ impl futures::Stream for Watch { mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { + if self.no_messages { + return Poll::Ready(None); + } match self.subscription.poll_next_unpin(cx) { Poll::Ready(message) => match message { None => Poll::Ready(None), diff --git a/async-nats/tests/kv_tests.rs b/async-nats/tests/kv_tests.rs index 1901ca092..2161ce26b 100644 --- a/async-nats/tests/kv_tests.rs +++ b/async-nats/tests/kv_tests.rs @@ -532,6 +532,29 @@ mod kv { } } } + + #[tokio::test] + async fn watch_no_messages() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + + let context = async_nats::jetstream::new(client); + let kv = context + .create_key_value(async_nats::jetstream::kv::Config { + bucket: "history".to_string(), + description: "test_description".to_string(), + history: 15, + storage: StorageType::File, + num_replicas: 1, + ..Default::default() + }) + .await + .unwrap(); + + let mut watcher = kv.watch("foo").await.unwrap(); + assert!(watcher.next().await.is_none()); + } + #[tokio::test] async fn watch() { let server = nats_server::run_server("tests/configs/jetstream.conf");