Skip to content

Commit

Permalink
Merge "Adjust flow-stats-collector timer to handle stats-collecting l…
Browse files Browse the repository at this point in the history
…atency"
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Apr 17, 2016
2 parents 83ec1af + 70996fe commit 1e766ce
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 37 deletions.
8 changes: 5 additions & 3 deletions src/vnsw/agent/pkt/flow_proto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,11 @@ bool FlowProto::FlowEventHandler(FlowEvent *req, FlowTable *table) {
}

case FlowEvent::AUDIT_FLOW: {
FlowEntryPtr flow = FlowEntry::Allocate(req->get_flow_key(), table);
flow->InitAuditFlow(req->flow_handle(), req->gen_id());
flow->flow_table()->Add(flow.get(), NULL);
if (table->Find(req->get_flow_key()) == NULL) {
FlowEntryPtr flow = FlowEntry::Allocate(req->get_flow_key(), table);
flow->InitAuditFlow(req->flow_handle(), req->gen_id());
flow->flow_table()->Add(flow.get(), NULL);
}
break;
}

Expand Down
85 changes: 52 additions & 33 deletions src/vnsw/agent/pkt/test/test_flow_audit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,12 @@ struct PortInfo input[] = {
};
VmInterface *vmi0;

static bool FlowStatsTimerStartStopTrigger (bool stop) {
Agent::GetInstance()->flow_stats_manager()->\
default_flow_stats_collector()->TestStartStopTimer(stop);
static bool FlowStatsTimerStartStopTrigger (FlowStatsCollector *fsc,
bool stop) {
fsc->TestStartStopTimer(stop);
return true;
}

static void FlowStatsTimerStartStop (bool stop) {
int task_id = TaskScheduler::GetInstance()->GetTaskId(kTaskFlowEvent);
std::auto_ptr<TaskTrigger> trigger_
(new TaskTrigger(boost::bind(FlowStatsTimerStartStopTrigger, stop),
task_id, 0));
trigger_->Set();
client->WaitForIdle();
}

class FlowAuditTest : public ::testing::Test {
public:
FlowAuditTest() : agent_(Agent::GetInstance()) {
Expand All @@ -49,6 +40,8 @@ class FlowAuditTest : public ::testing::Test {

vmi0 = VmInterfaceGet(input[0].intf_id);
assert(vmi0);
FlowStatsTimerStartStop(true);
KFlowPurgeHold();
}

virtual void TearDown() {
Expand All @@ -57,8 +50,20 @@ class FlowAuditTest : public ::testing::Test {

DeleteVmportEnv(input, 1, true, 1);
client->WaitForIdle();
FlowStatsTimerStartStop(false);
KFlowPurgeHold();
}

void FlowStatsTimerStartStop (bool stop) {
int task_id =
agent_->task_scheduler()->GetTaskId(kTaskFlowStatsCollector);
std::auto_ptr<TaskTrigger> trigger_
(new TaskTrigger(boost::bind(FlowStatsTimerStartStopTrigger,
flow_stats_collector_, stop),
task_id, 0));
trigger_->Set();
client->WaitForIdle();
}

bool FlowTableWait(size_t count) {
int i = 1000;
Expand Down Expand Up @@ -140,24 +145,35 @@ class FlowAuditTest : public ::testing::Test {
FlowStatsCollector* flow_stats_collector_;
};

TEST_F(FlowAuditTest, FlowAudit) {
KFlowPurgeHold();
FlowStatsTimerStartStop(true);
// Validate flows audit
TEST_F(FlowAuditTest, FlowAudit_1) {
// Create two hold-flows
EXPECT_TRUE(KFlowHoldAdd(1, 1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0));
EXPECT_TRUE(KFlowHoldAdd(2, 1, "2.2.2.2", "3.3.3.3", 1, 0, 0, 0));
RunFlowAudit();
EXPECT_TRUE(FlowTableWait(2));

FlowEntry *fe = FlowGet(1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0);
EXPECT_TRUE(fe != NULL && fe->is_flags_set(FlowEntry::ShortFlow) == true &&
fe->short_flow_reason() == FlowEntry::SHORT_AUDIT_ENTRY);
//FlowStatsTimerStartStop(false);

// Wait till flow-stats-collector sees the flows
WAIT_FOR(1000, 1000, (flow_stats_collector_->Size() == 2));

// Enqueue aging and validate flows are deleted
client->EnqueueFlowAge();
client->WaitForIdle();
WAIT_FOR(1000, 1000, (get_flow_proto()->FlowCount() == 0U));
KFlowPurgeHold();
}

// Validate flow do not get deleted in following case,
// - Flow-audit runs and enqueues request to delete
// - Add flow before audit message is run
// - Flow-audit message should be ignored
TEST_F(FlowAuditTest, FlowAudit_2) {

string vrf_name =
Agent::GetInstance()->vrf_table()->FindVrfFromId(1)->GetName();
// Create the flow first
string vrf_name = agent_->vrf_table()->FindVrfFromId(1)->GetName();
TestFlow flow[] = {
{
TestFlowPkt(Address::INET, "1.1.1.1", "2.2.2.2", 1, 0, 0, vrf_name,
Expand All @@ -166,24 +182,27 @@ TEST_F(FlowAuditTest, FlowAudit) {
}
}
};

CreateFlow(flow, 1);

EXPECT_TRUE(FlowTableWait(2));
EXPECT_TRUE(KFlowHoldAdd(10, 1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0));

uint32_t nh_id = vmi0->flow_key_nh()->id();
// Validate that flow-drop-reason is not AUDIT
FlowEntry *fe = FlowGet(1, "1.1.1.1", "2.2.2.2", 1, 0, 0, nh_id);
EXPECT_TRUE(fe != NULL &&
fe->short_flow_reason() != FlowEntry::SHORT_AUDIT_ENTRY);

// Wait till flow-stats-collector sees the flows
WAIT_FOR(1000, 1000, (flow_stats_collector_->Size() == 2));

// Enqueue Audit message
EXPECT_TRUE(KFlowHoldAdd(nh_id, 1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0));
RunFlowAudit();
client->EnqueueFlowAge();
client->WaitForIdle();
usleep(500);
int tmp_age_time = 10 * 1000;
int bkp_age_time = flow_stats_collector_->flow_age_time_intvl();
//Set the flow age time to 10 microsecond
flow_stats_collector_->UpdateFlowAgeTime(tmp_age_time);
client->EnqueueFlowAge();
client->WaitForIdle();
WAIT_FOR(1000, 1000, (get_flow_proto()->FlowCount() == 0U));
flow_stats_collector_->UpdateFlowAgeTime(bkp_age_time);
KFlowPurgeHold();

// Validate that flow-drop-reason is not AUDIT
fe = FlowGet(1, "1.1.1.1", "2.2.2.2", 1, 0, 0, nh_id);
EXPECT_TRUE(fe != NULL &&
fe->short_flow_reason() != FlowEntry::SHORT_AUDIT_ENTRY);
}

int main(int argc, char *argv[]) {
Expand Down
21 changes: 21 additions & 0 deletions src/vnsw/agent/pkt/test/test_xml_packet_ut.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,45 @@ static void GetArgs(char *test_file, int argc, char *argv[]) {
return;
}

static bool FlowStatsTimerStartStopTrigger(FlowStatsCollector *fsc, bool stop) {
fsc->TestStartStopTimer(stop);
return true;
}

class TestPkt : public ::testing::Test {
public:
virtual void SetUp() {
agent_ = Agent::GetInstance();
proto_ = agent_->pkt()->get_flow_proto();
interface_count_ = agent_->interface_table()->Size();
flow_stats_collector_ = agent_->flow_stats_manager()->
default_flow_stats_collector();
FlowStatsTimerStartStop(true);
}

virtual void TearDown() {
EXPECT_EQ(agent_->pkt()->get_flow_proto()->FlowCount(), 0);
EXPECT_EQ(agent_->vn_table()->Size(), 0);
EXPECT_EQ(agent_->interface_table()->Size(), interface_count_);
agent_->flow_stats_manager()->set_flow_export_count(0);
FlowStatsTimerStartStop(false);
}

void FlowStatsTimerStartStop(bool stop) {
int task_id =
agent_->task_scheduler()->GetTaskId(kTaskFlowStatsCollector);
std::auto_ptr<TaskTrigger> trigger_
(new TaskTrigger(boost::bind(FlowStatsTimerStartStopTrigger,
flow_stats_collector_, stop),
task_id, 0));
trigger_->Set();
client->WaitForIdle();
}

Agent *agent_;
FlowProto *proto_;
uint32_t interface_count_;
FlowStatsCollector* flow_stats_collector_;
};

TEST_F(TestPkt, parse_1) {
Expand Down
4 changes: 4 additions & 0 deletions src/vnsw/agent/uve/stats_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ class StatsCollector {
}
}

void RescheduleTimer(int time) {
timer_->Reschedule(time);
}

int run_counter_; //used only in UT code

private:
Expand Down
10 changes: 10 additions & 0 deletions src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ void FlowStatsCollector::UpdateAndExportInternal(FlowExportInfo *info,
bool FlowStatsCollector::Run() {
run_counter_++;
if (!flow_tree_.size()) {
set_expiry_time(kFlowStatsTimerInterval);
return true;
}

Expand Down Expand Up @@ -490,6 +491,15 @@ bool FlowStatsCollector::Run() {
flow_iteration_key_ = it->first;
}

// The flow processing above can have significant latency (>20msec)
// adjust timer such that time between firing is kFlowStatsTimerInterval
int64_t latency = (UTCTimestampUsec() - curr_time)/1000;
int next_interval = (int)kFlowStatsTimerInterval - latency;
if (next_interval < (int)kFlowStatsTimerIntervalMin) {
next_interval = (int)kFlowStatsTimerIntervalMin;
}
RescheduleTimer(next_interval);

return true;
}

Expand Down
4 changes: 3 additions & 1 deletion src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ class FlowStatsCollector : public StatsCollector {
public:
static const uint64_t FlowAgeTime = 1000000 * 180;
static const uint32_t kFlowStatsTimerInterval = 50; // time in milliseconds
// Min time in milliseconds
static const uint32_t kFlowStatsTimerIntervalMin = 5;
static const uint64_t FlowTcpSynAgeTime = 1000000 * 180;
// Retry flow-delete after 2 second
static const uint64_t kFlowDeleteRetryTime = (2 * 1000 * 1000);
static const uint64_t kFlowDeleteRetryTime = (5 * 1000 * 1000);

// Time within which complete table must be scanned
// Specified in terms of percentage of aging-time
Expand Down

0 comments on commit 1e766ce

Please sign in to comment.