Skip to content

Commit

Permalink
#120 Rework observer/scoped_observer/observe/tap
Browse files Browse the repository at this point in the history
* observable_node is no longer owns observers and it is removed at all (node_base is used instead)
* observer nodes own their subjects
* observer owns its observer node and nothing else
* tap nodes are introduced to own both observer and observed node
* result of observe should no longer discarded for both L-value and R-value subjects
  • Loading branch information
YarikTH committed Jul 16, 2023
1 parent 1e55ef2 commit 755bb79
Show file tree
Hide file tree
Showing 17 changed files with 183 additions and 403 deletions.
156 changes: 48 additions & 108 deletions include/ureact/adaptor/observe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,23 +92,28 @@ class signal_observer_node final : public observer_node
{
public:
template <typename InF>
signal_observer_node(
const context& context, const std::shared_ptr<signal_node<S>>& subject, InF&& func )
signal_observer_node( const context& context, const signal<S>& subject, InF&& func )
: signal_observer_node::observer_node( context )
, m_subject( subject )
, m_func( std::forward<InF>( func ) )
{
this->attach_to( subject->get_node_id() );
this->attach_to( m_subject );
}

~signal_observer_node() override
{
this->detach_from_all();
}

UREACT_WARN_UNUSED_RESULT update_result update() override
{
if( auto subject = m_subject.lock() )
if( m_subject.is_valid() )
{
const observer_action action = std::invoke( m_func, subject->value_ref() );
const observer_action action
= std::invoke( m_func, get_internals( m_subject ).get_value() );

if( action == observer_action::stop_and_detach )
subject->unregister_observer( this );
detach_observer();
}

return update_result::unchanged;
Expand All @@ -119,66 +124,70 @@ class signal_observer_node final : public observer_node
{
detach_from_all();

m_subject.reset();
m_subject = signal<S>{};
}

std::weak_ptr<signal_node<S>> m_subject;
signal<S> m_subject;
F m_func;
};

template <typename E, typename F, typename... Deps>
template <typename E, typename Func, typename... Deps>
class events_observer_node final : public observer_node
{
public:
template <typename InF>
template <typename F>
events_observer_node( const context& context,
const std::shared_ptr<event_stream_node<E>>& subject,
InF&& func,
const std::shared_ptr<signal_node<Deps>>&... deps )
const events<E>& subject,
F&& func,
const signal_pack<Deps...>& deps )
: events_observer_node::observer_node( context )
, m_subject( subject )
, m_func( std::forward<InF>( func ) )
, m_deps( deps... )
, m_func( std::forward<F>( func ) )
, m_deps( deps )
{
this->attach_to( subject->get_node_id() );
( this->attach_to( deps->get_node_id() ), ... );
this->attach_to( subject );
this->attach_to( deps.data );
}

~events_observer_node() override
{
this->detach_from_all();
}

UREACT_WARN_UNUSED_RESULT update_result update() override
{
if( auto subject = m_subject.lock() )
if( m_subject.is_valid() )
{
const event_range<E> subject_events{ subject->get_events() };

if( !subject_events.empty() )
const auto& src_events = get_internals( m_subject ).get_events();
if( !src_events.empty() )
{
const observer_action action = std::apply(
[this, &subject_events]( const std::shared_ptr<signal_node<Deps>>&... args ) {
return std::invoke( m_func, subject_events, args->value_ref()... );
[this, &src_events]( const signal<Deps>&... args ) {
return std::invoke( m_func,
event_range<E>( src_events ),
get_internals( args ).value_ref()... );
},
m_deps );
m_deps.data );

if( action == observer_action::stop_and_detach )
subject->unregister_observer( this );
detach_observer();
}
}

return update_result::unchanged;
}

private:
using DepHolder = std::tuple<std::shared_ptr<signal_node<Deps>>...>;

std::weak_ptr<event_stream_node<E>> m_subject;
F m_func;
DepHolder m_deps;

void detach_observer() override
{
detach_from_all();

m_subject.reset();
m_subject = events<E>{};
}

events<E> m_subject;
Func m_func;
signal_pack<Deps...> m_deps;
};

template <typename InF, typename S>
Expand All @@ -196,14 +205,8 @@ auto observe_signal_impl( const signal<S>& subject, InF&& func ) -> observer
signal_observer_node<S, add_observer_action_next_ret<F>>,
signal_observer_node<S, F>>;

const auto& subject_ptr = get_internals( subject ).get_node_ptr();

auto node( create_node<Node>( subject.get_context(), subject_ptr, std::forward<InF>( func ) ) );
observer_node* raw_node_ptr = node.get();

subject_ptr->register_observer( std::move( node ) );

return observer( raw_node_ptr, subject_ptr );
return create_wrapped_node<observer, Node>(
subject.get_context(), subject, std::forward<InF>( func ) );
}

template <typename InF, typename E, typename... Deps>
Expand Down Expand Up @@ -233,26 +236,10 @@ auto observe_events_impl(
static_assert( !std::is_same_v<wrapper_t, signature_mismatches>,
"observe: Passed function does not match any of the supported signatures" );

using Node = events_observer_node<E, wrapper_t, Deps...>;

const context& context = subject.get_context();

auto node_builder = [&context, &subject, &func]( const signal<Deps>&... deps ) {
return create_node<Node>( context,
get_internals( subject ).get_node_ptr(),
std::forward<InF>( func ),
get_internals( deps ).get_node_ptr()... );
};

const auto& subject_node = get_internals( subject ).get_node_ptr();

auto node( std::apply( node_builder, dep_pack.data ) );

observer_node* raw_node = node.get();

subject_node->register_observer( std::move( node ) );

return observer( raw_node, subject_node );
return detail::create_wrapped_node<observer, events_observer_node<E, wrapper_t, Deps...>>(
context, subject, std::forward<InF>( func ), dep_pack );
}

struct ObserveAdaptor : Adaptor
Expand All @@ -269,28 +256,13 @@ struct ObserveAdaptor : Adaptor
* By returning observer_action::stop_and_detach, the observer function can request
* its own detachment. Returning observer_action::next keeps the observer attached.
* Using a void return type is the same as always returning observer_action::next.
*
* @note Resulting observer can be ignored. Lifetime of observer node will match subject signal's lifetime
*/
template <typename F, typename S>
constexpr auto operator()( const signal<S>& subject, F&& func ) const
UREACT_WARN_UNUSED_RESULT constexpr auto operator()( const signal<S>& subject, F&& func ) const
{
return observe_signal_impl( subject, std::forward<F>( func ) );
}

/*!
* @brief Create observer for temporary signal
*
* Same as observe(const signal<S>& subject, F&& func),
* but subject signal is about to die so caller must use result, otherwise observation isn't performed.
*/
template <typename F, typename S>
UREACT_WARN_UNUSED_RESULT_MSG( "Observing the temporary so observer should be stored" )
constexpr auto operator()( signal<S>&& subject, F&& func ) const
{
return observe_signal_impl( std::move( subject ), std::forward<F>( func ) );
}

/*!
* @brief Create observer for event stream
*
Expand All @@ -307,32 +279,16 @@ struct ObserveAdaptor : Adaptor
* its own detachment. Returning observer_action::next keeps the observer attached.
* Using a void return type is the same as always returning observer_action::next.
*
* @note Resulting observer can be ignored. Lifetime of observer node will match subject signal's lifetime
* @note The event_range<E> option allows to explicitly batch process single turn events
* @note Changes of signals in dep_pack do not trigger an update - only received events do
*/
template <typename F, typename E, typename... Deps>
constexpr auto operator()(
UREACT_WARN_UNUSED_RESULT constexpr auto operator()(
const events<E>& subject, const signal_pack<Deps...>& dep_pack, F&& func ) const
{
return observe_events_impl( subject, dep_pack, std::forward<F>( func ) );
}

/*!
* @brief Create observer for temporary event stream
*
* Same as observe(const events<E>& subject, const signal_pack<Deps...>& dep_pack, F&& func),
* but subject signal is about to die so caller must use result, otherwise observation isn't performed.
*/
template <typename F, typename E, typename... Deps>
UREACT_WARN_UNUSED_RESULT_MSG( "Observing the temporary so observer should be stored" )
constexpr auto operator()( events<E>&& subject,
const signal_pack<Deps...>& dep_pack,
F&& func ) const // TODO: check in tests
{
return observe_events_impl( std::move( subject ), dep_pack, std::forward<F>( func ) );
}

/*!
* @brief Create observer for event stream
*
Expand All @@ -341,33 +297,17 @@ struct ObserveAdaptor : Adaptor
* See observe(const events<E>& subject, const signal_pack<Deps...>& dep_pack, F&& func)
*/
template <typename F, typename E>
constexpr auto operator()( const events<E>& subject, F&& func ) const
UREACT_WARN_UNUSED_RESULT constexpr auto operator()( const events<E>& subject, F&& func ) const
{
return operator()( subject, signal_pack<>{}, std::forward<F>( func ) );
}

/*!
* @brief Create observer for temporary event stream
*
* Same as observe(const events<E>& subject, F&& func),
* but subject signal is about to die so caller must use result, otherwise observation isn't performed.
*/
template <typename F, typename E>
UREACT_WARN_UNUSED_RESULT_MSG( "Observing the temporary so observer should be stored" )
constexpr auto operator()( events<E>&& subject, F&& func ) const // TODO: check in tests
{
return operator()( std::move( subject ), signal_pack<>{}, std::forward<F>( func ) );
}

/*!
* @brief Curried version of observe(T&& subject, F&& func)
*/
template <typename F>
UREACT_WARN_UNUSED_RESULT constexpr auto operator()( F&& func ) const // TODO: check in tests
{
// TODO: propagate [[nodiscard]] to closure operator() and operator|
// they should not be nodiscard for l-value arguments, but only for r-values like observe() does
// but maybe all observe() concept should be reconsidered before to not do feature that is possibly not needed
return make_partial<ObserveAdaptor>( std::forward<F>( func ) );
}

Expand Down
86 changes: 71 additions & 15 deletions include/ureact/adaptor/tap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,69 @@ UREACT_BEGIN_NAMESPACE
namespace detail
{

/// Passes target signal changes and holds its observer
template <typename S>
class signal_tap_node final : public signal_node<S>
{
public:
signal_tap_node( const context& context, const signal<S>& target, observer observer )
: signal_tap_node::signal_node( context, get_internals( target ).value_ref() )
, m_target( target )
, m_observer( std::move( observer ) )
{
this->attach_to( m_target );
}

~signal_tap_node() override
{
this->detach_from_all();
}

UREACT_WARN_UNUSED_RESULT update_result update() override
{
return this->try_change_value( get_internals( m_target ).get_value() );
}

private:
signal<S> m_target;
observer m_observer;
};

/// Passes target events changes and holds its observer
template <typename E>
class event_tap_node final : public event_stream_node<E>
{
public:
event_tap_node( const context& context, const events<E>& target, observer observer )
: event_tap_node::event_stream_node( context )
, m_target( target )
, m_observer( std::move( observer ) )
{
this->attach_to( m_target );
}

~event_tap_node() override
{
this->detach_from_all();
}

UREACT_WARN_UNUSED_RESULT update_result update() override
{
const auto& src_events = get_internals( m_target ).get_events();
this->get_events() = src_events;

return !this->get_events().empty() ? update_result::changed : update_result::unchanged;
}

private:
events<E> m_target;
observer m_observer;
};

struct TapAdaptor : Adaptor
{
/*!
* @brief Create observer for signal and return observed signal
* @brief Create tapped copy for observed signal
*
* When the signal value S of subject changes, func is called
*
Expand All @@ -33,17 +92,16 @@ struct TapAdaptor : Adaptor
* its own detachment. Returning observer_action::next keeps the observer attached.
* Using a void return type is the same as always returning observer_action::next.
*/
template <typename F,
typename Signal, //
class = std::enable_if_t<is_signal_v<std::decay_t<Signal>>>>
UREACT_WARN_UNUSED_RESULT constexpr auto operator()( Signal&& subject, F&& func ) const
template <typename F, typename S>
UREACT_WARN_UNUSED_RESULT constexpr auto operator()( const signal<S>& subject, F&& func ) const
{
std::ignore = observe( subject, std::forward<F>( func ) );
return std::forward<Signal>( subject );
return create_wrapped_node<signal<S>, signal_tap_node<S>>( subject.get_context(),
subject,
observe_signal_impl( subject, std::forward<F>( func ) ) );
}

/*!
* @brief Create observer for event stream and return observed event stream
* @brief Create tapped copy for observed event stream
*
* For every event e in subject, func is called.
* Synchronized values of signals in dep_pack are passed to func as additional arguments.
Expand All @@ -61,15 +119,13 @@ struct TapAdaptor : Adaptor
* @note The event_range<E> option allows to explicitly batch process single turn events
* @note Changes of signals in dep_pack do not trigger an update - only received events do
*/
template <typename F,
typename Events,
typename... Deps,
class = std::enable_if_t<is_event_v<std::decay_t<Events>>>>
template <typename F, typename E, typename... Deps>
UREACT_WARN_UNUSED_RESULT constexpr auto operator()(
Events&& subject, const signal_pack<Deps...>& dep_pack, F&& func ) const
const events<E>& subject, const signal_pack<Deps...>& dep_pack, F&& func ) const
{
std::ignore = observe( subject, dep_pack, std::forward<F>( func ) );
return std::forward<Events>( subject );
return create_wrapped_node<events<E>, event_tap_node<E>>( subject.get_context(),
subject,
observe_events_impl( subject, dep_pack, std::forward<F>( func ) ) );
}

/*!
Expand Down
Loading

0 comments on commit 755bb79

Please sign in to comment.