diff --git a/src/opserver/log.py b/src/opserver/log.py index 53c9086de3e..a41f49291dd 100755 --- a/src/opserver/log.py +++ b/src/opserver/log.py @@ -19,6 +19,7 @@ import logging.handlers import time import re +from multiprocessing import Process from opserver_util import OpServerUtils from sandesh_common.vns.ttypes import Module from sandesh_common.vns.constants import ModuleNames, NodeTypeNames @@ -73,16 +74,37 @@ def run(self): last = self._args.last) except: return -1 - result = self.query() - if result == -1: - return - # Accumulate the result before processing it as the - # formatting of result can be cpu intensive and hence would - # affect the overall time taken to fetch the result from the - # analytics-api. Since the query result ttl is set to 5 min - # in redis, it is necessary to improve the read throughput. - result_list = self.read_result(result) + + start_time = self._start_time + end_time = self._end_time + + result_list = [] + while int(end_time) - int(start_time) > 0: + if not self._args.reverse: + self._start_time = start_time + self._end_time = start_time + 10*60*pow(10,6) if (start_time + 10*60*pow(10,6) <= int(end_time)) else int(end_time) + else: + self._end_time = end_time + self._start_time = end_time - 10*60*pow(10,6) if (end_time - 10*60*pow(10,6) >= int(start_time)) else int(start_time) + + p = Process(target=self.display, args=(result_list,)) + p.start() + result = self.query() + if result == -1: + return + # Accumulate the result before processing it as the + # formatting of result can be cpu intensive and hence would + # affect the overall time taken to fetch the result from the + # analytics-api. Since the query result ttl is set to 5 min + # in redis, it is necessary to improve the read throughput. + result_list = self.read_result(result) + p.join() + if not self._args.reverse: + start_time = self._end_time + 1 + else: + end_time = self._start_time - 1 self.display(result_list) + except KeyboardInterrupt: return diff --git a/src/opserver/opserver_util.py b/src/opserver/opserver_util.py index 41d599c7be2..3d33d1135a1 100644 --- a/src/opserver/opserver_util.py +++ b/src/opserver/opserver_util.py @@ -208,6 +208,17 @@ def parse_start_end_time(start_time, end_time, last): 'now' in end_time: ostart_time = start_time oend_time = end_time + now = OpServerUtils.utc_timestamp_usec() + td = OpServerUtils.convert_to_time_delta(ostart_time[len('now'):]) + if td == None: + ostart_time = now + else: + ostart_time = now + (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) + td = OpServerUtils.convert_to_time_delta(oend_time[len('now'):]) + if td == None: + oend_time = now + else: + oend_time = now + (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) elif start_time.isdigit() and \ end_time.isdigit(): ostart_time = int(start_time) @@ -352,6 +363,8 @@ def get_query_result(opserver_ip, opserver_port, qid, user, password, @staticmethod def convert_to_time_delta(time_str): + if time_str == '' or time_str == None: + return None num = int(time_str[:-1]) if time_str.endswith('s'): return datetime.timedelta(seconds=num) diff --git a/src/opserver/test/test_flow.py b/src/opserver/test/test_flow.py index 04a7da998a3..83930fab44f 100755 --- a/src/opserver/test/test_flow.py +++ b/src/opserver/test/test_flow.py @@ -61,8 +61,11 @@ def test_1_noarg_query(self): self._querier.run() sys.argv = argv - expected_result_str = '{"table": "FlowRecordTable", "start_time": "now-10m", "end_time": "now", "dir": 1, "select_fields": ["UuidKey", "vrouter", "setup_time", "teardown_time", "sourcevn", "destvn", "sourceip", "destip", "protocol", "sport", "dport", "action", "direction_ing", "agg-bytes", "agg-packets", "sg_rule_uuid", "nw_ace_uuid", "vrouter_ip", "other_vrouter_ip", "vmi_uuid", "drop_reason"]}' + expected_result_str = '{"table": "FlowRecordTable", "dir": 1, "select_fields": ["UuidKey", "vrouter", "setup_time", "teardown_time", "sourcevn", "destvn", "sourceip", "destip", "protocol", "sport", "dport", "action", "direction_ing", "agg-bytes", "agg-packets", "sg_rule_uuid", "nw_ace_uuid", "vrouter_ip", "other_vrouter_ip", "vmi_uuid", "drop_reason"]}' expected_result_dict = json.loads(expected_result_str) + self.assertEqual(int(query_dict['end_time']) - int(query_dict['start_time']), 10*60*pow(10,6)) + del query_dict['start_time'] + del query_dict['end_time'] for key in expected_result_dict: self.assertTrue(key in query_dict) self.assertTrue(expected_result_dict[key] == query_dict[key]) diff --git a/src/opserver/test/test_log.py b/src/opserver/test/test_log.py index 7ee248f2d90..1e786160482 100755 --- a/src/opserver/test/test_log.py +++ b/src/opserver/test/test_log.py @@ -9,9 +9,10 @@ from opserver.log import LogQuerier from opserver.opserver_util import OpServerUtils +from multiprocessing import Process test_num = 0 -query_dict = {} +query_list = [] query_result = { 1: [ {u'Category': None, u'NodeType': u'Config', u'Level': 2147483647, u'InstanceId': u'0', u'Messagetype': u'UveVirtualNetworkConfigTrace', u'Source': u'a6s45', u'SequenceNum': 6867683, u'MessageTS': 1442429588898861, u'Xmlmessage': u'default-domain:demo:svc-vn-leftsvc-vn-left0', u'Type': 6, u'ModuleId': u'contrail-schema'} @@ -27,6 +28,9 @@ ], 5: [ {u'Category': None, u'NodeType': u'Config', u'Level': 2147483647, u'InstanceId': u'0', u'Messagetype': u'UveVirtualNetworkConfigTrace', u'Source': u'nodec39', u'SequenceNum': 6867683, u'MessageTS': 1442429588898861, u'Xmlmessage': u'default-domain:demo:svc-vn-leftsvc-vn-left0', u'Type': 6, u'ModuleId': u'contrail-schema'} +], +6: [ +{u'Category': None, u'NodeType': u'Config', u'Level': 2147483647, u'InstanceId': u'0', u'Messagetype': u'UveVirtualNetworkConfigTrace', u'Source': u'a6s45', u'SequenceNum': 6867683, u'MessageTS': 1442429588898861, u'Xmlmessage': u'default-domain:demo:svc-vn-leftsvc-vn-left0', u'Type': 6, u'ModuleId': u'contrail-schema'} ] } @@ -34,8 +38,8 @@ class LogQuerierTest(unittest.TestCase): @staticmethod def custom_post_url_http(url, params): - global query_dict - query_dict = json.loads(params) + global query_list + query_list.append(json.loads(params)) return '{"href": "/analytics/query/a415fe1e-51cb-11e5-aab0-00000a540d2d"}' @staticmethod @@ -46,24 +50,34 @@ def custom_get_query_result(opserver_ip, opserver_port, qid): return [] def custom_display(self, result): + if result == [] or result is None: + return try: self.assertTrue(result == query_result[test_num]) except KeyError: self.assertTrue(False) + def custom_process_start(self): + self._querier.display(self._querier.read_result(query_result[test_num])) + + def custom_process_end(self): + return + def setUp(self): self.maxDiff = None self._querier = LogQuerier() flexmock(OpServerUtils).should_receive('post_url_http').replace_with(lambda x, y, w, z: self.custom_post_url_http(x, y)) - flexmock(OpServerUtils).should_receive('get_query_result').replace_with(lambda x, y, z, a, b: self.custom_get_query_result(x, y, z)) - flexmock(self._querier).should_receive('display').replace_with(lambda x: self.custom_display(x)) - + self.query_expectations = flexmock(OpServerUtils).should_receive('get_query_result').replace_with(lambda x, y, z, a, b: self.custom_get_query_result(x, y, z)) + self.display_expectations = flexmock(LogQuerier).should_receive('display').replace_with(lambda x: self.custom_display(x)) + self.process_start_expectations = flexmock(Process).should_receive('start').replace_with(lambda:self.custom_process_start()) + self.process_end_expectations = flexmock(Process).should_receive('join').replace_with(lambda:self.custom_process_end()) #@unittest.skip("skip test_1_no_arg") def test_1_no_arg(self): global test_num - global query_dict + global query_list + query_list = [] test_num = 1 argv = sys.argv @@ -71,15 +85,22 @@ def test_1_no_arg(self): self._querier.run() sys.argv = argv - expected_result_str = '{"sort": 1, "start_time": "now-10m", "sort_fields": ["MessageTS"], "end_time": "now", "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable"}' + expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable"}' expected_result_dict = json.loads(expected_result_str) - self.assertEqual(expected_result_dict, query_dict) + self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6)) + del query_list[0]['start_time'] + del query_list[0]['end_time'] + self.assertEqual(expected_result_dict, query_list[0]) + self.assertEqual(self.query_expectations.times_called,1) + self.assertEqual(self.display_expectations.times_called,2) + self.assertEqual(self.process_start_expectations.times_called,1) # a few args #@unittest.skip("skip test_2_message_query") def test_2_message_query(self): global test_num - global query_dict + global query_list + query_list = [] test_num = 2 argv = sys.argv @@ -87,15 +108,22 @@ def test_2_message_query(self): self._querier.run() sys.argv = argv - expected_result_str = '{"sort": 1, "start_time": "now-10m", "sort_fields": ["MessageTS"], "end_time": "now", "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable", "where": [[{"suffix": null, "value2": null, "name": "Source", "value": "a6s45", "op": 1}, {"suffix": null, "value2": null, "name": "ModuleId", "value": "contrail-collector", "op": 1}, {"suffix": null, "value2": null, "name": "Messagetype", "value": "GeneratorDbStatsUve", "op": 1}]], "filter": [[{"suffix": null, "value2": null, "name": "NodeType", "value": "Analytics", "op": 1}, {"suffix": null, "value2": null, "name": "InstanceId", "value": 0, "op": 1}]]}' + expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable", "where": [[{"suffix": null, "value2": null, "name": "Source", "value": "a6s45", "op": 1}, {"suffix": null, "value2": null, "name": "ModuleId", "value": "contrail-collector", "op": 1}, {"suffix": null, "value2": null, "name": "Messagetype", "value": "GeneratorDbStatsUve", "op": 1}]], "filter": [[{"suffix": null, "value2": null, "name": "NodeType", "value": "Analytics", "op": 1}, {"suffix": null, "value2": null, "name": "InstanceId", "value": 0, "op": 1}]]}' expected_result_dict = json.loads(expected_result_str) - self.assertEqual(expected_result_dict, query_dict) + self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6)) + del query_list[0]['start_time'] + del query_list[0]['end_time'] + self.assertEqual(expected_result_dict, query_list[0]) + self.assertEqual(self.query_expectations.times_called,1) + self.assertEqual(self.display_expectations.times_called,2) + self.assertEqual(self.process_start_expectations.times_called,1) # a object values query #@unittest.skip("skip test_3_object_value") def test_3_object_value(self): global test_num - global query_dict + global query_list + query_list = [] test_num = 3 argv = sys.argv @@ -103,15 +131,22 @@ def test_3_object_value(self): self._querier.run() sys.argv = argv - expected_result_str = '{"table": "ConfigObjectTable", "start_time": "now-10m", "end_time": "now", "select_fields": ["ObjectId"]}' + expected_result_str = '{"table": "ConfigObjectTable", "select_fields": ["ObjectId"]}' expected_result_dict = json.loads(expected_result_str) - self.assertEqual(expected_result_dict, query_dict) + self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6)) + del query_list[0]['start_time'] + del query_list[0]['end_time'] + self.assertEqual(expected_result_dict, query_list[0]) + self.assertEqual(self.query_expectations.times_called,1) + self.assertEqual(self.display_expectations.times_called,2) + self.assertEqual(self.process_start_expectations.times_called,1) # a object id query #@unittest.skip("skip test_4_object_id") def test_4_object_id(self): global test_num - global query_dict + global query_list + query_list = [] test_num = 4 argv = sys.argv @@ -119,15 +154,22 @@ def test_4_object_id(self): self._querier.run() sys.argv = argv - expected_result_str = '{"sort": 1, "start_time": "now-10m", "sort_fields": ["MessageTS"], "end_time": "now", "select_fields": ["MessageTS", "Source", "ModuleId", "Messagetype", "ObjectLog", "SystemLog"], "table": "ConfigObjectTable", "where": [[{"suffix": null, "value2": null, "name": "ObjectId", "value": "virtual_network:default-domain:admin:vn1-take2", "op": 1}]]}' + expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Messagetype", "ObjectLog", "SystemLog"], "table": "ConfigObjectTable", "where": [[{"suffix": null, "value2": null, "name": "ObjectId", "value": "virtual_network:default-domain:admin:vn1-take2", "op": 1}]]}' expected_result_dict = json.loads(expected_result_str) - self.assertEqual(expected_result_dict, query_dict) + self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6)) + del query_list[0]['start_time'] + del query_list[0]['end_time'] + self.assertEqual(expected_result_dict, query_list[0]) + self.assertEqual(self.query_expectations.times_called,1) + self.assertEqual(self.display_expectations.times_called,2) + self.assertEqual(self.process_start_expectations.times_called,1) # prefix query #@unittest.skip("skip test_5_prefix_query") def test_5_prefix_query(self): global test_num - global query_dict + global query_list + query_list = [] test_num = 5 argv = sys.argv @@ -135,10 +177,43 @@ def test_5_prefix_query(self): self._querier.run() sys.argv = argv - expected_result_str = '{"sort": 1, "start_time": "now-10m", "sort_fields": ["MessageTS"], "end_time": "now", "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable", "where": [[{"suffix": null, "value2": null, "name": "Source", "value": "node", "op": 7}, {"suffix": null, "value2": null, "name": "Messagetype", "value": "UveVirtualNetwork", "op": 7}]]}' + expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable", "where": [[{"suffix": null, "value2": null, "name": "Source", "value": "node", "op": 7}, {"suffix": null, "value2": null, "name": "Messagetype", "value": "UveVirtualNetwork", "op": 7}]]}' expected_result_dict = json.loads(expected_result_str) - self.assertEqual(expected_result_dict, query_dict) + self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6)) + del query_list[0]['start_time'] + del query_list[0]['end_time'] + self.assertEqual(expected_result_dict, query_list[0]) + self.assertEqual(self.query_expectations.times_called,1) + self.assertEqual(self.display_expectations.times_called,2) + self.assertEqual(self.process_start_expectations.times_called,1) # end test_5_prefix_query + #@unittest.skip("skip test_6_long_query") + def test_6_long_query(self): + global test_num + global query_list + query_list = [] + test_num = 6 + + argv = sys.argv + sys.argv = "contrail-logs --start-time now-30m --end-time now".split() + self._querier.run() + sys.argv = argv + + expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable"}' + expected_result_dict = json.loads(expected_result_str) + self.assertEqual(self.query_expectations.times_called,3) + self.assertEqual(self.display_expectations.times_called,4) + self.assertEqual(self.process_start_expectations.times_called,3) + for i in range(len(query_list) - 1): + self.assertEqual(int(query_list[i]['end_time']) - int(query_list[i]['start_time']),10*60*pow(10,6)) + del query_list[i]['start_time'] + del query_list[i]['end_time'] + self.assertEqual(expected_result_dict, query_list[i]) + self.assertEqual(int(query_list[2]['end_time']) - int(query_list[2]['start_time']),10*60*pow(10,6) - 2) + del query_list[2]['start_time'] + del query_list[2]['end_time'] + self.assertEqual(expected_result_dict, query_list[2]) + if __name__ == '__main__': unittest.main() diff --git a/src/opserver/test/test_stats.py b/src/opserver/test/test_stats.py index 3e0cce3ae23..5440fc9dd77 100755 --- a/src/opserver/test/test_stats.py +++ b/src/opserver/test/test_stats.py @@ -51,8 +51,11 @@ def test_1_analytics_cpu_query(self): self._querier.run() sys.argv = argv - expected_result_str = '{"start_time": "now-10m", "end_time": "now", "select_fields": ["T=60", "SUM(process_mem_cpu_usage.cpu_share)"], "table": "StatTable.NodeStatus.process_mem_cpu_usage", "where": [[{"suffix": null, "value2": null, "name": "name", "value": "", "op": 7}]]}' + expected_result_str = '{"select_fields": ["T=60", "SUM(process_mem_cpu_usage.cpu_share)"], "table": "StatTable.NodeStatus.process_mem_cpu_usage", "where": [[{"suffix": null, "value2": null, "name": "name", "value": "", "op": 7}]]}' expected_result_dict = json.loads(expected_result_str) + self.assertEqual(int(query_dict['end_time']) - int(query_dict['start_time']), 10*60*pow(10,6)) + del query_dict['start_time'] + del query_dict['end_time'] for key in expected_result_dict: self.assertTrue(key in query_dict) self.assertTrue(expected_result_dict[key] == query_dict[key])