Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gNMI Extension field parsing support #509

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/api/target/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,25 @@ func (t *Target) DecodeProtoBytes(resp *gnmi.SubscribeResponse) error {
return nil
}

func (t *Target) DecodeExtension(resp *gnmi.SubscribeResponse) error {
if t.ExtensionProtoMap == nil {
return nil
}
for _, extension := range resp.Extension {
m := dynamic.NewMessage(t.ExtensionProtoMap[int(extension.GetRegisteredExt().GetId().Number())])
err := m.Unmarshal(extension.GetRegisteredExt().GetMsg())
if err != nil {
return err
}
jsondata, err := m.MarshalJSON()
if err != nil {
return err
}
extension.GetRegisteredExt().Msg = jsondata
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's a good idea to overwrite the extension Msg with the jsondata here. It makes more sense to return the resulting json and pass it on to the outputs as metadata.

}
return nil
}

func (t *Target) DeleteSubscription(name string) {
t.m.Lock()
defer t.m.Unlock()
Expand Down
7 changes: 4 additions & 3 deletions pkg/api/target/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ type Target struct {
subscribeResponses chan *SubscribeResponse
errors chan *TargetError
stopped bool
StopChan chan struct{} `json:"-"`
Cfn context.CancelFunc `json:"-"`
RootDesc desc.Descriptor `json:"-"`
StopChan chan struct{} `json:"-"`
Cfn context.CancelFunc `json:"-"`
RootDesc desc.Descriptor `json:"-"`
ExtensionProtoMap map[int]*desc.MessageDescriptor `json:"-"`
}

// NewTarget //
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/types/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ type TargetConfig struct {
GRPCKeepalive *clientKeepalive `mapstructure:"grpc-keepalive,omitempty" yaml:"grpc-keepalive,omitempty" json:"grpc-keepalive,omitempty"`

tlsConfig *tls.Config

RegisteredExtensions []*RegisteredExtension `mapstructure:"registered-extensions,omitempty" yaml:"registered-extensions,omitempty" json:"registered-extensions,omitempty"`
}

type RegisteredExtension struct {
Id int `mapstructure:"id,omitempty" yaml:"id,omitempty" json:"id,omitempty"`
MessageName string `mapstructure:"message-name,omitempty" yaml:"message-name,omitempty" json:"message-name,omitempty"`
ProtoFile string `mapstructure:"proto-file,omitempty" yaml:"proto-file,omitempty" json:"proto-file,omitempty"`
}

type clientKeepalive struct {
Expand Down
12 changes: 10 additions & 2 deletions pkg/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,22 @@ func (a *App) StartCollector(ctx context.Context) {
select {
case rsp := <-rspChan:
subscribeResponseReceivedCounter.WithLabelValues(t.Config.Name, rsp.SubscriptionConfig.Name).Add(1)
if a.Config.Debug {
a.Logger.Printf("target %q: gNMI Subscribe Response: %+v", t.Config.Name, rsp)
// decode gNMI extensions
if extensions := rsp.Response.Extension; len(extensions) > 0 {
err := t.DecodeExtension(rsp.Response)
if err != nil {
a.Logger.Printf("target %q: failed to decode extension field: %v", t.Config.Name, err)
continue
}
}
err := t.DecodeProtoBytes(rsp.Response)
if err != nil {
a.Logger.Printf("target %q: failed to decode proto bytes: %v", t.Config.Name, err)
continue
}
Comment on lines +70 to 82
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't drop the subscribe response if we fail to decode the extension.

if a.Config.Debug {
a.Logger.Printf("target %q: gNMI Subscribe Response: %+v", t.Config.Name, rsp)
}
Comment on lines +83 to +85
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this back under the case statement to that the debug logging always happens regardless of decoding succeeding or not ?

m := outputs.Meta{
"source": t.Config.Name,
"format": a.Config.Format,
Expand Down
35 changes: 35 additions & 0 deletions pkg/app/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (

"github.com/fullstorydev/grpcurl"

"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/desc/protoparse"

"github.com/openconfig/gnmic/pkg/api/target"
"github.com/openconfig/gnmic/pkg/api/types"
)
Expand All @@ -39,6 +42,10 @@ func (a *App) initTarget(tc *types.TargetConfig) (*target.Target, error) {
if err != nil {
return nil, err
}
err = a.parseExtensionProtos(t)
if err != nil {
return nil, err
}
a.Targets[t.Config.Name] = t
return t, nil
}
Expand Down Expand Up @@ -155,6 +162,34 @@ func (a *App) parseProtoFiles(t *target.Target) error {
return nil
}

// Dynamically parse (and load) protobuf files defined in config for specific extension IDs
func (a *App) parseExtensionProtos(t *target.Target) error {
parser := protoparse.Parser{}
extensionProtoMap := make(map[int]*desc.MessageDescriptor)
a.Logger.Printf("Target %q loading protofiles for gNMI extensions", t.Config.Name)
if len(t.Config.RegisteredExtensions) == 0 {
return nil
}
Comment on lines +170 to +172
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would move this to the beginning of the function

for _, extension := range t.Config.RegisteredExtensions {
descSources, err := parser.ParseFiles(extension.ProtoFile)
if err != nil {
a.Logger.Printf("target %q could not load protofile: %s: %v", t.Config.Name, extension.ProtoFile, err)
return err
}
// Only a single file is ever provided to ParseFiles, so we can just grab offset 0 from the returned slice
// Verify if the provided message exists in the proto and assign
if desc := descSources[0].FindMessage(extension.MessageName); desc != nil {
extensionProtoMap[extension.Id] = desc
} else {
a.Logger.Printf("target %q could not find message %q", t.Config.Name, extension.MessageName)
return fmt.Errorf("target %q could not find message %q", t.Config.Name, extension.MessageName)
}
}
t.ExtensionProtoMap = extensionProtoMap
a.Logger.Printf("target %q loaded proto files for gNMI extensions", t.Config.Name)
return nil
}

func (a *App) targetConfigExists(name string) bool {
a.configLock.RLock()
_, ok := a.Config.Targets[name]
Expand Down
35 changes: 35 additions & 0 deletions pkg/formatters/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
"encoding/json"
"fmt"
"math"
"strconv"
"strings"

flattener "github.com/karimra/go-map-flattener"
"github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/gnmi/proto/gnmi_ext"
)

// EventMsg represents a gNMI update message,
Expand All @@ -40,9 +42,28 @@ func ResponseToEventMsgs(name string, rsp *gnmi.SubscribeResponse, meta map[stri
return nil, nil
}
evs := make([]*EventMsg, 0, len(rsp.GetUpdate().GetUpdate())+len(rsp.GetUpdate().GetDelete()))
response := rsp
switch rsp := rsp.Response.(type) {
case *gnmi.SubscribeResponse_Update:
namePrefix, prefixTags := tagsFromGNMIPath(rsp.Update.GetPrefix())
// Extension message to tags
if prefixTags == nil {
prefixTags = make(map[string]string)
}
for _, ext := range response.Extension {
extensionValues, err := extensionToMap(ext)
if err != nil {
return nil, err
}
for k, v := range extensionValues {
switch v := v.(type) {
case string:
prefixTags[k] = v
case float64:
prefixTags[k] = strconv.FormatFloat(v, 'G', -1, 64)
}
}
}
Comment on lines +53 to +66
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part assumes that any extension data can and will be mapped to a map[string]any, and that the goal is to get the extension data as tags, it will not always be the case for all extensions.

// notification updates
uevs, err := updatesToEvent(name, namePrefix, rsp.Update.GetTimestamp(), rsp.Update.GetUpdate(), prefixTags, meta)
if err != nil {
Expand Down Expand Up @@ -200,6 +221,20 @@ func tagsFromGNMIPath(p *gnmi.Path) (string, map[string]string) {
return sb.String(), tags
}

func extensionToMap(ext *gnmi_ext.Extension) (map[string]interface{}, error) {
jsondata := ext.GetRegisteredExt().GetMsg()

var anyJson map[string]interface{}
if len(jsondata) != 0 {
err := json.Unmarshal(jsondata, &anyJson)
if err != nil {
return nil, err
}
return anyJson, nil
}
return nil, fmt.Errorf("0 length JSON decoded")
}

func getValueFlat(prefix string, updValue *gnmi.TypedValue) (map[string]interface{}, error) {
if updValue == nil {
return nil, nil
Expand Down