Skip to content

Commit

Permalink
Return an error if the peers' addresses cannot be updated
Browse files Browse the repository at this point in the history
At startup (though it can happen at other times too), some peers' Pod
IPs can still be empty, which means that until the next peers update we
cannot check the connection with those peers.

Return an error in case a peer's Pod IP is empty.

Signed-off-by: Carlo Lobrano <c.lobrano@gmail.com>
  • Loading branch information
clobrano committed Jul 3, 2024
1 parent 284181c commit 7f0cf11
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 16 deletions.
2 changes: 1 addition & 1 deletion e2e/self_node_remediation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ var _ = Describe("Self Node Remediation E2E", func() {
Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(worker), worker)).ToNot(HaveOccurred())
uids[worker.GetName()] = worker.GetUID()

// and the lat boot time
// and the last boot time
t, err := getBootTime(worker)
Expect(err).ToNot(HaveOccurred())
bootTimes[worker.GetName()] = t
Expand Down
39 changes: 24 additions & 15 deletions pkg/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-logr/logr"
commonlabels "github.com/medik8s/common/pkg/labels"

pkgerrors "github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -76,30 +77,37 @@ func (p *Peers) Start(ctx context.Context) error {
p.controlPlanePeerSelector = createSelector(hostname, getControlPlaneLabel(myNode))
}

go wait.UntilWithContext(ctx, func(ctx context.Context) {
p.updateWorkerPeers(ctx)
p.updateControlPlanePeers(ctx)
}, p.peerUpdateInterval)
var updatePeersError error
cancellableCtx, cancel := context.WithCancel(ctx)

p.log.Info("peers started")
p.log.Info("peer starting", "name", p.myNodeName)
wait.UntilWithContext(cancellableCtx, func(ctx context.Context) {
updatePeersError = p.updateWorkerPeers(ctx)
if updatePeersError != nil {
cancel()
}
updatePeersError = p.updateControlPlanePeers(ctx)
if updatePeersError != nil {
cancel()
}
}, p.peerUpdateInterval)

<-ctx.Done()
return nil
return updatePeersError
}

func (p *Peers) updateWorkerPeers(ctx context.Context) {
func (p *Peers) updateWorkerPeers(ctx context.Context) error {
setterFunc := func(addresses []v1.PodIP) { p.workerPeersAddresses = addresses }
selectorGetter := func() labels.Selector { return p.workerPeerSelector }
p.updatePeers(ctx, selectorGetter, setterFunc)
return p.updatePeers(ctx, selectorGetter, setterFunc)
}

func (p *Peers) updateControlPlanePeers(ctx context.Context) {
func (p *Peers) updateControlPlanePeers(ctx context.Context) error {
setterFunc := func(addresses []v1.PodIP) { p.controlPlanePeersAddresses = addresses }
selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector }
p.updatePeers(ctx, selectorGetter, setterFunc)
return p.updatePeers(ctx, selectorGetter, setterFunc)
}

func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) {
func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) error {
p.mutex.Lock()
defer p.mutex.Unlock()

Expand All @@ -114,7 +122,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec
p.workerPeersAddresses = []v1.PodIP{}
}
p.log.Error(err, "failed to update peer list")
return
return pkgerrors.Wrap(err, "failed to update peer list")
}

pods := v1.PodList{}
Expand All @@ -126,6 +134,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec
}
if err := p.List(readerCtx, &pods, listOptions); err != nil {
p.log.Error(err, "could not get pods")
return pkgerrors.Wrap(err, "could not get pods")
}

nodesCount := len(nodes.Items)
Expand All @@ -134,14 +143,14 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec
for _, pod := range pods.Items {
if pod.Spec.NodeName == node.Name {
if pod.Status.PodIPs == nil || len(pod.Status.PodIPs) == 0 {
p.log.Info("skipping empty Pod IPs", "node", node.Name, "Pod", pod.Name)
continue
return pkgerrors.New(fmt.Sprintf("empty Pod IP for Pod %s on Node %s", pod.Name, node.Name))
}
addresses[i] = pod.Status.PodIPs[0]
}
}
}
setAddresses(addresses)
return nil
}

func (p *Peers) GetPeersAddresses(role Role) []v1.PodIP {
Expand Down

0 comments on commit 7f0cf11

Please sign in to comment.