From 39f7e66764893000428963ef2e91621a10dd1af5 Mon Sep 17 00:00:00 2001 From: Lachezar Tsonov Date: Thu, 22 Aug 2024 10:08:05 +0300 Subject: [PATCH] KUBE-479: Eviction adjustments (#132) * Remove roundTripTime addition to timeout. * Log addition to better identify steps * Cancel child contexts for deletion immediately instead of waiting for function to exit * Remove the PDB path as we decided to keep retrying on PDB errors, add comment about this * Extract the "batch action" in drainHandler as kubernetes_helper * Change helper a bit. Replace usage in drain handler. * Small renames * Add comment about timeout if we still have pods after all our hard work * Adjust delete_node_handler.go * Remove unused error/package * Adjust drain/delete to return an error when some pods failed so we can distinguish this case and continue * Add/adjust tests for drain handler * Make drain handler tests parallel * Small comment adjustment * Check for context cancellation before starting wait for termination * Fix test according to new expectation --- actions/delete_node_handler.go | 6 +- actions/drain_node_handler.go | 283 ++++++++++++++++++----------- actions/drain_node_handler_test.go | 252 +++++++++++++++++++------ actions/kubernetes_helpers.go | 81 +++++++++ 4 files changed, 464 insertions(+), 158 deletions(-) diff --git a/actions/delete_node_handler.go b/actions/delete_node_handler.go index ae8ec3c..a445e43 100644 --- a/actions/delete_node_handler.go +++ b/actions/delete_node_handler.go @@ -136,9 +136,9 @@ func (h *deleteNodeHandler) Handle(ctx context.Context, action *castai.ClusterAc return h.deletePod(ctx, *deleteOptions, pod) } - if err := h.sendPodsRequests(ctx, pods, deletePod); err != nil { - return fmt.Errorf("sending delete pods requests: %w", err) - } + deletedPods, failedPods := executeBatchPodActions(ctx, log, pods, deletePod, "delete-pod") + log.Infof("successfully deleted %d pods, failed to delete %d pods", len(deletedPods), len(failedPods)) + if err := h.deleteNodeVolumeAttachments(ctx, req.NodeName); err != nil { log.Warnf("deleting volume attachments: %v", err) } diff --git a/actions/drain_node_handler.go b/actions/drain_node_handler.go index b862934..e3e9337 100644 --- a/actions/drain_node_handler.go +++ b/actions/drain_node_handler.go @@ -6,7 +6,6 @@ import ( "fmt" "reflect" "strings" - "sync" "time" "github.com/samber/lo" @@ -26,8 +25,7 @@ import ( ) const ( - minDrainTimeout = 0 // Minimal pod drain timeout - roundTripTime = 10 * time.Second // 2xPollInterval for action + minDrainTimeout = 0 // Minimal pod drain timeout ) type drainNodeConfig struct { @@ -60,12 +58,11 @@ func newDrainNodeHandler(log logrus.FieldLogger, clientset kubernetes.Interface, // the result is clamped between 0s and the requested timeout. func (h *drainNodeHandler) getDrainTimeout(action *castai.ClusterAction) time.Duration { timeSinceCreated := time.Since(action.CreatedAt) - drainTimeout := time.Duration(action.ActionDrainNode.DrainTimeoutSeconds) * time.Second + requestedTimeout := time.Duration(action.ActionDrainNode.DrainTimeoutSeconds) * time.Second - // Remove 2 poll interval required for polling the action. 
- timeSinceCreated = timeSinceCreated - roundTripTime + drainTimeout := requestedTimeout - timeSinceCreated - return lo.Clamp(drainTimeout-timeSinceCreated, minDrainTimeout*time.Second, time.Duration(action.ActionDrainNode.DrainTimeoutSeconds)*time.Second) + return lo.Clamp(drainTimeout, minDrainTimeout*time.Second, requestedTimeout) } type drainNodeHandler struct { @@ -97,58 +94,78 @@ func (h *drainNodeHandler) Handle(ctx context.Context, action *castai.ClusterAct return err } - log.Infof("draining node, drain_timeout_seconds=%f, force=%v created_at=%s", drainTimeout.Seconds(), req.Force, action.CreatedAt) + log.Info("cordoning node for draining") - if err := h.taintNode(ctx, node); err != nil { - return fmt.Errorf("tainting node %q: %w", req.NodeName, err) + if err := h.cordonNode(ctx, node); err != nil { + return fmt.Errorf("cordoning node %q: %w", req.NodeName, err) } - // First try to evict pods gracefully. + log.Infof("draining node, drain_timeout_seconds=%f, force=%v created_at=%s", drainTimeout.Seconds(), req.Force, action.CreatedAt) + + // First try to evict pods gracefully using eviction API. evictCtx, evictCancel := context.WithTimeout(ctx, drainTimeout) defer evictCancel() + err = h.evictNodePods(evictCtx, log, node) - if err != nil && !errors.Is(err, context.DeadlineExceeded) { - return fmt.Errorf("eviciting node pods: %w", err) + + if err == nil { + log.Info("node fully drained via graceful eviction") + return nil } - // If we reached timeout (maybe a pod was stuck in eviction or we hit another hiccup) and we are forced to drain, then delete pods - // This skips eviction API and even if pods have PDBs (for example), the pods are still deleted. - // If the node is going away (like spot interruption), there is no point in trying to keep the pods alive. - if errors.Is(err, context.DeadlineExceeded) { - if !req.Force { - return err - } + if !req.Force { + return fmt.Errorf("node failed to drain via graceful eviction, force=%v, timeout=%f, will not force delete pods: %w", req.Force, drainTimeout.Seconds(), err) + } - h.log.Infof("timeout=%f exceeded during pod eviction, force=%v, starting pod deletion", drainTimeout.Seconds(), req.Force) - // Try deleting pods gracefully first, then delete with 0 grace period. - options := []metav1.DeleteOptions{ - {}, - *metav1.NewDeleteOptions(0), - } + var podsFailedEvictionErr *podFailedActionError + switch { + case errors.Is(err, context.DeadlineExceeded): + log.Infof("timeout=%f exceeded during pod eviction, force=%v, starting pod deletion", drainTimeout.Seconds(), req.Force) + case errors.As(err, &podsFailedEvictionErr): + log.Infof("some pods failed eviction, force=%v, starting pod deletion: %v", req.Force, err) + default: + // Expected to be errors where we can't continue at all; e.g. missing permissions or lack of connectivity + return fmt.Errorf("evicting node pods: %w", err) + } - var err error - for _, o := range options { - deleteCtx, deleteCancel := context.WithTimeout(ctx, h.cfg.podsDeleteTimeout) - defer deleteCancel() + // If voluntary eviction fails, and we are told to force drain, start deleting pods. + // Try deleting pods gracefully first, then delete with 0 grace period. PDBs are not respected here. 
+ options := []metav1.DeleteOptions{ + {}, + *metav1.NewDeleteOptions(0), + } - err = h.deleteNodePods(deleteCtx, log, node, o) - if err == nil { - break - } - if !errors.Is(err, context.DeadlineExceeded) { - return fmt.Errorf("forcefully deleting pods: %w", err) - } + var deleteErr error + for _, o := range options { + deleteCtx, deleteCancel := context.WithTimeout(ctx, h.cfg.podsDeleteTimeout) + + deleteErr = h.deleteNodePods(deleteCtx, log, node, o) + + // Clean-up the child context if we got here; no reason to wait for the function to exit + deleteCancel() + + if deleteErr == nil { + break } - return err + var podsFailedDeletionErr *podFailedActionError + if errors.Is(deleteErr, context.DeadlineExceeded) || errors.As(deleteErr, &podsFailedDeletionErr) { + continue + } + return fmt.Errorf("forcefully deleting pods: %w", deleteErr) } - log.Info("node drained") + // Note: if some pods remained even after forced deletion, we'd get an error from last call here. + if deleteErr == nil { + log.Info("node drained forcefully") + } else { + log.Warnf("node failed to fully force drain: %v", deleteErr) + } - return nil + return deleteErr } -func (h *drainNodeHandler) taintNode(ctx context.Context, node *v1.Node) error { +func (h *drainNodeHandler) cordonNode(ctx context.Context, node *v1.Node) error { if node.Spec.Unschedulable { return nil } @@ -162,16 +179,25 @@ func (h *drainNodeHandler) taintNode(ctx context.Context, node *v1.Node) error { return nil } +// Return error if at least one pod failed (but don't wait for it!) => to signal if we should do force delete + +// evictNodePods attempts voluntarily eviction for all pods on node. +// This method will wait until all evictable pods on the node either terminate or fail deletion. +// A timeout should be used to avoid infinite waits. +// Errors in calling EVICT for individual pods are accumulated. If at least one pod failed this but termination was successful, an instance of podFailedActionError is returned. +// The method will still wait for termination of other evicted pods first. +// A return value of nil means all pods on the node should be evicted and terminated. 
func (h *drainNodeHandler) evictNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node) error { pods, err := h.listNodePodsToEvict(ctx, log, node) if err != nil { return err } - log.Infof("evicting %d pods", len(pods)) if len(pods) == 0 { + log.Infof("no pods to evict") return nil } + log.Infof("evicting %d pods", len(pods)) groupVersion, err := drain.CheckEvictionSupport(h.clientset) if err != nil { return err @@ -180,83 +206,93 @@ func (h *drainNodeHandler) evictNodePods(ctx context.Context, log logrus.FieldLo return h.evictPod(ctx, pod, groupVersion) } - if err := h.sendPodsRequests(ctx, pods, evictPod); err != nil { - return fmt.Errorf("sending evict pods requests: %w", err) + _, podsWithFailedEviction := executeBatchPodActions(ctx, log, pods, evictPod, "evict-pod") + var podsToIgnoreForTermination []*v1.Pod + var failedPodsError *podFailedActionError + if len(podsWithFailedEviction) > 0 { + podErrors := lo.Map(podsWithFailedEviction, func(failure podActionFailure, _ int) error { + return fmt.Errorf("pod %s/%s failed eviction: %w", failure.pod.Namespace, failure.pod.Name, failure.err) + }) + failedPodsError = &podFailedActionError{ + Action: "evict", + Errors: podErrors, + } + log.Warnf("some pods failed eviction, will ignore for termination wait: %v", failedPodsError) + podsToIgnoreForTermination = lo.Map(podsWithFailedEviction, func(failure podActionFailure, _ int) *v1.Pod { + return failure.pod + }) } - return h.waitNodePodsTerminated(ctx, log, node) + err = h.waitNodePodsTerminated(ctx, log, node, podsToIgnoreForTermination) + if err != nil { + return err + } + if failedPodsError != nil { + return failedPodsError + } + return nil } +// deleteNodePods deletes the pods running on node. Use options to control if eviction is graceful or forced. +// This method will wait until all evictable pods on the node either terminate or fail deletion. +// A timeout should be used to avoid infinite waits. +// Errors in calling DELETE for individual pods are accumulated. If at least one pod failed this but termination was successful, an instance of podFailedActionError is returned. +// The method will still wait for termination of other deleted pods first. +// A return value of nil means all pods on the node should be deleted and terminated. 
func (h *drainNodeHandler) deleteNodePods(ctx context.Context, log logrus.FieldLogger, node *v1.Node, options metav1.DeleteOptions) error { pods, err := h.listNodePodsToEvict(ctx, log, node) if err != nil { return err } + if len(pods) == 0 { + log.Infof("no pods to delete") + return nil + } + if options.GracePeriodSeconds != nil { log.Infof("forcefully deleting %d pods with gracePeriod %d", len(pods), *options.GracePeriodSeconds) } else { log.Infof("forcefully deleting %d pods", len(pods)) } - if len(pods) == 0 { - return nil - } - deletePod := func(ctx context.Context, pod v1.Pod) error { return h.deletePod(ctx, options, pod) } - if err := h.sendPodsRequests(ctx, pods, deletePod); err != nil { - return fmt.Errorf("sending delete pods requests: %w", err) - } - - return h.waitNodePodsTerminated(ctx, log, node) -} - -func (h *drainNodeHandler) sendPodsRequests(ctx context.Context, pods []v1.Pod, f func(context.Context, v1.Pod) error) error { - if len(pods) == 0 { - return nil - } - - var ( - parallelTasks = int(lo.Clamp(float64(len(pods)), 30, 100)) - taskChan = make(chan v1.Pod, len(pods)) - taskErrs = make([]error, 0) - taskErrsMx sync.Mutex - wg sync.WaitGroup - ) - - h.log.Debugf("Starting %d parallel tasks for %d pods: [%v]", parallelTasks, len(pods), lo.Map(pods, func(t v1.Pod, i int) string { - return fmt.Sprintf("%s/%s", t.Namespace, t.Name) - })) - - worker := func(taskChan <-chan v1.Pod) { - for pod := range taskChan { - if err := f(ctx, pod); err != nil { - taskErrsMx.Lock() - taskErrs = append(taskErrs, fmt.Errorf("pod %s/%s failed operation: %w", pod.Namespace, pod.Name, err)) - taskErrsMx.Unlock() - } + _, podsWithFailedDeletion := executeBatchPodActions(ctx, log, pods, deletePod, "delete-pod") + var podsToIgnoreForTermination []*v1.Pod + var failedPodsError *podFailedActionError + if len(podsWithFailedDeletion) > 0 { + podErrors := lo.Map(podsWithFailedDeletion, func(failure podActionFailure, _ int) error { + return fmt.Errorf("pod %s/%s failed deletion: %w", failure.pod.Namespace, failure.pod.Name, failure.err) + }) + failedPodsError = &podFailedActionError{ + Action: "delete", + Errors: podErrors, } - wg.Done() + log.Warnf("some pods failed deletion, will ignore for termination wait: %v", failedPodsError) + podsToIgnoreForTermination = lo.Map(podsWithFailedDeletion, func(failure podActionFailure, _ int) *v1.Pod { + return failure.pod + }) } - for range parallelTasks { - wg.Add(1) - go worker(taskChan) + err = h.waitNodePodsTerminated(ctx, log, node, podsToIgnoreForTermination) + if err != nil { + return err } - - for _, pod := range pods { - taskChan <- pod + if failedPodsError != nil { + return failedPodsError } - - close(taskChan) - wg.Wait() - - return errors.Join(taskErrs...) + return nil } +// listNodePodsToEvict creates a list of pods that are "evictable" on the node. +// The following pods are ignored: +// - static pods +// - DaemonSet pods +// - pods that are already finished (Succeeded or Failed) +// - pods that were marked for deletion recently (Terminating state); the meaning of "recently" is controlled by config func (h *drainNodeHandler) listNodePodsToEvict(ctx context.Context, log logrus.FieldLogger, node *v1.Node) ([]v1.Pod, error) { var pods *v1.PodList err := waitext.Retry( @@ -292,7 +328,7 @@ func (h *drainNodeHandler) listNodePodsToEvict(ctx context.Context, log logrus.F } // Skip completed pods. Will be removed during node removal. 
- if p.Status.Phase == v1.PodSucceeded { + if p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed { continue } @@ -311,7 +347,26 @@ func (h *drainNodeHandler) listNodePodsToEvict(ctx context.Context, log logrus.F return podsToEvict, nil } -func (h *drainNodeHandler) waitNodePodsTerminated(ctx context.Context, log logrus.FieldLogger, node *v1.Node) error { +// waitNodePodsTerminated waits until the pods on the node terminate. +// The wait only considers evictable pods (see listNodePodsToEvict). +// If podsToIgnore is not empty, the list is further filtered by it. +// This is useful when you don't expect some pods on the node to terminate (e.g. because eviction failed for them) so there is no reason to wait until timeout. +// The wait can potentially run forever if pods are scheduled on the node and are not evicted/deleted by anything. Use a timeout to avoid infinite wait. +func (h *drainNodeHandler) waitNodePodsTerminated(ctx context.Context, log logrus.FieldLogger, node *v1.Node, podsToIgnore []*v1.Pod) error { + // Check if context is cancelled before starting any work. + select { + case <-ctx.Done(): + return ctx.Err() + default: + // Continue with the work + } + + podsToIgnoreLookup := make(map[string]struct{}) + for _, pod := range podsToIgnore { + podsToIgnoreLookup[fmt.Sprintf("%s/%s", pod.Namespace, pod.Name)] = struct{}{} + } + + log.Infof("starting wait for pod termination, %d pods in ignore list", len(podsToIgnore)) return waitext.Retry( ctx, waitext.NewConstantBackoff(h.cfg.podsTerminationWaitRetryDelay), @@ -321,8 +376,18 @@ func (h *drainNodeHandler) waitNodePodsTerminated(ctx context.Context, log logru if err != nil { return true, fmt.Errorf("listing %q pods to be terminated: %w", node.Name, err) } - if len(pods) > 0 { - return true, fmt.Errorf("waiting for %d pods to be terminated on node %v", len(pods), node.Name) + + remainingPods := len(pods) + if len(podsToIgnore) > 0 { + for i := range pods { + _, shouldIgnore := podsToIgnoreLookup[fmt.Sprintf("%s/%s", pods[i].Namespace, pods[i].Name)] + if shouldIgnore { + remainingPods-- + } + } + } + if remainingPods > 0 { + return true, fmt.Errorf("waiting for %d pods to be terminated on node %v", remainingPods, node.Name) } return false, nil }, @@ -370,15 +435,13 @@ func (h *drainNodeHandler) evictPod(ctx context.Context, pod v1.Pod, groupVersio if apierrors.IsInternalError(err) { return false, err } - - // If PDB is violated, K8S returns 429 TooManyRequests with specific cause - // TODO: With KUBE-479, rethink this flow in general - if apierrors.IsTooManyRequests(err) && apierrors.HasStatusCause(err, policyv1.DisruptionBudgetCause) { - return true, err - } } // Other errors - retry. + // This includes 429 TooManyRequests (due to throttling) and 429 TooManyRequests + DisruptionBudgetCause (due to violated PDBs) + // This is done to try and do graceful eviction for as long as possible; + // it is expected that caller has a timeout that will stop this process if the PDB can never be satisfied. + // Note: pods only receive SIGTERM signals if they are evicted; if PDB prevents that, the signal will not happen here. 
 return true, err
 }
 err := waitext.Retry(ctx, b, waitext.Forever, action, func(err error) {
@@ -386,7 +449,6 @@ func (h *drainNodeHandler) evictPod(ctx context.Context, pod v1.Pod, groupVersio
 })
 if err != nil {
 return fmt.Errorf("evicting pod %s in namespace %s: %w", pod.Name, pod.Namespace, err)
-
 }
 return nil
 }
@@ -446,3 +508,18 @@ func isControlledBy(p *v1.Pod, kind string) bool {
 
 return ctrl != nil && ctrl.Kind == kind
 }
+
+type podFailedActionError struct {
+ // Action holds context on what the code was trying to do.
+ Action string
+ // Errors should hold an entry per pod for which the action failed.
+ Errors []error
+}
+
+func (p *podFailedActionError) Error() string {
+ return fmt.Sprintf("action %q: %v", p.Action, errors.Join(p.Errors...))
+}
+
+func (p *podFailedActionError) Unwrap() []error {
+ return p.Errors
+}
diff --git a/actions/drain_node_handler_test.go b/actions/drain_node_handler_test.go
index 498bc53..7e36475 100644
--- a/actions/drain_node_handler_test.go
+++ b/actions/drain_node_handler_test.go
@@ -11,6 +11,7 @@ import (
 "github.com/sirupsen/logrus/hooks/test"
 "github.com/stretchr/testify/require"
 v1 "k8s.io/api/core/v1"
+ policyv1 "k8s.io/api/policy/v1"
 "k8s.io/api/policy/v1beta1"
 apierrors "k8s.io/apimachinery/pkg/api/errors"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -22,16 +23,19 @@ import (
 )
 
 func TestDrainNodeHandler(t *testing.T) {
+ t.Parallel()
 r := require.New(t)
 log := logrus.New()
 log.SetLevel(logrus.DebugLevel)
 
 t.Run("drain successfully", func(t *testing.T) {
+ t.Parallel()
+
 nodeName := "node1"
 podName := "pod1"
 clientset := setupFakeClientWithNodePodEviction(nodeName, podName)
- prependEvictionReaction(t, clientset, true)
+ prependEvictionReaction(t, clientset, true, false)
 
 action := &castai.ClusterAction{
 ID: uuid.New().String(),
@@ -69,10 +73,11 @@ func TestDrainNodeHandler(t *testing.T) {
 })
 
 t.Run("skip drain when node not found", func(t *testing.T) {
+ t.Parallel()
+
 nodeName := "node1"
 podName := "pod1"
 clientset := setupFakeClientWithNodePodEviction(nodeName, podName)
- prependEvictionReaction(t, clientset, true)
 
 action := &castai.ClusterAction{
 ID: uuid.New().String(),
@@ -97,18 +102,20 @@ func TestDrainNodeHandler(t *testing.T) {
 r.NoError(err)
 })
 
- t.Run("fail to drain when internal pod eviction error occur", func(t *testing.T) {
+ t.Run("when eviction fails for a pod and force=false, leaves node cordoned and skip deletion", func(t *testing.T) {
+ t.Parallel()
+
 nodeName := "node1"
 podName := "pod1"
 clientset := setupFakeClientWithNodePodEviction(nodeName, podName)
- prependEvictionReaction(t, clientset, false)
+ prependEvictionReaction(t, clientset, false, false)
 
 action := &castai.ClusterAction{
 ID: uuid.New().String(),
 ActionDrainNode: &castai.ActionDrainNode{
 NodeName: "node1",
 DrainTimeoutSeconds: 1,
- Force: true,
+ Force: false,
 },
 CreatedAt: time.Now().UTC(),
 }
@@ -120,9 +127,9 @@ func TestDrainNodeHandler(t *testing.T) {
 }
 
 err := h.Handle(context.Background(), action)
- r.ErrorContains(err, "eviciting node pods")
- r.ErrorContains(err, "default/pod1")
- r.ErrorContains(err, "internal")
+
+ r.Error(err)
+ r.ErrorContains(err, "failed to drain via graceful eviction")
 
 n, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
 r.NoError(err)
@@ -132,18 +139,20 @@ func TestDrainNodeHandler(t *testing.T) {
 r.NoError(err)
 })
 
- t.Run("eviction timeout - force remove pods", func(t *testing.T) {
- r := require.New(t)
+ t.Run("when eviction timeout is reached and force=false, leaves node cordoned and skip deletion", func(t *testing.T) {
+ t.Parallel()
+
 nodeName := "node1"
 podName := "pod1"
 clientset := setupFakeClientWithNodePodEviction(nodeName, podName)
+ prependEvictionReaction(t, clientset, false, true)
 
 action := &castai.ClusterAction{
 ID: uuid.New().String(),
 ActionDrainNode: &castai.ActionDrainNode{
 NodeName: "node1",
- DrainTimeoutSeconds: 1,
- Force: true,
+ DrainTimeoutSeconds: 0,
+ Force: false,
 },
 CreatedAt: time.Now().UTC(),
 }
@@ -151,38 +160,108 @@ func TestDrainNodeHandler(t *testing.T) {
 h := drainNodeHandler{
 log: log,
 clientset: clientset,
- cfg: drainNodeConfig{
- podsDeleteTimeout: 700 * time.Millisecond,
- podDeleteRetries: 5,
- podDeleteRetryDelay: 500 * time.Millisecond,
- podEvictRetryDelay: 500 * time.Millisecond,
- podsTerminationWaitRetryDelay: 1000 * time.Millisecond,
- }}
-
- clientset.PrependReactor("delete", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
- deleteAction := action.(ktest.DeleteActionImpl)
- if deleteAction.Name == podName {
- if deleteAction.DeleteOptions.GracePeriodSeconds == nil {
- return true, nil, context.DeadlineExceeded
- }
- return false, nil, nil
- }
- return false, nil, nil
- })
+ cfg: drainNodeConfig{},
+ }
 
 err := h.Handle(context.Background(), action)
- r.NoError(err)
+
+ r.Error(err)
+ r.ErrorContains(err, "failed to drain via graceful eviction")
+ r.ErrorIs(err, context.DeadlineExceeded)
 
 n, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
 r.NoError(err)
 r.True(n.Spec.Unschedulable)
 
 _, err = clientset.CoreV1().Pods("default").Get(context.Background(), podName, metav1.GetOptions{})
- r.True(apierrors.IsNotFound(err))
+ r.NoError(err)
 })
 
- t.Run("eviction timeout - force remove pods - failure", func(t *testing.T) {
- r := require.New(t)
+ t.Run("eviction fails and force=true, force remove pods", func(t *testing.T) {
+ t.Parallel()
+
+ cases := []struct {
+ name string
+ drainTimeoutSeconds int
+ retryablePodEvictionErr bool
+ }{
+ {
+ name: "timeout during eviction",
+ drainTimeoutSeconds: 0,
+ retryablePodEvictionErr: true,
+ },
+ {
+ name: "failed pod during eviction",
+ drainTimeoutSeconds: 10,
+ retryablePodEvictionErr: false,
+ },
+ }
+
+ for _, tc := range cases {
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+
+ r := require.New(t)
+ nodeName := "node1"
+ podName := "pod1"
+ clientset := setupFakeClientWithNodePodEviction(nodeName, podName)
+ prependEvictionReaction(t, clientset, false, tc.retryablePodEvictionErr)
+
+ action := &castai.ClusterAction{
+ ID: uuid.New().String(),
+ ActionDrainNode: &castai.ActionDrainNode{
+ NodeName: "node1",
+ DrainTimeoutSeconds: tc.drainTimeoutSeconds,
+ Force: true,
+ },
+ CreatedAt: time.Now().UTC(),
+ }
+
+ h := drainNodeHandler{
+ log: log,
+ clientset: clientset,
+ cfg: drainNodeConfig{
+ podsDeleteTimeout: 700 * time.Millisecond,
+ podDeleteRetries: 5,
+ podDeleteRetryDelay: 500 * time.Millisecond,
+ podEvictRetryDelay: 500 * time.Millisecond,
+ podsTerminationWaitRetryDelay: 1000 * time.Millisecond,
+ }}
+
+ actualCalls := 0
+ clientset.PrependReactor("delete", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
+ deleteAction := action.(ktest.DeleteActionImpl)
+ if deleteAction.Name == podName {
+ actualCalls++
+ // First call should be graceful; simulate it failed to validate we'll do the forced part.
+ // This relies on us not retrying 404s (or let's say it tests it :) )
+ if deleteAction.DeleteOptions.GracePeriodSeconds == nil {
+ return true, nil, &apierrors.StatusError{ErrStatus: metav1.Status{Reason: metav1.StatusReasonNotFound}}
+ }
+ // Second call should be forced
+ r.Equal(int64(0), *deleteAction.DeleteOptions.GracePeriodSeconds)
+ return false, nil, nil
+ }
+ return false, nil, nil
+ })
+
+ err := h.Handle(context.Background(), action)
+ r.NoError(err)
+ r.Equal(2, actualCalls)
+
+ n, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
+ r.NoError(err)
+ r.True(n.Spec.Unschedulable)
+
+ _, err = clientset.CoreV1().Pods("default").Get(context.Background(), podName, metav1.GetOptions{})
+ r.True(apierrors.IsNotFound(err))
+ })
+ }
+ })
+
+ t.Run("eviction fails and force=true, at least one pod fails to delete due to internal error, should return error", func(t *testing.T) {
+ t.Parallel()
+
 nodeName := "node1"
 podName := "pod1"
 clientset := setupFakeClientWithNodePodEviction(nodeName, podName)
@@ -217,16 +296,21 @@ func TestDrainNodeHandler(t *testing.T) {
 })
 
 err := h.Handle(context.Background(), action)
- r.ErrorContains(err, "forcefully deleting pods")
- r.ErrorContains(err, "default/pod1")
- r.ErrorContains(err, "internal")
+
+ var podFailedDeletionErr *podFailedActionError
+
+ r.ErrorAs(err, &podFailedDeletionErr)
+ r.Len(podFailedDeletionErr.Errors, 1)
+ r.Contains(podFailedDeletionErr.Errors[0].Error(), "default/pod1")
+ r.Equal("delete", podFailedDeletionErr.Action)
 
 _, err = clientset.CoreV1().Pods("default").Get(context.Background(), podName, metav1.GetOptions{})
 r.NoError(err)
 })
 
- t.Run("eviction timeout - force remove pods with grace 0 - failure", func(t *testing.T) {
- r := require.New(t)
+ t.Run("eviction fails and force=true, timeout during deletion should be retried and returned", func(t *testing.T) {
+ t.Parallel()
+
 nodeName := "node1"
 podName := "pod1"
 clientset := setupFakeClientWithNodePodEviction(nodeName, podName)
@@ -245,28 +329,85 @@ func TestDrainNodeHandler(t *testing.T) {
 log: log,
 clientset: clientset,
 cfg: drainNodeConfig{
- podsDeleteTimeout: 700 * time.Millisecond,
+ podsDeleteTimeout: 0, // Force delete to timeout immediately
 podDeleteRetries: 5,
- podDeleteRetryDelay: 500 * time.Millisecond,
- podEvictRetryDelay: 500 * time.Millisecond,
- podsTerminationWaitRetryDelay: 1000 * time.Millisecond,
+ podDeleteRetryDelay: 5 * time.Second,
+ podEvictRetryDelay: 5 * time.Second,
+ podsTerminationWaitRetryDelay: 10 * time.Second,
 }}
 
+ actualDeleteCalls := 0
 clientset.PrependReactor("delete", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
 deleteAction := action.(ktest.DeleteActionImpl)
 if deleteAction.Name == podName {
- if deleteAction.DeleteOptions.GracePeriodSeconds == nil {
- return true, nil, context.DeadlineExceeded
- }
- return true, nil, &apierrors.StatusError{ErrStatus: metav1.Status{Reason: metav1.StatusReasonInternalError, Message: "internal"}}
+ actualDeleteCalls++
+ return true, nil, &apierrors.StatusError{ErrStatus: metav1.Status{Reason: metav1.StatusReasonTooManyRequests, Message: "stop hammering"}}
 }
 return false, nil, nil
 })
 
 err := h.Handle(context.Background(), action)
- r.ErrorContains(err, "forcefully deleting pods")
- r.ErrorContains(err, "default/pod1")
- r.ErrorContains(err, "internal")
+
+ r.Equal(2, actualDeleteCalls)
+ r.ErrorIs(err, context.DeadlineExceeded)
+
+ _, err = clientset.CoreV1().Pods("default").Get(context.Background(), podName, metav1.GetOptions{})
+ r.NoError(err)
+ })
+
+ t.Run("force=true, failed eviction for PDBs should be retried until timeout before deleting", func(t *testing.T) {
+ t.Parallel()
+
+ // tests specifically that PDB error in eviction is retried and not failed fast
+ nodeName := "node1"
+ podName := "pod1"
+ clientset := setupFakeClientWithNodePodEviction(nodeName, podName)
+
+ clientset.PrependReactor("create", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
+ if action.GetSubresource() != "eviction" {
+ return false, nil, nil
+ }
+
+ // PDB error is a bit specific in k8s to reconstruct...
+ return true,
+ nil,
+ &apierrors.StatusError{ErrStatus: metav1.Status{
+ Reason: metav1.StatusReasonTooManyRequests,
+ Details: &metav1.StatusDetails{
+ Causes: []metav1.StatusCause{
+ {
+ Type: policyv1.DisruptionBudgetCause,
+ },
+ },
+ },
+ }}
+ })
+
+ action := &castai.ClusterAction{
+ ID: uuid.New().String(),
+ ActionDrainNode: &castai.ActionDrainNode{
+ NodeName: "node1",
+ DrainTimeoutSeconds: 2,
+ Force: false,
+ },
+ CreatedAt: time.Now().UTC(),
+ }
+
+ h := drainNodeHandler{
+ log: log,
+ clientset: clientset,
+ cfg: drainNodeConfig{},
+ }
+
+ err := h.Handle(context.Background(), action)
+
+ r.Error(err)
+ r.ErrorContains(err, "failed to drain via graceful eviction")
+ r.ErrorIs(err, context.DeadlineExceeded)
+
+ n, err := clientset.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
+ r.NoError(err)
+ r.True(n.Spec.Unschedulable)
 
 _, err = clientset.CoreV1().Pods("default").Get(context.Background(), podName, metav1.GetOptions{})
 r.NoError(err)
 })
@@ -294,7 +435,9 @@ func TestGetDrainTimeout(t *testing.T) {
 }
 
 timeout := h.getDrainTimeout(action)
- r.Equal(100*time.Second, timeout)
+
+ // We give some wiggle room as the test might get here a few milliseconds late
+ r.InDelta((100 * time.Second).Milliseconds(), timeout.Milliseconds(), 10)
 })
 
 t.Run("drain timeout for older action should be decreased by time since action creation", func(t *testing.T) {
@@ -364,7 +507,7 @@ func TestLogCastPodsToEvict(t *testing.T) {
 
 }
 
-func prependEvictionReaction(t *testing.T, c *fake.Clientset, success bool) {
+func prependEvictionReaction(t *testing.T, c *fake.Clientset, success, retryableFailure bool) {
 c.PrependReactor("create", "pods", func(action ktest.Action) (handled bool, ret runtime.Object, err error) {
 if action.GetSubresource() != "eviction" {
 return false, nil, nil
@@ -381,6 +524,11 @@ func prependEvictionReaction(t *testing.T, c *fake.Clientset, success bool) {
 return true, nil, nil
 }
 
+ // Simulate failure that should be retried by client
+ if retryableFailure {
+ return true, nil, &apierrors.StatusError{ErrStatus: metav1.Status{Reason: metav1.StatusReasonTooManyRequests}}
+ }
+
 return true, nil, &apierrors.StatusError{ErrStatus: metav1.Status{Reason: metav1.StatusReasonInternalError, Message: "internal"}}
 })
 }
diff --git a/actions/kubernetes_helpers.go b/actions/kubernetes_helpers.go
index 075f40b..e8dc2d7 100644
--- a/actions/kubernetes_helpers.go
+++ b/actions/kubernetes_helpers.go
@@ -4,8 +4,10 @@ import (
 "context"
 "encoding/json"
 "fmt"
+ "sync"
 "time"
 
+ "github.com/samber/lo"
 "github.com/sirupsen/logrus"
 v1 "k8s.io/api/core/v1"
 k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -117,6 +119,85 @@ func getNodeForPatching(ctx context.Context, log logrus.FieldLogger, clientset k
 }
 
+// executeBatchPodActions executes the action for each pod in the list.
+// It does internal throttling to avoid spawning a goroutine-per-pod on large lists.
+// Returns two sets of pods - the ones that successfully executed the action and the ones that failed.
+// actionName might be used to distinguish what is the operation (for logs, debugging, etc.) but is optional.
+func executeBatchPodActions(
+ ctx context.Context,
+ log logrus.FieldLogger,
+ pods []v1.Pod,
+ action func(context.Context, v1.Pod) error,
+ actionName string,
+) (successfulPods []*v1.Pod, failedPods []podActionFailure) {
+ if actionName == "" {
+ actionName = "unspecified"
+ }
+ log = log.WithField("actionName", actionName)
+
+ if len(pods) == 0 {
+ log.Debug("empty list of pods to execute action against")
+ return []*v1.Pod{}, nil
+ }
+
+ var (
+ parallelTasks = int(lo.Clamp(float64(len(pods)), 30, 100))
+ taskChan = make(chan v1.Pod, len(pods))
+ successfulPodsChan = make(chan *v1.Pod, len(pods))
+ failedPodsChan = make(chan podActionFailure, len(pods))
+ wg sync.WaitGroup
+ )
+
+ log.Debugf("Starting %d parallel tasks for %d pods: [%v]", parallelTasks, len(pods), lo.Map(pods, func(t v1.Pod, i int) string {
+ return fmt.Sprintf("%s/%s", t.Namespace, t.Name)
+ }))
+
+ worker := func(taskChan <-chan v1.Pod) {
+ for pod := range taskChan {
+ if err := action(ctx, pod); err != nil {
+ failedPodsChan <- podActionFailure{
+ actionName: actionName,
+ pod: &pod,
+ err: err,
+ }
+ } else {
+ successfulPodsChan <- &pod
+ }
+ }
+ wg.Done()
+ }
+
+ for range parallelTasks {
+ wg.Add(1)
+ go worker(taskChan)
+ }
+
+ for _, pod := range pods {
+ taskChan <- pod
+ }
+
+ close(taskChan)
+ wg.Wait()
+ close(failedPodsChan)
+ close(successfulPodsChan)
+
+ for pod := range successfulPodsChan {
+ successfulPods = append(successfulPods, pod)
+ }
+
+ for failure := range failedPodsChan {
+ failedPods = append(failedPods, failure)
+ }
+
+ return
+}
+
 func defaultBackoff() wait.Backoff {
 return waitext.NewConstantBackoff(500 * time.Millisecond)
 }
+
+type podActionFailure struct {
+ actionName string
+ pod *v1.Pod
+ err error
+}
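
Note for reviewers: the sketch below is not part of the patch. It restates the error contract introduced in actions/drain_node_handler.go so the force-delete fallback is easier to review: evictNodePods/deleteNodePods now return nil, a context.DeadlineExceeded error, or a *podFailedActionError wrapping one error per failed pod, and Handle only falls back to forced deletion when force=true. The decideOnForcedDeletion helper, the main function, and the trimmed-down copy of podFailedActionError are illustrative assumptions, not code from this repository.

package main

import (
	"context"
	"errors"
	"fmt"
)

// podFailedActionError mirrors the error type added in drain_node_handler.go in this patch.
type podFailedActionError struct {
	Action string
	Errors []error
}

func (p *podFailedActionError) Error() string {
	return fmt.Sprintf("action %q: %v", p.Action, errors.Join(p.Errors...))
}

func (p *podFailedActionError) Unwrap() []error { return p.Errors }

// decideOnForcedDeletion mirrors the branching in drainNodeHandler.Handle: forced pod deletion
// only starts when force=true and graceful eviction failed either by hitting the drain timeout
// or because individual pods rejected the evict call (e.g. a PDB that is never satisfied).
func decideOnForcedDeletion(evictErr error, force bool) (bool, error) {
	if evictErr == nil {
		// Node fully drained via graceful eviction; nothing left to do.
		return false, nil
	}
	if !force {
		// Without force we surface the eviction failure instead of deleting pods.
		return false, fmt.Errorf("node failed to drain via graceful eviction: %w", evictErr)
	}
	var podsFailedEviction *podFailedActionError
	switch {
	case errors.Is(evictErr, context.DeadlineExceeded):
		// Drain timeout reached; proceed with forced pod deletion.
		return true, nil
	case errors.As(evictErr, &podsFailedEviction):
		// Some pods failed the evict call; proceed with forced pod deletion.
		return true, nil
	default:
		// Unexpected error (e.g. missing permissions); do not force.
		return false, fmt.Errorf("evicting node pods: %w", evictErr)
	}
}

func main() {
	evictErr := &podFailedActionError{
		Action: "evict",
		Errors: []error{errors.New("pod default/pod1 failed eviction: too many requests")},
	}
	proceed, err := decideOnForcedDeletion(evictErr, true)
	fmt.Println(proceed, err) // prints: true <nil>
}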