Skip to content

Commit

Permalink
added queue functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
kriyanshii committed Oct 6, 2024
1 parent 05188f4 commit 0d50f9d
Show file tree
Hide file tree
Showing 28 changed files with 10,763 additions and 1,038 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@
"files.associations": {
"*.gohtml": "html"
},
"cSpell.words": [
"dagu"
],
}
5 changes: 5 additions & 0 deletions cmd/dry.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,20 @@ func dryCmd() *cobra.Command {

dataStore := newDataStores(cfg)
cli := newClient(cfg, dataStore, agentLogger)
queueStore := newQueueStore(cfg)
statsStore := newStatsStore(cfg)

agt := agent.New(
requestID,
workflow,
agentLogger,
filepath.Dir(logFile.Name()),
logFile.Name(),
cfg.DAGQueueLength,
cli,
dataStore,
queueStore,
statsStore,
&agent.Options{Dry: true})

ctx := cmd.Context()
Expand Down
11 changes: 11 additions & 0 deletions cmd/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,27 @@ import (
"github.com/dagu-org/dagu/internal/logger"
"github.com/dagu-org/dagu/internal/persistence"
dsclient "github.com/dagu-org/dagu/internal/persistence/client"
"github.com/dagu-org/dagu/internal/persistence/queue"
"github.com/dagu-org/dagu/internal/persistence/stats"
)

func newClient(cfg *config.Config, ds persistence.DataStores, lg logger.Logger) client.Client {
return client.New(ds, cfg.Executable, cfg.WorkDir, lg)
}

func newQueueStore(cfg *config.Config) persistence.QueueStore {
return queue.NewQueueStore(cfg.QueueDir)
}
func newStatsStore(cfg *config.Config) persistence.StatsStore {
return stats.NewStatsStore(cfg.StatsDir)
}

func newDataStores(cfg *config.Config) persistence.DataStores {
return dsclient.NewDataStores(
cfg.DAGs,
cfg.DataDir,
cfg.QueueDir,
cfg.StatsDir,
cfg.SuspendFlagsDir,
dsclient.DataStoreOptions{
LatestStatusToday: cfg.LatestStatusToday,
Expand Down
6 changes: 6 additions & 0 deletions cmd/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func restartCmd() *cobra.Command {

dataStore := newDataStores(cfg)
cli := newClient(cfg, dataStore, initLogger)
queueStore := newQueueStore(cfg)
statsStore := newStatsStore(cfg)

if err := stopDAGIfRunning(cli, workflow, initLogger); err != nil {
initLogger.Fatal("Workflow stop operation failed",
Expand Down Expand Up @@ -126,8 +128,12 @@ func restartCmd() *cobra.Command {
agentLogger,
filepath.Dir(logFile.Name()),
logFile.Name(),
cfg.DAGQueueLength,

newClient(cfg, dataStore, agentLogger),
dataStore,
queueStore,
statsStore,
&agent.Options{Dry: false})

listenSignals(cmd.Context(), agt)
Expand Down
5 changes: 5 additions & 0 deletions cmd/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func retryCmd() *cobra.Command {
})

cli := newClient(cfg, dataStore, agentLogger)
queueStore := newQueueStore(cfg)
statsStore := newStatsStore(cfg)

agentLogger.Info("Workflow retry initiated",
"workflow", workflow.Name,
Expand All @@ -118,8 +120,11 @@ func retryCmd() *cobra.Command {
agentLogger,
filepath.Dir(logFile.Name()),
logFile.Name(),
cfg.DAGQueueLength,
cli,
dataStore,
queueStore,
statsStore,
&agent.Options{RetryTarget: status.Status},
)

Expand Down
16 changes: 14 additions & 2 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func startCmd() *cobra.Command {
if err != nil {
log.Fatalf("Flag retrieval failed (quiet): %v", err)
}
waiting, err := cmd.Flags().GetBool("waiting")
if err != nil {
log.Fatalf("Flag retrieval failed (waiting): %v", err)
}

initLogger := logger.NewLogger(logger.NewLoggerArgs{
Debug: cfg.Debug,
Expand Down Expand Up @@ -90,6 +94,10 @@ func startCmd() *cobra.Command {
})

dataStore := newDataStores(cfg)

queueStore := newQueueStore(cfg)
statsStore := newStatsStore(cfg)

cli := newClient(cfg, dataStore, agentLogger)

agentLogger.Info("Workflow execution initiated",
Expand All @@ -103,10 +111,12 @@ func startCmd() *cobra.Command {
agentLogger,
filepath.Dir(logFile.Name()),
logFile.Name(),
cfg.DAGQueueLength,
cli,
dataStore,
&agent.Options{})

queueStore,
statsStore,
&agent.Options{FromWaitingQueue: waiting})
ctx := cmd.Context()

listenSignals(ctx, agt)
Expand All @@ -122,6 +132,8 @@ func startCmd() *cobra.Command {

cmd.Flags().StringP("params", "p", "", "parameters")
cmd.Flags().BoolP("quiet", "q", false, "suppress output")
cmd.Flags().BoolP("waiting", "w", false, "from waiting queue")

return cmd
}

Expand Down
Loading

0 comments on commit 0d50f9d

Please sign in to comment.