Skip to content

Commit

Permalink
Merge "Flow Sampling changes" into R3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Aug 17, 2016
2 parents 7b03722 + 0271ce1 commit 249dd2e
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 15 deletions.
6 changes: 2 additions & 4 deletions src/vnsw/agent/pkt/test/test_xml_packet_ut.cc
Expand Up @@ -193,13 +193,11 @@ TEST_F(TestPkt, flow_eviction) {
test.Run();
}
client->WaitForIdle();
client->agent()->flow_stats_manager()->
default_flow_stats_collector()->set_delete_short_flow(true);
client->agent()->flow_stats_manager()->set_delete_short_flow(true);
client->EnqueueFlowAge();
client->WaitForIdle();
WAIT_FOR(0, 1000, (0U == proto_->FlowCount()));
client->agent()->flow_stats_manager()->
default_flow_stats_collector()->set_delete_short_flow(false);
client->agent()->flow_stats_manager()->set_delete_short_flow(false);
}


Expand Down
12 changes: 12 additions & 0 deletions src/vnsw/agent/vrouter/flow_stats/flow_stats.sandesh
Expand Up @@ -168,3 +168,15 @@ request sandesh DeleteAgingConfig {
1: u32 protocol;
2: u32 port;
}

/**
* @description: Trace message for agent flow sampling export rate
* @type: Trace
* @severity: DEBUG
*/
traceobject sandesh FlowExportStatsTrace {
1: u32 export_rate;
2: u32 export_rate_without_sampling;
3: u32 old_threshold;
4: u32 new_threshold;
}
42 changes: 37 additions & 5 deletions src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc
Expand Up @@ -776,7 +776,8 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info,

/* Subject a flow to sampling algorithm only when all of below is met:-
* a. if Log is not configured as action for flow
* b. actual flow-export-rate is >= 80% of configured flow-export-rate
* b. actual flow-export-rate is >= 80% of configured flow-export-rate. This
* is done only for first time.
* c. diff_bytes is lesser than the threshold
* d. Flow-sampling is not disabled
* e. Flow-sample does not have teardown time or the sample for the flow is
Expand All @@ -786,8 +787,11 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info,
if (!info->IsActionLog() && (diff_bytes < threshold()) &&
(cfg_rate != GlobalVrouter::kDisableSampling) &&
(!info->teardown_time() || !info->exported_atleast_once()) &&
flow_stats_manager_->flow_export_rate() >= ((double)cfg_rate) * 0.8) {
((!flow_stats_manager_->flows_sampled_atleast_once_ &&
flow_stats_manager_->flow_export_rate() >= ((double)cfg_rate) * 0.8)
|| flow_stats_manager_->flows_sampled_atleast_once_)) {
subject_flows_to_algorithm = true;
flow_stats_manager_->set_flows_sampled_atleast_once();
}

if (subject_flows_to_algorithm) {
Expand Down Expand Up @@ -905,7 +909,8 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info,
//the flow is same. Required for analytics module to query flows
//irrespective of direction.
EnqueueFlowMsg();
flow_stats_manager_->flow_export_count_ += 2;
flow_stats_manager_->UpdateFlowExportStats(2,
subject_flows_to_algorithm);
} else {
if (flow->is_flags_set(FlowEntry::IngressDir)) {
s_flow.set_direction_ing(1);
Expand All @@ -914,13 +919,15 @@ void FlowStatsCollector::ExportFlow(FlowExportInfo *info,
s_flow.set_direction_ing(0);
}
EnqueueFlowMsg();
flow_stats_manager_->flow_export_count_++;
flow_stats_manager_->UpdateFlowExportStats(1,
subject_flows_to_algorithm);
}
}

bool FlowStatsManager::UpdateFlowThreshold() {
uint64_t curr_time = FlowStatsCollector::GetCurrentTime();
bool export_rate_calculated = false;
uint32_t exp_rate_without_sampling = 0;

/* If flows are not being exported, no need to update threshold */
if (!flow_export_count_) {
Expand All @@ -938,6 +945,8 @@ bool FlowStatsManager::UpdateFlowThreshold() {
if (diff_secs) {
uint32_t flow_export_count = flow_export_count_reset();
flow_export_rate_ = flow_export_count/diff_secs;
exp_rate_without_sampling =
flow_export_without_sampling_reset()/diff_secs;
prev_flow_export_rate_compute_time_ = curr_time;
export_rate_calculated = true;
}
Expand All @@ -955,9 +964,25 @@ bool FlowStatsManager::UpdateFlowThreshold() {
(cfg_rate == prev_cfg_flow_export_rate_)) {
return true;
}
uint32_t cur_t = threshold(), new_t = 0;
// Update sampling threshold based on flow_export_rate_
if (flow_export_rate_ < ((double)cfg_rate) * 0.8) {
UpdateThreshold(kDefaultFlowSamplingThreshold);
/* There are two reasons why we can be here.
* 1. None of the flows were sampled because we never crossed
* 80% of configured flow-export-rate.
* 2. In scale setups, the threshold was updated to high value because
* of which flow-export-rate has dropped drastically.
* Threshold should be updated here depending on which of the above two
* situations we are in. */
if (!flows_sampled_atleast_once_) {
UpdateThreshold(kDefaultFlowSamplingThreshold);
} else {
if (flow_export_rate_ < ((double)cfg_rate) * 0.5) {
UpdateThreshold((threshold_ / 4));
} else {
UpdateThreshold((threshold_ / 2));
}
}
} else if (flow_export_rate_ > (cfg_rate * 3)) {
UpdateThreshold((threshold_ * 4));
} else if (flow_export_rate_ > (cfg_rate * 2)) {
Expand All @@ -966,13 +991,20 @@ bool FlowStatsManager::UpdateFlowThreshold() {
UpdateThreshold((threshold_ * 2));
}
prev_cfg_flow_export_rate_ = cfg_rate;
new_t = threshold();
FLOW_EXPORT_STATS_TRACE(flow_export_rate_, exp_rate_without_sampling, cur_t,
new_t);
return true;
}

uint32_t FlowStatsCollector::threshold() const {
return flow_stats_manager_->threshold();
}

uint32_t FlowStatsCollector::flow_export_count() const {
return flow_stats_manager_->flow_export_count();
}

bool FlowStatsCollector::RequestHandler(boost::shared_ptr<FlowExportReq> req) {
const FlowExportInfo &info = req->info();
FlowEntry *flow = info.flow();
Expand Down
5 changes: 1 addition & 4 deletions src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h
Expand Up @@ -118,7 +118,6 @@ class FlowStatsCollector : public StatsCollector {
void UpdateFloatingIpStats(const FlowEntry *flow, uint64_t bytes,
uint64_t pkts);
void Shutdown();
void set_delete_short_flow(bool val) { delete_short_flow_ = val; }
void AddEvent(const FlowEntryPtr &flow);
void DeleteEvent(const FlowEntryPtr &flow, const RevFlowDepParams &params);
void SourceIpOverride(FlowExportInfo *info, FlowLogData &s_flow,
Expand Down Expand Up @@ -157,6 +156,7 @@ class FlowStatsCollector : public StatsCollector {
virtual void DispatchFlowMsg(const std::vector<FlowLogData> &lst);

private:
uint32_t flow_export_count() const;
static uint64_t GetCurrentTime();
void ExportFlowLocked(FlowExportInfo *info, uint64_t diff_bytes,
uint64_t diff_pkts, const RevFlowDepParams *params);
Expand Down Expand Up @@ -233,9 +233,6 @@ class FlowStatsCollector : public StatsCollector {
uint64_t flow_age_time_intvl_;
// Number of entries pending to be visited
uint32_t entries_to_visit_;
// Should short-flow be deleted immediately?
// Value will be set to false for test cases
bool delete_short_flow_;
uint64_t flow_tcp_syn_age_time_;

FlowEntryTree flow_tree_;
Expand Down
15 changes: 14 additions & 1 deletion src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.cc
Expand Up @@ -30,10 +30,14 @@
#include <oper/global_vrouter.h>
#include <init/agent_param.h>

SandeshTraceBufferPtr FlowExportStatsTraceBuf(SandeshTraceBufferCreate(
"FlowExportStats", 3000));
const uint8_t FlowStatsManager::kCatchAllProto;

void FlowStatsManager::UpdateThreshold(uint32_t new_value) {
if (new_value != 0) {
if (new_value < kMinFlowSamplingThreshold) {
threshold_ = kMinFlowSamplingThreshold;
} else {
threshold_ = new_value;
}
}
Expand Down Expand Up @@ -83,6 +87,7 @@ FlowStatsManager::FlowStatsManager(Agent *agent) : agent_(agent),
flow_export_disable_drops_ = 0;
flow_export_sampling_drops_ = 0;
flow_export_drops_ = 0;
flows_sampled_atleast_once_ = false;
request_queue_.set_measure_busy_time(agent->MeasureQueueDelay());
}

Expand Down Expand Up @@ -421,3 +426,11 @@ void FlowStatsManager::SetProfileData(ProfileData *data) {
it++;
}
}

void FlowStatsManager::UpdateFlowExportStats(uint32_t count,
bool sampled_flow) {
flow_export_count_ += count;
if (!sampled_flow) {
flow_export_without_sampling_ += count;
}
}
19 changes: 18 additions & 1 deletion src/vnsw/agent/vrouter/flow_stats/flow_stats_manager.h
Expand Up @@ -13,6 +13,13 @@
#include <vrouter/ksync/flowtable_ksync.h>
#include <sandesh/common/flow_types.h>

extern SandeshTraceBufferPtr FlowExportStatsTraceBuf;

#define FLOW_EXPORT_STATS_TRACE(...)\
do {\
FlowExportStatsTrace::TraceMsg(FlowExportStatsTraceBuf, __FILE__, __LINE__, __VA_ARGS__);\
} while(0);

class FlowStatsCollector;

struct FlowAgingTableKey {
Expand Down Expand Up @@ -62,6 +69,7 @@ class FlowStatsManager {
static const uint8_t kCatchAllProto = 0x0;
static const uint64_t FlowThresoldUpdateTime = 1000 * 2;
static const uint32_t kDefaultFlowSamplingThreshold = 500;
static const uint32_t kMinFlowSamplingThreshold = 20;

typedef boost::shared_ptr<FlowStatsCollector> FlowAgingTablePtr;

Expand Down Expand Up @@ -125,6 +133,10 @@ class FlowStatsManager {
return flow_export_count_.fetch_and_store(0);
}

uint32_t flow_export_without_sampling_reset() {
return flow_export_without_sampling_.fetch_and_store(0);
}

uint32_t flow_export_disable_drops() const {
return flow_export_disable_drops_;
}
Expand All @@ -145,11 +157,14 @@ class FlowStatsManager {
void set_delete_short_flow(bool val) {
delete_short_flow_ = val;
}

void set_flows_sampled_atleast_once() {
flows_sampled_atleast_once_ = true;
}
static void FlowStatsReqHandler(Agent *agent, uint32_t proto,
uint32_t port,
uint64_t protocol);
void FreeIndex(uint32_t idx);
void UpdateFlowExportStats(uint32_t count, bool sampled_flow);

void SetProfileData(ProfileData *data);
friend class AgentUtXmlFlowThreshold;
Expand All @@ -169,7 +184,9 @@ class FlowStatsManager {
uint32_t threshold_;
tbb::atomic<uint64_t> flow_export_disable_drops_;
tbb::atomic<uint64_t> flow_export_sampling_drops_;
tbb::atomic<uint32_t> flow_export_without_sampling_;
tbb::atomic<uint64_t> flow_export_drops_;
tbb::atomic<bool> flows_sampled_atleast_once_;
uint32_t prev_cfg_flow_export_rate_;
Timer* timer_;
bool delete_short_flow_;
Expand Down

0 comments on commit 249dd2e

Please sign in to comment.