Skip to content

Commit

Permalink
Set the work queue limit for different services.
Browse files Browse the repository at this point in the history
As we do not want the limit to be applied for flow, remove the
limit applied in the packet handler and put it in the services queues.

(cherry picked from commit 0b5f817)
(cherry picked from commit 0d170fa)

Change-Id: Iaf9fe4366e128806325d00bc58539eb81a42efe5
closes-bug: #1576332
  • Loading branch information
haripk committed May 29, 2016
1 parent e24cc66 commit eefdf6c
Show file tree
Hide file tree
Showing 16 changed files with 116 additions and 7 deletions.
4 changes: 4 additions & 0 deletions src/base/queue_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ class WorkQueue {
start_runner_ = start_runner_fn;
}

void SetSize(size_t size) {
size_ = size;
}

void SetBounded(bool bounded) {
bounded_ = bounded;
}
Expand Down
12 changes: 8 additions & 4 deletions src/vnsw/agent/init/agent_param.cc
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,10 @@ void AgentParam::ParseNexthopServer() {
}
}

void AgentParam::ParseBgpAsAServicePortRange() {
void AgentParam::ParseServices() {
GetValueFromTree<string>(bgp_as_a_service_port_range_,
"SERVICES.bgp_as_a_service_port_range");
GetValueFromTree<uint32_t>(services_queue_limit_, "SERVICES.queue_limit");
}

void AgentParam::ParseCollectorArguments
Expand Down Expand Up @@ -871,10 +872,11 @@ void AgentParam::ParsePlatformArguments
}
}

void AgentParam::ParseBgpAsAServicePortRangeArguments
void AgentParam::ParseServicesArguments
(const boost::program_options::variables_map &v) {
GetOptValue<string>(v, bgp_as_a_service_port_range_,
"SERVICES.bgp_as_a_service_port_range");
GetOptValue<uint32_t>(v, services_queue_limit_, "SERVICES.queue_limit");
}

// Initialize hypervisor mode based on system information
Expand Down Expand Up @@ -924,7 +926,7 @@ void AgentParam::InitFromConfig() {
ParseAgentInfo();
ParseNexthopServer();
ParsePlatform();
ParseBgpAsAServicePortRange();
ParseServices();
cout << "Config file <" << config_file_ << "> parsing completed.\n";
return;
}
Expand All @@ -948,7 +950,7 @@ void AgentParam::InitFromArguments() {
ParseAgentInfoArguments(var_map_);
ParseNexthopServerArguments(var_map_);
ParsePlatformArguments(var_map_);
ParseBgpAsAServicePortRangeArguments(var_map_);
ParseServicesArguments(var_map_);
return;
}

Expand Down Expand Up @@ -1235,6 +1237,7 @@ void AgentParam::LogConfig() const {
LOG(DEBUG, "Service instance lb ssl : " << si_lb_ssl_cert_path_);
LOG(DEBUG, "Service instance lbaas auth : " << si_lbaas_auth_conf_);
LOG(DEBUG, "Bgp as a service port range : " << bgp_as_a_service_port_range_);
LOG(DEBUG, "Services queue limit : " << services_queue_limit_);
if (hypervisor_mode_ == MODE_KVM) {
LOG(DEBUG, "Hypervisor mode : kvm");
return;
Expand Down Expand Up @@ -1345,6 +1348,7 @@ AgentParam::AgentParam(bool enable_flow_options,
flow_trace_enable_(true),
flow_latency_limit_(Agent::kDefaultFlowLatencyLimit),
subnet_hosts_resolvable_(true),
services_queue_limit_(1024),
tbb_thread_count_(Agent::kMaxTbbThreads),
tbb_exec_delay_(0),
tbb_schedule_delay_(0),
Expand Down
6 changes: 4 additions & 2 deletions src/vnsw/agent/init/agent_param.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ class AgentParam {
const std::vector<uint16_t> &bgp_as_a_service_port_range_value() const {
return bgp_as_a_service_port_range_value_;
}
uint32_t services_queue_limit() { return services_queue_limit_; }

uint16_t flow_thread_count() const { return flow_thread_count_; }
void set_flow_thread_count(uint16_t count) { flow_thread_count_ = count; }
Expand Down Expand Up @@ -349,7 +350,7 @@ class AgentParam {
void ParseAgentInfo();
void ParseNexthopServer();
void ParsePlatform();
void ParseBgpAsAServicePortRange();
void ParseServices();
void set_agent_mode(const std::string &mode);

void ParseCollectorArguments
Expand Down Expand Up @@ -384,7 +385,7 @@ class AgentParam {
(const boost::program_options::variables_map &v);
void ParsePlatformArguments
(const boost::program_options::variables_map &v);
void ParseBgpAsAServicePortRangeArguments
void ParseServicesArguments
(const boost::program_options::variables_map &v);

boost::program_options::variables_map var_map_;
Expand Down Expand Up @@ -492,6 +493,7 @@ class AgentParam {
bool subnet_hosts_resolvable_;
std::string bgp_as_a_service_port_range_;
std::vector<uint16_t> bgp_as_a_service_port_range_value_;
uint32_t services_queue_limit_;

// TBB related
uint32_t tbb_thread_count_;
Expand Down
1 change: 1 addition & 0 deletions src/vnsw/agent/init/test/cfg.ini
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ physical_interface=vnet0

[SERVICES]
bgp_as_a_service_port_range=100-199
queue_limit=8192

[GATEWAY-0]
routing_instance=default-domain:admin:public:public
Expand Down
2 changes: 2 additions & 0 deletions src/vnsw/agent/init/test/test_agent_init.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ TEST_F(FlowTest, Agent_Conf_file_1) {
const std::vector<uint16_t> &ports = param.bgp_as_a_service_port_range_value();
EXPECT_EQ(ports[0], 100);
EXPECT_EQ(ports[1], 199);
EXPECT_EQ(param.services_queue_limit(), 8192);

// By default, flow-tracing must be enabled
EXPECT_TRUE(param.flow_trace_enable());
Expand Down Expand Up @@ -100,6 +101,7 @@ TEST_F(FlowTest, Agent_Conf_file_2) {
EXPECT_EQ(param.mirror_client_port(), 8097);
// Default value for pkt0_tx_buffer_count
EXPECT_EQ(param.pkt0_tx_buffer_count(), 1000);
EXPECT_EQ(param.services_queue_limit(), 1024);
}

TEST_F(FlowTest, Agent_Flows_Option_1) {
Expand Down
5 changes: 4 additions & 1 deletion src/vnsw/agent/pkt/proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class ProtoHandler;
// Protocol task (work queue for each protocol)
class Proto {
public:
typedef WorkQueue<boost::shared_ptr<PktInfo> > ProtoWorkQueue;

Proto(Agent *agent, const char *task_name, PktHandler::PktModuleName mod,
boost::asio::io_service &io);
virtual ~Proto();
Expand All @@ -28,15 +30,16 @@ class Proto {
void set_trace(bool val) { trace_ = val; }
void set_free_buffer(bool val) { free_buffer_ = val; }
Agent *agent() const { return agent_; }
const ProtoWorkQueue *work_queue() const { return &work_queue_; }
protected:
Agent *agent_;
PktHandler::PktModuleName module_;
bool trace_;
bool free_buffer_;
boost::asio::io_service &io_;
ProtoWorkQueue work_queue_;

private:
WorkQueue<boost::shared_ptr<PktInfo> > work_queue_;
DISALLOW_COPY_AND_ASSIGN(Proto);
};

Expand Down
1 change: 1 addition & 0 deletions src/vnsw/agent/pkt/test/test_pkt.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ TEST_F(PktTest, tx_no_vlan_1) {
EXPECT_TRUE(*(data_p + 6) == htons(ETHERTYPE_ARP));
}

// TODO : flaky test
TEST_F(PktTest, tx_vlan_1) {
int len;
PktInfo pkt_info(agent_, 1024, PktHandler::ARP, 0);
Expand Down
4 changes: 4 additions & 0 deletions src/vnsw/agent/services/arp_proto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "sandesh/sandesh_types.h"
#include "sandesh/sandesh.h"
#include "net/address_util.h"
#include "init/agent_init.h"
#include "oper/nexthop.h"
#include "oper/tunnel_nh.h"
#include "oper/mirror_table.h"
Expand All @@ -21,6 +22,9 @@ ArpProto::ArpProto(Agent *agent, boost::asio::io_service &io,
ip_fabric_interface_(NULL), gratuitous_arp_entry_(NULL),
max_retries_(kMaxRetries), retry_timeout_(kRetryTimeout),
aging_timeout_(kAgingTimeout) {
// limit the number of entries in the workqueue
work_queue_.SetSize(agent->params()->services_queue_limit());
work_queue_.SetBounded(true);

vrf_table_listener_id_ = agent->vrf_table()->Register(
boost::bind(&ArpProto::VrfNotify, this, _1, _2));
Expand Down
3 changes: 3 additions & 0 deletions src/vnsw/agent/services/dhcp_proto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ DhcpProto::DhcpProto(Agent *agent, boost::asio::io_service &io,
ip_fabric_interface_index_(-1), pkt_interface_index_(-1),
dhcp_server_socket_(io), dhcp_server_read_buf_(NULL),
gateway_delete_seqno_(0) {
// limit the number of entries in the workqueue
work_queue_.SetSize(agent->params()->services_queue_limit());
work_queue_.SetBounded(true);

dhcp_relay_mode_ = agent->params()->dhcp_relay_mode();
if (dhcp_relay_mode_) {
Expand Down
5 changes: 5 additions & 0 deletions src/vnsw/agent/services/dhcpv6_proto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/

#include "cmn/agent_cmn.h"
#include "init/agent_init.h"
#include "services/dhcpv6_proto.h"
#include "services/services_types.h"
#include "services/services_init.h"
Expand All @@ -15,6 +16,10 @@ Dhcpv6Proto::Dhcpv6Proto(Agent *agent, boost::asio::io_service &io,
bool run_with_vrouter) :
Proto(agent, "Agent::Services", PktHandler::DHCPV6, io),
run_with_vrouter_(run_with_vrouter) {
// limit the number of entries in the workqueue
work_queue_.SetSize(agent->params()->services_queue_limit());
work_queue_.SetBounded(true);

// server duid based on vrrp mac
server_duid_.type = htons(DHCPV6_DUID_TYPE_LL);
server_duid_.hw_type = 0;
Expand Down
5 changes: 5 additions & 0 deletions src/vnsw/agent/services/dns_proto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <sys/types.h>
#include "net/address_util.h"
#include "init/agent_init.h"
#include "oper/interface_common.h"
#include "services/dns_proto.h"
#include "bind/bind_resolver.h"
Expand Down Expand Up @@ -48,6 +49,10 @@ void DnsProto::ConfigInit() {
DnsProto::DnsProto(Agent *agent, boost::asio::io_service &io) :
Proto(agent, "Agent::Services", PktHandler::DNS, io),
xid_(0), timeout_(kDnsTimeout), max_retries_(kDnsMaxRetries) {
// limit the number of entries in the workqueue
work_queue_.SetSize(agent->params()->services_queue_limit());
work_queue_.SetBounded(true);

lid_ = agent->interface_table()->Register(
boost::bind(&DnsProto::InterfaceNotify, this, _2));
Vnlid_ = agent->vn_table()->Register(
Expand Down
4 changes: 4 additions & 0 deletions src/vnsw/agent/services/icmp_error_proto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
*/

#include <cmn/agent_cmn.h>
#include <init/agent_init.h>
#include <services/icmp_error_proto.h>
#include <services/icmp_error_handler.h>

IcmpErrorProto::IcmpErrorProto(Agent *agent, boost::asio::io_service &io) :
Proto(agent, "Agent::Services", PktHandler::ICMP_ERROR, io) {
// limit the number of entries in the workqueue
work_queue_.SetSize(agent->params()->services_queue_limit());
work_queue_.SetBounded(true);
}

IcmpErrorProto::~IcmpErrorProto() {
Expand Down
4 changes: 4 additions & 0 deletions src/vnsw/agent/services/icmp_proto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@
*/

#include <cmn/agent_cmn.h>
#include <init/agent_init.h>
#include <services/icmp_proto.h>

IcmpProto::IcmpProto(Agent *agent, boost::asio::io_service &io) :
Proto(agent, "Agent::Services", PktHandler::ICMP, io) {
// limit the number of entries in the workqueue
work_queue_.SetSize(agent->params()->services_queue_limit());
work_queue_.SetBounded(true);
}

IcmpProto::~IcmpProto() {
Expand Down
4 changes: 4 additions & 0 deletions src/vnsw/agent/services/icmpv6_error_proto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
*/

#include <cmn/agent_cmn.h>
#include <init/agent_init.h>
#include <services/icmpv6_error_proto.h>
#include <services/icmpv6_error_handler.h>

Icmpv6ErrorProto::Icmpv6ErrorProto(Agent *agent, boost::asio::io_service &io) :
Proto(agent, "Agent::Services", PktHandler::ICMPV6_ERROR, io) {
// limit the number of entries in the workqueue
work_queue_.SetSize(agent->params()->services_queue_limit());
work_queue_.SetBounded(true);
}

Icmpv6ErrorProto::~Icmpv6ErrorProto() {
Expand Down
5 changes: 5 additions & 0 deletions src/vnsw/agent/services/icmpv6_proto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@

#include "base/os.h"
#include <cmn/agent_cmn.h>
#include <init/agent_init.h>
#include <pkt/pkt_handler.h>
#include <oper/route_common.h>
#include <services/icmpv6_proto.h>

Icmpv6Proto::Icmpv6Proto(Agent *agent, boost::asio::io_service &io) :
Proto(agent, "Agent::Services", PktHandler::ICMPV6, io) {
// limit the number of entries in the workqueue
work_queue_.SetSize(agent->params()->services_queue_limit());
work_queue_.SetBounded(true);

vn_table_listener_id_ = agent->vn_table()->Register(
boost::bind(&Icmpv6Proto::VnNotify, this, _2));
vrf_table_listener_id_ = agent->vrf_table()->Register(
Expand Down
58 changes: 58 additions & 0 deletions src/vnsw/agent/services/test/dhcp_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2487,6 +2487,64 @@ TEST_F(DhcpTest, DhcpReqv6PortTest) {
Agent::GetInstance()->GetDhcpProto()->ClearStats();
}

// Check the DHCP queue limit
TEST_F(DhcpTest, QueueLimitTest) {
struct PortInfo input[] = {
{"vnet1", 1, "1.1.1.1", "00:00:00:01:01:01", 1, 1},
};

uint8_t options[] = {
DHCP_OPTION_MSG_TYPE,
DHCP_OPTION_HOST_NAME,
DHCP_OPTION_DOMAIN_NAME,
DHCP_OPTION_END
};
DhcpProto::DhcpStats stats;

ClearPktTrace();
IpamInfo ipam_info[] = {
{"1.2.3.128", 27, "1.2.3.129", true},
{"7.8.9.0", 24, "7.8.9.12", true},
{"1.1.1.0", 24, "1.1.1.200", true},
};
char vdns_attr[] = "<virtual-DNS-data>\n <domain-name>test.contrail.juniper.net</domain-name>\n <dynamic-records-from-client>true</dynamic-records-from-client>\n <record-order>fixed</record-order>\n <default-ttl-seconds>120</default-ttl-seconds>\n </virtual-DNS-data>\n";
char ipam_attr[] = "<network-ipam-mgmt>\n <ipam-dns-method>virtual-dns-server</ipam-dns-method>\n <ipam-dns-server><virtual-dns-server-name>vdns1</virtual-dns-server-name></ipam-dns-server>\n </network-ipam-mgmt>\n";

CreateVmportEnv(input, 1, 0);
client->WaitForIdle();
client->Reset();
AddVDNS("vdns1", vdns_attr);
client->WaitForIdle();
AddIPAM("vn1", ipam_info, 3, ipam_attr, "vdns1");
client->WaitForIdle();

// disable pkt handler queue, enqueue packets and
// check that limit is not exceeded
WorkQueue<boost::shared_ptr<PktInfo> > *queue =
const_cast<Proto::ProtoWorkQueue *>(
Agent::GetInstance()->GetDhcpProto()->work_queue());
queue->set_disable(true);
EXPECT_EQ(queue->Length(), 0);
for (int i = 0; i < 2048; i++) {
SendDhcp(GetItfId(0), 0x8000, DHCP_DISCOVER, options, 4);
SendDhcp(GetItfId(0), 0x8000, DHCP_REQUEST, options, 4);
}
EXPECT_EQ(queue->Length(), 1023);
queue->set_disable(false);

client->Reset();
DelIPAM("vn1", "vdns1");
client->WaitForIdle();
DelVDNS("vdns1");
client->WaitForIdle();

client->Reset();
DeleteVmportEnv(input, 1, 1, 0);
client->WaitForIdle();

Agent::GetInstance()->GetDhcpProto()->ClearStats();
}

void RouterIdDepInit(Agent *agent) {
}

Expand Down

0 comments on commit eefdf6c

Please sign in to comment.