Skip to content

Commit

Permalink
Add generator back pressure mechanism in collector. Defer the
Browse files Browse the repository at this point in the history
state machine dequeue when the database queue crosses a threshold
so that no more entries are enqueued to the database queue.
Similarly defer the session reader when the state machine queue
crosses a threshold so that no more events are enqueued to the
state machine queue. Deferring the session reader should lead
to the socket transmit buffer on the generator getting full
and build up of the transmit sandesh queue and eventually
drop of the sandesh messages there.
Closes-Bug: #1447349
Change-Id: Id14f97f7d9a035569eb22541e888202f2886d4dd
(cherry picked from commit b5a8d14)
  • Loading branch information
Megh Bhatt committed Apr 30, 2015
1 parent 49e00ad commit feb6c0d
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 16 deletions.
2 changes: 1 addition & 1 deletion library/cpp/sandesh.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class Sandesh {
};
};

typedef boost::tuple<size_t, SandeshLevel::type, bool> QueueWaterMarkInfo;
typedef boost::tuple<size_t, SandeshLevel::type, bool, bool> QueueWaterMarkInfo;
typedef boost::function<void (std::string serviceName, uint8_t numbOfInstances,
DiscoveryServiceClient::ServiceHandler)> CollectorSubFn;
// Initialization APIs
Expand Down
12 changes: 6 additions & 6 deletions library/cpp/sandesh_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ const std::string SandeshClient::kSessionReaderTask = "io::ReaderTask";
bool SandeshClient::task_policy_set_ = false;
const std::vector<Sandesh::QueueWaterMarkInfo>
SandeshClient::kSessionWaterMarkInfo = boost::assign::tuple_list_of
(100000, SandeshLevel::SYS_EMERG, true)
(50000, SandeshLevel::SYS_ERR, true)
(10000, SandeshLevel::SYS_DEBUG, true)
(75000, SandeshLevel::SYS_ERR, false)
(10000, SandeshLevel::SYS_DEBUG, false)
(2500, SandeshLevel::INVALID, false);
(100000, SandeshLevel::SYS_EMERG, true, false)
(50000, SandeshLevel::SYS_ERR, true, false)
(10000, SandeshLevel::SYS_DEBUG, true, false)
(75000, SandeshLevel::SYS_ERR, false, false)
(10000, SandeshLevel::SYS_DEBUG, false, false)
(2500, SandeshLevel::INVALID, false, false);

SandeshClient::SandeshClient(EventManager *evm,
Endpoint primary, Endpoint secondary, Sandesh::CollectorSubFn csf)
Expand Down
36 changes: 32 additions & 4 deletions library/cpp/sandesh_state_machine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -774,26 +774,49 @@ void SandeshStateMachine::Enqueue(const Ev &event) {
}

void SandeshStateMachine::SetSandeshMessageDropLevel(size_t queue_count,
SandeshLevel::type level) {
SandeshLevel::type level, boost::function<void (void)> cb) {
if (message_drop_level_ != level) {
SM_LOG(INFO, "SANDESH MESSAGE DROP LEVEL: [" <<
Sandesh::LevelToString(message_drop_level_) << "] -> [" <<
Sandesh::LevelToString(level) << "], SM QUEUE COUNT: " <<
queue_count);
message_drop_level_ = level;
if (!cb.empty()) {
cb();
}
}
}

void SandeshStateMachine::SetDeferSessionReader(bool defer_reader) {
if (session_ != NULL) {
SM_LOG(INFO, "SANDESH Session Reader Defer : " << defer_reader);
session_->SetDeferReader(defer_reader);
}
}

void SandeshStateMachine::SetQueueWaterMarkInfo(
Sandesh::QueueWaterMarkInfo &wm) {
bool high(boost::get<2>(wm));
size_t queue_count(boost::get<0>(wm));
WaterMarkInfo wmi(queue_count,
boost::bind(&SandeshStateMachine::SetSandeshMessageDropLevel,
this, _1, boost::get<1>(wm)));
bool defer_undefer(boost::get<3>(wm));
boost::function<void (void)> cb;
if (high) {
if (defer_undefer) {
cb = boost::bind(&SandeshStateMachine::SetDeferSessionReader,
this, true);
}
WaterMarkInfo wmi(queue_count,
boost::bind(&SandeshStateMachine::SetSandeshMessageDropLevel,
this, _1, boost::get<1>(wm), cb));
work_queue_.SetHighWaterMark(wmi);
} else {
if (defer_undefer) {
cb = boost::bind(&SandeshStateMachine::SetDeferSessionReader,
this, false);
}
WaterMarkInfo wmi(queue_count,
boost::bind(&SandeshStateMachine::SetSandeshMessageDropLevel,
this, _1, boost::get<1>(wm), cb));
work_queue_.SetLowWaterMark(wmi);
}
}
Expand Down Expand Up @@ -836,3 +859,8 @@ size_t SandeshStateMachine::EventQueue::AtomicDecrementQueueCount(
return count_.fetch_and_decrement() - 1;
}
}

void SandeshStateMachine::SetDeferDequeue(bool defer_dequeue) {
SM_LOG(INFO, "SANDESH Set Defer Dequeue: " << defer_dequeue);
work_queue_.set_disable(defer_dequeue);
}
4 changes: 3 additions & 1 deletion library/cpp/sandesh_state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class SandeshStateMachine :

void Initialize();
void SetAdminState(bool down);
void SetDeferDequeue(bool defer);

// State transitions
template <class Ev> void OnIdle(const Ev &event);
Expand Down Expand Up @@ -178,7 +179,8 @@ class SandeshStateMachine :
void UpdateEventEnqueueFail(const sc::event_base &event);
void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail);
void SetSandeshMessageDropLevel(size_t queue_count,
SandeshLevel::type level);
SandeshLevel::type level, boost::function<void (void)> cb);
void SetDeferSessionReader(bool defer_reader);

const char *prefix_;
typedef WorkQueue<EventContainer> EventQueue;
Expand Down
79 changes: 75 additions & 4 deletions library/cpp/test/sandesh_state_machine_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class SandeshSessionMock : public SandeshSession {
TaskScheduler::GetInstance()->GetTaskId("sandesh::Test::StateMachine"),
TaskScheduler::GetInstance()->GetTaskId("io::ReaderTask")),
state_(state),
direction_(direction) {
direction_(direction),
defer_reader_(false) {
}

~SandeshSessionMock() {
Expand All @@ -69,12 +70,20 @@ class SandeshSessionMock : public SandeshSession {
SandeshSession::Close();
}

virtual void SetDeferReader(bool defer_reader) {
defer_reader_ = defer_reader;
}

virtual bool IsReaderDeferred() const {
return defer_reader_;
}
State state() { return state_; }
Direction direction() { return direction_; }

private:
State state_;
Direction direction_;
bool defer_reader_;
};

class SandeshServerMock : public SandeshServer {
Expand Down Expand Up @@ -125,7 +134,8 @@ class SandeshServerMock : public SandeshServer {
SandeshSessionMock *old_session_;
};

const std::string FakeMessageSandeshBegin("<FakeSandesh type=\"sandesh\">");
const std::string FakeMessageSandeshBegin("<FakeSandesh type=\"sandesh\"><str1 type=\"string\" identifier=\"1\">");
const std::string FakeMessageSandeshEnd("</str1></FakeSandesh>");

static void CreateFakeMessage(uint8_t *data, size_t length) {
size_t offset = 0;
Expand Down Expand Up @@ -163,8 +173,11 @@ static void CreateFakeMessage(uint8_t *data, size_t length) {
memcpy(data + offset, FakeMessageSandeshBegin.c_str(),
FakeMessageSandeshBegin.size());
offset += FakeMessageSandeshBegin.size();
memset(data + offset, '0', length - offset);
offset += length - offset;
memset(data + offset, '0', length - offset - FakeMessageSandeshEnd.size());
offset += length - offset - FakeMessageSandeshEnd.size();
memcpy(data + offset, FakeMessageSandeshEnd.c_str(),
FakeMessageSandeshEnd.size());
offset += FakeMessageSandeshEnd.size();
EXPECT_EQ(offset, length);
}

Expand All @@ -179,6 +192,7 @@ class SandeshServerStateMachineTest : public ::testing::Test {
task_util::WaitForIdle();
sm_ = connection_->state_machine();
sm_->set_idle_hold_time(1);
sm_->SetGeneratorKey("Test");
}

~SandeshServerStateMachineTest() {
Expand Down Expand Up @@ -318,6 +332,9 @@ class SandeshServerStateMachineTest : public ::testing::Test {
string xml((const char *)msg, sizeof(msg));
sm_->OnSandeshMessage(session, xml);
}
SandeshLevel::type MessageDropLevel() const {
return sm_->message_drop_level_;
}

bool IdleHoldTimerRunning() { return sm_->idle_hold_timer_->running(); }

Expand Down Expand Up @@ -385,6 +402,60 @@ TEST_F(SandeshServerStateMachineTest, Matrix) {
}
}

TEST_F(SandeshServerStateMachineTest, WaterMark) {
// Set watermarks
std::vector<Sandesh::QueueWaterMarkInfo> wm_info =
boost::assign::tuple_list_of
(1*1024, SandeshLevel::SYS_EMERG, true, true)
(512, SandeshLevel::INVALID, false, true);
for (int i = 0; i < wm_info.size(); i++) {
sm_->SetQueueWaterMarkInfo(wm_info[i]);
}
// Verify initial message drop level and the session reader defer status
EXPECT_EQ(SandeshLevel::INVALID, MessageDropLevel());
EXPECT_TRUE(sm_->session() == NULL);
// Move to ssm::SERVER_INIT
GetToState(ssm::SERVER_INIT);
// Stop the task scheduler
TaskScheduler::GetInstance()->Stop();
// Enqueue 1 message 1024 byte
EvSandeshMessageRecv();
// Verify the message drop level and the session reader defer status
EXPECT_EQ(SandeshLevel::SYS_EMERG, MessageDropLevel());
EXPECT_TRUE(sm_->session()->IsReaderDeferred());
// Start the task scheduler
TaskScheduler::GetInstance()->Start();
task_util::WaitForIdle();
// Verify the message drop level and the session reader defer status
EXPECT_EQ(SandeshLevel::INVALID, MessageDropLevel());
EXPECT_FALSE(sm_->session()->IsReaderDeferred());
}

TEST_F(SandeshServerStateMachineTest, DeferDequeue) {
// Move to ssm::SERVER_INIT
GetToState(ssm::SERVER_INIT);
// Verify the state machine queue is empty
uint64_t sm_queue_count;
ASSERT_TRUE(sm_->GetQueueCount(sm_queue_count));
EXPECT_EQ(0, sm_queue_count);
// Defer the dequeue
sm_->SetDeferDequeue(true);
// Enqueue 1 message
EvSandeshMessageRecv();
// Verify that state machine queue now has 1 entry
ASSERT_TRUE(sm_->GetQueueCount(sm_queue_count));
EXPECT_EQ(1024, sm_queue_count);
// Verify the state machine state
EXPECT_EQ(ssm::SERVER_INIT, sm_->get_state());
// Undefer the dequeue
sm_->SetDeferDequeue(false);
task_util::WaitForIdle();
// Verify the state machine queue is empty
ASSERT_TRUE(sm_->GetQueueCount(sm_queue_count));
EXPECT_EQ(0, sm_queue_count);
// Verify the state machine state
EXPECT_EQ(ssm::SERVER_INIT, sm_->get_state());
}

class SandeshServerStateMachineIdleTest : public SandeshServerStateMachineTest {
virtual void SetUp() {
Expand Down

0 comments on commit feb6c0d

Please sign in to comment.