From b2ae967c8b464d54eab87b28534a2a6ad2cf1d62 Mon Sep 17 00:00:00 2001 From: Ashok Singh Date: Tue, 11 Aug 2015 23:24:45 -0700 Subject: [PATCH] Move Flow logging to Flow Management module Define a message to enqueue Flow Export requests in Flow Management module. Move FlowExport functionality from FlowTable to Flow Management module. Replace FlowExport API calls in Flow Stats collector and Flow Table with a message to Flow Management module. Change-Id: I5bea219cac127f651d4c709f5e959779f89f3374 Partial-Bug: #1479295 --- src/vnsw/agent/pkt/flow_entry.cc | 14 - src/vnsw/agent/pkt/flow_entry.h | 3 +- src/vnsw/agent/pkt/flow_mgmt.cc | 251 ++++++++++++++++++ src/vnsw/agent/pkt/flow_mgmt.h | 37 ++- src/vnsw/agent/pkt/flow_mgmt_request.h | 15 +- src/vnsw/agent/pkt/flow_table.cc | 34 +-- src/vnsw/agent/pkt/flow_table.h | 4 - .../flow_stats/flow_stats_collector.cc | 8 +- 8 files changed, 301 insertions(+), 65 deletions(-) diff --git a/src/vnsw/agent/pkt/flow_entry.cc b/src/vnsw/agent/pkt/flow_entry.cc index 11608129977..976cf7cdca0 100644 --- a/src/vnsw/agent/pkt/flow_entry.cc +++ b/src/vnsw/agent/pkt/flow_entry.cc @@ -1322,20 +1322,6 @@ void SetActionStr(const FlowAction &action_info, } } -void GetFlowSandeshActionParams(const FlowAction &action_info, - std::string &action_str) { - std::bitset<32> bs(action_info.action); - for (unsigned int i = 0; i <= bs.size(); i++) { - if (bs[i]) { - if (!action_str.empty()) { - action_str += "|"; - } - action_str += TrafficAction::ActionToString( - static_cast(i)); - } - } -} - static void SetAclListAclAction(const std::list &acl_l, std::vector &acl_action_l, std::string &acl_type) { diff --git a/src/vnsw/agent/pkt/flow_entry.h b/src/vnsw/agent/pkt/flow_entry.h index bb0d167cebd..6f44a278196 100644 --- a/src/vnsw/agent/pkt/flow_entry.h +++ b/src/vnsw/agent/pkt/flow_entry.h @@ -106,7 +106,7 @@ struct FlowKey { struct FlowStats { FlowStats() : setup_time(0), teardown_time(0), last_modified_time(0), - bytes(0), packets(0), intf_in(0), exported(false), fip(0), + bytes(0), packets(0), intf_in(0), fip(0), fip_vm_port_id(Interface::kInvalidIndex) {} uint64_t setup_time; @@ -115,7 +115,6 @@ struct FlowStats { uint64_t bytes; uint64_t packets; uint32_t intf_in; - bool exported; // Following fields are required for FIP stats accounting uint32_t fip; uint32_t fip_vm_port_id; diff --git a/src/vnsw/agent/pkt/flow_mgmt.cc b/src/vnsw/agent/pkt/flow_mgmt.cc index 01ebf0d83a7..8e9e231a26e 100644 --- a/src/vnsw/agent/pkt/flow_mgmt.cc +++ b/src/vnsw/agent/pkt/flow_mgmt.cc @@ -1,3 +1,4 @@ +#include #include "pkt/flow_mgmt.h" #include "pkt/flow_mgmt_request.h" #include "pkt/flow_mgmt_response.h" @@ -29,6 +30,11 @@ FlowMgmtManager::FlowMgmtManager(Agent *agent, FlowTable *flow_table) : void FlowMgmtManager::Init() { flow_mgmt_dbclient_->Init(); + agent_->acl_table()->set_ace_flow_sandesh_data_cb + (boost::bind(&FlowMgmtManager::SetAceSandeshData, this, _1, _2, _3)); + agent_->acl_table()->set_acl_flow_sandesh_data_cb + (boost::bind(&FlowMgmtManager::SetAclFlowSandeshData, this, _1, _2, + _3)); } void FlowMgmtManager::Shutdown() { @@ -37,6 +43,28 @@ void FlowMgmtManager::Shutdown() { flow_mgmt_dbclient_->Shutdown(); } +///////////////////////////////////////////////////////////////////////////// +// Introspect routines +///////////////////////////////////////////////////////////////////////////// +string FlowMgmtManager::GetAceSandeshDataKey(const AclDBEntry *acl, + int ace_id) { + string uuid_str = UuidToString(acl->GetUuid()); + stringstream ss; + ss << uuid_str << ":"; + ss << ace_id; + return ss.str(); +} + +void FlowMgmtManager::SetAceSandeshData(const AclDBEntry *acl, + AclFlowCountResp &data, + int ace_id) { +} + +void FlowMgmtManager::SetAclFlowSandeshData(const AclDBEntry *acl, + AclFlowResp &data, + const int last_count) { +} + ///////////////////////////////////////////////////////////////////////////// // Utility methods to enqueue events into work-queue ///////////////////////////////////////////////////////////////////////////// @@ -47,6 +75,15 @@ void FlowMgmtManager::AddEvent(FlowEntry *flow) { request_queue_.Enqueue(req); } +void FlowMgmtManager::ExportEvent(FlowEntry *flow, uint64_t diff_bytes, + uint64_t diff_pkts) { + FlowEntryPtr flow_ptr(flow); + boost::shared_ptr + req(new FlowMgmtRequest(FlowMgmtRequest::EXPORT_FLOW, flow_ptr, + diff_bytes, diff_pkts)); + request_queue_.Enqueue(req); +} + void FlowMgmtManager::DeleteEvent(FlowEntry *flow) { FlowEntryPtr flow_ptr(flow); boost::shared_ptr @@ -192,6 +229,11 @@ bool FlowMgmtManager::RequestHandler(boost::shared_ptr req) { break; } + case FlowMgmtRequest::EXPORT_FLOW: { + ExportFlow(req->flow(), req->diff_bytes(), req->diff_packets()); + break; + } + default: assert(0); @@ -203,6 +245,215 @@ void FlowMgmtManager::RetryVrfDelete(uint32_t vrf_id) { vrf_flow_mgmt_tree_.RetryDelete(vrf_id); } +bool FlowMgmtManager::SetUnderlayPort(FlowEntry *flow, FlowDataIpv4 &s_flow) { + uint16_t underlay_src_port = 0; + bool exported = false; + if (flow->is_flags_set(FlowEntry::LocalFlow)) { + /* Set source_port as 0 for local flows. Source port is calculated by + * vrouter irrespective of whether flow is local or not. So for local + * flows we need to ignore port given by vrouter + */ + s_flow.set_underlay_source_port(0); + exported = true; + } else { + if (flow->tunnel_type().GetType() != TunnelType::MPLS_GRE) { + underlay_src_port = flow->underlay_source_port(); + if (underlay_src_port) { + exported = true; + } + } else { + exported = true; + } + s_flow.set_underlay_source_port(underlay_src_port); + } + flow->set_underlay_sport_exported(exported); + return exported; +} + +void FlowMgmtManager::SetUnderlayInfo(FlowEntry *flow, FlowDataIpv4 &s_flow) { + string rid = agent_->router_id().to_string(); + uint16_t underlay_src_port = 0; + if (flow->is_flags_set(FlowEntry::LocalFlow)) { + s_flow.set_vrouter_ip(rid); + s_flow.set_other_vrouter_ip(rid); + /* Set source_port as 0 for local flows. Source port is calculated by + * vrouter irrespective of whether flow is local or not. So for local + * flows we need to ignore port given by vrouter + */ + s_flow.set_underlay_source_port(0); + flow->set_underlay_sport_exported(true); + } else { + s_flow.set_vrouter_ip(rid); + s_flow.set_other_vrouter_ip(flow->peer_vrouter()); + if (flow->tunnel_type().GetType() != TunnelType::MPLS_GRE) { + underlay_src_port = flow->underlay_source_port(); + if (underlay_src_port) { + flow->set_underlay_sport_exported(true); + } + } else { + flow->set_underlay_sport_exported(true); + } + s_flow.set_underlay_source_port(underlay_src_port); + } + s_flow.set_underlay_proto(flow->tunnel_type().GetType()); +} + +/* For ingress flows, change the SIP as Nat-IP instead of Native IP */ +void FlowMgmtManager::SourceIpOverride(FlowEntry *flow, FlowDataIpv4 &s_flow) { + FlowEntry *rev_flow = flow->reverse_flow_entry(); + if (flow->is_flags_set(FlowEntry::NatFlow) && s_flow.get_direction_ing() && + rev_flow) { + const FlowKey *nat_key = &rev_flow->key(); + if (flow->key().src_addr != nat_key->dst_addr) { + // TODO: IPV6 + if (flow->key().family == Address::INET) { + s_flow.set_sourceip(nat_key->dst_addr.to_v4().to_ulong()); + } else { + s_flow.set_sourceip(0); + } + } + } +} + +void FlowMgmtManager::GetFlowSandeshActionParams(const FlowAction &action_info, + std::string &action_str) { + std::bitset<32> bs(action_info.action); + for (unsigned int i = 0; i <= bs.size(); i++) { + if (bs[i]) { + if (!action_str.empty()) { + action_str += "|"; + } + action_str += TrafficAction::ActionToString( + static_cast(i)); + } + } +} + +void FlowMgmtManager::DispatchFlowMsg(SandeshLevel::type level, + FlowDataIpv4 &flow) { + FLOW_DATA_IPV4_OBJECT_LOG("", level, flow); +} + +void FlowMgmtManager::ExportFlow(FlowEntryPtr &fe, uint64_t diff_bytes, + uint64_t diff_pkts) { + FlowEntry *flow = fe.get(); + tbb::mutex::scoped_lock mutex(flow->mutex()); + FlowMgmtManager::FlowEntryInfo *info = FindFlowEntryInfo(fe); + if (info == NULL) { + return; + } + if ((diff_bytes == 0) && info->stats_exported_) { + return; + } + FlowDataIpv4 s_flow; + SandeshLevel::type level = SandeshLevel::SYS_DEBUG; + const FlowStats &stats = flow->stats(); + + s_flow.set_flowuuid(to_string(flow->flow_uuid())); + s_flow.set_bytes(stats.bytes); + s_flow.set_packets(stats.packets); + s_flow.set_diff_bytes(diff_bytes); + s_flow.set_diff_packets(diff_pkts); + + // TODO: IPV6 + if (flow->key().family == Address::INET) { + s_flow.set_sourceip(flow->key().src_addr.to_v4().to_ulong()); + s_flow.set_destip(flow->key().dst_addr.to_v4().to_ulong()); + } else { + s_flow.set_sourceip(0); + s_flow.set_destip(0); + } + s_flow.set_protocol(flow->key().protocol); + s_flow.set_sport(flow->key().src_port); + s_flow.set_dport(flow->key().dst_port); + s_flow.set_sourcevn(flow->data().source_vn); + s_flow.set_destvn(flow->data().dest_vn); + + if (stats.intf_in != Interface::kInvalidIndex) { + Interface *intf = InterfaceTable::GetInstance()->FindInterface(stats.intf_in); + if (intf && intf->type() == Interface::VM_INTERFACE) { + VmInterface *vm_port = static_cast(intf); + const VmEntry *vm = vm_port->vm(); + if (vm) { + s_flow.set_vm(vm->GetCfgName()); + } + } + } + s_flow.set_sg_rule_uuid(flow->sg_rule_uuid()); + s_flow.set_nw_ace_uuid(flow->nw_ace_uuid()); + + FlowEntry *rev_flow = flow->reverse_flow_entry(); + if (rev_flow) { + s_flow.set_reverse_uuid(to_string(rev_flow->flow_uuid())); + } + + // Flow setup(first) and teardown(last) messages are sent with higher + // priority. + if (!info->stats_exported_) { + s_flow.set_setup_time(stats.setup_time); + // Set flow action + std::string action_str; + GetFlowSandeshActionParams(flow->match_p().action_info, action_str); + s_flow.set_action(action_str); + info->stats_exported_ = true; + level = SandeshLevel::SYS_ERR; + SetUnderlayInfo(flow, s_flow); + } else { + /* When the flow is being exported for first time, underlay port + * info is set as part of SetUnderlayInfo. At this point it is possible + * that port is not yet populated to flow-entry because of either + * (i) flow-entry has not got chance to be evaluated by + * flow-stats-collector + * (ii) there is no flow entry in vrouter yet + * (iii) the flow entry in vrouter does not have underlay source port + * populated yet + */ + if (!flow->underlay_sport_exported()) { + if (SetUnderlayPort(flow, s_flow)) { + level = SandeshLevel::SYS_ERR; + } + } + } + + if (stats.teardown_time) { + s_flow.set_teardown_time(stats.teardown_time); + //Teardown time will be set in flow only when flow is deleted. + //We need to reset the exported flag when flow is getting deleted to + //handle flow entry reuse case (Flow add request coming for flows + //marked as deleted) + info->stats_exported_ = false; + flow->set_underlay_sport_exported(false); + level = SandeshLevel::SYS_ERR; + } + + if (flow->is_flags_set(FlowEntry::LocalFlow)) { + /* For local flows we need to send two flow log messages. + * 1. With direction as ingress + * 2. With direction as egress + * For local flows we have already sent flow log above with + * direction as ingress. We are sending flow log below with + * direction as egress. + */ + s_flow.set_direction_ing(1); + SourceIpOverride(flow, s_flow); + DispatchFlowMsg(level, s_flow); + s_flow.set_direction_ing(0); + //Export local flow of egress direction with a different UUID even when + //the flow is same. Required for analytics module to query flows + //irrespective of direction. + s_flow.set_flowuuid(to_string(flow->egress_uuid())); + DispatchFlowMsg(level, s_flow); + } else { + if (flow->is_flags_set(FlowEntry::IngressDir)) { + s_flow.set_direction_ing(1); + SourceIpOverride(flow, s_flow); + } else { + s_flow.set_direction_ing(0); + } + DispatchFlowMsg(level, s_flow); + } +} + // Extract all the FlowMgmtKey for a flow void FlowMgmtManager::LogFlow(FlowEntry *flow, const std::string &op) { FlowInfo trace; diff --git a/src/vnsw/agent/pkt/flow_mgmt.h b/src/vnsw/agent/pkt/flow_mgmt.h index a67f214f4d9..6c6bfc687bc 100644 --- a/src/vnsw/agent/pkt/flow_mgmt.h +++ b/src/vnsw/agent/pkt/flow_mgmt.h @@ -7,6 +7,7 @@ #include "pkt/flow_table.h" #include "pkt/flow_mgmt_request.h" #include "pkt/flow_mgmt_response.h" +#include //////////////////////////////////////////////////////////////////////////// // Flow Management module is responsible to keep flow action in-sync with @@ -55,6 +56,7 @@ // * Add of DBEntry // * Change of DBEntry // * Delete of DBEntry +// * Export of Flow // // - Flow Management Response // Flow Management Tree module may generate response events as response to @@ -65,15 +67,18 @@ // // Workflow for flow manager module is given below, // 1. Flow Table module will enqueue message to Flow Management queue on -// add/delete/change of a flow -// 2. Flow Management module builds the following tracking information +// add/delete/change of a flow. On Flow delete event, Flow Table module will +// will also enqueue export of the flow. +// 2. Flow stats collection module will enqueue message to Flow Management +// queue on export of flow +// 3. Flow Management module builds the following tracking information // - Operational entry to list of dependent flows // - Flow entry to list of operational-entries it is dependent on -// 3. DBClient module registers to DBTables of interest and tracks changes to +// 4. DBClient module registers to DBTables of interest and tracks changes to // operational-db entries -// 4. Flow Table module will enqueue a message to Flow Management queue on +// 5. Flow Table module will enqueue a message to Flow Management queue on // add/delete/change of operational entries -// 5. The action in flow-management module for operational entry events will +// 6. The action in flow-management module for operational entry events will // depend on the operational entry type // // VN Add/Change : Revaluate flows for change in policy @@ -109,9 +114,9 @@ // // Flow reference // -------------- -// 1. All message between flow-management and flow-table module will hold -// object references. This will ensure ref-count for object dont drop till -// messages are processed. +// 1. All message between flow-management and flow-table/flow-stats module will +// hold object references. This will ensure ref-count for object dont drop +// till messages are processed. // 2. Every flow seen by flow-management module is stored in flow_tree_. Key // for the tree will FlowEntryPtr which holds reference for the flow entry // All other data structures will refer to flow pointer directly. @@ -936,8 +941,9 @@ class FlowMgmtManager { uint32_t count_; // Number of times tree modified bool ingress_; // Ingress flow? bool local_flow_; + bool stats_exported_; - FlowEntryInfo() { } + FlowEntryInfo() : stats_exported_(false) { } virtual ~FlowEntryInfo() { assert(tree_.size() == 0); } }; @@ -976,6 +982,7 @@ class FlowMgmtManager { Agent *agent() const { return agent_; } FlowTable *flow_table() const { return flow_table_; } void AddEvent(FlowEntry *low); + void ExportEvent(FlowEntry *flow, uint64_t diff_bytes, uint64_t diff_pkts); void DeleteEvent(FlowEntry *flow); void AddEvent(const DBEntry *entry, uint32_t gen_id); void ChangeEvent(const DBEntry *entry, uint32_t gen_id); @@ -1006,6 +1013,18 @@ class FlowMgmtManager { void DeleteFlowEntryInfo(FlowEntryPtr &flow); void MakeFlowMgmtKeyTree(FlowEntry *flow, FlowMgmtKeyTree *tree); void LogFlow(FlowEntry *flow, const std::string &op); + void ExportFlow(FlowEntryPtr &flow, uint64_t diff_bytes, uint64_t diff_pkts); + void DispatchFlowMsg(SandeshLevel::type level, FlowDataIpv4 &flow); + void GetFlowSandeshActionParams(const FlowAction &action_info, + std::string &action_str); + void SourceIpOverride(FlowEntry *flow, FlowDataIpv4 &s_flow); + void SetUnderlayInfo(FlowEntry *flow, FlowDataIpv4 &s_flow); + bool SetUnderlayPort(FlowEntry *flow, FlowDataIpv4 &s_flow); + std::string GetAceSandeshDataKey(const AclDBEntry *acl, int ace_id); + void SetAceSandeshData(const AclDBEntry *acl, AclFlowCountResp &data, + int ace_id); + void SetAclFlowSandeshData(const AclDBEntry *acl, AclFlowResp &data, + const int last_count); Agent *agent_; FlowTable *flow_table_; diff --git a/src/vnsw/agent/pkt/flow_mgmt_request.h b/src/vnsw/agent/pkt/flow_mgmt_request.h index a6542e6b2d6..2dc5a477167 100644 --- a/src/vnsw/agent/pkt/flow_mgmt_request.h +++ b/src/vnsw/agent/pkt/flow_mgmt_request.h @@ -19,7 +19,8 @@ class FlowMgmtRequest { ADD_DBENTRY, CHANGE_DBENTRY, DELETE_DBENTRY, - RETRY_DELETE_VRF + RETRY_DELETE_VRF, + EXPORT_FLOW }; FlowMgmtRequest(Event event, FlowEntryPtr &flow) : @@ -28,6 +29,14 @@ class FlowMgmtRequest { assert(vrf_id_); } + FlowMgmtRequest(Event event, FlowEntryPtr &flow, uint64_t bytes, + uint64_t packets) : + event_(event), flow_(flow), db_entry_(NULL), vrf_id_(0), + diff_bytes_(bytes), diff_packets_(packets) { + if (event == RETRY_DELETE_VRF) + assert(vrf_id_); + } + FlowMgmtRequest(Event event, const DBEntry *db_entry, uint32_t gen_id) : event_(event), flow_(NULL), db_entry_(db_entry), vrf_id_(0), gen_id_(gen_id) { @@ -74,6 +83,8 @@ class FlowMgmtRequest { void set_db_entry(const DBEntry *db_entry) { db_entry_ = db_entry; } uint32_t vrf_id() const { return vrf_id_; } uint32_t gen_id() const { return gen_id_; } + uint64_t diff_bytes() const { return diff_bytes_; } + uint64_t diff_packets() const { return diff_packets_; } private: Event event_; @@ -84,6 +95,8 @@ class FlowMgmtRequest { const DBEntry *db_entry_; uint32_t vrf_id_; uint32_t gen_id_; + uint64_t diff_bytes_; + uint64_t diff_packets_; DISALLOW_COPY_AND_ASSIGN(FlowMgmtRequest); }; diff --git a/src/vnsw/agent/pkt/flow_table.cc b/src/vnsw/agent/pkt/flow_table.cc index 08949711bd8..28bf5b7b581 100644 --- a/src/vnsw/agent/pkt/flow_table.cc +++ b/src/vnsw/agent/pkt/flow_table.cc @@ -71,10 +71,6 @@ FlowTable::~FlowTable() { void FlowTable::Init() { FlowEntry::Init(); rand_gen_ = boost::uuids::random_generator(); - agent_->acl_table()->set_ace_flow_sandesh_data_cb - (boost::bind(&FlowTable::SetAceSandeshData, this, _1, _2, _3)); - agent_->acl_table()->set_acl_flow_sandesh_data_cb - (boost::bind(&FlowTable::SetAclFlowSandeshData, this, _1, _2, _3)); return; } @@ -859,33 +855,6 @@ void FlowTable::UpdateKSync(FlowEntry *flow) { } } -///////////////////////////////////////////////////////////////////////////// -// Introspect routines -///////////////////////////////////////////////////////////////////////////// -string FlowTable::GetAceSandeshDataKey(const AclDBEntry *acl, int ace_id) { - string uuid_str = UuidToString(acl->GetUuid()); - stringstream ss; - ss << uuid_str << ":"; - ss << ace_id; - return ss.str(); -} - -void FlowTable::DispatchFlowMsg(SandeshLevel::type level, FlowDataIpv4 &flow) { - FLOW_DATA_IPV4_OBJECT_LOG("", level, flow); -} - -void FlowTable::FlowExport(FlowEntry *flow, uint64_t diff_bytes, - uint64_t diff_pkts) { -} - -void FlowTable::SetAceSandeshData(const AclDBEntry *acl, AclFlowCountResp &data, - int ace_id) { -} - -void FlowTable::SetAclFlowSandeshData(const AclDBEntry *acl, AclFlowResp &data, - const int last_count) { -} - void FlowTable::SendFlowInternal(FlowEntry *fe) { if (fe->deleted()) { /* Already deleted return from here. */ @@ -896,7 +865,8 @@ void FlowTable::SendFlowInternal(FlowEntry *fe) { fec->UpdateFlowStats(fe, diff_bytes, diff_packets); fe->stats_.teardown_time = UTCTimestampUsec(); - FlowExport(fe, diff_bytes, diff_packets); + agent_->pkt()->flow_mgmt_manager()->ExportEvent(fe, diff_bytes, + diff_packets); /* Reset stats and teardown_time after these information is exported during * flow delete so that if the flow entry is reused they point to right * values */ diff --git a/src/vnsw/agent/pkt/flow_table.h b/src/vnsw/agent/pkt/flow_table.h index 8b8856f8eaa..1a35f732b11 100644 --- a/src/vnsw/agent/pkt/flow_table.h +++ b/src/vnsw/agent/pkt/flow_table.h @@ -125,9 +125,6 @@ class FlowTable { void SetAceSandeshData(const AclDBEntry *acl, AclFlowCountResp &data, int ace_id); - void FlowExport(FlowEntry *flow, uint64_t diff_bytes, uint64_t diff_pkts); - virtual void DispatchFlowMsg(SandeshLevel::type level, FlowDataIpv4 &flow); - void RevaluateFlow(FlowEntry *flow); void DeleteMessage(FlowEntry *flow); @@ -217,7 +214,6 @@ struct VmFlowInfo { extern SandeshTraceBufferPtr FlowTraceBuf; extern void SetActionStr(const FlowAction &, std::vector &); -extern void GetFlowSandeshActionParams(const FlowAction &, std::string &); #define FLOW_TRACE(obj, ...)\ do {\ diff --git a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc index 0145d2d5f27..97dfe555e40 100644 --- a/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc +++ b/src/vnsw/agent/vrouter/flow_stats/flow_stats_collector.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -226,6 +227,7 @@ bool FlowStatsCollector::Run() { bool key_updation_reqd = true, deleted; uint64_t diff_bytes, diff_pkts; FlowTable *flow_obj = Agent::GetInstance()->pkt()->flow_table(); + FlowMgmtManager *mgr = agent_uve_->agent()->pkt()->flow_mgmt_manager(); run_counter_++; if (!flow_obj->Size()) { @@ -315,10 +317,10 @@ bool FlowStatsCollector::Run() { stats->bytes = bytes; stats->packets = packets; stats->last_modified_time = curr_time; - flow_obj->FlowExport(entry, diff_bytes, diff_pkts); - } else if (!stats->exported && !entry->deleted()) { + mgr->ExportEvent(entry, diff_bytes, diff_pkts); + } else if (!entry->deleted()) { /* export flow (reverse) for which traffic is not seen yet. */ - flow_obj->FlowExport(entry, 0, 0); + mgr->ExportEvent(entry, 0, 0); } }