From 7f5a1a1889a7272059a632256102bc978a3e96c7 Mon Sep 17 00:00:00 2001 From: mkheni Date: Tue, 4 Apr 2017 09:49:47 -0700 Subject: [PATCH] contrail-logs command does not complete in time and process hogs memory. If run in a scale setup and for a longer time period (e.g. 5-6h or more), contrail-logs would take a lot of memory and go to D state due to waiting for IO. This fix divides the query into multiple queries of 10 min so that contrail-logs does not take a lot of memory and has a smaller amount of data to write to the disk. Moreover, this fix would fetch the results for the next query while writing the results for the current query to the disk in parallel because writing to the disk may take some time. Change-Id: Ib68c7058e970e853288d8236ea4239bdb471b313 Closes-bug: #1673865 (cherry picked from commit 9ec7e8d1d2f34149556b4bf8e874445aa2ebfd68) --- src/opserver/log.py | 40 ++++++++--- src/opserver/opserver_util.py | 13 ++++ src/opserver/test/test_flow.py | 5 +- src/opserver/test/test_log.py | 117 ++++++++++++++++++++++++++------ src/opserver/test/test_stats.py | 5 +- 5 files changed, 148 insertions(+), 32 deletions(-) diff --git a/src/opserver/log.py b/src/opserver/log.py index 53c9086de3e..a41f49291dd 100755 --- a/src/opserver/log.py +++ b/src/opserver/log.py @@ -19,6 +19,7 @@ import logging.handlers import time import re +from multiprocessing import Process from opserver_util import OpServerUtils from sandesh_common.vns.ttypes import Module from sandesh_common.vns.constants import ModuleNames, NodeTypeNames @@ -73,16 +74,37 @@ def run(self): last = self._args.last) except: return -1 - result = self.query() - if result == -1: - return - # Accumulate the result before processing it as the - # formatting of result can be cpu intensive and hence would - # affect the overall time taken to fetch the result from the - # analytics-api. Since the query result ttl is set to 5 min - # in redis, it is necessary to improve the read throughput. 
- result_list = self.read_result(result) + + start_time = self._start_time + end_time = self._end_time + + result_list = [] + while int(end_time) - int(start_time) > 0: + if not self._args.reverse: + self._start_time = start_time + self._end_time = start_time + 10*60*pow(10,6) if (start_time + 10*60*pow(10,6) <= int(end_time)) else int(end_time) + else: + self._end_time = end_time + self._start_time = end_time - 10*60*pow(10,6) if (end_time - 10*60*pow(10,6) >= int(start_time)) else int(start_time) + + p = Process(target=self.display, args=(result_list,)) + p.start() + result = self.query() + if result == -1: + return + # Accumulate the result before processing it as the + # formatting of result can be cpu intensive and hence would + # affect the overall time taken to fetch the result from the + # analytics-api. Since the query result ttl is set to 5 min + # in redis, it is necessary to improve the read throughput. + result_list = self.read_result(result) + p.join() + if not self._args.reverse: + start_time = self._end_time + 1 + else: + end_time = self._start_time - 1 self.display(result_list) + except KeyboardInterrupt: return diff --git a/src/opserver/opserver_util.py b/src/opserver/opserver_util.py index 41d599c7be2..3d33d1135a1 100644 --- a/src/opserver/opserver_util.py +++ b/src/opserver/opserver_util.py @@ -208,6 +208,17 @@ def parse_start_end_time(start_time, end_time, last): 'now' in end_time: ostart_time = start_time oend_time = end_time + now = OpServerUtils.utc_timestamp_usec() + td = OpServerUtils.convert_to_time_delta(ostart_time[len('now'):]) + if td == None: + ostart_time = now + else: + ostart_time = now + (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) + td = OpServerUtils.convert_to_time_delta(oend_time[len('now'):]) + if td == None: + oend_time = now + else: + oend_time = now + (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) elif start_time.isdigit() and \ end_time.isdigit(): ostart_time = int(start_time) @@ 
-352,6 +363,8 @@ def get_query_result(opserver_ip, opserver_port, qid, user, password, @staticmethod def convert_to_time_delta(time_str): + if time_str == '' or time_str == None: + return None num = int(time_str[:-1]) if time_str.endswith('s'): return datetime.timedelta(seconds=num) diff --git a/src/opserver/test/test_flow.py b/src/opserver/test/test_flow.py index 04a7da998a3..83930fab44f 100755 --- a/src/opserver/test/test_flow.py +++ b/src/opserver/test/test_flow.py @@ -61,8 +61,11 @@ def test_1_noarg_query(self): self._querier.run() sys.argv = argv - expected_result_str = '{"table": "FlowRecordTable", "start_time": "now-10m", "end_time": "now", "dir": 1, "select_fields": ["UuidKey", "vrouter", "setup_time", "teardown_time", "sourcevn", "destvn", "sourceip", "destip", "protocol", "sport", "dport", "action", "direction_ing", "agg-bytes", "agg-packets", "sg_rule_uuid", "nw_ace_uuid", "vrouter_ip", "other_vrouter_ip", "vmi_uuid", "drop_reason"]}' + expected_result_str = '{"table": "FlowRecordTable", "dir": 1, "select_fields": ["UuidKey", "vrouter", "setup_time", "teardown_time", "sourcevn", "destvn", "sourceip", "destip", "protocol", "sport", "dport", "action", "direction_ing", "agg-bytes", "agg-packets", "sg_rule_uuid", "nw_ace_uuid", "vrouter_ip", "other_vrouter_ip", "vmi_uuid", "drop_reason"]}' expected_result_dict = json.loads(expected_result_str) + self.assertEqual(int(query_dict['end_time']) - int(query_dict['start_time']), 10*60*pow(10,6)) + del query_dict['start_time'] + del query_dict['end_time'] for key in expected_result_dict: self.assertTrue(key in query_dict) self.assertTrue(expected_result_dict[key] == query_dict[key]) diff --git a/src/opserver/test/test_log.py b/src/opserver/test/test_log.py index 7ee248f2d90..1e786160482 100755 --- a/src/opserver/test/test_log.py +++ b/src/opserver/test/test_log.py @@ -9,9 +9,10 @@ from opserver.log import LogQuerier from opserver.opserver_util import OpServerUtils +from multiprocessing import Process test_num = 0 
-query_dict = {} +query_list = [] query_result = { 1: [ {u'Category': None, u'NodeType': u'Config', u'Level': 2147483647, u'InstanceId': u'0', u'Messagetype': u'UveVirtualNetworkConfigTrace', u'Source': u'a6s45', u'SequenceNum': 6867683, u'MessageTS': 1442429588898861, u'Xmlmessage': u'default-domain:demo:svc-vn-leftsvc-vn-left0', u'Type': 6, u'ModuleId': u'contrail-schema'} @@ -27,6 +28,9 @@ ], 5: [ {u'Category': None, u'NodeType': u'Config', u'Level': 2147483647, u'InstanceId': u'0', u'Messagetype': u'UveVirtualNetworkConfigTrace', u'Source': u'nodec39', u'SequenceNum': 6867683, u'MessageTS': 1442429588898861, u'Xmlmessage': u'default-domain:demo:svc-vn-leftsvc-vn-left0', u'Type': 6, u'ModuleId': u'contrail-schema'} +], +6: [ +{u'Category': None, u'NodeType': u'Config', u'Level': 2147483647, u'InstanceId': u'0', u'Messagetype': u'UveVirtualNetworkConfigTrace', u'Source': u'a6s45', u'SequenceNum': 6867683, u'MessageTS': 1442429588898861, u'Xmlmessage': u'default-domain:demo:svc-vn-leftsvc-vn-left0', u'Type': 6, u'ModuleId': u'contrail-schema'} ] } @@ -34,8 +38,8 @@ class LogQuerierTest(unittest.TestCase): @staticmethod def custom_post_url_http(url, params): - global query_dict - query_dict = json.loads(params) + global query_list + query_list.append(json.loads(params)) return '{"href": "/analytics/query/a415fe1e-51cb-11e5-aab0-00000a540d2d"}' @staticmethod @@ -46,24 +50,34 @@ def custom_get_query_result(opserver_ip, opserver_port, qid): return [] def custom_display(self, result): + if result == [] or result is None: + return try: self.assertTrue(result == query_result[test_num]) except KeyError: self.assertTrue(False) + def custom_process_start(self): + self._querier.display(self._querier.read_result(query_result[test_num])) + + def custom_process_end(self): + return + def setUp(self): self.maxDiff = None self._querier = LogQuerier() flexmock(OpServerUtils).should_receive('post_url_http').replace_with(lambda x, y, w, z: self.custom_post_url_http(x, y)) - 
flexmock(OpServerUtils).should_receive('get_query_result').replace_with(lambda x, y, z, a, b: self.custom_get_query_result(x, y, z)) - flexmock(self._querier).should_receive('display').replace_with(lambda x: self.custom_display(x)) - + self.query_expectations = flexmock(OpServerUtils).should_receive('get_query_result').replace_with(lambda x, y, z, a, b: self.custom_get_query_result(x, y, z)) + self.display_expectations = flexmock(LogQuerier).should_receive('display').replace_with(lambda x: self.custom_display(x)) + self.process_start_expectations = flexmock(Process).should_receive('start').replace_with(lambda:self.custom_process_start()) + self.process_end_expectations = flexmock(Process).should_receive('join').replace_with(lambda:self.custom_process_end()) #@unittest.skip("skip test_1_no_arg") def test_1_no_arg(self): global test_num - global query_dict + global query_list + query_list = [] test_num = 1 argv = sys.argv @@ -71,15 +85,22 @@ def test_1_no_arg(self): self._querier.run() sys.argv = argv - expected_result_str = '{"sort": 1, "start_time": "now-10m", "sort_fields": ["MessageTS"], "end_time": "now", "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable"}' + expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable"}' expected_result_dict = json.loads(expected_result_str) - self.assertEqual(expected_result_dict, query_dict) + self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6)) + del query_list[0]['start_time'] + del query_list[0]['end_time'] + self.assertEqual(expected_result_dict, query_list[0]) + self.assertEqual(self.query_expectations.times_called,1) + 
self.assertEqual(self.display_expectations.times_called,2) + self.assertEqual(self.process_start_expectations.times_called,1) # a few args #@unittest.skip("skip test_2_message_query") def test_2_message_query(self): global test_num - global query_dict + global query_list + query_list = [] test_num = 2 argv = sys.argv @@ -87,15 +108,22 @@ def test_2_message_query(self): self._querier.run() sys.argv = argv - expected_result_str = '{"sort": 1, "start_time": "now-10m", "sort_fields": ["MessageTS"], "end_time": "now", "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable", "where": [[{"suffix": null, "value2": null, "name": "Source", "value": "a6s45", "op": 1}, {"suffix": null, "value2": null, "name": "ModuleId", "value": "contrail-collector", "op": 1}, {"suffix": null, "value2": null, "name": "Messagetype", "value": "GeneratorDbStatsUve", "op": 1}]], "filter": [[{"suffix": null, "value2": null, "name": "NodeType", "value": "Analytics", "op": 1}, {"suffix": null, "value2": null, "name": "InstanceId", "value": 0, "op": 1}]]}' + expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable", "where": [[{"suffix": null, "value2": null, "name": "Source", "value": "a6s45", "op": 1}, {"suffix": null, "value2": null, "name": "ModuleId", "value": "contrail-collector", "op": 1}, {"suffix": null, "value2": null, "name": "Messagetype", "value": "GeneratorDbStatsUve", "op": 1}]], "filter": [[{"suffix": null, "value2": null, "name": "NodeType", "value": "Analytics", "op": 1}, {"suffix": null, "value2": null, "name": "InstanceId", "value": 0, "op": 1}]]}' expected_result_dict = json.loads(expected_result_str) - self.assertEqual(expected_result_dict, query_dict) + 
self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6)) + del query_list[0]['start_time'] + del query_list[0]['end_time'] + self.assertEqual(expected_result_dict, query_list[0]) + self.assertEqual(self.query_expectations.times_called,1) + self.assertEqual(self.display_expectations.times_called,2) + self.assertEqual(self.process_start_expectations.times_called,1) # a object values query #@unittest.skip("skip test_3_object_value") def test_3_object_value(self): global test_num - global query_dict + global query_list + query_list = [] test_num = 3 argv = sys.argv @@ -103,15 +131,22 @@ def test_3_object_value(self): self._querier.run() sys.argv = argv - expected_result_str = '{"table": "ConfigObjectTable", "start_time": "now-10m", "end_time": "now", "select_fields": ["ObjectId"]}' + expected_result_str = '{"table": "ConfigObjectTable", "select_fields": ["ObjectId"]}' expected_result_dict = json.loads(expected_result_str) - self.assertEqual(expected_result_dict, query_dict) + self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6)) + del query_list[0]['start_time'] + del query_list[0]['end_time'] + self.assertEqual(expected_result_dict, query_list[0]) + self.assertEqual(self.query_expectations.times_called,1) + self.assertEqual(self.display_expectations.times_called,2) + self.assertEqual(self.process_start_expectations.times_called,1) # a object id query #@unittest.skip("skip test_4_object_id") def test_4_object_id(self): global test_num - global query_dict + global query_list + query_list = [] test_num = 4 argv = sys.argv @@ -119,15 +154,22 @@ def test_4_object_id(self): self._querier.run() sys.argv = argv - expected_result_str = '{"sort": 1, "start_time": "now-10m", "sort_fields": ["MessageTS"], "end_time": "now", "select_fields": ["MessageTS", "Source", "ModuleId", "Messagetype", "ObjectLog", "SystemLog"], "table": "ConfigObjectTable", "where": [[{"suffix": null, "value2": null, 
"name": "ObjectId", "value": "virtual_network:default-domain:admin:vn1-take2", "op": 1}]]}' + expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Messagetype", "ObjectLog", "SystemLog"], "table": "ConfigObjectTable", "where": [[{"suffix": null, "value2": null, "name": "ObjectId", "value": "virtual_network:default-domain:admin:vn1-take2", "op": 1}]]}' expected_result_dict = json.loads(expected_result_str) - self.assertEqual(expected_result_dict, query_dict) + self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6)) + del query_list[0]['start_time'] + del query_list[0]['end_time'] + self.assertEqual(expected_result_dict, query_list[0]) + self.assertEqual(self.query_expectations.times_called,1) + self.assertEqual(self.display_expectations.times_called,2) + self.assertEqual(self.process_start_expectations.times_called,1) # prefix query #@unittest.skip("skip test_5_prefix_query") def test_5_prefix_query(self): global test_num - global query_dict + global query_list + query_list = [] test_num = 5 argv = sys.argv @@ -135,10 +177,43 @@ def test_5_prefix_query(self): self._querier.run() sys.argv = argv - expected_result_str = '{"sort": 1, "start_time": "now-10m", "sort_fields": ["MessageTS"], "end_time": "now", "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable", "where": [[{"suffix": null, "value2": null, "name": "Source", "value": "node", "op": 7}, {"suffix": null, "value2": null, "name": "Messagetype", "value": "UveVirtualNetwork", "op": 7}]]}' + expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable", "where": [[{"suffix": null, "value2": null, "name": 
"Source", "value": "node", "op": 7}, {"suffix": null, "value2": null, "name": "Messagetype", "value": "UveVirtualNetwork", "op": 7}]]}' expected_result_dict = json.loads(expected_result_str) - self.assertEqual(expected_result_dict, query_dict) + self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6)) + del query_list[0]['start_time'] + del query_list[0]['end_time'] + self.assertEqual(expected_result_dict, query_list[0]) + self.assertEqual(self.query_expectations.times_called,1) + self.assertEqual(self.display_expectations.times_called,2) + self.assertEqual(self.process_start_expectations.times_called,1) # end test_5_prefix_query + #@unittest.skip("skip test_6_long_query") + def test_6_long_query(self): + global test_num + global query_list + query_list = [] + test_num = 6 + + argv = sys.argv + sys.argv = "contrail-logs --start-time now-30m --end-time now".split() + self._querier.run() + sys.argv = argv + + expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable"}' + expected_result_dict = json.loads(expected_result_str) + self.assertEqual(self.query_expectations.times_called,3) + self.assertEqual(self.display_expectations.times_called,4) + self.assertEqual(self.process_start_expectations.times_called,3) + for i in range(len(query_list) - 1): + self.assertEqual(int(query_list[i]['end_time']) - int(query_list[i]['start_time']),10*60*pow(10,6)) + del query_list[i]['start_time'] + del query_list[i]['end_time'] + self.assertEqual(expected_result_dict, query_list[i]) + self.assertEqual(int(query_list[2]['end_time']) - int(query_list[2]['start_time']),10*60*pow(10,6) - 2) + del query_list[2]['start_time'] + del query_list[2]['end_time'] + self.assertEqual(expected_result_dict, query_list[2]) + if __name__ == '__main__': unittest.main() diff --git 
a/src/opserver/test/test_stats.py b/src/opserver/test/test_stats.py index 3e0cce3ae23..5440fc9dd77 100755 --- a/src/opserver/test/test_stats.py +++ b/src/opserver/test/test_stats.py @@ -51,8 +51,11 @@ def test_1_analytics_cpu_query(self): self._querier.run() sys.argv = argv - expected_result_str = '{"start_time": "now-10m", "end_time": "now", "select_fields": ["T=60", "SUM(process_mem_cpu_usage.cpu_share)"], "table": "StatTable.NodeStatus.process_mem_cpu_usage", "where": [[{"suffix": null, "value2": null, "name": "name", "value": "", "op": 7}]]}' + expected_result_str = '{"select_fields": ["T=60", "SUM(process_mem_cpu_usage.cpu_share)"], "table": "StatTable.NodeStatus.process_mem_cpu_usage", "where": [[{"suffix": null, "value2": null, "name": "name", "value": "", "op": 7}]]}' expected_result_dict = json.loads(expected_result_str) + self.assertEqual(int(query_dict['end_time']) - int(query_dict['start_time']), 10*60*pow(10,6)) + del query_dict['start_time'] + del query_dict['end_time'] for key in expected_result_dict: self.assertTrue(key in query_dict) self.assertTrue(expected_result_dict[key] == query_dict[key])