Skip to content

Commit

Permalink
Move Flow logging to Flow Management module
Browse files Browse the repository at this point in the history
Define a message to enqueue Flow Export requests in Flow Management module. Move FlowExport functionality from FlowTable to
Flow Management module. Replace FlowExport API calls in Flow Stats collector and Flow Table with a message to Flow Management
module.

Change-Id: I5bea219cac127f651d4c709f5e959779f89f3374
Partial-Bug: #1479295
  • Loading branch information
ashoksr committed Aug 13, 2015
1 parent a283b6f commit b2ae967
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 65 deletions.
14 changes: 0 additions & 14 deletions src/vnsw/agent/pkt/flow_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1322,20 +1322,6 @@ void SetActionStr(const FlowAction &action_info,
}
}

void GetFlowSandeshActionParams(const FlowAction &action_info,
std::string &action_str) {
std::bitset<32> bs(action_info.action);
for (unsigned int i = 0; i <= bs.size(); i++) {
if (bs[i]) {
if (!action_str.empty()) {
action_str += "|";
}
action_str += TrafficAction::ActionToString(
static_cast<TrafficAction::Action>(i));
}
}
}

static void SetAclListAclAction(const std::list<MatchAclParams> &acl_l,
std::vector<AclAction> &acl_action_l,
std::string &acl_type) {
Expand Down
3 changes: 1 addition & 2 deletions src/vnsw/agent/pkt/flow_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ struct FlowKey {

struct FlowStats {
FlowStats() : setup_time(0), teardown_time(0), last_modified_time(0),
bytes(0), packets(0), intf_in(0), exported(false), fip(0),
bytes(0), packets(0), intf_in(0), fip(0),
fip_vm_port_id(Interface::kInvalidIndex) {}

uint64_t setup_time;
Expand All @@ -115,7 +115,6 @@ struct FlowStats {
uint64_t bytes;
uint64_t packets;
uint32_t intf_in;
bool exported;
// Following fields are required for FIP stats accounting
uint32_t fip;
uint32_t fip_vm_port_id;
Expand Down
251 changes: 251 additions & 0 deletions src/vnsw/agent/pkt/flow_mgmt.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <bitset>
#include "pkt/flow_mgmt.h"
#include "pkt/flow_mgmt_request.h"
#include "pkt/flow_mgmt_response.h"
Expand Down Expand Up @@ -29,6 +30,11 @@ FlowMgmtManager::FlowMgmtManager(Agent *agent, FlowTable *flow_table) :

void FlowMgmtManager::Init() {
flow_mgmt_dbclient_->Init();
agent_->acl_table()->set_ace_flow_sandesh_data_cb
(boost::bind(&FlowMgmtManager::SetAceSandeshData, this, _1, _2, _3));
agent_->acl_table()->set_acl_flow_sandesh_data_cb
(boost::bind(&FlowMgmtManager::SetAclFlowSandeshData, this, _1, _2,
_3));
}

void FlowMgmtManager::Shutdown() {
Expand All @@ -37,6 +43,28 @@ void FlowMgmtManager::Shutdown() {
flow_mgmt_dbclient_->Shutdown();
}

/////////////////////////////////////////////////////////////////////////////
// Introspect routines
/////////////////////////////////////////////////////////////////////////////
string FlowMgmtManager::GetAceSandeshDataKey(const AclDBEntry *acl,
int ace_id) {
string uuid_str = UuidToString(acl->GetUuid());
stringstream ss;
ss << uuid_str << ":";
ss << ace_id;
return ss.str();
}

void FlowMgmtManager::SetAceSandeshData(const AclDBEntry *acl,
AclFlowCountResp &data,
int ace_id) {
}

void FlowMgmtManager::SetAclFlowSandeshData(const AclDBEntry *acl,
AclFlowResp &data,
const int last_count) {
}

/////////////////////////////////////////////////////////////////////////////
// Utility methods to enqueue events into work-queue
/////////////////////////////////////////////////////////////////////////////
Expand All @@ -47,6 +75,15 @@ void FlowMgmtManager::AddEvent(FlowEntry *flow) {
request_queue_.Enqueue(req);
}

void FlowMgmtManager::ExportEvent(FlowEntry *flow, uint64_t diff_bytes,
uint64_t diff_pkts) {
FlowEntryPtr flow_ptr(flow);
boost::shared_ptr<FlowMgmtRequest>
req(new FlowMgmtRequest(FlowMgmtRequest::EXPORT_FLOW, flow_ptr,
diff_bytes, diff_pkts));
request_queue_.Enqueue(req);
}

void FlowMgmtManager::DeleteEvent(FlowEntry *flow) {
FlowEntryPtr flow_ptr(flow);
boost::shared_ptr<FlowMgmtRequest>
Expand Down Expand Up @@ -192,6 +229,11 @@ bool FlowMgmtManager::RequestHandler(boost::shared_ptr<FlowMgmtRequest> req) {
break;
}

case FlowMgmtRequest::EXPORT_FLOW: {
ExportFlow(req->flow(), req->diff_bytes(), req->diff_packets());
break;
}

default:
assert(0);

Expand All @@ -203,6 +245,215 @@ void FlowMgmtManager::RetryVrfDelete(uint32_t vrf_id) {
vrf_flow_mgmt_tree_.RetryDelete(vrf_id);
}

bool FlowMgmtManager::SetUnderlayPort(FlowEntry *flow, FlowDataIpv4 &s_flow) {
uint16_t underlay_src_port = 0;
bool exported = false;
if (flow->is_flags_set(FlowEntry::LocalFlow)) {
/* Set source_port as 0 for local flows. Source port is calculated by
* vrouter irrespective of whether flow is local or not. So for local
* flows we need to ignore port given by vrouter
*/
s_flow.set_underlay_source_port(0);
exported = true;
} else {
if (flow->tunnel_type().GetType() != TunnelType::MPLS_GRE) {
underlay_src_port = flow->underlay_source_port();
if (underlay_src_port) {
exported = true;
}
} else {
exported = true;
}
s_flow.set_underlay_source_port(underlay_src_port);
}
flow->set_underlay_sport_exported(exported);
return exported;
}

void FlowMgmtManager::SetUnderlayInfo(FlowEntry *flow, FlowDataIpv4 &s_flow) {
string rid = agent_->router_id().to_string();
uint16_t underlay_src_port = 0;
if (flow->is_flags_set(FlowEntry::LocalFlow)) {
s_flow.set_vrouter_ip(rid);
s_flow.set_other_vrouter_ip(rid);
/* Set source_port as 0 for local flows. Source port is calculated by
* vrouter irrespective of whether flow is local or not. So for local
* flows we need to ignore port given by vrouter
*/
s_flow.set_underlay_source_port(0);
flow->set_underlay_sport_exported(true);
} else {
s_flow.set_vrouter_ip(rid);
s_flow.set_other_vrouter_ip(flow->peer_vrouter());
if (flow->tunnel_type().GetType() != TunnelType::MPLS_GRE) {
underlay_src_port = flow->underlay_source_port();
if (underlay_src_port) {
flow->set_underlay_sport_exported(true);
}
} else {
flow->set_underlay_sport_exported(true);
}
s_flow.set_underlay_source_port(underlay_src_port);
}
s_flow.set_underlay_proto(flow->tunnel_type().GetType());
}

/* For ingress flows, change the SIP as Nat-IP instead of Native IP */
void FlowMgmtManager::SourceIpOverride(FlowEntry *flow, FlowDataIpv4 &s_flow) {
FlowEntry *rev_flow = flow->reverse_flow_entry();
if (flow->is_flags_set(FlowEntry::NatFlow) && s_flow.get_direction_ing() &&
rev_flow) {
const FlowKey *nat_key = &rev_flow->key();
if (flow->key().src_addr != nat_key->dst_addr) {
// TODO: IPV6
if (flow->key().family == Address::INET) {
s_flow.set_sourceip(nat_key->dst_addr.to_v4().to_ulong());
} else {
s_flow.set_sourceip(0);
}
}
}
}

void FlowMgmtManager::GetFlowSandeshActionParams(const FlowAction &action_info,
std::string &action_str) {
std::bitset<32> bs(action_info.action);
for (unsigned int i = 0; i <= bs.size(); i++) {
if (bs[i]) {
if (!action_str.empty()) {
action_str += "|";
}
action_str += TrafficAction::ActionToString(
static_cast<TrafficAction::Action>(i));
}
}
}

void FlowMgmtManager::DispatchFlowMsg(SandeshLevel::type level,
FlowDataIpv4 &flow) {
FLOW_DATA_IPV4_OBJECT_LOG("", level, flow);
}

void FlowMgmtManager::ExportFlow(FlowEntryPtr &fe, uint64_t diff_bytes,
uint64_t diff_pkts) {
FlowEntry *flow = fe.get();
tbb::mutex::scoped_lock mutex(flow->mutex());
FlowMgmtManager::FlowEntryInfo *info = FindFlowEntryInfo(fe);
if (info == NULL) {
return;
}
if ((diff_bytes == 0) && info->stats_exported_) {
return;
}
FlowDataIpv4 s_flow;
SandeshLevel::type level = SandeshLevel::SYS_DEBUG;
const FlowStats &stats = flow->stats();

s_flow.set_flowuuid(to_string(flow->flow_uuid()));
s_flow.set_bytes(stats.bytes);
s_flow.set_packets(stats.packets);
s_flow.set_diff_bytes(diff_bytes);
s_flow.set_diff_packets(diff_pkts);

// TODO: IPV6
if (flow->key().family == Address::INET) {
s_flow.set_sourceip(flow->key().src_addr.to_v4().to_ulong());
s_flow.set_destip(flow->key().dst_addr.to_v4().to_ulong());
} else {
s_flow.set_sourceip(0);
s_flow.set_destip(0);
}
s_flow.set_protocol(flow->key().protocol);
s_flow.set_sport(flow->key().src_port);
s_flow.set_dport(flow->key().dst_port);
s_flow.set_sourcevn(flow->data().source_vn);
s_flow.set_destvn(flow->data().dest_vn);

if (stats.intf_in != Interface::kInvalidIndex) {
Interface *intf = InterfaceTable::GetInstance()->FindInterface(stats.intf_in);
if (intf && intf->type() == Interface::VM_INTERFACE) {
VmInterface *vm_port = static_cast<VmInterface *>(intf);
const VmEntry *vm = vm_port->vm();
if (vm) {
s_flow.set_vm(vm->GetCfgName());
}
}
}
s_flow.set_sg_rule_uuid(flow->sg_rule_uuid());
s_flow.set_nw_ace_uuid(flow->nw_ace_uuid());

FlowEntry *rev_flow = flow->reverse_flow_entry();
if (rev_flow) {
s_flow.set_reverse_uuid(to_string(rev_flow->flow_uuid()));
}

// Flow setup(first) and teardown(last) messages are sent with higher
// priority.
if (!info->stats_exported_) {
s_flow.set_setup_time(stats.setup_time);
// Set flow action
std::string action_str;
GetFlowSandeshActionParams(flow->match_p().action_info, action_str);
s_flow.set_action(action_str);
info->stats_exported_ = true;
level = SandeshLevel::SYS_ERR;
SetUnderlayInfo(flow, s_flow);
} else {
/* When the flow is being exported for first time, underlay port
* info is set as part of SetUnderlayInfo. At this point it is possible
* that port is not yet populated to flow-entry because of either
* (i) flow-entry has not got chance to be evaluated by
* flow-stats-collector
* (ii) there is no flow entry in vrouter yet
* (iii) the flow entry in vrouter does not have underlay source port
* populated yet
*/
if (!flow->underlay_sport_exported()) {
if (SetUnderlayPort(flow, s_flow)) {
level = SandeshLevel::SYS_ERR;
}
}
}

if (stats.teardown_time) {
s_flow.set_teardown_time(stats.teardown_time);
//Teardown time will be set in flow only when flow is deleted.
//We need to reset the exported flag when flow is getting deleted to
//handle flow entry reuse case (Flow add request coming for flows
//marked as deleted)
info->stats_exported_ = false;
flow->set_underlay_sport_exported(false);
level = SandeshLevel::SYS_ERR;
}

if (flow->is_flags_set(FlowEntry::LocalFlow)) {
/* For local flows we need to send two flow log messages.
* 1. With direction as ingress
* 2. With direction as egress
* For local flows we have already sent flow log above with
* direction as ingress. We are sending flow log below with
* direction as egress.
*/
s_flow.set_direction_ing(1);
SourceIpOverride(flow, s_flow);
DispatchFlowMsg(level, s_flow);
s_flow.set_direction_ing(0);
//Export local flow of egress direction with a different UUID even when
//the flow is same. Required for analytics module to query flows
//irrespective of direction.
s_flow.set_flowuuid(to_string(flow->egress_uuid()));
DispatchFlowMsg(level, s_flow);
} else {
if (flow->is_flags_set(FlowEntry::IngressDir)) {
s_flow.set_direction_ing(1);
SourceIpOverride(flow, s_flow);
} else {
s_flow.set_direction_ing(0);
}
DispatchFlowMsg(level, s_flow);
}
}

// Extract all the FlowMgmtKey for a flow
void FlowMgmtManager::LogFlow(FlowEntry *flow, const std::string &op) {
FlowInfo trace;
Expand Down

0 comments on commit b2ae967

Please sign in to comment.