Skip to content

Commit

Permalink
make the channels work on OWNED queries
Browse files Browse the repository at this point in the history
  • Loading branch information
p-avital committed Dec 14, 2023
1 parent 7ca1b56 commit 5a4e28a
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ serde_yaml = "0.9.19"

[lib]
path="src/lib.rs"
name = "zenohcd"
name = "zenohc"
crate-type = ["cdylib", "staticlib"]
doctest = false

Expand Down
63 changes: 49 additions & 14 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,40 @@ typedef struct z_owned_closure_hello_t {
void (*call)(struct z_owned_hello_t*, void*);
void (*drop)(void*);
} z_owned_closure_hello_t;
/**
* Owned variant of a Query received by a Queryable.
*
* You may construct it by `z_query_clone`-ing a loaned query.
* When the last `z_owned_query_t` corresponding to a query is destroyed, or the callback that produced the query cloned to build them returns,
* the query will receive its termination signal.
*
* Holding onto an `z_owned_query_t` for too long (10s by default, can be set in `z_get`'s options) will trigger a timeout error
* to be sent to the querier by the infrastructure, and new responses to the outdated query will be silently dropped.
*/
typedef struct z_owned_query_t {
void *_0;
} z_owned_query_t;
/**
* A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks:
*
* Members:
* void *context: a pointer to an arbitrary state.
* void *call(const struct z_query_t*, const void *context): the typical callback function. `context` will be passed as its last argument.
* void *drop(void*): allows the callback's state to be freed.
*
* Closures are not guaranteed not to be called concurrently.
*
* It is guaranteed that:
*
* - `call` will never be called once `drop` has started.
* - `drop` will only be called **once**, and **after every** `call` has ended.
* - The two previous guarantees imply that `call` and `drop` are never called concurrently.
*/
typedef struct z_owned_closure_owned_query_t {
void *context;
void (*call)(struct z_owned_query_t*, void *context);
void (*drop)(void*);
} z_owned_closure_owned_query_t;
/**
* A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks:
*
Expand Down Expand Up @@ -630,19 +664,6 @@ typedef struct z_put_options_t {
enum z_congestion_control_t congestion_control;
enum z_priority_t priority;
} z_put_options_t;
/**
* Owned variant of a Query received by a Queryable.
*
* You may construct it by `z_query_clone`-ing a loaned query.
* When the last `z_owned_query_t` corresponding to a query is destroyed, or the callback that produced the query cloned to build them returns,
* the query will receive its termination signal.
*
* Holding onto an `z_owned_query_t` for too long (10s by default, can be set in `z_get`'s options) will trigger a timeout error
* to be sent to the querier by the infrastructure, and new responses to the outdated query will be silently dropped.
*/
typedef struct z_owned_query_t {
void *_0;
} z_owned_query_t;
/**
* A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks:
* - `this` is a pointer to an arbitrary state.
Expand All @@ -665,7 +686,7 @@ typedef struct z_owned_query_channel_closure_t {
* A pair of closures
*/
typedef struct z_owned_query_channel_t {
struct z_owned_closure_query_t send;
struct z_owned_closure_owned_query_t send;
struct z_owned_query_channel_closure_t recv;
} z_owned_query_channel_t;
/**
Expand Down Expand Up @@ -879,6 +900,20 @@ ZENOHC_API struct z_owned_closure_hello_t z_closure_hello_null(void);
* Calls the closure. Calling an uninitialized closure is a no-op.
*/
ZENOHC_API
void z_closure_owned_query_call(const struct z_owned_closure_owned_query_t *closure,
struct z_owned_query_t *query);
/**
* Drops the closure. Droping an uninitialized closure is a no-op.
*/
ZENOHC_API void z_closure_owned_query_drop(struct z_owned_closure_owned_query_t *closure);
/**
* Constructs a null safe-to-drop value of 'z_owned_closure_query_t' type
*/
ZENOHC_API struct z_owned_closure_owned_query_t z_closure_owned_query_null(void);
/**
* Calls the closure. Calling an uninitialized closure is a no-op.
*/
ZENOHC_API
void z_closure_query_call(const struct z_owned_closure_query_t *closure,
const struct z_query_t *query);
/**
Expand Down
8 changes: 4 additions & 4 deletions src/closures/query_channel.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{z_closure_query_drop, z_owned_closure_query_t, z_owned_query_t};
use crate::{z_closure_owned_query_drop, z_owned_closure_owned_query_t, z_owned_query_t};
use libc::c_void;
use std::sync::mpsc::TryRecvError;

Expand All @@ -23,19 +23,19 @@ pub struct z_owned_query_channel_closure_t {
/// A pair of closures
#[repr(C)]
pub struct z_owned_query_channel_t {
pub send: z_owned_closure_query_t,
pub send: z_owned_closure_owned_query_t,
pub recv: z_owned_query_channel_closure_t,
}
#[no_mangle]
pub extern "C" fn z_query_channel_drop(channel: &mut z_owned_query_channel_t) {
z_closure_query_drop(&mut channel.send);
z_closure_owned_query_drop(&mut channel.send);
z_query_channel_closure_drop(&mut channel.recv);
}
/// Constructs a null safe-to-drop value of 'z_owned_query_channel_t' type
#[no_mangle]
pub extern "C" fn z_query_channel_null() -> z_owned_query_channel_t {
z_owned_query_channel_t {
send: z_owned_closure_query_t::empty(),
send: z_owned_closure_owned_query_t::empty(),
recv: z_owned_query_channel_closure_t::empty(),
}
}
Expand Down
83 changes: 82 additions & 1 deletion src/closures/query_closure.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::z_query_t;
use crate::{z_owned_query_t, z_query_t};
use libc::c_void;
/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks:
///
Expand Down Expand Up @@ -74,3 +74,84 @@ impl<F: Fn(&z_query_t)> From<F> for z_owned_closure_query_t {
}
}
}

/// A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks:
///
/// Members:
/// void *context: a pointer to an arbitrary state.
/// void *call(const struct z_query_t*, const void *context): the typical callback function. `context` will be passed as its last argument.
/// void *drop(void*): allows the callback's state to be freed.
///
/// Closures are not guaranteed not to be called concurrently.
///
/// It is guaranteed that:
///
/// - `call` will never be called once `drop` has started.
/// - `drop` will only be called **once**, and **after every** `call` has ended.
/// - The two previous guarantees imply that `call` and `drop` are never called concurrently.
#[repr(C)]
pub struct z_owned_closure_owned_query_t {
context: *mut c_void,
call: Option<extern "C" fn(&mut z_owned_query_t, context: *mut c_void)>,
drop: Option<extern "C" fn(*mut c_void)>,
}
impl z_owned_closure_owned_query_t {
pub fn empty() -> Self {
z_owned_closure_owned_query_t {
context: std::ptr::null_mut(),
call: None,
drop: None,
}
}
}
unsafe impl Send for z_owned_closure_owned_query_t {}
unsafe impl Sync for z_owned_closure_owned_query_t {}
impl Drop for z_owned_closure_owned_query_t {
fn drop(&mut self) {
if let Some(drop) = self.drop {
drop(self.context)
}
}
}
/// Constructs a null safe-to-drop value of 'z_owned_closure_query_t' type
#[no_mangle]
pub extern "C" fn z_closure_owned_query_null() -> z_owned_closure_owned_query_t {
z_owned_closure_owned_query_t::empty()
}
/// Calls the closure. Calling an uninitialized closure is a no-op.
#[no_mangle]
pub extern "C" fn z_closure_owned_query_call(
closure: &z_owned_closure_owned_query_t,
query: &mut z_owned_query_t,
) {
match closure.call {
Some(call) => call(query, closure.context),
None => log::error!("Attempted to call an uninitialized closure!"),
}
}
/// Drops the closure. Droping an uninitialized closure is a no-op.
#[no_mangle]
pub extern "C" fn z_closure_owned_query_drop(closure: &mut z_owned_closure_owned_query_t) {
let mut empty_closure = z_owned_closure_owned_query_t::empty();
std::mem::swap(&mut empty_closure, closure);
}
impl<F: Fn(&mut z_owned_query_t)> From<F> for z_owned_closure_owned_query_t {
fn from(f: F) -> Self {
let this = Box::into_raw(Box::new(f)) as _;
extern "C" fn call<F: Fn(&mut z_owned_query_t)>(
sample: &mut z_owned_query_t,
this: *mut c_void,
) {
let this = unsafe { &*(this as *const F) };
this(sample)
}
extern "C" fn drop<F>(this: *mut c_void) {
std::mem::drop(unsafe { Box::from_raw(this as *mut F) })
}
z_owned_closure_owned_query_t {
context: this,
call: Some(call::<F>),
drop: Some(drop::<F>),
}
}
}
15 changes: 15 additions & 0 deletions src/queryable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ impl Deref for z_query_t {
unsafe { &*(self.0 as *const _) }
}
}
impl From<Option<Query>> for z_owned_query_t {
fn from(value: Option<Query>) -> Self {
unsafe { core::mem::transmute(value) }
}
}
impl From<Query> for z_owned_query_t {
fn from(value: Query) -> Self {
Some(value).into()
}
}
impl Deref for z_owned_query_t {
type Target = Option<Query>;
fn deref(&self) -> &Self::Target {
Expand All @@ -111,6 +121,11 @@ impl DerefMut for z_owned_query_t {
unsafe { &mut *(self.0 as *mut _) }
}
}
impl Drop for z_owned_query_t {
fn drop(&mut self) {
let _: Option<Query> = self.take();
}
}
/// The gravestone value of `z_owned_query_t`.
#[no_mangle]
pub extern "C" fn z_query_null() -> z_owned_query_t {
Expand Down

0 comments on commit 5a4e28a

Please sign in to comment.