diff --git a/cmd/coap/main.go b/cmd/coap/main.go index 767bd787133..57f6ccc4b48 100644 --- a/cmd/coap/main.go +++ b/cmd/coap/main.go @@ -19,20 +19,19 @@ import ( "github.com/mainflux/mainflux" "github.com/mainflux/mainflux/coap" "github.com/mainflux/mainflux/coap/api" - gocoap "github.com/plgd-dev/go-coap/v2" - logger "github.com/mainflux/mainflux/logger" thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc" + broker "github.com/nats-io/nats.go" opentracing "github.com/opentracing/opentracing-go" + gocoap "github.com/plgd-dev/go-coap/v2" stdprometheus "github.com/prometheus/client_golang/prometheus" - jconfig "github.com/uber/jaeger-client-go/config" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ) const ( - defPort = "5688" + defPort = "5683" defNatsURL = "nats://localhost:4222" defLogLevel = "error" defClientTLS = "false" @@ -81,14 +80,13 @@ func main() { tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout) - // pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger) - // if err != nil { - // logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err)) - // os.Exit(1) - // } - // defer pubSub.Close() + nc, err := broker.Connect(cfg.natsURL) + if err != nil { + log.Fatalf(err.Error()) + } + defer nc.Close() - svc := coap.New(tc, nil) + svc := coap.New(tc, nc) svc = api.LoggingMiddleware(svc, logger) diff --git a/coap/adapter.go b/coap/adapter.go index 040102cc497..acafb209dd7 100644 --- a/coap/adapter.go +++ b/coap/adapter.go @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 // Package coap contains the domain concept definitions needed to support -// Mainflux coap adapter service functionality. All constant values are taken +// Mainflux CoAP adapter service functionality. All constant values are taken // from RFC, and could be adjusted based on specific use case. package coap @@ -34,7 +34,7 @@ type Service interface { // Subscribes to channel with specified id, subtopic and adds subscription to // service map of subscriptions under given ID. - Subscribe(ctx context.Context, key, chanID, subtopic string, h Handler) error + Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error // Unsubscribe method is used to stop observing resource. Unsubscribe(ctx context.Context, key, chanID, subptopic, token string) error @@ -44,31 +44,20 @@ var _ Service = (*adapterService)(nil) // Observers is a map of maps, type adapterService struct { - auth mainflux.ThingsServiceClient - conn *broker.Conn - handlers map[string]handlers - obsLock sync.RWMutex + auth mainflux.ThingsServiceClient + conn *broker.Conn + observers map[string]observers + obsLock sync.RWMutex } // New instantiates the CoAP adapter implementation. func New(auth mainflux.ThingsServiceClient, nc *broker.Conn) Service { as := &adapterService{ - auth: auth, - conn: nc, - handlers: make(map[string]handlers), - obsLock: sync.RWMutex{}, - } - - // go func() { - // for { - // time.Sleep(time.Second * 5) - // fmt.Println("testing size... ") - // as.obsLock.RLock() - // fmt.Println(as.observers) - // fmt.Println("Number of goroutines", runtime.NumGoroutine()) - // as.obsLock.RUnlock() - // } - // }() + auth: auth, + conn: nc, + observers: make(map[string]observers), + obsLock: sync.RWMutex{}, + } return as } @@ -97,7 +86,7 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg messagin return svc.conn.Publish(subject, data) } -func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, h Handler) error { +func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error { ar := &mainflux.AccessByKeyReq{ Token: key, ChanID: chanID, @@ -113,23 +102,16 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic } go func() { - <-h.Done() - svc.remove(subject, h.Token()) + <-c.Done() + svc.remove(subject, c.Token()) }() - sub, err := svc.conn.Subscribe(subject, func(m *broker.Msg) { - var msg messaging.Message - if err := proto.Unmarshal(m.Data, &msg); err != nil { - return - } - if err := h.Handle(msg); err != nil { - } - }) + obs, err := NewObserver(subject, c, svc.conn) if err != nil { + c.Cancel() return err } - h.Sub(sub) - return svc.put(subject, h.Token(), h) + return svc.put(subject, c.Token(), obs) } func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) error { @@ -149,15 +131,15 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopi return svc.remove(subject, token) } -func (svc *adapterService) put(endpoint, token string, h Handler) error { +func (svc *adapterService) put(endpoint, token string, o Observer) error { svc.obsLock.Lock() defer svc.obsLock.Unlock() - obs, ok := svc.handlers[endpoint] + obs, ok := svc.observers[endpoint] // If there are no observers, create map and assign it to the endpoint. if !ok { - obs = handlers{token: h} - svc.handlers[endpoint] = obs + obs = observers{token: o} + svc.observers[endpoint] = obs return nil } // If observer exists, cancel subscription and replace it. @@ -166,7 +148,7 @@ func (svc *adapterService) put(endpoint, token string, h Handler) error { return errors.Wrap(ErrUnsubscribe, err) } } - obs[token] = h + obs[token] = o return nil } @@ -174,7 +156,7 @@ func (svc *adapterService) remove(endpoint, token string) error { svc.obsLock.Lock() defer svc.obsLock.Unlock() - obs, ok := svc.handlers[endpoint] + obs, ok := svc.observers[endpoint] if !ok { return nil } @@ -186,7 +168,7 @@ func (svc *adapterService) remove(endpoint, token string) error { delete(obs, token) // If there are no observers left for the endpint, remove the map. if len(obs) == 0 { - delete(svc.handlers, endpoint) + delete(svc.observers, endpoint) } return nil } diff --git a/coap/api/logging.go b/coap/api/logging.go index 48cae79d207..3ba3b1c469d 100644 --- a/coap/api/logging.go +++ b/coap/api/logging.go @@ -44,13 +44,13 @@ func (lm *loggingMiddleware) Publish(ctx context.Context, key string, msg messag return lm.svc.Publish(ctx, key, msg) } -func (lm *loggingMiddleware) Subscribe(ctx context.Context, key, chanID, subtopic string, o coap.Handler) (err error) { +func (lm *loggingMiddleware) Subscribe(ctx context.Context, key, chanID, subtopic string, c coap.Client) (err error) { defer func(begin time.Time) { destChannel := chanID if subtopic != "" { destChannel = fmt.Sprintf("%s.%s", destChannel, subtopic) } - message := fmt.Sprintf("Method subscribe to %s for client %s took %s to complete", destChannel, o.Token(), time.Since(begin)) + message := fmt.Sprintf("Method subscribe to %s for client %s took %s to complete", destChannel, c.Token(), time.Since(begin)) if err != nil { lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err)) return @@ -58,7 +58,7 @@ func (lm *loggingMiddleware) Subscribe(ctx context.Context, key, chanID, subtopi lm.logger.Info(fmt.Sprintf("%s without errors.", message)) }(time.Now()) - return lm.svc.Subscribe(ctx, key, chanID, subtopic, o) + return lm.svc.Subscribe(ctx, key, chanID, subtopic, c) } func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) error { diff --git a/coap/api/metrics.go b/coap/api/metrics.go index 7030747a7eb..47dae727c31 100644 --- a/coap/api/metrics.go +++ b/coap/api/metrics.go @@ -40,13 +40,13 @@ func (mm *metricsMiddleware) Publish(ctx context.Context, key string, msg messag return mm.svc.Publish(ctx, key, msg) } -func (mm *metricsMiddleware) Subscribe(ctx context.Context, key, chanID, subtopic string, o coap.Handler) error { +func (mm *metricsMiddleware) Subscribe(ctx context.Context, key, chanID, subtopic string, c coap.Client) error { defer func(begin time.Time) { mm.counter.With("method", "subscribe").Add(1) mm.latency.With("method", "subscribe").Observe(time.Since(begin).Seconds()) }(time.Now()) - return mm.svc.Subscribe(ctx, key, chanID, subtopic, o) + return mm.svc.Subscribe(ctx, key, chanID, subtopic, c) } func (mm *metricsMiddleware) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) error { diff --git a/coap/api/transport.go b/coap/api/transport.go index 7140f69d3fb..f7494bf82b5 100644 --- a/coap/api/transport.go +++ b/coap/api/transport.go @@ -98,8 +98,8 @@ func handler(w mux.ResponseWriter, m *mux.Message) { return } if obs == 0 { - h := coap.NewHandler(w.Client(), m.Token) - if err := service.Subscribe(context.Background(), key, msg.Channel, msg.Subtopic, h); err != nil { + c := coap.NewClient(w.Client(), m.Token) + if err := service.Subscribe(context.Background(), key, msg.Channel, msg.Subtopic, c); err != nil { switch { case errors.Contains(err, coap.ErrUnauthorized): resp.Code = codes.Unauthorized diff --git a/coap/handler.go b/coap/client.go similarity index 50% rename from coap/handler.go rename to coap/client.go index e25013a1dbe..3123fcea816 100644 --- a/coap/handler.go +++ b/coap/client.go @@ -8,65 +8,55 @@ import ( "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/messaging" - broker "github.com/nats-io/nats.go" "github.com/plgd-dev/go-coap/v2/message" "github.com/plgd-dev/go-coap/v2/message/codes" mux "github.com/plgd-dev/go-coap/v2/mux" ) -// Handler wraps CoAP client. -type Handler interface { - Handle(m messaging.Message) error +// Client wraps CoAP client. +type Client interface { + SendMessage(m messaging.Message) error Cancel() error Done() <-chan struct{} + // In CoAP terminology similar to the Session ID. Token() string - Sub(*broker.Subscription) } -type handlers map[string]Handler +type observers map[string]Observer // ErrOption indicates an error when adding an option. var ErrOption = errors.New("unable to set option") -type handler struct { - client mux.Client - token message.Token - messages chan messaging.Message - sub *broker.Subscription +type client struct { + client mux.Client + token message.Token } -// NewHandler instantiates a new Observer. -func NewHandler(client mux.Client, token message.Token) Handler { - return &handler{ - client: client, +// NewClient instantiates a new Observer. +func NewClient(mc mux.Client, token message.Token) Client { + return &client{ + client: mc, token: token, } } -func (h *handler) Sub(s *broker.Subscription) { - h.sub = s +func (c *client) Done() <-chan struct{} { + return c.client.Context().Done() } -func (h *handler) Done() <-chan struct{} { - return h.client.Context().Done() +func (c *client) Cancel() error { + return c.client.Close() } -func (h *handler) Cancel() error { - if err := h.sub.Unsubscribe(); err != nil { - return err - } - return h.client.Close() -} - -func (h *handler) Token() string { - return h.token.String() +func (c *client) Token() string { + return c.token.String() } -func (h *handler) Handle(msg messaging.Message) error { +func (c *client) SendMessage(msg messaging.Message) error { m := message.Message{ Code: codes.Content, - Token: h.token, - Context: h.client.Context(), + Token: c.token, + Context: c.client.Context(), Body: bytes.NewReader(msg.Payload), } var opts message.Options @@ -81,5 +71,5 @@ func (h *handler) Handle(msg messaging.Message) error { return errors.Wrap(ErrOption, err) } m.Options = opts - return h.client.WriteMessage(&m) + return c.client.WriteMessage(&m) } diff --git a/coap/observer.go b/coap/observer.go new file mode 100644 index 00000000000..cd268268aeb --- /dev/null +++ b/coap/observer.go @@ -0,0 +1,47 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package coap + +import ( + "github.com/gogo/protobuf/proto" + "github.com/mainflux/mainflux/pkg/messaging" + broker "github.com/nats-io/nats.go" +) + +// Observer represents an internal observer used to handle CoAP observe messages. +type Observer interface { + Cancel() error +} + +// NewObserver returns a new Observer instance. +func NewObserver(subject string, c Client, conn *broker.Conn) (Observer, error) { + sub, err := conn.Subscribe(subject, func(m *broker.Msg) { + var msg messaging.Message + if err := proto.Unmarshal(m.Data, &msg); err != nil { + return + } + if err := c.SendMessage(msg); err != nil { + } + }) + if err != nil { + return nil, err + } + ret := &observer{ + client: c, + sub: sub, + } + return ret, nil +} + +type observer struct { + client Client + sub *broker.Subscription +} + +func (o *observer) Cancel() error { + if err := o.sub.Unsubscribe(); err != nil && err != broker.ErrConnectionClosed { + return err + } + return o.client.Cancel() +}