diff --git a/src/vnsw/agent/cmn/agent.cc b/src/vnsw/agent/cmn/agent.cc index a3d609e705f..e1135a269c5 100644 --- a/src/vnsw/agent/cmn/agent.cc +++ b/src/vnsw/agent/cmn/agent.cc @@ -114,6 +114,7 @@ void Agent::SetAgentTaskPolicy() { "Agent::Uve", "Agent::KSync", "Agent::PktFlowResponder", + "Agent::PktHandler", AGENT_INIT_TASKNAME }; SetTaskPolicyOne("db::DBTable", db_exclude_list, diff --git a/src/vnsw/agent/pkt/flow_handler.cc b/src/vnsw/agent/pkt/flow_handler.cc index 95b2a60a705..60b4ed287c7 100644 --- a/src/vnsw/agent/pkt/flow_handler.cc +++ b/src/vnsw/agent/pkt/flow_handler.cc @@ -26,7 +26,18 @@ bool FlowHandler::Run() { PktFlowInfo info(pkt_info_, agent_->pkt()->flow_table()); std::auto_ptr ipc; - if (pkt_info_->type == PktType::MESSAGE) { + if (pkt_info_->type == PktType::INVALID) { + // packet parsing is not done, invoke the same here + uint8_t *pkt = pkt_info_->packet_buffer()->data(); + PktHandler::PktModuleName mod = agent_->pkt()->pkt_handler()-> + ParseFlowPacket(pkt_info_, pkt); + // if packet wasnt for flow module, it would've got enqueued to the + // correct module in the above call. Nothing else to do. + if (mod != PktHandler::FLOW) { + return true; + } + info.SetPktInfo(pkt_info_); + } else if (pkt_info_->type == PktType::MESSAGE) { ipc = std::auto_ptr(static_cast(pkt_info_->ipc)); pkt_info_->ipc = NULL; FlowEntry *fe = ipc->fe_ptr.get(); @@ -54,7 +65,7 @@ bool FlowHandler::Run() { pkt_info_->vrf = fe->data().vrf; pkt_info_->l3_forwarding = fe->l3_flow(); info.l3_flow = fe->l3_flow(); - } + } if (info.Process(pkt_info_.get(), &in, &out) == false) { info.short_flow = true; diff --git a/src/vnsw/agent/pkt/pkt_flow_info.cc b/src/vnsw/agent/pkt/pkt_flow_info.cc index 60582042022..4bcca4854f1 100644 --- a/src/vnsw/agent/pkt/pkt_flow_info.cc +++ b/src/vnsw/agent/pkt/pkt_flow_info.cc @@ -1638,3 +1638,8 @@ void PktFlowInfo::UpdateEvictedFlowStats(const PktInfo *pkt) { pkt->agent_hdr.cmd_param_3, pkt->agent_hdr.cmd_param_4, flow_table); } } +void PktFlowInfo::SetPktInfo(boost::shared_ptr pkt_info) { + l3_flow = pkt_info->l3_forwarding; + family = pkt_info->family; + pkt = pkt_info; +} diff --git a/src/vnsw/agent/pkt/pkt_flow_info.h b/src/vnsw/agent/pkt/pkt_flow_info.h index 80ede344a9d..7976bd423a0 100644 --- a/src/vnsw/agent/pkt/pkt_flow_info.h +++ b/src/vnsw/agent/pkt/pkt_flow_info.h @@ -97,6 +97,7 @@ class PktFlowInfo { FlowRouteRefMap &ref_map); uint8_t RouteToPrefixLen(const AgentRoute *route); void CalculatePort(const PktInfo *p, const Interface *intf); + void SetPktInfo(boost::shared_ptr info); bool l3_flow; Address::Family family; boost::shared_ptr pkt; diff --git a/src/vnsw/agent/pkt/pkt_handler.cc b/src/vnsw/agent/pkt/pkt_handler.cc index a7ae900eba9..3a375309d56 100644 --- a/src/vnsw/agent/pkt/pkt_handler.cc +++ b/src/vnsw/agent/pkt/pkt_handler.cc @@ -41,7 +41,9 @@ const std::size_t PktTrace::kPktMaxTraceSize; //////////////////////////////////////////////////////////////////////////////// PktHandler::PktHandler(Agent *agent, PktModule *pkt_module) : - stats_(), agent_(agent), pkt_module_(pkt_module) { + stats_(), agent_(agent), pkt_module_(pkt_module), + work_queue_(TaskScheduler::GetInstance()->GetTaskId("Agent::PktHandler"), 0, + boost::bind(&PktHandler::ProcessPacket, this, _1)) { for (int i = 0; i < MAX_MODULES; ++i) { if (i == PktHandler::DHCP || i == PktHandler::DHCPV6 || i == PktHandler::DNS) @@ -52,6 +54,7 @@ PktHandler::PktHandler(Agent *agent, PktModule *pkt_module) : } PktHandler::~PktHandler() { + work_queue_.Shutdown(); } void PktHandler::Register(PktModuleName type, RcvQueueFunc cb) { @@ -189,23 +192,57 @@ PktHandler::PktModuleName PktHandler::ParsePacket(const AgentHdr &hdr, } void PktHandler::HandleRcvPkt(const AgentHdr &hdr, const PacketBufferPtr &buff){ + // Enqueue Flow packets directly to the flow module and avoid additional + // work queue hop. + if (IsFlowPacket(hdr)) { + boost::shared_ptr pkt_info (new PktInfo(buff, hdr)); + // uint8_t *pkt = buff->data(); + // ParsePacket(hdr, pkt_info.get(), pkt); + PktModuleEnqueue(FLOW, hdr, pkt_info); + return; + } + + // Other packets are enqueued to a workqueue to decouple from ASIO and + // run in exclusion with DB. + boost::shared_ptr + info(new PacketBufferEnqueueItem(hdr, buff)); + work_queue_.Enqueue(info); +} + +bool PktHandler::ProcessPacket(boost::shared_ptr item) { + const AgentHdr &hdr = item->hdr; + const PacketBufferPtr &buff = item->buff; boost::shared_ptr pkt_info (new PktInfo(buff)); uint8_t *pkt = buff->data(); PktModuleName mod = ParsePacket(hdr, pkt_info.get(), pkt); + PktModuleEnqueue(mod, hdr, pkt_info); + return true; +} + +void PktHandler::PktModuleEnqueue(PktModuleName mod, const AgentHdr &hdr, + boost::shared_ptr pkt_info) { pkt_info->packet_buffer()->set_module(mod); stats_.PktRcvd(mod); - pkt_trace_.at(mod).AddPktTrace(PktTrace::In, pkt_info->len, pkt_info->pkt, - &pkt_info->agent_hdr); + pkt_trace_.at(mod).AddPktTrace(PktTrace::In, pkt_info->len, + pkt_info->pkt, &hdr); if (mod == INVALID) { agent_->stats()->incr_pkt_dropped(); return; } - if (!(enqueue_cb_.at(mod))(pkt_info)) { stats_.PktQThresholdExceeded(mod); } - return; +} + +PktHandler::PktModuleName PktHandler::ParseFlowPacket( + boost::shared_ptr pkt_info, uint8_t *pkt) { + PktModuleName mod = ParsePacket(pkt_info->agent_hdr, pkt_info.get(), pkt); + // In case it is not a flow packet, enqueue it back to the right module + if (mod != FLOW) { + PktModuleEnqueue(mod, pkt_info->agent_hdr, pkt_info); + } + return mod; } // Compute L2/L3 forwarding mode for pacekt. @@ -764,6 +801,15 @@ bool PktHandler::IsManagedTORPacket(Interface *intf, PktInfo *pkt_info, return true; } +bool PktHandler::IsFlowPacket(const AgentHdr &agent_hdr) { + if (agent_hdr.cmd == AgentHdr::TRAP_FLOW_MISS || + agent_hdr.cmd == AgentHdr::TRAP_ECMP_RESOLVE || + agent_hdr.cmd == AgentHdr::TRAP_FLOW_ACTION_HOLD) { + return true; + } + return false; +} + bool PktHandler::IsDiagPacket(PktInfo *pkt_info) { if (pkt_info->agent_hdr.cmd == AgentHdr::TRAP_ZERO_TTL || pkt_info->agent_hdr.cmd == AgentHdr::TRAP_ICMP_ERROR) @@ -862,6 +908,15 @@ PktInfo::PktInfo(const PacketBufferPtr &buff) : transp.tcp = 0; } +PktInfo::PktInfo(const PacketBufferPtr &buff, const AgentHdr &hdr) : + pkt(buff->data()), len(buff->data_len()), max_pkt_len(buff->buffer_len()), + data(), ipc(), family(Address::UNSPEC), type(PktType::INVALID), + agent_hdr(hdr), ether_type(-1), ip_saddr(), ip_daddr(), ip_proto(), sport(), + dport(), tcp_ack(false), tunnel(), l3_forwarding(false), l3_label(false), + eth(), arp(), ip(), ip6(), packet_buffer_(buff) { + transp.tcp = 0; +} + PktInfo::PktInfo(Agent *agent, uint32_t buff_len, uint32_t module, uint32_t mdata) : len(), max_pkt_len(), data(), ipc(), family(Address::UNSPEC), diff --git a/src/vnsw/agent/pkt/pkt_handler.h b/src/vnsw/agent/pkt/pkt_handler.h index b4d23bf544c..d24e9fe5a22 100644 --- a/src/vnsw/agent/pkt/pkt_handler.h +++ b/src/vnsw/agent/pkt/pkt_handler.h @@ -118,6 +118,7 @@ struct AgentHdr { TRAP_TOR_CONTROL_PKT = AGENT_TRAP_TOR_CONTROL_PKT, TRAP_ZERO_TTL = AGENT_TRAP_ZERO_TTL, TRAP_ICMP_ERROR = AGENT_TRAP_ICMP_ERROR, + TRAP_FLOW_ACTION_HOLD = AGENT_TRAP_FLOW_ACTION_HOLD, INVALID = MAX_AGENT_HDR_COMMANDS }; @@ -219,6 +220,7 @@ struct PktInfo { PktInfo(Agent *agent, uint32_t buff_len, uint32_t module, uint32_t mdata); PktInfo(const PacketBufferPtr &buff); + PktInfo(const PacketBufferPtr &buff, const AgentHdr &hdr); PktInfo(InterTaskMsg *msg); virtual ~PktInfo(); @@ -276,6 +278,14 @@ class PktHandler { void PktQThresholdExceeded(PktModuleName mod); }; + struct PacketBufferEnqueueItem { + const AgentHdr hdr; + const PacketBufferPtr buff; + + PacketBufferEnqueueItem(const AgentHdr &h, const PacketBufferPtr &b) + : hdr(h), buff(b) {} + }; + PktHandler(Agent *, PktModule *pkt_module); virtual ~PktHandler(); @@ -285,9 +295,12 @@ class PktHandler { PktModuleName ParsePacket(const AgentHdr &hdr, PktInfo *pkt_info, uint8_t *pkt); + PktModuleName ParseFlowPacket(boost::shared_ptr pkt_info, + uint8_t *pkt); int ParseUserPkt(PktInfo *pkt_info, Interface *intf, PktType::Type &pkt_type, uint8_t *pkt); // identify pkt type and send to the registered handler + bool ProcessPacket(boost::shared_ptr item); void HandleRcvPkt(const AgentHdr &hdr, const PacketBufferPtr &buff); void SendMessage(PktModuleName mod, InterTaskMsg *msg); @@ -312,6 +325,8 @@ class PktHandler { PktModule *pkt_module() const { return pkt_module_; } private: + void PktModuleEnqueue(PktModuleName mod, const AgentHdr &hdr, + boost::shared_ptr pkt_info); int ParseEthernetHeader(PktInfo *pkt_info, uint8_t *pkt); int ParseMplsHdr(PktInfo *pkt_info, uint8_t *pkt); int ParseIpPacket(PktInfo *pkt_info, PktType::Type &pkt_type, @@ -331,6 +346,7 @@ class PktHandler { bool IsToRDevice(uint32_t vrf_id, const IpAddress &ip); bool IsManagedTORPacket(Interface *intf, PktInfo *pkt_info, PktType::Type &pkt_type, uint8_t *pkt); + bool IsFlowPacket(const AgentHdr &agent_hdr); bool IsDiagPacket(PktInfo *pkt_info); // handlers for each module type @@ -342,6 +358,7 @@ class PktHandler { Agent *agent_; PktModule *pkt_module_; + WorkQueue > work_queue_; DISALLOW_COPY_AND_ASSIGN(PktHandler); }; diff --git a/src/vnsw/agent/pkt/vrouter_interface.h b/src/vnsw/agent/pkt/vrouter_interface.h index ecf5c6dfa5d..f9046181d29 100644 --- a/src/vnsw/agent/pkt/vrouter_interface.h +++ b/src/vnsw/agent/pkt/vrouter_interface.h @@ -43,6 +43,8 @@ class VrouterControlInterface : public ControlInterface { vr_cmd_list_[AGENT_TRAP_HANDLE_DF] = AgentHdr::TRAP_HANDLE_DF; vr_cmd_list_[AGENT_TRAP_ZERO_TTL] = AgentHdr::TRAP_ZERO_TTL; vr_cmd_list_[AGENT_TRAP_ICMP_ERROR] = AgentHdr::TRAP_ICMP_ERROR; + vr_cmd_list_[AGENT_TRAP_TOR_CONTROL_PKT] = AgentHdr::TRAP_TOR_CONTROL_PKT; + vr_cmd_list_[AGENT_TRAP_FLOW_ACTION_HOLD] = AgentHdr::TRAP_FLOW_ACTION_HOLD; // Init and populate vector for translating command params from vrouter // to agent