Skip to content

Commit

Permalink
Merge "Move Flow logging to Flow Management module" into R2.22-dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Aug 17, 2015
2 parents 0362b7f + b2ae967 commit 43025d7
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 65 deletions.
14 changes: 0 additions & 14 deletions src/vnsw/agent/pkt/flow_entry.cc
Expand Up @@ -1322,20 +1322,6 @@ void SetActionStr(const FlowAction &action_info,
}
}

void GetFlowSandeshActionParams(const FlowAction &action_info,
std::string &action_str) {
std::bitset<32> bs(action_info.action);
for (unsigned int i = 0; i <= bs.size(); i++) {
if (bs[i]) {
if (!action_str.empty()) {
action_str += "|";
}
action_str += TrafficAction::ActionToString(
static_cast<TrafficAction::Action>(i));
}
}
}

static void SetAclListAclAction(const std::list<MatchAclParams> &acl_l,
std::vector<AclAction> &acl_action_l,
std::string &acl_type) {
Expand Down
3 changes: 1 addition & 2 deletions src/vnsw/agent/pkt/flow_entry.h
Expand Up @@ -106,7 +106,7 @@ struct FlowKey {

struct FlowStats {
FlowStats() : setup_time(0), teardown_time(0), last_modified_time(0),
bytes(0), packets(0), intf_in(0), exported(false), fip(0),
bytes(0), packets(0), intf_in(0), fip(0),
fip_vm_port_id(Interface::kInvalidIndex) {}

uint64_t setup_time;
Expand All @@ -115,7 +115,6 @@ struct FlowStats {
uint64_t bytes;
uint64_t packets;
uint32_t intf_in;
bool exported;
// Following fields are required for FIP stats accounting
uint32_t fip;
uint32_t fip_vm_port_id;
Expand Down
251 changes: 251 additions & 0 deletions src/vnsw/agent/pkt/flow_mgmt.cc
@@ -1,3 +1,4 @@
#include <bitset>
#include "pkt/flow_mgmt.h"
#include "pkt/flow_mgmt_request.h"
#include "pkt/flow_mgmt_response.h"
Expand Down Expand Up @@ -29,6 +30,11 @@ FlowMgmtManager::FlowMgmtManager(Agent *agent, FlowTable *flow_table) :

void FlowMgmtManager::Init() {
flow_mgmt_dbclient_->Init();
agent_->acl_table()->set_ace_flow_sandesh_data_cb
(boost::bind(&FlowMgmtManager::SetAceSandeshData, this, _1, _2, _3));
agent_->acl_table()->set_acl_flow_sandesh_data_cb
(boost::bind(&FlowMgmtManager::SetAclFlowSandeshData, this, _1, _2,
_3));
}

void FlowMgmtManager::Shutdown() {
Expand All @@ -37,6 +43,28 @@ void FlowMgmtManager::Shutdown() {
flow_mgmt_dbclient_->Shutdown();
}

/////////////////////////////////////////////////////////////////////////////
// Introspect routines
/////////////////////////////////////////////////////////////////////////////
string FlowMgmtManager::GetAceSandeshDataKey(const AclDBEntry *acl,
int ace_id) {
string uuid_str = UuidToString(acl->GetUuid());
stringstream ss;
ss << uuid_str << ":";
ss << ace_id;
return ss.str();
}

void FlowMgmtManager::SetAceSandeshData(const AclDBEntry *acl,
AclFlowCountResp &data,
int ace_id) {
}

void FlowMgmtManager::SetAclFlowSandeshData(const AclDBEntry *acl,
AclFlowResp &data,
const int last_count) {
}

/////////////////////////////////////////////////////////////////////////////
// Utility methods to enqueue events into work-queue
/////////////////////////////////////////////////////////////////////////////
Expand All @@ -47,6 +75,15 @@ void FlowMgmtManager::AddEvent(FlowEntry *flow) {
request_queue_.Enqueue(req);
}

void FlowMgmtManager::ExportEvent(FlowEntry *flow, uint64_t diff_bytes,
uint64_t diff_pkts) {
FlowEntryPtr flow_ptr(flow);
boost::shared_ptr<FlowMgmtRequest>
req(new FlowMgmtRequest(FlowMgmtRequest::EXPORT_FLOW, flow_ptr,
diff_bytes, diff_pkts));
request_queue_.Enqueue(req);
}

void FlowMgmtManager::DeleteEvent(FlowEntry *flow) {
FlowEntryPtr flow_ptr(flow);
boost::shared_ptr<FlowMgmtRequest>
Expand Down Expand Up @@ -192,6 +229,11 @@ bool FlowMgmtManager::RequestHandler(boost::shared_ptr<FlowMgmtRequest> req) {
break;
}

case FlowMgmtRequest::EXPORT_FLOW: {
ExportFlow(req->flow(), req->diff_bytes(), req->diff_packets());
break;
}

default:
assert(0);

Expand All @@ -203,6 +245,215 @@ void FlowMgmtManager::RetryVrfDelete(uint32_t vrf_id) {
vrf_flow_mgmt_tree_.RetryDelete(vrf_id);
}

bool FlowMgmtManager::SetUnderlayPort(FlowEntry *flow, FlowDataIpv4 &s_flow) {
uint16_t underlay_src_port = 0;
bool exported = false;
if (flow->is_flags_set(FlowEntry::LocalFlow)) {
/* Set source_port as 0 for local flows. Source port is calculated by
* vrouter irrespective of whether flow is local or not. So for local
* flows we need to ignore port given by vrouter
*/
s_flow.set_underlay_source_port(0);
exported = true;
} else {
if (flow->tunnel_type().GetType() != TunnelType::MPLS_GRE) {
underlay_src_port = flow->underlay_source_port();
if (underlay_src_port) {
exported = true;
}
} else {
exported = true;
}
s_flow.set_underlay_source_port(underlay_src_port);
}
flow->set_underlay_sport_exported(exported);
return exported;
}

void FlowMgmtManager::SetUnderlayInfo(FlowEntry *flow, FlowDataIpv4 &s_flow) {
string rid = agent_->router_id().to_string();
uint16_t underlay_src_port = 0;
if (flow->is_flags_set(FlowEntry::LocalFlow)) {
s_flow.set_vrouter_ip(rid);
s_flow.set_other_vrouter_ip(rid);
/* Set source_port as 0 for local flows. Source port is calculated by
* vrouter irrespective of whether flow is local or not. So for local
* flows we need to ignore port given by vrouter
*/
s_flow.set_underlay_source_port(0);
flow->set_underlay_sport_exported(true);
} else {
s_flow.set_vrouter_ip(rid);
s_flow.set_other_vrouter_ip(flow->peer_vrouter());
if (flow->tunnel_type().GetType() != TunnelType::MPLS_GRE) {
underlay_src_port = flow->underlay_source_port();
if (underlay_src_port) {
flow->set_underlay_sport_exported(true);
}
} else {
flow->set_underlay_sport_exported(true);
}
s_flow.set_underlay_source_port(underlay_src_port);
}
s_flow.set_underlay_proto(flow->tunnel_type().GetType());
}

/* For ingress flows, change the SIP as Nat-IP instead of Native IP */
void FlowMgmtManager::SourceIpOverride(FlowEntry *flow, FlowDataIpv4 &s_flow) {
FlowEntry *rev_flow = flow->reverse_flow_entry();
if (flow->is_flags_set(FlowEntry::NatFlow) && s_flow.get_direction_ing() &&
rev_flow) {
const FlowKey *nat_key = &rev_flow->key();
if (flow->key().src_addr != nat_key->dst_addr) {
// TODO: IPV6
if (flow->key().family == Address::INET) {
s_flow.set_sourceip(nat_key->dst_addr.to_v4().to_ulong());
} else {
s_flow.set_sourceip(0);
}
}
}
}

void FlowMgmtManager::GetFlowSandeshActionParams(const FlowAction &action_info,
std::string &action_str) {
std::bitset<32> bs(action_info.action);
for (unsigned int i = 0; i <= bs.size(); i++) {
if (bs[i]) {
if (!action_str.empty()) {
action_str += "|";
}
action_str += TrafficAction::ActionToString(
static_cast<TrafficAction::Action>(i));
}
}
}

void FlowMgmtManager::DispatchFlowMsg(SandeshLevel::type level,
FlowDataIpv4 &flow) {
FLOW_DATA_IPV4_OBJECT_LOG("", level, flow);
}

void FlowMgmtManager::ExportFlow(FlowEntryPtr &fe, uint64_t diff_bytes,
uint64_t diff_pkts) {
FlowEntry *flow = fe.get();
tbb::mutex::scoped_lock mutex(flow->mutex());
FlowMgmtManager::FlowEntryInfo *info = FindFlowEntryInfo(fe);
if (info == NULL) {
return;
}
if ((diff_bytes == 0) && info->stats_exported_) {
return;
}
FlowDataIpv4 s_flow;
SandeshLevel::type level = SandeshLevel::SYS_DEBUG;
const FlowStats &stats = flow->stats();

s_flow.set_flowuuid(to_string(flow->flow_uuid()));
s_flow.set_bytes(stats.bytes);
s_flow.set_packets(stats.packets);
s_flow.set_diff_bytes(diff_bytes);
s_flow.set_diff_packets(diff_pkts);

// TODO: IPV6
if (flow->key().family == Address::INET) {
s_flow.set_sourceip(flow->key().src_addr.to_v4().to_ulong());
s_flow.set_destip(flow->key().dst_addr.to_v4().to_ulong());
} else {
s_flow.set_sourceip(0);
s_flow.set_destip(0);
}
s_flow.set_protocol(flow->key().protocol);
s_flow.set_sport(flow->key().src_port);
s_flow.set_dport(flow->key().dst_port);
s_flow.set_sourcevn(flow->data().source_vn);
s_flow.set_destvn(flow->data().dest_vn);

if (stats.intf_in != Interface::kInvalidIndex) {
Interface *intf = InterfaceTable::GetInstance()->FindInterface(stats.intf_in);
if (intf && intf->type() == Interface::VM_INTERFACE) {
VmInterface *vm_port = static_cast<VmInterface *>(intf);
const VmEntry *vm = vm_port->vm();
if (vm) {
s_flow.set_vm(vm->GetCfgName());
}
}
}
s_flow.set_sg_rule_uuid(flow->sg_rule_uuid());
s_flow.set_nw_ace_uuid(flow->nw_ace_uuid());

FlowEntry *rev_flow = flow->reverse_flow_entry();
if (rev_flow) {
s_flow.set_reverse_uuid(to_string(rev_flow->flow_uuid()));
}

// Flow setup(first) and teardown(last) messages are sent with higher
// priority.
if (!info->stats_exported_) {
s_flow.set_setup_time(stats.setup_time);
// Set flow action
std::string action_str;
GetFlowSandeshActionParams(flow->match_p().action_info, action_str);
s_flow.set_action(action_str);
info->stats_exported_ = true;
level = SandeshLevel::SYS_ERR;
SetUnderlayInfo(flow, s_flow);
} else {
/* When the flow is being exported for first time, underlay port
* info is set as part of SetUnderlayInfo. At this point it is possible
* that port is not yet populated to flow-entry because of either
* (i) flow-entry has not got chance to be evaluated by
* flow-stats-collector
* (ii) there is no flow entry in vrouter yet
* (iii) the flow entry in vrouter does not have underlay source port
* populated yet
*/
if (!flow->underlay_sport_exported()) {
if (SetUnderlayPort(flow, s_flow)) {
level = SandeshLevel::SYS_ERR;
}
}
}

if (stats.teardown_time) {
s_flow.set_teardown_time(stats.teardown_time);
//Teardown time will be set in flow only when flow is deleted.
//We need to reset the exported flag when flow is getting deleted to
//handle flow entry reuse case (Flow add request coming for flows
//marked as deleted)
info->stats_exported_ = false;
flow->set_underlay_sport_exported(false);
level = SandeshLevel::SYS_ERR;
}

if (flow->is_flags_set(FlowEntry::LocalFlow)) {
/* For local flows we need to send two flow log messages.
* 1. With direction as ingress
* 2. With direction as egress
* For local flows we have already sent flow log above with
* direction as ingress. We are sending flow log below with
* direction as egress.
*/
s_flow.set_direction_ing(1);
SourceIpOverride(flow, s_flow);
DispatchFlowMsg(level, s_flow);
s_flow.set_direction_ing(0);
//Export local flow of egress direction with a different UUID even when
//the flow is same. Required for analytics module to query flows
//irrespective of direction.
s_flow.set_flowuuid(to_string(flow->egress_uuid()));
DispatchFlowMsg(level, s_flow);
} else {
if (flow->is_flags_set(FlowEntry::IngressDir)) {
s_flow.set_direction_ing(1);
SourceIpOverride(flow, s_flow);
} else {
s_flow.set_direction_ing(0);
}
DispatchFlowMsg(level, s_flow);
}
}

// Extract all the FlowMgmtKey for a flow
void FlowMgmtManager::LogFlow(FlowEntry *flow, const std::string &op) {
FlowInfo trace;
Expand Down

0 comments on commit 43025d7

Please sign in to comment.