From 39d3e2e7c54b503949c6a81fad0f3c62a6edaf44 Mon Sep 17 00:00:00 2001
From: Anish Mehta
Date: Tue, 2 Jun 2015 23:04:19 -0700
Subject: [PATCH] Build a local UVE cache in the AlarmGenerator

We will use this cache to calculate UVE changes, so that a post-agg UVE
Stream can be generated in Kafka (this is not done yet).

This commit also has optimizations for UVE reads in the AlarmGenerator:
- AG uses get_uve instead of multi_uve_get
- AG uses cfilt to read only the type that has changed (as per the
  pre-agg UVE Stream)
- UVEServer uses redis pipelines to reduce round-trips to redis
- UVEServer no longer creates a new redis client for every UVE read
  operation

If a failure takes place while reading a UVE, the OpServer can ignore the
failure and rely on eventual consistency for clients. But AlarmGen must
remember the UVE keys so that they are processed again.

A state compression mechanism is provided for UVE processing. We
accumulate changed UVEs in a working set (not a queue), and process the
working set periodically. (Illustrative sketches of the working-set
compression, cfilt filtering and redis pipelining are appended after the
diff.)

This was tested in a single-box setup with 4000 UVEs:

contrail-stats --table AlarmgenUpdate.keys --where "name=*" --select keys.key "SUM(keys.count)" --last 1m | wc

The system was processing 25000 UVE updates per minute.

contrail-stats --table SandeshMessageStat.msg_info --where "name=*" --select msg_info.level "SUM(msg_info.messages)" --sort "SUM(msg_info.messages)" --last 1m

contrail-stats --table AlarmgenUpdate.keys --where "name=*" --select "SUM(keys.count)" --last 1m

The Kafka consumers in AlarmGen were able to process UVEs as fast as the
collector generated updates. Due to state compression, UVE processing was
able to keep up as well.

/usr/share/kafka# bin/kafka-consumer-offset-checker.sh --zookeeper 127.0.0.1:2181 --topic uve-0 --group workers

Change-Id: I82335d5de2fc040ffc2ba0b22037fe5879c5ea99
Partial-Bug: 1428271
---
 src/opserver/alarmgen.py           | 431 ++++++++++++++++++++++++++---
 src/opserver/alarmgen_ctrl.sandesh |  40 ++-
 src/opserver/opserver.py           |   4 +-
 src/opserver/partition_handler.py  |  57 ++--
 src/opserver/uveserver.py          | 259 ++++++++---------
 5 files changed, 575 insertions(+), 216 deletions(-)

diff --git a/src/opserver/alarmgen.py b/src/opserver/alarmgen.py
index 1777750810a..c19580b9485 100644
--- a/src/opserver/alarmgen.py
+++ b/src/opserver/alarmgen.py
@@ -42,7 +42,9 @@
     PartitionStatusResp, UVETableAlarmReq, UVETableAlarmResp, \
     AlarmgenTrace, UVEKeyInfo, UVETypeInfo, AlarmgenStatusTrace, \
     AlarmgenStatus, AlarmgenStats, AlarmgenPartitionTrace, \
-    AlarmgenPartition, AlarmgenPartionInfo, AlarmgenUpdate
+    AlarmgenPartition, AlarmgenPartionInfo, AlarmgenUpdate, \
+    UVETableInfoReq, UVETableInfoResp, UVEObjectInfo, UVEStructInfo, \
+    UVETablePerfReq, UVETablePerfResp, UVETableInfo

 from sandesh.discovery.ttypes import CollectorTrace
 from cpuinfo import CpuInfoData
@@ -50,6 +52,123 @@
 from stevedore import hook
 from pysandesh.util import UTCTimestampUsec

+class AGTabStats(object):
+    """ This class is used to store per-UVE-table information
+        about the time taken and number of instances when
+        a UVE was retrieved, published or evaluated for alarms
+    """
+    def __init__(self):
+        self.reset()
+
+    def record_get(self, get_time):
+        self.get_time += get_time
+        self.get_n += 1
+
+    def record_pub(self, pub_time):
+        self.pub_time += pub_time
+        self.pub_n += 1
+
+    def record_call(self, call_time):
+        self.call_time += call_time
+        self.call_n += 1
+
+    def get_result(self):
+        if self.get_n:
+            return self.get_time / self.get_n
+        else:
+            return 0
+
+    def 
pub_result(self): + if self.pub_n: + return self.pub_time / self.pub_n + else: + return 0 + + def call_result(self): + if self.call_n: + return self.call_time / self.call_n + else: + return 0 + + def reset(self): + self.call_time = 0 + self.call_n = 0 + self.get_time = 0 + self.get_n = 0 + self.pub_time = 0 + self.pub_n = 0 + + +class AGKeyInfo(object): + """ This class is used to maintain UVE contents + """ + + def __init__(self, part): + self._part = part + # key of struct name, value of content dict + + self.current_dict = {} + self.update({}) + + def update_single(self, typ, val): + # A single UVE struct has changed + # If the UVE has gone away, the val is passed in as None + + self.set_removed = set() + self.set_added = set() + self.set_changed = set() + self.set_unchanged = self.current_dict.keys() + + if typ in self.current_dict: + # the "added" set must stay empty in this case + if val is None: + self.set_unchanged.remove(typ) + self.set_removed.add(typ) + del self.current_dict[typ] + else: + # both "added" and "removed" will be empty + if val != self.current_dict[typ]: + self.set_unchanged.remove(typ) + self.set_changed.add(typ) + self.current_dict[typ] = val + else: + if val != None: + self.set_added.add(typ) + self.current_dict[typ] = val + + def update(self, new_dict): + # A UVE has changed, and we have the entire new + # content of the UVE available in new_dict + set_current = set(new_dict.keys()) + set_past = set(self.current_dict.keys()) + set_intersect = set_current.intersection(set_past) + + self.set_added = set_current - set_intersect + self.set_removed = set_past - set_intersect + self.set_changed = set() + self.set_unchanged = set() + for o in set_intersect: + if new_dict[o] != self.current_dict[o]: + self.set_changed.add(o) + else: + self.set_unchanged.add(o) + self.current_dict = new_dict + + def values(self): + return self.current_dict + + def added(self): + return self.set_added + + def removed(self): + return self.set_removed + + def changed(self): + return self.set_changed + + def unchanged(self): + return self.set_unchanged + class Controller(object): @staticmethod @@ -80,7 +199,6 @@ def __init__(self, conf): enable_syslog=self._conf.use_syslog(), syslog_facility=self._conf.syslog_facility()) self._logger = sandesh_global._logger - # Trace buffer list self.trace_buf = [ {'name':'DiscoveryMsg', 'size':1000} @@ -96,6 +214,9 @@ def __init__(self, conf): "ObjectConfigNode" ] self.mgrs = {} self.tab_alarms = {} + self.ptab_info = {} + self.tab_perf = {} + self.tab_perf_prev = {} for table in tables: self.mgrs[table] = hook.HookManager( namespace='contrail.analytics.alarms', @@ -110,6 +231,7 @@ def __init__(self, conf): (table, extn.name, extn.entry_point_target, extn.obj.__doc__)) self.tab_alarms[table] = {} + self.tab_perf[table] = AGTabStats() ConnectionState.init(sandesh_global, self._hostname, self._moduleid, self._instance_id, @@ -119,6 +241,8 @@ def __init__(self, conf): self._us = UVEServer(None, self._logger, self._conf.redis_password()) self._workers = {} + self._uveq = {} + self._uveqf = {} self.disc = None self._libpart_name = self._hostname + ":" + self._instance_id @@ -156,6 +280,8 @@ def __init__(self, conf): PartitionOwnershipReq.handle_request = self.handle_PartitionOwnershipReq PartitionStatusReq.handle_request = self.handle_PartitionStatusReq UVETableAlarmReq.handle_request = self.handle_UVETableAlarmReq + UVETableInfoReq.handle_request = self.handle_UVETableInfoReq + UVETablePerfReq.handle_request = self.handle_UVETablePerfReq def libpart_cb(self, 
part_list): @@ -185,6 +311,9 @@ def libpart_cb(self, part_list): self._logger.error('Partition Del : %s' % delpart) self.partition_change(delpart, False) + self._logger.error('Partition List done : new %s old %s' % \ + (str(newset),str(oldset))) + def start_libpart(self, ag_list): if not self._conf.zk_list(): self._logger.error('Could not import libpartition: No zookeeper') @@ -216,31 +345,175 @@ def start_libpart(self, ag_list): self._logger.error('Could not import libpartition: %s' % str(e)) return None - def handle_uve_notif(self, uves, remove = False): - self._logger.debug("Changed UVEs : %s" % str(uves)) - no_handlers = set() - for uv in uves: + def handle_uve_notifq(self, part, uves): + if part not in self._uveq: + self._uveq[part] = {} + for uv,types in uves.iteritems(): + if types is None: + self._uveq[part][uv] = None + else: + if uv in self._uveq[part]: + if self._uveq[part][uv] is not None: + self._uveq[part][uv].update(types) + else: + self._uveq[part][uv] = set() + self._uveq[part][uv].update(types) + + def run_uve_processing(self): + """ + This function runs in its own gevent, and provides state compression + for UVEs. + Kafka worker (PartitionHandler) threads detect which UVE have changed + and accumulate them onto a set. When this gevent runs, it processes + all UVEs of the set. Even if this gevent cannot run for a while, the + set should not grow in an unbounded manner (like a queue can) + """ + + while True: + for part in self._uveqf.keys(): + self._logger.error("Stop UVE processing for %d" % part) + self.stop_uve_partition(part) + del self._uveqf[part] + if part in self._uveq: + del self._uveq[part] + prev = time.time() + gevs = {} + for part in self._uveq.keys(): + if not len(self._uveq[part]): + continue + self._logger.info("UVE Process for %d" % part) + + # Allow the partition handlers to queue new UVEs without + # interfering with the work of processing the current UVEs + workingset = copy.deepcopy(self._uveq[part]) + self._uveq[part] = {} + + gevs[part] = gevent.spawn(self.handle_uve_notif,part,workingset) + if len(gevs): + gevent.joinall(gevs.values()) + for part in gevs.keys(): + # If UVE processing failed, requeue the working set + if not gevs[part].get(): + self._logger.error("UVE Process failed for %d" % part) + self.handle_uve_notifq(part, workingset) + curr = time.time() + if (curr - prev) < 0.1: + gevent.sleep(0.1 - (curr - prev)) + else: + self._logger.info("UVE Process saturated") + gevent.sleep(0) + + def stop_uve_partition(self, part): + for tk in self.ptab_info[part].keys(): + for uk in self.ptab_info[part][tk].keys(): + if tk in self.tab_alarms: + if uk in self.tab_alarms[tk]: + del self.tab_alarms[tk][uk] + ustruct = UVEAlarms(name = ok, deleted = True) + alarm_msg = AlarmTrace(data=ustruct, table=tk) + self._logger.info('send del alarm: %s' % (alarm_msg.log())) + alarm_msg.send() + del self.ptab_info[part][tk][uk] + self._logger.info("UVE %s deleted" % (uk)) + del self.ptab_info[part][tk] + del self.ptab_info[part] + + def handle_uve_notif(self, part, uves): + """ + Call this function when a UVE has changed. This can also + happed when taking ownership of a partition, or when a + generator is deleted. + Args: + part : Partition Number + uve : dict, where the key is the UVE Name. 
+ The value is either a set of UVE structs, or "None", + which means that all UVE structs should be processed + + Returns: + status of operation (True for success) + """ + self._logger.debug("Changed part %d UVEs : %s" % (part, str(uves))) + success = True + for uv,types in uves.iteritems(): tab = uv.split(':',1)[0] + if tab not in self.tab_perf: + self.tab_perf[tab] = AGTabStats() + uve_name = uv.split(':',1)[1] - if not self.mgrs.has_key(tab): - no_handlers.add(tab) - continue - if remove: - uve_data = [] + prevt = UTCTimestampUsec() + filters = {} + if types: + filters["cfilt"] = {} + for typ in types: + filters["cfilt"][typ] = set() + failures, uve_data = self._us.get_uve(uv, True, filters) + if failures: + success = False + self.tab_perf[tab].record_get(UTCTimestampUsec() - prevt) + # Handling Agg UVEs + if not part in self.ptab_info: + self.ptab_info[part] = {} + + if not tab in self.ptab_info[part]: + self.ptab_info[part][tab] = {} + + if uve_name not in self.ptab_info[part][tab]: + self.ptab_info[part][tab][uve_name] = AGKeyInfo(part) + + prevt = UTCTimestampUsec() + if not types: + self.ptab_info[part][tab][uve_name].update(uve_data) + if len(self.ptab_info[part][tab][uve_name].removed()): + self._logger.info("UVE %s removed structs %s" % (uve_name, \ + self.ptab_info[part][tab][uve_name].removed())) + if len(self.ptab_info[part][tab][uve_name].changed()): + self._logger.debug("UVE %s changed structs %s" % (uve_name, \ + self.ptab_info[part][tab][uve_name].changed())) + if len(self.ptab_info[part][tab][uve_name].added()): + self._logger.debug("UVE %s added structs %s" % (uve_name, \ + self.ptab_info[part][tab][uve_name].added())) else: - filters = {'kfilt': [uve_name]} - itr = self._us.multi_uve_get(tab, True, filters) - uve_data = itr.next()['value'] - if len(uve_data) == 0: - self._logger.info("UVE %s deleted" % uv) - if self.tab_alarms[tab].has_key(uv): - del self.tab_alarms[tab][uv] - ustruct = UVEAlarms(name = uve_name, deleted = True) - alarm_msg = AlarmTrace(data=ustruct, table=tab) - self._logger.info('send del alarm: %s' % (alarm_msg.log())) - alarm_msg.send() + for typ in types: + val = None + if typ in uve_data: + val = uve_data[typ] + self.ptab_info[part][tab][uve_name].update_single(typ, val) + if len(self.ptab_info[part][tab][uve_name].removed()): + self._logger.info("UVE %s removed structs %s" % (uve_name, \ + self.ptab_info[part][tab][uve_name].removed())) + if len(self.ptab_info[part][tab][uve_name].changed()): + self._logger.debug("UVE %s changed structs %s" % (uve_name, \ + self.ptab_info[part][tab][uve_name].changed())) + if len(self.ptab_info[part][tab][uve_name].added()): + self._logger.debug("UVE %s added structs %s" % (uve_name, \ + self.ptab_info[part][tab][uve_name].added())) + + local_uve = self.ptab_info[part][tab][uve_name].values() + + if len(local_uve.keys()) == 0: + self._logger.info("UVE %s deleted" % (uve_name)) + del self.ptab_info[part][tab][uve_name] + + self.tab_perf[tab].record_pub(UTCTimestampUsec() - prevt) + + # Withdraw the alarm if the UVE has no non-alarm structs + if len(local_uve.keys()) == 0 or \ + (len(local_uve.keys()) == 1 and "UVEAlarms" in local_uve): + if tab in self.tab_alarms: + if uv in self.tab_alarms[tab]: + del self.tab_alarms[tab][uv] + ustruct = UVEAlarms(name = uve_name, deleted = True) + alarm_msg = AlarmTrace(data=ustruct, table=tab) + self._logger.info('send del alarm: %s' % (alarm_msg.log())) + alarm_msg.send() + continue + + # Handing Alarms + if not self.mgrs.has_key(tab): continue - results = 
self.mgrs[tab].map_method("__call__", uv, uve_data) + prevt = UTCTimestampUsec() + results = self.mgrs[tab].map_method("__call__", uv, local_uve) + self.tab_perf[tab].record_call(UTCTimestampUsec() - prevt) new_uve_alarms = {} for res in results: nm, sev, errs = res @@ -296,9 +569,39 @@ def handle_uve_notif(self, uves, remove = False): alarm_msg = AlarmTrace(data=ustruct, table=tab) self._logger.info('send alarm: %s' % (alarm_msg.log())) alarm_msg.send() - - if len(no_handlers): - self._logger.debug('No Alarm Handlers for %s' % str(no_handlers)) + return success + + def handle_UVETableInfoReq(self, req): + if req.partition == -1: + parts = self.ptab_info.keys() + else: + parts = [req.partition] + + self._logger.info("Got UVETableInfoReq : %s" % str(parts)) + np = 1 + for part in parts: + if part not in self.ptab_info: + continue + tables = [] + for tab in self.ptab_info[part].keys(): + uvel = [] + for uk,uv in self.ptab_info[part][tab].iteritems(): + types = [] + for tk,tv in uv.values().iteritems(): + types.append(UVEStructInfo(type = tk, + content = json.dumps(tv))) + uvel.append(UVEObjectInfo( + name = uk, structs = types)) + tables.append(UVETableInfo(table = tab, uves = uvel)) + resp = UVETableInfoResp(partition = part) + resp.tables = tables + + if np == len(parts): + mr = False + else: + mr = True + resp.response(req.context(), mr) + np = np + 1 def handle_UVETableAlarmReq(self, req): status = False @@ -324,6 +627,27 @@ def handle_UVETableAlarmReq(self, req): resp.response(req.context(), mr) np = np + 1 + def handle_UVETablePerfReq(self, req): + status = False + if req.table == "all": + parts = self.tab_perf_prev.keys() + else: + parts = [req.table] + self._logger.info("Got UVETablePerfReq : %s" % str(parts)) + np = 1 + for pt in parts: + resp = UVETablePerfResp(table = pt) + resp.call_time = self.tab_perf_prev[pt].call_result() + resp.get_time = self.tab_perf_prev[pt].get_result() + resp.pub_time = self.tab_perf_prev[pt].pub_result() + resp.updates = self.tab_perf_prev[pt].get_n + + if np == len(parts): + mr = False + else: + mr = True + resp.response(req.context(), mr) + np = np + 1 def partition_change(self, partno, enl): """ @@ -337,19 +661,35 @@ def partition_change(self, partno, enl): """ status = False if enl: - if self._workers.has_key(partno): + if partno in self._workers: self._logger.info("Dup partition %d" % partno) else: - #uvedb = self._us.get_part(partno) ph = UveStreamProc(','.join(self._conf.kafka_broker_list()), partno, "uve-" + str(partno), self._logger, self._us.get_part, - self.handle_uve_notif) + self.handle_uve_notifq) ph.start() self._workers[partno] = ph - status = True + tout = 600 + idx = 0 + while idx < tout: + # When this partitions starts, + # uveq will get created + if partno not in self._uveq: + gevent.sleep(.1) + else: + break + idx += 1 + if partno in self._uveq: + status = True + else: + # TODO: The partition has not started yet, + # but it still might start later. 
+ # We possibly need to exit + status = False + self._logger.error("Unable to start partition %d" % partno) else: - if self._workers.has_key(partno): + if partno in self._workers: ph = self._workers[partno] gevent.kill(ph) res,db = ph.get() @@ -358,7 +698,26 @@ def partition_change(self, partno, enl): for k,v in db.iteritems(): print "%s -> %s" % (k,str(v)) del self._workers[partno] - status = True + self._uveqf[partno] = True + + tout = 600 + idx = 0 + while idx < tout: + # When this partitions stop.s + # uveq will get destroyed + if partno in self._uveq: + gevent.sleep(.1) + else: + break + idx += 1 + if partno not in self._uveq: + status = True + else: + # TODO: The partition has not stopped yet + # but it still might stop later. + # We possibly need to exit + status = False + self._logger.error("Unable to stop partition %d" % partno) else: self._logger.info("No partition %d" % partno) @@ -377,6 +736,11 @@ def process_stats(self): the previous time period over all partitions and send it out ''' + self.tab_perf_prev = copy.deepcopy(self.tab_perf) + for kt in self.tab_perf.keys(): + #self.tab_perf_prev[kt] = copy.deepcopy(self.tab_perf[kt]) + self.tab_perf[kt].reset() + s_partitions = set() s_keys = set() n_updates = 0 @@ -462,6 +826,7 @@ def handle_PartitionStatusReq(self, req): resp.partition = pt if self._workers.has_key(pt): resp.enabled = True + resp.offset = self._workers[pt]._partoffset resp.uves = [] for kcoll,coll in self._workers[pt].contents().iteritems(): uci = UVECollInfo() @@ -568,7 +933,7 @@ def setup_controller(argv): def main(args=None): controller = setup_controller(args or ' '.join(sys.argv[1:])) gevs = [ - gevent.spawn(controller.run)] + gevent.spawn(controller.run), gevent.spawn(controller.run_uve_processing)] if controller.disc: sp1 = ServicePoller(controller._logger, CollectorTrace, controller.disc, \ diff --git a/src/opserver/alarmgen_ctrl.sandesh b/src/opserver/alarmgen_ctrl.sandesh index 2421c3f806c..45d067c292e 100644 --- a/src/opserver/alarmgen_ctrl.sandesh +++ b/src/opserver/alarmgen_ctrl.sandesh @@ -41,7 +41,45 @@ struct UVECollInfo { response sandesh PartitionStatusResp { 1: bool enabled 2: u32 partition - 3: list uves + 3: u64 offset + 4: list uves +} + +request sandesh UVETableInfoReq { + 1: u32 partition +} + +struct UVEStructInfo { + 1: string type + 2: string content +} + +struct UVEObjectInfo { + 1: string name + 3: list structs +} + +struct UVETableInfo { + 1: string table + 7: list uves +} + + +response sandesh UVETableInfoResp { + 2: u32 partition + 7: list tables +} + +request sandesh UVETablePerfReq { + 1: string table +} + +response sandesh UVETablePerfResp { + 1: string table + 3: u64 updates + 4: u64 get_time + 5: u64 pub_time + 6: u64 call_time } request sandesh UVETableAlarmReq { diff --git a/src/opserver/opserver.py b/src/opserver/opserver.py index bea789efdb8..87391e0a978 100644 --- a/src/opserver/opserver.py +++ b/src/opserver/opserver.py @@ -1421,7 +1421,7 @@ def _uve_alarm_http_post(self, is_alarm): first = True for key in filters['kfilt']: uve_name = uve_tbl + ':' + key - rsp = self._uve_server.get_uve(uve_name, True, filters, + _, rsp = self._uve_server.get_uve(uve_name, True, filters, is_alarm=is_alarm, base_url=base_url) if rsp != {}: @@ -1484,7 +1484,7 @@ def _uve_alarm_http_get(self, name, is_alarm): yield u', ' + json.dumps(gen) yield u']}' else: - rsp = self._uve_server.get_uve(uve_name, flat, filters, + _, rsp = self._uve_server.get_uve(uve_name, flat, filters, is_alarm=is_alarm, base_url=base_url) yield json.dumps(rsp) diff 
--git a/src/opserver/partition_handler.py b/src/opserver/partition_handler.py index b7df952512d..b418695fe0a 100644 --- a/src/opserver/partition_handler.py +++ b/src/opserver/partition_handler.py @@ -23,6 +23,7 @@ def __init__(self, brokers, partition, group, topic, logger, limit): self._limit = limit self._partdb = {} self._partoffset = None + self._kfk = None def msg_handler(self, om): self._partoffset = om.offset @@ -43,9 +44,9 @@ def _run(self): while True: try: self._logger.info("New KafkaClient %d" % self._partition) - kafka = KafkaClient(self._brokers ,str(os.getpid())) + self._kfk = KafkaClient(self._brokers ,str(os.getpid())) try: - consumer = SimpleConsumer(kafka, self._group, self._topic, buffer_size = 4096*4, max_buffer_size=4096*32) + consumer = SimpleConsumer(self._kfk, self._group, self._topic, buffer_size = 4096*4, max_buffer_size=4096*32) #except: except Exception as ex: template = "Consumer Failure {0} occured. Arguments:\n{1!r}" @@ -75,15 +76,17 @@ def _run(self): while True: try: - mm = consumer.get_message(timeout=None) - if mm is None: - continue - self._logger.debug("%d Reading offset %d" % (self._partition, mm.offset)) - consumer.commit() - pcount += 1 - if not self.msg_handler(mm): - self._logger.info("%d could not handle %s" % (self._partition, str(mm))) - raise gevent.GreenletExit + mlist = consumer.get_messages(10) + for mm in mlist: + if mm is None: + continue + self._logger.debug("%d Reading offset %d" % \ + (self._partition, mm.offset)) + consumer.commit() + pcount += 1 + if not self.msg_handler(mm): + self._logger.info("%d could not handle %s" % (self._partition, str(mm))) + raise gevent.GreenletExit except TypeError: gevent.sleep(0.1) except common.FailedPayloadsError as ex: @@ -126,17 +129,6 @@ def __init__(self, brokers, partition, uve_topic, logger, uvecb, callback): def __del__(self): self._logger.info("Destroying UVEStream for part %d" % self._partno) - self.stop_partition() - - def stop_partition(self): - uves = set() - for kcoll,coll in self._uvedb.iteritems(): - for kgen,gen in coll.iteritems(): - uves.update(set(gen.keys())) - self._logger.info("Stopping part %d with UVE keys %s" % \ - (self._partno, str(uves))) - self._callback(uves, True) - self._uvedb = {} def start_partition(self): ''' This function loads the initial UVE database. 
@@ -145,13 +137,14 @@ def start_partition(self): self._uvedb = self._uvecb(self._partno) self._logger.debug("Starting part %d with UVE db %s" % \ (self._partno,str(self._uvedb))) - uves = set() + uves = {} for kcoll,coll in self._uvedb.iteritems(): for kgen,gen in coll.iteritems(): - uves.update(set(gen.keys())) + for kk in gen.keys(): + uves[kk] = None self._logger.info("Starting part %d with UVE keys %s" % \ (self._partno,str(uves))) - self._callback(uves) + self._callback(self._partno, uves) def contents(self): return self._uvedb @@ -171,7 +164,7 @@ def stats(self): def msg_handler(self, om): self._partoffset = om.offset - chg = set() + chg = {} try: uv = json.loads(om.message.value) self._partdb[om.message.key] = uv @@ -212,7 +205,8 @@ def msg_handler(self, om): else: self._uvein[tab][coll][gen][uv["type"]] += 1 - chg.add(uv["key"]) + chg[uv["key"]] = set() + chg[uv["key"]].add(uv["type"]) else: # Record stats on UVE Keys being processed for uk in self._uvedb[coll][gen].keys(): @@ -225,9 +219,10 @@ def msg_handler(self, om): else: self._uveout[tab][uk] = 1 - # when a generator is delelted, we need to - # notify for *ALL* its UVEs - chg = set(self._uvedb[coll][gen].keys()) + # when a generator is delelted, we need to + # notify for *ALL* its UVEs + chg[uk] = None + del self._uvedb[coll][gen] # TODO : For the collector's generator, notify all @@ -238,7 +233,7 @@ def msg_handler(self, om): self._logger.info("%s" % messag) return False else: - self._callback(chg) + self._callback(self._partno, chg) return True if __name__ == '__main__': diff --git a/src/opserver/uveserver.py b/src/opserver/uveserver.py index a7659e9cd38..46a4fd5e6e3 100644 --- a/src/opserver/uveserver.py +++ b/src/opserver/uveserver.py @@ -21,12 +21,13 @@ from pysandesh.util import UTCTimestampUsec from pysandesh.connection_info import ConnectionState from sandesh.viz.constants import UVE_MAP +import traceback class UVEServer(object): def __init__(self, redis_uve_server, logger, redis_password=None): self._local_redis_uve = redis_uve_server - self._redis_uve_list = [] + self._redis_uve_map = {} self._logger = logger self._sem = BoundedSemaphore(1) self._redis = None @@ -43,7 +44,21 @@ def __init__(self, redis_uve_server, logger, redis_password=None): #end __init__ def update_redis_uve_list(self, redis_uve_list): - self._redis_uve_list = redis_uve_list + newlist = set() + for elem in redis_uve_list: + newlist.add((elem[0],elem[1])) + + # if some redis instances are gone, remove them from our map + for test_elem in self._redis_uve_map.keys(): + if test_elem not in newlist: + del self._redis_uve_map[test_elem] + + # new redis instances need to be inserted into the map + for test_elem in newlist: + if test_elem not in self._redis_uve_map: + (r_ip, r_port) = test_elem + self._redis_uve_map[test_elem] = redis.StrictRedis( + r_ip, r_port, password=self._redis_password, db=1) # end update_redis_uve_list def fill_redis_uve_info(self, redis_uve_info): @@ -97,45 +112,6 @@ def run(self): self._logger.debug("%s del received for " % value) # value is of the format: # DEL::::::: - info = value.rsplit(":", 6) - key = info[0].split(":", 1)[1] - typ = info[5] - - existing = self._redis.hgetall("PREVIOUS:" + key + ":" + typ) - tstate = {} - tstate[key] = {} - tstate[key][typ] = {} - state = UVEServer.convert_previous(existing, tstate, key, typ) - - for attr, hval in self._redis.hgetall(value).iteritems(): - snhdict = xmltodict.parse(hval) - - if UVEServer._is_agg_list(snhdict[attr]): - if snhdict[attr]['list']['@size'] == "0": - continue - 
if snhdict[attr]['list']['@size'] == "1": - sname = ParallelAggregator.get_list_name( - snhdict[attr]) - if not isinstance( - snhdict[attr]['list'][sname], list): - snhdict[attr]['list'][sname] = \ - [snhdict[attr]['list'][sname]] - - if (attr not in state[key][typ]): - # There is no existing entry for the UVE - vstr = json.dumps(snhdict[attr]) - else: - # There is an existing entry - # Merge the new entry with the existing one - state = UVEServer.merge_previous( - state, key, typ, attr, snhdict[attr]) - vstr = json.dumps(state[key][typ][attr]['previous']) - - # Store the merged result back in the database - self._redis.sadd("PUVES:" + typ, key) - self._redis.sadd("PTYPES:" + key, typ) - self._redis.hset("PREVIOUS:" + key + ":" + typ, attr, vstr) - self._redis.delete(value) except redis.exceptions.ResponseError: #send redis connection down msg. Coule be bcos of authentication @@ -174,58 +150,34 @@ def _is_agg_list(attr): return True return False - @staticmethod - def convert_previous(existing, state, key, typ, afilter=None): - # Take the existing delete record, and load it into the state dict - for attr, hval in existing.iteritems(): - hdict = json.loads(hval) - - if afilter is not None and len(afilter): - if attr not in afilter: - continue - - # When recording deleted attributes, only record those - # for which delete-time aggregation is needed - if UVEServer._is_agg_item(hdict): - if (typ not in state[key]): - state[key][typ] = {} - if (attr not in state[key][typ]): - state[key][typ][attr] = {} - state[key][typ][attr]["previous"] = hdict - - # For lists that require delete-time aggregation, we need - # to normailize lists of size 1, and ignore those of size 0 - if UVEServer._is_agg_list(hdict): - if hdict['list']['@size'] != "0": - if (typ not in state[key]): - state[key][typ] = {} - if (attr not in state[key][typ]): - state[key][typ][attr] = {} - state[key][typ][attr]["previous"] = hdict - if hdict['list']['@size'] == "1": - sname = ParallelAggregator.get_list_name(hdict) - if not isinstance(hdict['list'][sname], list): - hdict['list'][sname] = [hdict['list'][sname]] - - return state - def get_part(self, part): uves = {} - for redis_uve in self._redis_uve_list: - gen_uves = {} - redish = redis.StrictRedis(host=redis_uve[0], - port=redis_uve[1], db=1) - for elems in redish.smembers("PART2KEY:" + str(part)): - info = elems.split(":", 5) - gen = info[0] + ":" + info[1] + ":" + info[2] + ":" + info[3] - key = info[5] - if not gen_uves.has_key(gen): - gen_uves[gen] = {} - gen_uves[gen][key] = 0 - uves[redis_uve[0] + ":" + str(redis_uve[1])] = gen_uves + for r_inst in self._redis_uve_map.keys(): + try: + (r_ip,r_port) = r_inst + if not self._redis_uve_map[r_inst]: + self._redis_uve_map[r_inst] = redis.StrictRedis( + host=r_ip, port=r_port, + password=self._redis_password, db=1) + + redish = self._redis_uve_map[r_inst] + gen_uves = {} + for elems in redish.smembers("PART2KEY:" + str(part)): + info = elems.split(":", 5) + gen = info[0] + ":" + info[1] + ":" + info[2] + ":" + info[3] + key = info[5] + if not gen_uves.has_key(gen): + gen_uves[gen] = {} + gen_uves[gen][key] = 0 + uves[r_ip + ":" + str(r_port)] = gen_uves + except Exception as e: + self._logger.error("get_part failed %s for : %s:%d tb %s" \ + % (str(e), r_ip, r_port, traceback.format_exc())) + self._redis_uve_map[r_inst] = None + raise e return uves - - def get_uve(self, key, flat, filters=None, multi=False, is_alarm=False, base_url=None): + + def get_uve(self, key, flat, filters=None, is_alarm=False, base_url=None): filters = 
filters or {} sfilter = filters.get('sfilt') mfilter = filters.get('mfilt') @@ -233,39 +185,62 @@ def get_uve(self, key, flat, filters=None, multi=False, is_alarm=False, base_url ackfilter = filters.get('ackfilt') state = {} state[key] = {} - statdict = {} - for redis_uve in self._redis_uve_list: - redish = redis.StrictRedis(host=redis_uve[0], - port=redis_uve[1], - password=self._redis_password, db=1) + rsp = {} + failures = False + + for r_inst in self._redis_uve_map.keys(): try: + (r_ip,r_port) = r_inst + if not self._redis_uve_map[r_inst]: + self._redis_uve_map[r_inst] = redis.StrictRedis( + host=r_ip, port=r_port, + password=self._redis_password, db=1) + + redish = self._redis_uve_map[r_inst] qmap = {} - origins = redish.smembers("ALARM_ORIGINS:" + key) + + ppe = redish.pipeline() + ppe.smembers("ALARM_ORIGINS:" + key) if not is_alarm: - origins = origins.union(redish.smembers("ORIGINS:" + key)) + ppe.smembers("ORIGINS:" + key) + pperes = ppe.execute() + origins = set() + for origset in pperes: + for smt in origset: + tt = smt.rsplit(":",1)[1] + sm = smt.rsplit(":",1)[0] + source = sm.split(":", 1)[0] + mdule = sm.split(":", 1)[1] + if tfilter is not None: + if tt not in tfilter: + continue + if sfilter is not None: + if sfilter != source: + continue + if mfilter is not None: + if mfilter != mdule: + continue + origins.add(smt) + + ppeval = redish.pipeline() for origs in origins: - info = origs.rsplit(":", 1) - sm = info[0].split(":", 1) - source = sm[0] - if sfilter is not None: - if sfilter != source: - continue - mdule = sm[1] - if mfilter is not None: - if mfilter != mdule: - continue - dsource = source + ":" + mdule + ppeval.hgetall("VALUES:" + key + ":" + origs) + odictlist = ppeval.execute() - typ = info[1] - if tfilter is not None: - if typ not in tfilter: - continue + idx = 0 + for origs in origins: - odict = redish.hgetall("VALUES:" + key + ":" + origs) + odict = odictlist[idx] + idx = idx + 1 + + info = origs.rsplit(":", 1) + dsource = info[0] + typ = info[1] afilter_list = set() if tfilter is not None: afilter_list = tfilter[typ] + for attr, value in odict.iteritems(): if len(afilter_list): if attr not in afilter_list: @@ -314,35 +289,17 @@ def get_uve(self, key, flat, filters=None, multi=False, is_alarm=False, base_url key][typ][attr][dsource]) state[key][typ][attr][dsource] = snhdict[attr] - if sfilter is None and mfilter is None: - for ptyp in redish.smembers("PTYPES:" + key): - afilter = None - if tfilter is not None: - if ptyp not in tfilter: - continue - afilter = tfilter[ptyp] - existing = redish.hgetall("PREVIOUS:" + key + ":" + ptyp) - nstate = UVEServer.convert_previous( - existing, state, key, ptyp, afilter) - state = copy.deepcopy(nstate) - pa = ParallelAggregator(state, self._uve_reverse_map) rsp = pa.aggregate(key, flat, base_url) - except redis.exceptions.ConnectionError: - self._logger.error("Failed to connect to redis-uve: %s:%d" \ - % (redis_uve[0], redis_uve[1])) except Exception as e: - self._logger.error("Exception: %s" % e) - return {} + self._logger.error("redis-uve failed %s for : %s:%d tb %s" \ + % (str(e), r_ip, r_port, traceback.format_exc())) + self._redis_uve_map[r_inst] = None + failures = True else: - self._logger.debug("Computed %s" % key) + self._logger.debug("Computed %s as %s" % (key,str(rsp))) - for k, v in statdict.iteritems(): - if k in rsp: - mp = dict(v.items() + rsp[k].items()) - statdict[k] = mp - - return dict(rsp.items() + statdict.items()) + return failures, rsp # end get_uve def get_uve_regex(self, key): @@ -360,8 +317,8 @@ def 
multi_uve_get(self, table, flat, filters=None, is_alarm=False, base_url=None # so we don't pass them here uve_list = self.get_uve_list(table, filters, False, is_alarm) for uve_name in uve_list: - uve_val = self.get_uve( - table + ':' + uve_name, flat, filters, True, is_alarm, base_url) + _,uve_val = self.get_uve( + table + ':' + uve_name, flat, filters, is_alarm, base_url) if uve_val == {}: continue else: @@ -378,11 +335,17 @@ def get_uve_list(self, table, filters=None, parse_afilter=False, patterns = set() for filt in kfilter: patterns.add(self.get_uve_regex(filt)) - for redis_uve in self._redis_uve_list: - redish = redis.StrictRedis(host=redis_uve[0], - port=redis_uve[1], - password=self._redis_password, db=1) + + for r_inst in self._redis_uve_map.keys(): try: + (r_ip,r_port) = r_inst + if not self._redis_uve_map[r_inst]: + self._redis_uve_map[r_inst] = redis.StrictRedis( + host=r_ip, port=r_port, + password=self._redis_password, db=1) + + redish = self._redis_uve_map[r_inst] + # For UVE queries, we wanna read both UVE and Alarm table entries = redish.smembers('ALARM_TABLE:' + table) if not is_alarm: @@ -424,12 +387,10 @@ def get_uve_list(self, table, filters=None, parse_afilter=False, if attrval is None: continue uve_list.add(uve_key) - except redis.exceptions.ConnectionError: - self._logger.error('Failed to connect to redis-uve: %s:%d' \ - % (redis_uve[0], redis_uve[1])) except Exception as e: - self._logger.error('Exception: %s' % e) - return set() + self._logger.error("get_uve_list failed %s for : %s:%d tb %s" \ + % (str(e), r_ip, r_port, traceback.format_exc())) + self._redis_uve_map[r_inst] = None return uve_list # end get_uve_list @@ -691,5 +652,5 @@ def aggregate(self, key, flat, base_url = None): if __name__ == '__main__': uveserver = UVEServer(None, 0, None, None) gevent.spawn(uveserver.run()) - uve_state = json.loads(uveserver.get_uve("abc-corp:vn02", False)) + _, uve_state = json.loads(uveserver.get_uve("abc-corp:vn02", False)) print json.dumps(uve_state, indent=4, sort_keys=True)
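The state compression described in the commit message works by merging change
notifications into a per-partition working set rather than appending them to a
queue. The sketch below restates that merge rule outside the Controller class;
the names merge_into_working_set and working_set are illustrative only, while
the convention (None means "reprocess every struct of this UVE", otherwise a
set of changed struct types) is the one used by handle_uve_notifq() and
handle_uve_notif() in this patch.

# Minimal sketch of the working-set merge used for state compression.
# Assumption: the function and variable names below are illustrative;
# the patch implements this logic in Controller.handle_uve_notifq().

def merge_into_working_set(working_set, uves):
    """Merge a batch of UVE change notifications into the working set.

    working_set: dict of UVE key -> set of changed struct types,
                 or None meaning "reprocess all structs of this UVE"
    uves:        dict with the same convention, as produced by the
                 Kafka partition handler
    """
    for uve_key, types in uves.items():
        if types is None:
            # The whole UVE must be re-read (e.g. generator deleted,
            # or partition ownership was just acquired)
            working_set[uve_key] = None
        elif uve_key not in working_set:
            working_set[uve_key] = set(types)
        elif working_set[uve_key] is not None:
            # Union the changed struct types; duplicates collapse,
            # which is what keeps the working set bounded (unlike a queue)
            working_set[uve_key].update(types)
        # else: already marked None, nothing more specific to record


ws = {}
merge_into_working_set(ws, {"ObjectVNTable:vn01": set(["UveVirtualNetworkAgent"])})
merge_into_working_set(ws, {"ObjectVNTable:vn01": set(["UveVirtualNetworkConfig"])})
merge_into_working_set(ws, {"ObjectVNTable:vn02": None})
assert ws["ObjectVNTable:vn01"] == \
    set(["UveVirtualNetworkAgent", "UveVirtualNetworkConfig"])
assert ws["ObjectVNTable:vn02"] is None

Because duplicate notifications for the same UVE collapse into one entry, the
working set stays bounded even if run_uve_processing() falls behind for a
while, which is what the sustained 25000-updates-per-minute test relies on.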
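AGKeyInfo is the per-UVE content cache that makes change detection possible.
A hypothetical usage sketch, assuming the class can be imported as shown (the
direct import is only for illustration and needs the opserver package and its
sandesh dependencies on the path) and using made-up struct names; it shows how
a full-UVE update and a single-struct (cfilt) update populate the
added/changed/removed sets.

# Hypothetical usage of the AGKeyInfo cache introduced in alarmgen.py.
from opserver.alarmgen import AGKeyInfo

info = AGKeyInfo(part=0)

# Full-UVE update (no cfilt): diff against the cached contents
info.update({"UveVirtualNetworkAgent": {"in_tpkts": 10},
             "UveVirtualNetworkConfig": {"connected_networks": []}})
assert info.added() == set(["UveVirtualNetworkAgent",
                            "UveVirtualNetworkConfig"])

# Single-struct update (cfilt case): only one type is re-read
info.update_single("UveVirtualNetworkAgent", {"in_tpkts": 20})
assert info.changed() == set(["UveVirtualNetworkAgent"])

# Passing None means the struct has gone away
info.update_single("UveVirtualNetworkConfig", None)
assert info.removed() == set(["UveVirtualNetworkConfig"])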
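When the pre-agg stream names the struct types that changed, AlarmGen builds a
cfilt so UVEServer reads only those types, and the reworked get_uve() returns
a (failures, data) pair so the caller can decide whether to retry. A minimal
sketch of that calling convention, assuming `us` is an already-initialized
opserver.uveserver.UVEServer and using example key and type names.

# Sketch: read only the struct types reported as changed, using cfilt.
# Assumption: `us` has had update_redis_uve_list() called on it;
# the UVE key and struct types passed in are examples.

def read_changed_structs(us, uve_key, changed_types):
    """Fetch just the changed structs of one UVE; return (ok, data)."""
    filters = {}
    if changed_types:
        filters["cfilt"] = {}
        for typ in changed_types:
            # an empty attribute set means "all attributes of this struct"
            filters["cfilt"][typ] = set()
    failures, uve_data = us.get_uve(uve_key, True, filters)
    # On failure AlarmGen keeps the UVE key in its working set so the
    # read is retried; OpServer can instead rely on eventual consistency.
    return (not failures), uve_data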
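The uveserver.py change batches redis reads with pipelines: one round-trip for
the ORIGINS/ALARM_ORIGINS sets and one for all of the per-origin VALUES
hashes, instead of one hgetall per origin. A standalone sketch of the same
pattern with redis-py, assuming a local redis on 127.0.0.1:6379 db 1 and an
example UVE key; it is not the UVEServer code itself.

import redis

# Sketch of the pipelined read pattern used by UVEServer.get_uve().
# Assumptions: redis host/port/db and the UVE key are placeholders.
r = redis.StrictRedis(host="127.0.0.1", port=6379, db=1)
key = "ObjectVNTable:vn01"

# Round-trip 1: both origin sets in a single pipeline
ppe = r.pipeline()
ppe.smembers("ALARM_ORIGINS:" + key)
ppe.smembers("ORIGINS:" + key)
origins = set()
for origset in ppe.execute():
    origins.update(origset)
origins = list(origins)

# Round-trip 2: the VALUES hash of every origin in a single pipeline
ppeval = r.pipeline()
for origin in origins:
    ppeval.hgetall("VALUES:" + key + ":" + origin)
odictlist = ppeval.execute()

# odictlist[i] is the content published by origins[i]
contents = dict(zip(origins, odictlist))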