Skip to content

Commit

Permalink
Fix image delta dangling records (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Oct 10, 2023
1 parent e5b3c69 commit 91bccda
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 64 deletions.
3 changes: 1 addition & 2 deletions castai/imagemeta_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,5 @@ type Image struct {
}

type ResourcesChange struct {
ResourceIDs []string `json:"resourceIDs"`
RemovedResourceIDs []string `json:"removedResourceIDs"`
ResourceIDs []string `json:"resourceIDs"`
}
1 change: 1 addition & 0 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func run(ctx context.Context, logger logrus.FieldLogger, castaiClient castai.Cli
scanHandler := imagescan.NewHttpHandlers(log, castaiClient, imgScanCtrl)
httpMux.HandleFunc("/v1/image-scan/report", scanHandler.HandleImageMetadata)
httpMux.HandleFunc("/debug/images", scanHandler.HandleDebugGetImages)
httpMux.HandleFunc("/debug/images/details", scanHandler.HandleDebugGetImage)
blobsCache := blobscache.NewServer(log, blobscache.ServerConfig{})
blobsCache.RegisterHandlers(httpMux)
}
Expand Down
10 changes: 3 additions & 7 deletions imagescan/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/castai/kvisor/castai"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

Expand All @@ -38,13 +36,14 @@ func NewController(
podOwnerGetter podOwnerGetter,
) *Controller {
ctx, cancel := context.WithCancel(context.Background())
log = log.WithField("component", "imagescan")
return &Controller{
ctx: ctx,
cancel: cancel,
imageScanner: imageScanner,
client: client,
delta: newDeltaState(podOwnerGetter),
log: log.WithField("component", "imagescan"),
log: log,
cfg: cfg,
k8sVersionMinor: k8sVersionMinor,
timeGetter: timeGetter(),
Expand Down Expand Up @@ -76,8 +75,6 @@ type Controller struct {
func (s *Controller) RequiredInformers() []reflect.Type {
rt := []reflect.Type{
reflect.TypeOf(&corev1.Pod{}),
reflect.TypeOf(&appsv1.ReplicaSet{}),
reflect.TypeOf(&batchv1.Job{}),
reflect.TypeOf(&corev1.Node{}),
}
return rt
Expand Down Expand Up @@ -335,8 +332,7 @@ func (s *Controller) sendImagesResourcesChanges(ctx context.Context) {
ID: img.id,
Architecture: img.architecture,
ResourcesChange: castai.ResourcesChange{
ResourceIDs: img.ownerChanges.addedIDS,
RemovedResourceIDs: img.ownerChanges.removedIDs,
ResourceIDs: lo.Uniq(img.ownerChanges.addedIDS),
},
})
}
Expand Down
16 changes: 7 additions & 9 deletions imagescan/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestSubscriber(t *testing.T) {
return time.Now().UTC().Add(time.Hour)
}
delta := sub.delta
img := newImage("img1", "amd64")
img := newImage("img1amd64", "img1", "amd64")
img.name = "img"
img.nodes = map[string]*imageNode{
"node1": {},
Expand Down Expand Up @@ -321,7 +321,7 @@ func TestSubscriber(t *testing.T) {
return time.Now().UTC().Add(time.Hour)
}
delta := sub.delta
img := newImage("img1", "amd64")
img := newImage("img1amd64", "img1", "amd64")
img.name = "img"
img.containerRuntime = imgcollectorconfig.RuntimeContainerd
img.nodes = map[string]*imageNode{
Expand Down Expand Up @@ -385,7 +385,7 @@ func TestSubscriber(t *testing.T) {
return time.Now().UTC().Add(time.Hour)
}
delta := sub.delta
img := newImage("img1", "amd64")
img := newImage("img1amd64", "img1", "amd64")
img.name = "img"
img.containerRuntime = imgcollectorconfig.RuntimeContainerd
img.owners = map[string]*imageOwner{
Expand Down Expand Up @@ -505,14 +505,13 @@ func TestSubscriber(t *testing.T) {
return time.Now().UTC().Add(time.Hour)
}
delta := sub.delta
img := newImage("img1", "amd64")
img := newImage("img1amd64", "img1", "amd64")
img.name = "img"
img.owners = map[string]*imageOwner{
"r1": {},
}
img.ownerChanges = ownerChanges{
addedIDS: []string{"r1"},
removedIDs: []string{"r2"},
addedIDS: []string{"r1"},
}
delta.images[img.cacheKey()] = img

Expand Down Expand Up @@ -545,7 +544,6 @@ func TestSubscriber(t *testing.T) {
r.Equal("img1", change2Img1.ID)
r.Equal("amd64", change2Img1.Architecture)
r.Equal([]string{"r1"}, change2Img1.ResourcesChange.ResourceIDs)
r.Equal([]string{"r2"}, change2Img1.ResourcesChange.RemovedResourceIDs)

return true
})
Expand Down Expand Up @@ -589,14 +587,14 @@ func TestSubscriber(t *testing.T) {
return time.Now().UTC().Add(time.Hour)
}
delta := sub.delta
img1 := newImage("img1", "amd64")
img1 := newImage("img1amd64", "img1", "amd64")
img1.name = "img1"
img1.owners = map[string]*imageOwner{
"r1": {},
}
delta.images[img1.cacheKey()] = img1

img2 := newImage("img2", "amd64")
img2 := newImage("img1amd64", "img2", "amd64")
img2.name = "img2"
img2.owners = map[string]*imageOwner{
"r2": {},
Expand Down
64 changes: 36 additions & 28 deletions imagescan/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ var (
errNoCandidates = errors.New("no candidates")
)

const defaultImageArch = "amd64"

type podOwnerGetter interface {
GetPodOwnerID(pod *corev1.Pod) string
}

func newImage(imageID, architecture string) *image {
func newImage(key, imageID, architecture string) *image {
return &image{
key: key,
id: imageID,
architecture: architecture,
owners: map[string]*imageOwner{},
Expand Down Expand Up @@ -90,7 +93,12 @@ func (d *deltaState) delete(o kube.Object) {
}

func (d *deltaState) handlePodUpdate(v *corev1.Pod) {
d.upsertImages(v)
if v.Status.Phase == corev1.PodSucceeded {
d.handlePodDelete(v)
}
if v.Status.Phase == corev1.PodRunning {
d.upsertImages(v)
}
d.updateNodesUsageFromPod(v)
}

Expand All @@ -115,13 +123,7 @@ func (d *deltaState) updateNodesUsageFromPod(v *corev1.Pod) {
case corev1.PodRunning, corev1.PodPending:
n, found := d.nodes[v.Spec.NodeName]
if !found {
n = &node{
name: v.Spec.NodeName,
allocatableMem: &inf.Dec{},
allocatableCPU: &inf.Dec{},
pods: make(map[types.UID]*pod),
}
d.nodes[v.Spec.NodeName] = n
return
}

p, found := n.pods[v.GetUID()]
Expand All @@ -144,11 +146,9 @@ func (d *deltaState) updateNodesUsageFromPod(v *corev1.Pod) {
}

func (d *deltaState) upsertImages(pod *corev1.Pod) {
// Skip pods which are not running. If pod is running this means that container image should be already downloaded.
if pod.Status.Phase != corev1.PodRunning {
if _, found := d.nodes[pod.Spec.NodeName]; !found {
return
}

containers := pod.Spec.Containers
containers = append(containers, pod.Spec.InitContainers...)
containerStatuses := pod.Status.ContainerStatuses
Expand All @@ -171,17 +171,12 @@ func (d *deltaState) upsertImages(pod *corev1.Pod) {
continue
}

arch := "amd64"
nodeName := pod.Spec.NodeName
n, ok := d.nodes[nodeName]
if ok {
arch = n.architecture
}

key := cs.ImageID + arch
arch := d.getPodArch(pod)
key := d.getImageKey(cs.ImageID, arch)
img, found := d.images[key]
if !found {
img = newImage(cs.ImageID, arch)
img = newImage(key, cs.ImageID, arch)
}
img.id = cs.ImageID
img.name = cont.Image
Expand Down Expand Up @@ -218,6 +213,10 @@ func (d *deltaState) upsertImages(pod *corev1.Pod) {

func (d *deltaState) handlePodDelete(pod *corev1.Pod) {
for imgKey, img := range d.images {
if img.architecture != d.getPodArch(pod) {
continue
}

podID := string(pod.UID)
if n, found := img.nodes[pod.Spec.NodeName]; found {
delete(n.podIDs, podID)
Expand All @@ -228,10 +227,6 @@ func (d *deltaState) handlePodDelete(pod *corev1.Pod) {
delete(owner.podIDs, podID)
if len(owner.podIDs) == 0 {
delete(img.owners, ownerResourceID)
// Add changed owner.
if img.scanned {
img.ownerChanges.removedIDs = append(img.ownerChanges.removedIDs, ownerResourceID)
}
}
}

Expand Down Expand Up @@ -327,6 +322,19 @@ func (d *deltaState) setImageScanned(scannedImg castai.ScannedImage) {
}
}

func (d *deltaState) getImageKey(imageID, arch string) string {
key := imageID + arch
return key
}

func (d *deltaState) getPodArch(pod *corev1.Pod) string {
n, ok := d.nodes[pod.Spec.NodeName]
if ok && n.architecture != "" {
return n.architecture
}
return defaultImageArch
}

func getContainerRuntime(containerID string) imgcollectorconfig.Runtime {
parts := strings.Split(containerID, "://")
if len(parts) != 2 {
Expand Down Expand Up @@ -410,6 +418,8 @@ var (
)

type image struct {
key string

// id is ImageID from container status. It includes image name and digest.
//
// Note: ImageID's digest part could confuse you with actual image digest.
Expand Down Expand Up @@ -455,15 +465,13 @@ func (img *image) isUnused() bool {
}

type ownerChanges struct {
addedIDS []string
removedIDs []string
addedIDS []string
}

func (c *ownerChanges) empty() bool {
return len(c.addedIDS) == 0 && len(c.removedIDs) == 0
return len(c.addedIDS) == 0
}

func (c *ownerChanges) clear() {
c.addedIDS = []string{}
c.removedIDs = []string{}
}
6 changes: 2 additions & 4 deletions imagescan/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,19 +385,17 @@ func TestDelta(t *testing.T) {
}

delta.upsert(pod)
img, found := delta.images["testid"]
img, found := delta.images["testidamd64"]
r.True(found)
r.Len(img.owners, 1)
r.Len(img.nodes, 1)

delta.delete(pod)

img, found = delta.images["testid"]
img, found = delta.images["testidamd64"]
r.True(found)
r.Len(img.owners, 0)

delta.delete(node)

_, found = delta.nodes["node1"]
r.False(found)
})
Expand Down
Loading

0 comments on commit 91bccda

Please sign in to comment.