Skip to content

Commit

Permalink
Merge "Fix corner case in RibOut tail dequeue scheduling logic" into …
Browse files Browse the repository at this point in the history
…R2.0
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Jun 25, 2015
2 parents b1bde6b + 7218839 commit 049c4c5
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 12 deletions.
14 changes: 10 additions & 4 deletions src/bgp/bgp_peer_membership.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,14 @@ void IPeerRib::RegisterRibOut(RibExportPolicy policy) {
ribout_->RegisterListener();
ribout_->Register(ipeer_);

int index = ribout_->GetPeerIndex(ipeer_);
RibOutUpdates *updates = ribout_->updates();
updates->QueueJoin(RibOutUpdates::QUPDATE, ribout_->GetPeerIndex(ipeer_));
updates->QueueJoin(RibOutUpdates::QBULK, ribout_->GetPeerIndex(ipeer_));
SchedulingGroup *group = ribout_->GetSchedulingGroup();
assert(group);
if (updates->QueueJoin(RibOutUpdates::QUPDATE, index))
group->RibOutActive(ribout_, RibOutUpdates::QUPDATE);
if (updates->QueueJoin(RibOutUpdates::QBULK, index))
group->RibOutActive(ribout_, RibOutUpdates::QBULK);

SetRibOutRegistered(true);
}
Expand All @@ -224,9 +229,10 @@ void IPeerRib::RegisterRibOut(RibExportPolicy policy) {
// of the RibOut itself.
//
void IPeerRib::UnregisterRibOut() {
int index = ribout_->GetPeerIndex(ipeer_);
RibOutUpdates *updates = ribout_->updates();
updates->QueueLeave(RibOutUpdates::QUPDATE, ribout_->GetPeerIndex(ipeer_));
updates->QueueLeave(RibOutUpdates::QBULK, ribout_->GetPeerIndex(ipeer_));
updates->QueueLeave(RibOutUpdates::QUPDATE, index);
updates->QueueLeave(RibOutUpdates::QBULK, index);

ribout_->Unregister(ipeer_);
ribout_ = NULL;
Expand Down
4 changes: 2 additions & 2 deletions src/bgp/bgp_ribout_updates.cc
Original file line number Diff line number Diff line change
Expand Up @@ -571,9 +571,9 @@ bool RibOutUpdates::Empty() const {
return true;
}

void RibOutUpdates::QueueJoin(int queue_id, int bit) {
bool RibOutUpdates::QueueJoin(int queue_id, int bit) {
UpdateQueue *queue = queue_vec_[queue_id];
queue->Join(bit);
return queue->Join(bit);
}

void RibOutUpdates::QueueLeave(int queue_id, int bit) {
Expand Down
2 changes: 1 addition & 1 deletion src/bgp/bgp_ribout_updates.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class RibOutUpdates {
const RibPeerSet &mready, RibPeerSet *blocked);

// Enqueue a marker at the head of the queue with this bit set.
void QueueJoin(int queue_id, int bit);
bool QueueJoin(int queue_id, int bit);
void QueueLeave(int queue_id, int bit);

bool Empty() const;
Expand Down
9 changes: 8 additions & 1 deletion src/bgp/bgp_update_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,18 @@ UpdateMarker *UpdateQueue::GetMarker(int bit) {
// Since it's a new peer, it starts out at the tail marker. Also add the
// peer's bit index and the tail marker pair to the MarkerMap.
//
void UpdateQueue::Join(int bit) {
// Return true if the tail marker is not the last entry in the queue. The
// caller should trigger a tail dequeue for the RibOut if so to take care
// of the possibility that all peers in the tail marker are blocked. Note
// that a tail dequeue may already be pending in the SchedulingGroup, but
// an extra one is harmless.
//
bool UpdateQueue::Join(int bit) {
tbb::mutex::scoped_lock lock(mutex_);
UpdateMarker *marker = &tail_marker_;
marker->members.set(bit);
markers_.insert(std::make_pair(bit, marker));
return (&tail_marker_ != &queue_.back());
}

//
Expand Down
2 changes: 1 addition & 1 deletion src/bgp/bgp_update_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class UpdateQueue {
const RibPeerSet &bitset);
UpdateMarker *GetMarker(int bit);

void Join(int bit);
bool Join(int bit);
void Leave(int bit);

bool CheckInvariants() const;
Expand Down
7 changes: 4 additions & 3 deletions src/bgp/scheduling_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ void SchedulingGroup::GetPeerList(PeerList *plist) const {
// active.
//
void SchedulingGroup::RibOutActive(RibOut *ribout, int queue_id) {
CHECK_CONCURRENCY("db::DBTable", "bgp::SendTask");
CHECK_CONCURRENCY("db::DBTable", "bgp::SendTask", "bgp::PeerMembership");

WorkRibOutEnqueue(ribout, queue_id);
}
Expand Down Expand Up @@ -831,7 +831,8 @@ auto_ptr<SchedulingGroup::WorkBase> SchedulingGroup::WorkDequeue() {
// task if required.
//
void SchedulingGroup::WorkEnqueue(WorkBase *wentry) {
CHECK_CONCURRENCY("db::DBTable", "bgp::SendTask", "bgp::SendReadyTask");
CHECK_CONCURRENCY("db::DBTable", "bgp::SendTask", "bgp::SendReadyTask",
"bgp::PeerMembership");

tbb::mutex::scoped_lock lock(mutex_);
work_queue_.push_back(wentry);
Expand All @@ -857,7 +858,7 @@ void SchedulingGroup::WorkPeerEnqueue(IPeerUpdate *peer) {
// Enqueue a WorkRibOut to the work queue.
//
void SchedulingGroup::WorkRibOutEnqueue(RibOut *ribout, int queue_id) {
CHECK_CONCURRENCY("db::DBTable", "bgp::SendTask");
CHECK_CONCURRENCY("db::DBTable", "bgp::SendTask", "bgp::PeerMembership");

WorkBase *wentry = new WorkRibOut(ribout, queue_id);
WorkEnqueue(wentry);
Expand Down

0 comments on commit 049c4c5

Please sign in to comment.