Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(component/network): latency checks to global edge/DERP servers (using tailscale) #125

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/gpud/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (

pollXidEvents bool
pollGPMEvents bool
netcheck bool

enableAutoUpdate bool
autoUpdateExitCode int
Expand Down Expand Up @@ -430,6 +431,11 @@ cat summary.txt
Usage: "enable polling gpm events (default: false)",
Destination: &pollGPMEvents,
},
&cli.BoolTFlag{
Name: "netcheck",
Usage: "enable network connectivity checks to global edge/derp servers (default: true)",
Destination: &netcheck,
},
},
},
}
Expand Down
1 change: 1 addition & 0 deletions cmd/gpud/command/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func cmdScan(cliContext *cli.Context) error {
diagnose.WithDebug(debug),
diagnose.WithPollXidEvents(pollXidEvents),
diagnose.WithPollGPMEvents(pollGPMEvents),
diagnose.WithNetcheck(netcheck),
)
if err != nil {
return err
Expand Down
9 changes: 9 additions & 0 deletions components/diagnose/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ type Op struct {

pollXidEvents bool
pollGPMEvents bool

netcheck bool
}

type OpOption func(*Op)
Expand Down Expand Up @@ -50,3 +52,10 @@ func WithPollGPMEvents(b bool) OpOption {
op.pollGPMEvents = b
}
}

// WithNetcheck enables network connectivity checks to global edge/derp servers.
func WithNetcheck(b bool) OpOption {
return func(op *Op) {
op.netcheck = b
}
}
12 changes: 12 additions & 0 deletions components/diagnose/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
query_log_tail "github.com/leptonai/gpud/components/query/log/tail"
"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/file"
latency_edge "github.com/leptonai/gpud/pkg/latency/edge"
)

const (
Expand Down Expand Up @@ -194,6 +195,17 @@ func Scan(ctx context.Context, opts ...OpOption) error {
fmt.Printf("%s scanned dmesg file -- found %d issue(s)\n", warningSign, matched)
}

if op.netcheck {
fmt.Printf("\n%s checking network connectivity to edge/derp servers\n", inProgress)
latencies, err := latency_edge.Measure(ctx, latency_edge.WithVerbose(op.debug))
if err != nil {
log.Logger.Warnw("error measuring latencies", "error", err)
} else {
latencies.RenderTable(os.Stdout)
fmt.Printf("\n\n%s latency check complete\n\n", checkMark)
}
}

fmt.Printf("\n\n%s scan complete\n\n", checkMark)
return nil
}
4 changes: 3 additions & 1 deletion components/network/latency/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func New(ctx context.Context, cfg Config) components.Component {
rootCtx: ctx,
cancel: ccancel,
poller: getDefaultPoller(),
cfg: cfg,
}
}

Expand All @@ -33,6 +34,7 @@ type component struct {
rootCtx context.Context
cancel context.CancelFunc
poller query.Poller
cfg Config
}

func (c *component) Name() string { return Name }
Expand Down Expand Up @@ -66,7 +68,7 @@ func (c *component) States(ctx context.Context) ([]components.State, error) {
if !ok {
return nil, fmt.Errorf("invalid output type: %T", last.Output)
}
return output.States()
return output.States(c.cfg)
}

func (c *component) Events(ctx context.Context, since time.Time) ([]components.Event, error) {
Expand Down
83 changes: 44 additions & 39 deletions components/network/latency/component_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,19 @@ import (
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/leptonai/gpud/components"
"github.com/leptonai/gpud/components/network/latency/derpmap"
"github.com/leptonai/gpud/components/query"
"github.com/leptonai/gpud/pkg/latency"
latency_edge "github.com/leptonai/gpud/pkg/latency/edge"
)

type Output struct {
RegionLatencies []RegionLatency `json:"region_latency"`
}

type RegionLatency struct {
// RegionID/RegionCode list is available at https://login.tailscale.com/derpmap/default
RegionID int `json:"region_id"` // RegionID is the DERP region ID
RegionCode string `json:"region_code"` // RegionCode is the three-letter code for the region
RegionName string `json:"region_name"` // RegionName is the human-readable name of the region (e.g. "Chicago")
Latency time.Duration `json:"latency"` // Latency is the round-trip time to the region
LatencyHumanized string `json:"latency_humanized"` // LatencyHumanized is the human-readable version of the latency, in milliseconds
// EgressLatencies is the list of egress latencies to global edge servers.
EgressLatencies latency.Latencies `json:"egress_latencies"`
}

func (o *Output) JSON() ([]byte, error) {
Expand Down Expand Up @@ -69,12 +63,33 @@ func ParseStatesToOutput(states ...components.State) (*Output, error) {
return nil, errors.New("no latency state found")
}

func (o *Output) States() ([]components.State, error) {
func (o *Output) States(cfg Config) ([]components.State, error) {
unhealthyReasons := []string{}
if cfg.GlobalMillisecondThreshold > 0 {
for _, latency := range o.EgressLatencies {
if latency.LatencyMilliseconds > cfg.GlobalMillisecondThreshold {
unhealthyReasons = append(unhealthyReasons, fmt.Sprintf("latency to %s edge derp server (%s) exceeded threshold of %dms", latency.RegionName, latency.Latency, cfg.GlobalMillisecondThreshold))
}
}
}

healthy := true
if cfg.GlobalMillisecondThreshold > 0 && len(unhealthyReasons) > 0 {
if len(unhealthyReasons) == len(o.EgressLatencies) {
healthy = false
}
}

reason := "no issue"
if len(unhealthyReasons) > 0 {
reason = strings.Join(unhealthyReasons, "; ")
}

b, _ := o.JSON()
state := components.State{
Name: StateNameLatency,
Healthy: true,
Reason: "n/a",
Healthy: healthy,
Reason: reason,
ExtraInfo: map[string]string{
StateKeyLatencyData: string(b),
StateKeyLatencyEncoding: StateKeyLatencyEncodingJSON,
Expand All @@ -100,35 +115,25 @@ func getDefaultPoller() query.Poller {
}

func createGetFunc(cfg Config) query.GetFunc {
timeout := time.Duration(2*cfg.GlobalMillisecondThreshold) * time.Millisecond
if timeout < 15*time.Second {
timeout = 15 * time.Second
}

return func(ctx context.Context) (any, error) {
o := &Output{}
// get code -> region mapping
// TODO: support region code, region name, and region ID in config file
codeMap := derpmap.SavedDERPMap.GetRegionCodeMapping()
for _, regionCode := range cfg.RegionCodes {
region, ok := codeMap[regionCode]
if !ok {
return nil, fmt.Errorf("region code %s not found", regionCode)
}
rl := RegionLatency{
RegionID: region.RegionID,
RegionCode: regionCode,
RegionName: region.RegionName,
}

latency, err := getRegionLatency(ctx, region.RegionID)
if err != nil {
return nil, err
}
rl.Latency = latency
rl.LatencyHumanized = fmt.Sprintf("%v", latency)
o.RegionLatencies = append(o.RegionLatencies, rl)
// "ctx" here is the root level, create one with shorter timeouts
// to not block on this checks
cctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

var err error
o.EgressLatencies, err = latency_edge.Measure(cctx)
if err != nil {
return nil, err
}

return o, nil
}
}

func getRegionLatency(ctx context.Context, regionID int) (time.Duration, error) {
// TODO: implmenet getRegionLatency - see https://github.com/tailscale/tailscale/blob/main/net/netcheck/netcheck.go#L950
return time.Second, nil
}
78 changes: 78 additions & 0 deletions components/network/latency/component_output_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package latency_test

import (
"testing"

"github.com/leptonai/gpud/components/network/latency"
pkg_latency "github.com/leptonai/gpud/pkg/latency"
)

func TestStatesHealthyEvaluation(t *testing.T) {
tests := []struct {
name string
latencies pkg_latency.Latencies
globalThreshold int64
expectedHealthyStatus bool
}{
{
name: "All latencies below threshold",
latencies: pkg_latency.Latencies{
{LatencyMilliseconds: 50, RegionName: "region1"},
{LatencyMilliseconds: 60, RegionName: "region2"},
},
globalThreshold: 100,
expectedHealthyStatus: true,
},
{
name: "Some latencies above threshold",
latencies: pkg_latency.Latencies{
{LatencyMilliseconds: 150, RegionName: "region1"},
{LatencyMilliseconds: 60, RegionName: "region2"},
},
globalThreshold: 100,
expectedHealthyStatus: true,
},
{
name: "All latencies above threshold",
latencies: pkg_latency.Latencies{
{LatencyMilliseconds: 150, RegionName: "region1"},
{LatencyMilliseconds: 160, RegionName: "region2"},
},
globalThreshold: 100,
expectedHealthyStatus: false,
},
{
name: "No threshold set",
latencies: pkg_latency.Latencies{
{LatencyMilliseconds: 150, RegionName: "region1"},
{LatencyMilliseconds: 160, RegionName: "region2"},
},
globalThreshold: 0,
expectedHealthyStatus: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
output := &latency.Output{
EgressLatencies: tt.latencies,
}
cfg := latency.Config{
GlobalMillisecondThreshold: tt.globalThreshold,
}

states, err := output.States(cfg)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if len(states) != 1 {
t.Fatalf("expected 1 state, got %d", len(states))
}

if states[0].Healthy != tt.expectedHealthyStatus {
t.Errorf("expected healthy status to be %v, got %v", tt.expectedHealthyStatus, states[0].Healthy)
}
})
}
}
19 changes: 15 additions & 4 deletions components/network/latency/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,20 @@ import (
query_config "github.com/leptonai/gpud/components/query/config"
)

const (
// 1 second
MinGlobalMillisecondThreshold = 1000
// 7 seconds by default to reach any of the DERP servers.
DefaultGlobalMillisecondThreshold = 7000
)

type Config struct {
Query query_config.Config `json:"query"`
RegionCodes []string `json:"region_codes"`
Query query_config.Config `json:"query"`

// GlobalMillisecondThreshold is the global threshold in milliseconds for the DERP latency.
// If all DERP latencies are greater than this threshold, the component will be marked as failed.
// If at least one DERP latency is less than this threshold, the component will be marked as healthy.
GlobalMillisecondThreshold int64 `json:"global_millisecond_threshold"`
}

func ParseConfig(b any, db *sql.DB) (*Config, error) {
Expand All @@ -30,8 +41,8 @@ func ParseConfig(b any, db *sql.DB) (*Config, error) {
}

func (cfg Config) Validate() error {
if len(cfg.RegionCodes) == 0 {
return fmt.Errorf("region codes are required")
if cfg.GlobalMillisecondThreshold > 0 && cfg.GlobalMillisecondThreshold < MinGlobalMillisecondThreshold {
return fmt.Errorf("global millisecond threshold must be greater than %d", MinGlobalMillisecondThreshold)
}
return nil
}
Loading
Loading