Skip to content

Commit

Permalink
In cases when cdb connection drops for a generator and it reconnects
Browse files Browse the repository at this point in the history
we can end up in a situation where the cdb queue watermark info is
duplicated since the cdb queue watermark set is called multiple times
and this can cause issues when processing watermarks in WorkQueue.
Fix is to uniquify the watermarks when setting them in WorkQueue.
Closes-Bug: #1453236

Change-Id: I166a65c371b1815fc9fca43f8b56cc7e2d891a0c
(cherry picked from commit 8905174)
  • Loading branch information
Megh Bhatt committed May 27, 2015
1 parent 8ef64c9 commit ff67ee2
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 10 deletions.
37 changes: 27 additions & 10 deletions src/base/queue_task.h
Expand Up @@ -14,6 +14,7 @@

#include <algorithm>
#include <vector>
#include <set>

#include <tbb/atomic.h>
#include <tbb/concurrent_queue.h>
Expand All @@ -32,6 +33,8 @@ struct WaterMarkInfo {
}
friend inline bool operator<(const WaterMarkInfo& lhs,
const WaterMarkInfo& rhs);
friend inline bool operator==(const WaterMarkInfo& lhs,
const WaterMarkInfo& rhs);
size_t count_;
WaterMarkCallback cb_;
};
Expand All @@ -40,6 +43,10 @@ inline bool operator<(const WaterMarkInfo& lhs, const WaterMarkInfo& rhs) {
return lhs.count_ < rhs.count_;
}

inline bool operator==(const WaterMarkInfo& lhs, const WaterMarkInfo& rhs) {
return lhs.count_ == rhs.count_;
}

typedef std::vector<WaterMarkInfo> WaterMarkInfos;

template <typename QueueEntryT, typename QueueT>
Expand Down Expand Up @@ -202,16 +209,21 @@ class WorkQueue {

void SetHighWaterMark(const WaterMarkInfos &high_water) {
tbb::spin_rw_mutex::scoped_lock write_lock(hwater_mutex_, true);
// Eliminate duplicates and sort by converting to set
std::set<WaterMarkInfo> hwater_set(high_water.begin(),
high_water.end());
hwater_index_ = -1;
high_water_ = high_water;
std::sort(high_water_.begin(), high_water_.end());
high_water_ = WaterMarkInfos(hwater_set.begin(), hwater_set.end());
}

void SetHighWaterMark(const WaterMarkInfo& hwm_info) {
tbb::spin_rw_mutex::scoped_lock write_lock(hwater_mutex_, true);
// Eliminate duplicates and sort by converting to set
std::set<WaterMarkInfo> hwater_set(high_water_.begin(),
high_water_.end());
hwater_set.insert(hwm_info);
hwater_index_ = -1;
high_water_.push_back(hwm_info);
std::sort(high_water_.begin(), high_water_.end());
high_water_ = WaterMarkInfos(hwater_set.begin(), hwater_set.end());
}

void ResetHighWaterMark() {
Expand All @@ -227,16 +239,21 @@ class WorkQueue {

void SetLowWaterMark(const WaterMarkInfos &low_water) {
tbb::spin_rw_mutex::scoped_lock write_lock(lwater_mutex_, true);
// Eliminate duplicates and sort by converting to set
std::set<WaterMarkInfo> lwater_set(low_water.begin(),
low_water.end());
lwater_index_ = -1;
low_water_ = low_water;
std::sort(low_water_.begin(), low_water_.end());
low_water_ = WaterMarkInfos(lwater_set.begin(), lwater_set.end());
}

void SetLowWaterMark(const WaterMarkInfo& lwm_info) {
tbb::spin_rw_mutex::scoped_lock write_lock(lwater_mutex_, true);
// Eliminate duplicates and sort by converting to set
std::set<WaterMarkInfo> lwater_set(low_water_.begin(),
low_water_.end());
lwater_set.insert(lwm_info);
lwater_index_ = -1;
low_water_.push_back(lwm_info);
std::sort(low_water_.begin(), low_water_.end());
low_water_ = WaterMarkInfos(lwater_set.begin(), lwater_set.end());
}

void ResetLowWaterMark() {
Expand Down Expand Up @@ -526,8 +543,8 @@ class WorkQueue {
// Sorted in ascending order
WaterMarkInfos high_water_; // When queue count goes above
WaterMarkInfos low_water_; // When queue count goes below
tbb::spin_rw_mutex hwater_mutex_;
tbb::spin_rw_mutex lwater_mutex_;
mutable tbb::spin_rw_mutex hwater_mutex_;
mutable tbb::spin_rw_mutex lwater_mutex_;
tbb::atomic<int> hwater_index_;
tbb::atomic<int> lwater_index_;

Expand Down
46 changes: 46 additions & 0 deletions src/base/test/queue_task_test.cc
Expand Up @@ -8,6 +8,7 @@

#include "testing/gunit.h"
#include <boost/bind.hpp>
#include <boost/assign/list_of.hpp>
#include "base/logging.h"
#include "base/queue_task.h"
#include "base/test/task_test_util.h"
Expand Down Expand Up @@ -943,6 +944,51 @@ TEST_F(QueueTaskWaterMarkTest, Basic) {
EXPECT_TRUE(qcount_.empty());
}

TEST_F(QueueTaskWaterMarkTest, Duplicates) {
// Setup high watermarks
WaterMarkInfo hwm3(12 * 1024, NULL);
work_queue_.SetHighWaterMark(hwm3);
WaterMarkInfo hwm1(4 * 1024, NULL);
work_queue_.SetHighWaterMark(hwm1);
WaterMarkInfo hwm2(8 * 1024, NULL);
work_queue_.SetHighWaterMark(hwm2);
WaterMarkInfo hwm5(8 * 1024, NULL);
work_queue_.SetHighWaterMark(hwm5);
WaterMarkInfo hwm4(4 * 1024, NULL);
work_queue_.SetHighWaterMark(hwm4);
WaterMarkInfo hwm6(12 * 1024, NULL);
work_queue_.SetHighWaterMark(hwm6);
// Verify that no duplicates exist and the watermarks are sorted
WaterMarkInfos expected_hwms = boost::assign::list_of
(WaterMarkInfo(4 * 1024, NULL))
(WaterMarkInfo(8 * 1024, NULL))
(WaterMarkInfo(12 * 1024, NULL));
WaterMarkInfos actual_hwms = work_queue_.GetHighWaterMark();
EXPECT_EQ(actual_hwms, expected_hwms);
// Setup low watermarks
WaterMarkInfo lwm1(10 * 1024, NULL);
WaterMarkInfo lwm2(6 * 1024, NULL);
WaterMarkInfo lwm3(2 * 1024, NULL);
WaterMarkInfo lwm4(10 * 1024, NULL);
WaterMarkInfo lwm5(6 * 1024, NULL);
WaterMarkInfo lwm6(2 * 1024, NULL);
WaterMarkInfos lwm;
lwm.push_back(lwm1);
lwm.push_back(lwm2);
lwm.push_back(lwm3);
lwm.push_back(lwm4);
lwm.push_back(lwm5);
lwm.push_back(lwm6);
work_queue_.SetLowWaterMark(lwm);
// Verify that no duplicates exist and the watermarks are sorted
WaterMarkInfos expected_lwms = boost::assign::list_of
(WaterMarkInfo(2 * 1024, NULL))
(WaterMarkInfo(6 * 1024, NULL))
(WaterMarkInfo(10 * 1024, NULL));
WaterMarkInfos actual_lwms(work_queue_.GetLowWaterMark());
EXPECT_EQ(actual_lwms, expected_lwms);
}

int main(int argc, char *argv[]) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
Expand Down

0 comments on commit ff67ee2

Please sign in to comment.