Skip to content

Commit

Permalink
This fix adds sandesh systemlog message throttling support.
Browse files Browse the repository at this point in the history
We are throttling the message on the send side with a circular
buffer of certail value. The size of the buffer would be the
messages/second that can be sent by the application.
If application sends more messages than the configured rate,
it will be dropped and a syslog message is recorded for each
overshoot.Following logic is added in the send part for both
c++ and python:

1. Record the current time in curr_time
2. If CB is full:
      2.1. Compare time of the next free buffer with current time
      2.2. If difference is more than a sec, then enque else drop
3. enqueue in CB

Currently the CB size cannot be changed midway
Partial-Bug: 1469414

Change-Id: I2817ed643f1cd02ca916332c0d4f09e3f7b5f2c5
  • Loading branch information
arvindvis committed Sep 1, 2015
1 parent 90e2637 commit fbe1e94
Show file tree
Hide file tree
Showing 11 changed files with 392 additions and 32 deletions.
184 changes: 166 additions & 18 deletions compiler/generate/t_cpp_generator.cc

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions compiler/generate/t_py_generator.cc
Expand Up @@ -513,7 +513,8 @@ string t_py_generator::render_sandesh_includes() {
"from pysandesh.sandesh_http import SandeshHttp\n"
"from pysandesh.sandesh_uve import SandeshUVETypeMaps\n"
"from pysandesh.util import UTCTimestampUsec, UTCTimestampUsecToString\n"
"from pysandesh.gen_py.sandesh.constants import *\n";
"from pysandesh.gen_py.sandesh.constants import *\n"
"import collections\n";
}
return sandesh_includes;
}
Expand Down Expand Up @@ -1406,7 +1407,13 @@ void t_py_generator::generate_py_sandesh_definition(ofstream& out,
} else {
indent(out) << "thrift_spec = None" << endl << endl;
}


if (sandesh_type->is_sandesh_system()) {
indent(out) << "rate_limit_buffer = collections.deque(maxlen=sandesh_base."
"SandeshSystem._DEFAULT_SEND_RATELIMIT)" << endl << endl;
indent(out) << "do_rate_limit_drop_log = True" << endl << endl;
}

{
out <<
indent() << "def __init__(self";
Expand Down
2 changes: 2 additions & 0 deletions library/common/sandesh.sandesh
Expand Up @@ -60,6 +60,7 @@ enum SandeshLevel {
enum SandeshTxDropReason {
MinDropReason = 0,
NoDrop,
RatelimitDrop,
ValidationFailed,
QueueLevel,
NoClient,
Expand Down Expand Up @@ -87,6 +88,7 @@ enum SandeshRxDropReason {
const i32 SANDESH_KEY_HINT = 0x1
const i32 SANDESH_CONTROL_HINT = 0x2
const i32 SANDESH_SYNC_HINT = 0x4
const u32 DEFAULT_SANDESH_SEND_RATELIMIT = 100

// Update ParseHeader function in sandesh_message_builder.cc
// when modifying SandeshHeader below
Expand Down
2 changes: 2 additions & 0 deletions library/common/sandesh_uve.sandesh
Expand Up @@ -31,6 +31,7 @@ struct SandeshMessageStats {
58: u64 messages_sent_dropped_write_failed;
59: u64 messages_sent_dropped_wrong_client_sm_state;
60: u64 messages_sent_dropped_validation_failed;
61: u64 messages_sent_dropped_rate_limited;
// Bytes
81: u64 bytes_sent_dropped_no_queue;
82: u64 bytes_sent_dropped_no_client;
Expand All @@ -42,6 +43,7 @@ struct SandeshMessageStats {
88: u64 bytes_sent_dropped_write_failed;
89: u64 bytes_sent_dropped_wrong_client_sm_state;
90: u64 bytes_sent_dropped_validation_failed;
91: u64 bytes_sent_dropped_rate_limited;
// Receive
// Messages
101: u64 messages_received_dropped_no_queue;
Expand Down
35 changes: 26 additions & 9 deletions library/cpp/sandesh.cc
Expand Up @@ -67,6 +67,7 @@ log4cplus::Logger Sandesh::logger_ =
log4cplus::Logger::getInstance(LOG4CPLUS_TEXT("SANDESH"));

Sandesh::ModuleContextMap Sandesh::module_context_;
tbb::atomic<uint32_t> Sandesh::sandesh_send_ratelimit_;

const char * Sandesh::SandeshRoleToString(SandeshRole::type role) {
switch (role) {
Expand Down Expand Up @@ -139,6 +140,11 @@ void Sandesh::Initialize(SandeshRole::type role,
instance_id_ = instance_id;
client_context_ = client_context;
event_manager_ = evm;
//If Sandesh::sandesh_send_ratelimit_ is not defined by client,
// assign a default value to it
if (get_send_rate_limit() == 0) {
set_send_rate_limit(g_sandesh_constants.DEFAULT_SANDESH_SEND_RATELIMIT);
}

InitReceive(Task::kTaskInstanceAny);
http_port_ = SandeshHttp::Init(evm, module, http_port, &SandeshHttpCallback);
Expand Down Expand Up @@ -565,8 +571,8 @@ int32_t Sandesh::ReceiveBinaryMsg(u_int8_t *buf, u_int32_t buf_len,

bool Sandesh::HandleTest() {
// Handle unit test scenario
if (IsUnitTest() || IsLevelUT()) {
if (IsLevelCategoryLoggingAllowed()) {
if (IsUnitTest() || IsLevelUT(level_)) {
if (IsLevelCategoryLoggingAllowed(level_, category_)) {
ForcedLog();
}
Release();
Expand Down Expand Up @@ -678,23 +684,25 @@ bool SandeshRequest::Enqueue(SandeshRxQueue *queue) {
return true;
}

bool Sandesh::IsLevelUT() {
return level_ >= SandeshLevel::UT_START &&
level_ <= SandeshLevel::UT_END;
bool Sandesh::IsLevelUT(SandeshLevel::type level) {
return level >= SandeshLevel::UT_START &&
level <= SandeshLevel::UT_END;
}

bool Sandesh::IsLevelCategoryLoggingAllowed() const {
bool level_allowed = logging_level_ >= level_;
bool Sandesh::IsLevelCategoryLoggingAllowed(SandeshLevel::type level,
const std::string& category) {
bool level_allowed = logging_level_ >= level;
bool category_allowed = !logging_category_.empty() ?
logging_category_ == category_ : true;
logging_category_ == category : true;
return level_allowed && category_allowed;
}

bool Sandesh::IsLoggingAllowed() const {
if (type_ == SandeshType::FLOW) {
return enable_flow_log_;
} else {
return IsLocalLoggingEnabled() && IsLevelCategoryLoggingAllowed();
return IsLocalLoggingEnabled() &&
IsLevelCategoryLoggingAllowed(level_, category_);
}
}

Expand Down Expand Up @@ -811,3 +819,12 @@ void Sandesh::set_module_context(const std::string &module_name,
result.first->second = context;
}
}

bool SandeshSystem::HandleTest(SandeshLevel::type level,
const std::string& category) {
// Handle unit test scenario
if (IsUnitTest() || IsLevelUT(level)) {
return true;
}
return false;
}
13 changes: 10 additions & 3 deletions library/cpp/sandesh.h
Expand Up @@ -288,6 +288,10 @@ class Sandesh {
static const char* LevelToString(SandeshLevel::type level);
static SandeshLevel::type StringToLevel(std::string level);
static log4cplus::Logger& logger() { return logger_; }
static void set_send_rate_limit(uint32_t rate_limit) {
sandesh_send_ratelimit_ = rate_limit;
}
static uint32_t get_send_rate_limit() { return sandesh_send_ratelimit_; }

protected:
void set_timestamp(time_t timestamp) { timestamp_ = timestamp; }
Expand Down Expand Up @@ -327,6 +331,9 @@ class Sandesh {
}
static SandeshCallback response_callback_;
static SandeshClient *client_;
static bool IsLevelUT(SandeshLevel::type level);
static bool IsLevelCategoryLoggingAllowed(SandeshLevel::type level,
const std::string& category);

private:
friend class SandeshTracePerfTest;
Expand All @@ -347,9 +354,6 @@ class Sandesh {
unsigned short http_port,
SandeshContext *client_context = NULL);

bool IsLevelUT();
bool IsLevelCategoryLoggingAllowed() const;

static SandeshRole::type role_;
static std::string module_;
static std::string source_;
Expand Down Expand Up @@ -383,6 +387,7 @@ class Sandesh {
SandeshLevel::type level_;
std::string category_;
std::string name_;
static tbb::atomic<uint32_t> sandesh_send_ratelimit_;
};

#define SANDESH_LOG(_Level, _Msg) \
Expand Down Expand Up @@ -459,6 +464,8 @@ class SandeshSystem : public Sandesh {
protected:
SandeshSystem(const std::string& name, uint32_t seqno) :
Sandesh(SandeshType::SYSTEM, name, seqno) {}
static bool HandleTest(SandeshLevel::type level, const
std::string& category);
};

class SandeshObject : public Sandesh {
Expand Down
4 changes: 4 additions & 0 deletions library/cpp/sandesh_statistics.cc
Expand Up @@ -42,6 +42,10 @@ static void UpdateSandeshMessageStatsDrops(SandeshMessageStats *smstats,
smstats->messages_sent_dropped_validation_failed++;
smstats->bytes_sent_dropped_validation_failed += bytes;
break;
case SandeshTxDropReason::RatelimitDrop:
smstats->messages_sent_dropped_rate_limited++;
smstats->bytes_sent_dropped_rate_limited += bytes;
break;
case SandeshTxDropReason::QueueLevel:
smstats->messages_sent_dropped_queue_level++;
smstats->bytes_sent_dropped_queue_level += bytes;
Expand Down
94 changes: 94 additions & 0 deletions library/cpp/test/sandesh_message_test.cc
Expand Up @@ -59,6 +59,61 @@ void SandeshRequestTest1::HandleRequest() const {

namespace {

class SandeshSendRatelimitTest : public ::testing::Test {
protected:
SandeshSendRatelimitTest() {
}

virtual void SetUp() {
msg_num_ = 0 ;
asyncserver_done = false;
evm_.reset(new EventManager());
server_ = new SandeshServerTest(evm_.get(),
boost::bind(&SandeshSendRatelimitTest::ReceiveSandeshMsg, this, _1, _2));
thread_.reset(new ServerThread(evm_.get()));
}

virtual void TearDown() {
task_util::WaitForIdle();
Sandesh::Uninit();
task_util::WaitForIdle();
TASK_UTIL_EXPECT_FALSE(server_->HasSessions());
task_util::WaitForIdle();
server_->Shutdown();
task_util::WaitForIdle();
TcpServerManager::DeleteServer(server_);
task_util::WaitForIdle();
evm_->Shutdown();
if (thread_.get() != NULL) {
thread_->Join();
}
task_util::WaitForIdle();
}

bool ReceiveSandeshMsg(SandeshSession *session,
const SandeshMessage *msg) {
const SandeshHeader &header(msg->GetHeader());
const SandeshXMLMessage *xmsg =
dynamic_cast<const SandeshXMLMessage *>(msg);
EXPECT_TRUE(xmsg != NULL);
std::string message(xmsg->ExtractMessage());

if (header.get_Type() == SandeshType::UVE) {
return true;
}
msg_num_++;
if (msg_num_ > 13) {
asyncserver_done = true;
}
return true;
}

uint32_t msg_num_;
SandeshServerTest *server_;
std::auto_ptr<ServerThread> thread_;
std::auto_ptr<EventManager> evm_;
};

class SandeshAsyncTest : public ::testing::Test {
protected:
SandeshAsyncTest() {
Expand Down Expand Up @@ -262,6 +317,45 @@ TEST_F(SandeshAsyncTest, Async) {
}
}

TEST_F(SandeshSendRatelimitTest, Buffer) {
server_->Initialize(0);
thread_->Start(); // Must be called after initialization
int port = server_->GetPort();
ASSERT_LT(0, port);
// Connect to the server
Sandesh::set_send_rate_limit(10);
Sandesh::InitGenerator("SandeshSendRatelimitTest-Client", "localhost",
"Test", "Test", evm_.get(),0);

Sandesh::ConnectToCollector("127.0.0.1", port);
EXPECT_TRUE(Sandesh::IsConnectToCollectorEnabled());
EXPECT_TRUE(Sandesh::client() != NULL);
TASK_UTIL_EXPECT_TRUE(Sandesh::client()->state() == SandeshClientSM::ESTABLISHED);
Sandesh::SetLoggingParams(true, "SystemLogTest", SandeshLevel::SYS_INFO);
boost::ptr_map<std::string, SandeshMessageTypeStats> type_stats;
SandeshMessageStats agg_stats;
Sandesh::GetMsgStats(&type_stats, &agg_stats);
boost::ptr_map<std::string, SandeshMessageTypeStats>::iterator it;
it = type_stats.find("SystemLogTest");
//Initialize dropped rate to 0
it->second->stats.messages_sent_dropped_rate_limited = 0;
/*
* First 10 out of the 15 msgs will be recived.messages 15-20 should albe received
*/
for (int cnt = 0; cnt < 20; cnt++) {
SystemLogTest::Send("SystemLogTest", SandeshLevel::SYS_INFO, "Test", cnt, 100, "sat1string100");
if (cnt == 15) {
sleep(1);
}
}
//Allow all messages to be recieved
sleep(1);
Sandesh::GetMsgStats(&type_stats, &agg_stats);
it = type_stats.find("SystemLogTest");
TASK_UTIL_EXPECT_TRUE(msg_num_ == 20 -
it->second->stats.messages_sent_dropped_rate_limited);
}

class SandeshUVEAlarmTest : public ::testing::Test {
protected:
SandeshUVEAlarmTest() {
Expand Down
55 changes: 55 additions & 0 deletions library/python/pysandesh/sandesh_base.py
Expand Up @@ -15,6 +15,9 @@
import sandesh_logger as sand_logger
import trace
import util
import collections
import time
import copy

from gen_py.sandesh.ttypes import SandeshType, SandeshLevel, \
SandeshTxDropReason
Expand Down Expand Up @@ -705,18 +708,70 @@ def send(self, sandesh=sandesh_global):
sandesh._logger.error('sandesh "%s" validation failed [%s]'
% (self.__class__.__name__, e))
return -1
#If systemlog message first check if the transmit side buffer
# has space
if self._type == SandeshType.SYSTEM:
if (not self.is_rate_limit_pass(sandesh)):
return -1
self._seqnum = self.next_seqnum()
if self.handle_test(sandesh):
return 0
sandesh.send_sandesh(self)
return 0

def is_rate_limit_pass(self,sandesh):
#If buffer size 0 return
if self.__class__.rate_limit_buffer.maxlen == 0:
return False
#Check if buffer resize is reqd
if (self.__class__.rate_limit_buffer.maxlen != \
SandeshSystem.get_sandesh_send_rate_limit()):
temp_buffer = copy.deepcopy(self.__class__.rate_limit_buffer)
self.__class__.rate_limit_buffer = collections.deque(temp_buffer, \
maxlen=SandeshSystem.get_sandesh_send_rate_limit())
del temp_buffer
cur_time=int(time.time())
#Check if circular buffer is full
if len(self.__class__.rate_limit_buffer) == \
self.__class__.rate_limit_buffer.maxlen :
# Read the element in buffer and compare with cur_time
if(self.__class__.rate_limit_buffer[0] == cur_time):
#Sender generating more messages/sec than the
#buffer_threshold size
sandesh.msg_stats().update_tx_stats(self.__class__.__name__,
0, SandeshTxDropReason.RatelimitDrop)
if self.__class__.do_rate_limit_drop_log:
sandesh._logger.error('SANDESH: Ratelimit Drop ' \
'(%d messages/sec): for %s' % \
(self.__class__.rate_limit_buffer.maxlen, \
self.__class__.__name__))
#Disable logging
self.__class__.do_rate_limit_drop_log = False
return False
#If logging is disabled enable it
self.__class__.do_rate_limit_drop_log = True
self.__class__.rate_limit_buffer.append(cur_time)
return True

# end send

# end class SandeshAsync


class SandeshSystem(SandeshAsync):

_DEFAULT_SEND_RATELIMIT = DEFAULT_SANDESH_SEND_RATELIMIT

@classmethod
def set_sandesh_send_rate_limit(cls, sandesh_send_rate_limit_val):
cls._DEFAULT_SEND_RATELIMIT = sandesh_send_rate_limit_val
# end set_sandesh_send_rate_limit

@classmethod
def get_sandesh_send_rate_limit(cls):
return cls._DEFAULT_SEND_RATELIMIT
# end get_sandesh_send_rate_limit

def __init__(self):
SandeshAsync.__init__(self)
self._type = SandeshType.SYSTEM
Expand Down
7 changes: 7 additions & 0 deletions library/python/pysandesh/sandesh_stats.py
Expand Up @@ -147,6 +147,13 @@ def _update_tx_stats_internal(self, msg_stats, nbytes, drop_reason):
else:
msg_stats.messages_sent_dropped_wrong_client_sm_state = 1
msg_stats.bytes_sent_dropped_wrong_client_sm_state = nbytes
elif drop_reason is SandeshTxDropReason.RatelimitDrop:
if msg_stats.messages_sent_dropped_rate_limited:
msg_stats.messages_sent_dropped_rate_limited += 1
msg_stats.bytes_sent_dropped_rate_limited += nbytes
else:
msg_stats.messages_sent_dropped_rate_limited = 1
msg_stats.bytes_sent_dropped_rate_limited = nbytes
else:
assert 0, 'Unhandled Tx drop reason <%s>' % (str(drop_reason))
# end _update_tx_stats_internal
Expand Down

0 comments on commit fbe1e94

Please sign in to comment.