diff --git a/src/opserver/opserver.py b/src/opserver/opserver.py
index e4ff652df10..9a903097b60 100644
--- a/src/opserver/opserver.py
+++ b/src/opserver/opserver.py
@@ -486,18 +486,18 @@ def __init__(self, args_str=' '.join(sys.argv[1:])):
self._args.partitions, self._args.redis_password)
# TODO: For now, use DBCache during systemless test only
- ucache = self._uvedbstream
+ usecache = True
if self._args.disc_server_ip:
- ucache = None
+ usecache = False
else:
if self._args.partitions == 0:
- ucache = None
+ usecache = False
self._uve_server = UVEServer(('127.0.0.1',
self._args.redis_server_port),
self._logger,
self._args.redis_password,
- ucache)
+ self._uvedbstream, usecache)
self._LEVEL_LIST = []
for k in SandeshLevel._VALUES_TO_NAMES:
@@ -537,7 +537,7 @@ def __init__(self, args_str=' '.join(sys.argv[1:])):
self._state_server.update_redis_list(self.redis_uve_list)
self._uve_server.update_redis_uve_list(self.redis_uve_list)
- self._analytics_links = ['uves', 'tables', 'queries', 'alarm-types']
+ self._analytics_links = ['uves', 'tables', 'queries', 'alarm-types', 'alarms']
self._VIRTUAL_TABLES = copy.deepcopy(_TABLES)
@@ -665,6 +665,7 @@ def __init__(self, args_str=' '.join(sys.argv[1:])):
bottle.route('/analytics/uves/
/', 'GET', self.dyn_http_get)
bottle.route('/analytics/uves/', 'POST', self.dyn_http_post)
bottle.route('/analytics/alarm-types', 'GET', self.uve_alarm_http_types)
+ bottle.route('/analytics/alarms', 'GET', self.alarms_http_get)
# start gevent to monitor disk usage and automatically purge
if (self._args.auto_db_purge):
@@ -1328,6 +1329,11 @@ def _uve_filter_set(req):
filters = {}
filters['sfilt'] = req.get('sfilt')
filters['mfilt'] = req.get('mfilt')
+ tf = req.get('tablefilt')
+ if tf and tf in UVE_MAP:
+ filters['tablefilt'] = UVE_MAP[tf]
+ else:
+ filters['tablefilt'] = tf
if req.get('cfilt'):
infos = req['cfilt'].split(',')
filters['cfilt'] = OpServer._get_tfilter(infos)
@@ -1500,6 +1506,35 @@ def uve_alarm_http_types(self):
ret[aname] = avalue
return json.dumps(ret)
+ def alarms_http_get(self):
+ # common handling for all resource get
+ (ok, result) = self._get_common(bottle.request)
+ if not ok:
+ (code, msg) = result
+ abort(code, msg)
+
+ bottle.response.set_header('Content-Type', 'application/json')
+
+ req = bottle.request.query
+ try:
+ filters = OpServer._uve_filter_set(req)
+ except Exception as e:
+ return bottle.HTTPError(_ERRORS[errno.EBADMSG], e)
+ else:
+ filters['cfilt'] = { 'UVEAlarms':set() }
+ alarm_list = self._uve_server.get_alarms(filters)
+ alms = {}
+ for ak,av in alarm_list.iteritems():
+ alm_type = ak
+ if ak in _OBJECT_TABLES:
+ alm_type = _OBJECT_TABLES[ak].log_query_name
+ ulist = []
+ for uk, uv in av.iteritems():
+ ulist.append({'name':uk, 'value':uv})
+ alms[alm_type ] = ulist
+ return json.dumps(alms)
+ # end alarms_http_get
+
def dyn_list_http_get(self, tables):
# common handling for all resource get
(ok, result) = self._get_common(bottle.request)
@@ -1576,7 +1611,7 @@ def uves_http_get(self):
bottle.response.set_header('Content-Type', 'application/json')
return json.dumps(uvetype_links)
- # end _uves_alarms_http_get
+ # end _uves_http_get
def alarms_ack_http_post(self):
self._post_common(bottle.request, None)
diff --git a/src/opserver/partition_handler.py b/src/opserver/partition_handler.py
index e02b4207226..d7e5d0ec906 100644
--- a/src/opserver/partition_handler.py
+++ b/src/opserver/partition_handler.py
@@ -40,7 +40,7 @@ def __init__(self, logger, partitions):
self._partkeys[partno] = set()
self._uvedb = {}
- def get_cache_list(self, utab, filters):
+ def get_cache_list(self, utab, filters, patterns, keysonly):
tables = None
if utab:
tables = [ utab ]
@@ -63,17 +63,32 @@ def get_cache_list(self, utab, filters):
if typ in self._typekeys:
if table in self._typekeys[typ]:
tqual[table].update(self._typekeys[typ][table])
-
+
for table in tables:
if not table in self._uvedb:
continue
- barekeys = set(self._uvedb[table].keys())
- if len(tfilter) != 0:
- barekeys.intersection_update(tqual[table])
+ barekeys = set()
+ for bk in self._uvedb[table].keys():
+ if len(tfilter) != 0:
+ if not bk in tqual[table]:
+ continue
+ if patterns:
+ kfilter_match = False
+ for pattern in patterns:
+ if pattern.match(bk):
+ kfilter_match = True
+ break
+ if not kfilter_match:
+ continue
+ barekeys.add(bk)
+
brsp = self._get_uve_content(table, barekeys,\
- tfilter, ackfilter, True)
+ tfilter, ackfilter, keysonly)
if len(brsp) != 0:
- uve_list[table] = set(brsp.keys())
+ if keysonly:
+ uve_list[table] = set(brsp.keys())
+ else:
+ uve_list[table] = brsp
except Exception as ex:
template = "Exception {0} in uve list proc. Arguments:\n{1!r}"
messag = template.format(type(ex).__name__, ex.args)
@@ -130,7 +145,6 @@ def _get_uve_content(self, table, barekeys, tfilter, ackfilter, keysonly):
return brsp
def get_cache_uve(self, key, filters):
- failures = False
rsp = {}
try:
filters = filters or {}
@@ -142,9 +156,9 @@ def get_cache_uve(self, key, filters):
table = key.split(":",1)[0]
if table not in self._uvedb:
- return failures, rsp
+ return rsp
if barekey not in self._uvedb[table]:
- return failures, rsp
+ return rsp
brsp = self._get_uve_content(table, set([barekey]),\
tfilter, ackfilter, False)
@@ -156,7 +170,7 @@ def get_cache_uve(self, key, filters):
messag = template.format(type(ex).__name__, ex.args)
self._logger.error("%s : traceback %s" % \
(messag, traceback.format_exc()))
- return failures, rsp
+ return rsp
def store_uve(self, partno, pi, key, typ, value):
barekey = key.split(":",1)[1]
@@ -198,7 +212,9 @@ def store_uve(self, partno, pi, key, typ, value):
if not table in self._typekeys[typ]:
self._typekeys[typ][table] = set()
self._typekeys[typ][table].add(barekey)
- self._uvedb[table][barekey]["__SOURCE__"] = pi._asdict()
+ self._uvedb[table][barekey]["__SOURCE__"] = \
+ {'instance_id':pi.instance_id, 'ip_address':pi.ip_address, \
+ 'partition':partno}
def clear_partition(self, partno):
for key in self._partkeys[partno]:
@@ -309,7 +325,8 @@ def _run(self):
return None
class UveStreamer(gevent.Greenlet):
- def __init__(self, logger, q, rfile, agp_cb, partitions, rpass):
+ def __init__(self, logger, q, rfile, agp_cb, partitions, rpass,\
+ USP_class = UveStreamPart):
gevent.Greenlet.__init__(self)
self._logger = logger
self._q = q
@@ -321,12 +338,13 @@ def __init__(self, logger, q, rfile, agp_cb, partitions, rpass):
self._rpass = rpass
self._ccb = None
self._uvedbcache = UveCacheProcessor(self._logger, partitions)
+ self._USP_class = USP_class
def get_uve(self, key, filters=None):
- return self._uvedbcache.get_cache_uve(key, filters)
+ return False, self._uvedbcache.get_cache_uve(key, filters)
- def get_uve_list(self, utab, filters):
- return self._uvedbcache.get_cache_list(utab, filters)
+ def get_uve_list(self, utab, filters, patterns, keysonly = True):
+ return self._uvedbcache.get_cache_list(utab, filters, patterns, keysonly)
def partition_callback(self, partition, pi, key, type, value):
# gevent is non-premptive; we don't need locks
@@ -381,7 +399,7 @@ def _run(self):
self.partition_stop(elem)
self._uvedbcache.clear_partition(elem)
self.partition_start(elem, newagp[elem])
- self._agp = newagp
+ self._agp = copy.deepcopy(newagp)
except gevent.GreenletExit:
break
self._logger.error("Stopping UveStreamer with %d partitions" % self._partitions)
@@ -400,14 +418,13 @@ def partition_start(self, partno, pi):
msg = {'event': 'clear', 'data':\
json.dumps({'partition':partno, 'acq_time':pi.acq_time})}
self._q.put(sse_pack(msg))
- self._parts[partno] = UveStreamPart(partno, self._logger,
+ self._parts[partno] = self._USP_class(partno, self._logger,
self.partition_callback, pi, self._rpass)
self._parts[partno].start()
def partition_stop(self, partno):
self._logger.error("Stopping agguve part %d" % partno)
self._parts[partno].kill()
- self._parts[partno].get()
del self._parts[partno]
class PartitionHandler(gevent.Greenlet):
diff --git a/src/opserver/test/test_alarm.py b/src/opserver/test/test_alarm.py
index 68d337ff9d6..e4bdab3c659 100755
--- a/src/opserver/test/test_alarm.py
+++ b/src/opserver/test/test_alarm.py
@@ -16,7 +16,8 @@
from kafka.common import OffsetAndMessage,Message
from opserver.uveserver import UVEServer
-from opserver.partition_handler import PartitionHandler, UveStreamProc
+from opserver.partition_handler import PartitionHandler, UveStreamProc, \
+ UveStreamer, UveStreamPart, PartInfo
from opserver.alarmgen import Controller
from opserver.alarmgen_cfg import CfgParser
@@ -30,92 +31,38 @@
PartHandlerOutput = namedtuple("PartHandlerOutput",
["callbacks", "uvedb"])
-# Tests for the PartitionHandler class
-class TestPartitionHandler(unittest.TestCase):
-
- def setUp(self):
- self.ph = None
- self.test_spec = None
- self.stage = 0
- self.step = 0
- #self.done = False
- pass
-
- def tearDown(self):
- pass
-
- def callback_proc(self, part, uves):
- self.assertNotEqual(self.stage, len(self.test_spec))
- stage = self.test_spec[self.stage]
- o = stage.o.callbacks[self.step]
- self.assertEqual(uves, o,
- "Error in stage %d step %d\nActual %s\nExpected %s" % \
- (self.stage, self.step, str(uves), str(o)))
- self.step += 1
- if len(stage.o.callbacks) == self.step:
- self.step = 0
- self.stage += 1
-
- @mock.patch('opserver.partition_handler.UVEServer', autospec=True)
- @mock.patch('opserver.partition_handler.KafkaClient', autospec=True)
- @mock.patch('opserver.partition_handler.SimpleConsumer', autospec=True)
- # Test intialization and shutdown, along with basic Kafka, partition start
- # and UVE read operations
- @unittest.skip('Skipping PartHandler test')
- # TODO: Needs to be updated or removed
- def test_00_init(self, mock_SimpleConsumer, mock_KafkaClient, mock_UVEServer):
- self.test_spec = [
- TestStage(
- i = PartHandlerInput(
- redis_instances = set([("127.0.0.1",44444,0)]),
- get_part = ("127.0.0.1:44444",
- { "gen1" :
- { "ObjectXX:uve1" : set(["type1"]) }}),
- get_messages = [OffsetAndMessage(offset=0,
- message=Message(magic=0, attributes=0, key='',
- value=('{"message":"UVEUpdate","key":"ObjectYY:uve2",'
- '"type":"type2","gen":"gen1","coll":'
- '"127.0.0.1:44444","deleted":false}')))]),
- o = PartHandlerOutput(
- callbacks = [
- { "ObjectXX:uve1" : None },
- { "ObjectYY:uve2" : set(["type2"])},
- ],
- uvedb = None)
- ),
- TestStage(
- i = PartHandlerInput(
- redis_instances = gevent.GreenletExit(),
- get_part = None,
- get_messages = None),
- o = PartHandlerOutput(
- callbacks = [
- { "ObjectXX:uve1" : None,
- "ObjectYY:uve2" : None },
- ],
- uvedb = {"127.0.0.1:44444" :
- { "gen1" :
- { "ObjectXX:uve1" : set(["type1"]),
- "ObjectYY:uve2" : set(["type2"])}}}),
-
- )
- ]
- mock_UVEServer.return_value.redis_instances.side_effect = \
- [x.i.redis_instances for x in self.test_spec]
-
- mock_UVEServer.return_value.get_part.side_effect = \
- [x.i.get_part for x in self.test_spec if x.i.get_part is not None]
-
- mock_SimpleConsumer.return_value.get_messages.side_effect = \
- [x.i.get_messages for x in self.test_spec]
-
- self.ph = UveStreamProc('no-brokers', 1, "uve-1", logging,
- self.callback_proc, "127.0.0.1", mock_UVEServer.return_value)
- self.ph.start()
- res,db = self.ph.get(timeout = 10)
- if (isinstance(res,AssertionError)):
- raise res
- self.assertEqual(db, self.test_spec[-1].o.uvedb)
+class TestChecker(object):
+ @retry(delay=1, tries=3)
+ def checker_dict(self,expected,actual,match=True):
+ residual = actual
+ matched = True
+ result = False
+ for elem in expected:
+ if residual and elem in residual:
+ if isinstance(residual,dict):
+ residual = residual[elem]
+ else:
+ residual = None
+ else:
+ matched = False
+ if match:
+ result = matched
+ else:
+ result = not matched
+ logging.info("dict exp %s actual %s match %s" % \
+ (str(expected), str(actual), str(match)))
+ return result
+
+ @retry(delay=1, tries=3)
+ def checker_exact(self,expected,actual,match=True):
+ result = False
+ logging.info("exact exp %s actual %s match %s" % \
+ (str(expected), str(actual), str(match)))
+ if expected == actual:
+ return match
+ else:
+ result = not match
+ return result
class Mock_base(collections.Callable,collections.MutableMapping):
def __init__(self, *args, **kwargs):
@@ -158,7 +105,7 @@ def __call__(self, key, flat, filters):
class Mock_get_messages(Mock_base):
def __init__(self, *args, **kwargs):
- Mock_base.__init__(self, *args, **kwargs)
+ Mock_base.__init__(self)
def __call__(self, num, timeout):
vals = []
@@ -171,10 +118,115 @@ def __call__(self, num, timeout):
else:
return [None]
+class Mock_agp(Mock_base):
+ def __init__(self, *args, **kwargs):
+ Mock_base.__init__(self, *args, **kwargs)
+
+ def __call__(self):
+ logging.info("Reading AGP %s" % str(self.store))
+ val = self.store
+ return val
+
+class Mock_usp(object):
+ def __init__(self, partno, logger, cb, pi, rpass):
+ self._cb = cb
+ self._partno = partno
+ self._pi = pi
+ self._started = False
+
+ def start(self):
+ self._started = True
+
+ def kill(self):
+ self._started = False
+
+ def __call__(self, key, type, value):
+ if self._started:
+ self._cb(self._partno, self._pi, key, type, value)
+
+# Tests for UveStreamer and UveCache
+class TestUveStreamer(unittest.TestCase, TestChecker):
+ @classmethod
+ def setUpClass(cls):
+ pass
+
+ @classmethod
+ def tearDownClass(cls):
+ pass
+
+ def setUp(self):
+ self.mock_agp = Mock_agp()
+ self.ustr = UveStreamer(logging, None, None, self.mock_agp, 2, None, Mock_usp)
+ self.ustr.start()
+ self.mock_agp[0] = PartInfo(ip_address = "127.0.0.1",
+ acq_time = 666,
+ instance_id = "0",
+ port = 6379)
+ self.mock_agp[1] = PartInfo(ip_address = "127.0.0.1",
+ acq_time = 777,
+ instance_id = "0",
+ port = 6379)
+
+ def tearDown(self):
+ self.ustr.kill()
+
+ #@unittest.skip('Skipping UveStreamer')
+ def test_00_init(self):
+ self.assertTrue(self.checker_dict([0], self.ustr._parts))
+ self.ustr._parts[0]("ObjectXX:uve1","type1",{"xx": 0})
+ self.assertTrue(self.checker_dict(\
+ ["ObjectXX","uve1","type1","xx"],\
+ self.ustr._uvedbcache._uvedb))
+ self.assertTrue(self.checker_dict(\
+ ["type1","ObjectXX","uve1"],\
+ self.ustr._uvedbcache._typekeys))
+ self.assertTrue(self.checker_dict(\
+ [0,"ObjectXX:uve1"],\
+ self.ustr._uvedbcache._partkeys))
+
+ # remove partition. UVE should go too
+ del self.mock_agp[0]
+ self.assertTrue(self.checker_dict(\
+ ["ObjectXX","uve1"],\
+ self.ustr._uvedbcache._uvedb, False))
+ self.assertTrue(self.checker_dict(\
+ ["type1"],\
+ self.ustr._uvedbcache._typekeys, False))
+ self.assertTrue(self.checker_exact(\
+ set(),
+ self.ustr._uvedbcache._partkeys[0]))
+
+ #@unittest.skip('Skipping UveStreamer')
+ def test_00_deluve(self):
+ self.assertTrue(self.checker_dict([0], self.ustr._parts))
+ self.ustr._parts[0]("ObjectXX:uve1","type1",{"xx": 0})
+ self.assertTrue(self.checker_dict(\
+ ["ObjectXX","uve1","type1","xx"],\
+ self.ustr._uvedbcache._uvedb))
+ self.assertTrue(self.checker_dict(\
+ ["type1","ObjectXX","uve1"],\
+ self.ustr._uvedbcache._typekeys))
+ self.assertTrue(self.checker_dict(\
+ [0,"ObjectXX:uve1"],\
+ self.ustr._uvedbcache._partkeys))
+
+ # remove UVE
+ self.ustr._parts[0]("ObjectXX:uve1",None,None)
+ self.assertTrue(self.checker_dict(\
+ ["ObjectXX","uve1"],\
+ self.ustr._uvedbcache._uvedb, False))
+ self.assertTrue(self.checker_dict(\
+ ["type1","ObjectXX"],\
+ self.ustr._uvedbcache._typekeys, False))
+ self.assertTrue(self.checker_exact(\
+ set(),
+ self.ustr._uvedbcache._partkeys[0]))
+
+
# Tests for all AlarmGenerator code, using mocks for
# external interfaces for UVEServer, Kafka, libpartition
# and Discovery
-class TestAlarmGen(unittest.TestCase):
+class TestAlarmGen(unittest.TestCase, TestChecker):
@classmethod
def setUpClass(cls):
cls._pc = mock.patch('opserver.alarmgen.PartitionClient', autospec=True)
@@ -208,36 +260,6 @@ def setUp(self):
def tearDown(self):
self._agtask.kill()
- @retry(delay=1, tries=3)
- def checker_dict(self,expected,actual,match=True):
- residual = actual
- matched = True
- result = False
- for elem in expected:
- if elem in residual:
- residual = residual[elem]
- else:
- matched = False
- if match:
- result = matched
- else:
- result = not matched
- if not result:
- logging.info("exp %s actual %s match %s" % \
- (str(expected), str(actual), str(match)))
- return result
-
- @retry(delay=1, tries=3)
- def checker_exact(self,expected,actual,match=True):
- result = False
- if expected == actual:
- return match
- else:
- result = not match
- if not result:
- logging.info("exp %s actual %s match %s" % \
- (str(expected), str(actual), str(match)))
- return result
@mock.patch('opserver.alarmgen.Controller.send_agg_uve')
@mock.patch.object(UVEServer, 'get_part')
diff --git a/src/opserver/test/test_analytics_uve.py b/src/opserver/test/test_analytics_uve.py
index 5034558c1e7..115e84101d0 100755
--- a/src/opserver/test/test_analytics_uve.py
+++ b/src/opserver/test/test_analytics_uve.py
@@ -594,6 +594,51 @@ def test_08_uve_alarm_filter(self):
'default-domain:project1:vn2',
'default-domain:project2:vn1'
],
+ 'get_alarms': {
+ 'virtual-network': [
+ { 'name' : 'default-domain:project1:vn2',
+ 'value' : { 'UVEAlarms': {
+ 'alarms': [
+ {
+ 'type': 'InPktsThreshold',
+ 'description': [
+ {
+ 'rule': 'UveVirtualNetworkAgent.in_tpkts < 2',
+ 'value': 'UveVirtualNetworkAgent.in_tpkts == 2'
+ }
+ ]
+ },
+ {
+ 'type': 'InBytesThreshold',
+ 'description': [
+ {
+ 'rule': 'UveVirtualNetworkAgent.in_bytes < 512',
+ 'value': 'UveVirtualNetworkAgent.in_bytes == 1024',
+ }
+ ],
+ 'ack': True
+ }
+ ]
+ } }
+ },
+ { 'name' : 'default-domain:project2:vn1',
+ 'value' : { 'UVEAlarms': {
+ 'alarms': [
+ {
+ 'type': 'ConfigNotPresent',
+ 'description': [
+ {
+ 'rule': 'UveVirtualNetworkConfig != False',
+ 'value': 'UveVirtualNetworkConfig == False'
+ }
+ ],
+ 'ack': False
+ }
+ ]
+ } }
+ },
+ ]
+ },
'uve_get_post': {
'value': [
{
@@ -887,6 +932,26 @@ def test_08_uve_alarm_filter(self):
'default-domain:project2:*',
'invalid-vn:*'
],
+ 'get_alarms': {
+ 'virtual-network': [
+ { 'name' : 'default-domain:project2:vn1',
+ 'value' : { 'UVEAlarms': {
+ 'alarms': [
+ {
+ 'type': 'ConfigNotPresent',
+ 'description': [
+ {
+ 'rule': 'UveVirtualNetworkConfig != False',
+ 'value': 'UveVirtualNetworkConfig == False'
+ }
+ ],
+ 'ack': False
+ }
+ ]
+ } }
+ },
+ ]
+ },
'uve_list_get': [
'default-domain:project2:vn1'
],
@@ -1390,6 +1455,41 @@ def test_08_uve_alarm_filter(self):
'default-domain:project1:vn2',
'default-domain:project2:vn1'
],
+ 'get_alarms': {
+ 'virtual-network': [
+ { 'name' : 'default-domain:project1:vn2',
+ 'value' : { 'UVEAlarms': {
+ 'alarms': [
+ {
+ 'type': 'InPktsThreshold',
+ 'description': [
+ {
+ 'rule': 'UveVirtualNetworkAgent.in_tpkts < 2',
+ 'value': 'UveVirtualNetworkAgent.in_tpkts == 2'
+ }
+ ]
+ },
+ ]
+ } }
+ },
+ { 'name' : 'default-domain:project2:vn1',
+ 'value' : { 'UVEAlarms': {
+ 'alarms': [
+ {
+ 'type': 'ConfigNotPresent',
+ 'description': [
+ {
+ 'rule': 'UveVirtualNetworkConfig != False',
+ 'value': 'UveVirtualNetworkConfig == False'
+ }
+ ],
+ 'ack': False
+ }
+ ]
+ } }
+ },
+ ]
+ },
'uve_get_post': {
'value': [
{
@@ -1711,6 +1811,11 @@ def test_08_uve_alarm_filter(self):
filts=filters, exp_uves=filt_test[i]['uve_get_post']))
assert(vizd_obj.verify_uve_post(vn_table,
filts=filters, exp_uves=filt_test[i]['uve_get_post']))
+ if 'get_alarms' in filt_test[i]:
+ filters['tablefilt'] = 'virtual-network'
+ assert(vizd_obj.verify_get_alarms(vn_table,
+ filts=filters, exp_uves=filt_test[i]['get_alarms']))
+
# end test_08_uve_alarm_filter
@staticmethod
diff --git a/src/opserver/test/utils/analytics_fixture.py b/src/opserver/test/utils/analytics_fixture.py
index 0357c0c0293..16e3376d682 100644
--- a/src/opserver/test/utils/analytics_fixture.py
+++ b/src/opserver/test/utils/analytics_fixture.py
@@ -2196,6 +2196,8 @@ def _get_filters_string(self, filters):
filt = []
if filters.get('kfilt') is not None:
filt.append('kfilt=%s' % (','.join(filters['kfilt'])))
+ if filters.get('tablefilt') is not None:
+ filt.append('tablefilt=%s' % (filters['tablefilt']))
if filters.get('sfilt') is not None:
filt.append('sfilt=%s' % (filters['sfilt']))
if filters.get('mfilt') is not None:
@@ -2251,8 +2253,16 @@ def _verify_uves(self, exp_uves, actual_uves):
self.logger.info('Actual UVEs: %s' % (str(actual_uves)))
if actual_uves is None:
return False
- exp_uve_value = exp_uves['value']
- actual_uve_value = actual_uves['value']
+ etk = exp_uves.keys()
+ atk = actual_uves.keys()
+ if len(etk):
+ exp_uve_value = exp_uves[etk[0]]
+ else:
+ exp_uve_value = []
+ if len(atk):
+ actual_uve_value = actual_uves[atk[0]]
+ else:
+ actual_uve_value = []
self.logger.info('Remove token from alarms')
self._remove_alarm_token(exp_uve_value)
self._remove_alarm_token(actual_uve_value)
@@ -2263,6 +2273,20 @@ def _verify_uves(self, exp_uves, actual_uves):
return actual_uve_value == exp_uve_value
# end _verify_uves
+ @retry(delay=1, tries=4)
+ def verify_get_alarms(self, table, filts=None, exp_uves=None):
+ vns = VerificationOpsSrv('127.0.0.1', self.opserver_port)
+ filters = self._get_filters_string(filts)
+ self.logger.info('verify_get_alarms: %s' % (filters))
+ try:
+ actual_uves = vns.get_alarms(filters)
+ except Exception as err:
+ self.logger.error('Failed to get response for %s: %s' % \
+ (query, str(err)))
+ assert(False)
+ return self._verify_uves(exp_uves, actual_uves)
+ # end verify_multi_uve_get
+
@retry(delay=1, tries=4)
def verify_multi_uve_get(self, table, filts=None, exp_uves=None):
vns = VerificationOpsSrv('127.0.0.1', self.opserver_port)
diff --git a/src/opserver/test/utils/opserver_introspect_utils.py b/src/opserver/test/utils/opserver_introspect_utils.py
index 2a3014647ff..0a099de2960 100644
--- a/src/opserver/test/utils/opserver_introspect_utils.py
+++ b/src/opserver/test/utils/opserver_introspect_utils.py
@@ -59,6 +59,9 @@ def get_table_column_values(self, table, col_name):
def uve_query(self, query):
return self.dict_get('analytics/uves/%s' % query)
+ def get_alarms(self, filters):
+ return self.dict_get('analytics/alarms?%s' % filters)
+
def post_uve_request(self, table, json_body):
url = 'http://%s:%s/analytics/uves/%s' % (self._ip, str(self._port), table)
try:
diff --git a/src/opserver/uveserver.py b/src/opserver/uveserver.py
index cce75f1fc81..551dfe6eea2 100644
--- a/src/opserver/uveserver.py
+++ b/src/opserver/uveserver.py
@@ -27,12 +27,14 @@
class UVEServer(object):
- def __init__(self, redis_uve_server, logger, redis_password=None, uvedbcache=None):
+ def __init__(self, redis_uve_server, logger, redis_password=None, \
+ uvedbcache=None, usecache=False):
self._local_redis_uve = redis_uve_server
self._redis_uve_map = {}
self._logger = logger
self._redis = None
self._uvedbcache = uvedbcache
+ self._usecache = usecache
self._redis_password = redis_password
self._uve_reverse_map = {}
for h,m in UVE_MAP.iteritems():
@@ -64,7 +66,7 @@ def update_redis_uve_list(self, redis_uve_list):
r_ip+":"+str(r_port), ConnectionStatus.INIT)
# Exercise redis connections to update health
if len(newlist):
- self.get_uve("ObjectCollectorInfo:__NONE__", True, None)
+ self.get_uve("ObjectCollectorInfo:__NONE__", False, None)
# end update_redis_uve_list
@@ -230,7 +232,7 @@ def get_uve(self, key, flat, filters=None, base_url=None):
tfilter = filters.get('cfilt')
ackfilter = filters.get('ackfilt')
- if flat and not sfilter and not mfilter and self._uvedbcache:
+ if flat and not sfilter and not mfilter and self._usecache:
return self._uvedbcache.get_uve(key, filters)
is_alarm = False
@@ -361,10 +363,41 @@ def get_uve_regex(self, key):
return re.compile(regex)
# end get_uve_regex
+ def get_alarms(self, filters):
+ tables = filters.get('tablefilt')
+ kfilter = filters.get('kfilt')
+ patterns = None
+ if kfilter is not None:
+ patterns = set()
+ for filt in kfilter:
+ patterns.add(self.get_uve_regex(filt))
+
+ rsp = self._uvedbcache.get_uve_list(tables, filters, patterns, False)
+ return rsp
+
+ # end multi_uve_get
def multi_uve_get(self, table, flat, filters=None, base_url=None):
- # get_uve_list cannot handle attribute names very efficiently,
- # so we don't pass them here
- uve_list = self.get_uve_list(table, filters, False)
+ sfilter = filters.get('sfilt')
+ mfilter = filters.get('mfilt')
+ kfilter = filters.get('kfilt')
+
+ patterns = None
+ if kfilter is not None:
+ patterns = set()
+ for filt in kfilter:
+ patterns.add(self.get_uve_regex(filt))
+
+ if not sfilter and not mfilter and self._usecache:
+ rsp = self._uvedbcache.get_uve_list(table, filters, patterns, True)
+ if table in rsp:
+ uve_list = rsp[table]
+ else:
+ uve_list = set()
+ else:
+ # get_uve_list cannot handle attribute names very efficiently,
+ # so we don't pass them here
+ uve_list = self.get_uve_list(table, filters, False)
+
for uve_name in uve_list:
_,uve_val = self.get_uve(
table + ':' + uve_name, flat, filters, base_url)
@@ -373,6 +406,7 @@ def multi_uve_get(self, table, flat, filters=None, base_url=None):
else:
uve = {'name': uve_name, 'value': uve_val}
yield uve
+
# end multi_uve_get
def get_uve_list(self, table, filters=None, parse_afilter=False):
@@ -386,16 +420,17 @@ def get_uve_list(self, table, filters=None, parse_afilter=False):
sfilter = filters.get('sfilt')
mfilter = filters.get('mfilt')
+ patterns = None
if kfilter is not None:
patterns = set()
for filt in kfilter:
patterns.add(self.get_uve_regex(filt))
- else:
- if not sfilter and not mfilter and self._uvedbcache:
- rsp = self._uvedbcache.get_uve_list(table, filters)
- if table in rsp:
- uve_list = rsp[table]
- return uve_list
+
+ if not sfilter and not mfilter and self._usecache:
+ rsp = self._uvedbcache.get_uve_list(table, filters, patterns)
+ if table in rsp:
+ uve_list = rsp[table]
+ return uve_list
for r_inst in self._redis_uve_map.keys():
try: