Skip to content

Commit

Permalink
all: switch to using log/slog instead of log
Browse files Browse the repository at this point in the history
This lets us print structured errors and also introduce leveled logs.
  • Loading branch information
kevinburkesegment committed Apr 11, 2024
1 parent 79edaca commit 409d99f
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 35 deletions.
47 changes: 23 additions & 24 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package nsq

import (
"io"
"log"
"log/slog"
"net"
"net/http"
"strconv"
Expand Down Expand Up @@ -175,32 +175,32 @@ func (c *Consumer) run() {
defer ticker.Stop()

if err := c.pulse(); err != nil {
log.Print(err)
slog.Error("pulse error", "err", err)
}

for {
select {
case <-ticker.C:
if err := c.pulse(); err != nil {
log.Print(err)
slog.Error("pulse error", "err", err)
}

case <-c.done:
log.Println("Consumer initiating shutdown sequence")
slog.Info("Consumer initiating shutdown sequence")
// Send a CLS to all Cmd Channels for all connections
c.close()
log.Println("draining and re-queueing in-flight messages and awaiting connection waitgroup")
slog.Info("draining and re-queueing in-flight messages and awaiting connection waitgroup")
// Drain and re-queue any in-flight messages until all runConn routines return
c.drainAndJoinAwait()
// At this point all runConn routines have returned, therefore we know
// we won't be receiving any new messages from nsqd servers.
// But we potentially could have some messages in c.msgs
// We can safely close the c.msgs channel and requeue the remaining messages.
log.Println("draining and requeueing remaining in-flight messages")
slog.Info("draining and requeueing remaining in-flight messages")
// drain and requeue any remaining in-flight messages
close(c.msgs)
c.drainRemaining()
log.Println("closing and cleaning up connections")
slog.Info("closing and cleaning up connections")
// Cleanup remaining connections
c.mtx.Lock()
connCloseWg := sync.WaitGroup{}
Expand All @@ -219,28 +219,27 @@ func (c *Consumer) run() {
start := time.Now()
for len(cm.CmdChan) > 0 {
if time.Since(start) > c.drainTimeout {
log.Println("failed to drain CmdChan for connection, closing now")
slog.Info("failed to drain CmdChan for connection, closing now")
break
}
log.Println("waiting for write channel to flush any requeue commands")
slog.Info("waiting for write channel to flush any requeue commands")
time.Sleep(time.Millisecond * 500)
}
closeCommand(cm.CmdChan)
err := cm.Con.Close()
if err != nil {
log.Printf("error returned from connection close %+s", err.Error())
slog.Error("error closing connection", "err", err)
}
connCloseWg.Done()
}(cm)
}
c.mtx.Unlock()
success := c.await(&connCloseWg, c.drainTimeout)
if success {
log.Println("successfully flushed all connections")
if success := c.await(&connCloseWg, c.drainTimeout); success {
slog.Info("successfully flushed all connections")
} else {
log.Println("timed out awaiting connections flush and close")
slog.Warn("timed out awaiting connections flush and close")
}
log.Println("Consumer exiting run")
slog.Info("Consumer exiting run")
// Signal to the stop() function that orderly shutdown is complete
c.shutJoin.Done()
return
Expand Down Expand Up @@ -280,7 +279,7 @@ func (c *Consumer) drainAndJoinAwait() {
return
case m, ok := <-c.msgs:
if ok {
log.Printf("requeueing %+v\n", m.ID.String())
slog.Info("requeueing message", "msg", m.ID)
sendCommand(m.cmdChan, Req{MessageID: m.ID})
}
}
Expand All @@ -291,7 +290,7 @@ func (c *Consumer) drainAndJoinAwait() {
// channel and issues a REQ command for each.
func (c *Consumer) drainRemaining() {
for m := range c.msgs {
log.Printf("requeueing %+v\n", m.ID.String())
slog.Info("requeueing message", "msg", m.ID)
sendCommand(m.cmdChan, Req{MessageID: m.ID})
}
}
Expand Down Expand Up @@ -332,7 +331,7 @@ func (c *Consumer) pulse() (err error) {
cmdChan := make(chan Command, c.maxInFlight+2)
conn, err := c.getConn(addr)
if err != nil {
log.Printf("failed to connect to %s: %s", addr, err)
slog.Error("failed to connect", "addr", addr, "err", err)
continue
}
cm := connMeta{CmdChan: cmdChan, Con: conn}
Expand All @@ -348,7 +347,7 @@ func (c *Consumer) pulse() (err error) {
}

func (c *Consumer) close() {
log.Println("sending CLS to all command channels")
slog.Info("sending CLS to all command channels")
c.mtx.Lock()
for _, cm := range c.conns {
sendCommand(cm.CmdChan, Cls{})
Expand Down Expand Up @@ -403,7 +402,7 @@ func (c *Consumer) runConn(conn *Conn, addr string, cmdChan chan Command) {

if frame, err = conn.ReadFrame(); err != nil {
if err != io.EOF && err != io.ErrUnexpectedEOF {
log.Print(err)
slog.Error("could not read frame", "err", err)
}
return
}
Expand All @@ -424,16 +423,16 @@ func (c *Consumer) runConn(conn *Conn, addr string, cmdChan chan Command) {
return

default:
log.Printf("closing connection after receiving an unexpected response from %s: %s", conn.RemoteAddr(), f)
slog.Error("closing connection after receiving an unexpected response", "remote_addr", conn.RemoteAddr(), "response", f)
return
}

case Error:
log.Printf("closing connection after receiving an error from %s: %s", conn.RemoteAddr(), f)
slog.Error("closing connection after receiving an error", "remote_addr", conn.RemoteAddr(), "response", f)
return

default:
log.Printf("closing connection after receiving an unsupported frame from %s: %s", conn.RemoteAddr(), f.FrameType())
slog.Error("closing connection after receiving an unsupported frame", "remote_addr", conn.RemoteAddr(), "response", f)
return
}
}
Expand All @@ -449,7 +448,7 @@ func (c *Consumer) writeConn(conn *Conn, cmdChan chan Command) {

for cmd := range cmdChan {
if err := c.writeConnCommand(conn, cmd); err != nil {
log.Print(err)
slog.Error("could not write command to channel", "cmd", cmd, "err", err)
return
}
}
Expand Down
36 changes: 29 additions & 7 deletions nsqlookup/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"encoding/json"
"errors"
"io"
"log"
"log/slog"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -507,7 +507,11 @@ func (h TCPHandler) ServeConn(ctx context.Context, conn net.Conn) {
defer func() {
if node != nil {
err := node.Unregister(ctx)
log.Printf("UNREGISTER node = %s, err = %s", node, err)
if err != nil {
slog.Error("error unregistering node", "node", node, "err", err)
} else {
slog.Info("unregistered node", "node", node)
}
}
}()
defer close(resChan)
Expand Down Expand Up @@ -550,7 +554,7 @@ func (h TCPHandler) ServeConn(ctx context.Context, conn net.Conn) {
case Error:
res = e
default:
log.Print(err)
slog.Error("unknown command error", "cmd", cmd, "err", e)
return
}
}
Expand Down Expand Up @@ -582,7 +586,11 @@ func (h TCPHandler) identify(ctx context.Context, node Node, info NodeInfo, conn
res = RawResponse(b)
id, err = h.Engine.RegisterNode(ctx, info)

log.Printf("IDENTIFY node = %s, err = %v", info, err)
if err != nil {
slog.Error("identify node error", "node", info, "err", err)
} else {
slog.Info("identify node", "node", node)
}
return
}

Expand All @@ -591,11 +599,25 @@ func (h TCPHandler) ping(ctx context.Context, node Node) (res OK, err error) {
ctx, cancel := context.WithTimeout(ctx, h.EngineTimeout)
defer cancel()
err = node.Ping(ctx)
log.Printf("PING node = %s, err = %v", node, err)
if err != nil {
slog.Error("send ping error", "node", node, "err", err)
} else {
slog.Info("send ping", "node", node)
}
}
return
}

func infoOrErr(msg string, args ...any) {
for i, arg := range args {
if arg == "err" && args[i+1] != nil {
slog.Error(msg, args...)
return
}
}
slog.Info(msg, args...)
}

func (h TCPHandler) register(ctx context.Context, node Node, topic string, channel string) (res OK, err error) {
if node == nil {
err = errClientMustIdentify
Expand All @@ -616,7 +638,7 @@ func (h TCPHandler) register(ctx context.Context, node Node, topic string, chann
err = makeErrBadTopic("missing topic name")
}

log.Printf("REGISTER node = %s, topic = %s, channel = %s, err = %v", node, topic, channel, err)
infoOrErr("register", "node", node, "topic", topic, "channel", channel, "err", err)
return
}

Expand Down Expand Up @@ -644,7 +666,7 @@ func (h TCPHandler) unregister(ctx context.Context, node Node, topic string, cha
id = node
}

log.Printf("UNREGISTER node = %s, topic = %s, channel = %s, err = %v", node, topic, channel, err)
infoOrErr("unregister node", "node", node, "topic", topic, "channel", channel, "err", err)
return
}

Expand Down
8 changes: 4 additions & 4 deletions producer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package nsq

import (
"log"
"log/slog"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -240,15 +240,15 @@ func (p *Producer) run() {
}

if err != nil {
log.Printf("closing nsqd connection to %s: %s", p.address, err)
slog.Error("closing nsqd connection", "addr", p.address, "err", err)
}
}

connect := func() (err error) {
log.Printf("opening nsqd connection to %s", p.address)
slog.Info("opening nsqd connection", "addr", p.address)

if conn, err = DialTimeout(p.address, p.dialTimeout); err != nil {
log.Printf("failed to connect to nsqd at %s: %s", p.address, err)
slog.Error("failed to connect to nsqd", "addr", p.address, "err", err)
return
}

Expand Down

0 comments on commit 409d99f

Please sign in to comment.