diff --git a/src/vnsw/agent/pkt/flow_entry.cc b/src/vnsw/agent/pkt/flow_entry.cc index 11608129977..976cf7cdca0 100644 --- a/src/vnsw/agent/pkt/flow_entry.cc +++ b/src/vnsw/agent/pkt/flow_entry.cc @@ -1322,20 +1322,6 @@ void SetActionStr(const FlowAction &action_info, } } -void GetFlowSandeshActionParams(const FlowAction &action_info, - std::string &action_str) { - std::bitset<32> bs(action_info.action); - for (unsigned int i = 0; i <= bs.size(); i++) { - if (bs[i]) { - if (!action_str.empty()) { - action_str += "|"; - } - action_str += TrafficAction::ActionToString( - static_cast(i)); - } - } -} - static void SetAclListAclAction(const std::list &acl_l, std::vector &acl_action_l, std::string &acl_type) { diff --git a/src/vnsw/agent/pkt/flow_entry.h b/src/vnsw/agent/pkt/flow_entry.h index bb0d167cebd..6f44a278196 100644 --- a/src/vnsw/agent/pkt/flow_entry.h +++ b/src/vnsw/agent/pkt/flow_entry.h @@ -106,7 +106,7 @@ struct FlowKey { struct FlowStats { FlowStats() : setup_time(0), teardown_time(0), last_modified_time(0), - bytes(0), packets(0), intf_in(0), exported(false), fip(0), + bytes(0), packets(0), intf_in(0), fip(0), fip_vm_port_id(Interface::kInvalidIndex) {} uint64_t setup_time; @@ -115,7 +115,6 @@ struct FlowStats { uint64_t bytes; uint64_t packets; uint32_t intf_in; - bool exported; // Following fields are required for FIP stats accounting uint32_t fip; uint32_t fip_vm_port_id; diff --git a/src/vnsw/agent/pkt/flow_mgmt.cc b/src/vnsw/agent/pkt/flow_mgmt.cc index 01ebf0d83a7..8e9e231a26e 100644 --- a/src/vnsw/agent/pkt/flow_mgmt.cc +++ b/src/vnsw/agent/pkt/flow_mgmt.cc @@ -1,3 +1,4 @@ +#include #include "pkt/flow_mgmt.h" #include "pkt/flow_mgmt_request.h" #include "pkt/flow_mgmt_response.h" @@ -29,6 +30,11 @@ FlowMgmtManager::FlowMgmtManager(Agent *agent, FlowTable *flow_table) : void FlowMgmtManager::Init() { flow_mgmt_dbclient_->Init(); + agent_->acl_table()->set_ace_flow_sandesh_data_cb + (boost::bind(&FlowMgmtManager::SetAceSandeshData, this, _1, _2, _3)); + agent_->acl_table()->set_acl_flow_sandesh_data_cb + (boost::bind(&FlowMgmtManager::SetAclFlowSandeshData, this, _1, _2, + _3)); } void FlowMgmtManager::Shutdown() { @@ -37,6 +43,28 @@ void FlowMgmtManager::Shutdown() { flow_mgmt_dbclient_->Shutdown(); } +///////////////////////////////////////////////////////////////////////////// +// Introspect routines +///////////////////////////////////////////////////////////////////////////// +string FlowMgmtManager::GetAceSandeshDataKey(const AclDBEntry *acl, + int ace_id) { + string uuid_str = UuidToString(acl->GetUuid()); + stringstream ss; + ss << uuid_str << ":"; + ss << ace_id; + return ss.str(); +} + +void FlowMgmtManager::SetAceSandeshData(const AclDBEntry *acl, + AclFlowCountResp &data, + int ace_id) { +} + +void FlowMgmtManager::SetAclFlowSandeshData(const AclDBEntry *acl, + AclFlowResp &data, + const int last_count) { +} + ///////////////////////////////////////////////////////////////////////////// // Utility methods to enqueue events into work-queue ///////////////////////////////////////////////////////////////////////////// @@ -47,6 +75,15 @@ void FlowMgmtManager::AddEvent(FlowEntry *flow) { request_queue_.Enqueue(req); } +void FlowMgmtManager::ExportEvent(FlowEntry *flow, uint64_t diff_bytes, + uint64_t diff_pkts) { + FlowEntryPtr flow_ptr(flow); + boost::shared_ptr + req(new FlowMgmtRequest(FlowMgmtRequest::EXPORT_FLOW, flow_ptr, + diff_bytes, diff_pkts)); + request_queue_.Enqueue(req); +} + void FlowMgmtManager::DeleteEvent(FlowEntry *flow) { FlowEntryPtr flow_ptr(flow); boost::shared_ptr @@ -192,6 +229,11 @@ bool FlowMgmtManager::RequestHandler(boost::shared_ptr req) { break; } + case FlowMgmtRequest::EXPORT_FLOW: { + ExportFlow(req->flow(), req->diff_bytes(), req->diff_packets()); + break; + } + default: assert(0); @@ -203,6 +245,215 @@ void FlowMgmtManager::RetryVrfDelete(uint32_t vrf_id) { vrf_flow_mgmt_tree_.RetryDelete(vrf_id); } +bool FlowMgmtManager::SetUnderlayPort(FlowEntry *flow, FlowDataIpv4 &s_flow) { + uint16_t underlay_src_port = 0; + bool exported = false; + if (flow->is_flags_set(FlowEntry::LocalFlow)) { + /* Set source_port as 0 for local flows. Source port is calculated by + * vrouter irrespective of whether flow is local or not. So for local + * flows we need to ignore port given by vrouter + */ + s_flow.set_underlay_source_port(0); + exported = true; + } else { + if (flow->tunnel_type().GetType() != TunnelType::MPLS_GRE) { + underlay_src_port = flow->underlay_source_port(); + if (underlay_src_port) { + exported = true; + } + } else { + exported = true; + } + s_flow.set_underlay_source_port(underlay_src_port); + } + flow->set_underlay_sport_exported(exported); + return exported; +} + +void FlowMgmtManager::SetUnderlayInfo(FlowEntry *flow, FlowDataIpv4 &s_flow) { + string rid = agent_->router_id().to_string(); + uint16_t underlay_src_port = 0; + if (flow->is_flags_set(FlowEntry::LocalFlow)) { + s_flow.set_vrouter_ip(rid); + s_flow.set_other_vrouter_ip(rid); + /* Set source_port as 0 for local flows. Source port is calculated by + * vrouter irrespective of whether flow is local or not. So for local + * flows we need to ignore port given by vrouter + */ + s_flow.set_underlay_source_port(0); + flow->set_underlay_sport_exported(true); + } else { + s_flow.set_vrouter_ip(rid); + s_flow.set_other_vrouter_ip(flow->peer_vrouter()); + if (flow->tunnel_type().GetType() != TunnelType::MPLS_GRE) { + underlay_src_port = flow->underlay_source_port(); + if (underlay_src_port) { + flow->set_underlay_sport_exported(true); + } + } else { + flow->set_underlay_sport_exported(true); + } + s_flow.set_underlay_source_port(underlay_src_port); + } + s_flow.set_underlay_proto(flow->tunnel_type().GetType()); +} + +/* For ingress flows, change the SIP as Nat-IP instead of Native IP */ +void FlowMgmtManager::SourceIpOverride(FlowEntry *flow, FlowDataIpv4 &s_flow) { + FlowEntry *rev_flow = flow->reverse_flow_entry(); + if (flow->is_flags_set(FlowEntry::NatFlow) && s_flow.get_direction_ing() && + rev_flow) { + const FlowKey *nat_key = &rev_flow->key(); + if (flow->key().src_addr != nat_key->dst_addr) { + // TODO: IPV6 + if (flow->key().family == Address::INET) { + s_flow.set_sourceip(nat_key->dst_addr.to_v4().to_ulong()); + } else { + s_flow.set_sourceip(0); + } + } + } +} + +void FlowMgmtManager::GetFlowSandeshActionParams(const FlowAction &action_info, + std::string &action_str) { + std::bitset<32> bs(action_info.action); + for (unsigned int i = 0; i <= bs.size(); i++) { + if (bs[i]) { + if (!action_str.empty()) { + action_str += "|"; + } + action_str += TrafficAction::ActionToString( + static_cast(i)); + } + } +} + +void FlowMgmtManager::DispatchFlowMsg(SandeshLevel::type level, + FlowDataIpv4 &flow) { + FLOW_DATA_IPV4_OBJECT_LOG("", level, flow); +} + +void FlowMgmtManager::ExportFlow(FlowEntryPtr &fe, uint64_t diff_bytes, + uint64_t diff_pkts) { + FlowEntry *flow = fe.get(); + tbb::mutex::scoped_lock mutex(flow->mutex()); + FlowMgmtManager::FlowEntryInfo *info = FindFlowEntryInfo(fe); + if (info == NULL) { + return; + } + if ((diff_bytes == 0) && info->stats_exported_) { + return; + } + FlowDataIpv4 s_flow; + SandeshLevel::type level = SandeshLevel::SYS_DEBUG; + const FlowStats &stats = flow->stats(); + + s_flow.set_flowuuid(to_string(flow->flow_uuid())); + s_flow.set_bytes(stats.bytes); + s_flow.set_packets(stats.packets); + s_flow.set_diff_bytes(diff_bytes); + s_flow.set_diff_packets(diff_pkts); + + // TODO: IPV6 + if (flow->key().family == Address::INET) { + s_flow.set_sourceip(flow->key().src_addr.to_v4().to_ulong()); + s_flow.set_destip(flow->key().dst_addr.to_v4().to_ulong()); + } else { + s_flow.set_sourceip(0); + s_flow.set_destip(0); + } + s_flow.set_protocol(flow->key().protocol); + s_flow.set_sport(flow->key().src_port); + s_flow.set_dport(flow->key().dst_port); + s_flow.set_sourcevn(flow->data().source_vn); + s_flow.set_destvn(flow->data().dest_vn); + + if (stats.intf_in != Interface::kInvalidIndex) { + Interface *intf = InterfaceTable::GetInstance()->FindInterface(stats.intf_in); + if (intf && intf->type() == Interface::VM_INTERFACE) { + VmInterface *vm_port = static_cast(intf); + const VmEntry *vm = vm_port->vm(); + if (vm) { + s_flow.set_vm(vm->GetCfgName()); + } + } + } + s_flow.set_sg_rule_uuid(flow->sg_rule_uuid()); + s_flow.set_nw_ace_uuid(flow->nw_ace_uuid()); + + FlowEntry *rev_flow = flow->reverse_flow_entry(); + if (rev_flow) { + s_flow.set_reverse_uuid(to_string(rev_flow->flow_uuid())); + } + + // Flow setup(first) and teardown(last) messages are sent with higher + // priority. + if (!info->stats_exported_) { + s_flow.set_setup_time(stats.setup_time); + // Set flow action + std::string action_str; + GetFlowSandeshActionParams(flow->match_p().action_info, action_str); + s_flow.set_action(action_str); + info->stats_exported_ = true; + level = SandeshLevel::SYS_ERR; + SetUnderlayInfo(flow, s_flow); + } else { + /* When the flow is being exported for first time, underlay port + * info is set as part of SetUnderlayInfo. At this point it is possible + * that port is not yet populated to flow-entry because of either + * (i) flow-entry has not got chance to be evaluated by + * flow-stats-collector + * (ii) there is no flow entry in vrouter yet + * (iii) the flow entry in vrouter does not have underlay source port + * populated yet + */ + if (!flow->underlay_sport_exported()) { + if (SetUnderlayPort(flow, s_flow)) { + level = SandeshLevel::SYS_ERR; + } + } + } + + if (stats.teardown_time) { + s_flow.set_teardown_time(stats.teardown_time); + //Teardown time will be set in flow only when flow is deleted. + //We need to reset the exported flag when flow is getting deleted to + //handle flow entry reuse case (Flow add request coming for flows + //marked as deleted) + info->stats_exported_ = false; + flow->set_underlay_sport_exported(false); + level = SandeshLevel::SYS_ERR; + } + + if (flow->is_flags_set(FlowEntry::LocalFlow)) { + /* For local flows we need to send two flow log messages. + * 1. With direction as ingress + * 2. With direction as egress + * For local flows we have already sent flow log above with + * direction as ingress. We are sending flow log below with + * direction as egress. + */ + s_flow.set_direction_ing(1); + SourceIpOverride(flow, s_flow); + DispatchFlowMsg(level, s_flow); + s_flow.set_direction_ing(0); + //Export local flow of egress direction with a different UUID even when + //the flow is same. Required for analytics module to query flows + //irrespective of direction. + s_flow.set_flowuuid(to_string(flow->egress_uuid())); + DispatchFlowMsg(level, s_flow); + } else { + if (flow->is_flags_set(FlowEntry::IngressDir)) { + s_flow.set_direction_ing(1); + SourceIpOverride(flow, s_flow); + } else { + s_flow.set_direction_ing(0); + } + DispatchFlowMsg(level, s_flow); + } +} + // Extract all the FlowMgmtKey for a flow void FlowMgmtManager::LogFlow(FlowEntry *flow, const std::string &op) { FlowInfo trace; diff --git a/src/vnsw/agent/pkt/flow_mgmt.h b/src/vnsw/agent/pkt/flow_mgmt.h index a67f214f4d9..6c6bfc687bc 100644 --- a/src/vnsw/agent/pkt/flow_mgmt.h +++ b/src/vnsw/agent/pkt/flow_mgmt.h @@ -7,6 +7,7 @@ #include "pkt/flow_table.h" #include "pkt/flow_mgmt_request.h" #include "pkt/flow_mgmt_response.h" +#include //////////////////////////////////////////////////////////////////////////// // Flow Management module is responsible to keep flow action in-sync with @@ -55,6 +56,7 @@ // * Add of DBEntry // * Change of DBEntry // * Delete of DBEntry +// * Export of Flow // // - Flow Management Response // Flow Management Tree module may generate response events as response to @@ -65,15 +67,18 @@ // // Workflow for flow manager module is given below, // 1. Flow Table module will enqueue message to Flow Management queue on -// add/delete/change of a flow -// 2. Flow Management module builds the following tracking information +// add/delete/change of a flow. On Flow delete event, Flow Table module will +// will also enqueue export of the flow. +// 2. Flow stats collection module will enqueue message to Flow Management +// queue on export of flow +// 3. Flow Management module builds the following tracking information // - Operational entry to list of dependent flows // - Flow entry to list of operational-entries it is dependent on -// 3. DBClient module registers to DBTables of interest and tracks changes to +// 4. DBClient module registers to DBTables of interest and tracks changes to // operational-db entries -// 4. Flow Table module will enqueue a message to Flow Management queue on +// 5. Flow Table module will enqueue a message to Flow Management queue on // add/delete/change of operational entries -// 5. The action in flow-management module for operational entry events will +// 6. The action in flow-management module for operational entry events will // depend on the operational entry type // // VN Add/Change : Revaluate flows for change in policy @@ -109,9 +114,9 @@ // // Flow reference // -------------- -// 1. All message between flow-management and flow-table module will hold -// object references. This will ensure ref-count for object dont drop till -// messages are processed. +// 1. All message between flow-management and flow-table/flow-stats module will +// hold object references. This will ensure ref-count for object dont drop +// till messages are processed. // 2. Every flow seen by flow-management module is stored in flow_tree_. Key // for the tree will FlowEntryPtr which holds reference for the flow entry // All other data structures will refer to flow pointer directly. @@ -936,8 +941,9 @@ class FlowMgmtManager { uint32_t count_; // Number of times tree modified bool ingress_; // Ingress flow? bool local_flow_; + bool stats_exported_; - FlowEntryInfo() { } + FlowEntryInfo() : stats_exported_(false) { } virtual ~FlowEntryInfo() { assert(tree_.size() == 0); } }; @@ -976,6 +982,7 @@ class FlowMgmtManager { Agent *agent() const { return agent_; } FlowTable *flow_table() const { return flow_table_; } void AddEvent(FlowEntry *low); + void ExportEvent(FlowEntry *flow, uint64_t diff_bytes, uint64_t diff_pkts); void DeleteEvent(FlowEntry *flow); void AddEvent(const DBEntry *entry, uint32_t gen_id); void ChangeEvent(const DBEntry *entry, uint32_t gen_id); @@ -1006,6 +1013,18 @@ class FlowMgmtManager { void DeleteFlowEntryInfo(FlowEntryPtr &flow); void MakeFlowMgmtKeyTree(FlowEntry *flow, FlowMgmtKeyTree *tree); void LogFlow(FlowEntry *flow, const std::string &op); + void ExportFlow(FlowEntryPtr &flow, uint64_t diff_bytes, uint64_t diff_pkts); + void DispatchFlowMsg(SandeshLevel::type level, FlowDataIpv4 &flow); + void GetFlowSandeshActionParams(const FlowAction &action_info, + std::string &action_str); + void SourceIpOverride(FlowEntry *flow, FlowDataIpv4 &s_flow); + void SetUnderlayInfo(FlowEntry *flow, FlowDataIpv4 &s_flow); + bool SetUnderlayPort(FlowEntry *flow, FlowDataIpv4 &s_flow); + std::string GetAceSandeshDataKey(const AclDBEntry *acl, int ace_id); + void SetAceSandeshData(const AclDBEntry *acl, AclFlowCountResp &data, + int ace_id); + void SetAclFlowSandeshData(const AclDBEntry *acl, AclFlowResp &data, + const int last_count); Agent *agent_; FlowTable *flow_table_; diff --git a/src/vnsw/agent/pkt/flow_mgmt_request.h b/src/vnsw/agent/pkt/flow_mgmt_request.h index a6542e6b2d6..2dc5a477167 100644 --- a/src/vnsw/agent/pkt/flow_mgmt_request.h +++ b/src/vnsw/agent/pkt/flow_mgmt_request.h @@ -19,7 +19,8 @@ class FlowMgmtRequest { ADD_DBENTRY, CHANGE_DBENTRY, DELETE_DBENTRY, - RETRY_DELETE_VRF + RETRY_DELETE_VRF, + EXPORT_FLOW }; FlowMgmtRequest(Event event, FlowEntryPtr &flow) : @@ -28,6 +29,14 @@ class FlowMgmtRequest { assert(vrf_id_); } + FlowMgmtRequest(Event event, FlowEntryPtr &flow, uint64_t bytes, + uint64_t packets) : + event_(event), flow_(flow), db_entry_(NULL), vrf_id_(0), + diff_bytes_(bytes), diff_packets_(packets) { + if (event == RETRY_DELETE_VRF) + assert(vrf_id_); + } + FlowMgmtRequest(Event event, const DBEntry *db_entry, uint32_t gen_id) : event_(event), flow_(NULL), db_entry_(db_entry), vrf_id_(0), gen_id_(gen_id) { @@ -74,6 +83,8 @@ class FlowMgmtRequest { void set_db_entry(const DBEntry *db_entry) { db_entry_ = db_entry; } uint32_t vrf_id() const { return vrf_id_; } uint32_t gen_id() const { return gen_id_; } + uint64_t diff_bytes() const { return diff_bytes_; } + uint64_t diff_packets() const { return diff_packets_; } private: Event event_; @@ -84,6 +95,8 @@ class FlowMgmtRequest { const DBEntry *db_entry_; uint32_t vrf_id_; uint32_t gen_id_; + uint64_t diff_bytes_; + uint64_t diff_packets_; DISALLOW_COPY_AND_ASSIGN(FlowMgmtRequest); }; diff --git a/src/vnsw/agent/pkt/flow_table.cc b/src/vnsw/agent/pkt/flow_table.cc index 08949711bd8..28bf5b7b581 100644 --- a/src/vnsw/agent/pkt/flow_table.cc +++ b/src/vnsw/agent/pkt/flow_table.cc @@ -71,10 +71,6 @@ FlowTable::~FlowTable() { void FlowTable::Init() { FlowEntry::Init(); rand_gen_ = boost::uuids::random_generator(); - agent_->acl_table()->set_ace_flow_sandesh_data_cb - (boost::bind(&FlowTable::SetAceSandeshData, this, _1, _2, _3)); - agent_->acl_table()->set_acl_flow_sandesh_data_cb - (boost::bind(&FlowTable::SetAclFlowSandeshData, this, _1, _2, _3)); return; } @@ -859,33 +855,6 @@ void FlowTable::UpdateKSync(FlowEntry *flow) { } } -///////////////////////////////////////////////////////////////////////////// -// Introspect routines -///////////////////////////////////////////////////////////////////////////// -string FlowTable::GetAceSandeshDataKey(const AclDBEntry *acl, int ace_id) { - string uuid_str = UuidToString(acl->GetUuid()); - stringstream ss; - ss << uuid_str << ":"; - ss << ace_id; - return ss.str(); -} - -void FlowTable::DispatchFlowMsg(SandeshLevel::type level, FlowDataIpv4 &flow) { - FLOW_DATA_IPV4_OBJECT_LOG("", level, flow); -} - -void FlowTable::FlowExport(FlowEntry *flow, uint64_t diff_bytes, - uint64_t diff_pkts) { -} - -void FlowTable::SetAceSandeshData(const AclDBEntry *acl, AclFlowCountResp &data, - int ace_id) { -} - -void FlowTable::SetAclFlowSandeshData(const AclDBEntry *acl, AclFlowResp &data, - const int last_count) { -} - void FlowTable::SendFlowInternal(FlowEntry *fe) { if (fe->deleted()) { /* Already deleted return from here. */ @@ -896,7 +865,8 @@ void FlowTable::SendFlowInternal(FlowEntry *fe) { fec->UpdateFlowStats(fe, diff_bytes, diff_packets); fe->stats_.teardown_time = UTCTimestampUsec(); - FlowExport(fe, diff_bytes, diff_packets); + agent_->pkt()->flow_mgmt_manager()->ExportEvent(fe, diff_bytes, + diff_packets); /* Reset stats and teardown_time after these information is exported during * flow delete so that if the flow entry is reused they point to right * values */ diff --git a/src/vnsw/agent/pkt/flow_table.h b/src/vnsw/agent/pkt/flow_table.h index 8b8856f8eaa..1a35f732b11 100644 --- a/src/vnsw/agent/pkt/flow_table.h +++ b/src/vnsw/agent/pkt/flow_table.h @@ -125,9 +125,6 @@ class FlowTable { void SetAceSandeshData(const AclDBEntry *acl, AclFlowCountResp &data, int ace_id); - void FlowExport(FlowEntry *flow, uint64_t diff_bytes, uint64_t diff_pkts); - virtual void DispatchFlowMsg(SandeshLevel::type level, FlowDataIpv4 &flow); - void RevaluateFlow(FlowEntry *flow); void DeleteMessage(FlowEntry *flow); @@ -217,7 +214,6 @@ struct VmFlowInfo { extern SandeshTraceBufferPtr FlowTraceBuf; extern void SetActionStr(const FlowAction &, std::vector &); -extern void GetFlowSandeshActionParams(const FlowAction &, std::string &); #define FLOW_TRACE(obj, ...)\ do {\ diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc index 0145d2d5f27..97dfe555e40 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -226,6 +227,7 @@ bool FlowStatsCollector::Run() { bool key_updation_reqd = true, deleted; uint64_t diff_bytes, diff_pkts; FlowTable *flow_obj = Agent::GetInstance()->pkt()->flow_table(); + FlowMgmtManager *mgr = agent_uve_->agent()->pkt()->flow_mgmt_manager(); run_counter_++; if (!flow_obj->Size()) { @@ -315,10 +317,10 @@ bool FlowStatsCollector::Run() { stats->bytes = bytes; stats->packets = packets; stats->last_modified_time = curr_time; - flow_obj->FlowExport(entry, diff_bytes, diff_pkts); - } else if (!stats->exported && !entry->deleted()) { + mgr->ExportEvent(entry, diff_bytes, diff_pkts); + } else if (!entry->deleted()) { /* export flow (reverse) for which traffic is not seen yet. */ - flow_obj->FlowExport(entry, 0, 0); + mgr->ExportEvent(entry, 0, 0); } }