From 7f0cf113b3d781e3f904c024dff7231ea7ac25f6 Mon Sep 17 00:00:00 2001 From: Carlo Lobrano Date: Wed, 3 Jul 2024 11:07:45 +0200 Subject: [PATCH] Return error if cannot update peers address At startup (but it might happen at other times too), some peers' Pod IPs can still be empty, which means that until the next peers update we cannot check the connection with the other peers. Return an error in case a peer's Pod IP is empty. Signed-off-by: Carlo Lobrano --- e2e/self_node_remediation_test.go | 2 +- pkg/peers/peers.go | 39 +++++++++++++++++++------------ 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/e2e/self_node_remediation_test.go b/e2e/self_node_remediation_test.go index 64c91307..4b565a25 100644 --- a/e2e/self_node_remediation_test.go +++ b/e2e/self_node_remediation_test.go @@ -233,7 +233,7 @@ var _ = Describe("Self Node Remediation E2E", func() { Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(worker), worker)).ToNot(HaveOccurred()) uids[worker.GetName()] = worker.GetUID() - // and the lat boot time + // and the last boot time t, err := getBootTime(worker) Expect(err).ToNot(HaveOccurred()) bootTimes[worker.GetName()] = t diff --git a/pkg/peers/peers.go b/pkg/peers/peers.go index cd741d6d..5938cb3a 100644 --- a/pkg/peers/peers.go +++ b/pkg/peers/peers.go @@ -9,6 +9,7 @@ import ( "github.com/go-logr/logr" commonlabels "github.com/medik8s/common/pkg/labels" + pkgerrors "github.com/pkg/errors" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" @@ -76,30 +77,37 @@ func (p *Peers) Start(ctx context.Context) error { p.controlPlanePeerSelector = createSelector(hostname, getControlPlaneLabel(myNode)) } - go wait.UntilWithContext(ctx, func(ctx context.Context) { - p.updateWorkerPeers(ctx) - p.updateControlPlanePeers(ctx) - }, p.peerUpdateInterval) + var updatePeersError error + cancellableCtx, cancel := context.WithCancel(ctx) - p.log.Info("peers started") + p.log.Info("peer starting", 
"name", p.myNodeName) + wait.UntilWithContext(cancellableCtx, func(ctx context.Context) { + updatePeersError = p.updateWorkerPeers(ctx) + if updatePeersError != nil { + cancel() + } + updatePeersError = p.updateControlPlanePeers(ctx) + if updatePeersError != nil { + cancel() + } + }, p.peerUpdateInterval) - <-ctx.Done() - return nil + return updatePeersError } -func (p *Peers) updateWorkerPeers(ctx context.Context) { +func (p *Peers) updateWorkerPeers(ctx context.Context) error { setterFunc := func(addresses []v1.PodIP) { p.workerPeersAddresses = addresses } selectorGetter := func() labels.Selector { return p.workerPeerSelector } - p.updatePeers(ctx, selectorGetter, setterFunc) + return p.updatePeers(ctx, selectorGetter, setterFunc) } -func (p *Peers) updateControlPlanePeers(ctx context.Context) { +func (p *Peers) updateControlPlanePeers(ctx context.Context) error { setterFunc := func(addresses []v1.PodIP) { p.controlPlanePeersAddresses = addresses } selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector } - p.updatePeers(ctx, selectorGetter, setterFunc) + return p.updatePeers(ctx, selectorGetter, setterFunc) } -func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) { +func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) error { p.mutex.Lock() defer p.mutex.Unlock() @@ -114,7 +122,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec p.workerPeersAddresses = []v1.PodIP{} } p.log.Error(err, "failed to update peer list") - return + return pkgerrors.Wrap(err, "failed to update peer list") } pods := v1.PodList{} @@ -126,6 +134,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec } if err := p.List(readerCtx, &pods, listOptions); err != nil { p.log.Error(err, "could not get pods") + return pkgerrors.Wrap(err, "could not get pods") } nodesCount 
:= len(nodes.Items) @@ -134,14 +143,14 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec for _, pod := range pods.Items { if pod.Spec.NodeName == node.Name { if pod.Status.PodIPs == nil || len(pod.Status.PodIPs) == 0 { - p.log.Info("skipping empty Pod IPs", "node", node.Name, "Pod", pod.Name) - continue + return pkgerrors.New(fmt.Sprintf("empty Pod IP for Pod %s on Node %s", pod.Name, node.Name)) } addresses[i] = pod.Status.PodIPs[0] } } } setAddresses(addresses) + return nil } func (p *Peers) GetPeersAddresses(role Role) []v1.PodIP {