// bgp_xmpp_channel.h
/*
* Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
*/
#ifndef SRC_BGP_BGP_XMPP_CHANNEL_H_
#define SRC_BGP_BGP_XMPP_CHANNEL_H_
#include <boost/function.hpp>
#include <boost/system/error_code.hpp>
#include <boost/scoped_ptr.hpp>
#include <tbb/mutex.h>
#include <map>
#include <set>
#include <string>
#include <utility>
#include "base/queue_task.h"
#include "bgp/bgp_rib_policy.h"
#include "bgp/routing-instance/routing_instance.h"
#include "io/tcp_session.h"
#include "net/rd.h"
#include "xmpp/xmpp_channel.h"
// Forward declarations. In this header these types appear only behind
// pointers, references, smart pointers or friend declarations, so their
// full definitions are not required here.
namespace pugi {
class xml_node;
}  // namespace pugi

// BGP side.
class BgpGlobalSystemConfig;
class BgpRouterState;
class BgpServer;
class BgpXmppChannelManager;
class BgpXmppChannelManagerMock;
class BgpXmppChannelMock;
class BgpXmppPeerClose;
class BgpXmppRTargetManager;
class IPeer;
class PeerCloseManager;

// XMPP side.
class XmppConfigUpdater;
class XmppPeerInfoData;
class XmppServer;
class XmppSession;

// Infrastructure.
struct DBRequest;
class Timer;
class BgpXmppChannel {
public:
static const int kEndOfRibSendRetryTimeMsecs = 2000; // 2 Seconds
enum StatsIndex {
RX,
TX,
};
struct Stats {
Stats() { memset(this, 0, sizeof(*this)); }
uint64_t rt_updates;
uint64_t reach;
uint64_t unreach;
uint64_t end_of_rib;
};
struct ChannelStats {
ChannelStats() { memset(this, 0, sizeof(*this)); }
uint64_t instance_subscribe;
uint64_t instance_unsubscribe;
uint64_t table_subscribe;
uint64_t table_subscribe_complete;
uint64_t table_unsubscribe;
uint64_t table_unsubscribe_complete;
};
struct ErrorStats {
ErrorStats() { memset(this, 0, sizeof(*this)); }
void incr_inet6_rx_bad_xml_token_count();
void incr_inet6_rx_bad_prefix_count();
void incr_inet6_rx_bad_nexthop_count();
void incr_inet6_rx_bad_afi_safi_count();
uint64_t get_inet6_rx_bad_xml_token_count() const;
uint64_t get_inet6_rx_bad_prefix_count() const;
uint64_t get_inet6_rx_bad_nexthop_count() const;
uint64_t get_inet6_rx_bad_afi_safi_count() const;
uint64_t inet6_rx_bad_xml_token_count;
uint64_t inet6_rx_bad_prefix_count;
uint64_t inet6_rx_bad_nexthop_count;
uint64_t inet6_rx_bad_afi_safi_count;
};
explicit BgpXmppChannel(XmppChannel *channel, BgpServer *bgp_server = NULL,
BgpXmppChannelManager *manager = NULL);
virtual ~BgpXmppChannel();
void Close();
IPeer *Peer();
const IPeer *Peer() const;
virtual TcpSession::Endpoint endpoint() const;
const std::string &ToString() const;
const std::string &ToUVEKey() const;
std::string StateName() const;
TcpSession::Endpoint remote_endpoint() const;
TcpSession::Endpoint local_endpoint() const;
std::string transport_address_string() const;
void set_peer_closed(bool flag);
bool peer_deleted() const;
bool membership_unavailable() const { return membership_unavailable_; }
uint64_t peer_closed_at() const;
bool routingtable_membership_request_map_empty() const;
size_t GetMembershipRequestQueueSize() const;
const XmppSession *GetSession() const;
const Stats &rx_stats() const { return stats_[RX]; }
const Stats &tx_stats() const { return stats_[TX]; }
ErrorStats &error_stats() { return error_stats_; }
const ErrorStats &error_stats() const { return error_stats_; }
void set_deleted(bool deleted) { deleted_ = deleted; }
bool deleted() { return deleted_; }
void RoutingInstanceCallback(std::string vrf_name, int op);
void ASNUpdateCallback(as_t old_asn, as_t old_local_asn);
void IdentifierUpdateCallback(Ip4Address old_identifier);
void FillInstanceMembershipInfo(BgpNeighborResp *resp) const;
void FillTableMembershipInfo(BgpNeighborResp *resp) const;
void FillCloseInfo(BgpNeighborResp *resp) const;
void StaleCurrentSubscriptions();
void LlgrStaleCurrentSubscriptions();
void SweepCurrentSubscriptions();
void XMPPPeerInfoSend(const XmppPeerInfoData &peer_info) const;
const XmppChannel *channel() const { return channel_; }
XmppChannel *channel() { return channel_; }
void StartEndOfRibReceiveTimer();
void RestEndOfRibState();
bool EndOfRibSendTimerExpired();
bool MembershipResponseHandler(std::string table_name);
Timer *eor_send_timer() const { return eor_send_timer_; }
bool eor_sent() const { return eor_sent_; }
size_t membership_requests() const;
void ClearEndOfRibState();
PeerCloseManager *close_manager() { return close_manager_.get(); }
uint64_t get_rx_route_reach() const { return stats_[RX].reach; }
uint64_t get_rx_route_unreach() const { return stats_[RX].unreach; }
uint64_t get_rx_update() const { return stats_[RX].rt_updates; }
uint64_t get_tx_route_reach() const { return stats_[TX].reach; }
uint64_t get_tx_route_unreach() const { return stats_[TX].unreach; }
uint64_t get_tx_update() const { return stats_[TX].rt_updates; }
bool SkipUpdateSend();
bool delete_in_progress() const { return delete_in_progress_; }
void set_delete_in_progress(bool flag) { delete_in_progress_ = flag; }
BgpXmppRTargetManager *rtarget_manager() {
return rtarget_manager_.get();
}
bool IsSubscriptionGrStale(RoutingInstance *instance) const;
bool IsSubscriptionLlgrStale(RoutingInstance *instance) const;
bool IsSubscriptionEmpty() const;
const RoutingInstance::RouteTargetList &GetSubscribedRTargets(
RoutingInstance *instance) const;
void ClearSubscriptions() { routing_instances_.clear(); }
BgpServer *bgp_server() { return bgp_server_; }
const BgpXmppChannelManager *manager() const { return manager_; }
BgpXmppChannelManager *manager() { return manager_; }
XmppChannel *xmpp_channel() const { return channel_; }
void ReceiveEndOfRIB(Address::Family family);
void ProcessPendingSubscriptions();
protected:
XmppChannel *channel_;
private:
friend class BgpXmppChannelMock;
friend class BgpXmppChannelManager;
friend class BgpXmppParseTest;
friend class BgpXmppUnitTest;
class XmppPeer;
class PeerClose;
class PeerStats;
//
// State the instance id received in Membership subscription request
// Also remember we received unregister request
//
enum RequestType {
NONE,
SUBSCRIBE,
UNSUBSCRIBE,
};
struct MembershipRequestState {
MembershipRequestState(RequestType current, int id)
: current_req(current), instance_id(id), pending_req(current) {
}
RequestType current_req;
int instance_id;
RequestType pending_req;
};
// Map of routing instances to which this BgpXmppChannel is subscribed.
struct SubscriptionState {
enum State {
NONE = 0,
GR_STALE = 1 << 0,
LLGR_STALE = 1 << 1
};
SubscriptionState(const RoutingInstance::RouteTargetList &targets,
int index)
: targets(targets), index(index), state(NONE) { }
bool IsGrStale() const { return((state & GR_STALE) != 0); }
void SetGrStale() { state |= GR_STALE; }
void SetLlgrStale() { state |= LLGR_STALE; }
bool IsLlgrStale() const { return((state & LLGR_STALE) != 0); }
void ClearStale() { state &= ~(GR_STALE | LLGR_STALE); }
RoutingInstance::RouteTargetList targets;
int index;
uint32_t state;
};
typedef std::map<RoutingInstance *, SubscriptionState>
SubscribedRoutingInstanceList;
// map of routing-instance table name to XMPP subscription request state
typedef std::map<std::string, MembershipRequestState>
RoutingTableMembershipRequestMap;
// map of routing-instance name to XMPP subscription request state
// This map maintains list of requests that are rxed for subscription
// before routing instance is actually created
typedef std::map<std::string, int> VrfMembershipRequestMap;
// The code assumes that multimap preserves insertion order for duplicate
// values of same key.
typedef std::pair<const std::string, const std::string> VrfTableName;
typedef std::multimap<VrfTableName, DBRequest *> DeferQ;
virtual void ReceiveUpdate(const XmppStanza::XmppMessage *msg);
virtual bool GetMembershipInfo(BgpTable *table,
int *instance_id, uint64_t *subscribed_at, RequestType *req_type);
virtual bool GetMembershipInfo(const std::string &vrf_name,
int *instance_id);
bool VerifyMembership(const std::string &vrf_name, Address::Family family,
BgpTable **table, int *instance_id, uint64_t *subscribed_at,
bool *subscribe_pending);
bool ProcessItem(std::string vrf_name, const pugi::xml_node &node,
bool add_change);
bool ProcessInet6Item(std::string vrf_name, const pugi::xml_node &node,
bool add_change);
bool ProcessMcastItem(std::string vrf_name,
const pugi::xml_node &item, bool add_change);
bool ProcessEnetItem(std::string vrf_name,
const pugi::xml_node &item, bool add_change);
void ProcessSubscriptionRequest(std::string rt_instance,
const XmppStanza::XmppMessageIq *iq,
bool add_change);
void AddSubscriptionState(RoutingInstance *rt_instance, int index);
void RegisterTable(int line, BgpTable *table, int instance_id);
void UnregisterTable(int line, BgpTable *table);
void MembershipRequestCallback(BgpTable *table);
void DequeueRequest(const std::string &table_name, DBRequest *request);
bool XmppDecodeAddress(int af, const std::string &address,
IpAddress *addrp, bool zero_ok = false);
bool ResumeClose();
void FlushDeferQ(std::string vrf_name);
void FlushDeferQ(std::string vrf_name, std::string table_name);
void ProcessDeferredSubscribeRequest(RoutingInstance *rt_instance,
int instance_id);
void ClearStaledSubscription(RoutingInstance *rt_instance,
SubscriptionState *sub_state);
bool ProcessMembershipResponse(std::string table_name,
RoutingTableMembershipRequestMap::iterator loc);
bool EndOfRibReceiveTimerExpired();
void EndOfRibTimerErrorHandler(std::string error_name,
std::string error_message);
void SendEndOfRIB();
xmps::PeerId peer_id_;
boost::scoped_ptr<BgpXmppRTargetManager> rtarget_manager_;
BgpServer *bgp_server_;
boost::scoped_ptr<XmppPeer> peer_;
boost::scoped_ptr<BgpXmppPeerClose> peer_close_;
boost::scoped_ptr<PeerCloseManager> close_manager_;
boost::scoped_ptr<PeerStats> peer_stats_;
RibExportPolicy bgp_policy_;
// DB Requests pending membership request response.
DeferQ defer_q_;
RoutingTableMembershipRequestMap routingtable_membership_request_map_;
VrfMembershipRequestMap vrf_membership_request_map_;
BgpXmppChannelManager *manager_;
bool delete_in_progress_;
bool deleted_;
bool defer_peer_close_;
bool membership_unavailable_;
bool skip_update_send_;
bool skip_update_send_cached_;
bool eor_sent_;
Timer *eor_receive_timer_;
Timer *eor_send_timer_;
uint64_t eor_receive_timer_start_time_;
uint64_t eor_send_timer_start_time_;
WorkQueue<std::string> membership_response_worker_;
SubscribedRoutingInstanceList routing_instances_;
// statistics
Stats stats_[2];
ChannelStats channel_stats_;
ErrorStats error_stats_;
// Label block manager for multicast labels.
LabelBlockManagerPtr lb_mgr_;
DISALLOW_COPY_AND_ASSIGN(BgpXmppChannel);
};
//
// Registry of BgpXmppChannel objects.
//
// Channels are indexed twice: by their XmppChannel pointer (channel_map_)
// and by a string name (channel_name_map_). The manager also carries a
// work queue of BgpXmppChannel pointers, a counter of channels that are
// being deleted and a generation counter used for subscription tracking.
//
class BgpXmppChannelManager {
public:
    typedef std::map<const XmppChannel *, BgpXmppChannel *> XmppChannelMap;
    typedef std::map<std::string, BgpXmppChannel *> XmppChannelNameMap;
    typedef XmppChannelNameMap::const_iterator const_name_iterator;
    typedef boost::function<void(BgpXmppChannel *)> VisitorFn;

    BgpXmppChannelManager(XmppServer *, BgpServer *);
    virtual ~BgpXmppChannelManager();

    // Read-only iteration over the name-indexed map.
    const_name_iterator name_cbegin() const {
        const XmppChannelNameMap &names = channel_name_map_;
        return names.begin();
    }
    const_name_iterator name_cend() const {
        const XmppChannelNameMap &names = channel_name_map_;
        return names.end();
    }
    const_name_iterator name_clower_bound(const std::string &name) const {
        const XmppChannelNameMap &names = channel_name_map_;
        return names.lower_bound(name);
    }

    void VisitChannels(BgpXmppChannelManager::VisitorFn);
    void VisitChannels(BgpXmppChannelManager::VisitorFn) const;

    BgpXmppChannel *FindChannel(const XmppChannel *channel);
    BgpXmppChannel *FindChannel(std::string client);
    void RemoveChannel(XmppChannel *channel);
    virtual void XmppHandleChannelEvent(XmppChannel *channel,
                                        xmps::PeerState state);

    // Note that, unlike count(), this accessor does not take mutex_.
    const XmppChannelMap &channel_map() const { return channel_map_; }

    bool DeleteExecutor(BgpXmppChannel *bx_channel);

    // Hands the channel to the manager's work queue.
    void Enqueue(BgpXmppChannel *bx_channel) {
        queue_.Enqueue(bx_channel);
    }

    bool IsReadyForDeletion();
    void SetQueueDisable(bool disabled);
    size_t GetQueueSize() const;

    void AdminDownCallback();
    void ASNUpdateCallback(as_t old_asn, as_t old_local_asn);
    void IdentifierUpdateCallback(Ip4Address old_identifier);
    void RoutingInstanceCallback(std::string vrf_name, int op);

    // Number of entries in channel_map_, read while holding mutex_.
    uint32_t count() const {
        tbb::mutex::scoped_lock guard(mutex_);
        return static_cast<uint32_t>(channel_map_.size());
    }

    // Reports the same value as count(): the size of channel_map_.
    uint32_t NumUpPeer() const {
        return count();
    }

    // Bookkeeping for channels whose deletion is in flight.
    int32_t deleting_count() const { return deleting_count_; }
    void increment_deleting_count() { ++deleting_count_; }
    void decrement_deleting_count() {
        // The counter must be non-zero before it is decremented.
        assert(deleting_count_ != 0);
        --deleting_count_;
    }

    BgpServer *bgp_server() { return bgp_server_; }
    XmppServer *xmpp_server() { return xmpp_server_; }
    const XmppServer *xmpp_server() const { return xmpp_server_; }

    // Returns the current generation number and advances the counter.
    uint64_t get_subscription_gen_id() {
        const uint64_t gen_id = subscription_gen_id_.fetch_and_increment();
        return gen_id;
    }

    bool CollectStats(BgpRouterState *state, bool first) const;

protected:
    virtual BgpXmppChannel *CreateChannel(XmppChannel *channel);

private:
    friend class BgpXmppChannelManagerMock;
    friend class BgpXmppUnitTest;

    void FillPeerInfo(const BgpXmppChannel *channel) const;

    // Data members. The declaration order is significant because it fixes
    // the order in which the members are initialized.
    XmppServer *xmpp_server_;
    BgpServer *bgp_server_;
    WorkQueue<BgpXmppChannel *> queue_;
    // mutable so that const accessors such as count() can lock it.
    mutable tbb::mutex mutex_;
    XmppChannelMap channel_map_;
    XmppChannelNameMap channel_name_map_;
    int id_;
    int admin_down_listener_id_;
    int asn_listener_id_;
    int identifier_listener_id_;
    tbb::atomic<int32_t> deleting_count_;

    // Generation number for subscription tracking
    tbb::atomic<uint64_t> subscription_gen_id_;

    DISALLOW_COPY_AND_ASSIGN(BgpXmppChannelManager);
};
#endif // SRC_BGP_BGP_XMPP_CHANNEL_H_