Skip to content

Commit

Permalink
Merge "Adding some checks in Alarmgen to detect failures"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed May 17, 2016
2 parents 12325b9 + f7b0082 commit d052799
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/opserver/alarmgen.py
Expand Up @@ -961,16 +961,20 @@ def run_uve_processing(self):
password=self._conf.redis_password(),
db=7)
self.reconnect_agg_uve(lredis)
ConnectionState.update(conn_type = ConnectionType.REDIS_UVE,
name = 'AggregateRedis', status = ConnectionStatus.UP)
else:
if not lredis.exists(self._moduleid+':'+self._instance_id):
self._logger.error('Identified redis restart')
self.reconnect_agg_uve(lredis)
gevs = {}
pendingset = {}
kafka_topic_down = False
for part in self._uveq.keys():
if not len(self._uveq[part]):
continue
self._logger.info("UVE Process for %d" % part)
kafka_topic_down |= self._workers[part].failed()

# Allow the partition handlers to queue new UVEs without
# interfering with the work of processing the current UVEs
Expand All @@ -979,6 +983,13 @@ def run_uve_processing(self):

gevs[part] = gevent.spawn(self.handle_uve_notif,part,\
pendingset[part])
if kafka_topic_down:
ConnectionState.update(conn_type = ConnectionType.KAFKA_PUB,
name = 'KafkaTopic', status = ConnectionStatus.DOWN)
else:
ConnectionState.update(conn_type = ConnectionType.KAFKA_PUB,
name = 'KafkaTopic', status = ConnectionStatus.UP)

if len(gevs):
gevent.joinall(gevs.values())
for part in gevs.keys():
Expand Down Expand Up @@ -1031,6 +1042,8 @@ def run_uve_processing(self):
self._logger.error("%s : traceback %s" % \
(messag, traceback.format_exc()))
lredis = None
ConnectionState.update(conn_type = ConnectionType.REDIS_UVE,
name = 'AggregateRedis', status = ConnectionStatus.DOWN)
gevent.sleep(1)

curr = time.time()
Expand Down
7 changes: 7 additions & 0 deletions src/opserver/partition_handler.py
Expand Up @@ -559,6 +559,10 @@ def __init__(self, brokers, group, topic, logger, limit):
self._uvedb = {}
self._partoffset = 0
self._kfk = None
self._failed = False

def failed(self):
return self._failed

def msg_handler(self, mlist):
self._logger.info("%s Reading %s" % (self._topic, str(mlist)))
Expand All @@ -574,6 +578,7 @@ def _run(self):
pause = False
self._logger.error("New KafkaClient %s" % self._topic)
self._kfk = KafkaClient(self._brokers , "kc-" + self._topic)
self._failed = False
try:
consumer = SimpleConsumer(self._kfk, self._group, self._topic, buffer_size = 4096*4, max_buffer_size=4096*32)
#except:
Expand All @@ -582,6 +587,7 @@ def _run(self):
messag = template.format(type(ex).__name__, ex.args)
self._logger.error("Error: %s trace %s" % \
(messag, traceback.format_exc()))
self._failed = True
raise RuntimeError(messag)

self._logger.error("Starting %s" % self._topic)
Expand Down Expand Up @@ -631,6 +637,7 @@ def _run(self):
self._logger.error("%s : traceback %s" % \
(messag, traceback.format_exc()))
self.stop_partition()
self._failed = True
pause = True

self._logger.error("Stopping %s pcount %d" % (self._topic, pcount))
Expand Down

0 comments on commit d052799

Please sign in to comment.