Skip to content

Commit

Permalink
SIGHUP handler for modules to indicate change of collector list in co…
Browse files Browse the repository at this point in the history
…nfiguration.

changes for api-server, schema-transfomer, svc-monitor, device-manager
and all nodemgrs

Randomize parsed list before connecting to collector. Also handle
SIGHUP for any collector list changes.

Change-Id: I140eba389f08e50b820b46eb371f49ef1f04b960
Partial-Bug:1636319
  • Loading branch information
nipak committed Jan 11, 2017
1 parent 2581b82 commit 9105bf4
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 8 deletions.
2 changes: 2 additions & 0 deletions src/config/api-server/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def parse_args(args_str):
}

config = None
saved_conf_file = args.conf_file
if args.conf_file:
config = ConfigParser.SafeConfigParser({'admin_token': None})
config.read(args.conf_file)
Expand Down Expand Up @@ -372,6 +373,7 @@ def parse_args(args_str):
if type(args_obj.collectors) is str:
args_obj.collectors = args_obj.collectors.split()

args_obj.conf_file = saved_conf_file
return args_obj, remaining_argv
# end parse_args

Expand Down
33 changes: 32 additions & 1 deletion src/config/api-server/vnc_cfg_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
import sys
reload(sys)
sys.setdefaultencoding('UTF8')
import ConfigParser
import functools
import hashlib
import logging
import logging.config
import signal
import os
import re
import random
import socket
from cfgm_common import jsonutils as json
from provision_defaults import *
Expand Down Expand Up @@ -1423,6 +1426,14 @@ def __init__(self, args_str=None):
self.route('/aaa-mode', 'GET', self.aaa_mode_http_get)
self.route('/aaa-mode', 'PUT', self.aaa_mode_http_put)

# randomize the collector list
self._random_collectors = self._args.collectors
self._chksum = "";
if self._args.collectors:
self._chksum = hashlib.md5(''.join(self._args.collectors)).hexdigest()
self._random_collectors = random.sample(self._args.collectors, \
len(self._args.collectors))

# Initialize discovery client
self._disc = None
if self._args.disc_server_ip and self._args.disc_server_port:
Expand Down Expand Up @@ -1451,7 +1462,7 @@ def __init__(self, args_str=None):
hostname = socket.gethostname()
self._sandesh.init_generator(module_name, hostname,
node_type_name, instance_id,
self._args.collectors,
self._random_collectors,
'vnc_api_server_context',
int(self._args.http_server_port),
['cfgm_common', 'vnc_cfg_api_server.sandesh'], self._disc,
Expand Down Expand Up @@ -2641,6 +2652,25 @@ def sigchld_handler(self):
def sigterm_handler(self):
exit()

# sighup handler for applying new configs
def sighup_handler(self):
if self._args.conf_file:
config = ConfigParser.SafeConfigParser()
config.read(self._args.conf_file)
if 'DEFAULTS' in config.sections():
try:
collectors = config.get('DEFAULTS', 'collectors')
if type(collectors) is str:
collectors = collectors.split()
new_chksum = hashlib.md5("".join(collectors)).hexdigest()
if new_chksum != self._chksum:
self._chksum = new_chksum
random_collectors = random.sample(collectors, len(collectors))
self._sandesh.reconfig_collectors(random_collectors)
except ConfigParser.NoOptionError as e:
pass
# end sighup_handler

def _load_extensions(self):
try:
conf_sections = self._args.config_sections
Expand Down Expand Up @@ -3643,6 +3673,7 @@ def main(args_str=None, server=None):
"""
#hub.signal(signal.SIGCHLD, vnc_api_server.sigchld_handler)
hub.signal(signal.SIGTERM, vnc_api_server.sigterm_handler)
hub.signal(signal.SIGHUP, vnc_api_server.sighup_handler)
if pipe_start_app is None:
pipe_start_app = vnc_api_server.api_bottle
try:
Expand Down
6 changes: 5 additions & 1 deletion src/config/common/vnc_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def sandesh_init(self):
self.redefine_sandesh_handles()
self._sandesh.init_generator(
self._module_name, self._hostname, self._node_type_name,
self._instance_id, self._args.collectors,
self._instance_id, self._args.random_collectors,
'%s_context' % self.context, int(self._args.http_server_port),
['cfgm_common', '%s.sandesh' % self.module_pkg], self.discovery,
logger_class=self._args.logger_class,
Expand All @@ -137,3 +137,7 @@ def sandesh_init(self):
self._instance_id,
staticmethod(ConnectionState.get_process_state_cb),
NodeStatusUVE, NodeStatus, self.table)

def sandesh_reconfig_collectors(self, args):
self._sandesh.reconfig_collectors(args.random_collectors)
#end sandesh_reconfig_collectors
39 changes: 38 additions & 1 deletion src/config/device-manager/device_manager/device_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import ConfigParser
import socket
import time
import hashlib
import signal
import random
from pprint import pformat

from pysandesh.sandesh_base import *
Expand Down Expand Up @@ -150,7 +153,15 @@ def __init__(self, args=None):
PushConfigState.set_push_delay_per_kb(float(self._args.push_delay_per_kb))
PushConfigState.set_push_delay_max(int(self._args.push_delay_max))
PushConfigState.set_push_delay_enable(bool(self._args.push_delay_enable))


# randomize collector list
self._args.random_collectors = self._args.collectors
self._chksum = "";
if self._args.collectors:
self._chksum = hashlib.md5(''.join(self._args.collectors)).hexdigest()
self._args.random_collectors = random.sample(self._args.collectors, \
len(self._args.collectors))

# Initialize logger
module = Module.DEVICE_MANAGER
module_pkg = "device_manager"
Expand All @@ -175,6 +186,11 @@ def __init__(self, args=None):
except ResourceExhaustionError: # haproxy throws 503
time.sleep(3)

""" @sighup
Handle of SIGHUP for collector list config change
"""
gevent.signal(signal.SIGHUP, self.sighup_handler)

# Initialize amqp
self._vnc_amqp = DMAmqpHandle(self.logger,
self._REACTION_MAP, self._args)
Expand Down Expand Up @@ -254,6 +270,25 @@ def connection_state_update(self, status, message=None):
self._args.api_server_port)])
# end connection_state_update

# sighup handler for applying new configs
def sighup_handler(self):
if self._args.conf_file:
config = ConfigParser.SafeConfigParser()
config.read(self._args.conf_file)
if 'DEFAULTS' in config.sections():
try:
collectors = config.get('DEFAULTS', 'collectors')
if type(collectors) is str:
collectors = collectors.split()
new_chksum = hashlib.md5("".join(collectors)).hexdigest()
if new_chksum != self._chksum:
self._chksum = new_chksum
config.random_collectors = random.sample(collectors, len(collectors))
self.logger.sandesh_reconfig_collectors(config)
except ConfigParser.NoOptionError as e:
pass
# end sighup_handler

def parse_args(args_str):
'''
Eg. python device_manager.py --rabbit_server localhost
Expand Down Expand Up @@ -347,6 +382,7 @@ def parse_args(args_str):
'cassandra_password': None
}

saved_conf_file = args.conf_file
if args.conf_file:
config = ConfigParser.SafeConfigParser()
config.read(args.conf_file)
Expand Down Expand Up @@ -449,6 +485,7 @@ def parse_args(args_str):
if type(args.collectors) is str:
args.collectors = args.collectors.split()

args.conf_file = saved_conf_file
return args
# end parse_args

Expand Down
46 changes: 44 additions & 2 deletions src/config/schema-transformer/to_bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
sys.setdefaultencoding('UTF8')
import requests
import ConfigParser

import signal
import random
import hashlib
import argparse

from cfgm_common import vnc_cgitb
Expand Down Expand Up @@ -336,6 +338,25 @@ def reset(self):
cls.reset()
self._vnc_amqp.close()
# end reset

def sighup_handler(self):
if self._conf_file:
config = ConfigParser.SafeConfigParser()
config.read(self._conf_file)
if 'DEFAULTS' in config.sections():
try:
collectors = config.get('DEFAULTS', 'collectors')
if type(collectors) is str:
collectors = collectors.split()
new_chksum = hashlib.md5("".join(collectors)).hexdigest()
if new_chksum != self._chksum:
self._chksum = new_chksum
config.random_collectors = random.sample(collectors, len(collectors))
self.logger.sandesh_reconfig_collectors(config)
except ConfigParser.NoOptionError as e:
pass
# end sighup_handler

# end class SchemaTransformer


Expand Down Expand Up @@ -401,6 +422,7 @@ def parse_args(args_str):
'cassandra_password': None,
}

saved_conf_file = args.conf_file
if args.conf_file:
config = ConfigParser.SafeConfigParser()
config.read(args.conf_file)
Expand All @@ -415,7 +437,6 @@ def parse_args(args_str):
if 'CASSANDRA' in config.sections():
cassandraopts.update(dict(config.items('CASSANDRA')))


# Override with CLI options
# Don't surpress add_help here so it will handle -h
parser = argparse.ArgumentParser(
Expand Down Expand Up @@ -511,6 +532,7 @@ def _bool(s):
help="Enabled logical routers")

args = parser.parse_args(remaining_argv)
args.conf_file = saved_conf_file
if type(args.cassandra_server_list) is str:
args.cassandra_server_list = args.cassandra_server_list.split()
if type(args.collectors) is str:
Expand All @@ -519,6 +541,8 @@ def _bool(s):
return args
# end parse_args



transformer = None


Expand Down Expand Up @@ -552,8 +576,24 @@ def connection_state_update(status, message=None):
# auth failure or haproxy throws 503
time.sleep(3)

#randomize collector list
args.random_collectors = args.collectors
if args.collectors:
args.random_collectors = random.sample(args.collectors, len(args.collectors))

global transformer
transformer = SchemaTransformer(args)
transformer._conf_file = args.conf_file
transformer._chksum = ""
# checksum of collector list
if args.collectors:
transformer._chksum = hashlib.md5("".join(args.collectors)).hexdigest()

""" @sighup
SIGHUP handler to indicate configuration changes
"""
gevent.signal(signal.SIGHUP, transformer.sighup_handler)

gevent.joinall(transformer._vnc_amqp._vnc_kombu.greenlets())
# end run_schema_transformer

Expand All @@ -562,7 +602,9 @@ def main(args_str=None):
global _zookeeper_client
if not args_str:
args_str = ' '.join(sys.argv[1:])

args = parse_args(args_str)
args._args_list = args_str
if args.cluster_id:
client_pfx = args.cluster_id + '-'
zk_path_pfx = args.cluster_id + '/'
Expand Down
40 changes: 38 additions & 2 deletions src/config/svc-monitor/svc_monitor/svc_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import ConfigParser
import cStringIO
import argparse
import signal
import random
import hashlib

import os

Expand Down Expand Up @@ -522,7 +525,24 @@ def reset():
for cls in DBBaseSM.get_obj_type_map().values():
cls.reset()


def sighup_handler(self):
if self._conf_file:
config = ConfigParser.SafeConfigParser()
config.read(self._conf_file)
if 'DEFAULTS' in config.sections():
try:
collectors = config.get('DEFAULTS', 'collectors')
if type(collectors) is str:
collectors = collectors.split()
new_chksum = hashlib.md5("".join(collectors)).hexdigest()
if new_chksum != self._chksum:
self._chksum = new_chksum
config.random_collectors = random.sample(collectors, len(collectors))
self.logger.sandesh_reconfig_collectors(config)
except ConfigParser.NoOptionError as e:
pass
# end sighup_handler

def skip_check_service(si):
# wait for first launch
if not si.launch_count:
Expand Down Expand Up @@ -727,6 +747,7 @@ def parse_args(args_str):
'cassandra_password': None,
}

saved_conf_file = args.conf_file
config = ConfigParser.SafeConfigParser()
if args.conf_file:
config.read(args.conf_file)
Expand Down Expand Up @@ -823,6 +844,7 @@ def parse_args(args_str):
help="Check service interval")

args = parser.parse_args(remaining_argv)
args._conf_file = saved_conf_file
args.config_sections = config
if type(args.cassandra_server_list) is str:
args.cassandra_server_list = args.cassandra_server_list.split()
Expand All @@ -839,9 +861,23 @@ def parse_args(args_str):


def run_svc_monitor(args=None):
monitor = SvcMonitor(args)

# randomize collector list
args.random_collectors = args.collectors
if args.collectors:
args.random_collectors = random.sample(args.collectors, len(args.collectors))

monitor = SvcMonitor(args)
monitor._zookeeper_client = _zookeeper_client
monitor._conf_file = args._conf_file
monitor._chksum = ""
if args.collectors:
monitor._chksum = hashlib.md5("".join(args.collectors)).hexdigest()

""" @sighup
SIGHUP handler to indicate configuration changes
"""
gevent.signal(signal.SIGHUP, monitor.sighup_handler)

# Retry till API server is up
connected = False
Expand Down

0 comments on commit 9105bf4

Please sign in to comment.