Skip to content

Commit

Permalink
Merge "Introspect changes to read the flows for each task instance. T…
Browse files Browse the repository at this point in the history
…ask exculsions are added to get Concurrency between FlowTask Event PacketFlowResponder & main thread" into R3.0
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Feb 24, 2016
2 parents 9e91a62 + 05245e8 commit fd68a8f
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 45 deletions.
4 changes: 3 additions & 1 deletion src/vnsw/agent/cmn/agent.cc
Expand Up @@ -144,7 +144,9 @@ void Agent::SetAgentTaskPolicy() {
sizeof(db_exclude_list) / sizeof(char *));

const char *flow_table_exclude_list[] = {
AGENT_SHUTDOWN_TASKNAME,
"Agent::PktFlowResponder",
"sandesh::RecvQueue",
AGENT_SHUTDOWN_TASKNAME,
AGENT_INIT_TASKNAME
};
SetTaskPolicyOne(kTaskFlowEvent, flow_table_exclude_list,
Expand Down
123 changes: 82 additions & 41 deletions src/vnsw/agent/pkt/pkt_sandesh_flow.cc
Expand Up @@ -95,7 +95,7 @@ using boost::system::error_code;
data.set_aging_port(fe->fsc()->flow_aging_key().port);\
}\

const std::string PktSandeshFlow::start_key = "0-0-0-0-0.0.0.0-0.0.0.0";
const std::string PktSandeshFlow::start_key = "0-0-0-0-0-0.0.0.0-0.0.0.0";

////////////////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -223,10 +223,11 @@ static void SetAclInfo(SandeshFlowData &data, FlowEntry *fe) {
////////////////////////////////////////////////////////////////////////////////

PktSandeshFlow::PktSandeshFlow(Agent *agent, FlowRecordsResp *obj,
std::string resp_ctx, std::string key) :
std::string resp_ctx, std::string key):
Task((TaskScheduler::GetInstance()->GetTaskId("Agent::PktFlowResponder")),
0), resp_obj_(obj), resp_data_(resp_ctx),
flow_iteration_key_(), key_valid_(false), delete_op_(false), agent_(agent) {
0), resp_obj_(obj), resp_data_(resp_ctx),
flow_iteration_key_(), key_valid_(false), delete_op_(false), agent_(agent),
partition_id_(0) {
if (key != agent_->NullString()) {
if (SetFlowKey(key)) {
key_valid_ = true;
Expand All @@ -250,8 +251,9 @@ void PktSandeshFlow::SendResponse(SandeshResponse *resp) {
resp->Response();
}

string PktSandeshFlow::GetFlowKey(const FlowKey &key) {
string PktSandeshFlow::GetFlowKey(const FlowKey &key, uint16_t partition_id) {
stringstream ss;
ss << partition_id << kDelimiter;
ss << key.nh << kDelimiter;
ss << key.src_port << kDelimiter;
ss << key.dst_port << kDelimiter;
Expand All @@ -264,12 +266,16 @@ string PktSandeshFlow::GetFlowKey(const FlowKey &key) {
bool PktSandeshFlow::SetFlowKey(string key) {
const char ch = kDelimiter;
size_t n = std::count(key.begin(), key.end(), ch);
if (n != 5) {
if (n != 6) {
return false;
}
stringstream ss(key);
string item, sip, dip;
uint32_t proto;

if (getline(ss, item, ch)) {
istringstream(item) >> partition_id_;
}
if (getline(ss, item, ch)) {
istringstream(item) >> flow_iteration_key_.nh;
}
Expand All @@ -288,7 +294,6 @@ bool PktSandeshFlow::SetFlowKey(string key) {
if (getline(ss, item, ch)) {
dip = item;
}

error_code ec;
flow_iteration_key_.src_addr = IpAddress::from_string(sip.c_str(), ec);
flow_iteration_key_.dst_addr = IpAddress::from_string(dip.c_str(), ec);
Expand All @@ -301,17 +306,26 @@ bool PktSandeshFlow::SetFlowKey(string key) {
return true;
}

// FIXME : Should handle multiple flow tables
bool PktSandeshFlow::Run() {
FlowTable::FlowEntryMap::iterator it;
std::vector<SandeshFlowData>& list =
const_cast<std::vector<SandeshFlowData>&>(resp_obj_->get_flow_list());
int count = 0;
bool flow_key_set = false;
FlowTable *flow_obj = agent_->pkt()->flow_table(0);

if (partition_id_ >= agent_->flow_thread_count()) {
FlowErrorResp *resp = new FlowErrorResp();
SendResponse(resp);
return true;
}

FlowTable *flow_obj = agent_->pkt()->flow_table(partition_id_);

if (delete_op_) {
flow_obj->DeleteAll();
for (int i =0; i < agent_->flow_thread_count(); i++){
flow_obj = agent_->pkt()->flow_table(i);
flow_obj->DeleteAll();
}
SendResponse(resp_obj_);
return true;
}
Expand All @@ -323,25 +337,34 @@ bool PktSandeshFlow::Run() {
SendResponse(resp);
return true;
}
FlowStatsCollector *fec =
agent_->flow_stats_manager()->default_flow_stats_collector();

while (it != flow_obj->flow_entry_map_.end()) {
FlowEntry *fe = it->second;
FlowStatsCollector *fec =
agent_->flow_stats_manager()->GetFlowStatsCollector(fe->key());
const FlowExportInfo *info = fec->FindFlowExportInfo(fe->uuid());
SetSandeshFlowData(list, fe, info);
++it;
count++;
if (count == kMaxFlowResponse) {
if (it != flow_obj->flow_entry_map_.end()) {
resp_obj_->set_flow_key(GetFlowKey(fe->key()));
resp_obj_->set_flow_key(GetFlowKey(fe->key(), partition_id_));
flow_key_set = true;

}
break;
}
}

if (!flow_key_set) {
resp_obj_->set_flow_key(PktSandeshFlow::start_key);
if (++partition_id_ < agent_->flow_thread_count()) {
FlowKey key;
resp_obj_->set_flow_key(GetFlowKey(key, partition_id_));
} else {
resp_obj_->set_flow_key(PktSandeshFlow::start_key);
}
}

SendResponse(resp_obj_);
return true;
}
Expand Down Expand Up @@ -380,6 +403,8 @@ void DeleteAllFlowRecords::HandleRequest() const {
void FetchFlowRecord::HandleRequest() const {
FlowKey key;
Agent *agent = Agent::GetInstance();
FlowTable *flow_obj;

key.nh = get_nh();
error_code ec;
key.src_addr = IpAddress::from_string(get_sip(), ec);
Expand All @@ -394,22 +419,28 @@ void FetchFlowRecord::HandleRequest() const {
key.protocol = get_protocol();

FlowTable::FlowEntryMap::iterator it;
FlowTable *flow_obj = agent->pkt()->flow_table(0);
FlowStatsCollector *fec =
agent->flow_stats_manager()->default_flow_stats_collector();
it = flow_obj->flow_entry_map_.find(key);
for (int i = 0; i < agent->flow_thread_count(); i++) {
flow_obj = agent->pkt()->flow_table(i);
it = flow_obj->flow_entry_map_.find(key);
if (it != flow_obj->flow_entry_map_.end())
break;
}

SandeshResponse *resp;
if (it != flow_obj->flow_entry_map_.end()) {
FlowRecordResp *flow_resp = new FlowRecordResp();
FlowEntry *fe = it->second;
FlowExportInfo *info = fec->FindFlowExportInfo(fe->uuid());
SandeshFlowData data;
SET_SANDESH_FLOW_DATA(agent, data, fe, info);
flow_resp->set_record(data);
resp = flow_resp;
FlowRecordResp *flow_resp = new FlowRecordResp();
FlowEntry *fe = it->second;
FlowStatsCollector *fec =
agent->flow_stats_manager()->GetFlowStatsCollector(fe->key());
FlowExportInfo *info = fec->FindFlowExportInfo(fe->uuid());
SandeshFlowData data;
SET_SANDESH_FLOW_DATA(agent, data, fe, info);
flow_resp->set_record(data);
resp = flow_resp;
} else {
resp = new FlowErrorResp();
}

resp->set_context(context());
resp->set_more(false);
resp->Response();
Expand Down Expand Up @@ -476,7 +507,13 @@ bool PktSandeshFlowStats::Run() {
int count = 0;
bool flow_key_set = false;

FlowTable *flow_obj = agent_->pkt()->flow_table(0);
if (partition_id_ > agent_->flow_thread_count()) {
FlowErrorResp *resp = new FlowErrorResp();
SendResponse(resp);
return true;
}

FlowTable *flow_obj = agent_->pkt()->flow_table(partition_id_);
FlowStatsManager *fm = agent_->flow_stats_manager();
const FlowStatsCollector *fsc = fm->Find(proto_, port_);
if (!fsc) {
Expand All @@ -486,14 +523,14 @@ bool PktSandeshFlowStats::Run() {
}

FlowTable::FlowEntryMap::iterator it;
if (key_valid_) {
if (key_valid_) {
it = flow_obj->flow_entry_map_.upper_bound(flow_iteration_key_);
} else {
FlowErrorResp *resp = new FlowErrorResp();
SendResponse(resp);
return true;
FlowErrorResp *resp = new FlowErrorResp();
SendResponse(resp);
return true;
}

while (it != flow_obj->flow_entry_map_.end()) {
FlowEntry *fe = it->second;
const FlowExportInfo *info = fsc->FindFlowExportInfo(fe->uuid());
Expand All @@ -504,7 +541,7 @@ bool PktSandeshFlowStats::Run() {
if (it != flow_obj->flow_entry_map_.end()) {
ostringstream ostr;
ostr << proto_ << ":" << port_ << ":"
<< GetFlowKey(fe->key());
<< GetFlowKey(fe->key(), partition_id_);
resp_->set_flow_key(ostr.str());
flow_key_set = true;
}
Expand All @@ -513,9 +550,17 @@ bool PktSandeshFlowStats::Run() {
}

if (!flow_key_set) {
ostringstream ostr;
ostr << proto_ << ":" << port_ << ":" << "0x0";
resp_->set_flow_key(ostr.str());
if ( ++partition_id_ < agent_->flow_thread_count()) {
FlowKey key;
ostringstream ostr;
ostr << proto_ << ":" << port_ << ":"
<< GetFlowKey(key, partition_id_);
resp_->set_flow_key(ostr.str());
} else {
ostringstream ostr;
ostr << proto_ << ":" << port_ << ":" << "0x0";
resp_->set_flow_key(ostr.str());
}
}
SendResponse(resp_);
return true;
Expand All @@ -534,19 +579,15 @@ bool PktSandeshFlowStats::SetProto(string &key) {
if (getline(ss, item, ':')) {
istringstream(item) >> port_;
}

long flow_ptr;
if (getline(ss, item)) {
istringstream(item) >> flow_ptr;
SetFlowKey(item);
}

flow_ptr_ = (FlowEntry *)(flow_ptr);
return true;
}

PktSandeshFlowStats::PktSandeshFlowStats(Agent *agent, FlowStatsCollectorRecordsResp *obj,
std::string resp_ctx, std::string key):
PktSandeshFlow(agent, NULL, resp_ctx, key), resp_(obj), flow_ptr_(NULL) {
PktSandeshFlow(agent, NULL, resp_ctx, key), resp_(obj) {
if (key != agent_->NullString()) {
if (SetProto(key)) {
key_valid_ = true;
Expand Down
5 changes: 2 additions & 3 deletions src/vnsw/agent/pkt/pkt_sandesh_flow.h
Expand Up @@ -15,14 +15,13 @@ class PktSandeshFlow : public Task {
static const int kMaxFlowResponse = 100;
static const char kDelimiter = '-';
static const std::string start_key;

PktSandeshFlow(Agent *agent, FlowRecordsResp *obj, std::string resp_ctx,
std::string key);
virtual ~PktSandeshFlow();

void SendResponse(SandeshResponse *resp);
bool SetFlowKey(std::string key);
static std::string GetFlowKey(const FlowKey &key);
static std::string GetFlowKey(const FlowKey &key, uint16_t partition_id);

virtual bool Run();
std::string Description() const { return "PktSandeshFlow"; }
Expand All @@ -37,6 +36,7 @@ class PktSandeshFlow : public Task {
bool key_valid_;
bool delete_op_;
Agent *agent_;
uint16_t partition_id_;

private:
DISALLOW_COPY_AND_ASSIGN(PktSandeshFlow);
Expand All @@ -54,7 +54,6 @@ class PktSandeshFlowStats : public PktSandeshFlow {
uint32_t proto_;
uint32_t port_;
FlowStatsCollectorRecordsResp *resp_;
FlowEntry *flow_ptr_;
DISALLOW_COPY_AND_ASSIGN(PktSandeshFlowStats);
};
#endif

0 comments on commit fd68a8f

Please sign in to comment.