Skip to content

Commit

Permalink
Packets received on pkt0 are handled in asio context. This code also
Browse files Browse the repository at this point in the history
accesses the oper tables and since DB task can run in parallel, there
are issues with this simultaneous access. Moving this to a different
task, which runs in exclusion to DB task

closes-bug: #1522973

Change-Id: Ib15a14090b06bc10f51a948b692ecc92fe00ea42
(cherry picked from commit ac1a0e8)
  • Loading branch information
jayaramsatya committed Feb 18, 2016
1 parent d2f2957 commit 01e2667
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/vnsw/agent/cmn/agent.cc
Expand Up @@ -114,6 +114,7 @@ void Agent::SetAgentTaskPolicy() {
"Agent::Uve",
"Agent::KSync",
"Agent::PktFlowResponder",
"Agent::PktHandler",
AGENT_INIT_TASKNAME
};
SetTaskPolicyOne("db::DBTable", db_exclude_list,
Expand Down
15 changes: 13 additions & 2 deletions src/vnsw/agent/pkt/flow_handler.cc
Expand Up @@ -26,7 +26,18 @@ bool FlowHandler::Run() {
PktFlowInfo info(pkt_info_, agent_->pkt()->flow_table());
std::auto_ptr<FlowTaskMsg> ipc;

if (pkt_info_->type == PktType::MESSAGE) {
if (pkt_info_->type == PktType::INVALID) {
// packet parsing is not done, invoke the same here
uint8_t *pkt = pkt_info_->packet_buffer()->data();
PktHandler::PktModuleName mod = agent_->pkt()->pkt_handler()->
ParseFlowPacket(pkt_info_, pkt);
// if packet wasnt for flow module, it would've got enqueued to the
// correct module in the above call. Nothing else to do.
if (mod != PktHandler::FLOW) {
return true;
}
info.SetPktInfo(pkt_info_);
} else if (pkt_info_->type == PktType::MESSAGE) {
ipc = std::auto_ptr<FlowTaskMsg>(static_cast<FlowTaskMsg *>(pkt_info_->ipc));
pkt_info_->ipc = NULL;
FlowEntry *fe = ipc->fe_ptr.get();
Expand Down Expand Up @@ -54,7 +65,7 @@ bool FlowHandler::Run() {
pkt_info_->vrf = fe->data().vrf;
pkt_info_->l3_forwarding = fe->l3_flow();
info.l3_flow = fe->l3_flow();
}
}

if (info.Process(pkt_info_.get(), &in, &out) == false) {
info.short_flow = true;
Expand Down
5 changes: 5 additions & 0 deletions src/vnsw/agent/pkt/pkt_flow_info.cc
Expand Up @@ -1638,3 +1638,8 @@ void PktFlowInfo::UpdateEvictedFlowStats(const PktInfo *pkt) {
pkt->agent_hdr.cmd_param_3, pkt->agent_hdr.cmd_param_4, flow_table);
}
}
void PktFlowInfo::SetPktInfo(boost::shared_ptr<PktInfo> pkt_info) {
l3_flow = pkt_info->l3_forwarding;
family = pkt_info->family;
pkt = pkt_info;
}
1 change: 1 addition & 0 deletions src/vnsw/agent/pkt/pkt_flow_info.h
Expand Up @@ -97,6 +97,7 @@ class PktFlowInfo {
FlowRouteRefMap &ref_map);
uint8_t RouteToPrefixLen(const AgentRoute *route);
void CalculatePort(const PktInfo *p, const Interface *intf);
void SetPktInfo(boost::shared_ptr<PktInfo> info);
bool l3_flow;
Address::Family family;
boost::shared_ptr<PktInfo> pkt;
Expand Down
65 changes: 60 additions & 5 deletions src/vnsw/agent/pkt/pkt_handler.cc
Expand Up @@ -41,7 +41,9 @@ const std::size_t PktTrace::kPktMaxTraceSize;
////////////////////////////////////////////////////////////////////////////////

PktHandler::PktHandler(Agent *agent, PktModule *pkt_module) :
stats_(), agent_(agent), pkt_module_(pkt_module) {
stats_(), agent_(agent), pkt_module_(pkt_module),
work_queue_(TaskScheduler::GetInstance()->GetTaskId("Agent::PktHandler"), 0,
boost::bind(&PktHandler::ProcessPacket, this, _1)) {
for (int i = 0; i < MAX_MODULES; ++i) {
if (i == PktHandler::DHCP || i == PktHandler::DHCPV6 ||
i == PktHandler::DNS)
Expand All @@ -52,6 +54,7 @@ PktHandler::PktHandler(Agent *agent, PktModule *pkt_module) :
}

PktHandler::~PktHandler() {
work_queue_.Shutdown();
}

void PktHandler::Register(PktModuleName type, RcvQueueFunc cb) {
Expand Down Expand Up @@ -189,23 +192,57 @@ PktHandler::PktModuleName PktHandler::ParsePacket(const AgentHdr &hdr,
}

void PktHandler::HandleRcvPkt(const AgentHdr &hdr, const PacketBufferPtr &buff){
// Enqueue Flow packets directly to the flow module and avoid additional
// work queue hop.
if (IsFlowPacket(hdr)) {
boost::shared_ptr<PktInfo> pkt_info (new PktInfo(buff, hdr));
// uint8_t *pkt = buff->data();
// ParsePacket(hdr, pkt_info.get(), pkt);
PktModuleEnqueue(FLOW, hdr, pkt_info);
return;
}

// Other packets are enqueued to a workqueue to decouple from ASIO and
// run in exclusion with DB.
boost::shared_ptr<PacketBufferEnqueueItem>
info(new PacketBufferEnqueueItem(hdr, buff));
work_queue_.Enqueue(info);
}

bool PktHandler::ProcessPacket(boost::shared_ptr<PacketBufferEnqueueItem> item) {
const AgentHdr &hdr = item->hdr;
const PacketBufferPtr &buff = item->buff;
boost::shared_ptr<PktInfo> pkt_info (new PktInfo(buff));
uint8_t *pkt = buff->data();

PktModuleName mod = ParsePacket(hdr, pkt_info.get(), pkt);
PktModuleEnqueue(mod, hdr, pkt_info);
return true;
}

void PktHandler::PktModuleEnqueue(PktModuleName mod, const AgentHdr &hdr,
boost::shared_ptr<PktInfo> pkt_info) {
pkt_info->packet_buffer()->set_module(mod);
stats_.PktRcvd(mod);
pkt_trace_.at(mod).AddPktTrace(PktTrace::In, pkt_info->len, pkt_info->pkt,
&pkt_info->agent_hdr);
pkt_trace_.at(mod).AddPktTrace(PktTrace::In, pkt_info->len,
pkt_info->pkt, &hdr);
if (mod == INVALID) {
agent_->stats()->incr_pkt_dropped();
return;
}

if (!(enqueue_cb_.at(mod))(pkt_info)) {
stats_.PktQThresholdExceeded(mod);
}
return;
}

PktHandler::PktModuleName PktHandler::ParseFlowPacket(
boost::shared_ptr<PktInfo> pkt_info, uint8_t *pkt) {
PktModuleName mod = ParsePacket(pkt_info->agent_hdr, pkt_info.get(), pkt);
// In case it is not a flow packet, enqueue it back to the right module
if (mod != FLOW) {
PktModuleEnqueue(mod, pkt_info->agent_hdr, pkt_info);
}
return mod;
}

// Compute L2/L3 forwarding mode for pacekt.
Expand Down Expand Up @@ -764,6 +801,15 @@ bool PktHandler::IsManagedTORPacket(Interface *intf, PktInfo *pkt_info,
return true;
}

bool PktHandler::IsFlowPacket(const AgentHdr &agent_hdr) {
if (agent_hdr.cmd == AgentHdr::TRAP_FLOW_MISS ||
agent_hdr.cmd == AgentHdr::TRAP_ECMP_RESOLVE ||
agent_hdr.cmd == AgentHdr::TRAP_FLOW_ACTION_HOLD) {
return true;
}
return false;
}

bool PktHandler::IsDiagPacket(PktInfo *pkt_info) {
if (pkt_info->agent_hdr.cmd == AgentHdr::TRAP_ZERO_TTL ||
pkt_info->agent_hdr.cmd == AgentHdr::TRAP_ICMP_ERROR)
Expand Down Expand Up @@ -862,6 +908,15 @@ PktInfo::PktInfo(const PacketBufferPtr &buff) :
transp.tcp = 0;
}

PktInfo::PktInfo(const PacketBufferPtr &buff, const AgentHdr &hdr) :
pkt(buff->data()), len(buff->data_len()), max_pkt_len(buff->buffer_len()),
data(), ipc(), family(Address::UNSPEC), type(PktType::INVALID),
agent_hdr(hdr), ether_type(-1), ip_saddr(), ip_daddr(), ip_proto(), sport(),
dport(), tcp_ack(false), tunnel(), l3_forwarding(false), l3_label(false),
eth(), arp(), ip(), ip6(), packet_buffer_(buff) {
transp.tcp = 0;
}

PktInfo::PktInfo(Agent *agent, uint32_t buff_len, uint32_t module,
uint32_t mdata) :
len(), max_pkt_len(), data(), ipc(), family(Address::UNSPEC),
Expand Down
17 changes: 17 additions & 0 deletions src/vnsw/agent/pkt/pkt_handler.h
Expand Up @@ -118,6 +118,7 @@ struct AgentHdr {
TRAP_TOR_CONTROL_PKT = AGENT_TRAP_TOR_CONTROL_PKT,
TRAP_ZERO_TTL = AGENT_TRAP_ZERO_TTL,
TRAP_ICMP_ERROR = AGENT_TRAP_ICMP_ERROR,
TRAP_FLOW_ACTION_HOLD = AGENT_TRAP_FLOW_ACTION_HOLD,
INVALID = MAX_AGENT_HDR_COMMANDS
};

Expand Down Expand Up @@ -219,6 +220,7 @@ struct PktInfo {

PktInfo(Agent *agent, uint32_t buff_len, uint32_t module, uint32_t mdata);
PktInfo(const PacketBufferPtr &buff);
PktInfo(const PacketBufferPtr &buff, const AgentHdr &hdr);
PktInfo(InterTaskMsg *msg);
virtual ~PktInfo();

Expand Down Expand Up @@ -276,6 +278,14 @@ class PktHandler {
void PktQThresholdExceeded(PktModuleName mod);
};

struct PacketBufferEnqueueItem {
const AgentHdr hdr;
const PacketBufferPtr buff;

PacketBufferEnqueueItem(const AgentHdr &h, const PacketBufferPtr &b)
: hdr(h), buff(b) {}
};

PktHandler(Agent *, PktModule *pkt_module);
virtual ~PktHandler();

Expand All @@ -285,9 +295,12 @@ class PktHandler {

PktModuleName ParsePacket(const AgentHdr &hdr, PktInfo *pkt_info,
uint8_t *pkt);
PktModuleName ParseFlowPacket(boost::shared_ptr<PktInfo> pkt_info,
uint8_t *pkt);
int ParseUserPkt(PktInfo *pkt_info, Interface *intf,
PktType::Type &pkt_type, uint8_t *pkt);
// identify pkt type and send to the registered handler
bool ProcessPacket(boost::shared_ptr<PacketBufferEnqueueItem> item);
void HandleRcvPkt(const AgentHdr &hdr, const PacketBufferPtr &buff);
void SendMessage(PktModuleName mod, InterTaskMsg *msg);

Expand All @@ -312,6 +325,8 @@ class PktHandler {
PktModule *pkt_module() const { return pkt_module_; }

private:
void PktModuleEnqueue(PktModuleName mod, const AgentHdr &hdr,
boost::shared_ptr<PktInfo> pkt_info);
int ParseEthernetHeader(PktInfo *pkt_info, uint8_t *pkt);
int ParseMplsHdr(PktInfo *pkt_info, uint8_t *pkt);
int ParseIpPacket(PktInfo *pkt_info, PktType::Type &pkt_type,
Expand All @@ -331,6 +346,7 @@ class PktHandler {
bool IsToRDevice(uint32_t vrf_id, const IpAddress &ip);
bool IsManagedTORPacket(Interface *intf, PktInfo *pkt_info,
PktType::Type &pkt_type, uint8_t *pkt);
bool IsFlowPacket(const AgentHdr &agent_hdr);
bool IsDiagPacket(PktInfo *pkt_info);

// handlers for each module type
Expand All @@ -342,6 +358,7 @@ class PktHandler {

Agent *agent_;
PktModule *pkt_module_;
WorkQueue<boost::shared_ptr<PacketBufferEnqueueItem> > work_queue_;

DISALLOW_COPY_AND_ASSIGN(PktHandler);
};
Expand Down
2 changes: 2 additions & 0 deletions src/vnsw/agent/pkt/vrouter_interface.h
Expand Up @@ -43,6 +43,8 @@ class VrouterControlInterface : public ControlInterface {
vr_cmd_list_[AGENT_TRAP_HANDLE_DF] = AgentHdr::TRAP_HANDLE_DF;
vr_cmd_list_[AGENT_TRAP_ZERO_TTL] = AgentHdr::TRAP_ZERO_TTL;
vr_cmd_list_[AGENT_TRAP_ICMP_ERROR] = AgentHdr::TRAP_ICMP_ERROR;
vr_cmd_list_[AGENT_TRAP_TOR_CONTROL_PKT] = AgentHdr::TRAP_TOR_CONTROL_PKT;
vr_cmd_list_[AGENT_TRAP_FLOW_ACTION_HOLD] = AgentHdr::TRAP_FLOW_ACTION_HOLD;

// Init and populate vector for translating command params from vrouter
// to agent
Expand Down

0 comments on commit 01e2667

Please sign in to comment.