Skip to content

Commit

Permalink
Merge "Scale fixes in Alarmgen. - Timeout for releasing partition own…
Browse files Browse the repository at this point in the history
…ership is extended from 60s to 120s - Took out printing of UVE Keys in log messages - these lists can get too big - We only process up to 200 UVE updates at a time. Unlimited processing can starve other partitions - Changed log levels of some libpartition logs - Fixed discovery client ID of alarmgen to accommodate multiple alarmgens per analytics node - Increased partitions from 15 to 30 Closes-Bug: 1596795 Change-Id: Id4a8f9a7c411e8e1a9c223afd7be3455abfff855" into R3.0
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Jul 1, 2016
2 parents 84bbd2f + fc29689 commit 5d62a0a
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 12 deletions.
1 change: 1 addition & 0 deletions src/analytics/contrail-collector.conf
Expand Up @@ -24,6 +24,7 @@

# List of IP:port for kafka brokers
# kafka_broker_list=127.0.0.1:9092
partitions=30

# IP address of analytics node. Resolved IP of 'hostname'
# hostip=
Expand Down
8 changes: 4 additions & 4 deletions src/libpartition/libpartition.py
Expand Up @@ -186,13 +186,13 @@ def _acquire_partition_ownership(self):
# list of partitions for which locks have to be released
release_lock_list = []

self._logger.error("known servers: %s" % self._con_hash.get_all_nodes())
self._logger.info("known servers: %s" % self._con_hash.get_all_nodes())

for part in range(0, self._max_partition):
if (part in self._target_part_ownership_list):
if (part in self._curr_part_ownership_list):
# do nothing, I already have ownership of this partition
self._logger.error("No need to acquire ownership of:" +
self._logger.info("No need to acquire ownership of:" +
str(part))
else:
# I need to acquire lock for this partition before I own
Expand Down Expand Up @@ -274,13 +274,13 @@ def update_cluster_list(self, cluster_list):
deleted_servers = list(set(self._cluster_list).difference(
new_cluster_list))
self._cluster_list = set(cluster_list)
self._logger.error("deleted servers:" + str(deleted_servers))
self._logger.error("new servers:" + str(new_servers))

# update the hash structure
if new_servers:
self._logger.error("new servers:" + str(new_servers))
self._con_hash.add_nodes(new_servers)
if deleted_servers:
self._logger.error("deleted servers:" + str(deleted_servers))
self._con_hash.del_nodes(deleted_servers)

# update target partition ownership list
Expand Down
20 changes: 14 additions & 6 deletions src/opserver/alarmgen.py
Expand Up @@ -565,7 +565,8 @@ def __init__(self, conf, test_logger=None):
self.disc = client.DiscoveryClient(
self._conf.discovery()['server'],
self._conf.discovery()['port'],
ModuleNames[Module.ALARM_GENERATOR])
ModuleNames[Module.ALARM_GENERATOR],
'%s-%s' % (self._hostname, self._instance_id))

is_collector = True
if test_logger is not None:
Expand Down Expand Up @@ -1009,9 +1010,16 @@ def run_uve_processing(self):

# Allow the partition handlers to queue new UVEs without
# interfering with the work of processing the current UVEs
pendingset[part] = copy.deepcopy(self._uveq[part])
self._uveq[part] = {}

# Process no more than 200 keys at a time
pendingset[part] = {}
icount = 0
while (len(self._uveq[part]) > 0) and icount < 200:
kp,vp = self._uveq[part].popitem()
pendingset[part][kp] = vp
icount += 1
self._logger.info("UVE Process for %d : %d, %d remain" % \
(part, len(pendingset[part]), len(self._uveq[part])))

gevs[part] = gevent.spawn(self.handle_uve_notif,part,\
pendingset[part])
if len(gevs):
Expand Down Expand Up @@ -1489,7 +1497,7 @@ def partition_change(self, partno, enl):
ph.start()
self._workers[partno] = ph
self._uvestats[partno] = {}
tout = 600
tout = 1200
idx = 0
while idx < tout:
# When this partition starts,
Expand Down Expand Up @@ -1523,7 +1531,7 @@ def partition_change(self, partno, enl):
del self._workers[partno]
del self._uvestats[partno]

tout = 600
tout = 1200
idx = 0
while idx < tout:
# When this partition stops,
Expand Down
1 change: 1 addition & 0 deletions src/opserver/contrail-alarm-gen.conf
Expand Up @@ -7,6 +7,7 @@ log_level = SYS_NOTICE
#log_category =
log_file = /var/log/contrail/contrail-alarm-gen.log
#kafka_broker_list = xx.xx.xx.xx:9092
partitions=30
#zk_list = xx.xx.xx.xx:2181

[DISCOVERY]
Expand Down
1 change: 1 addition & 0 deletions src/opserver/contrail-analytics-api.conf
Expand Up @@ -12,6 +12,7 @@ log_file = /var/log/contrail/contrail-analytics-api.log
# Sandesh send rate limit can be used to throttle system logs transmitted per
# second. System logs are dropped if the sending rate is exceeded
#sandesh_send_rate_limit = 100
partitions=30

[DISCOVERY]
#disc_server_ip = 127.0.0.1
Expand Down
4 changes: 2 additions & 2 deletions src/opserver/partition_handler.py
Expand Up @@ -750,8 +750,8 @@ def start_partition(self, cbdb):
# TODO: for loading only specific types:
# uves[kk][typ] = contents

self._logger.error("Starting part %d UVEs %s" % \
(self._partno, str(uves.keys())))
self._logger.error("Starting part %d UVEs %d" % \
(self._partno, len(uves)))
self._callback(self._partno, uves)

def contents(self):
Expand Down

0 comments on commit 5d62a0a

Please sign in to comment.