Skip to content

Commit

Permalink
Add Tx and Rx drop stats in pysandesh
Browse files Browse the repository at this point in the history
- Update drop stats in Tx and Rx path

Change-Id: I43cf79140942393c57d9e28e743611b96b875f01
Closes-Bug: #1472106
  • Loading branch information
Megh Bhatt authored and arvindvis committed Apr 7, 2016
1 parent b5352ba commit 132000a
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 88 deletions.
34 changes: 28 additions & 6 deletions library/python/pysandesh/sandesh_base.py
Expand Up @@ -16,10 +16,9 @@

from gen_py.sandesh.ttypes import SandeshType, SandeshLevel
from gen_py.sandesh.constants import *
from sandesh_client import SandeshClient
from sandesh_http import SandeshHttp
from sandesh_stats import SandeshStats
from sandesh_trace import SandeshTraceRequestRunner
from sandesh_client import SandeshClient
from sandesh_uve import SandeshUVETypeMaps, SandeshUVEPerTypeMap
from work_queue import WorkQueue

Expand Down Expand Up @@ -77,7 +76,8 @@ def init_generator(self, module, source, node_type, instance_id,
logger_config_file=logger_config_file)
self._logger.info('SANDESH: CONNECT TO COLLECTOR: %s',
connect_to_collector)
self._stats = SandeshStats()
from sandesh_stats import SandeshMessageStatistics
self._msg_stats = SandeshMessageStatistics()
self._trace = trace.Trace()
self._sandesh_request_dict = {}
self._uve_type_maps = SandeshUVETypeMaps(self._logger)
Expand Down Expand Up @@ -167,6 +167,10 @@ def set_logging_file(self, file):
self._sandesh_logger.set_logging_file(file)
# end set_logging_file

def is_logging_dropped_allowed(self, sandesh):
return True
# end is_logging_dropped_allowed

def is_send_queue_enabled(self):
return self._send_queue_enabled
# end is_send_queue_enabled
Expand All @@ -190,9 +194,9 @@ def init_collector(self):
pass
# end init_collector

def stats(self):
return self._stats
# end stats
def msg_stats(self):
return self._msg_stats
# end msg_stats

@classmethod
def next_seqnum(cls):
Expand Down Expand Up @@ -331,6 +335,8 @@ def send_sandesh(self, tx_sandesh):
self._logger.log(
sand_logger.SandeshLogger.get_py_logger_level(
tx_sandesh.level()), tx_sandesh.log())
self._msg_stats.update_tx_drop_stats(tx_sandesh.__class__.__name__,
0)
# end send_sandesh

def send_generator_info(self):
Expand Down Expand Up @@ -650,6 +656,8 @@ def send(self, sandesh=sandesh_global):
try:
self.validate()
except e:
sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__,
0)
sandesh._logger.error('sandesh "%s" validation failed [%s]'
% (self.__class__.__name__, e))
return -1
Expand Down Expand Up @@ -704,6 +712,8 @@ def request(self, context='', sandesh=sandesh_global):
try:
self.validate()
except e:
sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__,
0)
sandesh._logger.error('sandesh "%s" validation failed [%s]'
% (self.__class__.__name__, e))
return -1
Expand Down Expand Up @@ -732,6 +742,8 @@ def response(self, context='', more=False, sandesh=sandesh_global):
try:
self.validate()
except e:
sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__,
0)
sandesh._logger.error('sandesh "%s" validation failed [%s]'
% (self.__class__.__name__, e))
return -1
Expand Down Expand Up @@ -763,6 +775,8 @@ def send(self, isseq=False, seqno=0, context='',
try:
self.validate()
except e:
sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__,
0)
sandesh._logger.error('sandesh "%s" validation failed [%s]'
% (self.__class__.__name__, e))
return -1
Expand All @@ -773,9 +787,15 @@ def send(self, isseq=False, seqno=0, context='',
uve_type_map = sandesh._uve_type_maps.get_uve_type_map(
self.__class__.__name__)
if uve_type_map is None:
sandesh._logger.error('sandesh uve <%s> not registered: %s'\
% (self.__class__.__name__, self.log()))
sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__,
0)
return -1
self._seqnum = self.next_seqnum()
if not uve_type_map.update_uve(self):
sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__,
0)
sandesh._logger.error('Failed to update sandesh in cache. '
+ self.log())
return -1
Expand Down Expand Up @@ -819,6 +839,8 @@ def send_trace(self, context='', more=False,
try:
self.validate()
except e:
sandesh.msg_stats().update_tx_drop_stats(self.__class__.__name__,
0)
sandesh._logger.error('sandesh "%s" validation failed [%s]'
% (self.__class__.__name__, e))
return -1
Expand Down
22 changes: 15 additions & 7 deletions library/python/pysandesh/sandesh_client.py
Expand Up @@ -45,31 +45,39 @@ def send_sandesh(self, sandesh):
self._connection.session().enqueue_sandesh(sandesh)
else:
if (self._connection.session() is None):
self._logger.log(
SandeshLogger.get_py_logger_level(sandesh.level()),
"No Connection: %s" % sandesh.log())
error_str = "No Connection"
self._sandesh_instance.msg_stats().update_tx_drop_stats(
sandesh.__class__.__name__, 0)
else:
self._logger.log(
SandeshLogger.get_py_logger_level(sandesh.level()),
"No ModuleId: %s" % sandesh.log())
error_str = "No ModuleId"
self._sandesh_instance.msg_stats().update_tx_drop_stats(
sandesh.__class__.__name__, 0)
if self._sandesh_instance.is_logging_dropped_allowed(sandesh):
self._logger.error(
"SANDESH: %s: %s" % (error_str, sandesh.log()))
return 0
#end send_sandesh

def send_uve_sandesh(self, uve_sandesh):
self._connection.statemachine().on_sandesh_uve_msg_send(uve_sandesh)
#end send_uve_sandesh

def handle_sandesh_msg(self, sandesh_name, sandesh_xml):
def handle_sandesh_msg(self, sandesh_name, sandesh_xml, msg_len):
transport = TTransport.TMemoryBuffer(sandesh_xml)
protocol_factory = TXMLProtocol.TXMLProtocolFactory()
protocol = protocol_factory.getProtocol(transport)
sandesh_req = self._sandesh_instance.get_sandesh_request_object(sandesh_name)
if sandesh_req:
if sandesh_req.read(protocol) == -1:
self._sandesh_instance.update_rx_drop_stats(sandesh_name,
msg_len)
self._logger.error('Failed to decode sandesh request "%s"' \
% (sandesh_name))
else:
self._sandesh_instance.update_rx_stats(sandesh_name, msg_len)
self._sandesh_instance.enqueue_sandesh_request(sandesh_req)
else:
self._sandesh_instance.update_rx_drop_stats(sandesh_name, msg_len)
#end handle_sandesh_msg

def handle_sandesh_ctrl_msg(self, sandesh_ctrl_msg):
Expand Down
20 changes: 13 additions & 7 deletions library/python/pysandesh/sandesh_connection.py
Expand Up @@ -8,13 +8,12 @@

import gevent
import os
from sandesh_session import SandeshSession
from sandesh_session import SandeshReader
from sandesh_state_machine import SandeshStateMachine, Event
from transport import TTransport
from protocol import TXMLProtocol
from gen_py.sandesh.constants import *
from sandesh_session import SandeshSession, SandeshReader
from sandesh_state_machine import SandeshStateMachine, Event
from sandesh_uve import SandeshUVETypeMaps
from gen_py.sandesh.constants import *

class SandeshConnection(object):

Expand Down Expand Up @@ -147,13 +146,15 @@ def _handle_collector_update(self, collector_info):
def _receive_sandesh_msg(self, session, msg):
(hdr, hdr_len, sandesh_name) = SandeshReader.extract_sandesh_header(msg)
if sandesh_name is None:
self._sandesh_instance.msg_stats().update_rx_drop_stats(
'__UNKNOWN__', len(msg))
self._logger.error('Failed to decode sandesh header for "%s"' % (msg))
return
# update the sandesh rx stats
self._sandesh_instance.stats().update_stats(sandesh_name, len(msg), False)
if hdr.Hints & SANDESH_CONTROL_HINT:
self._logger.debug('Received sandesh control message [%s]' % (sandesh_name))
if sandesh_name != 'SandeshCtrlServerToClient':
self._sandesh_instance.msg_stats().update_rx_drop_stats(
sandesh_name, len(msg))
self._logger.error('Invalid sandesh control message [%s]' % (sandesh_name))
return
transport = TTransport.TMemoryBuffer(msg[hdr_len:])
Expand All @@ -162,13 +163,18 @@ def _receive_sandesh_msg(self, session, msg):
from gen_py.sandesh_ctrl.ttypes import SandeshCtrlServerToClient
sandesh_ctrl_msg = SandeshCtrlServerToClient()
if sandesh_ctrl_msg.read(protocol) == -1:
self._sandesh_instance.msg_stats().update_rx_drop_stats(
sandesh_name, len(msg))
self._logger.error('Failed to decode sandesh control message "%s"' %(msg))
else:
self._sandesh_instance.msg_stats().update_rx_stats(
sandesh_name, len(msg))
self._state_machine.on_sandesh_ctrl_msg_receive(session, sandesh_ctrl_msg,
hdr.Source)
else:
self._logger.debug('Received sandesh message [%s]' % (sandesh_name))
self._client.handle_sandesh_msg(sandesh_name, msg[hdr_len:])
self._client.handle_sandesh_msg(sandesh_name,
msg[hdr_len:], len(msg))
#end _receive_sandesh_msg

#end class SandeshConnection
34 changes: 11 additions & 23 deletions library/python/pysandesh/sandesh_req_impl.py
Expand Up @@ -44,7 +44,7 @@ def __init__(self, sandesh):
SandeshLoggingParamsStatus.handle_request = \
self.sandesh_logging_params_status_handle_request
SandeshMessageStatsReq.handle_request = \
self.sandesh_stats_handle_request
self.sandesh_msg_stats_handle_request
SandeshTraceBufferListRequest.handle_request = \
self.sandesh_trace_buffer_list_request_handle_request
SandeshTraceRequest.handle_request = \
Expand Down Expand Up @@ -169,28 +169,16 @@ def sandesh_alarm_types_req_handle_request(self, sandesh_req):
alarm_types_res.response(sandesh_req.context())
# end sandesh_alarm_types_req_handle_request

def sandesh_stats_handle_request(self, sandesh_req):
sandesh_stats = self._sandesh.stats()
stats_map = sandesh_stats.stats_map()
stats_list = []
for sandesh_name, stats in stats_map.iteritems():
msg_stat = SandeshMessageStats(stats.tx_count,
stats.tx_bytes,
stats.rx_count,
stats.rx_bytes)
mtype_stat = SandeshMessageTypeStats(sandesh_name,
msg_stat)
stats_list.append(mtype_stat)

def sandesh_msg_stats_handle_request(self, sandesh_req):
sandesh_msg_stats = self._sandesh.msg_stats()
msg_type_stats = sandesh_msg_stats.message_type_stats()
msg_stats_list = []
for msg_type, stats in msg_type_stats.iteritems():
mtype_stat = SandeshMessageTypeStats(msg_type, stats)
msg_stats_list.append(mtype_stat)
gen_stats = SandeshGeneratorStats()
gen_stats.type_stats = stats_list
gen_stats.aggregate_stats = SandeshMessageStats()
gen_stats.aggregate_stats.messages_sent = sandesh_stats._sandesh_sent
gen_stats.aggregate_stats.bytes_sent = sandesh_stats._bytes_sent
gen_stats.aggregate_stats.messages_received = \
sandesh_stats._sandesh_received
gen_stats.aggregate_stats.bytes_received = \
sandesh_stats._bytes_received
gen_stats.type_stats = msg_stats_list
gen_stats.aggregate_stats = sandesh_msg_stats.aggregate_stats()
connection = self._sandesh.client().connection()
if connection and connection.session():
session = connection.session()
Expand All @@ -203,7 +191,7 @@ def sandesh_stats_handle_request(self, sandesh_req):
gen_stats.send_queue_stats = send_queue_stats
stats_resp = SandeshMessageStatsResp(gen_stats)
stats_resp.response(sandesh_req.context())
# end sandesh_stats_handle_request
# end sandesh_msg_stats_handle_request

def sandesh_trace_buffer_list_request_handle_request(self, sandesh_req):
tbuf_info_list = [SandeshTraceBufInfo(trace_buf_name=tbuf)
Expand Down
23 changes: 14 additions & 9 deletions library/python/pysandesh/sandesh_session.py
Expand Up @@ -144,7 +144,7 @@ def __init__(self, session):
# Public functions

@staticmethod
def encode_sandesh(sandesh):
def encode_sandesh(sandesh, sandesh_instance=None):
transport = TTransport.TMemoryBuffer()
protocol_factory = TXMLProtocol.TXMLProtocolFactory()
protocol = protocol_factory.getProtocol(transport)
Expand All @@ -166,11 +166,15 @@ def encode_sandesh(sandesh):
sandesh.instance_id())
# write the sandesh header
if sandesh_hdr.write(protocol) < 0:
print 'Error in encoding sandesh header'
if sandesh_instance is not None:
sandesh_instance.msg_stats().update_tx_drop_stats(
sandesh.__class__.__name__, 0)
return None
# write the sandesh
if sandesh.write(protocol) < 0:
print 'Error in encoding sandesh'
if sandesh_instance is not None:
sandesh_instance.msg_stats().update_tx_drop_stats(
sandesh.__class__.__name__, 0)
return None
# get the message
msg = transport.getvalue()
Expand All @@ -191,9 +195,8 @@ def send_msg(self, sandesh, more):
self._logger.error('Failed to send sandesh')
return -1
# update sandesh tx stats
sandesh_name = sandesh.__class__.__name__
self._sandesh_instance.stats().update_stats(
sandesh_name, len(send_buf), True)
self._sandesh_instance.msg_stats().update_tx_stats(
sandesh.__class__.__name__, len(send_buf))
if more:
self._send_msg_more(send_buf)
else:
Expand Down Expand Up @@ -317,9 +320,11 @@ def _send_sandesh(self, sandesh):
else:
more = True
if not self._connected:
self._logger.log(
SandeshLogger.get_py_logger_level(sandesh.level()),
sandesh.log())
if self._sandesh_instance.is_logging_dropped_allowed(sandesh):
self._logger.error(
"SANDESH: %s: %s" % ("Not connected", sandesh.log()))
self._sandesh_instance.msg_stats().update_tx_drop_stats(
sandesh.__class__.__name__, 0)
return
if sandesh.is_logging_allowed(self._sandesh_instance):
self._logger.log(
Expand Down
2 changes: 2 additions & 0 deletions library/python/pysandesh/sandesh_state_machine.py
Expand Up @@ -453,6 +453,8 @@ def _dequeue_event(self, event):
if self._fsm.current == State._ESTABLISHED or self._fsm.current == State._CLIENT_INIT:
self._connection.handle_sandesh_uve_msg(event.msg)
else:
self._connection.sandesh_instance().msg_stats().update_tx_drop_stats(
event.msg.__class__.__name__, 0)
self._logger.info("Discarding event[%s] in state[%s]" \
% (event.event, self._fsm.current))
elif event.event == Event._EV_SANDESH_CTRL_MESSAGE_RECV and \
Expand Down

0 comments on commit 132000a

Please sign in to comment.