feat: queue drain, and workstealer self-destruct waits for full consumption not full production

Signed-off-by: Nick Mitchell <nickm@us.ibm.com>
starpit committed Oct 14, 2024
1 parent d8b2e0a commit db8b204
Showing 15 changed files with 225 additions and 56 deletions.
6 changes: 6 additions & 0 deletions cmd/options/run.go
@@ -11,3 +11,9 @@ func AddRunOptions(cmd *cobra.Command) *RunOptions {
cmd.Flags().StringVarP(&options.Run, "run", "r", "", "Inspect the given run, defaulting to using the singleton run")
return &options
}

func AddRequiredRunOptions(cmd *cobra.Command) *RunOptions {
opts := AddRunOptions(cmd)
cmd.MarkFlagRequired("run")
return opts
}
41 changes: 41 additions & 0 deletions cmd/subcommands/queue/drain.go
@@ -0,0 +1,41 @@
//go:build full || deploy

package queue

import (
"context"

"github.com/spf13/cobra"

"lunchpail.io/cmd/options"
"lunchpail.io/pkg/be"
"lunchpail.io/pkg/runtime/queue"
)

func Drain() *cobra.Command {
cmd := &cobra.Command{
Use: "drain",
Short: "Drain the output tasks, allowing graceful termination",
Args: cobra.MatchAll(cobra.ExactArgs(0), cobra.OnlyValidArgs),
}

opts, err := options.RestoreBuildOptions()
if err != nil {
panic(err)
}

runOpts := options.AddRequiredRunOptions(cmd)
options.AddTargetOptionsTo(cmd, &opts)

cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
backend, err := be.New(ctx, opts)
if err != nil {
return err
}

return queue.Drain(ctx, backend, runOpts.Run)
}

return cmd
}
1 change: 1 addition & 0 deletions cmd/subcommands/queue_full.go
@@ -19,6 +19,7 @@ func init() {

if build.IsBuilt() {
cmd.AddCommand(queue.Cat())
cmd.AddCommand(queue.Drain())
cmd.AddCommand(queue.Last())
cmd.AddCommand(queue.Ls())
cmd.AddCommand(queue.Stat())
9 changes: 7 additions & 2 deletions pkg/runtime/builtins/redirect.go
@@ -14,7 +14,7 @@ import (
)

func RedirectTo(ctx context.Context, client queue.S3Client, folderFor func(object string) string, opts build.LogOptions) error {
objects, errs := client.Listen(client.Paths.Bucket, client.Outbox(), "")
objects, errs := client.Listen(client.Paths.Bucket, client.Outbox(), "", false)
group, _ := errgroup.WithContext(ctx)
done := false

@@ -41,7 +41,12 @@ func RedirectTo(ctx context.Context, client queue.S3Client, folderFor func(objec
if opts.Verbose {
fmt.Fprintf(os.Stderr, "Downloading output to %s\n", dst)
}
group.Go(func() error { return client.Download(client.Paths.Bucket, object, dst) })
group.Go(func() error {
if err := client.Download(client.Paths.Bucket, object, dst); err != nil {
return err
}
return client.MarkConsumed(object)
})
}

for !done {
29 changes: 29 additions & 0 deletions pkg/runtime/queue/drain.go
@@ -0,0 +1,29 @@
package queue

import (
"context"

"golang.org/x/sync/errgroup"

"lunchpail.io/pkg/be"
)

// Drain the output tasks, allowing graceful termination
func Drain(ctx context.Context, backend be.Backend, runname string) error {
c, err := NewS3ClientForRun(ctx, backend, runname)
if err != nil {
return err
}
defer c.Stop()

group, _ := errgroup.WithContext(ctx)
for o := range c.ListObjects(c.Paths.Bucket, c.finishedMarkers(), true) {
if o.Err != nil {
return o.Err
}

group.Go(func() error { return c.MarkConsumed(o.Key) })
}

return group.Wait()
}
8 changes: 8 additions & 0 deletions pkg/runtime/queue/paths.go
@@ -64,3 +64,11 @@ func pathsFor(queuePrefixPath string) (filepaths, error) {
func (c S3Client) Outbox() string {
return filepath.Join(c.Paths.PoolPrefix, c.Paths.Outbox)
}

func (c S3Client) finishedMarkers() string {
return filepath.Join(c.Paths.PoolPrefix, "finished")
}

func (c S3Client) ConsumedMarker(task string) string {
return filepath.Join(c.Paths.PoolPrefix, "consumed", filepath.Base(task))
}
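
Taken together, these two helpers define a simple per-task lifecycle under the pool prefix: the workstealer writes a finished/<task> marker when a worker completes a task, and a consumed/<task> marker is written once that task's output has actually been retrieved (see MarkConsumed below). A minimal standalone sketch of the resulting key layout (the pool prefix value here is purely illustrative, not taken from this commit):

package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	// Hypothetical pool prefix; in the real code this comes from S3Client.Paths.PoolPrefix.
	poolPrefix := "lunchpail/run-abc123/pool"

	// Mirrors finishedMarkers(): the prefix under which completion markers live.
	finishedPrefix := filepath.Join(poolPrefix, "finished")

	// Mirrors ConsumedMarker(task): one marker object per consumed output.
	consumedMarker := filepath.Join(poolPrefix, "consumed", filepath.Base("task.1.txt"))

	fmt.Println(finishedPrefix) // lunchpail/run-abc123/pool/finished
	fmt.Println(consumedMarker) // lunchpail/run-abc123/pool/consumed/task.1.txt
}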
4 changes: 4 additions & 0 deletions pkg/runtime/queue/s3.go
@@ -183,6 +183,10 @@ func (s3 S3Client) Rm(bucket, filePath string) error {
return s3.client.RemoveObject(s3.context, bucket, filePath, minio.RemoveObjectOptions{})
}

func (s3 S3Client) MarkConsumed(filePath string) error {
return s3.Mark(s3.Paths.Bucket, s3.ConsumedMarker(filePath), "consumed")
}

func (s3 S3Client) Mark(bucket, filePath, marker string) error {
_, err := s3.client.PutObject(s3.context, bucket, filePath, strings.NewReader(marker), int64(len(marker)), minio.PutObjectOptions{})
return err
27 changes: 20 additions & 7 deletions pkg/runtime/queue/wait.go
@@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/minio/minio-go/v7"
@@ -46,14 +47,19 @@ func (s3 S3Client) WaitTillExists(bucket, object string) error {
return s3.WaitForEvent(bucket, object, "s3:ObjectCreated:*")
}

func (s3 S3Client) Listen(bucket, prefix, suffix string) (<-chan string, <-chan error) {
func (s3 S3Client) Listen(bucket, prefix, suffix string, includeDeletions bool) (<-chan string, <-chan error) {
c := make(chan string)
e := make(chan error)

os := make(map[string]bool)
report := func(key string) {
if !os[key] {
os[key] = true
report := func(key string, isCreate bool) {
if isCreate {
if !os[key] {
os[key] = true
c <- key
}
} else {
delete(os, key)
c <- key
}
}
@@ -62,7 +68,7 @@
if o.Err != nil {
e <- o.Err
} else {
report(o.Key)
report(o.Key, true)
}
}
}
@@ -72,17 +78,24 @@
defer close(e)
once()
already := false
for n := range s3.client.ListenBucketNotification(s3.context, bucket, prefix, suffix, []string{"s3:ObjectCreated:*"}) {

events := []string{"s3:ObjectCreated:*"}
if includeDeletions {
events = append(events, "s3:ObjectRemoved:*")
}

for n := range s3.client.ListenBucketNotification(s3.context, bucket, prefix, suffix, events) {
if n.Err != nil {
e <- n.Err
continue
}

if !already {
already = true
once()
}
for _, r := range n.Records {
report(r.S3.Object.Key)
report(r.S3.Object.Key, strings.HasPrefix(r.EventName, "s3:ObjectCreated:"))
}
}
}()
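Because Listen's channel carries only the object key, a caller that opts into deletion events should treat each message as a hint that the key changed and re-inspect the bucket, rather than assume a creation. A minimal consumer sketch under that assumption (the watch helper and its loop are illustrative, not part of this commit):

package example

import (
	"context"

	"lunchpail.io/pkg/runtime/queue"
)

// watch re-checks queue state whenever an object under prefix is created or removed.
// Sketch only; the actual re-inspection logic is elided.
func watch(ctx context.Context, client queue.S3Client, prefix string) error {
	keys, errs := client.Listen(client.Paths.Bucket, prefix, "", true) // true: also receive s3:ObjectRemoved:* events
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case key := <-keys:
			_ = key // the key was created or removed; re-list or re-check state here
		case err := <-errs:
			if err != nil {
				return err
			}
		}
	}
}
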
2 changes: 1 addition & 1 deletion pkg/runtime/worker/watcher.go
@@ -49,7 +49,7 @@ func startWatch(ctx context.Context, handler []string, client queue.S3Client, de
return err
}

tasks, errs := client.Listen(client.Paths.Bucket, prefix, "")
tasks, errs := client.Listen(client.Paths.Bucket, prefix, "", false)
for {
if killfileExists(client, bucket, prefix) {
break
45 changes: 27 additions & 18 deletions pkg/runtime/workstealer/assess.go
@@ -42,6 +42,13 @@ func (c client) reportChangedFile(filepath string) error {
return c.s3.Upload(c.s3.Paths.Bucket, filepath, remotefile)
}

// Has a Task already been marked as completed?
func (c client) isMarkedDone(task string) bool {
finishedMarker := filepath.Join(finished, task)
p := c.localPathToRemote(finishedMarker)
return c.s3.Exists(c.s3.Paths.Bucket, filepath.Dir(p), filepath.Base(p))
}

// A Task has been fully completed by a Worker
func (c client) markDone(task string) error {
finishedMarker := filepath.Join(finished, task)
@@ -152,11 +159,15 @@ func (c client) cleanupForDeadWorker(worker Worker) error {

// A Task has completed
func (c client) cleanupForCompletedTask(completedTask AssignedTask, success TaskCode) error {
if err := c.markDone(completedTask.task); err != nil {
return err
if !c.isMarkedDone(completedTask.task) {
if err := c.markDone(completedTask.task); err != nil {
return err
}

return c.copyToFinalOutbox(completedTask.task, completedTask.worker, success)
}

return c.copyToFinalOutbox(completedTask.task, completedTask.worker, success)
return nil
}

type Apportionment struct {
@@ -294,35 +305,33 @@ func (c client) rebalance(model Model) bool {
return false
}

// If the dispatcher is done and there are no more outstanding tasks,
// then touch kill files in the worker inboxes.
// Touch kill files in the worker inboxes.
func (c client) touchKillFiles(model Model) {
if model.DispatcherDone && model.nFinishedTasks() > 0 && model.nTasksRemaining() == 0 {
for _, worker := range model.LiveWorkers {
if !worker.killfilePresent {
if err := c.touchKillFile(worker); err != nil {
log.Fatalf(err.Error())
}
for _, worker := range model.LiveWorkers {
if !worker.killfilePresent {
if err := c.touchKillFile(worker); err != nil {
log.Fatalf(err.Error())
}
}
}
}

// Is everything well and done: dispatcher, workers, us?
func (c client) readyToBye(model Model) bool {
return model.DispatcherDone && model.nFinishedTasks() > 0 && model.nTasksRemaining() == 0 && len(model.LiveWorkers) == 0
}

// Assess and potentially update queue state. Return true when we are all done.
func (c client) assess(m Model) bool {
c.cleanupCompletedTasks(m)

if c.readyToBye(m) {
if m.readyToBye() {
fmt.Fprintln(os.Stderr, "INFO All work has been completed, all workers have terminated")
return true
} else if !c.rebalance(m) {
c.assignNewTasks(m)
c.touchKillFiles(m)

if m.isAllWorkDone() {
// If the dispatcher is done and there are no more outstanding tasks,
// then touch kill files in the worker inboxes.
c.touchKillFiles(m)
}

c.reassignDeadWorkerTasks(m)
}

15 changes: 6 additions & 9 deletions pkg/runtime/workstealer/fetch.go
@@ -22,6 +22,7 @@ type WhatChanged int
const (
UnassignedTask WhatChanged = iota
DispatcherDone
ConsumedTask
FinishedTask

LiveWorker
@@ -38,6 +39,7 @@

var unassignedTaskPattern = regexp.MustCompile("^inbox/(.+)$")
var dispatcherDonePattern = regexp.MustCompile("^done$")
var consumedTaskPattern = regexp.MustCompile("^consumed/(.+)$")
var finishedTaskPattern = regexp.MustCompile("^finished/(.+)$")
var liveWorkerPattern = regexp.MustCompile("^queues/(.+)/inbox/[.]alive$")
var deadWorkerPattern = regexp.MustCompile("^queues/(.+)/inbox/[.]dead$")
@@ -52,6 +54,8 @@ func whatChanged(line string) (WhatChanged, string, string) {
return UnassignedTask, match[1], ""
} else if match := dispatcherDonePattern.FindStringSubmatch(line); len(match) == 1 {
return DispatcherDone, "", ""
} else if match := consumedTaskPattern.FindStringSubmatch(line); len(match) == 2 {
return ConsumedTask, match[1], ""
} else if match := finishedTaskPattern.FindStringSubmatch(line); len(match) == 2 {
return FinishedTask, match[1], ""
} else if match := liveWorkerPattern.FindStringSubmatch(line); len(match) == 2 {
@@ -77,21 +81,15 @@

// We will be passed a stream of diffs
func (m *Model) update(filepath string, workersLookup map[string]Worker) {
/*unassignedTasks := []string{}
dispatcherDone := false
finishedTasks := []string{}
assignedTasks := []AssignedTask{}
processingTasks := []AssignedTask{}
successfulTasks := []AssignedTask{}
failedTasks := []AssignedTask{}*/

what, thing, thing2 := whatChanged(filepath)

switch what {
case UnassignedTask:
m.UnassignedTasks = append(m.UnassignedTasks, thing)
case DispatcherDone:
m.DispatcherDone = true
case ConsumedTask:
m.ConsumedTasks = append(m.ConsumedTasks, thing)
case FinishedTask:
m.FinishedTasks = append(m.FinishedTasks, thing)
case LiveWorker:
@@ -140,7 +138,6 @@ func (m *Model) update(filepath string, workersLookup map[string]Worker) {
fmt.Fprintf(os.Stderr, "ERROR unable to find worker=%s\n", thing)
}
}
//eturn Model{dispatcherDone, unassignedTasks, finishedTasks, liveWorkers, deadWorkers, assignedTasks, processingTasks, successfulTasks, failedTasks}
}

func (m *Model) finishUp(workersLookup map[string]Worker) {
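
The model.go side of this change is not among the files rendered above (the remaining changed files in this commit are not shown here), but given the new ConsumedTasks field and the commit message, the workstealer's exit condition presumably moves from "all tasks produced" to "all produced outputs consumed". A purely hypothetical sketch, in package workstealer, of the readyToBye helper that assess.go now calls on the Model; the actual implementation may differ:

// Hypothetical sketch; the real model.go diff is not part of this excerpt.
func (m Model) readyToBye() bool {
	return m.DispatcherDone &&
		m.nFinishedTasks() > 0 &&
		m.nTasksRemaining() == 0 &&
		len(m.ConsumedTasks) >= m.nFinishedTasks() && // wait for full consumption, not just full production
		len(m.LiveWorkers) == 0
}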