From 74205ef2cc55cd038bd4089ef658d9f73b62be5c Mon Sep 17 00:00:00 2001 From: Sundaresan Rajangam Date: Tue, 8 Sep 2015 11:16:12 -0700 Subject: [PATCH] Added size based watermarks in pysandesh library - Added size based watermarks for sandesh send queue. - python sandesh compiler would generate a function named __sizeof__() for each structure/sandesh defined in .sandesh If a python object has __sizeof__() defined, then sys.getsizeof() would call this function to get the size of the object. TODO: - Add UT for auto-generated function __sizeof__() Change-Id: I5ebbb67ad56e4ed9d65e4296eafa43ad7fda94f4 Partial-Bug: #1484800 --- compiler/generate/t_py_generator.cc | 53 +++++++++- library/python/pysandesh/sandesh_base.py | 96 +++++++++++-------- library/python/pysandesh/sandesh_client.py | 11 +-- library/python/pysandesh/sandesh_session.py | 72 +++++++++++--- .../python/pysandesh/sandesh_state_machine.py | 3 +- .../python/pysandesh/test/sandesh_msg_test.py | 50 +++++----- .../pysandesh/test/sandesh_session_test.py | 93 +++++++++++++++++- .../python/pysandesh/test/work_queue_test.py | 2 +- 8 files changed, 288 insertions(+), 92 deletions(-) diff --git a/compiler/generate/t_py_generator.cc b/compiler/generate/t_py_generator.cc index d4153efb..12fc9b68 100644 --- a/compiler/generate/t_py_generator.cc +++ b/compiler/generate/t_py_generator.cc @@ -134,6 +134,7 @@ class t_py_generator : public t_generator { void generate_py_function_helpers(t_function* tfunction); #ifdef SANDESH void generate_py_struct_logger(std::ofstream& out, t_struct* tstruct); + void generate_py_struct_sizeof(std::ofstream& out, t_struct* tstruct); #endif /** @@ -166,6 +167,11 @@ class t_py_generator : public t_generator { void generate_py_sandesh_alarm_list(std::ofstream& out); void generate_py_sandesh_alarm_data_list(std::ofstream& out); void generate_py_sandesh_trace(std::ofstream& out, t_sandesh* tsandesh); + void generate_py_sandesh_sizeof(std::ofstream& out, t_sandesh* tsandesh); +#endif + +#ifdef SANDESH + void generate_py_sizeof(std::ofstream& out, const vector& fields); #endif /** @@ -505,7 +511,9 @@ string t_py_generator::render_sandesh_includes() { std::string sandesh_includes("\n"); sandesh_includes += "import cStringIO\n" - "import uuid\n"; + "import uuid\n" + "from sys import getsizeof\n" + "from itertools import chain\n"; if (module != "sandesh") { sandesh_includes += "import bottle\n" @@ -965,6 +973,7 @@ void t_py_generator::generate_py_struct_definition(ofstream& out, #ifdef SANDESH generate_py_struct_logger(out, tstruct); + generate_py_struct_sizeof(out, tstruct); #endif // For exceptions only, generate a __str__ method. This is @@ -1521,6 +1530,7 @@ void t_py_generator::generate_py_sandesh_definition(ofstream& out, } generate_py_sandesh_compare(out, tsandesh); + generate_py_sandesh_sizeof(out, tsandesh); if (!gen_slots_) { // Printing utilities so that on the command line thrift @@ -2246,6 +2256,47 @@ void t_py_generator::generate_py_struct_logger(ofstream &out, indent_down(); } +void t_py_generator::generate_py_sizeof(std::ofstream& out, + const vector& fields) { + indent(out) << "def __sizeof__(self):" << endl; + indent_up(); + indent(out) << "size = 0" << endl; + vector::const_iterator f_iter; + for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) { + std::string fname((*f_iter)->get_name()); + indent(out) << "if self." << fname << " is not None:" << endl; + indent_up(); + indent(out) << "size += getsizeof(self." << fname << ")" << endl; + t_type* type = get_true_type((*f_iter)->get_type()); + if (type->is_container()) { + if (type->is_map()) { + indent(out) << + "size += sum(map(getsizeof, chain.from_iterable(self." << + fname << ")))" << endl; + + } else { + indent(out) << + "size += sum(map(getsizeof, self." << fname << "))" << endl; + } + } + indent_down(); + } + indent(out) << "return size" << endl << endl; + indent_down(); +} + +void t_py_generator::generate_py_struct_sizeof(std::ofstream& out, + t_struct* tstruct) { + const vector& fields = tstruct->get_members(); + generate_py_sizeof(out, fields); +} + +void t_py_generator::generate_py_sandesh_sizeof(std::ofstream& out, + t_sandesh* tsandesh) { + const vector& fields = tsandesh->get_members(); + generate_py_sizeof(out, fields); +} + void t_py_generator::generate_py_sandesh_hint(ofstream& out, t_sandesh* tsandesh) { bool has_key_annotation = tsandesh->has_key_annotation(); diff --git a/library/python/pysandesh/sandesh_base.py b/library/python/pysandesh/sandesh_base.py index ed635e30..440aff41 100644 --- a/library/python/pysandesh/sandesh_base.py +++ b/library/python/pysandesh/sandesh_base.py @@ -5,18 +5,21 @@ # # Sandesh # + import importlib import os +import sys import pkgutil - import gevent -import sandesh_logger as sand_logger -import trace -import util +import json +import base64 import collections import time import copy +import sandesh_logger as sand_logger +import trace +import util from gen_py.sandesh.ttypes import SandeshType, SandeshLevel, \ SandeshTxDropReason from gen_py.sandesh.constants import * @@ -76,6 +79,7 @@ def init_generator(self, module, source, node_type, instance_id, self._collectors = collectors self._connect_to_collector = connect_to_collector self._rcv_queue = WorkQueue(self._process_rx_sandesh) + self._send_level = SandeshLevel.INVALID self._init_logger(module, logger_class=logger_class, logger_config_file=logger_config_file) self._logger.info('SANDESH: CONNECT TO COLLECTOR: %s', @@ -209,6 +213,18 @@ def set_send_queue(self, enable): connection.session().send_queue().may_be_start_runner() # end set_send_queue + def set_send_level(self, count, sandesh_level): + if self._send_level != sandesh_level: + self._logger.info('Sandesh Send Level [%s] -> [%s]' % \ + (SandeshLevel._VALUES_TO_NAMES[self._send_level], + SandeshLevel._VALUES_TO_NAMES[sandesh_level])) + self._send_level = sandesh_level + # end set_send_level + + def send_level(self): + return self._send_level + # end send_level + def init_collector(self): pass # end init_collector @@ -327,7 +343,7 @@ def is_unit_test(self): def handle_test(self, sandesh_init): if sandesh_init.is_unit_test() or self._is_level_ut(): - if self._is_logging_allowed(sandesh_init): + if self.is_logging_allowed(sandesh_init): sandesh_init._logger.debug(self.log()) return True return False @@ -360,17 +376,26 @@ def send_sandesh(self, tx_sandesh): self._client.send_sandesh(tx_sandesh) else: if self._connect_to_collector: - if self.is_logging_dropped_allowed(tx_sandesh): - self._logger.error('SANDESH: No Client: %s', - tx_sandesh.log()) + self.drop_tx_sandesh(tx_sandesh, SandeshTxDropReason.NoClient, + tx_sandesh.level()) else: - self._logger.log( - sand_logger.SandeshLogger.get_py_logger_level( - tx_sandesh.level()), tx_sandesh.log()) - self._msg_stats.update_tx_stats(tx_sandesh.__class__.__name__, - 0, SandeshTxDropReason.NoClient) + self.drop_tx_sandesh(tx_sandesh, SandeshTxDropReason.NoClient) # end send_sandesh + def drop_tx_sandesh(self, tx_sandesh, drop_reason, level=None): + self._msg_stats.update_tx_stats(tx_sandesh.__class__.__name__, + sys.getsizeof(tx_sandesh), drop_reason) + if self.is_logging_dropped_allowed(tx_sandesh): + if level is not None: + self._logger.log( + sand_logger.SandeshLogger.get_py_logger_level(level), + tx_sandesh.log()) + else: + self._logger.error('SANDESH: [DROP: %s] %s' % \ + (SandeshTxDropReason._VALUES_TO_NAMES[drop_reason], + tx_sandesh.log())) + # end drop_tx_sandesh + def send_generator_info(self): from gen_py.sandesh_uve.ttypes import SandeshClientInfo, \ ModuleClientState, SandeshModuleClientTrace @@ -690,15 +715,17 @@ def send(self, sandesh=sandesh_global): try: self.validate() except e: - sandesh.msg_stats().update_tx_stats(self.__class__.__name__, - 0, SandeshTxDropReason.ValidationFailed) - sandesh._logger.error('sandesh "%s" validation failed [%s]' - % (self.__class__.__name__, e)) + sandesh.drop_tx_sandesh(self, SandeshTxDropReason.ValidationFailed) + return -1 + if self._level >= sandesh.send_level(): + sandesh.drop_tx_sandesh(self, SandeshTxDropReason.QueueLevel) return -1 - #If systemlog message first check if the transmit side buffer + # For systemlog message, first check if the transmit side buffer # has space if self._type == SandeshType.SYSTEM: if (not self.is_rate_limit_pass(sandesh)): + sandesh.drop_tx_sandesh(self, + SandeshTxDropReason.RatelimitDrop) return -1 self._seqnum = self.next_seqnum() if self.handle_test(sandesh): @@ -725,8 +752,6 @@ def is_rate_limit_pass(self,sandesh): if(self.__class__.rate_limit_buffer[0] == cur_time): #Sender generating more messages/sec than the #buffer_threshold size - sandesh.msg_stats().update_tx_stats(self.__class__.__name__, - 0, SandeshTxDropReason.RatelimitDrop) if self.__class__.do_rate_limit_drop_log: sandesh._logger.error('SANDESH: Ratelimit Drop ' \ '(%d messages/sec): for %s' % \ @@ -798,10 +823,7 @@ def request(self, context='', sandesh=sandesh_global): try: self.validate() except e: - sandesh.msg_stats().update_tx_stats(self.__class__.__name__, - 0, SandeshTxDropReason.ValidationFailed) - sandesh._logger.error('sandesh "%s" validation failed [%s]' - % (self.__class__.__name__, e)) + sandesh.drop_tx_sandesh(self, SandeshTxDropReason.ValidationFailed) return -1 if context == 'ctrl': self._hints |= SANDESH_CONTROL_HINT @@ -828,10 +850,7 @@ def response(self, context='', more=False, sandesh=sandesh_global): try: self.validate() except e: - sandesh.msg_stats().update_tx_stats(self.__class__.__name__, - 0, SandeshTxDropReason.ValidationFailed) - sandesh._logger.error('sandesh "%s" validation failed [%s]' - % (self.__class__.__name__, e)) + sandesh.drop_tx_sandesh(self, SandeshTxDropReason.ValidationFailed) return -1 self._context = context self._more = more @@ -861,10 +880,7 @@ def send(self, isseq=False, seqno=0, context='', try: self.validate() except e: - sandesh.msg_stats().update_tx_stats(self.__class__.__name__, - 0, SandeshTxDropReason.ValidationFailed) - sandesh._logger.error('sandesh "%s" validation failed [%s]' - % (self.__class__.__name__, e)) + sandesh.drop_tx_sandesh(self, SandeshTxDropReason.ValidationFailed) return -1 if isseq is True: self._seqnum = seqno @@ -874,16 +890,15 @@ def send(self, isseq=False, seqno=0, context='', self.__class__.__name__) if uve_type_map is None: sandesh._logger.error('sandesh uve <%s> not registered: %s'\ - % (self.__class__.__name__, self.log())) - sandesh.msg_stats().update_tx_stats(self.__class__.__name__, - 0, SandeshTxDropReason.ValidationFailed) + % (self.__class__.__name__)) + sandesh.drop_tx_sandesh(self, + SandeshTxDropReason.ValidationFailed) return -1 self._seqnum = self.next_seqnum() if not uve_type_map.update_uve(self): - sandesh.msg_stats().update_tx_stats(self.__class__.__name__, - 0, SandeshTxDropReason.ValidationFailed) - sandesh._logger.error('Failed to update sandesh in cache. ' - + self.log()) + sandesh._logger.error('Failed to update sandesh in cache') + sandesh.drop_tx_sandesh(self, + SandeshTxDropReason.ValidationFailed) return -1 self._context = context self._more = more @@ -926,8 +941,7 @@ def send_trace(self, context='', more=False, try: self.validate() except e: - sandesh._logger.error('sandesh "%s" validation failed [%s]' - % (self.__class__.__name__, e)) + sandesh.drop_tx_sandesh(self, SandeshTxDropReason.ValidationFailed) return -1 self._context = context self._more = more diff --git a/library/python/pysandesh/sandesh_client.py b/library/python/pysandesh/sandesh_client.py index 02be4d63..abe1ac2b 100644 --- a/library/python/pysandesh/sandesh_client.py +++ b/library/python/pysandesh/sandesh_client.py @@ -46,18 +46,11 @@ def send_sandesh(self, sandesh): self._connection.session().enqueue_sandesh(sandesh) else: if (self._connection.session() is None): - error_str = "No Connection" - self._sandesh_instance.msg_stats().update_tx_stats( - sandesh.__class__.__name__, 0, + self._sandesh_instance.drop_tx_sandesh(sandesh, SandeshTxDropReason.NoSession) else: - error_str = "No ModuleId" - self._sandesh_instance.msg_stats().update_tx_stats( - sandesh.__class__.__name__, 0, + self._sandesh_instance.drop_tx_sandesh(sandesh, SandeshTxDropReason.ClientSendFailed) - if self._sandesh_instance.is_logging_dropped_allowed(sandesh): - self._logger.error( - "SANDESH: %s: %s" % (error_str, sandesh.log())) return 0 #end send_sandesh diff --git a/library/python/pysandesh/sandesh_session.py b/library/python/pysandesh/sandesh_session.py index 7c84e912..4525f60d 100644 --- a/library/python/pysandesh/sandesh_session.py +++ b/library/python/pysandesh/sandesh_session.py @@ -7,12 +7,14 @@ # import socket +import sys +from functools import partial from transport import TTransport from protocol import TXMLProtocol -from work_queue import WorkQueue +from work_queue import WorkQueue, WaterMark from tcp_session import TcpSession from sandesh_logger import SandeshLogger -from gen_py.sandesh.ttypes import SandeshTxDropReason +from gen_py.sandesh.ttypes import SandeshLevel, SandeshTxDropReason _XML_SANDESH_OPEN = '' _XML_SANDESH_OPEN_ATTR_LEN = '= send_level: + queue_level_drop += 1 + self.assertEqual(queue_level_drop, sandesh_global.msg_stats().\ + aggregate_stats().messages_sent_dropped_queue_level) + # end test_sandesh_queue_level_drop + +# end class SandeshMsgTest if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2, catchbreak=True) diff --git a/library/python/pysandesh/test/sandesh_session_test.py b/library/python/pysandesh/test/sandesh_session_test.py index 99d6e6c3..c8998b88 100755 --- a/library/python/pysandesh/test/sandesh_session_test.py +++ b/library/python/pysandesh/test/sandesh_session_test.py @@ -18,7 +18,9 @@ sys.path.insert(1, sys.path[0]+'/../../../python') from pysandesh import sandesh_session +from pysandesh.sandesh_session import * from pysandesh.sandesh_base import * +from pysandesh.test.gen_py.msg_test.ttypes import * sandesh_test_started = False @@ -250,5 +252,94 @@ def test_send_msg_all(self): #end class SandeshWriterTest + +class SandeshSendQueueTest(unittest.TestCase): + + def setUp(self): + self.maxDiff = None + self.sandesh_sendq = SandeshSendQueue(self.sandesh_queue_callback) + # end setUp + + def tearDown(self): + pass + # end tearDown + + def sandesh_queue_callback(self): + pass + # end sandesh_queue_callback + + def test_sandesh_send_queue_size(self): + # enqueue sandesh and verify that the queue size is incremented + # by sandesh size + systemlog = SystemLogTest() + self.sandesh_sendq.enqueue(SandeshSendQueue.Element(systemlog)) + expected_qsize = sys.getsizeof(systemlog) + self.assertEqual(expected_qsize, self.sandesh_sendq.size()) + + # enqueue one more entry + objectlog = ObjectLogTest(StructObject()) + self.sandesh_sendq.enqueue(SandeshSendQueue.Element(objectlog)) + expected_qsize += sys.getsizeof(objectlog) + self.assertEqual(expected_qsize, self.sandesh_sendq.size()) + + # dequeue sandesh and verify that the queue size is decremented + # by sandesh size + self.sandesh_sendq.dequeue() + expected_qsize -= sys.getsizeof(systemlog) + self.assertEqual(expected_qsize, self.sandesh_sendq.size()) + + # dequeue last entry + self.sandesh_sendq.dequeue() + expected_qsize = 0 + self.assertEqual(expected_qsize, self.sandesh_sendq.size()) + # end test_sandesh_send_queue_size + +# end class SandeshSendQueueTest + + +class SandeshSessionTest(unittest.TestCase): + + def setUp(self): + self.sandesh_instance = Sandesh() + self.sandesh_instance.init_generator('SandeshSessionTest', 'localhost', + 'UT', 0, None, 'context', -1, connect_to_collector=False) + # end setUp + + def tearDown(self): + pass + # end tearDown + + def verify_watermarks(self, expected_wms, actual_wms): + expected_wms.sort() + actual_wms.sort() + print '== verify watermarks ==' + print expected_wms + self.assertEqual(len(expected_wms), len(actual_wms)) + for i in range(len(expected_wms)): + self.assertEqual(expected_wms[i][0], actual_wms[i].size) + # Invoke the watermark callback and verify that the + # send_level is set correctly + actual_wms[i].callback(expected_wms[i][0]) + self.assertEqual(expected_wms[i][1], + self.sandesh_instance.send_level()) + # end verify_watermarks + + def test_send_queue_watermarks(self): + session = SandeshSession(self.sandesh_instance, None, None, None) + wms = SandeshSendQueue._SENDQ_WATERMARKS + # verify high watermarks are set properly in sandesh send queue + high_wms = [wm for wm in wms if wm[2] is True] + sendq_hwms = session.send_queue().high_watermarks() + self.verify_watermarks(high_wms, sendq_hwms) + + # verify low watermarks are set properly in sandesh send queue + low_wms = [wm for wm in wms if wm[2] is False] + sendq_lwms = session.send_queue().low_watermarks() + self.verify_watermarks(low_wms, sendq_lwms) + # end test_send_queue_watermarks + +# end class SandeshSessionTest + + if __name__ == '__main__': - unittest.main() + unittest.main(verbosity=2, catchbreak=True) diff --git a/library/python/pysandesh/test/work_queue_test.py b/library/python/pysandesh/test/work_queue_test.py index 562aadf2..645e4ac1 100755 --- a/library/python/pysandesh/test/work_queue_test.py +++ b/library/python/pysandesh/test/work_queue_test.py @@ -40,7 +40,7 @@ def __init__(self): self.wm_cb_count = 0 # end __init__ - def callback(self, qsize, wm_cb_type=cb_type): + def callback(self, qsize, wm_cb_type): self.wm_cb_qsize = qsize self.wm_cb_type = wm_cb_type self.wm_cb_count += 1