Skip to content

Commit

Permalink
Fix Invalid flow-rate exported from Vrouter UVE
Browse files Browse the repository at this point in the history
Currently flow-rate is updated in Agent::FlowEvent task while it is being read in Agent::Uve task and there is no exclusion between these tasks. Made changes to update flow-rate in a new task which has exclusion with Agent::Uve. Also ensured in Agent::Uve task that flow-rate
is not exported if it is not yet computed.

Change-Id: I19bff570b0990a3b932576bf23c4b13a8260a3a6
Closes-Bug: #1639698
  • Loading branch information
ashoksr committed Nov 17, 2016
1 parent 4204c0f commit 4c00bbe
Show file tree
Hide file tree
Showing 15 changed files with 220 additions and 203 deletions.
6 changes: 6 additions & 0 deletions src/vnsw/agent/cmn/agent.cc
Expand Up @@ -132,6 +132,7 @@ void Agent::SetAgentTaskPolicy() {
kTaskFlowKSync,
kTaskFlowUpdate,
kTaskFlowAudit,
kTaskFlowStatsUpdate,
"Agent::Services",
"Agent::StatsCollector",
kTaskFlowStatsCollector,
Expand Down Expand Up @@ -309,6 +310,11 @@ void Agent::SetAgentTaskPolicy() {
SetTaskPolicyOne(kTaskDBExclude, db_exclude_task_exclude_list,
sizeof(db_exclude_task_exclude_list) / sizeof(char *));

const char *flow_stats_update_exclude_list[] = {
"Agent::Uve"
};
SetTaskPolicyOne(kTaskFlowStatsUpdate, flow_stats_update_exclude_list,
sizeof(flow_stats_update_exclude_list) / sizeof(char *));
}

void Agent::CreateLifetimeManager() {
Expand Down
1 change: 1 addition & 0 deletions src/vnsw/agent/cmn/agent.h
Expand Up @@ -220,6 +220,7 @@ extern void RouterIdDepInit(Agent *agent);
#define kTaskFlowKSync "Agent::FlowKSync"
#define kTaskFlowAudit "KSync::FlowAudit"
#define kTaskFlowStatsCollector "Flow::StatsCollector"
#define kTaskFlowStatsUpdate "Agent::FlowStatsUpdate"

#define kTaskHealthCheck "Agent::HealthCheck"

Expand Down
91 changes: 14 additions & 77 deletions src/vnsw/agent/cmn/agent_stats.cc
Expand Up @@ -24,7 +24,7 @@ void AgentStats::Reset() {
pkt_invalid_agent_hdr_ = pkt_invalid_interface_ = 0;
pkt_no_handler_ = pkt_dropped_ = flow_created_ = 0;
pkt_fragments_dropped_ = 0;
flow_aged_ = flow_active_ = flow_drop_due_to_max_limit_ = 0;
flow_aged_ = flow_drop_due_to_max_limit_ = 0;
flow_drop_due_to_linklocal_limit_ = ipc_in_msgs_ = 0;
ipc_out_msgs_ = in_tpkts_ = in_bytes_ = out_tpkts_ = 0;
out_bytes_ = 0;
Expand Down Expand Up @@ -112,86 +112,23 @@ void AgentStatsReq::HandleRequest() const {
sandesh->Response();
}

void AgentStats::UpdateAddMinMaxStats(uint64_t count, uint64_t time) {
if ((max_flow_adds_per_second_ == kInvalidFlowCount) ||
(count > max_flow_adds_per_second_)) {
max_flow_adds_per_second_ = count;
void AgentStats::UpdateFlowMinMaxStats(uint64_t total_flows,
FlowCounters &stat) const {
uint64_t count = total_flows - stat.prev_flow_count;
if ((stat.max_flows_per_second == kInvalidFlowCount) ||
(count > stat.max_flows_per_second)) {
stat.max_flows_per_second = count;
}
if ((min_flow_adds_per_second_ == kInvalidFlowCount) ||
(count < min_flow_adds_per_second_)) {
min_flow_adds_per_second_ = count;
if ((stat.min_flows_per_second == kInvalidFlowCount) ||
(count < stat.min_flows_per_second)) {
stat.min_flows_per_second = count;
}
prev_flow_add_time_ = time;
stat.prev_flow_count = total_flows;
}

void AgentStats::UpdateFlowAddMinMaxStats(uint64_t time) {
uint64_t diff_micro_secs = time - prev_flow_add_time_;
uint64_t diff_secs = 0;
uint64_t count = 0;
if (diff_micro_secs) {
diff_secs = diff_micro_secs/1000000;
}
if (!diff_secs) {
return;
}
if (diff_secs > 1) {
count = (flow_created_ - 1) - prev_flow_created_;
if (count) {
UpdateAddMinMaxStats(count, time);
prev_flow_created_ = flow_created_ - 1;
return;
}
}
count = flow_created_ - prev_flow_created_;
UpdateAddMinMaxStats(count, time);
prev_flow_created_ = flow_created_;
}

void AgentStats::UpdateDelMinMaxStats(uint64_t count, uint64_t time) {
if ((max_flow_deletes_per_second_ == kInvalidFlowCount) ||
(count > max_flow_deletes_per_second_)) {
max_flow_deletes_per_second_ = count;
}
if ((min_flow_deletes_per_second_ == kInvalidFlowCount) ||
(count < min_flow_deletes_per_second_)) {
min_flow_deletes_per_second_ = count;
}
prev_flow_delete_time_ = time;
}

void AgentStats::UpdateFlowDelMinMaxStats(uint64_t time) {
uint64_t diff_micro_secs = time - prev_flow_delete_time_;
uint64_t diff_secs = 0;
uint64_t count = 0;
if (diff_micro_secs) {
diff_secs = diff_micro_secs/1000000;
}
if (!diff_secs) {
return;
}
if (diff_secs > 1) {
count = (flow_aged_ - 1) - prev_flow_aged_;
if (count) {
prev_flow_aged_ = flow_aged_ - 1;
UpdateDelMinMaxStats(count, time);
return;
}
}
count = flow_aged_ - prev_flow_aged_;
UpdateDelMinMaxStats(count, time);
prev_flow_aged_ = flow_aged_;
}

void AgentStats::ResetFlowAddMinMaxStats(uint64_t time) {
max_flow_adds_per_second_ = kInvalidFlowCount;
min_flow_adds_per_second_ = kInvalidFlowCount;
prev_flow_add_time_ = time;
}

void AgentStats::ResetFlowDelMinMaxStats(uint64_t time) {
max_flow_deletes_per_second_ = kInvalidFlowCount;
min_flow_deletes_per_second_ = kInvalidFlowCount;
prev_flow_delete_time_ = time;
void AgentStats::ResetFlowMinMaxStats(FlowCounters &stat) const {
stat.max_flows_per_second = kInvalidFlowCount;
stat.min_flows_per_second = kInvalidFlowCount;
}

void AgentStats::RegisterFlowCountFn(FlowCountFn cb) {
Expand Down
104 changes: 54 additions & 50 deletions src/vnsw/agent/cmn/agent_stats.h
Expand Up @@ -13,7 +13,19 @@
typedef boost::function<uint32_t()> FlowCountFn;
class AgentStats {
public:
static const uint64_t kInvalidFlowCount = 0xFFFFFFFFFFFFFFFF;
static const uint32_t kInvalidFlowCount = 0xFFFFFFFF;
static const int kFlowStatsUpdateInterval = 1000;
struct FlowCounters {
uint64_t prev_flow_count; //previous flow created/aged count
uint32_t max_flows_per_second; //max_flows_added/deleted_per_second
uint32_t min_flows_per_second; //min_flows_added/deleted_per_second

FlowCounters() : prev_flow_count(0),
max_flows_per_second(kInvalidFlowCount),
min_flows_per_second(kInvalidFlowCount) {
}
};

AgentStats(Agent *agent)
: agent_(agent), xmpp_reconnect_(), xmpp_in_msgs_(), xmpp_out_msgs_(),
xmpp_config_in_msgs_(), sandesh_reconnects_(0U),
Expand All @@ -22,18 +34,15 @@ class AgentStats {
pkt_invalid_agent_hdr_(0U), pkt_invalid_interface_(0U),
pkt_no_handler_(0U), pkt_fragments_dropped_(0U), pkt_dropped_(0U),
max_flow_count_(0),
flow_created_(0U), flow_aged_(0U), flow_active_(0U),
flow_drop_due_to_max_limit_(0), flow_drop_due_to_linklocal_limit_(0),
prev_flow_created_(0U), prev_flow_aged_(0U),
max_flow_adds_per_second_(kInvalidFlowCount),
min_flow_adds_per_second_(kInvalidFlowCount),
max_flow_deletes_per_second_(kInvalidFlowCount),
min_flow_deletes_per_second_(kInvalidFlowCount),
flow_stats_update_timeout_(kFlowStatsUpdateInterval),
ipc_in_msgs_(0U), ipc_out_msgs_(0U), in_tpkts_(0U), in_bytes_(0U),
out_tpkts_(0U), out_bytes_(0U) {
assert(singleton_ == NULL);
singleton_ = this;
flow_count_ = 0;
flow_created_ = 0;
flow_aged_ = 0;
}

virtual ~AgentStats() {singleton_ = NULL;}
Expand Down Expand Up @@ -71,7 +80,7 @@ class AgentStats {
uint32_t sandesh_http_sessions() const {return sandesh_http_sessions_;}

void incr_flow_created() {
flow_created_++;
flow_created_.fetch_and_increment();
uint32_t count = flow_count_.fetch_and_increment();
if (count > max_flow_count_)
max_flow_count_ = count + 1;
Expand All @@ -84,9 +93,17 @@ class AgentStats {

uint64_t max_flow_count() const {return max_flow_count_;}

void incr_flow_aged() {flow_aged_++;}
void incr_flow_aged() { flow_aged_.fetch_and_increment(); }
uint64_t flow_aged() const {return flow_aged_;}

int flow_stats_update_timeout() const {
return flow_stats_update_timeout_;
}

void set_flow_stats_update_timeout(int value) {
flow_stats_update_timeout_ = value;
}

void incr_flow_drop_due_to_max_limit() {flow_drop_due_to_max_limit_++;}
uint64_t flow_drop_due_to_max_limit() const {
return flow_drop_due_to_max_limit_;
Expand Down Expand Up @@ -134,53 +151,46 @@ class AgentStats {
void incr_out_bytes(uint64_t count) {out_bytes_ += count;}
uint64_t out_bytes() const {return out_bytes_;}

uint64_t max_flow_adds_per_second() const {
return max_flow_adds_per_second_;
uint32_t max_flow_adds_per_second() const {
return added_.max_flows_per_second;
}
uint64_t min_flow_adds_per_second() const {
return min_flow_adds_per_second_;
uint32_t min_flow_adds_per_second() const {
return added_.min_flows_per_second;
}
uint64_t max_flow_deletes_per_second() const {
return max_flow_deletes_per_second_;
uint32_t max_flow_deletes_per_second() const {
return deleted_.max_flows_per_second;
}
uint64_t min_flow_deletes_per_second() const {
return min_flow_deletes_per_second_;
uint32_t min_flow_deletes_per_second() const {
return deleted_.min_flows_per_second;
}

void set_prev_flow_add_time(uint64_t time) {
prev_flow_add_time_ = time;
}
void set_prev_flow_delete_time(uint64_t time) {
prev_flow_delete_time_ = time;
}
void set_prev_flow_created(uint64_t value) {
prev_flow_created_ = value;
added_.prev_flow_count = value;
}

void set_prev_flow_aged(uint64_t value) {
prev_flow_aged_ = value;
deleted_.prev_flow_count = value;
}
void set_max_flow_adds_per_second(uint64_t value) {
max_flow_adds_per_second_ = value;;
void set_max_flow_adds_per_second(uint32_t value) {
added_.max_flows_per_second = value;
}
void set_min_flow_adds_per_second(uint64_t value) {
min_flow_adds_per_second_ = value;;
void set_min_flow_adds_per_second(uint32_t value) {
added_.min_flows_per_second = value;
}
void set_max_flow_deletes_per_second(uint64_t value) {
max_flow_deletes_per_second_ = value;;
void set_max_flow_deletes_per_second(uint32_t value) {
deleted_.max_flows_per_second = value;
}
void set_min_flow_deletes_per_second(uint64_t value) {
min_flow_deletes_per_second_ = value;
void set_min_flow_deletes_per_second(uint32_t value) {
deleted_.min_flows_per_second = value;
}
void UpdateFlowAddMinMaxStats(uint64_t time);
void UpdateFlowDelMinMaxStats(uint64_t time);
void ResetFlowAddMinMaxStats(uint64_t time);
void ResetFlowDelMinMaxStats(uint64_t time);
void UpdateFlowMinMaxStats(uint64_t total_flows, FlowCounters &stat) const;
void ResetFlowMinMaxStats(FlowCounters &stat) const;

void RegisterFlowCountFn(FlowCountFn cb);
uint32_t FlowCount() const;
FlowCounters& added() { return added_; }
FlowCounters& deleted() { return deleted_; }
private:
void UpdateAddMinMaxStats(uint64_t count, uint64_t time);
void UpdateDelMinMaxStats(uint64_t count, uint64_t time);

Agent *agent_;
FlowCountFn flow_count_fn_;
Expand Down Expand Up @@ -208,19 +218,13 @@ class AgentStats {
// Flow stats
tbb::atomic<uint32_t> flow_count_;
uint32_t max_flow_count_;
uint64_t flow_created_;
uint64_t flow_aged_;
uint64_t flow_active_;
uint64_t flow_drop_due_to_max_limit_;
uint64_t flow_drop_due_to_linklocal_limit_;
uint64_t prev_flow_created_;
uint64_t prev_flow_aged_;
uint64_t max_flow_adds_per_second_;
uint64_t min_flow_adds_per_second_;
uint64_t max_flow_deletes_per_second_;
uint64_t min_flow_deletes_per_second_;
uint64_t prev_flow_add_time_;
uint64_t prev_flow_delete_time_;
tbb::atomic<uint64_t> flow_created_;
tbb::atomic<uint64_t> flow_aged_;
FlowCounters added_;
FlowCounters deleted_;
int flow_stats_update_timeout_;

// Kernel IPC
uint64_t ipc_in_msgs_;
Expand Down
19 changes: 18 additions & 1 deletion src/vnsw/agent/pkt/flow_proto.cc
Expand Up @@ -26,7 +26,10 @@ FlowProto::FlowProto(Agent *agent, boost::asio::io_service &io) :
flow_update_queue_(agent, this, &update_tokens_,
agent->params()->flow_task_latency_limit(), 16),
use_vrouter_hash_(false), ipv4_trace_filter_(), ipv6_trace_filter_(),
stats_() {
stats_(),
stats_update_timer_(TimerManager::CreateTimer
(*(agent->event_manager())->io_service(), "FlowStatsUpdateTimer",
TaskScheduler::GetInstance()->GetTaskId(kTaskFlowStatsUpdate), 0)) {
linklocal_flow_count_ = 0;
agent->SetFlowProto(this);
set_trace(false);
Expand Down Expand Up @@ -90,6 +93,8 @@ void FlowProto::InitDone() {
for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
flow_table_list_[i]->InitDone();
}
stats_update_timer_->Start(agent_->stats()->flow_stats_update_timeout(),
boost::bind(&FlowProto::FlowStatsUpdate, this));
}

void FlowProto::Shutdown() {
Expand All @@ -103,6 +108,10 @@ void FlowProto::Shutdown() {
flow_ksync_queue_[i]->Shutdown();
}
flow_update_queue_.Shutdown();
if (stats_update_timer_) {
stats_update_timer_->Cancel();
TimerManager::DeleteTimer(stats_update_timer_);
}
}

static std::size_t HashCombine(std::size_t hash, uint64_t val) {
Expand Down Expand Up @@ -838,3 +847,11 @@ void FlowProto::SetProfileData(ProfileData *data) {
data->flow_.token_stats_.del_failures_ = del_tokens_.failures();
data->flow_.token_stats_.del_restarts_ = del_tokens_.restarts();
}

bool FlowProto::FlowStatsUpdate() const {
agent_->stats()->UpdateFlowMinMaxStats(agent_->stats()->flow_created(),
agent_->stats()->added());
agent_->stats()->UpdateFlowMinMaxStats(agent_->stats()->flow_aged(),
agent_->stats()->deleted());
return true;
}
3 changes: 2 additions & 1 deletion src/vnsw/agent/pkt/flow_proto.h
Expand Up @@ -70,7 +70,6 @@ class FlowProto : public Proto {
uint32_t FlowCount() const;
void VnFlowCounters(const VnEntry *vn, uint32_t *in_count,
uint32_t *out_count);

bool AddFlow(FlowEntry *flow);
bool UpdateFlow(FlowEntry *flow);

Expand Down Expand Up @@ -128,6 +127,7 @@ class FlowProto : public Proto {
FlowTraceFilter *ipv6_trace_filter() { return &ipv6_trace_filter_; }

bool ProcessFlowEvent(const FlowEvent &req, FlowTable *table);
bool FlowStatsUpdate() const;

FlowTokenPool add_tokens_;
FlowTokenPool ksync_tokens_;
Expand All @@ -144,6 +144,7 @@ class FlowProto : public Proto {
FlowTraceFilter ipv4_trace_filter_;
FlowTraceFilter ipv6_trace_filter_;
FlowStats stats_;
Timer *stats_update_timer_;
};

extern SandeshTraceBufferPtr PktFlowTraceBuf;
Expand Down
2 changes: 0 additions & 2 deletions src/vnsw/agent/pkt/flow_table.cc
Expand Up @@ -143,7 +143,6 @@ FlowEntry *FlowTable::Locate(FlowEntry *flow, uint64_t time) {
ret = flow_entry_map_.insert(FlowEntryMapPair(flow->key(), flow));
if (ret.second == true) {
agent_->stats()->incr_flow_created();
agent_->stats()->UpdateFlowAddMinMaxStats(time);
ret.first->second->set_on_tree();
return flow;
}
Expand Down Expand Up @@ -312,7 +311,6 @@ void FlowTable::DeleteInternal(FlowEntry *fe, uint64_t time,
DeleteKSync(fe);

agent_->stats()->incr_flow_aged();
agent_->stats()->UpdateFlowDelMinMaxStats(time);
}

bool FlowTable::DeleteFlows(FlowEntry *flow, FlowEntry *rflow) {
Expand Down

0 comments on commit 4c00bbe

Please sign in to comment.