
Commit

Merge "Providing a new alarms GET interface in analytics-api (/analyt…
Browse files Browse the repository at this point in the history
…ics/alarms) This interface is always served from the Aggregated UVECache Adding tests for this new interface, and for recently-added UVECache"
Zuul authored and opencontrail-ci-admin committed Oct 14, 2015
2 parents f9ffb04 + 49056f2 commit a16da39
Showing 7 changed files with 399 additions and 158 deletions.
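
For context, a minimal client sketch of the new endpoint is shown below (not part of the commit; the analytics-api address and port 8081 are assumptions, substitute your own):

    import requests

    # Query the new aggregated-alarms endpoint added in this commit.
    # Optional query parameters mirror the UVE filters: sfilt, mfilt, tablefilt, cfilt.
    resp = requests.get('http://127.0.0.1:8081/analytics/alarms')
    resp.raise_for_status()
    for table, entries in resp.json().items():
        for entry in entries:
            print(table, entry['name'])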
47 changes: 41 additions & 6 deletions src/opserver/opserver.py
@@ -486,18 +486,18 @@ def __init__(self, args_str=' '.join(sys.argv[1:])):
self._args.partitions, self._args.redis_password)

# TODO: For now, use DBCache during systemless test only
ucache = self._uvedbstream
usecache = True
if self._args.disc_server_ip:
ucache = None
usecache = False
else:
if self._args.partitions == 0:
ucache = None
usecache = False

self._uve_server = UVEServer(('127.0.0.1',
self._args.redis_server_port),
self._logger,
self._args.redis_password,
ucache)
self._uvedbstream, usecache)

self._LEVEL_LIST = []
for k in SandeshLevel._VALUES_TO_NAMES:
@@ -537,7 +537,7 @@ def __init__(self, args_str=' '.join(sys.argv[1:])):
self._state_server.update_redis_list(self.redis_uve_list)
self._uve_server.update_redis_uve_list(self.redis_uve_list)

self._analytics_links = ['uves', 'tables', 'queries', 'alarm-types']
self._analytics_links = ['uves', 'tables', 'queries', 'alarm-types', 'alarms']

self._VIRTUAL_TABLES = copy.deepcopy(_TABLES)

@@ -665,6 +665,7 @@ def __init__(self, args_str=' '.join(sys.argv[1:])):
bottle.route('/analytics/uves/<table>/<name:path>', 'GET', self.dyn_http_get)
bottle.route('/analytics/uves/<tables>', 'POST', self.dyn_http_post)
bottle.route('/analytics/alarm-types', 'GET', self.uve_alarm_http_types)
bottle.route('/analytics/alarms', 'GET', self.alarms_http_get)

# start gevent to monitor disk usage and automatically purge
if (self._args.auto_db_purge):
@@ -1328,6 +1329,11 @@ def _uve_filter_set(req):
filters = {}
filters['sfilt'] = req.get('sfilt')
filters['mfilt'] = req.get('mfilt')
tf = req.get('tablefilt')
if tf and tf in UVE_MAP:
filters['tablefilt'] = UVE_MAP[tf]
else:
filters['tablefilt'] = tf
if req.get('cfilt'):
infos = req['cfilt'].split(',')
filters['cfilt'] = OpServer._get_tfilter(infos)
@@ -1500,6 +1506,35 @@ def uve_alarm_http_types(self):
ret[aname] = avalue
return json.dumps(ret)

def alarms_http_get(self):
# common handling for all resource get
(ok, result) = self._get_common(bottle.request)
if not ok:
(code, msg) = result
abort(code, msg)

bottle.response.set_header('Content-Type', 'application/json')

req = bottle.request.query
try:
filters = OpServer._uve_filter_set(req)
except Exception as e:
return bottle.HTTPError(_ERRORS[errno.EBADMSG], e)
else:
filters['cfilt'] = { 'UVEAlarms':set() }
alarm_list = self._uve_server.get_alarms(filters)
alms = {}
for ak,av in alarm_list.iteritems():
alm_type = ak
if ak in _OBJECT_TABLES:
alm_type = _OBJECT_TABLES[ak].log_query_name
ulist = []
for uk, uv in av.iteritems():
ulist.append({'name':uk, 'value':uv})
alms[alm_type ] = ulist
return json.dumps(alms)
# end alarms_http_get

def dyn_list_http_get(self, tables):
# common handling for all resource get
(ok, result) = self._get_common(bottle.request)
@@ -1576,7 +1611,7 @@ def uves_http_get(self):

bottle.response.set_header('Content-Type', 'application/json')
return json.dumps(uvetype_links)
# end _uves_alarms_http_get
# end _uves_http_get

def alarms_ack_http_post(self):
self._post_common(bottle.request, None)
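To illustrate the alarms_http_get handler added above, here is a standalone sketch of the response shape it produces; the table name and UVE key are made up, and the real handler additionally maps raw table names through _OBJECT_TABLES[...].log_query_name:

    import json

    # Hypothetical result of UVEServer.get_alarms(filters): {table: {uve_key: alarm_content}}
    alarm_list = {
        'virtual-network': {
            'default-domain:admin:vn1': {'UVEAlarms': {'alarms': []}},
        },
    }

    # The handler flattens each table into a list of {'name': ..., 'value': ...} entries.
    alms = {}
    for table, uves in alarm_list.items():
        alms[table] = [{'name': k, 'value': v} for k, v in uves.items()]

    print(json.dumps(alms, indent=2))
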
55 changes: 36 additions & 19 deletions src/opserver/partition_handler.py
@@ -40,7 +40,7 @@ def __init__(self, logger, partitions):
self._partkeys[partno] = set()
self._uvedb = {}

def get_cache_list(self, utab, filters):
def get_cache_list(self, utab, filters, patterns, keysonly):
tables = None
if utab:
tables = [ utab ]
@@ -63,17 +63,32 @@ def get_cache_list(self, utab, filters):
if typ in self._typekeys:
if table in self._typekeys[typ]:
tqual[table].update(self._typekeys[typ][table])

for table in tables:
if not table in self._uvedb:
continue
barekeys = set(self._uvedb[table].keys())
if len(tfilter) != 0:
barekeys.intersection_update(tqual[table])
barekeys = set()
for bk in self._uvedb[table].keys():
if len(tfilter) != 0:
if not bk in tqual[table]:
continue
if patterns:
kfilter_match = False
for pattern in patterns:
if pattern.match(bk):
kfilter_match = True
break
if not kfilter_match:
continue
barekeys.add(bk)

brsp = self._get_uve_content(table, barekeys,\
tfilter, ackfilter, True)
tfilter, ackfilter, keysonly)
if len(brsp) != 0:
uve_list[table] = set(brsp.keys())
if keysonly:
uve_list[table] = set(brsp.keys())
else:
uve_list[table] = brsp
except Exception as ex:
template = "Exception {0} in uve list proc. Arguments:\n{1!r}"
messag = template.format(type(ex).__name__, ex.args)
@@ -130,7 +145,6 @@ def _get_uve_content(self, table, barekeys, tfilter, ackfilter, keysonly):
return brsp

def get_cache_uve(self, key, filters):
failures = False
rsp = {}
try:
filters = filters or {}
@@ -142,9 +156,9 @@ def get_cache_uve(self, key, filters):
table = key.split(":",1)[0]

if table not in self._uvedb:
return failures, rsp
return rsp
if barekey not in self._uvedb[table]:
return failures, rsp
return rsp

brsp = self._get_uve_content(table, set([barekey]),\
tfilter, ackfilter, False)
@@ -156,7 +170,7 @@ def store_uve(self, partno, pi, key, typ, value):
messag = template.format(type(ex).__name__, ex.args)
self._logger.error("%s : traceback %s" % \
(messag, traceback.format_exc()))
return failures, rsp
return rsp

def store_uve(self, partno, pi, key, typ, value):
barekey = key.split(":",1)[1]
@@ -198,7 +212,9 @@ def store_uve(self, partno, pi, key, typ, value):
if not table in self._typekeys[typ]:
self._typekeys[typ][table] = set()
self._typekeys[typ][table].add(barekey)
self._uvedb[table][barekey]["__SOURCE__"] = pi._asdict()
self._uvedb[table][barekey]["__SOURCE__"] = \
{'instance_id':pi.instance_id, 'ip_address':pi.ip_address, \
'partition':partno}

def clear_partition(self, partno):
for key in self._partkeys[partno]:
@@ -309,7 +325,8 @@ def _run(self):
return None

class UveStreamer(gevent.Greenlet):
def __init__(self, logger, q, rfile, agp_cb, partitions, rpass):
def __init__(self, logger, q, rfile, agp_cb, partitions, rpass,\
USP_class = UveStreamPart):
gevent.Greenlet.__init__(self)
self._logger = logger
self._q = q
@@ -321,12 +338,13 @@ def __init__(self, logger, q, rfile, agp_cb, partitions, rpass):
self._rpass = rpass
self._ccb = None
self._uvedbcache = UveCacheProcessor(self._logger, partitions)
self._USP_class = USP_class

def get_uve(self, key, filters=None):
return self._uvedbcache.get_cache_uve(key, filters)
return False, self._uvedbcache.get_cache_uve(key, filters)

def get_uve_list(self, utab, filters):
return self._uvedbcache.get_cache_list(utab, filters)
def get_uve_list(self, utab, filters, patterns, keysonly = True):
return self._uvedbcache.get_cache_list(utab, filters, patterns, keysonly)

def partition_callback(self, partition, pi, key, type, value):
# gevent is non-premptive; we don't need locks
@@ -381,7 +399,7 @@ def _run(self):
self.partition_stop(elem)
self._uvedbcache.clear_partition(elem)
self.partition_start(elem, newagp[elem])
self._agp = newagp
self._agp = copy.deepcopy(newagp)
except gevent.GreenletExit:
break
self._logger.error("Stopping UveStreamer with %d partitions" % self._partitions)
@@ -400,14 +418,13 @@ def partition_start(self, partno, pi):
msg = {'event': 'clear', 'data':\
json.dumps({'partition':partno, 'acq_time':pi.acq_time})}
self._q.put(sse_pack(msg))
self._parts[partno] = UveStreamPart(partno, self._logger,
self._parts[partno] = self._USP_class(partno, self._logger,
self.partition_callback, pi, self._rpass)
self._parts[partno].start()

def partition_stop(self, partno):
self._logger.error("Stopping agguve part %d" % partno)
self._parts[partno].kill()
self._parts[partno].get()
del self._parts[partno]

class PartitionHandler(gevent.Greenlet):
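Regarding the new patterns argument to get_cache_list, a minimal sketch of the key filtering it performs, assuming the caller passes precompiled regular expressions (which the pattern.match calls above suggest); the keys and pattern here are made up:

    import re

    barekeys = ['vn1', 'vn2', 'backend-vn']   # hypothetical UVE keys in one table
    patterns = [re.compile('vn.*')]           # hypothetical kfilter compiled to regexes

    selected = set()
    for bk in barekeys:
        # Keep a key only if at least one pattern matches it.
        if patterns and not any(p.match(bk) for p in patterns):
            continue
        selected.add(bk)

    print(sorted(selected))                   # ['vn1', 'vn2']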
