diff --git a/src/opserver/log.py b/src/opserver/log.py
index 53c9086de3e..a41f49291dd 100755
--- a/src/opserver/log.py
+++ b/src/opserver/log.py
@@ -19,6 +19,7 @@
import logging.handlers
import time
import re
+from multiprocessing import Process
from opserver_util import OpServerUtils
from sandesh_common.vns.ttypes import Module
from sandesh_common.vns.constants import ModuleNames, NodeTypeNames
@@ -73,16 +74,37 @@ def run(self):
last = self._args.last)
except:
return -1
- result = self.query()
- if result == -1:
- return
- # Accumulate the result before processing it as the
- # formatting of result can be cpu intensive and hence would
- # affect the overall time taken to fetch the result from the
- # analytics-api. Since the query result ttl is set to 5 min
- # in redis, it is necessary to improve the read throughput.
- result_list = self.read_result(result)
+
+ start_time = self._start_time
+ end_time = self._end_time
+
+ result_list = []
+ while int(end_time) - int(start_time) > 0:
+ if not self._args.reverse:
+ self._start_time = start_time
+ self._end_time = start_time + 10*60*pow(10,6) if (start_time + 10*60*pow(10,6) <= int(end_time)) else int(end_time)
+ else:
+ self._end_time = end_time
+ self._start_time = end_time - 10*60*pow(10,6) if (end_time - 10*60*pow(10,6) >= int(start_time)) else int(start_time)
+
+ p = Process(target=self.display, args=(result_list,))
+ p.start()
+ result = self.query()
+ if result == -1:
+ return
+ # Accumulate the result before processing it as the
+ # formatting of result can be cpu intensive and hence would
+ # affect the overall time taken to fetch the result from the
+ # analytics-api. Since the query result ttl is set to 5 min
+ # in redis, it is necessary to improve the read throughput.
+ result_list = self.read_result(result)
+ p.join()
+ if not self._args.reverse:
+ start_time = self._end_time + 1
+ else:
+ end_time = self._start_time - 1
self.display(result_list)
+
except KeyboardInterrupt:
return
diff --git a/src/opserver/opserver_util.py b/src/opserver/opserver_util.py
index 4c353216a49..8958654784a 100644
--- a/src/opserver/opserver_util.py
+++ b/src/opserver/opserver_util.py
@@ -390,6 +390,17 @@ def parse_start_end_time(start_time, end_time, last):
'now' in end_time:
ostart_time = start_time
oend_time = end_time
+ now = OpServerUtils.utc_timestamp_usec()
+ td = OpServerUtils.convert_to_time_delta(ostart_time[len('now'):])
+ if td == None:
+ ostart_time = now
+ else:
+ ostart_time = now + (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6)
+ td = OpServerUtils.convert_to_time_delta(oend_time[len('now'):])
+ if td == None:
+ oend_time = now
+ else:
+ oend_time = now + (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6)
elif start_time.isdigit() and \
end_time.isdigit():
ostart_time = int(start_time)
@@ -534,6 +545,8 @@ def get_query_result(opserver_ip, opserver_port, qid, user, password,
@staticmethod
def convert_to_time_delta(time_str):
+ if time_str == '' or time_str == None:
+ return None
num = int(time_str[:-1])
if time_str.endswith('s'):
return datetime.timedelta(seconds=num)
diff --git a/src/opserver/test/test_flow.py b/src/opserver/test/test_flow.py
index 04a7da998a3..83930fab44f 100755
--- a/src/opserver/test/test_flow.py
+++ b/src/opserver/test/test_flow.py
@@ -61,8 +61,11 @@ def test_1_noarg_query(self):
self._querier.run()
sys.argv = argv
- expected_result_str = '{"table": "FlowRecordTable", "start_time": "now-10m", "end_time": "now", "dir": 1, "select_fields": ["UuidKey", "vrouter", "setup_time", "teardown_time", "sourcevn", "destvn", "sourceip", "destip", "protocol", "sport", "dport", "action", "direction_ing", "agg-bytes", "agg-packets", "sg_rule_uuid", "nw_ace_uuid", "vrouter_ip", "other_vrouter_ip", "vmi_uuid", "drop_reason"]}'
+ expected_result_str = '{"table": "FlowRecordTable", "dir": 1, "select_fields": ["UuidKey", "vrouter", "setup_time", "teardown_time", "sourcevn", "destvn", "sourceip", "destip", "protocol", "sport", "dport", "action", "direction_ing", "agg-bytes", "agg-packets", "sg_rule_uuid", "nw_ace_uuid", "vrouter_ip", "other_vrouter_ip", "vmi_uuid", "drop_reason"]}'
expected_result_dict = json.loads(expected_result_str)
+ self.assertEqual(int(query_dict['end_time']) - int(query_dict['start_time']), 10*60*pow(10,6))
+ del query_dict['start_time']
+ del query_dict['end_time']
for key in expected_result_dict:
self.assertTrue(key in query_dict)
self.assertTrue(expected_result_dict[key] == query_dict[key])
diff --git a/src/opserver/test/test_log.py b/src/opserver/test/test_log.py
index 7ee248f2d90..1e786160482 100755
--- a/src/opserver/test/test_log.py
+++ b/src/opserver/test/test_log.py
@@ -9,9 +9,10 @@
from opserver.log import LogQuerier
from opserver.opserver_util import OpServerUtils
+from multiprocessing import Process
test_num = 0
-query_dict = {}
+query_list = []
query_result = {
1: [
{u'Category': None, u'NodeType': u'Config', u'Level': 2147483647, u'InstanceId': u'0', u'Messagetype': u'UveVirtualNetworkConfigTrace', u'Source': u'a6s45', u'SequenceNum': 6867683, u'MessageTS': 1442429588898861, u'Xmlmessage': u'default-domain:demo:svc-vn-left
svc-vn-left
0', u'Type': 6, u'ModuleId': u'contrail-schema'}
@@ -27,6 +28,9 @@
],
5: [
{u'Category': None, u'NodeType': u'Config', u'Level': 2147483647, u'InstanceId': u'0', u'Messagetype': u'UveVirtualNetworkConfigTrace', u'Source': u'nodec39', u'SequenceNum': 6867683, u'MessageTS': 1442429588898861, u'Xmlmessage': u'default-domain:demo:svc-vn-left
svc-vn-left
0', u'Type': 6, u'ModuleId': u'contrail-schema'}
+],
+6: [
+{u'Category': None, u'NodeType': u'Config', u'Level': 2147483647, u'InstanceId': u'0', u'Messagetype': u'UveVirtualNetworkConfigTrace', u'Source': u'a6s45', u'SequenceNum': 6867683, u'MessageTS': 1442429588898861, u'Xmlmessage': u'default-domain:demo:svc-vn-left
svc-vn-left
0', u'Type': 6, u'ModuleId': u'contrail-schema'}
]
}
@@ -34,8 +38,8 @@ class LogQuerierTest(unittest.TestCase):
@staticmethod
def custom_post_url_http(url, params):
- global query_dict
- query_dict = json.loads(params)
+ global query_list
+ query_list.append(json.loads(params))
return '{"href": "/analytics/query/a415fe1e-51cb-11e5-aab0-00000a540d2d"}'
@staticmethod
@@ -46,24 +50,34 @@ def custom_get_query_result(opserver_ip, opserver_port, qid):
return []
def custom_display(self, result):
+ if result == [] or result is None:
+ return
try:
self.assertTrue(result == query_result[test_num])
except KeyError:
self.assertTrue(False)
+ def custom_process_start(self):
+ self._querier.display(self._querier.read_result(query_result[test_num]))
+
+ def custom_process_end(self):
+ return
+
def setUp(self):
self.maxDiff = None
self._querier = LogQuerier()
flexmock(OpServerUtils).should_receive('post_url_http').replace_with(lambda x, y, w, z: self.custom_post_url_http(x, y))
- flexmock(OpServerUtils).should_receive('get_query_result').replace_with(lambda x, y, z, a, b: self.custom_get_query_result(x, y, z))
- flexmock(self._querier).should_receive('display').replace_with(lambda x: self.custom_display(x))
-
+ self.query_expectations = flexmock(OpServerUtils).should_receive('get_query_result').replace_with(lambda x, y, z, a, b: self.custom_get_query_result(x, y, z))
+ self.display_expectations = flexmock(LogQuerier).should_receive('display').replace_with(lambda x: self.custom_display(x))
+ self.process_start_expectations = flexmock(Process).should_receive('start').replace_with(lambda:self.custom_process_start())
+ self.process_end_expectations = flexmock(Process).should_receive('join').replace_with(lambda:self.custom_process_end())
#@unittest.skip("skip test_1_no_arg")
def test_1_no_arg(self):
global test_num
- global query_dict
+ global query_list
+ query_list = []
test_num = 1
argv = sys.argv
@@ -71,15 +85,22 @@ def test_1_no_arg(self):
self._querier.run()
sys.argv = argv
- expected_result_str = '{"sort": 1, "start_time": "now-10m", "sort_fields": ["MessageTS"], "end_time": "now", "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable"}'
+ expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable"}'
expected_result_dict = json.loads(expected_result_str)
- self.assertEqual(expected_result_dict, query_dict)
+ self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6))
+ del query_list[0]['start_time']
+ del query_list[0]['end_time']
+ self.assertEqual(expected_result_dict, query_list[0])
+ self.assertEqual(self.query_expectations.times_called,1)
+ self.assertEqual(self.display_expectations.times_called,2)
+ self.assertEqual(self.process_start_expectations.times_called,1)
# a few args
#@unittest.skip("skip test_2_message_query")
def test_2_message_query(self):
global test_num
- global query_dict
+ global query_list
+ query_list = []
test_num = 2
argv = sys.argv
@@ -87,15 +108,22 @@ def test_2_message_query(self):
self._querier.run()
sys.argv = argv
- expected_result_str = '{"sort": 1, "start_time": "now-10m", "sort_fields": ["MessageTS"], "end_time": "now", "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable", "where": [[{"suffix": null, "value2": null, "name": "Source", "value": "a6s45", "op": 1}, {"suffix": null, "value2": null, "name": "ModuleId", "value": "contrail-collector", "op": 1}, {"suffix": null, "value2": null, "name": "Messagetype", "value": "GeneratorDbStatsUve", "op": 1}]], "filter": [[{"suffix": null, "value2": null, "name": "NodeType", "value": "Analytics", "op": 1}, {"suffix": null, "value2": null, "name": "InstanceId", "value": 0, "op": 1}]]}'
+ expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable", "where": [[{"suffix": null, "value2": null, "name": "Source", "value": "a6s45", "op": 1}, {"suffix": null, "value2": null, "name": "ModuleId", "value": "contrail-collector", "op": 1}, {"suffix": null, "value2": null, "name": "Messagetype", "value": "GeneratorDbStatsUve", "op": 1}]], "filter": [[{"suffix": null, "value2": null, "name": "NodeType", "value": "Analytics", "op": 1}, {"suffix": null, "value2": null, "name": "InstanceId", "value": 0, "op": 1}]]}'
expected_result_dict = json.loads(expected_result_str)
- self.assertEqual(expected_result_dict, query_dict)
+ self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6))
+ del query_list[0]['start_time']
+ del query_list[0]['end_time']
+ self.assertEqual(expected_result_dict, query_list[0])
+ self.assertEqual(self.query_expectations.times_called,1)
+ self.assertEqual(self.display_expectations.times_called,2)
+ self.assertEqual(self.process_start_expectations.times_called,1)
# a object values query
#@unittest.skip("skip test_3_object_value")
def test_3_object_value(self):
global test_num
- global query_dict
+ global query_list
+ query_list = []
test_num = 3
argv = sys.argv
@@ -103,15 +131,22 @@ def test_3_object_value(self):
self._querier.run()
sys.argv = argv
- expected_result_str = '{"table": "ConfigObjectTable", "start_time": "now-10m", "end_time": "now", "select_fields": ["ObjectId"]}'
+ expected_result_str = '{"table": "ConfigObjectTable", "select_fields": ["ObjectId"]}'
expected_result_dict = json.loads(expected_result_str)
- self.assertEqual(expected_result_dict, query_dict)
+ self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6))
+ del query_list[0]['start_time']
+ del query_list[0]['end_time']
+ self.assertEqual(expected_result_dict, query_list[0])
+ self.assertEqual(self.query_expectations.times_called,1)
+ self.assertEqual(self.display_expectations.times_called,2)
+ self.assertEqual(self.process_start_expectations.times_called,1)
# a object id query
#@unittest.skip("skip test_4_object_id")
def test_4_object_id(self):
global test_num
- global query_dict
+ global query_list
+ query_list = []
test_num = 4
argv = sys.argv
@@ -119,15 +154,22 @@ def test_4_object_id(self):
self._querier.run()
sys.argv = argv
- expected_result_str = '{"sort": 1, "start_time": "now-10m", "sort_fields": ["MessageTS"], "end_time": "now", "select_fields": ["MessageTS", "Source", "ModuleId", "Messagetype", "ObjectLog", "SystemLog"], "table": "ConfigObjectTable", "where": [[{"suffix": null, "value2": null, "name": "ObjectId", "value": "virtual_network:default-domain:admin:vn1-take2", "op": 1}]]}'
+ expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Messagetype", "ObjectLog", "SystemLog"], "table": "ConfigObjectTable", "where": [[{"suffix": null, "value2": null, "name": "ObjectId", "value": "virtual_network:default-domain:admin:vn1-take2", "op": 1}]]}'
expected_result_dict = json.loads(expected_result_str)
- self.assertEqual(expected_result_dict, query_dict)
+ self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6))
+ del query_list[0]['start_time']
+ del query_list[0]['end_time']
+ self.assertEqual(expected_result_dict, query_list[0])
+ self.assertEqual(self.query_expectations.times_called,1)
+ self.assertEqual(self.display_expectations.times_called,2)
+ self.assertEqual(self.process_start_expectations.times_called,1)
# prefix query
#@unittest.skip("skip test_5_prefix_query")
def test_5_prefix_query(self):
global test_num
- global query_dict
+ global query_list
+ query_list = []
test_num = 5
argv = sys.argv
@@ -135,10 +177,43 @@ def test_5_prefix_query(self):
self._querier.run()
sys.argv = argv
- expected_result_str = '{"sort": 1, "start_time": "now-10m", "sort_fields": ["MessageTS"], "end_time": "now", "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable", "where": [[{"suffix": null, "value2": null, "name": "Source", "value": "node", "op": 7}, {"suffix": null, "value2": null, "name": "Messagetype", "value": "UveVirtualNetwork", "op": 7}]]}'
+ expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable", "where": [[{"suffix": null, "value2": null, "name": "Source", "value": "node", "op": 7}, {"suffix": null, "value2": null, "name": "Messagetype", "value": "UveVirtualNetwork", "op": 7}]]}'
expected_result_dict = json.loads(expected_result_str)
- self.assertEqual(expected_result_dict, query_dict)
+ self.assertEqual(int(query_list[0]['end_time']) - int(query_list[0]['start_time']),10*60*pow(10,6))
+ del query_list[0]['start_time']
+ del query_list[0]['end_time']
+ self.assertEqual(expected_result_dict, query_list[0])
+ self.assertEqual(self.query_expectations.times_called,1)
+ self.assertEqual(self.display_expectations.times_called,2)
+ self.assertEqual(self.process_start_expectations.times_called,1)
# end test_5_prefix_query
+ #@unittest.skip("skip test_6_long_query")
+ def test_6_long_query(self):
+ global test_num
+ global query_list
+ query_list = []
+ test_num = 6
+
+ argv = sys.argv
+ sys.argv = "contrail-logs --start-time now-30m --end-time now".split()
+ self._querier.run()
+ sys.argv = argv
+
+ expected_result_str = '{"sort": 1, "sort_fields": ["MessageTS"], "select_fields": ["MessageTS", "Source", "ModuleId", "Category", "Messagetype", "SequenceNum", "Xmlmessage", "Type", "Level", "NodeType", "InstanceId"], "table": "MessageTable"}'
+ expected_result_dict = json.loads(expected_result_str)
+ self.assertEqual(self.query_expectations.times_called,3)
+ self.assertEqual(self.display_expectations.times_called,4)
+ self.assertEqual(self.process_start_expectations.times_called,3)
+ for i in range(len(query_list) - 1):
+ self.assertEqual(int(query_list[i]['end_time']) - int(query_list[i]['start_time']),10*60*pow(10,6))
+ del query_list[i]['start_time']
+ del query_list[i]['end_time']
+ self.assertEqual(expected_result_dict, query_list[i])
+ self.assertEqual(int(query_list[2]['end_time']) - int(query_list[2]['start_time']),10*60*pow(10,6) - 2)
+ del query_list[2]['start_time']
+ del query_list[2]['end_time']
+ self.assertEqual(expected_result_dict, query_list[2])
+
if __name__ == '__main__':
unittest.main()
diff --git a/src/opserver/test/test_stats.py b/src/opserver/test/test_stats.py
index 3e0cce3ae23..5440fc9dd77 100755
--- a/src/opserver/test/test_stats.py
+++ b/src/opserver/test/test_stats.py
@@ -51,8 +51,11 @@ def test_1_analytics_cpu_query(self):
self._querier.run()
sys.argv = argv
- expected_result_str = '{"start_time": "now-10m", "end_time": "now", "select_fields": ["T=60", "SUM(process_mem_cpu_usage.cpu_share)"], "table": "StatTable.NodeStatus.process_mem_cpu_usage", "where": [[{"suffix": null, "value2": null, "name": "name", "value": "", "op": 7}]]}'
+ expected_result_str = '{"select_fields": ["T=60", "SUM(process_mem_cpu_usage.cpu_share)"], "table": "StatTable.NodeStatus.process_mem_cpu_usage", "where": [[{"suffix": null, "value2": null, "name": "name", "value": "", "op": 7}]]}'
expected_result_dict = json.loads(expected_result_str)
+ self.assertEqual(int(query_dict['end_time']) - int(query_dict['start_time']), 10*60*pow(10,6))
+ del query_dict['start_time']
+ del query_dict['end_time']
for key in expected_result_dict:
self.assertTrue(key in query_dict)
self.assertTrue(expected_result_dict[key] == query_dict[key])