Skip to content

Commit

Permalink
feat: PRT-add-grpc-compression-for-provider-consumer-communication (#…
Browse files Browse the repository at this point in the history
…1440)

* add compression test

* add client compression

* make provider listener opts more friendly to append

* fixing test

* adding compression var

* fixing test to include compression

* adding compression flag and sorting flags

* removing from cache

* adding compression flag to cmd

* adding comment

* fixing a major bug blocking providers too fast

* adding the average latency for relays

* adding test

* adding average latency metric

* fixing nil deref

* fix test

* fix test 21

* adjusting test to fit new limits

* fix test

* fix precision

* fix test
  • Loading branch information
ranlavanet authored May 16, 2024
1 parent 3d09369 commit 8826ae6
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 19 deletions.
2 changes: 1 addition & 1 deletion protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestMain(m *testing.M) {
func isGrpcServerUp(url string) bool {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
conn, err := lavasession.ConnectGRPCClient(context.Background(), url, true, false)
conn, err := lavasession.ConnectGRPCClient(context.Background(), url, true, false, false)
if err != nil {
return false
}
Expand Down
14 changes: 11 additions & 3 deletions protocol/lavasession/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
)

const (
MaxConsecutiveConnectionAttempts = 5
TimeoutForEstablishingAConnection = 1500 * time.Millisecond // 1.5 seconds
MaxSessionsAllowedPerProvider = 1000 // Max number of sessions allowed per provider
MaxAllowedBlockListedSessionPerProvider = 3
MaximumNumberOfFailuresAllowedPerConsumerSession = 3
MaxAllowedBlockListedSessionPerProvider = MaxSessionsAllowedPerProvider / 3
MaximumNumberOfFailuresAllowedPerConsumerSession = 15
RelayNumberIncrement = 1
DataReliabilitySessionId = 0 // data reliability session id is 0. we can change to more sessions later if needed.
DataReliabilityRelayNumber = 1
Expand Down Expand Up @@ -62,7 +63,7 @@ func IsSessionSyncLoss(err error) bool {
return code == codes.Code(SessionOutOfSyncError.ABCICode())
}

func ConnectGRPCClient(ctx context.Context, address string, allowInsecure bool, skipTLS bool) (*grpc.ClientConn, error) {
func ConnectGRPCClient(ctx context.Context, address string, allowInsecure bool, skipTLS bool, allowCompression bool) (*grpc.ClientConn, error) {
var opts []grpc.DialOption

if skipTLS {
Expand Down Expand Up @@ -93,6 +94,13 @@ func ConnectGRPCClient(ctx context.Context, address string, allowInsecure bool,
}))
}

// allow gzip compression for grpc.
if allowCompression {
opts = append(opts, grpc.WithDefaultCallOptions(
grpc.UseCompressor(gzip.Name), // Use gzip compression for provider consumer communication
))
}

conn, err := grpc.DialContext(ctx, address, opts...)
return conn, err
}
Expand Down
101 changes: 101 additions & 0 deletions protocol/lavasession/common_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
package lavasession

import (
"context"
"crypto/tls"
"fmt"
"log"
"net"
"strings"
"testing"
"time"

"github.com/lavanet/lava/utils"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
planstypes "github.com/lavanet/lava/x/plans/types"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"
)

type printGeos []*Endpoint
Expand Down Expand Up @@ -111,3 +122,93 @@ func TestGeoOrdering(t *testing.T) {
})
}
}

// RelayerConnectionServer is a minimal Relayer gRPC server used by the tests
// in this file. It embeds pairingtypes.UnimplementedRelayerServer and answers
// Probe requests with its own guid.
type RelayerConnectionServer struct {
	pairingtypes.UnimplementedRelayerServer

	// guid is the value echoed back in every ProbeReply.
	guid uint64
}

// Relay is not supported by this test server: it always returns a nil reply
// together with an "unimplemented" error.
func (rs *RelayerConnectionServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) {
	var noReply *pairingtypes.RelayReply
	err := fmt.Errorf("unimplemented")
	return noReply, err
}

// Probe answers every probe request with a reply carrying the server's guid.
// The incoming request and context are not inspected.
func (rs *RelayerConnectionServer) Probe(ctx context.Context, probeReq *pairingtypes.ProbeRequest) (*pairingtypes.ProbeReply, error) {
	// Logging of the caller address is intentionally disabled:
	// peerAddress := common.GetIpFromGrpcContext(ctx)
	// utils.LavaFormatInfo("received probe", utils.LogAttr("incoming-ip", peerAddress))
	reply := &pairingtypes.ProbeReply{}
	reply.Guid = rs.guid
	return reply, nil
}

// RelaySubscribe is not supported by this test server: it always fails with an
// "unimplemented" error without touching the stream.
func (rs *RelayerConnectionServer) RelaySubscribe(request *pairingtypes.RelayRequest, srv pairingtypes.Relayer_RelaySubscribeServer) error {
	err := fmt.Errorf("unimplemented")
	return err
}

// startServer launches a TLS-enabled gRPC server with a RelayerConnectionServer
// registered on it, listening on an OS-assigned TCP port (":0").
//
// The server is served from a background goroutine; the caller owns both
// return values and is responsible for stopping the server and closing the
// listener. A failure to listen terminates the process via log.Fatalf.
func startServer() (*grpc.Server, net.Listener) {
	lis, err := net.Listen("tcp", ":0")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	serverCreds := credentials.NewTLS(GetTlsConfig(NetworkAddressData{}))
	srv := grpc.NewServer(grpc.Creds(serverCreds))
	pairingtypes.RegisterRelayerServer(srv, &RelayerConnectionServer{})

	serve := func() {
		serveErr := srv.Serve(lis)
		if serveErr == nil {
			return
		}
		log.Println("test finished:", serveErr)
	}
	go serve()

	return srv, lis
}

// BenchmarkGRPCServer measures Probe round-trips against a local TLS gRPC
// server, once without and once with client-side gzip compression.
//
// Note that when run locally, the compressed variant will probably be
// outperformed by the uncompressed one because of the compression overhead,
// while communication over a real network should benefit from the smaller
// payloads.
func BenchmarkGRPCServer(b *testing.B) {
	srv, lis := startServer()
	address := lis.Addr().String()
	defer srv.Stop()
	defer lis.Close()

	// Retry until a client connection to the freshly started server can be
	// created. The context is cancelled on every iteration (not only on
	// success) and the probing connection is closed, so neither leaks.
	csp := &ConsumerSessionsWithProvider{}
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		_, probeConn, err := csp.ConnectRawClientWithTimeout(ctx, address)
		cancel()
		if err == nil {
			if probeConn != nil {
				_ = probeConn.Close() // only used to check connectivity
			}
			break
		}
		utils.LavaFormatDebug("waiting for grpc server to launch")
		// Short back-off so a slow server start does not turn into a hot spin.
		time.Sleep(10 * time.Millisecond)
	}

	// runBenchmark dials the server with the given extra dial options and
	// issues b.N Probe calls on that single connection.
	runBenchmark := func(b *testing.B, opts ...grpc.DialOption) {
		// Certificate verification is skipped for the test server's certificate.
		var tlsConf tls.Config
		tlsConf.InsecureSkipVerify = true
		creds := credentials.NewTLS(&tlsConf)
		opts = append(opts, grpc.WithTransportCredentials(creds))
		conn, err := grpc.DialContext(context.Background(), address, opts...)
		if err != nil {
			b.Fatalf("failed to dial server: %v", err)
		}
		defer conn.Close()

		client := pairingtypes.NewRelayerClient(conn)

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			// Fail loudly: timing failed calls would make the numbers meaningless.
			if _, err := client.Probe(context.Background(), &pairingtypes.ProbeRequest{Guid: 125, SpecId: "EVMOS", ApiInterface: "jsonrpc"}); err != nil {
				b.Fatalf("probe failed: %v", err)
			}
		}
	}

	b.Run("WithoutCompression", func(b *testing.B) {
		runBenchmark(b)
	})

	b.Run("WithCompression", func(b *testing.B) {
		runBenchmark(b, grpc.WithDefaultCallOptions(
			grpc.UseCompressor(gzip.Name), // Use gzip compression for outgoing messages
		))
	})

	// NOTE(review): kept from the original; presumably gives in-flight work
	// time to settle before the deferred shutdown runs — confirm it is needed.
	time.Sleep(3 * time.Second)
}
8 changes: 4 additions & 4 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,7 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu
consumerSession.LatestRelayCu = 0 // making sure no one uses it in a wrong way
consecutiveErrors := uint64(len(consumerSession.ConsecutiveErrors))
parentConsumerSessionsWithProvider := consumerSession.Parent // must read this pointer before unlocking
csm.updateMetricsManager(consumerSession)
csm.updateMetricsManager(consumerSession, time.Duration(0), false)
// finished with consumerSession here can unlock.
consumerSession.Free(errorReceived) // we unlock before we change anything in the parent ConsumerSessionsWithProvider

Expand Down Expand Up @@ -961,13 +961,13 @@ func (csm *ConsumerSessionManager) OnSessionDone(
// calculate QoS
consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount))
go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock))
csm.updateMetricsManager(consumerSession)
csm.updateMetricsManager(consumerSession, currentLatency, !isHangingApi) // apply latency only for non hanging apis
return nil
}

// updates QoS metrics for a provider
// consumerSession should still be locked when accessing this method as it fetches information from the session it self
func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleConsumerSession) {
func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleConsumerSession, relayLatency time.Duration, sessionSuccessful bool) {
if csm.consumerMetricsManager == nil {
return
}
Expand All @@ -988,7 +988,7 @@ func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleC
publicProviderAddress := consumerSession.Parent.PublicLavaAddress

go func() {
csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, publicProviderAddress, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum)
csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, publicProviderAddress, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum, relayLatency, sessionSuccessful)
// in case we blocked the session add it to our block sessions metric
if blockedSession {
csm.consumerMetricsManager.AddNumberOfBlockedSessionMetric(chainId, apiInterface, publicProviderAddress)
Expand Down
7 changes: 5 additions & 2 deletions protocol/lavasession/consumer_session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
const (
parallelGoRoutines = 40
numberOfProviders = 10
numberOfResetsToTest = 10
numberOfResetsToTest = 1
numberOfAllowedSessionsPerConsumer = 10
firstEpochHeight = 20
secondEpochHeight = 40
Expand Down Expand Up @@ -428,6 +428,8 @@ func TestPairingResetWithMultipleFailures(t *testing.T) {
ctx := context.Background()
csm := CreateConsumerSessionManager()
pairingList := createPairingList("", true)
// make list shorter otherwise we wont be able to ban all as it takes slightly more time now
pairingList = map[uint64]*ConsumerSessionsWithProvider{0: pairingList[0]}
err := csm.UpdateAllProviders(firstEpochHeight, pairingList) // update the providers.
require.NoError(t, err)

Expand All @@ -438,6 +440,7 @@ func TestPairingResetWithMultipleFailures(t *testing.T) {
break
}
css, err := csm.GetSessions(ctx, cuForFirstRequest, NewUsedProviders(nil), servicedBlockNumber, "", nil, common.NO_STATE, 0) // get a session
require.NoError(t, err)

for _, cs := range css {
err = csm.OnSessionFailure(cs.Session, nil)
Expand Down Expand Up @@ -811,7 +814,7 @@ func TestContext(t *testing.T) {

func TestGrpcClientHang(t *testing.T) {
ctx := context.Background()
conn, err := ConnectGRPCClient(ctx, grpcListener, true, false)
conn, err := ConnectGRPCClient(ctx, grpcListener, true, false, false)
require.NoError(t, err)
client := pairingtypes.NewRelayerClient(conn)
err = conn.Close()
Expand Down
12 changes: 9 additions & 3 deletions protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,15 @@ func (list EndpointInfoList) Swap(i, j int) {
list[i], list[j] = list[j], list[i]
}

const AllowInsecureConnectionToProvidersFlag = "allow-insecure-provider-dialing"
// Names of the command-line flags that control how the consumer dials providers.
const (
	// AllowInsecureConnectionToProvidersFlag is the flag name associated with
	// AllowInsecureConnectionToProviders.
	AllowInsecureConnectionToProvidersFlag = "allow-insecure-provider-dialing"

	// AllowGRPCCompressionFlag is the flag name associated with
	// AllowGRPCCompressionForConsumerProviderCommunication.
	AllowGRPCCompressionFlag = "allow-grpc-compression-for-consumer-provider-communication"
)

var AllowInsecureConnectionToProviders = false
// Package-level toggles passed to ConnectGRPCClient when dialing a provider.
// Both are off by default.
var (
	// AllowInsecureConnectionToProviders is forwarded as the allowInsecure
	// argument of ConnectGRPCClient.
	AllowInsecureConnectionToProviders = false

	// AllowGRPCCompressionForConsumerProviderCommunication is forwarded as the
	// allowCompression argument of ConnectGRPCClient.
	AllowGRPCCompressionForConsumerProviderCommunication = false
)

type UsedProvidersInf interface {
RemoveUsed(providerAddress string, err error)
Expand Down Expand Up @@ -301,7 +307,7 @@ func (cswp *ConsumerSessionsWithProvider) decreaseUsedComputeUnits(cu uint64) er
func (cswp *ConsumerSessionsWithProvider) ConnectRawClientWithTimeout(ctx context.Context, addr string) (*pairingtypes.RelayerClient, *grpc.ClientConn, error) {
connectCtx, cancel := context.WithTimeout(ctx, TimeoutForEstablishingAConnection)
defer cancel()
conn, err := ConnectGRPCClient(connectCtx, addr, AllowInsecureConnectionToProviders, false)
conn, err := ConnectGRPCClient(connectCtx, addr, AllowInsecureConnectionToProviders, false, AllowGRPCCompressionForConsumerProviderCommunication)
if err != nil {
return nil, nil, err
}
Expand Down
43 changes: 41 additions & 2 deletions protocol/metrics/metrics_consumer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,26 @@ import (
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/lavanet/lava/utils"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// LatencyTracker maintains a running arithmetic mean of the latencies it has
// been fed, without storing the individual samples.
type LatencyTracker struct {
	AverageLatency time.Duration // running mean, in nanoseconds (time.Since result)
	TotalRequests  int           // number of samples folded into AverageLatency
}

// AddLatency folds one more sample into the running mean.
//
// The new sample is weighted 1/TotalRequests and the previous mean keeps the
// remaining weight. The computation is done in float64 and converted back to
// a time.Duration.
func (lt *LatencyTracker) AddLatency(latency time.Duration) {
	lt.TotalRequests++

	sampleWeight := 1.0 / float64(lt.TotalRequests)
	historyWeight := 1 - sampleWeight

	previousMean := float64(lt.AverageLatency)
	sample := float64(latency)

	lt.AverageLatency = time.Duration(previousMean*historyWeight + sample*sampleWeight)
}

type ConsumerMetricsManager struct {
totalCURequestedMetric *prometheus.CounterVec
totalRelaysRequestedMetric *prometheus.CounterVec
Expand All @@ -35,6 +48,8 @@ type ConsumerMetricsManager struct {
protocolVersionMetric *prometheus.GaugeVec
providerRelays map[string]uint64
addMethodsApiGauge bool
averageLatencyPerChain map[string]*LatencyTracker // key == chain Id + api interface
averageLatencyMetric *prometheus.GaugeVec
}

type ConsumerMetricsManagerOptions struct {
Expand Down Expand Up @@ -128,9 +143,13 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM

endpointsHealthChecksOkMetric.Set(1)
protocolVersionMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_protocol_version",
Name: "lava_consumer_protocol_version",
Help: "The current running lavap version for the process. major := version / 1000000, minor := (version / 1000) % 1000, patch := version % 1000",
}, []string{"version"})
averageLatencyMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_average_latency_in_milliseconds",
Help: "average latency per chain id per api interface",
}, []string{"spec", "apiInterface"})
// Register the metrics with the Prometheus registry.
prometheus.MustRegister(totalCURequestedMetric)
prometheus.MustRegister(totalRelaysRequestedMetric)
Expand All @@ -151,6 +170,7 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
prometheus.MustRegister(currentNumberOfOpenSessionsMetric)
prometheus.MustRegister(currentNumberOfBlockedSessionsMetric)
prometheus.MustRegister(apiSpecificsMetric)
prometheus.MustRegister(averageLatencyMetric)

consumerMetricsManager := &ConsumerMetricsManager{
totalCURequestedMetric: totalCURequestedMetric,
Expand All @@ -163,10 +183,12 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
LatestBlockMetric: latestBlockMetric,
LatestProviderRelay: latestProviderRelay,
providerRelays: map[string]uint64{},
averageLatencyPerChain: map[string]*LatencyTracker{},
virtualEpochMetric: virtualEpochMetric,
endpointsHealthChecksOkMetric: endpointsHealthChecksOkMetric,
endpointsHealthChecksOk: 1,
protocolVersionMetric: protocolVersionMetric,
averageLatencyMetric: averageLatencyMetric,
totalRelaysSentToProvidersMetric: totalRelaysSentToProvidersMetric,
totalRelaysReturnedFromProvidersMetric: totalRelaysReturnedFromProvidersMetric,
totalRelaysSentByNewBatchTickerMetric: totalRelaysSentByNewBatchTickerMetric,
Expand Down Expand Up @@ -273,7 +295,11 @@ func (pme *ConsumerMetricsManager) AddNumberOfBlockedSessionMetric(chainId strin
pme.currentNumberOfBlockedSessionsMetric.WithLabelValues(chainId, apiInterface, provider).Inc()
}

func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface string, providerAddress string, qos *pairingtypes.QualityOfServiceReport, qosExcellence *pairingtypes.QualityOfServiceReport, latestBlock int64, relays uint64) {
// getKeyForAverageLatency builds the lookup key for a chain / API-interface
// pair: the chain id immediately followed by the api interface, with no
// separator in between.
func (pme *ConsumerMetricsManager) getKeyForAverageLatency(chainId string, apiInterface string) string {
	key := chainId
	key += apiInterface
	return key
}

func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface string, providerAddress string, qos *pairingtypes.QualityOfServiceReport, qosExcellence *pairingtypes.QualityOfServiceReport, latestBlock int64, relays uint64, relayLatency time.Duration, sessionSuccessful bool) {
if pme == nil {
return
}
Expand All @@ -285,6 +311,19 @@ func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface st
// do not add Qos metrics there's another session with more statistics
return
}

// calculate average latency on successful sessions only and not hanging apis (transactions etc..)
if sessionSuccessful {
averageLatencyKey := pme.getKeyForAverageLatency(chainId, apiInterface)
existingLatency, foundExistingLatency := pme.averageLatencyPerChain[averageLatencyKey]
if !foundExistingLatency {
pme.averageLatencyPerChain[averageLatencyKey] = &LatencyTracker{}
existingLatency = pme.averageLatencyPerChain[averageLatencyKey]
}
existingLatency.AddLatency(relayLatency)
pme.averageLatencyMetric.WithLabelValues(chainId, apiInterface).Set(float64(existingLatency.AverageLatency.Milliseconds()))
}

pme.LatestProviderRelay.WithLabelValues(chainId, providerAddress, apiInterface).SetToCurrentTime()
// update existing relays
pme.providerRelays[providerRelaysKey] = relays
Expand Down
Loading

0 comments on commit 8826ae6

Please sign in to comment.