cryptonote_protocol: misc fixes to the new sync algorithm

Fix sync wedge corner case:
The wedge could happen if a connection went into standby mode
while it was the one which had requested the next span, that span
was still waiting for its data, and that peer was not on the
main chain. Other peers could then keep asking for that data
again and again, but never get it, as only the forked peer has it.

And various other fixes
This commit is contained in:
moneromooo-monero 2017-08-18 20:14:23 +01:00
parent 4466b6d1b0
commit 70b8c6d77a
No known key found for this signature in database
GPG key ID: 686F07454D6CEFC3
5 changed files with 109 additions and 38 deletions

View file

@ -52,8 +52,11 @@ namespace cryptonote
void block_queue::add_blocks(uint64_t height, std::list<cryptonote::block_complete_entry> bcel, const boost::uuids::uuid &connection_id, float rate, size_t size) void block_queue::add_blocks(uint64_t height, std::list<cryptonote::block_complete_entry> bcel, const boost::uuids::uuid &connection_id, float rate, size_t size)
{ {
boost::unique_lock<boost::recursive_mutex> lock(mutex); boost::unique_lock<boost::recursive_mutex> lock(mutex);
remove_span(height); std::list<crypto::hash> hashes;
bool has_hashes = remove_span(height, &hashes);
blocks.insert(span(height, std::move(bcel), connection_id, rate, size)); blocks.insert(span(height, std::move(bcel), connection_id, rate, size));
if (has_hashes)
set_span_hashes(height, connection_id, hashes);
} }
void block_queue::add_blocks(uint64_t height, uint64_t nblocks, const boost::uuids::uuid &connection_id, boost::posix_time::ptime time) void block_queue::add_blocks(uint64_t height, uint64_t nblocks, const boost::uuids::uuid &connection_id, boost::posix_time::ptime time)
@ -92,17 +95,20 @@ void block_queue::flush_stale_spans(const std::set<boost::uuids::uuid> &live_con
} }
} }
void block_queue::remove_span(uint64_t start_block_height) bool block_queue::remove_span(uint64_t start_block_height, std::list<crypto::hash> *hashes)
{ {
boost::unique_lock<boost::recursive_mutex> lock(mutex); boost::unique_lock<boost::recursive_mutex> lock(mutex);
for (block_map::iterator i = blocks.begin(); i != blocks.end(); ++i) for (block_map::iterator i = blocks.begin(); i != blocks.end(); ++i)
{ {
if (i->start_block_height == start_block_height) if (i->start_block_height == start_block_height)
{ {
if (hashes)
*hashes = std::move(i->hashes);
blocks.erase(i); blocks.erase(i);
return; return true;
} }
} }
return false;
} }
void block_queue::remove_spans(const boost::uuids::uuid &connection_id, uint64_t start_block_height) void block_queue::remove_spans(const boost::uuids::uuid &connection_id, uint64_t start_block_height)
@ -278,6 +284,22 @@ bool block_queue::get_next_span(uint64_t &height, std::list<cryptonote::block_co
return false; return false;
} }
// Tells whether the first span in the queue -- skipping over a leading
// blockchain placeholder entry, if there is one -- carries connection_id.
// When it does, `filled` is set to whether that span already holds block
// data and true is returned. Otherwise false is returned and `filled` is
// left untouched.
bool block_queue::has_next_span(const boost::uuids::uuid &connection_id, bool &filled) const
{
  boost::unique_lock<boost::recursive_mutex> lock(mutex);

  block_map::const_iterator next = blocks.begin();
  // step past the placeholder when it heads the queue
  if (next != blocks.end() && is_blockchain_placeholder(*next))
    ++next;

  // nothing queued (or only the placeholder), or the span belongs to another connection
  if (next == blocks.end() || next->connection_id != connection_id)
    return false;

  const bool span_has_data = !next->blocks.empty();
  filled = span_has_data;
  return true;
}
size_t block_queue::get_data_size() const size_t block_queue::get_data_size() const
{ {
boost::unique_lock<boost::recursive_mutex> lock(mutex); boost::unique_lock<boost::recursive_mutex> lock(mutex);

View file

@ -71,7 +71,7 @@ namespace cryptonote
void add_blocks(uint64_t height, uint64_t nblocks, const boost::uuids::uuid &connection_id, boost::posix_time::ptime time = boost::date_time::min_date_time); void add_blocks(uint64_t height, uint64_t nblocks, const boost::uuids::uuid &connection_id, boost::posix_time::ptime time = boost::date_time::min_date_time);
void flush_spans(const boost::uuids::uuid &connection_id, bool all = false); void flush_spans(const boost::uuids::uuid &connection_id, bool all = false);
void flush_stale_spans(const std::set<boost::uuids::uuid> &live_connections); void flush_stale_spans(const std::set<boost::uuids::uuid> &live_connections);
void remove_span(uint64_t start_block_height); bool remove_span(uint64_t start_block_height, std::list<crypto::hash> *hashes = NULL);
void remove_spans(const boost::uuids::uuid &connection_id, uint64_t start_block_height); void remove_spans(const boost::uuids::uuid &connection_id, uint64_t start_block_height);
uint64_t get_max_block_height() const; uint64_t get_max_block_height() const;
void print() const; void print() const;
@ -82,6 +82,7 @@ namespace cryptonote
std::pair<uint64_t, uint64_t> get_next_span_if_scheduled(std::list<crypto::hash> &hashes, boost::uuids::uuid &connection_id, boost::posix_time::ptime &time) const; std::pair<uint64_t, uint64_t> get_next_span_if_scheduled(std::list<crypto::hash> &hashes, boost::uuids::uuid &connection_id, boost::posix_time::ptime &time) const;
void set_span_hashes(uint64_t start_height, const boost::uuids::uuid &connection_id, std::list<crypto::hash> hashes); void set_span_hashes(uint64_t start_height, const boost::uuids::uuid &connection_id, std::list<crypto::hash> hashes);
bool get_next_span(uint64_t &height, std::list<cryptonote::block_complete_entry> &bcel, boost::uuids::uuid &connection_id, bool filled = true) const; bool get_next_span(uint64_t &height, std::list<cryptonote::block_complete_entry> &bcel, boost::uuids::uuid &connection_id, bool filled = true) const;
// Returns true if the first span in the queue (after skipping a leading
// blockchain placeholder) carries connection_id; in that case `filled` is
// set to whether the span already contains blocks. Returns false, leaving
// `filled` unmodified, when there is no such span or it has another id.
bool has_next_span(const boost::uuids::uuid &connection_id, bool &filled) const;
size_t get_data_size() const; size_t get_data_size() const;
size_t get_num_filled_spans_prefix() const; size_t get_num_filled_spans_prefix() const;
size_t get_num_filled_spans() const; size_t get_num_filled_spans() const;

View file

@ -111,6 +111,7 @@ namespace cryptonote
std::list<connection_info> get_connections(); std::list<connection_info> get_connections();
const block_queue &get_block_queue() const { return m_block_queue; } const block_queue &get_block_queue() const { return m_block_queue; }
void stop(); void stop();
void on_connection_close(cryptonote_connection_context &context);
private: private:
//----------------- commands handlers ---------------------------------------------- //----------------- commands handlers ----------------------------------------------
int handle_notify_new_block(int command, NOTIFY_NEW_BLOCK::request& arg, cryptonote_connection_context& context); int handle_notify_new_block(int command, NOTIFY_NEW_BLOCK::request& arg, cryptonote_connection_context& context);
@ -133,6 +134,7 @@ namespace cryptonote
bool should_download_next_span(cryptonote_connection_context& context) const; bool should_download_next_span(cryptonote_connection_context& context) const;
void drop_connection(cryptonote_connection_context &context, bool add_fail, bool flush_all_spans); void drop_connection(cryptonote_connection_context &context, bool add_fail, bool flush_all_spans);
bool kick_idle_peers(); bool kick_idle_peers();
int try_add_next_blocks(cryptonote_connection_context &context);
t_core& m_core; t_core& m_core;

View file

@ -106,6 +106,11 @@ namespace cryptonote
LOG_PRINT_CCONTEXT_L2("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() ); LOG_PRINT_CCONTEXT_L2("-->>NOTIFY_REQUEST_CHAIN: m_block_ids.size()=" << r.block_ids.size() );
post_notify<NOTIFY_REQUEST_CHAIN>(r, context); post_notify<NOTIFY_REQUEST_CHAIN>(r, context);
} }
else if(context.m_state == cryptonote_connection_context::state_standby)
{
context.m_state = cryptonote_connection_context::state_synchronizing;
try_add_next_blocks(context);
}
return true; return true;
} }
@ -819,8 +824,6 @@ namespace cryptonote
{ {
MLOG_P2P_MESSAGE("Received NOTIFY_RESPONSE_GET_OBJECTS (" << arg.blocks.size() << " blocks, " << arg.txs.size() << " txes)"); MLOG_P2P_MESSAGE("Received NOTIFY_RESPONSE_GET_OBJECTS (" << arg.blocks.size() << " blocks, " << arg.txs.size() << " txes)");
bool force_next_span = false;
// calculate size of request // calculate size of request
size_t size = 0; size_t size = 0;
for (const auto &element : arg.txs) size += element.size(); for (const auto &element : arg.txs) size += element.size();
@ -938,19 +941,34 @@ namespace cryptonote
context.m_last_known_hash = cryptonote::get_blob_hash(arg.blocks.back().block); context.m_last_known_hash = cryptonote::get_blob_hash(arg.blocks.back().block);
if (m_core.get_test_drop_download() && m_core.get_test_drop_download_height()) { // DISCARD BLOCKS for testing if (!m_core.get_test_drop_download() || !m_core.get_test_drop_download_height()) { // DISCARD BLOCKS for testing
return 1;
}
}
// We try to lock the sync lock. If we can, it means no other thread is skip:
// currently adding blocks, so we do that for as long as we can from the try_add_next_blocks(context);
// block queue. Then, we go back to download. return 1;
const boost::unique_lock<boost::mutex> sync{m_sync_lock, boost::try_to_lock}; }
if (!sync.owns_lock())
{
MINFO("Failed to lock m_sync_lock, going back to download");
goto skip;
}
MDEBUG(context << " lock m_sync_lock, adding blocks to chain...");
template<class t_core>
int t_cryptonote_protocol_handler<t_core>::try_add_next_blocks(cryptonote_connection_context& context)
{
bool force_next_span = false;
{
// We try to lock the sync lock. If we can, it means no other thread is
// currently adding blocks, so we do that for as long as we can from the
// block queue. Then, we go back to download.
const boost::unique_lock<boost::mutex> sync{m_sync_lock, boost::try_to_lock};
if (!sync.owns_lock())
{
MINFO("Failed to lock m_sync_lock, going back to download");
goto skip;
}
MDEBUG(context << " lock m_sync_lock, adding blocks to chain...");
{
m_core.pause_mine(); m_core.pause_mine();
epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler( epee::misc_utils::auto_scope_leave_caller scope_exit_handler = epee::misc_utils::create_scope_leave_handler(
boost::bind(&t_core::resume_mine, &m_core)); boost::bind(&t_core::resume_mine, &m_core));
@ -984,21 +1002,15 @@ namespace cryptonote
// - later in an alt chain // - later in an alt chain
// - orphan // - orphan
// if it was requested, then it'll be resolved later, otherwise it's an orphan // if it was requested, then it'll be resolved later, otherwise it's an orphan
bool parent_requested = false; bool parent_requested = m_block_queue.requested(new_block.prev_id);
m_p2p->for_each_connection([&](cryptonote_connection_context& context, nodetool::peerid_type peer_id, uint32_t support_flags)->bool{
if (context.m_requested_objects.find(new_block.prev_id) != context.m_requested_objects.end())
{
parent_requested = true;
return false;
}
return true;
});
if (!parent_requested) if (!parent_requested)
{ {
LOG_ERROR_CCONTEXT("Got block with unknown parent which was not requested - dropping connection"); // this can happen if a connection was sicced onto a late span, if it did not have those blocks,
// in case the peer had dropped beforehand, remove the span anyway so other threads can wake up and get it // since we don't know that at the sic time
m_block_queue.remove_spans(span_connection_id, start_height); LOG_ERROR_CCONTEXT("Got block with unknown parent which was not requested - querying block hashes");
return 1; context.m_needed_objects.clear();
context.m_last_response_height = 0;
goto skip;
} }
// parent was requested, so we wait for it to be retrieved // parent was requested, so we wait for it to be retrieved
@ -1007,6 +1019,7 @@ namespace cryptonote
} }
const boost::posix_time::ptime start = boost::posix_time::microsec_clock::universal_time(); const boost::posix_time::ptime start = boost::posix_time::microsec_clock::universal_time();
context.m_last_request_time = start;
m_core.prepare_handle_incoming_blocks(blocks); m_core.prepare_handle_incoming_blocks(blocks);
@ -1108,7 +1121,7 @@ namespace cryptonote
<< timing_message); << timing_message);
} }
} }
} // if not DISCARD BLOCK }
if (should_download_next_span(context)) if (should_download_next_span(context))
{ {
@ -1179,9 +1192,17 @@ skip:
std::list<crypto::hash> hashes; std::list<crypto::hash> hashes;
boost::uuids::uuid span_connection_id; boost::uuids::uuid span_connection_id;
boost::posix_time::ptime request_time; boost::posix_time::ptime request_time;
std::pair<uint64_t, uint64_t> span = m_block_queue.get_next_span_if_scheduled(hashes, span_connection_id, request_time); std::pair<uint64_t, uint64_t> span;
span = m_block_queue.get_start_gap_span();
if (span.second > 0)
{
MDEBUG(context << " we should download it as there is a gap");
return true;
}
// if the next span is not scheduled (or there is none) // if the next span is not scheduled (or there is none)
span = m_block_queue.get_next_span_if_scheduled(hashes, span_connection_id, request_time);
if (span.second == 0) if (span.second == 0)
{ {
// we might be in a weird case where there is a filled next span, // we might be in a weird case where there is a filled next span,
@ -1270,6 +1291,17 @@ skip:
first = false; first = false;
context.m_state = cryptonote_connection_context::state_standby; context.m_state = cryptonote_connection_context::state_standby;
} }
// this needs doing after we went to standby, so the callback knows what to do
bool filled;
if (m_block_queue.has_next_span(context.m_connection_id, filled) && !filled)
{
MDEBUG(context << " we have the next span, and it is scheduled, resuming");
++context.m_callback_request_count;
m_p2p->request_callback(context);
return 1;
}
for (size_t n = 0; n < 50; ++n) for (size_t n = 0; n < 50; ++n)
{ {
if (m_stopping) if (m_stopping)
@ -1289,9 +1321,8 @@ skip:
size_t count = 0; size_t count = 0;
const size_t count_limit = m_core.get_block_sync_size(m_core.get_current_blockchain_height()); const size_t count_limit = m_core.get_block_sync_size(m_core.get_current_blockchain_height());
std::pair<uint64_t, uint64_t> span = std::make_pair(0, 0); std::pair<uint64_t, uint64_t> span = std::make_pair(0, 0);
if (force_next_span)
{ {
MDEBUG(context << " force_next_span is true, trying next span"); MDEBUG(context << " checking for gap");
span = m_block_queue.get_start_gap_span(); span = m_block_queue.get_start_gap_span();
if (span.second > 0) if (span.second > 0)
{ {
@ -1311,6 +1342,9 @@ skip:
} }
MDEBUG(context << " we have the hashes for this gap"); MDEBUG(context << " we have the hashes for this gap");
} }
}
if (force_next_span)
{
if (span.second == 0) if (span.second == 0)
{ {
std::list<crypto::hash> hashes; std::list<crypto::hash> hashes;
@ -1360,7 +1394,12 @@ skip:
for (const auto &hash: hashes) for (const auto &hash: hashes)
{ {
req.blocks.push_back(hash); req.blocks.push_back(hash);
++count;
context.m_requested_objects.insert(hash); context.m_requested_objects.insert(hash);
// that's atrocious O(n) wise, but this is rare
auto i = std::find(context.m_needed_objects.begin(), context.m_needed_objects.end(), hash);
if (i != context.m_needed_objects.end())
context.m_needed_objects.erase(i);
} }
} }
} }
@ -1384,14 +1423,12 @@ skip:
return false; return false;
} }
std::list<crypto::hash> hashes;
auto it = context.m_needed_objects.begin(); auto it = context.m_needed_objects.begin();
for (size_t n = 0; n < span.second; ++n) for (size_t n = 0; n < span.second; ++n)
{ {
req.blocks.push_back(*it); req.blocks.push_back(*it);
++count; ++count;
context.m_requested_objects.insert(*it); context.m_requested_objects.insert(*it);
hashes.push_back(*it);
auto j = it++; auto j = it++;
context.m_needed_objects.erase(j); context.m_needed_objects.erase(j);
} }
@ -1399,7 +1436,7 @@ skip:
context.m_last_request_time = boost::posix_time::microsec_clock::universal_time(); context.m_last_request_time = boost::posix_time::microsec_clock::universal_time();
LOG_PRINT_CCONTEXT_L1("-->>NOTIFY_REQUEST_GET_OBJECTS: blocks.size()=" << req.blocks.size() << ", txs.size()=" << req.txs.size() LOG_PRINT_CCONTEXT_L1("-->>NOTIFY_REQUEST_GET_OBJECTS: blocks.size()=" << req.blocks.size() << ", txs.size()=" << req.txs.size()
<< "requested blocks count=" << count << " / " << count_limit << " from " << span.first); << "requested blocks count=" << count << " / " << count_limit << " from " << span.first << ", first hash " << req.blocks.front());
//epee::net_utils::network_throttle_manager::get_global_throttle_inreq().logger_handle_net("log/dr-monero/net/req-all.data", sec, get_avg_block_size()); //epee::net_utils::network_throttle_manager::get_global_throttle_inreq().logger_handle_net("log/dr-monero/net/req-all.data", sec, get_avg_block_size());
post_notify<NOTIFY_REQUEST_GET_OBJECTS>(req, context); post_notify<NOTIFY_REQUEST_GET_OBJECTS>(req, context);
@ -1582,8 +1619,15 @@ skip:
{ {
if (add_fail) if (add_fail)
m_p2p->add_host_fail(context.m_remote_address); m_p2p->add_host_fail(context.m_remote_address);
m_p2p->drop_connection(context); m_p2p->drop_connection(context);
m_block_queue.flush_spans(context.m_connection_id, flush_all_spans);
}
//------------------------------------------------------------------------------------------------------------------------
template<class t_core>
void t_cryptonote_protocol_handler<t_core>::on_connection_close(cryptonote_connection_context &context)
{
uint64_t target = 0; uint64_t target = 0;
m_p2p->for_each_connection([&](const connection_context& cntxt, nodetool::peerid_type peer_id, uint32_t support_flags) { m_p2p->for_each_connection([&](const connection_context& cntxt, nodetool::peerid_type peer_id, uint32_t support_flags) {
if (cntxt.m_state >= cryptonote_connection_context::state_synchronizing && cntxt.m_connection_id != context.m_connection_id) if (cntxt.m_state >= cryptonote_connection_context::state_synchronizing && cntxt.m_connection_id != context.m_connection_id)
@ -1597,7 +1641,7 @@ skip:
m_core.set_target_blockchain_height(target); m_core.set_target_blockchain_height(target);
} }
m_block_queue.flush_spans(context.m_connection_id, flush_all_spans); m_block_queue.flush_spans(context.m_connection_id, false);
} }
//------------------------------------------------------------------------------------------------------------------------ //------------------------------------------------------------------------------------------------------------------------

View file

@ -1795,6 +1795,8 @@ namespace nodetool
m_peerlist.remove_from_peer_anchor(na); m_peerlist.remove_from_peer_anchor(na);
} }
m_payload_handler.on_connection_close(context);
MINFO("["<< epee::net_utils::print_connection_context(context) << "] CLOSE CONNECTION"); MINFO("["<< epee::net_utils::print_connection_context(context) << "] CLOSE CONNECTION");
} }