Skip to content

Commit

Permalink
Define new class for FlowEvent queues
Browse files Browse the repository at this point in the history
1. Define new class for FlowEvent queues. The new class makes it easier to
   add common functionality needed for all flow-event queues
2. Added functionality to log message if time taken for single run of a
   queue exceeds a threshold
3. Fix crash in UpdateStats. When FlowEvent is enqueued to work-queue,
    it can be processed and released before we hit UpdateStats. Ensure
    that UpdateStats is invoked before enqueuing FlowStats

Change-Id: I7fa217b1b200cb01202e7731d792318ea88f3f91
Closes-Bug: #1578660
Partial-Bug: #1568126
Partial-Bug: #1572270
Partial-Bug: #1572471
  • Loading branch information
praveenkv committed May 6, 2016
1 parent c9fabc4 commit 333d3e4
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 101 deletions.
2 changes: 2 additions & 0 deletions src/vnsw/agent/cmn/agent.h
Expand Up @@ -250,6 +250,8 @@ class Agent {
static const uint32_t kDefaultFlowIndexSmLogCount = 0;
// default number of threads for flow setup
static const uint32_t kDefaultFlowThreadCount = 1;
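// Log a message if latency in processing a flow queue exceeds this limit.
// NOTE(review): FlowEventQueueBase compares its limit against elapsed
// milliseconds and skips the check when the limit is 0 - confirm that this
// default is what reaches it.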
static const uint32_t kDefaultFlowLatencyLimit = 0;
// Max number of threads
static const uint32_t kMaxTbbThreads = 8;
static const uint32_t kDefaultTbbKeepawakeTimeout = (20); //time-millisecs
Expand Down
9 changes: 9 additions & 0 deletions src/vnsw/agent/init/agent_param.cc
Expand Up @@ -519,6 +519,11 @@ void AgentParam::ParseFlows() {
flow_thread_count_ = Agent::kDefaultFlowThreadCount;
}

if (!GetValueFromTree<uint16_t>(flow_latency_limit_,
"FLOWS.latency_limit")) {
flow_latency_limit_ = Agent::kDefaultFlowLatencyLimit;
}

if (!GetValueFromTree<bool>(flow_trace_enable_, "FLOWS.trace_enable")) {
flow_trace_enable_ = true;
}
Expand Down Expand Up @@ -769,6 +774,8 @@ void AgentParam::ParseFlowArguments
(const boost::program_options::variables_map &var_map) {
GetOptValue<uint16_t>(var_map, flow_thread_count_,
"FLOWS.thread_count");
GetOptValue<uint16_t>(var_map, flow_latency_limit_,
"FLOWS.latency_limit");
GetOptValue<bool>(var_map, flow_trace_enable_, "FLOWS.trace_enable");
uint16_t val = 0;
if (GetOptValue<uint16_t>(var_map, val, "FLOWS.max_vm_flows")) {
Expand Down Expand Up @@ -1186,6 +1193,7 @@ void AgentParam::LogConfig() const {
LOG(DEBUG, "Linklocal Max Vm Flows : " << linklocal_vm_flows_);
LOG(DEBUG, "Flow cache timeout : " << flow_cache_timeout_);
LOG(DEBUG, "Flow thread count : " << flow_thread_count_);
LOG(DEBUG, "Flow latency limit : " << flow_latency_limit_);
LOG(DEBUG, "Flow index-mgr sm log count : " << flow_index_sm_log_count_);

if (agent_mode_ == VROUTER_AGENT)
Expand Down Expand Up @@ -1310,6 +1318,7 @@ AgentParam::AgentParam(bool enable_flow_options,
send_ratelimit_(sandesh_send_rate_limit()),
flow_thread_count_(Agent::kDefaultFlowThreadCount),
flow_trace_enable_(true),
flow_latency_limit_(Agent::kDefaultFlowLatencyLimit),
subnet_hosts_resolvable_(true),
tbb_thread_count_(Agent::kMaxTbbThreads),
tbb_exec_delay_(0),
Expand Down
6 changes: 6 additions & 0 deletions src/vnsw/agent/init/agent_param.h
Expand Up @@ -254,6 +254,11 @@ class AgentParam {
bool flow_trace_enable() const { return flow_trace_enable_; }
void set_flow_trace_enable(bool val) { flow_trace_enable_ = val; }

// Returns the flow latency limit parsed from "FLOWS.latency_limit".
uint16_t flow_task_latency_limit() const {
    return flow_latency_limit_;
}
// Overrides the flow latency limit.
void set_flow_task_latency_limit(uint16_t count) { flow_latency_limit_ = count; }

uint32_t tbb_thread_count() const { return tbb_thread_count_; }
uint32_t tbb_exec_delay() const { return tbb_exec_delay_; }
uint32_t tbb_schedule_delay() const { return tbb_schedule_delay_; }
Expand Down Expand Up @@ -471,6 +476,7 @@ class AgentParam {
uint32_t send_ratelimit_;
uint16_t flow_thread_count_;
bool flow_trace_enable_;
uint16_t flow_latency_limit_;
bool subnet_hosts_resolvable_;
std::string bgp_as_a_service_port_range_;
std::vector<uint16_t> bgp_as_a_service_port_range_value_;
Expand Down
1 change: 1 addition & 0 deletions src/vnsw/agent/pkt/SConscript
Expand Up @@ -21,6 +21,7 @@ sandesh_objs = AgentEnv.BuildExceptionCppObj(env, SandeshGenSrcs)

pkt_srcs = [
'flow_entry.cc',
'flow_event.cc',
'flow_table.cc',
'flow_token.cc',
'flow_handler.cc',
Expand Down
157 changes: 157 additions & 0 deletions src/vnsw/agent/pkt/flow_event.cc
@@ -0,0 +1,157 @@
/*
* Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
*/
#include <net/address_util.h>
#include <boost/functional/hash.hpp>
#include <init/agent_param.h>
#include <cmn/agent_stats.h>
#include <oper/agent_profile.h>
#include <vrouter/ksync/flowtable_ksync.h>
#include <vrouter/ksync/ksync_init.h>
#include <vrouter/ksync/ksync_flow_index_manager.h>
#include "vrouter/flow_stats/flow_stats_collector.h"
#include "flow_proto.h"
#include "flow_mgmt_dbclient.h"
#include "flow_mgmt.h"
#include "flow_event.h"

//////////////////////////////////////////////////////////////////////////////
// FlowEventQueue routines
//////////////////////////////////////////////////////////////////////////////
// Builds the WorkQueue backing this flow-event queue.
//
// proto         - FlowProto used by TokenCheck()
// name          - base name; the queue is named "<name>-<task_instance>"
// task_id       - task id handed to the WorkQueue
// task_instance - task instance handed to the WorkQueue
// pool          - token pool passed to FlowProto::TokenCheck()
// latency_limit - when non-zero, entry/exit callbacks are installed so that
//                 TaskExit() can log runs whose elapsed time in milliseconds
//                 reaches this limit; when zero no callbacks are installed
FlowEventQueueBase::FlowEventQueueBase(FlowProto *proto,
                                       const std::string &name,
                                       uint32_t task_id, int task_instance,
                                       FlowTokenPool *pool,
                                       uint16_t latency_limit) :
    flow_proto_(proto), token_pool_(pool), task_start_(0), count_(0),
    latency_limit_(latency_limit), rusage_() {
    queue_ = new Queue(task_id, task_instance,
                       boost::bind(&FlowEventQueueBase::Handler, this, _1));

    // Bounded formatting: "name" is caller supplied, so never write past the
    // end of the buffer. An over-long name is truncated.
    char buff[100];
    snprintf(buff, sizeof(buff), "%s-%d", name.c_str(), task_instance);
    queue_->set_name(buff);

    queue_->SetStartRunnerFunc(boost::bind(&FlowEventQueueBase::TokenCheck,
                                           this));
    if (latency_limit_) {
        queue_->SetEntryCallback(boost::bind(&FlowEventQueueBase::TaskEntry,
                                             this));
        queue_->SetExitCallback(boost::bind(&FlowEventQueueBase::TaskExit,
                                            this, _1));
    }
}

// Frees the WorkQueue allocated by the constructor.
FlowEventQueueBase::~FlowEventQueueBase() {
    delete queue_;
}

// Forwards the shutdown request to the underlying WorkQueue.
void FlowEventQueueBase::Shutdown() {
    queue_->Shutdown();
}

// Hands the event to the underlying WorkQueue.
void FlowEventQueueBase::Enqueue(FlowEvent *event) {
    queue_->Enqueue(event);
}

// Start-runner check registered on the WorkQueue by the constructor.
// Delegates to FlowProto::TokenCheck() with this queue's token pool.
bool FlowEventQueueBase::TokenCheck() {
    const bool result = flow_proto_->TokenCheck(token_pool_);
    return result;
}

// Entry callback; registered only when a non-zero latency limit is set.
// Resets the per-run event counter and snapshots the monotonic clock and the
// calling thread's resource usage so TaskExit() can compute deltas.
// Always returns true.
bool FlowEventQueueBase::TaskEntry() {
    count_ = 0;
    task_start_ = ClockMonotonicUsec();
    getrusage(RUSAGE_THREAD, &rusage_);
    return true;
}

// Returns (end - start) in microseconds. Signed 64-bit arithmetic is used so
// that a negative tv_usec delta (a borrow from tv_sec) is handled correctly.
static int64_t TimevalDiffUsec(const struct timeval &start,
                               const struct timeval &end) {
    int64_t sec = static_cast<int64_t>(end.tv_sec) -
                  static_cast<int64_t>(start.tv_sec);
    int64_t usec = static_cast<int64_t>(end.tv_usec) -
                   static_cast<int64_t>(start.tv_usec);
    return (sec * 1000000) + usec;
}

// Exit callback; registered only when a non-zero latency limit is set.
// Logs an error if the elapsed wall-clock time since TaskEntry(), in
// milliseconds, reaches latency_limit_. The message carries the elapsed
// milliseconds, the number of events handled in the run, and the user and
// system CPU time consumed by the thread during the run in microseconds.
//
// done - unused here; required by the exit-callback signature
void FlowEventQueueBase::TaskExit(bool done) {
    // TaskEntry() has not run yet, so there is no baseline to compare with.
    if (task_start_ == 0)
        return;

    uint64_t now = ClockMonotonicUsec();
    uint64_t elapsed_msec = (now - task_start_) / 1000;
    if (elapsed_msec >= latency_limit_) {
        struct rusage r;
        getrusage(RUSAGE_THREAD, &r);

        // tv_sec must be scaled by 1000000 before being combined with
        // tv_usec; scaling by 100 produced meaningless values.
        int64_t user = TimevalDiffUsec(rusage_.ru_utime, r.ru_utime);
        int64_t sys = TimevalDiffUsec(rusage_.ru_stime, r.ru_stime);

        LOG(ERROR, queue_->Description()
            << " Time exceeded " << elapsed_msec
            << " Count " << count_
            << " User " << user << " Sys " << sys);
    }
    return;
}

// WorkQueue callback bound by the constructor. Counts the event for the
// current run and dispatches it to the subclass via HandleEvent().
bool FlowEventQueueBase::Handler(FlowEvent *event) {
    ++count_;
    return HandleEvent(event);
}

// Creates the "Flow Event Queue" on task kTaskFlowEvent, using the flow
// table's index as the task instance, and remembers the table for
// HandleEvent().
FlowEventQueue::FlowEventQueue(Agent *agent, FlowProto *proto,
                               FlowTable *table, FlowTokenPool *pool,
                               uint16_t latency_limit)
    : FlowEventQueueBase(proto, "Flow Event Queue",
                         agent->task_scheduler()->GetTaskId(kTaskFlowEvent),
                         table->table_index(), pool, latency_limit),
      flow_table_(table) {
}

// Nothing to release here; the base class frees the WorkQueue.
FlowEventQueue::~FlowEventQueue() {}

// Dispatches the event to FlowProto::FlowEventHandler() for this queue's
// flow table and returns its result.
bool FlowEventQueue::HandleEvent(FlowEvent *event) {
    const bool handled = flow_proto_->FlowEventHandler(event, flow_table_);
    return handled;
}

// Creates the "Flow Delete Queue" on task kTaskFlowEvent, using the flow
// table's index as the task instance, and remembers the table for
// HandleEvent().
DeleteFlowEventQueue::DeleteFlowEventQueue(Agent *agent, FlowProto *proto,
                                           FlowTable *table,
                                           FlowTokenPool *pool,
                                           uint16_t latency_limit)
    : FlowEventQueueBase(proto, "Flow Delete Queue",
                         agent->task_scheduler()->GetTaskId(kTaskFlowEvent),
                         table->table_index(), pool, latency_limit),
      flow_table_(table) {
}

// Nothing to release here; the base class frees the WorkQueue.
DeleteFlowEventQueue::~DeleteFlowEventQueue() {}

// Dispatches the event to FlowProto::FlowDeleteHandler() for this queue's
// flow table and returns its result.
bool DeleteFlowEventQueue::HandleEvent(FlowEvent *event) {
    const bool handled = flow_proto_->FlowDeleteHandler(event, flow_table_);
    return handled;
}

// Creates the "Flow KSync Queue" on task kTaskFlowEvent, using the flow
// table's index as the task instance, and remembers the table for
// HandleEvent().
KSyncFlowEventQueue::KSyncFlowEventQueue(Agent *agent, FlowProto *proto,
                                         FlowTable *table,
                                         FlowTokenPool *pool,
                                         uint16_t latency_limit)
    : FlowEventQueueBase(proto, "Flow KSync Queue",
                         agent->task_scheduler()->GetTaskId(kTaskFlowEvent),
                         table->table_index(), pool, latency_limit),
      flow_table_(table) {
}

// Nothing to release here; the base class frees the WorkQueue.
KSyncFlowEventQueue::~KSyncFlowEventQueue() {}

// Dispatches the event to FlowProto::FlowKSyncMsgHandler() for this queue's
// flow table and returns its result.
bool KSyncFlowEventQueue::HandleEvent(FlowEvent *event) {
    const bool handled = flow_proto_->FlowKSyncMsgHandler(event, flow_table_);
    return handled;
}

// Creates the "Flow Update Queue" on task kTaskFlowUpdate with task
// instance 0. Unlike the per-table queues, no flow table is associated.
UpdateFlowEventQueue::UpdateFlowEventQueue(Agent *agent, FlowProto *proto,
                                           FlowTokenPool *pool,
                                           uint16_t latency_limit)
    : FlowEventQueueBase(proto, "Flow Update Queue",
                         agent->task_scheduler()->GetTaskId(kTaskFlowUpdate),
                         0, pool, latency_limit) {
}

// Nothing to release here; the base class frees the WorkQueue.
UpdateFlowEventQueue::~UpdateFlowEventQueue() {}

// Dispatches the event to FlowProto::FlowUpdateHandler() and returns its
// result.
bool UpdateFlowEventQueue::HandleEvent(FlowEvent *event) {
    const bool handled = flow_proto_->FlowUpdateHandler(event);
    return handled;
}
114 changes: 114 additions & 0 deletions src/vnsw/agent/pkt/flow_event.h
Expand Up @@ -4,9 +4,12 @@
#ifndef __AGENT_FLOW_EVENT_H__
#define __AGENT_FLOW_EVENT_H__

#include <sys/resource.h>
#include <ksync/ksync_entry.h>
#include "flow_table.h"

class FlowTokenPool;

////////////////////////////////////////////////////////////////////////////
// Control events for flow management
////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -216,4 +219,115 @@ class FlowEventKSync : public FlowEvent {
uint64_t evict_flow_oflow_;
};

////////////////////////////////////////////////////////////////////////////
// FlowProto uses following queues,
//
// - FlowEventQueue
// This queue contains events for flow add, flow eviction etc...
// See FlowProto::FlowEventHandler for events handled in this queue
// - KSyncFlowEventQueue
// This queue contains events generated from KSync response for a flow
// - DeleteFlowEventQueue
// This queue contains events generated for flow-ageing
// - UpdateFlowEventQueue
// This queue contains events generated as result of config changes such
// as add/delete/change of interface, vn, vm, acl, nh, route etc...
//
// All queues are defined from a base class FlowEventQueueBase.
// FlowEventQueueBase implements a wrapper around the WorkQueues with following
// additional functionality,
//
// - Rate Control using Tokens
// All the queues given above can potentially add/change/delete flows in the
// vrouter. So, the queues given above act as producers and VRouter acts as
// the consumer. VRouter is a slow consumer of events. To provide fairness
// across queues, a "token" based scheme is used. See flow_token.h for more
// information
//
// The queue will stop the WorkQueue when it runs out of tokens. The queue
// is started again after a minimum number of tokens become available
//
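// - Time limits
// Intermittently, it is observed that some of the queues take a large amount
// of time. Latencies in queues such as the KSync queue or the delete-queue
// can result in flow-setup latencies. So, we want to detect when a single
// run of the WorkQueue takes too long.
//
// We take a timestamp at the start of a WorkQueue run and compare the
// elapsed time against the configured limit when the run exits. If the
// limit is reached, a message is logged with the elapsed time, the number of
// events processed and the CPU time used by the run.
// NOTE(review): the original intent was to check latency for every 8 events
// processed and abort the WorkQueue run beyond the limit; the visible
// TaskEntry()/TaskExit() implementation only logs - confirm.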
////////////////////////////////////////////////////////////////////////////
// Common wrapper around a WorkQueue of FlowEvent pointers.
//
// The constructor allocates the WorkQueue and binds this object's Handler(),
// TokenCheck() and, when a non-zero latency limit is given, TaskEntry() and
// TaskExit() into it. Subclasses supply the actual event processing by
// implementing HandleEvent().
//
// The object owns queue_ (deleted in the destructor) and the WorkQueue holds
// callbacks bound to "this", so instances must not be copied.
class FlowEventQueueBase {
public:
    typedef WorkQueue<FlowEvent *> Queue;

    FlowEventQueueBase(FlowProto *proto, const std::string &name,
                       uint32_t task_id, int task_instance,
                       FlowTokenPool *pool, uint16_t latency_limit);
    virtual ~FlowEventQueueBase();

    // Processes one event; implemented by each concrete queue.
    virtual bool HandleEvent(FlowEvent *event) = 0;
    // WorkQueue callback: counts the event, then calls HandleEvent().
    virtual bool Handler(FlowEvent *event);

    void Shutdown();
    void Enqueue(FlowEvent *event);
    // Start-runner check; delegates to FlowProto::TokenCheck().
    bool TokenCheck();
    // Entry/exit callbacks used to measure the time taken by a single run.
    bool TaskEntry();
    void TaskExit(bool done);

    // Thin forwards to the underlying WorkQueue.
    void set_disable(bool val) { queue_->set_disable(val); }
    uint32_t Length() { return queue_->Length(); }
    void MayBeStartRunner() { queue_->MayBeStartRunner(); }
    Queue *queue() const { return queue_; }

protected:
    Queue *queue_;              // owned; created in ctor, freed in dtor
    FlowProto *flow_proto_;
    FlowTokenPool *token_pool_;
    uint64_t task_start_;       // monotonic usec recorded by TaskEntry()
    uint32_t count_;            // events handled since the last TaskEntry()
    uint16_t latency_limit_;    // compared against elapsed msec in TaskExit()
    struct rusage rusage_;      // thread rusage recorded by TaskEntry()

private:
    // Copying is disabled (declared, intentionally not defined): a copy
    // would delete queue_ twice and leave callbacks bound to the original.
    FlowEventQueueBase(const FlowEventQueueBase &);
    FlowEventQueueBase &operator=(const FlowEventQueueBase &);
};

// Queue for flow events handled by FlowProto::FlowEventHandler() against a
// given flow table.
class FlowEventQueue : public FlowEventQueueBase {
public:
    FlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table,
                   FlowTokenPool *pool, uint16_t latency_limit);
    virtual ~FlowEventQueue();

    // Implements FlowEventQueueBase::HandleEvent().
    virtual bool HandleEvent(FlowEvent *event);

private:
    FlowTable *flow_table_;     // table passed to the handler
};

// Queue for flow events handled by FlowProto::FlowDeleteHandler() against a
// given flow table.
class DeleteFlowEventQueue : public FlowEventQueueBase {
public:
    DeleteFlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table,
                         FlowTokenPool *pool, uint16_t latency_limit);
    virtual ~DeleteFlowEventQueue();

    // Implements FlowEventQueueBase::HandleEvent().
    virtual bool HandleEvent(FlowEvent *event);

private:
    FlowTable *flow_table_;     // table passed to the handler
};

// Queue for flow events handled by FlowProto::FlowKSyncMsgHandler() against
// a given flow table.
class KSyncFlowEventQueue : public FlowEventQueueBase {
public:
    KSyncFlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table,
                        FlowTokenPool *pool, uint16_t latency_limit);
    virtual ~KSyncFlowEventQueue();

    // Implements FlowEventQueueBase::HandleEvent().
    virtual bool HandleEvent(FlowEvent *event);

private:
    FlowTable *flow_table_;     // table passed to the handler
};

// Queue for flow events handled by FlowProto::FlowUpdateHandler(). It is not
// tied to a particular flow table.
class UpdateFlowEventQueue : public FlowEventQueueBase {
public:
    UpdateFlowEventQueue(Agent *agent, FlowProto *proto,
                         FlowTokenPool *pool, uint16_t latency_limit);
    virtual ~UpdateFlowEventQueue();

    // Implements FlowEventQueueBase::HandleEvent().
    virtual bool HandleEvent(FlowEvent *event);
};

#endif // __AGENT_FLOW_EVENT_H__

0 comments on commit 333d3e4

Please sign in to comment.