Skip to content

Commit

Permalink
feat(component/network): latency checks to global edge/DERP servers (…
Browse files Browse the repository at this point in the history
…using tailscale) (#125)

* feat(component/network): latency checks to global edge/DERP servers (using tailscale)

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>

* use min timeout

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>

* update

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>

* log

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>

* skip

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>

* check

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>

* update, move packages to latency

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>

* fix

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>

* rename aws region, remove asn

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>

* dallas us-east-1

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>

* renamed to region code

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>

---------

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
  • Loading branch information
gyuho authored Oct 19, 2024
1 parent 5e217ec commit f76bd98
Show file tree
Hide file tree
Showing 25 changed files with 1,432 additions and 856 deletions.
6 changes: 6 additions & 0 deletions cmd/gpud/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (

pollXidEvents bool
pollGPMEvents bool
netcheck bool

enableAutoUpdate bool
autoUpdateExitCode int
Expand Down Expand Up @@ -430,6 +431,11 @@ cat summary.txt
Usage: "enable polling gpm events (default: false)",
Destination: &pollGPMEvents,
},
&cli.BoolTFlag{
Name: "netcheck",
Usage: "enable network connectivity checks to global edge/derp servers (default: true)",
Destination: &netcheck,
},
},
},
}
Expand Down
1 change: 1 addition & 0 deletions cmd/gpud/command/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func cmdScan(cliContext *cli.Context) error {
diagnose.WithDebug(debug),
diagnose.WithPollXidEvents(pollXidEvents),
diagnose.WithPollGPMEvents(pollGPMEvents),
diagnose.WithNetcheck(netcheck),
)
if err != nil {
return err
Expand Down
9 changes: 9 additions & 0 deletions components/diagnose/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ type Op struct {

pollXidEvents bool
pollGPMEvents bool

netcheck bool
}

type OpOption func(*Op)
Expand Down Expand Up @@ -50,3 +52,10 @@ func WithPollGPMEvents(b bool) OpOption {
op.pollGPMEvents = b
}
}

// WithNetcheck returns an OpOption that records whether the scan should also
// run the network connectivity (latency) check against the global edge/DERP
// servers.
func WithNetcheck(b bool) OpOption {
	return func(target *Op) { target.netcheck = b }
}
12 changes: 12 additions & 0 deletions components/diagnose/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
query_log_tail "github.com/leptonai/gpud/components/query/log/tail"
"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/file"
latency_edge "github.com/leptonai/gpud/pkg/latency/edge"
)

const (
Expand Down Expand Up @@ -194,6 +195,17 @@ func Scan(ctx context.Context, opts ...OpOption) error {
fmt.Printf("%s scanned dmesg file -- found %d issue(s)\n", warningSign, matched)
}

if op.netcheck {
fmt.Printf("\n%s checking network connectivity to edge/derp servers\n", inProgress)
latencies, err := latency_edge.Measure(ctx, latency_edge.WithVerbose(op.debug))
if err != nil {
log.Logger.Warnw("error measuring latencies", "error", err)
} else {
latencies.RenderTable(os.Stdout)
fmt.Printf("\n\n%s latency check complete\n\n", checkMark)
}
}

fmt.Printf("\n\n%s scan complete\n\n", checkMark)
return nil
}
4 changes: 3 additions & 1 deletion components/network/latency/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func New(ctx context.Context, cfg Config) components.Component {
rootCtx: ctx,
cancel: ccancel,
poller: getDefaultPoller(),
cfg: cfg,
}
}

Expand All @@ -33,6 +34,7 @@ type component struct {
rootCtx context.Context
cancel context.CancelFunc
poller query.Poller
cfg Config
}

func (c *component) Name() string { return Name }
Expand Down Expand Up @@ -66,7 +68,7 @@ func (c *component) States(ctx context.Context) ([]components.State, error) {
if !ok {
return nil, fmt.Errorf("invalid output type: %T", last.Output)
}
return output.States()
return output.States(c.cfg)
}

func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) {
Expand Down
83 changes: 44 additions & 39 deletions components/network/latency/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,19 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/leptonai/gpud/components"
"github.com/leptonai/gpud/components/network/latency/derpmap"
"github.com/leptonai/gpud/components/query"
"github.com/leptonai/gpud/pkg/latency"
latency_edge "github.com/leptonai/gpud/pkg/latency/edge"
)

type Output struct {
RegionLatencies []RegionLatency `json:"region_latency"`
}

type RegionLatency struct {
// RegionID/RegionCode list is available at https://login.tailscale.com/derpmap/default
RegionID int `json:"region_id"` // RegionID is the DERP region ID
RegionCode string `json:"region_code"` // RegionCode is the three-letter code for the region
RegionName string `json:"region_name"` // RegionName is the human-readable name of the region (e.g. "Chicago")
Latency time.Duration `json:"latency"` // Latency is the round-trip time to the region
LatencyHumanized string `json:"latency_humanized"` // LatencyHumanized is the human-readable version of the latency, in milliseconds
// EgressLatencies is the list of egress latencies to global edge servers.
EgressLatencies latency.Latencies `json:"egress_latencies"`
}

func (o *Output) JSON() ([]byte, error) {
Expand Down Expand Up @@ -69,12 +63,33 @@ func ParseStatesToOutput(states ...components.State) (*Output, error) {
return nil, errors.New("no latency state found")
}

func (o *Output) States() ([]components.State, error) {
func (o *Output) States(cfg Config) ([]components.State, error) {
unhealthyReasons := []string{}
if cfg.GlobalMillisecondThreshold > 0 {
for _, latency := range o.EgressLatencies {
if latency.LatencyMilliseconds > cfg.GlobalMillisecondThreshold {
unhealthyReasons = append(unhealthyReasons, fmt.Sprintf("latency to %s edge derp server (%s) exceeded threshold of %dms", latency.RegionName, latency.Latency, cfg.GlobalMillisecondThreshold))
}
}
}

healthy := true
if cfg.GlobalMillisecondThreshold > 0 && len(unhealthyReasons) > 0 {
if len(unhealthyReasons) == len(o.EgressLatencies) {
healthy = false
}
}

reason := "no issue"
if len(unhealthyReasons) > 0 {
reason = strings.Join(unhealthyReasons, "; ")
}

b, _ := o.JSON()
state := components.State{
Name: StateNameLatency,
Healthy: true,
Reason: "n/a",
Healthy: healthy,
Reason: reason,
ExtraInfo: map[string]string{
StateKeyLatencyData: string(b),
StateKeyLatencyEncoding: StateKeyLatencyEncodingJSON,
Expand All @@ -100,35 +115,25 @@ func getDefaultPoller() query.Poller {
}

func createGetFunc(cfg Config) query.GetFunc {
timeout := time.Duration(2*cfg.GlobalMillisecondThreshold) * time.Millisecond
if timeout < 15*time.Second {
timeout = 15 * time.Second
}

return func(ctx context.Context) (any, error) {
o := &Output{}
// get code -> region mapping
// TODO: support region code, region name, and region ID in config file
codeMap := derpmap.SavedDERPMap.GetRegionCodeMapping()
for _, regionCode := range cfg.RegionCodes {
region, ok := codeMap[regionCode]
if !ok {
return nil, fmt.Errorf("region code %s not found", regionCode)
}
rl := RegionLatency{
RegionID: region.RegionID,
RegionCode: regionCode,
RegionName: region.RegionName,
}

latency, err := getRegionLatency(ctx, region.RegionID)
if err != nil {
return nil, err
}
rl.Latency = latency
rl.LatencyHumanized = fmt.Sprintf("%v", latency)
o.RegionLatencies = append(o.RegionLatencies, rl)
// "ctx" here is the root level, create one with shorter timeouts
// to not block on these checks
cctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

var err error
o.EgressLatencies, err = latency_edge.Measure(cctx)
if err != nil {
return nil, err
}

return o, nil
}
}

// getRegionLatency is a placeholder for the per-region latency probe.
// It currently ignores both ctx and regionID and always reports a fixed
// one-second latency with a nil error.
func getRegionLatency(ctx context.Context, regionID int) (time.Duration, error) {
	// TODO: implement getRegionLatency - see https://github.com/tailscale/tailscale/blob/main/net/netcheck/netcheck.go#L950
	const placeholder = time.Second
	return placeholder, nil
}
78 changes: 78 additions & 0 deletions components/network/latency/component_output_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package latency_test

import (
"testing"

"github.com/leptonai/gpud/components/network/latency"
pkg_latency "github.com/leptonai/gpud/pkg/latency"
)

// TestStatesHealthyEvaluation exercises Output.States with a table of latency
// sets and thresholds. The expectations encoded below are: the single returned
// state is unhealthy only when a positive threshold is configured and every
// latency exceeds it; a zero threshold or at least one latency under the
// threshold yields a healthy state.
func TestStatesHealthyEvaluation(t *testing.T) {
	type testCase struct {
		name                  string
		latencies             pkg_latency.Latencies
		globalThreshold       int64
		expectedHealthyStatus bool
	}

	cases := []testCase{
		{
			name: "All latencies below threshold",
			latencies: pkg_latency.Latencies{
				{LatencyMilliseconds: 50, RegionName: "region1"},
				{LatencyMilliseconds: 60, RegionName: "region2"},
			},
			globalThreshold:       100,
			expectedHealthyStatus: true,
		},
		{
			name: "Some latencies above threshold",
			latencies: pkg_latency.Latencies{
				{LatencyMilliseconds: 150, RegionName: "region1"},
				{LatencyMilliseconds: 60, RegionName: "region2"},
			},
			globalThreshold:       100,
			expectedHealthyStatus: true,
		},
		{
			name: "All latencies above threshold",
			latencies: pkg_latency.Latencies{
				{LatencyMilliseconds: 150, RegionName: "region1"},
				{LatencyMilliseconds: 160, RegionName: "region2"},
			},
			globalThreshold:       100,
			expectedHealthyStatus: false,
		},
		{
			name: "No threshold set",
			latencies: pkg_latency.Latencies{
				{LatencyMilliseconds: 150, RegionName: "region1"},
				{LatencyMilliseconds: 160, RegionName: "region2"},
			},
			globalThreshold:       0,
			expectedHealthyStatus: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			out := &latency.Output{EgressLatencies: tc.latencies}
			states, err := out.States(latency.Config{
				GlobalMillisecondThreshold: tc.globalThreshold,
			})
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}

			// States is expected to fold everything into exactly one state.
			if got := len(states); got != 1 {
				t.Fatalf("expected 1 state, got %d", got)
			}

			if got, want := states[0].Healthy, tc.expectedHealthyStatus; got != want {
				t.Errorf("expected healthy status to be %v, got %v", want, got)
			}
		})
	}
}
19 changes: 15 additions & 4 deletions components/network/latency/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,20 @@ import (
query_config "github.com/leptonai/gpud/components/query/config"
)

const (
// MinGlobalMillisecondThreshold is the smallest positive value, in
// milliseconds, that Config.Validate accepts for GlobalMillisecondThreshold
// (1 second). Positive values below it are rejected.
MinGlobalMillisecondThreshold = 1000
// DefaultGlobalMillisecondThreshold is the default threshold in milliseconds:
// 7 seconds to reach any of the DERP servers.
DefaultGlobalMillisecondThreshold = 7000
)

type Config struct {
Query query_config.Config `json:"query"`
RegionCodes []string `json:"region_codes"`
Query query_config.Config `json:"query"`

// GlobalMillisecondThreshold is the global threshold in milliseconds for the DERP latency.
// If all DERP latencies are greater than this threshold, the component will be marked as failed.
// If at least one DERP latency is less than this threshold, the component will be marked as healthy.
GlobalMillisecondThreshold int64 `json:"global_millisecond_threshold"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
Expand All @@ -30,8 +41,8 @@ func ParseConfig(b any, db *sql.DB) (*Config, error) {
}

func (cfg Config) Validate() error {
if len(cfg.RegionCodes) == 0 {
return fmt.Errorf("region codes are required")
if cfg.GlobalMillisecondThreshold > 0 && cfg.GlobalMillisecondThreshold < MinGlobalMillisecondThreshold {
return fmt.Errorf("global millisecond threshold must be greater than %d", MinGlobalMillisecondThreshold)
}
return nil
}
Loading

0 comments on commit f76bd98

Please sign in to comment.