Skip to content

Commit

Permalink
fix: PRT - fixing retry spam on executed transactions (#1387)
Browse files Browse the repository at this point in the history
* add third provider

* fix not actually getting 10 providers if some of them are ignored

* adding should retry mechanism

* limiting 10 relay retries maximum + fixing retry mechanism

* should retry now taking number of retries launched into consideration

* should retry is more generic

* comment adjustment

* lint fix

* increase protocol version

* fix typo
  • Loading branch information
ranlavanet authored Apr 22, 2024
1 parent 176507c commit d8ddd65
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 19 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Or check out the latest [release](https://github.com/lavanet/lava/releases).

### Add `lavad`/`lavap` autocomplete

You can add a useful autocomplete feature to `lavad` & `lavap` with a simple bash [script](https://github.com/lavanet/lava/blob/main/scripts/lavad_auto_completion_install.sh).
You can add a useful autocomplete feature to `lavad` & `lavap` with a simple bash [script](https://github.com/lavanet/lava/blob/main/scripts/lava_auto_completion_install.sh).

### Quick Start

Expand Down
21 changes: 12 additions & 9 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,18 +513,18 @@ func (csm *ConsumerSessionManager) getTopTenProvidersForStatefulCalls(validAddre
}
// Sort the slice using the custom sorting rule
sort.Slice(validAddresses, customSort)
validAddressesMaxIndex := len(validAddresses) - 1
addresses := []string{}
for i := 0; i < 10; i++ {
// do not overflow
if i > validAddressesMaxIndex {
break
}
wantedLength := 10
for _, sortedAddress := range validAddresses {
// skip ignored providers
if _, foundInIgnoredProviderList := ignoredProvidersList[validAddresses[i]]; foundInIgnoredProviderList {
if _, foundInIgnoredProviderList := ignoredProvidersList[sortedAddress]; foundInIgnoredProviderList {
continue
}
addresses = append(addresses, validAddresses[i])
// fill the slice until we have 10 providers who are not ignored
addresses = append(addresses, sortedAddress)
if len(addresses) >= wantedLength {
break
}
}
return addresses
}
Expand Down Expand Up @@ -589,9 +589,12 @@ func (csm *ConsumerSessionManager) tryGetConsumerSessionWithProviderFromBlockedP

// if len(csm.currentlyBlockedProviderAddresses) == 0 we probably reset the state so we can fetch it normally OR ||
// on a very rare case epoch change can happen. in this case we should just fetch a provider from the new pairing list.
// we also enter this case if all validAddresses are inside ignoredProviders
if len(csm.currentlyBlockedProviderAddresses) == 0 || ignoredProviders.currentEpoch < currentEpoch {
// epoch changed just now (between the getValidConsumerSessionsWithProvider to tryGetConsumerSessionWithProviderFromBlockedProviderList)
utils.LavaFormatDebug("Epoch changed between getValidConsumerSessionsWithProvider to tryGetConsumerSessionWithProviderFromBlockedProviderList getting pairing from new epoch list")
if ignoredProviders.currentEpoch < currentEpoch {
utils.LavaFormatDebug("Epoch changed between getValidConsumerSessionsWithProvider to tryGetConsumerSessionWithProviderFromBlockedProviderList getting pairing from new epoch list")
}
csm.lock.RUnlock() // unlock because getValidConsumerSessionsWithProvider is locking.
return csm.getValidConsumerSessionsWithProvider(ignoredProviders, cuNeededForSession, requestedBlock, addon, extensions, stateful, virtualEpoch)
}
Expand Down
7 changes: 7 additions & 0 deletions protocol/rpcconsumer/relay_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ type RelayProcessor struct {
consumerIp string
}

// ShouldRetry reports whether another relay attempt may be launched.
// It returns false once numberOfRetriesLaunched has reached
// MaximumNumberOfTickerRelayRetries; below that limit it returns true only
// when the processor's selection is not BestResult.
func (rp *RelayProcessor) ShouldRetry(numberOfRetriesLaunched int) bool {
	retriesLeft := numberOfRetriesLaunched < MaximumNumberOfTickerRelayRetries
	// The limit is evaluated first, so rp.selection is not read once the limit is hit.
	return retriesLeft && rp.selection != BestResult
}

func (rp *RelayProcessor) String() string {
if rp == nil {
return ""
Expand Down
18 changes: 15 additions & 3 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
)

const (
// maximum number of retries to send due to the ticker, if we didn't get a response after 10 different attempts then just wait.
MaximumNumberOfTickerRelayRetries = 10
MaxRelayRetries = 6
SendRelayAttempts = 3
numberOfTimesToCheckCurrentlyUsedIsEmpty = 3
Expand Down Expand Up @@ -388,22 +390,32 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH
// every relay timeout we send a new batch
startNewBatchTicker := time.NewTicker(relayTimeout)
defer startNewBatchTicker.Stop()
numberOfRetriesLaunched := 0
for {
select {
case success := <-gotResults:
if success {
if success { // check whether we can return the valid results or we need to send another relay
return relayProcessor, nil
}
// if we don't need to retry return what we currently have
if !relayProcessor.ShouldRetry(numberOfRetriesLaunched) {
return relayProcessor, nil
}
// otherwise continue sending another relay
err := rpccs.sendRelayToProvider(processingCtx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor)
go validateReturnCondition(err)
go readResultsFromProcessor()
numberOfRetriesLaunched++
case <-startNewBatchTicker.C:
// only trigger another batch for non BestResult relays
if relayProcessor.selection != BestResult {
// only trigger another batch for non BestResult relays or if we didn't pass the retry limit.
if relayProcessor.ShouldRetry(numberOfRetriesLaunched) {
// limit the number of retries called from the new batch ticker flow.
// if we pass the limit we just wait for the relays we sent to return.
err := rpccs.sendRelayToProvider(processingCtx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor)
go validateReturnCondition(err)
// add ticker launch metrics
go rpccs.rpcConsumerLogs.SetRelaySentByNewBatchTickerMetric(rpccs.getChainIdAndApiInterface())
numberOfRetriesLaunched++
}
case returnErr := <-returnCondition:
// we use this channel because there could be a race condition between us releasing the provider and about to send the return
Expand Down
10 changes: 5 additions & 5 deletions scripts/pre_setups/init_lava_only_with_node_three_providers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \
$PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \
$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25

# screen -d -m -S provider2 bash -c "source ~/.bashrc; lavap rpcprovider \
# $PROVIDER2_LISTENER LAV1 rest '$LAVA_REST' \
# $PROVIDER2_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \
# $PROVIDER2_LISTENER LAV1 grpc '$LAVA_GRPC' \
# $EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer2 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER2.log" && sleep 0.25
screen -d -m -S provider2 bash -c "source ~/.bashrc; lavap rpcprovider \
$PROVIDER2_LISTENER LAV1 rest '$LAVA_REST' \
$PROVIDER2_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC' \
$PROVIDER2_LISTENER LAV1 grpc '$LAVA_GRPC' \
$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer2 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER2.log" && sleep 0.25

screen -d -m -S provider3 bash -c "source ~/.bashrc; lavap rpcprovider \
$PROVIDER3_LISTENER LAV1 rest '$LAVA_REST' \
Expand Down
2 changes: 1 addition & 1 deletion x/protocol/types/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
var _ paramtypes.ParamSet = (*Params)(nil)

const (
TARGET_VERSION = "1.2.3"
TARGET_VERSION = "1.2.4"
MIN_VERSION = "1.0.2"
)

Expand Down

0 comments on commit d8ddd65

Please sign in to comment.