From 7d8c7709f7d0e2f2fb508796634600c69b9a8fc2 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Tue, 5 Mar 2024 15:22:55 +1300 Subject: [PATCH] Introduce `Scheduler#load` and `Async::Idler` for load shedding and saturation. (#309) Introduce Scheduler#load which is a 1-second load average. This load average can be used to detect overload conditions in the event loop. In addition, introduce Async::Idler which will schedule tasks up to a given maximum_load. --- async.gemspec | 2 +- examples/load/test.rb | 27 +++++++++++++++++++++++++++ lib/async/idler.rb | 39 +++++++++++++++++++++++++++++++++++++++ lib/async/scheduler.rb | 35 +++++++++++++++++++++++++++++++++++ test/async/idler.rb | 35 +++++++++++++++++++++++++++++++++++ 5 files changed, 137 insertions(+), 1 deletion(-) create mode 100755 examples/load/test.rb create mode 100644 lib/async/idler.rb create mode 100644 test/async/idler.rb diff --git a/async.gemspec b/async.gemspec index ca7f351..28a7e3f 100644 --- a/async.gemspec +++ b/async.gemspec @@ -26,6 +26,6 @@ Gem::Specification.new do |spec| spec.add_dependency "console", "~> 1.10" spec.add_dependency "fiber-annotation" - spec.add_dependency "io-event", "~> 1.1" + spec.add_dependency "io-event", "~> 1.5", ">= 1.5.1" spec.add_dependency "timers", "~> 4.1" end diff --git a/examples/load/test.rb b/examples/load/test.rb new file mode 100755 index 0000000..ef42c55 --- /dev/null +++ b/examples/load/test.rb @@ -0,0 +1,27 @@ +#!/usr/bin/env ruby + +require_relative '../../lib/async' +require_relative '../../lib/async/idler' + +Async do + idler = Async::Idler.new(0.8) + + Async do + while true + idler.async do + $stdout.write '.' + while true + sleep 0.1 + end + end + end + end + + scheduler = Fiber.scheduler + while true + load = scheduler.load + + $stdout.write "\nLoad: #{load} " + sleep 1.0 + end +end diff --git a/lib/async/idler.rb b/lib/async/idler.rb new file mode 100644 index 0000000..9226d62 --- /dev/null +++ b/lib/async/idler.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +module Async + class Idler + def initialize(maximum_load = 0.8, backoff: 0.01, parent: nil) + @maximum_load = maximum_load + @backoff = backoff + @parent = parent + end + + def async(*arguments, parent: (@parent or Task.current), **options, &block) + wait + + # It is crucial that we optimistically execute the child task, so that we prevent a tight loop invoking this method from consuming all available resources. + parent.async(*arguments, **options, &block) + end + + def wait + scheduler = Fiber.scheduler + backoff = nil + + while true + load = scheduler.load + break if load < @maximum_load + + if backoff + sleep(backoff) + backoff *= 2.0 + else + scheduler.yield + backoff = @backoff + end + end + end + end +end diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 63e1245..3b2703f 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -37,9 +37,33 @@ def initialize(parent = nil, selector: nil) @blocked = 0 + @busy_time = 0.0 + @idle_time = 0.0 + @timers = ::Timers::Group.new end + # Compute the scheduler load according to the busy and idle times that are updated by the run loop. + # @returns [Float] The load of the scheduler. 0.0 means no load, 1.0 means fully loaded or over-loaded. + def load + total_time = @busy_time + @idle_time + + # If the total time is zero, then the load is zero: + return 0.0 if total_time.zero? + + # We normalize to a 1 second window: + if total_time > 1.0 + ratio = 1.0 / total_time + @busy_time *= ratio + @idle_time *= ratio + + # We don't need to divide here as we've already normalised it to a 1s window: + return @busy_time + else + return @busy_time / total_time + end + end + def scheduler_close # If the execution context (thread) was handling an exception, we want to exit as quickly as possible: unless $! @@ -267,6 +291,8 @@ def run_once(timeout = nil) # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. # @returns [Boolean] Whether there is more work to do. private def run_once!(timeout = 0) + start_time = Async::Clock.now + interval = @timers.wait_interval # If there is no interval to wait (thus no timers), and no tasks, we could be done: @@ -288,6 +314,15 @@ def run_once(timeout = nil) @timers.fire + # Compute load: + end_time = Async::Clock.now + total_duration = end_time - start_time + idle_duration = @selector.idle_duration + busy_duration = total_duration - idle_duration + + @busy_time += busy_duration + @idle_time += idle_duration + # The reactor still has work to do: return true end diff --git a/test/async/idler.rb b/test/async/idler.rb new file mode 100644 index 0000000..dcc7a33 --- /dev/null +++ b/test/async/idler.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2018-2023, by Samuel Williams. + +require 'async/idler' +require 'sus/fixtures/async' + +require 'chainable_async' + +describe Async::Idler do + include Sus::Fixtures::Async::ReactorContext + let(:idler) {subject.new(0.5)} + + it 'can schedule tasks up to the desired load' do + # Generate the load: + Async do + while true + idler.async do + while true + sleep 0.1 + end + end + end + end + + # This test must be longer than the test window... + sleep 1.1 + + # Verify that the load is within the desired range: + expect(Fiber.scheduler.load).to be_within(0.1).of(0.5) + end + + it_behaves_like ChainableAsync +end