Skip to content

Commit

Permalink
Separate client from observer
Browse files Browse the repository at this point in the history
Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
  • Loading branch information
dborovcanin committed Sep 14, 2020
1 parent 52eebcf commit d1b0aa2
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 92 deletions.
20 changes: 9 additions & 11 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,19 @@ import (
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/coap"
"github.com/mainflux/mainflux/coap/api"
gocoap "github.com/plgd-dev/go-coap/v2"

logger "github.com/mainflux/mainflux/logger"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
broker "github.com/nats-io/nats.go"
opentracing "github.com/opentracing/opentracing-go"
gocoap "github.com/plgd-dev/go-coap/v2"
stdprometheus "github.com/prometheus/client_golang/prometheus"

jconfig "github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

const (
defPort = "5688"
defPort = "5683"
defNatsURL = "nats://localhost:4222"
defLogLevel = "error"
defClientTLS = "false"
Expand Down Expand Up @@ -81,14 +80,13 @@ func main() {

tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)

// pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
// if err != nil {
// logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
// os.Exit(1)
// }
// defer pubSub.Close()
nc, err := broker.Connect(cfg.natsURL)
if err != nil {
log.Fatalf(err.Error())
}
defer nc.Close()

svc := coap.New(tc, nil)
svc := coap.New(tc, nc)

svc = api.LoggingMiddleware(svc, logger)

Expand Down
66 changes: 24 additions & 42 deletions coap/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

// Package coap contains the domain concept definitions needed to support
// Mainflux coap adapter service functionality. All constant values are taken
// Mainflux CoAP adapter service functionality. All constant values are taken
// from RFC, and could be adjusted based on specific use case.
package coap

Expand Down Expand Up @@ -34,7 +34,7 @@ type Service interface {

// Subscribes to channel with specified id, subtopic and adds subscription to
// service map of subscriptions under given ID.
Subscribe(ctx context.Context, key, chanID, subtopic string, h Handler) error
Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error

// Unsubscribe method is used to stop observing resource.
Unsubscribe(ctx context.Context, key, chanID, subptopic, token string) error
Expand All @@ -44,31 +44,20 @@ var _ Service = (*adapterService)(nil)

// Observers is a map of maps,
type adapterService struct {
auth mainflux.ThingsServiceClient
conn *broker.Conn
handlers map[string]handlers
obsLock sync.RWMutex
auth mainflux.ThingsServiceClient
conn *broker.Conn
observers map[string]observers
obsLock sync.RWMutex
}

// New instantiates the CoAP adapter implementation.
func New(auth mainflux.ThingsServiceClient, nc *broker.Conn) Service {
as := &adapterService{
auth: auth,
conn: nc,
handlers: make(map[string]handlers),
obsLock: sync.RWMutex{},
}

// go func() {
// for {
// time.Sleep(time.Second * 5)
// fmt.Println("testing size... ")
// as.obsLock.RLock()
// fmt.Println(as.observers)
// fmt.Println("Number of goroutines", runtime.NumGoroutine())
// as.obsLock.RUnlock()
// }
// }()
auth: auth,
conn: nc,
observers: make(map[string]observers),
obsLock: sync.RWMutex{},
}

return as
}
Expand Down Expand Up @@ -97,7 +86,7 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg messagin
return svc.conn.Publish(subject, data)
}

func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, h Handler) error {
func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error {
ar := &mainflux.AccessByKeyReq{
Token: key,
ChanID: chanID,
Expand All @@ -113,23 +102,16 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic
}

go func() {
<-h.Done()
svc.remove(subject, h.Token())
<-c.Done()
svc.remove(subject, c.Token())
}()

sub, err := svc.conn.Subscribe(subject, func(m *broker.Msg) {
var msg messaging.Message
if err := proto.Unmarshal(m.Data, &msg); err != nil {
return
}
if err := h.Handle(msg); err != nil {
}
})
obs, err := NewObserver(subject, c, svc.conn)
if err != nil {
c.Cancel()
return err
}
h.Sub(sub)
return svc.put(subject, h.Token(), h)
return svc.put(subject, c.Token(), obs)
}

func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) error {
Expand All @@ -149,15 +131,15 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopi
return svc.remove(subject, token)
}

func (svc *adapterService) put(endpoint, token string, h Handler) error {
func (svc *adapterService) put(endpoint, token string, o Observer) error {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()

obs, ok := svc.handlers[endpoint]
obs, ok := svc.observers[endpoint]
// If there are no observers, create map and assign it to the endpoint.
if !ok {
obs = handlers{token: h}
svc.handlers[endpoint] = obs
obs = observers{token: o}
svc.observers[endpoint] = obs
return nil
}
// If observer exists, cancel subscription and replace it.
Expand All @@ -166,15 +148,15 @@ func (svc *adapterService) put(endpoint, token string, h Handler) error {
return errors.Wrap(ErrUnsubscribe, err)
}
}
obs[token] = h
obs[token] = o
return nil
}

func (svc *adapterService) remove(endpoint, token string) error {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()

obs, ok := svc.handlers[endpoint]
obs, ok := svc.observers[endpoint]
if !ok {
return nil
}
Expand All @@ -186,7 +168,7 @@ func (svc *adapterService) remove(endpoint, token string) error {
delete(obs, token)
// If there are no observers left for the endpint, remove the map.
if len(obs) == 0 {
delete(svc.handlers, endpoint)
delete(svc.observers, endpoint)
}
return nil
}
6 changes: 3 additions & 3 deletions coap/api/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,21 @@ func (lm *loggingMiddleware) Publish(ctx context.Context, key string, msg messag
return lm.svc.Publish(ctx, key, msg)
}

func (lm *loggingMiddleware) Subscribe(ctx context.Context, key, chanID, subtopic string, o coap.Handler) (err error) {
func (lm *loggingMiddleware) Subscribe(ctx context.Context, key, chanID, subtopic string, c coap.Client) (err error) {
defer func(begin time.Time) {
destChannel := chanID
if subtopic != "" {
destChannel = fmt.Sprintf("%s.%s", destChannel, subtopic)
}
message := fmt.Sprintf("Method subscribe to %s for client %s took %s to complete", destChannel, o.Token(), time.Since(begin))
message := fmt.Sprintf("Method subscribe to %s for client %s took %s to complete", destChannel, c.Token(), time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
}
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())

return lm.svc.Subscribe(ctx, key, chanID, subtopic, o)
return lm.svc.Subscribe(ctx, key, chanID, subtopic, c)
}

func (lm *loggingMiddleware) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) error {
Expand Down
4 changes: 2 additions & 2 deletions coap/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func (mm *metricsMiddleware) Publish(ctx context.Context, key string, msg messag
return mm.svc.Publish(ctx, key, msg)
}

func (mm *metricsMiddleware) Subscribe(ctx context.Context, key, chanID, subtopic string, o coap.Handler) error {
func (mm *metricsMiddleware) Subscribe(ctx context.Context, key, chanID, subtopic string, c coap.Client) error {
defer func(begin time.Time) {
mm.counter.With("method", "subscribe").Add(1)
mm.latency.With("method", "subscribe").Observe(time.Since(begin).Seconds())
}(time.Now())

return mm.svc.Subscribe(ctx, key, chanID, subtopic, o)
return mm.svc.Subscribe(ctx, key, chanID, subtopic, c)
}

func (mm *metricsMiddleware) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) error {
Expand Down
4 changes: 2 additions & 2 deletions coap/api/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func handler(w mux.ResponseWriter, m *mux.Message) {
return
}
if obs == 0 {
h := coap.NewHandler(w.Client(), m.Token)
if err := service.Subscribe(context.Background(), key, msg.Channel, msg.Subtopic, h); err != nil {
c := coap.NewClient(w.Client(), m.Token)
if err := service.Subscribe(context.Background(), key, msg.Channel, msg.Subtopic, c); err != nil {
switch {
case errors.Contains(err, coap.ErrUnauthorized):
resp.Code = codes.Unauthorized
Expand Down
54 changes: 22 additions & 32 deletions coap/handler.go → coap/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,65 +8,55 @@ import (

"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
broker "github.com/nats-io/nats.go"
"github.com/plgd-dev/go-coap/v2/message"
"github.com/plgd-dev/go-coap/v2/message/codes"
mux "github.com/plgd-dev/go-coap/v2/mux"
)

// Handler wraps CoAP client.
type Handler interface {
Handle(m messaging.Message) error
// Client wraps CoAP client.
type Client interface {
SendMessage(m messaging.Message) error
Cancel() error
Done() <-chan struct{}
// In CoAP terminology similar to the Session ID.
Token() string
Sub(*broker.Subscription)
}

type handlers map[string]Handler
type observers map[string]Observer

// ErrOption indicates an error when adding an option.
var ErrOption = errors.New("unable to set option")

type handler struct {
client mux.Client
token message.Token
messages chan messaging.Message
sub *broker.Subscription
type client struct {
client mux.Client
token message.Token
}

// NewHandler instantiates a new Observer.
func NewHandler(client mux.Client, token message.Token) Handler {
return &handler{
client: client,
// NewClient instantiates a new Observer.
func NewClient(mc mux.Client, token message.Token) Client {
return &client{
client: mc,
token: token,
}
}

func (h *handler) Sub(s *broker.Subscription) {
h.sub = s
func (c *client) Done() <-chan struct{} {
return c.client.Context().Done()
}

func (h *handler) Done() <-chan struct{} {
return h.client.Context().Done()
func (c *client) Cancel() error {
return c.client.Close()
}

func (h *handler) Cancel() error {
if err := h.sub.Unsubscribe(); err != nil {
return err
}
return h.client.Close()
}

func (h *handler) Token() string {
return h.token.String()
func (c *client) Token() string {
return c.token.String()
}

func (h *handler) Handle(msg messaging.Message) error {
func (c *client) SendMessage(msg messaging.Message) error {
m := message.Message{
Code: codes.Content,
Token: h.token,
Context: h.client.Context(),
Token: c.token,
Context: c.client.Context(),
Body: bytes.NewReader(msg.Payload),
}
var opts message.Options
Expand All @@ -81,5 +71,5 @@ func (h *handler) Handle(msg messaging.Message) error {
return errors.Wrap(ErrOption, err)
}
m.Options = opts
return h.client.WriteMessage(&m)
return c.client.WriteMessage(&m)
}
47 changes: 47 additions & 0 deletions coap/observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0

package coap

import (
"github.com/gogo/protobuf/proto"
"github.com/mainflux/mainflux/pkg/messaging"
broker "github.com/nats-io/nats.go"
)

// Observer represents an internal observer used to handle CoAP observe messages.
type Observer interface {
Cancel() error
}

// NewObserver returns a new Observer instance.
func NewObserver(subject string, c Client, conn *broker.Conn) (Observer, error) {
sub, err := conn.Subscribe(subject, func(m *broker.Msg) {
var msg messaging.Message
if err := proto.Unmarshal(m.Data, &msg); err != nil {
return
}
if err := c.SendMessage(msg); err != nil {
}
})
if err != nil {
return nil, err
}
ret := &observer{
client: c,
sub: sub,
}
return ret, nil
}

type observer struct {
client Client
sub *broker.Subscription
}

func (o *observer) Cancel() error {
if err := o.sub.Unsubscribe(); err != nil && err != broker.ErrConnectionClosed {
return err
}
return o.client.Cancel()
}

0 comments on commit d1b0aa2

Please sign in to comment.