-
Notifications
You must be signed in to change notification settings - Fork 390
/
flow_event.h
333 lines (298 loc) · 12.8 KB
/
flow_event.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
/*
* Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
*/
#ifndef __AGENT_FLOW_EVENT_H__
#define __AGENT_FLOW_EVENT_H__
#include <sys/resource.h>
#include <ksync/ksync_entry.h>
#include "flow_table.h"
class FlowTokenPool;
////////////////////////////////////////////////////////////////////////////
// Control events for flow management
////////////////////////////////////////////////////////////////////////////
class FlowEvent {
public:
enum Event {
INVALID,
// Flow add message from VRouter
VROUTER_FLOW_MSG,
// Message to update a flow
FLOW_MESSAGE,
// Event to delete a flow entry
DELETE_FLOW,
// Event by audit module to delete a flow
AUDIT_FLOW,
// In agent, flow is evicted if index is allocated for another flow
// We delete the flow on eviction. There is a corner case where evicted
// flow is added in parallel with different index. In that case
// we ignore the operation
EVICT_FLOW,
// Revaluate flow due to deletion of a DBEntry. Other than for INET
// routes, delete of a DBEntry will result in deletion of flows using
// the DBEntry
DELETE_DBENTRY,
// Revaluate route due to change in a DBEntry. This event is used to
// revaluate a flow on add/change of interface, vm, vn etc...
// The action typically invovles only re-evaluating ACL lookup actions
REVALUATE_DBENTRY,
// Add/Delete of route can result in flow using a next higher/lower
// prefix. The event will re-valuate the complete flow due to change
// in route used for flow
REVALUATE_FLOW,
// Flow entry should be freed from kTaskFlowEvent task context.
// Event to ensure flow entry is freed from right context
FREE_FLOW_REF,
// A DBEntry should be freed from kTaskFlowEvent task context.
// Event to ensure DBEntry entry reference is freed from right context
FREE_DBENTRY,
// Grow the free-list entries for flow and ksync
GROW_FREE_LIST,
// Generate KSync event for the flow
KSYNC_EVENT,
// Pkt is re-entering processing in new partition
REENTRANT,
// Need to resolve the Flow entry whic is depending on Mirror entry
UNRESOLVED_FLOW_ENTRY,
};
FlowEvent() :
event_(INVALID), flow_(NULL), pkt_info_(), db_entry_(NULL),
gen_id_(0), del_rev_flow_(false),
flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(0) {
}
FlowEvent(Event event) :
event_(event), flow_(NULL), pkt_info_(), db_entry_(NULL),
gen_id_(0), del_rev_flow_(false), table_index_(0) {
}
FlowEvent(Event event, FlowEntry *flow) :
event_(event), flow_(flow), pkt_info_(), db_entry_(NULL),
gen_id_(0), del_rev_flow_(false),
flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(0){
}
FlowEvent(Event event, FlowEntry *flow, uint32_t flow_handle,
uint8_t gen_id) :
event_(event), flow_(flow), pkt_info_(), db_entry_(NULL),
gen_id_(gen_id), del_rev_flow_(false), flow_handle_(flow_handle),
table_index_(0) {
}
FlowEvent(Event event, FlowEntry *flow, const DBEntry *db_entry) :
event_(event), flow_(flow), pkt_info_(), db_entry_(db_entry),
gen_id_(0), del_rev_flow_(false),
flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(0) {
}
FlowEvent(Event event, const DBEntry *db_entry, uint32_t gen_id) :
event_(event), flow_(NULL), pkt_info_(), db_entry_(db_entry),
gen_id_(gen_id), del_rev_flow_(false),
flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(0) {
}
FlowEvent(Event event, const FlowKey &key, bool del_rev_flow,
uint32_t table_index) :
event_(event), flow_(NULL), pkt_info_(), db_entry_(NULL),
gen_id_(0), flow_key_(key), del_rev_flow_(del_rev_flow),
flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(table_index) {
}
FlowEvent(Event event, const FlowKey &key) :
event_(event), flow_(NULL), pkt_info_(), db_entry_(NULL),
gen_id_(0), flow_key_(key), del_rev_flow_(false),
flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(0) {
}
FlowEvent(Event event, const FlowKey &key, uint32_t flow_handle,
uint8_t gen_id) :
event_(event), flow_(NULL), pkt_info_(), db_entry_(NULL),
gen_id_(gen_id), flow_key_(key), del_rev_flow_(false),
flow_handle_(flow_handle), table_index_(0) {
}
FlowEvent(Event event, PktInfoPtr pkt_info) :
event_(event), flow_(NULL), pkt_info_(pkt_info), db_entry_(NULL),
gen_id_(0), flow_key_(), del_rev_flow_(),
flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(0) {
}
FlowEvent(Event event, PktInfoPtr pkt_info, uint8_t table_index) :
event_(event), flow_(NULL), pkt_info_(pkt_info), db_entry_(NULL),
gen_id_(0), flow_key_(), del_rev_flow_(),
flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(table_index) {
}
FlowEvent(const FlowEvent &rhs) :
event_(rhs.event_), flow_(rhs.flow()), pkt_info_(rhs.pkt_info_),
db_entry_(rhs.db_entry_), gen_id_(rhs.gen_id_),
flow_key_(rhs.flow_key_), del_rev_flow_(rhs.del_rev_flow_),
flow_handle_(rhs.flow_handle_), table_index_(rhs.table_index_) {
}
FlowEvent(Event event, FlowEntryPtr &flow) :
event_(event), flow_(flow), pkt_info_(), db_entry_(NULL),
gen_id_(0), flow_key_(), del_rev_flow_(),
flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_() {
}
virtual ~FlowEvent() { }
Event event() const { return event_; }
FlowEntry *flow() const { return flow_.get(); }
FlowEntryPtr &flow_ref() { return flow_; }
void set_flow(FlowEntry *flow) { flow_ = flow; }
const DBEntry *db_entry() const { return db_entry_; }
void set_db_entry(const DBEntry *db_entry) { db_entry_ = db_entry; }
uint32_t gen_id() const { return gen_id_; }
const FlowKey &get_flow_key() const { return flow_key_; }
bool get_del_rev_flow() const { return del_rev_flow_; }
PktInfoPtr pkt_info() const { return pkt_info_; }
uint32_t flow_handle() const { return flow_handle_; }
uint32_t table_index() const { return table_index_;}
private:
Event event_;
FlowEntryPtr flow_;
PktInfoPtr pkt_info_;
const DBEntry *db_entry_;
uint32_t gen_id_;
FlowKey flow_key_;
bool del_rev_flow_;
uint32_t flow_handle_;
uint32_t table_index_;
};
////////////////////////////////////////////////////////////////////////////
// Event to process VRouter response for flow operation. VRouter response for
// flow is made of two messages,
// - vr_flow response which will contains,
// - Return code for the operation
// - flow-handle allocated for flow
// - gen-id for he hash-entry allocated
// - stats for the flow being evicted by VRouter
// - vr_response
// - contains ksync-event to be generated for the flow
//
// The event combines data from both the messages. The event-handler will
// process both the vrouter response messages
//
// The flow-handle and gen-id are got from base class (FlowEvent)
////////////////////////////////////////////////////////////////////////////
class FlowEventKSync : public FlowEvent {
public:
FlowEventKSync(const KSyncEntry::KSyncEntryPtr ksync_entry,
KSyncEntry::KSyncEvent ksync_event, uint32_t flow_handle,
uint32_t gen_id, int ksync_error, uint64_t evict_flow_bytes,
uint64_t evict_flow_packets, uint64_t evict_flow_oflow) :
FlowEvent(KSYNC_EVENT, NULL, flow_handle, gen_id),
ksync_entry_(ksync_entry), ksync_event_(ksync_event),
ksync_error_(ksync_error), evict_flow_bytes_(evict_flow_bytes),
evict_flow_packets_(evict_flow_packets),
evict_flow_oflow_(evict_flow_oflow) {
}
FlowEventKSync(const FlowEventKSync &rhs) :
FlowEvent(rhs), ksync_entry_(rhs.ksync_entry_),
ksync_event_(rhs.ksync_event_), ksync_error_(rhs.ksync_error_),
evict_flow_bytes_(rhs.evict_flow_bytes_),
evict_flow_packets_(rhs.evict_flow_packets_),
evict_flow_oflow_(rhs.evict_flow_oflow_) {
}
virtual ~FlowEventKSync() { }
KSyncEntry *ksync_entry() const { return ksync_entry_.get(); }
KSyncEntry::KSyncEvent ksync_event() const { return ksync_event_; }
int ksync_error() const { return ksync_error_; }
uint64_t evict_flow_bytes() const { return evict_flow_bytes_; }
uint64_t evict_flow_packets() const { return evict_flow_packets_; }
uint64_t evict_flow_oflow() const { return evict_flow_oflow_; }
private:
KSyncEntry::KSyncEntryPtr ksync_entry_;
KSyncEntry::KSyncEvent ksync_event_;
int ksync_error_;
uint64_t evict_flow_bytes_;
uint64_t evict_flow_packets_;
uint64_t evict_flow_oflow_;
};
////////////////////////////////////////////////////////////////////////////
// FlowProto uses following queues,
//
// - FlowEventQueue
// This queue contains events for flow add, flow eviction etc...
// See FlowProto::FlowEventHandler for events handled in this queue
// - KSyncFlowEventQueue
// This queue contains events generated from KSync response for a flow
// - DeleteFlowEventQueue
// This queue contains events generated for flow-ageing
// - UpdateFlowEventQueue
// This queue contains events generated as result of config changes such
// as add/delete/change of interface, vn, vm, acl, nh, route etc...
//
// All queues are defined from a base class FlowEventQueueBase.
// FlowEventQueueBase implements a wrapper around the WorkQueues with following
// additional functionality,
//
// - Rate Control using Tokens
// All the queues give above can potentially add/change/delete flows in the
// vrouter. So, the queues given above acts as producer and VRouter acts as
// consumer. VRouter is a slow consumer of events. To provide fairness
// across queues, a "token" based scheme is used. See flow_token.h for more
// information
//
// The queue will stop the WorkQueue when it runs out of tokens. The queue
// is started again after a minimum number of tokens become available
//
// - Time limits
// Intermittently, it is observed that some of the queues take large amount
// of time. Latencies in queue such as KSync queue or delete-queue can result
// in flow-setup latencies. So, we want to impose an upper bound on the
// amount of time taken in single run of WorkQueue.
//
// We take timestamp at start of queue, and check latency for every 8
// events processed in the queue. If the latency goes beyond a limit, the
// WorkQueue run is aborted.
////////////////////////////////////////////////////////////////////////////
class FlowEventQueueBase {
public:
typedef WorkQueue<FlowEvent *> Queue;
FlowEventQueueBase(FlowProto *proto, const std::string &name,
uint32_t task_id, int task_instance,
FlowTokenPool *pool, uint16_t latency_limit);
virtual ~FlowEventQueueBase();
virtual bool HandleEvent(FlowEvent *event) = 0;
virtual bool Handler(FlowEvent *event);
void Shutdown();
void Enqueue(FlowEvent *event);
bool TokenCheck();
bool TaskEntry();
void TaskExit(bool done);
void set_disable(bool val) { queue_->set_disable(val); }
uint32_t Length() { return queue_->Length(); }
void MayBeStartRunner() { queue_->MayBeStartRunner(); }
Queue *queue() const { return queue_; }
protected:
Queue *queue_;
FlowProto *flow_proto_;
FlowTokenPool *token_pool_;
uint64_t task_start_;
uint32_t count_;
uint16_t latency_limit_;
struct rusage rusage_;
};
class FlowEventQueue : public FlowEventQueueBase {
public:
FlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table,
FlowTokenPool *pool, uint16_t latency_limit);
virtual ~FlowEventQueue();
bool HandleEvent(FlowEvent *event);
private:
FlowTable *flow_table_;
};
class DeleteFlowEventQueue : public FlowEventQueueBase {
public:
DeleteFlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table,
FlowTokenPool *pool, uint16_t latency_limit);
virtual ~DeleteFlowEventQueue();
bool HandleEvent(FlowEvent *event);
private:
FlowTable *flow_table_;
};
class KSyncFlowEventQueue : public FlowEventQueueBase {
public:
KSyncFlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table,
FlowTokenPool *pool, uint16_t latency_limit);
virtual ~KSyncFlowEventQueue();
bool HandleEvent(FlowEvent *event);
private:
FlowTable *flow_table_;
};
class UpdateFlowEventQueue : public FlowEventQueueBase {
public:
UpdateFlowEventQueue(Agent *agent, FlowProto *proto,
FlowTokenPool *pool, uint16_t latency_limit);
virtual ~UpdateFlowEventQueue();
bool HandleEvent(FlowEvent *event);
};
#endif // __AGENT_FLOW_EVENT_H__