Skip to content

Commit

Permalink
Make BgpSenderPartition::UpdatePeerQueue more efficient
Browse files Browse the repository at this point in the history
The old implementation used to go through all peers for a RibOut
and build a bitset of send ready peers to pass to PeerDequeue.
The bitset is used to decide which peers to merge with the start
marker when we encounter other markers while doing peer dequeue.

The new implementation determines which peers are send ready more
dynamically i.e. as we encounter other markers during peer dequeue.
This avoids unnecessary work in cases where we do not encounter
many markers.  This could be because some markers are behind the
start marker for the peer or because the all peers in the start
marker get blocked or because some peers are in the tail marker.
Further, the new implementation determines the send ready state
directly from the RibOut instead of the BgpUpdateSender since the
RibOut state is more authoritative.

Also treat send_ready state in PeerState as advisory and send_ready
in the IPeerUpdate as authoritative. This if justified because the
IPeerUpdate could become blocked while processing updates from some
other BgpSenderPartition.

Also fix a few update related UTs appropriately.

Change-Id: Ib40d0a0d71c48f56e1af30962cc6f2459be4140a
Partial-Bug: 1607132
  • Loading branch information
Nischal Sheth committed Oct 23, 2016
1 parent ac0f819 commit 08a04e6
Show file tree
Hide file tree
Showing 15 changed files with 128 additions and 97 deletions.
2 changes: 2 additions & 0 deletions src/bgp/bgp_peer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ void BgpPeer::CustomClose() {
// Close this peer by closing all of it's RIBs.
//
void BgpPeer::Close(bool non_graceful) {
send_ready_ = true;
if (membership_req_pending_ && !close_manager_->IsMembershipInUse()) {
BGP_LOG_PEER(Event, this, SandeshLevel::SYS_INFO, BGP_LOG_FLAG_ALL,
BGP_PEER_DIR_NA, "Close procedure deferred");
Expand Down Expand Up @@ -2197,6 +2198,7 @@ void BgpPeer::FillNeighborInfo(const BgpSandeshContext *bsc,
bnr->set_negotiated_hold_time(state_machine_->hold_time());
bnr->set_primary_path_count(GetPrimaryPathCount());
bnr->set_task_instance(GetTaskInstance());
bnr->set_send_ready(send_ready_);
bnr->set_flap_count(peer_stats_->num_flaps());
bnr->set_flap_time(peer_stats_->last_flap());
bnr->set_auth_type(
Expand Down
1 change: 1 addition & 0 deletions src/bgp/bgp_peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ class BgpPeer : public IPeer {
virtual bool CanUseMembershipManager() const;
virtual bool IsRegistrationRequired() const { return true; }
virtual uint64_t GetEorSendTimerElapsedTimeUsecs() const;
virtual bool send_ready() const { return send_ready_; }

void Close(bool non_graceful);
void Clear(int subcode);
Expand Down
1 change: 1 addition & 0 deletions src/bgp/bgp_peer.sandesh
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ struct BgpNeighborResp {
48: bool passive;
55: bool as_override;
61: string private_as_action;
62: bool send_ready;
50: u32 peer_port;
51: string transport_address;
4: string local_address; // local ip address and port
Expand Down
15 changes: 15 additions & 0 deletions src/bgp/bgp_ribout.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "bgp/bgp_table.h"
#include "bgp/bgp_update.h"
#include "bgp/bgp_update_sender.h"
#include "bgp/ipeer.h"
#include "bgp/routing-instance/routing_instance.h"
#include "db/db.h"

Expand Down Expand Up @@ -385,6 +386,20 @@ bool RibOut::IsActive(IPeerUpdate *peer) const {
return (index < 0 ? false : active_peerset_.test(index));
}

//
// Build the subset of given RibPeerSet in this RibOut that are send ready.
//
void RibOut::BuildSendReadyBitSet(const RibPeerSet &peerset,
RibPeerSet *mready) const {
for (size_t bit = peerset.find_first(); bit != RibPeerSet::npos;
bit = peerset.find_next(bit)) {
IPeerUpdate *peer = GetPeer(bit);
if (peer->send_ready()) {
mready->set(bit);
}
}
}

//
// Return the number of peers this route has been advertised to.
//
Expand Down
2 changes: 2 additions & 0 deletions src/bgp/bgp_ribout.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ class RibOut {
bool IsRegistered(IPeerUpdate *peer);
void Deactivate(IPeerUpdate *peer);
bool IsActive(IPeerUpdate *peer) const;
void BuildSendReadyBitSet(const RibPeerSet &peerset,
RibPeerSet *mready) const;

IPeerUpdate *GetPeer(int index) const;
int GetPeerIndex(IPeerUpdate *peer) const;
Expand Down
31 changes: 18 additions & 13 deletions src/bgp/bgp_ribout_updates.cc
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,13 @@ bool RibOutUpdates::TailDequeue(int queue_id, const RibPeerSet &msync,
// blocked parameter is populated with the set of peers that are send blocked.
//
bool RibOutUpdates::PeerDequeue(int queue_id, IPeerUpdate *peer,
const RibPeerSet &mready,
RibPeerSet *blocked) {
CHECK_CONCURRENCY("bgp::SendUpdate");

stats_[queue_id].peer_dequeue_count_++;
UpdateQueue *queue = queue_vec_[queue_id];
UpdateMarker *start_marker = queue->GetMarker(ribout_->GetPeerIndex(peer));
int peer_idx = ribout_->GetPeerIndex(peer);
UpdateMarker *start_marker = queue->GetMarker(peer_idx);

// We're done if this is the same as the tail marker. Updates will be
// built subsequently via TailDequeue.
Expand All @@ -311,16 +311,14 @@ bool RibOutUpdates::PeerDequeue(int queue_id, IPeerUpdate *peer,
return true;
}

// Get the encapsulator for the first RouteUpdate. Even if there's no
// RouteUpdate, we should find another marker or the tail marker.
UpdateEntry *upentry;
RouteUpdatePtr update =
monitor_->GetNextEntry(queue_id, start_marker, &upentry);
assert(upentry);

// At least one peer in the start marker i.e. the peer for which we are
// called must be send ready.
assert(start_marker->members.intersects(mready));
// We're done if the lead peer is not send ready. This can happen if
// the peer got blocked when processing updates in another partition.
RibPeerSet mready;
ribout_->BuildSendReadyBitSet(start_marker->members, &mready);
if (!mready.test(peer_idx)) {
blocked->set(peer_idx);
return false;
}

// Split out any peers from the marker that are not send ready. Note that
// this updates the RibPeerSet in the marker.
Expand All @@ -331,6 +329,13 @@ bool RibOutUpdates::PeerDequeue(int queue_id, IPeerUpdate *peer,
queue->MarkerSplit(start_marker, notready);
}

// Get the encapsulator for the first RouteUpdate. Even if there's no
// RouteUpdate, we should find another marker or the tail marker.
UpdateEntry *upentry;
RouteUpdatePtr update =
monitor_->GetNextEntry(queue_id, start_marker, &upentry);
assert(upentry);

// Update loop. Keep going till we reach the tail marker or till all the
// peers get blocked.
RibPeerSet members = start_marker->members;
Expand Down Expand Up @@ -370,7 +375,7 @@ bool RibOutUpdates::PeerDequeue(int queue_id, IPeerUpdate *peer,
// with the marker that is being processed for dequeue. Note
// that this updates the RibPeerSet in the marker.
RibPeerSet mmove;
mmove.BuildIntersection(marker->members, mready);
ribout_->BuildSendReadyBitSet(marker->members, &mmove);
if (!mmove.empty()) {
stats_[queue_id].marker_merge_count_++;
queue->MarkerMerge(start_marker, marker, mmove);
Expand Down
2 changes: 1 addition & 1 deletion src/bgp/bgp_ribout_updates.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class RibOutUpdates {
virtual bool TailDequeue(int queue_id, const RibPeerSet &msync,
RibPeerSet *blocked, RibPeerSet *unsync);
virtual bool PeerDequeue(int queue_id, IPeerUpdate *peer,
const RibPeerSet &mready, RibPeerSet *blocked);
RibPeerSet *blocked);

// Enqueue a marker at the head of the queue with this bit set.
bool QueueJoin(int queue_id, int bit);
Expand Down
74 changes: 35 additions & 39 deletions src/bgp/bgp_update_sender.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,17 @@ struct BgpSenderPartition::PeerRibState {
// to implement regular and circular iterator classes that are used to walk
// through all the RibState entries for a peer.
//
// A PeerState maintains the in_sync and send_ready state for the IPeer. An
// IPeer/PeerState is considered to be send_ready when the underlying socket
// is writable. It is considered to be in_sync if it's send_ready and the
// marker for the IPeer has merged with the tail marker for all QueueIds in
// all RiBOuts that the IPeer is subscribed.
// A PeerState maintains the in_sync and send_ready state for the IPeerUpdate.
//
// The PeerState is considered to be send_ready when the underlying socket is
// is writable. Note that the send_ready state in the PeerState may be out of
// date with the actual socket state because the socket could have got blocked
// when writing from another partition. Hence IPeerUpdate::send_ready() is the
// more authoritative source.
//
// The PeerState is considered to be in_sync if it's send_ready and the marker
// IPeerUpdate the peer has merged with the tail marker for all QueueIds in
// all RiBOuts for which the IPeerUpdate is subscribed.
//
// The PeerState keeps count of the number of active RibOuts for each QueueId.
// A (RibOut, QueueId) pair is considered to be active if the PeerState isn't
Expand Down Expand Up @@ -645,31 +651,20 @@ void BgpSenderPartition::BuildSyncBitSet(const RibOut *ribout, RibState *rs,

for (RibState::iterator it = rs->begin(peer_state_imap_);
it != rs->end(peer_state_imap_); ++it) {
const PeerState *ps = it.operator->();
if (ps->in_sync()) {
int rix = ribout->GetPeerIndex(ps->peer());
msync->set(rix);
}
}
}
PeerState *ps = it.operator->();

//
// Build the RibPeerSet of IPeers for the RibOut that are send ready. Note
// that we need to use bit indices that are specific to the RibOut, not the
// ones from the BgpSenderPartition.
//
void BgpSenderPartition::BuildSendReadyBitSet(RibOut *ribout,
RibPeerSet *mready) {
CHECK_CONCURRENCY("bgp::SendUpdate");

RibState *rs = rib_state_imap_.Find(ribout);
assert(rs != NULL);
for (RibState::iterator it = rs->begin(peer_state_imap_);
it != rs->end(peer_state_imap_); ++it) {
const PeerState *ps = it.operator->();
if (ps->send_ready()) {
int rix = ribout->GetPeerIndex(ps->peer());
mready->set(rix);
// If the PeerState is in sync but the IPeerUpdate is not send ready
// then update the sync and send ready state in the PeerState. Note
// that the RibOut queue for the PeerState will get marked active via
// the call the SetQueueActive from UpdateRibOut.
if (ps->in_sync()) {
if (ps->peer()->send_ready()) {
int rix = ribout->GetPeerIndex(ps->peer());
msync->set(rix);
} else {
ps->clear_sync();
ps->set_send_ready(false);
}
}
}
}
Expand Down Expand Up @@ -830,19 +825,11 @@ bool BgpSenderPartition::UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps,
if (!BitIsSet(it.peer_rib_state().qactive, queue_id))
continue;

RibOut *ribout = it.operator->();

// Build the send ready bitset. This includes all send ready peers
// for the ribout so that we can potentially merge other peers as
// we move forward in processing the update queue.
RibPeerSet send_ready;
BuildSendReadyBitSet(ribout, &send_ready);

// Drain the queue till we can do no more.
RibOut *ribout = it.operator->();
RibOutUpdates *updates = ribout->updates(index_);
RibPeerSet blocked;
bool done = updates->PeerDequeue(queue_id, peer, send_ready, &blocked);
assert(send_ready.Contains(blocked));
bool done = updates->PeerDequeue(queue_id, peer, &blocked);

// Process blocked mask.
RibState *rs = it.rib_state();
Expand Down Expand Up @@ -875,11 +862,20 @@ bool BgpSenderPartition::UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps,
void BgpSenderPartition::UpdatePeer(IPeerUpdate *peer) {
CHECK_CONCURRENCY("bgp::SendUpdate");

// Bail if the PeerState is not send ready.
PeerState *ps = peer_state_imap_.Find(peer);
if (!ps->send_ready()) {
return;
}

// Update the PeerState and bail if the IPeerUpdate is not send ready.
// This happens if the IPeerUpdate gets blocked while processing some
// other partition.
if (!peer->send_ready()) {
ps->set_send_ready(false);
return;
}

// Go through all queues and drain them if there's anything on them.
for (int queue_id = RibOutUpdates::QCOUNT - 1; queue_id >= 0; --queue_id) {
if (ps->QueueCount(queue_id) == 0) {
Expand Down
1 change: 0 additions & 1 deletion src/bgp/bgp_update_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ class BgpSenderPartition {
bool UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps, int queue_id);

void BuildSyncBitSet(const RibOut *ribout, RibState *rs, RibPeerSet *msync);
void BuildSendReadyBitSet(RibOut *ribout, RibPeerSet *mready);

void SetQueueActive(const RibOut *ribout, RibState *rs, int queue_id,
const RibPeerSet &munsync);
Expand Down
3 changes: 2 additions & 1 deletion src/bgp/bgp_xmpp_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ class BgpXmppChannel::XmppPeer : public IPeer {
parent_->MembershipRequestCallback(table);
}

bool send_ready() const { return send_ready_; }
virtual bool send_ready() const { return send_ready_; }

private:
void WriteReadyCb(const boost::system::error_code &ec) {
Expand Down Expand Up @@ -439,6 +439,7 @@ bool BgpXmppChannel::XmppPeer::SendUpdate(const uint8_t *msg, size_t msgsize,
}

void BgpXmppChannel::XmppPeer::Close(bool non_graceful) {
send_ready_ = true;
parent_->set_peer_closed(true);
if (server_ == NULL)
return;
Expand Down
1 change: 1 addition & 0 deletions src/bgp/bgp_xmpp_sandesh.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ static void FillXmppNeighborInfo(BgpNeighborResp *bnr,
bnr->set_primary_path_count(bx_channel->Peer()->GetPrimaryPathCount());
bnr->set_task_instance(connection->GetTaskInstance());
bnr->set_auth_type(connection->GetXmppAuthenticationType());
bnr->set_send_ready(bx_channel->Peer()->send_ready());
bnr->set_flap_count(bx_channel->Peer()->peer_stats()->num_flaps());
bnr->set_flap_time(bx_channel->Peer()->peer_stats()->last_flap());
if (summary)
Expand Down
2 changes: 2 additions & 0 deletions src/bgp/ipeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ class PeerCloseManager;
class IPeerUpdate {
public:
virtual ~IPeerUpdate() { }
virtual bool send_ready() const { return true; }

// Printable name
virtual const std::string &ToString() const = 0;

Expand Down
4 changes: 4 additions & 0 deletions src/bgp/test/bgp_ribout_updates_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class BgpTestPeer : public IPeerUpdate {
int update_count() const { return count_; }
void clear_update_count() { count_ = 0; }
bool send_block() const { return send_block_; }
void clear_send_block() { send_block_ = false; }
virtual bool send_ready() const { return !send_block_; }

private:
int index_;
Expand Down Expand Up @@ -355,6 +357,7 @@ class RibOutUpdatesTest : public ::testing::Test {
void SetPeerUnblockNow(int idx) {
ConcurrencyScope scope("bgp::SendReadyTask");
ASSERT_TRUE(idx < (int) peers_.size());
peers_[idx]->clear_send_block();
spartition_->PeerSendReady(peers_[idx]);
}

Expand All @@ -363,6 +366,7 @@ class RibOutUpdatesTest : public ::testing::Test {
ASSERT_TRUE(start_idx <= end_idx);
ASSERT_TRUE(end_idx < (int) peers_.size());
for (int idx = start_idx; idx <= end_idx; idx++) {
peers_[idx]->clear_send_block();
spartition_->PeerSendReady(peers_[idx]);
}
}
Expand Down

0 comments on commit 08a04e6

Please sign in to comment.