Skip to content

Commit

Permalink
MF-928 - Change CoAP lib (#1233)
Browse files Browse the repository at this point in the history
* Switch CoAP lib

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Revert removed adapter code

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* WIP CoAP refactor

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Add auth key

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix observers map

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix reading message body

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix subtopic parsing

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix error handling

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix multi-protocol communication

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Separate client from observer

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Remove unused config

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Remove TCP option

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Inline error check

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Add logging client errors

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Replace RWMutex since we're not using RLock

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Inline error handling

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Inline error handling

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
  • Loading branch information
dborovcanin authored Sep 22, 2020
1 parent f18f2c1 commit f10e49e
Show file tree
Hide file tree
Showing 535 changed files with 44,568 additions and 8,886 deletions.
35 changes: 10 additions & 25 deletions cmd/coap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (
"syscall"
"time"

gocoap "github.com/dustin/go-coap"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/coap"
"github.com/mainflux/mainflux/coap/api"
logger "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging/nats"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
broker "github.com/nats-io/nats.go"
opentracing "github.com/opentracing/opentracing-go"
gocoap "github.com/plgd-dev/go-coap/v2"
stdprometheus "github.com/prometheus/client_golang/prometheus"
jconfig "github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
Expand All @@ -36,7 +36,6 @@ const (
defLogLevel = "error"
defClientTLS = "false"
defCACerts = ""
defPingPeriod = "12"
defJaegerURL = ""
defThingsAuthURL = "localhost:8181"
defThingsAuthTimeout = "1s"
Expand All @@ -46,7 +45,6 @@ const (
envLogLevel = "MF_COAP_ADAPTER_LOG_LEVEL"
envClientTLS = "MF_COAP_ADAPTER_CLIENT_TLS"
envCACerts = "MF_COAP_ADAPTER_CA_CERTS"
envPingPeriod = "MF_COAP_ADAPTER_PING_PERIOD"
envJaegerURL = "MF_JAEGER_URL"
envThingsAuthURL = "MF_THINGS_AUTH_GRPC_URL"
envThingsAuthTimeout = "MF_THINGS_AUTH_GRPC_TIMEOUT"
Expand All @@ -58,7 +56,6 @@ type config struct {
logLevel string
clientTLS bool
caCerts string
pingPeriod time.Duration
jaegerURL string
thingsAuthURL string
thingsAuthTimeout time.Duration
Expand All @@ -78,17 +75,15 @@ func main() {
thingsTracer, thingsCloser := initJaeger("things", cfg.jaegerURL, logger)
defer thingsCloser.Close()

cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)
respChan := make(chan string, 10000)
tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)

pubSub, err := nats.NewPubSub(cfg.natsURL, "", logger)
nc, err := broker.Connect(cfg.natsURL)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s", err))
os.Exit(1)
log.Fatalf(err.Error())
}
defer pubSub.Close()
defer nc.Close()

svc := coap.New(pubSub, logger, cc, respChan)
svc := coap.New(tc, nc)

svc = api.LoggingMiddleware(svc, logger)

Expand All @@ -111,7 +106,7 @@ func main() {
errs := make(chan error, 2)

go startHTTPServer(cfg.port, logger, errs)
go startCOAPServer(cfg, svc, cc, respChan, logger, errs)
go startCOAPServer(cfg, svc, nil, logger, errs)

go func() {
c := make(chan os.Signal)
Expand All @@ -129,15 +124,6 @@ func loadConfig() config {
log.Fatalf("Invalid value passed for %s\n", envClientTLS)
}

pp, err := strconv.ParseInt(mainflux.Env(envPingPeriod, defPingPeriod), 10, 64)
if err != nil {
log.Fatalf("Invalid value passed for %s\n", envPingPeriod)
}

if pp < 1 || pp > 24 {
log.Fatalf("Value of %s must be between 1 and 24", envPingPeriod)
}

authTimeout, err := time.ParseDuration(mainflux.Env(envThingsAuthTimeout, defThingsAuthTimeout))
if err != nil {
log.Fatalf("Invalid %s value: %s", envThingsAuthTimeout, err.Error())
Expand All @@ -149,7 +135,6 @@ func loadConfig() config {
logLevel: mainflux.Env(envLogLevel, defLogLevel),
clientTLS: tls,
caCerts: mainflux.Env(envCACerts, defCACerts),
pingPeriod: time.Duration(pp),
jaegerURL: mainflux.Env(envJaegerURL, defJaegerURL),
thingsAuthURL: mainflux.Env(envThingsAuthURL, defThingsAuthURL),
thingsAuthTimeout: authTimeout,
Expand Down Expand Up @@ -210,8 +195,8 @@ func startHTTPServer(port string, logger logger.Logger, errs chan error) {
errs <- http.ListenAndServe(p, api.MakeHTTPHandler())
}

func startCOAPServer(cfg config, svc coap.Service, auth mainflux.ThingsServiceClient, respChan chan<- string, l logger.Logger, errs chan error) {
func startCOAPServer(cfg config, svc coap.Service, auth mainflux.ThingsServiceClient, l logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", cfg.port)
l.Info(fmt.Sprintf("CoAP adapter service started, exposed port %s", cfg.port))
errs <- gocoap.ListenAndServe("udp", p, api.MakeCOAPHandler(svc, auth, l, respChan, cfg.pingPeriod))
errs <- gocoap.ListenAndServe("udp", p, api.MakeCoAPHandler(svc, l))
}
191 changes: 110 additions & 81 deletions coap/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,142 +2,171 @@
// SPDX-License-Identifier: Apache-2.0

// Package coap contains the domain concept definitions needed to support
// Mainflux coap adapter service functionality. All constant values are taken
// Mainflux CoAP adapter service functionality. All constant values are taken
// from RFC, and could be adjusted based on specific use case.
package coap

import (
"context"
"fmt"
"sync"
"time"

"github.com/gogo/protobuf/proto"
"github.com/mainflux/mainflux/pkg/errors"
broker "github.com/nats-io/nats.go"

"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/messaging"
)

const (
chanID = "id"
keyHeader = "key"
const chansPrefix = "channels"

// AckRandomFactor is default ACK coefficient.
AckRandomFactor = 1.5
// AckTimeout is the amount of time to wait for a response.
AckTimeout = 2000 * time.Millisecond
// MaxRetransmit is the maximum number of times a message will be retransmitted.
MaxRetransmit = 4
// Exported errors
var (
ErrUnauthorized = errors.New("unauthorized access")
ErrUnsubscribe = errors.New("unable to unsubscribe")
)

// Service specifies coap service API.
// Service specifies CoAP service API.
type Service interface {
// Publish Messssage
Publish(msg messaging.Message) error
Publish(ctx context.Context, key string, msg messaging.Message) error

// Subscribes to channel with specified id, subtopic and adds subscription to
// service map of subscriptions under given ID.
Subscribe(chanID, subtopic, obsID string, obs *Observer) error
Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error

// Unsubscribe method is used to stop observing resource.
Unsubscribe(obsID string)
Unsubscribe(ctx context.Context, key, chanID, subptopic, token string) error
}

var _ Service = (*adapterService)(nil)

// Observers is a map of maps,
type adapterService struct {
auth mainflux.ThingsServiceClient
ps messaging.PubSub
log logger.Logger
obs map[string]*Observer
obsLock sync.Mutex
auth mainflux.ThingsServiceClient
conn *broker.Conn
observers map[string]observers
obsLock sync.Mutex
}

// New instantiates the CoAP adapter implementation.
func New(ps messaging.PubSub, log logger.Logger, auth mainflux.ThingsServiceClient, responses <-chan string) Service {
func New(auth mainflux.ThingsServiceClient, nc *broker.Conn) Service {
as := &adapterService{
auth: auth,
ps: ps,
log: log,
obs: make(map[string]*Observer),
obsLock: sync.Mutex{},
auth: auth,
conn: nc,
observers: make(map[string]observers),
obsLock: sync.Mutex{},
}

go as.listenResponses(responses)
return as
}

func (svc *adapterService) get(obsID string) (*Observer, bool) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()

val, ok := svc.obs[obsID]
return val, ok
}
func (svc *adapterService) Publish(ctx context.Context, key string, msg messaging.Message) error {
ar := &mainflux.AccessByKeyReq{
Token: key,
ChanID: msg.Channel,
}
thid, err := svc.auth.CanAccessByKey(ctx, ar)
if err != nil {
return errors.Wrap(ErrUnauthorized, err)
}
msg.Publisher = thid.GetValue()

func (svc *adapterService) put(obsID string, o *Observer) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()
data, err := proto.Marshal(&msg)
if err != nil {
return err
}

val, ok := svc.obs[obsID]
if ok {
close(val.Cancel)
subject := fmt.Sprintf("%s.%s", chansPrefix, msg.Channel)
if msg.Subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic)
}

svc.obs[obsID] = o
return svc.conn.Publish(subject, data)
}

func (svc *adapterService) remove(obsID string) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()
func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error {
ar := &mainflux.AccessByKeyReq{
Token: key,
ChanID: chanID,
}
if _, err := svc.auth.CanAccessByKey(ctx, ar); err != nil {
return errors.Wrap(ErrUnauthorized, err)
}

val, ok := svc.obs[obsID]
if ok {
close(val.Cancel)
delete(svc.obs, obsID)
subject := fmt.Sprintf("%s.%s", chansPrefix, chanID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}
}

// ListenResponses method handles ACK messages received from client.
func (svc *adapterService) listenResponses(responses <-chan string) {
for {
id := <-responses
go func() {
<-c.Done()
svc.remove(subject, c.Token())
}()

val, ok := svc.get(id)
if ok {
val.StoreExpired(false)
}
obs, err := NewObserver(subject, c, svc.conn)
if err != nil {
c.Cancel()
return err
}
return svc.put(subject, c.Token(), obs)
}

func (svc *adapterService) Publish(msg messaging.Message) error {
return svc.ps.Publish(msg.Channel, msg)
}

func (svc *adapterService) Subscribe(chanID, subtopic, obsID string, o *Observer) error {
subject := chanID
func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) error {
ar := &mainflux.AccessByKeyReq{
Token: key,
ChanID: chanID,
}
if _, err := svc.auth.CanAccessByKey(ctx, ar); err != nil {
return errors.Wrap(ErrUnauthorized, err)
}
subject := fmt.Sprintf("%s.%s", chansPrefix, chanID)
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", chanID, subtopic)
subject = fmt.Sprintf("%s.%s", subject, subtopic)
}

err := svc.ps.Subscribe(subject, func(msg messaging.Message) error {
o.Messages <- msg
return svc.remove(subject, token)
}

func (svc *adapterService) put(endpoint, token string, o Observer) error {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()

obs, ok := svc.observers[endpoint]
// If there are no observers, create map and assign it to the endpoint.
if !ok {
obs = observers{token: o}
svc.observers[endpoint] = obs
return nil
})
if err != nil {
return err
}

go func() {
<-o.Cancel
if err := svc.ps.Unsubscribe(subject); err != nil {
svc.log.Error(fmt.Sprintf("Failed to unsubscribe from %s.%s due to %s", chanID, subtopic, err))
// If observer exists, cancel subscription and replace it.
if sub, ok := obs[token]; ok {
if err := sub.Cancel(); err != nil {
return errors.Wrap(ErrUnsubscribe, err)
}
}()

// Put method removes Observer if already exists.
svc.put(obsID, o)
}
obs[token] = o
return nil
}

func (svc *adapterService) Unsubscribe(obsID string) {
svc.remove(obsID)
func (svc *adapterService) remove(endpoint, token string) error {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()

obs, ok := svc.observers[endpoint]
if !ok {
return nil
}
if current, ok := obs[token]; ok {
if err := current.Cancel(); err != nil {
return errors.Wrap(ErrUnsubscribe, err)
}
}
delete(obs, token)
// If there are no observers left for the endpint, remove the map.
if len(obs) == 0 {
delete(svc.observers, endpoint)
}
return nil
}
Loading

0 comments on commit f10e49e

Please sign in to comment.