Skip to content

Commit

Permalink
Merge "Define new class for FlowEvent queues" into R3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed May 8, 2016
2 parents 9ff8add + 333d3e4 commit 39ac2fe
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 101 deletions.
2 changes: 2 additions & 0 deletions src/vnsw/agent/cmn/agent.h
Expand Up @@ -250,6 +250,8 @@ class Agent {
static const uint32_t kDefaultFlowIndexSmLogCount = 0;
// default number of threads for flow setup
static const uint32_t kDefaultFlowThreadCount = 1;
// Log a message if latency in processing flow queue exceeds limit
static const uint32_t kDefaultFlowLatencyLimit = 0;
// Max number of threads
static const uint32_t kMaxTbbThreads = 8;
static const uint32_t kDefaultTbbKeepawakeTimeout = (20); //time-millisecs
Expand Down
9 changes: 9 additions & 0 deletions src/vnsw/agent/init/agent_param.cc
Expand Up @@ -519,6 +519,11 @@ void AgentParam::ParseFlows() {
flow_thread_count_ = Agent::kDefaultFlowThreadCount;
}

if (!GetValueFromTree<uint16_t>(flow_latency_limit_,
"FLOWS.latency_limit")) {
flow_latency_limit_ = Agent::kDefaultFlowLatencyLimit;
}

if (!GetValueFromTree<bool>(flow_trace_enable_, "FLOWS.trace_enable")) {
flow_trace_enable_ = true;
}
Expand Down Expand Up @@ -769,6 +774,8 @@ void AgentParam::ParseFlowArguments
(const boost::program_options::variables_map &var_map) {
GetOptValue<uint16_t>(var_map, flow_thread_count_,
"FLOWS.thread_count");
GetOptValue<uint16_t>(var_map, flow_latency_limit_,
"FLOWS.latency_limit");
GetOptValue<bool>(var_map, flow_trace_enable_, "FLOWS.trace_enable");
uint16_t val = 0;
if (GetOptValue<uint16_t>(var_map, val, "FLOWS.max_vm_flows")) {
Expand Down Expand Up @@ -1186,6 +1193,7 @@ void AgentParam::LogConfig() const {
LOG(DEBUG, "Linklocal Max Vm Flows : " << linklocal_vm_flows_);
LOG(DEBUG, "Flow cache timeout : " << flow_cache_timeout_);
LOG(DEBUG, "Flow thread count : " << flow_thread_count_);
LOG(DEBUG, "Flow latency limit : " << flow_latency_limit_);
LOG(DEBUG, "Flow index-mgr sm log count : " << flow_index_sm_log_count_);

if (agent_mode_ == VROUTER_AGENT)
Expand Down Expand Up @@ -1310,6 +1318,7 @@ AgentParam::AgentParam(bool enable_flow_options,
send_ratelimit_(sandesh_send_rate_limit()),
flow_thread_count_(Agent::kDefaultFlowThreadCount),
flow_trace_enable_(true),
flow_latency_limit_(Agent::kDefaultFlowLatencyLimit),
subnet_hosts_resolvable_(true),
tbb_thread_count_(Agent::kMaxTbbThreads),
tbb_exec_delay_(0),
Expand Down
6 changes: 6 additions & 0 deletions src/vnsw/agent/init/agent_param.h
Expand Up @@ -254,6 +254,11 @@ class AgentParam {
bool flow_trace_enable() const { return flow_trace_enable_; }
void set_flow_trace_enable(bool val) { flow_trace_enable_ = val; }

uint16_t flow_task_latency_limit() const { return flow_latency_limit_; }
void set_flow_task_latency_limit(uint16_t count) {
flow_latency_limit_ = count;
}

uint32_t tbb_thread_count() const { return tbb_thread_count_; }
uint32_t tbb_exec_delay() const { return tbb_exec_delay_; }
uint32_t tbb_schedule_delay() const { return tbb_schedule_delay_; }
Expand Down Expand Up @@ -471,6 +476,7 @@ class AgentParam {
uint32_t send_ratelimit_;
uint16_t flow_thread_count_;
bool flow_trace_enable_;
uint16_t flow_latency_limit_;
bool subnet_hosts_resolvable_;
std::string bgp_as_a_service_port_range_;
std::vector<uint16_t> bgp_as_a_service_port_range_value_;
Expand Down
1 change: 1 addition & 0 deletions src/vnsw/agent/pkt/SConscript
Expand Up @@ -21,6 +21,7 @@ sandesh_objs = AgentEnv.BuildExceptionCppObj(env, SandeshGenSrcs)

pkt_srcs = [
'flow_entry.cc',
'flow_event.cc',
'flow_table.cc',
'flow_token.cc',
'flow_handler.cc',
Expand Down
157 changes: 157 additions & 0 deletions src/vnsw/agent/pkt/flow_event.cc
@@ -0,0 +1,157 @@
/*
* Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
*/
#include <net/address_util.h>
#include <boost/functional/hash.hpp>
#include <init/agent_param.h>
#include <cmn/agent_stats.h>
#include <oper/agent_profile.h>
#include <vrouter/ksync/flowtable_ksync.h>
#include <vrouter/ksync/ksync_init.h>
#include <vrouter/ksync/ksync_flow_index_manager.h>
#include "vrouter/flow_stats/flow_stats_collector.h"
#include "flow_proto.h"
#include "flow_mgmt_dbclient.h"
#include "flow_mgmt.h"
#include "flow_event.h"

//////////////////////////////////////////////////////////////////////////////
// FlowEventQueue routines
//////////////////////////////////////////////////////////////////////////////
FlowEventQueueBase::FlowEventQueueBase(FlowProto *proto,
const std::string &name,
uint32_t task_id, int task_instance,
FlowTokenPool *pool,
uint16_t latency_limit) :
flow_proto_(proto), token_pool_(pool), task_start_(0), count_(0),
latency_limit_(latency_limit) {
queue_ = new Queue(task_id, task_instance,
boost::bind(&FlowEventQueueBase::Handler, this, _1));
char buff[100];
sprintf(buff, "%s-%d", name.c_str(), task_instance);
queue_->set_name(buff);
queue_->SetStartRunnerFunc(boost::bind(&FlowEventQueueBase::TokenCheck,
this));
if (latency_limit_) {
queue_->SetEntryCallback(boost::bind(&FlowEventQueueBase::TaskEntry,
this));
queue_->SetExitCallback(boost::bind(&FlowEventQueueBase::TaskExit,
this, _1));
}
}

FlowEventQueueBase::~FlowEventQueueBase() {
delete queue_;
}

void FlowEventQueueBase::Shutdown() {
queue_->Shutdown();
}

void FlowEventQueueBase::Enqueue(FlowEvent *event) {
queue_->Enqueue(event);
}

bool FlowEventQueueBase::TokenCheck() {
return flow_proto_->TokenCheck(token_pool_);
}

bool FlowEventQueueBase::TaskEntry() {
count_ = 0;
task_start_ = ClockMonotonicUsec();
getrusage(RUSAGE_THREAD, &rusage_);
return true;
}

void FlowEventQueueBase::TaskExit(bool done) {
if (task_start_ == 0)
return;

uint64_t t = ClockMonotonicUsec();
if (((t - task_start_) / 1000) >= latency_limit_) {
struct rusage r;
getrusage(RUSAGE_THREAD, &r);

uint32_t user = (r.ru_utime.tv_sec - rusage_.ru_utime.tv_sec) * 100;
user += (r.ru_utime.tv_usec - rusage_.ru_utime.tv_usec);

uint32_t sys = (r.ru_stime.tv_sec - rusage_.ru_stime.tv_sec) * 100;
sys += (r.ru_stime.tv_usec - rusage_.ru_stime.tv_usec);

LOG(ERROR, queue_->Description()
<< " Time exceeded " << ((t - task_start_) / 1000)
<< " Count " << count_
<< " User " << user << " Sys " << sys);
}
return;
}

bool FlowEventQueueBase::Handler(FlowEvent *event) {
count_++;
return HandleEvent(event);
}

FlowEventQueue::FlowEventQueue(Agent *agent, FlowProto *proto,
FlowTable *table, FlowTokenPool *pool,
uint16_t latency_limit) :
FlowEventQueueBase(proto, "Flow Event Queue",
agent->task_scheduler()->GetTaskId(kTaskFlowEvent),
table->table_index(), pool, latency_limit),
flow_table_(table) {
}

FlowEventQueue::~FlowEventQueue() {
}

bool FlowEventQueue::HandleEvent(FlowEvent *event) {
return flow_proto_->FlowEventHandler(event, flow_table_);
}

DeleteFlowEventQueue::DeleteFlowEventQueue(Agent *agent, FlowProto *proto,
FlowTable *table,
FlowTokenPool *pool,
uint16_t latency_limit) :
FlowEventQueueBase(proto, "Flow Delete Queue",
agent->task_scheduler()->GetTaskId(kTaskFlowEvent),
table->table_index(), pool, latency_limit),
flow_table_(table) {
}

DeleteFlowEventQueue::~DeleteFlowEventQueue() {
}

bool DeleteFlowEventQueue::HandleEvent(FlowEvent *event) {
return flow_proto_->FlowDeleteHandler(event, flow_table_);
}

KSyncFlowEventQueue::KSyncFlowEventQueue(Agent *agent, FlowProto *proto,
FlowTable *table,
FlowTokenPool *pool,
uint16_t latency_limit) :
FlowEventQueueBase(proto, "Flow KSync Queue",
agent->task_scheduler()->GetTaskId(kTaskFlowEvent),
table->table_index(), pool, latency_limit),
flow_table_(table) {
}

KSyncFlowEventQueue::~KSyncFlowEventQueue() {
}

bool KSyncFlowEventQueue::HandleEvent(FlowEvent *event) {
return flow_proto_->FlowKSyncMsgHandler(event, flow_table_);
}

UpdateFlowEventQueue::UpdateFlowEventQueue(Agent *agent, FlowProto *proto,
FlowTokenPool *pool,
uint16_t latency_limit) :
FlowEventQueueBase(proto, "Flow Update Queue",
agent->task_scheduler()->GetTaskId(kTaskFlowUpdate), 0,
pool, latency_limit) {
}

UpdateFlowEventQueue::~UpdateFlowEventQueue() {
}

bool UpdateFlowEventQueue::HandleEvent(FlowEvent *event) {
return flow_proto_->FlowUpdateHandler(event);
}
114 changes: 114 additions & 0 deletions src/vnsw/agent/pkt/flow_event.h
Expand Up @@ -4,9 +4,12 @@
#ifndef __AGENT_FLOW_EVENT_H__
#define __AGENT_FLOW_EVENT_H__

#include <sys/resource.h>
#include <ksync/ksync_entry.h>
#include "flow_table.h"

class FlowTokenPool;

////////////////////////////////////////////////////////////////////////////
// Control events for flow management
////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -216,4 +219,115 @@ class FlowEventKSync : public FlowEvent {
uint64_t evict_flow_oflow_;
};

////////////////////////////////////////////////////////////////////////////
// FlowProto uses following queues,
//
// - FlowEventQueue
// This queue contains events for flow add, flow eviction etc...
// See FlowProto::FlowEventHandler for events handled in this queue
// - KSyncFlowEventQueue
// This queue contains events generated from KSync response for a flow
// - DeleteFlowEventQueue
// This queue contains events generated for flow-ageing
// - UpdateFlowEventQueue
// This queue contains events generated as result of config changes such
// as add/delete/change of interface, vn, vm, acl, nh, route etc...
//
// All queues are defined from a base class FlowEventQueueBase.
// FlowEventQueueBase implements a wrapper around the WorkQueues with following
// additional functionality,
//
// - Rate Control using Tokens
// All the queues give above can potentially add/change/delete flows in the
// vrouter. So, the queues given above acts as producer and VRouter acts as
// consumer. VRouter is a slow consumer of events. To provide fairness
// across queues, a "token" based scheme is used. See flow_token.h for more
// information
//
// The queue will stop the WorkQueue when it runs out of tokens. The queue
// is started again after a minimum number of tokens become available
//
// - Time limits
// Intermittently, it is observed that some of the queues take large amount
// of time. Latencies in queue such as KSync queue or delete-queue can result
// in flow-setup latencies. So, we want to impose an upper bound on the
// amount of time taken in single run of WorkQueue.
//
// We take timestamp at start of queue, and check latency for every 8
// events processed in the queue. If the latency goes beyond a limit, the
// WorkQueue run is aborted.
////////////////////////////////////////////////////////////////////////////
class FlowEventQueueBase {
public:
typedef WorkQueue<FlowEvent *> Queue;

FlowEventQueueBase(FlowProto *proto, const std::string &name,
uint32_t task_id, int task_instance,
FlowTokenPool *pool, uint16_t latency_limit);
virtual ~FlowEventQueueBase();
virtual bool HandleEvent(FlowEvent *event) = 0;
virtual bool Handler(FlowEvent *event);

void Shutdown();
void Enqueue(FlowEvent *event);
bool TokenCheck();
bool TaskEntry();
void TaskExit(bool done);
void set_disable(bool val) { queue_->set_disable(val); }
uint32_t Length() { return queue_->Length(); }
void MayBeStartRunner() { queue_->MayBeStartRunner(); }
Queue *queue() const { return queue_; }

protected:
Queue *queue_;
FlowProto *flow_proto_;
FlowTokenPool *token_pool_;
uint64_t task_start_;
uint32_t count_;
uint16_t latency_limit_;
struct rusage rusage_;
};

class FlowEventQueue : public FlowEventQueueBase {
public:
FlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table,
FlowTokenPool *pool, uint16_t latency_limit);
virtual ~FlowEventQueue();

bool HandleEvent(FlowEvent *event);
private:
FlowTable *flow_table_;
};

class DeleteFlowEventQueue : public FlowEventQueueBase {
public:
DeleteFlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table,
FlowTokenPool *pool, uint16_t latency_limit);
virtual ~DeleteFlowEventQueue();

bool HandleEvent(FlowEvent *event);
private:
FlowTable *flow_table_;
};

class KSyncFlowEventQueue : public FlowEventQueueBase {
public:
KSyncFlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table,
FlowTokenPool *pool, uint16_t latency_limit);
virtual ~KSyncFlowEventQueue();

bool HandleEvent(FlowEvent *event);
private:
FlowTable *flow_table_;
};

class UpdateFlowEventQueue : public FlowEventQueueBase {
public:
UpdateFlowEventQueue(Agent *agent, FlowProto *proto,
FlowTokenPool *pool, uint16_t latency_limit);
virtual ~UpdateFlowEventQueue();

bool HandleEvent(FlowEvent *event);
};

#endif // __AGENT_FLOW_EVENT_H__

0 comments on commit 39ac2fe

Please sign in to comment.