Skip to content

Commit

Permalink
Add task metadata to /apps endpoint (#118)
Browse files Browse the repository at this point in the history
* Export TaskInfo from the mesos agent collector

We will reference TaskInfo in a map of container IDs to task info in a
later commit.

* Add a struct for container->task relationships

ContainerTaskRels has a single member, a map of container IDs to
TaskInfo references. It has a getter and a setter for easy access
threadsafe access to the relationship map, and an update method which
populates it from agentState.

* Return task_id and task_name in /app output

The new ContainerTaskRels argument providers access to the task metadata
for each container.

* Add tests for extracting a record with task data

We add a new mock object with appropriate metadata and supply it to
avroRecord.extract(), checking that the task IDs in the
ContainerTaskRels are the same ones that correspond to the container
ID.

* Add tests for executor ID and framework ID

This is orthogonal to the intent of this PR, however it's a small change
and seemed like a quick win.

* Refer to ContainerTaskRels in Framework collector

This reference can be provided to the extract method from the framework
collector.

* Pass ref to ContainerTaskRels into transform

* Update task mapping from agent collector

We update the task mapping after every new fetch from agent state.

* Pass reference to mapping into framework collector

Now the framework collector has access to the ContainerTaskRels object
which belongs to the mesos agent collector.

* Clarify arg name

* Defer unlock for consistency

* Temporarily skip golint in build

In scripts/test.sh we naively run go get to update golint. A new rule
has just been added to golint (golang/lint#319)
and I don't want to complicate my current PR (#118).

I will revert this commit, fix any outstanding nil-returns and consider
pinning golint in a future PR.

* Create new TaskInfo object to avoid memory issues

* Create ContainerTaskRels in dcos-metrics.go

Rather than initiating ContainerTaskRels in the framework collector, we
now initiate it in dcos-metrics.go and pass a reference into each
collector instead.

* Initiate ContainerTaskRels with a utility method

* Remove unnecessary nil check
  • Loading branch information
philipnrmn committed Sep 19, 2017
1 parent c930517 commit a9fa069
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 39 deletions.
17 changes: 10 additions & 7 deletions collectors/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
log "github.com/Sirupsen/logrus"

"github.com/dcos/dcos-metrics/collectors"
mesosAgent "github.com/dcos/dcos-metrics/collectors/mesos/agent"
"github.com/dcos/dcos-metrics/producers"
"github.com/linkedin/goavro"
)
Expand Down Expand Up @@ -54,8 +55,9 @@ type Collector struct {
InputLimitAmountKBytesFlag int
InputLimitPeriodFlag int

metricsChan chan producers.MetricsMessage
nodeInfo collectors.NodeInfo
metricsChan chan producers.MetricsMessage
nodeInfo collectors.NodeInfo
containerTaskRels *mesosAgent.ContainerTaskRels
}

// countingReader is an io.Reader that provides counts of the number of bytes
Expand All @@ -66,10 +68,11 @@ type countingReader struct {
}

// New returns a new instance of the framework collector.
func New(cfg Collector, nodeInfo collectors.NodeInfo) (Collector, chan producers.MetricsMessage) {
func New(cfg Collector, nodeInfo collectors.NodeInfo, ctr *mesosAgent.ContainerTaskRels) (Collector, chan producers.MetricsMessage) {
c := cfg
c.nodeInfo = nodeInfo
c.metricsChan = make(chan producers.MetricsMessage)
c.containerTaskRels = ctr
return c, c.metricsChan
}

Expand Down Expand Up @@ -170,7 +173,7 @@ func (c *Collector) handleConnection(conn net.Conn) {
}

ad := &AvroDatum{datum, topic, approxBytesRead}
pmm, err := ad.transform(c.nodeInfo)
pmm, err := ad.transform(c.nodeInfo, c.containerTaskRels)
if err != nil {
fwColLog.Error(err)
}
Expand All @@ -179,7 +182,7 @@ func (c *Collector) handleConnection(conn net.Conn) {
}

// transform creates a MetricsMessage from the Avro data coming in on our TCP channel.
func (a *AvroDatum) transform(nodeInfo collectors.NodeInfo) (producers.MetricsMessage, error) {
func (a *AvroDatum) transform(nodeInfo collectors.NodeInfo, ctr *mesosAgent.ContainerTaskRels) (producers.MetricsMessage, error) {
var (
tagData = avroRecord{}
datapointData = avroRecord{}
Expand Down Expand Up @@ -211,7 +214,7 @@ func (a *AvroDatum) transform(nodeInfo collectors.NodeInfo) (producers.MetricsMe
return pmm, err
}

if err := tagData.extract(&pmm); err != nil {
if err := tagData.extract(&pmm, ctr); err != nil {
return pmm, err
}

Expand Down Expand Up @@ -252,7 +255,7 @@ func (a *AvroDatum) transform(nodeInfo collectors.NodeInfo) (producers.MetricsMe
return pmm, err
}

if err := datapointData.extract(&pmm); err != nil {
if err := datapointData.extract(&pmm, ctr); err != nil {
return pmm, err
}

Expand Down
15 changes: 8 additions & 7 deletions collectors/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/dcos/dcos-metrics/collectors"
mesosAgent "github.com/dcos/dcos-metrics/collectors/mesos/agent"
"github.com/dcos/dcos-metrics/producers"
"github.com/dcos/dcos-metrics/schema/metrics_schema"
"github.com/linkedin/goavro"
Expand All @@ -50,7 +51,7 @@ var (
func TestNew(t *testing.T) {
Convey("When creating a new instance of the framework collector", t, func() {
Convey("Should return a new Collector with the default config", func() {
f, fc := New(mockCollectorConfig, mockNodeInfo)
f, fc := New(mockCollectorConfig, mockNodeInfo, &mesosAgent.ContainerTaskRels{})
So(f, ShouldHaveSameTypeAs, Collector{})
So(fc, ShouldHaveSameTypeAs, make(chan producers.MetricsMessage))
So(f.InputLimitAmountKBytesFlag, ShouldEqual, mockCollectorConfig.InputLimitAmountKBytesFlag)
Expand Down Expand Up @@ -91,7 +92,7 @@ func TestTransform(t *testing.T) {
rec.Set("datapoints", []interface{}{recDps})

a := AvroDatum{Record: rec, Topic: "some-topic"}
pmm, err := a.transform(mockNodeInfo)
pmm, err := a.transform(mockNodeInfo, &mesosAgent.ContainerTaskRels{})
So(pmm, ShouldHaveSameTypeAs, producers.MetricsMessage{})

// If we could mock the time here, we could do a single assertion
Expand All @@ -115,7 +116,7 @@ func TestTransform(t *testing.T) {

Convey("Should return an error if AvroDatum didn't contain a goavro.Record", func() {
a := AvroDatum{Record: make(map[string]string), Topic: "some-topic"}
_, err = a.transform(mockNodeInfo)
_, err = a.transform(mockNodeInfo, &mesosAgent.ContainerTaskRels{})
So(err, ShouldNotBeNil)
})

Expand All @@ -128,7 +129,7 @@ func TestTransform(t *testing.T) {
rec.Set("datapoints", []interface{}{recDps})

a := AvroDatum{Record: rec, Topic: "some-topic"}
_, err = a.transform(mockNodeInfo)
_, err = a.transform(mockNodeInfo, &mesosAgent.ContainerTaskRels{})
So(err, ShouldNotBeNil)
})

Expand All @@ -141,7 +142,7 @@ func TestTransform(t *testing.T) {
rec.Set("tags", []interface{}{recTags})

a := AvroDatum{Record: rec, Topic: "some-topic"}
_, err = a.transform(mockNodeInfo)
_, err = a.transform(mockNodeInfo, &mesosAgent.ContainerTaskRels{})
So(err, ShouldNotBeNil)
})

Expand All @@ -163,7 +164,7 @@ func TestTransform(t *testing.T) {
rec.Set("datapoints", []interface{}{recNan})

a := AvroDatum{Record: rec, Topic: "some-topic"}
pmm, err := a.transform(mockNodeInfo)
pmm, err := a.transform(mockNodeInfo, &mesosAgent.ContainerTaskRels{})
So(err, ShouldBeNil)

So(pmm.Datapoints[0].Name, ShouldEqual, "nan-name")
Expand Down Expand Up @@ -259,7 +260,7 @@ func TestHandleConnection(t *testing.T) {
defer ln.Close()
time.Sleep(1 * time.Second)

c, cc := New(mockCollectorConfig, mockNodeInfo)
c, cc := New(mockCollectorConfig, mockNodeInfo, &mesosAgent.ContainerTaskRels{})

// This goroutine runs in the background waiting for a TCP connection
// from the test below. Once the connection has been accepted,
Expand Down
12 changes: 11 additions & 1 deletion collectors/framework/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"time"

mesosAgent "github.com/dcos/dcos-metrics/collectors/mesos/agent"
"github.com/dcos/dcos-metrics/producers"
)

Expand All @@ -38,7 +39,7 @@ type avroRecord []record

// avroRecord.extract() gets tags and datapoints from avro formatted data
// and creates a MetricsMessage{}
func (ar avroRecord) extract(pmm *producers.MetricsMessage) error {
func (ar avroRecord) extract(pmm *producers.MetricsMessage, ctr *mesosAgent.ContainerTaskRels) error {
var fieldType string
if len(ar) > 0 {
fieldType = ar[0].Name
Expand All @@ -58,6 +59,15 @@ func (ar avroRecord) extract(pmm *producers.MetricsMessage) error {

if tagName == "container_id" {
pmm.Dimensions.ContainerID = tagValue

info := ctr.Get(tagValue)
if info != nil {
pmm.Dimensions.TaskID = info.ID
pmm.Dimensions.TaskName = info.Name
} else {
fwColLog.Debugf("Container ID %s had no associated task", tagValue)
}

} else if tagName == "framework_id" {
pmm.Dimensions.FrameworkID = tagValue
} else if tagName == "executor_id" {
Expand Down
67 changes: 58 additions & 9 deletions collectors/framework/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package framework
import (
"testing"

mesosAgent "github.com/dcos/dcos-metrics/collectors/mesos/agent"
"github.com/dcos/dcos-metrics/producers"
"github.com/dcos/dcos-metrics/schema/metrics_schema"
"github.com/linkedin/goavro"
Expand Down Expand Up @@ -65,21 +66,51 @@ var (
},
},
}
avroRecordWithDimensions = avroRecord{
record{
Name: "dcos.metrics.Tag",
Fields: []field{
{Name: "k", Datum: "framework_id"},
{Name: "v", Datum: "marathon"},
},
},
record{
Name: "dcos.metrics.Tag",
Fields: []field{
{Name: "k", Datum: "executor_id"},
{Name: "v", Datum: "pierrepoint"},
},
},
record{
Name: "dcos.metrics.Tag",
Fields: []field{
{Name: "k", Datum: "container_id"},
{Name: "v", Datum: "foo-container-id"},
},
},
record{
Name: "dcos.metrics.Tag",
Fields: []field{
{Name: "k", Datum: "some-label"},
{Name: "v", Datum: "some-label-value"},
},
},
}
)

func TestExtract(t *testing.T) {
Convey("When calling extract() on an avroRecord", t, func() {
Convey("Should return an error if length of ar is 0", func() {
ar := avroRecord{}
err := ar.extract(&producers.MetricsMessage{})
err := ar.extract(&producers.MetricsMessage{}, &mesosAgent.ContainerTaskRels{})
So(err, ShouldNotBeNil)
})
})

Convey("When extracting a datapoint from an Avro record", t, func() {
avroDatapoint := avroRecord{testDatapoint}
pmmTest := producers.MetricsMessage{}
err := avroDatapoint.extract(&pmmTest)
err := avroDatapoint.extract(&pmmTest, &mesosAgent.ContainerTaskRels{})

Convey("Should extract the datapoint without errors", func() {
So(err, ShouldBeNil)
Expand All @@ -97,33 +128,51 @@ func TestExtract(t *testing.T) {
})

Convey("When extracting tags from an Avro record", t, func() {
avroDatapoint := avroRecord{testTag}
avroDatapoint := avroRecordWithDimensions
pmmTest := producers.MetricsMessage{
Dimensions: producers.Dimensions{
Labels: make(map[string]string),
},
}
testRels := mesosAgent.NewContainerTaskRels()
testRels.Set("foo-container-id", &mesosAgent.TaskInfo{
ID: "foo.1234567890",
Name: "foo",
})

err := avroDatapoint.extract(&pmmTest, testRels)
Convey("Should extract the tag without errors", func() {
err := avroDatapoint.extract(&pmmTest)
value, ok := pmmTest.Dimensions.Labels["tag-name-field-test"]

So(err, ShouldBeNil)
})

Convey("Should derive specific metadata from known tags", func() {
So(pmmTest.Dimensions.FrameworkID, ShouldEqual, "marathon")
So(pmmTest.Dimensions.ExecutorID, ShouldEqual, "pierrepoint")
So(pmmTest.Dimensions.ContainerID, ShouldEqual, "foo-container-id")
})

Convey("Should derive task ID and task name with the container ID", func() {
So(pmmTest.Dimensions.TaskID, ShouldEqual, "foo.1234567890")
So(pmmTest.Dimensions.TaskName, ShouldEqual, "foo")
})

Convey("Should add other tags as labels", func() {
label, ok := pmmTest.Dimensions.Labels["some-label"]
So(ok, ShouldBeTrue)
So(value, ShouldEqual, "tag-value-field-test")
So(label, ShouldEqual, "some-label-value")
})
})

Convey("When analyzing the field types in a record", t, func() {
Convey("Should return an error if the field type was empty", func() {
ar := avroRecord{record{Name: ""}}
err := ar.extract(&producers.MetricsMessage{})
err := ar.extract(&producers.MetricsMessage{}, &mesosAgent.ContainerTaskRels{})
So(err, ShouldNotBeNil)
})

Convey("Should return an error for an unknown field type", func() {
ar := avroRecord{record{Name: "not-dcos.not-metrics.not-Type"}}
err := ar.extract(&producers.MetricsMessage{})
err := ar.extract(&producers.MetricsMessage{}, &mesosAgent.ContainerTaskRels{})
So(err, ShouldNotBeNil)
})
})
Expand Down
65 changes: 60 additions & 5 deletions collectors/mesos/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"net"
"net/http"
"strconv"
"sync"
"time"

"github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -50,17 +51,68 @@ type Collector struct {
// https://godoc.org/github.com/sirupsen/logrus#Entry
log *logrus.Entry

metricsChan chan producers.MetricsMessage
nodeInfo collectors.NodeInfo
timestamp int64
metricsChan chan producers.MetricsMessage
nodeInfo collectors.NodeInfo
timestamp int64
ContainerTaskRels *ContainerTaskRels
}

// ContainerTaskRels defines the relationship between containers and tasks.
type ContainerTaskRels struct {
sync.Mutex
rels map[string]*TaskInfo
}

// NewContainerTaskRels creates a new empty ContainerTaskRels
func NewContainerTaskRels() *ContainerTaskRels {
return &ContainerTaskRels{rels: make(map[string]*TaskInfo)}
}

// Get is a utility method which handles the mutex lock and abstracts the inner
// map in ContainerTaskRels away. If no task info is available for the supplied
// containerID, returns nil.
func (ctr *ContainerTaskRels) Get(containerID string) *TaskInfo {
ctr.Lock()
defer ctr.Unlock()
return ctr.rels[containerID]
}

// Set adds or updates an entry to ContainerTaskRels and, if necessary,
// initiates the inner map. It is only currently used in tests.
func (ctr *ContainerTaskRels) Set(containerID string, info *TaskInfo) {
ctr.Lock()
defer ctr.Unlock()
ctr.rels[containerID] = info
}

// update denormalizes the (deeply nested) /state map from the local mesos
// agent to a list of tasks mapped to container IDs.
func (ctr *ContainerTaskRels) update(as agentState) {
rels := map[string]*TaskInfo{}
for _, f := range as.Frameworks {
for _, e := range f.Executors {
for _, t := range e.Tasks {
for _, s := range t.Statuses {
rels[s.ContainerStatusInfo.ID.Value] = &TaskInfo{
ID: t.ID,
Name: t.Name,
}
}
}
}
}
ctr.Lock()
ctr.rels = rels
ctr.Unlock()
}

// New creates a new instance of the Mesos agent collector (poller).
func New(cfg Collector, nodeInfo collectors.NodeInfo) (Collector, chan producers.MetricsMessage) {
func New(cfg Collector, nodeInfo collectors.NodeInfo, rels *ContainerTaskRels) (Collector, chan producers.MetricsMessage) {
c := cfg
c.log = logrus.WithFields(logrus.Fields{"collector": "mesos-agent"})
c.nodeInfo = nodeInfo
c.metricsChan = make(chan producers.MetricsMessage)
c.ContainerTaskRels = rels
return c, c.metricsChan
}

Expand Down Expand Up @@ -88,6 +140,9 @@ func (c *Collector) pollMesosAgent() {
c.log.Errorf("Failed to get agent state from %s. Error: %s", host, err)
}

c.log.Debug("Mapping containers to tasks")
c.ContainerTaskRels.update(c.agentState)

c.log.Debugf("Fetching container metrics from host %s", host)
if err := c.getContainerMetrics(); err != nil {
c.log.Errorf("Failed to get container metrics from %s. Error: %s", host, err)
Expand Down Expand Up @@ -179,7 +234,7 @@ func getExecutorInfoByExecutorID(executorID string, executors []executorInfo) *e
}

// getTaskInfoByContainerID returns the TaskInfo struct matching the given cID.
func getTaskInfoByContainerID(containerID string, tasks []taskInfo) *taskInfo {
func getTaskInfoByContainerID(containerID string, tasks []TaskInfo) *TaskInfo {
for _, task := range tasks {
if len(task.Statuses) > 0 && task.Statuses[0].ContainerStatusInfo.ID.Value == containerID {
return &task
Expand Down
Loading

0 comments on commit a9fa069

Please sign in to comment.