diff --git a/src/vnsw/agent/cmn/agent.cc b/src/vnsw/agent/cmn/agent.cc index a260ac98199..671dfcc36b8 100644 --- a/src/vnsw/agent/cmn/agent.cc +++ b/src/vnsw/agent/cmn/agent.cc @@ -144,7 +144,9 @@ void Agent::SetAgentTaskPolicy() { sizeof(db_exclude_list) / sizeof(char *)); const char *flow_table_exclude_list[] = { - AGENT_SHUTDOWN_TASKNAME, + "Agent::PktFlowResponder", + "sandesh::RecvQueue", + AGENT_SHUTDOWN_TASKNAME, AGENT_INIT_TASKNAME }; SetTaskPolicyOne(kTaskFlowEvent, flow_table_exclude_list, diff --git a/src/vnsw/agent/pkt/pkt_sandesh_flow.cc b/src/vnsw/agent/pkt/pkt_sandesh_flow.cc index ccddcd9d59b..ea64736203c 100644 --- a/src/vnsw/agent/pkt/pkt_sandesh_flow.cc +++ b/src/vnsw/agent/pkt/pkt_sandesh_flow.cc @@ -95,7 +95,7 @@ using boost::system::error_code; data.set_aging_port(fe->fsc()->flow_aging_key().port);\ }\ -const std::string PktSandeshFlow::start_key = "0-0-0-0-0.0.0.0-0.0.0.0"; +const std::string PktSandeshFlow::start_key = "0-0-0-0-0-0.0.0.0-0.0.0.0"; //////////////////////////////////////////////////////////////////////////////// @@ -223,10 +223,11 @@ static void SetAclInfo(SandeshFlowData &data, FlowEntry *fe) { //////////////////////////////////////////////////////////////////////////////// PktSandeshFlow::PktSandeshFlow(Agent *agent, FlowRecordsResp *obj, - std::string resp_ctx, std::string key) : + std::string resp_ctx, std::string key): Task((TaskScheduler::GetInstance()->GetTaskId("Agent::PktFlowResponder")), - 0), resp_obj_(obj), resp_data_(resp_ctx), - flow_iteration_key_(), key_valid_(false), delete_op_(false), agent_(agent) { + 0), resp_obj_(obj), resp_data_(resp_ctx), + flow_iteration_key_(), key_valid_(false), delete_op_(false), agent_(agent), + partition_id_(0) { if (key != agent_->NullString()) { if (SetFlowKey(key)) { key_valid_ = true; @@ -250,8 +251,9 @@ void PktSandeshFlow::SendResponse(SandeshResponse *resp) { resp->Response(); } -string PktSandeshFlow::GetFlowKey(const FlowKey &key) { +string PktSandeshFlow::GetFlowKey(const FlowKey &key, uint16_t partition_id) { stringstream ss; + ss << partition_id << kDelimiter; ss << key.nh << kDelimiter; ss << key.src_port << kDelimiter; ss << key.dst_port << kDelimiter; @@ -264,12 +266,16 @@ string PktSandeshFlow::GetFlowKey(const FlowKey &key) { bool PktSandeshFlow::SetFlowKey(string key) { const char ch = kDelimiter; size_t n = std::count(key.begin(), key.end(), ch); - if (n != 5) { + if (n != 6) { return false; } stringstream ss(key); string item, sip, dip; uint32_t proto; + + if (getline(ss, item, ch)) { + istringstream(item) >> partition_id_; + } if (getline(ss, item, ch)) { istringstream(item) >> flow_iteration_key_.nh; } @@ -288,7 +294,6 @@ bool PktSandeshFlow::SetFlowKey(string key) { if (getline(ss, item, ch)) { dip = item; } - error_code ec; flow_iteration_key_.src_addr = IpAddress::from_string(sip.c_str(), ec); flow_iteration_key_.dst_addr = IpAddress::from_string(dip.c_str(), ec); @@ -301,17 +306,26 @@ bool PktSandeshFlow::SetFlowKey(string key) { return true; } -// FIXME : Should handle multiple flow tables bool PktSandeshFlow::Run() { FlowTable::FlowEntryMap::iterator it; std::vector& list = const_cast&>(resp_obj_->get_flow_list()); int count = 0; bool flow_key_set = false; - FlowTable *flow_obj = agent_->pkt()->flow_table(0); + + if (partition_id_ >= agent_->flow_thread_count()) { + FlowErrorResp *resp = new FlowErrorResp(); + SendResponse(resp); + return true; + } + + FlowTable *flow_obj = agent_->pkt()->flow_table(partition_id_); if (delete_op_) { - flow_obj->DeleteAll(); + for (int i =0; i < agent_->flow_thread_count(); i++){ + flow_obj = agent_->pkt()->flow_table(i); + flow_obj->DeleteAll(); + } SendResponse(resp_obj_); return true; } @@ -323,25 +337,34 @@ bool PktSandeshFlow::Run() { SendResponse(resp); return true; } - FlowStatsCollector *fec = - agent_->flow_stats_manager()->default_flow_stats_collector(); + while (it != flow_obj->flow_entry_map_.end()) { FlowEntry *fe = it->second; + FlowStatsCollector *fec = + agent_->flow_stats_manager()->GetFlowStatsCollector(fe->key()); const FlowExportInfo *info = fec->FindFlowExportInfo(fe->uuid()); SetSandeshFlowData(list, fe, info); ++it; count++; if (count == kMaxFlowResponse) { if (it != flow_obj->flow_entry_map_.end()) { - resp_obj_->set_flow_key(GetFlowKey(fe->key())); + resp_obj_->set_flow_key(GetFlowKey(fe->key(), partition_id_)); flow_key_set = true; + } break; } } + if (!flow_key_set) { - resp_obj_->set_flow_key(PktSandeshFlow::start_key); + if (++partition_id_ < agent_->flow_thread_count()) { + FlowKey key; + resp_obj_->set_flow_key(GetFlowKey(key, partition_id_)); + } else { + resp_obj_->set_flow_key(PktSandeshFlow::start_key); + } } + SendResponse(resp_obj_); return true; } @@ -380,6 +403,8 @@ void DeleteAllFlowRecords::HandleRequest() const { void FetchFlowRecord::HandleRequest() const { FlowKey key; Agent *agent = Agent::GetInstance(); + FlowTable *flow_obj; + key.nh = get_nh(); error_code ec; key.src_addr = IpAddress::from_string(get_sip(), ec); @@ -394,22 +419,28 @@ void FetchFlowRecord::HandleRequest() const { key.protocol = get_protocol(); FlowTable::FlowEntryMap::iterator it; - FlowTable *flow_obj = agent->pkt()->flow_table(0); - FlowStatsCollector *fec = - agent->flow_stats_manager()->default_flow_stats_collector(); - it = flow_obj->flow_entry_map_.find(key); + for (int i = 0; i < agent->flow_thread_count(); i++) { + flow_obj = agent->pkt()->flow_table(i); + it = flow_obj->flow_entry_map_.find(key); + if (it != flow_obj->flow_entry_map_.end()) + break; + } + SandeshResponse *resp; if (it != flow_obj->flow_entry_map_.end()) { - FlowRecordResp *flow_resp = new FlowRecordResp(); - FlowEntry *fe = it->second; - FlowExportInfo *info = fec->FindFlowExportInfo(fe->uuid()); - SandeshFlowData data; - SET_SANDESH_FLOW_DATA(agent, data, fe, info); - flow_resp->set_record(data); - resp = flow_resp; + FlowRecordResp *flow_resp = new FlowRecordResp(); + FlowEntry *fe = it->second; + FlowStatsCollector *fec = + agent->flow_stats_manager()->GetFlowStatsCollector(fe->key()); + FlowExportInfo *info = fec->FindFlowExportInfo(fe->uuid()); + SandeshFlowData data; + SET_SANDESH_FLOW_DATA(agent, data, fe, info); + flow_resp->set_record(data); + resp = flow_resp; } else { resp = new FlowErrorResp(); } + resp->set_context(context()); resp->set_more(false); resp->Response(); @@ -476,7 +507,13 @@ bool PktSandeshFlowStats::Run() { int count = 0; bool flow_key_set = false; - FlowTable *flow_obj = agent_->pkt()->flow_table(0); + if (partition_id_ > agent_->flow_thread_count()) { + FlowErrorResp *resp = new FlowErrorResp(); + SendResponse(resp); + return true; + } + + FlowTable *flow_obj = agent_->pkt()->flow_table(partition_id_); FlowStatsManager *fm = agent_->flow_stats_manager(); const FlowStatsCollector *fsc = fm->Find(proto_, port_); if (!fsc) { @@ -486,14 +523,14 @@ bool PktSandeshFlowStats::Run() { } FlowTable::FlowEntryMap::iterator it; - if (key_valid_) { + if (key_valid_) { it = flow_obj->flow_entry_map_.upper_bound(flow_iteration_key_); } else { - FlowErrorResp *resp = new FlowErrorResp(); - SendResponse(resp); - return true; + FlowErrorResp *resp = new FlowErrorResp(); + SendResponse(resp); + return true; } - + while (it != flow_obj->flow_entry_map_.end()) { FlowEntry *fe = it->second; const FlowExportInfo *info = fsc->FindFlowExportInfo(fe->uuid()); @@ -504,7 +541,7 @@ bool PktSandeshFlowStats::Run() { if (it != flow_obj->flow_entry_map_.end()) { ostringstream ostr; ostr << proto_ << ":" << port_ << ":" - << GetFlowKey(fe->key()); + << GetFlowKey(fe->key(), partition_id_); resp_->set_flow_key(ostr.str()); flow_key_set = true; } @@ -513,9 +550,17 @@ bool PktSandeshFlowStats::Run() { } if (!flow_key_set) { - ostringstream ostr; - ostr << proto_ << ":" << port_ << ":" << "0x0"; - resp_->set_flow_key(ostr.str()); + if ( ++partition_id_ < agent_->flow_thread_count()) { + FlowKey key; + ostringstream ostr; + ostr << proto_ << ":" << port_ << ":" + << GetFlowKey(key, partition_id_); + resp_->set_flow_key(ostr.str()); + } else { + ostringstream ostr; + ostr << proto_ << ":" << port_ << ":" << "0x0"; + resp_->set_flow_key(ostr.str()); + } } SendResponse(resp_); return true; @@ -534,19 +579,15 @@ bool PktSandeshFlowStats::SetProto(string &key) { if (getline(ss, item, ':')) { istringstream(item) >> port_; } - - long flow_ptr; if (getline(ss, item)) { - istringstream(item) >> flow_ptr; + SetFlowKey(item); } - - flow_ptr_ = (FlowEntry *)(flow_ptr); return true; } PktSandeshFlowStats::PktSandeshFlowStats(Agent *agent, FlowStatsCollectorRecordsResp *obj, std::string resp_ctx, std::string key): - PktSandeshFlow(agent, NULL, resp_ctx, key), resp_(obj), flow_ptr_(NULL) { + PktSandeshFlow(agent, NULL, resp_ctx, key), resp_(obj) { if (key != agent_->NullString()) { if (SetProto(key)) { key_valid_ = true; diff --git a/src/vnsw/agent/pkt/pkt_sandesh_flow.h b/src/vnsw/agent/pkt/pkt_sandesh_flow.h index 33377eab9d6..2680e2841ad 100644 --- a/src/vnsw/agent/pkt/pkt_sandesh_flow.h +++ b/src/vnsw/agent/pkt/pkt_sandesh_flow.h @@ -15,14 +15,13 @@ class PktSandeshFlow : public Task { static const int kMaxFlowResponse = 100; static const char kDelimiter = '-'; static const std::string start_key; - PktSandeshFlow(Agent *agent, FlowRecordsResp *obj, std::string resp_ctx, std::string key); virtual ~PktSandeshFlow(); void SendResponse(SandeshResponse *resp); bool SetFlowKey(std::string key); - static std::string GetFlowKey(const FlowKey &key); + static std::string GetFlowKey(const FlowKey &key, uint16_t partition_id); virtual bool Run(); std::string Description() const { return "PktSandeshFlow"; } @@ -37,6 +36,7 @@ class PktSandeshFlow : public Task { bool key_valid_; bool delete_op_; Agent *agent_; + uint16_t partition_id_; private: DISALLOW_COPY_AND_ASSIGN(PktSandeshFlow); @@ -54,7 +54,6 @@ class PktSandeshFlowStats : public PktSandeshFlow { uint32_t proto_; uint32_t port_; FlowStatsCollectorRecordsResp *resp_; - FlowEntry *flow_ptr_; DISALLOW_COPY_AND_ASSIGN(PktSandeshFlowStats); }; #endif