Skip to content

Commit

Permalink
Use core/v1 PodIP type in place than string
Browse files Browse the repository at this point in the history
Signed-off-by: Carlo Lobrano <c.lobrano@gmail.com>
  • Loading branch information
clobrano committed Jun 24, 2024
1 parent c758d78 commit a8900ec
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 20 deletions.
18 changes: 10 additions & 8 deletions pkg/apicheck/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/util/wait"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -213,21 +215,21 @@ func (c *ApiConnectivityCheck) canOtherControlPlanesBeReached() bool {
return (healthyResponses + unhealthyResponses + apiErrorsResponses) > 0
}

func (c *ApiConnectivityCheck) popPeerIPs(peersIPs *[]string, count int) []string {
func (c *ApiConnectivityCheck) popPeerIPs(peersIPs *[]corev1.PodIP, count int) []corev1.PodIP {
nrOfPeers := len(*peersIPs)
if nrOfPeers == 0 {
return []string{}
return []corev1.PodIP{}
}

if count > nrOfPeers {
count = nrOfPeers
}

// TODO: maybe we should pick nodes randomly rather than relying on the order returned from api-server
selectedIPs := make([]string, count)
selectedIPs := make([]corev1.PodIP, count)
for i := 0; i < count; i++ {
ip := (*peersIPs)[i]
if ip == "" {
if ip.IP == "" {
// This should not happen, but keeping it for good measure.
c.config.Log.Info("ignoring peers without IP address")
continue
Expand All @@ -240,7 +242,7 @@ func (c *ApiConnectivityCheck) popPeerIPs(peersIPs *[]string, count int) []strin
return selectedIPs
}

func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []string) (int, int, int, int) {
func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []corev1.PodIP) (int, int, int, int) {
nrAddresses := len(addresses)
responsesChan := make(chan selfNodeRemediation.HealthCheckResponseCode, nrAddresses)

Expand All @@ -252,9 +254,9 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeers(addresses []string) (int
}

// getHealthStatusFromPeer issues a GET request to the specified IP and returns the result from the peer into the given channel
func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp string, results chan<- selfNodeRemediation.HealthCheckResponseCode) {
func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp corev1.PodIP, results chan<- selfNodeRemediation.HealthCheckResponseCode) {

logger := c.config.Log.WithValues("IP", endpointIp)
logger := c.config.Log.WithValues("IP", endpointIp.IP)
logger.Info("getting health status from peer")

if err := c.initClientCreds(); err != nil {
Expand All @@ -264,7 +266,7 @@ func (c *ApiConnectivityCheck) getHealthStatusFromPeer(endpointIp string, result
}

// TODO does this work with IPv6?
phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds)
phClient, err := peerhealth.NewClient(fmt.Sprintf("%v:%v", endpointIp.IP, c.config.PeerHealthPort), c.config.PeerDialTimeout, c.config.Log.WithName("peerhealth client"), c.clientCreds)
if err != nil {
logger.Error(err, "failed to init grpc client")
results <- selfNodeRemediation.RequestFailed
Expand Down
27 changes: 15 additions & 12 deletions pkg/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Peers struct {
myNodeName string
mutex sync.Mutex
apiServerTimeout time.Duration
workerPeersAddresses, controlPlanePeersAddresses []string
workerPeersAddresses, controlPlanePeersAddresses []v1.PodIP
}

func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Reader, log logr.Logger, apiServerTimeout time.Duration) *Peers {
Expand All @@ -47,8 +47,8 @@ func New(myNodeName string, peerUpdateInterval time.Duration, reader client.Read
myNodeName: myNodeName,
mutex: sync.Mutex{},
apiServerTimeout: apiServerTimeout,
workerPeersAddresses: []string{},
controlPlanePeersAddresses: []string{},
workerPeersAddresses: []v1.PodIP{},
controlPlanePeersAddresses: []v1.PodIP{},
}
}

Expand Down Expand Up @@ -88,18 +88,18 @@ func (p *Peers) Start(ctx context.Context) error {
}

func (p *Peers) updateWorkerPeers(ctx context.Context) {
setterFunc := func(addresses []string) { p.workerPeersAddresses = addresses }
setterFunc := func(addresses []v1.PodIP) { p.workerPeersAddresses = addresses }
selectorGetter := func() labels.Selector { return p.workerPeerSelector }
p.updatePeers(ctx, selectorGetter, setterFunc)
}

func (p *Peers) updateControlPlanePeers(ctx context.Context) {
setterFunc := func(addresses []string) { p.controlPlanePeersAddresses = addresses }
setterFunc := func(addresses []v1.PodIP) { p.controlPlanePeersAddresses = addresses }
selectorGetter := func() labels.Selector { return p.controlPlanePeerSelector }
p.updatePeers(ctx, selectorGetter, setterFunc)
}

func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []string)) {
func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selector, setAddresses func(addresses []v1.PodIP)) {
p.mutex.Lock()
defer p.mutex.Unlock()

Expand All @@ -111,7 +111,7 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec
if err := p.List(readerCtx, &nodes, client.MatchingLabelsSelector{Selector: getSelector()}); err != nil {
if errors.IsNotFound(err) {
// we are the only node at the moment... reset peerList
p.workerPeersAddresses = []string{}
p.workerPeersAddresses = []v1.PodIP{}
}
p.log.Error(err, "failed to update peer list")
return
Expand All @@ -129,30 +129,33 @@ func (p *Peers) updatePeers(ctx context.Context, getSelector func() labels.Selec
}

nodesCount := len(nodes.Items)
addresses := make([]string, nodesCount)
addresses := make([]v1.PodIP, nodesCount)
for i, node := range nodes.Items {
for _, pod := range pods.Items {
if pod.Spec.NodeName == node.Name {
addresses[i] = pod.Status.PodIP
if pod.Status.PodIPs == nil || len(pod.Status.PodIPs) == 0 {
p.log.Info("skipping empty Pod IPs", "node", node.Name, "Pod", pod.Name)
}
addresses[i] = pod.Status.PodIPs[0]
}
}
}
setAddresses(addresses)
}

func (p *Peers) GetPeersAddresses(role Role) []string {
func (p *Peers) GetPeersAddresses(role Role) []v1.PodIP {
p.mutex.Lock()
defer p.mutex.Unlock()

var addresses []string
var addresses []v1.PodIP
if role == Worker {
addresses = p.workerPeersAddresses
} else {
addresses = p.controlPlanePeersAddresses
}
//we don't want the caller to be able to change the addresses
//so we create a deep copy and return it
addressesCopy := make([]string, len(addresses))
addressesCopy := make([]v1.PodIP, len(addresses))
copy(addressesCopy, addresses)

return addressesCopy
Expand Down

0 comments on commit a8900ec

Please sign in to comment.