Skip to content

Commit

Permalink
Merge "Scaling fixes for alarmgen: Remove prints of keys. Stop partit…
Browse files Browse the repository at this point in the history
…ions in parallel Write UVE Agg in parallel Increase size of UVEQ trace buffer Add more alarmgen instances on node Address alarmgen memory leak, by removing kafka commit Closes-Bug: 1632836 Change-Id: Ia8f883ed359c6b0173e9264ccb327ea8c103fcef"
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Oct 18, 2016
2 parents 5eecdb5 + f551f5c commit b699de3
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 113 deletions.
233 changes: 136 additions & 97 deletions src/opserver/alarmgen.py
Expand Up @@ -20,6 +20,8 @@
import logging
logging.getLogger('kafka').addHandler(logging.StreamHandler())
logging.getLogger('kafka').setLevel(logging.WARNING)
logging.getLogger('kazoo').addHandler(logging.StreamHandler())
logging.getLogger('kazoo').setLevel(logging.WARNING)
try:
from collections import OrderedDict
except ImportError:
Expand Down Expand Up @@ -885,7 +887,7 @@ def __init__(self, conf, test_logger=None):
self.trace_buf = [
{'name':'DiscoveryMsg', 'size':1000},
{'name':'AlarmStateChangeTrace', 'size':1000},
{'name':'UVEQTrace', 'size':10000}
{'name':'UVEQTrace', 'size':20000}
]
# Create trace buffers
for buf in self.trace_buf:
Expand Down Expand Up @@ -988,20 +990,31 @@ def libpart_cb(self, part_list):
newset = set(part_list)
oldset = self._partset
self._partset = newset

self._logger.error('Partition List : new %s old %s' % \
(str(newset),str(oldset)))

for addpart in (newset-oldset):
self._logger.error('Partition Add : %s' % addpart)
self.partition_change(addpart, True)

for delpart in (oldset-newset):
self._logger.error('Partition Del : %s' % delpart)
if not self.partition_change(delpart, False):
self._logger.error('Partition Del : %s failed!' % delpart)

try:
self._logger.error('Partition List : new %s old %s' % \
(str(newset),str(oldset)))

self._logger.error('Partition Add : %s' % str(newset-oldset))
self.partition_change(newset-oldset, True)

self._logger.error('Partition Del : %s' % str(oldset-newset))
if not self.partition_change(oldset-newset, False):
self._logger.error('Partition Del : %s failed!' % str(oldset-newset))
raise SystemExit

self._logger.error('Partition Del done: %s' % str(oldset-newset))

except Exception as ex:
template = "Exception {0} in Partition List. Arguments:\n{1!r}"
messag = template.format(type(ex).__name__, ex.args)
self._logger.error("%s : traceback %s" % \
(messag, traceback.format_exc()))
self._logger.error('Partition List failed %s %s' % \
(str(newset),str(oldset)))
except SystemExit:
raise SystemExit

self._logger.error('Partition List done : new %s old %s' % \
(str(newset),str(oldset)))

Expand Down Expand Up @@ -1246,6 +1259,41 @@ def run_alarm_timers(self, curr_time):
for alarm in update_alarms:
self.send_alarm_update(alarm[0], alarm[1])

def run_uve_agg(self, lredis, outp, part, acq_time):
# Write the aggregate UVE for all UVE updates for the
# given partition
rows = []
for ku,vu in outp.iteritems():
if vu is None:
# This message has no type!
# Its used to indicate a delete of the entire UVE
rows.append(OutputRow(key=ku, typ=None, val=None))
if len(rows) >= self._max_out_rows:
self.send_agg_uve(lredis,
self._instance_id,
part,
acq_time,
rows)
rows[:] = []
continue
for kt,vt in vu.iteritems():
rows.append(OutputRow(key=ku, typ=kt, val=vt))
if len(rows) >= self._max_out_rows:
self.send_agg_uve(lredis,
self._instance_id,
part,
acq_time,
rows)
rows[:] = []
# Flush all remaining rows
if len(rows):
self.send_agg_uve(lredis,
self._instance_id,
part,
acq_time,
rows)
rows[:] = []

def run_uve_processing(self):
"""
This function runs in its own gevent, and provides state compression
Expand Down Expand Up @@ -1327,7 +1375,7 @@ def run_uve_processing(self):
self._logger.info("UVE Process for %d : %d, %d remain" % \
(part, len(pendingset[part]), len(self._uveq[part])))

gevs[part] = gevent.spawn(self.handle_uve_notif,part,\
gevs[part] = gevent.spawn(self.handle_uve_notif, part,\
pendingset[part])
if kafka_topic_down:
ConnectionState.update(conn_type = ConnectionType.KAFKA_PUB,
Expand All @@ -1338,56 +1386,39 @@ def run_uve_processing(self):

if len(gevs):
gevent.joinall(gevs.values())
self._logger.info("UVE Processing joined")
outp={}
gevs_out={}
for part in gevs.keys():
# If UVE processing failed, requeue the working set
try:
outp = gevs[part].get()
outp[part] = gevs[part].get()
except Exception as ex:
template = "Exception {0} in notif worker. Arguments:\n{1!r}"
messag = template.format(type(ex).__name__, ex.args)
self._logger.error("%s : traceback %s" % \
(messag, traceback.format_exc()))
outp = None
if outp is None:
outp[part] = None
if outp[part] is None:
self._logger.error("UVE Process failed for %d" % part)
self.handle_uve_notifq(part, pendingset[part])
elif not part in self._workers:
outp[part] = None
self._logger.error(
"Part %d is gone, cannot process UVEs" % part)
else:
acq_time = self._workers[part].acq_time()
if len(outp):
rows = []
for ku,vu in outp.iteritems():
if vu is None:
# This message has no type!
# Its used to indicate a delete of the entire UVE
rows.append(OutputRow(key=ku, typ=None, val=None))
if len(rows) >= self._max_out_rows:
self.send_agg_uve(lredis,
self._instance_id,
part,
acq_time,
rows)
rows[:] = []
continue
for kt,vt in vu.iteritems():
rows.append(OutputRow(key=ku, typ=kt, val=vt))
if len(rows) >= self._max_out_rows:
self.send_agg_uve(lredis,
self._instance_id,
part,
acq_time,
rows)
rows[:] = []
# Flush all remaining rows
if len(rows):
self.send_agg_uve(lredis,
self._instance_id,
part,
acq_time,
rows)
rows[:] = []
self._logger.info("UVE Agg on %d items in part %d" % \
(len(outp), part))
gevs_out[part] = gevent.spawn(self.run_uve_agg, lredis,\
outp[part], part, self._workers[part].acq_time())

if len(gevs_out):
gevent.joinall(gevs_out.values())

# Check for exceptions during processing
for part in gevs_out.keys():
gevs_out[part].get()

# If there are alarm config changes, then start a gevent per
# partition to process the alarm config changes
if self._alarm_config_change_map:
Expand All @@ -1401,6 +1432,7 @@ def run_uve_processing(self):
alarm_config_change_map)
if alarm_workers:
gevent.joinall(alarm_workers.values())

except Exception as ex:
template = "Exception {0} in uve proc. Arguments:\n{1!r}"
messag = template.format(type(ex).__name__, ex.args)
Expand All @@ -1420,8 +1452,9 @@ def run_uve_processing(self):
self._logger.error("%s : traceback %s" % \
(messag, traceback.format_exc()))
raise SystemExit
if (curr - prev) < 0.5:
gevent.sleep(0.5 - (curr - prev))
if (curr - prev) < 1:
gevent.sleep(1 - (curr - prev))
self._logger.info("UVE Done")
else:
self._logger.info("UVE Process saturated")
gevent.sleep(0)
Expand Down Expand Up @@ -1535,6 +1568,7 @@ def stop_uve_partition(self, part):
if not part in self.ptab_info:
return
for tk in self.ptab_info[part].keys():
tcount = len(self.ptab_info[part][tk])
for rkey in self.ptab_info[part][tk].keys():
uk = tk + ":" + rkey
if tk in self.tab_alarms:
Expand All @@ -1548,7 +1582,8 @@ def stop_uve_partition(self, part):
(alarm_msg.log()))
alarm_msg.send(sandesh=self._sandesh)
del self.ptab_info[part][tk][rkey]
self._logger.error("UVE %s deleted in stop" % (uk))
self._logger.error("UVE stop removed %d UVEs of type %s" % \
(tcount, tk))
del self.ptab_info[part][tk]
del self.ptab_info[part]

Expand Down Expand Up @@ -1735,6 +1770,7 @@ def handle_uve_notif(self, part, uves):
uveq_trace.oper = "proc-output"
uveq_trace.trace_msg(name="UVEQTrace",\
sandesh=self._sandesh)
self._logger.info("Ending UVE proc for part %d" % part)
return output
else:
uveq_trace = UVEQTrace()
Expand All @@ -1743,6 +1779,7 @@ def handle_uve_notif(self, part, uves):
uveq_trace.oper = "proc-error"
uveq_trace.trace_msg(name="UVEQTrace",\
sandesh=self._sandesh)
self._logger.info("Ending UVE proc for part %d with error" % part)
return None

def handle_UVETableInfoReq(self, req):
Expand Down Expand Up @@ -1842,92 +1879,94 @@ def handle_AlarmConfigRequest(self, req):
res.response(req.context())
# end handle_AlarmConfigRequest

def partition_change(self, partno, enl):
def partition_change(self, parts, enl):
"""
Call this function when getting or giving up
ownership of a partition
Args:
partno : Partition Number
enl : True for acquiring, False for giving up
parts : Set of Partition Numbers, or a single partition
enl : True for acquiring, False for giving up
Returns:
status of operation (True for success)
"""
if not isinstance(parts,set):
parts = set([parts])
status = False
if enl:
if partno in self._workers:
self._logger.info("Dup partition %d" % partno)
if len(parts - set(self._workers.keys())) != len(parts):
self._logger.info("Dup partitions %s" % \
str(parts.intersection(set(self._workers.keys()))))
else:
cdisc = None
if self.disc:
cdisc = client.DiscoveryClient(
self._conf.discovery()['server'],
self._conf.discovery()['port'],
ModuleNames[Module.ALARM_GENERATOR],
'%s-%s-%d' % (self._hostname,
self._instance_id, partno))
ph = UveStreamProc(','.join(self._conf.kafka_broker_list()),
partno, self._conf.kafka_prefix()+"-uve-" + str(partno),
self._logger,
self.handle_uve_notifq, self._conf.host_ip(),
self.handle_resource_check,
self._instance_id,
self._conf.redis_server_port(),
self._conf.kafka_prefix()+"-workers")
ph.start()
self._workers[partno] = ph
self._uvestats[partno] = {}
for partno in parts:
ph = UveStreamProc(','.join(self._conf.kafka_broker_list()),
partno, self._conf.kafka_prefix()+"-uve-" + str(partno),
self._logger,
self.handle_uve_notifq, self._conf.host_ip(),
self.handle_resource_check,
self._instance_id,
self._conf.redis_server_port(),
self._conf.kafka_prefix()+"-workers")
ph.start()
self._workers[partno] = ph
self._uvestats[partno] = {}

tout = 1200
idx = 0
while idx < tout:
# When this partitions starts,
# uveq will get created
if partno not in self._uveq:
if len(parts - set(self._uveq.keys())) != 0:
gevent.sleep(.1)
else:
break
idx += 1
if partno in self._uveq:
if len(parts - set(self._uveq.keys())) == 0:
status = True
else:
# TODO: The partition has not started yet,
# but it still might start later.
# We possibly need to exit
status = False
self._logger.error("Unable to start partition %d" % partno)
self._logger.error("Unable to start partitions %s" % \
str(parts - set(self._uveq.keys())))
else:
if partno in self._workers:
ph = self._workers[partno]
self._logger.error("Kill part %s" % str(partno))
ph.kill(timeout=60)
try:
res,db = ph.get(False)
except gevent.Timeout:
self._logger.error("Unable to kill partition %d" % partno)
return False

self._logger.error("Returned " + str(res))
self._uveqf[partno] = self._workers[partno].acq_time()
del self._workers[partno]
del self._uvestats[partno]
if len(parts - set(self._workers.keys())) == 0:
for partno in parts:
ph = self._workers[partno]
self._logger.error("Kill part %s" % str(partno))
ph.kill(timeout=60)
try:
res,db = ph.get(False)
except gevent.Timeout:
self._logger.error("Unable to kill partition %d" % partno)
return False

self._logger.error("Returned " + str(res))
self._uveqf[partno] = self._workers[partno].acq_time()
del self._workers[partno]
del self._uvestats[partno]

tout = 1200
idx = 0
self._logger.error("Wait for parts %s to exit" % str(parts))
while idx < tout:
# When this partitions stop.s
# uveq will get destroyed
if partno in self._uveq:
if len(parts - set(self._uveq.keys())) != len(parts):
gevent.sleep(.1)
else:
break
idx += 1
if partno not in self._uveq:
if len(parts - set(self._uveq.keys())) == len(parts):
status = True
self._logger.error("Wait done for parts %s to exit" % str(parts))
else:
# TODO: The partition has not stopped yet
# but it still might stop later.
# We possibly need to exit
status = False
self._logger.error("Unable to stop partition %d" % partno)
self._logger.error("Unable to stop partitions %s" % \
str(parts.intersection(set(self._uveq.keys()))))
else:
self._logger.info("No partition %d" % partno)

Expand Down

0 comments on commit b699de3

Please sign in to comment.