diff --git a/src/config/api-server/utils.py b/src/config/api-server/utils.py index 42656e5d810..f4aa684b1e9 100644 --- a/src/config/api-server/utils.py +++ b/src/config/api-server/utils.py @@ -144,6 +144,7 @@ def parse_args(args_str): } config = None + saved_conf_file = args.conf_file if args.conf_file: config = ConfigParser.SafeConfigParser({'admin_token': None}) config.read(args.conf_file) @@ -372,6 +373,7 @@ def parse_args(args_str): if type(args_obj.collectors) is str: args_obj.collectors = args_obj.collectors.split() + args_obj.conf_file = saved_conf_file return args_obj, remaining_argv # end parse_args diff --git a/src/config/api-server/vnc_cfg_api_server.py b/src/config/api-server/vnc_cfg_api_server.py index ec949c415ee..121a4c641a2 100644 --- a/src/config/api-server/vnc_cfg_api_server.py +++ b/src/config/api-server/vnc_cfg_api_server.py @@ -18,12 +18,15 @@ import sys reload(sys) sys.setdefaultencoding('UTF8') +import ConfigParser import functools +import hashlib import logging import logging.config import signal import os import re +import random import socket from cfgm_common import jsonutils as json from provision_defaults import * @@ -1423,6 +1426,14 @@ def __init__(self, args_str=None): self.route('/aaa-mode', 'GET', self.aaa_mode_http_get) self.route('/aaa-mode', 'PUT', self.aaa_mode_http_put) + # randomize the collector list + self._random_collectors = self._args.collectors + self._chksum = ""; + if self._args.collectors: + self._chksum = hashlib.md5(''.join(self._args.collectors)).hexdigest() + self._random_collectors = random.sample(self._args.collectors, \ + len(self._args.collectors)) + # Initialize discovery client self._disc = None if self._args.disc_server_ip and self._args.disc_server_port: @@ -1451,7 +1462,7 @@ def __init__(self, args_str=None): hostname = socket.gethostname() self._sandesh.init_generator(module_name, hostname, node_type_name, instance_id, - self._args.collectors, + self._random_collectors, 'vnc_api_server_context', int(self._args.http_server_port), ['cfgm_common', 'vnc_cfg_api_server.sandesh'], self._disc, @@ -2641,6 +2652,25 @@ def sigchld_handler(self): def sigterm_handler(self): exit() + # sighup handler for applying new configs + def sighup_handler(self): + if self._args.conf_file: + config = ConfigParser.SafeConfigParser() + config.read(self._args.conf_file) + if 'DEFAULTS' in config.sections(): + try: + collectors = config.get('DEFAULTS', 'collectors') + if type(collectors) is str: + collectors = collectors.split() + new_chksum = hashlib.md5("".join(collectors)).hexdigest() + if new_chksum != self._chksum: + self._chksum = new_chksum + random_collectors = random.sample(collectors, len(collectors)) + self._sandesh.reconfig_collectors(random_collectors) + except ConfigParser.NoOptionError as e: + pass + # end sighup_handler + def _load_extensions(self): try: conf_sections = self._args.config_sections @@ -3644,6 +3674,7 @@ def main(args_str=None, server=None): """ #hub.signal(signal.SIGCHLD, vnc_api_server.sigchld_handler) hub.signal(signal.SIGTERM, vnc_api_server.sigterm_handler) + hub.signal(signal.SIGHUP, vnc_api_server.sighup_handler) if pipe_start_app is None: pipe_start_app = vnc_api_server.api_bottle try: diff --git a/src/config/common/vnc_logger.py b/src/config/common/vnc_logger.py index ebb3abe8aaa..dadd3f53b1f 100644 --- a/src/config/common/vnc_logger.py +++ b/src/config/common/vnc_logger.py @@ -117,7 +117,7 @@ def sandesh_init(self): self.redefine_sandesh_handles() self._sandesh.init_generator( self._module_name, self._hostname, self._node_type_name, - self._instance_id, self._args.collectors, + self._instance_id, self._args.random_collectors, '%s_context' % self.context, int(self._args.http_server_port), ['cfgm_common', '%s.sandesh' % self.module_pkg], self.discovery, logger_class=self._args.logger_class, @@ -137,3 +137,7 @@ def sandesh_init(self): self._instance_id, staticmethod(ConnectionState.get_process_state_cb), NodeStatusUVE, NodeStatus, self.table) + + def sandesh_reconfig_collectors(self, args): + self._sandesh.reconfig_collectors(args.random_collectors) + #end sandesh_reconfig_collectors diff --git a/src/config/device-manager/device_manager/device_manager.py b/src/config/device-manager/device_manager/device_manager.py index 19b3058a16a..dbe45d42c27 100644 --- a/src/config/device-manager/device_manager/device_manager.py +++ b/src/config/device-manager/device_manager/device_manager.py @@ -16,6 +16,9 @@ import ConfigParser import socket import time +import hashlib +import signal +import random from pprint import pformat from pysandesh.sandesh_base import * @@ -150,7 +153,15 @@ def __init__(self, args=None): PushConfigState.set_push_delay_per_kb(float(self._args.push_delay_per_kb)) PushConfigState.set_push_delay_max(int(self._args.push_delay_max)) PushConfigState.set_push_delay_enable(bool(self._args.push_delay_enable)) - + + # randomize collector list + self._args.random_collectors = self._args.collectors + self._chksum = ""; + if self._args.collectors: + self._chksum = hashlib.md5(''.join(self._args.collectors)).hexdigest() + self._args.random_collectors = random.sample(self._args.collectors, \ + len(self._args.collectors)) + # Initialize logger module = Module.DEVICE_MANAGER module_pkg = "device_manager" @@ -175,6 +186,11 @@ def __init__(self, args=None): except ResourceExhaustionError: # haproxy throws 503 time.sleep(3) + """ @sighup + Handle of SIGHUP for collector list config change + """ + gevent.signal(signal.SIGHUP, self.sighup_handler) + # Initialize amqp self._vnc_amqp = DMAmqpHandle(self.logger, self._REACTION_MAP, self._args) @@ -254,6 +270,25 @@ def connection_state_update(self, status, message=None): self._args.api_server_port)]) # end connection_state_update + # sighup handler for applying new configs + def sighup_handler(self): + if self._args.conf_file: + config = ConfigParser.SafeConfigParser() + config.read(self._args.conf_file) + if 'DEFAULTS' in config.sections(): + try: + collectors = config.get('DEFAULTS', 'collectors') + if type(collectors) is str: + collectors = collectors.split() + new_chksum = hashlib.md5("".join(collectors)).hexdigest() + if new_chksum != self._chksum: + self._chksum = new_chksum + config.random_collectors = random.sample(collectors, len(collectors)) + self.logger.sandesh_reconfig_collectors(config) + except ConfigParser.NoOptionError as e: + pass + # end sighup_handler + def parse_args(args_str): ''' Eg. python device_manager.py --rabbit_server localhost @@ -347,6 +382,7 @@ def parse_args(args_str): 'cassandra_password': None } + saved_conf_file = args.conf_file if args.conf_file: config = ConfigParser.SafeConfigParser() config.read(args.conf_file) @@ -449,6 +485,7 @@ def parse_args(args_str): if type(args.collectors) is str: args.collectors = args.collectors.split() + args.conf_file = saved_conf_file return args # end parse_args diff --git a/src/config/schema-transformer/to_bgp.py b/src/config/schema-transformer/to_bgp.py index 0dc96bdd88d..e02ce743aa8 100644 --- a/src/config/schema-transformer/to_bgp.py +++ b/src/config/schema-transformer/to_bgp.py @@ -18,7 +18,9 @@ sys.setdefaultencoding('UTF8') import requests import ConfigParser - +import signal +import random +import hashlib import argparse from cfgm_common import vnc_cgitb @@ -336,6 +338,25 @@ def reset(self): cls.reset() self._vnc_amqp.close() # end reset + + def sighup_handler(self): + if self._conf_file: + config = ConfigParser.SafeConfigParser() + config.read(self._conf_file) + if 'DEFAULTS' in config.sections(): + try: + collectors = config.get('DEFAULTS', 'collectors') + if type(collectors) is str: + collectors = collectors.split() + new_chksum = hashlib.md5("".join(collectors)).hexdigest() + if new_chksum != self._chksum: + self._chksum = new_chksum + config.random_collectors = random.sample(collectors, len(collectors)) + self.logger.sandesh_reconfig_collectors(config) + except ConfigParser.NoOptionError as e: + pass + # end sighup_handler + # end class SchemaTransformer @@ -401,6 +422,7 @@ def parse_args(args_str): 'cassandra_password': None, } + saved_conf_file = args.conf_file if args.conf_file: config = ConfigParser.SafeConfigParser() config.read(args.conf_file) @@ -415,7 +437,6 @@ def parse_args(args_str): if 'CASSANDRA' in config.sections(): cassandraopts.update(dict(config.items('CASSANDRA'))) - # Override with CLI options # Don't surpress add_help here so it will handle -h parser = argparse.ArgumentParser( @@ -511,6 +532,7 @@ def _bool(s): help="Enabled logical routers") args = parser.parse_args(remaining_argv) + args.conf_file = saved_conf_file if type(args.cassandra_server_list) is str: args.cassandra_server_list = args.cassandra_server_list.split() if type(args.collectors) is str: @@ -519,6 +541,8 @@ def _bool(s): return args # end parse_args + + transformer = None @@ -552,8 +576,24 @@ def connection_state_update(status, message=None): # auth failure or haproxy throws 503 time.sleep(3) + #randomize collector list + args.random_collectors = args.collectors + if args.collectors: + args.random_collectors = random.sample(args.collectors, len(args.collectors)) + global transformer transformer = SchemaTransformer(args) + transformer._conf_file = args.conf_file + transformer._chksum = "" + # checksum of collector list + if args.collectors: + transformer._chksum = hashlib.md5("".join(args.collectors)).hexdigest() + + """ @sighup + SIGHUP handler to indicate configuration changes + """ + gevent.signal(signal.SIGHUP, transformer.sighup_handler) + gevent.joinall(transformer._vnc_amqp._vnc_kombu.greenlets()) # end run_schema_transformer @@ -562,7 +602,9 @@ def main(args_str=None): global _zookeeper_client if not args_str: args_str = ' '.join(sys.argv[1:]) + args = parse_args(args_str) + args._args_list = args_str if args.cluster_id: client_pfx = args.cluster_id + '-' zk_path_pfx = args.cluster_id + '/' diff --git a/src/config/svc-monitor/svc_monitor/svc_monitor.py b/src/config/svc-monitor/svc_monitor/svc_monitor.py index 14bebff6805..92c1c70750e 100644 --- a/src/config/svc-monitor/svc_monitor/svc_monitor.py +++ b/src/config/svc-monitor/svc_monitor/svc_monitor.py @@ -18,6 +18,9 @@ import ConfigParser import cStringIO import argparse +import signal +import random +import hashlib import os @@ -522,7 +525,24 @@ def reset(): for cls in DBBaseSM.get_obj_type_map().values(): cls.reset() - + def sighup_handler(self): + if self._conf_file: + config = ConfigParser.SafeConfigParser() + config.read(self._conf_file) + if 'DEFAULTS' in config.sections(): + try: + collectors = config.get('DEFAULTS', 'collectors') + if type(collectors) is str: + collectors = collectors.split() + new_chksum = hashlib.md5("".join(collectors)).hexdigest() + if new_chksum != self._chksum: + self._chksum = new_chksum + config.random_collectors = random.sample(collectors, len(collectors)) + self.logger.sandesh_reconfig_collectors(config) + except ConfigParser.NoOptionError as e: + pass + # end sighup_handler + def skip_check_service(si): # wait for first launch if not si.launch_count: @@ -727,6 +747,7 @@ def parse_args(args_str): 'cassandra_password': None, } + saved_conf_file = args.conf_file config = ConfigParser.SafeConfigParser() if args.conf_file: config.read(args.conf_file) @@ -823,6 +844,7 @@ def parse_args(args_str): help="Check service interval") args = parser.parse_args(remaining_argv) + args._conf_file = saved_conf_file args.config_sections = config if type(args.cassandra_server_list) is str: args.cassandra_server_list = args.cassandra_server_list.split() @@ -839,9 +861,23 @@ def parse_args(args_str): def run_svc_monitor(args=None): - monitor = SvcMonitor(args) + # randomize collector list + args.random_collectors = args.collectors + if args.collectors: + args.random_collectors = random.sample(args.collectors, len(args.collectors)) + + monitor = SvcMonitor(args) monitor._zookeeper_client = _zookeeper_client + monitor._conf_file = args._conf_file + monitor._chksum = "" + if args.collectors: + monitor._chksum = hashlib.md5("".join(args.collectors)).hexdigest() + + """ @sighup + SIGHUP handler to indicate configuration changes + """ + gevent.signal(signal.SIGHUP, monitor.sighup_handler) # Retry till API server is up connected = False diff --git a/src/nodemgr/common/event_manager.py b/src/nodemgr/common/event_manager.py index ac7dad7d0ae..dd7f308d90b 100644 --- a/src/nodemgr/common/event_manager.py +++ b/src/nodemgr/common/event_manager.py @@ -17,6 +17,8 @@ import supervisor.xmlrpc import xmlrpclib import platform +import random +import hashlib from supervisor import childutils from nodemgr.common.event_listener_protocol_nodemgr import \ @@ -670,3 +672,21 @@ def runforever(self, test=False): if headers['eventname'].startswith("TICK_60"): self.do_periodic_events() self.listener_nodemgr.ok(self.stdout) + + def nodemgr_sighup_handler(self): + config = ConfigParser.SafeConfigParser() + config.read(self.config_file) + if 'COLLECTOR' in config.sections(): + try: + collector = config.get('COLLECTOR', 'server_list') + collector_list = collector.split() + except ConfigParser.NoOptionError as e: + pass + + if collector_list: + new_chksum = hashlib.md5("".join(collector_list)).hexdigest() + if new_chksum != self.collector_chksum: + self.collector_chksum = new_chksum + random_collectors = random.sample(collector_list, len(collector_list)) + self.sandesh_global.reconfig_collectors(random_collectors) + #end sighup_handler diff --git a/src/nodemgr/main.py b/src/nodemgr/main.py index 509583ce23f..b57b15edbcb 100755 --- a/src/nodemgr/main.py +++ b/src/nodemgr/main.py @@ -42,6 +42,9 @@ import socket import gevent import ConfigParser +import signal +import random +import hashlib from nodemgr.analytics_nodemgr.analytics_event_manager import AnalyticsEventManager from nodemgr.control_nodemgr.control_event_manager import ControlEventManager from nodemgr.config_nodemgr.config_event_manager import ConfigEventManager @@ -53,7 +56,6 @@ def usage(): print doc sys.exit(255) - def main(args_str=' '.join(sys.argv[1:])): # Parse Arguments node_parser = argparse.ArgumentParser(add_help=False) @@ -149,6 +151,17 @@ def main(args_str=' '.join(sys.argv[1:])): sys.stderr.write("Discovery port: " + str(discovery_port) + "\n") collector_addr = _args.collectors sys.stderr.write("Collector address: " + str(collector_addr) + "\n") + + # randomize collector list + _args.chksum = "" + if _args.collectors: + _args.chksum = hashlib.md5("".join(_args.collectors)).hexdigest() + _args.random_collectors = random.sample(_args.collectors, len(_args.collectors)) + _args.collectors = _args.random_collectors + + collector_addr = _args.collectors + sys.stderr.write("Random Collector address: " + str(collector_addr) + "\n") + if _args.sandesh_send_rate_limit is not None: SandeshSystem.set_sandesh_send_rate_limit(_args.sandesh_send_rate_limit) # done parsing arguments @@ -192,9 +205,18 @@ def main(args_str=' '.join(sys.argv[1:])): else: sys.stderr.write("Node type" + str(node_type) + "is incorrect" + "\n") return + prog.process() prog.send_nodemgr_process_status() prog.send_process_state_db(prog.group_names) + prog.config_file = config_file + prog.collector_chksum = _args.chksum + + """ @sighup + Reconfig of collector list + """ + gevent.signal(signal.SIGHUP, prog.nodemgr_sighup_handler) + gevent.joinall([gevent.spawn(prog.runforever)]) if __name__ == '__main__':