diff --git a/src/vnsw/agent/pkt/flow_entry.cc b/src/vnsw/agent/pkt/flow_entry.cc index ee91a10517a..1fd6d1aee73 100644 --- a/src/vnsw/agent/pkt/flow_entry.cc +++ b/src/vnsw/agent/pkt/flow_entry.cc @@ -463,9 +463,8 @@ void FlowEntry::Copy(FlowEntry *rhs, bool update) { gen_id_ = rhs->gen_id_; flow_handle_ = rhs->flow_handle_; /* Flow Entry is being re-used. Generate a new UUID for it. */ - // results is delete miss for previous uuid to stats collector - // with eviction disabled following is not required - // uuid_ = flow_table_->rand_gen(); + uuid_ = flow_table_->rand_gen(); + egress_uuid_ = flow_table_->rand_gen(); } } diff --git a/src/vnsw/agent/pkt/flow_mgmt.cc b/src/vnsw/agent/pkt/flow_mgmt.cc index c904f88679f..85e7a6d0b7e 100644 --- a/src/vnsw/agent/pkt/flow_mgmt.cc +++ b/src/vnsw/agent/pkt/flow_mgmt.cc @@ -153,7 +153,8 @@ void FlowMgmtManager::DeleteEvent(FlowEntry *flow, void FlowMgmtManager::FlowStatsUpdateEvent(FlowEntry *flow, uint32_t bytes, uint32_t packets, - uint32_t oflow_bytes) { + uint32_t oflow_bytes, + const boost::uuids::uuid &u) { if (bytes == 0 && packets == 0 && oflow_bytes == 0) { return; } @@ -164,7 +165,7 @@ void FlowMgmtManager::FlowStatsUpdateEvent(FlowEntry *flow, uint32_t bytes, } FlowMgmtRequestPtr req(new FlowMgmtRequest (FlowMgmtRequest::UPDATE_FLOW_STATS, flow, - bytes, packets, oflow_bytes)); + bytes, packets, oflow_bytes, u)); request_queue_.Enqueue(req); } @@ -472,7 +473,7 @@ bool FlowMgmtManager::RequestHandler(FlowMgmtRequestPtr req) { case FlowMgmtRequest::UPDATE_FLOW_STATS: { //Handle Flow stats update for flow-mgmt UpdateFlowStats(req->flow(), req->bytes(), req->packets(), - req->oflow_bytes()); + req->oflow_bytes(), req->flow_uuid()); break; } @@ -687,10 +688,11 @@ void FlowMgmtManager::DeleteFlow(FlowEntryPtr &flow, } void FlowMgmtManager::UpdateFlowStats(FlowEntryPtr &flow, uint32_t bytes, - uint32_t packets, uint32_t oflow_bytes) { + uint32_t packets, uint32_t oflow_bytes, + const boost::uuids::uuid &u) { //Enqueue Flow Index Update Event request to flow-stats-collector agent_->flow_stats_manager()->UpdateStatsEvent(flow, bytes, packets, - oflow_bytes); + oflow_bytes, u); } bool FlowMgmtManager::HasVrfFlows(uint32_t vrf_id) { diff --git a/src/vnsw/agent/pkt/flow_mgmt.h b/src/vnsw/agent/pkt/flow_mgmt.h index 4497774ae8f..b8fec992ca7 100644 --- a/src/vnsw/agent/pkt/flow_mgmt.h +++ b/src/vnsw/agent/pkt/flow_mgmt.h @@ -1107,7 +1107,8 @@ class FlowMgmtManager { void AddEvent(FlowEntry *low); void DeleteEvent(FlowEntry *flow, const RevFlowDepParams ¶ms); void FlowStatsUpdateEvent(FlowEntry *flow, uint32_t bytes, uint32_t packets, - uint32_t oflow_bytes); + uint32_t oflow_bytes, + const boost::uuids::uuid &u); void AddDBEntryEvent(const DBEntry *entry, uint32_t gen_id); void ChangeDBEntryEvent(const DBEntry *entry, uint32_t gen_id); void DeleteDBEntryEvent(const DBEntry *entry, uint32_t gen_id); @@ -1144,7 +1145,7 @@ class FlowMgmtManager { // Handle Delete of a flow. Updates FlowMgmtKeyTree for all objects void DeleteFlow(FlowEntryPtr &flow, const RevFlowDepParams &p); void UpdateFlowStats(FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, - uint32_t oflow_bytes); + uint32_t oflow_bytes, const boost::uuids::uuid &u); // Add a FlowMgmtKey into the FlowMgmtKeyTree for an object // The FlowMgmtKeyTree for object is passed as argument diff --git a/src/vnsw/agent/pkt/flow_mgmt_request.h b/src/vnsw/agent/pkt/flow_mgmt_request.h index e6e03f96c0a..f3f683467ae 100644 --- a/src/vnsw/agent/pkt/flow_mgmt_request.h +++ b/src/vnsw/agent/pkt/flow_mgmt_request.h @@ -41,9 +41,11 @@ class FlowMgmtRequest { } FlowMgmtRequest(Event event, FlowEntry *flow, uint32_t bytes, - uint32_t packets, uint32_t oflow_bytes) : + uint32_t packets, uint32_t oflow_bytes, + const boost::uuids::uuid &u) : event_(event), flow_(flow), db_entry_(NULL), vrf_id_(0), gen_id_(), - bytes_(bytes), packets_(packets), oflow_bytes_(oflow_bytes), params_() { + bytes_(bytes), packets_(packets), oflow_bytes_(oflow_bytes), params_(), + flow_uuid_(u) { if (event == RETRY_DELETE_VRF) assert(vrf_id_); } @@ -109,6 +111,7 @@ class FlowMgmtRequest { void set_params(const RevFlowDepParams ¶ms) { params_ = params; } + boost::uuids::uuid flow_uuid() const { return flow_uuid_; } private: Event event_; @@ -123,6 +126,7 @@ class FlowMgmtRequest { uint32_t packets_; uint32_t oflow_bytes_; RevFlowDepParams params_; + boost::uuids::uuid flow_uuid_; DISALLOW_COPY_AND_ASSIGN(FlowMgmtRequest); }; diff --git a/src/vnsw/agent/pkt/flow_table.cc b/src/vnsw/agent/pkt/flow_table.cc index d9b730b9720..20e1c3f711d 100644 --- a/src/vnsw/agent/pkt/flow_table.cc +++ b/src/vnsw/agent/pkt/flow_table.cc @@ -754,7 +754,8 @@ void FlowTable::ProcessKSyncFlowEvent(const FlowEventKSync *req, mgr->FlowStatsUpdateEvent(evicted_flow.get(), req->evict_flow_bytes(), req->evict_flow_packets(), - req->evict_flow_oflow()); + req->evict_flow_oflow(), + evicted_flow->uuid()); } } diff --git a/src/vnsw/agent/pkt/pkt_flow_info.cc b/src/vnsw/agent/pkt/pkt_flow_info.cc index da240f2a376..cd32a89ec30 100644 --- a/src/vnsw/agent/pkt/pkt_flow_info.cc +++ b/src/vnsw/agent/pkt/pkt_flow_info.cc @@ -1665,10 +1665,11 @@ void PktFlowInfo::UpdateEvictedFlowStats(const PktInfo *pkt) { FlowMgmtManager *mgr = agent->pkt()->flow_mgmt_manager( flow_table->table_index()); + /* Enqueue stats update request with UUID of the flow */ if (flow.get() && flow->deleted() == false) { mgr->FlowStatsUpdateEvent(flow.get(), pkt->agent_hdr.cmd_param_2, pkt->agent_hdr.cmd_param_3, - pkt->agent_hdr.cmd_param_4); + pkt->agent_hdr.cmd_param_4, flow->uuid()); } } diff --git a/src/vnsw/agent/pkt/test/test_flow_table.cc b/src/vnsw/agent/pkt/test/test_flow_table.cc index 6faaed1ce99..b985de0fe2f 100644 --- a/src/vnsw/agent/pkt/test/test_flow_table.cc +++ b/src/vnsw/agent/pkt/test/test_flow_table.cc @@ -288,6 +288,65 @@ TEST_F(TestFlowTable, EvictPktTrapBeforeReverseFlowResp) { client->WaitForIdle(); } +TEST_F(TestFlowTable, ResetFlowStats) { + TxTcpMplsPacket(eth->id(), remote_compute, router_id_, vif0->label(), + remote_vm1_ip, vm1_ip, 1000, 200, 1, 1); + client->WaitForIdle(); + + FlowEntry *flow = FlowGet(remote_vm1_ip, vm1_ip, IPPROTO_TCP, 1000, + 200, vif0->flow_key_nh()->id(), 1); + EXPECT_TRUE(flow != NULL); + FlowEntry *rflow = flow->reverse_flow_entry(); + EXPECT_TRUE(rflow != NULL); + + //Invoke FlowStatsCollector to check whether flow gets evicted + util_.EnqueueFlowStatsCollectorTask(); + client->WaitForIdle(); + + //Verify the stats of flow + EXPECT_TRUE(FlowStatsMatch("vrf1", remote_vm1_ip, vm1_ip, IPPROTO_TCP, 1000, + 200, 1, 30, vif0->flow_key_nh()->id(), 1)); + + //Change the stats + KSyncSockTypeMap::IncrFlowStats(1, 1, 30); + + //Invoke FlowStatsCollector to enqueue delete for evicted flow. + util_.EnqueueFlowStatsCollectorTask(); + client->WaitForIdle(); + + //Verify the stats of flow + EXPECT_TRUE(FlowStatsMatch("vrf1", remote_vm1_ip, vm1_ip, IPPROTO_TCP, 1000, + 200, 2, 60, vif0->flow_key_nh()->id(), 1)); + + TxTcpMplsPacket(eth->id(), remote_compute, router_id_, vif0->label(), + remote_vm1_ip, vm1_ip, 1000, 200, 1, 10); + client->WaitForIdle(); + + flow = FlowGet(remote_vm1_ip, vm1_ip, IPPROTO_TCP, 1000, + 200, vif0->flow_key_nh()->id(), 10); + EXPECT_TRUE(flow != NULL); + + //Verify that flow-handle is updated in flow-stats module + FlowStatsCollector *fsc = flow->fsc(); + EXPECT_TRUE(fsc != NULL); + FlowExportInfo *info = fsc->FindFlowExportInfo(flow); + EXPECT_TRUE(info != NULL); + EXPECT_TRUE(info->flow_handle() == 10); + + //Verify the stats of flow are reset after change of flow-handle + EXPECT_TRUE(FlowStatsMatch("vrf1", remote_vm1_ip, vm1_ip, IPPROTO_TCP, 1000, + 200, 0, 0, vif0->flow_key_nh()->id(), 10)); + + //Invoke FlowStatsCollector to enqueue delete for reverse flow which is + //marked as short flow + util_.EnqueueFlowStatsCollectorTask(); + client->WaitForIdle(); + + //Verify the stats of flow + EXPECT_TRUE(FlowStatsMatch("vrf1", remote_vm1_ip, vm1_ip, IPPROTO_TCP, 1000, + 200, 1, 30, vif0->flow_key_nh()->id(), 10)); +} + int main(int argc, char *argv[]) { int ret = 0; GETUSERARGS(); diff --git a/src/vnsw/agent/test-xml/test_xml.cc b/src/vnsw/agent/test-xml/test_xml.cc index 0e94e4d6612..93d74e931f9 100644 --- a/src/vnsw/agent/test-xml/test_xml.cc +++ b/src/vnsw/agent/test-xml/test_xml.cc @@ -37,7 +37,7 @@ class FlowExportTask : public Task { return true; } - fec->ExportFlow(info, bytes_, pkts_, NULL); + fec->ExportFlow(info, bytes_, pkts_, NULL, true); return true; } std::string Description() const { return "FlowExportTask"; } diff --git a/src/vnsw/agent/test/test_cmn_util.h b/src/vnsw/agent/test/test_cmn_util.h index 334b52c2d2c..3e713c99875 100644 --- a/src/vnsw/agent/test/test_cmn_util.h +++ b/src/vnsw/agent/test/test_cmn_util.h @@ -373,7 +373,8 @@ FlowEntry* FlowGet(int vrf_id, std::string sip, std::string dip, uint8_t proto, uint16_t sport, uint16_t dport, int nh_id); bool FlowStatsMatch(const string &vrf_name, const char *sip, const char *dip, uint8_t proto, uint16_t sport, uint16_t dport, - uint64_t pkts, uint64_t bytes, int nh_id); + uint64_t pkts, uint64_t bytes, int nh_id, + uint32_t flow_handle = FlowEntry::kInvalidFlowHandle); bool FindFlow(const string &vrf_name, const char *sip, const char *dip, uint8_t proto, uint16_t sport, uint16_t dport, bool nat, const string &nat_vrf_name, const char *nat_sip, diff --git a/src/vnsw/agent/test/test_util.cc b/src/vnsw/agent/test/test_util.cc index ee30cdc13f6..50ef17ffd41 100644 --- a/src/vnsw/agent/test/test_util.cc +++ b/src/vnsw/agent/test/test_util.cc @@ -3450,7 +3450,8 @@ bool FlowGet(const string &vrf_name, const char *sip, const char *dip, bool FlowStatsMatch(const string &vrf_name, const char *sip, const char *dip, uint8_t proto, uint16_t sport, - uint16_t dport, uint64_t pkts, uint64_t bytes, int nh_id) { + uint16_t dport, uint64_t pkts, uint64_t bytes, int nh_id, + uint32_t flow_handle) { Agent *agent = Agent::GetInstance(); VrfEntry *vrf = agent->vrf_table()->FindVrfFromName(vrf_name); EXPECT_TRUE(vrf != NULL); @@ -3466,7 +3467,17 @@ bool FlowStatsMatch(const string &vrf_name, const char *sip, key.protocol = proto; key.family = key.src_addr.is_v4() ? Address::INET : Address::INET6; - FlowEntry *fe = agent->pkt()->get_flow_proto()->Find(key, 0); + FlowEntry *fe = NULL; + if (flow_handle != FlowEntry::kInvalidFlowHandle) { + FlowTable *table = agent->pkt()->get_flow_proto()->GetFlowTable(key, + flow_handle); + if (table == NULL) { + return NULL; + } + fe = table->Find(key); + } else { + fe = agent->pkt()->get_flow_proto()->Find(key, 0); + } EXPECT_TRUE(fe != NULL); if (fe == NULL) { return false; diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_export_info.cc b/src/vnsw/agent/vrouter/flow_stats/flow_export_info.cc index b0c001cccc1..e8fea75ca74 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_export_info.cc +++ b/src/vnsw/agent/vrouter/flow_stats/flow_export_info.cc @@ -5,14 +5,18 @@ FlowExportInfo::FlowExportInfo() : flow_(), setup_time_(0), teardown_time_(0), last_modified_time_(0), bytes_(0), packets_(0), underlay_source_port_(0), changed_(false), tcp_flags_(0), delete_enqueue_time_(0), evict_enqueue_time_(0), - visit_time_(0), exported_atleast_once_(false) { + visit_time_(0), exported_atleast_once_(false), gen_id_(0), + flow_handle_(FlowEntry::kInvalidFlowHandle), + rev_flow_egress_uuid_(nil_uuid()) { } FlowExportInfo::FlowExportInfo(const FlowEntryPtr &fe) : flow_(fe), setup_time_(0), teardown_time_(0), last_modified_time_(0), bytes_(0), packets_(0), underlay_source_port_(0), changed_(true), tcp_flags_(0), delete_enqueue_time_(0), evict_enqueue_time_(0), - visit_time_(0), exported_atleast_once_(false) { + visit_time_(0), exported_atleast_once_(false), gen_id_(0), + flow_handle_(FlowEntry::kInvalidFlowHandle), + rev_flow_egress_uuid_(nil_uuid()) { } FlowExportInfo::FlowExportInfo(const FlowEntryPtr &fe, uint64_t setup_time) : @@ -20,7 +24,9 @@ FlowExportInfo::FlowExportInfo(const FlowEntryPtr &fe, uint64_t setup_time) : teardown_time_(0), last_modified_time_(setup_time), bytes_(0), packets_(0), underlay_source_port_(0), changed_(true), tcp_flags_(0), delete_enqueue_time_(0), evict_enqueue_time_(0), - visit_time_(0), exported_atleast_once_(false) { + visit_time_(0), exported_atleast_once_(false), gen_id_(0), + flow_handle_(FlowEntry::kInvalidFlowHandle), + rev_flow_egress_uuid_(nil_uuid()) { } FlowEntry* FlowExportInfo::reverse_flow() const { @@ -39,6 +45,22 @@ bool FlowExportInfo::IsActionLog() const { return false; } +void FlowExportInfo::CopyFlowInfo(FlowEntry *fe) { + gen_id_ = fe->gen_id(); + flow_handle_ = fe->flow_handle(); + uuid_ = fe->uuid(); + flags_ = fe->flags(); + FlowEntry *rflow = reverse_flow(); + if (rflow) { + rev_flow_egress_uuid_ = rflow->egress_uuid(); + } +} + +void FlowExportInfo::ResetStats() { + bytes_ = packets_ = 0; + tcp_flags_ = 0; + underlay_source_port_ = 0; +} /////////////////////////////////////////////////////////////////// // APIs used only by UT ////////////////////////////////////////////////////////////////// diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_export_info.h b/src/vnsw/agent/vrouter/flow_stats/flow_export_info.h index c7fd2ff2d9e..1a3b2034570 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_export_info.h +++ b/src/vnsw/agent/vrouter/flow_stats/flow_export_info.h @@ -53,6 +53,20 @@ class FlowExportInfo : public boost::intrusive::list_base_hook<> { uint64_t visit_time() const { return visit_time_; } void set_visit_time(uint64_t t) { visit_time_ = t; } + uint8_t gen_id() const { return gen_id_; } + void set_gen_id(uint8_t value) { gen_id_ = value; } + uint32_t flow_handle() const { return flow_handle_; } + void set_flow_handle(uint32_t value) { flow_handle_ = value; } + const boost::uuids::uuid &uuid() const { return uuid_; } + const boost::uuids::uuid &rev_flow_egress_uuid() const { + return rev_flow_egress_uuid_; + } + uint32_t flags() const { return flags_; } + bool is_flags_set(const FlowEntry::FlowEntryFlags &flags) const { + return (flags_ & flags); + } + void CopyFlowInfo(FlowEntry *fe); + void ResetStats(); private: FlowEntryPtr flow_; uint64_t setup_time_; @@ -71,6 +85,11 @@ class FlowExportInfo : public boost::intrusive::list_base_hook<> { uint64_t evict_enqueue_time_; uint64_t visit_time_; bool exported_atleast_once_; + uint8_t gen_id_; + uint32_t flow_handle_; + boost::uuids::uuid uuid_; + boost::uuids::uuid rev_flow_egress_uuid_; + uint32_t flags_; }; typedef boost::intrusive::list FlowExportInfoList; diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_export_request.h b/src/vnsw/agent/vrouter/flow_stats/flow_export_request.h index 9074218d1e2..2b1d6d4b677 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_export_request.h +++ b/src/vnsw/agent/vrouter/flow_stats/flow_export_request.h @@ -30,9 +30,10 @@ class FlowExportReq { } FlowExportReq(Event event, const FlowExportInfo &info, uint32_t bytes, - uint32_t packets, uint32_t oflow_bytes) : + uint32_t packets, uint32_t oflow_bytes, + const boost::uuids::uuid &u) : event_(event), info_(info), bytes_(bytes), packets_(packets), - oflow_bytes_(oflow_bytes) { + oflow_bytes_(oflow_bytes), uuid_(u) { } ~FlowExportReq() { } @@ -44,6 +45,7 @@ class FlowExportReq { uint32_t packets() const { return packets_;} uint32_t oflow_bytes() const { return oflow_bytes_;} const RevFlowDepParams& params() const { return params_; } + boost::uuids::uuid uuid() const { return uuid_; } private: Event event_; @@ -53,6 +55,7 @@ class FlowExportReq { uint32_t packets_; uint32_t oflow_bytes_; RevFlowDepParams params_; + boost::uuids::uuid uuid_; DISALLOW_COPY_AND_ASSIGN(FlowExportReq); }; #endif // __AGENT_FLOW_EXPORT_REQUEST_H__ diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc index 0b0188354c7..605ed4f91f6 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc @@ -151,14 +151,15 @@ void FlowStatsCollector::UpdateEntriesToVisit() { bool FlowStatsCollector::ShouldBeAged(FlowExportInfo *info, const vr_flow_entry *k_flow, + const vr_flow_stats &k_stats, uint64_t curr_time) { FlowEntry *flow = info->flow(); //If both forward and reverse flow are marked //as TCP closed then immediately remote the flow if (k_flow != NULL) { uint64_t k_flow_bytes, bytes; - k_flow_bytes = GetFlowStats(k_flow->fe_stats.flow_bytes_oflow, - k_flow->fe_stats.flow_bytes); + k_flow_bytes = GetFlowStats(k_stats.flow_bytes_oflow, + k_stats.flow_bytes); bytes = 0x0000ffffffffffffULL & info->bytes(); /* Don't account for agent overflow bits while comparing change in * stats */ @@ -314,23 +315,34 @@ void FlowStatsCollector::UpdateStatsAndExportFlow(FlowExportInfo *info, if (!info) { return; } + FlowEntry *fe = info->flow(); + bool read_flow = true; + if (info->uuid() != fe->uuid()) { + /* If UUID for a flow has changed, don't read fields of FlowEntry while + * sending FlowExport message to collector */ + read_flow = false; + } KSyncFlowMemory *ksync_obj = agent_uve_->agent()->ksync()-> ksync_flow_memory(); - FlowEntry *fe = info->flow(); - const vr_flow_entry *k_flow = ksync_obj->GetValidKFlowEntry(fe->key(), - fe->flow_handle(), - fe->gen_id()); + /* Fetch vrouter Flow entry using gen_id and flow_handle from FlowExportInfo + * to account for the case where FlowEntry's flow_handle/gen_id has changed + * during Delete processing by FlowStatsCollector */ + vr_flow_stats k_stats; + const vr_flow_entry *k_flow = ksync_obj->GetKFlowStats(fe->key(), + info->flow_handle(), + info->gen_id(), + &k_stats); if (k_flow) { - UpdateAndExportInternal(info, k_flow->fe_stats.flow_bytes, - k_flow->fe_stats.flow_bytes_oflow, - k_flow->fe_stats.flow_packets, - k_flow->fe_stats.flow_packets_oflow, - teardown_time, true, p); + UpdateAndExportInternal(info, k_stats.flow_bytes, + k_stats.flow_bytes_oflow, + k_stats.flow_packets, + k_stats.flow_packets_oflow, + teardown_time, true, p, read_flow); return; } /* If reading of stats fails, send a message with just teardown time */ info->set_teardown_time(teardown_time); - ExportFlow(info, 0, 0, p); + ExportFlow(info, 0, 0, p, read_flow); } void FlowStatsCollector::FlowDeleteEnqueue(FlowExportInfo *info, uint64_t t) { @@ -400,7 +412,7 @@ void FlowStatsCollector::UpdateAndExportInternalLocked(FlowExportInfo *info, FlowEntry *rflow = info->reverse_flow(); FLOW_LOCK(flow, rflow, FlowEvent::FLOW_MESSAGE); UpdateAndExportInternal(info, bytes, oflow_bytes, pkts, oflow_pkts, time, - teardown_time, p); + teardown_time, p, true); } void FlowStatsCollector::UpdateAndExportInternal(FlowExportInfo *info, @@ -410,16 +422,18 @@ void FlowStatsCollector::UpdateAndExportInternal(FlowExportInfo *info, uint16_t oflow_pkts, uint64_t time, bool teardown_time, - const RevFlowDepParams *p) { + const RevFlowDepParams *p, + bool read_flow) { uint64_t diff_bytes, diff_pkts; UpdateFlowStatsInternal(info, bytes, oflow_bytes, pkts, oflow_pkts, time, teardown_time, &diff_bytes, &diff_pkts); - ExportFlow(info, diff_bytes, diff_pkts, p); + ExportFlow(info, diff_bytes, diff_pkts, p, read_flow); } // Check if flow needs to be evicted bool FlowStatsCollector::EvictFlow(KSyncFlowMemory *ksync_obj, const vr_flow_entry *k_flow, + uint16_t k_flow_flags, uint32_t flow_handle, uint16_t gen_id, FlowExportInfo *info, uint64_t curr_time) { FlowEntry *fe = info->flow(); @@ -427,7 +441,7 @@ bool FlowStatsCollector::EvictFlow(KSyncFlowMemory *ksync_obj, if ((fe->key().protocol != IPPROTO_TCP)) return false; - if (ksync_obj->IsEvictionMarked(k_flow) == false) + if (ksync_obj->IsEvictionMarked(k_flow, k_flow_flags) == false) return false; // Flow evict already enqueued? Re-Enqueue request after retry-time @@ -445,6 +459,8 @@ bool FlowStatsCollector::EvictFlow(KSyncFlowMemory *ksync_obj, bool FlowStatsCollector::AgeFlow(KSyncFlowMemory *ksync_obj, const vr_flow_entry *k_flow, + const vr_flow_stats &k_stats, + const KFlowData& kinfo, FlowExportInfo *info, uint64_t curr_time) { FlowEntry *fe = info->flow(); FlowEntry *rfe = info->reverse_flow(); @@ -469,15 +485,18 @@ bool FlowStatsCollector::AgeFlow(KSyncFlowMemory *ksync_obj, bool deleted = false; FlowExportInfo *rev_info = NULL; // Can the flow be aged? - if (ShouldBeAged(info, k_flow, curr_time)) { + if (ShouldBeAged(info, k_flow, k_stats, curr_time)) { rev_info = FindFlowExportInfo(rfe); // ShouldBeAged looks at one flow only. So, check for both forward and // reverse flows if (rev_info) { - const vr_flow_entry *k_flow_rev; - k_flow_rev = ksync_obj->GetValidKFlowEntry - (rfe->key(), rfe->flow_handle(), rfe->gen_id()); - if (ShouldBeAged(rev_info, k_flow_rev, curr_time)) { + const vr_flow_entry *k_flow_rev = NULL; + vr_flow_stats k_rflow_stats; + k_flow_rev = ksync_obj->GetKFlowStats(rfe->key(), + rev_info->flow_handle(), + rev_info->gen_id(), + &k_rflow_stats); + if (ShouldBeAged(rev_info, k_flow_rev, k_rflow_stats, curr_time)) { deleted = true; } } else { @@ -493,28 +512,24 @@ bool FlowStatsCollector::AgeFlow(KSyncFlowMemory *ksync_obj, // Stats for deleted flow are updated when we get DELETE message if (deleted == false && k_flow) { uint64_t k_bytes, bytes; - /* Copy full stats in one shot and use local copy instead of reading - * individual stats from shared memory directly to minimize the - * inconsistency */ - struct vr_flow_stats fe_stats = k_flow->fe_stats; - k_bytes = GetFlowStats(fe_stats.flow_bytes_oflow, - fe_stats.flow_bytes); + k_bytes = GetFlowStats(k_stats.flow_bytes_oflow, + k_stats.flow_bytes); bytes = 0x0000ffffffffffffULL & info->bytes(); /* Always copy udp source port even though vrouter does not change * it. Vrouter many change this behavior and recompute source port * whenever flow action changes. To keep agent independent of this, * always copy UDP source port */ - info->set_underlay_source_port(k_flow->fe_udp_src_port); - info->set_tcp_flags(k_flow->fe_tcp_flags); + info->set_underlay_source_port(kinfo.underlay_src_port); + info->set_tcp_flags(kinfo.tcp_flags); /* Don't account for agent overflow bits while comparing change in * stats */ if (bytes != k_bytes) { UpdateAndExportInternalLocked(info, - fe_stats.flow_bytes, - fe_stats.flow_bytes_oflow, - fe_stats.flow_packets, - fe_stats.flow_packets_oflow, + k_stats.flow_bytes, + k_stats.flow_bytes_oflow, + k_stats.flow_packets, + k_stats.flow_packets_oflow, curr_time, false, NULL); } else if (info->changed()) { /* export flow (reverse) for which traffic is not seen yet. */ @@ -531,40 +546,64 @@ uint32_t FlowStatsCollector::ProcessFlow(FlowExportInfoList::iterator &it, uint64_t curr_time) { uint32_t count = 1; FlowEntry *fe = info->flow(); - uint32_t flow_handle; - uint16_t gen_id; - { - FlowEntry *rflow = NULL; - FLOW_LOCK(fe, rflow, FlowEvent::FLOW_MESSAGE); - // since flow processing and stats collector can run in parallel - // flow handle and gen id not being the key for flow entry can - // change while processing, so flow handle and gen id should be - // fetched by holding an lock and should not be re-fetched again - // during the entry processing - flow_handle = fe->flow_handle(); - gen_id = fe->gen_id(); - } - const vr_flow_entry *k_flow = ksync_obj->GetValidKFlowEntry - (fe->key(), flow_handle, gen_id); - - // Flow evicted? - if (EvictFlow(ksync_obj, k_flow, flow_handle, gen_id, info, - curr_time) == true) { - // If retry_delete_ enabled, dont change flow_export_info_list_ - if (retry_delete_ == true) - return count; + /* Use flow-handle and gen-id from FlowExportInfo instead of FlowEntry. + * The stats that FlowExportInfo holds corresponds to a given + * (FlowKey, gen-id and FlowHandle). Since gen-id/flow-handle for a flow + * can change dynamically, we need to pick gen-id and flow-handle from + * FlowExportInfo. Otherwise stats will go wrong. Whenever gen-id/ + * flow-handle changes, the stats will be reset as part of AddFlow API + */ + uint32_t flow_handle = info->flow_handle(); + uint16_t gen_id = info->gen_id(); + + /* If Flow handle is still not populated in FlowStatsCollector, pick the + * value from FlowEntry + */ + if (flow_handle == FlowEntry::kInvalidFlowHandle) { + { + FlowEntry *rflow = NULL; + FLOW_LOCK(fe, rflow, FlowEvent::FLOW_MESSAGE); + // since flow processing and stats collector can run in parallel + // flow handle and gen id not being the key for flow entry can + // change while processing, so flow handle and gen id should be + // fetched by holding an lock. + flow_handle = fe->flow_handle(); + gen_id = fe->gen_id(); + info->CopyFlowInfo(fe); + } + } + const vr_flow_entry *k_flow = NULL; + vr_flow_stats k_stats; + KFlowData kinfo; - // We dont want to retry delete-events, remove flow from ageing list - assert(info->is_linked()); - FlowExportInfoList::iterator flow_it = - flow_export_info_list_.iterator_to(*info); - flow_export_info_list_.erase(flow_it); + /* Teardown time is set when Evicted flow stats update message is received. + * For flows whose teardown time is set, we need not read stats from + * vrouter + */ + if (!info->teardown_time()) { + k_flow = ksync_obj->GetKFlowStatsAndInfo(fe->key(), flow_handle, + gen_id, &k_stats, &kinfo); + + // Flow evicted? + if (EvictFlow(ksync_obj, k_flow, kinfo.flags, flow_handle, gen_id, + info, curr_time) == true) { + // If retry_delete_ enabled, dont change flow_export_info_list_ + if (retry_delete_ == true) + return count; + + // We dont want to retry delete-events, remove flow from ageing list + assert(info->is_linked()); + FlowExportInfoList::iterator flow_it = + flow_export_info_list_.iterator_to(*info); + flow_export_info_list_.erase(flow_it); - return count; + return count; + } } + // Flow aged? - if (AgeFlow(ksync_obj, k_flow, info, curr_time) == false) + if (AgeFlow(ksync_obj, k_flow, k_stats, kinfo, info, curr_time) == false) return count; // If retry_delete_ enabled, dont change flow_export_info_list_ @@ -711,11 +750,12 @@ void FlowStatsCollector::DeleteEvent(const FlowEntryPtr &flow, void FlowStatsCollector::UpdateStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, - uint32_t oflow_bytes) { + uint32_t oflow_bytes, + const boost::uuids::uuid &u) { FlowExportInfo info(flow); boost::shared_ptr req(new FlowExportReq(FlowExportReq::UPDATE_FLOW_STATS, info, bytes, - packets, oflow_bytes)); + packets, oflow_bytes, u)); request_queue_.Enqueue(req); } @@ -836,7 +876,7 @@ void FlowStatsCollector::ExportFlowLocked(FlowExportInfo *info, FlowEntry *flow = info->flow(); FlowEntry *rflow = info->reverse_flow(); FLOW_LOCK(flow, rflow, FlowEvent::FLOW_MESSAGE); - ExportFlow(info, diff_bytes, diff_pkts, params); + ExportFlow(info, diff_bytes, diff_pkts, params, true); } /* Flow Export Algorithm @@ -855,11 +895,23 @@ void FlowStatsCollector::ExportFlowLocked(FlowExportInfo *info, void FlowStatsCollector::ExportFlow(FlowExportInfo *info, uint64_t diff_bytes, uint64_t diff_pkts, - const RevFlowDepParams *params) { + const RevFlowDepParams *params, + bool read_flow) { assert((agent_uve_->agent()->tsn_enabled() == false)); FlowEntry *flow = info->flow(); FlowEntry *rflow = info->reverse_flow(); + /* Drop Deleted Flow export messages if add for it was never exported. This + * includes only those delete messages where we cannot read FlowEntry + * because deleted msg is for a Flow UUID which is different from the UUID + * in FlowEntry pointer + */ + if (!info->exported_atleast_once() && !read_flow) { + assert(info->teardown_time()); + flow_stats_manager_->deleted_flow_export_drops_++; + return; + } + int32_t cfg_rate = agent_uve_->agent()->oper_db()->global_vrouter()-> flow_export_rate(); /* We should always try to export flows with Action as LOG regardless of @@ -925,49 +977,51 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info, s_flow.set_packets(info->packets()); s_flow.set_diff_bytes(diff_bytes); s_flow.set_diff_packets(diff_pkts); - s_flow.set_tcp_flags(info->tcp_flags()); - - s_flow.set_sourceip(flow->key().src_addr); - s_flow.set_destip(flow->key().dst_addr); - s_flow.set_protocol(flow->key().protocol); - s_flow.set_sport(flow->key().src_port); - s_flow.set_dport(flow->key().dst_port); - s_flow.set_sourcevn(flow->data().source_vn_match); - s_flow.set_destvn(flow->data().dest_vn_match); - s_flow.set_vm(flow->data().vm_cfg_name); - if (flow->is_flags_set(FlowEntry::ReverseFlow)) { - s_flow.set_forward_flow(false); - } else { - s_flow.set_forward_flow(true); + s_flow.set_setup_time(info->setup_time()); + if (info->teardown_time()) { + s_flow.set_teardown_time(info->teardown_time()); } + info->set_changed(false); - string drop_reason = FlowEntry::DropReasonStr(flow->data().drop_reason); - s_flow.set_drop_reason(drop_reason); + if (read_flow) { + s_flow.set_tcp_flags(info->tcp_flags()); + s_flow.set_sourceip(flow->key().src_addr); + s_flow.set_destip(flow->key().dst_addr); + s_flow.set_protocol(flow->key().protocol); + s_flow.set_sport(flow->key().src_port); + s_flow.set_dport(flow->key().dst_port); + s_flow.set_sourcevn(flow->data().source_vn_match); + s_flow.set_destvn(flow->data().dest_vn_match); + s_flow.set_vm(flow->data().vm_cfg_name); + if (info->is_flags_set(FlowEntry::ReverseFlow)) { + s_flow.set_forward_flow(false); + } else { + s_flow.set_forward_flow(true); + } - s_flow.set_sg_rule_uuid(flow->sg_rule_uuid()); - s_flow.set_nw_ace_uuid(flow->nw_ace_uuid()); - if (flow->intf_entry()) { - s_flow.set_vmi_uuid(UuidToString(flow->intf_entry()->GetUuid())); - } + string drop_reason = FlowEntry::DropReasonStr(flow->data().drop_reason); + s_flow.set_drop_reason(drop_reason); - if (rflow) { - s_flow.set_reverse_uuid(to_string(rflow->uuid())); - } else if (params) { - s_flow.set_reverse_uuid(to_string(params->rev_uuid_)); - } + s_flow.set_sg_rule_uuid(flow->sg_rule_uuid()); + s_flow.set_nw_ace_uuid(flow->nw_ace_uuid()); + if (flow->intf_entry()) { + s_flow.set_vmi_uuid(UuidToString(flow->intf_entry()->GetUuid())); + } - // Set flow action - std::string action_str; - GetFlowSandeshActionParams(flow->data().match_p.action_info, action_str); - s_flow.set_action(action_str); - s_flow.set_setup_time(info->setup_time()); - if (info->teardown_time()) { - s_flow.set_teardown_time(info->teardown_time()); + if (rflow) { + s_flow.set_reverse_uuid(to_string(rflow->uuid())); + } else if (params) { + s_flow.set_reverse_uuid(to_string(params->rev_uuid_)); + } + + // Set flow action + std::string action_str; + GetFlowSandeshActionParams(flow->data().match_p.action_info, action_str); + s_flow.set_action(action_str); + SetUnderlayInfo(info, s_flow); } - SetUnderlayInfo(info, s_flow); - info->set_changed(false); - if (flow->is_flags_set(FlowEntry::LocalFlow)) { + if (info->is_flags_set(FlowEntry::LocalFlow)) { /* For local flows we need to send two flow log messages. * 1. With direction as ingress * 2. With direction as egress @@ -976,8 +1030,10 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info, * direction as egress. */ s_flow.set_direction_ing(1); - s_flow.set_reverse_uuid(to_string(flow->egress_uuid())); - SourceIpOverride(info, s_flow, params); + if (read_flow) { + s_flow.set_reverse_uuid(to_string(flow->egress_uuid())); + SourceIpOverride(info, s_flow, params); + } EnqueueFlowMsg(); FlowLogData &s_flow2 = msg_list_[GetFlowMsgIdx()]; @@ -993,7 +1049,11 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info, //while exporting rev flow, this done so that //key, stats and other stuff can be copied over //from current flow - SetImplicitFlowDetails(info, s_flow2, params); + if (read_flow) { + SetImplicitFlowDetails(info, s_flow2, params); + } else { + s_flow2.set_flowuuid(to_string(info->rev_flow_egress_uuid())); + } //Export local flow of egress direction with a different UUID even when //the flow is same. Required for analytics module to query flows //irrespective of direction. @@ -1001,9 +1061,11 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info, flow_stats_manager_->UpdateFlowExportStats(2, subject_flows_to_algorithm); } else { - if (flow->is_flags_set(FlowEntry::IngressDir)) { + if (info->is_flags_set(FlowEntry::IngressDir)) { s_flow.set_direction_ing(1); - SourceIpOverride(info, s_flow, params); + if (read_flow) { + SourceIpOverride(info, s_flow, params); + } } else { s_flow.set_direction_ing(0); } @@ -1144,7 +1206,7 @@ bool FlowStatsCollector::RequestHandler(boost::shared_ptr req) { case FlowExportReq::UPDATE_FLOW_STATS: { EvictedFlowStatsUpdate(flow, req->bytes(), req->packets(), - req->oflow_bytes()); + req->oflow_bytes(), req->uuid()); break; } @@ -1221,12 +1283,37 @@ void FlowStatsCollector::NewFlow(FlowEntry *flow) { } void FlowStatsCollector::AddFlow(FlowExportInfo info) { + /* Before inserting update the gen_id and flow_handle in FlowExportInfo. + * Locks for accessing fields of flow are taken in calling function. + */ + FlowEntry* fe = info.flow(); + info.CopyFlowInfo(fe); + FlowEntryTree::iterator it = flow_tree_.find(fe); std::pair ret = - flow_tree_.insert(make_pair(info.flow(), info)); + flow_tree_.insert(make_pair(fe, info)); if (ret.second == false) { - ret.first->second.set_changed(true); - ret.first->second.set_delete_enqueue_time(0); - ret.first->second.set_evict_enqueue_time(0); + FlowExportInfo &prev = ret.first->second; + if (prev.uuid() != fe->uuid()) { + /* Received ADD request for already added entry with a different + * UUID. Because of state-compression of messages to + * FlowStatsCollector in FlowMgmt, we have not received DELETE for + * previous UUID. Send FlowExport to indicate delete for the flow. + * This export need not be sent if teardown time is already set. + * Teardown time would be set if EvictedFlowStats update request + * comes before this duplicate add. + */ + if (!prev.teardown_time()) { + UpdateStatsAndExportFlow(&prev, info.setup_time(), NULL); + } + /* After sending Delete to collector (if required), reset the stats + */ + prev.ResetStats(); + } + prev.CopyFlowInfo(fe); + prev.set_changed(true); + prev.set_delete_enqueue_time(0); + prev.set_evict_enqueue_time(0); + prev.set_teardown_time(0); } else { NewFlow(info.flow()); } @@ -1289,16 +1376,23 @@ void FlowStatsCollector::DeleteFlow(FlowEntryTree::iterator &it) { void FlowStatsCollector::EvictedFlowStatsUpdate(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, - uint32_t oflow_bytes) { - FlowEntry *fe = flow.get(); - FlowExportInfo *info = FindFlowExportInfo(fe); + uint32_t oflow_bytes, + const boost::uuids::uuid &u) { + FlowExportInfo *info = FindFlowExportInfo(flow.get()); if (info) { + /* Ignore stats update request for Evicted flow, if we don't have + * FlowEntry corresponding to the Evicted Flow. The match is done using + * UUID + */ + if (info->uuid() != u) { + return; + } /* We are updating stats of evicted flow. Set teardown_time here. * When delete event is being handled we don't export flow if * teardown time is set */ UpdateAndExportInternal(info, bytes, oflow_bytes & 0xFFFF, packets, oflow_bytes & 0xFFFF0000, - GetCurrentTime(), true, NULL); + GetCurrentTime(), true, NULL, false); } } diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h index 8b5617512ab..b7c77b50de4 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h @@ -25,6 +25,13 @@ class FlowStatsRecordsReq; class FetchFlowStatsRecord; class FlowStatsManager; +struct KFlowData { +public: + uint16_t underlay_src_port; + uint16_t tcp_flags; + uint16_t flags; +}; + //Defines the functionality to periodically read flow stats from //shared memory (between agent and Kernel) and export this stats info to //collector. Also responsible for aging of flow entries. Runs in the context @@ -118,10 +125,11 @@ class FlowStatsCollector : public StatsCollector { KSyncFlowMemory *ksync_obj, FlowExportInfo *info, uint64_t curr_time); bool AgeFlow(KSyncFlowMemory *ksync_obj, const vr_flow_entry *k_flow, + const vr_flow_stats &k_stats, const KFlowData &kinfo, FlowExportInfo *info, uint64_t curr_time); bool EvictFlow(KSyncFlowMemory *ksync_obj, const vr_flow_entry *k_flow, - uint32_t flow_handle, uint16_t gen_id, FlowExportInfo *info, - uint64_t curr_time); + uint16_t k_flow_flags, uint32_t flow_handle, uint16_t gen_id, + FlowExportInfo *info, uint64_t curr_time); uint32_t RunAgeing(uint32_t max_count); void UpdateFlowAgeTime(uint64_t usecs) { flow_age_time_intvl_ = usecs; @@ -144,11 +152,13 @@ class FlowStatsCollector : public StatsCollector { FlowExportInfo *FindFlowExportInfo(const FlowEntry *fe); const FlowExportInfo *FindFlowExportInfo(const FlowEntry *fe) const; void ExportFlow(FlowExportInfo *info, uint64_t diff_bytes, - uint64_t diff_pkts, const RevFlowDepParams *params); + uint64_t diff_pkts, const RevFlowDepParams *params, + bool read_flow); void UpdateFloatingIpStats(const FlowExportInfo *flow, uint64_t bytes, uint64_t pkts); void UpdateStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, - uint32_t packets, uint32_t oflow_bytes); + uint32_t packets, uint32_t oflow_bytes, + const boost::uuids::uuid &u); size_t Size() const { return flow_tree_.size(); } size_t AgeTreeSize() const { return flow_export_info_list_.size(); } void NewFlow(FlowEntry *flow); @@ -182,10 +192,9 @@ class FlowStatsCollector : public StatsCollector { void UpdateEntriesToVisit(); void UpdateStatsAndExportFlow(FlowExportInfo *info, uint64_t teardown_time, const RevFlowDepParams *params); - void EvictedFlowStatsUpdate(const FlowEntryPtr &flow, - uint32_t bytes, - uint32_t packets, - uint32_t oflow_bytes); + void EvictedFlowStatsUpdate(const FlowEntryPtr &flow, uint32_t bytes, + uint32_t packets, uint32_t oflow_bytes, + const boost::uuids::uuid &u); void UpdateAndExportInternal(FlowExportInfo *info, uint32_t bytes, uint16_t oflow_bytes, @@ -193,7 +202,8 @@ class FlowStatsCollector : public StatsCollector { uint16_t oflow_pkts, uint64_t time, bool teardown_time, - const RevFlowDepParams *params); + const RevFlowDepParams *params, + bool read_flow); void UpdateAndExportInternalLocked(FlowExportInfo *info, uint32_t bytes, uint16_t oflow_bytes, @@ -225,7 +235,7 @@ class FlowStatsCollector : public StatsCollector { uint64_t bytes, uint64_t pkts); uint64_t GetFlowStats(const uint16_t &oflow_data, const uint32_t &data); bool ShouldBeAged(FlowExportInfo *info, const vr_flow_entry *k_flow, - uint64_t curr_time); + const vr_flow_stats &k_stats, uint64_t curr_time); uint64_t GetUpdatedFlowPackets(const FlowExportInfo *stats, uint64_t k_flow_pkts); uint64_t GetUpdatedFlowBytes(const FlowExportInfo *stats, diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.cc b/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.cc index b88d70919d8..9003509a69e 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.cc +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.cc @@ -82,7 +82,8 @@ FlowStatsManager::FlowStatsManager(Agent *agent) : agent_(agent), flow_export_count_(), prev_flow_export_rate_compute_time_(0), flow_export_rate_(0), threshold_(kDefaultFlowSamplingThreshold), flow_export_disable_drops_(), flow_export_sampling_drops_(), - flow_export_drops_(), prev_cfg_flow_export_rate_(0), + flow_export_drops_(), deleted_flow_export_drops_(), + prev_cfg_flow_export_rate_(0), timer_(TimerManager::CreateTimer(*(agent_->event_manager())->io_service(), "FlowThresholdTimer", TaskScheduler::GetInstance()->GetTaskId("Agent::FlowStatsManager"), 0)), @@ -91,6 +92,7 @@ FlowStatsManager::FlowStatsManager(Agent *agent) : agent_(agent), flow_export_disable_drops_ = 0; flow_export_sampling_drops_ = 0; flow_export_drops_ = 0; + deleted_flow_export_drops_ = 0; flows_sampled_atleast_once_ = false; request_queue_.set_measure_busy_time(agent->MeasureQueueDelay()); for (uint16_t i = 0; i < sizeof(protocol_list_)/sizeof(protocol_list_[0]); @@ -304,7 +306,8 @@ void FlowStatsManager::DeleteEvent(const FlowEntryPtr &flow, void FlowStatsManager::UpdateStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, - uint32_t oflow_bytes) { + uint32_t oflow_bytes, + const boost::uuids::uuid &u) { if (flow == NULL) { return; } @@ -316,7 +319,7 @@ void FlowStatsManager::UpdateStatsEvent(const FlowEntryPtr &flow, return; } - fsc->UpdateStatsEvent(flow, bytes, packets, oflow_bytes); + fsc->UpdateStatsEvent(flow, bytes, packets, oflow_bytes, u); } uint32_t FlowStatsManager::AllocateIndex() { diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.h b/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.h index cc43c2c2ced..6e930d2626b 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.h +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.h @@ -98,7 +98,8 @@ class FlowStatsManager { void AddEvent(FlowEntryPtr &flow); void DeleteEvent(const FlowEntryPtr &flow, const RevFlowDepParams ¶ms); void UpdateStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, - uint32_t packets, uint32_t oflow_bytes); + uint32_t packets, uint32_t oflow_bytes, + const boost::uuids::uuid &u); void Init(uint64_t flow_stats_interval, uint64_t flow_cache_timeout); void InitDone(); @@ -150,6 +151,10 @@ class FlowStatsManager { return flow_export_drops_; } + uint32_t deleted_flow_export_drops() const { + return deleted_flow_export_drops_; + } + uint64_t threshold() const { return threshold_;} bool delete_short_flow() const { return delete_short_flow_; @@ -190,6 +195,7 @@ class FlowStatsManager { tbb::atomic flow_export_sampling_drops_; tbb::atomic flow_export_without_sampling_; tbb::atomic flow_export_drops_; + tbb::atomic deleted_flow_export_drops_; tbb::atomic flows_sampled_atleast_once_; uint32_t prev_cfg_flow_export_rate_; Timer* timer_; diff --git a/src/vnsw/agent/vrouter/ksync/flowtable_ksync.h b/src/vnsw/agent/vrouter/ksync/flowtable_ksync.h index 0a1763c7b28..9ed3765c879 100644 --- a/src/vnsw/agent/vrouter/ksync/flowtable_ksync.h +++ b/src/vnsw/agent/vrouter/ksync/flowtable_ksync.h @@ -28,7 +28,7 @@ class KSyncFlowIndexManager; struct FlowKSyncResponseInfo { int ksync_error_; uint32_t flow_handle_; - uint32_t gen_id_; + uint8_t gen_id_; uint64_t evict_flow_bytes_; uint64_t evict_flow_packets_; int32_t evict_flow_oflow_; @@ -92,7 +92,7 @@ class FlowTableKSyncEntry : public KSyncNetlinkEntry { ksync_response_info_.Reset(); } void SetKSyncResponseInfo(int ksync_error, uint32_t flow_handle, - uint32_t gen_id, uint64_t evict_flow_bytes, + uint8_t gen_id, uint64_t evict_flow_bytes, uint64_t evict_flow_packets, int32_t evict_flow_oflow) { ksync_response_info_.ksync_error_ = ksync_error; diff --git a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc index 07a643f0b77..32487621b08 100644 --- a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc +++ b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -278,16 +279,66 @@ bool KSyncFlowMemory::GetFlowKey(uint32_t index, FlowKey *key) { return true; } -bool KSyncFlowMemory::IsEvictionMarked(const vr_flow_entry *entry) const { +bool KSyncFlowMemory::IsEvictionMarked(const vr_flow_entry *entry, + uint16_t flags) const { if (!entry) { return false; } - if (entry->fe_flags & VR_FLOW_FLAG_EVICTED) { + if (flags & VR_FLOW_FLAG_EVICTED) { return true; } return false; } +const vr_flow_entry *KSyncFlowMemory::GetKFlowStats(const FlowKey &key, + uint32_t idx, + uint8_t gen_id, + vr_flow_stats *stat) const { + const vr_flow_entry *kflow = GetValidKFlowEntry(key, idx, gen_id); + if (!kflow) { + return NULL; + } + *stat = kflow->fe_stats; + kflow = GetValidKFlowEntry(key, idx, gen_id); + return kflow; +} + +void KSyncFlowMemory::ReadFlowInfo(const vr_flow_entry *kflow, + vr_flow_stats *stat, KFlowData *info) const { + *stat = kflow->fe_stats; + info->underlay_src_port = kflow->fe_udp_src_port; + info->tcp_flags = kflow->fe_tcp_flags; + info->flags = kflow->fe_flags; +} + +const vr_flow_entry *KSyncFlowMemory::GetKFlowStatsAndInfo(const FlowKey &key, + uint32_t idx, + uint8_t gen_id, + vr_flow_stats *stats, + KFlowData *info) + const { + const vr_flow_entry *kflow = GetKernelFlowEntry(idx, false); + if (!kflow) { + return NULL; + } + if (key.protocol == IPPROTO_TCP) { + FlowKey rhs; + KFlow2FlowKey(kflow, &rhs); + if (!key.IsEqual(rhs)) { + return NULL; + } + + ReadFlowInfo(kflow, stats, info); + + if (kflow->fe_gen_id != gen_id) { + return NULL; + } + } else { + ReadFlowInfo(kflow, stats, info); + } + return kflow; +} + bool KSyncFlowMemory::AuditProcess() { uint32_t flow_idx; uint8_t gen_id; diff --git a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.h b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.h index 2a0bbd5596d..79b2531cd88 100644 --- a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.h +++ b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.h @@ -15,7 +15,8 @@ class Timer; class KSync; class FlowKey; struct vr_flow_entry; -struct vr_flow_entry; +struct vr_flow_stats; +struct KFlowData; class KSyncFlowMemory { public: @@ -44,6 +45,14 @@ class KSyncFlowMemory { bool ignore_active_status) const; const vr_flow_entry *GetValidKFlowEntry(const FlowKey &key, uint32_t idx, uint8_t gen_id) const; + const vr_flow_entry *GetKFlowStats(const FlowKey &key, uint32_t idx, + uint8_t gen_id, + vr_flow_stats *stats) const; + const vr_flow_entry *GetKFlowStatsAndInfo(const FlowKey &key, + uint32_t idx, + uint8_t gen_id, + vr_flow_stats *stats, + KFlowData *info) const; bool GetFlowKey(uint32_t index, FlowKey *key); uint32_t flow_table_entries_count() { return flow_table_entries_count_; } @@ -53,7 +62,7 @@ class KSyncFlowMemory { void UnmapFlowMemTest(); void MapSharedMemory(); void GetFlowTableSize(); - bool IsEvictionMarked(const vr_flow_entry *entry) const; + bool IsEvictionMarked(const vr_flow_entry *entry, uint16_t flags) const; KSync *ksync() const { return ksync_; } void set_major_devid(int id) { major_devid_ = id; } @@ -73,6 +82,8 @@ class KSyncFlowMemory { uint64_t timeout; }; void KFlow2FlowKey(const vr_flow_entry *entry, FlowKey *key) const; + void ReadFlowInfo(const vr_flow_entry *k_flow, vr_flow_stats *stats, + KFlowData *info) const; KSync *ksync_;