Skip to content

Commit

Permalink
README + main.swift update
Browse files Browse the repository at this point in the history
  • Loading branch information
jdmcd committed Oct 10, 2020
1 parent fce2a24 commit 5f7de71
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 26 deletions.
67 changes: 62 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ QueuesProgress is a tiny CLI built to help you better understand what jobs are s
> get job:key_name
```

Just to get information about a specific job, this CLI can help you.
then this CLI can help you!

## Installation

Expand All @@ -18,14 +18,14 @@ To install, run the following commands:
git clone https://github.com/gotranseo/queues-progress
cd queues-progress
swift build -c release
mv .build/release/Run /usr/local/bin/queues-progress
queues-progress
mv .build/release/Run /usr/local/bin/queues
queues progress
```

## Usage

```
Usage: queues-progress [--host,-h] [--password,-p] [--queue,-q] [--pending,-s]
Usage: queues-progress progress [--host,-b] [--password,-p] [--queue,-q] [--pending,-s] [--key,-k]
Checks the progress of queue jobs.
Expand All @@ -34,4 +34,61 @@ Options:
password The password of the redis server
queue The queue to check (defaults to `default`)
pending Whether or not to check the pending queue. Defaults to `false` and checks the `processing` state
```
key A specific key to filter against
```

When running the CLI you have a view different options for displaying the data:

```
queues progress -b localhost
Data Return Type
1: Full Data
2: Full Data (Expanded Payload)
3: Job Type Overview
>
```

Full data will return all of the stored data points for the payload, formatted nicely:

```
job:6C6DE227-38BE-42C8-90F4-CA741A976264
Job Name: JobName
Queued At: 2020-10-01 08:00:15 +0000
Bytes: 4189
Max Retry Count: 0
Delay Until: N/A
```

Selecting the expanded payload option will also dump out the full JSON string of the data associated with the job.

```
------------------------
job:6C6DE227-38BE-42C8-90F4-CA741A976264
Job Name: JobName
Queued At: 2020-10-01 08:00:15 +0000
Bytes: 4189
Max Retry Count: 0
Delay Until: N/A
Payload Data: **Payload Data Here**
```

You can also view a breakdown of the counts of jobs in your processing queue:

```
JobOne: 1
JobTwo: 11
JobThree: 2
```

If you want to filter by a specific key, specify it using the `-k` flag:

`queues progress -b localhost -k key-name`

If you are using a different queue name than `default` you can specify that as well:

`queues progress -b localhost -q email-queue`

The default setting of the tool is to pull data from the `processing` queue. If you want to pull it from the pending list instead pass in the `-s` flag:

`queues progress -b localhost -s true`
64 changes: 51 additions & 13 deletions Sources/App/ProgressCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import Vapor

public struct ProgressCommand: Command {
public struct Signature: CommandSignature {
@Option(name: "host", short: "h", help: "The host of the redis server")
@Option(name: "host", short: "b", help: "The host of the redis server")
var host: String?

@Option(name: "password", short: "p", help: "The password of the redis server")
Expand All @@ -24,6 +24,9 @@ public struct ProgressCommand: Command {
@Option(name: "pending", short: "s", help: "Whether or not to check the pending queue. Defaults to `false` and checks the `processing` state")
var pending: Bool?

@Option(name: "key", short: "k", help: "A specific key to filter against")
var key: String?

public init() {}
}

Expand All @@ -37,29 +40,49 @@ public struct ProgressCommand: Command {

public init() { }

enum DataReturnType: String, CaseIterable {
case fullData = "Full Data"
case fullWithExpandedPayload = "Full Data (Expanded Payload)"
case overview = "Job Type Overview"
}

public func run(using context: CommandContext, signature: Signature) throws {
guard let host = signature.host else {
context.console.error("Please specify a host", newLine: true)
return
}

guard let password = signature.password else {
context.console.error("Please specify a password", newLine: true)
return
}

let config = try RedisConfiguration(hostname: host,
port: 6379,
password: password,
password: signature.password,
pool: .init(connectionRetryTimeout: .minutes(1)))
let redis = context.application.redis
redis.configuration = config

let redis = SimpleRedisClient(configuration: config, eventLoopGroup: MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount))
let keyName = "vapor_queues[\(signature.queue ?? "default")]\((signature.pending ?? false) ? "" : "-processing")"
let keys = try redis.lrange(from: .init(keyName), fromIndex: 0).wait().compactMap { $0.string }.map { "job:\($0)" }
let keys = try redis
.lrange(from: .init(keyName), fromIndex: 0)
.wait()
.compactMap { $0.string }
.filter {
if let filteredKey = signature.key {
return $0.lowercased() == filteredKey.lowercased()
}

return true
}
.map { "job:\($0)" }

context.console.success("\(keys.count) keys fetched from the \((signature.pending ?? false) ? "" : "processing ")queue")

let dataType = context.console.choose("Data Return Type", from: ["Full Data", "Job Type Overview"])
let dataTypeString = context.console.choose("Data Return Type", from: DataReturnType.allCases.map { $0.rawValue })
guard var dataType = DataReturnType(rawValue: dataTypeString) else { throw Abort(.internalServerError) }

if dataType == .fullWithExpandedPayload {
let confirm = context.console.confirm("Continue with expanded payload data? If you haven't added a filter this may overwhelm your terminal.")
if !confirm {
dataType = .fullData
}
}

let progressBar = context.console.loadingBar(title: "Loading Data")
progressBar.start()
Expand All @@ -77,7 +100,7 @@ public struct ProgressCommand: Command {
}

progressBar.succeed()
if dataType == "Full Data" {
if dataType == .fullData || dataType == .fullWithExpandedPayload {
for payload in dataReturned {
context.console.info("------------------------", newLine: true)
context.console.success(payload.key ?? "")
Expand All @@ -95,10 +118,15 @@ public struct ProgressCommand: Command {
} else {
context.console.output("N/A", style: .success)
}

if dataType == .fullWithExpandedPayload {
context.console.output(" Payload Data: ", style: .info, newLine: false)
context.console.output("\(String(bytes: payload.payload, encoding: .utf8)?.data(using: .utf8)?.prettyPrintedJSONString ?? "")", style: .success)
}
}
}

if dataType == "Job Type Overview" {
if dataType == .overview {
dataReturned.map { $0.jobName }.histogram.forEach {
context.console.output("\($0.key): ", style: .info, newLine: false)
context.console.output("\($0.value)", style: .success)
Expand All @@ -121,3 +149,13 @@ extension Sequence where Element: Hashable {
return self.reduce(into: [:]) { counts, elem in counts[elem, default: 0] += 1 }
}
}

extension Data {
var prettyPrintedJSONString: String? {
guard let object = try? JSONSerialization.jsonObject(with: self, options: []),
let data = try? JSONSerialization.data(withJSONObject: object, options: [.prettyPrinted]),
let prettyPrintedString = String(data: data, encoding: .utf8) else { return nil }

return prettyPrintedString
}
}
65 changes: 65 additions & 0 deletions Sources/App/SimpleRedisClient.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//
// File.swift
//
//
// Created by Jimmy McDermott on 10/10/20.
//

import Foundation
import Redis
import NIO
import Vapor

struct SimpleRedisClient: RedisClient {
let configuration: RedisConfiguration
let eventLoopGroup: EventLoopGroup

private struct PoolKey: StorageKey, LockKey {
typealias Value = [EventLoop.Key: RedisConnectionPool]
}

// must be event loop from this app's elg
internal func pool() -> RedisConnectionPool {
let logger = Logger(label: "queues.progress.logger")
return RedisConnectionPool(
serverConnectionAddresses: configuration.serverAddresses,
loop: eventLoop,
maximumConnectionCount: configuration.pool.maximumConnectionCount,
minimumConnectionCount: configuration.pool.minimumConnectionCount,
connectionPassword: configuration.password,
connectionLogger: logger,
connectionTCPClient: nil,
poolLogger: logger,
connectionBackoffFactor: configuration.pool.connectionBackoffFactor,
initialConnectionBackoffDelay: configuration.pool.initialConnectionBackoffDelay,
connectionRetryTimeout: configuration.pool.connectionRetryTimeout
)
}

func send(command: String, with arguments: [RESPValue]) -> EventLoopFuture<RESPValue> {
self.pool().send(command: command, with: arguments)
}

public var eventLoop: EventLoop {
self.eventLoopGroup.next()
}

public func logging(to logger: Logger) -> RedisClient {
self
}

func unsubscribe(from channels: [RedisChannelName]) -> EventLoopFuture<Void> {
eventLoop.future()
}

func punsubscribe(from patterns: [String]) -> EventLoopFuture<Void> {
eventLoop.future()
}
}

private extension EventLoop {
typealias Key = ObjectIdentifier
var key: Key {
ObjectIdentifier(self)
}
}
24 changes: 16 additions & 8 deletions Sources/Run/main.swift
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
import Vapor
import App
import ConsoleKit
import Foundation
import NIO

var env = try Environment.detect()
try LoggingSystem.bootstrap(from: &env)
let console: Console = Terminal()
var input = CommandInput(arguments: CommandLine.arguments)
var context = CommandContext(console: console, input: input)

let app = Application(env)
defer { app.shutdown() }
app.commands.defaultCommand = ProgressCommand()
app.commands.use(ProgressCommand(), as: "progress")
try app.run()
var commands = Commands(enableAutocomplete: false)
commands.use(ProgressCommand(), as: "progress", isDefault: true)

do {
let group = commands.group(help: "A CLI for viewing data on your Redis queue jobs.")
try console.run(group, input: input)
} catch let error {
console.error("\(error)")
exit(1)
}

0 comments on commit 5f7de71

Please sign in to comment.