diff --git a/src/vnsw/agent/pkt/flow_event.h b/src/vnsw/agent/pkt/flow_event.h index efd2b632dd7..3f11199ea70 100644 --- a/src/vnsw/agent/pkt/flow_event.h +++ b/src/vnsw/agent/pkt/flow_event.h @@ -102,6 +102,12 @@ class FlowEvent { ksync_event_() { } + FlowEvent(Event event, const FlowKey &key, uint32_t flow_handle) : + event_(event), flow_(NULL), pkt_info_(), db_entry_(NULL), + gen_id_(0), flow_key_(key), del_rev_flow_(false), + flow_handle_(flow_handle), ksync_entry_(NULL), ksync_event_() { + } + FlowEvent(Event event, PktInfoPtr pkt_info) : event_(event), flow_(NULL), pkt_info_(pkt_info), db_entry_(NULL), gen_id_(0), flow_key_(), del_rev_flow_(), diff --git a/src/vnsw/agent/pkt/flow_proto.cc b/src/vnsw/agent/pkt/flow_proto.cc index db6f0aacf02..15ed4aa7dea 100644 --- a/src/vnsw/agent/pkt/flow_proto.cc +++ b/src/vnsw/agent/pkt/flow_proto.cc @@ -208,7 +208,6 @@ void FlowProto::EnqueueFlowEvent(const FlowEvent &event) { break; } - case FlowEvent::AUDIT_FLOW: case FlowEvent::DELETE_DBENTRY: case FlowEvent::EVICT_FLOW: case FlowEvent::RETRY_INDEX_ACQUIRE: @@ -226,6 +225,7 @@ void FlowProto::EnqueueFlowEvent(const FlowEvent &event) { break; } + case FlowEvent::AUDIT_FLOW: case FlowEvent::GROW_FREE_LIST: { FlowTable *table = GetFlowTable(event.get_flow_key()); flow_event_queue_[table->table_index()]->Enqueue(event); @@ -264,8 +264,9 @@ bool FlowProto::FlowEventHandler(const FlowEvent &req, FlowTable *table) { } case FlowEvent::AUDIT_FLOW: { - FlowEntry *flow = req.flow(); - flow->flow_table()->Add(flow, NULL); + FlowEntryPtr flow = FlowEntry::Allocate(req.get_flow_key(), table); + flow->InitAuditFlow(req.flow_handle()); + flow->flow_table()->Add(flow.get(), NULL); break; } @@ -370,8 +371,8 @@ void FlowProto::RetryIndexAcquireRequest(FlowEntry *flow, uint32_t flow_handle){ return; } -void FlowProto::CreateAuditEntry(FlowEntry *flow) { - EnqueueFlowEvent(FlowEvent(FlowEvent::AUDIT_FLOW, flow)); +void FlowProto::CreateAuditEntry(const FlowKey &key, uint32_t flow_handle) { + EnqueueFlowEvent(FlowEvent(FlowEvent::AUDIT_FLOW, key, flow_handle)); return; } diff --git a/src/vnsw/agent/pkt/flow_proto.h b/src/vnsw/agent/pkt/flow_proto.h index f73b6a2186e..30fbbc96d85 100644 --- a/src/vnsw/agent/pkt/flow_proto.h +++ b/src/vnsw/agent/pkt/flow_proto.h @@ -65,7 +65,7 @@ class FlowProto : public Proto { void DeleteFlowRequest(const FlowKey &flow_key, bool del_rev_flow); void EvictFlowRequest(FlowEntry *flow, uint32_t flow_handle); void RetryIndexAcquireRequest(FlowEntry *flow, uint32_t flow_handle); - void CreateAuditEntry(FlowEntry *flow); + void CreateAuditEntry(const FlowKey &key, uint32_t flow_handle); bool FlowEventHandler(const FlowEvent &req, FlowTable *table); void GrowFreeListRequest(const FlowKey &key); void KSyncEventRequest(KSyncEntry *entry, KSyncEntry::KSyncEvent event); diff --git a/src/vnsw/agent/pkt/test/SConscript b/src/vnsw/agent/pkt/test/SConscript index 341241f2345..1cc52bb7f80 100644 --- a/src/vnsw/agent/pkt/test/SConscript +++ b/src/vnsw/agent/pkt/test/SConscript @@ -26,6 +26,7 @@ test_pkt_flow_mock = AgentEnv.MakeTestCmd(env, 'test_pkt_flow_mock', pkt_flaky_test_suite) test_pkt_flow_limits = AgentEnv.MakeTestCmd(env, 'test_pkt_flow_limits', pkt_test_suite) +test_flow_adit = AgentEnv.MakeTestCmd(env, 'test_flow_audit', pkt_test_suite) test_pkt_flow = AgentEnv.MakeTestCmd(env, 'test_pkt_flow', pkt_flaky_test_suite) test_pkt_flowv6 = AgentEnv.MakeTestCmd(env, 'test_pkt_flowv6', pkt_test_suite) test_rpf_flow = AgentEnv.MakeTestCmd(env, 'test_rpf_flow', pkt_flaky_test_suite) diff --git a/src/vnsw/agent/pkt/test/test_flow_audit.cc b/src/vnsw/agent/pkt/test/test_flow_audit.cc new file mode 100644 index 00000000000..aa363182447 --- /dev/null +++ b/src/vnsw/agent/pkt/test/test_flow_audit.cc @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. + */ + +#include "base/os.h" +#include +#include +#include "test/test_cmn_util.h" +#include "test_flow_util.h" +#include "ksync/ksync_sock_user.h" +#include "oper/tunnel_nh.h" +#include "pkt/flow_table.h" + +#define vm1_ip "11.1.1.1" +struct PortInfo input[] = { + {"vmi0", 6, vm1_ip, "00:00:00:01:01:01", 5, 1}, +}; +VmInterface *vmi0; + +static bool FlowStatsTimerStartStopTrigger (bool stop) { + Agent::GetInstance()->flow_stats_manager()->\ + default_flow_stats_collector()->TestStartStopTimer(stop); + return true; +} + +static void FlowStatsTimerStartStop (bool stop) { + int task_id = TaskScheduler::GetInstance()->GetTaskId(kTaskFlowEvent); + std::auto_ptr trigger_ + (new TaskTrigger(boost::bind(FlowStatsTimerStartStopTrigger, stop), + task_id, 0)); + trigger_->Set(); + client->WaitForIdle(); +} + +class FlowAuditTest : public ::testing::Test { +public: + FlowAuditTest() : agent_(Agent::GetInstance()) { + flow_proto_ = agent_->pkt()->get_flow_proto(); + flow_stats_collector_ = agent_->flow_stats_manager()-> + default_flow_stats_collector(); + } + + virtual void SetUp() { + EXPECT_EQ(0U, get_flow_proto()->FlowCount()); + client->Reset(); + + CreateVmportEnv(input, 1, 1); + client->WaitForIdle(); + + vmi0 = VmInterfaceGet(input[0].intf_id); + assert(vmi0); + } + + virtual void TearDown() { + FlushFlowTable(); + client->Reset(); + + DeleteVmportEnv(input, 1, true, 1); + client->WaitForIdle(); + } + + + bool FlowTableWait(size_t count) { + int i = 1000; + while (i > 0) { + i--; + if (get_flow_proto()->FlowCount() == count) { + break; + } + client->WaitForIdle(); + usleep(1); + } + return (get_flow_proto()->FlowCount() == count); + } + + void FlushFlowTable() { + client->EnqueueFlowFlush(); + client->WaitForIdle(); + EXPECT_EQ(0U, get_flow_proto()->FlowCount()); + } + + void RunFlowAudit() { + KSyncFlowMemory *flow_memory = agent_->ksync()->ksync_flow_memory(); + flow_memory->AuditProcess(); + // audit timeout set to 10 in case of test code. + // Sleep for audit duration + usleep(flow_memory->audit_timeout() * 2); + flow_memory->AuditProcess(); + } + + bool KFlowHoldAdd(uint32_t hash_id, int vrf, const char *sip, + const char *dip, int proto, int sport, int dport, + int nh_id) { + KSyncFlowMemory *flow_memory = agent_->ksync()->ksync_flow_memory(); + if (hash_id >= flow_memory->flow_table_entries_count()) { + return false; + } + + vr_flow_entry *vr_flow = KSyncSockTypeMap::GetFlowEntry(hash_id); + + vr_flow_req req; + req.set_fr_index(hash_id); + IpAddress saddr = IpAddress::from_string(sip); + IpAddress daddr = IpAddress::from_string(dip); + req.set_fr_flow_ip(IpToVector(saddr, daddr, Address::INET)); + req.set_fr_flow_proto(proto); + req.set_fr_family(AF_INET); + req.set_fr_flow_sport(htons(sport)); + req.set_fr_flow_dport(htons(dport)); + req.set_fr_flow_vrf(vrf); + req.set_fr_flow_nh_id(nh_id); + + vr_flow->fe_action = VR_FLOW_ACTION_HOLD; + KSyncSockTypeMap::SetFlowEntry(&req, true); + + return true; + } + + void KFlowPurgeHold() { + KSyncFlowMemory *flow_memory = agent_->ksync()->ksync_flow_memory(); + for (size_t count = 0; + count < flow_memory->flow_table_entries_count(); + count++) { + vr_flow_entry *vr_flow = KSyncSockTypeMap::GetFlowEntry(count); + vr_flow->fe_action = VR_FLOW_ACTION_DROP; + vr_flow_req req; + req.set_fr_index(0); + KSyncSockTypeMap::SetFlowEntry(&req, false); + } + + return; + } + + FlowProto *get_flow_proto() const { return flow_proto_; } + Agent *agent() {return agent_;} + +public: + Agent *agent_; + FlowProto *flow_proto_; + FlowStatsCollector* flow_stats_collector_; +}; + +TEST_F(FlowAuditTest, FlowAudit) { + KFlowPurgeHold(); + FlowStatsTimerStartStop(true); + EXPECT_TRUE(KFlowHoldAdd(1, 1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0)); + EXPECT_TRUE(KFlowHoldAdd(2, 1, "2.2.2.2", "3.3.3.3", 1, 0, 0, 0)); + RunFlowAudit(); + EXPECT_TRUE(FlowTableWait(2)); + FlowEntry *fe = FlowGet(1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0); + EXPECT_TRUE(fe != NULL && fe->is_flags_set(FlowEntry::ShortFlow) == true && + fe->short_flow_reason() == FlowEntry::SHORT_AUDIT_ENTRY); + FlowStatsTimerStartStop(false); + client->EnqueueFlowAge(); + client->WaitForIdle(); + WAIT_FOR(1000, 1000, (get_flow_proto()->FlowCount() == 0U)); + KFlowPurgeHold(); + + string vrf_name = + Agent::GetInstance()->vrf_table()->FindVrfFromId(1)->GetName(); + TestFlow flow[] = { + { + TestFlowPkt(Address::INET, "1.1.1.1", "2.2.2.2", 1, 0, 0, vrf_name, + vmi0->id(), 1), + { + } + } + }; + + CreateFlow(flow, 1); + + EXPECT_TRUE(FlowTableWait(2)); + EXPECT_TRUE(KFlowHoldAdd(10, 1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0)); + RunFlowAudit(); + client->EnqueueFlowAge(); + client->WaitForIdle(); + usleep(500); + int tmp_age_time = 10 * 1000; + int bkp_age_time = flow_stats_collector_->flow_age_time_intvl(); + //Set the flow age time to 10 microsecond + flow_stats_collector_->UpdateFlowAgeTime(tmp_age_time); + client->EnqueueFlowAge(); + client->WaitForIdle(); + WAIT_FOR(1000, 1000, (get_flow_proto()->FlowCount() == 0U)); + flow_stats_collector_->UpdateFlowAgeTime(bkp_age_time); + KFlowPurgeHold(); +} + +int main(int argc, char *argv[]) { + GETUSERARGS(); + client = TestInit(init_file, ksync_init, true, true, true, 100*1000); + int ret = RUN_ALL_TESTS(); + client->WaitForIdle(); + TestShutdown(); + delete client; + return ret; +} diff --git a/src/vnsw/agent/pkt/test/test_pkt_flow.cc b/src/vnsw/agent/pkt/test/test_pkt_flow.cc index 6c72f45de40..cf90f27af6a 100644 --- a/src/vnsw/agent/pkt/test/test_pkt_flow.cc +++ b/src/vnsw/agent/pkt/test/test_pkt_flow.cc @@ -171,60 +171,6 @@ class FlowTest : public ::testing::Test { WAIT_FOR(1000, 1, (RouteFind(vrf, addr, 32) == false)); } - static void RunFlowAudit() { - KSync *ksync_obj = Agent::GetInstance()->ksync(); - ksync_obj->ksync_flow_memory()->AuditProcess(); - ksync_obj->ksync_flow_memory()->AuditProcess(); - } - - static bool KFlowHoldAdd(uint32_t hash_id, int vrf, const char *sip, - const char *dip, int proto, int sport, int dport, - int nh_id) { - KSync *ksync_obj = Agent::GetInstance()->ksync(); - if (hash_id >= ksync_obj->ksync_flow_memory()->flow_table_entries_count()) { - return false; - } - if (ksync_init_) { - return false; - } - - vr_flow_entry *vr_flow = KSyncSockTypeMap::GetFlowEntry(hash_id); - - vr_flow_req req; - req.set_fr_index(hash_id); - IpAddress saddr = IpAddress::from_string(sip); - IpAddress daddr = IpAddress::from_string(dip); - req.set_fr_flow_ip(IpToVector(saddr, daddr, Address::INET)); - req.set_fr_flow_proto(proto); - req.set_fr_family(AF_INET); - req.set_fr_flow_sport(htons(sport)); - req.set_fr_flow_dport(htons(dport)); - req.set_fr_flow_vrf(vrf); - req.set_fr_flow_nh_id(nh_id); - - vr_flow->fe_action = VR_FLOW_ACTION_HOLD; - KSyncSockTypeMap::SetFlowEntry(&req, true); - - return true; - } - - static void KFlowPurgeHold() { - if (ksync_init_) { - return; - } - KSync *ksync_obj = Agent::GetInstance()->ksync(); - for (size_t count = 0; count < ksync_obj->ksync_flow_memory()->flow_table_entries_count(); - count++) { - vr_flow_entry *vr_flow = KSyncSockTypeMap::GetFlowEntry(count); - vr_flow->fe_action = VR_FLOW_ACTION_DROP; - vr_flow_req req; - req.set_fr_index(hash_id); - KSyncSockTypeMap::SetFlowEntry(&req, false); - } - - return; - } - static void FlowAdd(int hash_id, int vrf, const char *sip, const char *dip, int proto, int sport, int dport, const char *nat_sip, const char *nat_dip, int nat_vrf) { @@ -1954,52 +1900,6 @@ TEST_F(FlowTest, TwoNatFlow) { EXPECT_TRUE(FlowTableWait(0)); } -TEST_F(FlowTest, FlowAudit) { - KFlowPurgeHold(); - FlowStatsTimerStartStop(true); - EXPECT_TRUE(KFlowHoldAdd(1, 1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0)); - EXPECT_TRUE(KFlowHoldAdd(2, 1, "2.2.2.2", "3.3.3.3", 1, 0, 0, 0)); - RunFlowAudit(); - EXPECT_TRUE(FlowTableWait(2)); - FlowEntry *fe = FlowGet(1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0); - EXPECT_TRUE(fe != NULL && fe->is_flags_set(FlowEntry::ShortFlow) == true && - fe->short_flow_reason() == FlowEntry::SHORT_AUDIT_ENTRY); - FlowStatsTimerStartStop(false); - client->EnqueueFlowAge(); - client->WaitForIdle(); - WAIT_FOR(1000, 1000, (get_flow_proto()->FlowCount() == 0U)); - KFlowPurgeHold(); - - string vrf_name = - Agent::GetInstance()->vrf_table()->FindVrfFromId(1)->GetName(); - TestFlow flow[] = { - { - TestFlowPkt(Address::INET, "1.1.1.1", "2.2.2.2", 1, 0, 0, vrf_name, - flow0->id(), 1), - { - } - } - }; - - CreateFlow(flow, 1); - - EXPECT_TRUE(FlowTableWait(2)); - EXPECT_TRUE(KFlowHoldAdd(10, 1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0)); - RunFlowAudit(); - client->EnqueueFlowAge(); - client->WaitForIdle(); - usleep(500); - int tmp_age_time = 10 * 1000; - int bkp_age_time = flow_stats_collector_->flow_age_time_intvl(); - //Set the flow age time to 10 microsecond - flow_stats_collector_->UpdateFlowAgeTime(tmp_age_time); - client->EnqueueFlowAge(); - client->WaitForIdle(); - WAIT_FOR(1000, 1000, (get_flow_proto()->FlowCount() == 0U)); - flow_stats_collector_->UpdateFlowAgeTime(bkp_age_time); - KFlowPurgeHold(); -} - //Test flow deletion on ACL deletion TEST_F(FlowTest, AclDelete) { AddAcl("acl1", 1, "vn5" , "vn5", "pass"); diff --git a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc index f0a2a963e19..c45d1f5c358 100644 --- a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc +++ b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.cc @@ -54,8 +54,8 @@ KSyncFlowMemory::KSyncFlowMemory(KSync *ksync) : 0)), audit_timeout_(0), audit_yield_(0), + audit_interval_(0), audit_flow_idx_(0), - audit_timestamp_(0), audit_flow_list_() { } @@ -66,9 +66,19 @@ KSyncFlowMemory::~KSyncFlowMemory() { void KSyncFlowMemory::Init() { IcmpErrorProto *proto = ksync_->agent()->services()->icmp_error_proto(); proto->Register(boost::bind(&KSyncFlowMemory::GetFlowKey, this, _1, _2)); - audit_yield_ = kAuditYield; + + audit_interval_ = kAuditYieldTimer; audit_timeout_ = kAuditTimeout; - audit_timer_->Start(audit_timeout_, + uint32_t flow_table_count = ksync_->agent()->flow_table_size(); + // Compute number of entries to visit per timer interval so that complete + // table can be visited in kAuditSweepTime + uint32_t timer_per_sec = 1000 / kAuditYieldTimer; + uint32_t timer_per_sweep = kAuditSweepTime * timer_per_sec; + audit_yield_ = flow_table_count / timer_per_sweep; + if (audit_yield_ > kAuditYieldMax) + audit_yield_ = kAuditYieldMax; + + audit_timer_->Start(audit_interval_, boost::bind(&KSyncFlowMemory::AuditProcess, this)); } @@ -160,7 +170,7 @@ void KSyncFlowMemory::InitTest() { memset(flow_table_, 0, kTestFlowTableSize); flow_table_entries_count_ = kTestFlowTableSize / sizeof(vr_flow_entry); audit_yield_ = flow_table_entries_count_; - audit_timeout_ = 0; // timout immediately. + audit_timeout_ = 10; // timout immediately. ksync_->agent()->set_flow_table_size(flow_table_entries_count_); } @@ -245,10 +255,14 @@ bool KSyncFlowMemory::GetFlowKey(uint32_t index, FlowKey *key) { bool KSyncFlowMemory::AuditProcess() { uint32_t flow_idx; const vr_flow_entry *vflow_entry; - audit_timestamp_ += kAuditYieldTimer; + // Get current time + uint64_t t = UTCTimestampUsec(); + while (!audit_flow_list_.empty()) { std::pair list_entry = audit_flow_list_.front(); - if ((audit_timestamp_ - list_entry.second) < audit_timeout_) { + // audit_flow_list_ is sorted on last time of insertion in the list + // So, break on finding first flow entry that cannot be aged + if ((t - list_entry.second) < audit_timeout_) { /* Wait for audit_timeout_ to create short flow for the entry */ break; } @@ -256,6 +270,7 @@ bool KSyncFlowMemory::AuditProcess() { audit_flow_list_.pop_front(); vflow_entry = GetKernelFlowEntry(flow_idx, false); + // Audit and remove flow entry if its still in HOLD state if (vflow_entry && vflow_entry->fe_action == VR_FLOW_ACTION_HOLD) { int family = (vflow_entry->fe_key.flow_family == AF_INET)? Address::INET : Address::INET6; @@ -269,22 +284,18 @@ bool KSyncFlowMemory::AuditProcess() { ntohs(vflow_entry->fe_key.flow_dport)); FlowProto *proto = ksync_->agent()->pkt()->get_flow_proto(); - FlowTable *flow_table = proto->GetFlowTable(key); - FlowEntry *flow = FlowEntry::Allocate(key, flow_table); - flow->InitAuditFlow(flow_idx); - proto->CreateAuditEntry(flow); + proto->CreateAuditEntry(key, flow_idx); AGENT_ERROR(FlowLog, flow_idx, "FlowAudit : Converting HOLD " "entry to short flow"); } } - int count = 0; + uint32_t count = 0; assert(audit_yield_); while (count < audit_yield_) { vflow_entry = GetKernelFlowEntry(audit_flow_idx_, false); if (vflow_entry && vflow_entry->fe_action == VR_FLOW_ACTION_HOLD) { - audit_flow_list_.push_back(std::make_pair(audit_flow_idx_, - audit_timestamp_)); + audit_flow_list_.push_back(std::make_pair(audit_flow_idx_, t)); } count++; diff --git a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.h b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.h index 52d0a528dc5..d62e4037bb3 100644 --- a/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.h +++ b/src/vnsw/agent/vrouter/ksync/ksync_flow_memory.h @@ -18,9 +18,14 @@ struct vr_flow_entry; class KSyncFlowMemory { public: - static const uint32_t kAuditYieldTimer = 500; // in msec - static const uint32_t kAuditTimeout = 2000; // in msec - static const int kAuditYield = 1024; + // Time to sweep flow-table for audit + static const uint32_t kAuditSweepTime = 180; + // Timer interval for audit process + static const uint32_t kAuditYieldTimer = 100; // in msec + // Flows in HOLD state longer than kAuditTimeout are deleted + static const uint32_t kAuditTimeout = (5 * 1000 * 1000); // in usec + // Upper limit on number of entries to visit per timer + static const uint32_t kAuditYieldMax = (1024); KSyncFlowMemory(KSync *ksync); ~KSyncFlowMemory(); @@ -50,6 +55,7 @@ class KSyncFlowMemory { void set_flow_table_path(const std::string &path) { flow_table_path_ = path; } + uint32_t audit_timeout() const { return audit_timeout_; } private: void KFlow2FlowKey(const vr_flow_entry *entry, FlowKey *key) const; @@ -66,9 +72,9 @@ class KSyncFlowMemory { // Audit related entries Timer *audit_timer_; uint32_t audit_timeout_; - int audit_yield_; + uint32_t audit_yield_; + uint32_t audit_interval_; uint32_t audit_flow_idx_; - uint64_t audit_timestamp_; std::list > audit_flow_list_; };