Skip to content

Commit

Permalink
Merge "Make BgpSenderPartition::UpdatePeerQueue more efficient" into …
Browse files Browse the repository at this point in the history
…R3.2
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Oct 24, 2016
2 parents 940a0cf + 08a04e6 commit ac9fca2
Show file tree
Hide file tree
Showing 15 changed files with 128 additions and 97 deletions.
2 changes: 2 additions & 0 deletions src/bgp/bgp_peer.cc
Expand Up @@ -926,6 +926,7 @@ void BgpPeer::CustomClose() {
// Close this peer by closing all of it's RIBs.
//
void BgpPeer::Close(bool non_graceful) {
send_ready_ = true;
if (membership_req_pending_ && !close_manager_->IsMembershipInUse()) {
BGP_LOG_PEER(Event, this, SandeshLevel::SYS_INFO, BGP_LOG_FLAG_ALL,
BGP_PEER_DIR_NA, "Close procedure deferred");
Expand Down Expand Up @@ -2185,6 +2186,7 @@ void BgpPeer::FillNeighborInfo(const BgpSandeshContext *bsc,
bnr->set_negotiated_hold_time(state_machine_->hold_time());
bnr->set_primary_path_count(GetPrimaryPathCount());
bnr->set_task_instance(GetTaskInstance());
bnr->set_send_ready(send_ready_);
bnr->set_flap_count(peer_stats_->num_flaps());
bnr->set_flap_time(peer_stats_->last_flap());
bnr->set_auth_type(
Expand Down
1 change: 1 addition & 0 deletions src/bgp/bgp_peer.h
Expand Up @@ -215,6 +215,7 @@ class BgpPeer : public IPeer {
virtual bool CanUseMembershipManager() const;
virtual bool IsRegistrationRequired() const { return true; }
virtual uint64_t GetEorSendTimerElapsedTimeUsecs() const;
virtual bool send_ready() const { return send_ready_; }

void Close(bool non_graceful);
void Clear(int subcode);
Expand Down
1 change: 1 addition & 0 deletions src/bgp/bgp_peer.sandesh
Expand Up @@ -78,6 +78,7 @@ struct BgpNeighborResp {
48: bool passive;
55: bool as_override;
61: string private_as_action;
62: bool send_ready;
50: u32 peer_port;
51: string transport_address;
4: string local_address; // local ip address and port
Expand Down
15 changes: 15 additions & 0 deletions src/bgp/bgp_ribout.cc
Expand Up @@ -20,6 +20,7 @@
#include "bgp/bgp_table.h"
#include "bgp/bgp_update.h"
#include "bgp/bgp_update_sender.h"
#include "bgp/ipeer.h"
#include "bgp/routing-instance/routing_instance.h"
#include "db/db.h"

Expand Down Expand Up @@ -385,6 +386,20 @@ bool RibOut::IsActive(IPeerUpdate *peer) const {
return (index < 0 ? false : active_peerset_.test(index));
}

//
// Build the subset of given RibPeerSet in this RibOut that are send ready.
//
void RibOut::BuildSendReadyBitSet(const RibPeerSet &peerset,
RibPeerSet *mready) const {
for (size_t bit = peerset.find_first(); bit != RibPeerSet::npos;
bit = peerset.find_next(bit)) {
IPeerUpdate *peer = GetPeer(bit);
if (peer->send_ready()) {
mready->set(bit);
}
}
}

//
// Return the number of peers this route has been advertised to.
//
Expand Down
2 changes: 2 additions & 0 deletions src/bgp/bgp_ribout.h
Expand Up @@ -262,6 +262,8 @@ class RibOut {
bool IsRegistered(IPeerUpdate *peer);
void Deactivate(IPeerUpdate *peer);
bool IsActive(IPeerUpdate *peer) const;
void BuildSendReadyBitSet(const RibPeerSet &peerset,
RibPeerSet *mready) const;

IPeerUpdate *GetPeer(int index) const;
int GetPeerIndex(IPeerUpdate *peer) const;
Expand Down
31 changes: 18 additions & 13 deletions src/bgp/bgp_ribout_updates.cc
Expand Up @@ -296,13 +296,13 @@ bool RibOutUpdates::TailDequeue(int queue_id, const RibPeerSet &msync,
// blocked parameter is populated with the set of peers that are send blocked.
//
bool RibOutUpdates::PeerDequeue(int queue_id, IPeerUpdate *peer,
const RibPeerSet &mready,
RibPeerSet *blocked) {
CHECK_CONCURRENCY("bgp::SendUpdate");

stats_[queue_id].peer_dequeue_count_++;
UpdateQueue *queue = queue_vec_[queue_id];
UpdateMarker *start_marker = queue->GetMarker(ribout_->GetPeerIndex(peer));
int peer_idx = ribout_->GetPeerIndex(peer);
UpdateMarker *start_marker = queue->GetMarker(peer_idx);

// We're done if this is the same as the tail marker. Updates will be
// built subsequently via TailDequeue.
Expand All @@ -311,16 +311,14 @@ bool RibOutUpdates::PeerDequeue(int queue_id, IPeerUpdate *peer,
return true;
}

// Get the encapsulator for the first RouteUpdate. Even if there's no
// RouteUpdate, we should find another marker or the tail marker.
UpdateEntry *upentry;
RouteUpdatePtr update =
monitor_->GetNextEntry(queue_id, start_marker, &upentry);
assert(upentry);

// At least one peer in the start marker i.e. the peer for which we are
// called must be send ready.
assert(start_marker->members.intersects(mready));
// We're done if the lead peer is not send ready. This can happen if
// the peer got blocked when processing updates in another partition.
RibPeerSet mready;
ribout_->BuildSendReadyBitSet(start_marker->members, &mready);
if (!mready.test(peer_idx)) {
blocked->set(peer_idx);
return false;
}

// Split out any peers from the marker that are not send ready. Note that
// this updates the RibPeerSet in the marker.
Expand All @@ -331,6 +329,13 @@ bool RibOutUpdates::PeerDequeue(int queue_id, IPeerUpdate *peer,
queue->MarkerSplit(start_marker, notready);
}

// Get the encapsulator for the first RouteUpdate. Even if there's no
// RouteUpdate, we should find another marker or the tail marker.
UpdateEntry *upentry;
RouteUpdatePtr update =
monitor_->GetNextEntry(queue_id, start_marker, &upentry);
assert(upentry);

// Update loop. Keep going till we reach the tail marker or till all the
// peers get blocked.
RibPeerSet members = start_marker->members;
Expand Down Expand Up @@ -370,7 +375,7 @@ bool RibOutUpdates::PeerDequeue(int queue_id, IPeerUpdate *peer,
// with the marker that is being processed for dequeue. Note
// that this updates the RibPeerSet in the marker.
RibPeerSet mmove;
mmove.BuildIntersection(marker->members, mready);
ribout_->BuildSendReadyBitSet(marker->members, &mmove);
if (!mmove.empty()) {
stats_[queue_id].marker_merge_count_++;
queue->MarkerMerge(start_marker, marker, mmove);
Expand Down
2 changes: 1 addition & 1 deletion src/bgp/bgp_ribout_updates.h
Expand Up @@ -72,7 +72,7 @@ class RibOutUpdates {
virtual bool TailDequeue(int queue_id, const RibPeerSet &msync,
RibPeerSet *blocked, RibPeerSet *unsync);
virtual bool PeerDequeue(int queue_id, IPeerUpdate *peer,
const RibPeerSet &mready, RibPeerSet *blocked);
RibPeerSet *blocked);

// Enqueue a marker at the head of the queue with this bit set.
bool QueueJoin(int queue_id, int bit);
Expand Down
74 changes: 35 additions & 39 deletions src/bgp/bgp_update_sender.cc
Expand Up @@ -46,11 +46,17 @@ struct BgpSenderPartition::PeerRibState {
// to implement regular and circular iterator classes that are used to walk
// through all the RibState entries for a peer.
//
// A PeerState maintains the in_sync and send_ready state for the IPeer. An
// IPeer/PeerState is considered to be send_ready when the underlying socket
// is writable. It is considered to be in_sync if it's send_ready and the
// marker for the IPeer has merged with the tail marker for all QueueIds in
// all RiBOuts that the IPeer is subscribed.
// A PeerState maintains the in_sync and send_ready state for the IPeerUpdate.
//
// The PeerState is considered to be send_ready when the underlying socket is
// is writable. Note that the send_ready state in the PeerState may be out of
// date with the actual socket state because the socket could have got blocked
// when writing from another partition. Hence IPeerUpdate::send_ready() is the
// more authoritative source.
//
// The PeerState is considered to be in_sync if it's send_ready and the marker
// IPeerUpdate the peer has merged with the tail marker for all QueueIds in
// all RiBOuts for which the IPeerUpdate is subscribed.
//
// The PeerState keeps count of the number of active RibOuts for each QueueId.
// A (RibOut, QueueId) pair is considered to be active if the PeerState isn't
Expand Down Expand Up @@ -645,31 +651,20 @@ void BgpSenderPartition::BuildSyncBitSet(const RibOut *ribout, RibState *rs,

for (RibState::iterator it = rs->begin(peer_state_imap_);
it != rs->end(peer_state_imap_); ++it) {
const PeerState *ps = it.operator->();
if (ps->in_sync()) {
int rix = ribout->GetPeerIndex(ps->peer());
msync->set(rix);
}
}
}
PeerState *ps = it.operator->();

//
// Build the RibPeerSet of IPeers for the RibOut that are send ready. Note
// that we need to use bit indices that are specific to the RibOut, not the
// ones from the BgpSenderPartition.
//
void BgpSenderPartition::BuildSendReadyBitSet(RibOut *ribout,
RibPeerSet *mready) {
CHECK_CONCURRENCY("bgp::SendUpdate");

RibState *rs = rib_state_imap_.Find(ribout);
assert(rs != NULL);
for (RibState::iterator it = rs->begin(peer_state_imap_);
it != rs->end(peer_state_imap_); ++it) {
const PeerState *ps = it.operator->();
if (ps->send_ready()) {
int rix = ribout->GetPeerIndex(ps->peer());
mready->set(rix);
// If the PeerState is in sync but the IPeerUpdate is not send ready
// then update the sync and send ready state in the PeerState. Note
// that the RibOut queue for the PeerState will get marked active via
// the call the SetQueueActive from UpdateRibOut.
if (ps->in_sync()) {
if (ps->peer()->send_ready()) {
int rix = ribout->GetPeerIndex(ps->peer());
msync->set(rix);
} else {
ps->clear_sync();
ps->set_send_ready(false);
}
}
}
}
Expand Down Expand Up @@ -830,19 +825,11 @@ bool BgpSenderPartition::UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps,
if (!BitIsSet(it.peer_rib_state().qactive, queue_id))
continue;

RibOut *ribout = it.operator->();

// Build the send ready bitset. This includes all send ready peers
// for the ribout so that we can potentially merge other peers as
// we move forward in processing the update queue.
RibPeerSet send_ready;
BuildSendReadyBitSet(ribout, &send_ready);

// Drain the queue till we can do no more.
RibOut *ribout = it.operator->();
RibOutUpdates *updates = ribout->updates(index_);
RibPeerSet blocked;
bool done = updates->PeerDequeue(queue_id, peer, send_ready, &blocked);
assert(send_ready.Contains(blocked));
bool done = updates->PeerDequeue(queue_id, peer, &blocked);

// Process blocked mask.
RibState *rs = it.rib_state();
Expand Down Expand Up @@ -875,11 +862,20 @@ bool BgpSenderPartition::UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps,
void BgpSenderPartition::UpdatePeer(IPeerUpdate *peer) {
CHECK_CONCURRENCY("bgp::SendUpdate");

// Bail if the PeerState is not send ready.
PeerState *ps = peer_state_imap_.Find(peer);
if (!ps->send_ready()) {
return;
}

// Update the PeerState and bail if the IPeerUpdate is not send ready.
// This happens if the IPeerUpdate gets blocked while processing some
// other partition.
if (!peer->send_ready()) {
ps->set_send_ready(false);
return;
}

// Go through all queues and drain them if there's anything on them.
for (int queue_id = RibOutUpdates::QCOUNT - 1; queue_id >= 0; --queue_id) {
if (ps->QueueCount(queue_id) == 0) {
Expand Down
1 change: 0 additions & 1 deletion src/bgp/bgp_update_sender.h
Expand Up @@ -122,7 +122,6 @@ class BgpSenderPartition {
bool UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps, int queue_id);

void BuildSyncBitSet(const RibOut *ribout, RibState *rs, RibPeerSet *msync);
void BuildSendReadyBitSet(RibOut *ribout, RibPeerSet *mready);

void SetQueueActive(const RibOut *ribout, RibState *rs, int queue_id,
const RibPeerSet &munsync);
Expand Down
3 changes: 2 additions & 1 deletion src/bgp/bgp_xmpp_channel.cc
Expand Up @@ -364,7 +364,7 @@ class BgpXmppChannel::XmppPeer : public IPeer {
parent_->MembershipRequestCallback(table);
}

bool send_ready() const { return send_ready_; }
virtual bool send_ready() const { return send_ready_; }

private:
void WriteReadyCb(const boost::system::error_code &ec) {
Expand Down Expand Up @@ -439,6 +439,7 @@ bool BgpXmppChannel::XmppPeer::SendUpdate(const uint8_t *msg, size_t msgsize,
}

void BgpXmppChannel::XmppPeer::Close(bool non_graceful) {
send_ready_ = true;
parent_->set_peer_closed(true);
if (server_ == NULL)
return;
Expand Down
1 change: 1 addition & 0 deletions src/bgp/bgp_xmpp_sandesh.cc
Expand Up @@ -49,6 +49,7 @@ static void FillXmppNeighborInfo(BgpNeighborResp *bnr,
bnr->set_primary_path_count(bx_channel->Peer()->GetPrimaryPathCount());
bnr->set_task_instance(connection->GetTaskInstance());
bnr->set_auth_type(connection->GetXmppAuthenticationType());
bnr->set_send_ready(bx_channel->Peer()->send_ready());
bnr->set_flap_count(bx_channel->Peer()->peer_stats()->num_flaps());
bnr->set_flap_time(bx_channel->Peer()->peer_stats()->last_flap());
if (summary)
Expand Down
2 changes: 2 additions & 0 deletions src/bgp/ipeer.h
Expand Up @@ -20,6 +20,8 @@ class PeerCloseManager;
class IPeerUpdate {
public:
virtual ~IPeerUpdate() { }
virtual bool send_ready() const { return true; }

// Printable name
virtual const std::string &ToString() const = 0;

Expand Down
4 changes: 4 additions & 0 deletions src/bgp/test/bgp_ribout_updates_test.h
Expand Up @@ -75,6 +75,8 @@ class BgpTestPeer : public IPeerUpdate {
int update_count() const { return count_; }
void clear_update_count() { count_ = 0; }
bool send_block() const { return send_block_; }
void clear_send_block() { send_block_ = false; }
virtual bool send_ready() const { return !send_block_; }

private:
int index_;
Expand Down Expand Up @@ -355,6 +357,7 @@ class RibOutUpdatesTest : public ::testing::Test {
void SetPeerUnblockNow(int idx) {
ConcurrencyScope scope("bgp::SendReadyTask");
ASSERT_TRUE(idx < (int) peers_.size());
peers_[idx]->clear_send_block();
spartition_->PeerSendReady(peers_[idx]);
}

Expand All @@ -363,6 +366,7 @@ class RibOutUpdatesTest : public ::testing::Test {
ASSERT_TRUE(start_idx <= end_idx);
ASSERT_TRUE(end_idx < (int) peers_.size());
for (int idx = start_idx; idx <= end_idx; idx++) {
peers_[idx]->clear_send_block();
spartition_->PeerSendReady(peers_[idx]);
}
}
Expand Down

0 comments on commit ac9fca2

Please sign in to comment.