Lockfree image scan deltas (#158)
anjmao authored Sep 14, 2023
1 parent 49f2ef2 commit 97399c4
Showing 3 changed files with 183 additions and 159 deletions.
99 changes: 51 additions & 48 deletions imagescan/delta.go
@@ -4,7 +4,6 @@ import (
 	"errors"
 	"sort"
 	"strings"
-	"sync"
 	"time"
 
 	"github.com/samber/lo"
@@ -61,39 +60,46 @@ func newImage(imageID, architecture string) *image {
 
 func NewDeltaState(scannedImages []castai.ScannedImage) *deltaState {
 	return &deltaState{
-		images: buildImageMap(scannedImages),
-		rs:     make(map[string]*appsv1.ReplicaSet),
-		jobs:   make(map[string]*batchv1.Job),
-		nodes:  map[string]*node{},
+		queue:              make(chan deltaQueueItem, 1000),
+		remoteImagesUpdate: make(chan []castai.ScannedImage, 3),
+		images:             buildImageMap(scannedImages),
+		rs:                 make(map[string]*appsv1.ReplicaSet),
+		jobs:               make(map[string]*batchv1.Job),
+		nodes:              map[string]*node{},
 	}
 }
 
+type deltaQueueItem struct {
+	event controller.Event
+	obj   controller.Object
+}
+
 type deltaState struct {
-	mu sync.RWMutex
+	// queue holds k8s objects received from informers but not yet applied to the delta.
+	// This allows lock-free access to the delta state during image scans.
+	queue chan deltaQueueItem
 
+	// remoteImagesUpdate signals that delta images should be updated from telemetry.
+	remoteImagesUpdate chan []castai.ScannedImage
+
+	// images holds the current cluster image state. The image struct contains associated nodes and owners.
 	images map[string]*image
-	rs     map[string]*appsv1.ReplicaSet
-	jobs   map[string]*batchv1.Job
-	nodes  map[string]*node
+
+	rs    map[string]*appsv1.ReplicaSet
+	jobs  map[string]*batchv1.Job
+	nodes map[string]*node
 
 	// If we fail to scan in hostfs mode it will be disabled for all future image scans.
 	hostFSDisabled bool
 }
 
 func (d *deltaState) Observe(response *castai.TelemetryResponse) {
 	if response.FullResync && len(response.ScannedImages) > 0 {
-		d.mu.Lock()
-		defer d.mu.Unlock()
-		// sync with server's state
-		d.images = buildImageMap(response.ScannedImages)
-		d.rs = make(map[string]*appsv1.ReplicaSet)
-		d.jobs = make(map[string]*batchv1.Job)
-		d.nodes = map[string]*node{}
+		d.remoteImagesUpdate <- response.ScannedImages
 	}
 }
 
 func (d *deltaState) upsert(o controller.Object) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-
 	key := controller.ObjectKey(o)
 	switch v := o.(type) {
 	case *corev1.Pod:
@@ -108,9 +114,6 @@ func (d *deltaState) upsert(o controller.Object) {
 }
 
 func (d *deltaState) delete(o controller.Object) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-
 	key := controller.ObjectKey(o)
 	switch v := o.(type) {
 	case *corev1.Pod:
@@ -124,6 +127,10 @@ func (d *deltaState) delete(o controller.Object) {
 	}
 }
 
+func (d *deltaState) updateImagesFromRemote(images []castai.ScannedImage) {
+	d.images = buildImageMap(images)
+}
+
 func (d *deltaState) handlePodUpdate(v *corev1.Pod) {
 	d.upsertImages(v)
 	d.updateNodesUsageFromPod(v)
@@ -189,7 +196,7 @@ func (d *deltaState) upsertImages(pod *corev1.Pod) {
 	containerStatuses := pod.Status.ContainerStatuses
 	containerStatuses = append(containerStatuses, pod.Status.InitContainerStatuses...)
 	podID := string(pod.UID)
-	// get the resource id of Deployment, ReplicaSet, StatefulSet, Job, CronJob
+	// Get the resource id of Deployment, ReplicaSet, StatefulSet, Job, CronJob.
 	ownerResourceID := getPodOwnerID(pod, d.rs, d.jobs)
 
 	for _, cont := range containers {
@@ -288,34 +295,22 @@ func (d *deltaState) handleNodeDelete(node *corev1.Node) {
 }
 
 func (d *deltaState) getImages() []*image {
-	d.mu.RLock()
-	defer d.mu.RUnlock()
-
 	return lo.Values(d.images)
 }
 
 func (d *deltaState) getNode(name string) (*node, bool) {
-	d.mu.RLock()
-	defer d.mu.RUnlock()
-
 	v, found := d.nodes[name]
 	return v, found
 }
 
 func (d *deltaState) updateImage(i *image, change func(img *image)) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-
 	img := d.images[i.cacheKey()]
 	if img != nil {
 		change(img)
 	}
 }
 
 func (d *deltaState) setImageScanError(i *image, err error) {
-	d.mu.Lock()
-	defer d.mu.Unlock()
-
 	img := d.images[i.cacheKey()]
 	if img == nil {
 		return
@@ -335,9 +330,6 @@ func (d *deltaState) setImageScanError(i *image, err error) {
 }
 
 func (d *deltaState) findBestNode(nodeNames []string, requiredMemory *inf.Dec, requiredCPU *inf.Dec) (string, error) {
-	d.mu.RLock()
-	defer d.mu.RUnlock()
-
 	if len(d.nodes) == 0 {
 		return "", errNoCandidates
 	}
@@ -361,16 +353,10 @@ func (d *deltaState) findBestNode(nodeNames []string, requiredMemory *inf.Dec, requiredCPU *inf.Dec) (string, error) {
 }
 
 func (d *deltaState) nodeCount() int {
-	d.mu.RLock()
-	defer d.mu.RUnlock()
-
 	return len(d.nodes)
 }
 
 func (d *deltaState) isHostFsDisabled() bool {
-	d.mu.RLock()
-	defer d.mu.RUnlock()
-
 	return d.hostFSDisabled
 }
 
@@ -488,12 +474,29 @@ var (
 )
 
 type image struct {
-	id               string
+	// id is the ImageID from container status. It includes the image name and digest.
+	//
+	// Note: ImageID's digest part should not be confused with the actual image digest.
+	// Kubernetes calculates the digest from one of these sources:
+	// 1. Index manifest (if it exists).
+	// 2. Manifest file.
+	// 3. Config file. Mostly legacy, for old images without a manifest.
+	id string
+
+	// name is the image name from the container spec.
+	//
+	// Note: We select the image name from the container spec (not from container status).
+	// In container status you will see the fully qualified image name, e.g. docker.io/grafana/grafana:latest,
+	// while in the container spec you will see the user-defined image name, which may not be fully qualified, e.g. grafana/grafana:latest.
+	name string
+
 	architecture     string
-	name             string
 	containerRuntime imgcollectorconfig.Runtime
-	owners           map[string]*imageOwner
-	nodes            map[string]*imageNode
+
+	// owners keys point to the higher-level k8s resource for that image (the image's affected resource in the CAST AI console).
+	// Example: in most cases a Pod is managed by a Deployment, so the owner id will point to the Deployment's uuid.
+	owners map[string]*imageOwner
+	nodes  map[string]*imageNode
 
 	scanned     bool
 	lastScanErr error
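
The delta.go changes above amount to a single-writer design: informer events are buffered in the queue channel and applied by one goroutine, so all reads and writes of images, rs, jobs, and nodes happen on that goroutine and the sync.RWMutex can be dropped. The buffer sizes (1000 for the event queue, 3 for remote image updates) bound how far producers can run ahead; when a buffer fills, the producer blocks rather than corrupting state. Below is a minimal, self-contained sketch of the pattern, with illustrative names (event, queueItem, and a map of bools) standing in for the repository's actual controller.Event, deltaQueueItem, and image state:

// lockfree_sketch.go: a minimal sketch of the pattern this commit adopts.
// Producers only enqueue; a single goroutine drains the queue and mutates
// state, so no mutex is needed. All names here are illustrative.
package main

import "fmt"

type event int

const (
	eventUpsert event = iota
	eventDelete
)

type queueItem struct {
	event event
	key   string
}

type deltaState struct {
	// queue buffers events from producer goroutines (informer callbacks).
	queue chan queueItem
	// images is only ever touched by the consumer goroutine.
	images map[string]bool
}

func (d *deltaState) apply(it queueItem) {
	switch it.event {
	case eventUpsert:
		d.images[it.key] = true
	case eventDelete:
		delete(d.images, it.key)
	}
}

func main() {
	d := &deltaState{
		queue:  make(chan queueItem, 1000),
		images: map[string]bool{},
	}
	go func() {
		// Producers never touch d.images directly; they only enqueue.
		d.queue <- queueItem{event: eventUpsert, key: "grafana/grafana:latest"}
		d.queue <- queueItem{event: eventUpsert, key: "nginx:1.25"}
		d.queue <- queueItem{event: eventDelete, key: "nginx:1.25"}
		close(d.queue)
	}()
	for it := range d.queue { // single consumer: lock-free access to d.images
		d.apply(it)
	}
	fmt.Println("images tracked:", len(d.images))
}
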
28 changes: 21 additions & 7 deletions imagescan/subscriber.go
@@ -70,11 +70,18 @@ func (s *Subscriber) RequiredInformers() []reflect.Type {
 }
 
 func (s *Subscriber) Run(ctx context.Context) error {
+	scanTicker := time.NewTicker(s.cfg.ScanInterval)
+	defer scanTicker.Stop()
+
 	for {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-time.After(s.cfg.ScanInterval):
+		case deltaItem := <-s.delta.queue:
+			s.handleDelta(deltaItem.event, deltaItem.obj)
+		case scannedImages := <-s.delta.remoteImagesUpdate:
+			s.delta.updateImagesFromRemote(scannedImages)
+		case <-scanTicker.C:
 			if err := s.scheduleScans(ctx); err != nil {
 				s.log.Errorf("images scan failed: %v", err)
 			}
@@ -83,22 +90,29 @@ func (s *Subscriber) Run(ctx context.Context) error {
 }
 
 func (s *Subscriber) OnAdd(obj controller.Object) {
-	s.handleDelta(controller.EventAdd, obj)
+	s.delta.queue <- deltaQueueItem{
+		event: controller.EventAdd,
+		obj:   obj,
+	}
 }
 
 func (s *Subscriber) OnUpdate(obj controller.Object) {
-	s.handleDelta(controller.EventUpdate, obj)
+	s.delta.queue <- deltaQueueItem{
+		event: controller.EventUpdate,
+		obj:   obj,
+	}
 }
 
 func (s *Subscriber) OnDelete(obj controller.Object) {
-	s.handleDelta(controller.EventDelete, obj)
+	s.delta.queue <- deltaQueueItem{
+		event: controller.EventDelete,
+		obj:   obj,
+	}
 }
 
 func (s *Subscriber) handleDelta(event controller.Event, o controller.Object) {
 	switch event {
-	case controller.EventAdd:
-		s.delta.upsert(o)
-	case controller.EventUpdate:
+	case controller.EventAdd, controller.EventUpdate:
 		s.delta.upsert(o)
 	case controller.EventDelete:
 		s.delta.delete(o)
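
subscriber.go completes the picture: OnAdd, OnUpdate, and OnDelete now only enqueue, and Run becomes the single consumer that multiplexes delta events, remote image updates, and the periodic scan in one select loop. The diff also swaps time.After, which allocates a fresh timer on every loop iteration, for a single time.NewTicker that is stopped on exit. A compact, runnable sketch of that loop shape, using hypothetical names in place of the subscriber's real fields:

// runloop_sketch.go: serializing queue draining and periodic work on one
// goroutine. Names and the interval value are hypothetical.
package main

import (
	"context"
	"fmt"
	"time"
)

func run(ctx context.Context, queue <-chan string, interval time.Duration) error {
	ticker := time.NewTicker(interval) // created once, unlike time.After in a loop
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case item := <-queue:
			fmt.Println("apply delta:", item) // all state mutation happens here
		case <-ticker.C:
			fmt.Println("schedule scans") // reads the same state, no lock needed
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()
	queue := make(chan string, 10)
	queue <- "pod-added"
	_ = run(ctx, queue, 100*time.Millisecond)
}

Because the scan scheduling runs in the same select loop that applies deltas, a scan never observes a half-applied update; the trade-off is that a long-running iteration delays event processing until it returns.
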