Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove subscriber reliability option #1353

Merged
merged 5 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 4 additions & 36 deletions commons/zenoh-codec/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use zenoh_buffers::{
ZBuf,
};
use zenoh_protocol::{
common::{iext, imsg, ZExtZ64},
common::{iext, imsg},
core::{ExprId, ExprLen, WireExpr},
network::{
declare::{self, common, keyexpr, queryable, subscriber, token, Declare, DeclareBody},
Expand Down Expand Up @@ -384,9 +384,6 @@ where
}
}

// SubscriberInfo
crate::impl_zextz64!(subscriber::ext::SubscriberInfo, subscriber::ext::Info::ID);

// DeclareSubscriber
impl<W> WCodec<&subscriber::DeclareSubscriber, &mut W> for Zenoh080
where
Expand All @@ -395,18 +392,10 @@ where
type Output = Result<(), DidntWrite>;

fn write(self, writer: &mut W, x: &subscriber::DeclareSubscriber) -> Self::Output {
let subscriber::DeclareSubscriber {
id,
wire_expr,
ext_info,
} = x;
let subscriber::DeclareSubscriber { id, wire_expr } = x;

// Header
let mut header = declare::id::D_SUBSCRIBER;
let mut n_exts = (ext_info != &subscriber::ext::SubscriberInfo::DEFAULT) as u8;
if n_exts != 0 {
header |= subscriber::flag::Z;
}
if wire_expr.mapping != Mapping::DEFAULT {
header |= subscriber::flag::M;
}
Expand All @@ -420,10 +409,6 @@ where
self.write(&mut *writer, wire_expr)?;

// Extensions
if ext_info != &subscriber::ext::SubscriberInfo::DEFAULT {
n_exts -= 1;
self.write(&mut *writer, (*ext_info, n_exts != 0))?;
}

Ok(())
}
Expand Down Expand Up @@ -465,30 +450,13 @@ where
};

// Extensions
let mut ext_info = subscriber::ext::SubscriberInfo::DEFAULT;

let mut has_ext = imsg::has_flag(self.header, subscriber::flag::Z);
while has_ext {
let ext: u8 = self.codec.read(&mut *reader)?;
let eodec = Zenoh080Header::new(ext);
match iext::eid(ext) {
subscriber::ext::Info::ID => {
let (i, ext): (subscriber::ext::SubscriberInfo, bool) =
eodec.read(&mut *reader)?;
ext_info = i;
has_ext = ext;
}
_ => {
has_ext = extension::skip(reader, "DeclareSubscriber", ext)?;
}
}
has_ext = extension::skip(reader, "DeclareSubscriber", ext)?;
}

Ok(subscriber::DeclareSubscriber {
id,
wire_expr,
ext_info,
})
Ok(subscriber::DeclareSubscriber { id, wire_expr })
}
}

Expand Down
78 changes: 3 additions & 75 deletions commons/zenoh-protocol/src/network/declare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub use subscriber::*;
pub use token::*;

use crate::{
common::{imsg, ZExtZ64, ZExtZBuf},
core::{ExprId, Reliability, WireExpr},
common::{ZExtZ64, ZExtZBuf},
core::{ExprId, WireExpr},
network::Mapping,
zextz64, zextzbuf,
};
Expand Down Expand Up @@ -341,73 +341,6 @@ pub mod subscriber {
pub struct DeclareSubscriber {
pub id: SubscriberId,
pub wire_expr: WireExpr<'static>,
pub ext_info: ext::SubscriberInfo,
}

pub mod ext {
use super::*;

pub type Info = zextz64!(0x01, false);

/// # The subscription mode.
///
/// ```text
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |Z|0_1| ID |
/// +-+-+-+---------+
/// % reserved |R%
/// +---------------+
///
/// - if R==1 then the subscription is reliable, else it is best effort
/// - rsv: Reserved
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SubscriberInfo {
pub reliability: Reliability,
}

impl SubscriberInfo {
pub const R: u64 = 1;

pub const DEFAULT: Self = Self {
reliability: Reliability::DEFAULT,
};

#[cfg(feature = "test")]
pub fn rand() -> Self {
let reliability = Reliability::rand();

Self { reliability }
}
}

impl Default for SubscriberInfo {
fn default() -> Self {
Self::DEFAULT
}
}

impl From<Info> for SubscriberInfo {
fn from(ext: Info) -> Self {
let reliability = if imsg::has_option(ext.value, SubscriberInfo::R) {
Reliability::Reliable
} else {
Reliability::BestEffort
};
Self { reliability }
}
}

impl From<SubscriberInfo> for Info {
fn from(ext: SubscriberInfo) -> Self {
let mut v: u64 = 0;
if ext.reliability == Reliability::Reliable {
v |= SubscriberInfo::R;
}
Info::new(v)
}
}
}

impl DeclareSubscriber {
Expand All @@ -418,13 +351,8 @@ pub mod subscriber {

let id: SubscriberId = rng.gen();
let wire_expr = WireExpr::rand();
let ext_info = ext::SubscriberInfo::rand();

Self {
id,
wire_expr,
ext_info,
}
Self { id, wire_expr }
}
}

Expand Down
65 changes: 39 additions & 26 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};

#[cfg(feature = "unstable")]
use zenoh::pubsub::Reliability;
use zenoh::{
handlers::{locked, DefaultHandler, IntoHandler},
internal::zlock,
key_expr::KeyExpr,
prelude::Wait,
pubsub::{Reliability, Subscriber},
pubsub::Subscriber,
query::{QueryConsolidation, QueryTarget, ReplyKeyExpr, Selector},
sample::{Locality, Sample, SampleBuilder, TimestampBuilderTrait},
session::{SessionDeclarations, SessionRef},
Expand All @@ -41,7 +43,6 @@ pub struct QueryingSubscriberBuilder<'a, 'b, KeySpace, Handler> {
pub(crate) session: SessionRef<'a>,
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
pub(crate) key_space: KeySpace,
pub(crate) reliability: Reliability,
pub(crate) origin: Locality,
pub(crate) query_selector: Option<ZResult<Selector<'b>>>,
pub(crate) query_target: QueryTarget,
Expand All @@ -65,7 +66,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
session,
key_expr,
key_space,
reliability,
origin,
query_selector,
query_target,
Expand All @@ -78,7 +78,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
session,
key_expr,
key_space,
reliability,
origin,
query_selector,
query_target,
Expand Down Expand Up @@ -118,7 +117,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
session,
key_expr,
key_space,
reliability,
origin,
query_selector,
query_target,
Expand All @@ -131,7 +129,6 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle
session,
key_expr,
key_space,
reliability,
origin,
query_selector,
query_target,
Expand All @@ -145,23 +142,35 @@ impl<'a, 'b, KeySpace> QueryingSubscriberBuilder<'a, 'b, KeySpace, DefaultHandle

impl<'a, 'b, Handler> QueryingSubscriberBuilder<'a, 'b, crate::UserSpace, Handler> {
/// Change the subscription reliability.
#[inline]
#[cfg(feature = "unstable")]
#[deprecated(
since = "1.0.0",
note = "please use `reliability` on `declare_publisher` or `put`"
)]
#[allow(unused_mut, unused_variables)]
pub fn reliability(mut self, reliability: Reliability) -> Self {
self.reliability = reliability;
self
}

/// Change the subscription reliability to Reliable.
#[inline]
#[cfg(feature = "unstable")]
#[deprecated(
since = "1.0.0",
note = "please use `reliability` on `declare_publisher` or `put`"
)]
#[allow(unused_mut)]
pub fn reliable(mut self) -> Self {
self.reliability = Reliability::Reliable;
self
}

/// Change the subscription reliability to BestEffort.
#[inline]
#[cfg(feature = "unstable")]
#[deprecated(
since = "1.0.0",
note = "please use `reliability` on `declare_publisher` or `put`"
)]
#[allow(unused_mut)]
pub fn best_effort(mut self) -> Self {
self.reliability = Reliability::BestEffort;
self
}

Expand Down Expand Up @@ -249,7 +258,6 @@ where
session: self.session,
key_expr: Ok(key_expr.clone()),
key_space: self.key_space,
reliability: self.reliability,
origin: self.origin,
fetch: |cb| match key_space {
crate::KeySpace::User => match query_selector {
Expand Down Expand Up @@ -365,7 +373,6 @@ pub struct FetchingSubscriberBuilder<
pub(crate) session: SessionRef<'a>,
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
pub(crate) key_space: KeySpace,
pub(crate) reliability: Reliability,
pub(crate) origin: Locality,
pub(crate) fetch: Fetch,
pub(crate) handler: Handler,
Expand All @@ -390,7 +397,6 @@ where
session: self.session,
key_expr: self.key_expr.map(|s| s.into_owned()),
key_space: self.key_space,
reliability: self.reliability,
origin: self.origin,
fetch: self.fetch,
handler: self.handler,
Expand Down Expand Up @@ -422,7 +428,6 @@ where
session,
key_expr,
key_space,
reliability,
origin,
fetch,
handler: _,
Expand All @@ -432,7 +437,6 @@ where
session,
key_expr,
key_space,
reliability,
origin,
fetch,
handler: callback,
Expand Down Expand Up @@ -476,7 +480,6 @@ where
session,
key_expr,
key_space,
reliability,
origin,
fetch,
handler: _,
Expand All @@ -486,7 +489,6 @@ where
session,
key_expr,
key_space,
reliability,
origin,
fetch,
handler,
Expand All @@ -506,23 +508,35 @@ where
TryIntoSample: ExtractSample,
{
/// Change the subscription reliability.
#[inline]
#[cfg(feature = "unstable")]
#[deprecated(
since = "1.0.0",
note = "please use `reliability` on `declare_publisher` or `put`"
)]
#[allow(unused_mut, unused_variables)]
pub fn reliability(mut self, reliability: Reliability) -> Self {
self.reliability = reliability;
self
}

/// Change the subscription reliability to Reliable.
#[inline]
#[cfg(feature = "unstable")]
#[deprecated(
since = "1.0.0",
note = "please use `reliability` on `declare_publisher` or `put`"
)]
#[allow(unused_mut)]
pub fn reliable(mut self) -> Self {
self.reliability = Reliability::Reliable;
self
}

/// Change the subscription reliability to BestEffort.
#[inline]
#[cfg(feature = "unstable")]
#[deprecated(
since = "1.0.0",
note = "please use `reliability` on `declare_publisher` or `put`"
)]
#[allow(unused_mut)]
pub fn best_effort(mut self) -> Self {
self.reliability = Reliability::BestEffort;
self
}

Expand Down Expand Up @@ -698,7 +712,6 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
.session
.declare_subscriber(&key_expr)
.callback(sub_callback)
.reliability(conf.reliability)
.allowed_origin(conf.origin)
.wait()?,
crate::KeySpace::Liveliness => conf
Expand Down
Loading
Loading