Skip to content

Commit

Permalink
This fix introduces throtlling of sandesh messages. The daemons which
Browse files Browse the repository at this point in the history
use sandesh generator can specify the throttling value in msgs/second
and specify it either as command line argument or in their respective
config file sandesh_send_rate_limit
Partial-Bug: 1469414

Change-Id: Idda249149f905a8a89e861e048e39b3f93f05ffd
  • Loading branch information
arvindvis committed Sep 8, 2015
1 parent 16e17c7 commit 3a0ac65
Show file tree
Hide file tree
Showing 45 changed files with 230 additions and 21 deletions.
4 changes: 4 additions & 0 deletions src/analytics/contrail-collector.conf
Expand Up @@ -64,6 +64,10 @@ log_local=1
# UDP port to listen on for receiving ipfix messages. -1 to disable.
# ipfix_port=4739

# Sandesh send rate limit can be used to throttle system logs transmitted per
# second. System logs are dropped if the sending rate is exceeded
sandesh_send_rate_limit=100

[COLLECTOR]
# Everything in this section is optional

Expand Down
Expand Up @@ -90,6 +90,7 @@ def parse(self):
'fast_scan_frequency' : 60,
'http_server_port' : 5920,
'zookeeper' : '127.0.0.1:2181',
'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(),
}
ksopts = {
'auth_host': '127.0.0.1',
Expand Down Expand Up @@ -176,6 +177,8 @@ def parse(self):
help="where to look for snmp credentials")
group.add_argument("--api_server",
help="ip:port of api-server for snmp credentials")
group.add_argument("--sandesh_send_rate_limit", type=int,
help="Sandesh send rate limit in messages/sec.")
self._args = parser.parse_args(remaining_argv)
if type(self._args.collectors) is str:
self._args.collectors = self._args.collectors.split()
Expand Down Expand Up @@ -258,3 +261,5 @@ def frequency(self):
def http_port(self):
return self._args.http_server_port

def sandesh_send_rate_limit(self):
return self._args.sandesh_send_rate_limit
Expand Up @@ -36,6 +36,9 @@ def __init__(self, conf, instance='0'):
self._node_type_name = NodeTypeNames[node_type]
self._hostname = socket.gethostname()
self._instance_id = instance
if self._conf.sandesh_send_rate_limit() is not None:
SandeshSystem.set_sandesh_send_rate_limit( \
self._conf.sandesh_send_rate_limit());
sandesh_global.init_generator(self._moduleid, self._hostname,
self._node_type_name, self._instance_id,
self._conf.collectors(),
Expand Down
5 changes: 5 additions & 0 deletions src/analytics/contrail-topology/contrail_topology/config.py
Expand Up @@ -71,6 +71,7 @@ def parse(self):
'scan_frequency' : 60,
'http_server_port': 5921,
'zookeeper' : '127.0.0.1:2181',
'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(),
}

config = None
Expand Down Expand Up @@ -119,6 +120,8 @@ def parse(self):
help="introspect server port")
parser.add_argument("--zookeeper",
help="ip:port of zookeeper server")
parser.add_argument("--sandesh_send_rate_limit", type=int,
help="Sandesh send rate limit in messages/sec.")
self._args = parser.parse_args(remaining_argv)
if type(self._args.collectors) is str:
self._args.collectors = self._args.collectors.split()
Expand Down Expand Up @@ -168,3 +171,5 @@ def frequency(self):
def http_port(self):
return self._args.http_server_port

def sandesh_send_rate_limit(self):
return self._args.sandesh_send_rate_limit
Expand Up @@ -19,6 +19,9 @@ def __init__(self, conf):
self._node_type_name = NodeTypeNames[node_type]
self._hostname = socket.gethostname()
self._instance_id = '0'
if self._conf.sandesh_send_rate_limit() is not None:
SandeshSystem.set_sandesh_send_rate_limit( \
self._conf.sandesh_send_rate_limit())
sandesh_global.init_generator(self._moduleid, self._hostname,
self._node_type_name, self._instance_id,
self._conf.collectors(),
Expand Down
1 change: 1 addition & 0 deletions src/analytics/main.cc
Expand Up @@ -351,6 +351,7 @@ int main(int argc, char *argv[])

unsigned short coll_port = analytics.GetCollector()->GetPort();
VizSandeshContext vsc(&analytics);
Sandesh::set_send_rate_limit(options.sandesh_send_rate_limit());
Sandesh::InitCollector(
module_id,
analytics.name(),
Expand Down
6 changes: 6 additions & 0 deletions src/analytics/options.cc
Expand Up @@ -145,6 +145,10 @@ void Options::Initialize(EventManager &evm,
"ipfix listener UDP port (< 0 will disable ipfix Collector)")
("DEFAULT.test_mode", opt::bool_switch(&test_mode_),
"Enable collector to run in test-mode")
("DEFAULT.sandesh_send_rate_limit",
opt::value<uint32_t>()->default_value(
Sandesh::get_send_rate_limit()),
"Sandesh send rate limit in messages/sec")

("DISCOVERY.port", opt::value<uint16_t>()->default_value(
default_discovery_port),
Expand Down Expand Up @@ -300,6 +304,8 @@ void Options::Process(int argc, char *argv[],
GetOptValue<int>(var_map, syslog_port_, "DEFAULT.syslog_port");
GetOptValue<int>(var_map, sflow_port_, "DEFAULT.sflow_port");
GetOptValue<int>(var_map, ipfix_port_, "DEFAULT.ipfix_port");
GetOptValue<uint32_t>(var_map, sandesh_ratelimit_,
"DEFAULT.sandesh_send_rate_limit");

GetOptValue<uint16_t>(var_map, discovery_port_, "DISCOVERY.port");
GetOptValue<string>(var_map, discovery_server_, "DISCOVERY.server");
Expand Down
3 changes: 2 additions & 1 deletion src/analytics/options.h
Expand Up @@ -56,6 +56,7 @@ class Options {
const int sflow_port() const { return sflow_port_; }
const int ipfix_port() const { return ipfix_port_; }
const bool test_mode() const { return test_mode_; }
const uint32_t sandesh_send_rate_limit() const { return sandesh_ratelimit_; }

private:
template <typename ValueType>
Expand Down Expand Up @@ -117,6 +118,6 @@ class Options {
std::vector<std::string> cassandra_server_list_;
std::vector<std::string> kafka_broker_list_;
uint16_t partitions_;

uint32_t sandesh_ratelimit_;
boost::program_options::options_description config_file_options_;
};
15 changes: 13 additions & 2 deletions src/analytics/test/options_test.cc
Expand Up @@ -81,6 +81,7 @@ TEST_F(OptionsTest, NoArguments) {
EXPECT_EQ(options_.syslog_port(), -1);
EXPECT_EQ(options_.dup(), false);
EXPECT_EQ(options_.test_mode(), false);
EXPECT_EQ(options_.sandesh_send_rate_limit(), 0);
uint16_t protobuf_port(0);
EXPECT_FALSE(options_.collector_protobuf_port(&protobuf_port));
}
Expand Down Expand Up @@ -122,19 +123,22 @@ TEST_F(OptionsTest, DefaultConfFile) {
EXPECT_EQ(options_.syslog_port(), -1);
EXPECT_EQ(options_.dup(), false);
EXPECT_EQ(options_.test_mode(), false);
EXPECT_EQ(options_.sandesh_send_rate_limit(), 100);
uint16_t protobuf_port(0);
EXPECT_FALSE(options_.collector_protobuf_port(&protobuf_port));
}

TEST_F(OptionsTest, OverrideStringFromCommandLine) {
int argc = 3;
int argc = 4;
char *argv[argc];
char argv_0[] = "options_test";
char argv_1[] = "--conf_file=controller/src/analytics/contrail-collector.conf";
char argv_2[] = "--DEFAULT.log_file=test.log";
char argv_3[] = "--DEFAULT.sandesh_send_rate_limit=5";
argv[0] = argv_0;
argv[1] = argv_1;
argv[2] = argv_2;
argv[3] = argv_3;

options_.Parse(evm_, argc, argv);

Expand Down Expand Up @@ -165,6 +169,7 @@ TEST_F(OptionsTest, OverrideStringFromCommandLine) {
EXPECT_EQ(options_.syslog_port(), -1);
EXPECT_EQ(options_.dup(), false);
EXPECT_EQ(options_.test_mode(), false);
EXPECT_EQ(options_.sandesh_send_rate_limit(), 5);
uint16_t protobuf_port(0);
EXPECT_FALSE(options_.collector_protobuf_port(&protobuf_port));
}
Expand Down Expand Up @@ -231,6 +236,7 @@ TEST_F(OptionsTest, CustomConfigFile) {
"log_local=1\n"
"test_mode=1\n"
"syslog_port=101\n"
"sandesh_send_rate_limit=5\n"
"\n"
"[COLLECTOR]\n"
"port=100\n"
Expand Down Expand Up @@ -295,6 +301,7 @@ TEST_F(OptionsTest, CustomConfigFile) {
uint16_t protobuf_port(0);
EXPECT_TRUE(options_.collector_protobuf_port(&protobuf_port));
EXPECT_EQ(protobuf_port, 3333);
EXPECT_EQ(options_.sandesh_send_rate_limit(), 5);
}

TEST_F(OptionsTest, CustomConfigFileAndOverrideFromCommandLine) {
Expand All @@ -317,6 +324,7 @@ TEST_F(OptionsTest, CustomConfigFileAndOverrideFromCommandLine) {
"log_local=0\n"
"test_mode=1\n"
"syslog_port=102\n"
"sandesh_send_rate_limit=5\n"
"\n"
"[COLLECTOR]\n"
"port=100\n"
Expand All @@ -337,7 +345,7 @@ TEST_F(OptionsTest, CustomConfigFileAndOverrideFromCommandLine) {
config_file << config;
config_file.close();

int argc = 9;
int argc = 10;
char *argv[argc];
char argv_0[] = "options_test";
char argv_1[] = "--conf_file=./options_test_collector_config_file.conf";
Expand All @@ -348,6 +356,7 @@ TEST_F(OptionsTest, CustomConfigFileAndOverrideFromCommandLine) {
char argv_6[] = "--DEFAULT.cassandra_server_list=21.20.20.2:200";
char argv_7[] = "--DEFAULT.cassandra_server_list=31.30.30.3:300";
char argv_8[] = "--COLLECTOR.protobuf_port=3334";
char argv_9[] = "--DEFAULT.sandesh_send_rate_limit=7";
argv[0] = argv_0;
argv[1] = argv_1;
argv[2] = argv_2;
Expand All @@ -357,6 +366,7 @@ TEST_F(OptionsTest, CustomConfigFileAndOverrideFromCommandLine) {
argv[6] = argv_6;
argv[7] = argv_7;
argv[8] = argv_8;
argv[9] = argv_9;

options_.Parse(evm_, argc, argv);

Expand Down Expand Up @@ -394,6 +404,7 @@ TEST_F(OptionsTest, CustomConfigFileAndOverrideFromCommandLine) {
uint16_t protobuf_port(0);
EXPECT_TRUE(options_.collector_protobuf_port(&protobuf_port));
EXPECT_EQ(protobuf_port, 3334);
EXPECT_EQ(options_.sandesh_send_rate_limit(), 7);
}

TEST_F(OptionsTest, MultitokenVector) {
Expand Down
5 changes: 4 additions & 1 deletion src/config/api-server/utils.py
Expand Up @@ -9,7 +9,7 @@
import ConfigParser
import gen.resource_xsd
import vnc_quota
from pysandesh.sandesh_base import Sandesh
from pysandesh.sandesh_base import Sandesh, SandeshSystem
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel

_WEB_HOST = '0.0.0.0'
Expand Down Expand Up @@ -61,6 +61,7 @@ def parse_args(args_str):
'rabbit_max_pending_updates': '4096',
'cluster_id': '',
'max_requests': 1024,
'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(),
}
# ssl options
secopts = {
Expand Down Expand Up @@ -240,6 +241,8 @@ def parse_args(args_str):
parser.add_argument(
"--max_requests", type=int,
help="Maximum number of concurrent requests served by api server")
parser.add_argument("--sandesh_send_rate_limit", type=int,
help="Sandesh send rate limit in messages/sec.")
args_obj, remaining_argv = parser.parse_known_args(remaining_argv)
args_obj.config_sections = config
if type(args_obj.cassandra_server_list) is str:
Expand Down
4 changes: 4 additions & 0 deletions src/config/api-server/vnc_cfg_api_server.py
Expand Up @@ -1131,6 +1131,10 @@ def __init__(self, args_str=None):

# sandesh init
self._sandesh = Sandesh()
# Reset the sandesh send rate limit value
if self._args.sandesh_send_rate_limit is not None:
SandeshSystem.set_sandesh_send_rate_limit( \
self._args.sandesh_send_rate_limit)
module = Module.API_SERVER
module_name = ModuleNames[Module.API_SERVER]
node_type = Module2NodeType[module]
Expand Down
7 changes: 7 additions & 0 deletions src/config/device-manager/device_manager/device_manager.py
Expand Up @@ -116,6 +116,10 @@ def __init__(self, args=None):
ModuleNames[Module.DEVICE_MANAGER])

self._sandesh = Sandesh()
# Reset the sandesh send rate limit value
if self._args.sandesh_send_rate_limit is not None:
SandeshSystem.set_sandesh_send_rate_limit( \
self._args.sandesh_send_rate_limit)
module = Module.DEVICE_MANAGER
module_name = ModuleNames[module]
node_type = Module2NodeType[module]
Expand Down Expand Up @@ -377,6 +381,7 @@ def parse_args(args_str):
'use_syslog': False,
'syslog_facility': Sandesh._DEFAULT_SYSLOG_FACILITY,
'cluster_id': '',
'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(),
}
secopts = {
'use_certs': False,
Expand Down Expand Up @@ -460,6 +465,8 @@ def parse_args(args_str):
help="Tenant name for keystone admin user")
parser.add_argument("--cluster_id",
help="Used for database keyspace separation")
parser.add_argument("--sandesh_send_rate_limit", type=int,
help="Sandesh send rate limit in messages/sec")
args = parser.parse_args(remaining_argv)
if type(args.cassandra_server_list) is str:
args.cassandra_server_list = args.cassandra_server_list.split()
Expand Down
7 changes: 7 additions & 0 deletions src/config/schema-transformer/to_bgp.py
Expand Up @@ -2667,6 +2667,10 @@ def __init__(self, args=None):
ModuleNames[Module.SCHEMA_TRANSFORMER])

_sandesh = Sandesh()
# Reset the sandesh send rate limit value
if args.sandesh_send_rate_limit is not None:
SandeshSystem.set_sandesh_send_rate_limit( \
args.sandesh_send_rate_limit)
sandesh.VnList.handle_request = self.sandesh_vn_handle_request
sandesh.RoutintInstanceList.handle_request = \
self.sandesh_ri_handle_request
Expand Down Expand Up @@ -3671,6 +3675,7 @@ def parse_args(args_str):
'use_syslog': False,
'syslog_facility': Sandesh._DEFAULT_SYSLOG_FACILITY,
'cluster_id': '',
'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(),
}
secopts = {
'use_certs': False,
Expand Down Expand Up @@ -3765,6 +3770,8 @@ def parse_args(args_str):
help="Tenant name for keystone admin user")
parser.add_argument("--cluster_id",
help="Used for database keyspace separation")
parser.add_argument("--sandesh_send_rate_limit", type=int,
help="Sandesh send rate limit in messages/sec")
args = parser.parse_args(remaining_argv)
if type(args.cassandra_server_list) is str:
args.cassandra_server_list = args.cassandra_server_list.split()
Expand Down
6 changes: 5 additions & 1 deletion src/config/svc-monitor/svc_monitor/logger.py
Expand Up @@ -15,7 +15,7 @@
from cfgm_common import vnc_cpu_info
from cfgm_common.uve.service_instance.ttypes import *

from pysandesh.sandesh_base import Sandesh
from pysandesh.sandesh_base import Sandesh, SandeshSystem
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel

from sandesh_common.vns.ttypes import Module, NodeType
Expand Down Expand Up @@ -201,6 +201,10 @@ def uve_svc_instance(self, si_fq_name_str, status=None,
# init sandesh
def _sandesh_init(self, discovery):
sandesh_instance = Sandesh()
# Reset the sandesh send rate limit value
if self._args.sandesh_send_rate_limit is not None:
SandeshSystem.set_sandesh_send_rate_limit( \
self._args.sandesh_send_rate_limit)
sandesh.ServiceInstanceList.handle_request =\
self.sandesh_si_handle_request
sandesh_instance.init_generator(
Expand Down
6 changes: 5 additions & 1 deletion src/config/svc-monitor/svc_monitor/svc_monitor.py
Expand Up @@ -34,7 +34,7 @@
from config_db import *
from cfgm_common.dependency_tracker import DependencyTracker

from pysandesh.sandesh_base import Sandesh
from pysandesh.sandesh_base import Sandesh, SandeshSystem
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel
from pysandesh.gen_py.process_info.ttypes import ConnectionStatus
from sandesh_common.vns.ttypes import Module
Expand Down Expand Up @@ -766,6 +766,7 @@ def parse_args(args_str):
'syslog_facility': Sandesh._DEFAULT_SYSLOG_FACILITY,
'region_name': None,
'cluster_id': '',
'sandesh_send_rate_limit' : SandeshSystem.get_sandesh_send_rate_limit(),
}
secopts = {
'use_certs': False,
Expand Down Expand Up @@ -876,6 +877,9 @@ def parse_args(args_str):
help="Region name for openstack API")
parser.add_argument("--cluster_id",
help="Used for database keyspace separation")
parser.add_argument("--sandesh_send_rate_limit", type=int,
help="Sandesh send rate limit in messages/sec.")

args = parser.parse_args(remaining_argv)
args.config_sections = config
if type(args.cassandra_server_list) is str:
Expand Down
3 changes: 3 additions & 0 deletions src/control-node/contrail-control.conf
Expand Up @@ -21,6 +21,9 @@ log_local=1
# test_mode=0
# xmpp_server_port=5269

# Sandesh send rate limit can be used to throttle system logs transmitted per
# second. System logs are dropped if the sending rate is exceeded
sandesh_send_rate_limit=100
[DISCOVERY]
# port=5998
# server=127.0.0.1 # discovery_server IP address
Expand Down
1 change: 1 addition & 0 deletions src/control-node/main.cc
Expand Up @@ -470,6 +470,7 @@ int main(int argc, char *argv[]) {

RegisterSandeshShowXmppExtensions(&sandesh_context);

Sandesh::set_send_rate_limit(options.sandesh_send_rate_limit());
if (sandesh_generator_init) {
NodeType::type node_type =
g_vns_constants.Module2NodeType.find(module)->second;
Expand Down
6 changes: 6 additions & 0 deletions src/control-node/options.cc
Expand Up @@ -128,6 +128,10 @@ void Options::Initialize(EventManager &evm,
("DEFAULT.xmpp_server_key",
opt::value<string>()->default_value("/etc/contrail/ssl/private/server.key"),
"XMPP Server ssl private key")
("DEFAULT.sandesh_send_rate_limit",
opt::value<uint32_t>()->default_value(
Sandesh::get_send_rate_limit()),
"Sandesh send rate limit in messages/sec")

("DISCOVERY.port", opt::value<uint16_t>()->default_value(
default_discovery_port),
Expand Down Expand Up @@ -258,6 +262,8 @@ bool Options::Process(int argc, char *argv[],
GetOptValue<bool>(var_map, xmpp_auth_enable_, "DEFAULT.xmpp_auth_enable");
GetOptValue<string>(var_map, xmpp_server_cert_, "DEFAULT.xmpp_server_cert");
GetOptValue<string>(var_map, xmpp_server_key_, "DEFAULT.xmpp_server_key");
GetOptValue<uint32_t>(var_map, sandesh_ratelimit_,
"DEFAULT.sandesh_send_rate_limit");

GetOptValue<uint16_t>(var_map, discovery_port_, "DISCOVERY.port");
GetOptValue<string>(var_map, discovery_server_, "DISCOVERY.server");
Expand Down

0 comments on commit 3a0ac65

Please sign in to comment.