Skip to content

Commit

Permalink
Added size based watermarks in pysandesh library
Browse files Browse the repository at this point in the history
- Added size based watermarks for sandesh send queue.
- python sandesh compiler would generate a function named __sizeof__()
  for each structure/sandesh defined in .sandesh
  If a python object has __sizeof__() defined, then sys.getsizeof()
  would call this function to get the size of the object.

TODO:
 - Add UT for auto-generated function __sizeof__()

Change-Id: I5ebbb67ad56e4ed9d65e4296eafa43ad7fda94f4
Partial-Bug: #1484800
  • Loading branch information
Sundaresan Rajangam committed Sep 8, 2015
1 parent 56c4adc commit 74205ef
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 92 deletions.
53 changes: 52 additions & 1 deletion compiler/generate/t_py_generator.cc
Expand Up @@ -134,6 +134,7 @@ class t_py_generator : public t_generator {
void generate_py_function_helpers(t_function* tfunction);
#ifdef SANDESH
void generate_py_struct_logger(std::ofstream& out, t_struct* tstruct);
void generate_py_struct_sizeof(std::ofstream& out, t_struct* tstruct);
#endif

/**
Expand Down Expand Up @@ -166,6 +167,11 @@ class t_py_generator : public t_generator {
void generate_py_sandesh_alarm_list(std::ofstream& out);
void generate_py_sandesh_alarm_data_list(std::ofstream& out);
void generate_py_sandesh_trace(std::ofstream& out, t_sandesh* tsandesh);
void generate_py_sandesh_sizeof(std::ofstream& out, t_sandesh* tsandesh);
#endif

#ifdef SANDESH
void generate_py_sizeof(std::ofstream& out, const vector<t_field*>& fields);
#endif

/**
Expand Down Expand Up @@ -505,7 +511,9 @@ string t_py_generator::render_sandesh_includes() {
std::string sandesh_includes("\n");
sandesh_includes +=
"import cStringIO\n"
"import uuid\n";
"import uuid\n"
"from sys import getsizeof\n"
"from itertools import chain\n";
if (module != "sandesh") {
sandesh_includes +=
"import bottle\n"
Expand Down Expand Up @@ -965,6 +973,7 @@ void t_py_generator::generate_py_struct_definition(ofstream& out,

#ifdef SANDESH
generate_py_struct_logger(out, tstruct);
generate_py_struct_sizeof(out, tstruct);
#endif

// For exceptions only, generate a __str__ method. This is
Expand Down Expand Up @@ -1521,6 +1530,7 @@ void t_py_generator::generate_py_sandesh_definition(ofstream& out,
}

generate_py_sandesh_compare(out, tsandesh);
generate_py_sandesh_sizeof(out, tsandesh);

if (!gen_slots_) {
// Printing utilities so that on the command line thrift
Expand Down Expand Up @@ -2246,6 +2256,47 @@ void t_py_generator::generate_py_struct_logger(ofstream &out,
indent_down();
}

void t_py_generator::generate_py_sizeof(std::ofstream& out,
const vector<t_field*>& fields) {
indent(out) << "def __sizeof__(self):" << endl;
indent_up();
indent(out) << "size = 0" << endl;
vector<t_field*>::const_iterator f_iter;
for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
std::string fname((*f_iter)->get_name());
indent(out) << "if self." << fname << " is not None:" << endl;
indent_up();
indent(out) << "size += getsizeof(self." << fname << ")" << endl;
t_type* type = get_true_type((*f_iter)->get_type());
if (type->is_container()) {
if (type->is_map()) {
indent(out) <<
"size += sum(map(getsizeof, chain.from_iterable(self." <<
fname << ")))" << endl;

} else {
indent(out) <<
"size += sum(map(getsizeof, self." << fname << "))" << endl;
}
}
indent_down();
}
indent(out) << "return size" << endl << endl;
indent_down();
}

void t_py_generator::generate_py_struct_sizeof(std::ofstream& out,
t_struct* tstruct) {
const vector<t_field*>& fields = tstruct->get_members();
generate_py_sizeof(out, fields);
}

void t_py_generator::generate_py_sandesh_sizeof(std::ofstream& out,
t_sandesh* tsandesh) {
const vector<t_field*>& fields = tsandesh->get_members();
generate_py_sizeof(out, fields);
}

void t_py_generator::generate_py_sandesh_hint(ofstream& out,
t_sandesh* tsandesh) {
bool has_key_annotation = tsandesh->has_key_annotation();
Expand Down
96 changes: 55 additions & 41 deletions library/python/pysandesh/sandesh_base.py
Expand Up @@ -5,18 +5,21 @@
#
# Sandesh
#

import importlib
import os
import sys
import pkgutil

import gevent
import sandesh_logger as sand_logger
import trace
import util
import json
import base64
import collections
import time
import copy

import sandesh_logger as sand_logger
import trace
import util
from gen_py.sandesh.ttypes import SandeshType, SandeshLevel, \
SandeshTxDropReason
from gen_py.sandesh.constants import *
Expand Down Expand Up @@ -76,6 +79,7 @@ def init_generator(self, module, source, node_type, instance_id,
self._collectors = collectors
self._connect_to_collector = connect_to_collector
self._rcv_queue = WorkQueue(self._process_rx_sandesh)
self._send_level = SandeshLevel.INVALID
self._init_logger(module, logger_class=logger_class,
logger_config_file=logger_config_file)
self._logger.info('SANDESH: CONNECT TO COLLECTOR: %s',
Expand Down Expand Up @@ -209,6 +213,18 @@ def set_send_queue(self, enable):
connection.session().send_queue().may_be_start_runner()
# end set_send_queue

def set_send_level(self, count, sandesh_level):
if self._send_level != sandesh_level:
self._logger.info('Sandesh Send Level [%s] -> [%s]' % \
(SandeshLevel._VALUES_TO_NAMES[self._send_level],
SandeshLevel._VALUES_TO_NAMES[sandesh_level]))
self._send_level = sandesh_level
# end set_send_level

def send_level(self):
return self._send_level
# end send_level

def init_collector(self):
pass
# end init_collector
Expand Down Expand Up @@ -327,7 +343,7 @@ def is_unit_test(self):

def handle_test(self, sandesh_init):
if sandesh_init.is_unit_test() or self._is_level_ut():
if self._is_logging_allowed(sandesh_init):
if self.is_logging_allowed(sandesh_init):
sandesh_init._logger.debug(self.log())
return True
return False
Expand Down Expand Up @@ -360,17 +376,26 @@ def send_sandesh(self, tx_sandesh):
self._client.send_sandesh(tx_sandesh)
else:
if self._connect_to_collector:
if self.is_logging_dropped_allowed(tx_sandesh):
self._logger.error('SANDESH: No Client: %s',
tx_sandesh.log())
self.drop_tx_sandesh(tx_sandesh, SandeshTxDropReason.NoClient,
tx_sandesh.level())
else:
self._logger.log(
sand_logger.SandeshLogger.get_py_logger_level(
tx_sandesh.level()), tx_sandesh.log())
self._msg_stats.update_tx_stats(tx_sandesh.__class__.__name__,
0, SandeshTxDropReason.NoClient)
self.drop_tx_sandesh(tx_sandesh, SandeshTxDropReason.NoClient)
# end send_sandesh

def drop_tx_sandesh(self, tx_sandesh, drop_reason, level=None):
self._msg_stats.update_tx_stats(tx_sandesh.__class__.__name__,
sys.getsizeof(tx_sandesh), drop_reason)
if self.is_logging_dropped_allowed(tx_sandesh):
if level is not None:
self._logger.log(
sand_logger.SandeshLogger.get_py_logger_level(level),
tx_sandesh.log())
else:
self._logger.error('SANDESH: [DROP: %s] %s' % \
(SandeshTxDropReason._VALUES_TO_NAMES[drop_reason],
tx_sandesh.log()))
# end drop_tx_sandesh

def send_generator_info(self):
from gen_py.sandesh_uve.ttypes import SandeshClientInfo, \
ModuleClientState, SandeshModuleClientTrace
Expand Down Expand Up @@ -690,15 +715,17 @@ def send(self, sandesh=sandesh_global):
try:
self.validate()
except e:
sandesh.msg_stats().update_tx_stats(self.__class__.__name__,
0, SandeshTxDropReason.ValidationFailed)
sandesh._logger.error('sandesh "%s" validation failed [%s]'
% (self.__class__.__name__, e))
sandesh.drop_tx_sandesh(self, SandeshTxDropReason.ValidationFailed)
return -1
if self._level >= sandesh.send_level():
sandesh.drop_tx_sandesh(self, SandeshTxDropReason.QueueLevel)
return -1
#If systemlog message first check if the transmit side buffer
# For systemlog message, first check if the transmit side buffer
# has space
if self._type == SandeshType.SYSTEM:
if (not self.is_rate_limit_pass(sandesh)):
sandesh.drop_tx_sandesh(self,
SandeshTxDropReason.RatelimitDrop)
return -1
self._seqnum = self.next_seqnum()
if self.handle_test(sandesh):
Expand All @@ -725,8 +752,6 @@ def is_rate_limit_pass(self,sandesh):
if(self.__class__.rate_limit_buffer[0] == cur_time):
#Sender generating more messages/sec than the
#buffer_threshold size
sandesh.msg_stats().update_tx_stats(self.__class__.__name__,
0, SandeshTxDropReason.RatelimitDrop)
if self.__class__.do_rate_limit_drop_log:
sandesh._logger.error('SANDESH: Ratelimit Drop ' \
'(%d messages/sec): for %s' % \
Expand Down Expand Up @@ -798,10 +823,7 @@ def request(self, context='', sandesh=sandesh_global):
try:
self.validate()
except e:
sandesh.msg_stats().update_tx_stats(self.__class__.__name__,
0, SandeshTxDropReason.ValidationFailed)
sandesh._logger.error('sandesh "%s" validation failed [%s]'
% (self.__class__.__name__, e))
sandesh.drop_tx_sandesh(self, SandeshTxDropReason.ValidationFailed)
return -1
if context == 'ctrl':
self._hints |= SANDESH_CONTROL_HINT
Expand All @@ -828,10 +850,7 @@ def response(self, context='', more=False, sandesh=sandesh_global):
try:
self.validate()
except e:
sandesh.msg_stats().update_tx_stats(self.__class__.__name__,
0, SandeshTxDropReason.ValidationFailed)
sandesh._logger.error('sandesh "%s" validation failed [%s]'
% (self.__class__.__name__, e))
sandesh.drop_tx_sandesh(self, SandeshTxDropReason.ValidationFailed)
return -1
self._context = context
self._more = more
Expand Down Expand Up @@ -861,10 +880,7 @@ def send(self, isseq=False, seqno=0, context='',
try:
self.validate()
except e:
sandesh.msg_stats().update_tx_stats(self.__class__.__name__,
0, SandeshTxDropReason.ValidationFailed)
sandesh._logger.error('sandesh "%s" validation failed [%s]'
% (self.__class__.__name__, e))
sandesh.drop_tx_sandesh(self, SandeshTxDropReason.ValidationFailed)
return -1
if isseq is True:
self._seqnum = seqno
Expand All @@ -874,16 +890,15 @@ def send(self, isseq=False, seqno=0, context='',
self.__class__.__name__)
if uve_type_map is None:
sandesh._logger.error('sandesh uve <%s> not registered: %s'\
% (self.__class__.__name__, self.log()))
sandesh.msg_stats().update_tx_stats(self.__class__.__name__,
0, SandeshTxDropReason.ValidationFailed)
% (self.__class__.__name__))
sandesh.drop_tx_sandesh(self,
SandeshTxDropReason.ValidationFailed)
return -1
self._seqnum = self.next_seqnum()
if not uve_type_map.update_uve(self):
sandesh.msg_stats().update_tx_stats(self.__class__.__name__,
0, SandeshTxDropReason.ValidationFailed)
sandesh._logger.error('Failed to update sandesh in cache. '
+ self.log())
sandesh._logger.error('Failed to update sandesh in cache')
sandesh.drop_tx_sandesh(self,
SandeshTxDropReason.ValidationFailed)
return -1
self._context = context
self._more = more
Expand Down Expand Up @@ -926,8 +941,7 @@ def send_trace(self, context='', more=False,
try:
self.validate()
except e:
sandesh._logger.error('sandesh "%s" validation failed [%s]'
% (self.__class__.__name__, e))
sandesh.drop_tx_sandesh(self, SandeshTxDropReason.ValidationFailed)
return -1
self._context = context
self._more = more
Expand Down
11 changes: 2 additions & 9 deletions library/python/pysandesh/sandesh_client.py
Expand Up @@ -46,18 +46,11 @@ def send_sandesh(self, sandesh):
self._connection.session().enqueue_sandesh(sandesh)
else:
if (self._connection.session() is None):
error_str = "No Connection"
self._sandesh_instance.msg_stats().update_tx_stats(
sandesh.__class__.__name__, 0,
self._sandesh_instance.drop_tx_sandesh(sandesh,
SandeshTxDropReason.NoSession)
else:
error_str = "No ModuleId"
self._sandesh_instance.msg_stats().update_tx_stats(
sandesh.__class__.__name__, 0,
self._sandesh_instance.drop_tx_sandesh(sandesh,
SandeshTxDropReason.ClientSendFailed)
if self._sandesh_instance.is_logging_dropped_allowed(sandesh):
self._logger.error(
"SANDESH: %s: %s" % (error_str, sandesh.log()))
return 0
#end send_sandesh

Expand Down

0 comments on commit 74205ef

Please sign in to comment.