From 145db6eebcf72fa07df0ce7c04992bcd2a72acbc Mon Sep 17 00:00:00 2001 From: Praveen K V Date: Mon, 9 May 2016 23:12:57 +0530 Subject: [PATCH] Support state-compression for flow delete events Move the PendingAction state-compression logic to FlowEvent handler module. Rename REVALUATE_FLOW to RECOPMUTE_FLOW Change FLOW_MESSAGE event to have FlowEntry as member Add UT for state-compression of delete events Change-Id: I5480c02c27385e1fa8c63820da1a09b24b7898bb Closes-Bug: #1578660 Partial-Bug: #1568126 Partial-Bug: #1572270 Partial-Bug: #1572471 --- src/vnsw/agent/pkt/flow_entry.cc | 83 +++++++++- src/vnsw/agent/pkt/flow_entry.h | 64 +++++++- src/vnsw/agent/pkt/flow_event.cc | 149 +++++++++++++++++- src/vnsw/agent/pkt/flow_event.h | 76 ++++----- src/vnsw/agent/pkt/flow_handler.cc | 5 +- src/vnsw/agent/pkt/flow_mgmt_request.h | 2 +- src/vnsw/agent/pkt/flow_proto.cc | 117 +++++--------- src/vnsw/agent/pkt/flow_proto.h | 22 ++- src/vnsw/agent/pkt/flow_table.cc | 51 +----- src/vnsw/agent/pkt/flow_table.h | 5 +- src/vnsw/agent/pkt/test/test_flow_base.cc | 16 ++ src/vnsw/agent/pkt/test/test_flow_update.cc | 79 ++++++++-- src/vnsw/agent/pkt/test/test_flow_util.h | 13 +- .../flow_stats/flow_stats_collector.cc | 7 +- .../agent/vrouter/ksync/flowtable_ksync.cc | 2 +- 15 files changed, 472 insertions(+), 219 deletions(-) diff --git a/src/vnsw/agent/pkt/flow_entry.cc b/src/vnsw/agent/pkt/flow_entry.cc index de7da0b18d7..7d9a24450a4 100644 --- a/src/vnsw/agent/pkt/flow_entry.cc +++ b/src/vnsw/agent/pkt/flow_entry.cc @@ -2034,18 +2034,91 @@ FlowPendingAction::~FlowPendingAction() { } void FlowPendingAction::Reset() { + delete_ = false; recompute_ = false; + recompute_dbentry_ = false; revaluate_ = false; } -void FlowPendingAction::SetRevaluate(bool val) { - revaluate_ = val; +bool FlowPendingAction::SetDelete() { + if (delete_) + return false; + + delete_ = true; + return true; +} + +void FlowPendingAction::ResetDelete() { + delete_ = false; + recompute_ = false; + recompute_dbentry_ = false; + revaluate_ = false; +} + +bool FlowPendingAction::CanDelete() { + return delete_; } -void FlowPendingAction::SetRecompute(bool val) { - recompute_ = val; - revaluate_ = val; +bool FlowPendingAction::SetRecompute() { + if (delete_ || recompute_) + return false; + + recompute_ = true; + return true; } + +void FlowPendingAction::ResetRecompute() { + recompute_ = false; + recompute_dbentry_ = false; + revaluate_ = false; +} + +bool FlowPendingAction::CanRecompute() { + if (delete_) + return false; + + return recompute_; +} + +bool FlowPendingAction::SetRecomputeDBEntry() { + if (delete_ || recompute_ || recompute_dbentry_) + return false; + + recompute_dbentry_ = true; + return true; +} + +void FlowPendingAction::ResetRecomputeDBEntry() { + recompute_dbentry_ = false; + revaluate_ = false; +} + +bool FlowPendingAction::CanRecomputeDBEntry() { + if (delete_ || recompute_) + return false; + + return recompute_dbentry_; +} + +bool FlowPendingAction::SetRevaluate() { + if (delete_ || recompute_ || recompute_dbentry_ || revaluate_) + return false; + + revaluate_ = true; + return true; +} + +void FlowPendingAction::ResetRevaluate() { + revaluate_ = false; +} + +bool FlowPendingAction::CanRevaluate() { + if (delete_ || recompute_ || recompute_dbentry_) + return false; + + return revaluate_; +} + ///////////////////////////////////////////////////////////////////////////// // Introspect routines ///////////////////////////////////////////////////////////////////////////// diff --git a/src/vnsw/agent/pkt/flow_entry.h b/src/vnsw/agent/pkt/flow_entry.h index 70fd33b0551..564f71b29f1 100644 --- a/src/vnsw/agent/pkt/flow_entry.h +++ b/src/vnsw/agent/pkt/flow_entry.h @@ -312,8 +312,42 @@ struct FlowEventLog { uint8_t vrouter_gen_id_; }; -// Structure to track pending actions on flow. This is used to state-compress -// multiple actions on a flow +// There are 4 actions supported, +// Flow recomputation goes thru 2 stages of processing, +// +// - recompute_dbentry_ : In this stage, flow is enqueued to flow-update-queue +// as a result of db-entry add/delete/change. +// - recompute_ : In this stage, flow is enqueued to flow-event-queue +// for recomputation of flow +// - delete_ : Specifies that delete action is pending on flow. +// - recompute_ : Specifies that flow is enqueued into flow-event-queue +// for recomputation. +// +// The actions have a priority, the higher priorty action overrides lower +// priority actions. The priority in decreasing order is, +// - delete_ +// - recompute_ +// - recompute_dbentry_ +// - revaluate_ +// +// The flags are also used for state-compression of objects. The state +// compression is acheived with, +// +// - Before Event Enqueue : +// Before enqueuing an event, the FlowEvent module checks if the +// corresponding action or higher priority action is pending. If so, the +// event is ignored. +// Note, if the lower priority event is pending, the higher priority event +// is still enqueued. The lower priority event is ignored later as given below +// +// - On Event dequeue : +// After dequeuing an event, FlowEvent module checks if a higher priority +// event is pending. If so, the current event is ignored. +// +// - Post Event processing: +// Once the event is processed, the corresponding action is cleared for both +// forward and reverse flows. Clearing an action also clears lower priority +// actions class FlowPendingAction { public: FlowPendingAction(); @@ -321,16 +355,30 @@ class FlowPendingAction { void Reset(); - void SetRevaluate(bool val); - bool revaluate() const { return revaluate_; } + bool CanDelete(); + bool SetDelete(); + void ResetDelete(); - void SetRecompute(bool val); - bool recompute() const { return recompute_; } + bool CanRecompute(); + bool SetRecompute(); + void ResetRecompute(); + + bool CanRecomputeDBEntry(); + bool SetRecomputeDBEntry(); + void ResetRecomputeDBEntry(); + + bool CanRevaluate(); + bool SetRevaluate(); + void ResetRevaluate(); private: - // Flow pending revaluation due to change in interface, vn, acl and nh - bool revaluate_; + // delete pending + bool delete_; // Flow pending complete recompute bool recompute_; + // Flow pending recompute-dbentry + bool recompute_dbentry_; + // Flow pending revaluation due to change in interface, vn, acl and nh + bool revaluate_; }; class FlowEntry { diff --git a/src/vnsw/agent/pkt/flow_event.cc b/src/vnsw/agent/pkt/flow_event.cc index 74199796c8e..7736891ea32 100644 --- a/src/vnsw/agent/pkt/flow_event.cc +++ b/src/vnsw/agent/pkt/flow_event.cc @@ -24,7 +24,7 @@ FlowEventQueueBase::FlowEventQueueBase(FlowProto *proto, FlowTokenPool *pool, uint16_t latency_limit) : flow_proto_(proto), token_pool_(pool), task_start_(0), count_(0), - latency_limit_(latency_limit) { + events_processed_(0), latency_limit_(latency_limit) { queue_ = new Queue(task_id, task_instance, boost::bind(&FlowEventQueueBase::Handler, this, _1)); char buff[100]; @@ -49,6 +49,10 @@ void FlowEventQueueBase::Shutdown() { } void FlowEventQueueBase::Enqueue(FlowEvent *event) { + if (CanEnqueue(event) == false) { + delete event; + return; + } queue_->Enqueue(event); } @@ -72,11 +76,11 @@ void FlowEventQueueBase::TaskExit(bool done) { struct rusage r; getrusage(RUSAGE_THREAD, &r); - uint32_t user = (r.ru_utime.tv_sec - rusage_.ru_utime.tv_sec) * 100; - user += (r.ru_utime.tv_usec - rusage_.ru_utime.tv_usec); + uint32_t user = (r.ru_utime.tv_sec - rusage_.ru_utime.tv_sec) * 1000; + user += ((r.ru_utime.tv_usec - rusage_.ru_utime.tv_usec) / 1000); - uint32_t sys = (r.ru_stime.tv_sec - rusage_.ru_stime.tv_sec) * 100; - sys += (r.ru_stime.tv_usec - rusage_.ru_stime.tv_usec); + uint32_t sys = (r.ru_stime.tv_sec - rusage_.ru_stime.tv_sec) * 1000; + sys += ((r.ru_stime.tv_usec - rusage_.ru_stime.tv_usec) / 1000); LOG(ERROR, queue_->Description() << " Time exceeded " << ((t - task_start_) / 1000) @@ -87,8 +91,141 @@ void FlowEventQueueBase::TaskExit(bool done) { } bool FlowEventQueueBase::Handler(FlowEvent *event) { + std::auto_ptr event_ptr(event); count_++; - return HandleEvent(event); + if (CanProcess(event) == false) { + ProcessDone(event, false); + return true; + } + + HandleEvent(event); + + ProcessDone(event, true); + return true; +} + +bool FlowEventQueueBase::CanEnqueue(FlowEvent *event) { + FlowEntry *flow = event->flow(); + bool ret = true; + switch (event->event()) { + + case FlowEvent::DELETE_DBENTRY: + case FlowEvent::DELETE_FLOW: { + tbb::mutex::scoped_lock mutext(flow->mutex()); + ret = flow->GetPendingAction()->SetDelete(); + break; + } + + // lock already token for the flow + case FlowEvent::FLOW_MESSAGE: { + ret = flow->GetPendingAction()->SetRecompute(); + break; + } + + case FlowEvent::RECOMPUTE_FLOW: { + tbb::mutex::scoped_lock mutext(flow->mutex()); + ret = flow->GetPendingAction()->SetRecomputeDBEntry(); + break; + } + + case FlowEvent::REVALUATE_DBENTRY: { + tbb::mutex::scoped_lock mutext(flow->mutex()); + ret = flow->GetPendingAction()->SetRevaluate(); + break; + } + + default: + break; + } + + return ret; +} + +bool FlowEventQueueBase::CanProcess(FlowEvent *event) { + FlowEntry *flow = event->flow(); + bool ret = true; + switch (event->event()) { + + case FlowEvent::DELETE_DBENTRY: + case FlowEvent::DELETE_FLOW: { + tbb::mutex::scoped_lock mutext(flow->mutex()); + events_processed_++; + ret = flow->GetPendingAction()->CanDelete(); + break; + } + + case FlowEvent::FLOW_MESSAGE: { + tbb::mutex::scoped_lock mutext(flow->mutex()); + events_processed_++; + ret = flow->GetPendingAction()->CanRecompute(); + break; + } + + case FlowEvent::RECOMPUTE_FLOW: { + tbb::mutex::scoped_lock mutext(flow->mutex()); + events_processed_++; + ret = flow->GetPendingAction()->CanRecomputeDBEntry(); + break; + } + + case FlowEvent::REVALUATE_DBENTRY: { + events_processed_++; + tbb::mutex::scoped_lock mutext(flow->mutex()); + ret = flow->GetPendingAction()->CanRevaluate(); + break; + } + + default: + break; + } + + return ret; +} + +void FlowEventQueueBase::ProcessDone(FlowEvent *event, bool update_rev_flow) { + FlowEntry *flow = event->flow(); + FlowEntry *rflow = NULL; + if (flow && update_rev_flow) + rflow = flow->reverse_flow_entry(); + + switch (event->event()) { + + case FlowEvent::DELETE_DBENTRY: + case FlowEvent::DELETE_FLOW: { + FLOW_LOCK(flow, rflow, event->event()); + flow->GetPendingAction()->ResetDelete(); + if (rflow) + rflow->GetPendingAction()->ResetDelete(); + break; + } + + case FlowEvent::FLOW_MESSAGE: { + FLOW_LOCK(flow, rflow, event->event()); + flow->GetPendingAction()->ResetRecompute(); + if (rflow) + rflow->GetPendingAction()->ResetRecompute(); + break; + } + + case FlowEvent::RECOMPUTE_FLOW: { + tbb::mutex::scoped_lock mutext(flow->mutex()); + flow->GetPendingAction()->ResetRecomputeDBEntry(); + break; + } + + case FlowEvent::REVALUATE_DBENTRY: { + FLOW_LOCK(flow, rflow, event->event()); + flow->GetPendingAction()->ResetRevaluate(); + if (rflow) + rflow->GetPendingAction()->ResetRevaluate(); + break; + } + + default: + break; + } + + return; } FlowEventQueue::FlowEventQueue(Agent *agent, FlowProto *proto, diff --git a/src/vnsw/agent/pkt/flow_event.h b/src/vnsw/agent/pkt/flow_event.h index 1bf30dad518..7bda999da24 100644 --- a/src/vnsw/agent/pkt/flow_event.h +++ b/src/vnsw/agent/pkt/flow_event.h @@ -39,9 +39,9 @@ class FlowEvent { // The action typically invovles only re-evaluating ACL lookup actions REVALUATE_DBENTRY, // Add/Delete of route can result in flow using a next higher/lower - // prefix. The event will re-valuate the complete flow due to change + // prefix. The event will recompute the complete flow due to change // in route used for flow - REVALUATE_FLOW, + RECOMPUTE_FLOW, // Flow entry should be freed from kTaskFlowEvent task context. // Event to ensure flow entry is freed from right context FREE_FLOW_REF, @@ -60,87 +60,69 @@ class FlowEvent { FlowEvent() : event_(INVALID), flow_(NULL), pkt_info_(), db_entry_(NULL), - gen_id_(0), del_rev_flow_(false), - flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(0) { + gen_id_(0), flow_handle_(FlowEntry::kInvalidFlowHandle), + table_index_(0) { } FlowEvent(Event event) : event_(event), flow_(NULL), pkt_info_(), db_entry_(NULL), - gen_id_(0), del_rev_flow_(false), table_index_(0) { + gen_id_(0), table_index_(0) { } FlowEvent(Event event, FlowEntry *flow) : event_(event), flow_(flow), pkt_info_(), db_entry_(NULL), - gen_id_(0), del_rev_flow_(false), - flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(0){ + gen_id_(0), flow_handle_(FlowEntry::kInvalidFlowHandle), + table_index_(0) { } FlowEvent(Event event, FlowEntry *flow, uint32_t flow_handle, uint8_t gen_id) : event_(event), flow_(flow), pkt_info_(), db_entry_(NULL), - gen_id_(gen_id), del_rev_flow_(false), flow_handle_(flow_handle), - table_index_(0) { + gen_id_(gen_id), flow_handle_(flow_handle), table_index_(0) { } FlowEvent(Event event, FlowEntry *flow, const DBEntry *db_entry) : event_(event), flow_(flow), pkt_info_(), db_entry_(db_entry), - gen_id_(0), del_rev_flow_(false), - flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(0) { + gen_id_(0), flow_handle_(FlowEntry::kInvalidFlowHandle), + table_index_(0) { } FlowEvent(Event event, const DBEntry *db_entry, uint32_t gen_id) : event_(event), flow_(NULL), pkt_info_(), db_entry_(db_entry), - gen_id_(gen_id), del_rev_flow_(false), - flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(0) { - } - - FlowEvent(Event event, const FlowKey &key, bool del_rev_flow, - uint32_t table_index) : - event_(event), flow_(NULL), pkt_info_(), db_entry_(NULL), - gen_id_(0), flow_key_(key), del_rev_flow_(del_rev_flow), - flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(table_index) { + gen_id_(gen_id), flow_handle_(FlowEntry::kInvalidFlowHandle), + table_index_(0) { } FlowEvent(Event event, const FlowKey &key) : event_(event), flow_(NULL), pkt_info_(), db_entry_(NULL), - gen_id_(0), flow_key_(key), del_rev_flow_(false), + gen_id_(0), flow_key_(key), flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(0) { } FlowEvent(Event event, const FlowKey &key, uint32_t flow_handle, uint8_t gen_id) : event_(event), flow_(NULL), pkt_info_(), db_entry_(NULL), - gen_id_(gen_id), flow_key_(key), del_rev_flow_(false), - flow_handle_(flow_handle), table_index_(0) { - } - - FlowEvent(Event event, PktInfoPtr pkt_info) : - event_(event), flow_(NULL), pkt_info_(pkt_info), db_entry_(NULL), - gen_id_(0), flow_key_(), del_rev_flow_(), - flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(0) { + gen_id_(gen_id), flow_key_(key), flow_handle_(flow_handle), + table_index_(0) { } - FlowEvent(Event event, PktInfoPtr pkt_info, uint8_t table_index) : - event_(event), flow_(NULL), pkt_info_(pkt_info), db_entry_(NULL), - gen_id_(0), flow_key_(), del_rev_flow_(), - flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_(table_index) { + FlowEvent(Event event, PktInfoPtr pkt_info, FlowEntry *flow, + uint32_t table_index) : + event_(event), flow_(flow), pkt_info_(pkt_info), db_entry_(NULL), + gen_id_(0), flow_key_(), flow_handle_(FlowEntry::kInvalidFlowHandle), + table_index_(table_index) { } FlowEvent(const FlowEvent &rhs) : event_(rhs.event_), flow_(rhs.flow()), pkt_info_(rhs.pkt_info_), db_entry_(rhs.db_entry_), gen_id_(rhs.gen_id_), - flow_key_(rhs.flow_key_), del_rev_flow_(rhs.del_rev_flow_), - flow_handle_(rhs.flow_handle_), table_index_(rhs.table_index_) { + flow_key_(rhs.flow_key_), flow_handle_(rhs.flow_handle_), + table_index_(rhs.table_index_) { } - FlowEvent(Event event, FlowEntryPtr &flow) : - event_(event), flow_(flow), pkt_info_(), db_entry_(NULL), - gen_id_(0), flow_key_(), del_rev_flow_(), - flow_handle_(FlowEntry::kInvalidFlowHandle), table_index_() { + virtual ~FlowEvent() { } - virtual ~FlowEvent() { } - Event event() const { return event_; } FlowEntry *flow() const { return flow_.get(); } FlowEntryPtr &flow_ref() { return flow_; } @@ -149,7 +131,6 @@ class FlowEvent { void set_db_entry(const DBEntry *db_entry) { db_entry_ = db_entry; } uint32_t gen_id() const { return gen_id_; } const FlowKey &get_flow_key() const { return flow_key_; } - bool get_del_rev_flow() const { return del_rev_flow_; } PktInfoPtr pkt_info() const { return pkt_info_; } uint32_t flow_handle() const { return flow_handle_; } uint32_t table_index() const { return table_index_;} @@ -160,7 +141,6 @@ class FlowEvent { const DBEntry *db_entry_; uint32_t gen_id_; FlowKey flow_key_; - bool del_rev_flow_; uint32_t flow_handle_; uint32_t table_index_; }; @@ -277,13 +257,23 @@ class FlowEventQueueBase { uint32_t Length() { return queue_->Length(); } void MayBeStartRunner() { queue_->MayBeStartRunner(); } Queue *queue() const { return queue_; } + uint64_t events_processed() const { return events_processed_; } + uint64_t events_enqueued() const { return queue_->NumEnqueues(); } protected: + bool CanEnqueue(FlowEvent *event); + bool CanProcess(FlowEvent *event); + void ProcessDone(FlowEvent *event, bool update_rev_flow); + Queue *queue_; FlowProto *flow_proto_; FlowTokenPool *token_pool_; uint64_t task_start_; + // Number of entries processed in single run of WorkQueue uint32_t count_; + // Number of events processed. Skips event that are state-compressed + // due to Flow PendingActions + uint64_t events_processed_; uint16_t latency_limit_; struct rusage rusage_; }; diff --git a/src/vnsw/agent/pkt/flow_handler.cc b/src/vnsw/agent/pkt/flow_handler.cc index c4d5ea1d24b..39360f3873b 100644 --- a/src/vnsw/agent/pkt/flow_handler.cc +++ b/src/vnsw/agent/pkt/flow_handler.cc @@ -78,11 +78,8 @@ bool FlowHandler::Run() { // forward flow only take lock only on forward flow tbb::mutex::scoped_lock lock1(fe->mutex()); assert(flow_table_index_ == fe->flow_table()->table_index()); - if (fe->deleted() || fe->is_flags_set(FlowEntry::ShortFlow)) { - return true; - } - if (fe->is_flags_set(FlowEntry::ShortFlow)) { + if (fe->deleted() || fe->is_flags_set(FlowEntry::ShortFlow)) { return true; } diff --git a/src/vnsw/agent/pkt/flow_mgmt_request.h b/src/vnsw/agent/pkt/flow_mgmt_request.h index b576f4b044c..b2061a0cc7f 100644 --- a/src/vnsw/agent/pkt/flow_mgmt_request.h +++ b/src/vnsw/agent/pkt/flow_mgmt_request.h @@ -88,7 +88,7 @@ class FlowMgmtRequest { const InetUnicastRouteEntry *rt = dynamic_cast(db_entry_); if (rt) { - resp_event = FlowEvent::REVALUATE_FLOW; + resp_event = FlowEvent::RECOMPUTE_FLOW; } return resp_event; diff --git a/src/vnsw/agent/pkt/flow_proto.cc b/src/vnsw/agent/pkt/flow_proto.cc index ed010a11f0a..aada16f4ba9 100644 --- a/src/vnsw/agent/pkt/flow_proto.cc +++ b/src/vnsw/agent/pkt/flow_proto.cc @@ -161,7 +161,7 @@ uint16_t FlowProto::FlowTableIndex(const IpAddress &sip, const IpAddress &dip, return (hash % (flow_event_queue_.size())); } -FlowHandler *FlowProto::AllocProtoHandler(boost::shared_ptr info, +FlowHandler *FlowProto::AllocProtoHandler(PktInfoPtr info, boost::asio::io_service &io) { uint32_t index = FlowTableIndex(info->ip_saddr, info->ip_daddr, info->ip_proto, info->sport, info->dport, @@ -205,11 +205,11 @@ FlowTable *FlowProto::GetFlowTable(const FlowKey &key, return flow_table_list_[index]; } -bool FlowProto::Enqueue(boost::shared_ptr msg) { +bool FlowProto::Enqueue(PktInfoPtr msg) { if (Validate(msg.get()) == false) { return true; } - EnqueueFlowEvent(new FlowEvent(FlowEvent::VROUTER_FLOW_MSG, msg)); + EnqueueFlowEvent(new FlowEvent(FlowEvent::VROUTER_FLOW_MSG, msg, NULL, 0)); return true; } @@ -289,13 +289,7 @@ void FlowProto::EnqueueFlowEvent(FlowEvent *event) { break; } - case FlowEvent::FLOW_MESSAGE: { - FlowTaskMsg *ipc = static_cast(event->pkt_info()->ipc); - FlowTable *table = ipc->fe_ptr.get()->flow_table(); - queue = flow_event_queue_[table->table_index()]; - break; - } - + case FlowEvent::FLOW_MESSAGE: case FlowEvent::EVICT_FLOW: case FlowEvent::FREE_FLOW_REF: { FlowEntry *flow = event->flow(); @@ -333,37 +327,22 @@ void FlowProto::EnqueueFlowEvent(FlowEvent *event) { } case FlowEvent::DELETE_FLOW: { - FlowTable *table = GetTable(event->table_index()); - queue = flow_delete_queue_[table->table_index()]; - break; - } - - case FlowEvent::REVALUATE_DBENTRY: { - FlowEntry *flow = event->flow(); - if (flow->flow_table()->SetRevaluatePending(flow)) { - queue = &flow_update_queue_; - } - break; - } - - case FlowEvent::REVALUATE_FLOW: { FlowEntry *flow = event->flow(); - if (flow->flow_table()->SetRecomputePending(flow)) { - queue = &flow_update_queue_; - } + queue = flow_delete_queue_[flow->flow_table()->table_index()]; break; } + case FlowEvent::FREE_DBENTRY: case FlowEvent::DELETE_DBENTRY: - case FlowEvent::FREE_DBENTRY: { + case FlowEvent::RECOMPUTE_FLOW: + case FlowEvent::REVALUATE_DBENTRY: { queue = &flow_update_queue_; break; } case FlowEvent::UNRESOLVED_FLOW_ENTRY: { - FlowEntry *flow = event->flow(); - FlowTable *table = flow->flow_table(); - flow_event_queue_[table->table_index()]->Enqueue(event); + FlowTable *table = event->flow()->flow_table(); + queue = flow_event_queue_[table->table_index()]; break; } default: @@ -371,18 +350,12 @@ void FlowProto::EnqueueFlowEvent(FlowEvent *event) { break; } - if (queue) { - UpdateStats(event, &stats_); - queue->Enqueue(event); - } else { - delete event; - } - + UpdateStats(event, &stats_); + queue->Enqueue(event); return; } bool FlowProto::FlowEventHandler(FlowEvent *req, FlowTable *table) { - std::auto_ptr req_ptr(req); // concurrency check to ensure all request are in right partitions assert(table->ConcurrencyCheck() == true); @@ -403,11 +376,20 @@ bool FlowProto::FlowEventHandler(FlowEvent *req, FlowTable *table) { break; } - case FlowEvent::REENTRANT: - case FlowEvent::FLOW_MESSAGE: { + case FlowEvent::REENTRANT: { FlowHandler *handler = new FlowHandler(agent(), req->pkt_info(), io_, this, table->table_index()); RunProtoHandler(handler); + } + + case FlowEvent::FLOW_MESSAGE: { + FlowEntry *flow = req->flow(); + FlowTaskMsg *flow_msg = new FlowTaskMsg(flow); + PktInfoPtr pkt_info(new PktInfo(PktHandler::FLOW, flow_msg)); + FreeBuffer(pkt_info.get()); + FlowHandler *handler = new FlowHandler(agent(), pkt_info, io_, + this, table->table_index()); + RunProtoHandler(handler); break; } @@ -449,7 +431,6 @@ bool FlowProto::FlowEventHandler(FlowEvent *req, FlowTable *table) { bool FlowProto::FlowKSyncMsgHandler(FlowEvent *req, FlowTable *table) { FlowEventKSync *ksync_event = static_cast(req); - std::auto_ptr req_ptr(req); // concurrency check to ensure all request are in right partitions assert(table->ConcurrencyCheck() == true); @@ -476,8 +457,6 @@ bool FlowProto::FlowKSyncMsgHandler(FlowEvent *req, FlowTable *table) { } bool FlowProto::FlowUpdateHandler(FlowEvent *req) { - std::auto_ptr req_ptr(req); - switch (req->event()) { case FlowEvent::FREE_DBENTRY: { FlowMgmtManager *mgr = agent()->pkt()->flow_mgmt_manager(); @@ -493,7 +472,7 @@ bool FlowProto::FlowUpdateHandler(FlowEvent *req) { break; } - case FlowEvent::REVALUATE_FLOW: { + case FlowEvent::RECOMPUTE_FLOW: { FlowEntry *flow = req->flow(); flow->flow_table()->ProcessFlowEvent(req, flow, flow->reverse_flow_entry()); @@ -510,7 +489,6 @@ bool FlowProto::FlowUpdateHandler(FlowEvent *req) { } bool FlowProto::FlowDeleteHandler(FlowEvent *req, FlowTable *table) { - std::auto_ptr req_ptr(req); // concurrency check to ensure all request are in right partitions // flow-update-queue doenst happen table pointer. Skip concurrency check // for flow-update-queue @@ -520,14 +498,8 @@ bool FlowProto::FlowDeleteHandler(FlowEvent *req, FlowTable *table) { switch (req->event()) { case FlowEvent::DELETE_FLOW: { - FlowEntry *flow = NULL; - FlowEntry *rflow = NULL; - table->PopulateFlowEntriesUsingKey(req->get_flow_key(), - req->get_del_rev_flow(), &flow, - &rflow); - if (flow == NULL) - break; - table->ProcessFlowEvent(req, flow, rflow); + FlowEntry *flow = req->flow(); + table->ProcessFlowEvent(req, flow, flow->reverse_flow_entry()); break; } @@ -543,10 +515,8 @@ bool FlowProto::FlowDeleteHandler(FlowEvent *req, FlowTable *table) { ////////////////////////////////////////////////////////////////////////////// // Utility methods to generate events ////////////////////////////////////////////////////////////////////////////// -void FlowProto::DeleteFlowRequest(const FlowKey &flow_key, bool del_rev_flow, - uint32_t table_index) { - EnqueueFlowEvent(new FlowEvent(FlowEvent::DELETE_FLOW, flow_key, - del_rev_flow, table_index)); +void FlowProto::DeleteFlowRequest(FlowEntry *flow) { + EnqueueFlowEvent(new FlowEvent(FlowEvent::DELETE_FLOW, flow)); return; } @@ -583,10 +553,8 @@ void FlowProto::KSyncEventRequest(KSyncEntry *ksync_entry, evict_flow_packets, evict_flow_oflow)); } -void FlowProto::MessageRequest(InterTaskMsg *msg) { - boost::shared_ptr pkt_info(new PktInfo(PktHandler::FLOW, msg)); - FreeBuffer(pkt_info.get()); - EnqueueFlowEvent(new FlowEvent(FlowEvent::FLOW_MESSAGE, pkt_info)); +void FlowProto::MessageRequest(FlowEntry *flow) { + EnqueueFlowEvent(new FlowEvent(FlowEvent::FLOW_MESSAGE, flow)); return; } @@ -607,13 +575,18 @@ void FlowProto::ForceEnqueueFreeFlowReference(FlowEntryPtr &flow) { EnqueueFlowEvent(event); } -bool FlowProto::EnqueueReentrant(boost::shared_ptr msg, - uint8_t table_index) { +bool FlowProto::EnqueueReentrant(PktInfoPtr msg, uint8_t table_index) { EnqueueFlowEvent(new FlowEvent(FlowEvent::REENTRANT, - msg, table_index)); + msg, NULL, table_index)); return true; } +// Enqueue event to force revaluation of KSync entry +void FlowProto::EnqueueUnResolvedFlowEntry(FlowEntry *flow) { + FlowEvent *event = new FlowEvent(FlowEvent::UNRESOLVED_FLOW_ENTRY, flow); + EnqueueFlowEvent(event); +} + // Apply trace-filter for flow. Will not allow true-false transistions. // That is, if flows are already marked for tracing, action is retained bool FlowProto::ShouldTrace(const FlowEntry *flow, const FlowEntry *rflow) { @@ -659,7 +632,7 @@ FlowTokenPtr FlowProto::GetToken(FlowEvent::Event event) { case FlowEvent::FLOW_MESSAGE: case FlowEvent::DELETE_DBENTRY: case FlowEvent::REVALUATE_DBENTRY: - case FlowEvent::REVALUATE_FLOW: + case FlowEvent::RECOMPUTE_FLOW: return update_tokens_.GetToken(NULL); break; @@ -702,12 +675,6 @@ void FlowProto::TokenAvailable(FlowTokenPool *pool) { } } -void FlowProto::EnqueueUnResolvedFlowEntry(FlowEntryPtr& flow) { - - FlowEvent *event = new FlowEvent(FlowEvent::UNRESOLVED_FLOW_ENTRY,flow); - EnqueueFlowEvent(event); -} - ////////////////////////////////////////////////////////////////////////////// // Set profile information ////////////////////////////////////////////////////////////////////////////// @@ -725,11 +692,11 @@ void UpdateStats(FlowEvent *req, FlowStats *stats) { case FlowEvent::AUDIT_FLOW: stats->audit_count_++; break; - case FlowEvent::REVALUATE_FLOW: - stats->revaluate_count_++; + case FlowEvent::RECOMPUTE_FLOW: + stats->recompute_count_++; break; case FlowEvent::REVALUATE_DBENTRY: - stats->recompute_count_++; + stats->revaluate_count_++; break; case FlowEvent::KSYNC_EVENT: { stats->vrouter_responses_++; diff --git a/src/vnsw/agent/pkt/flow_proto.h b/src/vnsw/agent/pkt/flow_proto.h index 8c03e743126..e0d9dc89d70 100644 --- a/src/vnsw/agent/pkt/flow_proto.h +++ b/src/vnsw/agent/pkt/flow_proto.h @@ -28,10 +28,16 @@ struct FlowStats { uint64_t vrouter_responses_; uint64_t vrouter_error_; + // Number of events actually processed + uint64_t delete_process_; + uint64_t revaluate_process_; + uint64_t recompute_process_; + FlowStats() : add_count_(0), delete_count_(0), flow_messages_(0), revaluate_count_(0), recompute_count_(0), audit_count_(0), - vrouter_responses_(0), vrouter_error_(0) { + vrouter_responses_(0), vrouter_error_(0), delete_process_(0), + revaluate_process_(0), recompute_process_(0) { } }; @@ -52,9 +58,9 @@ class FlowProto : public Proto { void FlushFlows(); bool Validate(PktInfo *msg); - FlowHandler *AllocProtoHandler(boost::shared_ptr info, + FlowHandler *AllocProtoHandler(PktInfoPtr info, boost::asio::io_service &io); - bool Enqueue(boost::shared_ptr msg); + bool Enqueue(PktInfoPtr msg); FlowEntry *Find(const FlowKey &key, uint32_t table_index) const; uint16_t FlowTableIndex(const IpAddress &sip, const IpAddress &dip, @@ -72,8 +78,7 @@ class FlowProto : public Proto { void EnqueueFlowEvent(FlowEvent *event); void ForceEnqueueFreeFlowReference(FlowEntryPtr &flow); - void DeleteFlowRequest(const FlowKey &flow_key, bool del_rev_flow, - uint32_t table_index); + void DeleteFlowRequest(FlowEntry *flow); void EvictFlowRequest(FlowEntry *flow, uint32_t flow_handle, uint8_t gen_id); void CreateAuditEntry(const FlowKey &key, uint32_t flow_handle, @@ -89,7 +94,7 @@ class FlowProto : public Proto { uint64_t evict_flow_bytes, uint64_t evict_flow_packets, int32_t evict_flow_oflow); - void MessageRequest(InterTaskMsg *msg); + void MessageRequest(FlowEntry *flow); void DisableFlowEventQueue(uint32_t index, bool disabled); void DisableFlowUpdateQueue(bool disabled); @@ -110,13 +115,16 @@ class FlowProto : public Proto { void TokenAvailable(FlowTokenPool *pool); bool ShouldTrace(const FlowEntry *flow, const FlowEntry *rflow); - void EnqueueUnResolvedFlowEntry(FlowEntryPtr &flow); + void EnqueueUnResolvedFlowEntry(FlowEntry *flow); bool TokenCheck(const FlowTokenPool *pool); + private: friend class SandeshIPv4FlowFilterRequest; friend class SandeshIPv6FlowFilterRequest; friend class SandeshShowFlowFilterRequest; friend class FlowTraceFilterTest; + friend class FlowUpdateTest; + friend class FlowTest; FlowTraceFilter *ipv4_trace_filter() { return &ipv4_trace_filter_; } FlowTraceFilter *ipv6_trace_filter() { return &ipv6_trace_filter_; } diff --git a/src/vnsw/agent/pkt/flow_table.cc b/src/vnsw/agent/pkt/flow_table.cc index c24fc3d2509..eeda28e6709 100644 --- a/src/vnsw/agent/pkt/flow_table.cc +++ b/src/vnsw/agent/pkt/flow_table.cc @@ -193,11 +193,6 @@ void FlowTable::AddInternal(FlowEntry *flow_req, FlowEntry *flow, if (rflow_req) rflow_req->set_reverse_flow_entry(NULL); - if (flow) - flow->GetPendingAction()->SetRecompute(false); - if (rflow) - rflow->GetPendingAction()->SetRecompute(false); - bool force_update_rflow = false; if (fwd_flow_update) { if (flow == NULL) @@ -514,8 +509,8 @@ boost::uuids::uuid FlowTable::rand_gen() { return rand_gen_(); } -// Enqueue message to revaluate a flow -void FlowTable::RevaluateFlow(FlowEntry *flow) { +// Enqueue message to recompute a flow +void FlowTable::RecomputeFlow(FlowEntry *flow) { if (flow->is_flags_set(FlowEntry::ShortFlow)) return; @@ -524,7 +519,7 @@ void FlowTable::RevaluateFlow(FlowEntry *flow) { flow = flow->reverse_flow_entry(); } - agent_->pkt()->get_flow_proto()->MessageRequest(new FlowTaskMsg(flow)); + agent_->pkt()->get_flow_proto()->MessageRequest(flow); } // Handle deletion of a Route. Flow management module has identified that route @@ -545,15 +540,6 @@ void FlowTable::EvictFlow(FlowEntry *flow, FlowEntry *reverse_flow) { void FlowTable::HandleRevaluateDBEntry(const DBEntry *entry, FlowEntry *flow, bool active_flow, bool deleted_flow) { - // Check if re-valuate still pending on flow - if (flow->GetPendingAction()->revaluate() == false) - return; - flow->GetPendingAction()->SetRevaluate(false); - - FlowEntry *rflow = flow->reverse_flow_entry(); - if (rflow) - rflow->GetPendingAction()->SetRevaluate(false); - // Ignore revluate of deleted/short flows if (flow->IsShortFlow()) return; @@ -561,6 +547,7 @@ void FlowTable::HandleRevaluateDBEntry(const DBEntry *entry, FlowEntry *flow, if (flow->deleted()) return; + FlowEntry *rflow = flow->reverse_flow_entry(); // Update may happen for reverse-flow. We act on both forward and // reverse-flow. Get both forward and reverse flows if (flow->is_flags_set(FlowEntry::ReverseFlow)) { @@ -778,7 +765,7 @@ bool FlowTable::ProcessFlowEvent(const FlowEvent *req, FlowEntry *flow, //Now process events. switch (req->event()) { case FlowEvent::DELETE_FLOW: { - DeleteUnLocked(req->get_del_rev_flow(), flow, rflow); + DeleteUnLocked(true, flow, rflow); break; } @@ -793,9 +780,9 @@ bool FlowTable::ProcessFlowEvent(const FlowEvent *req, FlowEntry *flow, break; } - case FlowEvent::REVALUATE_FLOW: { + case FlowEvent::RECOMPUTE_FLOW: { if (active_flow) - RevaluateFlow(flow); + RecomputeFlow(flow); break; } @@ -848,30 +835,6 @@ bool FlowTable::ProcessFlowEvent(const FlowEvent *req, FlowEntry *flow, return true; } -bool FlowTable::SetRecomputePending(FlowEntry *flow) { - tbb::mutex::scoped_lock mutext(flow->mutex()); - if (flow->deleted() || flow->IsShortFlow()) - return false; - - if (flow->GetPendingAction()->recompute()) - return false; - - flow->GetPendingAction()->SetRecompute(true); - return true; -} - -bool FlowTable::SetRevaluatePending(FlowEntry *flow) { - tbb::mutex::scoped_lock mutext(flow->mutex()); - if (flow->deleted() || flow->IsShortFlow()) - return false; - - if (flow->GetPendingAction()->revaluate()) - return false; - - flow->GetPendingAction()->SetRevaluate(true); - return true; -} - ///////////////////////////////////////////////////////////////////////////// // FlowEntryFreeList implementation ///////////////////////////////////////////////////////////////////////////// diff --git a/src/vnsw/agent/pkt/flow_table.h b/src/vnsw/agent/pkt/flow_table.h index 3912d235e51..e9550350cbc 100644 --- a/src/vnsw/agent/pkt/flow_table.h +++ b/src/vnsw/agent/pkt/flow_table.h @@ -218,7 +218,7 @@ class FlowTable { void SetAceSandeshData(const AclDBEntry *acl, AclFlowCountResp &data, int ace_id); - void RevaluateFlow(FlowEntry *flow); + void RecomputeFlow(FlowEntry *flow); void DeleteMessage(FlowEntry *flow); void DeleteVrf(VrfEntry *vrf); @@ -243,9 +243,6 @@ class FlowTable { void PopulateFlowEntriesUsingKey(const FlowKey &key, bool reverse_flow, FlowEntry** flow, FlowEntry** rflow); - bool SetRecomputePending(FlowEntry *flow); - bool SetRevaluatePending(FlowEntry *flow); - // Concurrency check to ensure all flow-table and free-list manipulations // are done from FlowEvent task context only bool ConcurrencyCheck(); diff --git a/src/vnsw/agent/pkt/test/test_flow_base.cc b/src/vnsw/agent/pkt/test/test_flow_base.cc index cf33c687144..c7f52fd60bb 100644 --- a/src/vnsw/agent/pkt/test/test_flow_base.cc +++ b/src/vnsw/agent/pkt/test/test_flow_base.cc @@ -734,6 +734,22 @@ class FlowTest : public ::testing::Test { Agent *agent() {return agent_;} FlowProto *get_flow_proto() const { return flow_proto_; } + FlowEventQueue *GetFlowEventQueue(uint32_t table_index) { + return flow_proto_->flow_event_queue_[table_index]; + } + + DeleteFlowEventQueue *GetDeleteFlowEventQueue(uint32_t table_index) { + return flow_proto_->flow_delete_queue_[table_index]; + } + + KSyncFlowEventQueue *GetKSyncFlowEventQueue(uint32_t table_index) { + return flow_proto_->flow_ksync_queue_[table_index]; + } + + UpdateFlowEventQueue *GetUpdateFlowEventQueue() { + return &flow_proto_->flow_update_queue_; + } + protected: static bool ksync_init_; BgpPeer *peer_; diff --git a/src/vnsw/agent/pkt/test/test_flow_update.cc b/src/vnsw/agent/pkt/test/test_flow_update.cc index f2c8dda7a3f..f803ce44c47 100644 --- a/src/vnsw/agent/pkt/test/test_flow_update.cc +++ b/src/vnsw/agent/pkt/test/test_flow_update.cc @@ -39,6 +39,9 @@ class FlowUpdateTest : public FlowTest { strcpy(sg1_acl_name_, "sg_acl1" "egress-access-control-list"); strcpy(sg2_acl_name_, "sg_acl2" "egress-access-control-list"); + update_queue_ = GetUpdateFlowEventQueue(); + event_queue_ = GetFlowEventQueue(0); + delete_queue_ = GetDeleteFlowEventQueue(0); } virtual void TearDown() { @@ -62,6 +65,9 @@ class FlowUpdateTest : public FlowTest { char sg1_acl_name_[1024]; char sg2_acl_name_[1024]; const struct FlowStats *flow_stats_; + FlowEventQueue *event_queue_; + UpdateFlowEventQueue *update_queue_; + DeleteFlowEventQueue *delete_queue_; }; // Validate SG rule. This is basic test to ensure SG and and ACL rules @@ -135,10 +141,10 @@ TEST_F(FlowUpdateTest, sg_change_3) { client->WaitForIdle(); EXPECT_TRUE(flow->ActionSet(TrafficAction::DENY)); - uint64_t revaluate_count = flow_stats_->revaluate_count_; + uint64_t update_count = flow_stats_->revaluate_count_; DelLink("virtual-machine-interface", "flow5", "security-group", "sg1"); client->WaitForIdle(); - EXPECT_GT(flow_stats_->revaluate_count_, revaluate_count); + EXPECT_GE(flow_stats_->revaluate_count_, update_count); } // Change ACL linked to a VN @@ -180,7 +186,52 @@ TEST_F(FlowUpdateTest, multiple_change_1) { EXPECT_FALSE(flow->ActionSet(TrafficAction::DENY)); // Enable flow-update queue and validate statistics - uint64_t revaluate_count = flow_stats_->revaluate_count_; + uint64_t update_count = update_queue_->events_processed(); + uint64_t event_count = event_queue_->events_processed(); + + // Disable flow-update queue to ensure state compression + flow_proto_->DisableFlowUpdateQueue(true); + client->WaitForIdle(); + + // Update interface config to use sg2 + AddLink("virtual-machine-interface", "flow5", "security-group", "sg2"); + client->WaitForIdle(); + + // Update VN to use new ACL + AddAcl("acl1000", 1000, "vn6" , "vn6", "deny"); + AddLink("virtual-network", "vn6", "access-control-list", "acl1000"); + client->WaitForIdle(); + + // Enable flow-update queue and validate statistics + flow_proto_->DisableFlowUpdateQueue(false); + client->WaitForIdle(); + + // There can be 2 RECOMPUTE events - one for each flow + EXPECT_LE((update_count + 2), update_queue_->events_processed()); + // There should be atmost one FLOW_MESSAGE + EXPECT_LE((event_count + 1), event_queue_->events_processed()); + + // Delete the acl + DelLink("virtual-network", "vn6", "access-control-list", "acl1000"); + DelNode("access-control-list", "acl1000"); + client->WaitForIdle(); +} + +// Test for delete state-compressing pending changes for a flow-entry +TEST_F(FlowUpdateTest, multiple_change_delete_1) { + TxIpPacket(flow5->id(), vm_a_ip, vm_b_ip, 1); + client->WaitForIdle(); + EXPECT_EQ(2U, flow_proto_->FlowCount()); + + FlowEntry *flow = FlowGet(flow5->vrf()->vrf_id(), vm_a_ip, vm_b_ip, 1, 0, + 0, flow5->flow_key_nh()->id()); + EXPECT_TRUE(flow != NULL); + EXPECT_FALSE(flow->ActionSet(TrafficAction::DENY)); + + // Enable flow-update queue and validate statistics + uint64_t update_count = update_queue_->events_processed(); + + uint64_t delete_count = delete_queue_->events_processed(); // Disable flow-update queue to ensure state compression flow_proto_->DisableFlowUpdateQueue(true); @@ -189,28 +240,38 @@ TEST_F(FlowUpdateTest, multiple_change_1) { // Update interface config to use sg2 AddLink("virtual-machine-interface", "flow5", "security-group", "sg2"); client->WaitForIdle(); - // Ensure flow is enqueued for revaluation - EXPECT_EQ((revaluate_count + 2), flow_stats_->revaluate_count_); // Update VN to use new ACL AddAcl("acl1000", 1000, "vn6" , "vn6", "deny"); AddLink("virtual-network", "vn6", "access-control-list", "acl1000"); client->WaitForIdle(); - // Ensure flow is not enqueueed for revaluation, since the old enqueue is - // not yet processed - EXPECT_EQ((revaluate_count + 2), flow_stats_->revaluate_count_); // Enable flow-update queue and validate statistics flow_proto_->DisableFlowUpdateQueue(false); client->WaitForIdle(); // Ensure that only one recompute is done - EXPECT_EQ((revaluate_count + 2), flow_stats_->revaluate_count_); + EXPECT_LE((update_count + 2), update_queue_->events_processed()); + EXPECT_EQ((delete_count), delete_queue_->events_processed()); + + update_count = update_queue_->events_processed(); + // Disable flow-update queue before deleting ACL + flow_proto_->DisableFlowUpdateQueue(true); + client->WaitForIdle(); // Delete the acl DelLink("virtual-network", "vn6", "access-control-list", "acl1000"); DelNode("access-control-list", "acl1000"); client->WaitForIdle(); + + // Enable flow-update queue and validate statistics + flow_proto_->DisableFlowUpdateQueue(false); + client->WaitForIdle(); + + // Update queue has following events, + // - 2 DELETE_DBENTRY delete for flows + EXPECT_LE((update_count + 2), update_queue_->events_processed()); + EXPECT_EQ(delete_count, delete_queue_->events_processed()); } // Test flow deletion on ACL deletion diff --git a/src/vnsw/agent/pkt/test/test_flow_util.h b/src/vnsw/agent/pkt/test/test_flow_util.h index 23c674da81c..8aeed10f57d 100644 --- a/src/vnsw/agent/pkt/test/test_flow_util.h +++ b/src/vnsw/agent/pkt/test/test_flow_util.h @@ -237,14 +237,15 @@ class TestFlowPkt { class FlowDeleteTask : public Task { public: FlowDeleteTask(const FlowKey &key) : - Task(TaskScheduler::GetInstance()->GetTaskId(kTaskFlowEvent), -1), + Task(TaskScheduler::GetInstance()->GetTaskId(kTaskFlowEvent), 0), key_(key) {} virtual bool Run() { - Agent::GetInstance()->pkt()->get_flow_proto()-> - DeleteFlowRequest(key_, true, - Agent::GetInstance()->pkt()-> - get_flow_proto()->GetTable(0)-> - table_index()); + FlowProto *proto = Agent::GetInstance()->pkt()->get_flow_proto(); + FlowEntry *flow = proto->Find(key_, + proto->GetTable(0)->table_index()); + if (flow) { + proto->DeleteFlowRequest(flow); + } return true; } std::string Description() const { return "FlowDeleteTask"; } diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc index dd8bc59d50d..e411e034abd 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc @@ -306,12 +306,7 @@ void FlowStatsCollector::UpdateStatsAndExportFlow(FlowExportInfo *info, void FlowStatsCollector::FlowDeleteEnqueue(FlowExportInfo *info, uint64_t t) { FlowEntry *fe = info->flow(); - uint16_t idx = 0; - if (fe->flow_table()) { - idx = fe->flow_table()->table_index(); - } - agent_uve_->agent()->pkt()->get_flow_proto()->DeleteFlowRequest(fe->key(), - true, idx); + agent_uve_->agent()->pkt()->get_flow_proto()->DeleteFlowRequest(fe); info->set_delete_enqueue_time(t); FlowEntry *rflow = info->reverse_flow(); if (rflow) { diff --git a/src/vnsw/agent/vrouter/ksync/flowtable_ksync.cc b/src/vnsw/agent/vrouter/ksync/flowtable_ksync.cc index 3bc9d65613a..f4326783e2c 100644 --- a/src/vnsw/agent/vrouter/ksync/flowtable_ksync.cc +++ b/src/vnsw/agent/vrouter/ksync/flowtable_ksync.cc @@ -579,7 +579,7 @@ bool FlowTableKSyncObject::TimerExpiry() { count++; if (!flow_entry->deleted()) { FlowProto *proto = ksync()->agent()->pkt()->get_flow_proto(); - proto->EnqueueUnResolvedFlowEntry(flow); + proto->EnqueueUnResolvedFlowEntry(flow.get()); } } if (!unresolved_flow_list_.empty())