From 132000a5414b8aed60b6c23a8a4b93cdc5e97ff0 Mon Sep 17 00:00:00 2001 From: Megh Bhatt Date: Fri, 22 Jan 2016 18:47:45 -0800 Subject: [PATCH] Add Tx and Rx drop stats in pysandesh - Update drop stats in Tx and Rx path Change-Id: I43cf79140942393c57d9e28e743611b96b875f01 Closes-Bug: #1472106 --- library/python/pysandesh/sandesh_base.py | 34 ++++- library/python/pysandesh/sandesh_client.py | 22 +++- .../python/pysandesh/sandesh_connection.py | 20 ++- library/python/pysandesh/sandesh_req_impl.py | 34 ++--- library/python/pysandesh/sandesh_session.py | 23 ++-- .../python/pysandesh/sandesh_state_machine.py | 2 + library/python/pysandesh/sandesh_stats.py | 121 ++++++++++++------ 7 files changed, 168 insertions(+), 88 deletions(-) diff --git a/library/python/pysandesh/sandesh_base.py b/library/python/pysandesh/sandesh_base.py index 1705fb1a..3a570592 100644 --- a/library/python/pysandesh/sandesh_base.py +++ b/library/python/pysandesh/sandesh_base.py @@ -16,10 +16,9 @@ from gen_py.sandesh.ttypes import SandeshType, SandeshLevel from gen_py.sandesh.constants import * -from sandesh_client import SandeshClient from sandesh_http import SandeshHttp -from sandesh_stats import SandeshStats from sandesh_trace import SandeshTraceRequestRunner +from sandesh_client import SandeshClient from sandesh_uve import SandeshUVETypeMaps, SandeshUVEPerTypeMap from work_queue import WorkQueue @@ -77,7 +76,8 @@ def init_generator(self, module, source, node_type, instance_id, logger_config_file=logger_config_file) self._logger.info('SANDESH: CONNECT TO COLLECTOR: %s', connect_to_collector) - self._stats = SandeshStats() + from sandesh_stats import SandeshMessageStatistics + self._msg_stats = SandeshMessageStatistics() self._trace = trace.Trace() self._sandesh_request_dict = {} self._uve_type_maps = SandeshUVETypeMaps(self._logger) @@ -167,6 +167,10 @@ def set_logging_file(self, file): self._sandesh_logger.set_logging_file(file) # end set_logging_file + def is_logging_dropped_allowed(self, sandesh): + return True + # end is_logging_dropped_allowed + def is_send_queue_enabled(self): return self._send_queue_enabled # end is_send_queue_enabled @@ -190,9 +194,9 @@ def init_collector(self): pass # end init_collector - def stats(self): - return self._stats - # end stats + def msg_stats(self): + return self._msg_stats + # end msg_stats @classmethod def next_seqnum(cls): @@ -331,6 +335,8 @@ def send_sandesh(self, tx_sandesh): self._logger.log( sand_logger.SandeshLogger.get_py_logger_level( tx_sandesh.level()), tx_sandesh.log()) + self._msg_stats.update_tx_drop_stats(tx_sandesh.__class__.__name__, + 0) # end send_sandesh def send_generator_info(self): @@ -650,6 +656,8 @@ def send(self, sandesh=sandesh_global): try: self.validate() except e: + sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__, + 0) sandesh._logger.error('sandesh "%s" validation failed [%s]' % (self.__class__.__name__, e)) return -1 @@ -704,6 +712,8 @@ def request(self, context='', sandesh=sandesh_global): try: self.validate() except e: + sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__, + 0) sandesh._logger.error('sandesh "%s" validation failed [%s]' % (self.__class__.__name__, e)) return -1 @@ -732,6 +742,8 @@ def response(self, context='', more=False, sandesh=sandesh_global): try: self.validate() except e: + sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__, + 0) sandesh._logger.error('sandesh "%s" validation failed [%s]' % (self.__class__.__name__, e)) return -1 @@ -763,6 +775,8 @@ def send(self, isseq=False, seqno=0, context='', try: self.validate() except e: + sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__, + 0) sandesh._logger.error('sandesh "%s" validation failed [%s]' % (self.__class__.__name__, e)) return -1 @@ -773,9 +787,15 @@ def send(self, isseq=False, seqno=0, context='', uve_type_map = sandesh._uve_type_maps.get_uve_type_map( self.__class__.__name__) if uve_type_map is None: + sandesh._logger.error('sandesh uve <%s> not registered: %s'\ + % (self.__class__.__name__, self.log())) + sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__, + 0) return -1 self._seqnum = self.next_seqnum() if not uve_type_map.update_uve(self): + sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__, + 0) sandesh._logger.error('Failed to update sandesh in cache. ' + self.log()) return -1 @@ -819,6 +839,8 @@ def send_trace(self, context='', more=False, try: self.validate() except e: + sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__, + 0) sandesh._logger.error('sandesh "%s" validation failed [%s]' % (self.__class__.__name__, e)) return -1 diff --git a/library/python/pysandesh/sandesh_client.py b/library/python/pysandesh/sandesh_client.py index 5b7cc68d..ec42c2ee 100644 --- a/library/python/pysandesh/sandesh_client.py +++ b/library/python/pysandesh/sandesh_client.py @@ -45,13 +45,16 @@ def send_sandesh(self, sandesh): self._connection.session().enqueue_sandesh(sandesh) else: if (self._connection.session() is None): - self._logger.log( - SandeshLogger.get_py_logger_level(sandesh.level()), - "No Connection: %s" % sandesh.log()) + error_str = "No Connection" + self._sandesh_instance.msg_stats().update_tx_drop_stats( + sandesh.__class__.__name__, 0) else: - self._logger.log( - SandeshLogger.get_py_logger_level(sandesh.level()), - "No ModuleId: %s" % sandesh.log()) + error_str = "No ModuleId" + self._sandesh_instance.msg_stats().update_tx_drop_stats( + sandesh.__class__.__name__, 0) + if self._sandesh_instance.is_logging_dropped_allowed(sandesh): + self._logger.error( + "SANDESH: %s: %s" % (error_str, sandesh.log())) return 0 #end send_sandesh @@ -59,17 +62,22 @@ def send_uve_sandesh(self, uve_sandesh): self._connection.statemachine().on_sandesh_uve_msg_send(uve_sandesh) #end send_uve_sandesh - def handle_sandesh_msg(self, sandesh_name, sandesh_xml): + def handle_sandesh_msg(self, sandesh_name, sandesh_xml, msg_len): transport = TTransport.TMemoryBuffer(sandesh_xml) protocol_factory = TXMLProtocol.TXMLProtocolFactory() protocol = protocol_factory.getProtocol(transport) sandesh_req = self._sandesh_instance.get_sandesh_request_object(sandesh_name) if sandesh_req: if sandesh_req.read(protocol) == -1: + self._sandesh_instance.update_rx_drop_stats(sandesh_name, + msg_len) self._logger.error('Failed to decode sandesh request "%s"' \ % (sandesh_name)) else: + self._sandesh_instance.update_rx_stats(sandesh_name, msg_len) self._sandesh_instance.enqueue_sandesh_request(sandesh_req) + else: + self._sandesh_instance.update_rx_drop_stats(sandesh_name, msg_len) #end handle_sandesh_msg def handle_sandesh_ctrl_msg(self, sandesh_ctrl_msg): diff --git a/library/python/pysandesh/sandesh_connection.py b/library/python/pysandesh/sandesh_connection.py index 0c62e915..d6ab50de 100644 --- a/library/python/pysandesh/sandesh_connection.py +++ b/library/python/pysandesh/sandesh_connection.py @@ -8,13 +8,12 @@ import gevent import os -from sandesh_session import SandeshSession -from sandesh_session import SandeshReader -from sandesh_state_machine import SandeshStateMachine, Event from transport import TTransport from protocol import TXMLProtocol -from gen_py.sandesh.constants import * +from sandesh_session import SandeshSession, SandeshReader +from sandesh_state_machine import SandeshStateMachine, Event from sandesh_uve import SandeshUVETypeMaps +from gen_py.sandesh.constants import * class SandeshConnection(object): @@ -147,13 +146,15 @@ def _handle_collector_update(self, collector_info): def _receive_sandesh_msg(self, session, msg): (hdr, hdr_len, sandesh_name) = SandeshReader.extract_sandesh_header(msg) if sandesh_name is None: + self._sandesh_instance.msg_stats().update_rx_drop_stats( + '__UNKNOWN__', len(msg)) self._logger.error('Failed to decode sandesh header for "%s"' % (msg)) return - # update the sandesh rx stats - self._sandesh_instance.stats().update_stats(sandesh_name, len(msg), False) if hdr.Hints & SANDESH_CONTROL_HINT: self._logger.debug('Received sandesh control message [%s]' % (sandesh_name)) if sandesh_name != 'SandeshCtrlServerToClient': + self._sandesh_instance.msg_stats().update_rx_drop_stats( + sandesh_name, len(msg)) self._logger.error('Invalid sandesh control message [%s]' % (sandesh_name)) return transport = TTransport.TMemoryBuffer(msg[hdr_len:]) @@ -162,13 +163,18 @@ def _receive_sandesh_msg(self, session, msg): from gen_py.sandesh_ctrl.ttypes import SandeshCtrlServerToClient sandesh_ctrl_msg = SandeshCtrlServerToClient() if sandesh_ctrl_msg.read(protocol) == -1: + self._sandesh_instance.msg_stats().update_rx_drop_stats( + sandesh_name, len(msg)) self._logger.error('Failed to decode sandesh control message "%s"' %(msg)) else: + self._sandesh_instance.msg_stats().update_rx_stats( + sandesh_name, len(msg)) self._state_machine.on_sandesh_ctrl_msg_receive(session, sandesh_ctrl_msg, hdr.Source) else: self._logger.debug('Received sandesh message [%s]' % (sandesh_name)) - self._client.handle_sandesh_msg(sandesh_name, msg[hdr_len:]) + self._client.handle_sandesh_msg(sandesh_name, + msg[hdr_len:], len(msg)) #end _receive_sandesh_msg #end class SandeshConnection diff --git a/library/python/pysandesh/sandesh_req_impl.py b/library/python/pysandesh/sandesh_req_impl.py index 618603bd..542c1609 100644 --- a/library/python/pysandesh/sandesh_req_impl.py +++ b/library/python/pysandesh/sandesh_req_impl.py @@ -44,7 +44,7 @@ def __init__(self, sandesh): SandeshLoggingParamsStatus.handle_request = \ self.sandesh_logging_params_status_handle_request SandeshMessageStatsReq.handle_request = \ - self.sandesh_stats_handle_request + self.sandesh_msg_stats_handle_request SandeshTraceBufferListRequest.handle_request = \ self.sandesh_trace_buffer_list_request_handle_request SandeshTraceRequest.handle_request = \ @@ -169,28 +169,16 @@ def sandesh_alarm_types_req_handle_request(self, sandesh_req): alarm_types_res.response(sandesh_req.context()) # end sandesh_alarm_types_req_handle_request - def sandesh_stats_handle_request(self, sandesh_req): - sandesh_stats = self._sandesh.stats() - stats_map = sandesh_stats.stats_map() - stats_list = [] - for sandesh_name, stats in stats_map.iteritems(): - msg_stat = SandeshMessageStats(stats.tx_count, - stats.tx_bytes, - stats.rx_count, - stats.rx_bytes) - mtype_stat = SandeshMessageTypeStats(sandesh_name, - msg_stat) - stats_list.append(mtype_stat) - + def sandesh_msg_stats_handle_request(self, sandesh_req): + sandesh_msg_stats = self._sandesh.msg_stats() + msg_type_stats = sandesh_msg_stats.message_type_stats() + msg_stats_list = [] + for msg_type, stats in msg_type_stats.iteritems(): + mtype_stat = SandeshMessageTypeStats(msg_type, stats) + msg_stats_list.append(mtype_stat) gen_stats = SandeshGeneratorStats() - gen_stats.type_stats = stats_list - gen_stats.aggregate_stats = SandeshMessageStats() - gen_stats.aggregate_stats.messages_sent = sandesh_stats._sandesh_sent - gen_stats.aggregate_stats.bytes_sent = sandesh_stats._bytes_sent - gen_stats.aggregate_stats.messages_received = \ - sandesh_stats._sandesh_received - gen_stats.aggregate_stats.bytes_received = \ - sandesh_stats._bytes_received + gen_stats.type_stats = msg_stats_list + gen_stats.aggregate_stats = sandesh_msg_stats.aggregate_stats() connection = self._sandesh.client().connection() if connection and connection.session(): session = connection.session() @@ -203,7 +191,7 @@ def sandesh_stats_handle_request(self, sandesh_req): gen_stats.send_queue_stats = send_queue_stats stats_resp = SandeshMessageStatsResp(gen_stats) stats_resp.response(sandesh_req.context()) - # end sandesh_stats_handle_request + # end sandesh_msg_stats_handle_request def sandesh_trace_buffer_list_request_handle_request(self, sandesh_req): tbuf_info_list = [SandeshTraceBufInfo(trace_buf_name=tbuf) diff --git a/library/python/pysandesh/sandesh_session.py b/library/python/pysandesh/sandesh_session.py index 9c762877..09a20c22 100644 --- a/library/python/pysandesh/sandesh_session.py +++ b/library/python/pysandesh/sandesh_session.py @@ -144,7 +144,7 @@ def __init__(self, session): # Public functions @staticmethod - def encode_sandesh(sandesh): + def encode_sandesh(sandesh, sandesh_instance=None): transport = TTransport.TMemoryBuffer() protocol_factory = TXMLProtocol.TXMLProtocolFactory() protocol = protocol_factory.getProtocol(transport) @@ -166,11 +166,15 @@ def encode_sandesh(sandesh): sandesh.instance_id()) # write the sandesh header if sandesh_hdr.write(protocol) < 0: - print 'Error in encoding sandesh header' + if sandesh_instance is not None: + sandesh_instance.msg_stats().update_tx_drop_stats( + sandesh.__class__.__name__, 0) return None # write the sandesh if sandesh.write(protocol) < 0: - print 'Error in encoding sandesh' + if sandesh_instance is not None: + sandesh_instance.msg_stats().update_tx_drop_stats( + sandesh.__class__.__name__, 0) return None # get the message msg = transport.getvalue() @@ -191,9 +195,8 @@ def send_msg(self, sandesh, more): self._logger.error('Failed to send sandesh') return -1 # update sandesh tx stats - sandesh_name = sandesh.__class__.__name__ - self._sandesh_instance.stats().update_stats( - sandesh_name, len(send_buf), True) + self._sandesh_instance.msg_stats().update_tx_stats( + sandesh.__class__.__name__, len(send_buf)) if more: self._send_msg_more(send_buf) else: @@ -317,9 +320,11 @@ def _send_sandesh(self, sandesh): else: more = True if not self._connected: - self._logger.log( - SandeshLogger.get_py_logger_level(sandesh.level()), - sandesh.log()) + if self._sandesh_instance.is_logging_dropped_allowed(sandesh): + self._logger.error( + "SANDESH: %s: %s" % ("Not connected", sandesh.log())) + self._sandesh_instance.msg_stats().update_tx_drop_stats( + sandesh.__class__.__name__, 0) return if sandesh.is_logging_allowed(self._sandesh_instance): self._logger.log( diff --git a/library/python/pysandesh/sandesh_state_machine.py b/library/python/pysandesh/sandesh_state_machine.py index 544c42f0..2b23bf00 100644 --- a/library/python/pysandesh/sandesh_state_machine.py +++ b/library/python/pysandesh/sandesh_state_machine.py @@ -453,6 +453,8 @@ def _dequeue_event(self, event): if self._fsm.current == State._ESTABLISHED or self._fsm.current == State._CLIENT_INIT: self._connection.handle_sandesh_uve_msg(event.msg) else: + self._connection.sandesh_instance().msg_stats().update_tx_drop_stats( + event.msg.__class__.__name__, 0) self._logger.info("Discarding event[%s] in state[%s]" \ % (event.event, self._fsm.current)) elif event.event == Event._EV_SANDESH_CTRL_MESSAGE_RECV and \ diff --git a/library/python/pysandesh/sandesh_stats.py b/library/python/pysandesh/sandesh_stats.py index d01ea2c1..a3638a82 100644 --- a/library/python/pysandesh/sandesh_stats.py +++ b/library/python/pysandesh/sandesh_stats.py @@ -6,45 +6,94 @@ # sandesh_stats.py # -class SandeshStats(object): - class SandeshStatsElem(object): - def __init__(self): - self.tx_count = 0 - self.tx_bytes = 0 - self.rx_count = 0 - self.rx_bytes = 0 - #end __init__ - #end SandeshStatsElem +from pysandesh.sandesh_base import Sandesh +from pysandesh.gen_py.sandesh_uve.ttypes import SandeshMessageStats + +class SandeshMessageStatistics(object): def __init__(self): - self._sandesh_sent = 0 - self._bytes_sent = 0 - self._sandesh_received = 0 - self._bytes_received = 0 - self._stats_map = {} - #end __init__ - - def stats_map(self): - return self._stats_map - #end stats_map - - def update_stats(self, sandesh_name, bytes, is_tx): + self._message_type_stats = {} + self._aggregate_stats = SandeshMessageStats() + # end __init__ + + def message_type_stats(self): + return self._message_type_stats + # end message_type_stats + + def aggregate_stats(self): + return self._aggregate_stats + # end aggregate_stats + + def update_tx_stats(self, message_type, nbytes): + return self._update_tx_rx_stats_internal(message_type, nbytes, True, + False) + # end update_tx_stats + + def update_tx_drop_stats(self, message_type, nbytes): + return self._update_tx_rx_stats_internal(message_type, nbytes, True, + True) + # end update_tx_drop_stats + + def update_rx_stats(self, message_type, nbytes): + return self._update_tx_rx_stats_internal(message_type, nbytes, False, + False) + # end update_rx_stats + + def update_rx_drop_stats(self, message_type, nbytes): + return self._update_tx_rx_stats_internal(message_type, nbytes, False, + True) + # end update_rx_drop_stats + + def _update_tx_rx_stats_internal(self, message_type, nbytes, tx, drop): try: - stats_elem = self._stats_map[sandesh_name] + message_stats = self._message_type_stats[message_type] except KeyError: - stats_elem = SandeshStats.SandeshStatsElem() + message_stats = SandeshMessageStats() + self._message_type_stats[message_type] = message_stats finally: - if is_tx: - stats_elem.tx_count += 1 - stats_elem.tx_bytes += bytes - self._sandesh_sent += 1 - self._bytes_sent += bytes + if tx: + self._update_tx_stats_internal(message_stats, nbytes, drop) + self._update_tx_stats_internal(self._aggregate_stats, nbytes, + drop) + else: + self._update_rx_stats_internal(message_stats, nbytes, drop) + self._update_rx_stats_internal(self._aggregate_stats, nbytes, + drop) + return True + # end update_tx_rx_stats_internal + + def _update_tx_stats_internal(self, msg_stats, nbytes, drop): + if not drop: + try: + msg_stats.messages_sent += 1 + msg_stats.bytes_sent += nbytes + except TypeError: + msg_stats.messages_sent = 1 + msg_stats.bytes_sent = nbytes + else: + if msg_stats.messages_sent_dropped: + msg_stats.messages_sent_dropped += 1 + msg_stats.bytes_sent_dropped += nbytes else: - stats_elem.rx_count += 1 - stats_elem.rx_bytes += bytes - self._sandesh_received += 1 - self._bytes_received += bytes - self._stats_map[sandesh_name] = stats_elem - #end update_stats - -#end class SandeshStats + msg_stats.messages_sent_dropped = 1 + msg_stats.bytes_sent_dropped = nbytes + # end _update_tx_stats_internal + + def _update_rx_stats_internal(self, msg_stats, nbytes, drop): + if not drop: + if msg_stats.messages_received: + msg_stats.messages_received += 1 + msg_stats.bytes_received += nbytes + else: + msg_stats.messages_received = 1 + msg_stats.bytes_received = nbytes + else: + if msg_stats.messages_received_dropped: + msg_stats.messages_received_dropped += 1 + msg_stats.bytes_received_dropped += nbytes + else: + msg_stats.messages_received_dropped = 1 + msg_stats.bytes_received_dropped = nbytes + # end _update_rx_stats_internal + +# end class SandeshMessageStatistics