Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish JTopologyBuilder #346

Merged
merged 9 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/libraries/JANA/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ set(JANA2_SOURCES
Topology/JEventProcessorArrow.cc
Topology/JEventSourceArrow.cc
Topology/JEventMapArrow.cc
Topology/JEventTapArrow.cc
Topology/JTopologyBuilder.cc

Services/JComponentManager.cc
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/Engine/JArrowProcessingController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ bool JArrowProcessingController::is_timed_out() {
auto metrics = measure_performance();

int timeout_s;
if (metrics->total_uptime_s < m_warmup_timeout_s * m_topology->m_event_pool_size / metrics->thread_count) {
if (metrics->total_uptime_s < (m_warmup_timeout_s * m_topology->m_pool_capacity * 1.0) / metrics->thread_count) {
// We are at the beginning and not all events have necessarily had a chance to warm up
timeout_s = m_warmup_timeout_s;
}
Expand Down
14 changes: 6 additions & 8 deletions src/libraries/JANA/Engine/JPerfSummary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ std::ostream& operator<<(std::ostream& os, const JPerfSummary& s) {
os << " Efficiency [0..1]: " << std::setprecision(3) << s.avg_efficiency_frac << std::endl;
os << std::endl;

os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl;
os << " | Name | Type | Par | Threads | Thresh | Pending | Completed |" << std::endl;
os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl;
os << " +--------------------------+--------+-----+---------+---------+-------------+" << std::endl;
os << " | Name | Type | Par | Threads | Pending | Completed |" << std::endl;
os << " +--------------------------+--------+-----+---------+---------+-------------+" << std::endl;

for (auto as : s.arrows) {
os << " | "
Expand All @@ -37,17 +37,15 @@ std::ostream& operator<<(std::ostream& os, const JPerfSummary& s) {

if (!as.is_source) {

os << std::setw(7) << as.threshold << " |"
<< std::setw(8) << as.messages_pending << " |";
os << std::setw(8) << as.messages_pending << " |";
}
else {

os << " - | - |";
os << " - |";
}
os << std::setw(12) << as.total_messages_completed << " |"
<< std::endl;
}
os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl;
os << " +--------------------------+--------+-----+---------+---------+-------------+" << std::endl;


os << " +--------------------------+-------------+--------------+----------------+--------------+----------------+" << std::endl;
Expand Down
1 change: 0 additions & 1 deletion src/libraries/JANA/Engine/JPerfSummary.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ struct ArrowSummary {
int running_upstreams;
bool has_backpressure;
size_t messages_pending;
size_t threshold;

size_t total_messages_completed;
size_t last_messages_completed;
Expand Down
1 change: 0 additions & 1 deletion src/libraries/JANA/Engine/JScheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,6 @@ void JScheduler::summarize_arrows(std::vector<ArrowSummary>& summaries) {
summary.is_source = as.arrow->is_source();
summary.is_sink = as.arrow->is_sink();
summary.messages_pending = as.arrow->get_pending();
summary.threshold = as.arrow->get_threshold();

summary.thread_count = as.thread_count;
summary.running_upstreams = as.active_or_draining_upstream_arrow_count;
Expand Down
15 changes: 8 additions & 7 deletions src/libraries/JANA/JEventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class JEventProcessor : public jana::components::JComponent,
// any contention.

if (m_status == Status::Uninitialized) {
DoInitialize();
throw JException("JEventProcessor: Attempted to call DoTap() before Initialize()");
}
else if (m_status == Status::Finalized) {
throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()");
Expand Down Expand Up @@ -121,15 +121,16 @@ class JEventProcessor : public jana::components::JComponent,

auto run_number = event->GetRunNumber();

if (m_status == Status::Uninitialized) {
DoInitialize();
}
else if (m_status == Status::Finalized) {
throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()");
}
{
// Protect the call to BeginRun(), etc, to prevent some threads from running Process() before BeginRun().
std::lock_guard<std::mutex> lock(m_mutex);

if (m_status == Status::Uninitialized) {
throw JException("JEventProcessor: Attempted to call DoLegacyProcess() before Initialize()");
}
else if (m_status == Status::Finalized) {
throw JException("JEventProcessor: Attempted to call DoLegacyProcess() after Finalize()");
}
if (m_last_run_number != run_number) {
if (m_last_run_number != -1) {
CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
Expand Down
4 changes: 3 additions & 1 deletion src/libraries/JANA/JEventSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class JEventSource : public jana::components::JComponent,
uint64_t m_nevents = 0;
bool m_enable_finish_event = false;
bool m_enable_get_objects = false;
bool m_enable_preprocess = false;


public:
Expand Down Expand Up @@ -145,11 +146,11 @@ class JEventSource : public jana::components::JComponent,

bool IsGetObjectsEnabled() const { return m_enable_get_objects; }
bool IsFinishEventEnabled() const { return m_enable_finish_event; }
bool IsPreprocessEnabled() const { return m_enable_preprocess; }

uint64_t GetNSkip() { return m_nskip; }
uint64_t GetNEvents() { return m_nevents; }

// TODO: Deprecate me
virtual std::string GetVDescription() const {
return "<description unavailable>";
} ///< Optional for getting description via source rather than JEventSourceGenerator
Expand All @@ -166,6 +167,7 @@ class JEventSource : public jana::components::JComponent,
/// which will hurt performance. Conceptually, FinishEvent isn't great, and so should be avoided when possible.
void EnableFinishEvent(bool enable=true) { m_enable_finish_event = enable; }
void EnableGetObjects(bool enable=true) { m_enable_get_objects = enable; }
void EnablePreprocess(bool enable=true) { m_enable_preprocess = enable; }

void SetNEvents(uint64_t nevents) { m_nevents = nevents; };
void SetNSkip(uint64_t nskip) { m_nskip = nskip; };
Expand Down
70 changes: 0 additions & 70 deletions src/libraries/JANA/Topology/JArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once
#include <iostream>
#include <atomic>
#include <cassert>
#include <vector>

Expand All @@ -29,8 +27,6 @@ class JArrow {
bool m_is_sink; // Whether or not tnis arrow contributes to the final event count
JArrowMetrics m_metrics; // Performance information accumulated over all workers

mutable std::mutex m_arrow_mutex; // Protects access to arrow properties

friend class JScheduler;
std::vector<JArrow *> m_listeners; // Downstream Arrows

Expand Down Expand Up @@ -77,11 +73,6 @@ class JArrow {
// TODO: Make no longer virtual
virtual size_t get_pending();

// TODO: Get rid of me
virtual size_t get_threshold();

virtual void set_threshold(size_t /* threshold */);

void attach(JArrow* downstream) {
m_listeners.push_back(downstream);
};
Expand Down Expand Up @@ -113,18 +104,11 @@ struct PlaceRefBase {
size_t max_item_count = 1;

virtual size_t get_pending() { return 0; }
virtual size_t get_threshold() { return 0; }
virtual void set_threshold(size_t) {}
};

template <typename T>
struct PlaceRef : public PlaceRefBase {

PlaceRef(JArrow* parent) {
assert(parent != nullptr);
parent->attach(this);
}

PlaceRef(JArrow* parent, bool is_input, size_t min_item_count, size_t max_item_count) {
assert(parent != nullptr);
parent->attach(this);
Expand All @@ -133,28 +117,6 @@ struct PlaceRef : public PlaceRefBase {
this->max_item_count = max_item_count;
}

PlaceRef(JArrow* parent, JMailbox<T*>* queue, bool is_input, size_t min_item_count, size_t max_item_count) {
assert(parent != nullptr);
assert(queue != nullptr);
parent->attach(this);
this->place_ref = queue;
this->is_queue = true;
this->is_input = is_input;
this->min_item_count = min_item_count;
this->max_item_count = max_item_count;
}

PlaceRef(JArrow* parent, JPool<T>* pool, bool is_input, size_t min_item_count, size_t max_item_count) {
assert(parent != nullptr);
assert(pool != nullptr);
parent->attach(this);
this->place_ref = pool;
this->is_queue = false;
this->is_input = is_input;
this->min_item_count = min_item_count;
this->max_item_count = max_item_count;
}

void set_queue(JMailbox<T*>* queue) {
assert(queue != nullptr);
this->place_ref = queue;
Expand All @@ -176,23 +138,6 @@ struct PlaceRef : public PlaceRefBase {
return 0;
}

size_t get_threshold() override {
assert(place_ref != nullptr);
if (is_input && is_queue) {
auto queue = static_cast<JMailbox<T*>*>(place_ref);
return queue->get_threshold();
}
return -1;
}

void set_threshold(size_t threshold) override {
assert(place_ref != nullptr);
if (is_input && is_queue) {
auto queue = static_cast<JMailbox<T*>*>(place_ref);
queue->set_threshold(threshold);
}
}

bool pull(Data<T>& data) {
assert(place_ref != nullptr);
if (is_input) { // Actually pull the data
Expand Down Expand Up @@ -267,19 +212,4 @@ inline size_t JArrow::get_pending() {
return sum;
}

inline size_t JArrow::get_threshold() {
size_t result = -1;
for (PlaceRefBase* place : m_places) {
result = std::min(result, place->get_threshold());
}
return result;

}

inline void JArrow::set_threshold(size_t threshold) {
for (PlaceRefBase* place : m_places) {
place->set_threshold(threshold);
}
}


1 change: 1 addition & 0 deletions src/libraries/JANA/Topology/JArrowMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#pragma once
#include <mutex>
#include <chrono>
#include <string>

class JArrowMetrics {

Expand Down
20 changes: 18 additions & 2 deletions src/libraries/JANA/Topology/JEventMapArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void JEventMapArrow::add_processor(JEventProcessor* processor) {

void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) {

LOG_DEBUG(m_logger) << "JEventMapArrow '" << get_name() << "': Starting event# " << (*event)->GetEventNumber() << LOG_END;
LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END;
for (JEventSource* source : m_sources) {
JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), source->GetTypeName()); // times execution until this goes out of scope
source->Preprocess(**event);
Expand All @@ -45,16 +45,32 @@ void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status&
processor->DoMap(*event);
}
}
LOG_DEBUG(m_logger) << "JEventMapArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END;
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END;
success = true;
status = JArrowMetrics::Status::KeepGoing;
}

void JEventMapArrow::initialize() {
LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END;
for (auto processor : m_procs) {
if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) {
LOG_INFO(m_logger) << "Initializing JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END;
processor->DoInitialize();
LOG_INFO(m_logger) << "Initialized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END;
}
}
LOG_DEBUG(m_logger) << "Initialized arrow '" << get_name() << "'" << LOG_END;
}

void JEventMapArrow::finalize() {
LOG_DEBUG(m_logger) << "Finalizing arrow '" << get_name() << "'" << LOG_END;
for (auto processor : m_procs) {
if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) {
LOG_DEBUG(m_logger) << "Finalizing JEventProcessor " << processor->GetTypeName() << LOG_END;
processor->DoFinalize();
LOG_INFO(m_logger) << "Finalized JEventProcessor " << processor->GetTypeName() << LOG_END;
}
}
LOG_DEBUG(m_logger) << "Finalized arrow " << get_name() << LOG_END;
}

2 changes: 1 addition & 1 deletion src/libraries/JANA/Topology/JEventSourceArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Event* JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::St
}
else if (source_status == JEventSource::Result::FailureTryAgain){
// This JEventSource isn't finished yet, so we obtained either Success or TryAgainLater
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result FailureTryAgain"<< LOG_END;
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
success = false;
arrow_status = JArrowMetrics::Status::ComeBackLater;
return event;
Expand Down
4 changes: 2 additions & 2 deletions src/libraries/JANA/Topology/JEventTapArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ void JEventTapArrow::add_processor(JEventProcessor* proc) {

void JEventTapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) {

LOG_DEBUG(m_logger) << "JEventTapArrow '" << get_name() << "': Starting event# " << (*event)->GetEventNumber() << LOG_END;
LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END;
for (JEventProcessor* proc : m_procs) {
JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), proc->GetTypeName()); // times execution until this goes out of scope
if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) {
proc->DoTap(*event);
}
}
LOG_DEBUG(m_logger) << "JEventTapArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END;
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END;
success = true;
status = JArrowMetrics::Status::KeepGoing;
}
Expand Down
50 changes: 4 additions & 46 deletions src/libraries/JANA/Topology/JFoldArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,58 +24,16 @@ class JFoldArrow : public JArrow {
public:
JFoldArrow(
std::string name,
//JEventFolder* folder,
JEventLevel parent_level,
JEventLevel child_level,
JMailbox<EventT*>* child_in,
JEventPool* child_out,
JMailbox<EventT*>* parent_out)
JEventLevel child_level)

: JArrow(std::move(name), false, false, false),
// m_folder(folder),
m_parent_level(parent_level),
m_child_level(child_level),
m_child_in(this, child_in, true, 1, 1),
m_child_out(this, child_out, false, 1, 1),
m_parent_out(this, parent_out, false, 1, 1)
{
}

JFoldArrow(
std::string name,
//JEventFolder* folder,
JEventLevel parent_level,
JEventLevel child_level,
JMailbox<EventT*>* child_in,
JMailbox<EventT*>* child_out,
JMailbox<EventT*>* parent_out)

: JArrow(std::move(name), false, false, false),
// m_folder(folder),
m_parent_level(parent_level),
m_child_level(child_level),
m_child_in(this, child_in, true, 1, 1),
m_child_out(this, child_out, false, 1, 1),
m_parent_out(this, parent_out, false, 1, 1)
{
}

JFoldArrow(
std::string name,
//JEventFolder* folder,
JEventLevel parent_level,
JEventLevel child_level,
JMailbox<EventT*>* child_in,
JEventPool* child_out,
JEventPool* parent_out)

: JArrow(std::move(name), false, false, false),
// m_folder(folder),
m_parent_level(parent_level),
m_child_level(child_level),
m_child_in(this, child_in, true, 1, 1),
m_child_out(this, child_out, false, 1, 1),
m_parent_out(this, parent_out, false, 1, 1)
m_child_in(this, true, 1, 1),
m_child_out(this, false, 1, 1),
m_parent_out(this, false, 1, 1)
{
}

Expand Down
Loading
Loading