Find last known pod owners (#165)
anjmao authored Oct 6, 2023
1 parent cb5d938 commit 789faad
Showing 12 changed files with 436 additions and 399 deletions.
45 changes: 22 additions & 23 deletions cmd/agent/main.go
@@ -109,13 +109,14 @@ func main() {
 	logrus.RegisterExitHandler(e.Wait)

 	ctx := signals.SetupSignalHandler()
-	if err := run(ctx, logger, client, cfg, binVersion); err != nil {
+	if err := run(ctx, logger, client, cfg, binVersion); err != nil && !errors.Is(err, context.Canceled) {
 		logErr := &logContextErr{}
 		if errors.As(err, &logErr) {
 			log = logger.WithFields(logErr.fields)
 		}
 		log.Fatalf("castai-kvisor failed: %v", err)
 	}
+	log.Info("castai-kvisor stopped")
 }

 func run(ctx context.Context, logger logrus.FieldLogger, castaiClient castai.Client, cfg config.Config, binVersion config.SecurityAgentVersion) (reterr error) {
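Note on the main() change above: run's error is now ignored when it is (or wraps) context.Canceled, so a signal-triggered shutdown logs "castai-kvisor stopped" instead of calling log.Fatalf. A minimal, self-contained sketch of the same pattern, using only the standard library rather than this repo's signal helpers:

	package main

	import (
		"context"
		"errors"
		"log"
		"os/signal"
		"syscall"
	)

	// run blocks until the context is canceled and reports why it stopped.
	func run(ctx context.Context) error {
		<-ctx.Done()
		return ctx.Err() // context.Canceled on SIGINT/SIGTERM
	}

	func main() {
		ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
		defer stop()
		if err := run(ctx); err != nil && !errors.Is(err, context.Canceled) {
			log.Fatalf("failed: %v", err) // only unexpected errors are fatal
		}
		log.Println("stopped") // cancellation is treated as a clean exit
	}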
@@ -161,16 +162,19 @@ func run(ctx context.Context, logger logrus.FieldLogger, castaiClient castai.Cli

 	snapshotProvider := delta.NewSnapshotProvider()

-	kubeSubscribers := []kube.ObjectSubscriber{
-		delta.NewController(
-			log,
-			log.Level,
-			delta.Config{DeltaSyncInterval: cfg.DeltaSyncInterval},
-			castaiClient,
-			snapshotProvider,
-			k8sVersion.MinorInt,
-		),
-	}
+	informersFactory := informers.NewSharedInformerFactory(clientSet, 0)
+	kubeCtrl := kube.NewController(log, informersFactory, k8sVersion)
+
+	deltaCtrl := delta.NewController(
+		log,
+		log.Level,
+		delta.Config{DeltaSyncInterval: cfg.DeltaSyncInterval},
+		castaiClient,
+		snapshotProvider,
+		k8sVersion.MinorInt,
+		kubeCtrl,
+	)
+	kubeCtrl.AddSubscribers(deltaCtrl)

 	telemetryManager := telemetry.NewManager(log, castaiClient)

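The wiring above reverses the old order: kube.NewController is now created first, without a subscriber list, so kubeCtrl can be passed into delta.NewController as its pod owner getter; subscribers are then registered through AddSubscribers. A stripped-down sketch of why register-after-construction is needed here (kube.Controller internals are not shown in this diff; the types below are illustrative stand-ins):

	package kubesketch

	import "context"

	// ObjectSubscriber is a stand-in for the repo's kube.ObjectSubscriber.
	type ObjectSubscriber interface {
		Run(ctx context.Context) error
	}

	// Controller is a stripped-down stand-in for kube.Controller.
	type Controller struct {
		subscribers []ObjectSubscriber
	}

	// AddSubscribers registers subscribers after construction: deltaCtrl
	// needs the controller (as its pod owner getter) and the controller
	// needs deltaCtrl (as a subscriber), so registration must come second.
	func (c *Controller) AddSubscribers(subs ...ObjectSubscriber) {
		c.subscribers = append(c.subscribers, subs...)
	}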
@@ -193,19 +197,19 @@

 	if cfg.Linter.Enabled {
 		log.Info("linter enabled")
-		linterSub, err := kubelinter.NewController(log, castaiClient, linter)
+		linterCtrl, err := kubelinter.NewController(log, castaiClient, linter)
 		if err != nil {
 			return err
 		}
-		kubeSubscribers = append(kubeSubscribers, linterSub)
+		kubeCtrl.AddSubscribers(linterCtrl)
 	}
 	if cfg.KubeBench.Enabled {
 		log.Info("kubebench enabled")
 		if cfg.KubeBench.Force {
 			scannedNodes = []string{}
 		}
 		podLogReader := agentlog.NewPodLogReader(clientSet)
-		kubeSubscribers = append(kubeSubscribers, kubebench.NewController(
+		kubeBenchCtrl := kubebench.NewController(
 			log,
 			clientSet,
 			cfg.PodNamespace,
@@ -214,7 +218,8 @@ func run(ctx context.Context, logger logrus.FieldLogger, castaiClient castai.Cli
 			castaiClient,
 			podLogReader,
 			scannedNodes,
-		))
+		)
+		kubeCtrl.AddSubscribers(kubeBenchCtrl)
 	}
 	var imgScanCtrl *imagescan.Controller
 	if cfg.ImageScan.Enabled {
@@ -225,12 +230,9 @@ func run(ctx context.Context, logger logrus.FieldLogger, castaiClient castai.Cli
 			imagescan.NewImageScanner(clientSet, cfg),
 			castaiClient,
 			k8sVersion.MinorInt,
+			kubeCtrl,
 		)
-		kubeSubscribers = append(kubeSubscribers, imgScanCtrl)
-	}
-
-	if len(kubeSubscribers) == 0 {
-		return errors.New("no subscribers enabled")
+		kubeCtrl.AddSubscribers(imgScanCtrl)
 	}

 	if cfg.CloudScan.Enabled {
@@ -258,9 +260,6 @@ func run(ctx context.Context, logger logrus.FieldLogger, castaiClient castai.Cli
 	})
 	go gc.Start(ctx)

-	informersFactory := informers.NewSharedInformerFactory(clientSet, 0)
-	kubeCtrl := kube.NewController(log, informersFactory, kubeSubscribers, k8sVersion)
-
 	resyncObserver := delta.ResyncObserver(ctx, log, snapshotProvider, castaiClient)
 	telemetryManager.AddObservers(resyncObserver)
 	featureObserver, featuresCtx := telemetry.ObserveDisabledFeatures(ctx, cfg, log)
12 changes: 11 additions & 1 deletion delta/subscriber.go → delta/controller.go
@@ -51,6 +51,7 @@ func NewController(
 	cfg Config, client castaiClient,
 	stateProvider SnapshotProvider,
 	k8sVersionMinor int,
+	podOwnerGetter podOwnerGetter,
 ) *Controller {
 	ctx, cancel := context.WithCancel(context.Background())

@@ -61,7 +62,8 @@
 		k8sVersionMinor: k8sVersionMinor,
 		log:             log.WithField("component", "delta"),
 		client:          client,
-		delta:           newDelta(log, logLevel, stateProvider),
+		delta:           newDelta(log, podOwnerGetter, logLevel, stateProvider),
+		initialDelay:    60 * time.Second,
 	}
 }

@@ -75,6 +77,7 @@ type Controller struct {
 	delta        *delta
 	mu           sync.RWMutex
 	initialized  bool
+	initialDelay time.Duration
 }

 func (s *Controller) RequiredInformers() []reflect.Type {
@@ -104,6 +107,13 @@
 }

 func (s *Controller) Run(ctx context.Context) error {
+	// Wait for initial deltas sync before starting deltas send loop.
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-time.After(s.initialDelay):
+	}
+
 	for {
 		select {
 		case <-ctx.Done():
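The new initialDelay select is a context-aware sleep: Run waits (60s by default, 1ms in the tests below) for the informers to accumulate a full snapshot before the send loop starts, while still returning promptly on shutdown. As a standalone helper, the same pattern looks like this (a sketch, not code from this repo):

	import (
		"context"
		"time"
	)

	// sleepCtx waits for d but returns early with ctx.Err() if the context
	// is canceled first, so a long warm-up never blocks shutdown.
	func sleepCtx(ctx context.Context, d time.Duration) error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(d):
			return nil
		}
	}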
55 changes: 43 additions & 12 deletions delta/subscriber_test.go → delta/controller_test.go
@@ -22,8 +22,6 @@ func TestSubscriber(t *testing.T) {
 	log := logrus.New()
 	log.SetLevel(logrus.DebugLevel)

-	castaiClient := &mockCastaiClient{}
-
 	pod1 := &corev1.Pod{
 		TypeMeta: metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"},
 		ObjectMeta: metav1.ObjectMeta{
@@ -56,12 +54,13 @@

 	assertDelta := func(t *testing.T, delta *castai.Delta, event castai.EventType, initial bool) {
 		r := require.New(t)
+		podUID := "111b56a9-ab5e-4a35-93af-f092e2f63011"
 		r.Equal(&castai.Delta{
 			FullSnapshot: initial,
 			Items: []castai.DeltaItem{
 				{
 					Event:           event,
-					ObjectUID:       "111b56a9-ab5e-4a35-93af-f092e2f63011",
+					ObjectUID:       podUID,
 					ObjectName:      "nginx-1",
 					ObjectNamespace: "default",
 					ObjectKind:      "Pod",
@@ -73,41 +72,51 @@
 						ImageName: "nginx:1.23",
 					},
 				},
-				ObjectStatus: corev1.PodStatus{Phase: corev1.PodRunning},
+				ObjectStatus:   corev1.PodStatus{Phase: corev1.PodRunning},
+				ObjectOwnerUID: podUID,
 			},
 		},
 	}, delta)
 }

 	t.Run("send add event", func(t *testing.T) {
-		sub := NewController(log, logrus.DebugLevel, Config{DeltaSyncInterval: 1 * time.Millisecond}, castaiClient, &snapshotProviderMock{}, 21)
+		client := &mockCastaiClient{}
+		sub := newTestController(log)
+		sub.initialDelay = 1 * time.Millisecond
+		sub.client = client
 		sub.OnAdd(pod1)

 		ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
 		defer cancel()
 		err := sub.Run(ctx)
 		r.True(errors.Is(err, context.DeadlineExceeded))
-		delta := castaiClient.delta
+		delta := client.delta
 		r.NotNil(delta)
 		assertDelta(t, delta, castai.EventAdd, true)
 	})

 	t.Run("send update event", func(t *testing.T) {
-		sub := NewController(log, logrus.DebugLevel, Config{DeltaSyncInterval: 1 * time.Millisecond}, castaiClient, &snapshotProviderMock{}, 21)
+		client := &mockCastaiClient{}
+		sub := newTestController(log)
+		sub.initialDelay = 1 * time.Millisecond
+		sub.client = client
 		sub.OnAdd(pod1)
 		sub.OnUpdate(pod1)

 		ctx, cancel := context.WithTimeout(ctx, 30*time.Millisecond)
 		defer cancel()
 		err := sub.Run(ctx)
 		r.True(errors.Is(err, context.DeadlineExceeded))
-		delta := castaiClient.delta
+		delta := client.delta
 		r.NotNil(delta)
 		assertDelta(t, delta, castai.EventUpdate, true)
 	})

 	t.Run("send delete event", func(t *testing.T) {
-		sub := NewController(log, logrus.DebugLevel, Config{DeltaSyncInterval: 1 * time.Millisecond}, castaiClient, &snapshotProviderMock{}, 21)
+		client := &mockCastaiClient{}
+		sub := newTestController(log)
+		sub.initialDelay = 1 * time.Millisecond
+		sub.client = client
 		sub.OnAdd(pod1)
 		sub.OnUpdate(pod1)
 		sub.OnDelete(pod1)
@@ -116,13 +125,16 @@
 		defer cancel()
 		err := sub.Run(ctx)
 		r.True(errors.Is(err, context.DeadlineExceeded))
-		delta := castaiClient.delta
+		delta := client.delta
 		r.NotNil(delta)
 		assertDelta(t, delta, castai.EventDelete, true)
 	})

 	t.Run("second event does not set full snapshot flag", func(t *testing.T) {
-		sub := NewController(log, logrus.DebugLevel, Config{DeltaSyncInterval: 1 * time.Millisecond}, castaiClient, &snapshotProviderMock{}, 21)
+		client := &mockCastaiClient{}
+		sub := newTestController(log)
+		sub.initialDelay = 1 * time.Millisecond
+		sub.client = client
 		sub.OnAdd(pod1)

 		go func() {
@@ -134,12 +146,31 @@
 		defer cancel()
 		err := sub.Run(ctx)
 		r.True(errors.Is(err, context.DeadlineExceeded))
-		delta := castaiClient.delta
+		delta := client.delta
 		r.NotNil(delta)
 		assertDelta(t, delta, castai.EventAdd, false)
 	})
 }

+func newTestController(log logrus.FieldLogger) *Controller {
+	return NewController(
+		log,
+		logrus.DebugLevel,
+		Config{DeltaSyncInterval: 1 * time.Millisecond},
+		&mockCastaiClient{},
+		&snapshotProviderMock{},
+		21,
+		&mockPodOwnerGetter{},
+	)
+}
+
+type mockPodOwnerGetter struct {
+}
+
+func (m *mockPodOwnerGetter) GetPodOwnerID(pod *corev1.Pod) string {
+	return string(pod.UID)
+}
+
 type mockCastaiClient struct {
 	delta *castai.Delta
 }
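Note: mockPodOwnerGetter simply echoes the pod's own UID, which is why the assertDelta helper above expects ObjectOwnerUID to equal podUID; the real owner lookup sits behind the podOwnerGetter interface introduced in delta/delta.go below.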
38 changes: 24 additions & 14 deletions delta/delta.go
@@ -13,15 +13,20 @@ import (
 	"github.com/castai/kvisor/kube"
 )

+type podOwnerGetter interface {
+	GetPodOwnerID(pod *corev1.Pod) string
+}
+
 // newDelta initializes the delta struct which is used to collect cluster deltas, debounce them and map to CAST AI
 // requests.
-func newDelta(log logrus.FieldLogger, logLevel logrus.Level, provider SnapshotProvider) *delta {
+func newDelta(log logrus.FieldLogger, podOwnerGetter podOwnerGetter, logLevel logrus.Level, provider SnapshotProvider) *delta {
 	return &delta{
-		log:      log,
-		logLevel: logLevel,
-		snapshot: provider,
-		cache:    map[string]castai.DeltaItem{},
-		skippers: []skipper{},
+		log:            log,
+		logLevel:       logLevel,
+		snapshot:       provider,
+		cache:          map[string]castai.DeltaItem{},
+		skippers:       []skipper{},
+		podOwnerGetter: podOwnerGetter,
 	}
 }

@@ -38,11 +43,12 @@ type skipper func(obj object) bool
 // delta is used to collect cluster deltas, debounce them and map to CAST AI requests. It holds a cache of queue items
 // which is referenced any time a new item is added to debounce the items.
 type delta struct {
-	log      logrus.FieldLogger
-	logLevel logrus.Level
-	snapshot SnapshotProvider
-	cache    map[string]castai.DeltaItem
-	skippers []skipper
+	log            logrus.FieldLogger
+	logLevel       logrus.Level
+	snapshot       SnapshotProvider
+	cache          map[string]castai.DeltaItem
+	skippers       []skipper
+	podOwnerGetter podOwnerGetter
 }

 // add will add an item to the delta cache. It will debounce the objects.
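The debounce these comments describe is a map keyed by object UID: a later event for the same object replaces the pending cache entry, so each sync interval sends only the latest known state per object. A minimal standalone sketch of the idea (types are illustrative, not this package's):

	type item struct {
		uid   string
		event string
	}

	type debouncer struct {
		pending map[string]item // keyed by object UID, newest event wins
	}

	func (d *debouncer) add(it item) {
		d.pending[it.uid] = it // overwrites any earlier event for the same UID
	}

	func (d *debouncer) flush() []item {
		out := make([]item, 0, len(d.pending))
		for _, it := range d.pending {
			out = append(out, it)
		}
		d.pending = map[string]item{} // start a fresh debounce window
		return out
	}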
@@ -55,7 +61,6 @@ func (d *delta) add(event kube.Event, obj object) {

 	key := string(obj.GetUID())
 	gvr := obj.GetObjectKind().GroupVersionKind()
-	d.log.Debugf("add delta, event=%s, gvr=%s, ns=%s, name=%s", event, gvr.String(), obj.GetNamespace(), obj.GetName())

 	deltaItem := castai.DeltaItem{
 		Event: toCASTAIEvent(event),
@@ -65,7 +70,7 @@
 		ObjectKind:       gvr.Kind,
 		ObjectAPIVersion: gvr.GroupVersion().String(),
 		ObjectCreatedAt:  obj.GetCreationTimestamp().UTC(),
-		ObjectOwnerUID:   getOwnerUID(obj),
+		ObjectOwnerUID:   d.getOwnerUID(obj),
 		ObjectLabels:     obj.GetLabels(),
 	}
 	if containers, status, ok := getContainersAndStatus(obj); ok {
@@ -146,7 +151,12 @@ func getContainersAndStatus(obj kube.Object) ([]castai.Container, interface{}, b
 	return res, st, true
 }

-func getOwnerUID(obj kube.Object) string {
+func (d *delta) getOwnerUID(obj kube.Object) string {
+	switch v := obj.(type) {
+	case *corev1.Pod:
+		return d.podOwnerGetter.GetPodOwnerID(v)
+	}
+
 	if len(obj.GetOwnerReferences()) == 0 {
 		return ""
 	}
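getOwnerUID now special-cases pods and delegates to the podOwnerGetter (the kube controller wired up in cmd/agent/main.go), which is what the commit title refers to: a pod's owner can be reported even after the owning ReplicaSet or Job object is gone. The kube-side implementation is not part of this diff; the sketch below shows one plausible shape for such a getter, where ownerCache and its lastKnown map are assumptions, not this repo's code:

	import (
		corev1 "k8s.io/api/core/v1"
		"k8s.io/apimachinery/pkg/types"
	)

	// ownerCache is a hypothetical last-known-owner index (pod UID -> owner
	// UID). A real implementation would fill it from informer events and
	// keep entries alive after the owner object disappears.
	type ownerCache struct {
		lastKnown map[types.UID]types.UID
	}

	func (c *ownerCache) GetPodOwnerID(pod *corev1.Pod) string {
		if owner, ok := c.lastKnown[pod.UID]; ok {
			return string(owner) // last known owner, even if already deleted
		}
		if refs := pod.OwnerReferences; len(refs) > 0 {
			return string(refs[0].UID) // fall back to the live owner reference
		}
		return string(pod.UID) // ownerless pods report themselves
	}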