diff --git a/src/vnsw/agent/pkt/flow_entry.cc b/src/vnsw/agent/pkt/flow_entry.cc index 16a3afc0b0c..e42e9b158f8 100644 --- a/src/vnsw/agent/pkt/flow_entry.cc +++ b/src/vnsw/agent/pkt/flow_entry.cc @@ -470,9 +470,8 @@ void FlowEntry::Copy(FlowEntry *rhs, bool update) { gen_id_ = rhs->gen_id_; flow_handle_ = rhs->flow_handle_; /* Flow Entry is being re-used. Generate a new UUID for it. */ - // results is delete miss for previous uuid to stats collector - // with eviction disabled following is not required - // uuid_ = flow_table_->rand_gen(); + uuid_ = flow_table_->rand_gen(); + egress_uuid_ = flow_table_->rand_gen(); } } diff --git a/src/vnsw/agent/pkt/flow_mgmt.cc b/src/vnsw/agent/pkt/flow_mgmt.cc index 49cf94bbe0a..7f37ae7f2e5 100644 --- a/src/vnsw/agent/pkt/flow_mgmt.cc +++ b/src/vnsw/agent/pkt/flow_mgmt.cc @@ -140,7 +140,8 @@ void FlowMgmtManager::DeleteEvent(FlowEntry *flow, void FlowMgmtManager::FlowStatsUpdateEvent(FlowEntry *flow, uint32_t bytes, uint32_t packets, - uint32_t oflow_bytes) { + uint32_t oflow_bytes, + const boost::uuids::uuid &u) { if (bytes == 0 && packets == 0 && oflow_bytes == 0) { return; } @@ -151,7 +152,7 @@ void FlowMgmtManager::FlowStatsUpdateEvent(FlowEntry *flow, uint32_t bytes, } FlowMgmtRequestPtr req(new FlowMgmtRequest (FlowMgmtRequest::UPDATE_FLOW_STATS, flow, - bytes, packets, oflow_bytes)); + bytes, packets, oflow_bytes, u)); request_queue_.Enqueue(req); } @@ -430,6 +431,13 @@ bool FlowMgmtManager::RequestHandler(FlowMgmtRequestPtr req) { (FlowMgmtRequest::ADD_FLOW, req->flow().get())); log_queue_.Enqueue(log_req); + + //Enqueue Add request to flow-stats-collector + agent_->flow_stats_manager()->AddEvent(req->flow()); + + //Enqueue Add request to UVE module for ACE stats + EnqueueUveAddEvent(flow); + AddFlow(req->flow()); } else { @@ -437,6 +445,13 @@ bool FlowMgmtManager::RequestHandler(FlowMgmtRequestPtr req) { (FlowMgmtRequest::DELETE_FLOW, req->flow().get(), req->params())); log_queue_.Enqueue(log_req); + + //Enqueue Delete request to flow-stats-collector + agent_->flow_stats_manager()->DeleteEvent(flow, req->params()); + + //Enqueue Delete request to UVE module for ACE stats + EnqueueUveDeleteEvent(flow); + DeleteFlow(req->flow(), req->params()); } break; @@ -445,7 +460,7 @@ bool FlowMgmtManager::RequestHandler(FlowMgmtRequestPtr req) { case FlowMgmtRequest::UPDATE_FLOW_STATS: { //Handle Flow stats update for flow-mgmt UpdateFlowStats(req->flow(), req->bytes(), req->packets(), - req->oflow_bytes()); + req->oflow_bytes(), req->flow_uuid()); break; } @@ -493,25 +508,11 @@ bool FlowMgmtManager::LogHandler(FlowMgmtRequestPtr req) { switch (req->event()) { case FlowMgmtRequest::ADD_FLOW: { LogFlowUnlocked(flow, "ADD"); - - //Enqueue Add request to flow-stats-collector - agent_->flow_stats_manager()->AddEvent(req->flow()); - - //Enqueue Add request to UVE module for ACE stats - EnqueueUveAddEvent(flow); - break; } case FlowMgmtRequest::DELETE_FLOW: { LogFlowUnlocked(flow, "DEL"); - - //Enqueue Delete request to flow-stats-collector - agent_->flow_stats_manager()->DeleteEvent(flow, req->params()); - - //Enqueue Delete request to UVE module for ACE stats - EnqueueUveDeleteEvent(flow); - break; } @@ -674,10 +675,11 @@ void FlowMgmtManager::DeleteFlow(FlowEntryPtr &flow, } void FlowMgmtManager::UpdateFlowStats(FlowEntryPtr &flow, uint32_t bytes, - uint32_t packets, uint32_t oflow_bytes) { + uint32_t packets, uint32_t oflow_bytes, + const boost::uuids::uuid &u) { //Enqueue Flow Index Update Event request to flow-stats-collector agent_->flow_stats_manager()->UpdateStatsEvent(flow, bytes, packets, - oflow_bytes); + oflow_bytes, u); } bool FlowMgmtManager::HasVrfFlows(uint32_t vrf_id) { diff --git a/src/vnsw/agent/pkt/flow_mgmt.h b/src/vnsw/agent/pkt/flow_mgmt.h index 1efedb8e649..33320cab552 100644 --- a/src/vnsw/agent/pkt/flow_mgmt.h +++ b/src/vnsw/agent/pkt/flow_mgmt.h @@ -1108,7 +1108,8 @@ class FlowMgmtManager { void AddEvent(FlowEntry *low); void DeleteEvent(FlowEntry *flow, const RevFlowDepParams ¶ms); void FlowStatsUpdateEvent(FlowEntry *flow, uint32_t bytes, uint32_t packets, - uint32_t oflow_bytes); + uint32_t oflow_bytes, + const boost::uuids::uuid &u); void AddDBEntryEvent(const DBEntry *entry, uint32_t gen_id); void ChangeDBEntryEvent(const DBEntry *entry, uint32_t gen_id); void DeleteDBEntryEvent(const DBEntry *entry, uint32_t gen_id); @@ -1145,7 +1146,7 @@ class FlowMgmtManager { // Handle Delete of a flow. Updates FlowMgmtKeyTree for all objects void DeleteFlow(FlowEntryPtr &flow, const RevFlowDepParams &p); void UpdateFlowStats(FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, - uint32_t oflow_bytes); + uint32_t oflow_bytes, const boost::uuids::uuid &u); // Add a FlowMgmtKey into the FlowMgmtKeyTree for an object // The FlowMgmtKeyTree for object is passed as argument diff --git a/src/vnsw/agent/pkt/flow_mgmt_request.h b/src/vnsw/agent/pkt/flow_mgmt_request.h index e6e03f96c0a..f3f683467ae 100644 --- a/src/vnsw/agent/pkt/flow_mgmt_request.h +++ b/src/vnsw/agent/pkt/flow_mgmt_request.h @@ -41,9 +41,11 @@ class FlowMgmtRequest { } FlowMgmtRequest(Event event, FlowEntry *flow, uint32_t bytes, - uint32_t packets, uint32_t oflow_bytes) : + uint32_t packets, uint32_t oflow_bytes, + const boost::uuids::uuid &u) : event_(event), flow_(flow), db_entry_(NULL), vrf_id_(0), gen_id_(), - bytes_(bytes), packets_(packets), oflow_bytes_(oflow_bytes), params_() { + bytes_(bytes), packets_(packets), oflow_bytes_(oflow_bytes), params_(), + flow_uuid_(u) { if (event == RETRY_DELETE_VRF) assert(vrf_id_); } @@ -109,6 +111,7 @@ class FlowMgmtRequest { void set_params(const RevFlowDepParams ¶ms) { params_ = params; } + boost::uuids::uuid flow_uuid() const { return flow_uuid_; } private: Event event_; @@ -123,6 +126,7 @@ class FlowMgmtRequest { uint32_t packets_; uint32_t oflow_bytes_; RevFlowDepParams params_; + boost::uuids::uuid flow_uuid_; DISALLOW_COPY_AND_ASSIGN(FlowMgmtRequest); }; diff --git a/src/vnsw/agent/pkt/flow_table.cc b/src/vnsw/agent/pkt/flow_table.cc index ddd35bd3783..aa0aa20fb2b 100644 --- a/src/vnsw/agent/pkt/flow_table.cc +++ b/src/vnsw/agent/pkt/flow_table.cc @@ -754,7 +754,8 @@ void FlowTable::ProcessKSyncFlowEvent(const FlowEventKSync *req, mgr->FlowStatsUpdateEvent(evicted_flow.get(), req->evict_flow_bytes(), req->evict_flow_packets(), - req->evict_flow_oflow()); + req->evict_flow_oflow(), + evicted_flow->uuid()); } } diff --git a/src/vnsw/agent/pkt/pkt_flow_info.cc b/src/vnsw/agent/pkt/pkt_flow_info.cc index dfeaee95a07..666f09ef912 100644 --- a/src/vnsw/agent/pkt/pkt_flow_info.cc +++ b/src/vnsw/agent/pkt/pkt_flow_info.cc @@ -1674,10 +1674,11 @@ void PktFlowInfo::UpdateEvictedFlowStats(const PktInfo *pkt) { FlowMgmtManager *mgr = agent->pkt()->flow_mgmt_manager( flow_table->table_index()); + /* Enqueue stats update request with UUID of the flow */ if (flow.get() && flow->deleted() == false) { mgr->FlowStatsUpdateEvent(flow.get(), pkt->agent_hdr.cmd_param_2, pkt->agent_hdr.cmd_param_3, - pkt->agent_hdr.cmd_param_4); + pkt->agent_hdr.cmd_param_4, flow->uuid()); } } diff --git a/src/vnsw/agent/pkt/test/test_flow_table.cc b/src/vnsw/agent/pkt/test/test_flow_table.cc index 6faaed1ce99..b985de0fe2f 100644 --- a/src/vnsw/agent/pkt/test/test_flow_table.cc +++ b/src/vnsw/agent/pkt/test/test_flow_table.cc @@ -288,6 +288,65 @@ TEST_F(TestFlowTable, EvictPktTrapBeforeReverseFlowResp) { client->WaitForIdle(); } +TEST_F(TestFlowTable, ResetFlowStats) { + TxTcpMplsPacket(eth->id(), remote_compute, router_id_, vif0->label(), + remote_vm1_ip, vm1_ip, 1000, 200, 1, 1); + client->WaitForIdle(); + + FlowEntry *flow = FlowGet(remote_vm1_ip, vm1_ip, IPPROTO_TCP, 1000, + 200, vif0->flow_key_nh()->id(), 1); + EXPECT_TRUE(flow != NULL); + FlowEntry *rflow = flow->reverse_flow_entry(); + EXPECT_TRUE(rflow != NULL); + + //Invoke FlowStatsCollector to check whether flow gets evicted + util_.EnqueueFlowStatsCollectorTask(); + client->WaitForIdle(); + + //Verify the stats of flow + EXPECT_TRUE(FlowStatsMatch("vrf1", remote_vm1_ip, vm1_ip, IPPROTO_TCP, 1000, + 200, 1, 30, vif0->flow_key_nh()->id(), 1)); + + //Change the stats + KSyncSockTypeMap::IncrFlowStats(1, 1, 30); + + //Invoke FlowStatsCollector to enqueue delete for evicted flow. + util_.EnqueueFlowStatsCollectorTask(); + client->WaitForIdle(); + + //Verify the stats of flow + EXPECT_TRUE(FlowStatsMatch("vrf1", remote_vm1_ip, vm1_ip, IPPROTO_TCP, 1000, + 200, 2, 60, vif0->flow_key_nh()->id(), 1)); + + TxTcpMplsPacket(eth->id(), remote_compute, router_id_, vif0->label(), + remote_vm1_ip, vm1_ip, 1000, 200, 1, 10); + client->WaitForIdle(); + + flow = FlowGet(remote_vm1_ip, vm1_ip, IPPROTO_TCP, 1000, + 200, vif0->flow_key_nh()->id(), 10); + EXPECT_TRUE(flow != NULL); + + //Verify that flow-handle is updated in flow-stats module + FlowStatsCollector *fsc = flow->fsc(); + EXPECT_TRUE(fsc != NULL); + FlowExportInfo *info = fsc->FindFlowExportInfo(flow); + EXPECT_TRUE(info != NULL); + EXPECT_TRUE(info->flow_handle() == 10); + + //Verify the stats of flow are reset after change of flow-handle + EXPECT_TRUE(FlowStatsMatch("vrf1", remote_vm1_ip, vm1_ip, IPPROTO_TCP, 1000, + 200, 0, 0, vif0->flow_key_nh()->id(), 10)); + + //Invoke FlowStatsCollector to enqueue delete for reverse flow which is + //marked as short flow + util_.EnqueueFlowStatsCollectorTask(); + client->WaitForIdle(); + + //Verify the stats of flow + EXPECT_TRUE(FlowStatsMatch("vrf1", remote_vm1_ip, vm1_ip, IPPROTO_TCP, 1000, + 200, 1, 30, vif0->flow_key_nh()->id(), 10)); +} + int main(int argc, char *argv[]) { int ret = 0; GETUSERARGS(); diff --git a/src/vnsw/agent/test-xml/test_xml.cc b/src/vnsw/agent/test-xml/test_xml.cc index 8a134d0938d..703b3416706 100644 --- a/src/vnsw/agent/test-xml/test_xml.cc +++ b/src/vnsw/agent/test-xml/test_xml.cc @@ -35,7 +35,7 @@ class FlowExportTask : public Task { return true; } - fec->ExportFlow(info, bytes_, pkts_, NULL); + fec->ExportFlow(info, bytes_, pkts_, NULL, true); return true; } std::string Description() const { return "FlowExportTask"; } diff --git a/src/vnsw/agent/test/test_cmn_util.h b/src/vnsw/agent/test/test_cmn_util.h index 2f54fc41fc0..c69b018a90d 100644 --- a/src/vnsw/agent/test/test_cmn_util.h +++ b/src/vnsw/agent/test/test_cmn_util.h @@ -369,7 +369,8 @@ FlowEntry* FlowGet(int vrf_id, std::string sip, std::string dip, uint8_t proto, uint16_t sport, uint16_t dport, int nh_id); bool FlowStatsMatch(const string &vrf_name, const char *sip, const char *dip, uint8_t proto, uint16_t sport, uint16_t dport, - uint64_t pkts, uint64_t bytes, int nh_id); + uint64_t pkts, uint64_t bytes, int nh_id, + uint32_t flow_handle = FlowEntry::kInvalidFlowHandle); bool FindFlow(const string &vrf_name, const char *sip, const char *dip, uint8_t proto, uint16_t sport, uint16_t dport, bool nat, const string &nat_vrf_name, const char *nat_sip, diff --git a/src/vnsw/agent/test/test_util.cc b/src/vnsw/agent/test/test_util.cc index af20358126a..ce75bf1142a 100644 --- a/src/vnsw/agent/test/test_util.cc +++ b/src/vnsw/agent/test/test_util.cc @@ -3452,7 +3452,8 @@ bool FlowGet(const string &vrf_name, const char *sip, const char *dip, bool FlowStatsMatch(const string &vrf_name, const char *sip, const char *dip, uint8_t proto, uint16_t sport, - uint16_t dport, uint64_t pkts, uint64_t bytes, int nh_id) { + uint16_t dport, uint64_t pkts, uint64_t bytes, int nh_id, + uint32_t flow_handle) { Agent *agent = Agent::GetInstance(); VrfEntry *vrf = agent->vrf_table()->FindVrfFromName(vrf_name); EXPECT_TRUE(vrf != NULL); @@ -3468,7 +3469,17 @@ bool FlowStatsMatch(const string &vrf_name, const char *sip, key.protocol = proto; key.family = key.src_addr.is_v4() ? Address::INET : Address::INET6; - FlowEntry *fe = agent->pkt()->get_flow_proto()->Find(key, 0); + FlowEntry *fe = NULL; + if (flow_handle != FlowEntry::kInvalidFlowHandle) { + FlowTable *table = agent->pkt()->get_flow_proto()->GetFlowTable(key, + flow_handle); + if (table == NULL) { + return NULL; + } + fe = table->Find(key); + } else { + fe = agent->pkt()->get_flow_proto()->Find(key, 0); + } EXPECT_TRUE(fe != NULL); if (fe == NULL) { return false; diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_export_info.cc b/src/vnsw/agent/vrouter/flow_stats/flow_export_info.cc index 2243c4bf0a1..54112a09ece 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_export_info.cc +++ b/src/vnsw/agent/vrouter/flow_stats/flow_export_info.cc @@ -5,14 +5,18 @@ FlowExportInfo::FlowExportInfo() : flow_(), setup_time_(0), teardown_time_(0), last_modified_time_(0), bytes_(0), packets_(0), underlay_source_port_(0), changed_(false), tcp_flags_(0), delete_enqueue_time_(0), evict_enqueue_time_(0), - exported_atleast_once_(false) { + exported_atleast_once_(false), gen_id_(0), + flow_handle_(FlowEntry::kInvalidFlowHandle), + rev_flow_egress_uuid_(nil_uuid()) { } FlowExportInfo::FlowExportInfo(const FlowEntryPtr &fe) : flow_(fe), setup_time_(0), teardown_time_(0), last_modified_time_(0), bytes_(0), packets_(0), underlay_source_port_(0), changed_(true), tcp_flags_(0), delete_enqueue_time_(0), evict_enqueue_time_(0), - exported_atleast_once_(false) { + exported_atleast_once_(false), gen_id_(0), + flow_handle_(FlowEntry::kInvalidFlowHandle), + rev_flow_egress_uuid_(nil_uuid()) { } FlowExportInfo::FlowExportInfo(const FlowEntryPtr &fe, uint64_t setup_time) : @@ -20,7 +24,9 @@ FlowExportInfo::FlowExportInfo(const FlowEntryPtr &fe, uint64_t setup_time) : teardown_time_(0), last_modified_time_(setup_time), bytes_(0), packets_(0), underlay_source_port_(0), changed_(true), tcp_flags_(0), delete_enqueue_time_(0), evict_enqueue_time_(0), - exported_atleast_once_(false) { + exported_atleast_once_(false), gen_id_(0), + flow_handle_(FlowEntry::kInvalidFlowHandle), + rev_flow_egress_uuid_(nil_uuid()) { } FlowEntry* FlowExportInfo::reverse_flow() const { @@ -39,6 +45,22 @@ bool FlowExportInfo::IsActionLog() const { return false; } +void FlowExportInfo::CopyFlowInfo(FlowEntry *fe) { + gen_id_ = fe->gen_id(); + flow_handle_ = fe->flow_handle(); + uuid_ = fe->uuid(); + flags_ = fe->flags(); + FlowEntry *rflow = reverse_flow(); + if (rflow) { + rev_flow_egress_uuid_ = rflow->egress_uuid(); + } +} + +void FlowExportInfo::ResetStats() { + bytes_ = packets_ = 0; + tcp_flags_ = 0; + underlay_source_port_ = 0; +} /////////////////////////////////////////////////////////////////// // APIs used only by UT ////////////////////////////////////////////////////////////////// diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_export_info.h b/src/vnsw/agent/vrouter/flow_stats/flow_export_info.h index e2e321501dd..a88ceec9bc5 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_export_info.h +++ b/src/vnsw/agent/vrouter/flow_stats/flow_export_info.h @@ -50,6 +50,20 @@ class FlowExportInfo { uint64_t delete_enqueue_time() const { return delete_enqueue_time_; } void set_evict_enqueue_time(uint64_t value) { evict_enqueue_time_ = value; } uint64_t evict_enqueue_time() const { return evict_enqueue_time_; } + uint8_t gen_id() const { return gen_id_; } + void set_gen_id(uint8_t value) { gen_id_ = value; } + uint32_t flow_handle() const { return flow_handle_; } + void set_flow_handle(uint32_t value) { flow_handle_ = value; } + const boost::uuids::uuid &uuid() const { return uuid_; } + const boost::uuids::uuid &rev_flow_egress_uuid() const { + return rev_flow_egress_uuid_; + } + uint32_t flags() const { return flags_; } + bool is_flags_set(const FlowEntry::FlowEntryFlags &flags) const { + return (flags_ & flags); + } + void CopyFlowInfo(FlowEntry *fe); + void ResetStats(); private: FlowEntryPtr flow_; uint64_t setup_time_; @@ -67,6 +81,11 @@ class FlowExportInfo { uint64_t delete_enqueue_time_; uint64_t evict_enqueue_time_; bool exported_atleast_once_; + uint8_t gen_id_; + uint32_t flow_handle_; + boost::uuids::uuid uuid_; + boost::uuids::uuid rev_flow_egress_uuid_; + uint32_t flags_; }; #endif // __AGENT_FLOW_EXPORT_INFO_H__ diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_export_request.h b/src/vnsw/agent/vrouter/flow_stats/flow_export_request.h index 9074218d1e2..2b1d6d4b677 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_export_request.h +++ b/src/vnsw/agent/vrouter/flow_stats/flow_export_request.h @@ -30,9 +30,10 @@ class FlowExportReq { } FlowExportReq(Event event, const FlowExportInfo &info, uint32_t bytes, - uint32_t packets, uint32_t oflow_bytes) : + uint32_t packets, uint32_t oflow_bytes, + const boost::uuids::uuid &u) : event_(event), info_(info), bytes_(bytes), packets_(packets), - oflow_bytes_(oflow_bytes) { + oflow_bytes_(oflow_bytes), uuid_(u) { } ~FlowExportReq() { } @@ -44,6 +45,7 @@ class FlowExportReq { uint32_t packets() const { return packets_;} uint32_t oflow_bytes() const { return oflow_bytes_;} const RevFlowDepParams& params() const { return params_; } + boost::uuids::uuid uuid() const { return uuid_; } private: Event event_; @@ -53,6 +55,7 @@ class FlowExportReq { uint32_t packets_; uint32_t oflow_bytes_; RevFlowDepParams params_; + boost::uuids::uuid uuid_; DISALLOW_COPY_AND_ASSIGN(FlowExportReq); }; #endif // __AGENT_FLOW_EXPORT_REQUEST_H__ diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc index 2311d3d91dd..e3bac022ae7 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc @@ -142,14 +142,15 @@ void FlowStatsCollector::UpdateEntriesToVisit() { bool FlowStatsCollector::ShouldBeAged(FlowExportInfo *info, const vr_flow_entry *k_flow, + const vr_flow_stats &k_stats, uint64_t curr_time) { FlowEntry *flow = info->flow(); //If both forward and reverse flow are marked //as TCP closed then immediately remote the flow if (k_flow != NULL) { uint64_t k_flow_bytes, bytes; - k_flow_bytes = GetFlowStats(k_flow->fe_stats.flow_bytes_oflow, - k_flow->fe_stats.flow_bytes); + k_flow_bytes = GetFlowStats(k_stats.flow_bytes_oflow, + k_stats.flow_bytes); bytes = 0x0000ffffffffffffULL & info->bytes(); /* Don't account for agent overflow bits while comparing change in * stats */ @@ -308,20 +309,31 @@ void FlowStatsCollector::UpdateStatsAndExportFlow(FlowExportInfo *info, return; } FlowEntry *fe = info->flow(); - const vr_flow_entry *k_flow = ksync_obj->GetValidKFlowEntry(fe->key(), - fe->flow_handle(), - fe->gen_id()); + bool read_flow = true; + if (info->uuid() != fe->uuid()) { + /* If UUID for a flow has changed, don't read fields of FlowEntry while + * sending FlowExport message to collector */ + read_flow = false; + } + /* Fetch vrouter Flow entry using gen_id and flow_handle from FlowExportInfo + * to account for the case where FlowEntry's flow_handle/gen_id has changed + * during Delete processing by FlowStatsCollector */ + vr_flow_stats k_stats; + const vr_flow_entry *k_flow = ksync_obj->GetKFlowStats(fe->key(), + info->flow_handle(), + info->gen_id(), + &k_stats); if (k_flow) { - UpdateAndExportInternal(info, k_flow->fe_stats.flow_bytes, - k_flow->fe_stats.flow_bytes_oflow, - k_flow->fe_stats.flow_packets, - k_flow->fe_stats.flow_packets_oflow, - teardown_time, true, p); + UpdateAndExportInternal(info, k_stats.flow_bytes, + k_stats.flow_bytes_oflow, + k_stats.flow_packets, + k_stats.flow_packets_oflow, + teardown_time, true, p, read_flow); return; } /* If reading of stats fails, send a message with just teardown time */ info->set_teardown_time(teardown_time); - ExportFlow(info, 0, 0, p); + ExportFlow(info, 0, 0, p, read_flow); } void FlowStatsCollector::FlowDeleteEnqueue(FlowExportInfo *info, uint64_t t) { @@ -389,7 +401,7 @@ void FlowStatsCollector::UpdateAndExportInternalLocked(FlowExportInfo *info, FlowEntry *rflow = info->reverse_flow(); FLOW_LOCK(flow, rflow, FlowEvent::FLOW_MESSAGE); UpdateAndExportInternal(info, bytes, oflow_bytes, pkts, oflow_pkts, time, - teardown_time, p); + teardown_time, p, true); } void FlowStatsCollector::UpdateAndExportInternal(FlowExportInfo *info, @@ -399,11 +411,12 @@ void FlowStatsCollector::UpdateAndExportInternal(FlowExportInfo *info, uint16_t oflow_pkts, uint64_t time, bool teardown_time, - const RevFlowDepParams *p) { + const RevFlowDepParams *p, + bool read_flow) { uint64_t diff_bytes, diff_pkts; UpdateFlowStatsInternal(info, bytes, oflow_bytes, pkts, oflow_pkts, time, teardown_time, &diff_bytes, &diff_pkts); - ExportFlow(info, diff_bytes, diff_pkts, p); + ExportFlow(info, diff_bytes, diff_pkts, p, read_flow); } // Scan for max_count entries in flow-table @@ -421,19 +434,37 @@ uint32_t FlowStatsCollector::RunAgeing(uint32_t max_count) { info = &it->second; FlowEntry *fe = info->flow(); FlowEntry *rfe = info->reverse_flow(); - uint32_t flow_handle; - uint16_t gen_id; - { - FlowEntry *rflow = NULL; - FLOW_LOCK(fe, rflow, FlowEvent::FLOW_MESSAGE); - // since flow processing and stats collector can run in parallel - // flow handle and gen id not being the key for flow entry can - // change while processing, so flow handle and gen id should be - // fetched by holding an lock and should not be re-fetched again - // during the entry processing - flow_handle = fe->flow_handle(); - gen_id = fe->gen_id(); + + /* Use flow-handle and gen-id from FlowExportInfo instead of FlowEntry. + * The stats that FlowExportInfo holds corresponds to a given + * (FlowKey, gen-id and FlowHandle). Since gen-id/flow-handle for a flow + * can change dynamically, we need to pick gen-id and flow-handle from + * FlowExportInfo. Otherwise stats will go wrong. Whenever gen-id/ + * flow-handle changes, the stats will be reset as part of AddFlow API + */ + uint32_t flow_handle = info->flow_handle(); + uint16_t gen_id = info->gen_id(); + + /* If Flow handle is still not populated in FlowStatsCollector, pick the + * value from FlowEntry + */ + if (flow_handle == FlowEntry::kInvalidFlowHandle) { + { + FlowEntry *rflow = NULL; + FLOW_LOCK(fe, rflow, FlowEvent::FLOW_MESSAGE); + // since flow processing and stats collector can run in parallel + // flow handle and gen id not being the key for flow entry can + // change while processing, so flow handle and gen id should be + // fetched by holding an lock. + flow_handle = fe->flow_handle(); + gen_id = fe->gen_id(); + info->CopyFlowInfo(fe); + } } + const vr_flow_entry *k_flow = NULL; + vr_flow_stats k_stats; + KFlowData kinfo; + it++; // if we come across deleted entry, retry flow deletion after some time @@ -447,21 +478,29 @@ uint32_t FlowStatsCollector::RunAgeing(uint32_t max_count) { continue; } + // if we come across evicted entry, retry flow eviction after some time + // duplicate eviction will be suppressed in flow_table + uint64_t evict_time = info->evict_enqueue_time(); + if (evict_time) { + if ((curr_time - evict_time) > kFlowDeleteRetryTime) { + FlowEvictEnqueue(info, curr_time, flow_handle, gen_id); + } + continue; + } + count++; - const vr_flow_entry *k_flow = ksync_obj->GetValidKFlowEntry - (fe->key(), flow_handle, gen_id); - - if ((fe->key().protocol == IPPROTO_TCP) && - ksync_obj->IsEvictionMarked(k_flow)) { - uint64_t evict_time = info->evict_enqueue_time(); - if (evict_time) { - if ((curr_time - evict_time) > kFlowDeleteRetryTime) { - FlowEvictEnqueue(info, curr_time, flow_handle, gen_id); - } + /* Teardown time is set when Evicted flow stats update message is + * received. For flows whose teardown time is set, we need not read + * stats from vrouter + */ + if (!info->teardown_time()) { + k_flow = ksync_obj->GetKFlowStatsAndInfo(fe->key(), flow_handle, + gen_id, &k_stats, &kinfo); + if ((fe->key().protocol == IPPROTO_TCP) && + ksync_obj->IsEvictionMarked(k_flow, kinfo.flags)) { + FlowEvictEnqueue(info, curr_time, flow_handle, gen_id); continue; } - FlowEvictEnqueue(info, curr_time, flow_handle, gen_id); - continue; } FlowExportInfo *rev_info = NULL; @@ -478,15 +517,19 @@ uint32_t FlowStatsCollector::RunAgeing(uint32_t max_count) { bool deleted = false; // Can the flow be aged? - if (ShouldBeAged(info, k_flow, curr_time)) { + if (ShouldBeAged(info, k_flow, k_stats, curr_time)) { rev_info = FindFlowExportInfo(rfe); - // ShouldBeAged looks at one flow only. So, check for both forward and - // reverse flows + // ShouldBeAged looks at one flow only. So, check for both forward + // and reverse flows if (rev_info) { - const vr_flow_entry *k_flow_rev; - k_flow_rev = ksync_obj->GetValidKFlowEntry - (rfe->key(), rfe->flow_handle(), rfe->gen_id()); - if (ShouldBeAged(rev_info, k_flow_rev, curr_time)) { + const vr_flow_entry *k_flow_rev = NULL; + vr_flow_stats k_rflow_stats; + k_flow_rev = ksync_obj->GetKFlowStats(rfe->key(), + rev_info->flow_handle(), + rev_info->gen_id(), + &k_rflow_stats); + if (ShouldBeAged(rev_info, k_flow_rev, k_rflow_stats, + curr_time)) { deleted = true; } } else { @@ -507,29 +550,25 @@ uint32_t FlowStatsCollector::RunAgeing(uint32_t max_count) { // Stats for deleted flow are updated when we get DELETE message if (deleted == false && k_flow) { uint64_t k_bytes, bytes; - /* Copy full stats in one shot and use local copy instead of reading - * individual stats from shared memory directly to minimize the - * inconsistency */ - struct vr_flow_stats fe_stats = k_flow->fe_stats; - k_bytes = GetFlowStats(fe_stats.flow_bytes_oflow, - fe_stats.flow_bytes); + k_bytes = GetFlowStats(k_stats.flow_bytes_oflow, + k_stats.flow_bytes); bytes = 0x0000ffffffffffffULL & info->bytes(); /* Always copy udp source port even though vrouter does not change * it. Vrouter many change this behavior and recompute source port * whenever flow action changes. To keep agent independent of this, * always copy UDP source port */ - info->set_underlay_source_port(k_flow->fe_udp_src_port); - info->set_tcp_flags(k_flow->fe_tcp_flags); + info->set_underlay_source_port(kinfo.underlay_src_port); + info->set_tcp_flags(kinfo.tcp_flags); /* Don't account for agent overflow bits while comparing change in * stats */ if (bytes != k_bytes) { UpdateAndExportInternalLocked(info, - fe_stats.flow_bytes, - fe_stats.flow_bytes_oflow, - fe_stats.flow_packets, - fe_stats.flow_packets_oflow, - curr_time, false, NULL); + k_stats.flow_bytes, + k_stats.flow_bytes_oflow, + k_stats.flow_packets, + k_stats.flow_packets_oflow, + curr_time, false, NULL); } else if (info->changed()) { /* export flow (reverse) for which traffic is not seen yet. */ ExportFlowLocked(info, 0, 0, NULL); @@ -611,11 +650,12 @@ void FlowStatsCollector::DeleteEvent(const FlowEntryPtr &flow, void FlowStatsCollector::UpdateStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, - uint32_t oflow_bytes) { + uint32_t oflow_bytes, + const boost::uuids::uuid &u) { FlowExportInfo info(flow); boost::shared_ptr req(new FlowExportReq(FlowExportReq::UPDATE_FLOW_STATS, info, bytes, - packets, oflow_bytes)); + packets, oflow_bytes, u)); request_queue_.Enqueue(req); } @@ -736,7 +776,7 @@ void FlowStatsCollector::ExportFlowLocked(FlowExportInfo *info, FlowEntry *flow = info->flow(); FlowEntry *rflow = info->reverse_flow(); FLOW_LOCK(flow, rflow, FlowEvent::FLOW_MESSAGE); - ExportFlow(info, diff_bytes, diff_pkts, params); + ExportFlow(info, diff_bytes, diff_pkts, params, true); } /* Flow Export Algorithm @@ -755,11 +795,23 @@ void FlowStatsCollector::ExportFlowLocked(FlowExportInfo *info, void FlowStatsCollector::ExportFlow(FlowExportInfo *info, uint64_t diff_bytes, uint64_t diff_pkts, - const RevFlowDepParams *params) { + const RevFlowDepParams *params, + bool read_flow) { assert((agent_uve_->agent()->tsn_enabled() == false)); FlowEntry *flow = info->flow(); FlowEntry *rflow = info->reverse_flow(); + /* Drop Deleted Flow export messages if add for it was never exported. This + * includes only those delete messages where we cannot read FlowEntry + * because deleted msg is for a Flow UUID which is different from the UUID + * in FlowEntry pointer + */ + if (!info->exported_atleast_once() && !read_flow) { + assert(info->teardown_time()); + flow_stats_manager_->deleted_flow_export_drops_++; + return; + } + int32_t cfg_rate = agent_uve_->agent()->oper_db()->global_vrouter()-> flow_export_rate(); /* We should always try to export flows with Action as LOG regardless of @@ -825,49 +877,51 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info, s_flow.set_packets(info->packets()); s_flow.set_diff_bytes(diff_bytes); s_flow.set_diff_packets(diff_pkts); - s_flow.set_tcp_flags(info->tcp_flags()); - - s_flow.set_sourceip(flow->key().src_addr); - s_flow.set_destip(flow->key().dst_addr); - s_flow.set_protocol(flow->key().protocol); - s_flow.set_sport(flow->key().src_port); - s_flow.set_dport(flow->key().dst_port); - s_flow.set_sourcevn(flow->data().source_vn_match); - s_flow.set_destvn(flow->data().dest_vn_match); - s_flow.set_vm(flow->data().vm_cfg_name); - if (flow->is_flags_set(FlowEntry::ReverseFlow)) { - s_flow.set_forward_flow(false); - } else { - s_flow.set_forward_flow(true); + s_flow.set_setup_time(info->setup_time()); + if (info->teardown_time()) { + s_flow.set_teardown_time(info->teardown_time()); } + info->set_changed(false); - string drop_reason = FlowEntry::DropReasonStr(flow->data().drop_reason); - s_flow.set_drop_reason(drop_reason); + if (read_flow) { + s_flow.set_tcp_flags(info->tcp_flags()); + s_flow.set_sourceip(flow->key().src_addr); + s_flow.set_destip(flow->key().dst_addr); + s_flow.set_protocol(flow->key().protocol); + s_flow.set_sport(flow->key().src_port); + s_flow.set_dport(flow->key().dst_port); + s_flow.set_sourcevn(flow->data().source_vn_match); + s_flow.set_destvn(flow->data().dest_vn_match); + s_flow.set_vm(flow->data().vm_cfg_name); + if (info->is_flags_set(FlowEntry::ReverseFlow)) { + s_flow.set_forward_flow(false); + } else { + s_flow.set_forward_flow(true); + } - s_flow.set_sg_rule_uuid(flow->sg_rule_uuid()); - s_flow.set_nw_ace_uuid(flow->nw_ace_uuid()); - if (flow->intf_entry()) { - s_flow.set_vmi_uuid(UuidToString(flow->intf_entry()->GetUuid())); - } + string drop_reason = FlowEntry::DropReasonStr(flow->data().drop_reason); + s_flow.set_drop_reason(drop_reason); - if (rflow) { - s_flow.set_reverse_uuid(to_string(rflow->uuid())); - } else if (params) { - s_flow.set_reverse_uuid(to_string(params->rev_uuid_)); - } + s_flow.set_sg_rule_uuid(flow->sg_rule_uuid()); + s_flow.set_nw_ace_uuid(flow->nw_ace_uuid()); + if (flow->intf_entry()) { + s_flow.set_vmi_uuid(UuidToString(flow->intf_entry()->GetUuid())); + } - // Set flow action - std::string action_str; - GetFlowSandeshActionParams(flow->data().match_p.action_info, action_str); - s_flow.set_action(action_str); - s_flow.set_setup_time(info->setup_time()); - if (info->teardown_time()) { - s_flow.set_teardown_time(info->teardown_time()); + if (rflow) { + s_flow.set_reverse_uuid(to_string(rflow->uuid())); + } else if (params) { + s_flow.set_reverse_uuid(to_string(params->rev_uuid_)); + } + + // Set flow action + std::string action_str; + GetFlowSandeshActionParams(flow->data().match_p.action_info, action_str); + s_flow.set_action(action_str); + SetUnderlayInfo(info, s_flow); } - SetUnderlayInfo(info, s_flow); - info->set_changed(false); - if (flow->is_flags_set(FlowEntry::LocalFlow)) { + if (info->is_flags_set(FlowEntry::LocalFlow)) { /* For local flows we need to send two flow log messages. * 1. With direction as ingress * 2. With direction as egress @@ -876,8 +930,10 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info, * direction as egress. */ s_flow.set_direction_ing(1); - s_flow.set_reverse_uuid(to_string(flow->egress_uuid())); - SourceIpOverride(info, s_flow, params); + if (read_flow) { + s_flow.set_reverse_uuid(to_string(flow->egress_uuid())); + SourceIpOverride(info, s_flow, params); + } EnqueueFlowMsg(); FlowLogData &s_flow2 = msg_list_[GetFlowMsgIdx()]; @@ -893,7 +949,11 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info, //while exporting rev flow, this done so that //key, stats and other stuff can be copied over //from current flow - SetImplicitFlowDetails(info, s_flow2, params); + if (read_flow) { + SetImplicitFlowDetails(info, s_flow2, params); + } else { + s_flow2.set_flowuuid(to_string(info->rev_flow_egress_uuid())); + } //Export local flow of egress direction with a different UUID even when //the flow is same. Required for analytics module to query flows //irrespective of direction. @@ -901,9 +961,11 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info, flow_stats_manager_->UpdateFlowExportStats(2, subject_flows_to_algorithm); } else { - if (flow->is_flags_set(FlowEntry::IngressDir)) { + if (info->is_flags_set(FlowEntry::IngressDir)) { s_flow.set_direction_ing(1); - SourceIpOverride(info, s_flow, params); + if (read_flow) { + SourceIpOverride(info, s_flow, params); + } } else { s_flow.set_direction_ing(0); } @@ -1034,7 +1096,7 @@ bool FlowStatsCollector::RequestHandler(boost::shared_ptr req) { case FlowExportReq::UPDATE_FLOW_STATS: { EvictedFlowStatsUpdate(flow, req->bytes(), req->packets(), - req->oflow_bytes()); + req->oflow_bytes(), req->uuid()); /* ExportFlow will enqueue FlowLog message for send. If we have not hit * max messages to be sent, it will not dispatch. Invoke * DispatchPendingFlowMsg to send any enqueued messages in the queue @@ -1110,17 +1172,41 @@ void FlowStatsCollector::NewFlow(const FlowExportInfo &info) { } void FlowStatsCollector::AddFlow(FlowExportInfo info) { - FlowEntryTree::iterator it = flow_tree_.find(info.flow()); + /* Before inserting update the gen_id and flow_handle in FlowExportInfo. + * Locks for accessing fields of flow are taken in calling function. + */ + FlowEntry* fe = info.flow(); + info.CopyFlowInfo(fe); + FlowEntryTree::iterator it = flow_tree_.find(fe); if (it != flow_tree_.end()) { - it->second.set_changed(true); - it->second.set_delete_enqueue_time(0); - it->second.set_evict_enqueue_time(0); + FlowExportInfo &prev = it->second; + if (prev.uuid() != fe->uuid()) { + /* Received ADD request for already added entry with a different + * UUID. Because of state-compression of messages to + * FlowStatsCollector in FlowMgmt, we have not received DELETE for + * previous UUID. Send FlowExport to indicate delete for the flow. + * This export need not be sent if teardown time is already set. + * Teardown time would be set if EvictedFlowStats update request + * comes before this duplicate add. + */ + if (!prev.teardown_time()) { + UpdateStatsAndExportFlow(&prev, info.setup_time(), NULL); + } + /* After sending Delete to collector (if required), reset the stats + */ + prev.ResetStats(); + } + prev.CopyFlowInfo(fe); + prev.set_changed(true); + prev.set_delete_enqueue_time(0); + prev.set_evict_enqueue_time(0); + prev.set_teardown_time(0); return; } /* Invoke NewFlow only if the entry is not present in our tree */ NewFlow(info); - flow_tree_.insert(make_pair(info.flow(), info)); + flow_tree_.insert(make_pair(fe, info)); } void FlowStatsCollector::DeleteFlow(const FlowEntryPtr &flow) { @@ -1134,16 +1220,23 @@ void FlowStatsCollector::DeleteFlow(const FlowEntryPtr &flow) { void FlowStatsCollector::EvictedFlowStatsUpdate(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, - uint32_t oflow_bytes) { - FlowEntry *fe = flow.get(); - FlowExportInfo *info = FindFlowExportInfo(fe); + uint32_t oflow_bytes, + const boost::uuids::uuid &u) { + FlowExportInfo *info = FindFlowExportInfo(flow.get()); if (info) { + /* Ignore stats update request for Evicted flow, if we don't have + * FlowEntry corresponding to the Evicted Flow. The match is done using + * UUID + */ + if (info->uuid() != u) { + return; + } /* We are updating stats of evicted flow. Set teardown_time here. * When delete event is being handled we don't export flow if * teardown time is set */ UpdateAndExportInternal(info, bytes, oflow_bytes & 0xFFFF, packets, oflow_bytes & 0xFFFF0000, - GetCurrentTime(), true, NULL); + GetCurrentTime(), true, NULL, false); } } diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h index ec68e588af3..57e19c82d5d 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h @@ -24,6 +24,13 @@ class FlowStatsRecordsReq; class FetchFlowStatsRecord; class FlowStatsManager; +struct KFlowData { +public: + uint16_t underlay_src_port; + uint16_t tcp_flags; + uint16_t flags; +}; + //Defines the functionality to periodically read flow stats from //shared memory (between agent and Kernel) and export this stats info to //collector. Also responsible for aging of flow entries. Runs in the context @@ -128,11 +135,13 @@ class FlowStatsCollector : public StatsCollector { FlowExportInfo *FindFlowExportInfo(const FlowEntry *fe); const FlowExportInfo *FindFlowExportInfo(const FlowEntry *fe) const; void ExportFlow(FlowExportInfo *info, uint64_t diff_bytes, - uint64_t diff_pkts, const RevFlowDepParams *params); + uint64_t diff_pkts, const RevFlowDepParams *params, + bool read_flow); void UpdateFloatingIpStats(const FlowExportInfo *flow, uint64_t bytes, uint64_t pkts); void UpdateStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, - uint32_t packets, uint32_t oflow_bytes); + uint32_t packets, uint32_t oflow_bytes, + const boost::uuids::uuid &u); size_t Size() const { return flow_tree_.size(); } void NewFlow(const FlowExportInfo &info); void set_deleted(bool val) { @@ -164,10 +173,9 @@ class FlowStatsCollector : public StatsCollector { void UpdateEntriesToVisit(); void UpdateStatsAndExportFlow(FlowExportInfo *info, uint64_t teardown_time, const RevFlowDepParams *params); - void EvictedFlowStatsUpdate(const FlowEntryPtr &flow, - uint32_t bytes, - uint32_t packets, - uint32_t oflow_bytes); + void EvictedFlowStatsUpdate(const FlowEntryPtr &flow, uint32_t bytes, + uint32_t packets, uint32_t oflow_bytes, + const boost::uuids::uuid &u); void UpdateAndExportInternal(FlowExportInfo *info, uint32_t bytes, uint16_t oflow_bytes, @@ -175,7 +183,8 @@ class FlowStatsCollector : public StatsCollector { uint16_t oflow_pkts, uint64_t time, bool teardown_time, - const RevFlowDepParams *params); + const RevFlowDepParams *params, + bool read_flow); void UpdateAndExportInternalLocked(FlowExportInfo *info, uint32_t bytes, uint16_t oflow_bytes, @@ -207,7 +216,7 @@ class FlowStatsCollector : public StatsCollector { uint64_t bytes, uint64_t pkts); uint64_t GetFlowStats(const uint16_t &oflow_data, const uint32_t &data); bool ShouldBeAged(FlowExportInfo *info, const vr_flow_entry *k_flow, - uint64_t curr_time); + const vr_flow_stats &k_stats, uint64_t curr_time); uint64_t GetUpdatedFlowPackets(const FlowExportInfo *stats, uint64_t k_flow_pkts); uint64_t GetUpdatedFlowBytes(const FlowExportInfo *stats, diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.cc b/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.cc index eef55c68af8..2027ebfb3b8 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.cc +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.cc @@ -82,7 +82,8 @@ FlowStatsManager::FlowStatsManager(Agent *agent) : agent_(agent), flow_export_count_(), prev_flow_export_rate_compute_time_(0), flow_export_rate_(0), threshold_(kDefaultFlowSamplingThreshold), flow_export_disable_drops_(), flow_export_sampling_drops_(), - flow_export_drops_(), prev_cfg_flow_export_rate_(0), + flow_export_drops_(), deleted_flow_export_drops_(), + prev_cfg_flow_export_rate_(0), timer_(TimerManager::CreateTimer(*(agent_->event_manager())->io_service(), "FlowThresholdTimer", TaskScheduler::GetInstance()->GetTaskId("Agent::FlowStatsManager"), 0)), @@ -91,6 +92,7 @@ FlowStatsManager::FlowStatsManager(Agent *agent) : agent_(agent), flow_export_disable_drops_ = 0; flow_export_sampling_drops_ = 0; flow_export_drops_ = 0; + deleted_flow_export_drops_ = 0; flows_sampled_atleast_once_ = false; request_queue_.set_measure_busy_time(agent->MeasureQueueDelay()); for (uint16_t i = 0; i < sizeof(protocol_list_)/sizeof(protocol_list_[0]); @@ -303,7 +305,8 @@ void FlowStatsManager::DeleteEvent(const FlowEntryPtr &flow, void FlowStatsManager::UpdateStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, - uint32_t oflow_bytes) { + uint32_t oflow_bytes, + const boost::uuids::uuid &u) { if (flow == NULL) { return; } @@ -315,7 +318,7 @@ void FlowStatsManager::UpdateStatsEvent(const FlowEntryPtr &flow, return; } - fsc->UpdateStatsEvent(flow, bytes, packets, oflow_bytes); + fsc->UpdateStatsEvent(flow, bytes, packets, oflow_bytes, u); } void FlowStatsManager::FreeIndex(uint32_t idx) { diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.h b/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.h index c635c0da00b..a29a9c5ed46 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.h +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.h @@ -97,7 +97,8 @@ class FlowStatsManager { void AddEvent(FlowEntryPtr &flow); void DeleteEvent(const FlowEntryPtr &flow, const RevFlowDepParams ¶ms); void UpdateStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, - uint32_t packets, uint32_t oflow_bytes); + uint32_t packets, uint32_t oflow_bytes, + const boost::uuids::uuid &u); void Init(uint64_t flow_stats_interval, uint64_t flow_cache_timeout); void InitDone(); @@ -149,6 +150,10 @@ class FlowStatsManager { return flow_export_drops_; } + uint32_t deleted_flow_export_drops() const { + return deleted_flow_export_drops_; + } + uint64_t threshold() const { return threshold_;} bool delete_short_flow() const { return delete_short_flow_; @@ -186,6 +191,7 @@ class FlowStatsManager { tbb::atomic flow_export_sampling_drops_; tbb::atomic flow_export_without_sampling_; tbb::atomic flow_export_drops_; + tbb::atomic deleted_flow_export_drops_; tbb::atomic flows_sampled_atleast_once_; uint32_t prev_cfg_flow_export_rate_; Timer* timer_; diff --git a/src/vnsw/agent/vrouter/ksync/flowtable_ksync.h b/src/vnsw/agent/vrouter/ksync/flowtable_ksync.h index e1b53b984b2..fe2d377e496 100644 --- a/src/vnsw/agent/vrouter/ksync/flowtable_ksync.h +++ b/src/vnsw/agent/vrouter/ksync/flowtable_ksync.h @@ -28,7 +28,7 @@ class KSyncFlowIndexManager; struct FlowKSyncResponseInfo { int ksync_error_; uint32_t flow_handle_; - uint32_t gen_id_; + uint8_t gen_id_; uint64_t evict_flow_bytes_; uint64_t evict_flow_packets_; int32_t evict_flow_oflow_; @@ -87,7 +87,7 @@ class FlowTableKSyncEntry : public KSyncNetlinkEntry { ksync_response_info_.Reset(); } void SetKSyncResponseInfo(int ksync_error, uint32_t flow_handle, - uint32_t gen_id, uint64_t evict_flow_bytes, + uint8_t gen_id, uint64_t evict_flow_bytes, uint64_t evict_flow_packets, int32_t evict_flow_oflow) { ksync_response_info_.ksync_error_ = ksync_error; diff --git a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc index 4260c5119b5..044f4c21cf5 100644 --- a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc +++ b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -253,16 +254,66 @@ bool KSyncFlowMemory::GetFlowKey(uint32_t index, FlowKey *key) { return true; } -bool KSyncFlowMemory::IsEvictionMarked(const vr_flow_entry *entry) const { +bool KSyncFlowMemory::IsEvictionMarked(const vr_flow_entry *entry, + uint16_t flags) const { if (!entry) { return false; } - if (entry->fe_flags & VR_FLOW_FLAG_EVICTED) { + if (flags & VR_FLOW_FLAG_EVICTED) { return true; } return false; } +const vr_flow_entry *KSyncFlowMemory::GetKFlowStats(const FlowKey &key, + uint32_t idx, + uint8_t gen_id, + vr_flow_stats *stat) const { + const vr_flow_entry *kflow = GetValidKFlowEntry(key, idx, gen_id); + if (!kflow) { + return NULL; + } + *stat = kflow->fe_stats; + kflow = GetValidKFlowEntry(key, idx, gen_id); + return kflow; +} + +void KSyncFlowMemory::ReadFlowInfo(const vr_flow_entry *kflow, + vr_flow_stats *stat, KFlowData *info) const { + *stat = kflow->fe_stats; + info->underlay_src_port = kflow->fe_udp_src_port; + info->tcp_flags = kflow->fe_tcp_flags; + info->flags = kflow->fe_flags; +} + +const vr_flow_entry *KSyncFlowMemory::GetKFlowStatsAndInfo(const FlowKey &key, + uint32_t idx, + uint8_t gen_id, + vr_flow_stats *stats, + KFlowData *info) + const { + const vr_flow_entry *kflow = GetKernelFlowEntry(idx, false); + if (!kflow) { + return NULL; + } + if (key.protocol == IPPROTO_TCP) { + FlowKey rhs; + KFlow2FlowKey(kflow, &rhs); + if (!key.IsEqual(rhs)) { + return NULL; + } + + ReadFlowInfo(kflow, stats, info); + + if (kflow->fe_gen_id != gen_id) { + return NULL; + } + } else { + ReadFlowInfo(kflow, stats, info); + } + return kflow; +} + bool KSyncFlowMemory::AuditProcess() { uint32_t flow_idx; uint8_t gen_id; diff --git a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.h b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.h index fe172d90b67..6fc82eacd33 100644 --- a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.h +++ b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.h @@ -14,7 +14,8 @@ class Timer; class KSync; class FlowKey; struct vr_flow_entry; -struct vr_flow_entry; +struct vr_flow_stats; +struct KFlowData; class KSyncFlowMemory { public: @@ -39,6 +40,14 @@ class KSyncFlowMemory { bool ignore_active_status) const; const vr_flow_entry *GetValidKFlowEntry(const FlowKey &key, uint32_t idx, uint8_t gen_id) const; + const vr_flow_entry *GetKFlowStats(const FlowKey &key, uint32_t idx, + uint8_t gen_id, + vr_flow_stats *stats) const; + const vr_flow_entry *GetKFlowStatsAndInfo(const FlowKey &key, + uint32_t idx, + uint8_t gen_id, + vr_flow_stats *stats, + KFlowData *info) const; bool GetFlowKey(uint32_t index, FlowKey *key); uint32_t flow_table_entries_count() { return flow_table_entries_count_; } @@ -48,7 +57,7 @@ class KSyncFlowMemory { void UnmapFlowMemTest(); void MapSharedMemory(); void GetFlowTableSize(); - bool IsEvictionMarked(const vr_flow_entry *entry) const; + bool IsEvictionMarked(const vr_flow_entry *entry, uint16_t flags) const; KSync *ksync() const { return ksync_; } void set_major_devid(int id) { major_devid_ = id; } @@ -68,6 +77,8 @@ class KSyncFlowMemory { uint64_t timeout; }; void KFlow2FlowKey(const vr_flow_entry *entry, FlowKey *key) const; + void ReadFlowInfo(const vr_flow_entry *k_flow, vr_flow_stats *stats, + KFlowData *info) const; KSync *ksync_;