Skip to content

Commit

Permalink
This fix adds sandesh systemlog message throttling support.
Browse files Browse the repository at this point in the history
We are throttling the message on the send side with a circular
buffer of certail value. The size of the buffer would be the
messages/second that can be sent by the application.
If application sends more messages than the configured rate,
it will be dropped and a syslog message is recorded for each
overshoot.Following logic is added in the send part for both
c++ and python:

1. Record the current time in curr_time
2. If CB is full:
      2.1. Compare time of the next free buffer with current time
      2.2. If difference is more than a sec, then enque else drop
3. enqueue in CB

Currently the CB size cannot be changed midway
Partial-Bug: 1469414

Conflicts:
	compiler/generate/t_cpp_generator.cc
	library/common/sandesh.sandesh
	library/common/sandesh_uve.sandesh
	library/cpp/sandesh.cc
	library/cpp/sandesh.h
	library/cpp/sandesh_statistics.cc
	library/cpp/test/sandesh_message_test.cc
	library/python/pysandesh/sandesh_stats.py

Change-Id: I2817ed643f1cd02ca916332c0d4f09e3f7b5f2c5
  • Loading branch information
arvindvis committed Apr 5, 2016
1 parent c0ba0ed commit 6ef149a
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 8 deletions.
138 changes: 137 additions & 1 deletion compiler/generate/t_cpp_generator.cc
Expand Up @@ -138,6 +138,10 @@ class t_cpp_generator : public t_oop_generator {
std::string generate_sandesh_trace_creator(t_sandesh *tsandesh, bool signature, bool expand_autogen, bool skip_autogen,
std::string prefix, std::string suffix);
void generate_sandesh_updater(ofstream& out, t_sandesh* tsandesh);
void generate_isRatelimitPass(ofstream& out, t_sandesh* tsandesh);
void generate_sandesh_static_rate_limit_log_def(ofstream& out, t_sandesh* tsandesh);
void generate_sandesh_static_rate_limit_mutex_def(ofstream& out, t_sandesh* tsandesh);
void generate_sandesh_static_rate_limit_buffer_def(ofstream& out, t_sandesh* tsandesh);
#endif

void generate_cpp_struct(t_struct* tstruct, bool is_exception);
Expand Down Expand Up @@ -958,6 +962,8 @@ void t_cpp_generator::generate_cpp_sandesh(t_sandesh* tsandesh) {
((t_base_type *)tsandesh->get_type())->is_sandesh_uve();
bool is_alarm =
((t_base_type *)tsandesh->get_type())->is_sandesh_alarm();
bool is_system =
((t_base_type *)tsandesh->get_type())->is_sandesh_system();

generate_sandesh_definition(f_types_, tsandesh);
generate_sandesh_fingerprint(f_types_impl_, tsandesh, true);
Expand All @@ -980,6 +986,12 @@ void t_cpp_generator::generate_cpp_sandesh(t_sandesh* tsandesh) {
if (is_uve || is_alarm) {
generate_sandesh_updater(out,tsandesh);
}
if (is_system) {
generate_sandesh_static_rate_limit_log_def(out, tsandesh);
generate_sandesh_static_rate_limit_mutex_def(out, tsandesh);
generate_sandesh_static_rate_limit_buffer_def(out, tsandesh);
}

}

/**
Expand Down Expand Up @@ -1216,6 +1228,7 @@ void t_cpp_generator::generate_sandesh_async_creators(ofstream &out, t_sandesh *
std::string creator_func_name = "Send";
std::string macro_func_name = "Log";
bool is_flow = ((t_base_type *)tsandesh->get_type())->is_sandesh_flow();
bool is_system = ((t_base_type *)tsandesh->get_type())->is_sandesh_system();
// Generate creator
out << indent() << "static void " << creator_func_name <<
generate_sandesh_async_creator(tsandesh, true, false, false, "", "", true, false, false) <<
Expand All @@ -1225,7 +1238,7 @@ void t_cpp_generator::generate_sandesh_async_creators(ofstream &out, t_sandesh *
indent_up();
if (!is_flow) {
out << indent() << "if (IsLevelCategoryLoggingAllowed(level, category))"
" {" << endl;
" {" << endl;
indent_up();
out << indent() << "std::string drop_reason = \"\";" << endl;
out << indent() << "DropLog" <<
Expand All @@ -1248,6 +1261,49 @@ void t_cpp_generator::generate_sandesh_async_creators(ofstream &out, t_sandesh *
}
out << indent() << "return;" << endl;
scope_down(out);

//Adjust the buffer size if its capacity is different from configured size
if (is_system) {
out << indent() << "if (!HandleTest(level, category)) {" << endl;
indent_up();
out << indent() << "if (!IsRatelimitPass()) {" << endl;
indent_up();
out << indent() << "UpdateSandeshStats(\"" << tsandesh->get_name() << "\", 0, true, true);" <<
endl;
out << indent() << "if (do_rate_limit_drop_log_) {" << endl;
indent_up();
out << indent() << "std::stringstream ratelimit_val;" << endl;
out << indent() << " ratelimit_val << Sandesh::get_send_rate_limit();"
<< endl;
out << indent() << "std::string drop_reason = \"SANDESH: Ratelimit"
" Drop (\" + ratelimit_val.str() + std::string(\" messages"
"/second): \") ;" << endl;
out << indent() << "DropLog" <<
generate_sandesh_async_creator(tsandesh, false, false, false, "",
"", false, false, true) <<
"; " << endl;
out << indent() << "do_rate_limit_drop_log_ = false;" << endl;
scope_down(out);
out << indent() << "return;" << endl;
scope_down(out);
indent_down();
indent(out) << "} else {" << endl;
indent_up();
out << indent() << "if (IsLevelCategoryLoggingAllowed(level,category))"
" {" << endl;
indent_up();
out << indent() << "std::stringstream ratelimit_val;" << endl;
out << indent() << " ratelimit_val << Sandesh::get_send_rate_limit();"
<< endl;
out << indent() << "std::string drop_reason = \"\";" << endl;
out << indent() << "DropLog" <<
generate_sandesh_async_creator(tsandesh, false, false, false, "",
"", false, false, true) << "; " << endl;
scope_down(out);
out << indent() << "return;" << endl;
scope_down(out);
}

out << indent() << tsandesh->get_name() <<
" * snh = new " << tsandesh->get_name() <<
generate_sandesh_no_static_const_string_function(tsandesh, false, false, false, false) <<
Expand Down Expand Up @@ -1296,6 +1352,16 @@ void t_cpp_generator::generate_sandesh_async_creators(ofstream &out, t_sandesh *
indent_down();
indent(out) << "}" << endl << endl;
}

// Generate WriteToBuffer
if (is_system) {
out << indent() << "static bool IsRatelimitPass() {" << endl;
indent_up();
generate_isRatelimitPass(out, tsandesh);
indent_down();
indent(out) << "}" << endl << endl;
}

}

std::string t_cpp_generator::generate_sandesh_trace_creator(t_sandesh *tsandesh,
Expand Down Expand Up @@ -1876,6 +1942,15 @@ void t_cpp_generator::generate_sandesh_definition(ofstream& out,
out << indent() << "int32_t Write(" <<
"boost::shared_ptr<contrail::sandesh::protocol::TProtocol> oprot) const;" << endl;

if (((t_base_type *)t)->is_sandesh_system()) {
out << indent() << "static bool do_rate_limit_drop_log_;" << endl;

out << indent() << "static boost::circular_buffer<time_t>"
" rate_limit_buffer_;" << endl;

out << indent() << "static tbb::mutex rate_limit_mutex_;" << endl;
}

out << endl;
indent_down();
indent(out) << "};" << endl << endl;
Expand Down Expand Up @@ -2720,6 +2795,24 @@ void t_cpp_generator::generate_sandesh_static_versionsig_def(ofstream& out,
<< tsandesh->get_4byte_fingerprint() << "U;" << endl << endl;
}

void t_cpp_generator::generate_sandesh_static_rate_limit_buffer_def(
ofstream& out, t_sandesh* tsandesh) {
out << "boost::circular_buffer<time_t> " << tsandesh->get_name() <<
"::rate_limit_buffer_(Sandesh::get_send_rate_limit());" << endl << endl;
}

void t_cpp_generator::generate_sandesh_static_rate_limit_mutex_def(
ofstream& out,t_sandesh* tsandesh) {
out << "tbb::mutex " << tsandesh->get_name() << "::rate_limit_mutex_; "
<< endl << endl;
}

void t_cpp_generator::generate_sandesh_static_rate_limit_log_def(
ofstream& out, t_sandesh* tsandesh) {
out << "bool " << tsandesh->get_name() << "::do_rate_limit_drop_log_ = true;"
<< endl << endl;
}


/**
* Makes a helper function to gen a sandesh reader.
Expand Down Expand Up @@ -3493,6 +3586,49 @@ void t_cpp_generator::generate_sandesh_trace(ofstream& out,
"}" << endl << endl;
}

/*
* Generate IsRatelimitPass function
*
*/
void t_cpp_generator::generate_isRatelimitPass(ofstream& out,
t_sandesh* tsandesh) {
out << indent() << "if (Sandesh::get_send_rate_limit() == 0) {" << endl;
indent_up();
out << indent() << "return false;";
indent_down();
out << indent() << "}" << endl;
out << indent() << "tbb::mutex::scoped_lock lock(rate_limit_mutex_);" << endl;
out << indent() << "if (rate_limit_buffer_.capacity() !="
" Sandesh::get_send_rate_limit()) {" << endl;
indent_up();
out << indent() << "//Resize the buffer to the "
"buffer_threshold_" << endl;
out << indent() << "rate_limit_buffer_.rresize(Sandesh::get_send_rate_limit());"
<< endl;
out << indent() << "rate_limit_buffer_.set_capacity("
"Sandesh::get_send_rate_limit());" << endl;
indent_down();
out << indent() << "}" << endl;
out << indent() << "time_t current_time = time(0);" << endl;
out << indent() << "if (rate_limit_buffer_.capacity() == rate_limit_buffer_"
".size()) {" << endl;
indent_up();
out << indent() << "if (*rate_limit_buffer_.begin() == current_time) {" << endl;
indent_up();
out << indent() << "//update tx and call droplog" << endl;
out << indent() << "//Dont have to log more than once" << endl;
out << indent() << "return false;" << endl;
indent_down();
out << indent() << "}" << endl;
indent_down();
out << indent() << "}" << endl;
out << indent() << "//Should log failure after a sucessful write" << endl;
out << indent() << "do_rate_limit_drop_log_ = true;" << endl;
out << indent() << "rate_limit_buffer_.push_back(current_time);" << endl;
out << indent() << "return true;" << endl;
}


#endif

/**
Expand Down
11 changes: 9 additions & 2 deletions compiler/generate/t_py_generator.cc
Expand Up @@ -513,7 +513,8 @@ string t_py_generator::render_sandesh_includes() {
"from pysandesh.sandesh_http import SandeshHttp\n"
"from pysandesh.sandesh_uve import SandeshUVETypeMaps\n"
"from pysandesh.util import UTCTimestampUsec, UTCTimestampUsecToString\n"
"from pysandesh.gen_py.sandesh.constants import *\n";
"from pysandesh.gen_py.sandesh.constants import *\n"
"import collections\n";
}
return sandesh_includes;
}
Expand Down Expand Up @@ -1406,7 +1407,13 @@ void t_py_generator::generate_py_sandesh_definition(ofstream& out,
} else {
indent(out) << "thrift_spec = None" << endl << endl;
}


if (sandesh_type->is_sandesh_system()) {
indent(out) << "rate_limit_buffer = collections.deque(maxlen=sandesh_base."
"SandeshSystem._DEFAULT_SEND_RATELIMIT)" << endl << endl;
indent(out) << "do_rate_limit_drop_log = True" << endl << endl;
}

{
out <<
indent() << "def __init__(self";
Expand Down
1 change: 1 addition & 0 deletions library/common/sandesh.sandesh
Expand Up @@ -60,6 +60,7 @@ enum SandeshLevel {
const i32 SANDESH_KEY_HINT = 0x1
const i32 SANDESH_CONTROL_HINT = 0x2
const i32 SANDESH_SYNC_HINT = 0x4
const u32 DEFAULT_SANDESH_SEND_RATELIMIT = 100

// Update ParseHeader function in sandesh_message_builder.cc
// when modifying SandeshHeader below
Expand Down
19 changes: 17 additions & 2 deletions library/cpp/sandesh.cc
Expand Up @@ -68,6 +68,7 @@ log4cplus::Logger Sandesh::logger_ =
log4cplus::Logger::getInstance(LOG4CPLUS_TEXT("SANDESH"));

Sandesh::ModuleContextMap Sandesh::module_context_;
tbb::atomic<uint32_t> Sandesh::sandesh_send_ratelimit_;

const char * Sandesh::SandeshRoleToString(SandeshRole::type role) {
switch (role) {
Expand Down Expand Up @@ -140,6 +141,11 @@ bool Sandesh::Initialize(SandeshRole::type role,
instance_id_ = instance_id;
client_context_ = client_context;
event_manager_ = evm;
//If Sandesh::sandesh_send_ratelimit_ is not defined by client,
// assign a default value to it
if (get_send_rate_limit() == 0) {
set_send_rate_limit(g_sandesh_constants.DEFAULT_SANDESH_SEND_RATELIMIT);
}

InitReceive(Task::kTaskInstanceAny);
bool success(SandeshHttp::Init(evm, module, http_port,
Expand Down Expand Up @@ -691,7 +697,7 @@ bool Sandesh::IsLevelUT(SandeshLevel::type level) {
}

bool Sandesh::IsLevelCategoryLoggingAllowed(SandeshLevel::type level,
const std::string &category) {
const std::string& category) {
bool level_allowed = logging_level_ >= level;
bool category_allowed = !logging_category_.empty() ?
logging_category_ == category : true;
Expand All @@ -703,7 +709,7 @@ bool Sandesh::IsLoggingAllowed() const {
return enable_flow_log_;
} else {
return IsLocalLoggingEnabled() &&
IsLevelCategoryLoggingAllowed(level_, category_);
IsLevelCategoryLoggingAllowed(level_, category_);
}
}

Expand Down Expand Up @@ -907,3 +913,12 @@ void Sandesh::set_module_context(const std::string &module_name,
result.first->second = context;
}
}

bool SandeshSystem::HandleTest(SandeshLevel::type level,
const std::string& category) {
// Handle unit test scenario
if (IsUnitTest() || IsLevelUT(level)) {
return true;
}
return false;
}
12 changes: 9 additions & 3 deletions library/cpp/sandesh.h
Expand Up @@ -289,6 +289,10 @@ class Sandesh {
static const char* LevelToString(SandeshLevel::type level);
static SandeshLevel::type StringToLevel(std::string level);
static log4cplus::Logger& logger() { return logger_; }
static void set_send_rate_limit(uint32_t rate_limit) {
sandesh_send_ratelimit_ = rate_limit;
}
static uint32_t get_send_rate_limit() { return sandesh_send_ratelimit_; }

protected:
void set_timestamp(time_t timestamp) { timestamp_ = timestamp; }
Expand Down Expand Up @@ -329,8 +333,9 @@ class Sandesh {
}
static SandeshCallback response_callback_;
static SandeshClient *client_;
static bool IsLevelUT(SandeshLevel::type level);
static bool IsLevelCategoryLoggingAllowed(SandeshLevel::type level,
const std::string &category);
const std::string& category);

private:
friend class SandeshTracePerfTest;
Expand All @@ -351,8 +356,6 @@ class Sandesh {
unsigned short http_port,
SandeshContext *client_context = NULL);

static bool IsLevelUT(SandeshLevel::type level);

static SandeshRole::type role_;
static std::string module_;
static std::string source_;
Expand Down Expand Up @@ -387,6 +390,7 @@ class Sandesh {
SandeshLevel::type level_;
std::string category_;
std::string name_;
static tbb::atomic<uint32_t> sandesh_send_ratelimit_;
};

#define SANDESH_LOG(_Level, _Msg) \
Expand Down Expand Up @@ -463,6 +467,8 @@ class SandeshSystem : public Sandesh {
protected:
SandeshSystem(const std::string& name, uint32_t seqno) :
Sandesh(SandeshType::SYSTEM, name, seqno) {}
static bool HandleTest(SandeshLevel::type level, const
std::string& category);
};

class SandeshObject : public Sandesh {
Expand Down

0 comments on commit 6ef149a

Please sign in to comment.