Skip to content

Commit

Permalink
Merge PR: rpc api rate limit (#3272)
Browse files Browse the repository at this point in the history
* limit more rpc api

* optimize code
  • Loading branch information
ilovers authored Dec 13, 2023
1 parent a232e52 commit 644a1ce
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 12 deletions.
2 changes: 1 addition & 1 deletion app/rpc/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func GetAPIs(clientCtx context.CLIContext, log log.Logger, keys ...ethsecp256k1.
rateLimiters := getRateLimiter()
disableAPI := getDisableAPI()
ethBackend = backend.New(clientCtx, log, rateLimiters, disableAPI)
ethAPI := eth.NewAPI(clientCtx, log, ethBackend, nonceLock, keys...)
ethAPI := eth.NewAPI(rateLimiters, clientCtx, log, ethBackend, nonceLock, keys...)
if evmtypes.GetEnableBloomFilter() {
ethBackend.StartBloomHandlers(evmtypes.BloomBitsBlocks, evmtypes.GetIndexer().GetDB())
}
Expand Down
57 changes: 52 additions & 5 deletions app/rpc/namespaces/eth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -90,10 +92,18 @@ type PublicEthereumAPI struct {
callCache *lru.Cache
cdc *codec.Codec
fastQueryThreshold uint64
rateLimiters map[string]*rate.Limiter
}

func (api *PublicEthereumAPI) GetRateLimiter(apiName string) *rate.Limiter {
if api.rateLimiters == nil {
return nil
}
return api.rateLimiters[apiName]
}

// NewAPI creates an instance of the public ETH Web3 API.
func NewAPI(
func NewAPI(rateLimiters map[string]*rate.Limiter,
clientCtx clientcontext.CLIContext, log log.Logger, backend backend.Backend, nonceLock *rpctypes.AddrLocker,
keys ...ethsecp256k1.PrivKey,
) *PublicEthereumAPI {
Expand All @@ -115,6 +125,7 @@ func NewAPI(
wrappedBackend: watcher.NewQuerier(),
watcherBackend: watcher.NewWatcher(log),
fastQueryThreshold: viper.GetUint64(FlagFastQueryThreshold),
rateLimiters: rateLimiters,
}
api.evmFactory = simulation.NewEvmFactory(clientCtx.ChainID, api.wrappedBackend)
module := evm.AppModuleBasic{}
Expand Down Expand Up @@ -506,6 +517,10 @@ func (api *PublicEthereumAPI) getStorageAt(address common.Address, key []byte, b
func (api *PublicEthereumAPI) GetStorageAt(address common.Address, key string, blockNrOrHash rpctypes.BlockNumberOrHash) (hexutil.Bytes, error) {
monitor := monitor.GetMonitor("eth_getStorageAt", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "key", key, "block number", blockNrOrHash)
rateLimiter := api.GetRateLimiter("eth_getStorageAt")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockNum, err := api.backend.ConvertToBlockNumber(blockNrOrHash)
if err != nil {
return nil, err
Expand All @@ -522,7 +537,10 @@ func (api *PublicEthereumAPI) GetStorageAtInternal(address common.Address, key [
func (api *PublicEthereumAPI) GetTransactionCount(address common.Address, blockNrOrHash rpctypes.BlockNumberOrHash) (*hexutil.Uint64, error) {
monitor := monitor.GetMonitor("eth_getTransactionCount", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "block number", blockNrOrHash)

rateLimiter := api.GetRateLimiter("eth_getTransactionCount")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockNum, err := api.backend.ConvertToBlockNumber(blockNrOrHash)
if err != nil {
return nil, err
Expand Down Expand Up @@ -553,6 +571,10 @@ func (api *PublicEthereumAPI) GetTransactionCount(address common.Address, blockN
func (api *PublicEthereumAPI) GetBlockTransactionCountByHash(hash common.Hash) *hexutil.Uint {
monitor := monitor.GetMonitor("eth_getBlockTransactionCountByHash", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash)
rateLimiter := api.GetRateLimiter("eth_getBlockTransactionCountByHash")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil
}
res, _, err := api.clientCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, hash.Hex()))
if err != nil {
return nil
Expand Down Expand Up @@ -636,6 +658,10 @@ func (api *PublicEthereumAPI) GetUncleCountByBlockNumber(_ rpctypes.BlockNumber)
func (api *PublicEthereumAPI) GetCode(address common.Address, blockNrOrHash rpctypes.BlockNumberOrHash) (hexutil.Bytes, error) {
monitor := monitor.GetMonitor("eth_getCode", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("address", address, "block number", blockNrOrHash)
rateLimiter := api.GetRateLimiter("eth_getCode")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockNumber, err := api.backend.ConvertToBlockNumber(blockNrOrHash)
if err != nil {
return nil, err
Expand Down Expand Up @@ -683,6 +709,10 @@ func (api *PublicEthereumAPI) GetCodeByHash(hash common.Hash) (hexutil.Bytes, er
// GetTransactionLogs returns the logs given a transaction hash.
func (api *PublicEthereumAPI) GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error) {
api.logger.Debug("eth_getTransactionLogs", "hash", txHash)
rateLimiter := api.GetRateLimiter("eth_getTransactionLogs")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
return api.backend.GetTransactionLogs(txHash)
}

Expand Down Expand Up @@ -864,7 +894,10 @@ func (api *PublicEthereumAPI) addCallCache(key common.Hash, data []byte) {
func (api *PublicEthereumAPI) Call(args rpctypes.CallArgs, blockNrOrHash rpctypes.BlockNumberOrHash, overrides *evmtypes.StateOverrides) (hexutil.Bytes, error) {
monitor := monitor.GetMonitor("eth_call", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("args", args, "block number", blockNrOrHash)

rateLimiter := api.GetRateLimiter("eth_call")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
if overrides != nil {
if err := overrides.Check(); err != nil {
return nil, err
Expand Down Expand Up @@ -1092,7 +1125,10 @@ func (api *PublicEthereumAPI) simDoCall(args rpctypes.CallArgs, cap uint64) (uin
func (api *PublicEthereumAPI) EstimateGas(args rpctypes.CallArgs) (hexutil.Uint64, error) {
monitor := monitor.GetMonitor("eth_estimateGas", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("args", args)

rateLimiter := api.GetRateLimiter("eth_estimateGas")
if rateLimiter != nil && !rateLimiter.Allow() {
return 0, rpctypes.ErrServerBusy
}
params, err := api.getEvmParams()
if err != nil {
return 0, TransformDataError(err, "eth_estimateGas")
Expand Down Expand Up @@ -1136,6 +1172,10 @@ func (api *PublicEthereumAPI) EstimateGas(args rpctypes.CallArgs) (hexutil.Uint6
func (api *PublicEthereumAPI) GetBlockByHash(hash common.Hash, fullTx bool) (*watcher.Block, error) {
monitor := monitor.GetMonitor("eth_getBlockByHash", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash, "full", fullTx)
rateLimiter := api.GetRateLimiter("eth_getBlockByHash")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockRes, err := api.backend.GetBlockByHash(hash, fullTx)
if err != nil {
return nil, TransformDataError(err, RPCEthGetBlockByHash)
Expand Down Expand Up @@ -1195,7 +1235,10 @@ func (api *PublicEthereumAPI) getBlockByNumber(blockNum rpctypes.BlockNumber, fu
func (api *PublicEthereumAPI) GetBlockByNumber(blockNum rpctypes.BlockNumber, fullTx bool) (*watcher.Block, error) {
monitor := monitor.GetMonitor("eth_getBlockByNumber", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("number", blockNum, "full", fullTx)

rateLimiter := api.GetRateLimiter("eth_getBlockByNumber")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
blockRes, err := api.getBlockByNumber(blockNum, fullTx)
return blockRes, err
}
Expand Down Expand Up @@ -1306,6 +1349,10 @@ func (api *PublicEthereumAPI) getTransactionByBlockAndIndex(block *tmtypes.Block
func (api *PublicEthereumAPI) GetTransactionReceipt(hash common.Hash) (*watcher.TransactionReceipt, error) {
monitor := monitor.GetMonitor("eth_getTransactionReceipt", api.logger, api.Metrics).OnBegin()
defer monitor.OnEnd("hash", hash)
rateLimiter := api.GetRateLimiter("eth_getTransactionReceipt")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, rpctypes.ErrServerBusy
}
res, e := api.wrappedBackend.GetTransactionReceipt(hash)
if e == nil {
return res, nil
Expand Down
11 changes: 5 additions & 6 deletions app/rpc/namespaces/eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
)

var (
ErrServerBusy = errors.New("server is too busy")
ErrMethodNotAllowed = errors.New("the method is not allowed")
NameSpace = "filters"
)
Expand Down Expand Up @@ -138,7 +137,7 @@ func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
}
rateLimiter := api.backend.GetRateLimiter("eth_newPendingTransactionFilter")
if rateLimiter != nil && !rateLimiter.Allow() {
return rpc.ID(fmt.Sprintf("error creating pending tx filter: %s", ErrServerBusy.Error()))
return rpc.ID(fmt.Sprintf("error creating pending tx filter: %s", rpctypes.ErrServerBusy.Error()))
}
pendingTxSub, cancelSubs, err := api.events.SubscribePendingTxs()
if err != nil {
Expand Down Expand Up @@ -235,7 +234,7 @@ func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
}
rateLimiter := api.backend.GetRateLimiter("eth_newBlockFilter")
if rateLimiter != nil && !rateLimiter.Allow() {
return rpc.ID(fmt.Sprintf("error creating block filter: %s", ErrServerBusy.Error()))
return rpc.ID(fmt.Sprintf("error creating block filter: %s", rpctypes.ErrServerBusy.Error()))
}
headerSub, cancelSubs, err := api.events.SubscribeNewHeads()
if err != nil {
Expand Down Expand Up @@ -402,7 +401,7 @@ func (api *PublicFilterAPI) NewFilter(criteria filters.FilterCriteria) (rpc.ID,
}
rateLimiter := api.backend.GetRateLimiter("eth_newFilter")
if rateLimiter != nil && !rateLimiter.Allow() {
return rpc.ID(""), ErrServerBusy
return rpc.ID(""), rpctypes.ErrServerBusy
}
var (
filterID = rpc.ID("")
Expand Down Expand Up @@ -468,7 +467,7 @@ func (api *PublicFilterAPI) GetLogs(ctx context.Context, criteria filters.Filter
}
rateLimiter := api.backend.GetRateLimiter("eth_getLogs")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, ErrServerBusy
return nil, rpctypes.ErrServerBusy
}
var filter *Filter
if criteria.BlockHash != nil {
Expand Down Expand Up @@ -574,7 +573,7 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
}
rateLimiter := api.backend.GetRateLimiter("eth_getFilterChanges")
if rateLimiter != nil && !rateLimiter.Allow() {
return nil, ErrServerBusy
return nil, rpctypes.ErrServerBusy
}
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
Expand Down
1 change: 1 addition & 0 deletions app/rpc/types/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)

var (
ErrServerBusy = errors.New("server is too busy, please try again later")
// static gas limit for all blocks
defaultGasLimit = hexutil.Uint64(int64(^uint32(0)))
defaultGasUsed = hexutil.Uint64(0)
Expand Down

0 comments on commit 644a1ce

Please sign in to comment.