From 0271ce169af1db2d626c5615b5532cd0bb50ed9b Mon Sep 17 00:00:00 2001 From: Ashok Singh Date: Thu, 11 Aug 2016 04:58:38 -0700 Subject: [PATCH] Flow Sampling changes 1.Do gradual reduction of flow-threshold when flow-export-rate has come down after reaching a high value (instead of resetting it to default threshold) 2.Do not disable sampling of flows when flow-export-rate drops to below 80% of configured rate after reaching high values 3.Define a minimum threshold value 20 4.Define a tracebuffer to log flow-export-rates and thresholds Also removed unused variable in FlowStatsCollector object. Partial-Bug: #1586246 (cherry picked from commit b541d966fe0f15c1ded0016ab19aa8077cb41159) Change-Id: I527f59b5b7b707252ec412cceded94094ee8bd54 --- src/vnsw/agent/pkt/test/test_xml_packet_ut.cc | 6 +-- .../vrouter/flow_stats/flow_stats.sandesh | 12 ++++++ .../flow_stats/flow_stats_collector.cc | 42 ++++++++++++++++--- .../vrouter/flow_stats/flow_stats_collector.h | 5 +-- .../vrouter/flow_stats/flow_stats_manager.cc | 15 ++++++- .../vrouter/flow_stats/flow_stats_manager.h | 19 ++++++++- 6 files changed, 84 insertions(+), 15 deletions(-) diff --git a/src/vnsw/agent/pkt/test/test_xml_packet_ut.cc b/src/vnsw/agent/pkt/test/test_xml_packet_ut.cc index 5310d64007f..54d37a8f9ee 100644 --- a/src/vnsw/agent/pkt/test/test_xml_packet_ut.cc +++ b/src/vnsw/agent/pkt/test/test_xml_packet_ut.cc @@ -193,13 +193,11 @@ TEST_F(TestPkt, flow_eviction) { test.Run(); } client->WaitForIdle(); - client->agent()->flow_stats_manager()-> - default_flow_stats_collector()->set_delete_short_flow(true); + client->agent()->flow_stats_manager()->set_delete_short_flow(true); client->EnqueueFlowAge(); client->WaitForIdle(); WAIT_FOR(0, 1000, (0U == proto_->FlowCount())); - client->agent()->flow_stats_manager()-> - default_flow_stats_collector()->set_delete_short_flow(false); + client->agent()->flow_stats_manager()->set_delete_short_flow(false); } diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats.sandesh b/src/vnsw/agent/vrouter/flow_stats/flow_stats.sandesh index 8bfe17738b2..ddca7a26c46 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats.sandesh +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats.sandesh @@ -168,3 +168,15 @@ request sandesh DeleteAgingConfig { 1: u32 protocol; 2: u32 port; } + +/** + * @description: Trace message for agent flow sampling export rate + * @type: Trace + * @severity: DEBUG + */ +traceobject sandesh FlowExportStatsTrace { + 1: u32 export_rate; + 2: u32 export_rate_without_sampling; + 3: u32 old_threshold; + 4: u32 new_threshold; +} diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc index 013b9c5198e..0cbd0e7976a 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc @@ -761,7 +761,8 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info, /* Subject a flow to sampling algorithm only when all of below is met:- * a. if Log is not configured as action for flow - * b. actual flow-export-rate is >= 80% of configured flow-export-rate + * b. actual flow-export-rate is >= 80% of configured flow-export-rate. This + * is done only for first time. * c. diff_bytes is lesser than the threshold * d. Flow-sampling is not disabled * e. Flow-sample does not have teardown time or the sample for the flow is @@ -771,8 +772,11 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info, if (!info->IsActionLog() && (diff_bytes < threshold()) && (cfg_rate != GlobalVrouter::kDisableSampling) && (!info->teardown_time() || !info->exported_atleast_once()) && - flow_stats_manager_->flow_export_rate() >= ((double)cfg_rate) * 0.8) { + ((!flow_stats_manager_->flows_sampled_atleast_once_ && + flow_stats_manager_->flow_export_rate() >= ((double)cfg_rate) * 0.8) + || flow_stats_manager_->flows_sampled_atleast_once_)) { subject_flows_to_algorithm = true; + flow_stats_manager_->set_flows_sampled_atleast_once(); } if (subject_flows_to_algorithm) { @@ -890,7 +894,8 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info, //the flow is same. Required for analytics module to query flows //irrespective of direction. EnqueueFlowMsg(); - flow_stats_manager_->flow_export_count_ += 2; + flow_stats_manager_->UpdateFlowExportStats(2, + subject_flows_to_algorithm); } else { if (flow->is_flags_set(FlowEntry::IngressDir)) { s_flow.set_direction_ing(1); @@ -899,13 +904,15 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info, s_flow.set_direction_ing(0); } EnqueueFlowMsg(); - flow_stats_manager_->flow_export_count_++; + flow_stats_manager_->UpdateFlowExportStats(1, + subject_flows_to_algorithm); } } bool FlowStatsManager::UpdateFlowThreshold() { uint64_t curr_time = FlowStatsCollector::GetCurrentTime(); bool export_rate_calculated = false; + uint32_t exp_rate_without_sampling = 0; /* If flows are not being exported, no need to update threshold */ if (!flow_export_count_) { @@ -923,6 +930,8 @@ bool FlowStatsManager::UpdateFlowThreshold() { if (diff_secs) { uint32_t flow_export_count = flow_export_count_reset(); flow_export_rate_ = flow_export_count/diff_secs; + exp_rate_without_sampling = + flow_export_without_sampling_reset()/diff_secs; prev_flow_export_rate_compute_time_ = curr_time; export_rate_calculated = true; } @@ -940,9 +949,25 @@ bool FlowStatsManager::UpdateFlowThreshold() { (cfg_rate == prev_cfg_flow_export_rate_)) { return true; } + uint32_t cur_t = threshold(), new_t = 0; // Update sampling threshold based on flow_export_rate_ if (flow_export_rate_ < ((double)cfg_rate) * 0.8) { - UpdateThreshold(kDefaultFlowSamplingThreshold); + /* There are two reasons why we can be here. + * 1. None of the flows were sampled because we never crossed + * 80% of configured flow-export-rate. + * 2. In scale setups, the threshold was updated to high value because + * of which flow-export-rate has dropped drastically. + * Threshold should be updated here depending on which of the above two + * situations we are in. */ + if (!flows_sampled_atleast_once_) { + UpdateThreshold(kDefaultFlowSamplingThreshold); + } else { + if (flow_export_rate_ < ((double)cfg_rate) * 0.5) { + UpdateThreshold((threshold_ / 4)); + } else { + UpdateThreshold((threshold_ / 2)); + } + } } else if (flow_export_rate_ > (cfg_rate * 3)) { UpdateThreshold((threshold_ * 4)); } else if (flow_export_rate_ > (cfg_rate * 2)) { @@ -951,6 +976,9 @@ bool FlowStatsManager::UpdateFlowThreshold() { UpdateThreshold((threshold_ * 2)); } prev_cfg_flow_export_rate_ = cfg_rate; + new_t = threshold(); + FLOW_EXPORT_STATS_TRACE(flow_export_rate_, exp_rate_without_sampling, cur_t, + new_t); return true; } @@ -958,6 +986,10 @@ uint32_t FlowStatsCollector::threshold() const { return flow_stats_manager_->threshold(); } +uint32_t FlowStatsCollector::flow_export_count() const { + return flow_stats_manager_->flow_export_count(); +} + bool FlowStatsCollector::RequestHandler(boost::shared_ptr req) { const FlowExportInfo &info = req->info(); FlowEntry *flow = info.flow(); diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h index 21f99802692..317eed03300 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h @@ -118,7 +118,6 @@ class FlowStatsCollector : public StatsCollector { void UpdateFloatingIpStats(const FlowEntry *flow, uint64_t bytes, uint64_t pkts); void Shutdown(); - void set_delete_short_flow(bool val) { delete_short_flow_ = val; } void AddEvent(const FlowEntryPtr &flow); void DeleteEvent(const FlowEntryPtr &flow, const RevFlowDepParams ¶ms); void SourceIpOverride(FlowExportInfo *info, FlowLogData &s_flow, @@ -157,6 +156,7 @@ class FlowStatsCollector : public StatsCollector { virtual void DispatchFlowMsg(const std::vector &lst); private: + uint32_t flow_export_count() const; static uint64_t GetCurrentTime(); void ExportFlowLocked(FlowExportInfo *info, uint64_t diff_bytes, uint64_t diff_pkts, const RevFlowDepParams *params); @@ -232,9 +232,6 @@ class FlowStatsCollector : public StatsCollector { uint64_t flow_age_time_intvl_; // Number of entries pending to be visited uint32_t entries_to_visit_; - // Should short-flow be deleted immediately? - // Value will be set to false for test cases - bool delete_short_flow_; uint64_t flow_tcp_syn_age_time_; FlowEntryTree flow_tree_; diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.cc b/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.cc index 248cfa3bff2..08f6a3473e8 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.cc +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.cc @@ -30,10 +30,14 @@ #include #include +SandeshTraceBufferPtr FlowExportStatsTraceBuf(SandeshTraceBufferCreate( + "FlowExportStats", 3000)); const uint8_t FlowStatsManager::kCatchAllProto; void FlowStatsManager::UpdateThreshold(uint32_t new_value) { - if (new_value != 0) { + if (new_value < kMinFlowSamplingThreshold) { + threshold_ = kMinFlowSamplingThreshold; + } else { threshold_ = new_value; } } @@ -83,6 +87,7 @@ FlowStatsManager::FlowStatsManager(Agent *agent) : agent_(agent), flow_export_disable_drops_ = 0; flow_export_sampling_drops_ = 0; flow_export_drops_ = 0; + flows_sampled_atleast_once_ = false; request_queue_.set_measure_busy_time(agent->MeasureQueueDelay()); } @@ -421,3 +426,11 @@ void FlowStatsManager::SetProfileData(ProfileData *data) { it++; } } + +void FlowStatsManager::UpdateFlowExportStats(uint32_t count, + bool sampled_flow) { + flow_export_count_ += count; + if (!sampled_flow) { + flow_export_without_sampling_ += count; + } +} diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.h b/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.h index 829f383145f..a5744cb498a 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.h +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.h @@ -13,6 +13,13 @@ #include #include +extern SandeshTraceBufferPtr FlowExportStatsTraceBuf; + +#define FLOW_EXPORT_STATS_TRACE(...)\ +do {\ + FlowExportStatsTrace::TraceMsg(FlowExportStatsTraceBuf, __FILE__, __LINE__, __VA_ARGS__);\ +} while(0); + class FlowStatsCollector; struct FlowAgingTableKey { @@ -62,6 +69,7 @@ class FlowStatsManager { static const uint8_t kCatchAllProto = 0x0; static const uint64_t FlowThresoldUpdateTime = 1000 * 2; static const uint32_t kDefaultFlowSamplingThreshold = 500; + static const uint32_t kMinFlowSamplingThreshold = 20; typedef boost::shared_ptr FlowAgingTablePtr; @@ -125,6 +133,10 @@ class FlowStatsManager { return flow_export_count_.fetch_and_store(0); } + uint32_t flow_export_without_sampling_reset() { + return flow_export_without_sampling_.fetch_and_store(0); + } + uint32_t flow_export_disable_drops() const { return flow_export_disable_drops_; } @@ -145,11 +157,14 @@ class FlowStatsManager { void set_delete_short_flow(bool val) { delete_short_flow_ = val; } - + void set_flows_sampled_atleast_once() { + flows_sampled_atleast_once_ = true; + } static void FlowStatsReqHandler(Agent *agent, uint32_t proto, uint32_t port, uint64_t protocol); void FreeIndex(uint32_t idx); + void UpdateFlowExportStats(uint32_t count, bool sampled_flow); void SetProfileData(ProfileData *data); friend class AgentUtXmlFlowThreshold; @@ -169,7 +184,9 @@ class FlowStatsManager { uint32_t threshold_; tbb::atomic flow_export_disable_drops_; tbb::atomic flow_export_sampling_drops_; + tbb::atomic flow_export_without_sampling_; tbb::atomic flow_export_drops_; + tbb::atomic flows_sampled_atleast_once_; uint32_t prev_cfg_flow_export_rate_; Timer* timer_; bool delete_short_flow_;