Skip to content

Commit

Permalink
Merge pull request #371 from JeffersonLab/nbrei_barrier_events
Browse files Browse the repository at this point in the history
Feature: Barrier Events
  • Loading branch information
nathanwbrei authored Oct 11, 2024
2 parents 38c0e9a + 0f5de8a commit 52f140f
Show file tree
Hide file tree
Showing 47 changed files with 878 additions and 632 deletions.
2 changes: 0 additions & 2 deletions docs/howto/other-howtos.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,6 @@ jana:affinity | int | 0 | Thread pinning strategy. 0
jana:locality | int | 0 | Memory locality strategy. 0: Global. 1: Socket-local. 2: Numa-domain-local. 3. Core-local. 4. Cpu-local
jana:enable_stealing | bool | 0 | Allow threads to pick up work from a different memory location if their local mailbox is empty.
jana:event_queue_threshold | int | 80 | Mailbox buffer size
jana:event_source_chunksize | int | 40 | Reduce mailbox contention by chunking work assignments
jana:event_processor_chunksize | int | 1 | Reduce mailbox contention by chunking work assignments


Creating code skeletons
Expand Down
8 changes: 0 additions & 8 deletions scripts/jana-generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,6 @@ class {name} : public JEventSource {{
public:
{name}();
{name}(std::string resource_name, JApplication* app);
virtual ~{name}() = default;
void Open() override;
Expand Down Expand Up @@ -230,12 +228,6 @@ class {name} : public JEventSource {{
SetCallbackStyle(CallbackStyle::ExpertMode);
}}
{name}::{name}(std::string resource_name, JApplication* app) : JEventSource(resource_name, app) {{
SetTypeName(NAME_OF_THIS); // Provide JANA with class name
SetCallbackStyle(CallbackStyle::ExpertMode);
}}
void {name}::Open() {{
/// Open is called exactly once when processing begins.
Expand Down
188 changes: 188 additions & 0 deletions scripts/jana-plot-utilization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@

import re
import svgwrite
from datetime import datetime, timedelta
from collections import defaultdict


def parse_logfile():

# Parse the logs and store intervals
thread_history = defaultdict(list) # Key: thread_id, Value: list of (start_time, end_time, processor_name, event_nr, result)
barrier_history = [] # [(released_timestamp, finished_timestamp)]

start_times = {} # Key: (thread_id, processor_name), Value: start_time

# Define a regular expression to parse the log lines
source_start_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executing arrow (\w+)$")
source_finish_noemit_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) with result (\w+)$")
source_finish_emit_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) with result (\w+), emitting event# (\d+)$")
source_finish_pending_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) with result (\w+), holding back barrier event# (\d+)$")
processor_start_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executing arrow (\w+) for event# (\d+)$")
processor_finish_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) for event# (\d+)$")
barrier_inflight_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) JEventSourceArrow: Barrier event is in-flight$")
barrier_finished_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) JEventSourceArrow: Barrier event finished, returning to normal operation$")

with open("log.txt", "r") as log_file:
for line in log_file:

match = re.match(source_start_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id, processor_name = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_times[(thread_id, processor_name)] = millis
continue

match = re.match(source_finish_noemit_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, result = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_time = start_times.pop((thread_id, processor_name), None)
if start_time:
thread_history[thread_id].append((start_time, millis, processor_name, None, result))
continue

match = re.match(source_finish_emit_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, result, event_nr = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_time = start_times.pop((thread_id, processor_name), None)
if start_time:
thread_history[thread_id].append((start_time, millis, processor_name, event_nr, result))
continue

match = re.match(source_finish_pending_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, result, event_nr = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_time = start_times.pop((thread_id, processor_name), None)
if start_time:
thread_history[thread_id].append((start_time, millis, processor_name, event_nr, result))
continue

match = re.match(processor_start_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, event_nr = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_times[(thread_id, processor_name)] = millis
continue

match = re.match(processor_finish_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, event_nr = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_time = start_times.pop((thread_id, processor_name), None)
if start_time:
thread_history[thread_id].append((start_time, millis, processor_name, event_nr, result))
continue

match = re.match(barrier_inflight_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_times[()] = millis
continue

match = re.match(barrier_finished_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_time = start_times.pop((), None)
if start_time:
barrier_history.append((start_time, millis))
continue

return (thread_history, barrier_history)



def create_svg(all_thread_history, barrier_history):
# Assign colors to processors
processor_colors = {}
color_palette = ['#004E64', '#00A5CF', '#9FFFCB', '#25A18E', '#7AE582', '#FF69B4']
color_index = 0

# Figure out drawing coordinate system
overall_start_time = min(start for history in all_thread_history.values() for (start,_,_,_,_) in history)
overall_end_time = max(end for history in all_thread_history.values() for (_,end,_,_,_) in history)
thread_count = len(all_thread_history)
width=1000
x_scale = width/(overall_end_time-overall_start_time)
thread_height=40
thread_y_padding=20
height=thread_count * thread_height + (thread_count+1) * thread_y_padding


# Create the SVG drawing
dwg = svgwrite.Drawing("timeline.svg", profile='tiny', size=(width, height))
#dwg.add(dwg.rect(insert=(0,0),size=(width,height),stroke="red",fill="white"))

# Draw a rectangle for each processor run on each thread's timeline
y_position = thread_y_padding

for barrier_start,barrier_end in barrier_history:
rect_start = (barrier_start-overall_start_time)*x_scale
if (barrier_end == barrier_start):
rect_width=1
else:
rect_width = (barrier_end-barrier_start)*x_scale

rect = dwg.rect(insert=(rect_start, 0),
size=(rect_width, height),
fill="red",
stroke="none",
stroke_width=1)
dwg.add(rect)

for thread_id, intervals in all_thread_history.items():
dwg.add(dwg.rect(insert=(0,y_position),size=(1000,thread_height),stroke="lightgray",fill="lightgray"))
for start_time, end_time, processor_name, event_nr, result in intervals:
# Calculate the position and width of the rectangle
# Assign a unique color to each processor name
if processor_name not in processor_colors:
processor_colors[processor_name] = color_palette[color_index % len(color_palette)]
color_index += 1

# Draw the rectangle
rect_start = (start_time-overall_start_time)*x_scale
if (end_time == start_time):
rect_width=1
else:
rect_width = (end_time-start_time)*x_scale

rect_stroke_color = "black"
if (result == "ComeBackLater" and event_nr is None):
rect_stroke_color = "gray"


rect = dwg.rect(insert=(rect_start, y_position),
size=(rect_width, thread_height),
fill=processor_colors[processor_name],
stroke=rect_stroke_color,
stroke_width=1)
mouseover = "Arrow: " + processor_name + "\nEvent nr: " + str(event_nr) + "\nResult: " + result + "\nTime: "+ str(end_time-start_time) + "ms"
rect.add(svgwrite.base.Title(mouseover))
dwg.add(rect)
if (event_nr is not None):
text = dwg.text(str(event_nr), insert=(rect_start+1, y_position+thread_height-1), fill="white", font_size=8)
dwg.add(text)

# Move the y position for the next thread
y_position += (thread_y_padding + thread_height)


# Save the SVG file
dwg.save()



if __name__ == "__main__":
thread_history,barrier_history = parse_logfile()
#thread_history = {
# 1103:[(0,1,"a"), (2,5,"b"), (6,8,"a")],
# 219:[(0,3,"b"), (3,6,"c"), (9,10,"d")],
# 3:[(2,7,"a")]
#}
create_svg(thread_history, barrier_history)


5 changes: 0 additions & 5 deletions src/examples/DstExample/DstExampleSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ DstExampleSource::DstExampleSource() : JEventSource() {
SetCallbackStyle(CallbackStyle::ExpertMode);
}

DstExampleSource::DstExampleSource(std::string resource_name, JApplication* app) : JEventSource(resource_name, app) {
SetTypeName(NAME_OF_THIS); // Provide JANA with class name
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void DstExampleSource::Open() {

/// Open is called exactly once when processing begins.
Expand Down
2 changes: 0 additions & 2 deletions src/examples/DstExample/DstExampleSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ class DstExampleSource : public JEventSource {
public:
DstExampleSource();

DstExampleSource(std::string resource_name, JApplication* app);

virtual ~DstExampleSource() = default;

void Open() override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class BlockingGroupedEventSource : public JEventSource {

public:

BlockingGroupedEventSource(std::string res_name, JApplication* app) : JEventSource(std::move(res_name), app) {
BlockingGroupedEventSource() {
// TODO: Get EventGroupManager from ServiceLocator instead
SetCallbackStyle(CallbackStyle::ExpertMode);
m_pending_group_id = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/examples/EventGroupExample/EventGroupExamplePlugin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void InitPlugin(JApplication *app) {

app->Add(new GroupedEventProcessor());

auto evt_src = new BlockingGroupedEventSource("blocking_source", app);
auto evt_src = new BlockingGroupedEventSource();

app->Add(evt_src);

Expand Down
2 changes: 1 addition & 1 deletion src/examples/EventGroupExample/GroupedEventSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class GroupedEventSource : public JEventSource {
int m_current_event_number;

public:
GroupedEventSource(std::string res_name, JApplication* app) : JEventSource(std::move(res_name), app) {
GroupedEventSource() {
// TODO: Get EventGroupManager from ServiceLocator instead
SetCallbackStyle(CallbackStyle::ExpertMode);
m_remaining_events_in_group = 5;
Expand Down
12 changes: 7 additions & 5 deletions src/examples/SubeventExample/SubeventExample.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,13 @@ int main() {
auto subprocess_arrow = new JSubeventArrow<MyInput, MyOutput>("subprocess", &processor, &subevents_in, &subevents_out);
auto merge_arrow = new JMergeArrow<MyInput, MyOutput>("merge", &processor, &subevents_out, &events_out);

auto source_arrow = new JEventSourceArrow("simpleSource",
{source},
&events_in,
topology->event_pool);
auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool);
auto source_arrow = new JEventSourceArrow("simpleSource", {source});
source_arrow->set_input(topology->event_pool);
source_arrow->set_output(&events_in);

auto proc_arrow = new JEventProcessorArrow("simpleProcessor");
proc_arrow->set_input(&events_out);
proc_arrow->set_output(topology->event_pool);
proc_arrow->add_processor(new SimpleProcessor);

builder.arrows.push_back(source_arrow);
Expand Down
5 changes: 0 additions & 5 deletions src/examples/Tutorial/RandomSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ RandomSource::RandomSource() : JEventSource() {
SetCallbackStyle(CallbackStyle::ExpertMode);
}

RandomSource::RandomSource(std::string resource_name, JApplication* app) : JEventSource(resource_name, app) {
SetTypeName(NAME_OF_THIS); // Provide JANA with class name
SetCallbackStyle(CallbackStyle::ExpertMode);
}

void RandomSource::Open() {

/// Open is called exactly once when processing begins.
Expand Down
2 changes: 0 additions & 2 deletions src/examples/Tutorial/RandomSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ class RandomSource : public JEventSource {
public:
RandomSource();

RandomSource(std::string resource_name, JApplication* app);

virtual ~RandomSource() = default;

void Open() override;
Expand Down
2 changes: 1 addition & 1 deletion src/examples/Tutorial/Tutorial.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void InitPlugin(JApplication* app) {
app->Add(new JFactoryGeneratorT<SimpleClusterFactory>);

// Always use RandomSource
app->Add(new RandomSource("random", app));
app->Add(new RandomSource());

// Only use RandomSource when 'random' specified on cmd line
// app->Add(new JEventSourceGeneratorT<RandomSource>);
Expand Down
1 change: 1 addition & 0 deletions src/libraries/JANA/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
set(JANA2_SOURCES

JApplication.cc
JEventSource.cc
JFactory.cc
JFactorySet.cc
JMultifactory.cc
Expand Down
27 changes: 25 additions & 2 deletions src/libraries/JANA/Engine/JArrowProcessingController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,32 @@ void JArrowProcessingController::print_report() {

void JArrowProcessingController::print_final_report() {
auto metrics = measure_performance();


LOG_INFO(GetLogger()) << "Detailed report" << *metrics << LOG_END;
LOG_INFO(GetLogger()) << "Detailed report:" << LOG_END;
LOG_INFO(GetLogger()) << LOG_END;
LOG_INFO(GetLogger()) << " Thread team size [count]: " << metrics->thread_count << LOG_END;
LOG_INFO(GetLogger()) << " Total uptime [s]: " << std::setprecision(4) << metrics->total_uptime_s << LOG_END;
LOG_INFO(GetLogger()) << " Completed events [count]: " << metrics->total_events_completed << LOG_END;
LOG_INFO(GetLogger()) << " Avg throughput [Hz]: " << std::setprecision(3) << metrics->avg_throughput_hz << LOG_END;
LOG_INFO(GetLogger()) << " Sequential bottleneck [Hz]: " << std::setprecision(3) << metrics->avg_seq_bottleneck_hz << LOG_END;
LOG_INFO(GetLogger()) << " Parallel bottleneck [Hz]: " << std::setprecision(3) << metrics->avg_par_bottleneck_hz << LOG_END;
LOG_INFO(GetLogger()) << " Efficiency [0..1]: " << std::setprecision(3) << metrics->avg_efficiency_frac << LOG_END;

if (!metrics->arrows.empty()) {
LOG_INFO(GetLogger()) << LOG_END;
LOG_INFO(GetLogger()) << " Arrow-level metrics:" << LOG_END;
LOG_INFO(GetLogger()) << LOG_END;
}
for (auto& as : metrics->arrows) {
LOG_INFO(GetLogger()) << " - Arrow name: " << as.arrow_name << LOG_END;
LOG_INFO(GetLogger()) << " Events completed: " << as.total_messages_completed << LOG_END;
LOG_INFO(GetLogger()) << " Avg processing latency [ms/event]: " << as.avg_latency_ms << LOG_END;
LOG_INFO(GetLogger()) << " Avg queue latency [ms/event]: " << as.avg_queue_latency_ms << LOG_END;
LOG_INFO(GetLogger()) << " Total queue visits [count]: " << as.queue_visit_count << LOG_END;
LOG_INFO(GetLogger()) << " Queue overhead [0..1]: " << as.avg_queue_overhead_frac << LOG_END;
LOG_INFO(GetLogger()) << LOG_END;
}

LOG_WARN(GetLogger()) << "Final report: " << metrics->total_events_completed << " events processed at "
<< JTypeInfo::to_string_with_si_prefix(metrics->avg_throughput_hz) << "Hz" << LOG_END;
}
Expand Down
13 changes: 7 additions & 6 deletions src/libraries/JANA/Engine/JPerfSummary.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <ostream>
#include <iomanip>



std::ostream& operator<<(std::ostream& os, const JPerfSummary& s) {

os << std::endl;
Expand All @@ -22,17 +24,16 @@ std::ostream& operator<<(std::ostream& os, const JPerfSummary& s) {
os << " Efficiency [0..1]: " << std::setprecision(3) << s.avg_efficiency_frac << std::endl;
os << std::endl;

os << " +--------------------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl;
os << " | Name | Type | Par | Threads | Chunk | Thresh | Pending | Completed |" << std::endl;
os << " +--------------------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl;
os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl;
os << " | Name | Type | Par | Threads | Thresh | Pending | Completed |" << std::endl;
os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl;

for (auto as : s.arrows) {
os << " | "
<< std::setw(24) << std::left << as.arrow_name << " | "
<< std::setw(6) << std::left << (as.is_source ? "Src" : (as.is_sink ? "Sink" : "")) << " | "
<< std::setw(3) << std::right << (as.is_parallel ? " T " : " F ") << " | "
<< std::setw(7) << as.thread_count << " |"
<< std::setw(6) << as.chunksize << " |";
<< std::setw(7) << as.thread_count << " |";

if (!as.is_source) {

Expand All @@ -46,7 +47,7 @@ std::ostream& operator<<(std::ostream& os, const JPerfSummary& s) {
os << std::setw(12) << as.total_messages_completed << " |"
<< std::endl;
}
os << " +--------------------------+--------+-----+---------+-------+--------+---------+-------------+" << std::endl;
os << " +--------------------------+--------+-----+---------+--------+---------+-------------+" << std::endl;


os << " +--------------------------+-------------+--------------+----------------+--------------+----------------+" << std::endl;
Expand Down
Loading

0 comments on commit 52f140f

Please sign in to comment.