/
bgp_update_queue.cc
322 lines (293 loc) · 11 KB
/
bgp_update_queue.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
/*
* Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
*/
#include "bgp/bgp_update_queue.h"
#include "base/logging.h"
//
// Construct an UpdateQueue with the given id. The marker count starts at
// zero and the tail marker becomes the first entry on the FIFO.
//
UpdateQueue::UpdateQueue(int queue_id)
    : queue_id_(queue_id),
      marker_count_(0) {
    // The tail marker lives on the FIFO for the whole lifetime of the queue;
    // the destructor takes it off again.
    queue_.push_back(tail_marker_);
}
//
// Tear down the UpdateQueue. The tail marker is taken off the FIFO first;
// after that both the FIFO and the set container are expected to be empty.
//
UpdateQueue::~UpdateQueue() {
    UpdatesByOrder::iterator tail_pos = queue_.iterator_to(tail_marker_);
    queue_.erase(tail_pos);

    // Anything still linked at this point was never dequeued by its owner.
    assert(queue_.empty());
    assert(attr_set_.empty());
}
//
// Append the given RouteUpdate to the UpdateQueue. The RouteUpdate goes on
// the end of the FIFO and each of its UpdateInfo elements is added to the
// set container.
//
// Returns true if the tail marker was the last entry on the FIFO before the
// append, i.e. there were no RouteUpdates after the tail marker.
//
bool UpdateQueue::Enqueue(RouteUpdate *rt_update) {
    tbb::mutex::scoped_lock lock(mutex_);
    rt_update->set_tstamp_now();

    // Sample the state of the FIFO tail before the new entry is linked in.
    const bool tail_was_last = (&queue_.back() == &tail_marker_);
    queue_.push_back(*rt_update);

    // Point every UpdateInfo back at its RouteUpdate and index it in the
    // set container.
    UpdateInfoSList &uinfo_slist = rt_update->Updates();
    UpdateInfoSList::List::iterator uinfo_it = uinfo_slist->begin();
    while (uinfo_it != uinfo_slist->end()) {
        uinfo_it->update = rt_update;
        attr_set_.insert(*uinfo_it);
        ++uinfo_it;
    }

    return tail_was_last;
}
//
// Remove the given RouteUpdate from the UpdateQueue. It is unlinked from
// the FIFO and every one of its UpdateInfo elements is erased from the set
// container.
//
void UpdateQueue::Dequeue(RouteUpdate *rt_update) {
    tbb::mutex::scoped_lock lock(mutex_);

    UpdatesByOrder::iterator fifo_pos = queue_.iterator_to(*rt_update);
    queue_.erase(fifo_pos);

    UpdateInfoSList &uinfo_slist = rt_update->Updates();
    UpdateInfoSList::List::iterator uinfo_it = uinfo_slist->begin();
    while (uinfo_it != uinfo_slist->end()) {
        attr_set_.erase(attr_set_.iterator_to(*uinfo_it));
        ++uinfo_it;
    }
}
//
// Find the first RouteUpdate that follows the given UpdateEntry on the FIFO.
// Entries for which IsUpdate() is false (markers and any other element
// types) are skipped.
//
// Returns NULL if no RouteUpdate follows the given entry.
//
RouteUpdate *UpdateQueue::NextUpdate(UpdateEntry *current_upentry) {
    tbb::mutex::scoped_lock lock(mutex_);

    UpdatesByOrder::iterator pos = queue_.iterator_to(*current_upentry);
    for (++pos; pos != queue_.end(); ++pos) {
        if (!pos->IsUpdate()) {
            continue;
        }
        return static_cast<RouteUpdate *>(&*pos);
    }
    return NULL;
}
//
// Return the UpdateEntry that immediately follows the given one on the
// FIFO, whatever its type.
//
// Returns NULL if the given entry is the last one on the FIFO.
//
UpdateEntry *UpdateQueue::NextEntry(UpdateEntry *current_upentry) {
    tbb::mutex::scoped_lock lock(mutex_);

    UpdatesByOrder::iterator pos = queue_.iterator_to(*current_upentry);
    ++pos;
    if (pos != queue_.end()) {
        return &*pos;
    }
    return NULL;
}
//
// Remove the given UpdateInfo from the set container. The FIFO is left
// untouched.
//
void UpdateQueue::AttrDequeue(UpdateInfo *current_uinfo) {
    tbb::mutex::scoped_lock lock(mutex_);
    UpdatesByAttr::iterator pos = attr_set_.iterator_to(*current_uinfo);
    attr_set_.erase(pos);
}
//
// Return the UpdateInfo that follows the given one in the set container,
// provided that it carries the same BgpAttr. The label is deliberately not
// part of the comparison so that updates can be packed optimally.
//
// Returns NULL when:
//   - the given UpdateInfo is the last one in the set container, or
//   - the next UpdateInfo belongs to the same RouteUpdate, or
//   - the next UpdateInfo has a different BgpAttr.
//
// The same-RouteUpdate case shows up in corner cases where the label (or the
// set of ecmp nexthops in case of an XMPP ribout) for a route changes between
// Join operations for 2 different sets of IPeerUpdates. Handing back such an
// UpdateInfo breaks the locking design in RibOutUpdates and results in a
// deadlock.
//
UpdateInfo *UpdateQueue::AttrNext(UpdateInfo *current_uinfo) {
    tbb::mutex::scoped_lock lock(mutex_);

    UpdatesByAttr::iterator pos = attr_set_.iterator_to(*current_uinfo);
    if (++pos == attr_set_.end()) {
        return NULL;
    }

    UpdateInfo *candidate = &*pos;
    // The RouteUpdate check comes first; the attribute comparison is only
    // evaluated for an UpdateInfo that belongs to a different RouteUpdate.
    if (candidate->update != current_uinfo->update &&
        candidate->roattr.attr() == current_uinfo->roattr.attr()) {
        return candidate;
    }
    return NULL;
}
//
// Insert the given UpdateMarker on the FIFO immediately after the given
// RouteUpdate and bump the marker count. The MarkerMap entry of every peer
// in the marker's member set is repointed to the marker.
//
// The marker must have at least one member, and every member must already
// have an entry in the MarkerMap.
//
void UpdateQueue::AddMarker(UpdateMarker *marker, RouteUpdate *rt_update) {
    assert(!marker->members.empty());
    tbb::mutex::scoped_lock lock(mutex_);

    ++marker_count_;
    UpdatesByOrder::iterator insert_pos = queue_.iterator_to(*rt_update);
    ++insert_pos;
    queue_.insert(insert_pos, *marker);

    size_t bit = marker->members.find_first();
    while (bit != BitSet::npos) {
        MarkerMap::iterator loc = markers_.find(bit);
        assert(loc != markers_.end());
        loc->second = marker;
        bit = marker->members.find_next(bit);
    }
}
//
// Relocate the given UpdateMarker so that it sits after the RouteUpdate.
// The marker object itself is moved, so the MarkerMap needs no changes.
//
// A regular marker is placed immediately after the RouteUpdate. The tail
// marker is instead placed after any run of markers that directly follows
// the RouteUpdate, i.e. just before the next non-marker entry or at the end
// of the queue. This covers the case where some peers get blocked after
// sending the last RouteUpdate in the queue: the marker for the blocked
// peers is inserted after the RouteUpdate and the tail marker is then moved
// after that same RouteUpdate.
//
void UpdateQueue::MoveMarker(UpdateMarker *marker, RouteUpdate *rt_update) {
    tbb::mutex::scoped_lock lock(mutex_);

    // Unlink the marker first so that it is not seen while scanning below.
    queue_.erase(queue_.iterator_to(*marker));

    UpdatesByOrder::iterator insert_pos = queue_.iterator_to(*rt_update);
    ++insert_pos;
    if (marker == &tail_marker_) {
        while (insert_pos != queue_.end() && insert_pos->IsMarker()) {
            ++insert_pos;
        }
    }
    queue_.insert(insert_pos, *marker);
}
//
// Split the peers in the given RibPeerSet off the specified UpdateMarker.
// A new marker holding exactly those peers is allocated and inserted on the
// FIFO immediately before the existing marker, and the corresponding bits
// are reset in the existing marker. The existing marker must still have at
// least one member afterwards.
//
// The MarkerMap is updated so that all the peers in the RibPeerSet point to
// the new marker.
//
void UpdateQueue::MarkerSplit(UpdateMarker *marker, const RibPeerSet &msplit) {
    assert(!msplit.empty());

    // The new marker is built before the lock is taken.
    UpdateMarker *split_marker = new UpdateMarker();
    split_marker->members = msplit;

    tbb::mutex::scoped_lock lock(mutex_);
    marker->members.Reset(msplit);
    assert(!marker->members.empty());

    queue_.insert(queue_.iterator_to(*marker), *split_marker);
    ++marker_count_;

    size_t bit = msplit.find_first();
    while (bit != BitSet::npos) {
        MarkerMap::iterator loc = markers_.find(bit);
        assert(loc != markers_.end());
        loc->second = split_marker;
        bit = msplit.find_next(bit);
    }
}
//
// Move the peers in the given RibPeerSet from the src UpdateMarker to the
// dst UpdateMarker. The bits are set in the dst, the MarkerMap entries for
// those peers are repointed to the dst, and the bits are then reset in the
// src. If that leaves the src without any peers, it is removed from the
// FIFO, destroyed, and the marker count is decremented.
//
void UpdateQueue::MarkerMerge(UpdateMarker *dst_marker,
    UpdateMarker *src_marker, const RibPeerSet &bitset) {
    assert(!bitset.empty());
    tbb::mutex::scoped_lock lock(mutex_);

    // The dst must be fully updated before the src is touched, because
    // bitset may be a reference to src->members.
    dst_marker->members.Set(bitset);
    size_t bit = bitset.find_first();
    while (bit != BitSet::npos) {
        MarkerMap::iterator loc = markers_.find(bit);
        assert(loc != markers_.end());
        loc->second = dst_marker;
        bit = bitset.find_next(bit);
    }

    // Now drop the merged peers from the src; nothing more to do if it
    // still has members.
    src_marker->members.Reset(bitset);
    if (!src_marker->members.empty()) {
        return;
    }

    queue_.erase(queue_.iterator_to(*src_marker));
    assert(src_marker != &tail_marker_);
    delete src_marker;
    marker_count_--;
}
//
// Look up the UpdateMarker currently associated with the peer at the given
// bit position. The peer must have an entry in the MarkerMap.
//
UpdateMarker *UpdateQueue::GetMarker(int bit) {
    tbb::mutex::scoped_lock lock(mutex_);
    MarkerMap::iterator loc = markers_.find(bit);
    assert(loc != markers_.end());
    UpdateMarker *peer_marker = loc->second;
    return peer_marker;
}
//
// Join a new peer, as represented by its bit index, to the UpdateQueue.
// A new peer always starts out at the tail marker: its bit is set in the
// tail marker and a (bit index, tail marker) pair is added to the MarkerMap.
//
// Returns true if the tail marker is not the last entry in the queue. In
// that case the caller should trigger a tail dequeue for the RibOut, to
// cover the possibility that all peers in the tail marker are blocked. A
// tail dequeue may already be pending in the SchedulingGroup, but an extra
// one is harmless.
//
bool UpdateQueue::Join(int bit) {
    tbb::mutex::scoped_lock lock(mutex_);

    tail_marker_.members.set(bit);
    markers_.insert(std::make_pair(bit, &tail_marker_));

    const bool entries_after_tail = (&queue_.back() != &tail_marker_);
    return entries_after_tail;
}
//
// Leave a peer, as represented by it's bit index, from the UpdateQueue.
// Find the current marker for the peer and remove the peer's bit from the
// MarkerMap. Reset the peer's bit in the marker and get rid of the marker
// itself if it's now empty.
//
void UpdateQueue::Leave(int bit) {
tbb::mutex::scoped_lock lock(mutex_);
MarkerMap::iterator loc = markers_.find(bit);
assert(loc != markers_.end());
UpdateMarker *marker = loc->second;
markers_.erase(loc);
marker->members.reset(bit);
if (marker != &tail_marker_ && marker->members.empty()) {
queue_.erase(queue_.iterator_to(*marker));
delete marker;
}
}
//
// Consistency check for the MarkerMap: for every (bit index, marker) entry,
// the marker's member set must have that bit set.
//
// Returns true if the loop runs to completion. CHECK_INVARIANT is a macro
// defined outside this file; presumably it makes the function return false
// when the condition does not hold - TODO confirm against its definition.
//
bool UpdateQueue::CheckInvariants() const {
tbb::mutex::scoped_lock lock(mutex_);
for (MarkerMap::const_iterator iter = markers_.begin();
iter != markers_.end(); ++iter) {
UpdateMarker *marker = iter->second;
// iter->first is the peer's bit index, iter->second the marker it maps to.
CHECK_INVARIANT(marker->members.test(iter->first));
}
return true;
}
//
// Return true if the UpdateQueue is empty. It's considered empty if there
// are no UpdateInfo elements in the set container. The FIFO is not
// consulted since it may still have the tail marker on it.
//
bool UpdateQueue::empty() const {
    tbb::mutex::scoped_lock lock(mutex_);
    const bool no_updates = attr_set_.empty();
    return no_updates;
}
//
// Return the number of UpdateInfo elements in the set container.
//
size_t UpdateQueue::size() const {
    tbb::mutex::scoped_lock lock(mutex_);
    const size_t uinfo_count = attr_set_.size();
    return uinfo_count;
}
//
// Return the current value of the marker counter. The counter is bumped by
// AddMarker and MarkerSplit and decremented when MarkerMerge destroys an
// emptied source marker.
//
size_t UpdateQueue::marker_count() const {
    tbb::mutex::scoped_lock lock(mutex_);
    const size_t count = marker_count_;
    return count;
}