From d2fa7e976c79bf6f752e3bfc95650f61f6ed7785 Mon Sep 17 00:00:00 2001 From: Anish Mehta Date: Wed, 7 Sep 2016 16:44:46 -0700 Subject: [PATCH] Adding support to sum structs in UVE Aggregation. Also removing the counter aggregation, which is not used or supported. Closes-Bug:1621271 Change-Id: Id2aff7264a68e16f1284e08ac475e6e57d3e2912 Conflicts: src/analytics/analytics.sandesh Change-Id: I21fff8c713b1127d8e499eba8937fa829917821f --- src/analytics/analytics.sandesh | 2 +- src/opserver/docs/opserver.rst | 16 +-- .../test/sandesh/virtual_network.sandesh | 8 +- src/opserver/test/test_uveserver.py | 97 ++++++++-------- src/opserver/uveserver.py | 108 +++++++++--------- 5 files changed, 119 insertions(+), 112 deletions(-) diff --git a/src/analytics/analytics.sandesh b/src/analytics/analytics.sandesh index c250302d450..cca195ece5e 100644 --- a/src/analytics/analytics.sandesh +++ b/src/analytics/analytics.sandesh @@ -130,7 +130,7 @@ struct UserDefinedLogStatistic { 1: string name (key="UserDefinedLogStatTable") 2: optional bool deleted 3: optional u64 rx_event (hidden="yes") - 4: optional uint64_t_P_ count (stats="rx_event:DSSum:", tags="") + 4: optional uint64_t_P_ count (stats="rx_event:DSSum:", tags="", aggtype="sum") } (period="60") /** diff --git a/src/opserver/docs/opserver.rst b/src/opserver/docs/opserver.rst index dd1cdb6c73a..89c7fd9ff2f 100644 --- a/src/opserver/docs/opserver.rst +++ b/src/opserver/docs/opserver.rst @@ -179,7 +179,7 @@ Example output for a virtual network UVE:: ], "in_bytes": { "#text": "2232972057", - "@aggtype": "counter", + "@aggtype": "sum", "@type": "i64" }, "in_stats": { @@ -239,7 +239,7 @@ Example output for a virtual network UVE:: }, "in_tpkts": { "#text": "5156342", - "@aggtype": "counter", + "@aggtype": "sum", "@type": "i64" }, "interface_list": { @@ -255,7 +255,7 @@ Example output for a virtual network UVE:: }, "out_bytes": { "#text": "2187615961", - "@aggtype": "counter", + "@aggtype": "sum", "@type": "i64" }, "out_stats": { @@ -330,7 +330,7 @@ Example output for a virtual network UVE:: }, "out_tpkts": { "#text": "5134830", - "@aggtype": "counter", + "@aggtype": "sum", "@type": "i64" }, "virtualmachine_list": { @@ -400,12 +400,12 @@ Example output for a virtual machine UVE:: { "in_bytes": { "#text": "2188895907", - "@aggtype": "counter", + "@aggtype": "sum", "@type": "i64" }, "in_pkts": { "#text": "5130901", - "@aggtype": "counter", + "@aggtype": "sum", "@type": "i64" }, "ip_address": { @@ -418,12 +418,12 @@ Example output for a virtual machine UVE:: }, "out_bytes": { "#text": "2201821626", - "@aggtype": "counter", + "@aggtype": "sum", "@type": "i64" }, "out_pkts": { "#text": "5153526", - "@aggtype": "counter", + "@aggtype": "sum", "@type": "i64" }, "virtual_network": { diff --git a/src/opserver/test/sandesh/virtual_network.sandesh b/src/opserver/test/sandesh/virtual_network.sandesh index 6f8f63766f6..a7e65533709 100644 --- a/src/opserver/test/sandesh/virtual_network.sandesh +++ b/src/opserver/test/sandesh/virtual_network.sandesh @@ -20,10 +20,10 @@ struct UveVirtualNetworkAgent { 2: optional bool deleted 3: optional i32 total_acl_rules; 4: optional list interface_list (aggtype="union") - 5: optional u64 in_tpkts (aggtype="counter") - 6: optional u64 in_bytes (aggtype="counter") - 7: optional u64 out_tpkts (aggtype="counter") - 8: optional u64 out_bytes (aggtype="counter") + 5: optional u64 in_tpkts (aggtype="sum") + 6: optional u64 in_bytes (aggtype="sum") + 7: optional u64 out_tpkts (aggtype="sum") + 8: optional u64 out_bytes (aggtype="sum") 9: optional list virtualmachine_list (aggtype="union") 10: optional list vn_stats (tags=".other_vn,.vrouter") } diff --git a/src/opserver/test/test_uveserver.py b/src/opserver/test/test_uveserver.py index bc410bce2b9..252b10d515b 100755 --- a/src/opserver/test/test_uveserver.py +++ b/src/opserver/test/test_uveserver.py @@ -41,6 +41,22 @@ def MakeBasic(typ, val, aggtype=None): item['@aggtype'] = aggtype return item +def MakeStruct(typ, dval, aggtype=None): + item = {} + item['@type'] = 'struct' + if aggtype is not None: + item['@aggtype'] = aggtype + item[typ] = {} + for k,v in dval.iteritems(): + vmap = {} + if isinstance(v,int): + vmap['@type'] = 'u64' + vmap['#text'] = str(v) + else: + vmap['@type'] = 'string' + vmap['#text'] = v + item[typ][k] = vmap + return item def MakeList(typ, valname, val, aggtype=None): item = {} @@ -129,9 +145,9 @@ def MakeStringMap(inmap): 4: optional i32 total_virtual_machines ( aggtype="sum") 5: optional i32 total_acl_rules - 6: optional i64 in_tpkts (aggtype="counter") 7: optional list in_stats (aggtype="append") 8: optional map mstr (aggtype="union") + 9: optional IfStats ifstats (aggtype="sum") ''' @@ -143,9 +159,9 @@ def MakeUVEVirtualNetwork( connected_networks=None, total_virtual_machines=None, total_acl_rules=None, - in_tpkts=None, in_stats=None, - mstr=None): + mstr=None, + ifstats=None): rsult = copy.deepcopy(istate) if rsult is None: rsult = {} @@ -178,16 +194,16 @@ def MakeUVEVirtualNetwork( result['UVEVirtualNetwork']['total_acl_rules'] = {} result['UVEVirtualNetwork']['total_acl_rules'][source] = \ MakeBasic("i32", total_acl_rules) - if in_tpkts is not None: - if ('in_tpkts' not in result['UVEVirtualNetwork']): - result['UVEVirtualNetwork']['in_tpkts'] = {} - result['UVEVirtualNetwork']['in_tpkts'][source] = \ - MakeBasic("i64", in_tpkts, "counter") if in_stats is not None: if ('in_stats' not in result['UVEVirtualNetwork']): result['UVEVirtualNetwork']['in_stats'] = {} result['UVEVirtualNetwork']['in_stats'][source] = \ MakeVnStatList(in_stats) + if ifstats is not None: + if ('ifstats' not in result['UVEVirtualNetwork']): + result['UVEVirtualNetwork']['ifstats'] = {} + result['UVEVirtualNetwork']['ifstats'][source] = \ + MakeStruct("IfStats", ifstats, "sum") return rsult @@ -319,22 +335,22 @@ def test_map_union_agg(self): self.assertEqual(sorted(cn['map']['element']), sorted(res['UVEVirtualNetwork']['mstr']['map']['element'])) - def test_sum_agg(self): - logging.info("%%% Running test_sum_agg %%%") + def test_struct_sum_agg(self): + logging.info("%%% Running test_struct_sum_agg %%%") uvevn = MakeUVEVirtualNetwork( None, "abc-corp:vn-00", "10.10.10.10", - total_virtual_machines=4 + ifstats={"name":"foo", "inbytes":4} ) uvevn2 = MakeUVEVirtualNetwork( uvevn, "abc-corp:vn-00", "10.10.10.11", - total_virtual_machines=7 + ifstats={"inbytes":7} ) uvetest = MakeUVEVirtualNetwork( None, "abc-corp:vn-00", "10.10.10.10", - total_virtual_machines=11 + ifstats={"inbytes":11} ) pa = ParallelAggregator(uvevn2) @@ -343,46 +359,43 @@ def test_sum_agg(self): logging.info(json.dumps(res, indent=4, sort_keys=True)) cnt1 = uvetest["abc-corp:vn-00"]['UVEVirtualNetwork'][ - 'total_virtual_machines']["10.10.10.10"] + 'ifstats']["10.10.10.10"] self.assertEqual( - cnt1, res['UVEVirtualNetwork']['total_virtual_machines']) + cnt1, res['UVEVirtualNetwork']['ifstats']) - def test_counter_agg(self): - logging.info("%%% Running test_counter_agg %%%") + def test_elem_sum_agg(self): + logging.info("%%% Running test_elem_sum_agg %%%") uvevn = MakeUVEVirtualNetwork( - None, "abc-corp:vn-00", "previous", - in_tpkts=4 + None, "abc-corp:vn-00", "10.10.10.10", + total_virtual_machines=4 ) uvevn2 = MakeUVEVirtualNetwork( uvevn, "abc-corp:vn-00", "10.10.10.11", - in_tpkts=7 + total_virtual_machines=7 ) - uvevn3 = UVEServer.merge_previous( - uvevn2, "abc-corp:vn-00", "UVEVirtualNetwork", "in_tpkts", - uvevn["abc-corp:vn-00"]['UVEVirtualNetwork']['in_tpkts'][ - "previous"]) + uvetest = MakeUVEVirtualNetwork( + None, "abc-corp:vn-00", "10.10.10.10", + total_virtual_machines=11 + ) - pa = ParallelAggregator(uvevn3) + pa = ParallelAggregator(uvevn2) res = pa.aggregate("abc-corp:vn-00", False) - logging.info(json.dumps(res, indent=4, sort_keys=True)) - uvetest = MakeUVEVirtualNetwork( - None, "abc-corp:vn-00", "sample", - in_tpkts=15 - ) - in_tpkts = uvetest["abc-corp:vn-00"][ - 'UVEVirtualNetwork']['in_tpkts']["sample"] + logging.info(json.dumps(res, indent=4, sort_keys=True)) - self.assertEqual(in_tpkts, res['UVEVirtualNetwork']['in_tpkts']) + cnt1 = uvetest["abc-corp:vn-00"]['UVEVirtualNetwork'][ + 'total_virtual_machines']["10.10.10.10"] + self.assertEqual( + cnt1, res['UVEVirtualNetwork']['total_virtual_machines']) def test_append_agg(self): logging.info("%%% Running test_append_agg %%%") uvevn = MakeUVEVirtualNetwork( - None, "abc-corp:vn-00", "previous", + None, "abc-corp:vn-00", "10.10.10.10", in_stats=[("vn-01", "1000"), ("vn-02", "1800")], ) @@ -391,17 +404,7 @@ def test_append_agg(self): in_stats=[("vn-02", "1200"), ("vn-03", "1500")], ) - uveprev = MakeUVEVirtualNetwork( - None, "abc-corp:vn-00", "10.10.10.10", - in_stats=[("vn-01", "1000"), ("vn-03", "1700")], - ) - - uvevn3 = UVEServer.merge_previous( - uvevn2, "abc-corp:vn-00", "UVEVirtualNetwork", "in_stats", - uveprev["abc-corp:vn-00"]['UVEVirtualNetwork'][ - 'in_stats']["10.10.10.10"]) - - pa = ParallelAggregator(uvevn3) + pa = ParallelAggregator(uvevn2) res = pa.aggregate("abc-corp:vn-00", False) logging.info(json.dumps(res, indent=4, sort_keys=True)) @@ -410,8 +413,8 @@ def test_append_agg(self): uvetest = MakeUVEVirtualNetwork( None, "abc-corp:vn-00", "sample", - in_stats=[("vn-01", "2000"), ( - "vn-02", "3000"), ("vn-03", "3200")], + in_stats=[("vn-01", "1000"), ( + "vn-02", "3000"), ("vn-03", "1500")], ) uvetest["abc-corp:vn-00"]["UVEVirtualNetwork"]["in_stats"][ diff --git a/src/opserver/uveserver.py b/src/opserver/uveserver.py index 1b909d58a3b..a2dc460ab01 100644 --- a/src/opserver/uveserver.py +++ b/src/opserver/uveserver.py @@ -90,36 +90,6 @@ def fill_redis_uve_info(self, redis_uve_info): redis_uve_info.status = 'Connected' #end fill_redis_uve_info - @staticmethod - def merge_previous(state, key, typ, attr, prevdict): - print "%s New val is %s" % (attr, prevdict) - nstate = copy.deepcopy(state) - if UVEServer._is_agg_item(prevdict): - count = int(state[key][typ][attr]['previous']['#text']) - count += int(prevdict['#text']) - nstate[key][typ][attr]['previous']['#text'] = str(count) - - if UVEServer._is_agg_list(prevdict): - sname = ParallelAggregator.get_list_name( - state[key][typ][attr]['previous']) - count = len(prevdict['list'][sname]) + \ - len(state[key][typ][attr]['previous']['list'][sname]) - nstate[key][typ][attr]['previous']['list'][sname].extend( - prevdict['list'][sname]) - nstate[key][typ][attr]['previous']['list']['@size'] = \ - str(count) - - tstate = {} - tstate[typ] = {} - tstate[typ][attr] = copy.deepcopy( - nstate[key][typ][attr]['previous']) - nstate[key][typ][attr]['previous'] =\ - ParallelAggregator.consolidate_list(tstate, typ, attr) - - print "%s Merged val is %s"\ - % (attr, nstate[key][typ][attr]['previous']) - return nstate - def run(self): ConnectionState.update(conn_type = ConnectionType.REDIS_UVE, name = 'LOCAL', status = ConnectionStatus.INIT) @@ -159,15 +129,6 @@ def run(self): ConnectionState.update(conn_type = ConnectionType.REDIS_UVE, name = 'LOCAL', status = ConnectionStatus.UP) - @staticmethod - def _is_agg_item(attr): - if attr['@type'] in ['i8', 'i16', 'i32', 'i64', 'byte', - 'u8', 'u16', 'u32', 'u64']: - if '@aggtype' in attr: - if attr['@aggtype'] == "counter": - return True - return False - @staticmethod def _is_agg_list(attr): if attr['@type'] in ['list']: @@ -529,17 +490,26 @@ def _default_agg(self, oattr): items.append(source) return result - def _is_sum(self, oattr): + def _is_elem_sum(self, oattr): akey = oattr.keys()[0] + if oattr[akey]['@type'] not in ['i8', 'i16', 'i32', 'i64', + 'byte', 'u8', 'u16', 'u32', 'u64']: + return False if '@aggtype' not in oattr[akey]: return False - if oattr[akey]['@aggtype'] in ["sum"]: - return True - if oattr[akey]['@type'] in ['i8', 'i16', 'i32', 'i64', - 'byte', 'u8', 'u16', 'u32', 'u64']: - if oattr[akey]['@aggtype'] in ["counter"]: - return True - return False + if oattr[akey]['@aggtype'] != "sum": + return False + return True + + def _is_struct_sum(self, oattr): + akey = oattr.keys()[0] + if oattr[akey]['@type'] != "struct": + return False + if '@aggtype' not in oattr[akey]: + return False + if oattr[akey]['@aggtype'] != "sum": + return False + return True def _is_list_union(self, oattr): akey = oattr.keys()[0] @@ -591,7 +561,34 @@ def _get_list_key(elem): skey = sattr return skey - def _sum_agg(self, oattr): + def _struct_sum_agg(self, oattr): + akey = oattr.keys()[0] + result = copy.deepcopy(oattr[akey]) + sname = None + for sattr in result.keys(): + if sattr[0] != '@': + sname = sattr + break + if not sname: + return None + cmap = {} + for source,sval in oattr.iteritems(): + for attr, aval in sval[sname].iteritems(): + if aval['@type'] in ['i8', + 'i16', 'i32', 'i64', + 'byte', 'u8', 'u16', 'u32', 'u64']: + if attr not in cmap: + cmap[attr] = {} + cmap[attr]['@type'] = aval['@type'] + cmap[attr]['#text'] = int(aval['#text']) + else: + cmap[attr]['#text'] += int(aval['#text']) + for k,v in cmap.iteritems(): + v['#text'] = str(v['#text']) + result[sname] = cmap + return result + + def _elem_sum_agg(self, oattr): akey = oattr.keys()[0] result = copy.deepcopy(oattr[akey]) count = 0 @@ -748,13 +745,20 @@ def aggregate(self, key, flat, base_url = None): ltyp = typ result[typ] = {} for objattr in self._state[key][typ].keys(): - if self._is_sum(self._state[key][typ][objattr]): - sum_res = self._sum_agg(self._state[key][typ][objattr]) + if self._is_elem_sum(self._state[key][typ][objattr]): + sume_res = self._elem_sum_agg(self._state[key][typ][objattr]) + if flat: + result[typ][objattr] = \ + OpServerUtils.uve_attr_flatten(sume_res) + else: + result[typ][objattr] = sume_res + elif self._is_struct_sum(self._state[key][typ][objattr]): + sums_res = self._struct_sum_agg(self._state[key][typ][objattr]) if flat: result[typ][objattr] = \ - OpServerUtils.uve_attr_flatten(sum_res) + OpServerUtils.uve_attr_flatten(sums_res) else: - result[typ][objattr] = sum_res + result[typ][objattr] = sums_res elif self._is_list_union(self._state[key][typ][objattr]): unionl_res = self._list_union_agg( self._state[key][typ][objattr])