Skip to content

Commit

Permalink
Send deltas in batches (#290)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjmao authored Jun 17, 2024
1 parent 4a479b3 commit 756a9dd
Show file tree
Hide file tree
Showing 12 changed files with 576 additions and 384 deletions.
4 changes: 2 additions & 2 deletions api/v1/kube/kube_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions api/v1/runtime/common.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

759 changes: 418 additions & 341 deletions api/v1/runtime/runtime_agent_api.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions api/v1/runtime/runtime_agent_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ service RuntimeSecurityAgentAPI {
rpc GetSyncState(GetSyncStateRequest) returns (GetSyncStateResponse) {}
rpc UpdateSyncState(UpdateSyncStateRequest) returns (UpdateSyncStateResponse) {}

rpc KubernetesDeltaBatchIngest(stream KubernetesDeltaBatch) returns (stream KubernetesDeltaIngestResponse) {}
// Deprecated. Should use KubernetesDeltaBatchIngest.
rpc KubernetesDeltaIngest(stream KubernetesDeltaItem) returns (stream KubernetesDeltaIngestResponse) {}
rpc ImageMetadataIngest(ImageMetadata) returns (ImageMetadataIngestResponse) {}
rpc KubeBenchReportIngest(KubeBenchReport) returns (KubeBenchReportIngestResponse) {}
Expand Down Expand Up @@ -257,6 +259,10 @@ message KubernetesDeltaItem {
bytes object_spec = 13;
}

message KubernetesDeltaBatch {
repeated KubernetesDeltaItem items = 1;
}

message Container {
string name = 1;
string image_name = 2;
Expand Down
70 changes: 69 additions & 1 deletion api/v1/runtime/runtime_agent_api_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
initialKubernetesDeltaReportDelay = pflag.Duration("kubernetes-delta-init-delay", 60*time.Second, "Initial delay to wait before starting reporting first kubernetes object deltas (first send report is full snapshot, this might take some time for large clusters. default: `1m`)")
kubernetesDeltaSendTimeout = pflag.Duration("kubernetes-delta-send-timeout", 3*time.Minute, "Kubernetes deltas send timeout")
kubernetesDeltaCompressionEnabled = pflag.Bool("kubernetes-delta-compression-enabled", true, "Kubernetes deltas compression during ingest")
kubernetesDeltaBatchSize = pflag.Int("kubernetes-delta-batch-size", 200, "Kubernetes deltas batch size during ingest")

imageScanEnabled = pflag.Bool("image-scan-enabled", false, "Enable image scanning")
imageScanInterval = pflag.Duration("image-scan-interval", 30*time.Second, "Image scan scheduling interval")
Expand Down Expand Up @@ -225,6 +226,7 @@ func main() {
InitialDeltay: *initialKubernetesDeltaReportDelay,
SendTimeout: *kubernetesDeltaSendTimeout,
UseCompression: *kubernetesDeltaCompressionEnabled,
BatchSize: *kubernetesDeltaBatchSize,
},
JobsCleanup: state.JobsCleanupConfig{
CleanupInterval: *jobsCleanupInterval,
Expand Down
4 changes: 4 additions & 0 deletions cmd/controller/state/castai_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ type testGrpcClient struct {
getConfigurationResponse func() (*castaipb.GetConfigurationResponse, error)
}

func (t testGrpcClient) KubernetesDeltaBatchIngest(ctx context.Context, opts ...grpc.CallOption) (castaipb.RuntimeSecurityAgentAPI_KubernetesDeltaBatchIngestClient, error) {
return nil, nil
}

func (t testGrpcClient) NetflowWriteStream(ctx context.Context, opts ...grpc.CallOption) (castaipb.RuntimeSecurityAgentAPI_NetflowWriteStreamClient, error) {
return nil, nil
}
Expand Down
24 changes: 13 additions & 11 deletions cmd/controller/state/delta/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

type castaiClient interface {
KubernetesDeltaIngest(ctx context.Context, opts ...grpc.CallOption) (castaipb.RuntimeSecurityAgentAPI_KubernetesDeltaIngestClient, error)
KubernetesDeltaBatchIngest(ctx context.Context, opts ...grpc.CallOption) (castaipb.RuntimeSecurityAgentAPI_KubernetesDeltaBatchIngestClient, error)
}

type kubeClient interface {
Expand All @@ -43,6 +43,7 @@ type Config struct {
InitialDeltay time.Duration
SendTimeout time.Duration `validate:"required"`
UseCompression bool
BatchSize int `validate:"required"`
}

func NewController(
Expand Down Expand Up @@ -182,7 +183,7 @@ func (c *Controller) sendDeltas(ctx context.Context, pendingDeltas []deltaItem)
if c.cfg.UseCompression {
opts = append(opts, grpc.UseCompressor(gzip.Name))
}
deltaStream, err := c.castaiClient.KubernetesDeltaIngest(ctx, opts...)
deltaStream, err := c.castaiClient.KubernetesDeltaBatchIngest(ctx, opts...)
if err != nil && !errors.Is(err, context.Canceled) {
return err
}
Expand All @@ -191,27 +192,26 @@ func (c *Controller) sendDeltas(ctx context.Context, pendingDeltas []deltaItem)
}()

var sentDeltasCount int
for _, item := range pendingDeltas {
item := item
pbItem := c.toCastaiDelta(item)
if err := c.sendDeltaItem(ctx, deltaStream, pbItem); err != nil {
for _, batch := range lo.Chunk(pendingDeltas, c.cfg.BatchSize) {
pbItems := lo.Map(batch, c.toCastaiDelta)
if err := c.sendDeltaItems(ctx, deltaStream, pbItems); err != nil {
// Return any remaining items back to pending list.
c.upsertPendingItems(pendingDeltas[sentDeltasCount:])
return err
}
sentDeltasCount++
sentDeltasCount += len(batch)
}
c.log.Infof("sent deltas, id=%v, count=%d/%d, duration=%v", deltaID, len(pendingDeltas), sentDeltasCount, time.Since(start))
return nil
}

func (c *Controller) sendDeltaItem(ctx context.Context, stream castaipb.RuntimeSecurityAgentAPI_KubernetesDeltaIngestClient, item *castaipb.KubernetesDeltaItem) error {
func (c *Controller) sendDeltaItems(ctx context.Context, stream castaipb.RuntimeSecurityAgentAPI_KubernetesDeltaBatchIngestClient, items []*castaipb.KubernetesDeltaItem) error {
return withExponentialRetry(ctx, c.log, func() error {
if err := stream.Send(item); err != nil {
if err := stream.Send(&castaipb.KubernetesDeltaBatch{Items: items}); err != nil {
if !isRetryableErr(err) {
return backoff.Permanent(err)
}
return fmt.Errorf("sending delta item: %w", err)
return fmt.Errorf("sending delta items batch: %w", err)
}
if _, err := stream.Recv(); err != nil {
if !isRetryableErr(err) {
Expand Down Expand Up @@ -283,7 +283,7 @@ func (c *Controller) upsertPendingItems(items []deltaItem) {
}
}

func (c *Controller) toCastaiDelta(item deltaItem) *castaipb.KubernetesDeltaItem {
func (c *Controller) toCastaiDelta(item deltaItem, _ int) *castaipb.KubernetesDeltaItem {
obj := item.object
objectUID := string(obj.GetUID())

Expand Down Expand Up @@ -377,6 +377,8 @@ func getObjectSpec(obj kube.Object) ([]byte, error) {
return json.Marshal(v.Spec)
case *appsv1.Deployment:
return json.Marshal(v.Spec)
case *appsv1.ReplicaSet:
return json.Marshal(v.Spec)
case *appsv1.StatefulSet:
return json.Marshal(v.Spec)
case *appsv1.DaemonSet:
Expand Down
31 changes: 19 additions & 12 deletions cmd/controller/state/delta/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ func TestController(t *testing.T) {
r := require.New(t)
client := newMockClient()
var receivedDeltasCount int
client.streamFunc = func() castaipb.RuntimeSecurityAgentAPI_KubernetesDeltaIngestClient {
client.streamFunc = func() castaipb.RuntimeSecurityAgentAPI_KubernetesDeltaBatchIngestClient {
return &mockStream{
onSend: func(item *castaipb.KubernetesDeltaItem) error {
onSend: func(item *castaipb.KubernetesDeltaBatch) error {
receivedDeltasCount++
if receivedDeltasCount > 1 {
return errors.New("ups")
Expand All @@ -200,9 +200,9 @@ func TestController(t *testing.T) {
t.Run("fail on initial delta send error", func(t *testing.T) {
r := require.New(t)
client := newMockClient()
client.streamFunc = func() castaipb.RuntimeSecurityAgentAPI_KubernetesDeltaIngestClient {
client.streamFunc = func() castaipb.RuntimeSecurityAgentAPI_KubernetesDeltaBatchIngestClient {
return &mockStream{
onSend: func(item *castaipb.KubernetesDeltaItem) error {
onSend: func(item *castaipb.KubernetesDeltaBatch) error {
return errors.New("ups")
},
}
Expand Down Expand Up @@ -257,12 +257,12 @@ func TestController(t *testing.T) {

func newMockClient() *mockCastaiClient {
client := &mockCastaiClient{}
client.streamFunc = func() castaipb.RuntimeSecurityAgentAPI_KubernetesDeltaIngestClient {
client.streamFunc = func() castaipb.RuntimeSecurityAgentAPI_KubernetesDeltaBatchIngestClient {
return &mockStream{
onSend: func(item *castaipb.KubernetesDeltaItem) error {
onSend: func(item *castaipb.KubernetesDeltaBatch) error {
client.mu.Lock()
defer client.mu.Unlock()
client.deltas = append(client.deltas, item)
client.deltas = append(client.deltas, item.Items...)
return nil
},
}
Expand All @@ -273,7 +273,14 @@ func newMockClient() *mockCastaiClient {
func newTestController(log *logging.Logger, client *mockCastaiClient) *Controller {
ctrl := NewController(
log,
Config{Interval: 1 * time.Millisecond, InitialDeltay: 1 * time.Millisecond, SendTimeout: 10 * time.Millisecond},
Config{
Enabled: false,
Interval: 1 * time.Millisecond,
InitialDeltay: 1 * time.Millisecond,
SendTimeout: 10 * time.Millisecond,
UseCompression: false,
BatchSize: 10,
},
client,
&mockPodOwnerGetter{},
)
Expand All @@ -292,22 +299,22 @@ func (m *mockPodOwnerGetter) GetOwnerUID(obj kube.Object) string {
type mockCastaiClient struct {
deltas []*castaipb.KubernetesDeltaItem
mu sync.Mutex
streamFunc func() v1.RuntimeSecurityAgentAPI_KubernetesDeltaIngestClient
streamFunc func() v1.RuntimeSecurityAgentAPI_KubernetesDeltaBatchIngestClient
}

func (m *mockCastaiClient) KubernetesDeltaIngest(ctx context.Context, opts ...grpc.CallOption) (v1.RuntimeSecurityAgentAPI_KubernetesDeltaIngestClient, error) {
func (m *mockCastaiClient) KubernetesDeltaBatchIngest(ctx context.Context, opts ...grpc.CallOption) (v1.RuntimeSecurityAgentAPI_KubernetesDeltaBatchIngestClient, error) {
return m.streamFunc(), nil
}

type mockStream struct {
onSend func(item *castaipb.KubernetesDeltaItem) error
onSend func(item *castaipb.KubernetesDeltaBatch) error
}

func (m *mockStream) Recv() (*castaipb.KubernetesDeltaIngestResponse, error) {
return &castaipb.KubernetesDeltaIngestResponse{}, nil
}

func (m *mockStream) Send(item *castaipb.KubernetesDeltaItem) error {
func (m *mockStream) Send(item *castaipb.KubernetesDeltaBatch) error {
return m.onSend(item)
}

Expand Down
16 changes: 16 additions & 0 deletions cmd/mock-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,22 @@ type MockServer struct {
log *logging.Logger
}

func (m *MockServer) KubernetesDeltaBatchIngest(server castaipb.RuntimeSecurityAgentAPI_KubernetesDeltaBatchIngestServer) error {
for {
event, err := server.Recv()
if err != nil {
m.log.Warnf("delta recv: %v", err)
break
}
m.log.Debugf("delta_items: %v", event)
if err := server.Send(&castaipb.KubernetesDeltaIngestResponse{}); err != nil {
m.log.Warnf("delta ack send: %v", err)
break
}
}
return nil
}

func (m *MockServer) NetflowWriteStream(server castaipb.RuntimeSecurityAgentAPI_NetflowWriteStreamServer) error {
for {
event, err := server.Recv()
Expand Down
Loading

0 comments on commit 756a9dd

Please sign in to comment.