Skip to content

Commit

Permalink
fix: first pass at cleaning up runtime/worker queue paths
Browse files Browse the repository at this point in the history
Alive and Dead and Bucket
plus some debug/verbose cleanups

Signed-off-by: Nick Mitchell <nickm@us.ibm.com>
  • Loading branch information
starpit committed Oct 17, 2024
1 parent 9cd1a0b commit 117205f
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 32 deletions.
14 changes: 13 additions & 1 deletion cmd/subcommands/component/worker/prestop.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,22 @@ func PreStop() *cobra.Command {
Long: "Mark this worker as dead",
}

bucket := ""
cmd.Flags().StringVar(&bucket, "bucket", "", "Which S3 bucket to use")
cmd.MarkFlagRequired("bucket")

alive := ""
cmd.Flags().StringVar(&alive, "alive", "", "Where to place our alive file")
cmd.MarkFlagRequired("alive")

dead := ""
cmd.Flags().StringVar(&dead, "dead", "", "Where to place our dead file")
cmd.MarkFlagRequired("dead")

logOpts := options.AddLogOptions(cmd)

cmd.RunE = func(cmd *cobra.Command, args []string) error {
return worker.PreStop(context.Background(), worker.Options(*logOpts))
return worker.PreStop(context.Background(), worker.Options{Bucket: bucket, Dead: dead, LogOptions: *logOpts})
}

return cmd
Expand Down
10 changes: 9 additions & 1 deletion cmd/subcommands/component/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,22 @@ func Run() *cobra.Command {
Args: cobra.MatchAll(cobra.OnlyValidArgs),
}

bucket := ""
cmd.Flags().StringVar(&bucket, "bucket", "", "Which S3 bucket to use")
cmd.MarkFlagRequired("bucket")

alive := ""
cmd.Flags().StringVar(&alive, "alive", "", "Where to place our alive file")
cmd.MarkFlagRequired("alive")

logOpts := options.AddLogOptions(cmd)

cmd.RunE = func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
return fmt.Errorf("Nothing to run. Specify the worker command line after a --: %v", args)
}

return worker.Run(context.Background(), args, worker.Options(*logOpts))
return worker.Run(context.Background(), args, worker.Options{Bucket: bucket, Alive: alive, LogOptions: *logOpts})
}

return cmd
Expand Down
8 changes: 8 additions & 0 deletions pkg/fe/transformer/api/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ func WorkerOutbox(base, worker, task string) string {
return filepath.Join(base, worker, "outbox", task)
}

func WorkerAlive(queueSpec queue.Spec, runname, poolname string) string {
return WorkerInbox(WorkerInboxPathBase(queueSpec, runname), filepath.Join(poolname, "$LUNCHPAIL_POD_NAME"), ".alive")
}

func WorkerDead(queueSpec queue.Spec, runname, poolname string) string {
return WorkerInbox(WorkerInboxPathBase(queueSpec, runname), filepath.Join(poolname, "$LUNCHPAIL_POD_NAME"), ".dead")
}

func AllDone(queueSpec queue.Spec, runname string) string {
return filepath.Join(QueuePrefixPath0(queueSpec, runname), "alldone")
}
15 changes: 13 additions & 2 deletions pkg/fe/transformer/api/workerpool/lower.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,19 @@ func Lower(buildName, runname string, app hlir.Application, pool hlir.WorkerPool
}
app.Spec.Env["LUNCHPAIL_STARTUP_DELAY"] = strconv.Itoa(startupDelay)

app.Spec.Command = fmt.Sprintf(`trap "$LUNCHPAIL_EXE component worker prestop" EXIT
$LUNCHPAIL_EXE component worker run --verbose=%v --debug=%v -- %s`, opts.Log.Verbose, opts.Log.Debug, app.Spec.Command)
app.Spec.Command = fmt.Sprintf(`trap "$LUNCHPAIL_EXE component worker prestop --verbose=%v --debug=%v --bucket %s --alive %s --dead %s" EXIT
$LUNCHPAIL_EXE component worker run --verbose=%v --debug=%v --bucket %s --alive %s -- %s`,
opts.Log.Verbose,
opts.Log.Debug,
ir.Queue.Bucket,
api.WorkerAlive(ir.Queue, runname, pool.Metadata.Name),
api.WorkerDead(ir.Queue, runname, pool.Metadata.Name),
opts.Log.Verbose,
opts.Log.Debug,
ir.Queue.Bucket,
api.WorkerAlive(ir.Queue, runname, pool.Metadata.Name),
app.Spec.Command,
)

return shell.LowerAsComponent(
buildName,
Expand Down
10 changes: 1 addition & 9 deletions pkg/runtime/queue/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ type filepaths struct {

// Dispatcher is done
Done string

// Worker is alive
Alive string

// Worker is dead
Dead string
}

func pathsForRun() (filepaths, error) {
Expand All @@ -42,10 +36,8 @@ func pathsFor(queuePrefixPath string) (filepaths, error) {
processing := "processing"
outbox := outboxFolder
done := filepath.Join(poolPrefix, "done")
alive := filepath.Join(prefix, inbox, ".alive")
dead := filepath.Join(prefix, inbox, ".dead")

return filepaths{bucket, poolPrefix, prefix, inbox, processing, outbox, done, alive, dead}, nil
return filepaths{bucket, poolPrefix, prefix, inbox, processing, outbox, done}, nil
}

func (c S3Client) Outbox() string {
Expand Down
7 changes: 6 additions & 1 deletion pkg/runtime/worker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,9 @@ package worker

import "lunchpail.io/pkg/build"

type Options = build.LogOptions
type Options struct {
Bucket string
Alive string
Dead string
build.LogOptions
}
8 changes: 4 additions & 4 deletions pkg/runtime/worker/prestop.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ func PreStop(ctx context.Context, opts Options) error {
return err
}

if opts.Debug {
if opts.LogOptions.Debug {
fmt.Println("Marking worker as done...")
}

client.Rm(client.Paths.Bucket, client.Paths.Alive)
client.Touch(client.Paths.Bucket, client.Paths.Dead)
client.Rm(opts.Bucket, opts.Alive)
client.Touch(opts.Bucket, opts.Dead)

if opts.Verbose {
if opts.LogOptions.Verbose {
fmt.Printf("This worker is shutting down %s\n", os.Getenv("LUNCHPAIL_POD_NAME"))
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/runtime/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,19 @@ func delay() error {
return nil
}

func printenv() {
for _, e := range os.Environ() {
fmt.Fprintf(os.Stderr, "%v\n", e)
}
}

func Run(ctx context.Context, handler []string, opts Options) error {
if opts.Verbose {
if opts.LogOptions.Verbose {
fmt.Fprintf(os.Stderr, "Lunchpail worker starting up\n")
printenv()
}

if opts.Debug {
if opts.LogOptions.Debug {
// helpful for debugging
fmt.Fprintf(os.Stderr, "env=%v\n", os.Environ())
}
Expand All @@ -44,5 +51,5 @@ func Run(ctx context.Context, handler []string, opts Options) error {
return err
}

return startWatch(ctx, handler, client, opts.Debug)
return startWatch(ctx, handler, client, opts)
}
19 changes: 8 additions & 11 deletions pkg/runtime/worker/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,20 @@ func killfileExists(client queue.S3Client, bucket, prefix string) bool {
return client.Exists(bucket, prefix, "kill")
}

func startWatch(ctx context.Context, handler []string, client queue.S3Client, debug bool) error {
bucket := client.Paths.Bucket
func startWatch(ctx context.Context, handler []string, client queue.S3Client, opts Options) error {
bucket := opts.Bucket
prefix := client.Paths.Prefix
inbox := client.Paths.Inbox
processing := client.Paths.Processing
outbox := client.Paths.Outbox
alive := client.Paths.Alive
alive := opts.Alive
// dead := client.Paths.dead

if debug {
fmt.Fprintf(os.Stderr, "Mkdirp bucket=%s\n", bucket)
}
if err := client.Mkdirp(bucket); err != nil {
return err
}

if debug {
if opts.LogOptions.Debug {
fmt.Fprintf(os.Stderr, "Lunchpail worker touching alive file bucket=%s path=%s\n", bucket, alive)
}
err := client.Touch(bucket, alive)
Expand All @@ -62,7 +59,7 @@ func startWatch(ctx context.Context, handler []string, client queue.S3Client, de

select {
case err := <-errs:
if debug {
if opts.LogOptions.Verbose {
fmt.Fprintln(os.Stderr, err)
}

Expand Down Expand Up @@ -186,7 +183,7 @@ func startWatch(ctx context.Context, handler []string, client queue.S3Client, de
}

if EC == 0 {
if debug {
if opts.LogOptions.Debug {
fmt.Printf("Worker succeeded on task %s\n", localprocessing)
}
err = client.Touch(bucket, succeeded)
Expand All @@ -202,7 +199,7 @@ func startWatch(ctx context.Context, handler []string, client queue.S3Client, de
}

if _, err := os.Stat(localoutbox); err == nil {
if debug {
if opts.LogOptions.Debug {
fmt.Printf("Uploading worker-produced outbox file %s->%s\n", localoutbox, out)
}
if err := client.Upload(bucket, localoutbox, out); err != nil {
Expand All @@ -215,7 +212,7 @@ func startWatch(ctx context.Context, handler []string, client queue.S3Client, de
}
}

if debug {
if opts.LogOptions.Verbose {
fmt.Println("Worker exiting normally")
}

Expand Down

0 comments on commit 117205f

Please sign in to comment.