diff --git a/src/db/db_table_partition.cc b/src/db/db_table_partition.cc index 3c79dee8725..26ade1a00b0 100644 --- a/src/db/db_table_partition.cc +++ b/src/db/db_table_partition.cc @@ -134,7 +134,7 @@ DBEntry *DBTablePartition::FindInternal(const DBEntry *entry) { return NULL; } DBEntry *DBTablePartition::FindNoLock(const DBEntry *entry) { - CHECK_CONCURRENCY("db::DBTable", "Agent::FlowHandler", "Agent::FlowTable"); + CHECK_CONCURRENCY("db::DBTable", "Agent::FlowEvent", "Agent::FlowUpdate"); return FindInternal(entry); } @@ -144,7 +144,7 @@ DBEntry *DBTablePartition::Find(const DBEntry *entry) { } DBEntry *DBTablePartition::FindNoLock(const DBRequestKey *key) { - CHECK_CONCURRENCY("db::DBTable", "Agent::FlowHandler", "Agent::FlowTable"); + CHECK_CONCURRENCY("db::DBTable", "Agent::FlowEvent", "Agent::FlowUpdate"); DBTable *table = static_cast(parent()); std::auto_ptr entry_ptr = table->AllocEntry(key); return FindInternal(entry_ptr.get()); diff --git a/src/ksync/ksync_object.cc b/src/ksync/ksync_object.cc index d5dc7c6a264..a2416363e13 100644 --- a/src/ksync/ksync_object.cc +++ b/src/ksync/ksync_object.cc @@ -208,15 +208,11 @@ void KSyncObject::Delete(KSyncEntry *entry) { SafeNotifyEvent(entry, KSyncEntry::DEL_REQ); } -void KSyncObject::InsertToTree(KSyncEntry *entry) { - tbb::recursive_mutex::scoped_lock lock(lock_); - assert(entry->GetRefCount() > 0); - tree_.insert(*entry); -} - -void KSyncObject::RemoveFromTree(KSyncEntry *entry) { +void KSyncObject::ChangeKey(KSyncEntry *entry, uint32_t arg) { tbb::recursive_mutex::scoped_lock lock(lock_); assert(tree_.erase(*entry) > 0); + UpdateKey(entry, arg); + tree_.insert(*entry); } void KSyncObject::FreeInd(KSyncEntry *entry, uint32_t index) { diff --git a/src/ksync/ksync_object.h b/src/ksync/ksync_object.h index 2b9bbd56335..987ff4dd225 100644 --- a/src/ksync/ksync_object.h +++ b/src/ksync/ksync_object.h @@ -152,8 +152,8 @@ class KSyncObject { // Big lock on the tree // TODO: Make this more fine granular tbb::recursive_mutex lock_; - void InsertToTree(KSyncEntry* entry); - void RemoveFromTree(KSyncEntry* entry); + void ChangeKey(KSyncEntry *entry, uint32_t arg); + virtual void UpdateKey(KSyncEntry *entry, uint32_t arg) { } private: friend class KSyncEntry; diff --git a/src/vnsw/agent/cmn/agent.cc b/src/vnsw/agent/cmn/agent.cc index 979e32a87fd..49daa2bddad 100644 --- a/src/vnsw/agent/cmn/agent.cc +++ b/src/vnsw/agent/cmn/agent.cc @@ -101,8 +101,9 @@ void Agent::SetAgentTaskPolicy() { initialized = true; const char *db_exclude_list[] = { - "Agent::FlowTable", - "Agent::FlowHandler", + kTaskFlowEvent, + kTaskFlowUpdate, + kTaskFlowAudit, "Agent::Services", "Agent::StatsCollector", "sandesh::RecvQueue", @@ -117,22 +118,17 @@ void Agent::SetAgentTaskPolicy() { const char *flow_table_exclude_list[] = { AGENT_INIT_TASKNAME }; - SetTaskPolicyOne("Agent::FlowTable", flow_table_exclude_list, + SetTaskPolicyOne(kTaskFlowEvent, flow_table_exclude_list, sizeof(flow_table_exclude_list) / sizeof(char *)); const char *flow_exclude_list[] = { - "Agent::StatsCollector", - "io::ReaderTask", - "Agent::PktFlowResponder", AGENT_INIT_TASKNAME }; - SetTaskPolicyOne("Agent::FlowHandler", flow_exclude_list, + SetTaskPolicyOne(kTaskFlowUpdate, flow_exclude_list, sizeof(flow_exclude_list) / sizeof(char *)); const char *sandesh_exclude_list[] = { "db::DBTable", - "Agent::FlowTable", - "Agent::FlowHandler", "Agent::Services", "Agent::StatsCollector", "io::ReaderTask", @@ -143,8 +139,6 @@ void Agent::SetAgentTaskPolicy() { sizeof(sandesh_exclude_list) / sizeof(char *)); const char *xmpp_config_exclude_list[] = { - "Agent::FlowTable", - "Agent::FlowHandler", "Agent::Services", "Agent::StatsCollector", "sandesh::RecvQueue", @@ -176,8 +170,6 @@ void Agent::SetAgentTaskPolicy() { sizeof(walk_cancel_exclude_list) / sizeof(char *)); const char *ksync_exclude_list[] = { - "Agent::FlowTable", - "Agent::FlowHandler", "Agent::StatsCollector", "db::DBTable", "Agent::PktFlowResponder", @@ -623,8 +615,8 @@ bool Agent::isVmwareVcenterMode() const { void Agent::ConcurrencyCheck() { if (test_mode_) { CHECK_CONCURRENCY("db::DBTable", "Agent::KSync", AGENT_INIT_TASKNAME, - "Flow::Management", "Agent::FlowHandler", - "Agent::FlowTable"); + "Flow::Management", kTaskFlowUpdate, + kTaskFlowEvent); } } diff --git a/src/vnsw/agent/cmn/agent.h b/src/vnsw/agent/cmn/agent.h index 6c80b07539d..f321e6d4a1a 100644 --- a/src/vnsw/agent/cmn/agent.h +++ b/src/vnsw/agent/cmn/agent.h @@ -200,6 +200,10 @@ extern void RouterIdDepInit(Agent *agent); #define VROUTER_SERVER_PORT 20914 +#define kTaskFlowUpdate "Agent::FlowUpdate" +#define kTaskFlowEvent "Agent::FlowEvent" +#define kTaskFlowAudit "KSync::FlowAudit" + class Agent { public: static const uint32_t kDefaultMaxLinkLocalOpenFds = 2048; diff --git a/src/vnsw/agent/oper/test/test_find_scale.cc b/src/vnsw/agent/oper/test/test_find_scale.cc index 81451dbc5d2..f2ac6083a5e 100644 --- a/src/vnsw/agent/oper/test/test_find_scale.cc +++ b/src/vnsw/agent/oper/test/test_find_scale.cc @@ -219,7 +219,7 @@ class ScaleTask : public Task { public: ScaleTask(VlanTable *table, bool use_key, bool do_lock, int thread_count, uint32_t find_count) : - Task(TaskScheduler::GetInstance()->GetTaskId("Agent::FlowTable"), -1), + Task(TaskScheduler::GetInstance()->GetTaskId(kTaskFlowEvent), -1), table_(table), do_lock_(do_lock), use_key_(use_key), thread_count_(thread_count), find_count_(find_count) { } @@ -244,7 +244,7 @@ class ScaleTask : public Task { // Find routine tests TEST_F(DBTest, Find) { - ConcurrencyScope scope("Agent::FlowTable"); + ConcurrencyScope scope(kTaskFlowEvent); VlanTableReqKey key(101); EXPECT_TRUE(table_->Find(&key) != NULL); EXPECT_TRUE(table_->FindNoLock(&key) != NULL); @@ -256,7 +256,7 @@ TEST_F(DBTest, Find) { // Find routine tests TEST_F(DBTest, VlanScaleNoTask) { - ConcurrencyScope scope("Agent::FlowTable"); + ConcurrencyScope scope(kTaskFlowEvent); uint32_t count = FIND_COUNT; uint64_t key_with_lock = vlan_table_->FindScale(101, count,true, true); @@ -275,7 +275,7 @@ TEST_F(DBTest, VlanScaleNoTask) { } TEST_F(DBTest, VlanScaleTask) { - ConcurrencyScope scope("Agent::FlowTable"); + ConcurrencyScope scope(kTaskFlowEvent); scale_count_ = 0; uint32_t count = FIND_COUNT/4; @@ -339,7 +339,7 @@ class ScaleInterfaceTask : public Task { public: ScaleInterfaceTask(DBTable *table, bool use_key, bool do_lock, int thread_count, uint32_t find_count) : - Task(TaskScheduler::GetInstance()->GetTaskId("Agent::FlowTable"), -1), + Task(TaskScheduler::GetInstance()->GetTaskId(kTaskFlowEvent), -1), table_(table), do_lock_(do_lock), use_key_(use_key), thread_count_(thread_count), find_count_(find_count) { } @@ -362,7 +362,7 @@ class ScaleInterfaceTask : public Task { }; TEST_F(DBTest, ScaleVmInterface) { - ConcurrencyScope scope("Agent::FlowHandler"); + ConcurrencyScope scope(kTaskFlowUpdate); DBTable *table = static_cast(Agent::GetInstance()->interface_table()); uint32_t count = FIND_COUNT; @@ -383,7 +383,7 @@ TEST_F(DBTest, ScaleVmInterface) { } TEST_F(DBTest, ScaleTaskVmInterface) { - ConcurrencyScope scope("Agent::FlowHandler"); + ConcurrencyScope scope(kTaskFlowUpdate); scale_count_ = 0; uint32_t count = FIND_COUNT/4; DBTable *table = diff --git a/src/vnsw/agent/pkt/flow_entry.cc b/src/vnsw/agent/pkt/flow_entry.cc index 37296775908..977be3f4dda 100644 --- a/src/vnsw/agent/pkt/flow_entry.cc +++ b/src/vnsw/agent/pkt/flow_entry.cc @@ -44,7 +44,7 @@ #include #include #include -#include +#include #include #include #include @@ -454,9 +454,7 @@ void FlowEntry::set_flow_handle(uint32_t flow_handle) { // Skip ksync index manipulation, for deleted flow entry // as ksync entry is not available for deleted flow if (!deleted_ && flow_handle_ == kInvalidFlowHandle) { - flow_table_->RemoveFromKSyncTree(this); - ksync_entry_->set_hash_id(flow_handle); - flow_table_->AddToKSyncTree(this); + flow_table_->UpdateFlowHandle(this, flow_handle); } flow_handle_ = flow_handle; flow_table_->UpdateKSync(this, true); diff --git a/src/vnsw/agent/pkt/flow_event.h b/src/vnsw/agent/pkt/flow_event.h new file mode 100644 index 00000000000..937df490d7f --- /dev/null +++ b/src/vnsw/agent/pkt/flow_event.h @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2015 Juniper Networks, Inc. All rights reserved. + */ +#ifndef __AGENT_FLOW_EVENT_H__ +#define __AGENT_FLOW_EVENT_H__ + +#include "flow_table.h" + +//////////////////////////////////////////////////////////////////////////// +// Control events for flow management +//////////////////////////////////////////////////////////////////////////// +class FlowEvent { +public: + enum Event { + INVALID, + // Flow add message from VRouter + VROUTER_FLOW_MSG, + // Event to delete a flow entry + DELETE_FLOW, + // Event by audit module to delete a flow + AUDIT_FLOW, + // Revaluate flow due to deletion of a DBEntry. Other than for INET + // routes, delete of a DBEntry will result in deletion of flows using + // the DBEntry + DELETE_DBENTRY, + // Revaluate route due to change in a DBEntry. This event is used to + // revaluate a flow on add/change of interface, vm, vn etc... + // The action typically invovles only re-evaluating ACL lookup actions + REVALUATE_DBENTRY, + // Add/Delete of route can result in flow using a next higher/lower + // prefix. The event will re-valuate the complete flow due to change + // in route used for flow + REVALUATE_FLOW, + // Flow entry should be freed from kTaskFlowEvent task context. + // Event to ensure flow entry is freed from right context + FREE_FLOW_REF, + // A DBEntry should be freed from kTaskFlowEvent task context. + // Event to ensure DBEntry entry reference is freed from right context + FREE_DBENTRY + }; + + FlowEvent() : + event_(INVALID), flow_(NULL), pkt_info_(), db_entry_(NULL), + gen_id_(0), del_rev_flow_(false) { + } + + FlowEvent(Event event, FlowEntry *flow) : + event_(event), flow_(flow), pkt_info_(), db_entry_(NULL), + gen_id_(0), del_rev_flow_(false) { + } + + FlowEvent(Event event, FlowEntry *flow, const DBEntry *db_entry) : + event_(event), flow_(flow), pkt_info_(), db_entry_(db_entry), + gen_id_(0), del_rev_flow_(false) { + } + + FlowEvent(Event event, const DBEntry *db_entry, uint32_t gen_id) : + event_(event), flow_(NULL), pkt_info_(), db_entry_(db_entry), + gen_id_(gen_id), del_rev_flow_(false) { + } + + FlowEvent(Event event, const FlowKey &key, bool del_rev_flow) : + event_(event), flow_(NULL), pkt_info_(), db_entry_(NULL), + gen_id_(0), flow_key_(key), del_rev_flow_(del_rev_flow) { + } + + FlowEvent(Event event, PktInfoPtr pkt_info) : + event_(event), flow_(NULL), pkt_info_(pkt_info), db_entry_(NULL), + gen_id_(0), flow_key_(), del_rev_flow_() { + } + + FlowEvent(const FlowEvent &rhs) : + event_(rhs.event_), flow_(rhs.flow()), pkt_info_(rhs.pkt_info_), + db_entry_(rhs.db_entry_), gen_id_(rhs.gen_id_), + flow_key_(rhs.flow_key_), del_rev_flow_(rhs.del_rev_flow_) { + } + + virtual ~FlowEvent() { } + + Event event() const { return event_; } + FlowEntry *flow() const { return flow_.get(); } + void set_flow(FlowEntry *flow) { flow_ = flow; } + const DBEntry *db_entry() const { return db_entry_; } + void set_db_entry(const DBEntry *db_entry) { db_entry_ = db_entry; } + uint32_t gen_id() const { return gen_id_; } + const FlowKey &get_flow_key() const { return flow_key_; } + bool get_del_rev_flow() const { return del_rev_flow_; } + PktInfoPtr pkt_info() const { return pkt_info_; } + +private: + Event event_; + FlowEntryPtr flow_; + PktInfoPtr pkt_info_; + const DBEntry *db_entry_; + uint32_t gen_id_; + FlowKey flow_key_; + bool del_rev_flow_; +}; + +#endif // __AGENT_FLOW_EVENT_H__ diff --git a/src/vnsw/agent/pkt/flow_handler.h b/src/vnsw/agent/pkt/flow_handler.h index 62484ab623d..2dcde31a8c5 100644 --- a/src/vnsw/agent/pkt/flow_handler.h +++ b/src/vnsw/agent/pkt/flow_handler.h @@ -8,7 +8,6 @@ #include "pkt/proto_handler.h" static const std::string unknown_vn_ = "__UNKNOWN__"; -static const std::string kFlowHandlerTask = "Agent::FlowHandler"; struct PktInfo; class PktFlowInfo; diff --git a/src/vnsw/agent/pkt/flow_mgmt.cc b/src/vnsw/agent/pkt/flow_mgmt.cc index 393025efb18..77c9d2317a3 100644 --- a/src/vnsw/agent/pkt/flow_mgmt.cc +++ b/src/vnsw/agent/pkt/flow_mgmt.cc @@ -1,7 +1,7 @@ #include +#include "pkt/flow_proto.h" #include "pkt/flow_mgmt.h" #include "pkt/flow_mgmt_request.h" -#include "pkt/flow_mgmt_response.h" #include "pkt/flow_mgmt_dbclient.h" #include "vrouter/flow_stats/flow_stats_collector.h" const string FlowMgmtManager::kFlowMgmtTask = "Flow::Management"; @@ -21,10 +21,7 @@ FlowMgmtManager::FlowMgmtManager(Agent *agent) : nh_flow_mgmt_tree_(this), flow_mgmt_dbclient_(new FlowMgmtDbClient(agent, this)), request_queue_(agent_->task_scheduler()->GetTaskId(kFlowMgmtTask), 1, - boost::bind(&FlowMgmtManager::RequestHandler, this, _1)), - response_queue_(agent_->task_scheduler()->GetTaskId(FlowTable::TaskName()), - 1, boost::bind(&FlowMgmtManager::ResponseHandler, this, - _1)) { + boost::bind(&FlowMgmtManager::RequestHandler, this, _1)) { } void FlowMgmtManager::Init() { @@ -38,7 +35,6 @@ void FlowMgmtManager::Init() { void FlowMgmtManager::Shutdown() { request_queue_.Shutdown(); - response_queue_.Shutdown(); flow_mgmt_dbclient_->Shutdown(); } @@ -118,6 +114,10 @@ void FlowMgmtManager::RetryVrfDeleteEvent(const VrfEntry *vrf) { request_queue_.Enqueue(req); } +void FlowMgmtManager::EnqueueFlowEvent(const FlowEvent &event) { + agent_->pkt()->get_flow_proto()->EnqueueFlowEvent(event); +} + ///////////////////////////////////////////////////////////////////////////// // Handlers for events from the work-queue ///////////////////////////////////////////////////////////////////////////// @@ -216,9 +216,9 @@ bool FlowMgmtManager::RequestHandler(boost::shared_ptr req) { // being modified by two threads. Avoid the concurrency issue by // enqueuing a dummy request to flow-table queue. The reference will // be removed in flow processing context - FlowMgmtResponse flow_resp(FlowMgmtResponse::FREE_FLOW_REF, + FlowEvent flow_resp(FlowEvent::FREE_FLOW_REF, req->flow().get(), NULL); - ResponseEnqueue(flow_resp); + EnqueueFlowEvent(flow_resp); break; } @@ -538,8 +538,8 @@ void FlowMgmtManager::DeleteFlowMgmtKey(FlowEntry *flow, FlowEntryInfo *info, ///////////////////////////////////////////////////////////////////////////// // Event to be enqueued to free an object -FlowMgmtResponse::Event FlowMgmtKey::FreeDBEntryEvent() const { - FlowMgmtResponse::Event event = FlowMgmtResponse::INVALID; +FlowEvent::Event FlowMgmtKey::FreeDBEntryEvent() const { + FlowEvent::Event event = FlowEvent::INVALID; switch (type_) { case INTERFACE: case ACL: @@ -549,12 +549,12 @@ FlowMgmtResponse::Event FlowMgmtKey::FreeDBEntryEvent() const { case BRIDGE: case NH: case VRF: - event = FlowMgmtResponse::FREE_DBENTRY; + event = FlowEvent::FREE_DBENTRY; break; case ACE_ID: case VM: - event = FlowMgmtResponse::INVALID; + event = FlowEvent::INVALID; break; default: @@ -653,12 +653,12 @@ bool FlowMgmtTree::Delete(FlowMgmtKey *key, FlowEntry *flow) { // Send DELETE Entry message to FlowTable module void FlowMgmtTree::FreeNotify(FlowMgmtKey *key, uint32_t gen_id) { assert(key->db_entry() != NULL); - FlowMgmtResponse::Event event = key->FreeDBEntryEvent(); - if (event == FlowMgmtResponse::INVALID) + FlowEvent::Event event = key->FreeDBEntryEvent(); + if (event == FlowEvent::INVALID) return; - FlowMgmtResponse resp(event, key->db_entry(), gen_id); - mgr_->ResponseEnqueue(resp); + FlowEvent resp(event, key->db_entry(), gen_id); + mgr_->EnqueueFlowEvent(resp); } // An object is added/updated. Enqueue REVALUATE for flows dependent on it @@ -727,16 +727,16 @@ bool FlowMgmtEntry::CanDelete() const { bool FlowMgmtEntry::OperEntryAdd(FlowMgmtManager *mgr, const FlowMgmtRequest *req, FlowMgmtKey *key) { oper_state_ = OPER_ADD_SEEN; - FlowMgmtResponse::Event event = req->GetResponseEvent(); - if (event == FlowMgmtResponse::INVALID) + FlowEvent::Event event = req->GetResponseEvent(); + if (event == FlowEvent::INVALID) return false; - FlowMgmtResponse flow_resp(event, NULL, key->db_entry()); + FlowEvent flow_resp(event, NULL, key->db_entry()); key->KeyToFlowRequest(&flow_resp); Tree::iterator it = tree_.begin(); while (it != tree_.end()) { flow_resp.set_flow(*it); - mgr->ResponseEnqueue(flow_resp); + mgr->EnqueueFlowEvent(flow_resp); it++; } @@ -755,16 +755,16 @@ bool FlowMgmtEntry::OperEntryDelete(FlowMgmtManager *mgr, FlowMgmtKey *key) { oper_state_ = OPER_DEL_SEEN; gen_id_ = req->gen_id(); - FlowMgmtResponse::Event event = req->GetResponseEvent(); - if (event == FlowMgmtResponse::INVALID) + FlowEvent::Event event = req->GetResponseEvent(); + if (event == FlowEvent::INVALID) return false; - FlowMgmtResponse flow_resp(event, NULL, key->db_entry()); + FlowEvent flow_resp(event, NULL, key->db_entry()); key->KeyToFlowRequest(&flow_resp); Tree::iterator it = tree_.begin(); while (it != tree_.end()) { flow_resp.set_flow(*it); - mgr->ResponseEnqueue(flow_resp); + mgr->EnqueueFlowEvent(flow_resp); it++; } @@ -1325,31 +1325,3 @@ void VrfFlowMgmtEntry::Data::ManagedDelete() { vrf_mgmt_entry_->vrf_tree()->mgr()->RetryVrfDeleteEvent(vrf_); } } - -///////////////////////////////////////////////////////////////////////////// -// FlowMamagentResponse message handler -///////////////////////////////////////////////////////////////////////////// -bool FlowMgmtManager::ResponseHandler(const FlowMgmtResponse &resp){ - switch (resp.event()) { - case FlowMgmtResponse::FREE_FLOW_REF: - break; - - case FlowMgmtResponse::REVALUATE_FLOW: - case FlowMgmtResponse::REVALUATE_DBENTRY: - case FlowMgmtResponse::DELETE_DBENTRY: { - resp.flow()->flow_table()->FlowResponseHandler(&resp); - break; - } - - case FlowMgmtResponse::FREE_DBENTRY: { - flow_mgmt_dbclient_->ResponseHandler(resp.db_entry(), resp.gen_id()); - break; - } - - default: { - assert(0); - break; - } - } - return true; -} diff --git a/src/vnsw/agent/pkt/flow_mgmt.h b/src/vnsw/agent/pkt/flow_mgmt.h index 8844d686d4a..dd780a1d815 100644 --- a/src/vnsw/agent/pkt/flow_mgmt.h +++ b/src/vnsw/agent/pkt/flow_mgmt.h @@ -6,7 +6,7 @@ #include "pkt/flow_table.h" #include "pkt/flow_mgmt_request.h" -#include "pkt/flow_mgmt_response.h" +#include "pkt/flow_event.h" //////////////////////////////////////////////////////////////////////////// // Flow Management module is responsible to keep flow action in-sync with @@ -57,8 +57,8 @@ // * Delete of DBEntry // * Export of Flow // -// - Flow Management Response -// Flow Management Tree module may generate response events as response to +// - Flow Events +// Flow Management Tree module may generate events in response to // requests. The response events are enqueued here. Example events are, // * Flow revaluation in response to DBEntry change // * Flow revaluation in response to DBEntry Add/Delete @@ -261,8 +261,8 @@ class FlowMgmtKey { // Clone the key virtual FlowMgmtKey *Clone() = 0; - // Convert from FlowMgmtKey to FlowMgmtResponse - virtual void KeyToFlowRequest(FlowMgmtResponse *req) { + // Convert from FlowMgmtKey to FlowEvent + virtual void KeyToFlowRequest(FlowEvent *req) { req->set_db_entry(db_entry_); } @@ -285,7 +285,7 @@ class FlowMgmtKey { return Compare(rhs); } - FlowMgmtResponse::Event FreeDBEntryEvent() const; + FlowEvent::Event FreeDBEntryEvent() const; Type type() const { return type_; } const DBEntry *db_entry() const { return db_entry_; } void set_db_entry(const DBEntry *db_entry) { db_entry_ = db_entry; } @@ -944,14 +944,9 @@ class FlowMgmtManager { bool DBEntryRequestHandler(FlowMgmtRequest *req, const DBEntry *entry); bool RequestHandler(boost::shared_ptr req); - void RequestEnqueue(FlowMgmtRequest *req); - bool FlowResponseHandler(FlowEntry *flow, const DBEntry *entry); bool DbClientHandler(const DBEntry *entry); - bool ResponseHandler(const FlowMgmtResponse &resp); - void ResponseEnqueue(const FlowMgmtResponse &resp) { - response_queue_.Enqueue(resp); - } + void EnqueueFlowEvent(const FlowEvent &event); Agent *agent() const { return agent_; } void AddEvent(FlowEntry *low); @@ -968,6 +963,9 @@ class FlowMgmtManager { uint32_t *egress_flow_count); bool HasVrfFlows(uint32_t vrf); + FlowMgmtDbClient *flow_mgmt_dbclient() const { + return flow_mgmt_dbclient_.get(); + } private: // Handle Add/Change of a flow. Builds FlowMgmtKeyTree for all objects void AddFlow(FlowEntryPtr &flow); @@ -1005,7 +1003,6 @@ class FlowMgmtManager { FlowEntryTree flow_tree_; std::auto_ptr flow_mgmt_dbclient_; WorkQueue > request_queue_; - WorkQueue response_queue_; DISALLOW_COPY_AND_ASSIGN(FlowMgmtManager); }; diff --git a/src/vnsw/agent/pkt/flow_mgmt_dbclient.cc b/src/vnsw/agent/pkt/flow_mgmt_dbclient.cc index 62809a53d22..0c5bbc6bce9 100644 --- a/src/vnsw/agent/pkt/flow_mgmt_dbclient.cc +++ b/src/vnsw/agent/pkt/flow_mgmt_dbclient.cc @@ -510,7 +510,7 @@ void FlowMgmtDbClient::RouteNotify(VrfFlowHandlerState *vrf_state, ///////////////////////////////////////////////////////////////////////////// // FlowTableRequest message handler ///////////////////////////////////////////////////////////////////////////// -bool FlowMgmtDbClient::ResponseHandler(const DBEntry *entry, uint32_t gen_id) { +bool FlowMgmtDbClient::FreeDBState(const DBEntry *entry, uint32_t gen_id) { if (dynamic_cast(entry)) { DBTable *table = agent_->interface_table(); Interface *intf = static_cast(table->Find(entry)); diff --git a/src/vnsw/agent/pkt/flow_mgmt_dbclient.h b/src/vnsw/agent/pkt/flow_mgmt_dbclient.h index 33bbf22b923..dd1028a4507 100644 --- a/src/vnsw/agent/pkt/flow_mgmt_dbclient.h +++ b/src/vnsw/agent/pkt/flow_mgmt_dbclient.h @@ -95,7 +95,7 @@ class FlowMgmtDbClient { void Init(); void Shutdown(); - bool ResponseHandler(const DBEntry *entry, uint32_t gen_id); + bool FreeDBState(const DBEntry *entry, uint32_t gen_id); private: void AddEvent(const DBEntry *entry, FlowMgmtState *state); diff --git a/src/vnsw/agent/pkt/flow_mgmt_request.h b/src/vnsw/agent/pkt/flow_mgmt_request.h index 03d6237497a..d803fc57ce6 100644 --- a/src/vnsw/agent/pkt/flow_mgmt_request.h +++ b/src/vnsw/agent/pkt/flow_mgmt_request.h @@ -5,7 +5,7 @@ #define __AGENT_FLOW_MGMT_REQUEST_H__ #include "pkt/flow_table.h" -#include "pkt/flow_mgmt_response.h" +#include "pkt/flow_event.h" //////////////////////////////////////////////////////////////////////////// // Request to the Flow Management module @@ -45,8 +45,8 @@ class FlowMgmtRequest { // At the end of Flow Management Request, we may enqueue a response message // back to FlowTable module. Compute the message type to be enqueued in // response. Returns INVALID if no message to be enqueued - FlowMgmtResponse::Event GetResponseEvent() const { - FlowMgmtResponse::Event resp_event = FlowMgmtResponse::INVALID; + FlowEvent::Event GetResponseEvent() const { + FlowEvent::Event resp_event = FlowEvent::INVALID; if (db_entry_ == NULL) return resp_event; @@ -55,9 +55,9 @@ class FlowMgmtRequest { } if (event_ == ADD_DBENTRY || event_ == CHANGE_DBENTRY) { - resp_event = FlowMgmtResponse::REVALUATE_DBENTRY; + resp_event = FlowEvent::REVALUATE_DBENTRY; } else if (event_ == DELETE_DBENTRY) { - resp_event = FlowMgmtResponse::DELETE_DBENTRY; + resp_event = FlowEvent::DELETE_DBENTRY; } @@ -65,7 +65,7 @@ class FlowMgmtRequest { dynamic_cast(db_entry_); if (rt) { if (event_ == ADD_DBENTRY || event_ == DELETE_DBENTRY) { - resp_event = FlowMgmtResponse::REVALUATE_FLOW; + resp_event = FlowEvent::REVALUATE_FLOW; } } diff --git a/src/vnsw/agent/pkt/flow_mgmt_response.h b/src/vnsw/agent/pkt/flow_mgmt_response.h deleted file mode 100644 index 5fd93cae8d7..00000000000 --- a/src/vnsw/agent/pkt/flow_mgmt_response.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (c) 2015 Juniper Networks, Inc. All rights reserved. - */ -#ifndef __AGENT_FLOW_MGMT_RESPONSE_H__ -#define __AGENT_FLOW_MGMT_RESPONSE_H__ - -#include "pkt/flow_table.h" - -//////////////////////////////////////////////////////////////////////////// -// Response events generated from Flow Management module -//////////////////////////////////////////////////////////////////////////// -class FlowMgmtResponse { -public: - enum Event { - INVALID, - FREE_FLOW_REF, - REVALUATE_FLOW, - REVALUATE_DBENTRY, - DELETE_DBENTRY, - FREE_DBENTRY - }; - - FlowMgmtResponse() : - event_(INVALID), flow_(NULL), db_entry_(NULL), gen_id_(0) { - } - - FlowMgmtResponse(Event event, FlowEntry *flow) : - event_(event), flow_(flow), db_entry_(NULL), gen_id_(0) { - } - - FlowMgmtResponse(Event event, FlowEntry *flow, const DBEntry *db_entry) : - event_(event), flow_(flow), db_entry_(db_entry), gen_id_(0) { - } - - FlowMgmtResponse(Event event, const DBEntry *db_entry, uint32_t gen_id) : - event_(event), flow_(NULL), db_entry_(db_entry), gen_id_(gen_id) { - } - - FlowMgmtResponse(const FlowMgmtResponse &rhs) : - event_(rhs.event_), flow_(rhs.flow()), db_entry_(rhs.db_entry_), - gen_id_(rhs.gen_id_) { - } - - virtual ~FlowMgmtResponse() { } - - Event event() const { return event_; } - FlowEntry *flow() const { return flow_.get(); } - void set_flow(FlowEntry *flow) { flow_ = flow; } - const DBEntry *db_entry() const { return db_entry_; } - void set_db_entry(const DBEntry *db_entry) { db_entry_ = db_entry; } - uint32_t gen_id() const { return gen_id_; } - -private: - Event event_; - FlowEntryPtr flow_; - const DBEntry *db_entry_; - uint32_t gen_id_; -}; - -#endif // __AGENT_FLOW_MGMT_RESPONSE_H__ diff --git a/src/vnsw/agent/pkt/flow_proto.cc b/src/vnsw/agent/pkt/flow_proto.cc index d6cf1ade6a6..7aea85851e7 100644 --- a/src/vnsw/agent/pkt/flow_proto.cc +++ b/src/vnsw/agent/pkt/flow_proto.cc @@ -1,13 +1,17 @@ /* * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. */ -#include "pkt/flow_proto.h" #include -#include #include +#include "flow_proto.h" +#include "flow_mgmt_dbclient.h" +#include "flow_mgmt.h" +#include "flow_event.h" FlowProto::FlowProto(Agent *agent, boost::asio::io_service &io) : - Proto(agent, kFlowHandlerTask.c_str(), PktHandler::FLOW, io) { + Proto(agent, kTaskFlowEvent, PktHandler::FLOW, io), + flow_update_queue_(agent->task_scheduler()->GetTaskId(kTaskFlowUpdate), 0, + boost::bind(&FlowProto::FlowEventHandler, this, _1)) { agent->SetFlowProto(this); set_trace(false); uint16_t table_count = agent->flow_thread_count(); @@ -17,16 +21,17 @@ FlowProto::FlowProto(Agent *agent, boost::asio::io_service &io) : } TaskScheduler *scheduler = agent_->task_scheduler(); - uint32_t task_id = scheduler->GetTaskId(kFlowHandlerTask.c_str()); + uint32_t task_id = scheduler->GetTaskId(kTaskFlowEvent); for (uint32_t i = 0; i < table_count; i++) { - flow_work_queue_list_.push_back - (new FlowWorkQueue(task_id, -1, boost::bind(&Proto::ProcessProto, - this, _1))); + flow_event_queue_.push_back + (new FlowEventQueue(task_id, i, + boost::bind(&FlowProto::FlowEventHandler, this, + _1))); } } FlowProto::~FlowProto() { - STLDeleteValues(&flow_work_queue_list_); + STLDeleteValues(&flow_event_queue_); STLDeleteValues(&flow_table_list_); } @@ -48,9 +53,10 @@ void FlowProto::Shutdown() { for (uint16_t i = 0; i < flow_table_list_.size(); i++) { flow_table_list_[i]->Shutdown(); } - for (uint32_t i = 0; i < flow_work_queue_list_.size(); i++) { - flow_work_queue_list_[i]->Shutdown(); + for (uint32_t i = 0; i < flow_event_queue_.size(); i++) { + flow_event_queue_[i]->Shutdown(); } + flow_update_queue_.Shutdown(); } FlowHandler *FlowProto::AllocProtoHandler(boost::shared_ptr info, @@ -72,7 +78,7 @@ bool FlowProto::Validate(PktInfo *msg) { } uint16_t FlowProto::FlowTableIndex(uint16_t sport, uint16_t dport) const { - return (sport ^ dport) % (flow_work_queue_list_.size()); + return (sport ^ dport) % (flow_event_queue_.size()); } FlowTable *FlowProto::GetFlowTable(const FlowKey &key) const { @@ -86,7 +92,8 @@ bool FlowProto::Enqueue(boost::shared_ptr msg) { return true; } FreeBuffer(msg.get()); - bool ret = flow_work_queue_list_[index]->Enqueue(msg); + FlowEvent event(FlowEvent::VROUTER_FLOW_MSG, msg); + bool ret = flow_event_queue_[index]->Enqueue(event); return ret; } @@ -124,3 +131,104 @@ void FlowProto::VnFlowCounters(const VnEntry *vn, uint32_t *in_count, FlowEntry *FlowProto::Find(const FlowKey &key) const { return GetFlowTable(key)->Find(key); } + +bool FlowProto::AddFlow(FlowEntry *flow) { + FlowTable *table = flow->flow_table(); + table->Add(flow, flow->reverse_flow_entry()); + return true; +} + +bool FlowProto::UpdateFlow(FlowEntry *flow) { + FlowTable *table = flow->flow_table(); + table->Update(flow, flow->reverse_flow_entry()); + return true; +} + +///////////////////////////////////////////////////////////////////////////// +// Flow Control Event routines +///////////////////////////////////////////////////////////////////////////// +void FlowProto::EnqueueFlowEvent(const FlowEvent &event) { + switch (event.event()) { + case FlowEvent::DELETE_FLOW: { + FlowTable *table = GetFlowTable(event.get_flow_key()); + flow_event_queue_[table->table_index()]->Enqueue(event); + break; + } + + case FlowEvent::AUDIT_FLOW: + case FlowEvent::DELETE_DBENTRY: + case FlowEvent::REVALUATE_FLOW: + case FlowEvent::FREE_FLOW_REF: { + FlowEntry *flow = event.flow(); + FlowTable *table = flow->flow_table(); + flow_event_queue_[table->table_index()]->Enqueue(event); + break; + } + + case FlowEvent::FREE_DBENTRY: + case FlowEvent::REVALUATE_DBENTRY: { + flow_update_queue_.Enqueue(event); + break; + } + + default: + assert(0); + break; + } + + return; +} + +bool FlowProto::FlowEventHandler(const FlowEvent &req) { + switch (req.event()) { + case FlowEvent::VROUTER_FLOW_MSG: { + ProcessProto(req.pkt_info()); + break; + } + + case FlowEvent::DELETE_FLOW: { + FlowTable *table = GetFlowTable(req.get_flow_key()); + table->Delete(req.get_flow_key(), req.get_del_rev_flow()); + break; + } + + case FlowEvent::AUDIT_FLOW: { + FlowEntry *flow = req.flow(); + flow->flow_table()->Add(flow, NULL); + break; + } + + case FlowEvent::FREE_FLOW_REF: + break; + + case FlowEvent::FREE_DBENTRY: { + FlowMgmtManager *mgr = agent()->pkt()->flow_mgmt_manager(); + mgr->flow_mgmt_dbclient()->FreeDBState(req.db_entry(), req.gen_id()); + break; + } + + case FlowEvent::DELETE_DBENTRY: + case FlowEvent::REVALUATE_DBENTRY: + case FlowEvent::REVALUATE_FLOW: { + FlowEntry *flow = req.flow(); + flow->flow_table()->FlowResponseHandler(&req); + break; + } + + default: { + assert(0); + break; + } + } + return true; +} + +void FlowProto::DeleteFlowRequest(const FlowKey &flow_key, bool del_rev_flow) { + EnqueueFlowEvent(FlowEvent(FlowEvent::DELETE_FLOW, flow_key, del_rev_flow)); + return; +} + +void FlowProto::CreateAuditEntry(FlowEntry *flow) { + EnqueueFlowEvent(FlowEvent(FlowEvent::AUDIT_FLOW, flow)); + return; +} diff --git a/src/vnsw/agent/pkt/flow_proto.h b/src/vnsw/agent/pkt/flow_proto.h index 454edc76386..5d1c0fa35f1 100644 --- a/src/vnsw/agent/pkt/flow_proto.h +++ b/src/vnsw/agent/pkt/flow_proto.h @@ -8,15 +8,15 @@ #include #include "cmn/agent_cmn.h" #include "base/queue_task.h" -#include "pkt/proto.h" -#include "pkt/proto_handler.h" -#include "pkt/flow_table.h" -#include "pkt/flow_handler.h" +#include "proto.h" +#include "proto_handler.h" +#include "flow_table.h" +#include "flow_handler.h" +#include "flow_event.h" -typedef WorkQueue > FlowWorkQueue; class FlowProto : public Proto { public: - static const std::string kFlowTaskName; + typedef WorkQueue FlowEventQueue; static const int kMinTableCount = 1; static const int kMaxTableCount = 16; @@ -40,9 +40,19 @@ class FlowProto : public Proto { uint32_t FlowCount() const; void VnFlowCounters(const VnEntry *vn, uint32_t *in_count, uint32_t *out_count); + + bool AddFlow(FlowEntry *flow); + bool UpdateFlow(FlowEntry *flow); + + void EnqueueFlowEvent(const FlowEvent &event); + void DeleteFlowRequest(const FlowKey &flow_key, bool del_rev_flow); + void CreateAuditEntry(FlowEntry *flow); + bool FlowEventHandler(const FlowEvent &req); + private: - std::vector flow_work_queue_list_; + std::vector flow_event_queue_; std::vector flow_table_list_; + FlowEventQueue flow_update_queue_; }; extern SandeshTraceBufferPtr PktFlowTraceBuf; diff --git a/src/vnsw/agent/pkt/flow_table.cc b/src/vnsw/agent/pkt/flow_table.cc index c6d8a43fbcc..aaf1222ea9e 100644 --- a/src/vnsw/agent/pkt/flow_table.cc +++ b/src/vnsw/agent/pkt/flow_table.cc @@ -44,10 +44,9 @@ #include #include #include -#include +#include SandeshTraceBufferPtr FlowTraceBuf(SandeshTraceBufferCreate("Flow", 5000)); -const string FlowTable::kTaskName = "Agent::FlowTable"; boost::uuids::random_generator FlowTable::rand_gen_; ///////////////////////////////////////////////////////////////////////////// @@ -58,9 +57,7 @@ FlowTable::FlowTable(Agent *agent, uint16_t table_index) : table_index_(table_index), ksync_object_(NULL), flow_entry_map_(), - linklocal_flow_count_(), - request_queue_(agent_->task_scheduler()->GetTaskId(kTaskName), 1, - boost::bind(&FlowTable::RequestHandler, this, _1)) { + linklocal_flow_count_() { } FlowTable::~FlowTable() { @@ -70,7 +67,6 @@ FlowTable::~FlowTable() { void FlowTable::Init() { FlowEntry::Init(); rand_gen_ = boost::uuids::random_generator(); - return; } @@ -81,77 +77,28 @@ void FlowTable::InitDone() { } void FlowTable::Shutdown() { - request_queue_.Shutdown(); -} - -// Generate flow events to FlowTable queue -void FlowTable::FlowEvent(FlowTableRequest::Event event, FlowEntry *flow, - const FlowKey &del_key, bool del_rflow) { - FlowTableRequest req; - req.flow_ = flow; - req.event_ = FlowTableRequest::INVALID; - - switch (event) { - case FlowTableRequest::ADD_FLOW: - // The method can work in SYNC mode by calling Add routine below or - // in ASYNC mode by enqueuing a request - // Add(flow, flow->reverse_flow_entry()); - req.event_ = FlowTableRequest::ADD_FLOW; - break; - - case FlowTableRequest::UPDATE_FLOW: - req.event_ = FlowTableRequest::UPDATE_FLOW; - break; - - case FlowTableRequest::DELETE_FLOW: - req.event_ = FlowTableRequest::DELETE_FLOW; - req.del_flow_key_ = del_key; - req.del_rev_flow_ = del_rflow; - break; - - default: - assert(0); - } - - if (req.event_ != FlowTableRequest::INVALID) - request_queue_.Enqueue(req); } -bool FlowTable::RequestHandler(const FlowTableRequest &req) { - switch (req.event_) { - case FlowTableRequest::ADD_FLOW: { - // The reference to reverse flow can be dropped in the call. This can - // potentially release the reverse flow. Hold reference to reverse flow - // till this method is complete - FlowEntryPtr rflow = req.flow_->reverse_flow_entry(); - Add(req.flow_.get(), rflow.get()); - break; - } - - case FlowTableRequest::UPDATE_FLOW: { - // The reference to reverse flow can be dropped in the call. This can - // potentially release the reverse flow. Hold reference to reverse flow - // till this method is complete - FlowEntryPtr rflow = req.flow_->reverse_flow_entry(); - Update(req.flow_.get(), rflow.get()); - break; - } - - case FlowTableRequest::DELETE_FLOW: { - Delete(req.del_flow_key_, req.del_rev_flow_); - break; - } - - default: - assert(0); +///////////////////////////////////////////////////////////////////////////// +// FlowTable Add/Delete routines +///////////////////////////////////////////////////////////////////////////// +// When multiple lock are taken, there is possibility of deadlocks. We do +// deadlock avoidance by ensuring "consistent ordering of locks" +void FlowTable::GetMutexSeq(tbb::mutex &mutex1, tbb::mutex &mutex2, + tbb::mutex **mutex_ptr_1, + tbb::mutex **mutex_ptr_2) { + *mutex_ptr_1 = NULL; + *mutex_ptr_2 = NULL; + if (&mutex1 < &mutex2) { + *mutex_ptr_1 = &mutex1; + *mutex_ptr_2 = &mutex2; + } else { + *mutex_ptr_1 = &mutex2; + *mutex_ptr_2 = &mutex1; } - return true; } -///////////////////////////////////////////////////////////////////////////// -// FlowTable Add/Delete routines -///////////////////////////////////////////////////////////////////////////// FlowEntry *FlowTable::Find(const FlowKey &key) { FlowEntryMap::iterator it; @@ -265,21 +212,13 @@ void FlowTable::AddInternal(FlowEntry *flow_req, FlowEntry *flow, void FlowTable::Add(FlowEntry *flow_req, FlowEntry *flow, FlowEntry *rflow_req, FlowEntry *rflow, bool update) { - if (flow) { - flow->mutex().lock(); - } - if (rflow) { - rflow->mutex().lock(); - } + tbb::mutex tmp_mutex, *mutex_ptr_1, *mutex_ptr_2; + GetMutexSeq(flow->mutex(), rflow ? rflow->mutex() : tmp_mutex, + &mutex_ptr_1, &mutex_ptr_2); + tbb::mutex::scoped_lock lock1(*mutex_ptr_1); + tbb::mutex::scoped_lock lock2(*mutex_ptr_2); AddInternal(flow_req, flow, rflow_req, rflow, update); - - if (rflow) { - rflow->mutex().unlock(); - } - if (flow) { - flow->mutex().unlock(); - } } void FlowTable::DeleteInternal(FlowEntryMap::iterator &it) { @@ -857,13 +796,16 @@ void FlowTable::DeleteMessage(FlowEntry *flow) { } // Handle events from Flow Management module for a flow -bool FlowTable::FlowResponseHandler(const FlowMgmtResponse *resp) { +bool FlowTable::FlowResponseHandler(const FlowEvent *resp) { FlowEntry *flow = resp->flow(); - const DBEntry *entry = resp->db_entry(); - tbb::mutex::scoped_lock mutex(flow->mutex()); FlowEntry *rflow = flow->reverse_flow_entry_.get(); - if (rflow) - rflow->mutex().lock(); + const DBEntry *entry = resp->db_entry(); + + tbb::mutex tmp_mutex, *mutex_ptr_1, *mutex_ptr_2; + GetMutexSeq(flow->mutex(), rflow ? rflow->mutex() : tmp_mutex, + &mutex_ptr_1, &mutex_ptr_2); + tbb::mutex::scoped_lock lock1(*mutex_ptr_1); + tbb::mutex::scoped_lock lock2(*mutex_ptr_2); bool active_flow = true; bool deleted_flow = flow->deleted(); @@ -871,13 +813,13 @@ bool FlowTable::FlowResponseHandler(const FlowMgmtResponse *resp) { active_flow = false; switch (resp->event()) { - case FlowMgmtResponse::REVALUATE_FLOW: { + case FlowEvent::REVALUATE_FLOW: { if (active_flow) RevaluateFlow(flow); break; } - case FlowMgmtResponse::REVALUATE_DBENTRY: { + case FlowEvent::REVALUATE_DBENTRY: { const Interface *intf = dynamic_cast(entry); if (intf && active_flow) { RevaluateInterface(flow); @@ -912,7 +854,7 @@ bool FlowTable::FlowResponseHandler(const FlowMgmtResponse *resp) { break; } - case FlowMgmtResponse::DELETE_DBENTRY: { + case FlowEvent::DELETE_DBENTRY: { DeleteMessage(flow); break; } @@ -921,9 +863,6 @@ bool FlowTable::FlowResponseHandler(const FlowMgmtResponse *resp) { assert(0); } - if (rflow) - rflow->mutex().unlock(); - return true; } @@ -978,12 +917,31 @@ void FlowTable::UpdateKSync(FlowEntry *flow, bool update) { } } -void FlowTable::RemoveFromKSyncTree(FlowEntry *flow) { - ksync_object_->RemoveFromTree(flow->ksync_entry()); +// Update FlowHandle for a flow +void FlowTable::KSyncSetFlowHandle(FlowEntry *flow, uint32_t flow_handle) { + FlowEntry *rflow = flow->reverse_flow_entry(); + bool update_rflow = false; + assert(flow_handle != FlowEntry::kInvalidFlowHandle); + + tbb::mutex tmp_mutex, *mutex_ptr_1, *mutex_ptr_2; + GetMutexSeq(flow->mutex(), rflow ? rflow->mutex() : tmp_mutex, + &mutex_ptr_1, &mutex_ptr_2); + tbb::mutex::scoped_lock lock1(*mutex_ptr_1); + tbb::mutex::scoped_lock lock2(*mutex_ptr_2); + + if (flow->flow_handle() != flow_handle) { + update_rflow = true; + AddIndexFlowInfo(flow, flow_handle); + NotifyFlowStatsCollector(flow); + } + + if (rflow && update_rflow) { + UpdateKSync(rflow, true); + } } -void FlowTable::AddToKSyncTree(FlowEntry *flow) { - ksync_object_->InsertToTree(flow->ksync_entry()); +void FlowTable::UpdateFlowHandle(FlowEntry *flow, uint32_t flow_handle) { + ksync_object_->UpdateFlowHandle(flow->ksync_entry(), flow_handle); } void FlowTable::NotifyFlowStatsCollector(FlowEntry *fe) { diff --git a/src/vnsw/agent/pkt/flow_table.h b/src/vnsw/agent/pkt/flow_table.h index cf8532c6895..62ebf22697e 100644 --- a/src/vnsw/agent/pkt/flow_table.h +++ b/src/vnsw/agent/pkt/flow_table.h @@ -54,7 +54,7 @@ class FlowEntry; class FlowTable; class FlowTableKSyncEntry; class FlowTableKSyncObject; -class FlowMgmtResponse; +class FlowEvent; ///////////////////////////////////////////////////////////////////////////// // Flow addition is a two step process. @@ -65,43 +65,11 @@ class FlowMgmtResponse; // This module will maintain a tree of all flows created. It is also // responsible to generate KSync events. It is run in a single task context // -// This module has WorkQueue running in "Agent::FlowTable" task context. -// FlowTableRequest are enqueued to the queue to to add/delete flows. -// -// Functionality of FLowTable: +// Functionality of FlowTable: // 1. Manage flow_entry_map_ which contains all flows // 2. Enforce the per-VM flow limits // 3. Generate events to KSync and FlowMgmt modueles ///////////////////////////////////////////////////////////////////////////// -struct FlowTableRequest { - enum Event { - INVALID, - ADD_FLOW, - DELETE_FLOW, - UPDATE_FLOW - }; - - FlowTableRequest() : event_(INVALID), flow_(NULL), del_flow_key_(), - del_rev_flow_(false) { - } - - FlowTableRequest(Event event, FlowEntry *flow) : - event_(event), flow_(flow), del_flow_key_(), del_rev_flow_(false) { - } - - FlowTableRequest(const FlowTableRequest &rhs) : - event_(rhs.event_), flow_(rhs.flow_), del_flow_key_(rhs.del_flow_key_), - del_rev_flow_(rhs.del_rev_flow_) { - } - - virtual ~FlowTableRequest() { } - - Event event_; - FlowEntryPtr flow_; - FlowKey del_flow_key_; - bool del_rev_flow_; -}; - struct FlowTaskMsg : public InterTaskMsg { FlowTaskMsg(FlowEntry * fe) : InterTaskMsg(0), fe_ptr(fe) { } virtual ~FlowTaskMsg() { } @@ -118,7 +86,6 @@ struct Inet4FlowKeyCmp { class FlowTable { public: - static const std::string kTaskName; static boost::uuids::random_generator rand_gen_; typedef std::map FlowEntryMap; @@ -156,6 +123,7 @@ class FlowTable { void Add(FlowEntry *flow, FlowEntry *rflow); void Update(FlowEntry *flow, FlowEntry *rflow); bool Delete(const FlowKey &key, bool del_reverse_flow); + bool Delete(const FlowKey &flow_key); void DeleteAll(); // Test code only used method void DeleteFlow(const AclDBEntry *acl, const FlowKey &key, @@ -169,6 +137,7 @@ class FlowTable { // Accessor routines Agent *agent() const { return agent_; } + uint16_t table_index() const { return table_index_; } size_t Size() { return flow_entry_map_.size(); } uint32_t linklocal_flow_count() const { return linklocal_flow_count_; } FlowTable::FlowEntryMap::iterator begin() { @@ -185,7 +154,7 @@ class FlowTable { const uint64_t timestamp); void DelLinkLocalFlowInfo(int fd); - static const std::string &TaskName() { return kTaskName; } + static const char *TaskName() { return kTaskFlowEvent; } // Sandesh routines void Copy(FlowEntry *lhs, const FlowEntry *rhs); void SetAclFlowSandeshData(const AclDBEntry *acl, AclFlowResp &data, @@ -202,7 +171,7 @@ class FlowTable { void RevaluateNh(FlowEntry *flow); void DeleteVrf(VrfEntry *vrf); void RevaluateRoute(FlowEntry *flow, const AgentRoute *route); - bool FlowResponseHandler(const FlowMgmtResponse *resp); + bool FlowResponseHandler(const FlowEvent *req); bool FlowRouteMatch(const InetUnicastRouteEntry *rt, uint32_t vrf, Address::Family family, const IpAddress &ip, @@ -225,14 +194,10 @@ class FlowTable { void EvictVrouterFlow(FlowEntry *fe, uint32_t flow_index); void UpdateKSync(FlowEntry *flow, bool update); - // Flow Table request queue events - void FlowEvent(FlowTableRequest::Event event, FlowEntry *flow, - const FlowKey &del_key, bool del_rflow); - // FlowStatsCollector request queue events void NotifyFlowStatsCollector(FlowEntry *fe); - void RemoveFromKSyncTree(FlowEntry *flow); - void AddToKSyncTree(FlowEntry *flow); + void UpdateFlowHandle(FlowEntry *flow, uint32_t flow_handle); + void KSyncSetFlowHandle(FlowEntry *flow, uint32_t flow_handle); friend class FlowStatsCollector; friend class PktSandeshFlow; @@ -258,7 +223,9 @@ class FlowTable { FlowEntry *new_rflow, bool update); void Add(FlowEntry *flow, FlowEntry *new_flow, FlowEntry *rflow, FlowEntry *new_rflow, bool update); - bool RequestHandler(const FlowTableRequest &req); + void GetMutexSeq(tbb::mutex &mutex1, tbb::mutex &mutex2, + tbb::mutex **mutex_ptr_1, tbb::mutex **mutex_ptr_2); + Agent *agent_; uint16_t table_index_; FlowTableKSyncObject *ksync_object_; @@ -266,7 +233,6 @@ class FlowTable { VmFlowTree vm_flow_tree_; uint32_t linklocal_flow_count_; // total linklocal flows in the agent - WorkQueue request_queue_; FlowIndexTree flow_index_tree_; // maintain the linklocal flow info against allocated fd, debug purpose only LinkLocalFlowInfoMap linklocal_flow_info_map_; diff --git a/src/vnsw/agent/pkt/pkt_flow_info.cc b/src/vnsw/agent/pkt/pkt_flow_info.cc index 85fe762434d..b602b5784e8 100644 --- a/src/vnsw/agent/pkt/pkt_flow_info.cc +++ b/src/vnsw/agent/pkt/pkt_flow_info.cc @@ -1429,10 +1429,10 @@ void PktFlowInfo::LinkLocalPortBind(const PktInfo *pkt, void PktFlowInfo::Add(const PktInfo *pkt, PktControlInfo *in, PktControlInfo *out) { - FlowTableRequest::Event event = FlowTableRequest::ADD_FLOW; + bool update = false; if (pkt->type == PktType::MESSAGE && pkt->agent_hdr.cmd != AgentHdr::TRAP_FLOW_MISS) { - event = FlowTableRequest::UPDATE_FLOW; + update = true; } // Generate traffic seen event for path preference module @@ -1498,10 +1498,12 @@ void PktFlowInfo::Add(const PktInfo *pkt, PktControlInfo *in, * both forward and reverse flows are not not linked to each other yet. * We need both forward and reverse flows to update Fip stats info */ UpdateFipStatsInfo(flow.get(), rflow.get(), pkt, in, out); - if (swap_flows) { - flow_table->FlowEvent(event, rflow.get(), FlowKey(), false); + + FlowEntry *tmp = swap_flows ? rflow.get() : flow.get(); + if (update) { + agent->pkt()->get_flow_proto()->UpdateFlow(tmp); } else { - flow_table->FlowEvent(event, flow.get(), FlowKey(), false); + agent->pkt()->get_flow_proto()->AddFlow(tmp); } } diff --git a/src/vnsw/agent/pkt/pkt_handler.h b/src/vnsw/agent/pkt/pkt_handler.h index 7a8723e25b1..06636859b35 100644 --- a/src/vnsw/agent/pkt/pkt_handler.h +++ b/src/vnsw/agent/pkt/pkt_handler.h @@ -47,6 +47,7 @@ struct PktInfo; struct agent_hdr; class PacketBuffer; class Proto; +typedef boost::shared_ptr PktInfoPtr; struct InterTaskMsg { InterTaskMsg(uint16_t command): cmd(command) {} diff --git a/src/vnsw/agent/pkt/test/test_flow_util.h b/src/vnsw/agent/pkt/test/test_flow_util.h index e9f1b3fa428..5d54c1a6d84 100644 --- a/src/vnsw/agent/pkt/test/test_flow_util.h +++ b/src/vnsw/agent/pkt/test/test_flow_util.h @@ -240,7 +240,7 @@ class TestFlowPkt { class FlowDeleteTask : public Task { public: FlowDeleteTask(const FlowKey &key) : - Task(TaskScheduler::GetInstance()->GetTaskId("Agent::FlowHandler"), -1), + Task(TaskScheduler::GetInstance()->GetTaskId(kTaskFlowEvent), -1), key_(key) {} virtual bool Run() { FlowTable *table = diff --git a/src/vnsw/agent/pkt/test/test_flowtable.cc b/src/vnsw/agent/pkt/test/test_flowtable.cc index be6c04488b7..6dc878c5909 100644 --- a/src/vnsw/agent/pkt/test/test_flowtable.cc +++ b/src/vnsw/agent/pkt/test/test_flowtable.cc @@ -399,7 +399,7 @@ class FlowTableTest : public ::testing::Test { class SetupTask : public Task { public: - SetupTask(FlowTableTest *test) : Task((TaskScheduler::GetInstance()->GetTaskId("Agent::FlowHandler")), -1), test_(test) { + SetupTask(FlowTableTest *test) : Task((TaskScheduler::GetInstance()->GetTaskId(kTaskFlowEvent)), -1), test_(test) { } virtual bool Run() { FlowProto *proto = test_->get_flow_proto(); diff --git a/src/vnsw/agent/pkt/test/test_pkt_flow.cc b/src/vnsw/agent/pkt/test/test_pkt_flow.cc index a4946f2ca72..f4d090c4157 100644 --- a/src/vnsw/agent/pkt/test/test_pkt_flow.cc +++ b/src/vnsw/agent/pkt/test/test_pkt_flow.cc @@ -91,7 +91,7 @@ static bool FlowStatsTimerStartStopTrigger (bool stop) { } static void FlowStatsTimerStartStop (bool stop) { - int task_id = TaskScheduler::GetInstance()->GetTaskId("Agent::FlowHandler"); + int task_id = TaskScheduler::GetInstance()->GetTaskId(kTaskFlowEvent); std::auto_ptr trigger_ (new TaskTrigger(boost::bind(FlowStatsTimerStartStopTrigger, stop), task_id, 0)); trigger_->Set(); diff --git a/src/vnsw/agent/test/test_init.cc b/src/vnsw/agent/test/test_init.cc index c5abbc7867e..e8401b2b3a8 100644 --- a/src/vnsw/agent/test/test_init.cc +++ b/src/vnsw/agent/test/test_init.cc @@ -83,6 +83,7 @@ TestClient *TestInit(const char *init_file, bool ksync_init, bool pkt_init, agent->set_ksync_sync_mode(ksync_sync_mode); // Initialize agent and kick start initialization + TaskScheduler::GetInstance(); init->Start(); WaitForInitDone(agent); diff --git a/src/vnsw/agent/test/test_init.h b/src/vnsw/agent/test/test_init.h index 520ea537f9c..a8816c26d75 100644 --- a/src/vnsw/agent/test/test_init.h +++ b/src/vnsw/agent/test/test_init.h @@ -533,7 +533,8 @@ class TestClient { TaskScheduler *scheduler = TaskScheduler::GetInstance(); TaskPolicy policy; policy.push_back(TaskExclusion(scheduler->GetTaskId("Agent::StatsCollector"))); - policy.push_back(TaskExclusion(scheduler->GetTaskId("Agent::FlowHandler"))); + policy.push_back(TaskExclusion(scheduler->GetTaskId(kTaskFlowEvent))); + policy.push_back(TaskExclusion(scheduler->GetTaskId(kTaskFlowUpdate))); policy.push_back(TaskExclusion(scheduler->GetTaskId("Agent::KSync"))); scheduler->SetPolicy(scheduler->GetTaskId("FlowFlush"), policy); } @@ -548,7 +549,8 @@ class TestClient { TaskScheduler *scheduler = TaskScheduler::GetInstance(); TaskPolicy policy; policy.push_back(TaskExclusion(scheduler->GetTaskId("Agent::StatsCollector"))); - policy.push_back(TaskExclusion(scheduler->GetTaskId("Agent::FlowHandler"))); + policy.push_back(TaskExclusion(scheduler->GetTaskId(kTaskFlowEvent))); + policy.push_back(TaskExclusion(scheduler->GetTaskId(kTaskFlowUpdate))); policy.push_back(TaskExclusion(scheduler->GetTaskId("Agent::KSync"))); scheduler->SetPolicy(scheduler->GetTaskId("FlowAge"), policy); } diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc index 71f6405b0ca..a650f64696c 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc @@ -286,9 +286,7 @@ void FlowStatsCollector::UpdateFlowStats(FlowExportInfo *info, } void FlowStatsCollector::FlowDeleteEnqueue(const FlowKey &key, bool rev) { - FlowTable *flow_table = - agent_uve_->agent()->pkt()->get_flow_proto()->GetFlowTable(key); - flow_table->FlowEvent(FlowTableRequest::DELETE_FLOW, NULL, key, rev); + agent_uve_->agent()->pkt()->get_flow_proto()->DeleteFlowRequest(key, rev); } // FIXME : Handle multiple tables diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h index 1494b16ad03..7aea9768569 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h @@ -25,7 +25,6 @@ class FetchFlowStatsRecord; //shared memory (between agent and Kernel) and export this stats info to //collector. Also responsible for aging of flow entries. Runs in the context //of "Agent::StatsCollector" which has exclusion with "db::DBTable", -//"Agent::FlowHandler", "sandesh::RecvQueue", "bgp::Config" & "Agent::KSync" class FlowStatsCollector : public StatsCollector { public: static const uint64_t FlowAgeTime = 1000000 * 180; diff --git a/src/vnsw/agent/vrouter/ksync/flowtable_ksync.cc b/src/vnsw/agent/vrouter/ksync/flowtable_ksync.cc index da7675709f6..c659a7e83b9 100644 --- a/src/vnsw/agent/vrouter/ksync/flowtable_ksync.cc +++ b/src/vnsw/agent/vrouter/ksync/flowtable_ksync.cc @@ -520,5 +520,14 @@ FlowTableKSyncEntry *FlowTableKSyncObject::Find(FlowEntry *key) { return static_cast(obj->Find(&entry)); } +void FlowTableKSyncObject::UpdateKey(KSyncEntry *entry, uint32_t flow_handle) { + static_cast(entry)->set_hash_id(flow_handle); +} + +void FlowTableKSyncObject::UpdateFlowHandle(FlowTableKSyncEntry *entry, + uint32_t flow_handle) { + ChangeKey(entry, flow_handle); +} + void FlowTableKSyncObject::Init() { } diff --git a/src/vnsw/agent/vrouter/ksync/flowtable_ksync.h b/src/vnsw/agent/vrouter/ksync/flowtable_ksync.h index 984af8a10d6..c3fcac05c5f 100644 --- a/src/vnsw/agent/vrouter/ksync/flowtable_ksync.h +++ b/src/vnsw/agent/vrouter/ksync/flowtable_ksync.h @@ -83,6 +83,8 @@ class FlowTableKSyncObject : public KSyncObject { KSync *ksync() const { return ksync_; } void set_flow_table(FlowTable *table) { flow_table_ = table; } FlowTable *flow_table() const { return flow_table_; } + void UpdateFlowHandle(FlowTableKSyncEntry *entry, uint32_t flow_handle); + void UpdateKey(KSyncEntry *entry, uint32_t flow_handle); private: friend class KSyncSandeshContext; friend class FlowTable; diff --git a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc index 93bceac9df1..b6f8b61708b 100644 --- a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc +++ b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc @@ -50,9 +50,8 @@ KSyncFlowMemory::KSyncFlowMemory(KSync *ksync) : audit_timer_(TimerManager::CreateTimer (*(ksync->agent()->event_manager())->io_service(), "Flow Audit Timer", - ksync->agent()->task_scheduler()->GetTaskId - ("Agent::StatsCollector"), - StatsCollector::FlowStatsCollector)), + ksync->agent()->task_scheduler()->GetTaskId(kTaskFlowAudit), + 0)), audit_timeout_(0), audit_yield_(0), audit_flow_idx_(0), @@ -236,18 +235,14 @@ bool KSyncFlowMemory::AuditProcess() { vflow_entry->fe_key.flow_proto, ntohs(vflow_entry->fe_key.flow_sport), ntohs(vflow_entry->fe_key.flow_dport)); - FlowTable *flow_table = - ksync_->agent()->pkt()->get_flow_proto()->GetFlowTable(key); - FlowEntry *flow_p = flow_table->Find(key); - if (flow_p == NULL) { - /* Create Short flow only for non-existing flows. */ - FlowEntryPtr flow(flow_p->Allocate(key, flow_table)); - flow->InitAuditFlow(flow_idx); - AGENT_ERROR(FlowLog, flow_idx, "FlowAudit : Converting HOLD " - "entry to short flow"); - flow_table->Add(flow.get(), NULL); - } + FlowProto *proto = ksync_->agent()->pkt()->get_flow_proto(); + FlowTable *flow_table = proto->GetFlowTable(key); + FlowEntry *flow = FlowEntry::Allocate(key, flow_table); + flow->InitAuditFlow(flow_idx); + proto->CreateAuditEntry(flow); + AGENT_ERROR(FlowLog, flow_idx, "FlowAudit : Converting HOLD " + "entry to short flow"); } } diff --git a/src/vnsw/agent/vrouter/ksync/sandesh_ksync.cc b/src/vnsw/agent/vrouter/ksync/sandesh_ksync.cc index 96ed4f96463..1d10a1a3322 100644 --- a/src/vnsw/agent/vrouter/ksync/sandesh_ksync.cc +++ b/src/vnsw/agent/vrouter/ksync/sandesh_ksync.cc @@ -118,18 +118,7 @@ void KSyncSandeshContext::FlowMsgHandler(vr_flow_req *r) { } FlowTable *table = entry->flow_table(); - bool update_rev_flow = false; - if ((int)entry->flow_handle() != r->get_fr_index()) { - update_rev_flow = true; - table->AddIndexFlowInfo(entry, r->get_fr_index()); - table->NotifyFlowStatsCollector(entry); - } - - FlowEntry *rev_flow = entry->reverse_flow_entry(); - if (rev_flow && update_rev_flow) { - bool update = true; - table->UpdateKSync(rev_flow, update); - } + table->KSyncSetFlowHandle(entry, r->get_fr_index()); } } else { assert(!("Invalid Flow operation")); diff --git a/src/vnsw/agent/vrouter/stats_collector/agent_stats_collector.h b/src/vnsw/agent/vrouter/stats_collector/agent_stats_collector.h index 1bbadc5c4e7..0a860da0380 100644 --- a/src/vnsw/agent/vrouter/stats_collector/agent_stats_collector.h +++ b/src/vnsw/agent/vrouter/stats_collector/agent_stats_collector.h @@ -20,7 +20,7 @@ //statistics from vrouter and updates its data-structures with this //information. Stats collection request runs in the context of //"Agent::StatsCollector" which has exclusion with "db::DBTable", -//"Agent::FlowHandler", "sandesh::RecvQueue", "bgp::Config" & "Agent::KSync" +//"sandesh::RecvQueue", "bgp::Config" & "Agent::KSync" //Stats collection response runs in the context of "Agent::Uve" which has //exclusion with "db::DBTable" class AgentStatsCollector : public StatsCollector {