Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: second pass at cleaning up runtime/worker queue paths #432

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion cmd/subcommands/component/worker/prestop.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@ func PreStop() *cobra.Command {
logOpts := options.AddLogOptions(cmd)

cmd.RunE = func(cmd *cobra.Command, args []string) error {
return worker.PreStop(context.Background(), worker.Options{Bucket: bucket, Dead: dead, LogOptions: *logOpts})
return worker.PreStop(context.Background(), worker.Options{
LogOptions: *logOpts,
Queue: worker.Queue{
Bucket: bucket,
Dead: dead,
},
})
}

return cmd
Expand Down
25 changes: 22 additions & 3 deletions cmd/subcommands/component/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,41 @@ func Run() *cobra.Command {
Args: cobra.MatchAll(cobra.OnlyValidArgs),
}

bucket := ""
var bucket string
cmd.Flags().StringVar(&bucket, "bucket", "", "Which S3 bucket to use")
cmd.MarkFlagRequired("bucket")

alive := ""
var alive string
cmd.Flags().StringVar(&alive, "alive", "", "Where to place our alive file")
cmd.MarkFlagRequired("alive")

var listenPrefix string
cmd.Flags().StringVar(&listenPrefix, "listen-prefix", "", "Which S3 listen-prefix to use")
cmd.MarkFlagRequired("listen-prefix")

var pollingInterval int
cmd.Flags().IntVar(&pollingInterval, "polling-interval", 3, "If polling is employed, the interval between probes")

var startupDelay int
cmd.Flags().IntVar(&startupDelay, "delay", 0, "Delay (in seconds) before engaging in any work")

logOpts := options.AddLogOptions(cmd)

cmd.RunE = func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
return fmt.Errorf("Nothing to run. Specify the worker command line after a --: %v", args)
}

return worker.Run(context.Background(), args, worker.Options{Bucket: bucket, Alive: alive, LogOptions: *logOpts})
return worker.Run(context.Background(), args, worker.Options{
StartupDelay: startupDelay,
PollingInterval: pollingInterval,
LogOptions: *logOpts,
Queue: worker.Queue{
ListenPrefix: listenPrefix,
Bucket: bucket,
Alive: alive,
},
})
}

return cmd
Expand Down
74 changes: 70 additions & 4 deletions pkg/fe/transformer/api/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,23 @@ func ExtractNamesFromSubPathForWorker(combo string) (poolName string, workerName
// Path in s3 to store the queue data for a particular worker in a
// particular pool for a particular run. Note how we need to defer the
// worker name until run time, which we do via a
// $LUNCHPAIL_WORKER_NAME env var.
func QueuePrefixPathForWorker(queueSpec queue.Spec, runname, poolName string) string {
// $LUNCHPAIL_POD_NAME env var.
func QueuePrefixPathForWorker0(queueSpec queue.Spec, runname, poolName string) string {
return filepath.Join(
QueuePrefixPath(queueSpec, runname),
QueuePrefixPath0(queueSpec, runname),
"queues",
QueueSubPathForWorker(poolName, "$LUNCHPAIL_WORKER_NAME"),
QueueSubPathForWorker(poolName, "$LUNCHPAIL_POD_NAME"),
)
}

// Path in s3 to store the queue data for a particular worker in a
// particular pool for a particular run. Note how we need to defer the
// worker name until run time, which we do via a
// $LUNCHPAIL_POD_NAME env var.
func QueuePrefixPathForWorker(queueSpec queue.Spec, runname, poolName string) string {
return filepath.Join(
queueSpec.Bucket,
QueuePrefixPathForWorker0(queueSpec, runname, poolName),
)
}

Expand Down Expand Up @@ -75,6 +86,16 @@ func WorkerInbox(base, worker, task string) string {
return filepath.Join(base, worker, "inbox", task)
}

// for Worker
func Inbox(base string) string {
return filepath.Join(base, "inbox")
}

// for Worker
func InboxTask(base, task string) string {
return filepath.Join(Inbox(base), task)
}

func WorkerProcessingPathBase(queueSpec queue.Spec, runname string) string {
return WorkerInboxPathBase(queueSpec, runname)
}
Expand All @@ -83,6 +104,16 @@ func WorkerProcessing(base, worker, task string) string {
return filepath.Join(base, worker, "processing", task)
}

// for Worker
func Processing(base string) string {
return filepath.Join(base, "processing")
}

// for Worker
func ProcessingTask(base, task string) string {
return filepath.Join(Processing(base), task)
}

func WorkerOutboxPathBase(queueSpec queue.Spec, runname string) string {
return WorkerInboxPathBase(queueSpec, runname)
}
Expand All @@ -91,6 +122,41 @@ func WorkerOutbox(base, worker, task string) string {
return filepath.Join(base, worker, "outbox", task)
}

// for Worker
func Outbox(base string) string {
return filepath.Join(base, "outbox")
}

// for Worker
func OutboxTask(base, task string) string {
return filepath.Join(Outbox(base), task)
}

// for Worker
func ExitCodeTask(base, task string) string {
return filepath.Join(Outbox(base), task+".code")
}

// for Worker
func SucceededTask(base, task string) string {
return filepath.Join(Outbox(base), task+".succeeded")
}

// for Worker
func FailedTask(base, task string) string {
return filepath.Join(Outbox(base), task+".failed")
}

// for Worker
func StdoutTask(base, task string) string {
return filepath.Join(Outbox(base), task+".stdout")
}

// for Worker
func StderrTask(base, task string) string {
return filepath.Join(Outbox(base), task+".stderr")
}

func WorkerAlive(queueSpec queue.Spec, runname, poolname string) string {
return WorkerInbox(WorkerInboxPathBase(queueSpec, runname), filepath.Join(poolname, "$LUNCHPAIL_POD_NAME"), ".alive")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/fe/transformer/api/workerpool/lower.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package workerpool

import (
"fmt"
"strconv"

"lunchpail.io/pkg/build"
"lunchpail.io/pkg/fe/transformer/api"
Expand All @@ -28,10 +27,9 @@ func Lower(buildName, runname string, app hlir.Application, pool hlir.WorkerPool
if app.Spec.Env == nil {
app.Spec.Env = make(map[string]string)
}
app.Spec.Env["LUNCHPAIL_STARTUP_DELAY"] = strconv.Itoa(startupDelay)

app.Spec.Command = fmt.Sprintf(`trap "$LUNCHPAIL_EXE component worker prestop --verbose=%v --debug=%v --bucket %s --alive %s --dead %s" EXIT
$LUNCHPAIL_EXE component worker run --verbose=%v --debug=%v --bucket %s --alive %s -- %s`,
$LUNCHPAIL_EXE component worker run --verbose=%v --debug=%v --bucket %s --alive %s --listen-prefix %s --delay %d -- %s`,
opts.Log.Verbose,
opts.Log.Debug,
ir.Queue.Bucket,
Expand All @@ -41,6 +39,8 @@ $LUNCHPAIL_EXE component worker run --verbose=%v --debug=%v --bucket %s --alive
opts.Log.Debug,
ir.Queue.Bucket,
api.WorkerAlive(ir.Queue, runname, pool.Metadata.Name),
api.QueuePrefixPathForWorker0(ir.Queue, runname, pool.Metadata.Name),
startupDelay,
app.Spec.Command,
)

Expand Down
13 changes: 10 additions & 3 deletions pkg/runtime/worker/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@ package worker

import "lunchpail.io/pkg/build"

type Queue struct {
Bucket string
ListenPrefix string
Alive string
Dead string
}

type Options struct {
Bucket string
Alive string
Dead string
Queue
StartupDelay int
PollingInterval int
build.LogOptions
}
20 changes: 2 additions & 18 deletions pkg/runtime/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,6 @@ import (
"lunchpail.io/pkg/runtime/queue"
)

func delay() error {
startupDelayStr := os.Getenv("LUNCHPAIL_STARTUP_DELAY")
if startupDelayStr != "" {
delay, err := time.ParseDuration(startupDelayStr + "s")
if err != nil {
return err
}
if delay > 0 {
fmt.Fprintf(os.Stderr, "Delaying startup by %d seconds\n", delay)
time.Sleep(delay)
}
}

return nil
}

func printenv() {
for _, e := range os.Environ() {
fmt.Fprintf(os.Stderr, "%v\n", e)
Expand All @@ -42,8 +26,8 @@ func Run(ctx context.Context, handler []string, opts Options) error {
fmt.Fprintf(os.Stderr, "env=%v\n", os.Environ())
}

if err := delay(); err != nil {
return err
if opts.StartupDelay > 0 {
time.Sleep(time.Duration(opts.StartupDelay) * time.Second)
}

client, err := queue.NewS3Client(ctx)
Expand Down
Loading