Commit

Merge "Added unit-tests for alarmgen"
Zuul authored and opencontrail-ci-admin committed Jul 2, 2015
2 parents ca3bfcf + ccc677e commit d6ac139
Showing 6 changed files with 396 additions and 57 deletions (four of the six files are reproduced below).
2 changes: 1 addition & 1 deletion src/libpartition/SConscript
@@ -31,7 +31,7 @@ for file in local_sources:
sdist_depends = [setup_sources_rules, local_sources_rules]

cd_cmd = 'cd ' + Dir('.').path + ' && '
sdist_gen = PartEnv.Command('dist', 'setup.py', cd_cmd + 'python setup.py sdist')
sdist_gen = PartEnv.Command('dist/libpartition-0.1dev.tar.gz', 'setup.py', cd_cmd + 'python setup.py sdist')
# install everything before building distribution
PartEnv.Depends(sdist_gen, sdist_depends)

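Naming the sdist target after the artifact it produces, instead of the bare 'dist' alias, lets other SConscripts track the tarball as a file dependency. A minimal sketch of such a use, mirroring the env.Depends line added to the test SConscript later in this commit:

    # Rebuild the analytics test venv whenever the libpartition sdist changes.
    env.Depends(env['analytics_test'],
                env['TOP'] + '/libpartition/dist/libpartition-0.1dev.tar.gz')
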
89 changes: 48 additions & 41 deletions src/opserver/alarmgen.py
@@ -51,6 +51,8 @@
from opserver_util import ServicePoller
from stevedore import hook
from pysandesh.util import UTCTimestampUsec
from libpartition.libpartition import PartitionClient
import discoveryclient.client as client

class AGTabStats(object):
""" This class is used to store per-UVE-table information
@@ -171,41 +173,48 @@ def unchanged(self):

class Controller(object):

@staticmethod
def fail_cb(manager, entrypoint, exception):
sandesh_global._logger.info("Load failed for %s with exception %s" % \
def fail_cb(self, manager, entrypoint, exception):
self._sandesh._logger.info("Load failed for %s with exception %s" % \
(str(entrypoint),str(exception)))

def __init__(self, conf):
def __init__(self, conf, test_logger=None):
self._conf = conf
module = Module.ALARM_GENERATOR
self._moduleid = ModuleNames[module]
node_type = Module2NodeType[module]
self._node_type_name = NodeTypeNames[node_type]
self._hostname = socket.gethostname()
self._instance_id = self._conf.worker_id()
sandesh_global.init_generator(self._moduleid, self._hostname,
is_collector = True
if test_logger is not None:
is_collector = False
self._sandesh = Sandesh()
self._sandesh.init_generator(self._moduleid, self._hostname,
self._node_type_name, self._instance_id,
self._conf.collectors(),
self._node_type_name,
self._conf.http_port(),
['opserver.sandesh', 'sandesh'],
host_ip=self._conf.host_ip())
sandesh_global.set_logging_params(
enable_local_log=self._conf.log_local(),
category=self._conf.log_category(),
level=self._conf.log_level(),
file=self._conf.log_file(),
enable_syslog=self._conf.use_syslog(),
syslog_facility=self._conf.syslog_facility())
self._logger = sandesh_global._logger
host_ip=self._conf.host_ip(),
connect_to_collector = is_collector)
if test_logger is not None:
self._logger = test_logger
else:
self._sandesh.set_logging_params(
enable_local_log=self._conf.log_local(),
category=self._conf.log_category(),
level=self._conf.log_level(),
file=self._conf.log_file(),
enable_syslog=self._conf.use_syslog(),
syslog_facility=self._conf.syslog_facility())
self._logger = self._sandesh._logger
# Trace buffer list
self.trace_buf = [
{'name':'DiscoveryMsg', 'size':1000}
]
# Create trace buffers
for buf in self.trace_buf:
sandesh_global.trace_buffer_create(name=buf['name'], size=buf['size'])
self._sandesh.trace_buffer_create(name=buf['name'], size=buf['size'])

tables = [ "ObjectCollectorInfo",
"ObjectDatabaseInfo",
@@ -223,7 +232,7 @@ def __init__(self, conf):
name=table,
invoke_on_load=True,
invoke_args=(),
on_load_failure_callback=Controller.fail_cb
on_load_failure_callback=self.fail_cb
)

for extn in self.mgrs[table][table]:
@@ -233,7 +242,7 @@ def __init__(self, conf):
self.tab_alarms[table] = {}
self.tab_perf[table] = AGTabStats()

ConnectionState.init(sandesh_global, self._hostname, self._moduleid,
ConnectionState.init(self._sandesh, self._hostname, self._moduleid,
self._instance_id,
staticmethod(ConnectionState.get_process_state_cb),
NodeStatusUVE, NodeStatus)
@@ -249,7 +258,6 @@ def __init__(self, conf):
self._libpart = None
self._partset = set()
if self._conf.discovery()['server']:
import discoveryclient.client as client
data = {
'ip-address': self._hostname,
'port': self._instance_id
@@ -294,7 +302,7 @@ def libpart_cb(self, part_list):
agp.inst_parts = [agpi]

agp_trace = AlarmgenPartitionTrace(data=agp)
agp_trace.send()
agp_trace.send(sandesh=self._sandesh)

newset = set(part_list)
oldset = self._partset
@@ -322,7 +330,6 @@ def start_libpart(self, ag_list):
self._logger.error('Could not import libpartition: No alarmgen list')
return None
try:
from libpartition.libpartition import PartitionClient
self._logger.error('Starting PC')
agpi = AlarmgenPartionInfo()
agpi.instance = self._instance_id
@@ -333,7 +340,7 @@ def start_libpart(self, ag_list):
agp.inst_parts = [agpi]

agp_trace = AlarmgenPartitionTrace(data=agp)
agp_trace.send()
agp_trace.send(sandesh=self._sandesh)

pc = PartitionClient("alarmgen",
self._libpart_name, ag_list,
@@ -415,7 +422,7 @@ def stop_uve_partition(self, part):
alarm_msg = AlarmTrace(data=ustruct, table=tk)
self._logger.error('send del alarm for stop: %s' % \
(alarm_msg.log()))
alarm_msg.send()
alarm_msg.send(sandesh=self._sandesh)
del self.ptab_info[part][tk][rkey]
self._logger.error("UVE %s deleted in stop" % (uk))
del self.ptab_info[part][tk]
@@ -513,7 +520,7 @@ def handle_uve_notif(self, part, uves):
ustruct = UVEAlarms(name = uve_name, deleted = True)
alarm_msg = AlarmTrace(data=ustruct, table=tab)
self._logger.info('send del alarm: %s' % (alarm_msg.log()))
alarm_msg.send()
alarm_msg.send(sandesh=self._sandesh)
continue

# Handling Alarms
@@ -576,7 +583,7 @@ def handle_uve_notif(self, part, uves):
deleted = False)
alarm_msg = AlarmTrace(data=ustruct, table=tab)
self._logger.info('send alarm: %s' % (alarm_msg.log()))
alarm_msg.send()
alarm_msg.send(sandesh=self._sandesh)
return success

def handle_UVETableInfoReq(self, req):
@@ -763,16 +770,16 @@ def process_stats(self):
ukc.key = uk
ukc.count = uc
au_keys.append(ukc)
au_obj = AlarmgenUpdate(name=sandesh_global._source + ':' + \
sandesh_global._node_type + ':' + \
sandesh_global._module + ':' + \
sandesh_global._instance_id,
au_obj = AlarmgenUpdate(name=self._sandesh._source + ':' + \
self._sandesh._node_type + ':' + \
self._sandesh._module + ':' + \
self._sandesh._instance_id,
partition = pk,
table = ktab,
keys = au_keys,
notifs = None)
self._logger.debug('send key stats: %s' % (au_obj.log()))
au_obj.send()
au_obj.send(sandesh=self._sandesh)

for ktab,tab in din.iteritems():
au_notifs = []
@@ -785,16 +792,16 @@ def process_stats(self):
tkc.generator = kgen
tkc.collector = kcoll
au_notifs.append(tkc)
au_obj = AlarmgenUpdate(name=sandesh_global._source + ':' + \
sandesh_global._node_type + ':' + \
sandesh_global._module + ':' + \
sandesh_global._instance_id,
au_obj = AlarmgenUpdate(name=self._sandesh._source + ':' + \
self._sandesh._node_type + ':' + \
self._sandesh._module + ':' + \
self._sandesh._instance_id,
partition = pk,
table = ktab,
keys = None,
notifs = au_notifs)
self._logger.debug('send notif stats: %s' % (au_obj.log()))
au_obj.send()
au_obj.send(sandesh=self._sandesh)

au = AlarmgenStatus()
au.name = self._hostname
@@ -807,15 +814,15 @@ def process_stats(self):
ags.updates = n_updates
au.counters.append(ags)

agname = sandesh_global._source + ':' + \
sandesh_global._node_type + ':' + \
sandesh_global._module + ':' + \
sandesh_global._instance_id
agname = self._sandesh._source + ':' + \
self._sandesh._node_type + ':' + \
self._sandesh._module + ':' + \
self._sandesh._instance_id
au.alarmgens.append(agname)

atrace = AlarmgenStatusTrace(data = au)
self._logger.debug('send alarmgen status : %s' % (atrace.log()))
atrace.send()
atrace.send(sandesh=self._sandesh)

def handle_PartitionStatusReq(self, req):
''' Return the entire contents of the UVE DB for the
@@ -920,7 +927,7 @@ def run(self):
mod_cpu_state.module_cpu_info = [mod_cpu_info]

alarmgen_cpu_state_trace = ModuleCpuStateTrace(data=mod_cpu_state)
alarmgen_cpu_state_trace.send()
alarmgen_cpu_state_trace.send(sandesh=self._sandesh)

aly_cpu_state = AnalyticsCpuState()
aly_cpu_state.name = self._hostname
Expand All @@ -934,7 +941,7 @@ def run(self):
aly_cpu_state.cpu_info = [aly_cpu_info]

aly_cpu_state_trace = AnalyticsCpuStateTrace(data=aly_cpu_state)
aly_cpu_state_trace.send()
aly_cpu_state_trace.send(sandesh=self._sandesh)

# Send out the UVEKey-Count stats for this time period
self.process_stats()
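
The recurring edit through alarmgen.py is the same one-line pattern: every Sandesh message (alarm traces, partition traces, key and notification stats, CPU state) is now sent through the controller's own client instead of the implicit global one, so two Controller instances, as in a unit test, no longer share messaging state. Schematically:

    alarm_msg = AlarmTrace(data=ustruct, table=tab)
    alarm_msg.send(sandesh=self._sandesh)  # previously: alarm_msg.send()
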
35 changes: 25 additions & 10 deletions src/opserver/partition_handler.py
@@ -25,7 +25,7 @@ def __init__(self, brokers, partition, group, topic, logger, limit):
self._logger = logger
self._limit = limit
self._uvedb = {}
self._partoffset = None
self._partoffset = 0
self._kfk = None

def msg_handler(self, om):
@@ -89,21 +89,36 @@ def _run(self):
self._logger.info("%d could not handle %s" % (self._partition, str(mm)))
raise gevent.GreenletExit
except TypeError as ex:
self._logger.info("Type Error: %s" % str(ex.args))
self._logger.error("Type Error: %s trace %s" % \
(str(ex.args), traceback.format_exc()))
gevent.sleep(0.1)
except common.FailedPayloadsError as ex:
self._logger.info("Payload Error: %s" % str(ex.args))
self._logger.error("Payload Error: %s" % str(ex.args))
gevent.sleep(0.1)
except gevent.GreenletExit:
break
except AssertionError as ex:
self._partoffset = ex
break
except Exception as ex:
template = "An exception of type {0} occurred. Arguments:\n{1!r}"
messag = template.format(type(ex).__name__, ex.args)
self._logger.error("%s : traceback %s" % \
(messag, traceback.format_exc()))
self.stop_partition()
gevent.sleep(2)
partdb = copy.deepcopy(self._uvedb)

partdb = {}
for coll in self._uvedb.keys():
partdb[coll] = {}
for gen in self._uvedb[coll].keys():
partdb[coll][gen] = {}
for tab in self._uvedb[coll][gen].keys():
for rkey in self._uvedb[coll][gen][tab].keys():
uk = tab + ":" + rkey
partdb[coll][gen][uk] = \
set(self._uvedb[coll][gen][tab][rkey].keys())

self._logger.error("Stopping %d pcount %d" % (self._partition, pcount))
self.stop_partition()
return self._partoffset, partdb
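
The deep copy of the raw UVE DB on shutdown is replaced by an explicit flattening that joins table and row key into a single UVE name and keeps only the set of struct types per UVE. A sketch of the two shapes, with illustrative keys:

    # self._uvedb: {collector: {generator: {table: {rowkey: {type: value}}}}}
    # partdb:      {collector: {generator: {"table:rowkey": set_of_types}}}
    partdb = {
        '10.0.0.1:6381': {
            'host1:Compute:contrail-vrouter-agent:0': {
                'ObjectVRouterTable:host1': set(['VrouterStatsAgent']),
            },
        },
    }
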
@@ -135,8 +150,8 @@ def resource_check(self):
This function compares the known collectors with the
list from discovery, and syncs UVE keys accordingly
'''

disc_instances = copy.deepcopy(self._us.redis_instances())
us_redis_inst = self._us.redis_instances()
disc_instances = copy.deepcopy(us_redis_inst)
r_added = disc_instances - self.disc_rset
r_deleted = self.disc_rset - disc_instances
for r_inst in r_deleted:
@@ -146,9 +161,11 @@ def resource_check(self):
self._logger.error("Part %d lost collector %s" % coll)
self.stop_partition(coll)
for r_inst in r_added:
self._logger.error("Part %d discovered new redis %s" % \
(self._partno, str(r_inst)))
res = self._us.get_part(self._partno, r_inst)
self._logger.error("Part %d discovered new redis %s with UVEs %s" % \
(self._partno, str(r_inst), str(res)))
self._logger.debug("Part %d reading UVEs from new redis %s" % \
(self._partno,str(res)))
self.start_partition(res)
self.disc_rset = disc_instances

@@ -231,8 +248,6 @@ def msg_handler(self, om):
# Ignore this message
self._logger.debug("%d Ignoring UVE %s" % (self._partition, str(om)))
return True
else:
self._logger.debug("%d Reading UVE %s" % (self._partition, str(om)))

if not self._uvedb[coll].has_key(gen):
self._uvedb[coll][gen] = {}
14 changes: 11 additions & 3 deletions src/opserver/test/SConscript
@@ -20,6 +20,7 @@ local_sources = [
'analytics_statstest.py',
'analytics_db_test.py',
'overlay_to_underlay_mapper_test.py',
'alarm_test.py',
]
local_sources_rules = []
for file in local_sources:
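
alarm_test.py itself is among the six changed files but its contents are not reproduced on this page. A hypothetical skeleton, using the test_logger hook added to Controller above, might look like the following; the class name and the conf stub are assumptions, not taken from this commit, and a real test would need a conf stub whose methods return usable values:

    import logging
    import unittest
    import mock  # assumption: the mock library used by contrail python tests

    from opserver.alarmgen import Controller

    class AlarmGenTest(unittest.TestCase):  # hypothetical name
        def test_controller_uses_supplied_logger(self):
            conf = mock.MagicMock()  # placeholder only; see note above
            conf.discovery.return_value = {'server': None}  # skip discovery setup
            logger = logging.getLogger('alarm-test')
            controller = Controller(conf, test_logger=logger)
            self.assertIs(controller._logger, logger)
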
@@ -69,15 +70,17 @@ build_pkgs = [
'#controller/src/config/test/utils/mockzoo',
'#controller/src/opserver/test',
env['TOP'] + '/tools/sandesh/library/python',
env['TOP'] + '/discovery/client',
env['TOP'] + '/libpartition',
env['TOP'] + '/sandesh/common',
env['TOP'] + '/config/common/',
env['TOP'] + '/opserver/plugins/alarm_process_status',
env['TOP'] + '/opserver/plugins/alarm_process_connectivity',
env['TOP'] + '/opserver/plugins/alarm_partial_sysinfo',
env['TOP'] + '/opserver/plugins/alarm_bgp_connectivity',
env['TOP'] + '/opserver/plugins/alarm_xmpp_connectivity',
env['TOP'] + '/opserver/plugins/alarm_vrouter_interface',
env['TOP'] + '/opserver']

#venv that we are building is in env['analytics_test']
env.Depends (env['analytics_test'], env['OPSERVER_PKG'])
env.Depends (env['analytics_test'], env['ALARM_PROCESS_STATUS_PKG'])
@@ -86,8 +89,13 @@ env.Depends (env['analytics_test'], env['ALARM_PARTIAL_SYSINFO_PKG'])
env.Depends (env['analytics_test'], env['ALARM_BGP_CONNECTIVITY_PKG'])
env.Depends (env['analytics_test'], env['ALARM_XMPP_CONNECTIVITY_PKG'])
env.Depends (env['analytics_test'], env['ALARM_VROUTER_INTERFACE_PKG'])
env.Requires (env['analytics_test'], env['TOP']+'/sandesh/common/dist/sandesh-common-0.1dev.tar.gz')
env.Requires (env['analytics_test'], env['TOP']+'/tools/sandesh/library/python/dist/sandesh-0.1dev.tar.gz')

env.Depends (env['analytics_test'], env['TOP']+'/sandesh/common/dist/sandesh-common-0.1dev.tar.gz')
env.Depends (env['analytics_test'], env['TOP']+'/tools/sandesh/library/python/dist/sandesh-0.1dev.tar.gz')
env.Depends (env['analytics_test'], env['TOP']+'/discovery/client/dist/discoveryclient-0.1dev.tar.gz')
env.Depends (env['analytics_test'], env['TOP']+'/libpartition/dist/libpartition-0.1dev.tar.gz')
env.Depends (env['analytics_test'], env['TOP']+'/config/common/dist/cfgm_common-0.1dev.tar.gz')

for local_sources_rule in local_sources_rules:
env.Depends (env['analytics_test'], local_sources_rule)

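
Note also that env.Requires on the sandesh tarballs is replaced with env.Depends, and equivalent Depends lines are added for the discoveryclient, libpartition and cfgm_common tarballs. The distinction in SCons, sketched:

    env.Requires(target, prereq)  # order-only: prereq is built first, but target is not rebuilt when prereq changes
    env.Depends(target, prereq)   # full dependency: target is rebuilt whenever prereq changes

so the test virtualenv is now rebuilt whenever any of these packaged tarballs changes.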
