fix: force delete canaries linked to deleted CRD
* feat: add a new job to reconcile forced deleted canaries linked to CRD

* chore: update cron schedule

---------

Co-authored-by: Moshe Immerman <moshe@flanksource.com>
adityathebe and moshloop authored Oct 16, 2024
1 parent 95a4923 commit 574b2c8
Showing 5 changed files with 90 additions and 19 deletions.
24 changes: 13 additions & 11 deletions cmd/operator.go
@@ -27,20 +27,21 @@ import (
ctrlMetrics "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

var webhookPort int
var k8sLogLevel int
var enableLeaderElection bool
var operatorExecutor bool
var Operator = &cobra.Command{
Use: "operator",
Short: "Start the kubernetes operator",
Run: run,
}
var (
webhookPort int
k8sLogLevel int
enableLeaderElection bool
Operator = &cobra.Command{
Use: "operator",
Short: "Start the kubernetes operator",
Run: run,
}
)

func init() {
ServerFlags(Operator.Flags())
Operator.Flags().StringVarP(&runner.WatchNamespace, "namespace", "n", "", "Watch only specified namespace, otherwise watch all")
Operator.Flags().BoolVar(&operatorExecutor, "executor", true, "If false, only serve the UI and sync the configs")
Operator.Flags().BoolVar(&runner.OperatorExecutor, "executor", true, "If false, only serve the UI and sync the configs")
Operator.Flags().IntVar(&webhookPort, "webhookPort", 8082, "Port for webhooks ")
Operator.Flags().IntVar(&k8sLogLevel, "k8s-log-level", -1, "Kubernetes controller log level")
Operator.Flags().BoolVar(&enableLeaderElection, "enable-leader-election", false, "Enabling this will ensure there is only one active controller manager")
@@ -77,14 +78,15 @@ func run(cmd *cobra.Command, args []string) {

cache.PostgresCache = cache.NewPostgresCache(apicontext.DefaultContext)

if operatorExecutor {
if runner.OperatorExecutor {
logger.Infof("Starting executors")

// Some synchronous jobs can take time
// so we use a goroutine to unblock server start
// to prevent health check from failing
go jobs.Start()
}

go serve()

ctrl.SetLogger(logr.FromSlogHandler(logger.Handler()))
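The goroutine comment in run() above captures the reasoning: jobs.Start() can block for a while, so it is launched concurrently to keep serve() and its health endpoint responsive. A stripped-down illustration of that startup ordering follows; the handler path and function names here are hypothetical stand-ins, not code from this repository.

package main

import (
    "log"
    "net/http"
    "time"
)

// startJobs stands in for jobs.Start(): synchronous startup work that can take a while.
func startJobs() {
    time.Sleep(30 * time.Second)
    log.Println("executors started")
}

func main() {
    // Start the slow work concurrently so the HTTP listener (and its health
    // endpoint) comes up immediately and probes do not fail during startup.
    go startJobs()

    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
    })
    log.Fatal(http.ListenAndServe(":8080", nil))
}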
8 changes: 5 additions & 3 deletions cmd/serve.go
@@ -32,9 +32,11 @@ import (
"github.com/spf13/cobra"
)

var schedule, configFile string
var executor bool
var propertiesFile = "canary-checker.properties"
var (
schedule, configFile string
executor bool
propertiesFile = "canary-checker.properties"
)

var Serve = &cobra.Command{
Use: "serve config.yaml",
56 changes: 55 additions & 1 deletion pkg/jobs/canary/canary_jobs.go
@@ -1,6 +1,7 @@
package canary

import (
"errors"
"fmt"
"sync"
"time"
@@ -18,10 +19,13 @@ import (
dutyEcho "github.com/flanksource/duty/echo"
dutyjob "github.com/flanksource/duty/job"
"github.com/flanksource/duty/models"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/robfig/cron/v3"
"go.opentelemetry.io/otel/attribute"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
)

@@ -194,6 +198,56 @@ func logIfError(err error, description string) {
}
}

var CleanupCRDDeleteCanaries = &dutyjob.Job{
    Name: "CleanupCRDDeletedCanaries",
    Schedule: "@every 1d",
    RunNow: true,
    Singleton: true,
    JobHistory: true,
    Retention: dutyjob.RetentionBalanced,
    Fn: func(ctx dutyjob.JobRuntime) error {
        var crdCanaries []models.Canary
        if err := ctx.DB().Select("id", "name", "namespace").
            Where("deleted_at IS NULL").
            Where("agent_id = ?", uuid.Nil.String()).
            Where("source LIKE 'kubernetes/%'").Find(&crdCanaries).Error; err != nil {
            return fmt.Errorf("failed to list all canaries with source=CRD: %w", err)
        }

        if len(crdCanaries) == 0 {
            return nil
        }

        canaryClient, err := ctx.KubernetesDynamicClient().GetClientByGroupVersionKind(v1.GroupVersion.Group, v1.GroupVersion.Version, "Canary")
        if err != nil {
            return fmt.Errorf("failed to get kubernetes client for canaries: %w", err)
        }

        for _, canary := range crdCanaries {
            if _, err := canaryClient.Namespace(canary.Namespace).Get(ctx, canary.Name, metav1.GetOptions{}); err != nil {
                var statusErr *apierrors.StatusError
                if errors.As(err, &statusErr) {
                    if statusErr.ErrStatus.Reason == metav1.StatusReasonNotFound {
                        if err := db.DeleteCanary(ctx.Context, canary.ID.String()); err != nil {
                            ctx.History.AddErrorf("error deleting canary[%s]: %v", canary.ID, err)
                        } else {
                            ctx.History.IncrSuccess()
                        }

                        Unschedule(canary.ID.String())

                        continue
                    }
                }

                return fmt.Errorf("failed to delete canary %s/%s from kubernetes: %w", canary.Namespace, canary.Name, err)
            }
        }

        return nil
    },
}

var CleanupDeletedCanaryChecks = &dutyjob.Job{
Name: "CleanupDeletedCanaryChecks",
Schedule: "@every 1h",
@@ -217,7 +271,7 @@ var CleanupDeletedCanaryChecks = &dutyjob.Job{

for _, r := range rows {
if err := db.DeleteCanary(ctx.Context, r.ID); err != nil {
ctx.History.AddError(fmt.Sprintf("Error deleting components for topology[%s]: %v", r.ID, err))
ctx.History.AddErrorf("error deleting canary[%s]: %v", r.ID, err)
} else {
ctx.History.IncrSuccess()
}
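The force-delete decision in CleanupCRDDeleteCanaries above hinges on the dynamic client returning a NotFound status for the Canary custom resource. Below is a minimal standalone sketch of that check using apierrors.IsNotFound, which is equivalent to unwrapping the *StatusError and comparing its reason as the job does; the dynamic.NamespaceableResourceInterface parameter type and the helper name are assumptions for illustration only.

package sketch

import (
    "context"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/dynamic"
)

// crdStillExists reports whether the Canary custom resource backing a database
// record is still present in the cluster. A NotFound error means the CR was
// deleted and the record can safely be force-deleted; any other error is
// propagated to the caller.
func crdStillExists(ctx context.Context, client dynamic.NamespaceableResourceInterface, namespace, name string) (bool, error) {
    _, err := client.Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
    if apierrors.IsNotFound(err) {
        return false, nil
    }
    if err != nil {
        return false, err
    }
    return true, nil
}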
18 changes: 14 additions & 4 deletions pkg/jobs/jobs.go
@@ -6,6 +6,7 @@ import (
"github.com/flanksource/canary-checker/pkg/db"
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
topologyJobs "github.com/flanksource/canary-checker/pkg/jobs/topology"
"github.com/flanksource/canary-checker/pkg/runner"
"github.com/flanksource/canary-checker/pkg/topology"
"github.com/flanksource/commons/logger"
dutyEcho "github.com/flanksource/duty/echo"
@@ -19,9 +20,10 @@ var FuncScheduler = cron.New()
func Start() {
logger.Infof("Starting jobs ...")
dutyEcho.RegisterCron(FuncScheduler)

if canaryJobs.UpstreamConf.Valid() {
for _, j := range canaryJobs.UpstreamJobs {
var job = j
job := j
job.Context = context.DefaultContext
if err := job.AddToScheduler(FuncScheduler); err != nil {
logger.Errorf(err.Error())
@@ -30,23 +32,31 @@ func Start() {
}

for _, j := range db.CheckStatusJobs {
var job = j
job := j
job.Context = context.DefaultContext
if err := job.AddToScheduler(FuncScheduler); err != nil {
logger.Errorf(err.Error())
}
}

for _, j := range topology.Jobs {
var job = j
job := j
job.Context = context.DefaultContext
if err := job.AddToScheduler(FuncScheduler); err != nil {
logger.Errorf(err.Error())
}
}

for _, j := range []*job.Job{topologyJobs.CleanupDeletedTopologyComponents, topologyJobs.SyncTopology, canaryJobs.SyncCanaryJobs, canaryJobs.CleanupDeletedCanaryChecks, dutyQuery.SyncComponentCacheJob} {
var job = j
job := j
job.Context = context.DefaultContext
if err := job.AddToScheduler(FuncScheduler); err != nil {
logger.Errorf(err.Error())
}
}

if runner.OperatorExecutor {
job := canaryJobs.CleanupCRDDeleteCanaries
job.Context = context.DefaultContext
if err := job.AddToScheduler(FuncScheduler); err != nil {
logger.Errorf(err.Error())
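FuncScheduler above is a robfig/cron scheduler, and the new CleanupCRDDeleteCanaries job is only registered on it when the binary runs as the operator. For reference, a bare robfig/cron "@every" spec takes a Go duration string (hours are the largest built-in unit), as in the minimal sketch below; whether duty's job layer also accepts day-style specs such as "@every 1d" is an assumption this diff does not confirm.

package main

import (
    "log"

    "github.com/robfig/cron/v3"
)

func main() {
    c := cron.New()

    // "@every <duration>" is parsed with time.ParseDuration, so a daily job is "@every 24h".
    if _, err := c.AddFunc("@every 24h", func() {
        log.Println("reconcile canaries whose Canary CRD object was deleted")
    }); err != nil {
        log.Fatalf("failed to schedule job: %v", err)
    }

    c.Start()
    select {} // keep the process alive so the scheduler keeps firing
}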
3 changes: 3 additions & 0 deletions pkg/runner/runner.go
@@ -7,6 +7,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// OperatorExecutor when true means the application is serving as the k8s operator
var OperatorExecutor bool

var RunnerName string

var Version string
