Skip to content

Commit

Permalink
Merge "Use 5-tuple to distribute flow setup across partitions" into R3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Apr 1, 2016
2 parents f825929 + ebc75e8 commit 50927d3
Show file tree
Hide file tree
Showing 14 changed files with 295 additions and 72 deletions.
5 changes: 4 additions & 1 deletion src/vnsw/agent/pkt/flow_entry.cc
Expand Up @@ -2071,6 +2071,10 @@ void FlowEntry::FillFlowInfo(FlowInfo &info) {
info.set_allow(true);
}

if (reverse_flow_entry_.get()) {
info.set_reverse_index(reverse_flow_entry_->flow_handle());
}

if (is_flags_set(FlowEntry::NatFlow)) {
info.set_nat(true);
FlowEntry *nat_flow = reverse_flow_entry_.get();
Expand Down Expand Up @@ -2103,7 +2107,6 @@ void FlowEntry::FillFlowInfo(FlowInfo &info) {
}
info.set_nat_protocol(nat_flow->key().protocol);
info.set_nat_vrf(data_.dest_vrf);
info.set_reverse_index(nat_flow->flow_handle());
info.set_nat_mirror_vrf(nat_flow->data().mirror_vrf);
}
}
Expand Down
118 changes: 100 additions & 18 deletions src/vnsw/agent/pkt/flow_proto.cc
Expand Up @@ -2,6 +2,7 @@
* Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
*/
#include <net/address_util.h>
#include <boost/functional/hash.hpp>
#include <init/agent_param.h>
#include <cmn/agent_stats.h>
#include <oper/agent_profile.h>
Expand All @@ -21,7 +22,7 @@ FlowProto::FlowProto(Agent *agent, boost::asio::io_service &io) :
flow_update_queue_(agent->task_scheduler()->GetTaskId(kTaskFlowUpdate), 0,
boost::bind(&FlowProto::FlowEventHandler, this, _1,
static_cast<FlowTable *>(NULL))),
stats_() {
use_vrouter_hash_(false), stats_() {
flow_update_queue_.set_name("Flow update queue");
agent->SetFlowProto(this);
set_trace(false);
Expand All @@ -39,6 +40,13 @@ FlowProto::FlowProto(Agent *agent, boost::asio::io_service &io) :
boost::bind(&FlowProto::FlowEventHandler, this,
_1, flow_table_list_[i])));
}
if (::getenv("USE_VROUTER_HASH") != NULL) {
string opt = ::getenv("USE_VROUTER_HASH");
if (opt == "" || strcasecmp(opt.c_str(), "false"))
use_vrouter_hash_ = false;
else
use_vrouter_hash_ = true;
}
}

FlowProto::~FlowProto() {
Expand Down Expand Up @@ -75,9 +83,75 @@ void FlowProto::Shutdown() {
flow_update_queue_.Shutdown();
}

static std::size_t HashCombine(std::size_t hash, uint64_t val) {
boost::hash_combine(hash, val);
return hash;
}

static std::size_t HashIp(const IpAddress &ip, std::size_t hash) {
if (ip.is_v6()) {
uint64_t val[2];
Ip6AddressToU64Array(ip.to_v6(), val, 2);
hash = HashCombine(hash, val[0]);
hash = HashCombine(hash, val[1]);
} else if (ip.is_v4()) {
hash = HashCombine(hash, ip.to_v4().to_ulong());
} else {
assert(0);
}
return hash;
}

// Get the thread to be used for the flow. We *try* to map forward and reverse
// flow to same thread with following,
// if (sip < dip)
// ip1 = sip
// ip2 = dip
// else
// ip1 = dip
// ip2 = sip
// if (sport < dport)
// port1 = sport
// port2 = dport
// else
// port1 = dport
// port2 = sport
// field5 = proto
// hash = HASH(ip1, ip2, port1, port2, proto)
//
// The algorithm above cannot ensure NAT flows belong to same thread.
uint16_t FlowProto::FlowTableIndex(const IpAddress &sip, const IpAddress &dip,
uint8_t proto, uint16_t sport,
uint16_t dport, uint32_t flow_handle) const {
if (use_vrouter_hash_) {
return (flow_handle/flow_table_list_.size()) % flow_table_list_.size();
}

std::size_t hash = 0;
if (sip < dip) {
hash = HashIp(sip, hash);
hash = HashIp(dip, hash);
} else {
hash = HashIp(dip, hash);
hash = HashIp(sip, hash);
}

if (sport < dport) {
HashCombine(hash, sport);
HashCombine(hash, dport);
} else {
HashCombine(hash, dport);
HashCombine(hash, sport);
}
HashCombine(hash, proto);
return (hash % (flow_event_queue_.size()));
}

FlowHandler *FlowProto::AllocProtoHandler(boost::shared_ptr<PktInfo> info,
boost::asio::io_service &io) {
uint32_t index = FlowTableIndex(info->sport, info->dport);
uint32_t index = FlowTableIndex(info->ip_saddr, info->ip_daddr,
info->ip_proto, info->sport, info->dport,
info->agent_hdr.cmd_param);
return new FlowHandler(agent(), info, io, this, index);
}

Expand Down Expand Up @@ -110,12 +184,10 @@ bool FlowProto::Validate(PktInfo *msg) {
return true;
}

uint16_t FlowProto::FlowTableIndex(uint16_t sport, uint16_t dport) const {
return (sport ^ dport) % (flow_event_queue_.size());
}

FlowTable *FlowProto::GetFlowTable(const FlowKey &key) const {
uint16_t index = FlowTableIndex(key.src_port, key.dst_port);
FlowTable *FlowProto::GetFlowTable(const FlowKey &key,
uint32_t flow_handle) const {
uint32_t index = FlowTableIndex(key.src_addr, key.dst_addr, key.protocol,
key.src_port, key.dst_port, flow_handle);
return flow_table_list_[index];
}

Expand Down Expand Up @@ -170,8 +242,8 @@ void FlowProto::VnFlowCounters(const VnEntry *vn, uint32_t *in_count,
agent_->pkt()->flow_mgmt_manager()->VnFlowCounters(vn, in_count, out_count);
}

FlowEntry *FlowProto::Find(const FlowKey &key) const {
return GetFlowTable(key)->Find(key);
FlowEntry *FlowProto::Find(const FlowKey &key, uint32_t table_index) const {
return GetTable(table_index)->Find(key);
}

bool FlowProto::AddFlow(FlowEntry *flow) {
Expand Down Expand Up @@ -199,7 +271,10 @@ void FlowProto::EnqueueFlowEvent(FlowEvent *event) {
switch (event->event()) {
case FlowEvent::VROUTER_FLOW_MSG: {
PktInfo *info = event->pkt_info().get();
uint32_t index = FlowTableIndex(info->sport, info->dport);
uint32_t index = FlowTableIndex(info->ip_saddr, info->ip_daddr,
info->ip_proto, info->sport,
info->dport,
info->agent_hdr.cmd_param);
flow_event_queue_[index]->Enqueue(event);
break;
}
Expand Down Expand Up @@ -234,9 +309,15 @@ void FlowProto::EnqueueFlowEvent(FlowEvent *event) {
break;
}

case FlowEvent::AUDIT_FLOW:
case FlowEvent::AUDIT_FLOW: {
FlowTable *table = GetFlowTable(event->get_flow_key(),
event->flow_handle());
flow_event_queue_[table->table_index()]->Enqueue(event);
break;
}

case FlowEvent::GROW_FREE_LIST: {
FlowTable *table = GetFlowTable(event->get_flow_key());
FlowTable *table = GetTable(event->table_index());
flow_event_queue_[table->table_index()]->Enqueue(event);
break;
}
Expand Down Expand Up @@ -267,6 +348,9 @@ void FlowProto::EnqueueFlowEvent(FlowEvent *event) {

bool FlowProto::FlowEventHandler(FlowEvent *req, FlowTable *table) {
std::auto_ptr<FlowEvent> req_ptr(req);
// concurrency check to ensure all request are in right partitions
// flow-update-queue doenst happen table pointer. Skip concurrency check
// for flow-update-queue
if (table) {
assert(table->ConcurrencyCheck() == true);
}
Expand Down Expand Up @@ -314,7 +398,6 @@ bool FlowProto::FlowEventHandler(FlowEvent *req, FlowTable *table) {
}

case FlowEvent::GROW_FREE_LIST: {
FlowTable *table = GetFlowTable(req->get_flow_key());
table->GrowFreeList();
break;
}
Expand All @@ -327,7 +410,6 @@ bool FlowProto::FlowEventHandler(FlowEvent *req, FlowTable *table) {
}

case FlowEvent::DELETE_FLOW: {
table = GetTable(req->table_index());
table->ProcessFlowEvent(req);
//In case flow is deleted enqueue a free flow reference event.
EnqueueFreeFlowReference(req->flow_ref());
Expand All @@ -344,7 +426,6 @@ bool FlowProto::FlowEventHandler(FlowEvent *req, FlowTable *table) {
case FlowEvent::REVALUATE_FLOW:
case FlowEvent::KSYNC_EVENT:
case FlowEvent::KSYNC_VROUTER_ERROR: {
table = GetFlowTable(req->get_flow_key());
table->ProcessFlowEvent(req);
//In case flow is deleted enqueue a free flow reference event.
EnqueueFreeFlowReference(req->flow_ref());
Expand Down Expand Up @@ -387,8 +468,9 @@ void FlowProto::CreateAuditEntry(const FlowKey &key, uint32_t flow_handle) {
}


void FlowProto::GrowFreeListRequest(const FlowKey &key) {
EnqueueFlowEvent(new FlowEvent(FlowEvent::GROW_FREE_LIST, key, false));
void FlowProto::GrowFreeListRequest(const FlowKey &key, FlowTable *table) {
EnqueueFlowEvent(new FlowEvent(FlowEvent::GROW_FREE_LIST, key, false,
table->table_index()));
return;
}

Expand Down
12 changes: 8 additions & 4 deletions src/vnsw/agent/pkt/flow_proto.h
Expand Up @@ -51,10 +51,13 @@ class FlowProto : public Proto {
boost::asio::io_service &io);
bool Enqueue(boost::shared_ptr<PktInfo> msg);

FlowEntry *Find(const FlowKey &key) const;
uint16_t FlowTableIndex(uint16_t sport, uint16_t dport) const;
FlowEntry *Find(const FlowKey &key, uint32_t table_index) const;
uint16_t FlowTableIndex(const IpAddress &sip, const IpAddress &dip,
uint8_t proto, uint16_t sport,
uint16_t dport, uint32_t flow_handle) const;
uint32_t flow_table_count() const { return flow_table_list_.size(); }
FlowTable *GetTable(uint16_t index) const;
FlowTable *GetFlowTable(const FlowKey &key) const;
FlowTable *GetFlowTable(const FlowKey &key, uint32_t flow_handle) const;
uint32_t FlowCount() const;
void VnFlowCounters(const VnEntry *vn, uint32_t *in_count,
uint32_t *out_count);
Expand All @@ -71,7 +74,7 @@ class FlowProto : public Proto {
void RetryIndexAcquireRequest(FlowEntry *flow, uint32_t flow_handle);
void CreateAuditEntry(const FlowKey &key, uint32_t flow_handle);
bool FlowEventHandler(FlowEvent *req, FlowTable *table);
void GrowFreeListRequest(const FlowKey &key);
void GrowFreeListRequest(const FlowKey &key, FlowTable *table);
void KSyncEventRequest(KSyncEntry *entry, KSyncEntry::KSyncEvent event);
void KSyncFlowHandleRequest(KSyncEntry *entry, uint32_t flow_handle);
void KSyncFlowErrorRequest(KSyncEntry *ksync_entry, int error);
Expand Down Expand Up @@ -101,6 +104,7 @@ class FlowProto : public Proto {
std::vector<FlowTable *> flow_table_list_;
FlowEventQueue flow_update_queue_;
tbb::atomic<int> linklocal_flow_count_;
bool use_vrouter_hash_;
FlowStats stats_;
};

Expand Down
24 changes: 22 additions & 2 deletions src/vnsw/agent/pkt/flow_table.cc
Expand Up @@ -1017,7 +1017,27 @@ bool FlowTable::ProcessFlowEventInternal(const FlowEvent *req,
// For EEXIST error donot mark the flow as ShortFlow since Vrouter
// generates EEXIST only for cases where another add should be
// coming from the pkt trap from Vrouter
if (req->ksync_error() != EEXIST) {
//
// FIXME : We dont have good scheme to handle following scenario,
// - VM1 in VN1 has floating-ip FIP1 in VN2
// - VM2 in VN2
// - VM1 pings VM2 (using floating-ip)
// The forward and reverse flows go to different partitions.
//
// If packets for both forward and reverse flows are trapped together
// we try to setup following flows from different partitions,
// FlowPair-1
// - VM1 to VM2
// - VM2 to FIP1
// FlowPair-2
// - VM2 to FIP1
// - VM1 to VM2
//
// The reverse flows for both FlowPair-1 and FlowPair-2 are not
// installed due to EEXIST error. We are converting flows to
// short-flow till this case is handled properly
if (req->ksync_error() != EEXIST ||
flow->is_flags_set(FlowEntry::NatFlow)) {
flow->MakeShortFlow(FlowEntry::SHORT_FAILED_VROUTER_INSTALL);
// Enqueue Add request to flow-stats-collector
// to update flow flags in stats collector
Expand Down Expand Up @@ -1115,7 +1135,7 @@ FlowEntry *FlowEntryFreeList::Allocate(const FlowKey &key) {
if (grow_pending_ == false && free_list_.size() < kMinThreshold) {
grow_pending_ = true;
FlowProto *proto = table_->agent()->pkt()->get_flow_proto();
proto->GrowFreeListRequest(key);
proto->GrowFreeListRequest(key, table_);
}
flow->Reset(key);
total_alloc_++;
Expand Down
2 changes: 2 additions & 0 deletions src/vnsw/agent/pkt/flow_table.h
Expand Up @@ -290,6 +290,8 @@ class FlowTable {
FlowEntryFreeList free_list_;
tbb::mutex mutex_;
int flow_task_id_;
uint64_t total_add_;
uint64_t total_del_;
DISALLOW_COPY_AND_ASSIGN(FlowTable);
};

Expand Down
23 changes: 23 additions & 0 deletions src/vnsw/agent/pkt/pkt.sandesh
Expand Up @@ -406,3 +406,26 @@ response sandesh FlowStatsCollectorRecordsResp {
1: list<SandeshFlowData> flow_list;
2: string flow_key (link="NextFlowStatsRecordsSet");
}

/**
* Request message to get summary of flow tables
*/
request sandesh SandeshFlowTableInfoRequest{
}

/**
* Sandesh definition for every flow-table
*/
struct SandeshFlowTableInfo {
1: u32 index;
2: u32 count;
3: u64 total_add;
4: u64 total_del;
}

/**
* Response message for flow tables
*/
response sandesh SandeshFlowTableInfoResp {
1: list<SandeshFlowTableInfo> table_list;
}
20 changes: 20 additions & 0 deletions src/vnsw/agent/pkt/pkt_sandesh_flow.cc
Expand Up @@ -613,4 +613,24 @@ void NextFlowStatsRecordsSet::HandleRequest() const {
TaskScheduler *scheduler = TaskScheduler::GetInstance();
scheduler->Enqueue(task);
}


void SandeshFlowTableInfoRequest::HandleRequest() const {
FlowProto *proto = Agent::GetInstance()->pkt()->get_flow_proto();
SandeshFlowTableInfoResp *resp = new SandeshFlowTableInfoResp();
std::vector<SandeshFlowTableInfo> info_list;
for (uint16_t i = 0; i < proto->flow_table_count(); i++) {
FlowTable *table = proto->GetTable(i);
SandeshFlowTableInfo info;
info.set_index(table->table_index());
info.set_count(table->Size());
info.set_total_add(table->free_list()->total_alloc());
info.set_total_del(table->free_list()->total_free());
info_list.push_back(info);
}
resp->set_table_list(info_list);
resp->set_context(context());
resp->set_more(false);
resp->Response();
}
////////////////////////////////////////////////////////////////////////////////

0 comments on commit 50927d3

Please sign in to comment.