diff --git a/compiler/generate/t_cpp_generator.cc b/compiler/generate/t_cpp_generator.cc index 0f914806..a46d28a4 100755 --- a/compiler/generate/t_cpp_generator.cc +++ b/compiler/generate/t_cpp_generator.cc @@ -132,12 +132,16 @@ class t_cpp_generator : public t_oop_generator { const vector& fields); std::string generate_sandesh_async_creator(t_sandesh *tsandesh, bool signature, bool expand_autogen, bool skip_autogen, std::string prefix, std::string suffix, bool category_level_file_line_first, - bool autogen_category_level); + bool autogen_category_level, bool drop_log_reason); void generate_sandesh_async_creators(ofstream &out, t_sandesh *tsandesh); void generate_sandesh_uve_creator(std::ofstream& out, t_sandesh* tsandesh); std::string generate_sandesh_trace_creator(t_sandesh *tsandesh, bool signature, bool expand_autogen, bool skip_autogen, std::string prefix, std::string suffix); void generate_sandesh_updater(ofstream& out, t_sandesh* tsandesh); + void generate_isRatelimitPass(ofstream& out, t_sandesh* tsandesh); + void generate_sandesh_static_rate_limit_log_def(ofstream& out, t_sandesh* tsandesh); + void generate_sandesh_static_rate_limit_mutex_def(ofstream& out, t_sandesh* tsandesh); + void generate_sandesh_static_rate_limit_buffer_def(ofstream& out, t_sandesh* tsandesh); #endif void generate_cpp_struct(t_struct* tstruct, bool is_exception); @@ -958,6 +962,8 @@ void t_cpp_generator::generate_cpp_sandesh(t_sandesh* tsandesh) { ((t_base_type *)tsandesh->get_type())->is_sandesh_uve(); bool is_alarm = ((t_base_type *)tsandesh->get_type())->is_sandesh_alarm(); + bool is_system = + ((t_base_type *)tsandesh->get_type())->is_sandesh_system(); generate_sandesh_definition(f_types_, tsandesh); generate_sandesh_fingerprint(f_types_impl_, tsandesh, true); @@ -980,6 +986,12 @@ void t_cpp_generator::generate_cpp_sandesh(t_sandesh* tsandesh) { if (is_uve || is_alarm) { generate_sandesh_updater(out,tsandesh); } + if (is_system) { + generate_sandesh_static_rate_limit_log_def(out, tsandesh); + generate_sandesh_static_rate_limit_mutex_def(out, tsandesh); + generate_sandesh_static_rate_limit_buffer_def(out, tsandesh); + } + } /** @@ -1099,7 +1111,9 @@ std::string t_cpp_generator::generate_sandesh_no_static_const_string_function(t_ std::string t_cpp_generator::generate_sandesh_async_creator(t_sandesh* tsandesh, bool signature, bool expand_autogen, bool skip_autogen, std::string prefix, std::string suffix, - bool category_level_file_line_first, bool autogen_category_level) { + bool category_level_file_line_first, bool autogen_category_level, bool + drop_log_reason) + { string result = ""; string temp = ""; string category_def = prefix + "category" + suffix; @@ -1107,22 +1121,28 @@ std::string t_cpp_generator::generate_sandesh_async_creator(t_sandesh* tsandesh, string category_dec = "std::string " + category_def; string level_dec = "SandeshLevel::type " + level_def; string module_name = "\"\""; + string drop_reason_dec; + string drop_reason_def; bool is_flow = ((t_base_type *)tsandesh->get_type())->is_sandesh_flow(); // Get members vector::const_iterator m_iter; const vector& members = tsandesh->get_members(); bool init_function = false; + if (drop_log_reason) { + drop_reason_def = prefix + "drop_reason" + suffix + ", "; + drop_reason_dec = "const std::string& " + drop_reason_def; + } if (category_level_file_line_first) { if (signature) { - result += "(" + category_dec + ", " + level_dec; + result += "(" + drop_reason_dec + category_dec + ", " + level_dec; if (!is_flow) result += ", std::string file, int32_t line"; } else { if (!autogen_category_level) { - result += "(" + category_def + ", " + level_def; + result += "(" + drop_reason_dec + category_def + ", " + level_def; if (!is_flow) result += ", __FILE__, __LINE__"; } else { - result += "(" + module_name + ", SandeshLevel::SYS_INFO"; + result += "(" + drop_reason_dec + module_name + ", SandeshLevel::SYS_INFO"; if (!is_flow) result += ", __FILE__, __LINE__"; } } @@ -1130,12 +1150,12 @@ std::string t_cpp_generator::generate_sandesh_async_creator(t_sandesh* tsandesh, } else { if (signature) { if (!autogen_category_level) { - result += "(" + category_dec + ", " + level_dec; + result += "(" + drop_reason_dec + category_dec + ", " + level_dec; init_function = true; } } else { if (!autogen_category_level) { - result += "(" + category_def + ", " + level_def; + result += "(" + drop_reason_def + category_def + ", " + level_def; init_function = true; } } @@ -1209,9 +1229,10 @@ void t_cpp_generator::generate_sandesh_async_creators(ofstream &out, t_sandesh * std::string creator_func_name = "Send"; std::string macro_func_name = "Log"; bool is_flow = ((t_base_type *)tsandesh->get_type())->is_sandesh_flow(); + bool is_system = ((t_base_type *)tsandesh->get_type())->is_sandesh_system(); // Generate creator out << indent() << "static void " << creator_func_name << - generate_sandesh_async_creator(tsandesh, true, false, false, "", "", true, false) << + generate_sandesh_async_creator(tsandesh, true, false, false, "", "", true, false, false) << " {" << endl; indent_up(); out << indent() << "if (level >= SendingLevel()) {" << endl; @@ -1219,12 +1240,57 @@ void t_cpp_generator::generate_sandesh_async_creators(ofstream &out, t_sandesh * out << indent() << "UpdateTxMsgFailStats(\"" << tsandesh->get_name() << "\", 0, SandeshTxDropReason::QueueLevel);" << endl; if (!is_flow) { + out << indent() << "std::string drop_reason = \"SANDESH: Queue Drop:" + " \";" << endl; out << indent() << "DropLog" << - generate_sandesh_async_creator(tsandesh, false, false, false, "", "", false, false) << + generate_sandesh_async_creator(tsandesh, false, false, false, "", "", false, false, true) << "; " << endl; } out << indent() << "return;" << endl; scope_down(out); + + //Adjust the buffer size if its capacity is different from configured size + if (is_system) { + out << indent() << "if (!HandleTest(level, category)) {" << endl; + indent_up(); + out << indent() << "if (!IsRatelimitPass()) {" << endl; + indent_up(); + out << indent() << "UpdateTxMsgFailStats(\"" << tsandesh->get_name() << + "\", 0, SandeshTxDropReason::RatelimitDrop);" << endl; + out << indent() << "if (do_rate_limit_drop_log_) {" << endl; + indent_up(); + out << indent() << "std::stringstream ratelimit_val;" << endl; + out << indent() << " ratelimit_val << Sandesh::get_send_rate_limit();" + << endl; + out << indent() << "std::string drop_reason = \"SANDESH: Ratelimit" + " Drop (\" + ratelimit_val.str() + std::string(\" messages" + "/second): \") ;" << endl; + out << indent() << "DropLog" << + generate_sandesh_async_creator(tsandesh, false, false, false, "", + "", false, false, true) << + "; " << endl; + out << indent() << "do_rate_limit_drop_log_ = false;" << endl; + scope_down(out); + out << indent() << "return;" << endl; + scope_down(out); + indent_down(); + indent(out) << "} else {" << endl; + indent_up(); + out << indent() << "if (IsLevelCategoryLoggingAllowed(level,category))" + " {" << endl; + indent_up(); + out << indent() << "std::stringstream ratelimit_val;" << endl; + out << indent() << " ratelimit_val << Sandesh::get_send_rate_limit();" + << endl; + out << indent() << "std::string drop_reason = \"\";" << endl; + out << indent() << "DropLog" << + generate_sandesh_async_creator(tsandesh, false, false, false, "", + "", false, false, true) << "; " << endl; + scope_down(out); + out << indent() << "return;" << endl; + scope_down(out); + } + out << indent() << tsandesh->get_name() << " * snh = new " << tsandesh->get_name() << generate_sandesh_no_static_const_string_function(tsandesh, false, false, false, false) << @@ -1240,11 +1306,11 @@ void t_cpp_generator::generate_sandesh_async_creators(ofstream &out, t_sandesh * string creator_name_usc = underscore(creator_name); string creator_name_uc = uppercase(creator_name_usc); out << indent() << "#define " << creator_name_uc << - generate_sandesh_async_creator(tsandesh, false, false, true, "_", "", false, false) << + generate_sandesh_async_creator(tsandesh, false, false, true, "_", "", false, false, false) << "\\" << endl; indent_up(); out << indent() << tsandesh->get_name() << "::" << creator_func_name << - generate_sandesh_async_creator(tsandesh, false, true, false, "(_", ")", true, false) << + generate_sandesh_async_creator(tsandesh, false, true, false, "(_", ")", true, false, false) << endl; indent_down(); indent(out) << endl << endl; @@ -1254,11 +1320,11 @@ void t_cpp_generator::generate_sandesh_async_creators(ofstream &out, t_sandesh * creator_name_usc = underscore(creator_name); creator_name_uc = uppercase(creator_name_usc); out << indent() << "#define " << creator_name_uc << - generate_sandesh_async_creator(tsandesh, false, false, true, "_", "", false, true) << + generate_sandesh_async_creator(tsandesh, false, false, true, "_", "", false, true, false) << "\\" << endl; indent_up(); out << indent() << tsandesh->get_name() << "::" << creator_func_name << - generate_sandesh_async_creator(tsandesh, false, true, false, "(_", ")", true, true) << + generate_sandesh_async_creator(tsandesh, false, true, false, "(_", ")", true, true, false) << endl; indent_down(); indent(out) << endl << endl; @@ -1266,13 +1332,23 @@ void t_cpp_generator::generate_sandesh_async_creators(ofstream &out, t_sandesh * // Generate DropLog if (!is_flow) { out << indent() << "static void DropLog" << - generate_sandesh_async_creator(tsandesh, true, false, false, "", "", false, false) << + generate_sandesh_async_creator(tsandesh, true, false, false, "", "", false, false, true) << " {" << endl; indent_up(); generate_sandesh_logger(out, tsandesh, sandesh_logger::DROP_LOG); indent_down(); indent(out) << "}" << endl << endl; } + + // Generate WriteToBuffer + if (is_system) { + out << indent() << "static bool IsRatelimitPass() {" << endl; + indent_up(); + generate_isRatelimitPass(out, tsandesh); + indent_down(); + indent(out) << "}" << endl << endl; + } + } std::string t_cpp_generator::generate_sandesh_trace_creator(t_sandesh *tsandesh, @@ -1853,6 +1929,15 @@ void t_cpp_generator::generate_sandesh_definition(ofstream& out, out << indent() << "int32_t Write(" << "boost::shared_ptr oprot) const;" << endl; + if (((t_base_type *)t)->is_sandesh_system()) { + out << indent() << "static bool do_rate_limit_drop_log_;" << endl; + + out << indent() << "static boost::circular_buffer" + " rate_limit_buffer_;" << endl; + + out << indent() << "static tbb::mutex rate_limit_mutex_;" << endl; + } + out << endl; indent_down(); indent(out) << "};" << endl << endl; @@ -2697,6 +2782,24 @@ void t_cpp_generator::generate_sandesh_static_versionsig_def(ofstream& out, << tsandesh->get_4byte_fingerprint() << "U;" << endl << endl; } +void t_cpp_generator::generate_sandesh_static_rate_limit_buffer_def( + ofstream& out, t_sandesh* tsandesh) { + out << "boost::circular_buffer " << tsandesh->get_name() << + "::rate_limit_buffer_(Sandesh::get_send_rate_limit());" << endl << endl; +} + +void t_cpp_generator::generate_sandesh_static_rate_limit_mutex_def( + ofstream& out,t_sandesh* tsandesh) { + out << "tbb::mutex " << tsandesh->get_name() << "::rate_limit_mutex_; " + << endl << endl; +} + +void t_cpp_generator::generate_sandesh_static_rate_limit_log_def( + ofstream& out, t_sandesh* tsandesh) { + out << "bool " << tsandesh->get_name() << "::do_rate_limit_drop_log_ = true;" + << endl << endl; +} + /** * Makes a helper function to gen a sandesh reader. @@ -3345,14 +3448,12 @@ void t_cpp_generator::generate_sandesh_logger(ofstream& out, // Handle init if (!init) { init = true; - std::string category_str, level_str, logger_level_str, drop_str; + std::string category_str, level_str, logger_level_str ; if (ltype == sandesh_logger::DROP_LOG) { - drop_str = "\"SANDESH: Queue Drop: \" << "; category_str = "category"; level_str = "level"; logger_level_str = "SandeshLevel::SYS_ERR"; } else { - drop_str = ""; category_str = "category()"; level_str = "level()"; logger_level_str = level_str; @@ -3384,7 +3485,11 @@ void t_cpp_generator::generate_sandesh_logger(ofstream& out, if (((t_base_type *)tsandesh->get_type())->is_sandesh_system() || ((t_base_type *)tsandesh->get_type())->is_sandesh_object() || ((t_base_type *)tsandesh->get_type())->is_sandesh_flow()) { - out << indent() << "Xbuf << " << drop_str << category_str << + std::string drop_str="Xbuf << "; + if ( ltype == sandesh_logger::DROP_LOG ) { + drop_str = "Xbuf << drop_reason << "; + } + out << indent() << drop_str << category_str << " << \" [\" << LevelToString(" << level_str << ") << \"]: " << tsandesh->get_name() << ": \";" << endl; } else { @@ -3468,6 +3573,49 @@ void t_cpp_generator::generate_sandesh_trace(ofstream& out, "}" << endl << endl; } +/* + * Generate IsRatelimitPass function + * + */ +void t_cpp_generator::generate_isRatelimitPass(ofstream& out, + t_sandesh* tsandesh) { + out << indent() << "if (Sandesh::get_send_rate_limit() == 0) {" << endl; + indent_up(); + out << indent() << "return false;"; + indent_down(); + out << indent() << "}" << endl; + out << indent() << "tbb::mutex::scoped_lock lock(rate_limit_mutex_);" << endl; + out << indent() << "if (rate_limit_buffer_.capacity() !=" + " Sandesh::get_send_rate_limit()) {" << endl; + indent_up(); + out << indent() << "//Resize the buffer to the " + "buffer_threshold_" << endl; + out << indent() << "rate_limit_buffer_.rresize(Sandesh::get_send_rate_limit());" + << endl; + out << indent() << "rate_limit_buffer_.set_capacity(" + "Sandesh::get_send_rate_limit());" << endl; + indent_down(); + out << indent() << "}" << endl; + out << indent() << "time_t current_time = time(0);" << endl; + out << indent() << "if (rate_limit_buffer_.capacity() == rate_limit_buffer_" + ".size()) {" << endl; + indent_up(); + out << indent() << "if (*rate_limit_buffer_.begin() == current_time) {" << endl; + indent_up(); + out << indent() << "//update tx and call droplog" << endl; + out << indent() << "//Dont have to log more than once" << endl; + out << indent() << "return false;" << endl; + indent_down(); + out << indent() << "}" << endl; + indent_down(); + out << indent() << "}" << endl; + out << indent() << "//Should log failure after a sucessful write" << endl; + out << indent() << "do_rate_limit_drop_log_ = true;" << endl; + out << indent() << "rate_limit_buffer_.push_back(current_time);" << endl; + out << indent() << "return true;" << endl; +} + + #endif /** diff --git a/compiler/generate/t_py_generator.cc b/compiler/generate/t_py_generator.cc index e07a67c7..d4153efb 100644 --- a/compiler/generate/t_py_generator.cc +++ b/compiler/generate/t_py_generator.cc @@ -513,7 +513,8 @@ string t_py_generator::render_sandesh_includes() { "from pysandesh.sandesh_http import SandeshHttp\n" "from pysandesh.sandesh_uve import SandeshUVETypeMaps\n" "from pysandesh.util import UTCTimestampUsec, UTCTimestampUsecToString\n" - "from pysandesh.gen_py.sandesh.constants import *\n"; + "from pysandesh.gen_py.sandesh.constants import *\n" + "import collections\n"; } return sandesh_includes; } @@ -1406,7 +1407,13 @@ void t_py_generator::generate_py_sandesh_definition(ofstream& out, } else { indent(out) << "thrift_spec = None" << endl << endl; } - + + if (sandesh_type->is_sandesh_system()) { + indent(out) << "rate_limit_buffer = collections.deque(maxlen=sandesh_base." + "SandeshSystem._DEFAULT_SEND_RATELIMIT)" << endl << endl; + indent(out) << "do_rate_limit_drop_log = True" << endl << endl; + } + { out << indent() << "def __init__(self"; diff --git a/library/common/sandesh.sandesh b/library/common/sandesh.sandesh index ec8e086e..3ee913fa 100644 --- a/library/common/sandesh.sandesh +++ b/library/common/sandesh.sandesh @@ -60,6 +60,7 @@ enum SandeshLevel { enum SandeshTxDropReason { MinDropReason = 0, NoDrop, + RatelimitDrop, ValidationFailed, QueueLevel, NoClient, @@ -87,6 +88,7 @@ enum SandeshRxDropReason { const i32 SANDESH_KEY_HINT = 0x1 const i32 SANDESH_CONTROL_HINT = 0x2 const i32 SANDESH_SYNC_HINT = 0x4 +const u32 DEFAULT_SANDESH_SEND_RATELIMIT = 100 // Update ParseHeader function in sandesh_message_builder.cc // when modifying SandeshHeader below diff --git a/library/common/sandesh_uve.sandesh b/library/common/sandesh_uve.sandesh index 060ee419..d9bfd4c8 100644 --- a/library/common/sandesh_uve.sandesh +++ b/library/common/sandesh_uve.sandesh @@ -31,6 +31,7 @@ struct SandeshMessageStats { 58: u64 messages_sent_dropped_write_failed; 59: u64 messages_sent_dropped_wrong_client_sm_state; 60: u64 messages_sent_dropped_validation_failed; + 61: u64 messages_sent_dropped_rate_limited; // Bytes 81: u64 bytes_sent_dropped_no_queue; 82: u64 bytes_sent_dropped_no_client; @@ -42,6 +43,7 @@ struct SandeshMessageStats { 88: u64 bytes_sent_dropped_write_failed; 89: u64 bytes_sent_dropped_wrong_client_sm_state; 90: u64 bytes_sent_dropped_validation_failed; + 91: u64 bytes_sent_dropped_rate_limited; // Receive // Messages 101: u64 messages_received_dropped_no_queue; diff --git a/library/cpp/sandesh.cc b/library/cpp/sandesh.cc index 43fb7d13..eeecdbd0 100644 --- a/library/cpp/sandesh.cc +++ b/library/cpp/sandesh.cc @@ -67,6 +67,7 @@ log4cplus::Logger Sandesh::logger_ = log4cplus::Logger::getInstance(LOG4CPLUS_TEXT("SANDESH")); Sandesh::ModuleContextMap Sandesh::module_context_; +tbb::atomic Sandesh::sandesh_send_ratelimit_; const char * Sandesh::SandeshRoleToString(SandeshRole::type role) { switch (role) { @@ -139,6 +140,11 @@ void Sandesh::Initialize(SandeshRole::type role, instance_id_ = instance_id; client_context_ = client_context; event_manager_ = evm; + //If Sandesh::sandesh_send_ratelimit_ is not defined by client, + // assign a default value to it + if (get_send_rate_limit() == 0) { + set_send_rate_limit(g_sandesh_constants.DEFAULT_SANDESH_SEND_RATELIMIT); + } InitReceive(Task::kTaskInstanceAny); http_port_ = SandeshHttp::Init(evm, module, http_port, &SandeshHttpCallback); @@ -565,8 +571,8 @@ int32_t Sandesh::ReceiveBinaryMsg(u_int8_t *buf, u_int32_t buf_len, bool Sandesh::HandleTest() { // Handle unit test scenario - if (IsUnitTest() || IsLevelUT()) { - if (IsLevelCategoryLoggingAllowed()) { + if (IsUnitTest() || IsLevelUT(level_)) { + if (IsLevelCategoryLoggingAllowed(level_, category_)) { ForcedLog(); } Release(); @@ -678,15 +684,16 @@ bool SandeshRequest::Enqueue(SandeshRxQueue *queue) { return true; } -bool Sandesh::IsLevelUT() { - return level_ >= SandeshLevel::UT_START && - level_ <= SandeshLevel::UT_END; +bool Sandesh::IsLevelUT(SandeshLevel::type level) { + return level >= SandeshLevel::UT_START && + level <= SandeshLevel::UT_END; } -bool Sandesh::IsLevelCategoryLoggingAllowed() const { - bool level_allowed = logging_level_ >= level_; +bool Sandesh::IsLevelCategoryLoggingAllowed(SandeshLevel::type level, + const std::string& category) { + bool level_allowed = logging_level_ >= level; bool category_allowed = !logging_category_.empty() ? - logging_category_ == category_ : true; + logging_category_ == category : true; return level_allowed && category_allowed; } @@ -694,7 +701,8 @@ bool Sandesh::IsLoggingAllowed() const { if (type_ == SandeshType::FLOW) { return enable_flow_log_; } else { - return IsLocalLoggingEnabled() && IsLevelCategoryLoggingAllowed(); + return IsLocalLoggingEnabled() && + IsLevelCategoryLoggingAllowed(level_, category_); } } @@ -811,3 +819,12 @@ void Sandesh::set_module_context(const std::string &module_name, result.first->second = context; } } + +bool SandeshSystem::HandleTest(SandeshLevel::type level, + const std::string& category) { + // Handle unit test scenario + if (IsUnitTest() || IsLevelUT(level)) { + return true; + } + return false; +} diff --git a/library/cpp/sandesh.h b/library/cpp/sandesh.h index bf91b203..23c486ac 100644 --- a/library/cpp/sandesh.h +++ b/library/cpp/sandesh.h @@ -288,6 +288,10 @@ class Sandesh { static const char* LevelToString(SandeshLevel::type level); static SandeshLevel::type StringToLevel(std::string level); static log4cplus::Logger& logger() { return logger_; } + static void set_send_rate_limit(uint32_t rate_limit) { + sandesh_send_ratelimit_ = rate_limit; + } + static uint32_t get_send_rate_limit() { return sandesh_send_ratelimit_; } protected: void set_timestamp(time_t timestamp) { timestamp_ = timestamp; } @@ -327,6 +331,9 @@ class Sandesh { } static SandeshCallback response_callback_; static SandeshClient *client_; + static bool IsLevelUT(SandeshLevel::type level); + static bool IsLevelCategoryLoggingAllowed(SandeshLevel::type level, + const std::string& category); private: friend class SandeshTracePerfTest; @@ -347,9 +354,6 @@ class Sandesh { unsigned short http_port, SandeshContext *client_context = NULL); - bool IsLevelUT(); - bool IsLevelCategoryLoggingAllowed() const; - static SandeshRole::type role_; static std::string module_; static std::string source_; @@ -383,6 +387,7 @@ class Sandesh { SandeshLevel::type level_; std::string category_; std::string name_; + static tbb::atomic sandesh_send_ratelimit_; }; #define SANDESH_LOG(_Level, _Msg) \ @@ -459,6 +464,8 @@ class SandeshSystem : public Sandesh { protected: SandeshSystem(const std::string& name, uint32_t seqno) : Sandesh(SandeshType::SYSTEM, name, seqno) {} + static bool HandleTest(SandeshLevel::type level, const + std::string& category); }; class SandeshObject : public Sandesh { diff --git a/library/cpp/sandesh_statistics.cc b/library/cpp/sandesh_statistics.cc index 58b030a4..a3c382bf 100644 --- a/library/cpp/sandesh_statistics.cc +++ b/library/cpp/sandesh_statistics.cc @@ -42,6 +42,10 @@ static void UpdateSandeshMessageStatsDrops(SandeshMessageStats *smstats, smstats->messages_sent_dropped_validation_failed++; smstats->bytes_sent_dropped_validation_failed += bytes; break; + case SandeshTxDropReason::RatelimitDrop: + smstats->messages_sent_dropped_rate_limited++; + smstats->bytes_sent_dropped_rate_limited += bytes; + break; case SandeshTxDropReason::QueueLevel: smstats->messages_sent_dropped_queue_level++; smstats->bytes_sent_dropped_queue_level += bytes; diff --git a/library/cpp/test/sandesh_message_test.cc b/library/cpp/test/sandesh_message_test.cc index 037055b5..9a4535aa 100644 --- a/library/cpp/test/sandesh_message_test.cc +++ b/library/cpp/test/sandesh_message_test.cc @@ -59,6 +59,61 @@ void SandeshRequestTest1::HandleRequest() const { namespace { +class SandeshSendRatelimitTest : public ::testing::Test { +protected: + SandeshSendRatelimitTest() { + } + + virtual void SetUp() { + msg_num_ = 0 ; + asyncserver_done = false; + evm_.reset(new EventManager()); + server_ = new SandeshServerTest(evm_.get(), + boost::bind(&SandeshSendRatelimitTest::ReceiveSandeshMsg, this, _1, _2)); + thread_.reset(new ServerThread(evm_.get())); + } + + virtual void TearDown() { + task_util::WaitForIdle(); + Sandesh::Uninit(); + task_util::WaitForIdle(); + TASK_UTIL_EXPECT_FALSE(server_->HasSessions()); + task_util::WaitForIdle(); + server_->Shutdown(); + task_util::WaitForIdle(); + TcpServerManager::DeleteServer(server_); + task_util::WaitForIdle(); + evm_->Shutdown(); + if (thread_.get() != NULL) { + thread_->Join(); + } + task_util::WaitForIdle(); + } + + bool ReceiveSandeshMsg(SandeshSession *session, + const SandeshMessage *msg) { + const SandeshHeader &header(msg->GetHeader()); + const SandeshXMLMessage *xmsg = + dynamic_cast(msg); + EXPECT_TRUE(xmsg != NULL); + std::string message(xmsg->ExtractMessage()); + + if (header.get_Type() == SandeshType::UVE) { + return true; + } + msg_num_++; + if (msg_num_ > 13) { + asyncserver_done = true; + } + return true; + } + + uint32_t msg_num_; + SandeshServerTest *server_; + std::auto_ptr thread_; + std::auto_ptr evm_; +}; + class SandeshAsyncTest : public ::testing::Test { protected: SandeshAsyncTest() { @@ -262,6 +317,45 @@ TEST_F(SandeshAsyncTest, Async) { } } +TEST_F(SandeshSendRatelimitTest, Buffer) { + server_->Initialize(0); + thread_->Start(); // Must be called after initialization + int port = server_->GetPort(); + ASSERT_LT(0, port); + // Connect to the server + Sandesh::set_send_rate_limit(10); + Sandesh::InitGenerator("SandeshSendRatelimitTest-Client", "localhost", + "Test", "Test", evm_.get(),0); + + Sandesh::ConnectToCollector("127.0.0.1", port); + EXPECT_TRUE(Sandesh::IsConnectToCollectorEnabled()); + EXPECT_TRUE(Sandesh::client() != NULL); + TASK_UTIL_EXPECT_TRUE(Sandesh::client()->state() == SandeshClientSM::ESTABLISHED); + Sandesh::SetLoggingParams(true, "SystemLogTest", SandeshLevel::SYS_INFO); + boost::ptr_map type_stats; + SandeshMessageStats agg_stats; + Sandesh::GetMsgStats(&type_stats, &agg_stats); + boost::ptr_map::iterator it; + it = type_stats.find("SystemLogTest"); + //Initialize dropped rate to 0 + it->second->stats.messages_sent_dropped_rate_limited = 0; + /* + * First 10 out of the 15 msgs will be recived.messages 15-20 should albe received + */ + for (int cnt = 0; cnt < 20; cnt++) { + SystemLogTest::Send("SystemLogTest", SandeshLevel::SYS_INFO, "Test", cnt, 100, "sat1string100"); + if (cnt == 15) { + sleep(1); + } + } + //Allow all messages to be recieved + sleep(1); + Sandesh::GetMsgStats(&type_stats, &agg_stats); + it = type_stats.find("SystemLogTest"); + TASK_UTIL_EXPECT_TRUE(msg_num_ == 20 - + it->second->stats.messages_sent_dropped_rate_limited); +} + class SandeshUVEAlarmTest : public ::testing::Test { protected: SandeshUVEAlarmTest() { diff --git a/library/python/pysandesh/sandesh_base.py b/library/python/pysandesh/sandesh_base.py index 9a2b91c1..ed635e30 100644 --- a/library/python/pysandesh/sandesh_base.py +++ b/library/python/pysandesh/sandesh_base.py @@ -13,6 +13,9 @@ import sandesh_logger as sand_logger import trace import util +import collections +import time +import copy from gen_py.sandesh.ttypes import SandeshType, SandeshLevel, \ SandeshTxDropReason @@ -692,11 +695,51 @@ def send(self, sandesh=sandesh_global): sandesh._logger.error('sandesh "%s" validation failed [%s]' % (self.__class__.__name__, e)) return -1 + #If systemlog message first check if the transmit side buffer + # has space + if self._type == SandeshType.SYSTEM: + if (not self.is_rate_limit_pass(sandesh)): + return -1 self._seqnum = self.next_seqnum() if self.handle_test(sandesh): return 0 sandesh.send_sandesh(self) return 0 + + def is_rate_limit_pass(self,sandesh): + #If buffer size 0 return + if self.__class__.rate_limit_buffer.maxlen == 0: + return False + #Check if buffer resize is reqd + if (self.__class__.rate_limit_buffer.maxlen != \ + SandeshSystem.get_sandesh_send_rate_limit()): + temp_buffer = copy.deepcopy(self.__class__.rate_limit_buffer) + self.__class__.rate_limit_buffer = collections.deque(temp_buffer, \ + maxlen=SandeshSystem.get_sandesh_send_rate_limit()) + del temp_buffer + cur_time=int(time.time()) + #Check if circular buffer is full + if len(self.__class__.rate_limit_buffer) == \ + self.__class__.rate_limit_buffer.maxlen : + # Read the element in buffer and compare with cur_time + if(self.__class__.rate_limit_buffer[0] == cur_time): + #Sender generating more messages/sec than the + #buffer_threshold size + sandesh.msg_stats().update_tx_stats(self.__class__.__name__, + 0, SandeshTxDropReason.RatelimitDrop) + if self.__class__.do_rate_limit_drop_log: + sandesh._logger.error('SANDESH: Ratelimit Drop ' \ + '(%d messages/sec): for %s' % \ + (self.__class__.rate_limit_buffer.maxlen, \ + self.__class__.__name__)) + #Disable logging + self.__class__.do_rate_limit_drop_log = False + return False + #If logging is disabled enable it + self.__class__.do_rate_limit_drop_log = True + self.__class__.rate_limit_buffer.append(cur_time) + return True + # end send # end class SandeshAsync @@ -704,6 +747,18 @@ def send(self, sandesh=sandesh_global): class SandeshSystem(SandeshAsync): + _DEFAULT_SEND_RATELIMIT = DEFAULT_SANDESH_SEND_RATELIMIT + + @classmethod + def set_sandesh_send_rate_limit(cls, sandesh_send_rate_limit_val): + cls._DEFAULT_SEND_RATELIMIT = sandesh_send_rate_limit_val + # end set_sandesh_send_rate_limit + + @classmethod + def get_sandesh_send_rate_limit(cls): + return cls._DEFAULT_SEND_RATELIMIT + # end get_sandesh_send_rate_limit + def __init__(self): SandeshAsync.__init__(self) self._type = SandeshType.SYSTEM diff --git a/library/python/pysandesh/sandesh_stats.py b/library/python/pysandesh/sandesh_stats.py index 8ed2da63..29cc0929 100644 --- a/library/python/pysandesh/sandesh_stats.py +++ b/library/python/pysandesh/sandesh_stats.py @@ -146,6 +146,13 @@ def _update_tx_stats_internal(self, msg_stats, nbytes, drop_reason): else: msg_stats.messages_sent_dropped_wrong_client_sm_state = 1 msg_stats.bytes_sent_dropped_wrong_client_sm_state = nbytes + elif drop_reason is SandeshTxDropReason.RatelimitDrop: + if msg_stats.messages_sent_dropped_rate_limited: + msg_stats.messages_sent_dropped_rate_limited += 1 + msg_stats.bytes_sent_dropped_rate_limited += nbytes + else: + msg_stats.messages_sent_dropped_rate_limited = 1 + msg_stats.bytes_sent_dropped_rate_limited = nbytes else: assert 0, 'Unhandled Tx drop reason <%s>' % (str(drop_reason)) # end _update_tx_stats_internal diff --git a/library/python/pysandesh/test/sandesh_msg_test.py b/library/python/pysandesh/test/sandesh_msg_test.py index abd78014..5e53a905 100755 --- a/library/python/pysandesh/test/sandesh_msg_test.py +++ b/library/python/pysandesh/test/sandesh_msg_test.py @@ -13,6 +13,7 @@ import os import socket import test_utils +import time sys.path.insert(1, sys.path[0]+'/../../../python') @@ -90,6 +91,22 @@ def test_objectlog_msg_key_hint(self): self.assertNotEqual(-1, self._reader.read_msg(self._session.write_buf)) #end test_objectlog_msg_key_hint + def test_systemlog_msg_buffer_threshold(self): + print '------------------------------' + print ' Test SystemLog Msg Buffer Limit ' + print '------------------------------' + systemlog_msg = SystemLogTest() + self._expected_type = SandeshType.SYSTEM + self._expected_hints = 0 + SandeshSystem.set_sandesh_send_rate_limit(10) + time.sleep(1) + for i in xrange(0,15): + systemlog_msg.send(sandesh=sandesh_global) + self.assertEqual(5,sandesh_global.msg_stats(). \ + message_type_stats()['SystemLogTest']. \ + messages_sent_dropped_rate_limited) + #end test_systemlog_msg_buffer_threshold + #end class SandeshMsgTest if __name__ == '__main__':