Skip to content

Commit

Permalink
jana-plot-utilization.py shows barrier events
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanwbrei committed Oct 10, 2024
1 parent 8415d4f commit 0f5de8a
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 26 deletions.
139 changes: 113 additions & 26 deletions scripts/jana-plot-utilization.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,103 @@
def parse_logfile():

# Parse the logs and store intervals
thread_history = defaultdict(list) # Key: thread_id, Value: list of (start_time, end_time, processor_name)
thread_history = defaultdict(list) # Key: thread_id, Value: list of (start_time, end_time, processor_name, event_nr, result)
barrier_history = [] # [(released_timestamp, finished_timestamp)]

start_times = {} # Key: (thread_id, processor_name), Value: start_time

# Define a regular expression to parse the log lines
log_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) (Executing|Executed) arrow (\w+)")
source_start_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executing arrow (\w+)$")
source_finish_noemit_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) with result (\w+)$")
source_finish_emit_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) with result (\w+), emitting event# (\d+)$")
source_finish_pending_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) with result (\w+), holding back barrier event# (\d+)$")
processor_start_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executing arrow (\w+) for event# (\d+)$")
processor_finish_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) Executed arrow (\w+) for event# (\d+)$")
barrier_inflight_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) JEventSourceArrow: Barrier event is in-flight$")
barrier_finished_pattern = re.compile(r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3}) \[debug\] (\d+) JEventSourceArrow: Barrier event finished, returning to normal operation$")

with open("log.txt", "r") as log_file:
for line in log_file:
match = re.search(log_pattern, line.strip())

match = re.match(source_start_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id, action, processor_name = match.groups()
hours_str, mins_str, secs_str, millis_str, thread_id, processor_name = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_times[(thread_id, processor_name)] = millis
continue

# Convert timestamp to milliseconds
match = re.match(source_finish_noemit_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, result = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_time = start_times.pop((thread_id, processor_name), None)
if start_time:
thread_history[thread_id].append((start_time, millis, processor_name, None, result))
continue

match = re.match(source_finish_emit_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, result, event_nr = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_time = start_times.pop((thread_id, processor_name), None)
if start_time:
thread_history[thread_id].append((start_time, millis, processor_name, event_nr, result))
continue

match = re.match(source_finish_pending_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, result, event_nr = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_time = start_times.pop((thread_id, processor_name), None)
if start_time:
thread_history[thread_id].append((start_time, millis, processor_name, event_nr, result))
continue

match = re.match(processor_start_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, event_nr = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_times[(thread_id, processor_name)] = millis
continue

match = re.match(processor_finish_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id, processor_name, event_nr = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_time = start_times.pop((thread_id, processor_name), None)
if start_time:
thread_history[thread_id].append((start_time, millis, processor_name, event_nr, result))
continue

match = re.match(barrier_inflight_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_times[()] = millis
continue

if action == "Executing":
# Log the start time of the processor for the thread
start_times[(thread_id, processor_name)] = millis
match = re.match(barrier_finished_pattern, line.strip())
if match:
hours_str, mins_str, secs_str, millis_str, thread_id = match.groups()
millis = (((int(hours_str) * 60) + int(mins_str) * 60) + int(secs_str)) * 1000 + int(millis_str)
start_time = start_times.pop((), None)
if start_time:
barrier_history.append((start_time, millis))
continue

elif action == "Executed":
# Calculate the duration of the processor and store the interval
start_time = start_times.pop((thread_id, processor_name), None)
if start_time:
thread_history[thread_id].append((start_time, millis, processor_name))
return (thread_history, barrier_history)

return thread_history


def create_svg(all_thread_history):
def create_svg(all_thread_history, barrier_history):
# Assign colors to processors
processor_colors = {}
color_palette = ['#FF6347', '#4682B4', '#32CD32', '#FFD700', '#9370DB', '#FF69B4']
color_palette = ['#004E64', '#00A5CF', '#9FFFCB', '#25A18E', '#7AE582', '#FF69B4']
color_index = 0

# Figure out drawing coordinate system
overall_start_time = min(start for history in all_thread_history.values() for (start,_,_) in history)
overall_end_time = max(end for history in all_thread_history.values() for (_,end,_) in history)
overall_start_time = min(start for history in all_thread_history.values() for (start,_,_,_,_) in history)
overall_end_time = max(end for history in all_thread_history.values() for (_,end,_,_,_) in history)
thread_count = len(all_thread_history)
width=1000
x_scale = width/(overall_end_time-overall_start_time)
Expand All @@ -55,14 +115,28 @@ def create_svg(all_thread_history):

# Create the SVG drawing
dwg = svgwrite.Drawing("timeline.svg", profile='tiny', size=(width, height))
dwg.add(dwg.rect(insert=(0,0),size=(width,height),stroke="red",fill="white"))
#dwg.add(dwg.rect(insert=(0,0),size=(width,height),stroke="red",fill="white"))

# Draw a rectangle for each processor run on each thread's timeline
y_position = thread_y_padding

for barrier_start,barrier_end in barrier_history:
rect_start = (barrier_start-overall_start_time)*x_scale
if (barrier_end == barrier_start):
rect_width=1
else:
rect_width = (barrier_end-barrier_start)*x_scale

rect = dwg.rect(insert=(rect_start, 0),
size=(rect_width, height),
fill="red",
stroke="none",
stroke_width=1)
dwg.add(rect)

for thread_id, intervals in all_thread_history.items():
dwg.add(dwg.rect(insert=(0,y_position),size=(1000,thread_height),stroke="lightgray",fill="lightgray"))
for start_time, end_time, processor_name in intervals:
for start_time, end_time, processor_name, event_nr, result in intervals:
# Calculate the position and width of the rectangle
# Assign a unique color to each processor name
if processor_name not in processor_colors:
Expand All @@ -71,31 +145,44 @@ def create_svg(all_thread_history):

# Draw the rectangle
rect_start = (start_time-overall_start_time)*x_scale
rect_width = (end_time-start_time)*x_scale
if (end_time == start_time):
rect_width=1
else:
rect_width = (end_time-start_time)*x_scale

rect_stroke_color = "black"
if (result == "ComeBackLater" and event_nr is None):
rect_stroke_color = "gray"


rect = dwg.rect(insert=(rect_start, y_position),
size=(rect_width, thread_height),
fill=processor_colors[processor_name],
stroke="black",
stroke_width=0)
mouseover = processor_name + ": " + str(end_time-start_time) + "ms"
stroke=rect_stroke_color,
stroke_width=1)
mouseover = "Arrow: " + processor_name + "\nEvent nr: " + str(event_nr) + "\nResult: " + result + "\nTime: "+ str(end_time-start_time) + "ms"
rect.add(svgwrite.base.Title(mouseover))
dwg.add(rect)
if (event_nr is not None):
text = dwg.text(str(event_nr), insert=(rect_start+1, y_position+thread_height-1), fill="white", font_size=8)
dwg.add(text)

# Move the y position for the next thread
y_position += (thread_y_padding + thread_height)


# Save the SVG file
dwg.save()



if __name__ == "__main__":
thread_history = parse_logfile()
thread_history,barrier_history = parse_logfile()
#thread_history = {
# 1103:[(0,1,"a"), (2,5,"b"), (6,8,"a")],
# 219:[(0,3,"b"), (3,6,"c"), (9,10,"d")],
# 3:[(2,7,"a")]
#}
create_svg(thread_history)
create_svg(thread_history, barrier_history)


3 changes: 3 additions & 0 deletions src/libraries/JANA/Topology/JEventSourceArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Event* JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::St
else {
// Topology has _not_ finished draining, all we can do is wait
LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event" << LOG_END;
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
arrow_status = JArrowMetrics::Status::ComeBackLater;
success = false;
return nullptr;
Expand All @@ -113,6 +114,7 @@ Event* JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::St
else {
// Barrier event has NOT finished
LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on in-flight barrier event" << LOG_END;
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
success = false;
arrow_status = JArrowMetrics::Status::ComeBackLater;
return nullptr;
Expand All @@ -138,6 +140,7 @@ Event* JEventSourceArrow::process(Event* event, bool& success, JArrowMetrics::St
}
else if ((*event)->GetSequential()){
// Source succeeded, but returned a barrier event
LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, holding back barrier event# " << (*event)->GetEventNumber() << LOG_END;
m_pending_barrier_event = event;
m_barrier_active = true;
return nullptr;
Expand Down
9 changes: 9 additions & 0 deletions src/programs/unit_tests/Components/BarrierEventTests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
#include <JANA/JApplication.h>
#include <JANA/JEventSource.h>
#include <JANA/JEventProcessor.h>
#include "JANA/Utils/JBenchUtils.h"
#include "catch.hpp"

int global_resource = 0;


struct BarrierSource : public JEventSource {

JBenchUtils bench;

BarrierSource() {
SetCallbackStyle(CallbackStyle::ExpertMode);
}
Expand All @@ -31,6 +34,7 @@ struct BarrierSource : public JEventSource {
else {
LOG_INFO(GetLogger()) << "Emitting non-barrier event " << event_nr << LOG_END;
}
bench.consume_cpu_ms(50, 0, true);
return Result::Success;
}
};
Expand All @@ -39,6 +43,8 @@ struct BarrierSource : public JEventSource {

struct BarrierProcessor : public JEventProcessor {

JBenchUtils bench;

BarrierProcessor() {
SetCallbackStyle(CallbackStyle::ExpertMode);
}
Expand All @@ -53,6 +59,7 @@ struct BarrierProcessor : public JEventProcessor {
LOG_INFO(GetLogger()) << "Processing non-barrier event = " << event.GetEventNumber() << ", reading global var = " << global_resource << LOG_END;
REQUIRE(global_resource == (event.GetEventNumber() / 10));
}
bench.consume_cpu_ms(100, 0, true);
}
};

Expand All @@ -64,6 +71,8 @@ TEST_CASE("BarrierEventTests") {
app.Add(new BarrierSource);
app.SetParameterValue("nthreads", 4);
app.SetParameterValue("jana:nevents", 40);
app.SetParameterValue("jana:log:show_threadstamp", true);
app.SetParameterValue("jana:loglevel", "debug");
app.Run(true);
}
};
Expand Down

0 comments on commit 0f5de8a

Please sign in to comment.