Skip to content
This repository has been archived by the owner on Oct 28, 2021. It is now read-only.

Refactor libp2p::Peer #5265

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
95 changes: 46 additions & 49 deletions libp2p/Host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,24 +270,18 @@ void Host::startPeerSession(Public const& _id, RLP const& _hello,
shared_ptr<Peer> peer;
DEV_RECURSIVE_GUARDED(x_sessions)
{
auto itPeer = m_peers.find(_id);
if (itPeer != m_peers.end())
peer = itPeer->second;
else
{
// peer doesn't exist, try to get port info from node table
if (Node n = nodeFromNodeTable(_id))
peer = make_shared<Peer>(n);

if (!peer)
peer = make_shared<Peer>(Node(_id, UnspecifiedNodeIPEndpoint));
auto const remoteAddress = _s->remoteEndpoint().address();
auto const remoteTcpPort = _s->remoteEndpoint().port();

peer = findPeer(_id, remoteAddress, remoteTcpPort);
if (!peer)
{
peer = make_shared<Peer>(Node{_id, NodeIPEndpoint{remoteAddress, 0, remoteTcpPort}});
m_peers[_id] = peer;
}
}
if (peer->isOffline())
peer->m_lastConnected = chrono::system_clock::now();
peer->endpoint.setAddress(_s->remoteEndpoint().address());
peer->m_lastConnected = std::chrono::system_clock::now();

auto const protocolVersion = _hello[0].toInt<unsigned>();
auto const clientVersion = _hello[1].toString();
Expand Down Expand Up @@ -386,38 +380,35 @@ void Host::startPeerSession(Public const& _id, RLP const& _hello,
<< _s->remoteEndpoint();
}

void Host::onNodeTableEvent(NodeID const& _n, NodeTableEventType const& _e)
void Host::onNodeTableEvent(NodeID const& _nodeID, NodeTableEventType const& _e)
{
if (_e == NodeEntryAdded)
{
LOG(m_logger) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n;
if (Node n = nodeFromNodeTable(_n))
LOG(m_logger) << "p2p.host.nodeTable.events.nodeEntryAdded " << _nodeID;
if (Node node = nodeFromNodeTable(_nodeID))
{
shared_ptr<Peer> p;
shared_ptr<Peer> peer;
DEV_RECURSIVE_GUARDED(x_sessions)
{
if (m_peers.count(_n))
{
p = m_peers[_n];
p->endpoint = n.endpoint;
}
else
peer = findPeer(_nodeID, node.endpoint.address(), node.endpoint.tcpPort());
if (!peer)
{
p = make_shared<Peer>(n);
m_peers[_n] = p;
LOG(m_logger) << "p2p.host.peers.events.peerAdded " << _n << " " << p->endpoint;
peer = make_shared<Peer>(node);
m_peers[_nodeID] = peer;
LOG(m_logger) << "p2p.host.peers.events.peerAdded " << _nodeID << " "
<< peer->endpoint;
}
}
if (peerSlotsAvailable(Egress))
connect(p);
connect(peer);
}
}
else if (_e == NodeEntryDropped)
{
LOG(m_logger) << "p2p.host.nodeTable.events.NodeEntryDropped " << _n;
LOG(m_logger) << "p2p.host.nodeTable.events.NodeEntryDropped " << _nodeID;
RecursiveGuard l(x_sessions);
if (m_peers.count(_n) && m_peers[_n]->peerType == PeerType::Optional)
m_peers.erase(_n);
if (m_peers.count(_nodeID) && m_peers[_nodeID]->peerType == PeerType::Optional)
m_peers.erase(_nodeID);
}
}

Expand Down Expand Up @@ -576,58 +567,54 @@ void Host::addNode(NodeID const& _node, NodeIPEndpoint const& _endpoint)
addNodeToNodeTable(Node(_node, _endpoint));
}

void Host::requirePeer(NodeID const& _n, NodeIPEndpoint const& _endpoint)
void Host::requirePeer(NodeID const& _nodeID, NodeIPEndpoint const& _endpoint)
{
if (!m_run)
{
cwarn << "Network not running so node (" << _n << ") with endpoint (" << _endpoint
cwarn << "Network not running so node (" << _nodeID << ") with endpoint (" << _endpoint
<< ") cannot be added as a required peer";
return;
}
if (!haveCapabilities())
{
cwarn << "No capabilities registered so node (" << _n << ") with endpoint (" << _endpoint
cwarn << "No capabilities registered so node (" << _nodeID << ") with endpoint (" << _endpoint
<< ") cannot be added as a required peer";
return;
}

if (_n == id())
if (_nodeID == id())
{
cnetdetails << "Ignoring the request to connect to self " << _n;
cnetdetails << "Ignoring the request to connect to self " << _nodeID;
return;
}

if (!_n)
if (!_nodeID)
{
cnetdetails << "Ignoring the request to connect to null node id.";
return;
}

{
Guard l(x_requiredPeers);
m_requiredPeers.insert(_n);
m_requiredPeers.insert(_nodeID);
}

Node const node(_n, _endpoint, PeerType::Required);
Node node(_nodeID, _endpoint, PeerType::Required);
// create or update m_peers entry
shared_ptr<Peer> p;
shared_ptr<Peer> peer;
DEV_RECURSIVE_GUARDED(x_sessions)
{
auto it = m_peers.find(_n);
if (it != m_peers.end())
{
p = it->second;
p->endpoint = node.endpoint;
p->peerType = PeerType::Required;
}
peer = findPeer(_nodeID, node.endpoint.address(), node.endpoint.tcpPort());
if (peer)
peer->peerType = PeerType::Required;
else
{
p = make_shared<Peer>(node);
m_peers[_n] = p;
peer = make_shared<Peer>(node);
m_peers[_nodeID] = peer;
}
}
// required for discovery
addNodeToNodeTable(*p);
addNodeToNodeTable(*peer);
}

bool Host::isRequiredPeer(NodeID const& _id) const
Expand Down Expand Up @@ -1130,3 +1117,13 @@ void Host::forEachPeer(
return;
}

std::shared_ptr<Peer> Host::findPeer(
NodeID const& _nodeID, bi::address const& _address, unsigned short _tcpPort) const
{
auto const itPeer = m_peers.find(_nodeID);
if (itPeer != m_peers.end() && itPeer->second->endpoint.address() == _address &&
itPeer->second->endpoint.tcpPort() == _tcpPort)
return itPeer->second;

return {};
}
3 changes: 3 additions & 0 deletions libp2p/Host.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ class Host: public Worker
/// Stop registered capabilities, typically done when the network is being shut down.
void stopCapabilities();

std::shared_ptr<Peer> findPeer(
NodeID const& _nodeID, bi::address const& _address, unsigned short _tcpPort) const;

bytes m_restoreNetwork; ///< Set by constructor and used to set Host key and restore network peers & nodes.

std::atomic<bool> m_run{false}; ///< Whether network is running.
Expand Down
2 changes: 0 additions & 2 deletions libp2p/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ class Peer: public Node
friend class Session; /// Allows Session to update score and rating.
friend class Host; /// For Host: saveNetwork(), restoreNetwork()

friend class RLPXHandshake;

public:
/// Construct Peer from Node.
Peer(Node const& _node): Node(_node) {}
Expand Down