Skip to content

Commit

Permalink
move reliability to publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisBiryukov91 committed Sep 5, 2024
1 parent 0082311 commit 2170e62
Show file tree
Hide file tree
Showing 8 changed files with 527 additions and 443 deletions.
727 changes: 377 additions & 350 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ Functions
.. doxygenfunction:: z_sample_priority
.. doxygenfunction:: z_sample_congestion_control
.. doxygenfunction:: z_sample_express
.. doxygenfunction:: z_sample_reliability


Timestamp
Expand Down Expand Up @@ -495,6 +496,7 @@ Types

.. doxygenenum:: z_congestion_control_t
.. doxygenenum:: z_priority_t
.. doxygenenum:: z_reliability_t

.. doxygenstruct:: z_put_options_t
:members:
Expand Down Expand Up @@ -530,6 +532,8 @@ Functions
.. doxygenfunction:: z_publisher_put_options_default
.. doxygenfunction:: z_publisher_delete_options_default

.. doxygenfunction:: z_reliability_default

.. doxygenfunction:: zc_closure_matching_status_check
.. doxygenfunction:: zc_closure_matching_status_null
.. doxygenfunction:: zc_closure_matching_status_drop
Expand All @@ -550,8 +554,6 @@ Types
.. doxygenstruct:: z_owned_closure_sample_t
:members:

.. doxygenenum:: z_reliability_t

.. doxygenstruct:: z_subscriber_options_t
:members:

Expand Down
48 changes: 40 additions & 8 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ typedef enum z_query_target_t {
Z_QUERY_TARGET_ALL_COMPLETE,
} z_query_target_t;
/**
* The subscription reliability.
* The publisher reliability.
* NOTE: Currently `reliability` does not trigger any data retransmission on the wire.
* It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data).
*/
#if defined(UNSTABLE)
typedef enum z_reliability_t {
Expand Down Expand Up @@ -591,6 +593,12 @@ typedef struct z_publisher_options_t {
* If true, Zenoh will not wait to batch this message with others to reduce the bandwith.
*/
bool is_express;
#if defined(UNSTABLE)
/**
* The publisher reliability.
*/
enum z_reliability_t reliability;
#endif
#if defined(UNSTABLE)
/**
* The allowed destination for this publisher.
Expand All @@ -611,12 +619,6 @@ typedef struct z_queryable_options_t {
* Options passed to the `z_declare_subscriber()` function.
*/
typedef struct z_subscriber_options_t {
#if defined(UNSTABLE)
/**
* The subscription reliability.
*/
enum z_reliability_t reliability;
#endif
#if !defined(UNSTABLE)
/**
* Dummy field to avoid having fieldless struct
Expand Down Expand Up @@ -644,6 +646,12 @@ typedef struct z_delete_options_t {
* The timestamp of this message.
*/
struct z_timestamp_t *timestamp;
#if defined(UNSTABLE)
/**
* The delete operation reliability.
*/
enum z_reliability_t reliability;
#endif
#if defined(UNSTABLE)
/**
* The allowed destination of this message.
Expand Down Expand Up @@ -794,6 +802,12 @@ typedef struct z_put_options_t {
* The timestamp of this message.
*/
struct z_timestamp_t *timestamp;
#if defined(UNSTABLE)
/**
* The put operation reliability.
*/
enum z_reliability_t reliability;
#endif
#if defined(UNSTABLE)
/**
* The allowed destination of this message.
Expand Down Expand Up @@ -1947,7 +1961,7 @@ z_result_t z_declare_subscriber(struct z_owned_subscriber_t *this_,
const struct z_loaned_session_t *session,
const struct z_loaned_keyexpr_t *key_expr,
struct z_moved_closure_sample_t *callback,
struct z_subscriber_options_t *options);
struct z_subscriber_options_t *_options);
/**
* Sends request to delete data on specified key expression (used when working with <a href="https://zenoh.io/docs/manual/abstractions/#storage"> Zenoh storages </a>).
*
Expand Down Expand Up @@ -2613,6 +2627,12 @@ ZENOHC_API enum z_whatami_t z_hello_whatami(const struct z_loaned_hello_t *this_
#if defined(UNSTABLE)
ZENOHC_API z_id_t z_hello_zid(const struct z_loaned_hello_t *this_);
#endif
/**
* Formats the `z_id_t` into 16-digit hex string (LSB-first order)
*/
#if defined(UNSTABLE)
ZENOHC_API void z_id_to_string(const z_id_t *zid, struct z_owned_string_t *dst);
#endif
/**
* Fetches the Zenoh IDs of all connected peers.
*
Expand Down Expand Up @@ -3504,6 +3524,12 @@ ZENOHC_API uint8_t z_random_u8(void);
#if (defined(SHARED_MEMORY) && defined(UNSTABLE))
ZENOHC_API void z_ref_shm_client_storage_global(z_owned_shm_client_storage_t *this_);
#endif
/**
* Returns the default value for `reliability`.
*/
#if defined(UNSTABLE)
ZENOHC_API enum z_reliability_t z_reliability_default(void);
#endif
/**
* Constructs an owned shallow copy of reply in provided uninitialized memory location.
*/
Expand Down Expand Up @@ -3706,6 +3732,12 @@ ZENOHC_API const struct z_loaned_bytes_t *z_sample_payload(const struct z_loaned
* Returns sample qos priority value.
*/
ZENOHC_API enum z_priority_t z_sample_priority(const struct z_loaned_sample_t *this_);
/**
* Returns the reliability setting the sample was delieverd with.
*/
#if defined(UNSTABLE)
ZENOHC_API enum z_reliability_t z_sample_reliability(const struct z_loaned_sample_t *this_);
#endif
/**
* Returns the sample source_info.
*/
Expand Down
26 changes: 13 additions & 13 deletions include/zenoh_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -657,43 +657,43 @@ inline void z_call(const z_loaned_closure_sample_t* closure, const z_loaned_samp
z_closure_sample_call(closure, sample);
};

extern "C" typedef void (*z_closure_drop_callback_t)(void*);
extern "C" using z_closure_drop_callback_t = void(void*);

extern "C" typedef void (*z_closure_hello_callback_t)(const z_loaned_hello_t*, void*);
extern "C" using z_closure_hello_callback_t = void(const z_loaned_hello_t*, void*);
inline void z_closure(
z_owned_closure_hello_t* closure,
z_closure_hello_callback_t call,
z_closure_drop_callback_t drop,
z_closure_hello_callback_t* call,
z_closure_drop_callback_t* drop,
void *context) {
closure->context = context;
closure->drop = drop;
closure->call = call;
};
extern "C" typedef void (*z_closure_query_callback_t)(const z_loaned_query_t*, void*);
extern "C" using z_closure_query_callback_t = void(const z_loaned_query_t*, void*);
inline void z_closure(
z_owned_closure_query_t* closure,
z_closure_query_callback_t call,
z_closure_drop_callback_t drop,
z_closure_query_callback_t* call,
z_closure_drop_callback_t* drop,
void *context) {
closure->context = context;
closure->drop = drop;
closure->call = call;
};
extern "C" typedef void (*z_closure_reply_callback_t)(const z_loaned_reply_t*, void*);
extern "C" using z_closure_reply_callback_t = void(const z_loaned_reply_t*, void*);
inline void z_closure(
z_owned_closure_reply_t* closure,
z_closure_reply_callback_t call,
z_closure_drop_callback_t drop,
z_closure_reply_callback_t* call,
z_closure_drop_callback_t* drop,
void *context) {
closure->context = context;
closure->drop = drop;
closure->call = call;
};
extern "C" typedef void (*z_closure_sample_callback_t)(const z_loaned_sample_t*, void*);
extern "C" using z_closure_sample_callback_t = void(const z_loaned_sample_t*, void*);
inline void z_closure(
z_owned_closure_sample_t* closure,
z_closure_sample_callback_t call,
z_closure_drop_callback_t drop,
z_closure_sample_callback_t* call,
z_closure_drop_callback_t* drop,
void *context) {
closure->context = context;
closure->drop = drop;
Expand Down
63 changes: 57 additions & 6 deletions src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
use std::{mem::MaybeUninit, ptr::null};

use libc::c_ulong;
use zenoh::{
qos::{CongestionControl, Priority},
query::{ConsolidationMode, QueryTarget},
sample::{Sample, SampleKind},
time::Timestamp,
};
#[cfg(feature = "unstable")]
use zenoh::{
pubsub::Reliability,
query::ReplyKeyExpr,
sample::{Locality, SourceInfo},
session::EntityGlobalId,
};
use zenoh::{
qos::{CongestionControl, Priority},
query::{ConsolidationMode, QueryTarget},
sample::{Sample, SampleKind},
time::Timestamp,
};

#[cfg(feature = "unstable")]
use crate::transmute::IntoCType;
Expand Down Expand Up @@ -186,6 +187,13 @@ pub extern "C" fn z_sample_congestion_control(this_: &z_loaned_sample_t) -> z_co
this_.as_rust_type_ref().congestion_control().into()
}

#[cfg(feature = "unstable")]
/// Returns the reliability setting the sample was delieverd with.
#[no_mangle]
pub extern "C" fn z_sample_reliability(this_: &z_loaned_sample_t) -> z_reliability_t {
this_.as_rust_type_ref().reliability().into()
}

/// Returns ``true`` if sample is valid, ``false`` if it is in gravestone state.
#[no_mangle]
pub extern "C" fn z_internal_sample_check(this_: &z_owned_sample_t) -> bool {
Expand Down Expand Up @@ -256,6 +264,49 @@ pub extern "C" fn zc_locality_default() -> zc_locality_t {
Locality::default().into()
}

/// The publisher reliability.
/// NOTE: Currently `reliability` does not trigger any data retransmission on the wire.
/// It is rather used as a marker on the wire and it may be used to select the best link available (e.g. TCP for reliable data and UDP for best effort data).
#[cfg(feature = "unstable")]
#[allow(non_camel_case_types, clippy::upper_case_acronyms)]
#[repr(C)]
#[derive(Clone, Copy)]
pub enum z_reliability_t {
/// Defines reliability as ``BEST_EFFORT``
BEST_EFFORT,
/// Defines reliability as ``RELIABLE``
RELIABLE,
}

#[cfg(feature = "unstable")]
impl From<Reliability> for z_reliability_t {
#[inline]
fn from(r: Reliability) -> Self {
match r {
Reliability::BestEffort => z_reliability_t::BEST_EFFORT,
Reliability::Reliable => z_reliability_t::RELIABLE,
}
}
}

#[cfg(feature = "unstable")]
/// Returns the default value for `reliability`.
#[no_mangle]
pub extern "C" fn z_reliability_default() -> z_reliability_t {
Reliability::default().into()
}

#[cfg(feature = "unstable")]
impl From<z_reliability_t> for Reliability {
#[inline]
fn from(val: z_reliability_t) -> Self {
match val {
z_reliability_t::BEST_EFFORT => Reliability::BestEffort,
z_reliability_t::RELIABLE => Reliability::Reliable,
}
}
}

#[cfg(feature = "unstable")]
/// Key expressions types to which Queryable should reply to.
#[repr(C)]
Expand Down
14 changes: 11 additions & 3 deletions src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ use crate::{
};
#[cfg(feature = "unstable")]
use crate::{
transmute::IntoCType, z_entity_global_id_t, zc_closure_matching_status_call,
zc_closure_matching_status_loan, zc_locality_default, zc_locality_t,
transmute::IntoCType, z_entity_global_id_t, z_reliability_default, z_reliability_t,
zc_closure_matching_status_call, zc_closure_matching_status_loan, zc_locality_default,
zc_locality_t,
};

/// Options passed to the `z_declare_publisher()` function.
Expand All @@ -49,6 +50,9 @@ pub struct z_publisher_options_t {
pub priority: z_priority_t,
/// If true, Zenoh will not wait to batch this message with others to reduce the bandwith.
pub is_express: bool,
/// The publisher reliability.
#[cfg(feature = "unstable")]
pub reliability: z_reliability_t,
#[cfg(feature = "unstable")]
/// The allowed destination for this publisher.
pub allowed_destination: zc_locality_t,
Expand All @@ -63,6 +67,8 @@ pub extern "C" fn z_publisher_options_default(this_: &mut MaybeUninit<z_publishe
priority: Priority::default().into(),
is_express: false,
#[cfg(feature = "unstable")]
reliability: z_reliability_default(),
#[cfg(feature = "unstable")]
allowed_destination: zc_locality_default(),
});
}
Expand Down Expand Up @@ -103,7 +109,9 @@ pub extern "C" fn z_declare_publisher(
.express(options.is_express);
#[cfg(feature = "unstable")]
{
p = p.allowed_destination(options.allowed_destination.into());
p = p
.reliability(options.reliability.into())
.allowed_destination(options.allowed_destination.into());
}
if let Some(encoding) = options.encoding.take() {
p = p.encoding(encoding.take_rust_type());
Expand Down
Loading

0 comments on commit 2170e62

Please sign in to comment.