Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.0 -> main] Limit sync-fetch-span by max-reversible-blocks if needed #576

Merged
merged 10 commits into from
Aug 19, 2024
8 changes: 8 additions & 0 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,10 @@ struct controller_impl {
return fork_db.apply<bool>([&](const auto& forkdb) { return !!forkdb.has_root(); });
}

// Number of entries currently held in the fork database.
// NOTE(review): unlike the neighboring accessors, this does not go through
// fork_db.apply<>() — presumably fork_db.size() is itself safe to call
// directly; confirm against the fork_database wrapper.
size_t fork_db_size() const {
return fork_db.size();
}

// Block id of the fork database root. Dereferences root() unconditionally,
// so callers must ensure a root exists (see fork_db_has_root()).
block_id_type fork_db_root_block_id() const {
return fork_db.apply<block_id_type>([&](const auto& forkdb) { return forkdb.root()->id(); });
}
Expand Down Expand Up @@ -5305,6 +5309,10 @@ bool controller::fork_db_has_root() const {
return my->fork_db_has_root();
}

// Public accessor: forwards to controller_impl::fork_db_size() (pimpl).
size_t controller::fork_db_size() const {
return my->fork_db_size();
}

// Last irreversible block number == block number of the fork database root
// (the header declares this accessor thread-safe: "applied LIB, fork db root").
uint32_t controller::last_irreversible_block_num() const {
return my->fork_db_root_block_num();
}
Expand Down
3 changes: 2 additions & 1 deletion libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ namespace eosio::chain {

void set_savanna_lib_id(const block_id_type& id);

bool fork_db_has_root() const;
bool fork_db_has_root() const;
size_t fork_db_size() const;

// thread-safe, applied LIB, fork db root
uint32_t last_irreversible_block_num() const;
Expand Down
67 changes: 46 additions & 21 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,9 @@ namespace eosio {
uint32_t sync_next_expected_num GUARDED_BY(sync_mtx) {0}; // the next block number we need from peer
connection_ptr sync_source GUARDED_BY(sync_mtx); // connection we are currently syncing from

const uint32_t sync_req_span {0};
const uint32_t sync_fetch_span {0};
const uint32_t sync_peer_limit {0};
const size_t max_reversible_blocks {0};

alignas(hardware_destructive_interference_sz)
std::atomic<stages> sync_state{in_sync};
Expand All @@ -249,13 +250,13 @@ namespace eosio {
connection_ptr find_next_sync_node(); // call with locked mutex
void start_sync( const connection_ptr& c, uint32_t target ); // locks mutex
bool verify_catchup( const connection_ptr& c, uint32_t num, const block_id_type& id ); // locks mutex

uint32_t active_sync_fetch_span() const;
public:
enum class closing_mode {
immediately, // closing connection immediately
handshake // sending handshake message
};
explicit sync_manager( uint32_t span, uint32_t sync_peer_limit, uint32_t min_blocks_distance );
explicit sync_manager( uint32_t span, uint32_t sync_peer_limit, size_t max_reversible_blocks, uint32_t min_blocks_distance );
static void send_handshakes();
bool syncing_from_peer() const { return sync_state == lib_catchup; }
bool is_in_sync() const { return sync_state == in_sync; }
Expand Down Expand Up @@ -1994,18 +1995,35 @@ namespace eosio {
}
//-----------------------------------------------------------

sync_manager::sync_manager( uint32_t span, uint32_t sync_peer_limit, uint32_t min_blocks_distance )
// Construct the sync manager.
//   span                  - configured sync-fetch-span: upper bound on blocks requested per sync chunk
//   sync_peer_limit       - peer-count limit used when selecting sync sources -- TODO confirm exact use
//   max_reversible_blocks - cap on reversible (fork db) blocks; used by active_sync_fetch_span()
//                           to shrink the effective fetch span
//   min_blocks_distance   - minimum block distance threshold -- semantics not visible here; confirm
// Starts in the in_sync stage with no sync source and expects block 1 next.
sync_manager::sync_manager( uint32_t span, uint32_t sync_peer_limit, size_t max_reversible_blocks, uint32_t min_blocks_distance )
:sync_known_lib_num( 0 )
,sync_last_requested_num( 0 )
,sync_next_expected_num( 1 )
,sync_source()
,sync_fetch_span( span )
,sync_peer_limit( sync_peer_limit )
,max_reversible_blocks(max_reversible_blocks)
,sync_state(in_sync)
,min_blocks_distance(min_blocks_distance)
{
}

// Effective sync-fetch-span for the next request: the configured span, reduced
// so that fetching that many blocks cannot push the fork database past
// max-reversible-blocks. Returns 0 when the limit is already reached.
uint32_t sync_manager::active_sync_fetch_span() const {
   const auto fork_db_size = my_impl->chain_plug->chain().fork_db_size();
   // Compute headroom in a signed 64-bit type. The original expression
   // (max_reversible_blocks - fork_db_size - 1) subtracts size_t values, which
   // wraps around when fork_db_size + 1 exceeds max_reversible_blocks and was
   // then narrowed to int32_t — implementation-defined before C++20 and wrong
   // whenever the deficit does not fit in 32 bits.
   int64_t reversible_remaining =
      static_cast<int64_t>(max_reversible_blocks) - static_cast<int64_t>(fork_db_size) - 1;
   if (reversible_remaining <= 0) {
      fc_wlog(logger, "max-reversible-blocks ${m} exceeded, remaining ${r}, fork_db_size ${fs}",
              ("m", max_reversible_blocks)("r", reversible_remaining)("fs", fork_db_size));
      reversible_remaining = 0;
   }
   // Widen sync_fetch_span explicitly so the comparison is signed/signed.
   if (reversible_remaining < static_cast<int64_t>(sync_fetch_span)) {
      fc_wlog(logger, "sync-fetch-span ${sfs} restricted to ${r} by max-reversible-blocks ${m}, fork_db_size ${fs}",
              ("sfs", sync_fetch_span)("r", reversible_remaining)("m", max_reversible_blocks)("fs", fork_db_size));
      return static_cast<uint32_t>(reversible_remaining); // 0 <= value < sync_fetch_span, fits
   }
   return sync_fetch_span;
}

constexpr auto sync_manager::stage_str(stages s) {
switch (s) {
case in_sync : return "in sync";
Expand Down Expand Up @@ -2113,8 +2131,8 @@ namespace eosio {
void sync_manager::request_next_chunk( const connection_ptr& conn ) REQUIRES(sync_mtx) {
auto chain_info = my_impl->get_chain_info();

fc_dlog( logger, "sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync_req_span: ${s}, fhead: ${h}, lib: ${lib}",
("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_req_span)("h", chain_info.fork_head_num)("lib", chain_info.lib_num) );
fc_dlog( logger, "sync_last_requested_num: ${r}, sync_next_expected_num: ${e}, sync_known_lib_num: ${k}, sync-fetch-span: ${s}, fhead: ${h}, lib: ${lib}",
("r", sync_last_requested_num)("e", sync_next_expected_num)("k", sync_known_lib_num)("s", sync_fetch_span)("h", chain_info.fork_head_num)("lib", chain_info.lib_num) );

if (conn) {
// p2p_high_latency_test.py test depends on this exact log statement.
Expand Down Expand Up @@ -2149,7 +2167,8 @@ namespace eosio {
bool request_sent = false;
if( sync_last_requested_num != sync_known_lib_num ) {
uint32_t start = sync_next_expected_num;
uint32_t end = start + sync_req_span - 1;
auto fetch_span = active_sync_fetch_span();
uint32_t end = start + fetch_span - 1;
if( end > sync_known_lib_num )
end = sync_known_lib_num;
if( end > 0 && end >= start ) {
Expand Down Expand Up @@ -2527,22 +2546,26 @@ namespace eosio {
("bn", blk_num)("kn", sync_known_lib_num));
send_handshakes_when_synced = true;
} else {
// use chain head instead of fork head so we do not get too far ahead of applied blocks
uint32_t head = my_impl->get_chain_head_num();
// do not allow to get too far ahead (one sync_req_span) of chain head
if (blk_num >= sync_last_requested_num && blk_num < head + sync_req_span) {
// block was not applied, possibly because we already have the block
fc_dlog(logger, "Requesting blocks ahead, head: ${h} fhead ${fh} blk_num: ${bn} sync_next_expected_num ${nen} "
"sync_last_requested_num: ${lrn}, sync_last_requested_block: ${lrb}",
("h", my_impl->get_chain_head_num())("fh", my_impl->get_fork_head_num())
("bn", blk_num)("nen", sync_next_expected_num)
("lrn", sync_last_requested_num)("lrb", c->sync_last_requested_block));
request_next_chunk();
if (blk_num >= sync_last_requested_num) {
// do not allow to get too far ahead (sync_fetch_span) of chain head
auto fetch_span = active_sync_fetch_span();
// use chain head instead of fork head so we do not get too far ahead of applied blocks
uint32_t head = my_impl->get_chain_head_num();
if (blk_num < head + fetch_span) {
// block was not applied, possibly because we already have the block
fc_dlog(logger, "Requesting ${fs} blocks ahead, head: ${h} fhead ${fh} blk_num: ${bn} sync_next_expected_num ${nen} "
"sync_last_requested_num: ${lrn}, sync_last_requested_block: ${lrb}",
("fs", fetch_span)("h", head)("fh", my_impl->get_fork_head_num())
("bn", blk_num)("nen", sync_next_expected_num)
("lrn", sync_last_requested_num)("lrb", c->sync_last_requested_block));
request_next_chunk();
}
}
}
} else { // blk_applied
if (blk_num >= sync_last_requested_num) {
// Did not request blocks ahead, likely because too far ahead of head
// Do not restrict sync_fetch_span as we want max-reversible-blocks to shut down the node for applied blocks
fc_dlog(logger, "Requesting blocks, head: ${h} fhead ${fh} blk_num: ${bn} sync_next_expected_num ${nen} "
"sync_last_requested_num: ${lrn}, sync_last_requested_block: ${lrb}",
("h", my_impl->get_chain_head_num())("fh", my_impl->get_fork_head_num())
Expand Down Expand Up @@ -4228,6 +4251,9 @@ namespace eosio {
try {
fc_ilog( logger, "Initialize net plugin" );

chain_plug = app().find_plugin<chain_plugin>();
EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "" );

peer_log_format = options.at( "peer-log-format" ).as<string>();

txn_exp_period = def_txn_expire_wait;
Expand All @@ -4249,6 +4275,7 @@ namespace eosio {
sync_master = std::make_unique<sync_manager>(
options.at( "sync-fetch-span" ).as<uint32_t>(),
options.at( "sync-peer-limit" ).as<uint32_t>(),
chain_plug->chain_config().max_reversible_blocks,
min_blocks_distance);

connections.init( std::chrono::milliseconds( options.at("p2p-keepalive-interval-ms").as<int>() * 2 ),
Expand Down Expand Up @@ -4345,8 +4372,6 @@ namespace eosio {
}
}

chain_plug = app().find_plugin<chain_plugin>();
EOS_ASSERT( chain_plug, chain::missing_chain_plugin_exception, "" );
chain_id = chain_plug->get_chain_id();
fc::rand_pseudo_bytes( node_id.data(), node_id.data_size());
const controller& cc = chain_plug->chain();
Expand Down