From d98fb56fb5edb02f511817ce5cad087408b9f36b Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 20 Aug 2024 14:57:03 -0400 Subject: [PATCH 01/14] Move queue and pool refs out of Arrow constructor args --- .../SubeventExample/SubeventExample.cc | 12 ++++--- src/libraries/JANA/Topology/JEventMapArrow.cc | 12 ++----- src/libraries/JANA/Topology/JEventMapArrow.h | 2 +- .../JANA/Topology/JEventProcessorArrow.cc | 13 ++------ .../JANA/Topology/JEventProcessorArrow.h | 8 +---- .../JANA/Topology/JEventSourceArrow.cc | 7 ++-- .../JANA/Topology/JEventSourceArrow.h | 2 +- src/libraries/JANA/Topology/JEventTapArrow.cc | 13 ++------ src/libraries/JANA/Topology/JEventTapArrow.h | 2 +- src/libraries/JANA/Topology/JPipelineArrow.h | 32 ++++++++----------- src/libraries/JANA/Topology/JPool.h | 1 + .../JANA/Topology/JTopologyBuilder.cc | 24 ++++++++++---- .../JANA/Topology/JTopologyBuilder.h | 2 +- .../unit_tests/Topology/SubeventTests.cc | 15 +++++---- .../Topology/TestTopologyComponents.h | 15 +++++++-- 15 files changed, 73 insertions(+), 87 deletions(-) diff --git a/src/examples/SubeventExample/SubeventExample.cc b/src/examples/SubeventExample/SubeventExample.cc index 634b4ae9f..8617fef82 100644 --- a/src/examples/SubeventExample/SubeventExample.cc +++ b/src/examples/SubeventExample/SubeventExample.cc @@ -107,11 +107,13 @@ int main() { auto subprocess_arrow = new JSubeventArrow("subprocess", &processor, &subevents_in, &subevents_out); auto merge_arrow = new JMergeArrow("merge", &processor, &subevents_out, &events_out); - auto source_arrow = new JEventSourceArrow("simpleSource", - {source}, - &events_in, - topology->event_pool); - auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool); + auto source_arrow = new JEventSourceArrow("simpleSource", {source}); + source_arrow->set_input(topology->event_pool); + source_arrow->set_output(&events_in); + + auto proc_arrow = new JEventProcessorArrow("simpleProcessor"); + proc_arrow->set_input(&events_out); + proc_arrow->set_output(topology->event_pool); proc_arrow->add_processor(new SimpleProcessor); builder.arrows.push_back(source_arrow); diff --git a/src/libraries/JANA/Topology/JEventMapArrow.cc b/src/libraries/JANA/Topology/JEventMapArrow.cc index 9713e3cb2..107768740 100644 --- a/src/libraries/JANA/Topology/JEventMapArrow.cc +++ b/src/libraries/JANA/Topology/JEventMapArrow.cc @@ -10,16 +10,8 @@ #include -JEventMapArrow::JEventMapArrow(std::string name, - EventQueue *input_queue, - EventQueue *output_queue) - : JPipelineArrow(std::move(name), - true, - false, - false, - input_queue, - output_queue, - nullptr) {} +JEventMapArrow::JEventMapArrow(std::string name) + : JPipelineArrow(std::move(name), true, false, false) {} void JEventMapArrow::add_source(JEventSource* source) { m_sources.push_back(source); diff --git a/src/libraries/JANA/Topology/JEventMapArrow.h b/src/libraries/JANA/Topology/JEventMapArrow.h index a0d521bb8..e1c0a0343 100644 --- a/src/libraries/JANA/Topology/JEventMapArrow.h +++ b/src/libraries/JANA/Topology/JEventMapArrow.h @@ -22,7 +22,7 @@ class JEventMapArrow : public JPipelineArrow { std::vector m_procs; public: - JEventMapArrow(std::string name, EventQueue *input_queue, EventQueue *output_queue); + JEventMapArrow(std::string name); void add_source(JEventSource* source); void add_unfolder(JEventUnfolder* unfolder); diff --git a/src/libraries/JANA/Topology/JEventProcessorArrow.cc b/src/libraries/JANA/Topology/JEventProcessorArrow.cc index 02c761358..86684b631 100644 --- a/src/libraries/JANA/Topology/JEventProcessorArrow.cc +++ b/src/libraries/JANA/Topology/JEventProcessorArrow.cc @@ -9,17 +9,8 @@ #include -JEventProcessorArrow::JEventProcessorArrow(std::string name, - EventQueue *input_queue, - EventQueue *output_queue, - JEventPool *pool) - : JPipelineArrow(std::move(name), - true, - false, - true, - input_queue, - output_queue, - pool) {} +JEventProcessorArrow::JEventProcessorArrow(std::string name) + : JPipelineArrow(std::move(name), true, false, true) {} void JEventProcessorArrow::add_processor(JEventProcessor* processor) { m_processors.push_back(processor); diff --git a/src/libraries/JANA/Topology/JEventProcessorArrow.h b/src/libraries/JANA/Topology/JEventProcessorArrow.h index 21b2f94b9..007c65c54 100644 --- a/src/libraries/JANA/Topology/JEventProcessorArrow.h +++ b/src/libraries/JANA/Topology/JEventProcessorArrow.h @@ -17,15 +17,9 @@ class JEventProcessorArrow : public JPipelineArrow std::vector m_processors; public: - JEventProcessorArrow(std::string name, - EventQueue *input_queue, - EventQueue *output_queue, - JEventPool *pool); - + JEventProcessorArrow(std::string name); void add_processor(JEventProcessor* processor); - void process(Event* event, bool& success, JArrowMetrics::Status& status); - void initialize() final; void finalize() final; }; diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.cc b/src/libraries/JANA/Topology/JEventSourceArrow.cc index 99b159673..cb109c131 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.cc +++ b/src/libraries/JANA/Topology/JEventSourceArrow.cc @@ -11,11 +11,8 @@ JEventSourceArrow::JEventSourceArrow(std::string name, - std::vector sources, - EventQueue* output_queue, - JEventPool* pool - ) - : JPipelineArrow(name, false, true, false, nullptr, output_queue, pool), m_sources(sources) { + std::vector sources) + : JPipelineArrow(name, false, true, false), m_sources(sources) { } diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.h b/src/libraries/JANA/Topology/JEventSourceArrow.h index fafb0d8b2..0f5b776c4 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.h +++ b/src/libraries/JANA/Topology/JEventSourceArrow.h @@ -15,7 +15,7 @@ class JEventSourceArrow : public JPipelineArrow { size_t m_current_source = 0; public: - JEventSourceArrow(std::string name, std::vector sources, EventQueue* output_queue, JEventPool* pool); + JEventSourceArrow(std::string name, std::vector sources); void initialize() final; void finalize() final; diff --git a/src/libraries/JANA/Topology/JEventTapArrow.cc b/src/libraries/JANA/Topology/JEventTapArrow.cc index ee91db984..412cfac07 100644 --- a/src/libraries/JANA/Topology/JEventTapArrow.cc +++ b/src/libraries/JANA/Topology/JEventTapArrow.cc @@ -9,17 +9,8 @@ #include -JEventTapArrow::JEventTapArrow(std::string name, - EventQueue *input_queue, - EventQueue *output_queue, - JEventPool *pool) - : JPipelineArrow(std::move(name), - false, - false, - false, - input_queue, - output_queue, - pool) {} +JEventTapArrow::JEventTapArrow(std::string name) + : JPipelineArrow(std::move(name), false, false, false) {} void JEventTapArrow::add_processor(JEventProcessor* proc) { m_procs.push_back(proc); diff --git a/src/libraries/JANA/Topology/JEventTapArrow.h b/src/libraries/JANA/Topology/JEventTapArrow.h index ad862411e..856f5863e 100644 --- a/src/libraries/JANA/Topology/JEventTapArrow.h +++ b/src/libraries/JANA/Topology/JEventTapArrow.h @@ -18,7 +18,7 @@ class JEventTapArrow : public JPipelineArrow { std::vector m_procs; public: - JEventTapArrow(std::string name, EventQueue *input_queue, EventQueue *output_queue, JEventPool *pool); + JEventTapArrow(std::string name); void add_processor(JEventProcessor* proc); void process(Event* event, bool& success, JArrowMetrics::Status& status); diff --git a/src/libraries/JANA/Topology/JPipelineArrow.h b/src/libraries/JANA/Topology/JPipelineArrow.h index 1ce6f0228..28db84cd3 100644 --- a/src/libraries/JANA/Topology/JPipelineArrow.h +++ b/src/libraries/JANA/Topology/JPipelineArrow.h @@ -18,27 +18,21 @@ class JPipelineArrow : public JArrow { JPipelineArrow(std::string name, bool is_parallel, bool is_source, - bool is_sink, - JMailbox* input_queue, - JMailbox* output_queue, - JPool* pool - ) + bool is_sink) : JArrow(std::move(name), is_parallel, is_source, is_sink) { + } - if (input_queue == nullptr) { - assert(pool != nullptr); - m_input.set_pool(pool); - } - else { - m_input.set_queue(input_queue); - } - if (output_queue == nullptr) { - assert(pool != nullptr); - m_output.set_pool(pool); - } - else { - m_output.set_queue(output_queue); - } + void set_input(JMailbox* queue) { + m_input.set_queue(queue); + } + void set_input(JPool* pool) { + m_input.set_pool(pool); + } + void set_output(JMailbox* queue) { + m_output.set_queue(queue); + } + void set_output(JPool* pool) { + m_output.set_pool(pool); } void execute(JArrowMetrics& result, size_t location_id) final { diff --git a/src/libraries/JANA/Topology/JPool.h b/src/libraries/JANA/Topology/JPool.h index 83bd2b437..e1c28f4d6 100644 --- a/src/libraries/JANA/Topology/JPool.h +++ b/src/libraries/JANA/Topology/JPool.h @@ -5,6 +5,7 @@ #include #include #include +#include class JPoolBase { diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index 9ad69bfea..df4d33875 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -212,7 +212,9 @@ void JTopologyBuilder::attach_lower_level(JEventLevel current_level, JUnfoldArro auto q2 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); queues.push_back(q2); - auto* proc_arrow = new JEventProcessorArrow(ss.str()+"Tap", q1, q2, nullptr); + auto* proc_arrow = new JEventProcessorArrow(ss.str()+"Tap"); + proc_arrow->set_input(q1); + proc_arrow->set_output(q2); arrows.push_back(proc_arrow); proc_arrow->set_chunksize(m_event_processor_chunksize); proc_arrow->set_logger(GetLogger()); @@ -302,11 +304,15 @@ void JTopologyBuilder::attach_top_level(JEventLevel current_level) { auto queue = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); queues.push_back(queue); - auto* src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level, queue, pool_at_level); + auto* src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level); + src_arrow->set_input(pool_at_level); + src_arrow->set_output(queue); arrows.push_back(src_arrow); src_arrow->set_chunksize(m_event_source_chunksize); - auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap", queue, nullptr, pool_at_level); + auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap"); + proc_arrow->set_input(queue); + proc_arrow->set_output(pool_at_level); arrows.push_back(proc_arrow); proc_arrow->set_chunksize(m_event_processor_chunksize); @@ -326,11 +332,15 @@ void JTopologyBuilder::attach_top_level(JEventLevel current_level) { queues.push_back(q1); queues.push_back(q2); - auto *src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level, q1, pool_at_level); + auto *src_arrow = new JEventSourceArrow(level_str+"Source", sources_at_level); + src_arrow->set_input(pool_at_level); + src_arrow->set_output(q1); arrows.push_back(src_arrow); src_arrow->set_chunksize(m_event_source_chunksize); - auto *map_arrow = new JEventMapArrow(level_str+"Map", q1, q2);; + auto *map_arrow = new JEventMapArrow(level_str+"Map"); + map_arrow->set_input(q1); + map_arrow->set_output(q2); arrows.push_back(map_arrow); map_arrow->set_chunksize(m_event_source_chunksize); src_arrow->attach(map_arrow); @@ -358,7 +368,9 @@ void JTopologyBuilder::attach_top_level(JEventLevel current_level) { auto q3 = new EventQueue(m_event_queue_threshold, mapping.get_loc_count(), m_enable_stealing); queues.push_back(q3); - auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap", q3, nullptr, pool_at_level); + auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap"); + proc_arrow->set_input(q3); + proc_arrow->set_output(pool_at_level); arrows.push_back(proc_arrow); proc_arrow->set_chunksize(m_event_processor_chunksize); diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.h b/src/libraries/JANA/Topology/JTopologyBuilder.h index e1b546709..270fe6863 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.h +++ b/src/libraries/JANA/Topology/JTopologyBuilder.h @@ -7,6 +7,7 @@ #include #include #include +#include #include // TODO: Should't be here #include @@ -21,7 +22,6 @@ class JPoolBase; class JQueue; class JFoldArrow; class JUnfoldArrow; -class JEventPool; class JTopologyBuilder : public JService { public: diff --git a/src/programs/unit_tests/Topology/SubeventTests.cc b/src/programs/unit_tests/Topology/SubeventTests.cc index c372bf85a..d929395d8 100644 --- a/src/programs/unit_tests/Topology/SubeventTests.cc +++ b/src/programs/unit_tests/Topology/SubeventTests.cc @@ -100,12 +100,14 @@ TEST_CASE("Basic subevent arrow functionality") { auto topology = app.GetService(); topology->set_configure_fn([&](JTopologyBuilder& topology) { - auto source_arrow = new JEventSourceArrow("simpleSource", - {new SimpleSource}, - &events_in, - topology.event_pool); - auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, - nullptr, topology.event_pool); + + auto source_arrow = new JEventSourceArrow("simpleSource", {new SimpleSource}); + source_arrow->set_input(topology.event_pool); + source_arrow->set_output(&events_in); + + auto proc_arrow = new JEventProcessorArrow("simpleProcessor"); + proc_arrow->set_input(&events_out); + proc_arrow->set_output(topology.event_pool); proc_arrow->add_processor(new SimpleProcessor); topology.arrows.push_back(source_arrow); @@ -113,6 +115,7 @@ TEST_CASE("Basic subevent arrow functionality") { topology.arrows.push_back(subprocess_arrow); topology.arrows.push_back(merge_arrow); topology.arrows.push_back(proc_arrow); + source_arrow->attach(split_arrow); split_arrow->attach(subprocess_arrow); subprocess_arrow->attach(merge_arrow); diff --git a/src/programs/unit_tests/Topology/TestTopologyComponents.h b/src/programs/unit_tests/Topology/TestTopologyComponents.h index c0a6f827f..9ea4b6624 100644 --- a/src/programs/unit_tests/Topology/TestTopologyComponents.h +++ b/src/programs/unit_tests/Topology/TestTopologyComponents.h @@ -15,7 +15,10 @@ struct RandIntSource : public JPipelineArrow { int emit_sum = 0; // Sum of all ints emitted so far RandIntSource(std::string name, JPool* pool, JMailbox* output_queue) - : JPipelineArrow(name, false, true, false, nullptr, output_queue, pool) {} + : JPipelineArrow(name, false, true, false) { + this->set_input(pool); + this->set_output(output_queue); + } void process(int* item, bool& success, JArrowMetrics::Status& status) { @@ -54,7 +57,10 @@ struct MultByTwoProcessor : public ParallelProcessor { struct SubOneProcessor : public JPipelineArrow { SubOneProcessor(std::string name, JMailbox* input_queue, JMailbox* output_queue) - : JPipelineArrow(name, true, false, false, input_queue, output_queue, nullptr) {} + : JPipelineArrow(name, true, false, false) { + this->set_input(input_queue); + this->set_output(output_queue); + } void process(double* item, bool&, JArrowMetrics::Status&) { *item -= 1; @@ -68,7 +74,10 @@ struct SumSink : public JPipelineArrow, T> { T sum = 0; SumSink(std::string name, JMailbox* input_queue, JPool* pool) - : JPipelineArrow,T>(name, false, false, true, input_queue, nullptr, pool) {} + : JPipelineArrow,T>(name, false, false, true) { + this->set_input(input_queue); + this->set_output(pool); + } void process(T* item, bool&, JArrowMetrics::Status&) { sum += *item; From 0f2a74c1ef3fb12e0e882a55d6b6c4d07b58379f Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 30 Sep 2024 22:27:50 -0400 Subject: [PATCH 02/14] JEventSourceArrow inherits directly from JArrow again --- .../JANA/Topology/JEventSourceArrow.cc | 48 +++++++++++++++++-- .../JANA/Topology/JEventSourceArrow.h | 21 +++++++- 2 files changed, 64 insertions(+), 5 deletions(-) diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.cc b/src/libraries/JANA/Topology/JEventSourceArrow.cc index cb109c131..d41e17990 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.cc +++ b/src/libraries/JANA/Topology/JEventSourceArrow.cc @@ -10,9 +10,51 @@ -JEventSourceArrow::JEventSourceArrow(std::string name, - std::vector sources) - : JPipelineArrow(name, false, true, false), m_sources(sources) { +JEventSourceArrow::JEventSourceArrow(std::string name, std::vector sources) + : JArrow(name, false, true, false), m_sources(sources) { +} + + +void JEventSourceArrow::execute(JArrowMetrics& result, size_t location_id) { + + auto start_total_time = std::chrono::steady_clock::now(); + + Data in_data {location_id}; + Data out_data {location_id}; + + bool success = m_input.pull(in_data) && m_output.pull(out_data); + if (!success) { + m_input.revert(in_data); + m_output.revert(out_data); + // TODO: Test that revert works properly + + auto end_total_time = std::chrono::steady_clock::now(); + result.update(JArrowMetrics::Status::ComeBackLater, 0, 1, std::chrono::milliseconds(0), end_total_time - start_total_time); + return; + } + + bool process_succeeded = true; + JArrowMetrics::Status process_status = JArrowMetrics::Status::KeepGoing; + assert(in_data.item_count == 1); + Event* event = in_data.items[0]; + + auto start_processing_time = std::chrono::steady_clock::now(); + process(event, process_succeeded, process_status); + auto end_processing_time = std::chrono::steady_clock::now(); + + if (process_succeeded) { + in_data.item_count = 0; + out_data.item_count = 1; + out_data.items[0] = event; + } + m_input.push(in_data); + m_output.push(out_data); + + // Publish metrics + auto end_total_time = std::chrono::steady_clock::now(); + auto latency = (end_processing_time - start_processing_time); + auto overhead = (end_total_time - start_total_time) - latency; + result.update(process_status, process_succeeded, 1, latency, overhead); } diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.h b/src/libraries/JANA/Topology/JEventSourceArrow.h index 0f5b776c4..fcc446f9e 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.h +++ b/src/libraries/JANA/Topology/JEventSourceArrow.h @@ -9,16 +9,33 @@ using Event = std::shared_ptr; using EventQueue = JMailbox; class JEventPool; -class JEventSourceArrow : public JPipelineArrow { +class JEventSourceArrow : public JArrow { private: std::vector m_sources; size_t m_current_source = 0; + PlaceRef m_input {this, true, 1, 1}; + PlaceRef m_output {this, false, 1, 1}; + public: JEventSourceArrow(std::string name, std::vector sources); + + void set_input(JMailbox* queue) { + m_input.set_queue(queue); + } + void set_input(JPool* pool) { + m_input.set_pool(pool); + } + void set_output(JMailbox* queue) { + m_output.set_queue(queue); + } + void set_output(JPool* pool) { + m_output.set_pool(pool); + } + void initialize() final; void finalize() final; - + void execute(JArrowMetrics& result, size_t location_id) final; void process(Event* event, bool& success, JArrowMetrics::Status& status); }; From 1255a446ee7e79d1462169afa7651d440425e57b Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 30 Sep 2024 14:08:40 -0400 Subject: [PATCH 03/14] Reformat JEventSource.h --- src/libraries/JANA/CMakeLists.txt | 1 + src/libraries/JANA/JEventSource.cc | 272 ++++++++++++++++++++++ src/libraries/JANA/JEventSource.h | 348 ++++------------------------- 3 files changed, 322 insertions(+), 299 deletions(-) create mode 100644 src/libraries/JANA/JEventSource.cc diff --git a/src/libraries/JANA/CMakeLists.txt b/src/libraries/JANA/CMakeLists.txt index ffb7ea743..5050d4112 100644 --- a/src/libraries/JANA/CMakeLists.txt +++ b/src/libraries/JANA/CMakeLists.txt @@ -4,6 +4,7 @@ set(JANA2_SOURCES JApplication.cc + JEventSource.cc JFactory.cc JFactorySet.cc JMultifactory.cc diff --git a/src/libraries/JANA/JEventSource.cc b/src/libraries/JANA/JEventSource.cc new file mode 100644 index 000000000..dd9e6b5ac --- /dev/null +++ b/src/libraries/JANA/JEventSource.cc @@ -0,0 +1,272 @@ +#include + +void JEventSource::DoInit() { + if (m_status == Status::Uninitialized) { + CallWithJExceptionWrapper("JEventSource::Init", [&](){ Init();}); + m_status = Status::Initialized; + LOG_INFO(GetLogger()) << "Initialized JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END; + } + else { + throw JException("Attempted to initialize a JEventSource that is not uninitialized!"); + } +} + +void JEventSource::DoInitialize() { + DoOpen(); +} + +void JEventSource::DoOpen(bool with_lock) { + if (with_lock) { + std::lock_guard lock(m_mutex); + if (m_status != Status::Initialized) { + throw JException("Attempted to open a JEventSource that hasn't been initialized!"); + } + CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();}); + if (GetResourceName().empty()) { + LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END; + } + else { + LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END; + } + m_status = Status::Opened; + } + else { + if (m_status != Status::Initialized) { + throw JException("Attempted to open a JEventSource that hasn't been initialized!"); + } + CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();}); + if (GetResourceName().empty()) { + LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END; + } + else { + LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END; + } + m_status = Status::Opened; + } +} + +void JEventSource::DoClose(bool with_lock) { + if (with_lock) { + std::lock_guard lock(m_mutex); + + if (m_status != JEventSource::Status::Opened) return; + + CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();}); + if (GetResourceName().empty()) { + LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END; + } + else { + LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END; + } + m_status = Status::Closed; + } + else { + if (m_status != JEventSource::Status::Opened) return; + + CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();}); + if (GetResourceName().empty()) { + LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END; + } + else { + LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END; + } + m_status = Status::Closed; + } +} + +JEventSource::Result JEventSource::DoNext(std::shared_ptr event) { + + std::lock_guard lock(m_mutex); // In general, DoNext must be synchronized. + + if (m_status == Status::Uninitialized) { + throw JException("JEventSource has not been initialized!"); + } + + if (m_callback_style == CallbackStyle::LegacyMode) { + return DoNextCompatibility(event); + } + + auto first_evt_nr = m_nskip; + auto last_evt_nr = m_nevents + m_nskip; + + if (m_status == Status::Initialized) { + DoOpen(false); + } + if (m_status == Status::Opened) { + if (m_nevents != 0 && (m_event_count == last_evt_nr)) { + // We exit early (and recycle) because we hit our jana:nevents limit + DoClose(false); + return Result::FailureFinished; + } + // If we reach this point, we will need to actually read an event + + // We configure the event + event->SetEventNumber(m_event_count); // Default event number to event count + event->SetJEventSource(this); + event->SetSequential(false); + event->GetJCallGraphRecorder()->Reset(); + + // Now we call the new-style interface + auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE); // (see note at top of JCallGraphRecorder.h) + JEventSource::Result result; + CallWithJExceptionWrapper("JEventSource::Emit", [&](){ + result = Emit(*event); + }); + event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin ); + + if (result == Result::Success) { + m_event_count += 1; + // We end up here if we read an entry in our file or retrieved a message from our socket, + // and believe we could obtain another one immediately if we wanted to + for (auto* output : m_outputs) { + output->InsertCollection(*event); + } + if (m_event_count <= first_evt_nr) { + // We immediately throw away this whole event because of nskip + // (although really we should be handling this with Seek()) + return Result::FailureTryAgain; + } + return Result::Success; + } + else if (result == Result::FailureFinished) { + // We end up here if we tried to read an entry in a file, but found EOF + // or if we received a message from a socket that contained no data and indicated no more data will be coming + DoClose(false); + return Result::FailureFinished; + } + else if (result == Result::FailureTryAgain) { + // We end up here if we tried to read an entry in a file but it is on a tape drive and isn't ready yet + // or if we polled the socket, found no new messages, but still expect messages later + return Result::FailureTryAgain; + } + else { + throw JException("Invalid JEventSource::Result value!"); + } + } + else { // status == Closed + return Result::FailureFinished; + } +} + +JEventSource::Result JEventSource::DoNextCompatibility(std::shared_ptr event) { + + auto first_evt_nr = m_nskip; + auto last_evt_nr = m_nevents + m_nskip; + + try { + if (m_status == Status::Initialized) { + DoOpen(false); + } + if (m_status == Status::Opened) { + if (m_event_count < first_evt_nr) { + // Skip these events due to nskip + event->SetEventNumber(m_event_count); // Default event number to event count + auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE); // (see note at top of JCallGraphRecorder.h) + GetEvent(event); + event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin ); + m_event_count += 1; + return Result::FailureTryAgain; // Reject this event and recycle it + } else if (m_nevents != 0 && (m_event_count == last_evt_nr)) { + // Declare ourselves finished due to nevents + DoClose(false); // Close out the event source as soon as it declares itself finished + return Result::FailureFinished; + } else { + // Actually emit an event. + // GetEvent() expects the following things from its incoming JEvent + event->SetEventNumber(m_event_count); + event->SetJApplication(m_app); + event->SetJEventSource(this); + event->SetSequential(false); + event->GetJCallGraphRecorder()->Reset(); + auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE); // (see note at top of JCallGraphRecorder.h) + GetEvent(event); + for (auto* output : m_outputs) { + output->InsertCollection(*event); + } + event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin ); + m_event_count += 1; + return Result::Success; // Don't reject this event! + } + } else if (m_status == Status::Closed) { + return Result::FailureFinished; + } else { + throw JException("Invalid m_status"); + } + } + catch (RETURN_STATUS rs) { + + if (rs == RETURN_STATUS::kNO_MORE_EVENTS) { + DoClose(false); + return Result::FailureFinished; + } + else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) { + return Result::FailureTryAgain; + } + else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) { + JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN"); + ex.plugin_name = m_plugin_name; + ex.type_name = m_type_name; + ex.function_name = "JEventSource::GetEvent"; + ex.instance_name = m_resource_name; + throw ex; + } + else { + return Result::Success; + } + } + catch (JException& ex) { + if (ex.function_name.empty()) ex.function_name = "JEventSource::GetEvent"; + if (ex.type_name.empty()) ex.type_name = m_type_name; + if (ex.instance_name.empty()) ex.instance_name = m_prefix; + if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name; + throw ex; + } + catch (std::exception& e){ + auto ex = JException(e.what()); + ex.exception_type = JTypeInfo::demangle_current_exception_type(); + ex.nested_exception = std::current_exception(); + ex.function_name = "JEventSource::GetEvent"; + ex.type_name = m_type_name; + ex.instance_name = m_prefix; + ex.plugin_name = m_plugin_name; + throw ex; + } + catch (...) { + auto ex = JException("Unknown exception"); + ex.exception_type = JTypeInfo::demangle_current_exception_type(); + ex.nested_exception = std::current_exception(); + ex.function_name = "JEventSource::GetEvent"; + ex.type_name = m_type_name; + ex.instance_name = m_prefix; + ex.plugin_name = m_plugin_name; + throw ex; + } +} + + +void JEventSource::DoFinish(JEvent& event) { + /// Calls the optional-and-discouraged user-provided FinishEvent virtual method, enforcing + /// 1. Thread safety + /// 2. The m_enable_finish_event flag + if (m_enable_finish_event) { + std::lock_guard lock(m_mutex); + CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){ + FinishEvent(event); + }); + } +} + +void JEventSource::Summarize(JComponentSummary& summary) const { + + auto* result = new JComponentSummary::Component( + "Source", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName()); + + for (const auto* output : m_outputs) { + size_t suboutput_count = output->collection_names.size(); + for (size_t i=0; iAddOutput(new JComponentSummary::Collection("", output->collection_names[i], output->type_name, GetLevel())); + } + } + + summary.Add(result); +} diff --git a/src/libraries/JANA/JEventSource.h b/src/libraries/JANA/JEventSource.h index 0f9865900..0129338f1 100644 --- a/src/libraries/JANA/JEventSource.h +++ b/src/libraries/JANA/JEventSource.h @@ -16,22 +16,29 @@ class JApplication; class JFactory; -class JEventSource : public jana::components::JComponent, +class JEventSource : public jana::components::JComponent, public jana::components::JHasOutputs { public: - /// Result describes what happened the last time a GetEvent() was attempted. /// If Emit() or GetEvent() reaches an error state, it should throw a JException instead. enum class Result { Success, FailureTryAgain, FailureFinished }; - // TODO: Deprecate me! /// The user is supposed to _throw_ RETURN_STATUS::kNO_MORE_EVENTS or kBUSY from GetEvent() enum class RETURN_STATUS { kSUCCESS, kNO_MORE_EVENTS, kBUSY, kTRY_AGAIN, kERROR, kUNKNOWN }; - // Constructor - // TODO: Deprecate me! +private: + std::string m_resource_name; + std::atomic_ullong m_event_count {0}; + uint64_t m_nskip = 0; + uint64_t m_nevents = 0; + bool m_enable_finish_event = false; + bool m_enable_get_objects = false; + + +public: + [[deprecated]] explicit JEventSource(std::string resource_name, JApplication* app = nullptr) : m_resource_name(std::move(resource_name)) , m_event_count{0} @@ -48,7 +55,6 @@ class JEventSource : public jana::components::JComponent, virtual void Init() {} - // To be implemented by the user /// `Open` is called by JANA when it is ready to accept events from this event source. The implementor should open /// file pointers or sockets here, instead of in the constructor. This is because the implementor won't know how many /// or which event sources the user will decide to activate within one job. Thus the implementor can avoid problems @@ -57,6 +63,8 @@ class JEventSource : public jana::components::JComponent, virtual void Open() {} + + // `Emit` is called by JANA in order to emit a fresh event into the stream, when using CallbackStyle::ExpertMode. // It is very similar to GetEvent(), except the user returns a Result status code instead of throwing an exception. // Exceptions are reserved for unrecoverable errors. It accepts an out parameter JEvent. If there is another @@ -68,6 +76,11 @@ class JEventSource : public jana::components::JComponent, virtual Result Emit(JEvent&) { return Result::Success; }; + /// For work that should be done in parallel on a JEvent, but is tightly coupled to the JEventSource for some reason. + /// Called after Emit() by JEventMapArrow + virtual void Preprocess(const JEvent&) const {}; + + /// `Close` is called by JANA when it is finished accepting events from this event source. Here is where you should /// cleanly close files, sockets, etc. Although GetEvent() knows when (for instance) there are no more events in a /// file, the logic for closing needs to live here because there are other ways a computation may end besides @@ -96,8 +109,6 @@ class JEventSource : public jana::components::JComponent, virtual void GetEvent(std::shared_ptr) {}; - virtual void Preprocess(const JEvent&) {}; - /// `FinishEvent` is used to notify the `JEventSource` that an event has been completely processed. This is the final /// chance to interact with the `JEvent` before it is either cleared and recycled, or deleted. Although it is @@ -106,305 +117,35 @@ class JEventSource : public jana::components::JComponent, /// owned by the JEventSource, e.g. raw data which is keyed off of run number and therefore shared among multiple /// JEvents. `FinishEvent` is also well-suited for use with `EventGroup`s, e.g. to notify someone that a batch of /// events has finished, or to implement "barrier events". + virtual void FinishEvent(JEvent&) {}; /// `GetObjects` was historically used for lazily unpacking data from a JEvent and putting it into a "dummy" JFactory. /// This mechanism has been replaced by `JEvent::Insert`. All lazy evaluation should happen in a (non-dummy) /// JFactory, whereas eager evaluation should happen in `JEventSource::GetEvent` via `JEvent::Insert`. + virtual bool GetObjects(const std::shared_ptr&, JFactory*) { return false; } - virtual void DoInit() { - if (m_status == Status::Uninitialized) { - CallWithJExceptionWrapper("JEventSource::Init", [&](){ Init();}); - m_status = Status::Initialized; - LOG_INFO(GetLogger()) << "Initialized JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END; - } - else { - throw JException("Attempted to initialize a JEventSource that is not uninitialized!"); - } - } - - [[deprecated("Replaced by JEventSource::DoOpen()")]] - virtual void DoInitialize() { - DoOpen(); - } - - virtual void DoOpen(bool with_lock=true) { - if (with_lock) { - std::lock_guard lock(m_mutex); - if (m_status != Status::Initialized) { - throw JException("Attempted to open a JEventSource that hasn't been initialized!"); - } - CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();}); - if (GetResourceName().empty()) { - LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END; - } - else { - LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END; - } - m_status = Status::Opened; - } - else { - if (m_status != Status::Initialized) { - throw JException("Attempted to open a JEventSource that hasn't been initialized!"); - } - CallWithJExceptionWrapper("JEventSource::Open", [&](){ Open();}); - if (GetResourceName().empty()) { - LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "'" << LOG_END; - } - else { - LOG_INFO(GetLogger()) << "Opened JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END; - } - m_status = Status::Opened; - } - } - - virtual void DoClose(bool with_lock=true) { - if (with_lock) { - std::lock_guard lock(m_mutex); - - if (m_status != JEventSource::Status::Opened) return; - - CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();}); - if (GetResourceName().empty()) { - LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END; - } - else { - LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END; - } - m_status = Status::Closed; - } - else { - if (m_status != JEventSource::Status::Opened) return; - - CallWithJExceptionWrapper("JEventSource::Close", [&](){ Close();}); - if (GetResourceName().empty()) { - LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "'" << LOG_END; - } - else { - LOG_INFO(GetLogger()) << "Closed JEventSource '" << GetTypeName() << "' ('" << GetResourceName() << "')" << LOG_END; - } - m_status = Status::Closed; - } - } - - Result DoNext(std::shared_ptr event) { - - std::lock_guard lock(m_mutex); // In general, DoNext must be synchronized. - - if (m_status == Status::Uninitialized) { - throw JException("JEventSource has not been initialized!"); - } - - if (m_callback_style == CallbackStyle::LegacyMode) { - return DoNextCompatibility(event); - } - - auto first_evt_nr = m_nskip; - auto last_evt_nr = m_nevents + m_nskip; - - if (m_status == Status::Initialized) { - DoOpen(false); - } - if (m_status == Status::Opened) { - if (m_nevents != 0 && (m_event_count == last_evt_nr)) { - // We exit early (and recycle) because we hit our jana:nevents limit - DoClose(false); - return Result::FailureFinished; - } - // If we reach this point, we will need to actually read an event - - // We configure the event - event->SetEventNumber(m_event_count); // Default event number to event count - event->SetJEventSource(this); - event->SetSequential(false); - event->GetJCallGraphRecorder()->Reset(); - - // Now we call the new-style interface - auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE); // (see note at top of JCallGraphRecorder.h) - JEventSource::Result result; - CallWithJExceptionWrapper("JEventSource::Emit", [&](){ - result = Emit(*event); - }); - event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin ); - - if (result == Result::Success) { - m_event_count += 1; - // We end up here if we read an entry in our file or retrieved a message from our socket, - // and believe we could obtain another one immediately if we wanted to - for (auto* output : m_outputs) { - output->InsertCollection(*event); - } - if (m_event_count <= first_evt_nr) { - // We immediately throw away this whole event because of nskip - // (although really we should be handling this with Seek()) - return Result::FailureTryAgain; - } - return Result::Success; - } - else if (result == Result::FailureFinished) { - // We end up here if we tried to read an entry in a file, but found EOF - // or if we received a message from a socket that contained no data and indicated no more data will be coming - DoClose(false); - return Result::FailureFinished; - } - else if (result == Result::FailureTryAgain) { - // We end up here if we tried to read an entry in a file but it is on a tape drive and isn't ready yet - // or if we polled the socket, found no new messages, but still expect messages later - return Result::FailureTryAgain; - } - else { - throw JException("Invalid JEventSource::Result value!"); - } - } - else { // status == Closed - return Result::FailureFinished; - } - } - - Result DoNextCompatibility(std::shared_ptr event) { - - auto first_evt_nr = m_nskip; - auto last_evt_nr = m_nevents + m_nskip; - - try { - if (m_status == Status::Initialized) { - DoOpen(false); - } - if (m_status == Status::Opened) { - if (m_event_count < first_evt_nr) { - // Skip these events due to nskip - event->SetEventNumber(m_event_count); // Default event number to event count - auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE); // (see note at top of JCallGraphRecorder.h) - GetEvent(event); - event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin ); - m_event_count += 1; - return Result::FailureTryAgain; // Reject this event and recycle it - } else if (m_nevents != 0 && (m_event_count == last_evt_nr)) { - // Declare ourselves finished due to nevents - DoClose(false); // Close out the event source as soon as it declares itself finished - return Result::FailureFinished; - } else { - // Actually emit an event. - // GetEvent() expects the following things from its incoming JEvent - event->SetEventNumber(m_event_count); - event->SetJApplication(m_app); - event->SetJEventSource(this); - event->SetSequential(false); - event->GetJCallGraphRecorder()->Reset(); - auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE); // (see note at top of JCallGraphRecorder.h) - GetEvent(event); - for (auto* output : m_outputs) { - output->InsertCollection(*event); - } - event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin ); - m_event_count += 1; - return Result::Success; // Don't reject this event! - } - } else if (m_status == Status::Closed) { - return Result::FailureFinished; - } else { - throw JException("Invalid m_status"); - } - } - catch (RETURN_STATUS rs) { - - if (rs == RETURN_STATUS::kNO_MORE_EVENTS) { - DoClose(false); - return Result::FailureFinished; - } - else if (rs == RETURN_STATUS::kTRY_AGAIN || rs == RETURN_STATUS::kBUSY) { - return Result::FailureTryAgain; - } - else if (rs == RETURN_STATUS::kERROR || rs == RETURN_STATUS::kUNKNOWN) { - JException ex ("JEventSource threw RETURN_STATUS::kERROR or kUNKNOWN"); - ex.plugin_name = m_plugin_name; - ex.type_name = m_type_name; - ex.function_name = "JEventSource::GetEvent"; - ex.instance_name = m_resource_name; - throw ex; - } - else { - return Result::Success; - } - } - catch (JException& ex) { - if (ex.function_name.empty()) ex.function_name = "JEventSource::GetEvent"; - if (ex.type_name.empty()) ex.type_name = m_type_name; - if (ex.instance_name.empty()) ex.instance_name = m_prefix; - if (ex.plugin_name.empty()) ex.plugin_name = m_plugin_name; - throw ex; - } - catch (std::exception& e){ - auto ex = JException(e.what()); - ex.exception_type = JTypeInfo::demangle_current_exception_type(); - ex.nested_exception = std::current_exception(); - ex.function_name = "JEventSource::GetEvent"; - ex.type_name = m_type_name; - ex.instance_name = m_prefix; - ex.plugin_name = m_plugin_name; - throw ex; - } - catch (...) { - auto ex = JException("Unknown exception"); - ex.exception_type = JTypeInfo::demangle_current_exception_type(); - ex.nested_exception = std::current_exception(); - ex.function_name = "JEventSource::GetEvent"; - ex.type_name = m_type_name; - ex.instance_name = m_prefix; - ex.plugin_name = m_plugin_name; - throw ex; - } - } - - /// Calls the optional-and-discouraged user-provided FinishEvent virtual method, enforcing - /// 1. Thread safety - /// 2. The m_enable_finish_event flag - - void DoFinish(JEvent& event) { - if (m_enable_finish_event) { - std::lock_guard lock(m_mutex); - CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){ - FinishEvent(event); - }); - } - } - - void Summarize(JComponentSummary& summary) const override { - - auto* result = new JComponentSummary::Component( - "Source", GetPrefix(), GetTypeName(), GetLevel(), GetPluginName()); - - for (const auto* output : m_outputs) { - size_t suboutput_count = output->collection_names.size(); - for (size_t i=0; iAddOutput(new JComponentSummary::Collection("", output->collection_names[i], output->type_name, GetLevel())); - } - } - - summary.Add(result); - } - - // Getters and setters + // Getters - void SetResourceName(std::string resource_name) { m_resource_name = resource_name; } - std::string GetResourceName() const { return m_resource_name; } - uint64_t GetEventCount() const { return m_event_count; }; - // TODO: Deprecate me + [[deprecated]] virtual std::string GetType() const { return m_type_name; } - // TODO: Deprecate me + [[deprecated]] std::string GetName() const { return m_resource_name; } bool IsGetObjectsEnabled() const { return m_enable_get_objects; } bool IsFinishEventEnabled() const { return m_enable_finish_event; } + + uint64_t GetNSkip() { return m_nskip; } + uint64_t GetNEvents() { return m_nevents; } // TODO: Deprecate me virtual std::string GetVDescription() const { @@ -412,10 +153,10 @@ class JEventSource : public jana::components::JComponent, } ///< Optional for getting description via source rather than JEventSourceGenerator - uint64_t GetNSkip() { return m_nskip; } - uint64_t GetNEvents() { return m_nevents; } + // Setters + + void SetResourceName(std::string resource_name) { m_resource_name = resource_name; } - // Meant to be called by user /// EnableFinishEvent() is intended to be called by the user in the constructor in order to /// tell JANA to call the provided FinishEvent method after all JEventProcessors /// have finished with a given event. This should only be enabled when absolutely necessary @@ -424,20 +165,29 @@ class JEventSource : public jana::components::JComponent, void EnableFinishEvent(bool enable=true) { m_enable_finish_event = enable; } void EnableGetObjects(bool enable=true) { m_enable_get_objects = enable; } - // Meant to be called by JANA void SetNEvents(uint64_t nevents) { m_nevents = nevents; }; - - // Meant to be called by JANA void SetNSkip(uint64_t nskip) { m_nskip = nskip; }; -private: - std::string m_resource_name; - std::atomic_ullong m_event_count {0}; - uint64_t m_nskip = 0; - uint64_t m_nevents = 0; - bool m_enable_finish_event = false; - bool m_enable_get_objects = false; + // Internal + + [[deprecated("Replaced by JEventSource::DoOpen()")]] + void DoInitialize(); + + virtual void DoInit(); + + void DoOpen(bool with_lock=true); + + void DoClose(bool with_lock=true); + + Result DoNext(std::shared_ptr event); + + Result DoNextCompatibility(std::shared_ptr event); + + void DoFinish(JEvent& event); + + void Summarize(JComponentSummary& summary) const override; + }; From 1762891190f95f17ff23f3c9502fb1120e0e13ce Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 30 Sep 2024 14:15:44 -0400 Subject: [PATCH 04/14] Move away from deprecated JEventSource ctor --- scripts/jana-generate.py | 8 -------- src/examples/DstExample/DstExampleSource.cc | 5 ----- src/examples/DstExample/DstExampleSource.h | 2 -- .../EventGroupExample/BlockingGroupedEventSource.h | 2 +- src/examples/EventGroupExample/EventGroupExamplePlugin.cc | 2 +- src/examples/EventGroupExample/GroupedEventSource.h | 2 +- src/examples/Tutorial/RandomSource.cc | 5 ----- src/examples/Tutorial/RandomSource.h | 2 -- src/examples/Tutorial/Tutorial.cc | 2 +- 9 files changed, 4 insertions(+), 26 deletions(-) diff --git a/scripts/jana-generate.py b/scripts/jana-generate.py index f84597cbc..ffa3d119b 100755 --- a/scripts/jana-generate.py +++ b/scripts/jana-generate.py @@ -187,8 +187,6 @@ class {name} : public JEventSource {{ public: {name}(); - {name}(std::string resource_name, JApplication* app); - virtual ~{name}() = default; void Open() override; @@ -230,12 +228,6 @@ class {name} : public JEventSource {{ SetCallbackStyle(CallbackStyle::ExpertMode); }} -{name}::{name}(std::string resource_name, JApplication* app) : JEventSource(resource_name, app) {{ - SetTypeName(NAME_OF_THIS); // Provide JANA with class name - SetCallbackStyle(CallbackStyle::ExpertMode); -}} - - void {name}::Open() {{ /// Open is called exactly once when processing begins. diff --git a/src/examples/DstExample/DstExampleSource.cc b/src/examples/DstExample/DstExampleSource.cc index 518c49545..efc07f2f2 100644 --- a/src/examples/DstExample/DstExampleSource.cc +++ b/src/examples/DstExample/DstExampleSource.cc @@ -15,11 +15,6 @@ DstExampleSource::DstExampleSource() : JEventSource() { SetCallbackStyle(CallbackStyle::ExpertMode); } -DstExampleSource::DstExampleSource(std::string resource_name, JApplication* app) : JEventSource(resource_name, app) { - SetTypeName(NAME_OF_THIS); // Provide JANA with class name - SetCallbackStyle(CallbackStyle::ExpertMode); -} - void DstExampleSource::Open() { /// Open is called exactly once when processing begins. diff --git a/src/examples/DstExample/DstExampleSource.h b/src/examples/DstExample/DstExampleSource.h index ef0a9226e..7d75201af 100644 --- a/src/examples/DstExample/DstExampleSource.h +++ b/src/examples/DstExample/DstExampleSource.h @@ -17,8 +17,6 @@ class DstExampleSource : public JEventSource { public: DstExampleSource(); - DstExampleSource(std::string resource_name, JApplication* app); - virtual ~DstExampleSource() = default; void Open() override; diff --git a/src/examples/EventGroupExample/BlockingGroupedEventSource.h b/src/examples/EventGroupExample/BlockingGroupedEventSource.h index 917877950..5d82e00e8 100644 --- a/src/examples/EventGroupExample/BlockingGroupedEventSource.h +++ b/src/examples/EventGroupExample/BlockingGroupedEventSource.h @@ -24,7 +24,7 @@ class BlockingGroupedEventSource : public JEventSource { public: - BlockingGroupedEventSource(std::string res_name, JApplication* app) : JEventSource(std::move(res_name), app) { + BlockingGroupedEventSource() { // TODO: Get EventGroupManager from ServiceLocator instead SetCallbackStyle(CallbackStyle::ExpertMode); m_pending_group_id = 1; diff --git a/src/examples/EventGroupExample/EventGroupExamplePlugin.cc b/src/examples/EventGroupExample/EventGroupExamplePlugin.cc index 76f635740..9323ea54f 100644 --- a/src/examples/EventGroupExample/EventGroupExamplePlugin.cc +++ b/src/examples/EventGroupExample/EventGroupExamplePlugin.cc @@ -67,7 +67,7 @@ void InitPlugin(JApplication *app) { app->Add(new GroupedEventProcessor()); - auto evt_src = new BlockingGroupedEventSource("blocking_source", app); + auto evt_src = new BlockingGroupedEventSource(); app->Add(evt_src); diff --git a/src/examples/EventGroupExample/GroupedEventSource.h b/src/examples/EventGroupExample/GroupedEventSource.h index 6933ef935..a46474759 100644 --- a/src/examples/EventGroupExample/GroupedEventSource.h +++ b/src/examples/EventGroupExample/GroupedEventSource.h @@ -19,7 +19,7 @@ class GroupedEventSource : public JEventSource { int m_current_event_number; public: - GroupedEventSource(std::string res_name, JApplication* app) : JEventSource(std::move(res_name), app) { + GroupedEventSource() { // TODO: Get EventGroupManager from ServiceLocator instead SetCallbackStyle(CallbackStyle::ExpertMode); m_remaining_events_in_group = 5; diff --git a/src/examples/Tutorial/RandomSource.cc b/src/examples/Tutorial/RandomSource.cc index 70a8d4268..a5881cafb 100644 --- a/src/examples/Tutorial/RandomSource.cc +++ b/src/examples/Tutorial/RandomSource.cc @@ -23,11 +23,6 @@ RandomSource::RandomSource() : JEventSource() { SetCallbackStyle(CallbackStyle::ExpertMode); } -RandomSource::RandomSource(std::string resource_name, JApplication* app) : JEventSource(resource_name, app) { - SetTypeName(NAME_OF_THIS); // Provide JANA with class name - SetCallbackStyle(CallbackStyle::ExpertMode); -} - void RandomSource::Open() { /// Open is called exactly once when processing begins. diff --git a/src/examples/Tutorial/RandomSource.h b/src/examples/Tutorial/RandomSource.h index 87fb80c61..11e1d2c4e 100644 --- a/src/examples/Tutorial/RandomSource.h +++ b/src/examples/Tutorial/RandomSource.h @@ -16,8 +16,6 @@ class RandomSource : public JEventSource { public: RandomSource(); - RandomSource(std::string resource_name, JApplication* app); - virtual ~RandomSource() = default; void Open() override; diff --git a/src/examples/Tutorial/Tutorial.cc b/src/examples/Tutorial/Tutorial.cc index 1823332ba..0c31b78b0 100644 --- a/src/examples/Tutorial/Tutorial.cc +++ b/src/examples/Tutorial/Tutorial.cc @@ -20,7 +20,7 @@ void InitPlugin(JApplication* app) { app->Add(new JFactoryGeneratorT); // Always use RandomSource - app->Add(new RandomSource("random", app)); + app->Add(new RandomSource()); // Only use RandomSource when 'random' specified on cmd line // app->Add(new JEventSourceGeneratorT); From 3a8eee1d224f36744de181515fa4da53f7dae13f Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 30 Sep 2024 14:49:29 -0400 Subject: [PATCH 05/14] JEventSource reports both emitted and finished event counts --- src/libraries/JANA/JEventSource.cc | 25 ++++++------ src/libraries/JANA/JEventSource.h | 38 ++++++++++--------- .../Components/JEventSourceTests.cc | 8 ++-- .../unit_tests/Components/NEventNSkipTests.cc | 18 ++++----- .../unit_tests/Engine/TerminationTests.cc | 6 +-- .../unit_tests/Topology/SubeventTests.cc | 2 +- 6 files changed, 49 insertions(+), 48 deletions(-) diff --git a/src/libraries/JANA/JEventSource.cc b/src/libraries/JANA/JEventSource.cc index dd9e6b5ac..e33d397a9 100644 --- a/src/libraries/JANA/JEventSource.cc +++ b/src/libraries/JANA/JEventSource.cc @@ -93,7 +93,7 @@ JEventSource::Result JEventSource::DoNext(std::shared_ptr event) { DoOpen(false); } if (m_status == Status::Opened) { - if (m_nevents != 0 && (m_event_count == last_evt_nr)) { + if (m_nevents != 0 && (m_events_emitted == last_evt_nr)) { // We exit early (and recycle) because we hit our jana:nevents limit DoClose(false); return Result::FailureFinished; @@ -101,7 +101,7 @@ JEventSource::Result JEventSource::DoNext(std::shared_ptr event) { // If we reach this point, we will need to actually read an event // We configure the event - event->SetEventNumber(m_event_count); // Default event number to event count + event->SetEventNumber(m_events_emitted); // Default event number to event count event->SetJEventSource(this); event->SetSequential(false); event->GetJCallGraphRecorder()->Reset(); @@ -115,13 +115,13 @@ JEventSource::Result JEventSource::DoNext(std::shared_ptr event) { event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin ); if (result == Result::Success) { - m_event_count += 1; + m_events_emitted += 1; // We end up here if we read an entry in our file or retrieved a message from our socket, // and believe we could obtain another one immediately if we wanted to for (auto* output : m_outputs) { output->InsertCollection(*event); } - if (m_event_count <= first_evt_nr) { + if (m_events_emitted <= first_evt_nr) { // We immediately throw away this whole event because of nskip // (although really we should be handling this with Seek()) return Result::FailureTryAgain; @@ -158,22 +158,22 @@ JEventSource::Result JEventSource::DoNextCompatibility(std::shared_ptr e DoOpen(false); } if (m_status == Status::Opened) { - if (m_event_count < first_evt_nr) { + if (m_events_emitted < first_evt_nr) { // Skip these events due to nskip - event->SetEventNumber(m_event_count); // Default event number to event count + event->SetEventNumber(m_events_emitted); // Default event number to event count auto previous_origin = event->GetJCallGraphRecorder()->SetInsertDataOrigin( JCallGraphRecorder::ORIGIN_FROM_SOURCE); // (see note at top of JCallGraphRecorder.h) GetEvent(event); event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin ); - m_event_count += 1; + m_events_emitted += 1; return Result::FailureTryAgain; // Reject this event and recycle it - } else if (m_nevents != 0 && (m_event_count == last_evt_nr)) { + } else if (m_nevents != 0 && (m_events_emitted == last_evt_nr)) { // Declare ourselves finished due to nevents DoClose(false); // Close out the event source as soon as it declares itself finished return Result::FailureFinished; } else { // Actually emit an event. // GetEvent() expects the following things from its incoming JEvent - event->SetEventNumber(m_event_count); + event->SetEventNumber(m_events_emitted); event->SetJApplication(m_app); event->SetJEventSource(this); event->SetSequential(false); @@ -184,7 +184,7 @@ JEventSource::Result JEventSource::DoNextCompatibility(std::shared_ptr e output->InsertCollection(*event); } event->GetJCallGraphRecorder()->SetInsertDataOrigin( previous_origin ); - m_event_count += 1; + m_events_emitted += 1; return Result::Success; // Don't reject this event! } } else if (m_status == Status::Closed) { @@ -245,9 +245,8 @@ JEventSource::Result JEventSource::DoNextCompatibility(std::shared_ptr e void JEventSource::DoFinish(JEvent& event) { - /// Calls the optional-and-discouraged user-provided FinishEvent virtual method, enforcing - /// 1. Thread safety - /// 2. The m_enable_finish_event flag + + m_events_finished.fetch_add(1); if (m_enable_finish_event) { std::lock_guard lock(m_mutex); CallWithJExceptionWrapper("JEventSource::FinishEvent", [&](){ diff --git a/src/libraries/JANA/JEventSource.h b/src/libraries/JANA/JEventSource.h index 0129338f1..40f6ef264 100644 --- a/src/libraries/JANA/JEventSource.h +++ b/src/libraries/JANA/JEventSource.h @@ -30,7 +30,8 @@ class JEventSource : public jana::components::JComponent, private: std::string m_resource_name; - std::atomic_ullong m_event_count {0}; + std::atomic_ullong m_events_emitted {0}; + std::atomic_ullong m_events_finished {0}; uint64_t m_nskip = 0; uint64_t m_nevents = 0; bool m_enable_finish_event = false; @@ -40,9 +41,7 @@ class JEventSource : public jana::components::JComponent, public: [[deprecated]] explicit JEventSource(std::string resource_name, JApplication* app = nullptr) - : m_resource_name(std::move(resource_name)) - , m_event_count{0} - { + : m_resource_name(std::move(resource_name)) { m_app = app; } @@ -55,6 +54,7 @@ class JEventSource : public jana::components::JComponent, virtual void Init() {} + /// `Open` is called by JANA when it is ready to accept events from this event source. The implementor should open /// file pointers or sockets here, instead of in the constructor. This is because the implementor won't know how many /// or which event sources the user will decide to activate within one job. Thus the implementor can avoid problems @@ -63,8 +63,6 @@ class JEventSource : public jana::components::JComponent, virtual void Open() {} - - // `Emit` is called by JANA in order to emit a fresh event into the stream, when using CallbackStyle::ExpertMode. // It is very similar to GetEvent(), except the user returns a Result status code instead of throwing an exception. // Exceptions are reserved for unrecoverable errors. It accepts an out parameter JEvent. If there is another @@ -81,6 +79,17 @@ class JEventSource : public jana::components::JComponent, virtual void Preprocess(const JEvent&) const {}; + /// `FinishEvent` is used to notify the `JEventSource` that an event has been completely processed. This is the final + /// chance to interact with the `JEvent` before it is either cleared and recycled, or deleted. Although it is + /// possible to use this for freeing JObjects stored in the JEvent , this is strongly discouraged in favor of putting + /// that logic on the destructor, RAII-style. Instead, this callback should be used for updating and freeing state + /// owned by the JEventSource, e.g. raw data which is keyed off of run number and therefore shared among multiple + /// JEvents. `FinishEvent` is also well-suited for use with `EventGroup`s, e.g. to notify someone that a batch of + /// events has finished, or to implement "barrier events". + + virtual void FinishEvent(JEvent&) {}; + + /// `Close` is called by JANA when it is finished accepting events from this event source. Here is where you should /// cleanly close files, sockets, etc. Although GetEvent() knows when (for instance) there are no more events in a /// file, the logic for closing needs to live here because there are other ways a computation may end besides @@ -110,17 +119,6 @@ class JEventSource : public jana::components::JComponent, virtual void GetEvent(std::shared_ptr) {}; - /// `FinishEvent` is used to notify the `JEventSource` that an event has been completely processed. This is the final - /// chance to interact with the `JEvent` before it is either cleared and recycled, or deleted. Although it is - /// possible to use this for freeing JObjects stored in the JEvent , this is strongly discouraged in favor of putting - /// that logic on the destructor, RAII-style. Instead, this callback should be used for updating and freeing state - /// owned by the JEventSource, e.g. raw data which is keyed off of run number and therefore shared among multiple - /// JEvents. `FinishEvent` is also well-suited for use with `EventGroup`s, e.g. to notify someone that a batch of - /// events has finished, or to implement "barrier events". - - virtual void FinishEvent(JEvent&) {}; - - /// `GetObjects` was historically used for lazily unpacking data from a JEvent and putting it into a "dummy" JFactory. /// This mechanism has been replaced by `JEvent::Insert`. All lazy evaluation should happen in a (non-dummy) /// JFactory, whereas eager evaluation should happen in `JEventSource::GetEvent` via `JEvent::Insert`. @@ -133,7 +131,11 @@ class JEventSource : public jana::components::JComponent, // Getters std::string GetResourceName() const { return m_resource_name; } - uint64_t GetEventCount() const { return m_event_count; }; + + [[deprecated]] + uint64_t GetEventCount() const { return m_events_emitted; }; + uint64_t GetEmittedEventCount() const { return m_events_emitted; }; + uint64_t GetFinishedEventCount() const { return m_events_finished; }; [[deprecated]] virtual std::string GetType() const { return m_type_name; } diff --git a/src/programs/unit_tests/Components/JEventSourceTests.cc b/src/programs/unit_tests/Components/JEventSourceTests.cc index 3b7c653d1..990a3904b 100644 --- a/src/programs/unit_tests/Components/JEventSourceTests.cc +++ b/src/programs/unit_tests/Components/JEventSourceTests.cc @@ -16,7 +16,7 @@ struct MyEventSource : public JEventSource { Result Emit(JEvent&) override { emit_count++; - if (GetEventCount() >= events_in_file) { + if (GetEmittedEventCount() >= events_in_file) { LOG_INFO(GetLogger()) << "Emit() called, returning FailureFinished" << LOG_END; return Result::FailureFinished; } @@ -44,7 +44,7 @@ TEST_CASE("JEventSource_ExpertMode_EmitCount") { app.Run(); REQUIRE(sut->open_count == 1); REQUIRE(sut->emit_count == 6); // Emit called 5 times successfully and fails on the 6th - REQUIRE(sut->GetEventCount() == 5); // Emits 5 events successfully (including skipped) + REQUIRE(sut->GetEmittedEventCount() == 5); // Emits 5 events successfully (including skipped) REQUIRE(sut->close_count == 1); } @@ -54,7 +54,7 @@ TEST_CASE("JEventSource_ExpertMode_EmitCount") { app.Run(); REQUIRE(sut->open_count == 1); REQUIRE(sut->emit_count == 3); // Emit called 3 times successfully - REQUIRE(sut->GetEventCount() == 3); // Nevents limit discovered outside Emit + REQUIRE(sut->GetEmittedEventCount() == 3); // Nevents limit discovered outside Emit REQUIRE(sut->close_count == 1); } @@ -64,7 +64,7 @@ TEST_CASE("JEventSource_ExpertMode_EmitCount") { app.Run(); REQUIRE(sut->open_count == 1); REQUIRE(sut->emit_count == 6); // Emit called 5 times successfully and fails on the 6th - REQUIRE(sut->GetEventCount() == 5); // 5 events successfully emitted, 3 of which were (presumably) skipped + REQUIRE(sut->GetEmittedEventCount() == 5); // 5 events successfully emitted, 3 of which were (presumably) skipped REQUIRE(sut->close_count == 1); } } diff --git a/src/programs/unit_tests/Components/NEventNSkipTests.cc b/src/programs/unit_tests/Components/NEventNSkipTests.cc index 983d66719..3bf880cfe 100644 --- a/src/programs/unit_tests/Components/NEventNSkipTests.cc +++ b/src/programs/unit_tests/Components/NEventNSkipTests.cc @@ -110,9 +110,9 @@ TEST_CASE("JEventSourceArrow with multiple JEventSources") { REQUIRE(source1->close_count == 1); REQUIRE(source2->close_count == 1); REQUIRE(source3->close_count == 1); - REQUIRE(source1->GetEventCount() == 9); - REQUIRE(source2->GetEventCount() == 13); - REQUIRE(source3->GetEventCount() == 7); + REQUIRE(source1->GetEmittedEventCount() == 9); + REQUIRE(source2->GetEmittedEventCount() == 13); + REQUIRE(source3->GetEmittedEventCount() == 7); REQUIRE(app.GetNEventsProcessed() == 9+13+7); } @@ -136,9 +136,9 @@ TEST_CASE("JEventSourceArrow with multiple JEventSources") { REQUIRE(source1->close_count == 1); REQUIRE(source2->close_count == 1); REQUIRE(source3->close_count == 1); - REQUIRE(source1->GetEventCount() == 9); // 3 dropped, 6 emitted - REQUIRE(source2->GetEventCount() == 12); // 3 dropped, 9 emitted - REQUIRE(source3->GetEventCount() == 7); // 3 dropped, 4 emitted + REQUIRE(source1->GetEmittedEventCount() == 9); // 3 dropped, 6 emitted + REQUIRE(source2->GetEmittedEventCount() == 12); // 3 dropped, 9 emitted + REQUIRE(source3->GetEmittedEventCount() == 7); // 3 dropped, 4 emitted REQUIRE(app.GetNEventsProcessed() == 19); } @@ -164,9 +164,9 @@ TEST_CASE("JEventSourceArrow with multiple JEventSources") { REQUIRE(source1->close_count == 1); REQUIRE(source2->close_count == 1); REQUIRE(source3->close_count == 1); - REQUIRE(source1->GetEventCount() == 6); // 2 dropped, 4 emitted - REQUIRE(source2->GetEventCount() == 13); // 13 emitted - REQUIRE(source3->GetEventCount() == 4); // 4 emitted + REQUIRE(source1->GetEmittedEventCount() == 6); // 2 dropped, 4 emitted + REQUIRE(source2->GetEmittedEventCount() == 13); // 13 emitted + REQUIRE(source3->GetEmittedEventCount() == 4); // 4 emitted REQUIRE(app.GetNEventsProcessed() == 21); } diff --git a/src/programs/unit_tests/Engine/TerminationTests.cc b/src/programs/unit_tests/Engine/TerminationTests.cc index 312f91223..5c601e080 100644 --- a/src/programs/unit_tests/Engine/TerminationTests.cc +++ b/src/programs/unit_tests/Engine/TerminationTests.cc @@ -26,7 +26,7 @@ TEST_CASE("TerminationTests") { app.Stop(true); REQUIRE(source->event_count > 0); REQUIRE(processor->finish_call_count == 1); - REQUIRE(app.GetNEventsProcessed() == source->event_count); + REQUIRE(app.GetNEventsProcessed() == source->GetEmittedEventCount()); } SECTION("Self termination") { @@ -37,7 +37,7 @@ TEST_CASE("TerminationTests") { REQUIRE(source->event_count == 10); REQUIRE(processor->processed_count == 10); REQUIRE(processor->finish_call_count == 1); - REQUIRE(app.GetNEventsProcessed() == source->event_count); + REQUIRE(app.GetNEventsProcessed() == source->GetEmittedEventCount()); } SECTION("Interrupted during JEventSource::Open()") { @@ -46,7 +46,7 @@ TEST_CASE("TerminationTests") { app.Add(source); app.Run(true); REQUIRE(processor->finish_call_count == 1); - REQUIRE(app.GetNEventsProcessed() == source->GetEventCount()); + REQUIRE(app.GetNEventsProcessed() == source->GetEmittedEventCount()); // We don't know how many events will emit before Stop() request propagates } diff --git a/src/programs/unit_tests/Topology/SubeventTests.cc b/src/programs/unit_tests/Topology/SubeventTests.cc index d929395d8..5ee7573e7 100644 --- a/src/programs/unit_tests/Topology/SubeventTests.cc +++ b/src/programs/unit_tests/Topology/SubeventTests.cc @@ -67,7 +67,7 @@ TEST_CASE("Basic subevent arrow functionality") { SetCallbackStyle(CallbackStyle::ExpertMode); } Result Emit(JEvent& event) override { - if (GetEventCount() == 10) return Result::FailureFinished; + if (GetEmittedEventCount() == 10) return Result::FailureFinished; std::vector inputs; inputs.push_back(new MyInput(22,3.6)); inputs.push_back(new MyInput(23,3.5)); From 42245b58640c0b7c8e040970209e9409af4ca68b Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Fri, 4 Oct 2024 00:07:41 -0400 Subject: [PATCH 06/14] Bring back barrier events --- .../JANA/Topology/JEventSourceArrow.cc | 92 +++++++++++++++---- .../JANA/Topology/JEventSourceArrow.h | 4 +- 2 files changed, 79 insertions(+), 17 deletions(-) diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.cc b/src/libraries/JANA/Topology/JEventSourceArrow.cc index d41e17990..74834a114 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.cc +++ b/src/libraries/JANA/Topology/JEventSourceArrow.cc @@ -35,17 +35,30 @@ void JEventSourceArrow::execute(JArrowMetrics& result, size_t location_id) { bool process_succeeded = true; JArrowMetrics::Status process_status = JArrowMetrics::Status::KeepGoing; - assert(in_data.item_count == 1); - Event* event = in_data.items[0]; + + // If we have a pending barrier event, the input event will just be nullptr + Event* event_in = (in_data.item_count == 1) ? in_data.items[0] : nullptr; auto start_processing_time = std::chrono::steady_clock::now(); - process(event, process_succeeded, process_status); + Event* event_out = process(event_in, process_succeeded, process_status); auto end_processing_time = std::chrono::steady_clock::now(); if (process_succeeded) { - in_data.item_count = 0; - out_data.item_count = 1; - out_data.items[0] = event; + if (event_out == nullptr) { + // Event will be nullptr if the JEventSource emitted a barrier event that must + // be held in m_pending_barrier_event until all preceding events have finished + in_data.item_count = 0; // Nothing gets returned to the input queue + out_data.item_count = 0; // Nothing gets sent to the output queue + m_input.min_item_count = 0; // We don't ask for any events from the input queue next time + m_input.max_item_count = 0; + } + else { + in_data.item_count = 0; // Nothing gets returned to the input queue + out_data.item_count = 1; // Event gets sent to the output queue + out_data.items[0] = event_out; + m_input.min_item_count = 1; // We ask for a fresh event from the input queue next time + m_input.max_item_count = 1; + } } m_input.push(in_data); m_output.push(out_data); @@ -58,18 +71,57 @@ void JEventSourceArrow::execute(JArrowMetrics& result, size_t location_id) { } -void JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::Status& arrow_status) { - - // If there are no sources available then we are automatically finished. - if (m_sources.empty()) { - success = false; - arrow_status = JArrowMetrics::Status::Finished; - return; +Event* JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::Status& arrow_status) { + + LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << LOG_END; + + // First check to see if we need to handle a barrier event before attempting to emit another event + if (m_barrier_active) { + // A barrier event has been emitted by the source. + if (m_pending_barrier_event != nullptr) { + // This barrier event is pending until the topology drains + if (m_sources[m_current_source]->GetEmittedEventCount() - + m_sources[m_current_source]->GetFinishedEventCount() == 1) { + LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event is in-flight" << LOG_END; + + // Topology has drained; only remaining in-flight event is the barrier event itself, + // which we have held on to until now + Event* barrier_event = m_pending_barrier_event; + m_pending_barrier_event = nullptr; + return barrier_event; + } + else { + // Topology has _not_ finished draining, all we can do is wait + LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event" << LOG_END; + arrow_status = JArrowMetrics::Status::ComeBackLater; + success = false; + return nullptr; + } + } + else { + // This barrier event has already been sent into the topology and we need to wait + // until it is finished before emitting any more events + if (m_sources[m_current_source]->GetFinishedEventCount() == + m_sources[m_current_source]->GetEmittedEventCount()) { + + LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event finished, returning to normal operation" << LOG_END; + + // Barrier event has finished + m_barrier_active = false; + // Continue to emit the next event + } + else { + // Barrier event has NOT finished + LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on in-flight barrier event" << LOG_END; + success = false; + arrow_status = JArrowMetrics::Status::ComeBackLater; + return nullptr; + } + } } while (m_current_source < m_sources.size()) { - LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << LOG_END; auto source_status = m_sources[m_current_source]->DoNext(*event); if (source_status == JEventSource::Result::FailureFinished) { @@ -82,17 +134,25 @@ void JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::Stat LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result FailureTryAgain"<< LOG_END; success = false; arrow_status = JArrowMetrics::Status::ComeBackLater; - return; + return event; + } + else if ((*event)->GetSequential()){ + // Source succeeded, but returned a barrier event + m_pending_barrier_event = event; + m_barrier_active = true; + return nullptr; } else { + // Source succeeded, did NOT return a barrier event LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, emitting event# " << (*event)->GetEventNumber() << LOG_END; success = true; arrow_status = JArrowMetrics::Status::KeepGoing; - return; + return event; } } success = false; arrow_status = JArrowMetrics::Status::Finished; + return event; } void JEventSourceArrow::initialize() { diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.h b/src/libraries/JANA/Topology/JEventSourceArrow.h index fcc446f9e..785d2a5ff 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.h +++ b/src/libraries/JANA/Topology/JEventSourceArrow.h @@ -13,6 +13,8 @@ class JEventSourceArrow : public JArrow { private: std::vector m_sources; size_t m_current_source = 0; + bool m_barrier_active = false; + std::shared_ptr* m_pending_barrier_event = nullptr; PlaceRef m_input {this, true, 1, 1}; PlaceRef m_output {this, false, 1, 1}; @@ -36,6 +38,6 @@ class JEventSourceArrow : public JArrow { void initialize() final; void finalize() final; void execute(JArrowMetrics& result, size_t location_id) final; - void process(Event* event, bool& success, JArrowMetrics::Status& status); + Event* process(Event* event, bool& success, JArrowMetrics::Status& status); }; From 53c6e8356f59e5306a10eb91a91b899ab4a10e20 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 7 Oct 2024 16:35:04 -0400 Subject: [PATCH 07/14] Remove `chunksize` field from JArrow This is handled differently now via PlaceRefs --- docs/howto/other-howtos.md | 2 -- src/libraries/JANA/Engine/JPerfSummary.cc | 13 ++++++------ src/libraries/JANA/Engine/JPerfSummary.h | 1 - src/libraries/JANA/Engine/JScheduler.cc | 1 - src/libraries/JANA/Topology/JArrow.h | 20 ++----------------- src/libraries/JANA/Topology/JSubeventArrow.h | 6 +++--- .../JANA/Topology/JTopologyBuilder.cc | 14 ------------- .../JANA/Topology/JTopologyBuilder.h | 2 -- .../unit_tests/Engine/ArrowActivationTests.cc | 1 - .../unit_tests/Engine/SchedulerTests.cc | 2 -- src/programs/unit_tests/Topology/MapArrow.h | 8 ++++---- .../unit_tests/Topology/TopologyTests.cc | 2 -- 12 files changed, 16 insertions(+), 56 deletions(-) diff --git a/docs/howto/other-howtos.md b/docs/howto/other-howtos.md index 96cd6dce3..f6bfc8ec6 100644 --- a/docs/howto/other-howtos.md +++ b/docs/howto/other-howtos.md @@ -129,8 +129,6 @@ jana:affinity | int | 0 | Thread pinning strategy. 0 jana:locality | int | 0 | Memory locality strategy. 0: Global. 1: Socket-local. 2: Numa-domain-local. 3. Core-local. 4. Cpu-local jana:enable_stealing | bool | 0 | Allow threads to pick up work from a different memory location if their local mailbox is empty. jana:event_queue_threshold | int | 80 | Mailbox buffer size -jana:event_source_chunksize | int | 40 | Reduce mailbox contention by chunking work assignments -jana:event_processor_chunksize | int | 1 | Reduce mailbox contention by chunking work assignments Creating code skeletons diff --git a/src/libraries/JANA/Engine/JPerfSummary.cc b/src/libraries/JANA/Engine/JPerfSummary.cc index da33dc4ce..a82a7435c 100644 --- a/src/libraries/JANA/Engine/JPerfSummary.cc +++ b/src/libraries/JANA/Engine/JPerfSummary.cc @@ -8,6 +8,8 @@ #include #include + + std::ostream& operator<<(std::ostream& os, const JPerfSummary& s) { os << std::endl; @@ -22,17 +24,16 @@ std::ostream& operator<<(std::ostream& os, const JPerfSummary& s) { os << " Efficiency [0..1]: " << std::setprecision(3) << s.avg_efficiency_frac << std::endl; os << std::endl; - os << " +--------------------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl; - os << " | Name | Type | Par | Threads | Chunk | Thresh | Pending | Completed |" << std::endl; - os << " +--------------------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl; + os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl; + os << " | Name | Type | Par | Threads | Thresh | Pending | Completed |" << std::endl; + os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl; for (auto as : s.arrows) { os << " | " << std::setw(24) << std::left << as.arrow_name << " | " << std::setw(6) << std::left << (as.is_source ? "Src" : (as.is_sink ? "Sink" : "")) << " | " << std::setw(3) << std::right << (as.is_parallel ? " T " : " F ") << " | " - << std::setw(7) << as.thread_count << " |" - << std::setw(6) << as.chunksize << " |"; + << std::setw(7) << as.thread_count << " |"; if (!as.is_source) { @@ -46,7 +47,7 @@ std::ostream& operator<<(std::ostream& os, const JPerfSummary& s) { os << std::setw(12) << as.total_messages_completed << " |" << std::endl; } - os << " +--------------------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl; + os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl; os << " +--------------------------+-------------+--------------+----------------+--------------+----------------+" << std::endl; diff --git a/src/libraries/JANA/Engine/JPerfSummary.h b/src/libraries/JANA/Engine/JPerfSummary.h index e2840edd9..ed45c1acd 100644 --- a/src/libraries/JANA/Engine/JPerfSummary.h +++ b/src/libraries/JANA/Engine/JPerfSummary.h @@ -17,7 +17,6 @@ struct ArrowSummary { bool has_backpressure; size_t messages_pending; size_t threshold; - size_t chunksize; size_t total_messages_completed; size_t last_messages_completed; diff --git a/src/libraries/JANA/Engine/JScheduler.cc b/src/libraries/JANA/Engine/JScheduler.cc index bbde7027f..2d5ee6016 100644 --- a/src/libraries/JANA/Engine/JScheduler.cc +++ b/src/libraries/JANA/Engine/JScheduler.cc @@ -410,7 +410,6 @@ void JScheduler::summarize_arrows(std::vector& summaries) { summary.is_parallel = as.arrow->is_parallel(); summary.is_source = as.arrow->is_source(); summary.is_sink = as.arrow->is_sink(); - summary.chunksize = as.arrow->get_chunksize(); summary.messages_pending = as.arrow->get_pending(); summary.threshold = as.arrow->get_threshold(); diff --git a/src/libraries/JANA/Topology/JArrow.h b/src/libraries/JANA/Topology/JArrow.h index 981dbef2a..7709997bb 100644 --- a/src/libraries/JANA/Topology/JArrow.h +++ b/src/libraries/JANA/Topology/JArrow.h @@ -31,9 +31,6 @@ class JArrow { mutable std::mutex m_arrow_mutex; // Protects access to arrow properties - // TODO: Get rid of me - size_t m_chunksize = 1; // Number of items to pop off the input queue at once - friend class JScheduler; std::vector m_listeners; // Downstream Arrows @@ -58,26 +55,13 @@ class JArrow { m_is_sink = is_sink; } - // TODO: Get rid of me - void set_chunksize(size_t chunksize) { - std::lock_guard lock(m_arrow_mutex); - m_chunksize = chunksize; - } - - // TODO: Get rid of me - size_t get_chunksize() const { - std::lock_guard lock(m_arrow_mutex); - return m_chunksize; - } - - // TODO: Metrics should be encapsulated so that only actions are to update, clear, or summarize JArrowMetrics& get_metrics() { return m_metrics; } - JArrow(std::string name, bool is_parallel, bool is_source, bool is_sink, size_t chunksize=16) : - m_name(std::move(name)), m_is_parallel(is_parallel), m_is_source(is_source), m_is_sink(is_sink), m_chunksize(chunksize) { + JArrow(std::string name, bool is_parallel, bool is_source, bool is_sink) : + m_name(std::move(name)), m_is_parallel(is_parallel), m_is_source(is_source), m_is_sink(is_sink) { m_metrics.clear(); }; diff --git a/src/libraries/JANA/Topology/JSubeventArrow.h b/src/libraries/JANA/Topology/JSubeventArrow.h index fd2006d1f..e90648b1e 100644 --- a/src/libraries/JANA/Topology/JSubeventArrow.h +++ b/src/libraries/JANA/Topology/JSubeventArrow.h @@ -119,7 +119,7 @@ void JSplitArrow::execute(JArrowMetrics& result, size_t locatio std::shared_ptr* event = nullptr; bool success; - size_t reserved_size = m_outbox->reserve(get_chunksize()); + size_t reserved_size = m_outbox->reserve(1); size_t actual_size = reserved_size; // TODO: Exit early if we don't have enough space on output queue @@ -168,7 +168,7 @@ void JSubeventArrow::execute(JArrowMetrics& result, size_t loca // TODO: Think more carefully about subevent bucket size std::vector> inputs; - size_t downstream_accepts = m_outbox->reserve(get_chunksize(), location_id); + size_t downstream_accepts = m_outbox->reserve(1, location_id); auto in_status = m_inbox->pop(inputs, downstream_accepts, location_id); auto start_latency_time = std::chrono::steady_clock::now(); @@ -209,7 +209,7 @@ void JMergeArrow::execute(JArrowMetrics& result, size_t locatio // TODO: Think more carefully about subevent bucket size std::vector> inputs; - size_t downstream_accepts = m_outbox->reserve(get_chunksize(), location_id); + size_t downstream_accepts = m_outbox->reserve(1, location_id); auto in_status = m_inbox->pop(inputs, downstream_accepts, location_id); auto start_latency_time = std::chrono::steady_clock::now(); diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index df4d33875..d55ae85b7 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -142,12 +142,6 @@ void JTopologyBuilder::acquire_services(JServiceLocator *sl) { m_params->SetDefaultParameter("jana:event_queue_threshold", m_event_queue_threshold, "Max number of events allowed on the main event queue. Higher => Better load balancing; Lower => Fewer events in flight") ->SetIsAdvanced(true); - m_params->SetDefaultParameter("jana:event_source_chunksize", m_event_source_chunksize, - "Max number of events that a JEventSource may enqueue at once. Higher => less queue contention; Lower => better load balancing") - ->SetIsAdvanced(true); - m_params->SetDefaultParameter("jana:event_processor_chunksize", m_event_processor_chunksize, - "Max number of events that the JEventProcessors may dequeue at once. Higher => less queue contention; Lower => better load balancing") - ->SetIsAdvanced(true); m_params->SetDefaultParameter("jana:enable_stealing", m_enable_stealing, "Enable work stealing. Improves load balancing when jana:locality != 0; otherwise does nothing.") ->SetIsAdvanced(true); @@ -216,7 +210,6 @@ void JTopologyBuilder::attach_lower_level(JEventLevel current_level, JUnfoldArro proc_arrow->set_input(q1); proc_arrow->set_output(q2); arrows.push_back(proc_arrow); - proc_arrow->set_chunksize(m_event_processor_chunksize); proc_arrow->set_logger(GetLogger()); if (found_sink) { proc_arrow->set_is_sink(false); @@ -308,13 +301,11 @@ void JTopologyBuilder::attach_top_level(JEventLevel current_level) { src_arrow->set_input(pool_at_level); src_arrow->set_output(queue); arrows.push_back(src_arrow); - src_arrow->set_chunksize(m_event_source_chunksize); auto* proc_arrow = new JEventProcessorArrow(level_str+"Tap"); proc_arrow->set_input(queue); proc_arrow->set_output(pool_at_level); arrows.push_back(proc_arrow); - proc_arrow->set_chunksize(m_event_processor_chunksize); for (auto proc: procs_at_level) { proc_arrow->add_processor(proc); @@ -336,26 +327,22 @@ void JTopologyBuilder::attach_top_level(JEventLevel current_level) { src_arrow->set_input(pool_at_level); src_arrow->set_output(q1); arrows.push_back(src_arrow); - src_arrow->set_chunksize(m_event_source_chunksize); auto *map_arrow = new JEventMapArrow(level_str+"Map"); map_arrow->set_input(q1); map_arrow->set_output(q2); arrows.push_back(map_arrow); - map_arrow->set_chunksize(m_event_source_chunksize); src_arrow->attach(map_arrow); // TODO: We are using q2 temporarily knowing that it will be overwritten in attach_lower_level. // It would be better to rejigger how we validate PlaceRefs and accept empty placerefs/fewer ctor args auto *unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0], q2, pool_at_level, q2); arrows.push_back(unfold_arrow); - unfold_arrow->set_chunksize(m_event_source_chunksize); map_arrow->attach(unfold_arrow); // child_in, child_out, parent_out auto *fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel(), q2, pool_at_level, pool_at_level); // TODO: Support user-provided folders - fold_arrow->set_chunksize(m_event_source_chunksize); bool found_sink = (procs_at_level.size() > 0); attach_lower_level(unfolders_at_level[0]->GetChildLevel(), unfold_arrow, fold_arrow, found_sink); @@ -372,7 +359,6 @@ void JTopologyBuilder::attach_top_level(JEventLevel current_level) { proc_arrow->set_input(q3); proc_arrow->set_output(pool_at_level); arrows.push_back(proc_arrow); - proc_arrow->set_chunksize(m_event_processor_chunksize); for (auto proc: procs_at_level) { proc_arrow->add_processor(proc); diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.h b/src/libraries/JANA/Topology/JTopologyBuilder.h index 270fe6863..05c3c9ca3 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.h +++ b/src/libraries/JANA/Topology/JTopologyBuilder.h @@ -37,8 +37,6 @@ class JTopologyBuilder : public JService { // Topology configuration size_t m_event_pool_size = 4; size_t m_event_queue_threshold = 80; - size_t m_event_source_chunksize = 40; - size_t m_event_processor_chunksize = 1; size_t m_location_count = 1; bool m_enable_stealing = false; bool m_limit_total_events_in_flight = true; diff --git a/src/programs/unit_tests/Engine/ArrowActivationTests.cc b/src/programs/unit_tests/Engine/ArrowActivationTests.cc index 80ef5d860..47f12f2fa 100644 --- a/src/programs/unit_tests/Engine/ArrowActivationTests.cc +++ b/src/programs/unit_tests/Engine/ArrowActivationTests.cc @@ -53,7 +53,6 @@ TEST_CASE("ArrowActivationTests") { multiply_by_two->set_logger(logger); subtract_one->set_logger(logger); sum_everything->set_logger(logger); - emit_rand_ints->set_chunksize(1); JScheduler scheduler(topology); scheduler.logger = logger; diff --git a/src/programs/unit_tests/Engine/SchedulerTests.cc b/src/programs/unit_tests/Engine/SchedulerTests.cc index 0136fc2ad..aec091ccb 100644 --- a/src/programs/unit_tests/Engine/SchedulerTests.cc +++ b/src/programs/unit_tests/Engine/SchedulerTests.cc @@ -46,7 +46,6 @@ TEST_CASE("SchedulerTests") { subtract_one->set_logger(logger); sum_everything->set_logger(logger); - emit_rand_ints->set_chunksize(1); JScheduler scheduler(topology); scheduler.logger = logger; @@ -132,7 +131,6 @@ TEST_CASE("SchedulerRoundRobinBehaviorTests") { subtract_one->set_logger(logger); sum_everything->set_logger(logger); - emit_rand_ints->set_chunksize(1); JScheduler scheduler(topology); scheduler.logger = logger; diff --git a/src/programs/unit_tests/Topology/MapArrow.h b/src/programs/unit_tests/Topology/MapArrow.h index e24670129..81790a5ef 100644 --- a/src/programs/unit_tests/Topology/MapArrow.h +++ b/src/programs/unit_tests/Topology/MapArrow.h @@ -43,11 +43,11 @@ class MapArrow : public JArrow { auto start_total_time = std::chrono::steady_clock::now(); std::vector xs; std::vector ys; - xs.reserve(get_chunksize()); - ys.reserve(get_chunksize()); - // TODO: These allocations are unnecessary and should be eliminated + xs.reserve(1); + ys.reserve(1); + // TODO: Push/pop single items instead of list - auto in_status = _input_queue->pop(xs, get_chunksize()); + auto in_status = _input_queue->pop(xs, 1); auto start_latency_time = std::chrono::steady_clock::now(); for (S &x : xs) { diff --git a/src/programs/unit_tests/Topology/TopologyTests.cc b/src/programs/unit_tests/Topology/TopologyTests.cc index d13e89c48..6cc77975a 100644 --- a/src/programs/unit_tests/Topology/TopologyTests.cc +++ b/src/programs/unit_tests/Topology/TopologyTests.cc @@ -59,8 +59,6 @@ TEST_CASE("JTopology: Basic functionality") { subtract_one->set_logger(logger); sum_everything->set_logger(logger); - emit_rand_ints->set_chunksize(1); - JScheduler scheduler(topology); scheduler.logger = logger; From 4cf953fa56668c7a524907a03c41acdbfe7e6491 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 8 Oct 2024 03:47:49 -0400 Subject: [PATCH 08/14] Clean up detailed final report --- .../JANA/Engine/JArrowProcessingController.cc | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/src/libraries/JANA/Engine/JArrowProcessingController.cc b/src/libraries/JANA/Engine/JArrowProcessingController.cc index ce38f6ea7..e45fa0f1d 100644 --- a/src/libraries/JANA/Engine/JArrowProcessingController.cc +++ b/src/libraries/JANA/Engine/JArrowProcessingController.cc @@ -233,9 +233,32 @@ void JArrowProcessingController::print_report() { void JArrowProcessingController::print_final_report() { auto metrics = measure_performance(); - - LOG_INFO(GetLogger()) << "Detailed report" << *metrics << LOG_END; + LOG_INFO(GetLogger()) << "Detailed report:" << LOG_END; + LOG_INFO(GetLogger()) << LOG_END; + LOG_INFO(GetLogger()) << " Thread team size [count]: " << metrics->thread_count << LOG_END; + LOG_INFO(GetLogger()) << " Total uptime [s]: " << std::setprecision(4) << metrics->total_uptime_s << LOG_END; + LOG_INFO(GetLogger()) << " Completed events [count]: " << metrics->total_events_completed << LOG_END; + LOG_INFO(GetLogger()) << " Avg throughput [Hz]: " << std::setprecision(3) << metrics->avg_throughput_hz << LOG_END; + LOG_INFO(GetLogger()) << " Sequential bottleneck [Hz]: " << std::setprecision(3) << metrics->avg_seq_bottleneck_hz << LOG_END; + LOG_INFO(GetLogger()) << " Parallel bottleneck [Hz]: " << std::setprecision(3) << metrics->avg_par_bottleneck_hz << LOG_END; + LOG_INFO(GetLogger()) << " Efficiency [0..1]: " << std::setprecision(3) << metrics->avg_efficiency_frac << LOG_END; + + if (!metrics->arrows.empty()) { + LOG_INFO(GetLogger()) << LOG_END; + LOG_INFO(GetLogger()) << " Arrow-level metrics:" << LOG_END; + LOG_INFO(GetLogger()) << LOG_END; + } + for (auto& as : metrics->arrows) { + LOG_INFO(GetLogger()) << " - Arrow name: " << as.arrow_name << LOG_END; + LOG_INFO(GetLogger()) << " Events completed: " << as.total_messages_completed << LOG_END; + LOG_INFO(GetLogger()) << " Avg processing latency [ms/event]: " << as.avg_latency_ms << LOG_END; + LOG_INFO(GetLogger()) << " Avg queue latency [ms/event]: " << as.avg_queue_latency_ms << LOG_END; + LOG_INFO(GetLogger()) << " Total queue visits [count]: " << as.queue_visit_count << LOG_END; + LOG_INFO(GetLogger()) << " Queue overhead [0..1]: " << as.avg_queue_overhead_frac << LOG_END; + LOG_INFO(GetLogger()) << LOG_END; + } + LOG_WARN(GetLogger()) << "Final report: " << metrics->total_events_completed << " events processed at " << JTypeInfo::to_string_with_si_prefix(metrics->avg_throughput_hz) << "Hz" << LOG_END; } From fe43efcb6da34f96504383624eb6119bb5b703a4 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Wed, 9 Oct 2024 14:38:09 -0400 Subject: [PATCH 09/14] Bugfix: Event spuriously reset upon return to pool --- src/libraries/JANA/Topology/JArrow.h | 4 +- src/libraries/JANA/Topology/JPool.h | 52 +++---------------- .../unit_tests/Topology/JPoolTests.cc | 6 +-- 3 files changed, 13 insertions(+), 49 deletions(-) diff --git a/src/libraries/JANA/Topology/JArrow.h b/src/libraries/JANA/Topology/JArrow.h index 7709997bb..4a3deed81 100644 --- a/src/libraries/JANA/Topology/JArrow.h +++ b/src/libraries/JANA/Topology/JArrow.h @@ -235,7 +235,7 @@ struct PlaceRef : public PlaceRefBase { else { if (is_input) { auto pool = static_cast*>(place_ref); - pool->push(data.items.data(), data.item_count, data.location_id); + pool->push(data.items.data(), data.item_count, false, data.location_id); } } } @@ -251,7 +251,7 @@ struct PlaceRef : public PlaceRefBase { } else { auto pool = static_cast*>(place_ref); - pool->push(data.items.data(), data.item_count, data.location_id); + pool->push(data.items.data(), data.item_count, !is_input, data.location_id); data.item_count = 0; data.reserve_count = 0; return 1; diff --git a/src/libraries/JANA/Topology/JPool.h b/src/libraries/JANA/Topology/JPool.h index e1c28f4d6..e7aa3bc39 100644 --- a/src/libraries/JANA/Topology/JPool.h +++ b/src/libraries/JANA/Topology/JPool.h @@ -92,12 +92,14 @@ class JPool : public JPoolBase { } - void put(T* item, size_t location=0) { + void put(T* item, bool release, size_t location) { assert(m_pools != nullptr); // If you hit this, you forgot to call init(). - // Do any necessary teardown within the item itself - release_item(item); + if (release) { + // Do any necessary teardown within the item itself + release_item(item); + } // Consider each location starting with current one for (size_t l = location; l& dest, size_t count, size_t location=0) { - - assert(m_pools != nullptr); // If you hit this, you forgot to call init(). - - LocalPool& pool = m_pools[location % m_location_count]; - std::lock_guard lock(pool.mutex); - - if (m_limit_total_events_in_flight && pool.available_items.size() < count) { - return false; - } - else { - while (count > 0 && !pool.available_items.empty()) { - T* t = pool.available_items.back(); - pool.available_items.pop_back(); - dest.push_back(t); - count -= 1; - } - while (count > 0) { - auto t = new T; - configure_item(t); - dest.push_back(t); - count -= 1; - } - return true; - } - } - - // TODO: Remove me - void put_many(std::vector& finished_events, size_t location=0) { - for (T* item : finished_events) { - put(item, location); - } - } - - size_t pop(T** dest, size_t min_count, size_t max_count, size_t location=0) { + size_t pop(T** dest, size_t min_count, size_t max_count, size_t location) { assert(m_pools != nullptr); // If you hit this, you forgot to call init(). @@ -197,9 +161,9 @@ class JPool : public JPoolBase { } } - void push(T** source, size_t count, size_t location=0) { + void push(T** source, size_t count, bool release, size_t location) { for (size_t i=0; ix = 5; - pool.put(f, 0); + pool.put(f, true, 0); h = pool.get(0); REQUIRE(h != nullptr); @@ -74,11 +74,11 @@ TEST_CASE("JPoolTests_SingleLocationUnlimitedEvents") { REQUIRE(g->x == 3); f->x = 5; - pool.put(f, 0); + pool.put(f, true, 0); // f goes back into the pool, so dtor does not get called REQUIRE(was_dtor_called == false); - pool.put(h, 0); + pool.put(h, true, 0); // h's dtor DOES get called REQUIRE(was_dtor_called == true); From 98f6c3f1475928db2ec868d258b6eae55b152e24 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Wed, 9 Oct 2024 16:23:09 -0400 Subject: [PATCH 10/14] Improve BarrierEventTests --- .../Components/BarrierEventTests.cc | 58 ++++++++++++++++- .../unit_tests/Components/BarrierEventTests.h | 64 ------------------- 2 files changed, 55 insertions(+), 67 deletions(-) delete mode 100644 src/programs/unit_tests/Components/BarrierEventTests.h diff --git a/src/programs/unit_tests/Components/BarrierEventTests.cc b/src/programs/unit_tests/Components/BarrierEventTests.cc index 8dce85b45..ae484a48f 100644 --- a/src/programs/unit_tests/Components/BarrierEventTests.cc +++ b/src/programs/unit_tests/Components/BarrierEventTests.cc @@ -2,18 +2,70 @@ // Copyright 2021, Jefferson Science Associates, LLC. // Subject to the terms in the LICENSE file found in the top-level directory. -#include "BarrierEventTests.h" +#include +#include +#include #include "catch.hpp" +int global_resource = 0; + + +struct BarrierSource : public JEventSource { + + BarrierSource() { + SetCallbackStyle(CallbackStyle::ExpertMode); + } + + void Open() override { + } + + Result Emit(JEvent& event) override { + + auto event_nr = GetEmittedEventCount() + 1; + event.SetEventNumber(event_nr); + + if (event_nr % 10 == 0) { + LOG_INFO(GetLogger()) << "Emitting barrier event " << event_nr << LOG_END; + event.SetSequential(true); + } + else { + LOG_INFO(GetLogger()) << "Emitting non-barrier event " << event_nr << LOG_END; + } + return Result::Success; + } +}; + + + +struct BarrierProcessor : public JEventProcessor { + + BarrierProcessor() { + SetCallbackStyle(CallbackStyle::ExpertMode); + } + void Process(const JEvent& event) override { + + if (event.GetSequential()) { + LOG_INFO(GetLogger()) << "Processing barrier event = " << event.GetEventNumber() << ", writing global var = " << global_resource+1 << LOG_END; + REQUIRE(global_resource == ((event.GetEventNumber() - 1) / 10)); + global_resource += 1; + } + else { + LOG_INFO(GetLogger()) << "Processing non-barrier event = " << event.GetEventNumber() << ", reading global var = " << global_resource << LOG_END; + REQUIRE(global_resource == (event.GetEventNumber() / 10)); + } + } +}; + + TEST_CASE("BarrierEventTests") { SECTION("Basic Barrier") { JApplication app; app.Add(new BarrierProcessor); app.Add(new BarrierSource); app.SetParameterValue("nthreads", 4); - app.SetParameterValue("jana:event_source_chunksize", 1); - app.SetParameterValue("jana:event_processor_chunksize", 1); + app.SetParameterValue("jana:nevents", 40); app.Run(true); } }; + diff --git a/src/programs/unit_tests/Components/BarrierEventTests.h b/src/programs/unit_tests/Components/BarrierEventTests.h deleted file mode 100644 index 5843c9854..000000000 --- a/src/programs/unit_tests/Components/BarrierEventTests.h +++ /dev/null @@ -1,64 +0,0 @@ - -// Copyright 2021, Jefferson Science Associates, LLC. -// Subject to the terms in the LICENSE file found in the top-level directory. - - -#ifndef JANA2_BARRIEREVENTTESTS_H -#define JANA2_BARRIEREVENTTESTS_H - -#include -#include -#include - -int global_resource = 0; - -class BarrierSource : public JEventSource { - int event_count=0; - -public: - - BarrierSource() { - SetCallbackStyle(CallbackStyle::ExpertMode); - } - - void Open() override { - } - - Result Emit(JEvent& event) override { - event_count++; - - if (event_count >= 100) { - return Result::FailureFinished; - } - - LOG << "Emitting event " << event_count << LOG_END; - event.SetEventNumber(event_count); - - if (event_count % 10 == 0) { - event.SetSequential(true); - } - return Result::Success; - } -}; - - - -struct BarrierProcessor : public JEventProcessor { - -public: - BarrierProcessor() { - SetCallbackStyle(CallbackStyle::ExpertMode); - } - void Process(const JEvent& event) override { - - if (event.GetSequential()) { - global_resource += 1; - LOG << "Barrier event = " << event.GetEventNumber() << ", writing global var = " << global_resource << LOG_END; - } - else { - LOG << "Processing non-barrier event = " << event.GetEventNumber() << ", reading global var = " << global_resource << LOG_END; - } - } -}; - -#endif //JANA2_BARRIEREVENTTESTS_H From 8e14ca3e4752ab59fe128a7a05e1c534517fa260 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Wed, 9 Oct 2024 16:24:22 -0400 Subject: [PATCH 11/14] Bugfix: JFactory::DoInit() sees null JApp pointer on custom JFGs --- src/libraries/JANA/Services/JComponentManager.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/libraries/JANA/Services/JComponentManager.cc b/src/libraries/JANA/Services/JComponentManager.cc index 686263262..1fe7a2b76 100644 --- a/src/libraries/JANA/Services/JComponentManager.cc +++ b/src/libraries/JANA/Services/JComponentManager.cc @@ -120,6 +120,7 @@ void JComponentManager::initialize_components() { try { // Run Init() on each factory in order to capture any parameters // (and eventually services) that are retrieved via GetApplication(). + fac->SetApplication(GetApplication()); fac->DoInit(); } catch (...) { From 9c94943ff11718ec15e2460b1ee1d3ddcaad0b4e Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Wed, 9 Oct 2024 16:46:09 -0400 Subject: [PATCH 12/14] Bugfix: Data race in logger timestamp generation --- src/libraries/JANA/JLogger.h | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/libraries/JANA/JLogger.h b/src/libraries/JANA/JLogger.h index 569040f89..df02124a5 100644 --- a/src/libraries/JANA/JLogger.h +++ b/src/libraries/JANA/JLogger.h @@ -11,6 +11,7 @@ #include #include #include +#include @@ -72,13 +73,13 @@ struct JLogMessage { if (logger.show_timestamp) { auto now = std::chrono::system_clock::now(); std::time_t current_time = std::chrono::system_clock::to_time_t(now); - std::tm* local_time = std::localtime(¤t_time); - char buffer[100]; - std::strftime(buffer, sizeof(buffer), "%H:%M:%S", local_time); + + tm tm_buf; + localtime_r(¤t_time, &tm_buf); // Extract milliseconds by calculating the duration since the last whole second auto milliseconds = std::chrono::duration_cast(now.time_since_epoch()) % 1000; - builder << buffer << "."; + builder << std::put_time(&tm_buf, "%H:%M:%S."); builder << std::setfill('0') << std::setw(3) << milliseconds.count() << std::setfill(' ') << " "; } if (logger.show_level) { From 8415d4f133f63be97ea2fe68b168a1d3dc5a132a Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Thu, 3 Oct 2024 11:54:08 -0400 Subject: [PATCH 13/14] Add script for generating utilization visualization --- scripts/jana-plot-utilization.py | 101 +++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 scripts/jana-plot-utilization.py diff --git a/scripts/jana-plot-utilization.py b/scripts/jana-plot-utilization.py new file mode 100644 index 000000000..c46d5637a --- /dev/null +++ b/scripts/jana-plot-utilization.py @@ -0,0 +1,101 @@ + +import re +import svgwrite +from datetime import datetime, timedelta +from collections import defaultdict + + +def parse_logfile(): + + # Parse the logs and store intervals + thread_history = defaultdict(list) # Key: thread_id, Value: list of (start_time, end_time, processor_name) + start_times = {} # Key: (thread_id, processor_name), Value: start_time + + # Define a regular expression to parse the log lines + log_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) (Executing|Executed) arrow (\w+)") + + with open("log.txt", "r") as log_file: + for line in log_file: + match = re.search(log_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id, action, processor_name = match.groups() + + # Convert timestamp to milliseconds + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + + if action == "Executing": + # Log the start time of the processor for the thread + start_times[(thread_id, processor_name)] = millis + + elif action == "Executed": + # Calculate the duration of the processor and store the interval + start_time = start_times.pop((thread_id, processor_name), None) + if start_time: + thread_history[thread_id].append((start_time, millis, processor_name)) + + return thread_history + + +def create_svg(all_thread_history): + # Assign colors to processors + processor_colors = {} + color_palette = ['#FF6347', '#4682B4', '#32CD32', '#FFD700', '#9370DB', '#FF69B4'] + color_index = 0 + + # Figure out drawing coordinate system + overall_start_time = min(start for history in all_thread_history.values() for (start,_,_) in history) + overall_end_time = max(end for history in all_thread_history.values() for (_,end,_) in history) + thread_count = len(all_thread_history) + width=1000 + x_scale = width/(overall_end_time-overall_start_time) + thread_height=40 + thread_y_padding=20 + height=thread_count * thread_height + (thread_count+1) * thread_y_padding + + + # Create the SVG drawing + dwg = svgwrite.Drawing("timeline.svg", profile='tiny', size=(width, height)) + dwg.add(dwg.rect(insert=(0,0),size=(width,height),stroke="red",fill="white")) + + # Draw a rectangle for each processor run on each thread's timeline + y_position = thread_y_padding + + for thread_id, intervals in all_thread_history.items(): + dwg.add(dwg.rect(insert=(0,y_position),size=(1000,thread_height),stroke="lightgray",fill="lightgray")) + for start_time, end_time, processor_name in intervals: + # Calculate the position and width of the rectangle + # Assign a unique color to each processor name + if processor_name not in processor_colors: + processor_colors[processor_name] = color_palette[color_index % len(color_palette)] + color_index += 1 + + # Draw the rectangle + rect_start = (start_time-overall_start_time)*x_scale + rect_width = (end_time-start_time)*x_scale + rect = dwg.rect(insert=(rect_start, y_position), + size=(rect_width, thread_height), + fill=processor_colors[processor_name], + stroke="black", + stroke_width=0) + mouseover = processor_name + ": " + str(end_time-start_time) + "ms" + rect.add(svgwrite.base.Title(mouseover)) + dwg.add(rect) + + # Move the y position for the next thread + y_position += (thread_y_padding + thread_height) + + # Save the SVG file + dwg.save() + + + +if __name__ == "__main__": + thread_history = parse_logfile() + #thread_history = { + # 1103:[(0,1,"a"), (2,5,"b"), (6,8,"a")], + # 219:[(0,3,"b"), (3,6,"c"), (9,10,"d")], + # 3:[(2,7,"a")] + #} + create_svg(thread_history) + + From 0f5de8a8f149657e78329b2a3b98b811d79b0ff5 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Thu, 10 Oct 2024 15:07:21 -0400 Subject: [PATCH 14/14] jana-plot-utilization.py shows barrier events --- scripts/jana-plot-utilization.py | 139 ++++++++++++++---- .../JANA/Topology/JEventSourceArrow.cc | 3 + .../Components/BarrierEventTests.cc | 9 ++ 3 files changed, 125 insertions(+), 26 deletions(-) diff --git a/scripts/jana-plot-utilization.py b/scripts/jana-plot-utilization.py index c46d5637a..80dc9ef03 100644 --- a/scripts/jana-plot-utilization.py +++ b/scripts/jana-plot-utilization.py @@ -8,43 +8,103 @@ def parse_logfile(): # Parse the logs and store intervals - thread_history = defaultdict(list) # Key: thread_id, Value: list of (start_time, end_time, processor_name) + thread_history = defaultdict(list) # Key: thread_id, Value: list of (start_time, end_time, processor_name, event_nr, result) + barrier_history = [] # [(released_timestamp, finished_timestamp)] + start_times = {} # Key: (thread_id, processor_name), Value: start_time # Define a regular expression to parse the log lines - log_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) (Executing|Executed) arrow (\w+)") + source_start_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executing arrow (\w+)$") + source_finish_noemit_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) with result (\w+)$") + source_finish_emit_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) with result (\w+), emitting event# (\d+)$") + source_finish_pending_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) with result (\w+), holding back barrier event# (\d+)$") + processor_start_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executing arrow (\w+) for event# (\d+)$") + processor_finish_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) for event# (\d+)$") + barrier_inflight_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) JEventSourceArrow: Barrier event is in-flight$") + barrier_finished_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) JEventSourceArrow: Barrier event finished, returning to normal operation$") with open("log.txt", "r") as log_file: for line in log_file: - match = re.search(log_pattern, line.strip()) + + match = re.match(source_start_pattern, line.strip()) if match: - hours_str, mins_str, secs_str, millis_str, thread_id, action, processor_name = match.groups() + hours_str, mins_str, secs_str, millis_str, thread_id, processor_name = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_times[(thread_id, processor_name)] = millis + continue - # Convert timestamp to milliseconds + match = re.match(source_finish_noemit_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, result = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_time = start_times.pop((thread_id, processor_name), None) + if start_time: + thread_history[thread_id].append((start_time, millis, processor_name, None, result)) + continue + + match = re.match(source_finish_emit_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, result, event_nr = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_time = start_times.pop((thread_id, processor_name), None) + if start_time: + thread_history[thread_id].append((start_time, millis, processor_name, event_nr, result)) + continue + + match = re.match(source_finish_pending_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, result, event_nr = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_time = start_times.pop((thread_id, processor_name), None) + if start_time: + thread_history[thread_id].append((start_time, millis, processor_name, event_nr, result)) + continue + + match = re.match(processor_start_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, event_nr = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_times[(thread_id, processor_name)] = millis + continue + + match = re.match(processor_finish_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, event_nr = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_time = start_times.pop((thread_id, processor_name), None) + if start_time: + thread_history[thread_id].append((start_time, millis, processor_name, event_nr, result)) + continue + + match = re.match(barrier_inflight_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id = match.groups() millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_times[()] = millis + continue - if action == "Executing": - # Log the start time of the processor for the thread - start_times[(thread_id, processor_name)] = millis + match = re.match(barrier_finished_pattern, line.strip()) + if match: + hours_str, mins_str, secs_str, millis_str, thread_id = match.groups() + millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str) + start_time = start_times.pop((), None) + if start_time: + barrier_history.append((start_time, millis)) + continue - elif action == "Executed": - # Calculate the duration of the processor and store the interval - start_time = start_times.pop((thread_id, processor_name), None) - if start_time: - thread_history[thread_id].append((start_time, millis, processor_name)) + return (thread_history, barrier_history) - return thread_history -def create_svg(all_thread_history): +def create_svg(all_thread_history, barrier_history): # Assign colors to processors processor_colors = {} - color_palette = ['#FF6347', '#4682B4', '#32CD32', '#FFD700', '#9370DB', '#FF69B4'] + color_palette = ['#004E64', '#00A5CF', '#9FFFCB', '#25A18E', '#7AE582', '#FF69B4'] color_index = 0 # Figure out drawing coordinate system - overall_start_time = min(start for history in all_thread_history.values() for (start,_,_) in history) - overall_end_time = max(end for history in all_thread_history.values() for (_,end,_) in history) + overall_start_time = min(start for history in all_thread_history.values() for (start,_,_,_,_) in history) + overall_end_time = max(end for history in all_thread_history.values() for (_,end,_,_,_) in history) thread_count = len(all_thread_history) width=1000 x_scale = width/(overall_end_time-overall_start_time) @@ -55,14 +115,28 @@ def create_svg(all_thread_history): # Create the SVG drawing dwg = svgwrite.Drawing("timeline.svg", profile='tiny', size=(width, height)) - dwg.add(dwg.rect(insert=(0,0),size=(width,height),stroke="red",fill="white")) + #dwg.add(dwg.rect(insert=(0,0),size=(width,height),stroke="red",fill="white")) # Draw a rectangle for each processor run on each thread's timeline y_position = thread_y_padding + for barrier_start,barrier_end in barrier_history: + rect_start = (barrier_start-overall_start_time)*x_scale + if (barrier_end == barrier_start): + rect_width=1 + else: + rect_width = (barrier_end-barrier_start)*x_scale + + rect = dwg.rect(insert=(rect_start, 0), + size=(rect_width, height), + fill="red", + stroke="none", + stroke_width=1) + dwg.add(rect) + for thread_id, intervals in all_thread_history.items(): dwg.add(dwg.rect(insert=(0,y_position),size=(1000,thread_height),stroke="lightgray",fill="lightgray")) - for start_time, end_time, processor_name in intervals: + for start_time, end_time, processor_name, event_nr, result in intervals: # Calculate the position and width of the rectangle # Assign a unique color to each processor name if processor_name not in processor_colors: @@ -71,31 +145,44 @@ def create_svg(all_thread_history): # Draw the rectangle rect_start = (start_time-overall_start_time)*x_scale - rect_width = (end_time-start_time)*x_scale + if (end_time == start_time): + rect_width=1 + else: + rect_width = (end_time-start_time)*x_scale + + rect_stroke_color = "black" + if (result == "ComeBackLater" and event_nr is None): + rect_stroke_color = "gray" + + rect = dwg.rect(insert=(rect_start, y_position), size=(rect_width, thread_height), fill=processor_colors[processor_name], - stroke="black", - stroke_width=0) - mouseover = processor_name + ": " + str(end_time-start_time) + "ms" + stroke=rect_stroke_color, + stroke_width=1) + mouseover = "Arrow: " + processor_name + "\nEvent nr: " + str(event_nr) + "\nResult: " + result + "\nTime: "+ str(end_time-start_time) + "ms" rect.add(svgwrite.base.Title(mouseover)) dwg.add(rect) + if (event_nr is not None): + text = dwg.text(str(event_nr), insert=(rect_start+1, y_position+thread_height-1), fill="white", font_size=8) + dwg.add(text) # Move the y position for the next thread y_position += (thread_y_padding + thread_height) + # Save the SVG file dwg.save() if __name__ == "__main__": - thread_history = parse_logfile() + thread_history,barrier_history = parse_logfile() #thread_history = { # 1103:[(0,1,"a"), (2,5,"b"), (6,8,"a")], # 219:[(0,3,"b"), (3,6,"c"), (9,10,"d")], # 3:[(2,7,"a")] #} - create_svg(thread_history) + create_svg(thread_history, barrier_history) diff --git a/src/libraries/JANA/Topology/JEventSourceArrow.cc b/src/libraries/JANA/Topology/JEventSourceArrow.cc index 74834a114..0c5bc48a7 100644 --- a/src/libraries/JANA/Topology/JEventSourceArrow.cc +++ b/src/libraries/JANA/Topology/JEventSourceArrow.cc @@ -93,6 +93,7 @@ Event* JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::St else { // Topology has _not_ finished draining, all we can do is wait LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event" << LOG_END; + LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END; arrow_status = JArrowMetrics::Status::ComeBackLater; success = false; return nullptr; @@ -113,6 +114,7 @@ Event* JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::St else { // Barrier event has NOT finished LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on in-flight barrier event" << LOG_END; + LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END; success = false; arrow_status = JArrowMetrics::Status::ComeBackLater; return nullptr; @@ -138,6 +140,7 @@ Event* JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::St } else if ((*event)->GetSequential()){ // Source succeeded, but returned a barrier event + LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, holding back barrier event# " << (*event)->GetEventNumber() << LOG_END; m_pending_barrier_event = event; m_barrier_active = true; return nullptr; diff --git a/src/programs/unit_tests/Components/BarrierEventTests.cc b/src/programs/unit_tests/Components/BarrierEventTests.cc index ae484a48f..ba90ea2d5 100644 --- a/src/programs/unit_tests/Components/BarrierEventTests.cc +++ b/src/programs/unit_tests/Components/BarrierEventTests.cc @@ -5,6 +5,7 @@ #include #include #include +#include "JANA/Utils/JBenchUtils.h" #include "catch.hpp" int global_resource = 0; @@ -12,6 +13,8 @@ int global_resource = 0; struct BarrierSource : public JEventSource { + JBenchUtils bench; + BarrierSource() { SetCallbackStyle(CallbackStyle::ExpertMode); } @@ -31,6 +34,7 @@ struct BarrierSource : public JEventSource { else { LOG_INFO(GetLogger()) << "Emitting non-barrier event " << event_nr << LOG_END; } + bench.consume_cpu_ms(50, 0, true); return Result::Success; } }; @@ -39,6 +43,8 @@ struct BarrierSource : public JEventSource { struct BarrierProcessor : public JEventProcessor { + JBenchUtils bench; + BarrierProcessor() { SetCallbackStyle(CallbackStyle::ExpertMode); } @@ -53,6 +59,7 @@ struct BarrierProcessor : public JEventProcessor { LOG_INFO(GetLogger()) << "Processing non-barrier event = " << event.GetEventNumber() << ", reading global var = " << global_resource << LOG_END; REQUIRE(global_resource == (event.GetEventNumber() / 10)); } + bench.consume_cpu_ms(100, 0, true); } }; @@ -64,6 +71,8 @@ TEST_CASE("BarrierEventTests") { app.Add(new BarrierSource); app.SetParameterValue("nthreads", 4); app.SetParameterValue("jana:nevents", 40); + app.SetParameterValue("jana:log:show_threadstamp", true); + app.SetParameterValue("jana:loglevel", "debug"); app.Run(true); } };