contrail-logs command does not complete in time and the process hogs memory.
When run in a scale setup over a long time period (i.e. 5-6 hours or more), contrail-logs consumes a large amount of memory and ends up in the D (uninterruptible sleep) state waiting on IO. This fix divides the query into multiple 10-minute queries so that contrail-logs holds less in memory and has a smaller batch of data to write to disk at a time. Moreover, because writing to disk may take some time, the fix fetches the results for the next query in parallel with writing the results of the current query to disk, as sketched below.

Change-Id: Ib68c7058e970e853288d8236ea4239bdb471b313
Closes-bug: #1673865
(cherry picked from commit 9ec7e8d)
mkheni committed Apr 18, 2017
1 parent fc5349d commit 7f5a1a1
Showing 5 changed files with 148 additions and 32 deletions.
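
A minimal, self-contained sketch of the pattern this change applies. The names fetch_window and write_to_disk are illustrative stand-ins, not the opserver API; only the 10-minute windowing and the Process-based overlap mirror the actual patch below.

from multiprocessing import Process

TEN_MIN_USEC = 10 * 60 * pow(10, 6)  # window size in microseconds

def fetch_window(start, end):
    # stand-in for LogQuerier.query() + read_result() over [start, end]
    return ['record between %d and %d' % (start, end)]

def write_to_disk(records):
    # stand-in for LogQuerier.display(): format and write one batch
    for rec in records:
        print(rec)

def run_windowed(start_time, end_time):
    batch = []  # previous window's results; empty on the first pass
    while end_time - start_time > 0:
        window_end = min(start_time + TEN_MIN_USEC, end_time)
        # write the previous batch in a child process while this process
        # fetches the next window, overlapping disk IO with the query
        writer = Process(target=write_to_disk, args=(batch,))
        writer.start()
        batch = fetch_window(start_time, window_end)
        writer.join()
        start_time = window_end + 1
    write_to_disk(batch)  # flush the final window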
40 changes: 31 additions & 9 deletions src/opserver/log.py
@@ -19,6 +19,7 @@
import logging.handlers
import time
import re
from multiprocessing import Process
from opserver_util import OpServerUtils
from sandesh_common.vns.ttypes import Module
from sandesh_common.vns.constants import ModuleNames, NodeTypeNames
@@ -73,16 +74,37 @@ def run(self):
last = self._args.last)
except:
return -1
result = self.query()
if result == -1:
return
# Accumulate the result before processing it as the
# formatting of result can be cpu intensive and hence would
# affect the overall time taken to fetch the result from the
# analytics-api. Since the query result ttl is set to 5 min
# in redis, it is necessary to improve the read throughput.
result_list = self.read_result(result)

start_time = self._start_time
end_time = self._end_time

result_list = []
while int(end_time) - int(start_time) > 0:
if not self._args.reverse:
self._start_time = start_time
self._end_time = start_time + 10*60*pow(10,6) if (start_time + 10*60*pow(10,6) <= int(end_time)) else int(end_time)
else:
self._end_time = end_time
self._start_time = end_time - 10*60*pow(10,6) if (end_time - 10*60*pow(10,6) >= int(start_time)) else int(start_time)

p = Process(target=self.display, args=(result_list,))
p.start()
result = self.query()
if result == -1:
return
# Accumulate the result before processing it as the
# formatting of result can be cpu intensive and hence would
# affect the overall time taken to fetch the result from the
# analytics-api. Since the query result ttl is set to 5 min
# in redis, it is necessary to improve the read throughput.
result_list = self.read_result(result)
p.join()
if not self._args.reverse:
start_time = self._end_time + 1
else:
end_time = self._start_time - 1
self.display(result_list)

except KeyboardInterrupt:
return

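For reference, the constant in the loop above is the window size in analytics timestamp units: 10*60*pow(10,6) is 600,000,000 microseconds, i.e. 10 minutes. A condensed view of the clamped boundary arithmetic; next_window is an illustrative helper, not part of the patch.

WINDOW_USEC = 10 * 60 * pow(10, 6)  # 600,000,000 us == 10 minutes

def next_window(start, end, reverse=False):
    # forward mode walks start up toward end; reverse mode walks end
    # down toward start; min/max clamp the final, partial window
    if not reverse:
        return start, min(start + WINDOW_USEC, end)
    return max(end - WINDOW_USEC, start), end
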
13 changes: 13 additions & 0 deletions src/opserver/opserver_util.py
@@ -208,6 +208,17 @@ def parse_start_end_time(start_time, end_time, last):
'now' in end_time:
ostart_time = start_time
oend_time = end_time
now = OpServerUtils.utc_timestamp_usec()
td = OpServerUtils.convert_to_time_delta(ostart_time[len('now'):])
if td == None:
ostart_time = now
else:
ostart_time = now + (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6)
td = OpServerUtils.convert_to_time_delta(oend_time[len('now'):])
if td == None:
oend_time = now
else:
oend_time = now + (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6)
elif start_time.isdigit() and \
end_time.isdigit():
ostart_time = int(start_time)
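
The hunk above turns 'now'-relative bounds into absolute microsecond timestamps so the querier can split them into windows. A hedged sketch of the same computation: resolve_relative is an illustrative wrapper, and the two helpers mirror OpServerUtils.utc_timestamp_usec() and convert_to_time_delta(). Note that the delta parsed from 'now-10m' is negative, so adding it moves the timestamp back in time.

import datetime

def utc_timestamp_usec():
    # microseconds since the epoch, the unit the analytics API expects
    delta = datetime.datetime.utcnow() - datetime.datetime(1970, 1, 1)
    return delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10 ** 6

def convert_to_time_delta(s):
    # minimal mirror of OpServerUtils.convert_to_time_delta
    if s == '' or s is None:
        return None
    num = int(s[:-1])
    units = {'s': 'seconds', 'm': 'minutes', 'h': 'hours', 'd': 'days'}
    return datetime.timedelta(**{units[s[-1]]: num})

def resolve_relative(time_str):
    # 'now' -> current time; 'now-10m' -> current time minus 10 minutes
    now = utc_timestamp_usec()
    td = convert_to_time_delta(time_str[len('now'):])
    if td is None:
        return now
    return now + (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6)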
@@ -352,6 +363,8 @@ def get_query_result(opserver_ip, opserver_port, qid, user, password,

@staticmethod
def convert_to_time_delta(time_str):
if time_str == '' or time_str == None:
return None
num = int(time_str[:-1])
if time_str.endswith('s'):
return datetime.timedelta(seconds=num)
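
The new guard is what lets a bare 'now' work: its offset suffix is the empty string, which would otherwise reach int('') in the new parse_start_end_time caller and raise ValueError. A small usage check, assuming the usual m/h/d suffixes are handled in the truncated remainder of the function just as the 's' branch shown above:

from opserver_util import OpServerUtils  # in-tree import style, as in log.py
import datetime

assert OpServerUtils.convert_to_time_delta('') is None
assert OpServerUtils.convert_to_time_delta(None) is None
assert OpServerUtils.convert_to_time_delta('-10m') == datetime.timedelta(minutes=-10)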
5 changes: 4 additions & 1 deletion src/opserver/test/test_flow.py
@@ -61,8 +61,11 @@ def test_1_noarg_query(self):
self._querier.run()
sys.argv = argv

expected_result_str = '{"table": "FlowRecordTable", "start_time": "now-10m", "end_time": "now", "dir": 1, "select_fields": ["UuidKey", "vrouter", "setup_time", "teardown_time", "sourcevn", "destvn", "sourceip", "destip", "protocol", "sport", "dport", "action", "direction_ing", "agg-bytes", "agg-packets", "sg_rule_uuid", "nw_ace_uuid", "vrouter_ip", "other_vrouter_ip", "vmi_uuid", "drop_reason"]}'
expected_result_str = '{"table": "FlowRecordTable", "dir": 1, "select_fields": ["UuidKey", "vrouter", "setup_time", "teardown_time", "sourcevn", "destvn", "sourceip", "destip", "protocol", "sport", "dport", "action", "direction_ing", "agg-bytes", "agg-packets", "sg_rule_uuid", "nw_ace_uuid", "vrouter_ip", "other_vrouter_ip", "vmi_uuid", "drop_reason"]}'
expected_result_dict = json.loads(expected_result_str)
self.assertEqual(int(query_dict['end_time']) - int(query_dict['start_time']), 10*60*pow(10,6))
del query_dict['start_time']
del query_dict['end_time']
for key in expected_result_dict:
self.assertTrue(key in query_dict)
self.assertTrue(expected_result_dict[key] == query_dict[key])
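
A note on this test change: since parse_start_end_time now substitutes absolute timestamps for 'now-10m'/'now', start_time and end_time depend on the wall clock and are not reproducible across runs. The updated test therefore asserts only that their difference equals the 10-minute default window, then deletes both keys before comparing the rest of the query against the expected dictionary.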
