From 89a2b24d7fd3276b1408ac3d5c52f8569be89e9d Mon Sep 17 00:00:00 2001 From: Praveen K V Date: Thu, 5 May 2016 12:49:46 +0530 Subject: [PATCH] Define new class for FlowEvent queues 1. Define new class for FlowEvent queues. The new class makes it easier to add common functionality needed for all flow-event queues 2. Added functionality to log message if time taken for single run of a queue exceeds a threshold 3. Fix crash in UpdateStats. When FlowEvent is enqueued to work-queue, it can be processed and released before we hit UpdateStats. Ensure that UpdateStats is invoked before enqueuing FlowStats Conflicts: src/vnsw/agent/pkt/flow_proto.h Change-Id: I7fa217b1b200cb01202e7731d792318ea88f3f91 Closes-Bug: #1578660 Partial-Bug: #1568126 Partial-Bug: #1572270 Partial-Bug: #1572471 --- src/vnsw/agent/cmn/agent.h | 2 + src/vnsw/agent/init/agent_param.cc | 9 ++ src/vnsw/agent/init/agent_param.h | 6 ++ src/vnsw/agent/pkt/SConscript | 1 + src/vnsw/agent/pkt/flow_event.cc | 157 +++++++++++++++++++++++++++++ src/vnsw/agent/pkt/flow_event.h | 114 +++++++++++++++++++++ src/vnsw/agent/pkt/flow_proto.cc | 111 ++++++++------------ src/vnsw/agent/pkt/flow_proto.h | 23 ++--- src/vnsw/agent/pkt/flow_table.cc | 30 +++--- 9 files changed, 351 insertions(+), 102 deletions(-) create mode 100644 src/vnsw/agent/pkt/flow_event.cc diff --git a/src/vnsw/agent/cmn/agent.h b/src/vnsw/agent/cmn/agent.h index 384b131af7f..c363b7c7cd6 100644 --- a/src/vnsw/agent/cmn/agent.h +++ b/src/vnsw/agent/cmn/agent.h @@ -249,6 +249,8 @@ class Agent { static const uint32_t kDefaultFlowIndexSmLogCount = 0; // default number of threads for flow setup static const uint32_t kDefaultFlowThreadCount = 1; + // Log a message if latency in processing flow queue exceeds limit + static const uint32_t kDefaultFlowLatencyLimit = 0; // Max number of threads static const uint32_t kMaxTbbThreads = 8; static const uint32_t kDefaultTbbKeepawakeTimeout = (20); //time-millisecs diff --git a/src/vnsw/agent/init/agent_param.cc b/src/vnsw/agent/init/agent_param.cc index 962fc9237d8..1adebd7a8d3 100644 --- a/src/vnsw/agent/init/agent_param.cc +++ b/src/vnsw/agent/init/agent_param.cc @@ -519,6 +519,11 @@ void AgentParam::ParseFlows() { flow_thread_count_ = Agent::kDefaultFlowThreadCount; } + if (!GetValueFromTree(flow_latency_limit_, + "FLOWS.latency_limit")) { + flow_latency_limit_ = Agent::kDefaultFlowLatencyLimit; + } + if (!GetValueFromTree(flow_trace_enable_, "FLOWS.trace_enable")) { flow_trace_enable_ = true; } @@ -769,6 +774,8 @@ void AgentParam::ParseFlowArguments (const boost::program_options::variables_map &var_map) { GetOptValue(var_map, flow_thread_count_, "FLOWS.thread_count"); + GetOptValue(var_map, flow_latency_limit_, + "FLOWS.latency_limit"); GetOptValue(var_map, flow_trace_enable_, "FLOWS.trace_enable"); uint16_t val = 0; if (GetOptValue(var_map, val, "FLOWS.max_vm_flows")) { @@ -1186,6 +1193,7 @@ void AgentParam::LogConfig() const { LOG(DEBUG, "Linklocal Max Vm Flows : " << linklocal_vm_flows_); LOG(DEBUG, "Flow cache timeout : " << flow_cache_timeout_); LOG(DEBUG, "Flow thread count : " << flow_thread_count_); + LOG(DEBUG, "Flow latency limit : " << flow_latency_limit_); LOG(DEBUG, "Flow index-mgr sm log count : " << flow_index_sm_log_count_); if (agent_mode_ == VROUTER_AGENT) @@ -1310,6 +1318,7 @@ AgentParam::AgentParam(bool enable_flow_options, send_ratelimit_(sandesh_send_rate_limit()), flow_thread_count_(Agent::kDefaultFlowThreadCount), flow_trace_enable_(true), + flow_latency_limit_(Agent::kDefaultFlowLatencyLimit), subnet_hosts_resolvable_(true), tbb_thread_count_(Agent::kMaxTbbThreads), tbb_exec_delay_(0), diff --git a/src/vnsw/agent/init/agent_param.h b/src/vnsw/agent/init/agent_param.h index 1867abcfebd..fd53fe2db70 100644 --- a/src/vnsw/agent/init/agent_param.h +++ b/src/vnsw/agent/init/agent_param.h @@ -254,6 +254,11 @@ class AgentParam { bool flow_trace_enable() const { return flow_trace_enable_; } void set_flow_trace_enable(bool val) { flow_trace_enable_ = val; } + uint16_t flow_task_latency_limit() const { return flow_latency_limit_; } + void set_flow_task_latency_limit(uint16_t count) { + flow_latency_limit_ = count; + } + uint32_t tbb_thread_count() const { return tbb_thread_count_; } uint32_t tbb_exec_delay() const { return tbb_exec_delay_; } uint32_t tbb_schedule_delay() const { return tbb_schedule_delay_; } @@ -471,6 +476,7 @@ class AgentParam { uint32_t send_ratelimit_; uint16_t flow_thread_count_; bool flow_trace_enable_; + uint16_t flow_latency_limit_; bool subnet_hosts_resolvable_; std::string bgp_as_a_service_port_range_; std::vector bgp_as_a_service_port_range_value_; diff --git a/src/vnsw/agent/pkt/SConscript b/src/vnsw/agent/pkt/SConscript index 2cc59b10fe4..a3e342ddab0 100644 --- a/src/vnsw/agent/pkt/SConscript +++ b/src/vnsw/agent/pkt/SConscript @@ -21,6 +21,7 @@ sandesh_objs = AgentEnv.BuildExceptionCppObj(env, SandeshGenSrcs) pkt_srcs = [ 'flow_entry.cc', + 'flow_event.cc', 'flow_table.cc', 'flow_token.cc', 'flow_handler.cc', diff --git a/src/vnsw/agent/pkt/flow_event.cc b/src/vnsw/agent/pkt/flow_event.cc new file mode 100644 index 00000000000..74199796c8e --- /dev/null +++ b/src/vnsw/agent/pkt/flow_event.cc @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include "vrouter/flow_stats/flow_stats_collector.h" +#include "flow_proto.h" +#include "flow_mgmt_dbclient.h" +#include "flow_mgmt.h" +#include "flow_event.h" + +////////////////////////////////////////////////////////////////////////////// +// FlowEventQueue routines +////////////////////////////////////////////////////////////////////////////// +FlowEventQueueBase::FlowEventQueueBase(FlowProto *proto, + const std::string &name, + uint32_t task_id, int task_instance, + FlowTokenPool *pool, + uint16_t latency_limit) : + flow_proto_(proto), token_pool_(pool), task_start_(0), count_(0), + latency_limit_(latency_limit) { + queue_ = new Queue(task_id, task_instance, + boost::bind(&FlowEventQueueBase::Handler, this, _1)); + char buff[100]; + sprintf(buff, "%s-%d", name.c_str(), task_instance); + queue_->set_name(buff); + queue_->SetStartRunnerFunc(boost::bind(&FlowEventQueueBase::TokenCheck, + this)); + if (latency_limit_) { + queue_->SetEntryCallback(boost::bind(&FlowEventQueueBase::TaskEntry, + this)); + queue_->SetExitCallback(boost::bind(&FlowEventQueueBase::TaskExit, + this, _1)); + } +} + +FlowEventQueueBase::~FlowEventQueueBase() { + delete queue_; +} + +void FlowEventQueueBase::Shutdown() { + queue_->Shutdown(); +} + +void FlowEventQueueBase::Enqueue(FlowEvent *event) { + queue_->Enqueue(event); +} + +bool FlowEventQueueBase::TokenCheck() { + return flow_proto_->TokenCheck(token_pool_); +} + +bool FlowEventQueueBase::TaskEntry() { + count_ = 0; + task_start_ = ClockMonotonicUsec(); + getrusage(RUSAGE_THREAD, &rusage_); + return true; +} + +void FlowEventQueueBase::TaskExit(bool done) { + if (task_start_ == 0) + return; + + uint64_t t = ClockMonotonicUsec(); + if (((t - task_start_) / 1000) >= latency_limit_) { + struct rusage r; + getrusage(RUSAGE_THREAD, &r); + + uint32_t user = (r.ru_utime.tv_sec - rusage_.ru_utime.tv_sec) * 100; + user += (r.ru_utime.tv_usec - rusage_.ru_utime.tv_usec); + + uint32_t sys = (r.ru_stime.tv_sec - rusage_.ru_stime.tv_sec) * 100; + sys += (r.ru_stime.tv_usec - rusage_.ru_stime.tv_usec); + + LOG(ERROR, queue_->Description() + << " Time exceeded " << ((t - task_start_) / 1000) + << " Count " << count_ + << " User " << user << " Sys " << sys); + } + return; +} + +bool FlowEventQueueBase::Handler(FlowEvent *event) { + count_++; + return HandleEvent(event); +} + +FlowEventQueue::FlowEventQueue(Agent *agent, FlowProto *proto, + FlowTable *table, FlowTokenPool *pool, + uint16_t latency_limit) : + FlowEventQueueBase(proto, "Flow Event Queue", + agent->task_scheduler()->GetTaskId(kTaskFlowEvent), + table->table_index(), pool, latency_limit), + flow_table_(table) { +} + +FlowEventQueue::~FlowEventQueue() { +} + +bool FlowEventQueue::HandleEvent(FlowEvent *event) { + return flow_proto_->FlowEventHandler(event, flow_table_); +} + +DeleteFlowEventQueue::DeleteFlowEventQueue(Agent *agent, FlowProto *proto, + FlowTable *table, + FlowTokenPool *pool, + uint16_t latency_limit) : + FlowEventQueueBase(proto, "Flow Delete Queue", + agent->task_scheduler()->GetTaskId(kTaskFlowEvent), + table->table_index(), pool, latency_limit), + flow_table_(table) { +} + +DeleteFlowEventQueue::~DeleteFlowEventQueue() { +} + +bool DeleteFlowEventQueue::HandleEvent(FlowEvent *event) { + return flow_proto_->FlowDeleteHandler(event, flow_table_); +} + +KSyncFlowEventQueue::KSyncFlowEventQueue(Agent *agent, FlowProto *proto, + FlowTable *table, + FlowTokenPool *pool, + uint16_t latency_limit) : + FlowEventQueueBase(proto, "Flow KSync Queue", + agent->task_scheduler()->GetTaskId(kTaskFlowEvent), + table->table_index(), pool, latency_limit), + flow_table_(table) { +} + +KSyncFlowEventQueue::~KSyncFlowEventQueue() { +} + +bool KSyncFlowEventQueue::HandleEvent(FlowEvent *event) { + return flow_proto_->FlowKSyncMsgHandler(event, flow_table_); +} + +UpdateFlowEventQueue::UpdateFlowEventQueue(Agent *agent, FlowProto *proto, + FlowTokenPool *pool, + uint16_t latency_limit) : + FlowEventQueueBase(proto, "Flow Update Queue", + agent->task_scheduler()->GetTaskId(kTaskFlowUpdate), 0, + pool, latency_limit) { +} + +UpdateFlowEventQueue::~UpdateFlowEventQueue() { +} + +bool UpdateFlowEventQueue::HandleEvent(FlowEvent *event) { + return flow_proto_->FlowUpdateHandler(event); +} diff --git a/src/vnsw/agent/pkt/flow_event.h b/src/vnsw/agent/pkt/flow_event.h index 4302267f2c3..1bf30dad518 100644 --- a/src/vnsw/agent/pkt/flow_event.h +++ b/src/vnsw/agent/pkt/flow_event.h @@ -4,9 +4,12 @@ #ifndef __AGENT_FLOW_EVENT_H__ #define __AGENT_FLOW_EVENT_H__ +#include #include #include "flow_table.h" +class FlowTokenPool; + //////////////////////////////////////////////////////////////////////////// // Control events for flow management //////////////////////////////////////////////////////////////////////////// @@ -216,4 +219,115 @@ class FlowEventKSync : public FlowEvent { uint64_t evict_flow_oflow_; }; +//////////////////////////////////////////////////////////////////////////// +// FlowProto uses following queues, +// +// - FlowEventQueue +// This queue contains events for flow add, flow eviction etc... +// See FlowProto::FlowEventHandler for events handled in this queue +// - KSyncFlowEventQueue +// This queue contains events generated from KSync response for a flow +// - DeleteFlowEventQueue +// This queue contains events generated for flow-ageing +// - UpdateFlowEventQueue +// This queue contains events generated as result of config changes such +// as add/delete/change of interface, vn, vm, acl, nh, route etc... +// +// All queues are defined from a base class FlowEventQueueBase. +// FlowEventQueueBase implements a wrapper around the WorkQueues with following +// additional functionality, +// +// - Rate Control using Tokens +// All the queues give above can potentially add/change/delete flows in the +// vrouter. So, the queues given above acts as producer and VRouter acts as +// consumer. VRouter is a slow consumer of events. To provide fairness +// across queues, a "token" based scheme is used. See flow_token.h for more +// information +// +// The queue will stop the WorkQueue when it runs out of tokens. The queue +// is started again after a minimum number of tokens become available +// +// - Time limits +// Intermittently, it is observed that some of the queues take large amount +// of time. Latencies in queue such as KSync queue or delete-queue can result +// in flow-setup latencies. So, we want to impose an upper bound on the +// amount of time taken in single run of WorkQueue. +// +// We take timestamp at start of queue, and check latency for every 8 +// events processed in the queue. If the latency goes beyond a limit, the +// WorkQueue run is aborted. +//////////////////////////////////////////////////////////////////////////// +class FlowEventQueueBase { +public: + typedef WorkQueue Queue; + + FlowEventQueueBase(FlowProto *proto, const std::string &name, + uint32_t task_id, int task_instance, + FlowTokenPool *pool, uint16_t latency_limit); + virtual ~FlowEventQueueBase(); + virtual bool HandleEvent(FlowEvent *event) = 0; + virtual bool Handler(FlowEvent *event); + + void Shutdown(); + void Enqueue(FlowEvent *event); + bool TokenCheck(); + bool TaskEntry(); + void TaskExit(bool done); + void set_disable(bool val) { queue_->set_disable(val); } + uint32_t Length() { return queue_->Length(); } + void MayBeStartRunner() { queue_->MayBeStartRunner(); } + Queue *queue() const { return queue_; } + +protected: + Queue *queue_; + FlowProto *flow_proto_; + FlowTokenPool *token_pool_; + uint64_t task_start_; + uint32_t count_; + uint16_t latency_limit_; + struct rusage rusage_; +}; + +class FlowEventQueue : public FlowEventQueueBase { +public: + FlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table, + FlowTokenPool *pool, uint16_t latency_limit); + virtual ~FlowEventQueue(); + + bool HandleEvent(FlowEvent *event); +private: + FlowTable *flow_table_; +}; + +class DeleteFlowEventQueue : public FlowEventQueueBase { +public: + DeleteFlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table, + FlowTokenPool *pool, uint16_t latency_limit); + virtual ~DeleteFlowEventQueue(); + + bool HandleEvent(FlowEvent *event); +private: + FlowTable *flow_table_; +}; + +class KSyncFlowEventQueue : public FlowEventQueueBase { +public: + KSyncFlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table, + FlowTokenPool *pool, uint16_t latency_limit); + virtual ~KSyncFlowEventQueue(); + + bool HandleEvent(FlowEvent *event); +private: + FlowTable *flow_table_; +}; + +class UpdateFlowEventQueue : public FlowEventQueueBase { +public: + UpdateFlowEventQueue(Agent *agent, FlowProto *proto, + FlowTokenPool *pool, uint16_t latency_limit); + virtual ~UpdateFlowEventQueue(); + + bool HandleEvent(FlowEvent *event); +}; + #endif // __AGENT_FLOW_EVENT_H__ diff --git a/src/vnsw/agent/pkt/flow_proto.cc b/src/vnsw/agent/pkt/flow_proto.cc index 1552b3ad360..ed010a11f0a 100644 --- a/src/vnsw/agent/pkt/flow_proto.cc +++ b/src/vnsw/agent/pkt/flow_proto.cc @@ -19,16 +19,13 @@ static void UpdateStats(FlowEvent *event, FlowStats *stats); FlowProto::FlowProto(Agent *agent, boost::asio::io_service &io) : Proto(agent, kTaskFlowEvent, PktHandler::FLOW, io), - flow_update_queue_(agent->task_scheduler()->GetTaskId(kTaskFlowUpdate), 0, - boost::bind(&FlowProto::FlowUpdateHandler, this, _1)), - use_vrouter_hash_(false), ipv4_trace_filter_(), ipv6_trace_filter_(), add_tokens_("Add Tokens", this, kFlowAddTokens), del_tokens_("Delete Tokens", this, kFlowDelTokens), update_tokens_("Update Tokens", this, kFlowUpdateTokens), + flow_update_queue_(agent, this, &update_tokens_, + agent->params()->flow_task_latency_limit()), + use_vrouter_hash_(false), ipv4_trace_filter_(), ipv6_trace_filter_(), stats_() { - flow_update_queue_.set_name("Flow update queue"); - flow_update_queue_.SetStartRunnerFunc(boost::bind(&FlowProto::TokenCheck, - this, &update_tokens_)); agent->SetFlowProto(this); set_trace(false); uint16_t table_count = agent->flow_thread_count(); @@ -37,36 +34,19 @@ FlowProto::FlowProto(Agent *agent, boost::asio::io_service &io) : flow_table_list_.push_back(new FlowTable(agent_, i)); } - TaskScheduler *scheduler = agent_->task_scheduler(); - uint32_t task_id = scheduler->GetTaskId(kTaskFlowEvent); for (uint32_t i = 0; i < table_count; i++) { - char name[128]; - sprintf(name, "Flow Event Queue-%d", i); - FlowEventQueue *queue = - new FlowEventQueue(task_id, i, - boost::bind(&FlowProto::FlowEventHandler, this, - _1, flow_table_list_[i])); - queue->set_name(name); - queue->SetStartRunnerFunc(boost::bind(&FlowProto::TokenCheck, this, - &add_tokens_)); - flow_event_queue_.push_back(queue); - - sprintf(name, "Flow Delete Queue-%d", i); - queue = new FlowEventQueue(task_id, i, - boost::bind(&FlowProto::FlowDeleteHandler, - this, _1, flow_table_list_[i])); - queue->set_name(name); - queue->SetStartRunnerFunc(boost::bind(&FlowProto::TokenCheck, this, - &del_tokens_)); - flow_delete_queue_.push_back(queue); - - sprintf(name, "Flow KSync Queue-%d", i); - queue = new FlowEventQueue(task_id, i, - boost::bind(&FlowProto::FlowKSyncMsgHandler, - this, _1, flow_table_list_[i]), - queue->kMaxSize, queue->kMaxIterations*2); - queue->set_name(name); - flow_ksync_queue_.push_back(queue); + uint16_t latency = agent->params()->flow_task_latency_limit(); + flow_event_queue_.push_back + (new FlowEventQueue(agent, this, flow_table_list_[i], + &add_tokens_, latency)); + + flow_delete_queue_.push_back + (new DeleteFlowEventQueue(agent, this, flow_table_list_[i], + &del_tokens_, latency)); + + flow_ksync_queue_.push_back + (new KSyncFlowEventQueue(agent, this, flow_table_list_[i], + &add_tokens_, latency)); } if (::getenv("USE_VROUTER_HASH") != NULL) { string opt = ::getenv("USE_VROUTER_HASH"); @@ -297,7 +277,7 @@ bool FlowProto::UpdateFlow(FlowEntry *flow) { // Flow Control Event routines ///////////////////////////////////////////////////////////////////////////// void FlowProto::EnqueueFlowEvent(FlowEvent *event) { - bool enqueue_done = true; + FlowEventQueueBase *queue = NULL; switch (event->event()) { case FlowEvent::VROUTER_FLOW_MSG: { PktInfo *info = event->pkt_info().get(); @@ -305,14 +285,14 @@ void FlowProto::EnqueueFlowEvent(FlowEvent *event) { info->ip_proto, info->sport, info->dport, info->agent_hdr.cmd_param); - flow_event_queue_[index]->Enqueue(event); + queue = flow_event_queue_[index]; break; } case FlowEvent::FLOW_MESSAGE: { FlowTaskMsg *ipc = static_cast(event->pkt_info()->ipc); FlowTable *table = ipc->fe_ptr.get()->flow_table(); - flow_event_queue_[table->table_index()]->Enqueue(event); + queue = flow_event_queue_[table->table_index()]; break; } @@ -320,20 +300,20 @@ void FlowProto::EnqueueFlowEvent(FlowEvent *event) { case FlowEvent::FREE_FLOW_REF: { FlowEntry *flow = event->flow(); FlowTable *table = flow->flow_table(); - flow_event_queue_[table->table_index()]->Enqueue(event); + queue = flow_event_queue_[table->table_index()]; break; } case FlowEvent::AUDIT_FLOW: { FlowTable *table = GetFlowTable(event->get_flow_key(), event->flow_handle()); - flow_event_queue_[table->table_index()]->Enqueue(event); + queue = flow_event_queue_[table->table_index()]; break; } case FlowEvent::GROW_FREE_LIST: { FlowTable *table = GetTable(event->table_index()); - flow_event_queue_[table->table_index()]->Enqueue(event); + queue = flow_event_queue_[table->table_index()]; break; } @@ -342,29 +322,26 @@ void FlowProto::EnqueueFlowEvent(FlowEvent *event) { FlowTableKSyncObject *ksync_obj = static_cast (ksync_event->ksync_entry()->GetObject()); FlowTable *table = ksync_obj->flow_table(); - flow_ksync_queue_[table->table_index()]->Enqueue(event); + queue = flow_ksync_queue_[table->table_index()]; break; } case FlowEvent::REENTRANT: { uint32_t index = event->table_index(); - flow_event_queue_[index]->Enqueue(event); + queue = flow_event_queue_[index]; break; } case FlowEvent::DELETE_FLOW: { FlowTable *table = GetTable(event->table_index()); - flow_delete_queue_[table->table_index()]->Enqueue(event); + queue = flow_delete_queue_[table->table_index()]; break; } case FlowEvent::REVALUATE_DBENTRY: { FlowEntry *flow = event->flow(); if (flow->flow_table()->SetRevaluatePending(flow)) { - flow_update_queue_.Enqueue(event); - } else { - enqueue_done = false; - delete event; + queue = &flow_update_queue_; } break; } @@ -372,17 +349,14 @@ void FlowProto::EnqueueFlowEvent(FlowEvent *event) { case FlowEvent::REVALUATE_FLOW: { FlowEntry *flow = event->flow(); if (flow->flow_table()->SetRecomputePending(flow)) { - flow_update_queue_.Enqueue(event); - } else { - enqueue_done = false; - delete event; + queue = &flow_update_queue_; } break; } case FlowEvent::DELETE_DBENTRY: case FlowEvent::FREE_DBENTRY: { - flow_update_queue_.Enqueue(event); + queue = &flow_update_queue_; break; } @@ -397,9 +371,13 @@ void FlowProto::EnqueueFlowEvent(FlowEvent *event) { break; } - // Keep UpdateStats in-sync on add of new events - if (enqueue_done) + if (queue) { UpdateStats(event, &stats_); + queue->Enqueue(event); + } else { + delete event; + } + return; } @@ -460,17 +438,6 @@ bool FlowProto::FlowEventHandler(FlowEvent *req, FlowTable *table) { break; } - // Flow was waiting for an index. Index is available now. Retry acquiring - // the index - case FlowEvent::KSYNC_EVENT: { - FlowEventKSync *ksync_event = static_cast(req); - FlowTableKSyncEntry *ksync_entry = - (static_cast (ksync_event->ksync_entry())); - FlowEntry *flow = ksync_entry->flow_entry().get(); - table->ProcessFlowEvent(req, flow, flow->reverse_flow_entry()); - break; - } - default: { assert(0); break; @@ -488,6 +455,8 @@ bool FlowProto::FlowKSyncMsgHandler(FlowEvent *req, FlowTable *table) { assert(table->ConcurrencyCheck() == true); switch (req->event()) { + // Flow was waiting for an index. Index is available now. Retry acquiring + // the index case FlowEvent::KSYNC_EVENT: { FlowTableKSyncEntry *ksync_entry = (static_cast (ksync_event->ksync_entry())); @@ -774,7 +743,7 @@ void UpdateStats(FlowEvent *req, FlowStats *stats) { } } -static void SetFlowEventQueueStats(const FlowProto::FlowEventQueue *queue, +static void SetFlowEventQueueStats(const FlowEventQueueBase::Queue *queue, ProfileData::WorkQueueStats *stats) { stats->name_ = queue->Description(); stats->queue_count_ = queue->Length(); @@ -823,14 +792,14 @@ void FlowProto::SetProfileData(ProfileData *data) { data->flow_.flow_delete_queue_.resize(flow_table_list_.size()); data->flow_.flow_ksync_queue_.resize(flow_table_list_.size()); for (uint16_t i = 0; i < flow_table_list_.size(); i++) { - SetFlowEventQueueStats(flow_event_queue_[i], + SetFlowEventQueueStats(flow_event_queue_[i]->queue(), &data->flow_.flow_event_queue_[i]); - SetFlowEventQueueStats(flow_delete_queue_[i], + SetFlowEventQueueStats(flow_delete_queue_[i]->queue(), &data->flow_.flow_delete_queue_[i]); - SetFlowEventQueueStats(flow_ksync_queue_[i], + SetFlowEventQueueStats(flow_ksync_queue_[i]->queue(), &data->flow_.flow_ksync_queue_[i]); } - SetFlowEventQueueStats(&flow_update_queue_, + SetFlowEventQueueStats(flow_update_queue_.queue(), &data->flow_.flow_update_queue_); const PktHandler::PktHandlerQueue *pkt_queue = pkt->pkt_handler()->work_queue(); diff --git a/src/vnsw/agent/pkt/flow_proto.h b/src/vnsw/agent/pkt/flow_proto.h index ae0f961dbfe..0d5e4a91ec8 100644 --- a/src/vnsw/agent/pkt/flow_proto.h +++ b/src/vnsw/agent/pkt/flow_proto.h @@ -37,20 +37,12 @@ struct FlowStats { class FlowProto : public Proto { public: - typedef WorkQueue FlowEventQueue; static const int kMinTableCount = 1; static const int kMaxTableCount = 16; static const int kFlowAddTokens = 800; static const int kFlowDelTokens = 800; static const int kFlowUpdateTokens = 400; - enum OperationType { - INVALID_OP, - ADD, - UPDATE, - DELETE - }; - FlowProto(Agent *agent, boost::asio::io_service &io); virtual ~FlowProto(); @@ -116,8 +108,8 @@ class FlowProto : public Proto { uint8_t table_index); FlowTokenPtr GetToken(FlowEvent::Event event); void TokenAvailable(FlowTokenPool *pool); + bool TokenCheck(const FlowTokenPool *pool); void EnqueueUnResolvedFlowEntry(FlowEntryPtr &flow); - bool ShouldTrace(const FlowEntry *flow, const FlowEntry *rflow); private: friend class SandeshIPv4FlowFilterRequest; @@ -128,20 +120,19 @@ class FlowProto : public Proto { FlowTraceFilter *ipv6_trace_filter() { return &ipv6_trace_filter_; } bool ProcessFlowEvent(const FlowEvent &req, FlowTable *table); - bool TokenCheck(const FlowTokenPool *pool); + FlowTokenPool add_tokens_; + FlowTokenPool del_tokens_; + FlowTokenPool update_tokens_; std::vector flow_event_queue_; - std::vector flow_delete_queue_; - std::vector flow_ksync_queue_; + std::vector flow_delete_queue_; + std::vector flow_ksync_queue_; std::vector flow_table_list_; - FlowEventQueue flow_update_queue_; + UpdateFlowEventQueue flow_update_queue_; tbb::atomic linklocal_flow_count_; bool use_vrouter_hash_; FlowTraceFilter ipv4_trace_filter_; FlowTraceFilter ipv6_trace_filter_; - FlowTokenPool add_tokens_; - FlowTokenPool del_tokens_; - FlowTokenPool update_tokens_; FlowStats stats_; }; diff --git a/src/vnsw/agent/pkt/flow_table.cc b/src/vnsw/agent/pkt/flow_table.cc index 28c98083e49..85a3b2315ab 100644 --- a/src/vnsw/agent/pkt/flow_table.cc +++ b/src/vnsw/agent/pkt/flow_table.cc @@ -220,21 +220,6 @@ void FlowTable::AddInternal(FlowEntry *flow_req, FlowEntry *flow, flow->set_deleted(false); } - if (flow) { - if (fwd_flow_update) { - flow->set_last_event(FlowEvent::FLOW_MESSAGE); - } else { - flow->set_last_event(FlowEvent::VROUTER_FLOW_MSG); - } - } - if (rflow) { - if (rev_flow_update) { - rflow->set_last_event(FlowEvent::FLOW_MESSAGE); - } else { - rflow->set_last_event(FlowEvent::VROUTER_FLOW_MSG); - } - } - if (rflow) { if (rflow_req != rflow) { Copy(rflow, rflow_req, (rev_flow_update || force_update_rflow)); @@ -257,6 +242,21 @@ void FlowTable::AddInternal(FlowEntry *flow_req, FlowEntry *flow, } } + if (flow) { + if (fwd_flow_update) { + flow->set_last_event(FlowEvent::FLOW_MESSAGE); + } else { + flow->set_last_event(FlowEvent::VROUTER_FLOW_MSG); + } + } + if (rflow) { + if (rev_flow_update) { + rflow->set_last_event(FlowEvent::FLOW_MESSAGE); + } else { + rflow->set_last_event(FlowEvent::VROUTER_FLOW_MSG); + } + } + // If the flows are already present, we want to retain the Forward and // Reverse flow characteristics for flow. // We have following conditions,