diff --git a/compiler/generate/t_cpp_generator.cc b/compiler/generate/t_cpp_generator.cc index f1251ddc..0f914806 100755 --- a/compiler/generate/t_cpp_generator.cc +++ b/compiler/generate/t_cpp_generator.cc @@ -1217,7 +1217,7 @@ void t_cpp_generator::generate_sandesh_async_creators(ofstream &out, t_sandesh * out << indent() << "if (level >= SendingLevel()) {" << endl; indent_up(); out << indent() << "UpdateTxMsgFailStats(\"" << tsandesh->get_name() << - "\", 0, Sandesh::DropReason::Send::QueueLevel);" << endl; + "\", 0, SandeshTxDropReason::QueueLevel);" << endl; if (!is_flow) { out << indent() << "DropLog" << generate_sandesh_async_creator(tsandesh, false, false, false, "", "", false, false) << diff --git a/library/common/sandesh.sandesh b/library/common/sandesh.sandesh index 412e9065..ec8e086e 100644 --- a/library/common/sandesh.sandesh +++ b/library/common/sandesh.sandesh @@ -57,6 +57,33 @@ enum SandeshLevel { INVALID = 0x7fffffff } +enum SandeshTxDropReason { + MinDropReason = 0, + NoDrop, + ValidationFailed, + QueueLevel, + NoClient, + NoSession, + NoQueue, + ClientSendFailed, + HeaderWriteFailed, + WriteFailed, + SessionNotConnected, + WrongClientSMState, + MaxDropReason +} + +enum SandeshRxDropReason { + MinDropReason = 0, + NoDrop, + QueueLevel, + NoQueue, + ControlMsgFailed, + CreateFailed, + DecodingFailed, + MaxDropReason +} + const i32 SANDESH_KEY_HINT = 0x1 const i32 SANDESH_CONTROL_HINT = 0x2 const i32 SANDESH_SYNC_HINT = 0x4 diff --git a/library/common/sandesh_uve.sandesh b/library/common/sandesh_uve.sandesh index 15ff8428..060ee419 100644 --- a/library/common/sandesh_uve.sandesh +++ b/library/common/sandesh_uve.sandesh @@ -30,6 +30,7 @@ struct SandeshMessageStats { 57: u64 messages_sent_dropped_header_write_failed; 58: u64 messages_sent_dropped_write_failed; 59: u64 messages_sent_dropped_wrong_client_sm_state; + 60: u64 messages_sent_dropped_validation_failed; // Bytes 81: u64 bytes_sent_dropped_no_queue; 82: u64 bytes_sent_dropped_no_client; @@ -40,6 +41,7 @@ struct SandeshMessageStats { 87: u64 bytes_sent_dropped_header_write_failed; 88: u64 bytes_sent_dropped_write_failed; 89: u64 bytes_sent_dropped_wrong_client_sm_state; + 90: u64 bytes_sent_dropped_validation_failed; // Receive // Messages 101: u64 messages_received_dropped_no_queue; diff --git a/library/cpp/sandesh.cc b/library/cpp/sandesh.cc index 8f0634a4..43fb7d13 100644 --- a/library/cpp/sandesh.cc +++ b/library/cpp/sandesh.cc @@ -447,7 +447,7 @@ bool Sandesh::Enqueue(SandeshQueue *queue) { SANDESH_LOG(ERROR, __func__ << ": SandeshQueue NULL : Dropping Message: " << ToString()); } - UpdateTxMsgFailStats(name_, 0, Sandesh::DropReason::Send::NoQueue); + UpdateTxMsgFailStats(name_, 0, SandeshTxDropReason::NoQueue); Release(); return false; } @@ -584,7 +584,7 @@ bool Sandesh::SendEnqueue() { Log(); } } - UpdateTxMsgFailStats(name_, 0, Sandesh::DropReason::Send::NoClient); + UpdateTxMsgFailStats(name_, 0, SandeshTxDropReason::NoClient); Release(); return false; } @@ -593,7 +593,7 @@ bool Sandesh::SendEnqueue() { SANDESH_LOG(ERROR, "SANDESH: Send FAILED: " << ToString()); } UpdateTxMsgFailStats(name_, 0, - Sandesh::DropReason::Send::ClientSendFailed); + SandeshTxDropReason::ClientSendFailed); Release(); return false; } @@ -648,7 +648,7 @@ bool SandeshUVE::Dispatch(SandeshConnection * sconn) { if (!client_->SendSandeshUVE(this)) { SANDESH_LOG(ERROR, "SandeshUVE : Send FAILED: " << ToString()); UpdateTxMsgFailStats(Name(), 0, - Sandesh::DropReason::Send::ClientSendFailed); + SandeshTxDropReason::ClientSendFailed); Release(); return false; } @@ -659,7 +659,7 @@ bool SandeshUVE::Dispatch(SandeshConnection * sconn) { } else { Log(); } - UpdateTxMsgFailStats(Name(), 0, Sandesh::DropReason::Send::NoClient); + UpdateTxMsgFailStats(Name(), 0, SandeshTxDropReason::NoClient); Release(); return false; } @@ -667,7 +667,7 @@ bool SandeshUVE::Dispatch(SandeshConnection * sconn) { bool SandeshRequest::Enqueue(SandeshRxQueue *queue) { if (!queue) { SANDESH_LOG(ERROR, "SandeshRequest: No RxQueue: " << ToString()); - UpdateRxMsgFailStats(Name(), 0, Sandesh::DropReason::Recv::NoQueue); + UpdateRxMsgFailStats(Name(), 0, SandeshRxDropReason::NoQueue); Release(); return false; } @@ -733,7 +733,7 @@ void Sandesh::UpdateRxMsgStats(const std::string &msg_name, } void Sandesh::UpdateRxMsgFailStats(const std::string &msg_name, - uint64_t bytes, Sandesh::DropReason::Recv::type dreason) { + uint64_t bytes, SandeshRxDropReason::type dreason) { tbb::mutex::scoped_lock lock(stats_mutex_); msg_stats_.UpdateRecvFailed(msg_name, bytes, dreason); } @@ -745,7 +745,7 @@ void Sandesh::UpdateTxMsgStats(const std::string &msg_name, } void Sandesh::UpdateTxMsgFailStats(const std::string &msg_name, - uint64_t bytes, Sandesh::DropReason::Send::type dreason) { + uint64_t bytes, SandeshTxDropReason::type dreason) { tbb::mutex::scoped_lock lock(stats_mutex_); msg_stats_.UpdateSendFailed(msg_name, bytes, dreason); } diff --git a/library/cpp/sandesh.h b/library/cpp/sandesh.h index 9ef973d6..bf91b203 100644 --- a/library/cpp/sandesh.h +++ b/library/cpp/sandesh.h @@ -137,37 +137,6 @@ class Sandesh { Test, }; }; - struct DropReason { - struct Send { - enum type { - MinDropReason, - NoDrop = MinDropReason, - QueueLevel, - NoClient, - NoSession, - NoQueue, - ClientSendFailed, - HeaderWriteFailed, - WriteFailed, - SessionNotConnected, - WrongClientSMState, - MaxDropReason, - }; - }; - struct Recv { - enum type { - MinDropReason, - NoDrop = MinDropReason, - QueueLevel, - NoQueue, - ControlMsgFailed, - CreateFailed, - DecodingFailed, - MaxDropReason, - }; - }; - }; - typedef boost::tuple QueueWaterMarkInfo; typedef boost::function CollectorSubFn; @@ -252,10 +221,10 @@ class Sandesh { static bool SendReady(SandeshConnection * sconn = NULL); static void UpdateRxMsgStats(const std::string &msg_name, uint64_t bytes); static void UpdateRxMsgFailStats(const std::string &msg_name, - uint64_t bytes, DropReason::Recv::type dreason); + uint64_t bytes, SandeshRxDropReason::type dreason); static void UpdateTxMsgStats(const std::string &msg_name, uint64_t bytes); static void UpdateTxMsgFailStats(const std::string &msg_name, - uint64_t bytes, DropReason::Send::type dreason); + uint64_t bytes, SandeshTxDropReason::type dreason); static void GetMsgStats( std::vector *mtype_stats, SandeshMessageStats *magg_stats); diff --git a/library/cpp/sandesh_client.cc b/library/cpp/sandesh_client.cc index 75f3b4fb..f536ac1e 100644 --- a/library/cpp/sandesh_client.cc +++ b/library/cpp/sandesh_client.cc @@ -161,7 +161,7 @@ bool SandeshClient::ReceiveMsg(const std::string& msg, Sandesh::UpdateRxMsgStats(sandesh_name, msg.size()); } else { Sandesh::UpdateRxMsgFailStats(sandesh_name, msg.size(), - Sandesh::DropReason::Recv::ControlMsgFailed); + SandeshRxDropReason::ControlMsgFailed); } return success; } @@ -171,7 +171,7 @@ bool SandeshClient::ReceiveMsg(const std::string& msg, if (sandesh == NULL) { SANDESH_LOG(ERROR, __func__ << ": Unknown sandesh: " << sandesh_name); Sandesh::UpdateRxMsgFailStats(sandesh_name, msg.size(), - Sandesh::DropReason::Recv::CreateFailed); + SandeshRxDropReason::CreateFailed); return true; } boost::shared_ptr btrans = @@ -184,7 +184,7 @@ bool SandeshClient::ReceiveMsg(const std::string& msg, if (xfer < 0) { SANDESH_LOG(ERROR, __func__ << ": Decoding " << sandesh_name << " FAILED"); Sandesh::UpdateRxMsgFailStats(sandesh_name, msg.size(), - Sandesh::DropReason::Recv::DecodingFailed); + SandeshRxDropReason::DecodingFailed); return false; } diff --git a/library/cpp/sandesh_client_sm.cc b/library/cpp/sandesh_client_sm.cc index 7590dd7a..b8e6d167 100644 --- a/library/cpp/sandesh_client_sm.cc +++ b/library/cpp/sandesh_client_sm.cc @@ -500,7 +500,7 @@ struct ClientInit : public sc::state { snh->ToString()); } Sandesh::UpdateTxMsgFailStats(snh->Name(), 0, - Sandesh::DropReason::Send::WrongClientSMState); + SandeshTxDropReason::WrongClientSMState); SM_LOG(INFO, "Received UVE message in wrong state : " << snh->Name()); snh->Release(); return discard_event(); @@ -672,7 +672,7 @@ void SandeshClientSMImpl::ReleaseSandesh(const Ev &event) { SANDESH_LOG(ERROR, "SANDESH: Send FAILED: " << snh->ToString()); } Sandesh::UpdateTxMsgFailStats(snh->Name(), 0, - Sandesh::DropReason::Send::WrongClientSMState); + SandeshTxDropReason::WrongClientSMState); SM_LOG(DEBUG, "Wrong state: " << StateName() << " for event: " << event.Name() << " message: " << snh->Name()); snh->Release(); diff --git a/library/cpp/sandesh_session.cc b/library/cpp/sandesh_session.cc index 0ce00ae5..913d0d24 100644 --- a/library/cpp/sandesh_session.cc +++ b/library/cpp/sandesh_session.cc @@ -106,7 +106,7 @@ void SandeshWriter::SendMsg(Sandesh *sandesh, bool more) { " Sequence Number:" << sandesh->seqnum()); session_->increment_send_msg_fail(); Sandesh::UpdateTxMsgFailStats(sandesh->Name(), 0, - Sandesh::DropReason::Send::HeaderWriteFailed); + SandeshTxDropReason::HeaderWriteFailed); sandesh->Release(); return; } @@ -119,7 +119,7 @@ void SandeshWriter::SendMsg(Sandesh *sandesh, bool more) { " Sequence Number:" << sandesh->seqnum()); session_->increment_send_msg_fail(); Sandesh::UpdateTxMsgFailStats(sandesh->Name(), 0, - Sandesh::DropReason::Send::WriteFailed); + SandeshTxDropReason::WriteFailed); sandesh->Release(); return; } @@ -359,7 +359,7 @@ bool SandeshSession::SendMsg(Sandesh *sandesh) { } increment_send_msg_fail(); Sandesh::UpdateTxMsgFailStats(sandesh->Name(), 0, - Sandesh::DropReason::Send::SessionNotConnected); + SandeshTxDropReason::SessionNotConnected); sandesh->Release(); return true; } diff --git a/library/cpp/sandesh_state_machine.cc b/library/cpp/sandesh_state_machine.cc index 3f3586e3..c14d8d3a 100644 --- a/library/cpp/sandesh_state_machine.cc +++ b/library/cpp/sandesh_state_machine.cc @@ -610,7 +610,7 @@ void SandeshStateMachine::UpdateRxMsgStats(const std::string &msg_name, } void SandeshStateMachine::UpdateRxMsgFailStats(const std::string &msg_name, - size_t msg_size, Sandesh::DropReason::Recv::type dreason) { + size_t msg_size, SandeshRxDropReason::type dreason) { tbb::mutex::scoped_lock lock(smutex_); message_stats_.UpdateRecvFailed(msg_name, msg_size, dreason); } @@ -626,7 +626,7 @@ void SandeshStateMachine::OnSandeshMessage(SandeshSession *session, if (DoDropSandeshMessage(header, message_drop_level_)) { // Update message statistics UpdateRxMsgFailStats(message_type, msg.size(), - Sandesh::DropReason::Recv::QueueLevel); + SandeshRxDropReason::QueueLevel); delete xmessage; return; } @@ -643,7 +643,7 @@ void SandeshStateMachine::OnSandeshMessage(SandeshSession *session, << ret << ")"); // Update message statistics UpdateRxMsgFailStats(message_type, msg.size(), - Sandesh::DropReason::Recv::ControlMsgFailed); + SandeshRxDropReason::ControlMsgFailed); delete xmessage; return; } diff --git a/library/cpp/sandesh_state_machine.h b/library/cpp/sandesh_state_machine.h index 1e95de1a..6cd2b1a3 100644 --- a/library/cpp/sandesh_state_machine.h +++ b/library/cpp/sandesh_state_machine.h @@ -176,7 +176,7 @@ class SandeshStateMachine : void UpdateRxMsgStats(const std::string &msg_name, size_t msg_size); void UpdateRxMsgFailStats(const std::string &msg_name, - size_t msg_size, Sandesh::DropReason::Recv::type dreason); + size_t msg_size, SandeshRxDropReason::type dreason); void UpdateEventDequeue(const sc::event_base &event); void UpdateEventDequeueFail(const sc::event_base &event); void UpdateEventEnqueue(const sc::event_base &event); diff --git a/library/cpp/sandesh_statistics.cc b/library/cpp/sandesh_statistics.cc index 984d5a36..58b030a4 100644 --- a/library/cpp/sandesh_statistics.cc +++ b/library/cpp/sandesh_statistics.cc @@ -34,75 +34,79 @@ void SandeshMessageStatistics::Get( } static void UpdateSandeshMessageStatsDrops(SandeshMessageStats *smstats, - bool sent, uint64_t bytes, Sandesh::DropReason::Send::type send_dreason, - Sandesh::DropReason::Recv::type recv_dreason) { + bool sent, uint64_t bytes, SandeshTxDropReason::type send_dreason, + SandeshRxDropReason::type recv_dreason) { if (sent) { switch (send_dreason) { - case Sandesh::DropReason::Send::QueueLevel: + case SandeshTxDropReason::ValidationFailed: + smstats->messages_sent_dropped_validation_failed++; + smstats->bytes_sent_dropped_validation_failed += bytes; + break; + case SandeshTxDropReason::QueueLevel: smstats->messages_sent_dropped_queue_level++; smstats->bytes_sent_dropped_queue_level += bytes; break; - case Sandesh::DropReason::Send::NoClient: + case SandeshTxDropReason::NoClient: smstats->messages_sent_dropped_no_client++; smstats->bytes_sent_dropped_no_client += bytes; break; - case Sandesh::DropReason::Send::NoSession: + case SandeshTxDropReason::NoSession: smstats->messages_sent_dropped_no_session++; smstats->bytes_sent_dropped_no_session += bytes; break; - case Sandesh::DropReason::Send::NoQueue: + case SandeshTxDropReason::NoQueue: smstats->messages_sent_dropped_no_queue++; smstats->bytes_sent_dropped_no_queue += bytes; break; - case Sandesh::DropReason::Send::ClientSendFailed: + case SandeshTxDropReason::ClientSendFailed: smstats->messages_sent_dropped_client_send_failed++; smstats->bytes_sent_dropped_client_send_failed += bytes; break; - case Sandesh::DropReason::Send::WrongClientSMState: + case SandeshTxDropReason::WrongClientSMState: smstats->messages_sent_dropped_wrong_client_sm_state++; smstats->bytes_sent_dropped_wrong_client_sm_state += bytes; break; - case Sandesh::DropReason::Send::WriteFailed: + case SandeshTxDropReason::WriteFailed: smstats->messages_sent_dropped_write_failed++; smstats->bytes_sent_dropped_write_failed += bytes; break; - case Sandesh::DropReason::Send::HeaderWriteFailed: + case SandeshTxDropReason::HeaderWriteFailed: smstats->messages_sent_dropped_header_write_failed++; smstats->bytes_sent_dropped_header_write_failed += bytes; break; - case Sandesh::DropReason::Send::SessionNotConnected: + case SandeshTxDropReason::SessionNotConnected: smstats->messages_sent_dropped_session_not_connected++; smstats->bytes_sent_dropped_session_not_connected += bytes; break; default: - break; + assert(0); } smstats->messages_sent_dropped++; smstats->bytes_sent_dropped += bytes; } else { switch (recv_dreason) { - case Sandesh::DropReason::Recv::QueueLevel: + case SandeshRxDropReason::QueueLevel: smstats->messages_received_dropped_queue_level++; smstats->bytes_received_dropped_queue_level += bytes; break; - case Sandesh::DropReason::Recv::NoQueue: + case SandeshRxDropReason::NoQueue: smstats->messages_received_dropped_no_queue++; smstats->bytes_received_dropped_no_queue += bytes; break; - case Sandesh::DropReason::Recv::DecodingFailed: + case SandeshRxDropReason::DecodingFailed: smstats->messages_received_dropped_decoding_failed++; smstats->bytes_received_dropped_decoding_failed += bytes; break; - case Sandesh::DropReason::Recv::ControlMsgFailed: + case SandeshRxDropReason::ControlMsgFailed: smstats->messages_received_dropped_control_msg_failed++; smstats->bytes_received_dropped_control_msg_failed += bytes; break; - case Sandesh::DropReason::Recv::CreateFailed: + case SandeshRxDropReason::CreateFailed: smstats->messages_received_dropped_create_failed++; smstats->bytes_received_dropped_create_failed += bytes; break; default: - break; + assert(0); } smstats->messages_received_dropped++; smstats->bytes_received_dropped += bytes; @@ -112,31 +116,31 @@ static void UpdateSandeshMessageStatsDrops(SandeshMessageStats *smstats, void SandeshMessageStatistics::UpdateSend(const std::string &msg_name, uint64_t bytes) { UpdateInternal(msg_name, bytes, true, false, - Sandesh::DropReason::Send::NoDrop, Sandesh::DropReason::Recv::NoDrop); + SandeshTxDropReason::NoDrop, SandeshRxDropReason::NoDrop); } void SandeshMessageStatistics::UpdateSendFailed(const std::string &msg_name, - uint64_t bytes, Sandesh::DropReason::Send::type dreason) { + uint64_t bytes, SandeshTxDropReason::type dreason) { UpdateInternal(msg_name, bytes, true, true, dreason, - Sandesh::DropReason::Recv::NoDrop); + SandeshRxDropReason::NoDrop); } void SandeshMessageStatistics::UpdateRecv(const std::string &msg_name, uint64_t bytes) { UpdateInternal(msg_name, bytes, false, false, - Sandesh::DropReason::Send::NoDrop, Sandesh::DropReason::Recv::NoDrop); + SandeshTxDropReason::NoDrop, SandeshRxDropReason::NoDrop); } void SandeshMessageStatistics::UpdateRecvFailed(const std::string &msg_name, - uint64_t bytes, Sandesh::DropReason::Recv::type dreason) { + uint64_t bytes, SandeshRxDropReason::type dreason) { UpdateInternal(msg_name, bytes, false, true, - Sandesh::DropReason::Send::NoDrop, dreason); + SandeshTxDropReason::NoDrop, dreason); } void SandeshMessageStatistics::UpdateInternal(const std::string &msg_name, uint64_t bytes, bool is_tx, bool dropped, - Sandesh::DropReason::Send::type send_dreason, - Sandesh::DropReason::Recv::type recv_dreason) { + SandeshTxDropReason::type send_dreason, + SandeshRxDropReason::type recv_dreason) { boost::ptr_map::iterator it = type_stats_.find(msg_name); if (it == type_stats_.end()) { diff --git a/library/cpp/sandesh_statistics.h b/library/cpp/sandesh_statistics.h index 51f0183b..70d44ee5 100644 --- a/library/cpp/sandesh_statistics.h +++ b/library/cpp/sandesh_statistics.h @@ -14,10 +14,10 @@ class SandeshMessageStatistics { void UpdateSend(const std::string &msg_name, uint64_t bytes); void UpdateSendFailed(const std::string &msg_name, uint64_t bytes, - Sandesh::DropReason::Send::type dreason); + SandeshTxDropReason::type dreason); void UpdateRecv(const std::string &msg_name, uint64_t bytes); void UpdateRecvFailed(const std::string &msg_name, uint64_t bytes, - Sandesh::DropReason::Recv::type dreason); + SandeshRxDropReason::type dreason); void Get(std::vector *mtype_stats, SandeshMessageStats *magg_stats) const; void Get(boost::ptr_map *mtype_stats, @@ -26,8 +26,8 @@ class SandeshMessageStatistics { private: void UpdateInternal(const std::string &msg_name, uint64_t bytes, bool is_tx, bool dropped, - Sandesh::DropReason::Send::type send_dreason, - Sandesh::DropReason::Recv::type recv_dreason); + SandeshTxDropReason::type send_dreason, + SandeshRxDropReason::type recv_dreason); boost::ptr_map type_stats_; SandeshMessageStats agg_stats_; diff --git a/library/cpp/test/sandesh_statistics_test.cc b/library/cpp/test/sandesh_statistics_test.cc index 81bb148d..8407847b 100644 --- a/library/cpp/test/sandesh_statistics_test.cc +++ b/library/cpp/test/sandesh_statistics_test.cc @@ -26,27 +26,27 @@ TEST_F(SandeshStatisticsTest, MsgStats) { msg_stats_.UpdateSend("Test1", 64); msg_stats_.UpdateSend("Test1", 64); // TX Fail - for (int i = static_cast(Sandesh::DropReason::Send::MinDropReason); - i < static_cast(Sandesh::DropReason::Send::MaxDropReason); + for (int i = static_cast(SandeshTxDropReason::MinDropReason+1); + i < static_cast(SandeshTxDropReason::MaxDropReason); i++) { - if (i == static_cast(Sandesh::DropReason::Send::NoDrop)) { + if (i == static_cast(SandeshTxDropReason::NoDrop)) { continue; } msg_stats_.UpdateSendFailed("Test1", 64, - static_cast(i)); + static_cast(i)); } // RX msg_stats_.UpdateRecv("Test", 64); msg_stats_.UpdateRecv("Test1", 128); // RX Fail - for (int i = static_cast(Sandesh::DropReason::Recv::MinDropReason); - i < static_cast(Sandesh::DropReason::Recv::MaxDropReason); + for (int i = static_cast(SandeshRxDropReason::MinDropReason+1); + i < static_cast(SandeshRxDropReason::MaxDropReason); i++) { - if (i == static_cast(Sandesh::DropReason::Recv::NoDrop)) { + if (i == static_cast(SandeshRxDropReason::NoDrop)) { continue; } msg_stats_.UpdateRecvFailed("Test1", 64, - static_cast(i)); + static_cast(i)); } // Get boost::ptr_map mt_stats; @@ -71,8 +71,8 @@ TEST_F(SandeshStatisticsTest, MsgStats) { EXPECT_EQ(2 , test1_sms->messages_sent); EXPECT_EQ(128, test1_sms->bytes_sent); int expected_sent_msg_dropped( - static_cast(Sandesh::DropReason::Send::MaxDropReason) - - static_cast(Sandesh::DropReason::Send::MinDropReason) - 1); + static_cast(SandeshTxDropReason::MaxDropReason) - + static_cast(SandeshTxDropReason::MinDropReason) - 2); int expected_sent_bytes_dropped(expected_sent_msg_dropped * 64); EXPECT_EQ(expected_sent_msg_dropped, test1_sms->messages_sent_dropped); EXPECT_EQ(expected_sent_bytes_dropped, test1_sms->bytes_sent_dropped); @@ -80,6 +80,7 @@ TEST_F(SandeshStatisticsTest, MsgStats) { EXPECT_EQ(1, test1_sms->messages_sent_dropped_no_client); EXPECT_EQ(1, test1_sms->messages_sent_dropped_no_session); EXPECT_EQ(1, test1_sms->messages_sent_dropped_queue_level); + EXPECT_EQ(1, test1_sms->messages_sent_dropped_validation_failed); EXPECT_EQ(1, test1_sms->messages_sent_dropped_client_send_failed); EXPECT_EQ(1, test1_sms->messages_sent_dropped_session_not_connected); EXPECT_EQ(1, test1_sms->messages_sent_dropped_header_write_failed); @@ -95,8 +96,8 @@ TEST_F(SandeshStatisticsTest, MsgStats) { EXPECT_EQ(64, test1_sms->bytes_sent_dropped_write_failed); EXPECT_EQ(64, test1_sms->bytes_sent_dropped_wrong_client_sm_state); int expected_recv_msg_dropped( - static_cast(Sandesh::DropReason::Recv::MaxDropReason) - - static_cast(Sandesh::DropReason::Recv::MinDropReason) - 1); + static_cast(SandeshRxDropReason::MaxDropReason) - + static_cast(SandeshRxDropReason::MinDropReason) - 2); int expected_recv_bytes_dropped(expected_recv_msg_dropped * 64); EXPECT_EQ(1, test1_sms->messages_received); EXPECT_EQ(128, test1_sms->bytes_received); diff --git a/library/python/pysandesh/sandesh_base.py b/library/python/pysandesh/sandesh_base.py index 974c4a37..9a2b91c1 100644 --- a/library/python/pysandesh/sandesh_base.py +++ b/library/python/pysandesh/sandesh_base.py @@ -14,12 +14,12 @@ import trace import util -from gen_py.sandesh.ttypes import SandeshType, SandeshLevel +from gen_py.sandesh.ttypes import SandeshType, SandeshLevel, \ + SandeshTxDropReason from gen_py.sandesh.constants import * -from sandesh_client import SandeshClient from sandesh_http import SandeshHttp -from sandesh_stats import SandeshStats from sandesh_trace import SandeshTraceRequestRunner +from sandesh_client import SandeshClient from sandesh_uve import SandeshUVETypeMaps, SandeshUVEPerTypeMap from work_queue import WorkQueue @@ -77,7 +77,8 @@ def init_generator(self, module, source, node_type, instance_id, logger_config_file=logger_config_file) self._logger.info('SANDESH: CONNECT TO COLLECTOR: %s', connect_to_collector) - self._stats = SandeshStats() + from sandesh_stats import SandeshMessageStatistics + self._msg_stats = SandeshMessageStatistics() self._trace = trace.Trace() self._sandesh_request_dict = {} self._uve_type_maps = SandeshUVETypeMaps(self._logger) @@ -209,9 +210,9 @@ def init_collector(self): pass # end init_collector - def stats(self): - return self._stats - # end stats + def msg_stats(self): + return self._msg_stats + # end msg_stats @classmethod def next_seqnum(cls): @@ -363,6 +364,8 @@ def send_sandesh(self, tx_sandesh): self._logger.log( sand_logger.SandeshLogger.get_py_logger_level( tx_sandesh.level()), tx_sandesh.log()) + self._msg_stats.update_tx_stats(tx_sandesh.__class__.__name__, + 0, SandeshTxDropReason.NoClient) # end send_sandesh def send_generator_info(self): @@ -684,6 +687,8 @@ def send(self, sandesh=sandesh_global): try: self.validate() except e: + sandesh.msg_stats().update_tx_stats(self.__class__.__name__, + 0, SandeshTxDropReason.ValidationFailed) sandesh._logger.error('sandesh "%s" validation failed [%s]' % (self.__class__.__name__, e)) return -1 @@ -738,6 +743,8 @@ def request(self, context='', sandesh=sandesh_global): try: self.validate() except e: + sandesh.msg_stats().update_tx_stats(self.__class__.__name__, + 0, SandeshTxDropReason.ValidationFailed) sandesh._logger.error('sandesh "%s" validation failed [%s]' % (self.__class__.__name__, e)) return -1 @@ -766,6 +773,8 @@ def response(self, context='', more=False, sandesh=sandesh_global): try: self.validate() except e: + sandesh.msg_stats().update_tx_stats(self.__class__.__name__, + 0, SandeshTxDropReason.ValidationFailed) sandesh._logger.error('sandesh "%s" validation failed [%s]' % (self.__class__.__name__, e)) return -1 @@ -797,6 +806,8 @@ def send(self, isseq=False, seqno=0, context='', try: self.validate() except e: + sandesh.msg_stats().update_tx_stats(self.__class__.__name__, + 0, SandeshTxDropReason.ValidationFailed) sandesh._logger.error('sandesh "%s" validation failed [%s]' % (self.__class__.__name__, e)) return -1 @@ -807,9 +818,15 @@ def send(self, isseq=False, seqno=0, context='', uve_type_map = sandesh._uve_type_maps.get_uve_type_map( self.__class__.__name__) if uve_type_map is None: + sandesh._logger.error('sandesh uve <%s> not registered: %s'\ + % (self.__class__.__name__, self.log())) + sandesh.msg_stats().update_tx_stats(self.__class__.__name__, + 0, SandeshTxDropReason.ValidationFailed) return -1 self._seqnum = self.next_seqnum() if not uve_type_map.update_uve(self): + sandesh.msg_stats().update_tx_stats(self.__class__.__name__, + 0, SandeshTxDropReason.ValidationFailed) sandesh._logger.error('Failed to update sandesh in cache. ' + self.log()) return -1 diff --git a/library/python/pysandesh/sandesh_client.py b/library/python/pysandesh/sandesh_client.py index b26c7fe3..02be4d63 100644 --- a/library/python/pysandesh/sandesh_client.py +++ b/library/python/pysandesh/sandesh_client.py @@ -11,6 +11,7 @@ from transport import TTransport from protocol import TXMLProtocol from sandesh_uve import SandeshUVETypeMaps +from gen_py.sandesh.ttypes import SandeshTxDropReason, SandeshRxDropReason class SandeshClient(object): @@ -46,8 +47,14 @@ def send_sandesh(self, sandesh): else: if (self._connection.session() is None): error_str = "No Connection" + self._sandesh_instance.msg_stats().update_tx_stats( + sandesh.__class__.__name__, 0, + SandeshTxDropReason.NoSession) else: error_str = "No ModuleId" + self._sandesh_instance.msg_stats().update_tx_stats( + sandesh.__class__.__name__, 0, + SandeshTxDropReason.ClientSendFailed) if self._sandesh_instance.is_logging_dropped_allowed(sandesh): self._logger.error( "SANDESH: %s: %s" % (error_str, sandesh.log())) @@ -58,17 +65,23 @@ def send_uve_sandesh(self, uve_sandesh): self._connection.statemachine().on_sandesh_uve_msg_send(uve_sandesh) #end send_uve_sandesh - def handle_sandesh_msg(self, sandesh_name, sandesh_xml): + def handle_sandesh_msg(self, sandesh_name, sandesh_xml, msg_len): transport = TTransport.TMemoryBuffer(sandesh_xml) protocol_factory = TXMLProtocol.TXMLProtocolFactory() protocol = protocol_factory.getProtocol(transport) sandesh_req = self._sandesh_instance.get_sandesh_request_object(sandesh_name) if sandesh_req: if sandesh_req.read(protocol) == -1: + self._sandesh_instance.update_rx_stats(sandesh_name, msg_len, + SandeshRxDropReason.DecodingFailed) self._logger.error('Failed to decode sandesh request "%s"' \ % (sandesh_name)) else: + self._sandesh_instance.update_rx_stats(sandesh_name, msg_len) self._sandesh_instance.enqueue_sandesh_request(sandesh_req) + else: + self._sandesh_instance.update_rx_stats(sandesh_name, msg_len, + SandeshRxDropReason.CreateFailed) #end handle_sandesh_msg def handle_sandesh_ctrl_msg(self, sandesh_ctrl_msg): diff --git a/library/python/pysandesh/sandesh_connection.py b/library/python/pysandesh/sandesh_connection.py index 0c62e915..4a02303b 100644 --- a/library/python/pysandesh/sandesh_connection.py +++ b/library/python/pysandesh/sandesh_connection.py @@ -8,13 +8,13 @@ import gevent import os -from sandesh_session import SandeshSession -from sandesh_session import SandeshReader -from sandesh_state_machine import SandeshStateMachine, Event from transport import TTransport from protocol import TXMLProtocol -from gen_py.sandesh.constants import * +from sandesh_session import SandeshSession, SandeshReader +from sandesh_state_machine import SandeshStateMachine, Event from sandesh_uve import SandeshUVETypeMaps +from gen_py.sandesh.ttypes import SandeshRxDropReason +from gen_py.sandesh.constants import * class SandeshConnection(object): @@ -147,13 +147,16 @@ def _handle_collector_update(self, collector_info): def _receive_sandesh_msg(self, session, msg): (hdr, hdr_len, sandesh_name) = SandeshReader.extract_sandesh_header(msg) if sandesh_name is None: + self._sandesh_instance.msg_stats().update_rx_stats('__UNKNOWN__', + len(msg), SandeshRxDropReason.DecodingFailed) self._logger.error('Failed to decode sandesh header for "%s"' % (msg)) return - # update the sandesh rx stats - self._sandesh_instance.stats().update_stats(sandesh_name, len(msg), False) if hdr.Hints & SANDESH_CONTROL_HINT: self._logger.debug('Received sandesh control message [%s]' % (sandesh_name)) if sandesh_name != 'SandeshCtrlServerToClient': + self._sandesh_instance.msg_stats().update_rx_stats( + sandesh_name, len(msg), + SandeshRxDropReason.ControlMsgFailed) self._logger.error('Invalid sandesh control message [%s]' % (sandesh_name)) return transport = TTransport.TMemoryBuffer(msg[hdr_len:]) @@ -162,13 +165,19 @@ def _receive_sandesh_msg(self, session, msg): from gen_py.sandesh_ctrl.ttypes import SandeshCtrlServerToClient sandesh_ctrl_msg = SandeshCtrlServerToClient() if sandesh_ctrl_msg.read(protocol) == -1: + self._sandesh_instance.msg_stats().update_rx_stats( + sandesh_name, len(msg), + SandeshRxDropReason.DecodingFailed) self._logger.error('Failed to decode sandesh control message "%s"' %(msg)) else: + self._sandesh_instance.msg_stats().update_rx_stats( + sandesh_name, len(msg)) self._state_machine.on_sandesh_ctrl_msg_receive(session, sandesh_ctrl_msg, hdr.Source) else: self._logger.debug('Received sandesh message [%s]' % (sandesh_name)) - self._client.handle_sandesh_msg(sandesh_name, msg[hdr_len:]) + self._client.handle_sandesh_msg(sandesh_name, + msg[hdr_len:], len(msg)) #end _receive_sandesh_msg #end class SandeshConnection diff --git a/library/python/pysandesh/sandesh_req_impl.py b/library/python/pysandesh/sandesh_req_impl.py index 618603bd..542c1609 100644 --- a/library/python/pysandesh/sandesh_req_impl.py +++ b/library/python/pysandesh/sandesh_req_impl.py @@ -44,7 +44,7 @@ def __init__(self, sandesh): SandeshLoggingParamsStatus.handle_request = \ self.sandesh_logging_params_status_handle_request SandeshMessageStatsReq.handle_request = \ - self.sandesh_stats_handle_request + self.sandesh_msg_stats_handle_request SandeshTraceBufferListRequest.handle_request = \ self.sandesh_trace_buffer_list_request_handle_request SandeshTraceRequest.handle_request = \ @@ -169,28 +169,16 @@ def sandesh_alarm_types_req_handle_request(self, sandesh_req): alarm_types_res.response(sandesh_req.context()) # end sandesh_alarm_types_req_handle_request - def sandesh_stats_handle_request(self, sandesh_req): - sandesh_stats = self._sandesh.stats() - stats_map = sandesh_stats.stats_map() - stats_list = [] - for sandesh_name, stats in stats_map.iteritems(): - msg_stat = SandeshMessageStats(stats.tx_count, - stats.tx_bytes, - stats.rx_count, - stats.rx_bytes) - mtype_stat = SandeshMessageTypeStats(sandesh_name, - msg_stat) - stats_list.append(mtype_stat) - + def sandesh_msg_stats_handle_request(self, sandesh_req): + sandesh_msg_stats = self._sandesh.msg_stats() + msg_type_stats = sandesh_msg_stats.message_type_stats() + msg_stats_list = [] + for msg_type, stats in msg_type_stats.iteritems(): + mtype_stat = SandeshMessageTypeStats(msg_type, stats) + msg_stats_list.append(mtype_stat) gen_stats = SandeshGeneratorStats() - gen_stats.type_stats = stats_list - gen_stats.aggregate_stats = SandeshMessageStats() - gen_stats.aggregate_stats.messages_sent = sandesh_stats._sandesh_sent - gen_stats.aggregate_stats.bytes_sent = sandesh_stats._bytes_sent - gen_stats.aggregate_stats.messages_received = \ - sandesh_stats._sandesh_received - gen_stats.aggregate_stats.bytes_received = \ - sandesh_stats._bytes_received + gen_stats.type_stats = msg_stats_list + gen_stats.aggregate_stats = sandesh_msg_stats.aggregate_stats() connection = self._sandesh.client().connection() if connection and connection.session(): session = connection.session() @@ -203,7 +191,7 @@ def sandesh_stats_handle_request(self, sandesh_req): gen_stats.send_queue_stats = send_queue_stats stats_resp = SandeshMessageStatsResp(gen_stats) stats_resp.response(sandesh_req.context()) - # end sandesh_stats_handle_request + # end sandesh_msg_stats_handle_request def sandesh_trace_buffer_list_request_handle_request(self, sandesh_req): tbuf_info_list = [SandeshTraceBufInfo(trace_buf_name=tbuf) diff --git a/library/python/pysandesh/sandesh_session.py b/library/python/pysandesh/sandesh_session.py index 100fe8d7..7c84e912 100644 --- a/library/python/pysandesh/sandesh_session.py +++ b/library/python/pysandesh/sandesh_session.py @@ -12,6 +12,7 @@ from work_queue import WorkQueue from tcp_session import TcpSession from sandesh_logger import SandeshLogger +from gen_py.sandesh.ttypes import SandeshTxDropReason _XML_SANDESH_OPEN = '' _XML_SANDESH_OPEN_ATTR_LEN = '' % (str(drop_reason)) + # end _update_tx_stats_internal + + def _update_rx_stats_internal(self, msg_stats, nbytes, drop_reason): + if drop_reason is SandeshRxDropReason.NoDrop: + if msg_stats.messages_received: + msg_stats.messages_received += 1 + msg_stats.bytes_received += nbytes + else: + msg_stats.messages_received = 1 + msg_stats.bytes_received = nbytes + else: + if msg_stats.messages_received_dropped: + msg_stats.messages_received_dropped += 1 + msg_stats.bytes_received_dropped += nbytes + else: + msg_stats.messages_received_dropped = 1 + msg_stats.bytes_received_dropped = nbytes + if drop_reason is SandeshRxDropReason.QueueLevel: + if msg_stats.messages_received_dropped_queue_level: + msg_stats.messages_received_dropped_queue_level += 1 + msg_stats.bytes_received_dropped_queue_level += nbytes + else: + msg_stats.messages_received_dropped_queue_level = 1 + msg_stats.bytes_received_dropped_queue_level = nbytes + elif drop_reason is SandeshRxDropReason.NoQueue: + if msg_stats.messages_received_dropped_no_queue: + msg_stats.messages_received_dropped_no_queue += 1 + msg_stats.bytes_received_dropped_no_queue += nbytes + else: + msg_stats.messages_received_dropped_no_queue = 1 + msg_stats.bytes_received_dropped_no_queue = nbytes + elif drop_reason is SandeshRxDropReason.ControlMsgFailed: + if msg_stats.messages_received_dropped_control_msg_failed: + msg_stats.messages_received_dropped_control_msg_failed += 1 + msg_stats.bytes_received_dropped_control_msg_failed += nbytes + else: + msg_stats.messages_received_dropped_control_msg_failed = 1 + msg_stats.bytes_received_dropped_control_msg_failed = nbytes + elif drop_reason is SandeshRxDropReason.CreateFailed: + if msg_stats.messages_received_dropped_create_failed: + msg_stats.messages_received_dropped_create_failed += 1 + msg_stats.bytes_received_dropped_create_failed += nbytes + else: + msg_stats.messages_received_dropped_create_failed = 1 + msg_stats.bytes_received_dropped_create_failed = nbytes + elif drop_reason is SandeshRxDropReason.DecodingFailed: + if msg_stats.messages_received_dropped_decoding_failed: + msg_stats.messages_received_dropped_decoding_failed += 1 + msg_stats.bytes_received_dropped_decoding_failed += nbytes + else: + msg_stats.messages_received_dropped_decoding_failed = 1 + msg_stats.bytes_received_dropped_decoding_failed = nbytes + else: + assert 0, 'Unhandled Rx drop reason <%s>' % (str(drop_reason)) + # end _update_rx_stats_internal + +# end class SandeshMessageStatistics diff --git a/library/python/pysandesh/test/SConscript b/library/python/pysandesh/test/SConscript index d6396393..e243932c 100644 --- a/library/python/pysandesh/test/SConscript +++ b/library/python/pysandesh/test/SConscript @@ -27,7 +27,8 @@ test_modules = [ 'sandesh_trace_test.py', 'sandesh_http_test.py', 'sandesh_uve_alarm_test.py', - 'conn_info_test.py' + 'conn_info_test.py', + 'sandesh_stats_test.py' ] test_modules_rules = [] diff --git a/library/python/pysandesh/test/sandesh_stats_test.py b/library/python/pysandesh/test/sandesh_stats_test.py new file mode 100755 index 00000000..c44c0289 --- /dev/null +++ b/library/python/pysandesh/test/sandesh_stats_test.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python + +# +# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved. +# + +# +# sandesh_stats_test +# + +import unittest +import sys + +sys.path.insert(1, sys.path[0]+'/../../../python') + +from pysandesh.sandesh_base import Sandesh +from pysandesh.sandesh_stats import SandeshMessageStatistics +from pysandesh.gen_py.sandesh.ttypes import SandeshTxDropReason, \ + SandeshRxDropReason + +class SandeshMessageStatisticsTest(unittest.TestCase): + + def setUp(self): + self.maxDiff = None + self.sandesh_msg_stats = SandeshMessageStatistics() + # end setUp + + def tearDown(self): + pass + # end tearDown + + def _verify_tx_drop_stats(self, stats, tmsg, tbytes, nmsg, nbytes): + self.assertEqual(tmsg, stats.messages_sent_dropped) + self.assertEqual(tbytes, stats.bytes_sent_dropped) + self.assertEqual(nmsg, stats.messages_sent_dropped_validation_failed) + self.assertEqual(nbytes, stats.bytes_sent_dropped_validation_failed) + self.assertEqual(nmsg, stats.messages_sent_dropped_queue_level) + self.assertEqual(nbytes, stats.bytes_sent_dropped_queue_level) + self.assertEqual(nmsg, stats.messages_sent_dropped_no_client) + self.assertEqual(nbytes, stats.bytes_sent_dropped_no_client) + self.assertEqual(nmsg, stats.messages_sent_dropped_no_session) + self.assertEqual(nbytes, stats.bytes_sent_dropped_no_session) + self.assertEqual(nmsg, stats.messages_sent_dropped_no_queue) + self.assertEqual(nbytes, stats.bytes_sent_dropped_no_queue) + self.assertEqual(nmsg, stats.messages_sent_dropped_client_send_failed) + self.assertEqual(nbytes, stats.bytes_sent_dropped_client_send_failed) + self.assertEqual(nmsg, stats.messages_sent_dropped_header_write_failed) + self.assertEqual(nbytes, stats.bytes_sent_dropped_header_write_failed) + self.assertEqual(nmsg, stats.messages_sent_dropped_write_failed) + self.assertEqual(nbytes, stats.bytes_sent_dropped_write_failed) + self.assertEqual(nmsg, stats.messages_sent_dropped_session_not_connected) + self.assertEqual(nbytes, stats.bytes_sent_dropped_session_not_connected) + self.assertEqual(nmsg, stats.messages_sent_dropped_wrong_client_sm_state) + self.assertEqual(nbytes, stats.bytes_sent_dropped_wrong_client_sm_state) + # end _verify_tx_drop_stats + + def test_update_tx_stats(self): + # update tx stats -> default reason - SandeshTxDropReason.NoDrop + self.assertTrue(self.sandesh_msg_stats.update_tx_stats('Message1', 64)) + self.assertTrue(self.sandesh_msg_stats.update_tx_stats('Message2', 192)) + + # update tx stats - drops + # for all SandeshTxDropReason values except SandeshTxDropReason.NoDrop + drop_nmsg = 0 + drop_nbytes = 0 + ndrops_per_reason = 2 + nbytes = 128 + for n in range(ndrops_per_reason): + for reason in range(SandeshTxDropReason.MinDropReason+1, + SandeshTxDropReason.MaxDropReason): + if reason is not SandeshTxDropReason.NoDrop: + self.assertTrue(self.sandesh_msg_stats.update_tx_stats( + 'Message2', nbytes, reason)) + drop_nmsg += 1 + drop_nbytes += nbytes + + # verify aggregate stats + agg_stats = self.sandesh_msg_stats.aggregate_stats() + self.assertEqual(2, agg_stats.messages_sent) + self.assertEqual(256, agg_stats.bytes_sent) + self._verify_tx_drop_stats(agg_stats, drop_nmsg, drop_nbytes, + ndrops_per_reason, ndrops_per_reason*nbytes) + + # verify per msg type stats + # Message1 + msg_type_stats = self.sandesh_msg_stats.message_type_stats()['Message1'] + self.assertEqual(1, msg_type_stats.messages_sent) + self.assertEqual(64, msg_type_stats.bytes_sent) + self._verify_tx_drop_stats(msg_type_stats, None, None, None, None) + # Message2 + msg_type_stats = self.sandesh_msg_stats.message_type_stats()['Message2'] + self.assertEqual(1, msg_type_stats.messages_sent) + self.assertEqual(192, msg_type_stats.bytes_sent) + self._verify_tx_drop_stats(msg_type_stats, drop_nmsg, drop_nbytes, + ndrops_per_reason, ndrops_per_reason*nbytes) + + # Invalid tx drop reason + self.assertFalse(self.sandesh_msg_stats.update_tx_stats('Message1', 64, + SandeshTxDropReason.MinDropReason)) + self.assertFalse(self.sandesh_msg_stats.update_tx_stats('Message1', 64, + SandeshTxDropReason.MinDropReason-1)) + self.assertFalse(self.sandesh_msg_stats.update_tx_stats('Message1', 64, + SandeshTxDropReason.MaxDropReason)) + self.assertFalse(self.sandesh_msg_stats.update_tx_stats('Message1', 64, + SandeshTxDropReason.MaxDropReason+1)) + # end test_update_tx_stats + + def _verify_rx_drop_stats(self, stats, tmsg, tbytes, nmsg, nbytes): + self.assertEqual(tmsg, stats.messages_received_dropped) + self.assertEqual(tbytes, stats.bytes_received_dropped) + self.assertEqual(nmsg, stats.messages_received_dropped_queue_level) + self.assertEqual(nbytes, stats.bytes_received_dropped_queue_level) + self.assertEqual(nmsg, stats.messages_received_dropped_no_queue) + self.assertEqual(nbytes, stats.bytes_received_dropped_no_queue) + self.assertEqual(nmsg, stats.messages_received_dropped_control_msg_failed) + self.assertEqual(nbytes, stats.bytes_received_dropped_control_msg_failed) + self.assertEqual(nmsg, stats.messages_received_dropped_create_failed) + self.assertEqual(nbytes, stats.bytes_received_dropped_create_failed) + self.assertEqual(nmsg, stats.messages_received_dropped_decoding_failed) + self.assertEqual(nbytes, stats.bytes_received_dropped_decoding_failed) + # end _verify_rx_drop_stats + + def test_update_rx_stats(self): + # update rx stats -> default reason - SandeshRxDropReason.NoDrop + self.assertTrue(self.sandesh_msg_stats.update_rx_stats('Message1', 64)) + self.assertTrue(self.sandesh_msg_stats.update_rx_stats('Message2', 192)) + + # update rx stats - drops + # for all SandeshRxDropReason values except SandeshRxDropReason.NoDrop + drop_nmsg = 0 + drop_nbytes = 0 + ndrops_per_reason = 2 + nbytes = 128 + for n in range(ndrops_per_reason): + for reason in range(SandeshRxDropReason.MinDropReason+1, + SandeshRxDropReason.MaxDropReason): + if reason is not SandeshRxDropReason.NoDrop: + self.assertTrue(self.sandesh_msg_stats.update_rx_stats( + 'Message2', nbytes, reason)) + drop_nmsg += 1 + drop_nbytes += nbytes + + # verify aggregate stats + agg_stats = self.sandesh_msg_stats.aggregate_stats() + self.assertEqual(2, agg_stats.messages_received) + self.assertEqual(256, agg_stats.bytes_received) + self._verify_rx_drop_stats(agg_stats, drop_nmsg, drop_nbytes, + ndrops_per_reason, ndrops_per_reason*nbytes) + + # verify per msg type stats + # Message1 + msg_type_stats = self.sandesh_msg_stats.message_type_stats()['Message1'] + self.assertEqual(1, msg_type_stats.messages_received) + self.assertEqual(64, msg_type_stats.bytes_received) + self._verify_rx_drop_stats(msg_type_stats, None, None, None, None) + # Message2 + msg_type_stats = self.sandesh_msg_stats.message_type_stats()['Message2'] + self.assertEqual(1, msg_type_stats.messages_received) + self.assertEqual(192, msg_type_stats.bytes_received) + self._verify_rx_drop_stats(msg_type_stats, drop_nmsg, drop_nbytes, + ndrops_per_reason, ndrops_per_reason*nbytes) + + # Invalid rx drop reason + self.assertFalse(self.sandesh_msg_stats.update_rx_stats('Message1', 32, + SandeshRxDropReason.MinDropReason)) + self.assertFalse(self.sandesh_msg_stats.update_rx_stats('Message1', 64, + SandeshRxDropReason.MinDropReason-1)) + self.assertFalse(self.sandesh_msg_stats.update_rx_stats('Message1', 96, + SandeshRxDropReason.MaxDropReason)) + self.assertFalse(self.sandesh_msg_stats.update_rx_stats('Message1', 64, + SandeshRxDropReason.MaxDropReason+1)) + # end test_update_rx_stats + +# end class SandeshMessageStatisticsTest + +if __name__ == '__main__': + unittest.main(verbosity=2, catchbreak=True)