diff --git a/src/opserver/alarmgen.py b/src/opserver/alarmgen.py index 1777750810a..c19580b9485 100644 --- a/src/opserver/alarmgen.py +++ b/src/opserver/alarmgen.py @@ -42,7 +42,9 @@ PartitionStatusResp, UVETableAlarmReq, UVETableAlarmResp, \ AlarmgenTrace, UVEKeyInfo, UVETypeInfo, AlarmgenStatusTrace, \ AlarmgenStatus, AlarmgenStats, AlarmgenPartitionTrace, \ - AlarmgenPartition, AlarmgenPartionInfo, AlarmgenUpdate + AlarmgenPartition, AlarmgenPartionInfo, AlarmgenUpdate, \ + UVETableInfoReq, UVETableInfoResp, UVEObjectInfo, UVEStructInfo, \ + UVETablePerfReq, UVETablePerfResp, UVETableInfo from sandesh.discovery.ttypes import CollectorTrace from cpuinfo import CpuInfoData @@ -50,6 +52,123 @@ from stevedore import hook from pysandesh.util import UTCTimestampUsec +class AGTabStats(object): + """ This class is used to store per-UVE-table information + about the time taken and number of instances when + a UVE was retrieved, published or evaluated for alarms + """ + def __init__(self): + self.reset() + + def record_get(self, get_time): + self.get_time += get_time + self.get_n += 1 + + def record_pub(self, get_time): + self.pub_time += get_time + self.pub_n += 1 + + def record_call(self, get_time): + self.call_time += get_time + self.call_n += 1 + + def get_result(self): + if self.get_n: + return self.get_time / self.get_n + else: + return 0 + + def pub_result(self): + if self.pub_n: + return self.pub_time / self.pub_n + else: + return 0 + + def call_result(self): + if self.call_n: + return self.call_time / self.call_n + else: + return 0 + + def reset(self): + self.call_time = 0 + self.call_n = 0 + self.get_time = 0 + self.get_n = 0 + self.pub_time = 0 + self.pub_n = 0 + + +class AGKeyInfo(object): + """ This class is used to maintain UVE contents + """ + + def __init__(self, part): + self._part = part + # key of struct name, value of content dict + + self.current_dict = {} + self.update({}) + + def update_single(self, typ, val): + # A single UVE struct has changed + # If the UVE has gone away, the val is passed in as None + + self.set_removed = set() + self.set_added = set() + self.set_changed = set() + self.set_unchanged = self.current_dict.keys() + + if typ in self.current_dict: + # the "added" set must stay empty in this case + if val is None: + self.set_unchanged.remove(typ) + self.set_removed.add(typ) + del self.current_dict[typ] + else: + # both "added" and "removed" will be empty + if val != self.current_dict[typ]: + self.set_unchanged.remove(typ) + self.set_changed.add(typ) + self.current_dict[typ] = val + else: + if val != None: + self.set_added.add(typ) + self.current_dict[typ] = val + + def update(self, new_dict): + # A UVE has changed, and we have the entire new + # content of the UVE available in new_dict + set_current = set(new_dict.keys()) + set_past = set(self.current_dict.keys()) + set_intersect = set_current.intersection(set_past) + + self.set_added = set_current - set_intersect + self.set_removed = set_past - set_intersect + self.set_changed = set() + self.set_unchanged = set() + for o in set_intersect: + if new_dict[o] != self.current_dict[o]: + self.set_changed.add(o) + else: + self.set_unchanged.add(o) + self.current_dict = new_dict + + def values(self): + return self.current_dict + + def added(self): + return self.set_added + + def removed(self): + return self.set_removed + + def changed(self): + return self.set_changed + + def unchanged(self): + return self.set_unchanged + class Controller(object): @staticmethod @@ -80,7 +199,6 @@ def __init__(self, conf): 
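Note on the AGKeyInfo class added above: every call to update() or update_single() recomputes the added/removed/changed/unchanged struct sets against the previously stored UVE contents. The short sketch below is illustrative only and not part of the patch; the struct names and values are invented.

# Illustrative only (not part of the patch): exercising AGKeyInfo as defined above.
info = AGKeyInfo(part=0)

# Whole-UVE update: the new content dict is compared against the current one.
info.update({"NodeStatus": {"status": "up"}, "CpuInfo": {"load": 1}})
assert info.added() == set(["NodeStatus", "CpuInfo"])

# Single-struct update: only "CpuInfo" is touched, so only it shows up as changed.
info.update_single("CpuInfo", {"load": 2})
assert info.changed() == set(["CpuInfo"])

# A value of None marks the struct as removed and drops it from the contents.
info.update_single("NodeStatus", None)
assert info.removed() == set(["NodeStatus"])
assert info.values() == {"CpuInfo": {"load": 2}}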
enable_syslog=self._conf.use_syslog(), syslog_facility=self._conf.syslog_facility()) self._logger = sandesh_global._logger - # Trace buffer list self.trace_buf = [ {'name':'DiscoveryMsg', 'size':1000} @@ -96,6 +214,9 @@ def __init__(self, conf): "ObjectConfigNode" ] self.mgrs = {} self.tab_alarms = {} + self.ptab_info = {} + self.tab_perf = {} + self.tab_perf_prev = {} for table in tables: self.mgrs[table] = hook.HookManager( namespace='contrail.analytics.alarms', @@ -110,6 +231,7 @@ def __init__(self, conf): (table, extn.name, extn.entry_point_target, extn.obj.__doc__)) self.tab_alarms[table] = {} + self.tab_perf[table] = AGTabStats() ConnectionState.init(sandesh_global, self._hostname, self._moduleid, self._instance_id, @@ -119,6 +241,8 @@ def __init__(self, conf): self._us = UVEServer(None, self._logger, self._conf.redis_password()) self._workers = {} + self._uveq = {} + self._uveqf = {} self.disc = None self._libpart_name = self._hostname + ":" + self._instance_id @@ -156,6 +280,8 @@ def __init__(self, conf): PartitionOwnershipReq.handle_request = self.handle_PartitionOwnershipReq PartitionStatusReq.handle_request = self.handle_PartitionStatusReq UVETableAlarmReq.handle_request = self.handle_UVETableAlarmReq + UVETableInfoReq.handle_request = self.handle_UVETableInfoReq + UVETablePerfReq.handle_request = self.handle_UVETablePerfReq def libpart_cb(self, part_list): @@ -185,6 +311,9 @@ def libpart_cb(self, part_list): self._logger.error('Partition Del : %s' % delpart) self.partition_change(delpart, False) + self._logger.error('Partition List done : new %s old %s' % \ + (str(newset),str(oldset))) + def start_libpart(self, ag_list): if not self._conf.zk_list(): self._logger.error('Could not import libpartition: No zookeeper') @@ -216,31 +345,175 @@ def start_libpart(self, ag_list): self._logger.error('Could not import libpartition: %s' % str(e)) return None - def handle_uve_notif(self, uves, remove = False): - self._logger.debug("Changed UVEs : %s" % str(uves)) - no_handlers = set() - for uv in uves: + def handle_uve_notifq(self, part, uves): + if part not in self._uveq: + self._uveq[part] = {} + for uv,types in uves.iteritems(): + if types is None: + self._uveq[part][uv] = None + else: + if uv in self._uveq[part]: + if self._uveq[part][uv] is not None: + self._uveq[part][uv].update(types) + else: + self._uveq[part][uv] = set() + self._uveq[part][uv].update(types) + + def run_uve_processing(self): + """ + This function runs in its own gevent, and provides state compression + for UVEs. + Kafka worker (PartitionHandler) threads detect which UVE have changed + and accumulate them onto a set. When this gevent runs, it processes + all UVEs of the set. 
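Note on the state compression described in the run_uve_processing docstring (continued below): handle_uve_notifq coalesces notifications per partition into a dict keyed by UVE name, whose value is either the set of changed struct types or None, meaning every struct of that UVE must be re-evaluated; None absorbs any later type-specific notification, so the pending work stays bounded by the number of distinct UVEs rather than growing per message. A standalone sketch of that rule, illustrative only, with invented UVE and struct names:

# Illustrative only (not part of the patch): the coalescing rule applied by
# handle_uve_notifq for one partition's queue.
uveq_part = {}

def enqueue(uveq, uve, types):
    if types is None:
        uveq[uve] = None                 # reprocess all structs of this UVE
    elif uve not in uveq:
        uveq[uve] = set(types)
    elif uveq[uve] is not None:
        uveq[uve].update(types)
    # else: already None, which supersedes any type-specific notification

enqueue(uveq_part, "ObjectVNTable:vn1", ["UveVirtualNetworkAgent"])
enqueue(uveq_part, "ObjectVNTable:vn1", ["UveVirtualNetworkConfig"])
enqueue(uveq_part, "ObjectVNTable:vn1", None)    # e.g. the generator was deleted
assert uveq_part == {"ObjectVNTable:vn1": None}  # three events, one unit of work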
Even if this gevent cannot run for a while, the + set should not grow in an unbounded manner (like a queue can) + """ + + while True: + for part in self._uveqf.keys(): + self._logger.error("Stop UVE processing for %d" % part) + self.stop_uve_partition(part) + del self._uveqf[part] + if part in self._uveq: + del self._uveq[part] + prev = time.time() + gevs = {} + workingsets = {} + for part in self._uveq.keys(): + if not len(self._uveq[part]): + continue + self._logger.info("UVE Process for %d" % part) + + # Allow the partition handlers to queue new UVEs without + # interfering with the work of processing the current UVEs + workingsets[part] = copy.deepcopy(self._uveq[part]) + self._uveq[part] = {} + + gevs[part] = gevent.spawn(self.handle_uve_notif,part,workingsets[part]) + if len(gevs): + gevent.joinall(gevs.values()) + for part in gevs.keys(): + # If UVE processing failed, requeue that partition's working set + if not gevs[part].get(): + self._logger.error("UVE Process failed for %d" % part) + self.handle_uve_notifq(part, workingsets[part]) + curr = time.time() + if (curr - prev) < 0.1: + gevent.sleep(0.1 - (curr - prev)) + else: + self._logger.info("UVE Process saturated") + gevent.sleep(0) + + def stop_uve_partition(self, part): + for tk in self.ptab_info[part].keys(): + for uk in self.ptab_info[part][tk].keys(): + if tk in self.tab_alarms: + if uk in self.tab_alarms[tk]: + del self.tab_alarms[tk][uk] + ustruct = UVEAlarms(name = uk, deleted = True) + alarm_msg = AlarmTrace(data=ustruct, table=tk) + self._logger.info('send del alarm: %s' % (alarm_msg.log())) + alarm_msg.send() + del self.ptab_info[part][tk][uk] + self._logger.info("UVE %s deleted" % (uk)) + del self.ptab_info[part][tk] + del self.ptab_info[part] + + def handle_uve_notif(self, part, uves): + """ + Call this function when a UVE has changed. This can also + happen when taking ownership of a partition, or when a + generator is deleted. + Args: + part : Partition Number + uves : dict, where the key is the UVE Name.
+ The value is either a set of UVE structs, or "None", + which means that all UVE structs should be processed + + Returns: + status of operation (True for success) + """ + self._logger.debug("Changed part %d UVEs : %s" % (part, str(uves))) + success = True + for uv,types in uves.iteritems(): tab = uv.split(':',1)[0] + if tab not in self.tab_perf: + self.tab_perf[tab] = AGTabStats() + uve_name = uv.split(':',1)[1] - if not self.mgrs.has_key(tab): - no_handlers.add(tab) - continue - if remove: - uve_data = [] + prevt = UTCTimestampUsec() + filters = {} + if types: + filters["cfilt"] = {} + for typ in types: + filters["cfilt"][typ] = set() + failures, uve_data = self._us.get_uve(uv, True, filters) + if failures: + success = False + self.tab_perf[tab].record_get(UTCTimestampUsec() - prevt) + # Handling Agg UVEs + if not part in self.ptab_info: + self.ptab_info[part] = {} + + if not tab in self.ptab_info[part]: + self.ptab_info[part][tab] = {} + + if uve_name not in self.ptab_info[part][tab]: + self.ptab_info[part][tab][uve_name] = AGKeyInfo(part) + + prevt = UTCTimestampUsec() + if not types: + self.ptab_info[part][tab][uve_name].update(uve_data) + if len(self.ptab_info[part][tab][uve_name].removed()): + self._logger.info("UVE %s removed structs %s" % (uve_name, \ + self.ptab_info[part][tab][uve_name].removed())) + if len(self.ptab_info[part][tab][uve_name].changed()): + self._logger.debug("UVE %s changed structs %s" % (uve_name, \ + self.ptab_info[part][tab][uve_name].changed())) + if len(self.ptab_info[part][tab][uve_name].added()): + self._logger.debug("UVE %s added structs %s" % (uve_name, \ + self.ptab_info[part][tab][uve_name].added())) else: - filters = {'kfilt': [uve_name]} - itr = self._us.multi_uve_get(tab, True, filters) - uve_data = itr.next()['value'] - if len(uve_data) == 0: - self._logger.info("UVE %s deleted" % uv) - if self.tab_alarms[tab].has_key(uv): - del self.tab_alarms[tab][uv] - ustruct = UVEAlarms(name = uve_name, deleted = True) - alarm_msg = AlarmTrace(data=ustruct, table=tab) - self._logger.info('send del alarm: %s' % (alarm_msg.log())) - alarm_msg.send() + for typ in types: + val = None + if typ in uve_data: + val = uve_data[typ] + self.ptab_info[part][tab][uve_name].update_single(typ, val) + if len(self.ptab_info[part][tab][uve_name].removed()): + self._logger.info("UVE %s removed structs %s" % (uve_name, \ + self.ptab_info[part][tab][uve_name].removed())) + if len(self.ptab_info[part][tab][uve_name].changed()): + self._logger.debug("UVE %s changed structs %s" % (uve_name, \ + self.ptab_info[part][tab][uve_name].changed())) + if len(self.ptab_info[part][tab][uve_name].added()): + self._logger.debug("UVE %s added structs %s" % (uve_name, \ + self.ptab_info[part][tab][uve_name].added())) + + local_uve = self.ptab_info[part][tab][uve_name].values() + + if len(local_uve.keys()) == 0: + self._logger.info("UVE %s deleted" % (uve_name)) + del self.ptab_info[part][tab][uve_name] + + self.tab_perf[tab].record_pub(UTCTimestampUsec() - prevt) + + # Withdraw the alarm if the UVE has no non-alarm structs + if len(local_uve.keys()) == 0 or \ + (len(local_uve.keys()) == 1 and "UVEAlarms" in local_uve): + if tab in self.tab_alarms: + if uv in self.tab_alarms[tab]: + del self.tab_alarms[tab][uv] + ustruct = UVEAlarms(name = uve_name, deleted = True) + alarm_msg = AlarmTrace(data=ustruct, table=tab) + self._logger.info('send del alarm: %s' % (alarm_msg.log())) + alarm_msg.send() + continue + + # Handing Alarms + if not self.mgrs.has_key(tab): continue - results = 
self.mgrs[tab].map_method("__call__", uv, uve_data) + prevt = UTCTimestampUsec() + results = self.mgrs[tab].map_method("__call__", uv, local_uve) + self.tab_perf[tab].record_call(UTCTimestampUsec() - prevt) new_uve_alarms = {} for res in results: nm, sev, errs = res @@ -296,9 +569,39 @@ def handle_uve_notif(self, uves, remove = False): alarm_msg = AlarmTrace(data=ustruct, table=tab) self._logger.info('send alarm: %s' % (alarm_msg.log())) alarm_msg.send() - - if len(no_handlers): - self._logger.debug('No Alarm Handlers for %s' % str(no_handlers)) + return success + + def handle_UVETableInfoReq(self, req): + if req.partition == -1: + parts = self.ptab_info.keys() + else: + parts = [req.partition] + + self._logger.info("Got UVETableInfoReq : %s" % str(parts)) + np = 1 + for part in parts: + if part not in self.ptab_info: + continue + tables = [] + for tab in self.ptab_info[part].keys(): + uvel = [] + for uk,uv in self.ptab_info[part][tab].iteritems(): + types = [] + for tk,tv in uv.values().iteritems(): + types.append(UVEStructInfo(type = tk, + content = json.dumps(tv))) + uvel.append(UVEObjectInfo( + name = uk, structs = types)) + tables.append(UVETableInfo(table = tab, uves = uvel)) + resp = UVETableInfoResp(partition = part) + resp.tables = tables + + if np == len(parts): + mr = False + else: + mr = True + resp.response(req.context(), mr) + np = np + 1 def handle_UVETableAlarmReq(self, req): status = False @@ -324,6 +627,27 @@ def handle_UVETableAlarmReq(self, req): resp.response(req.context(), mr) np = np + 1 + def handle_UVETablePerfReq(self, req): + status = False + if req.table == "all": + parts = self.tab_perf_prev.keys() + else: + parts = [req.table] + self._logger.info("Got UVETablePerfReq : %s" % str(parts)) + np = 1 + for pt in parts: + resp = UVETablePerfResp(table = pt) + resp.call_time = self.tab_perf_prev[pt].call_result() + resp.get_time = self.tab_perf_prev[pt].get_result() + resp.pub_time = self.tab_perf_prev[pt].pub_result() + resp.updates = self.tab_perf_prev[pt].get_n + + if np == len(parts): + mr = False + else: + mr = True + resp.response(req.context(), mr) + np = np + 1 def partition_change(self, partno, enl): """ @@ -337,19 +661,35 @@ def partition_change(self, partno, enl): """ status = False if enl: - if self._workers.has_key(partno): + if partno in self._workers: self._logger.info("Dup partition %d" % partno) else: - #uvedb = self._us.get_part(partno) ph = UveStreamProc(','.join(self._conf.kafka_broker_list()), partno, "uve-" + str(partno), self._logger, self._us.get_part, - self.handle_uve_notif) + self.handle_uve_notifq) ph.start() self._workers[partno] = ph - status = True + tout = 600 + idx = 0 + while idx < tout: + # When this partitions starts, + # uveq will get created + if partno not in self._uveq: + gevent.sleep(.1) + else: + break + idx += 1 + if partno in self._uveq: + status = True + else: + # TODO: The partition has not started yet, + # but it still might start later. 
+ # We possibly need to exit + status = False + self._logger.error("Unable to start partition %d" % partno) else: - if self._workers.has_key(partno): + if partno in self._workers: ph = self._workers[partno] gevent.kill(ph) res,db = ph.get() @@ -358,7 +698,26 @@ def partition_change(self, partno, enl): for k,v in db.iteritems(): print "%s -> %s" % (k,str(v)) del self._workers[partno] - status = True + self._uveqf[partno] = True + + tout = 600 + idx = 0 + while idx < tout: + # When this partitions stop.s + # uveq will get destroyed + if partno in self._uveq: + gevent.sleep(.1) + else: + break + idx += 1 + if partno not in self._uveq: + status = True + else: + # TODO: The partition has not stopped yet + # but it still might stop later. + # We possibly need to exit + status = False + self._logger.error("Unable to stop partition %d" % partno) else: self._logger.info("No partition %d" % partno) @@ -377,6 +736,11 @@ def process_stats(self): the previous time period over all partitions and send it out ''' + self.tab_perf_prev = copy.deepcopy(self.tab_perf) + for kt in self.tab_perf.keys(): + #self.tab_perf_prev[kt] = copy.deepcopy(self.tab_perf[kt]) + self.tab_perf[kt].reset() + s_partitions = set() s_keys = set() n_updates = 0 @@ -462,6 +826,7 @@ def handle_PartitionStatusReq(self, req): resp.partition = pt if self._workers.has_key(pt): resp.enabled = True + resp.offset = self._workers[pt]._partoffset resp.uves = [] for kcoll,coll in self._workers[pt].contents().iteritems(): uci = UVECollInfo() @@ -568,7 +933,7 @@ def setup_controller(argv): def main(args=None): controller = setup_controller(args or ' '.join(sys.argv[1:])) gevs = [ - gevent.spawn(controller.run)] + gevent.spawn(controller.run), gevent.spawn(controller.run_uve_processing)] if controller.disc: sp1 = ServicePoller(controller._logger, CollectorTrace, controller.disc, \ diff --git a/src/opserver/alarmgen_ctrl.sandesh b/src/opserver/alarmgen_ctrl.sandesh index 2421c3f806c..45d067c292e 100644 --- a/src/opserver/alarmgen_ctrl.sandesh +++ b/src/opserver/alarmgen_ctrl.sandesh @@ -41,7 +41,45 @@ struct UVECollInfo { response sandesh PartitionStatusResp { 1: bool enabled 2: u32 partition - 3: list uves + 3: u64 offset + 4: list uves +} + +request sandesh UVETableInfoReq { + 1: u32 partition +} + +struct UVEStructInfo { + 1: string type + 2: string content +} + +struct UVEObjectInfo { + 1: string name + 3: list structs +} + +struct UVETableInfo { + 1: string table + 7: list uves +} + + +response sandesh UVETableInfoResp { + 2: u32 partition + 7: list tables +} + +request sandesh UVETablePerfReq { + 1: string table +} + +response sandesh UVETablePerfResp { + 1: string table + 3: u64 updates + 4: u64 get_time + 5: u64 pub_time + 6: u64 call_time } request sandesh UVETableAlarmReq { diff --git a/src/opserver/opserver.py b/src/opserver/opserver.py index bea789efdb8..87391e0a978 100644 --- a/src/opserver/opserver.py +++ b/src/opserver/opserver.py @@ -1421,7 +1421,7 @@ def _uve_alarm_http_post(self, is_alarm): first = True for key in filters['kfilt']: uve_name = uve_tbl + ':' + key - rsp = self._uve_server.get_uve(uve_name, True, filters, + _, rsp = self._uve_server.get_uve(uve_name, True, filters, is_alarm=is_alarm, base_url=base_url) if rsp != {}: @@ -1484,7 +1484,7 @@ def _uve_alarm_http_get(self, name, is_alarm): yield u', ' + json.dumps(gen) yield u']}' else: - rsp = self._uve_server.get_uve(uve_name, flat, filters, + _, rsp = self._uve_server.get_uve(uve_name, flat, filters, is_alarm=is_alarm, base_url=base_url) yield json.dumps(rsp) diff 
--git a/src/opserver/partition_handler.py b/src/opserver/partition_handler.py index b7df952512d..b418695fe0a 100644 --- a/src/opserver/partition_handler.py +++ b/src/opserver/partition_handler.py @@ -23,6 +23,7 @@ def __init__(self, brokers, partition, group, topic, logger, limit): self._limit = limit self._partdb = {} self._partoffset = None + self._kfk = None def msg_handler(self, om): self._partoffset = om.offset @@ -43,9 +44,9 @@ def _run(self): while True: try: self._logger.info("New KafkaClient %d" % self._partition) - kafka = KafkaClient(self._brokers ,str(os.getpid())) + self._kfk = KafkaClient(self._brokers ,str(os.getpid())) try: - consumer = SimpleConsumer(kafka, self._group, self._topic, buffer_size = 4096*4, max_buffer_size=4096*32) + consumer = SimpleConsumer(self._kfk, self._group, self._topic, buffer_size = 4096*4, max_buffer_size=4096*32) #except: except Exception as ex: template = "Consumer Failure {0} occured. Arguments:\n{1!r}" @@ -75,15 +76,17 @@ def _run(self): while True: try: - mm = consumer.get_message(timeout=None) - if mm is None: - continue - self._logger.debug("%d Reading offset %d" % (self._partition, mm.offset)) - consumer.commit() - pcount += 1 - if not self.msg_handler(mm): - self._logger.info("%d could not handle %s" % (self._partition, str(mm))) - raise gevent.GreenletExit + mlist = consumer.get_messages(10) + for mm in mlist: + if mm is None: + continue + self._logger.debug("%d Reading offset %d" % \ + (self._partition, mm.offset)) + consumer.commit() + pcount += 1 + if not self.msg_handler(mm): + self._logger.info("%d could not handle %s" % (self._partition, str(mm))) + raise gevent.GreenletExit except TypeError: gevent.sleep(0.1) except common.FailedPayloadsError as ex: @@ -126,17 +129,6 @@ def __init__(self, brokers, partition, uve_topic, logger, uvecb, callback): def __del__(self): self._logger.info("Destroying UVEStream for part %d" % self._partno) - self.stop_partition() - - def stop_partition(self): - uves = set() - for kcoll,coll in self._uvedb.iteritems(): - for kgen,gen in coll.iteritems(): - uves.update(set(gen.keys())) - self._logger.info("Stopping part %d with UVE keys %s" % \ - (self._partno, str(uves))) - self._callback(uves, True) - self._uvedb = {} def start_partition(self): ''' This function loads the initial UVE database. 
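Note on the callback contract changed in partition_handler.py: UveStreamProc now hands its callback the partition number plus a dict keyed by UVE name, whose value is the set of changed struct types, or None when every struct of that UVE must be re-examined (for example on a generator delete), replacing the earlier flat set of UVE keys. A minimal consumer-side sketch of that contract, illustrative only and with invented names:

# Illustrative only (not part of the patch): the shape of the per-partition
# change notification that UveStreamProc now passes to its callback.
def on_change(partno, chg):
    for uve_key, types in chg.iteritems():
        if types is None:
            print "part %d: re-evaluate all structs of %s" % (partno, uve_key)
        else:
            print "part %d: structs %s of %s changed" % (partno, list(types), uve_key)

# A normal UVE update names the struct type that changed ...
on_change(0, {"ObjectVNTable:vn1": set(["UveVirtualNetworkAgent"])})
# ... while a generator delete notifies with None for each of its UVE keys.
on_change(0, {"ObjectVNTable:vn1": None, "ObjectVNTable:vn2": None})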
@@ -145,13 +137,14 @@ def start_partition(self): self._uvedb = self._uvecb(self._partno) self._logger.debug("Starting part %d with UVE db %s" % \ (self._partno,str(self._uvedb))) - uves = set() + uves = {} for kcoll,coll in self._uvedb.iteritems(): for kgen,gen in coll.iteritems(): - uves.update(set(gen.keys())) + for kk in gen.keys(): + uves[kk] = None self._logger.info("Starting part %d with UVE keys %s" % \ (self._partno,str(uves))) - self._callback(uves) + self._callback(self._partno, uves) def contents(self): return self._uvedb @@ -171,7 +164,7 @@ def stats(self): def msg_handler(self, om): self._partoffset = om.offset - chg = set() + chg = {} try: uv = json.loads(om.message.value) self._partdb[om.message.key] = uv @@ -212,7 +205,8 @@ def msg_handler(self, om): else: self._uvein[tab][coll][gen][uv["type"]] += 1 - chg.add(uv["key"]) + chg[uv["key"]] = set() + chg[uv["key"]].add(uv["type"]) else: # Record stats on UVE Keys being processed for uk in self._uvedb[coll][gen].keys(): @@ -225,9 +219,10 @@ def msg_handler(self, om): else: self._uveout[tab][uk] = 1 - # when a generator is delelted, we need to - # notify for *ALL* its UVEs - chg = set(self._uvedb[coll][gen].keys()) + # when a generator is delelted, we need to + # notify for *ALL* its UVEs + chg[uk] = None + del self._uvedb[coll][gen] # TODO : For the collector's generator, notify all @@ -238,7 +233,7 @@ def msg_handler(self, om): self._logger.info("%s" % messag) return False else: - self._callback(chg) + self._callback(self._partno, chg) return True if __name__ == '__main__': diff --git a/src/opserver/uveserver.py b/src/opserver/uveserver.py index a7659e9cd38..46a4fd5e6e3 100644 --- a/src/opserver/uveserver.py +++ b/src/opserver/uveserver.py @@ -21,12 +21,13 @@ from pysandesh.util import UTCTimestampUsec from pysandesh.connection_info import ConnectionState from sandesh.viz.constants import UVE_MAP +import traceback class UVEServer(object): def __init__(self, redis_uve_server, logger, redis_password=None): self._local_redis_uve = redis_uve_server - self._redis_uve_list = [] + self._redis_uve_map = {} self._logger = logger self._sem = BoundedSemaphore(1) self._redis = None @@ -43,7 +44,21 @@ def __init__(self, redis_uve_server, logger, redis_password=None): #end __init__ def update_redis_uve_list(self, redis_uve_list): - self._redis_uve_list = redis_uve_list + newlist = set() + for elem in redis_uve_list: + newlist.add((elem[0],elem[1])) + + # if some redis instances are gone, remove them from our map + for test_elem in self._redis_uve_map.keys(): + if test_elem not in newlist: + del self._redis_uve_map[test_elem] + + # new redis instances need to be inserted into the map + for test_elem in newlist: + if test_elem not in self._redis_uve_map: + (r_ip, r_port) = test_elem + self._redis_uve_map[test_elem] = redis.StrictRedis( + r_ip, r_port, password=self._redis_password, db=1) # end update_redis_uve_list def fill_redis_uve_info(self, redis_uve_info): @@ -97,45 +112,6 @@ def run(self): self._logger.debug("%s del received for " % value) # value is of the format: # DEL::::::: - info = value.rsplit(":", 6) - key = info[0].split(":", 1)[1] - typ = info[5] - - existing = self._redis.hgetall("PREVIOUS:" + key + ":" + typ) - tstate = {} - tstate[key] = {} - tstate[key][typ] = {} - state = UVEServer.convert_previous(existing, tstate, key, typ) - - for attr, hval in self._redis.hgetall(value).iteritems(): - snhdict = xmltodict.parse(hval) - - if UVEServer._is_agg_list(snhdict[attr]): - if snhdict[attr]['list']['@size'] == "0": - continue - 
if snhdict[attr]['list']['@size'] == "1": - sname = ParallelAggregator.get_list_name( - snhdict[attr]) - if not isinstance( - snhdict[attr]['list'][sname], list): - snhdict[attr]['list'][sname] = \ - [snhdict[attr]['list'][sname]] - - if (attr not in state[key][typ]): - # There is no existing entry for the UVE - vstr = json.dumps(snhdict[attr]) - else: - # There is an existing entry - # Merge the new entry with the existing one - state = UVEServer.merge_previous( - state, key, typ, attr, snhdict[attr]) - vstr = json.dumps(state[key][typ][attr]['previous']) - - # Store the merged result back in the database - self._redis.sadd("PUVES:" + typ, key) - self._redis.sadd("PTYPES:" + key, typ) - self._redis.hset("PREVIOUS:" + key + ":" + typ, attr, vstr) - self._redis.delete(value) except redis.exceptions.ResponseError: #send redis connection down msg. Coule be bcos of authentication @@ -174,58 +150,34 @@ def _is_agg_list(attr): return True return False - @staticmethod - def convert_previous(existing, state, key, typ, afilter=None): - # Take the existing delete record, and load it into the state dict - for attr, hval in existing.iteritems(): - hdict = json.loads(hval) - - if afilter is not None and len(afilter): - if attr not in afilter: - continue - - # When recording deleted attributes, only record those - # for which delete-time aggregation is needed - if UVEServer._is_agg_item(hdict): - if (typ not in state[key]): - state[key][typ] = {} - if (attr not in state[key][typ]): - state[key][typ][attr] = {} - state[key][typ][attr]["previous"] = hdict - - # For lists that require delete-time aggregation, we need - # to normailize lists of size 1, and ignore those of size 0 - if UVEServer._is_agg_list(hdict): - if hdict['list']['@size'] != "0": - if (typ not in state[key]): - state[key][typ] = {} - if (attr not in state[key][typ]): - state[key][typ][attr] = {} - state[key][typ][attr]["previous"] = hdict - if hdict['list']['@size'] == "1": - sname = ParallelAggregator.get_list_name(hdict) - if not isinstance(hdict['list'][sname], list): - hdict['list'][sname] = [hdict['list'][sname]] - - return state - def get_part(self, part): uves = {} - for redis_uve in self._redis_uve_list: - gen_uves = {} - redish = redis.StrictRedis(host=redis_uve[0], - port=redis_uve[1], db=1) - for elems in redish.smembers("PART2KEY:" + str(part)): - info = elems.split(":", 5) - gen = info[0] + ":" + info[1] + ":" + info[2] + ":" + info[3] - key = info[5] - if not gen_uves.has_key(gen): - gen_uves[gen] = {} - gen_uves[gen][key] = 0 - uves[redis_uve[0] + ":" + str(redis_uve[1])] = gen_uves + for r_inst in self._redis_uve_map.keys(): + try: + (r_ip,r_port) = r_inst + if not self._redis_uve_map[r_inst]: + self._redis_uve_map[r_inst] = redis.StrictRedis( + host=r_ip, port=r_port, + password=self._redis_password, db=1) + + redish = self._redis_uve_map[r_inst] + gen_uves = {} + for elems in redish.smembers("PART2KEY:" + str(part)): + info = elems.split(":", 5) + gen = info[0] + ":" + info[1] + ":" + info[2] + ":" + info[3] + key = info[5] + if not gen_uves.has_key(gen): + gen_uves[gen] = {} + gen_uves[gen][key] = 0 + uves[r_ip + ":" + str(r_port)] = gen_uves + except Exception as e: + self._logger.error("get_part failed %s for : %s:%d tb %s" \ + % (str(e), r_ip, r_port, traceback.format_exc())) + self._redis_uve_map[r_inst] = None + raise e return uves - - def get_uve(self, key, flat, filters=None, multi=False, is_alarm=False, base_url=None): + + def get_uve(self, key, flat, filters=None, is_alarm=False, base_url=None): filters = 
filters or {} sfilter = filters.get('sfilt') mfilter = filters.get('mfilt') @@ -233,39 +185,62 @@ def get_uve(self, key, flat, filters=None, multi=False, is_alarm=False, base_url ackfilter = filters.get('ackfilt') state = {} state[key] = {} - statdict = {} - for redis_uve in self._redis_uve_list: - redish = redis.StrictRedis(host=redis_uve[0], - port=redis_uve[1], - password=self._redis_password, db=1) + rsp = {} + failures = False + + for r_inst in self._redis_uve_map.keys(): try: + (r_ip,r_port) = r_inst + if not self._redis_uve_map[r_inst]: + self._redis_uve_map[r_inst] = redis.StrictRedis( + host=r_ip, port=r_port, + password=self._redis_password, db=1) + + redish = self._redis_uve_map[r_inst] qmap = {} - origins = redish.smembers("ALARM_ORIGINS:" + key) + + ppe = redish.pipeline() + ppe.smembers("ALARM_ORIGINS:" + key) if not is_alarm: - origins = origins.union(redish.smembers("ORIGINS:" + key)) + ppe.smembers("ORIGINS:" + key) + pperes = ppe.execute() + origins = set() + for origset in pperes: + for smt in origset: + tt = smt.rsplit(":",1)[1] + sm = smt.rsplit(":",1)[0] + source = sm.split(":", 1)[0] + mdule = sm.split(":", 1)[1] + if tfilter is not None: + if tt not in tfilter: + continue + if sfilter is not None: + if sfilter != source: + continue + if mfilter is not None: + if mfilter != mdule: + continue + origins.add(smt) + + ppeval = redish.pipeline() for origs in origins: - info = origs.rsplit(":", 1) - sm = info[0].split(":", 1) - source = sm[0] - if sfilter is not None: - if sfilter != source: - continue - mdule = sm[1] - if mfilter is not None: - if mfilter != mdule: - continue - dsource = source + ":" + mdule + ppeval.hgetall("VALUES:" + key + ":" + origs) + odictlist = ppeval.execute() - typ = info[1] - if tfilter is not None: - if typ not in tfilter: - continue + idx = 0 + for origs in origins: - odict = redish.hgetall("VALUES:" + key + ":" + origs) + odict = odictlist[idx] + idx = idx + 1 + + info = origs.rsplit(":", 1) + dsource = info[0] + typ = info[1] afilter_list = set() if tfilter is not None: afilter_list = tfilter[typ] + for attr, value in odict.iteritems(): if len(afilter_list): if attr not in afilter_list: @@ -314,35 +289,17 @@ def get_uve(self, key, flat, filters=None, multi=False, is_alarm=False, base_url key][typ][attr][dsource]) state[key][typ][attr][dsource] = snhdict[attr] - if sfilter is None and mfilter is None: - for ptyp in redish.smembers("PTYPES:" + key): - afilter = None - if tfilter is not None: - if ptyp not in tfilter: - continue - afilter = tfilter[ptyp] - existing = redish.hgetall("PREVIOUS:" + key + ":" + ptyp) - nstate = UVEServer.convert_previous( - existing, state, key, ptyp, afilter) - state = copy.deepcopy(nstate) - pa = ParallelAggregator(state, self._uve_reverse_map) rsp = pa.aggregate(key, flat, base_url) - except redis.exceptions.ConnectionError: - self._logger.error("Failed to connect to redis-uve: %s:%d" \ - % (redis_uve[0], redis_uve[1])) except Exception as e: - self._logger.error("Exception: %s" % e) - return {} + self._logger.error("redis-uve failed %s for : %s:%d tb %s" \ + % (str(e), r_ip, r_port, traceback.format_exc())) + self._redis_uve_map[r_inst] = None + failures = True else: - self._logger.debug("Computed %s" % key) + self._logger.debug("Computed %s as %s" % (key,str(rsp))) - for k, v in statdict.iteritems(): - if k in rsp: - mp = dict(v.items() + rsp[k].items()) - statdict[k] = mp - - return dict(rsp.items() + statdict.items()) + return failures, rsp # end get_uve def get_uve_regex(self, key): @@ -360,8 +317,8 @@ def 
multi_uve_get(self, table, flat, filters=None, is_alarm=False, base_url=None # so we don't pass them here uve_list = self.get_uve_list(table, filters, False, is_alarm) for uve_name in uve_list: - uve_val = self.get_uve( - table + ':' + uve_name, flat, filters, True, is_alarm, base_url) + _,uve_val = self.get_uve( + table + ':' + uve_name, flat, filters, is_alarm, base_url) if uve_val == {}: continue else: @@ -378,11 +335,17 @@ def get_uve_list(self, table, filters=None, parse_afilter=False, patterns = set() for filt in kfilter: patterns.add(self.get_uve_regex(filt)) - for redis_uve in self._redis_uve_list: - redish = redis.StrictRedis(host=redis_uve[0], - port=redis_uve[1], - password=self._redis_password, db=1) + + for r_inst in self._redis_uve_map.keys(): try: + (r_ip,r_port) = r_inst + if not self._redis_uve_map[r_inst]: + self._redis_uve_map[r_inst] = redis.StrictRedis( + host=r_ip, port=r_port, + password=self._redis_password, db=1) + + redish = self._redis_uve_map[r_inst] + # For UVE queries, we wanna read both UVE and Alarm table entries = redish.smembers('ALARM_TABLE:' + table) if not is_alarm: @@ -424,12 +387,10 @@ def get_uve_list(self, table, filters=None, parse_afilter=False, if attrval is None: continue uve_list.add(uve_key) - except redis.exceptions.ConnectionError: - self._logger.error('Failed to connect to redis-uve: %s:%d' \ - % (redis_uve[0], redis_uve[1])) except Exception as e: - self._logger.error('Exception: %s' % e) - return set() + self._logger.error("get_uve_list failed %s for : %s:%d tb %s" \ + % (str(e), r_ip, r_port, traceback.format_exc())) + self._redis_uve_map[r_inst] = None return uve_list # end get_uve_list @@ -691,5 +652,5 @@ def aggregate(self, key, flat, base_url = None): if __name__ == '__main__': uveserver = UVEServer(None, 0, None, None) gevent.spawn(uveserver.run()) - uve_state = json.loads(uveserver.get_uve("abc-corp:vn02", False)) + _, uve_state = json.loads(uveserver.get_uve("abc-corp:vn02", False)) print json.dumps(uve_state, indent=4, sort_keys=True)
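Note on the UVEServer API change above: get_uve() now returns a (failures, uve_content) pair, so callers can tell a partial read (one or more redis-uve instances unreachable, their cached connection reset to None for later reconnection) from a genuinely empty or deleted UVE; alarmgen's handle_uve_notif uses the flag to report failure so the UVE gets requeued. A caller-side sketch, illustrative only (the helper below is invented, not part of the patch):

# Illustrative only (not part of the patch): reacting to the new
# (failures, content) return value of UVEServer.get_uve().
def read_uve(us, key):
    failures, uve = us.get_uve(key, True, filters={})
    if failures:
        # Some redis-uve instances could not be read; the content may be partial,
        # so report failure rather than treating the UVE as absent.
        return None
    if not uve:
        return None   # UVE genuinely absent / deleted
    return uve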