Skip to content

Commit

Permalink
Merge "Dont access flow free-list from Flow Audit context"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Feb 5, 2016
2 parents a68749f + 13b1058 commit 72003c1
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 124 deletions.
6 changes: 6 additions & 0 deletions src/vnsw/agent/pkt/flow_event.h
Expand Up @@ -102,6 +102,12 @@ class FlowEvent {
ksync_event_() {
}

FlowEvent(Event event, const FlowKey &key, uint32_t flow_handle) :
event_(event), flow_(NULL), pkt_info_(), db_entry_(NULL),
gen_id_(0), flow_key_(key), del_rev_flow_(false),
flow_handle_(flow_handle), ksync_entry_(NULL), ksync_event_() {
}

FlowEvent(Event event, PktInfoPtr pkt_info) :
event_(event), flow_(NULL), pkt_info_(pkt_info), db_entry_(NULL),
gen_id_(0), flow_key_(), del_rev_flow_(),
Expand Down
11 changes: 6 additions & 5 deletions src/vnsw/agent/pkt/flow_proto.cc
Expand Up @@ -208,7 +208,6 @@ void FlowProto::EnqueueFlowEvent(const FlowEvent &event) {
break;
}

case FlowEvent::AUDIT_FLOW:
case FlowEvent::DELETE_DBENTRY:
case FlowEvent::EVICT_FLOW:
case FlowEvent::RETRY_INDEX_ACQUIRE:
Expand All @@ -226,6 +225,7 @@ void FlowProto::EnqueueFlowEvent(const FlowEvent &event) {
break;
}

case FlowEvent::AUDIT_FLOW:
case FlowEvent::GROW_FREE_LIST: {
FlowTable *table = GetFlowTable(event.get_flow_key());
flow_event_queue_[table->table_index()]->Enqueue(event);
Expand Down Expand Up @@ -264,8 +264,9 @@ bool FlowProto::FlowEventHandler(const FlowEvent &req, FlowTable *table) {
}

case FlowEvent::AUDIT_FLOW: {
FlowEntry *flow = req.flow();
flow->flow_table()->Add(flow, NULL);
FlowEntryPtr flow = FlowEntry::Allocate(req.get_flow_key(), table);
flow->InitAuditFlow(req.flow_handle());
flow->flow_table()->Add(flow.get(), NULL);
break;
}

Expand Down Expand Up @@ -370,8 +371,8 @@ void FlowProto::RetryIndexAcquireRequest(FlowEntry *flow, uint32_t flow_handle){
return;
}

void FlowProto::CreateAuditEntry(FlowEntry *flow) {
EnqueueFlowEvent(FlowEvent(FlowEvent::AUDIT_FLOW, flow));
void FlowProto::CreateAuditEntry(const FlowKey &key, uint32_t flow_handle) {
EnqueueFlowEvent(FlowEvent(FlowEvent::AUDIT_FLOW, key, flow_handle));
return;
}

Expand Down
2 changes: 1 addition & 1 deletion src/vnsw/agent/pkt/flow_proto.h
Expand Up @@ -65,7 +65,7 @@ class FlowProto : public Proto {
void DeleteFlowRequest(const FlowKey &flow_key, bool del_rev_flow);
void EvictFlowRequest(FlowEntry *flow, uint32_t flow_handle);
void RetryIndexAcquireRequest(FlowEntry *flow, uint32_t flow_handle);
void CreateAuditEntry(FlowEntry *flow);
void CreateAuditEntry(const FlowKey &key, uint32_t flow_handle);
bool FlowEventHandler(const FlowEvent &req, FlowTable *table);
void GrowFreeListRequest(const FlowKey &key);
void KSyncEventRequest(KSyncEntry *entry, KSyncEntry::KSyncEvent event);
Expand Down
1 change: 1 addition & 0 deletions src/vnsw/agent/pkt/test/SConscript
Expand Up @@ -26,6 +26,7 @@ test_pkt_flow_mock = AgentEnv.MakeTestCmd(env, 'test_pkt_flow_mock',
pkt_flaky_test_suite)

test_pkt_flow_limits = AgentEnv.MakeTestCmd(env, 'test_pkt_flow_limits', pkt_test_suite)
test_flow_adit = AgentEnv.MakeTestCmd(env, 'test_flow_audit', pkt_test_suite)
test_pkt_flow = AgentEnv.MakeTestCmd(env, 'test_pkt_flow', pkt_flaky_test_suite)
test_pkt_flowv6 = AgentEnv.MakeTestCmd(env, 'test_pkt_flowv6', pkt_test_suite)
test_rpf_flow = AgentEnv.MakeTestCmd(env, 'test_rpf_flow', pkt_flaky_test_suite)
Expand Down
197 changes: 197 additions & 0 deletions src/vnsw/agent/pkt/test/test_flow_audit.cc
@@ -0,0 +1,197 @@
/*
* Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
*/

#include "base/os.h"
#include <algorithm>
#include <net/address_util.h>
#include "test/test_cmn_util.h"
#include "test_flow_util.h"
#include "ksync/ksync_sock_user.h"
#include "oper/tunnel_nh.h"
#include "pkt/flow_table.h"

#define vm1_ip "11.1.1.1"
struct PortInfo input[] = {
{"vmi0", 6, vm1_ip, "00:00:00:01:01:01", 5, 1},
};
VmInterface *vmi0;

static bool FlowStatsTimerStartStopTrigger (bool stop) {
Agent::GetInstance()->flow_stats_manager()->\
default_flow_stats_collector()->TestStartStopTimer(stop);
return true;
}

static void FlowStatsTimerStartStop (bool stop) {
int task_id = TaskScheduler::GetInstance()->GetTaskId(kTaskFlowEvent);
std::auto_ptr<TaskTrigger> trigger_
(new TaskTrigger(boost::bind(FlowStatsTimerStartStopTrigger, stop),
task_id, 0));
trigger_->Set();
client->WaitForIdle();
}

class FlowAuditTest : public ::testing::Test {
public:
FlowAuditTest() : agent_(Agent::GetInstance()) {
flow_proto_ = agent_->pkt()->get_flow_proto();
flow_stats_collector_ = agent_->flow_stats_manager()->
default_flow_stats_collector();
}

virtual void SetUp() {
EXPECT_EQ(0U, get_flow_proto()->FlowCount());
client->Reset();

CreateVmportEnv(input, 1, 1);
client->WaitForIdle();

vmi0 = VmInterfaceGet(input[0].intf_id);
assert(vmi0);
}

virtual void TearDown() {
FlushFlowTable();
client->Reset();

DeleteVmportEnv(input, 1, true, 1);
client->WaitForIdle();
}


bool FlowTableWait(size_t count) {
int i = 1000;
while (i > 0) {
i--;
if (get_flow_proto()->FlowCount() == count) {
break;
}
client->WaitForIdle();
usleep(1);
}
return (get_flow_proto()->FlowCount() == count);
}

void FlushFlowTable() {
client->EnqueueFlowFlush();
client->WaitForIdle();
EXPECT_EQ(0U, get_flow_proto()->FlowCount());
}

void RunFlowAudit() {
KSyncFlowMemory *flow_memory = agent_->ksync()->ksync_flow_memory();
flow_memory->AuditProcess();
// audit timeout set to 10 in case of test code.
// Sleep for audit duration
usleep(flow_memory->audit_timeout() * 2);
flow_memory->AuditProcess();
}

bool KFlowHoldAdd(uint32_t hash_id, int vrf, const char *sip,
const char *dip, int proto, int sport, int dport,
int nh_id) {
KSyncFlowMemory *flow_memory = agent_->ksync()->ksync_flow_memory();
if (hash_id >= flow_memory->flow_table_entries_count()) {
return false;
}

vr_flow_entry *vr_flow = KSyncSockTypeMap::GetFlowEntry(hash_id);

vr_flow_req req;
req.set_fr_index(hash_id);
IpAddress saddr = IpAddress::from_string(sip);
IpAddress daddr = IpAddress::from_string(dip);
req.set_fr_flow_ip(IpToVector(saddr, daddr, Address::INET));
req.set_fr_flow_proto(proto);
req.set_fr_family(AF_INET);
req.set_fr_flow_sport(htons(sport));
req.set_fr_flow_dport(htons(dport));
req.set_fr_flow_vrf(vrf);
req.set_fr_flow_nh_id(nh_id);

vr_flow->fe_action = VR_FLOW_ACTION_HOLD;
KSyncSockTypeMap::SetFlowEntry(&req, true);

return true;
}

void KFlowPurgeHold() {
KSyncFlowMemory *flow_memory = agent_->ksync()->ksync_flow_memory();
for (size_t count = 0;
count < flow_memory->flow_table_entries_count();
count++) {
vr_flow_entry *vr_flow = KSyncSockTypeMap::GetFlowEntry(count);
vr_flow->fe_action = VR_FLOW_ACTION_DROP;
vr_flow_req req;
req.set_fr_index(0);
KSyncSockTypeMap::SetFlowEntry(&req, false);
}

return;
}

FlowProto *get_flow_proto() const { return flow_proto_; }
Agent *agent() {return agent_;}

public:
Agent *agent_;
FlowProto *flow_proto_;
FlowStatsCollector* flow_stats_collector_;
};

TEST_F(FlowAuditTest, FlowAudit) {
KFlowPurgeHold();
FlowStatsTimerStartStop(true);
EXPECT_TRUE(KFlowHoldAdd(1, 1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0));
EXPECT_TRUE(KFlowHoldAdd(2, 1, "2.2.2.2", "3.3.3.3", 1, 0, 0, 0));
RunFlowAudit();
EXPECT_TRUE(FlowTableWait(2));
FlowEntry *fe = FlowGet(1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0);
EXPECT_TRUE(fe != NULL && fe->is_flags_set(FlowEntry::ShortFlow) == true &&
fe->short_flow_reason() == FlowEntry::SHORT_AUDIT_ENTRY);
FlowStatsTimerStartStop(false);
client->EnqueueFlowAge();
client->WaitForIdle();
WAIT_FOR(1000, 1000, (get_flow_proto()->FlowCount() == 0U));
KFlowPurgeHold();

string vrf_name =
Agent::GetInstance()->vrf_table()->FindVrfFromId(1)->GetName();
TestFlow flow[] = {
{
TestFlowPkt(Address::INET, "1.1.1.1", "2.2.2.2", 1, 0, 0, vrf_name,
vmi0->id(), 1),
{
}
}
};

CreateFlow(flow, 1);

EXPECT_TRUE(FlowTableWait(2));
EXPECT_TRUE(KFlowHoldAdd(10, 1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0));
RunFlowAudit();
client->EnqueueFlowAge();
client->WaitForIdle();
usleep(500);
int tmp_age_time = 10 * 1000;
int bkp_age_time = flow_stats_collector_->flow_age_time_intvl();
//Set the flow age time to 10 microsecond
flow_stats_collector_->UpdateFlowAgeTime(tmp_age_time);
client->EnqueueFlowAge();
client->WaitForIdle();
WAIT_FOR(1000, 1000, (get_flow_proto()->FlowCount() == 0U));
flow_stats_collector_->UpdateFlowAgeTime(bkp_age_time);
KFlowPurgeHold();
}

int main(int argc, char *argv[]) {
GETUSERARGS();
client = TestInit(init_file, ksync_init, true, true, true, 100*1000);
int ret = RUN_ALL_TESTS();
client->WaitForIdle();
TestShutdown();
delete client;
return ret;
}
100 changes: 0 additions & 100 deletions src/vnsw/agent/pkt/test/test_pkt_flow.cc
Expand Up @@ -171,60 +171,6 @@ class FlowTest : public ::testing::Test {
WAIT_FOR(1000, 1, (RouteFind(vrf, addr, 32) == false));
}

static void RunFlowAudit() {
KSync *ksync_obj = Agent::GetInstance()->ksync();
ksync_obj->ksync_flow_memory()->AuditProcess();
ksync_obj->ksync_flow_memory()->AuditProcess();
}

static bool KFlowHoldAdd(uint32_t hash_id, int vrf, const char *sip,
const char *dip, int proto, int sport, int dport,
int nh_id) {
KSync *ksync_obj = Agent::GetInstance()->ksync();
if (hash_id >= ksync_obj->ksync_flow_memory()->flow_table_entries_count()) {
return false;
}
if (ksync_init_) {
return false;
}

vr_flow_entry *vr_flow = KSyncSockTypeMap::GetFlowEntry(hash_id);

vr_flow_req req;
req.set_fr_index(hash_id);
IpAddress saddr = IpAddress::from_string(sip);
IpAddress daddr = IpAddress::from_string(dip);
req.set_fr_flow_ip(IpToVector(saddr, daddr, Address::INET));
req.set_fr_flow_proto(proto);
req.set_fr_family(AF_INET);
req.set_fr_flow_sport(htons(sport));
req.set_fr_flow_dport(htons(dport));
req.set_fr_flow_vrf(vrf);
req.set_fr_flow_nh_id(nh_id);

vr_flow->fe_action = VR_FLOW_ACTION_HOLD;
KSyncSockTypeMap::SetFlowEntry(&req, true);

return true;
}

static void KFlowPurgeHold() {
if (ksync_init_) {
return;
}
KSync *ksync_obj = Agent::GetInstance()->ksync();
for (size_t count = 0; count < ksync_obj->ksync_flow_memory()->flow_table_entries_count();
count++) {
vr_flow_entry *vr_flow = KSyncSockTypeMap::GetFlowEntry(count);
vr_flow->fe_action = VR_FLOW_ACTION_DROP;
vr_flow_req req;
req.set_fr_index(hash_id);
KSyncSockTypeMap::SetFlowEntry(&req, false);
}

return;
}

static void FlowAdd(int hash_id, int vrf, const char *sip, const char *dip,
int proto, int sport, int dport, const char *nat_sip,
const char *nat_dip, int nat_vrf) {
Expand Down Expand Up @@ -1954,52 +1900,6 @@ TEST_F(FlowTest, TwoNatFlow) {
EXPECT_TRUE(FlowTableWait(0));
}

TEST_F(FlowTest, FlowAudit) {
KFlowPurgeHold();
FlowStatsTimerStartStop(true);
EXPECT_TRUE(KFlowHoldAdd(1, 1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0));
EXPECT_TRUE(KFlowHoldAdd(2, 1, "2.2.2.2", "3.3.3.3", 1, 0, 0, 0));
RunFlowAudit();
EXPECT_TRUE(FlowTableWait(2));
FlowEntry *fe = FlowGet(1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0);
EXPECT_TRUE(fe != NULL && fe->is_flags_set(FlowEntry::ShortFlow) == true &&
fe->short_flow_reason() == FlowEntry::SHORT_AUDIT_ENTRY);
FlowStatsTimerStartStop(false);
client->EnqueueFlowAge();
client->WaitForIdle();
WAIT_FOR(1000, 1000, (get_flow_proto()->FlowCount() == 0U));
KFlowPurgeHold();

string vrf_name =
Agent::GetInstance()->vrf_table()->FindVrfFromId(1)->GetName();
TestFlow flow[] = {
{
TestFlowPkt(Address::INET, "1.1.1.1", "2.2.2.2", 1, 0, 0, vrf_name,
flow0->id(), 1),
{
}
}
};

CreateFlow(flow, 1);

EXPECT_TRUE(FlowTableWait(2));
EXPECT_TRUE(KFlowHoldAdd(10, 1, "1.1.1.1", "2.2.2.2", 1, 0, 0, 0));
RunFlowAudit();
client->EnqueueFlowAge();
client->WaitForIdle();
usleep(500);
int tmp_age_time = 10 * 1000;
int bkp_age_time = flow_stats_collector_->flow_age_time_intvl();
//Set the flow age time to 10 microsecond
flow_stats_collector_->UpdateFlowAgeTime(tmp_age_time);
client->EnqueueFlowAge();
client->WaitForIdle();
WAIT_FOR(1000, 1000, (get_flow_proto()->FlowCount() == 0U));
flow_stats_collector_->UpdateFlowAgeTime(bkp_age_time);
KFlowPurgeHold();
}

//Test flow deletion on ACL deletion
TEST_F(FlowTest, AclDelete) {
AddAcl("acl1", 1, "vn5" , "vn5", "pass");
Expand Down

0 comments on commit 72003c1

Please sign in to comment.