From 9c64c0e3d43f18a75e15743e35c5038e1a473e54 Mon Sep 17 00:00:00 2001
From: Johan Brandhorst-Satzkorn
Date: Fri, 6 Sep 2024 22:27:32 +0000
Subject: [PATCH] backport of commit 370edd42ad3dac2ce6752dc2423ed5befdcddb84

---
 internal/daemon/controller/controller.go | 35 ++++++++++++------------
 internal/daemon/worker/status_test.go    |  1 +
 internal/daemon/worker/worker.go         | 31 ++++++++++-----------
 internal/daemon/worker/worker_test.go    |  4 +--
 4 files changed, 35 insertions(+), 36 deletions(-)

diff --git a/internal/daemon/controller/controller.go b/internal/daemon/controller/controller.go
index 361d5e5cc3..3ac24f483e 100644
--- a/internal/daemon/controller/controller.go
+++ b/internal/daemon/controller/controller.go
@@ -10,7 +10,6 @@ import (
     "strings"
     "sync"
     "sync/atomic"
-    "time"
 
     "github.com/hashicorp/boundary/internal/alias"
     talias "github.com/hashicorp/boundary/internal/alias/target"
@@ -89,10 +88,10 @@ type downstreamWorkersTicker interface {
 }
 
 var (
-    downstreamReceiverFactory func() downstreamReceiver
+    downstreamReceiverFactory func(*atomic.Int64) (downstreamReceiver, error)
 
     downstreamersFactory func(context.Context, string, string) (common.Downstreamers, error)
-    downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamReceiver, time.Duration) (downstreamWorkersTicker, error)
+    downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamReceiver, *atomic.Int64) (downstreamWorkersTicker, error)
     commandClientFactory func(context.Context, *Controller) error
     extControllerFactory func(ctx context.Context, c *Controller, r db.Reader, w db.Writer, kms *kms.Kms) (intglobals.ControllerExtension, error)
 )
@@ -124,7 +123,7 @@ type Controller struct {
     // because they are casted to time.Duration.
     workerStatusGracePeriod *atomic.Int64
     livenessTimeToStale *atomic.Int64
-    getDownstreamWorkersTimeout *atomic.Pointer[time.Duration]
+    getDownstreamWorkersTimeout *atomic.Int64
 
     apiGrpcServer *grpc.Server
     apiGrpcServerListener grpcServerListener
@@ -190,11 +189,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
         downstreamConnManager: cluster.NewDownstreamManager(),
         workerStatusGracePeriod: new(atomic.Int64),
         livenessTimeToStale: new(atomic.Int64),
-        getDownstreamWorkersTimeout: new(atomic.Pointer[time.Duration]),
-    }
-
-    if downstreamReceiverFactory != nil {
-        c.downstreamConns = downstreamReceiverFactory()
+        getDownstreamWorkersTimeout: new(atomic.Int64),
     }
 
     c.started.Store(false)
@@ -243,11 +238,17 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
 
     switch conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration {
     case 0:
-        to := server.DefaultLiveness
-        c.getDownstreamWorkersTimeout.Store(&to)
+        c.getDownstreamWorkersTimeout.Store(int64(server.DefaultLiveness))
     default:
-        to := conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration
-        c.getDownstreamWorkersTimeout.Store(&to)
+        c.getDownstreamWorkersTimeout.Store(int64(conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration))
+    }
+
+    if downstreamReceiverFactory != nil {
+        var err error
+        c.downstreamConns, err = downstreamReceiverFactory(c.getDownstreamWorkersTimeout)
+        if err != nil {
+            return nil, fmt.Errorf("%s: unable to initialize downstream receiver: %w", op, err)
+        }
     }
 
     clusterListeners := make([]*base.ServerListener, 0)
@@ -591,7 +592,7 @@ func (c *Controller) Start() error {
     // we'll use "root" to designate that this is the root of the graph (aka
     // a controller)
     boundVer := version.Get().VersionNumber()
-    dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamConns, *c.getDownstreamWorkersTimeout.Load())
+    dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamConns, c.getDownstreamWorkersTimeout)
     if err != nil {
         return fmt.Errorf("error creating downstream workers ticker: %w", err)
     }
@@ -709,11 +710,9 @@ func (c *Controller) ReloadTimings(newConfig *config.Config) error {
 
     switch newConfig.Controller.GetDownstreamWorkersTimeoutDuration {
     case 0:
-        to := server.DefaultLiveness
-        c.getDownstreamWorkersTimeout.Store(&to)
+        c.getDownstreamWorkersTimeout.Store(int64(server.DefaultLiveness))
     default:
-        to := newConfig.Controller.GetDownstreamWorkersTimeoutDuration
-        c.getDownstreamWorkersTimeout.Store(&to)
+        c.getDownstreamWorkersTimeout.Store(int64(newConfig.Controller.GetDownstreamWorkersTimeoutDuration))
     }
 
     return nil
diff --git a/internal/daemon/worker/status_test.go b/internal/daemon/worker/status_test.go
index 8b17dcfd54..34ff14f348 100644
--- a/internal/daemon/worker/status_test.go
+++ b/internal/daemon/worker/status_test.go
@@ -24,6 +24,7 @@ func TestWorkerWaitForNextSuccessfulStatusUpdate(t *testing.T) {
     })
     err := event.InitSysEventer(testLogger, testLock, "TestWorkerWaitForNextSuccessfulStatusUpdate", event.WithEventerConfig(testConfig))
     require.NoError(t, err)
+    t.Cleanup(func() { event.TestResetSystEventer(t) })
     for _, name := range []string{"ok", "timeout"} {
         t.Run(name, func(t *testing.T) {
             require := require.New(t)
diff --git a/internal/daemon/worker/worker.go b/internal/daemon/worker/worker.go
index 902029dad9..febee37447 100644
--- a/internal/daemon/worker/worker.go
+++ b/internal/daemon/worker/worker.go
@@ -97,7 +97,7 @@ type recorderManager interface {
 
 // reverseConnReceiverFactory provides a simple factory which a Worker can use to
 // create its reverseConnReceiver
-var reverseConnReceiverFactory func() reverseConnReceiver
+var reverseConnReceiverFactory func(*atomic.Int64) (reverseConnReceiver, error)
 
 var recordingStorageFactory func(
     ctx context.Context,
@@ -189,7 +189,7 @@ type Worker struct {
     // because they are casted to time.Duration.
     successfulStatusGracePeriod *atomic.Int64
     statusCallTimeoutDuration *atomic.Int64
-    getDownstreamWorkersTimeoutDuration *atomic.Pointer[time.Duration]
+    getDownstreamWorkersTimeoutDuration *atomic.Int64
 
     // AuthRotationNextRotation is useful in tests to understand how long to
     // sleep
@@ -232,18 +232,13 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
         localStorageState: new(atomic.Value),
         successfulStatusGracePeriod: new(atomic.Int64),
         statusCallTimeoutDuration: new(atomic.Int64),
-        getDownstreamWorkersTimeoutDuration: new(atomic.Pointer[time.Duration]),
+        getDownstreamWorkersTimeoutDuration: new(atomic.Int64),
         upstreamConnectionState: new(atomic.Value),
         downstreamWorkers: new(atomic.Pointer[downstreamersContainer]),
     }
 
     w.operationalState.Store(server.UnknownOperationalState)
     w.localStorageState.Store(server.UnknownLocalStorageState)
-
-    if reverseConnReceiverFactory != nil {
-        w.downstreamReceiver = reverseConnReceiverFactory()
-    }
-
     w.lastStatusSuccess.Store((*LastStatusInformation)(nil))
     scheme := strconv.FormatInt(time.Now().UnixNano(), 36)
     controllerResolver := manual.NewBuilderWithScheme(scheme)
@@ -338,15 +333,21 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
     }
     switch conf.RawConfig.Worker.GetDownstreamWorkersTimeoutDuration {
     case 0:
-        to := server.DefaultLiveness
-        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+        w.getDownstreamWorkersTimeoutDuration.Store(int64(server.DefaultLiveness))
     default:
-        to := conf.RawConfig.Worker.GetDownstreamWorkersTimeoutDuration
-        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+        w.getDownstreamWorkersTimeoutDuration.Store(int64(conf.RawConfig.Worker.GetDownstreamWorkersTimeoutDuration))
     }
     // FIXME: This is really ugly, but works.
     session.CloseCallTimeout.Store(w.successfulStatusGracePeriod.Load())
 
+    if reverseConnReceiverFactory != nil {
+        var err error
+        w.downstreamReceiver, err = reverseConnReceiverFactory(w.getDownstreamWorkersTimeoutDuration)
+        if err != nil {
+            return nil, fmt.Errorf("%s: error creating reverse connection receiver: %w", op, err)
+        }
+    }
+
     if eventListenerFactory != nil {
         var err error
         w.storageEventListener, err = eventListenerFactory(w)
@@ -429,11 +430,9 @@ func (w *Worker) Reload(ctx context.Context, newConf *config.Config) {
     }
     switch newConf.Worker.GetDownstreamWorkersTimeoutDuration {
     case 0:
-        to := server.DefaultLiveness
-        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+        w.getDownstreamWorkersTimeoutDuration.Store(int64(server.DefaultLiveness))
     default:
-        to := newConf.Worker.GetDownstreamWorkersTimeoutDuration
-        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+        w.getDownstreamWorkersTimeoutDuration.Store(int64(newConf.Worker.GetDownstreamWorkersTimeoutDuration))
     }
     // See comment about this in worker.go
     session.CloseCallTimeout.Store(w.successfulStatusGracePeriod.Load())
diff --git a/internal/daemon/worker/worker_test.go b/internal/daemon/worker/worker_test.go
index 963730f4cc..82b20dff8e 100644
--- a/internal/daemon/worker/worker_test.go
+++ b/internal/daemon/worker/worker_test.go
@@ -181,7 +181,7 @@ func TestWorkerNew(t *testing.T) {
             }
             if util.IsNil(tt.in.Eventer) {
                 require.NoError(t, event.InitSysEventer(hclog.Default(), &sync.Mutex{}, "worker_test", event.WithEventerConfig(&event.EventerConfig{})))
-                defer event.TestResetSystEventer(t)
+                t.Cleanup(func() { event.TestResetSystEventer(t) })
                 tt.in.Eventer = event.SysEventer()
             }
 
@@ -332,7 +332,7 @@ func TestSetupWorkerAuthStorage(t *testing.T) {
 
 func Test_Worker_getSessionTls(t *testing.T) {
     require.NoError(t, event.InitSysEventer(hclog.Default(), &sync.Mutex{}, "worker_test", event.WithEventerConfig(&event.EventerConfig{})))
-    defer event.TestResetSystEventer(t)
+    t.Cleanup(func() { event.TestResetSystEventer(t) })
 
     conf := &Config{
         Server: &base.Server{
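Reviewer note, not part of the patch: the heart of this backport is replacing the *atomic.Pointer[time.Duration] timeout fields with *atomic.Int64 and handing the atomic itself to downstreamReceiverFactory, reverseConnReceiverFactory and downstreamWorkersTickerFactory, so that a later ReloadTimings/Reload store is observed by the receivers instead of a one-time dereferenced snapshot. The sketch below is illustrative only; tickerLoop and the durations are invented for the example and do not come from the Boundary code. It shows the store/load convention the diff relies on: a time.Duration is stored as int64 nanoseconds via Store(int64(d)) and read back with time.Duration(x.Load()).

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// tickerLoop is a hypothetical consumer. It holds the shared *atomic.Int64
// rather than a copied time.Duration, so a configuration reload that calls
// Store is picked up on the next iteration without re-creating the loop.
func tickerLoop(timeout *atomic.Int64, rounds int) {
    for i := 0; i < rounds; i++ {
        // Convert back from int64 nanoseconds at the point of use,
        // mirroring how the patched factories would consume the value.
        d := time.Duration(timeout.Load())
        fmt.Printf("round %d: using timeout %v\n", i, d)
    }
}

func main() {
    timeout := new(atomic.Int64)

    // Initial configuration: store the duration as int64 nanoseconds, the
    // same convention as c.getDownstreamWorkersTimeout.Store(int64(...)).
    timeout.Store(int64(15 * time.Second))
    tickerLoop(timeout, 1)

    // Simulated reload: a single atomic store is visible to every holder of
    // the pointer; no pointer swap or nil check is required.
    timeout.Store(int64(90 * time.Second))
    tickerLoop(timeout, 1)
}

Compared to atomic.Pointer[time.Duration], this avoids allocating a fresh duration on every reload and removes the possibility of a nil load, at the cost of an explicit conversion at each use site.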
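Reviewer note, not part of the patch: the test-only hunks swap defer event.TestResetSystEventer(t) for t.Cleanup(func() { event.TestResetSystEventer(t) }) and add a previously missing reset in status_test.go. Whether these particular tests run their subtests in parallel is not visible from the hunks, but the usual motivation is ordering: a defer in the parent test runs as soon as the parent function returns, before any parallel subtests are resumed, whereas t.Cleanup callbacks run only after the test and all of its subtests have finished. A minimal sketch with hypothetical test and log names, placed in any *_test.go file:

package sketch

import "testing"

// TestCleanupOrdering illustrates the ordering difference. The t.Cleanup
// callback registered in the parent runs after both parallel subtests have
// completed; a defer at the same spot would run when the parent function
// returns, before the parallel subtests are resumed.
func TestCleanupOrdering(t *testing.T) {
    t.Cleanup(func() {
        // Shared fixture teardown, e.g. resetting a package-level eventer.
        t.Log("cleanup: runs after all subtests")
    })

    for _, name := range []string{"a", "b"} {
        name := name // capture the range variable for the parallel closure
        t.Run(name, func(t *testing.T) {
            t.Parallel()
            t.Log("subtest", name, "runs before cleanup")
        })
    }
}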