diff --git a/cmd/options/run.go b/cmd/options/run.go
index fa7edd0a..e109a93b 100644
--- a/cmd/options/run.go
+++ b/cmd/options/run.go
@@ -11,3 +11,9 @@ func AddRunOptions(cmd *cobra.Command) *RunOptions {
 	cmd.Flags().StringVarP(&options.Run, "run", "r", "", "Inspect the given run, defaulting to using the singleton run")
 	return &options
 }
+
+func AddRequiredRunOptions(cmd *cobra.Command) *RunOptions {
+	opts := AddRunOptions(cmd)
+	cmd.MarkFlagRequired("run")
+	return opts
+}
diff --git a/cmd/subcommands/queue/drain.go b/cmd/subcommands/queue/drain.go
new file mode 100644
index 00000000..55735370
--- /dev/null
+++ b/cmd/subcommands/queue/drain.go
@@ -0,0 +1,41 @@
+//go:build full || deploy
+
+package queue
+
+import (
+	"context"
+
+	"github.com/spf13/cobra"
+
+	"lunchpail.io/cmd/options"
+	"lunchpail.io/pkg/be"
+	"lunchpail.io/pkg/runtime/queue"
+)
+
+func Drain() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "drain",
+		Short: "Drain the output tasks, allowing graceful termination",
+		Args:  cobra.MatchAll(cobra.ExactArgs(0), cobra.OnlyValidArgs),
+	}
+
+	opts, err := options.RestoreBuildOptions()
+	if err != nil {
+		panic(err)
+	}
+
+	runOpts := options.AddRequiredRunOptions(cmd)
+	options.AddTargetOptionsTo(cmd, &opts)
+
+	cmd.RunE = func(cmd *cobra.Command, args []string) error {
+		ctx := context.Background()
+		backend, err := be.New(ctx, opts)
+		if err != nil {
+			return err
+		}
+
+		return queue.Drain(ctx, backend, runOpts.Run)
+	}
+
+	return cmd
+}
diff --git a/cmd/subcommands/queue_full.go b/cmd/subcommands/queue_full.go
index e9218091..a23ac7dd 100644
--- a/cmd/subcommands/queue_full.go
+++ b/cmd/subcommands/queue_full.go
@@ -19,6 +19,7 @@ func init() {
 
 	if build.IsBuilt() {
 		cmd.AddCommand(queue.Cat())
+		cmd.AddCommand(queue.Drain())
 		cmd.AddCommand(queue.Last())
 		cmd.AddCommand(queue.Ls())
 		cmd.AddCommand(queue.Stat())
diff --git a/pkg/runtime/builtins/redirect.go b/pkg/runtime/builtins/redirect.go
index 6ba42fcf..965f10d8 100644
--- a/pkg/runtime/builtins/redirect.go
+++ b/pkg/runtime/builtins/redirect.go
@@ -14,7 +14,7 @@ import (
 )
 
 func RedirectTo(ctx context.Context, client queue.S3Client, folderFor func(object string) string, opts build.LogOptions) error {
-	objects, errs := client.Listen(client.Paths.Bucket, client.Outbox(), "")
+	objects, errs := client.Listen(client.Paths.Bucket, client.Outbox(), "", false)
 
 	group, _ := errgroup.WithContext(ctx)
 	done := false
@@ -41,7 +41,12 @@ func RedirectTo(ctx context.Context, client queue.S3Client, folderFor func(objec
 			if opts.Verbose {
 				fmt.Fprintf(os.Stderr, "Downloading output to %s\n", dst)
 			}
-			group.Go(func() error { return client.Download(client.Paths.Bucket, object, dst) })
+			group.Go(func() error {
+				if err := client.Download(client.Paths.Bucket, object, dst); err != nil {
+					return err
+				}
+				return client.MarkConsumed(object)
+			})
 		}
 
 	for !done {
diff --git a/pkg/runtime/queue/drain.go b/pkg/runtime/queue/drain.go
new file mode 100644
index 00000000..18aa05b9
--- /dev/null
+++ b/pkg/runtime/queue/drain.go
@@ -0,0 +1,29 @@
+package queue
+
+import (
+	"context"
+
+	"golang.org/x/sync/errgroup"
+
+	"lunchpail.io/pkg/be"
+)
+
+// Drain the output tasks, allowing graceful termination
+func Drain(ctx context.Context, backend be.Backend, runname string) error {
+	c, err := NewS3ClientForRun(ctx, backend, runname)
+	if err != nil {
+		return err
+	}
+	defer c.Stop()
+
+	group, _ := errgroup.WithContext(ctx)
+	for o := range c.ListObjects(c.Paths.Bucket, c.finishedMarkers(), true) {
+		if o.Err != nil {
+			return o.Err
+		}
+
+		group.Go(func() error { return c.MarkConsumed(o.Key) })
+	}
+
+	return group.Wait()
+}
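[Reviewer note, not part of the patch] In pkg/runtime/queue/drain.go above, the `group.Go` closure captures the loop variable `o`. Under Go 1.22+ per-iteration loop-variable semantics this is correct as written; if this module still builds with an older toolchain, every goroutine would observe the final iteration's value, and the conventional guard is a per-iteration copy. A self-contained sketch of the pitfall and the guard:

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	var group errgroup.Group
	for _, v := range []string{"a", "b", "c"} {
		v := v // per-iteration copy; a no-op on Go 1.22+, required before that
		group.Go(func() error {
			fmt.Println(v) // without the copy, pre-1.22 could print "c" three times
			return nil
		})
	}
	_ = group.Wait()
}
```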
diff --git a/pkg/runtime/queue/paths.go b/pkg/runtime/queue/paths.go
index db3764df..109d7d9d 100644
--- a/pkg/runtime/queue/paths.go
+++ b/pkg/runtime/queue/paths.go
@@ -64,3 +64,11 @@ func pathsFor(queuePrefixPath string) (filepaths, error) {
 func (c S3Client) Outbox() string {
 	return filepath.Join(c.Paths.PoolPrefix, c.Paths.Outbox)
 }
+
+func (c S3Client) finishedMarkers() string {
+	return filepath.Join(c.Paths.PoolPrefix, "finished")
+}
+
+func (c S3Client) ConsumedMarker(task string) string {
+	return filepath.Join(c.Paths.PoolPrefix, "consumed", filepath.Base(task))
+}
diff --git a/pkg/runtime/queue/s3.go b/pkg/runtime/queue/s3.go
index e725c912..dae98c42 100644
--- a/pkg/runtime/queue/s3.go
+++ b/pkg/runtime/queue/s3.go
@@ -183,6 +183,10 @@ func (s3 S3Client) Rm(bucket, filePath string) error {
 	return s3.client.RemoveObject(s3.context, bucket, filePath, minio.RemoveObjectOptions{})
 }
 
+func (s3 S3Client) MarkConsumed(filePath string) error {
+	return s3.Mark(s3.Paths.Bucket, s3.ConsumedMarker(filePath), "consumed")
+}
+
 func (s3 S3Client) Mark(bucket, filePath, marker string) error {
 	_, err := s3.client.PutObject(s3.context, bucket, filePath, strings.NewReader(marker), int64(len(marker)), minio.PutObjectOptions{})
 	return err
diff --git a/pkg/runtime/queue/wait.go b/pkg/runtime/queue/wait.go
index 37d4316e..ff0fce54 100644
--- a/pkg/runtime/queue/wait.go
+++ b/pkg/runtime/queue/wait.go
@@ -5,6 +5,7 @@ import (
 	"os"
 	"path/filepath"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/minio/minio-go/v7"
@@ -46,14 +47,19 @@ func (s3 S3Client) WaitTillExists(bucket, object string) error {
 	return s3.WaitForEvent(bucket, object, "s3:ObjectCreated:*")
 }
 
-func (s3 S3Client) Listen(bucket, prefix, suffix string) (<-chan string, <-chan error) {
+func (s3 S3Client) Listen(bucket, prefix, suffix string, includeDeletions bool) (<-chan string, <-chan error) {
 	c := make(chan string)
 	e := make(chan error)
 
 	os := make(map[string]bool)
-	report := func(key string) {
-		if !os[key] {
-			os[key] = true
+	report := func(key string, isCreate bool) {
+		if isCreate {
+			if !os[key] {
+				os[key] = true
+				c <- key
+			}
+		} else {
+			delete(os, key)
 			c <- key
 		}
 	}
@@ -62,7 +68,7 @@ func (s3 S3Client) Listen(bucket, prefix, suffix string) (<-chan string, <-chan
 			if o.Err != nil {
 				e <- o.Err
 			} else {
-				report(o.Key)
+				report(o.Key, true)
 			}
 		}
 	}
@@ -72,17 +78,24 @@ func (s3 S3Client) Listen(bucket, prefix, suffix string) (<-chan string, <-chan
 		defer close(e)
 		once()
 		already := false
-		for n := range s3.client.ListenBucketNotification(s3.context, bucket, prefix, suffix, []string{"s3:ObjectCreated:*"}) {
+
+		events := []string{"s3:ObjectCreated:*"}
+		if includeDeletions {
+			events = append(events, "s3:ObjectRemoved:*")
+		}
+
+		for n := range s3.client.ListenBucketNotification(s3.context, bucket, prefix, suffix, events) {
 			if n.Err != nil {
 				e <- n.Err
 				continue
 			}
 
 			if !already {
+				already = true
 				once()
 			}
 			for _, r := range n.Records {
-				report(r.S3.Object.Key)
+				report(r.S3.Object.Key, strings.HasPrefix(r.EventName, "s3:ObjectCreated:"))
 			}
 		}
 	}()
diff --git a/pkg/runtime/worker/watcher.go b/pkg/runtime/worker/watcher.go
index 5f0af853..45e22f5b 100644
--- a/pkg/runtime/worker/watcher.go
+++ b/pkg/runtime/worker/watcher.go
@@ -49,7 +49,7 @@ func startWatch(ctx context.Context, handler []string, client queue.S3Client, de
 		return err
 	}
 
-	tasks, errs := client.Listen(client.Paths.Bucket, prefix, "")
+	tasks, errs := client.Listen(client.Paths.Bucket, prefix, "", false)
 	for {
		if killfileExists(client, bucket, prefix) {
			break
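[Reviewer note, not part of the patch] A sketch of consuming the widened `Listen` signature above. Only `Listen`, `Paths.Bucket`, and `Outbox()` come from this package; the surrounding loop and function are hypothetical. The key point: with `includeDeletions=true`, a key is reported again when its object is removed, so a receive means "changed", not necessarily "created".

```go
package main

import (
	"fmt"
	"os"

	"lunchpail.io/pkg/runtime/queue"
)

// watchOutbox is a hypothetical consumer of the four-argument Listen.
func watchOutbox(client queue.S3Client) {
	objects, errs := client.Listen(client.Paths.Bucket, client.Outbox(), "", true)
	for {
		select {
		case err, ok := <-errs:
			if !ok {
				return // the listener closed its error channel; shut down
			}
			fmt.Fprintln(os.Stderr, "listen error:", err)
		case key := <-objects:
			// may signal a creation or a removal; re-check existence before acting
			fmt.Println("changed:", key)
		}
	}
}

func main() {} // illustrative only; constructing an S3Client is elided here
```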
diff --git a/pkg/runtime/workstealer/assess.go b/pkg/runtime/workstealer/assess.go
index 1d0e6eca..e9c45a38 100644
--- a/pkg/runtime/workstealer/assess.go
+++ b/pkg/runtime/workstealer/assess.go
@@ -42,6 +42,13 @@ func (c client) reportChangedFile(filepath string) error {
 	return c.s3.Upload(c.s3.Paths.Bucket, filepath, remotefile)
 }
 
+// Has a Task already been marked as completed?
+func (c client) isMarkedDone(task string) bool {
+	finishedMarker := filepath.Join(finished, task)
+	p := c.localPathToRemote(finishedMarker)
+	return c.s3.Exists(c.s3.Paths.Bucket, filepath.Dir(p), filepath.Base(p))
+}
+
 // A Task has been fully completed by a Worker
 func (c client) markDone(task string) error {
 	finishedMarker := filepath.Join(finished, task)
@@ -152,11 +159,15 @@ func (c client) cleanupForDeadWorker(worker Worker) error {
 
 // A Task has completed
 func (c client) cleanupForCompletedTask(completedTask AssignedTask, success TaskCode) error {
-	if err := c.markDone(completedTask.task); err != nil {
-		return err
+	if !c.isMarkedDone(completedTask.task) {
+		if err := c.markDone(completedTask.task); err != nil {
+			return err
+		}
+
+		return c.copyToFinalOutbox(completedTask.task, completedTask.worker, success)
 	}
 
-	return c.copyToFinalOutbox(completedTask.task, completedTask.worker, success)
+	return nil
 }
 
 type Apportionment struct {
@@ -294,35 +305,33 @@ func (c client) rebalance(model Model) bool {
 	return false
 }
 
-// If the dispatcher is done and there are no more outstanding tasks,
-// then touch kill files in the worker inboxes.
+// Touch kill files in the worker inboxes.
 func (c client) touchKillFiles(model Model) {
-	if model.DispatcherDone && model.nFinishedTasks() > 0 && model.nTasksRemaining() == 0 {
-		for _, worker := range model.LiveWorkers {
-			if !worker.killfilePresent {
-				if err := c.touchKillFile(worker); err != nil {
-					log.Fatalf(err.Error())
-				}
+	for _, worker := range model.LiveWorkers {
+		if !worker.killfilePresent {
+			if err := c.touchKillFile(worker); err != nil {
+				log.Fatalf(err.Error())
 			}
 		}
 	}
 }
 
-// Is everything well and done: dispatcher, workers, us?
-func (c client) readyToBye(model Model) bool {
-	return model.DispatcherDone && model.nFinishedTasks() > 0 && model.nTasksRemaining() == 0 && len(model.LiveWorkers) == 0
-}
-
 // Assess and potentially update queue state. Return true when we are all done.
 func (c client) assess(m Model) bool {
 	c.cleanupCompletedTasks(m)
 
-	if c.readyToBye(m) {
+	if m.readyToBye() {
 		fmt.Fprintln(os.Stderr, "INFO All work has been completed, all workers have terminated")
 		return true
 	} else if !c.rebalance(m) {
 		c.assignNewTasks(m)
-		c.touchKillFiles(m)
+
+		if m.isAllWorkDone() {
+			// If the dispatcher is done and there are no more outstanding tasks,
+			// then touch kill files in the worker inboxes.
+			c.touchKillFiles(m)
+		}
+
 		c.reassignDeadWorkerTasks(m)
 	}
 
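[Reviewer note, not part of the patch] Two behaviors above are worth calling out: cleanupForCompletedTask is now idempotent (guarded by isMarkedDone), which matters because the workstealer also wakes on deletion events and may re-assess a task it has already processed; and kill files are only touched once Model.isAllWorkDone() holds (defined in model.go below). A hypothetical in-package test sketch of that predicate:

```go
package workstealer

import "testing"

// Hypothetical test, not in this PR: kill files should only be touched
// once all work is done, i.e. when isAllWorkDone() holds.
func TestIsAllWorkDone(t *testing.T) {
	m := Model{
		DispatcherDone: true,
		FinishedTasks:  []string{"task.1"}, // illustrative task name
		// no Unassigned/Assigned tasks, so nTasksRemaining() == 0
	}
	if !m.isAllWorkDone() {
		t.Fatal("expected all work to be done")
	}

	m.UnassignedTasks = []string{"task.2"}
	if m.isAllWorkDone() {
		t.Fatal("an outstanding task should keep the run alive")
	}
}
```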
diff --git a/pkg/runtime/workstealer/fetch.go b/pkg/runtime/workstealer/fetch.go
index c10a11f3..33a2a38e 100644
--- a/pkg/runtime/workstealer/fetch.go
+++ b/pkg/runtime/workstealer/fetch.go
@@ -22,6 +22,7 @@ type WhatChanged int
 const (
 	UnassignedTask WhatChanged = iota
 	DispatcherDone
+	ConsumedTask
 	FinishedTask
 	LiveWorker
 
@@ -38,6 +39,7 @@ const (
 
 var unassignedTaskPattern = regexp.MustCompile("^inbox/(.+)$")
 var dispatcherDonePattern = regexp.MustCompile("^done$")
+var consumedTaskPattern = regexp.MustCompile("^consumed/(.+)$")
 var finishedTaskPattern = regexp.MustCompile("^finished/(.+)$")
 var liveWorkerPattern = regexp.MustCompile("^queues/(.+)/inbox/[.]alive$")
 var deadWorkerPattern = regexp.MustCompile("^queues/(.+)/inbox/[.]dead$")
@@ -52,6 +54,8 @@ func whatChanged(line string) (WhatChanged, string, string) {
 		return UnassignedTask, match[1], ""
 	} else if match := dispatcherDonePattern.FindStringSubmatch(line); len(match) == 1 {
 		return DispatcherDone, "", ""
+	} else if match := consumedTaskPattern.FindStringSubmatch(line); len(match) == 2 {
+		return ConsumedTask, match[1], ""
 	} else if match := finishedTaskPattern.FindStringSubmatch(line); len(match) == 2 {
 		return FinishedTask, match[1], ""
 	} else if match := liveWorkerPattern.FindStringSubmatch(line); len(match) == 2 {
@@ -77,14 +81,6 @@ func whatChanged(line string) (WhatChanged, string, string) {
 
 // We will be passed a stream of diffs
 func (m *Model) update(filepath string, workersLookup map[string]Worker) {
-	/*unassignedTasks := []string{}
-	dispatcherDone := false
-	finishedTasks := []string{}
-	assignedTasks := []AssignedTask{}
-	processingTasks := []AssignedTask{}
-	successfulTasks := []AssignedTask{}
-	failedTasks := []AssignedTask{}*/
-
 	what, thing, thing2 := whatChanged(filepath)
 
 	switch what {
@@ -92,6 +88,8 @@ func (m *Model) update(filepath string, workersLookup map[string]Worker) {
 		m.UnassignedTasks = append(m.UnassignedTasks, thing)
 	case DispatcherDone:
 		m.DispatcherDone = true
+	case ConsumedTask:
+		m.ConsumedTasks = append(m.ConsumedTasks, thing)
 	case FinishedTask:
 		m.FinishedTasks = append(m.FinishedTasks, thing)
 	case LiveWorker:
@@ -140,7 +138,6 @@ func (m *Model) update(filepath string, workersLookup map[string]Worker) {
 			fmt.Fprintf(os.Stderr, "ERROR unable to find worker=%s\n", thing)
 		}
 	}
-	//eturn Model{dispatcherDone, unassignedTasks, finishedTasks, liveWorkers, deadWorkers, assignedTasks, processingTasks, successfulTasks, failedTasks}
 }
 
 func (m *Model) finishUp(workersLookup map[string]Worker) {
diff --git a/pkg/runtime/workstealer/model.go b/pkg/runtime/workstealer/model.go
index fe7c39b4..85f2bdef 100644
--- a/pkg/runtime/workstealer/model.go
+++ b/pkg/runtime/workstealer/model.go
@@ -1,5 +1,7 @@
 package workstealer
 
+import "slices"
+
 // A Task that was assigned to a given Worker
 type AssignedTask struct {
 	worker string
@@ -30,6 +32,7 @@ type Model struct {
 	DispatcherDone bool
 
 	UnassignedTasks []string
+	ConsumedTasks   []string
 	FinishedTasks   []string
 	LiveWorkers     []Worker
 	DeadWorkers     []Worker
@@ -41,20 +44,51 @@ type Model struct {
 	FailedTasks []AssignedTask
 }
 
-func (model *Model) nFinishedTasks() int {
+func (model Model) nFinishedTasks() int {
 	return len(model.FinishedTasks)
 }
 
-func (model *Model) nUnassignedTasks() int {
+func (model Model) nConsumedTasks() int {
+	return len(model.ConsumedTasks)
+}
+
+func (model Model) nUnassignedTasks() int {
 	return len(model.UnassignedTasks)
 }
 
-func (model *Model) nAssignedTasks() int {
+func (model Model) nAssignedTasks() int {
 	return len(model.AssignedTasks)
 }
 
 // How many outstanding tasks do we have, i.e. either Unassigned, or
 // Assigned.
-func (model *Model) nTasksRemaining() int {
+func (model Model) nTasksRemaining() int {
 	return model.nUnassignedTasks() + model.nAssignedTasks()
 }
+
+// Have we consumed all work that is ever going to be produced?
+func (model Model) isAllWorkDone() bool {
+	return model.DispatcherDone && model.nFinishedTasks() > 0 && model.nTasksRemaining() == 0
+}
+
+// No live workers, some dead workers, and all dead workers have kill
+// file (meaning that we intentionally asked them to self-destruct).
+func (model Model) areAllWorkersQuiesced() bool {
+	return len(model.LiveWorkers) == 0 &&
+		len(model.DeadWorkers) > 0 &&
+		slices.IndexFunc(model.DeadWorkers, func(w Worker) bool { return !w.killfilePresent }) < 0
+}
+
+// Has some output been produced?
+func (model Model) hasSomeOutputBeenProduced() bool {
+	return len(model.SuccessfulTasks)+len(model.FailedTasks) > 0
+}
+
+func (model Model) isAllOutputConsumed() bool {
+	return model.hasSomeOutputBeenProduced() && model.nFinishedTasks() == model.nConsumedTasks()
+}
+
+// Is everything well and done: dispatcher, workers, us?
+func (model Model) readyToBye() bool {
+	return model.isAllWorkDone() && model.areAllWorkersQuiesced() && model.isAllOutputConsumed()
+}
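[Reviewer note, not part of the patch] readyToBye() is the heart of this PR: the workstealer now refuses to exit until every finished task also has a consumed marker, which is exactly what `queue drain` (or the per-object MarkConsumed in redirect.go) provides. A hypothetical in-package test sketch:

```go
package workstealer

import "testing"

// Hypothetical test, not in this PR; task and worker names are illustrative.
func TestReadyToBye(t *testing.T) {
	m := Model{
		DispatcherDone:  true,
		FinishedTasks:   []string{"task.1"},
		ConsumedTasks:   []string{"task.1"},
		DeadWorkers:     []Worker{{killfilePresent: true}},
		SuccessfulTasks: []AssignedTask{{worker: "w1", task: "task.1"}},
	}
	if !m.readyToBye() {
		t.Fatal("a drained, quiesced run should be ready to terminate")
	}

	m.ConsumedTasks = nil // output produced but never drained
	if m.readyToBye() {
		t.Fatal("undrained output must block termination")
	}
}
```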
diff --git a/pkg/runtime/workstealer/run.go b/pkg/runtime/workstealer/run.go
index 52697b24..5fb28fb8 100644
--- a/pkg/runtime/workstealer/run.go
+++ b/pkg/runtime/workstealer/run.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"strings"
 	"time"
 
 	q "lunchpail.io/pkg/runtime/queue"
@@ -50,7 +51,7 @@ func Run(ctx context.Context) error {
 	}
 	defer s3.StopListening(s3.Paths.Bucket)
 
-	o, errs := s3.Listen(s3.Paths.Bucket, "", "")
+	o, errs := s3.Listen(s3.Paths.Bucket, "", "", true)
 	for {
 		select {
 		case err := <-errs:
@@ -58,10 +59,16 @@ func Run(ctx context.Context) error {
 			// sleep for a bit
 			time.Sleep(s)
 
-		case <-o:
+		case obj := <-o:
+			// TODO update model incrementally rather than
+			// re-fetching and re-parsing the entire model
+			// every time there is a single change
+			if strings.Contains(obj, "/logs/") {
+				continue
+			}
 		}
 
-		// fetch model
+		// fetch and parse model
 		m := c.fetchModel()
 
 		if err := m.report(c); err != nil {
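[Reviewer note, not part of the patch] The /logs/ filter above keeps worker log uploads from triggering a full model re-fetch, which matters more now that deletion events also wake this loop. A hypothetical test sketch of the filter predicate; key names are illustrative:

```go
package workstealer

import (
	"strings"
	"testing"
)

// Hypothetical test, not in this PR.
func TestLogKeysAreIgnored(t *testing.T) {
	skip := func(key string) bool { return strings.Contains(key, "/logs/") }

	if !skip("pool1/queues/worker0/logs/stdout.txt") {
		t.Fatal("log uploads should not trigger a model re-fetch")
	}
	if skip("pool1/finished/task.1") {
		t.Fatal("marker keys must still wake the assessor")
	}
}
```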
diff --git a/tests/bin/helpers.sh b/tests/bin/helpers.sh
index e8de8ab1..adeaf7f4 100644
--- a/tests/bin/helpers.sh
+++ b/tests/bin/helpers.sh
@@ -147,22 +147,37 @@ function waitForIt {
 
 function waitForEveryoneToDie {
     local run_name=$1
-    waitForNoInstances $run_name workdispatcher
-    waitForNoInstances $run_name workerpool
-    waitForNoInstances $run_name workstealer
-    waitForNoInstances $run_name minio
+    waitForNInstances 0 $run_name workdispatcher
+    waitForNInstances 0 $run_name workerpool
+
+    if [[ "${NUM_DESIRED_OUTPUTS:-1}" != "0" ]]
+    then
+        # workstealer should not auto-self-destruct
+        waitForNInstances 1 $run_name workstealer
+
+        # drain the output
+        $testapp queue drain --target ${LUNCHPAIL_TARGET:-kubernetes} --run $run_name
+
+        # now the workstealer should self-destruct
+        waitForNInstances 0 $run_name workstealer
+    else
+        waitForNInstances 0 $run_name workstealer
+    fi
+
+    waitForNInstances 0 $run_name minio
 }
 
-function waitForNoInstances {
-    local run_name=$1
-    local component=$2
-    echo "Checking that no $component remain running for run=$run_name"
+function waitForNInstances {
+    local N=$1
+    local run_name=$2
+    local component=$3
+    echo "Checking that N=$N $component are running for run=$run_name"
     while true
     do
         nRunning=$($testapp status instances --run $run_name --target ${LUNCHPAIL_TARGET:-kubernetes} --component $component -n $ns)
-        if [[ $nRunning == 0 ]]
-        then echo "✅ PASS test=$name no $component remain running" && break
-        else echo "$nRunning ${component}(s) remaining running" && sleep 2
+        if [[ $nRunning == $N ]]
+        then echo "✅ PASS test=$name n=$N $component remain running" && break
+        else echo "Waiting because $nRunning (expected $N) ${component}(s) running" && sleep 2
         fi
     done
 }
diff --git a/tests/bin/run.sh b/tests/bin/run.sh
index 9625ea40..a732616a 100755
--- a/tests/bin/run.sh
+++ b/tests/bin/run.sh
@@ -77,7 +77,7 @@ then
 
     if [[ -e "$1"/target ]]
     then
-        if [[ $(cat "$1"/target) != "$LUNCHPAIL_TARGET" ]]
+        if [[ $(cat "$1"/target) != "${LUNCHPAIL_TARGET:-kubernetes}" ]]
         then
             echo "$(tput setaf 3)🧪 Skipping due to unsupported target $(basename $1)$(tput sgr0)"
             exit 0
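[Reviewer note, not part of the patch] The test flow above drains via the CLI (`$testapp queue drain ...`). For completeness, a minimal Go driver that mirrors what the new subcommand does, using only calls that appear in this diff; the run name is illustrative:

```go
package main

import (
	"context"
	"log"

	"lunchpail.io/cmd/options"
	"lunchpail.io/pkg/be"
	"lunchpail.io/pkg/runtime/queue"
)

// Mirrors cmd/subcommands/queue/drain.go: mark every finished task's
// output consumed so the workstealer's readyToBye() can become true.
func main() {
	ctx := context.Background()

	opts, err := options.RestoreBuildOptions()
	if err != nil {
		log.Fatal(err)
	}

	backend, err := be.New(ctx, opts)
	if err != nil {
		log.Fatal(err)
	}

	if err := queue.Drain(ctx, backend, "myrun"); err != nil { // "myrun" is illustrative
		log.Fatal(err)
	}
}
```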