Skip to content
This repository has been archived by the owner on Oct 28, 2021. It is now read-only.

Commit

Permalink
Merge pull request #5865 from ethereum/sync-deadlock
Browse files Browse the repository at this point in the history
Fix race condition which permanently pauses sync
  • Loading branch information
halfalicious committed Dec 12, 2019
2 parents 9f75df6 + 124a702 commit 04c3e53
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
- Fixed: [#5852](https://github.com/ethereum/aleth/pull/5852) Output correct original opcodes instead of synthetic `PUSHC`/`JUMPC`/`JUMPCI` in VM trace.
- Fixed: [#5829](https://github.com/ethereum/aleth/pull/5829) web3.eth.getBlock now returns block size in bytes. This requires a (automatic) database rebuild which can take a while depending on how many blocks are in the local chain.
- Fixed: [#5866](https://github.com/ethereum/aleth/pull/5866) Update output of `debug_accountRangeAt` and `eth_getTransactionCount` RPC functions to conform to Geth's output.
- Fixed: [#5865](https://github.com/ethereum/aleth/pull/5865) Fix bug which causes syncing to become permanently stuck.

## [1.7.2] - 2019-11-22

Expand Down
30 changes: 17 additions & 13 deletions libethereum/BlockChainSync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,18 @@ BlockChainSync::BlockChainSync(EthereumCapability& _host)
m_lastImportedBlock(m_startingBlock),
m_lastImportedBlockHash(_host.chain().currentHash())
{
m_bqRoomAvailable = host().bq().onRoomAvailable([this]() {
// Ensure that syncing occurs on the network thread (since the block queue onRoomAvailable
// handler can be called on the client thread)
host().capabilityHost().postWork([this]() {
RecursiveGuard l(x_sync);
m_state = SyncState::Blocks;
continueSync();
});
m_bqBlocksDrained = host().bq().onBlocksDrained([this]() {
if (isSyncPaused() && !host().bq().knownFull())
{
// Draining freed up space in the block queue. Let's resume syncing.
// Ensure that syncing occurs on the network thread (since the block queue handler is
// called on the client thread
host().capabilityHost().postWork([this]() {
RecursiveGuard l(x_sync);
m_state = SyncState::Blocks;
continueSync();
});
}
});
}

Expand Down Expand Up @@ -250,10 +254,8 @@ void BlockChainSync::syncPeer(NodeID const& _peerID, bool _force)
u256 td = host().chain().details().totalDifficulty;
if (host().bq().isActive())
td += host().bq().difficulty();

u256 syncingDifficulty = std::max(m_syncingTotalDifficulty, td);

u256 peerTotalDifficulty = peer.totalDifficulty();
u256 const syncingDifficulty = std::max(m_syncingTotalDifficulty, td);
u256 const peerTotalDifficulty = peer.totalDifficulty();

if (_force || peerTotalDifficulty > syncingDifficulty)
{
Expand All @@ -269,7 +271,9 @@ void BlockChainSync::syncPeer(NodeID const& _peerID, bool _force)
LOG(m_logger) << "Syncing with peer " << peer.id();
m_state = SyncState::Blocks;
}
peer.requestBlockHeaders(peer.latestHash(), 1, 0, false);

// Request tip of peer's chain
peer.requestBlockHeaders(peer.latestHash(), 1 /* count */, 0 /* skip */, false /* reverse */);
peer.setWaitingForTransactions(true);
return;
}
Expand Down
7 changes: 6 additions & 1 deletion libethereum/BlockChainSync.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,12 @@ class BlockChainSync final: public HasInvariants
};

EthereumCapability& m_host;
Handler<> m_bqRoomAvailable; ///< Triggered once block queue has space for more blocks

// Triggered once blocks have been drained from the block queue, potentially freeing up space
// for more blocks. Note that the block queue can still be full after a drain, depending on how
// many blocks are in the queue vs how many are being drained.
Handler<> m_bqBlocksDrained;

mutable RecursiveMutex x_sync;
/// Peers to which we've sent DAO request
std::set<NodeID> m_daoChallengedPeers;
Expand Down
5 changes: 1 addition & 4 deletions libethereum/BlockQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,9 @@ std::size_t BlockQueue::unknownCount() const

void BlockQueue::drain(VerifiedBlocks& o_out, unsigned _max)
{
bool wasFull = false;
DEV_WRITE_GUARDED(m_lock)
{
DEV_INVARIANT_CHECK;
wasFull = knownFull();
if (m_drainingSet.empty())
{
m_drainingDifficulty = 0;
Expand All @@ -437,8 +435,7 @@ void BlockQueue::drain(VerifiedBlocks& o_out, unsigned _max)
}
}
}
if (wasFull && !knownFull())
m_onRoomAvailable();
m_onBlocksDrained();
}

bool BlockQueue::invariants() const
Expand Down
48 changes: 24 additions & 24 deletions libethereum/BlockQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class SizedBlockQueue
}

std::deque<T> m_queue;
std::atomic<size_t> m_size = {0}; ///< Tracks total size in bytes
std::atomic<size_t> m_size = {0}; ///< Tracks total size in bytes
};

template<class KeyType>
Expand Down Expand Up @@ -196,7 +196,7 @@ class SizedBlockMap
}

BlockMultimap m_map;
std::atomic<size_t> m_size = {0}; ///< Tracks total size in bytes
std::atomic<size_t> m_size = {0}; ///< Tracks total size in bytes
};

/**
Expand Down Expand Up @@ -251,13 +251,13 @@ class BlockQueue: HasInvariants
QueueStatus blockStatus(h256 const& _h) const;

Handler<> onReady(std::function<void(void)> _t) { return m_onReady.add(_t); }
Handler<> onRoomAvailable(std::function<void(void)> _t) { return m_onRoomAvailable.add(_t); }
Handler<> onBlocksDrained(std::function<void(void)> _t) { return m_onBlocksDrained.add(_t); }

template <class T> void setOnBad(T const& _t) { m_onBad = _t; }

bool knownFull() const;
bool unknownFull() const;
u256 difficulty() const; // Total difficulty of queueud blocks
u256 difficulty() const; // Total difficulty of queueud blocks
bool isActive() const;

private:
Expand All @@ -282,31 +282,31 @@ class BlockQueue: HasInvariants
std::size_t unknownSize() const;
std::size_t unknownCount() const;

BlockChain const* m_bc; ///< The blockchain into which our imports go.
BlockChain const* m_bc; ///< The blockchain into which our imports go.

mutable boost::shared_mutex m_lock; ///< General lock for the sets, m_future and m_unknown.
h256Hash m_drainingSet; ///< All blocks being imported.
h256Hash m_readySet; ///< All blocks ready for chain import.
h256Hash m_unknownSet; ///< Set of all blocks whose parents are not ready/in-chain.
SizedBlockMap<h256> m_unknown; ///< For blocks that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears.
h256Hash m_knownBad; ///< Set of blocks that we know will never be valid.
SizedBlockMap<time_t> m_future; ///< Set of blocks that are not yet valid. Ordered by timestamp
mutable boost::shared_mutex m_lock; ///< General lock for the sets, m_future and m_unknown.
h256Hash m_drainingSet; ///< All blocks being imported.
h256Hash m_readySet; ///< All blocks ready for chain import.
h256Hash m_unknownSet; ///< Set of all blocks whose parents are not ready/in-chain.
SizedBlockMap<h256> m_unknown; ///< For blocks that have an unknown parent; we map their parent hash to the block stuff, and insert once the block appears.
h256Hash m_knownBad; ///< Set of blocks that we know will never be valid.
SizedBlockMap<time_t> m_future; ///< Set of blocks that are not yet valid. Ordered by timestamp
h256Hash m_futureSet; ///< Set of all blocks that are not yet valid.
Signal<> m_onReady; ///< Called when a subsequent call to import blocks will return a non-empty container. Be nice and exit fast.
Signal<> m_onRoomAvailable; ///< Called when space for new blocks becomes availabe after a drain. Be nice and exit fast.
Signal<> m_onReady; ///< Called when a subsequent call to import blocks will return a non-empty container. Be nice and exit fast.
Signal<> m_onBlocksDrained; ///< Called when blocks have been drained from the block queue. Be nice and exit fast.

mutable Mutex m_verification; ///< Mutex that allows writing to m_verified, m_verifying and m_unverified.
std::condition_variable m_moreToVerify; ///< Signaled when m_unverified has a new entry.
SizedBlockQueue<VerifiedBlock> m_verified; ///< List of blocks, in correct order, verified and ready for chain-import.
SizedBlockQueue<VerifiedBlock> m_verifying; ///< List of blocks being verified; as long as the block component (bytes) is empty, it's not finished.
SizedBlockQueue<UnverifiedBlock> m_unverified; ///< List of <block hash, parent hash, block data> in correct order, ready for verification.
mutable Mutex m_verification; ///< Mutex that allows writing to m_verified, m_verifying and m_unverified.
std::condition_variable m_moreToVerify; ///< Signaled when m_unverified has a new entry.
SizedBlockQueue<VerifiedBlock> m_verified; ///< List of blocks, in correct order, verified and ready for chain-import.
SizedBlockQueue<VerifiedBlock> m_verifying; ///< List of blocks being verified; as long as the block component (bytes) is empty, it's not finished.
SizedBlockQueue<UnverifiedBlock> m_unverified; ///< List of <block hash, parent hash, block data> in correct order, ready for verification.

std::vector<std::thread> m_verifiers; ///< Threads who only verify.
std::atomic<bool> m_deleting = {false}; ///< Exit condition for verifiers.
std::vector<std::thread> m_verifiers; ///< Threads who only verify.
std::atomic<bool> m_deleting = {false}; ///< Exit condition for verifiers.

std::function<void(Exception&)> m_onBad; ///< Called if we have a block that doesn't verify.
u256 m_difficulty; ///< Total difficulty of blocks in the queue
u256 m_drainingDifficulty; ///< Total difficulty of blocks in draining
std::function<void(Exception&)> m_onBad; ///< Called if we have a block that doesn't verify.
u256 m_difficulty; ///< Total difficulty of blocks in the queue
u256 m_drainingDifficulty; ///< Total difficulty of blocks in draining

Logger m_logger{createLogger(VerbosityDebug, "bq")};
Logger m_loggerDetail{createLogger(VerbosityTrace, "bq")};
Expand Down

0 comments on commit 04c3e53

Please sign in to comment.