Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement controller to assign IP manually for pod #44

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,38 @@ Either as command line arguments or environment variables.

API token for hetzner cloud access.

### `--service-label-selector` or `HCLOUD_IP_FLOATER_SERVICE_LABEL_SELECTOR`
### `--service-label-selector` or `HCLOUD_IP_FLOATER_SERVICE_LABEL_SELECTOR`

Service label selector to use when watching for kubernetes services. Any services that do not match this selector will be ignored by the controller.

**Default**: `hcloud-ip-floater.cstl.dev/ignore!=true`

### `--manual-assignment-label` or `HCLOUD_IP_FLOATER_MANUAL_ASSIGNMENT_LABEL`

This is experimental and hasn't seen a lot of production use!

Label name used to manually assign floating IPs on a Pod.

This can be useful when other means of routing the traffic to a pod than a load balancer are used. E.g. you could be using the [`ipvlan` CNI plugin](https://www.cni.dev/plugins/current/main/ipvlan/) with [Multus](https://github.com/k8snetworkplumbingwg/multus-cni/).
The label accepts a comma-seperated list of floating IP addresses to assign to the node the pod is on.

Example:
```yaml
apiVersion: v1
kind: Pod
metadata:
name: my-pod
labels:
hcloud-ip-floater.cstl.dev/floating-ip: "1.2.3.4,2.3.4.5"
```

This mechanism will be ignored if there is a service with the same IP present.

**Default**: `hcloud-ip-floater.cstl.dev/floating-ip`

### `--floating-label-selector` or `HCLOUD_IP_FLOATER_FLOATING_LABEL_SELECTOR`

Label selector for hcloud floating IPs. Floating IPs that do not match this selector will be ignored by the controller.
Label selector for hcloud floating IPs. Floating IPs that do not match this selector will be ignored by the controller.

**Default**: `hcloud-ip-floater.cstl.dev/ignore!=true`

Expand Down
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ var Global struct {
LogLevel string `id:"log-level" short:"l" desc:"verbosity level for logs" default:"warn"`
HCloudToken string `id:"hcloud-token" desc:"API token for HCloud access"`
ServiceLabelSelector string `id:"service-label-selector" desc:"label selector used to match services" default:"hcloud-ip-floater.cstl.dev/ignore!=true"`
ManualAssignmentLabel string `id:"manual-assignment-label" desc:"pod label used to assign an IP without a service" default:"hcloud-ip-floater.cstl.dev/floating-ip"`
FloatingLabelSelector string `id:"floating-label-selector" desc:"label selector used to match floating IPs" default:""`

// optional MetalLB integration
Expand Down
194 changes: 194 additions & 0 deletions internal/manualcontroller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package manualcontroller

import (
"bytes"
"context"
"crypto/sha256"
"sort"
"strings"
"time"

"github.com/costela/hcloud-ip-floater/internal/config"
"github.com/costela/hcloud-ip-floater/internal/fipcontroller"
"github.com/costela/hcloud-ip-floater/internal/servicecontroller"
"github.com/costela/hcloud-ip-floater/internal/stringset"
"github.com/costela/hcloud-ip-floater/internal/utils"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

type Controller struct {
Logger logrus.FieldLogger

K8S *kubernetes.Clientset
SVCc *servicecontroller.Controller
FIPc *fipcontroller.Controller

podInformer cache.SharedInformer
}

func (c *Controller) Run() {
ipLabel := config.Global.ManualAssignmentLabel

podInformerFactory := informers.NewSharedInformerFactoryWithOptions(
c.K8S,
time.Duration(config.Global.SyncSeconds)*time.Second,
informers.WithTweakListOptions(func(listOpts *metav1.ListOptions) {
listOpts.LabelSelector = ipLabel
}),
)
podInformer := podInformerFactory.Core().V1().Pods().Informer()
c.podInformer = podInformer

podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
c.Logger.Errorf("received unexpected object type: %T", obj)
return
}

c.Logger.WithFields(logrus.Fields{
"namespace": pod.Namespace,
"name": pod.Name,
"node": pod.Spec.NodeName,
}).Info("New pod")

value := pod.Labels[ipLabel]
ips := parseIPList(value)
if len(ips) == 0 {
c.Logger.Debug("label not present or empty")
return
}

for ip := range ips {
c.reconcileIP(ip)
}
},
UpdateFunc: func(oldObj interface{}, newObj interface{}) {
oldPod, ok := oldObj.(*corev1.Pod)
if !ok {
c.Logger.Errorf("received unexpected object type: %T", oldObj)
return
}
newPod, ok := newObj.(*corev1.Pod)
if !ok {
c.Logger.Errorf("received unexpected object type: %T", newObj)
return
}

c.Logger.WithFields(logrus.Fields{
"namespace": newPod.Namespace,
"name": newPod.Name,
"node": newPod.Spec.NodeName,
}).Info("Pod updated")

// diff label values
oldValue := oldPod.Labels[ipLabel]
oldIPs := parseIPList(oldValue)
newValue := newPod.Labels[ipLabel]
newIPs := parseIPList(newValue)

removedIPs := oldIPs.Diff(newIPs)
for ip := range removedIPs {
c.reconcileIP(ip)
}

for ip := range newIPs {
c.reconcileIP(ip)
}
},
DeleteFunc: func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
c.Logger.Errorf("received unexpected object type: %T", obj)
return
}

c.Logger.WithFields(logrus.Fields{
"namespace": pod.Namespace,
"name": pod.Name,
"node": pod.Spec.NodeName,
}).Info("Pod deleted")

value := pod.Labels[ipLabel]
ips := parseIPList(value)
if len(ips) == 0 {
c.Logger.Debug("label not present or empty")
return
}

for ip := range ips {
c.reconcileIP(ip)
}
},
})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
podInformer.Run(ctx.Done())
}

func parseIPList(value string) stringset.StringSet {
set := make(stringset.StringSet)
ips := strings.Split(value, ",")
for _, ip := range ips {
set.Add(strings.TrimSpace(ip))
}
return set
}

func singleSet(value string) stringset.StringSet {
set := make(stringset.StringSet)
set.Add(value)
return set
}

// only use with a locked knownIPmu!
func (c *Controller) reconcileIP(ip string) {
log := c.Logger.WithField("ip", ip)

if c.SVCc.HasServiceIP(ip) {
log.Warn("IP is assigned to a service, cannot use manually on a pod")
return
}

pods := c.podInformer.GetStore().List()

nodes := make([]string, 0, len(pods))
for _, pod := range pods {
pod := pod.(*corev1.Pod)
if !utils.PodIsReady(pod) {
continue
}
ips := parseIPList(pod.Labels[config.Global.ManualAssignmentLabel])
for labelIP := range ips {
if labelIP == ip {
nodes = append(nodes, pod.Spec.NodeName)
break
}
}
}

ipSet := singleSet(ip)

if len(nodes) == 0 {
log.Info("None of the pods are ready")
c.FIPc.ForgetAttachments(ipSet)
return
}

sort.Slice(nodes, func(i, j int) bool {
a := sha256.Sum256([]byte(nodes[i]))
b := sha256.Sum256([]byte(nodes[j]))
return bytes.Compare(a[:], b[:]) > 0
})

electedNode := nodes[0]
c.FIPc.AttachToNode(ipSet, electedNode)
log.WithField("node", electedNode).Info("Attached IP using manual assignment")
}
30 changes: 17 additions & 13 deletions internal/servicecontroller/servicecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/costela/hcloud-ip-floater/internal/config"
"github.com/costela/hcloud-ip-floater/internal/fipcontroller"
"github.com/costela/hcloud-ip-floater/internal/stringset"
"github.com/costela/hcloud-ip-floater/internal/utils"
)

type podInformerType struct {
Expand Down Expand Up @@ -267,7 +268,7 @@ func (sc *Controller) handleNewPod(svcKey string, newPod *corev1.Pod) error {
"pod": newPod.Name,
})

if !podIsReady(newPod) {
if !utils.PodIsReady(newPod) {
funcLogger.Debug("ignoring non-ready pod")
return nil
}
Expand All @@ -288,8 +289,8 @@ func (sc *Controller) handlePodUpdate(svcKey string, oldPod, newPod *corev1.Pod)
"pod": newPod.Name,
})

oldReady := podIsReady(oldPod)
newReady := podIsReady(newPod)
oldReady := utils.PodIsReady(oldPod)
newReady := utils.PodIsReady(newPod)

if oldReady == newReady {
funcLogger.Debug("pod readiness unchanged")
Expand Down Expand Up @@ -320,15 +321,6 @@ func (sc *Controller) getServiceFromKey(svcKey string) (*corev1.Service, error)
return svc, nil
}

func podIsReady(pod *corev1.Pod) bool {
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
return true
}
}
return false
}

func (sc *Controller) handleServiceIPs(svc *corev1.Service, svcIPs stringset.StringSet) error {
// TODO: use util/workqueue to avoid blocking informer if hcloud API is slow

Expand Down Expand Up @@ -393,7 +385,7 @@ func (sc *Controller) getServiceReadyNodes(svcKey string) ([]string, error) {

nodes := make([]string, 0, len(pods))
for _, pod := range pods {
if podIsReady(pod) {
if utils.PodIsReady(pod) {
nodes = append(nodes, pod.Spec.NodeName)
}
}
Expand Down Expand Up @@ -430,6 +422,18 @@ func (sc *Controller) forgetServiceIPs(svcKey string) {
delete(sc.svcIPs, svcKey)
}

func (sc *Controller) HasServiceIP(ip string) bool {
sc.svcIPsMu.Lock()
defer sc.svcIPsMu.Unlock()

for _, ips := range sc.svcIPs {
if ips.Has(ip) {
return true
}
}
return false
}

func getLoadbalancerIPs(svc *corev1.Service) stringset.StringSet {
ips := make(stringset.StringSet, len(svc.Status.LoadBalancer.Ingress))

Expand Down
14 changes: 14 additions & 0 deletions internal/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package utils

import (
corev1 "k8s.io/api/core/v1"
)

func PodIsReady(pod *corev1.Pod) bool {
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
return true
}
}
return false
}
9 changes: 9 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/costela/hcloud-ip-floater/internal/config"
"github.com/costela/hcloud-ip-floater/internal/fipcontroller"
"github.com/costela/hcloud-ip-floater/internal/manualcontroller"
"github.com/costela/hcloud-ip-floater/internal/servicecontroller"
)

Expand Down Expand Up @@ -79,8 +80,16 @@ func main() {
FIPc: fipc,
}

mc := manualcontroller.Controller{
Logger: logger,
K8S: k8s,
SVCc: &sc,
FIPc: fipc,
}

go fipc.Run()
go sc.Run()
go mc.Run()

select {}
}