Skip to content

Commit

Permalink
Baseline changes ported from mainline for tracking Redis UVE connections in OpServer
Browse files Browse the repository at this point in the history

Change-Id: I82335d5de2fc040ffc2ba0b22037fe5879c5ea99
Partial-Bug: 1459973
Closes-Bug: 1449646
  • Loading branch information
anishmehta committed Jul 17, 2015
1 parent a418f45 commit f797920
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 171 deletions.
11 changes: 5 additions & 6 deletions src/opserver/alarmgen.py
Expand Up @@ -198,12 +198,11 @@ def handle_uve_notif(self, uves, remove = False):
if not self.mgrs.has_key(tab):
no_handlers.add(tab)
continue
if remove:
uve_data = []
else:
filters = {'kfilt': [uve_name]}
itr = self._us.multi_uve_get(tab, True, filters)
uve_data = itr.next()['value']
uve_data = {}
if not remove:
filters = {}
_,uve_data = self._us.get_uve(uv, True, filters)

if len(uve_data) == 0:
self._logger.info("UVE %s deleted" % uv)
if self.tab_alarms[tab].has_key(uv):
Expand Down
4 changes: 2 additions & 2 deletions src/opserver/opserver.py
Expand Up @@ -1403,7 +1403,7 @@ def _uve_alarm_http_post(self, is_alarm):
first = True
for key in filters['kfilt']:
uve_name = uve_tbl + ':' + key
rsp = self._uve_server.get_uve(uve_name, True, filters,
_, rsp = self._uve_server.get_uve(uve_name, True, filters,
is_alarm=is_alarm)
if rsp != {}:
data = {'name': key, 'value': rsp}
Expand Down Expand Up @@ -1463,7 +1463,7 @@ def _uve_alarm_http_get(self, name, is_alarm):
yield u', ' + json.dumps(gen)
yield u']}'
else:
rsp = self._uve_server.get_uve(uve_name, flat, filters,
_, rsp = self._uve_server.get_uve(uve_name, flat, filters,
is_alarm=is_alarm)
yield json.dumps(rsp)
# end _uve_alarm_http_get
Expand Down
240 changes: 77 additions & 163 deletions src/opserver/uveserver.py
Expand Up @@ -20,13 +20,13 @@
from gevent.coros import BoundedSemaphore
from pysandesh.util import UTCTimestampUsec
from pysandesh.connection_info import ConnectionState
from sandesh.viz.constants import _STAT_TABLES, STAT_OBJECTID_FIELD, STAT_VT_PREFIX
import traceback

class UVEServer(object):

def __init__(self, redis_uve_server, logger, api_port=None, redis_password=None):
self._local_redis_uve = redis_uve_server
self._redis_uve_list = []
self._redis_uve_map = {}
self._logger = logger
self._sem = BoundedSemaphore(1)
self._redis = None
Expand All @@ -40,7 +40,21 @@ def __init__(self, redis_uve_server, logger, api_port=None, redis_password=None)
#end __init__

def update_redis_uve_list(self, redis_uve_list):
    """Reconcile the cached map of Redis UVE connections with a new
    instance list.

    Each element of *redis_uve_list* supplies an (ip, port) pair.
    Instances that disappeared have their cached client dropped;
    brand-new instances get a fresh StrictRedis client.  Instances
    present in both keep their existing (possibly already connected)
    client object untouched.
    """
    newlist = set()
    for elem in redis_uve_list:
        newlist.add((elem[0], elem[1]))

    # If some redis instances are gone, remove them from our map.
    # .keys() yields a snapshot (py2 list), so deleting while
    # walking it is safe.
    for test_elem in self._redis_uve_map.keys():
        if test_elem not in newlist:
            del self._redis_uve_map[test_elem]

    # New redis instances need to be inserted into the map
    for test_elem in newlist:
        if test_elem not in self._redis_uve_map:
            (r_ip, r_port) = test_elem
            self._redis_uve_map[test_elem] = redis.StrictRedis(
                r_ip, r_port, password=self._redis_password, db=1)
# end update_redis_uve_list

def fill_redis_uve_info(self, redis_uve_info):
Expand Down Expand Up @@ -94,45 +108,6 @@ def run(self):
self._logger.debug("%s del received for " % value)
# value is of the format:
# DEL:<key>:<src>:<node-type>:<module>:<instance-id>:<message-type>:<seqno>
info = value.rsplit(":", 6)
key = info[0].split(":", 1)[1]
typ = info[5]

existing = self._redis.hgetall("PREVIOUS:" + key + ":" + typ)
tstate = {}
tstate[key] = {}
tstate[key][typ] = {}
state = UVEServer.convert_previous(existing, tstate, key, typ)

for attr, hval in self._redis.hgetall(value).iteritems():
snhdict = xmltodict.parse(hval)

if UVEServer._is_agg_list(snhdict[attr]):
if snhdict[attr]['list']['@size'] == "0":
continue
if snhdict[attr]['list']['@size'] == "1":
sname = ParallelAggregator.get_list_name(
snhdict[attr])
if not isinstance(
snhdict[attr]['list'][sname], list):
snhdict[attr]['list'][sname] = \
[snhdict[attr]['list'][sname]]

if (attr not in state[key][typ]):
# There is no existing entry for the UVE
vstr = json.dumps(snhdict[attr])
else:
# There is an existing entry
# Merge the new entry with the existing one
state = UVEServer.merge_previous(
state, key, typ, attr, snhdict[attr])
vstr = json.dumps(state[key][typ][attr]['previous'])

# Store the merged result back in the database
self._redis.sadd("PUVES:" + typ, key)
self._redis.sadd("PTYPES:" + key, typ)
self._redis.hset("PREVIOUS:" + key + ":" + typ, attr, vstr)

self._redis.delete(value)
except redis.exceptions.ResponseError:
# Send redis connection-down msg. Could be because of authentication
Expand Down Expand Up @@ -171,71 +146,54 @@ def _is_agg_list(attr):
return True
return False

@staticmethod
def convert_previous(existing, state, key, typ, afilter=None):
    """Load an existing delete record into the nested *state* dict.

    *existing* is the redis hash contents (attr name -> JSON string)
    saved for UVE *key* and struct-type *typ* at delete time.  Only
    attributes that need delete-time aggregation are kept; everything
    else is dropped.  When *afilter* is non-empty it restricts which
    attribute names are loaded.  Returns the (mutated) *state* dict.
    """
    # Take the existing delete record, and load it into the state dict
    for attr, hval in existing.iteritems():
        hdict = json.loads(hval)

        if afilter is not None and len(afilter):
            if attr not in afilter:
                continue

        # When recording deleted attributes, only record those
        # for which delete-time aggregation is needed
        if UVEServer._is_agg_item(hdict):
            if (typ not in state[key]):
                state[key][typ] = {}
            if (attr not in state[key][typ]):
                state[key][typ][attr] = {}
            state[key][typ][attr]["previous"] = hdict

        # For lists that require delete-time aggregation, we need
        # to normalize lists of size 1, and ignore those of size 0
        if UVEServer._is_agg_list(hdict):
            if hdict['list']['@size'] != "0":
                if (typ not in state[key]):
                    state[key][typ] = {}
                if (attr not in state[key][typ]):
                    state[key][typ][attr] = {}
                state[key][typ][attr]["previous"] = hdict
            if hdict['list']['@size'] == "1":
                # A size-1 list may hold a bare element rather than a
                # Python list; wrap it so downstream aggregation always
                # sees a uniform list shape.
                sname = ParallelAggregator.get_list_name(hdict)
                if not isinstance(hdict['list'][sname], list):
                    hdict['list'][sname] = [hdict['list'][sname]]

    return state

def get_part(self, part):
    """Collect, per redis instance, the UVE keys owned by each
    generator for partition *part*.

    Returns a dict shaped as
    {"<ip>:<port>": {"<generator-id>": {"<uve-key>": 0}}}.
    On any failure the cached connection for that instance is dropped
    (so a later call reconnects) and the exception is re-raised.
    """
    uves = {}
    for r_inst in self._redis_uve_map.keys():
        try:
            (r_ip, r_port) = r_inst
            # Lazily (re)create the client for instances whose
            # connection was dropped after an earlier failure.
            if not self._redis_uve_map[r_inst]:
                self._redis_uve_map[r_inst] = redis.StrictRedis(
                    host=r_ip, port=r_port,
                    password=self._redis_password, db=1)

            redish = self._redis_uve_map[r_inst]
            gen_uves = {}
            for elems in redish.smembers("PART2KEY:" + str(part)):
                # Generator id is the first four colon-separated
                # fields; the UVE key is the sixth.
                info = elems.split(":", 5)
                gen = info[0] + ":" + info[1] + ":" + \
                    info[2] + ":" + info[3]
                key = info[5]
                if not gen_uves.has_key(gen):
                    gen_uves[gen] = {}
                gen_uves[gen][key] = 0
            uves[r_ip + ":" + str(r_port)] = gen_uves
        except Exception as e:
            self._logger.error("get_part failed %s for : %s:%d tb %s" \
                % (str(e), r_ip, r_port, traceback.format_exc()))
            # Drop the connection so the next call reconnects.
            self._redis_uve_map[r_inst] = None
            raise e
    return uves

def get_uve(self, key, flat, filters=None, multi=False, is_alarm=False):
def get_uve(self, key, flat, filters=None, is_alarm=False):

filters = filters or {}
sfilter = filters.get('sfilt')
mfilter = filters.get('mfilt')
tfilter = filters.get('cfilt')
ackfilter = filters.get('ackfilt')
state = {}
state[key] = {}
statdict = {}
for redis_uve in self._redis_uve_list:
redish = redis.StrictRedis(host=redis_uve[0],
port=redis_uve[1],
password=self._redis_password, db=1)
rsp = {}
failures = False

for r_inst in self._redis_uve_map.keys():
try:
(r_ip,r_port) = r_inst
if not self._redis_uve_map[r_inst]:
self._redis_uve_map[r_inst] = redis.StrictRedis(
host=r_ip, port=r_port,
password=self._redis_password, db=1)

redish = self._redis_uve_map[r_inst]
qmap = {}
ppe = redish.pipeline()
ppe.smembers("ALARM_ORIGINS:" + key)
Expand All @@ -259,7 +217,6 @@ def get_uve(self, key, flat, filters=None, multi=False, is_alarm=False):
if mfilter != mdule:
continue
origins.add(smt)

ppeval = redish.pipeline()
for origs in origins:
ppeval.hgetall("VALUES:" + key + ":" + origs)
Expand All @@ -278,6 +235,7 @@ def get_uve(self, key, flat, filters=None, multi=False, is_alarm=False):
afilter_list = set()
if tfilter is not None:
afilter_list = tfilter[typ]

for attr, value in odict.iteritems():
if len(afilter_list):
if attr not in afilter_list:
Expand Down Expand Up @@ -315,36 +273,6 @@ def get_uve(self, key, flat, filters=None, multi=False, is_alarm=False):
snhdict[attr]['list']['@size'] = \
str(len(alarms))
else:
if not flat:
continue
if typ not in statdict:
statdict[typ] = {}
statdict[typ][attr] = []
statsattr = json.loads(value)
for elem in statsattr:
edict = {}
if elem["rtype"] == "list":
elist = redish.lrange(elem["href"], 0, -1)
for eelem in elist:
jj = json.loads(eelem).items()
edict[jj[0][0]] = jj[0][1]
elif elem["rtype"] == "zset":
elist = redish.zrange(
elem["href"], 0, -1, withscores=True)
for eelem in elist:
tdict = json.loads(eelem[0])
tval = long(tdict["ts"])
dt = datetime.datetime.utcfromtimestamp(
float(tval) / 1000000)
tms = (tval % 1000000) / 1000
tstr = dt.strftime('%Y %b %d %H:%M:%S')
edict[tstr + "." + str(tms)] = eelem[1]
elif elem["rtype"] == "hash":
elist = redish.hgetall(elem["href"])
edict = elist

statdict[typ][attr].append(
{elem["aggtype"]: edict})
continue

# print "Attr %s Value %s" % (attr, snhdict)
Expand All @@ -356,35 +284,17 @@ def get_uve(self, key, flat, filters=None, multi=False, is_alarm=False):
key][typ][attr][dsource])
state[key][typ][attr][dsource] = snhdict[attr]

if sfilter is None and mfilter is None:
for ptyp in redish.smembers("PTYPES:" + key):
afilter = None
if tfilter is not None:
if ptyp not in tfilter:
continue
afilter = tfilter[ptyp]
existing = redish.hgetall("PREVIOUS:" + key + ":" + ptyp)
nstate = UVEServer.convert_previous(
existing, state, key, ptyp, afilter)
state = copy.deepcopy(nstate)

pa = ParallelAggregator(state)
rsp = pa.aggregate(key, flat)
except redis.exceptions.ConnectionError:
self._logger.error("Failed to connect to redis-uve: %s:%d" \
% (redis_uve[0], redis_uve[1]))
rsp = pa.aggregate(key, flat,)
except Exception as e:
self._logger.error("Exception: %s" % e)
return {}
self._logger.error("redis-uve failed %s for : %s:%d tb %s" \
% (str(e), r_ip, r_port, traceback.format_exc()))
self._redis_uve_map[r_inst] = None
failures = True
else:
self._logger.debug("Computed %s" % key)
self._logger.debug("Computed %s as %s" % (key,str(rsp)))

for k, v in statdict.iteritems():
if k in rsp:
mp = dict(v.items() + rsp[k].items())
statdict[k] = mp

return dict(rsp.items() + statdict.items())
return failures, rsp
# end get_uve

def get_uve_regex(self, key):
Expand All @@ -402,8 +312,8 @@ def multi_uve_get(self, table, flat, filters=None, is_alarm=False):
# so we don't pass them here
uve_list = self.get_uve_list(table, filters, False, is_alarm)
for uve_name in uve_list:
uve_val = self.get_uve(
table + ':' + uve_name, flat, filters, True, is_alarm)
_,uve_val = self.get_uve(
table + ':' + uve_name, flat, filters, is_alarm,)
if uve_val == {}:
continue
else:
Expand All @@ -420,11 +330,17 @@ def get_uve_list(self, table, filters=None, parse_afilter=False,
patterns = set()
for filt in kfilter:
patterns.add(self.get_uve_regex(filt))
for redis_uve in self._redis_uve_list:
redish = redis.StrictRedis(host=redis_uve[0],
port=redis_uve[1],
password=self._redis_password, db=1)

for r_inst in self._redis_uve_map.keys():
try:
(r_ip,r_port) = r_inst
if not self._redis_uve_map[r_inst]:
self._redis_uve_map[r_inst] = redis.StrictRedis(
host=r_ip, port=r_port,
password=self._redis_password, db=1)

redish = self._redis_uve_map[r_inst]

# For UVE queries, we wanna read both UVE and Alarm table
entries = redish.smembers('ALARM_TABLE:' + table)
if not is_alarm:
Expand Down Expand Up @@ -466,12 +382,10 @@ def get_uve_list(self, table, filters=None, parse_afilter=False,
if attrval is None:
continue
uve_list.add(uve_key)
except redis.exceptions.ConnectionError:
self._logger.error('Failed to connect to redis-uve: %s:%d' \
% (redis_uve[0], redis_uve[1]))
except Exception as e:
self._logger.error('Exception: %s' % e)
return set()
self._logger.error("get_uve_list failed %s for : %s:%d tb %s" \
% (str(e), r_ip, r_port, traceback.format_exc()))
self._redis_uve_map[r_inst] = None
return uve_list
# end get_uve_list

Expand Down Expand Up @@ -707,5 +621,5 @@ def aggregate(self, key, flat):
if __name__ == '__main__':
uveserver = UVEServer(None, 0, None, None)
gevent.spawn(uveserver.run())
uve_state = json.loads(uveserver.get_uve("abc-corp:vn02", False))
_, uve_state = json.loads(uveserver.get_uve("abc-corp:vn02", False))
print json.dumps(uve_state, indent=4, sort_keys=True)

0 comments on commit f797920

Please sign in to comment.