From fc29689147db7fff31b4d23d35c91e1aee88e076 Mon Sep 17 00:00:00 2001 From: Anish Mehta Date: Mon, 27 Jun 2016 22:33:57 -0700 Subject: [PATCH] Scale fixes in Alarmgen. - Timeout for releasing partition ownership is extended from 60s to 120s - Took out printing of UVE Keys in log messages - these lists can get too big - We only process up to 200 UVE updates at a time. Unlimited processing can starve other partitions - Changed log levels of some libpartition logs - Fixed discovery client ID of alarmgen to accommodate multiple alarmgens per analytics node - Increased partitions from 15 to 30 Closes-Bug: 1596795 Change-Id: Id4a8f9a7c411e8e1a9c223afd7be3455abfff855 --- src/analytics/contrail-collector.conf | 1 + src/libpartition/libpartition.py | 8 ++++---- src/opserver/alarmgen.py | 20 ++++++++++++++------ src/opserver/contrail-alarm-gen.conf | 1 + src/opserver/contrail-analytics-api.conf | 1 + src/opserver/partition_handler.py | 4 ++-- 6 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/analytics/contrail-collector.conf b/src/analytics/contrail-collector.conf index 3db491ae751..91fcb51345a 100644 --- a/src/analytics/contrail-collector.conf +++ b/src/analytics/contrail-collector.conf @@ -24,6 +24,7 @@ # List of IP:port for kafka brokers # kafka_broker_list=127.0.0.1:9092 +partitions=30 # IP address of analytics node. 
Resolved IP of 'hostname' # hostip= diff --git a/src/libpartition/libpartition.py b/src/libpartition/libpartition.py index cc49e7501aa..19be1b1268f 100644 --- a/src/libpartition/libpartition.py +++ b/src/libpartition/libpartition.py @@ -186,13 +186,13 @@ def _acquire_partition_ownership(self): # list of partitions for which locks have to be released release_lock_list = [] - self._logger.error("known servers: %s" % self._con_hash.get_all_nodes()) + self._logger.info("known servers: %s" % self._con_hash.get_all_nodes()) for part in range(0, self._max_partition): if (part in self._target_part_ownership_list): if (part in self._curr_part_ownership_list): # do nothing, I already have ownership of this partition - self._logger.error("No need to acquire ownership of:" + + self._logger.info("No need to acquire ownership of:" + str(part)) else: # I need to acquire lock for this partition before I own @@ -274,13 +274,13 @@ def update_cluster_list(self, cluster_list): deleted_servers = list(set(self._cluster_list).difference( new_cluster_list)) self._cluster_list = set(cluster_list) - self._logger.error("deleted servers:" + str(deleted_servers)) - self._logger.error("new servers:" + str(new_servers)) # update the hash structure if new_servers: + self._logger.error("new servers:" + str(new_servers)) self._con_hash.add_nodes(new_servers) if deleted_servers: + self._logger.error("deleted servers:" + str(deleted_servers)) self._con_hash.del_nodes(deleted_servers) # update target partition ownership list diff --git a/src/opserver/alarmgen.py b/src/opserver/alarmgen.py index baa09cc4d44..21b310eb112 100644 --- a/src/opserver/alarmgen.py +++ b/src/opserver/alarmgen.py @@ -562,7 +562,8 @@ def __init__(self, conf, test_logger=None): self.disc = client.DiscoveryClient( self._conf.discovery()['server'], self._conf.discovery()['port'], - ModuleNames[Module.ALARM_GENERATOR]) + ModuleNames[Module.ALARM_GENERATOR], + '%s-%s' % (self._hostname, self._instance_id)) is_collector = True if 
test_logger is not None: @@ -1006,9 +1007,16 @@ def run_uve_processing(self): # Allow the partition handlers to queue new UVEs without # interfering with the work of processing the current UVEs - pendingset[part] = copy.deepcopy(self._uveq[part]) - self._uveq[part] = {} - + # Process no more than 200 keys at a time + pendingset[part] = {} + icount = 0 + while (len(self._uveq[part]) > 0) and icount < 200: + kp,vp = self._uveq[part].popitem() + pendingset[part][kp] = vp + icount += 1 + self._logger.info("UVE Process for %d : %d, %d remain" % \ + (part, len(pendingset[part]), len(self._uveq[part]))) + gevs[part] = gevent.spawn(self.handle_uve_notif,part,\ pendingset[part]) if len(gevs): @@ -1486,7 +1494,7 @@ def partition_change(self, partno, enl): ph.start() self._workers[partno] = ph self._uvestats[partno] = {} - tout = 600 + tout = 1200 idx = 0 while idx < tout: # When this partitions starts, @@ -1520,7 +1528,7 @@ def partition_change(self, partno, enl): del self._workers[partno] del self._uvestats[partno] - tout = 600 + tout = 1200 idx = 0 while idx < tout: # When this partitions stop.s diff --git a/src/opserver/contrail-alarm-gen.conf b/src/opserver/contrail-alarm-gen.conf index a01e2abc83f..b08280e6537 100644 --- a/src/opserver/contrail-alarm-gen.conf +++ b/src/opserver/contrail-alarm-gen.conf @@ -7,6 +7,7 @@ log_level = SYS_NOTICE #log_category = log_file = /var/log/contrail/contrail-alarm-gen.log #kafka_broker_list = xx.xx.xx.xx:9092 +partitions=30 #zk_list = xx.xx.xx.xx:2181 [DISCOVERY] diff --git a/src/opserver/contrail-analytics-api.conf b/src/opserver/contrail-analytics-api.conf index 1d94b62499b..a4c954ef493 100644 --- a/src/opserver/contrail-analytics-api.conf +++ b/src/opserver/contrail-analytics-api.conf @@ -12,6 +12,7 @@ log_file = /var/log/contrail/contrail-analytics-api.log # Sandesh send rate limit can be used to throttle system logs transmitted per # second. 
System logs are dropped if the sending rate is exceeded #sandesh_send_rate_limit = 100 +partitions=30 [DISCOVERY] #disc_server_ip = 127.0.0.1 diff --git a/src/opserver/partition_handler.py b/src/opserver/partition_handler.py index 160e922c6d4..8e5563a5765 100644 --- a/src/opserver/partition_handler.py +++ b/src/opserver/partition_handler.py @@ -750,8 +750,8 @@ def start_partition(self, cbdb): # TODO: for loading only specific types: # uves[kk][typ] = contents - self._logger.error("Starting part %d UVEs %s" % \ - (self._partno, str(uves.keys()))) + self._logger.error("Starting part %d UVEs %d" % \ + (self._partno, len(uves))) self._callback(self._partno, uves) def contents(self):