From ff67ee2137eb9b81651b2981787a3aab44d2e057 Mon Sep 17 00:00:00 2001 From: Megh Bhatt Date: Tue, 19 May 2015 16:24:57 -0700 Subject: [PATCH] In cases when cdb connection drops for a generator and it reconnects we can end up in a situation where the cdb queue watermark info is duplicated since the cdb queue watermark set is called multiple times and this can cause issues when processing watermarks in WorkQueue. Fix is to uniquify the watermarks when setting them in WorkQueue. Closes-Bug: #1453236 Change-Id: I166a65c371b1815fc9fca43f8b56cc7e2d891a0c (cherry picked from commit 8905174c55361f8dc5cf8caaa634d4229ae273d4) --- src/base/queue_task.h | 37 ++++++++++++++++++------- src/base/test/queue_task_test.cc | 46 ++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 10 deletions(-) diff --git a/src/base/queue_task.h b/src/base/queue_task.h index 76eae569b6c..0ddb7317ae2 100644 --- a/src/base/queue_task.h +++ b/src/base/queue_task.h @@ -14,6 +14,7 @@ #include #include +#include #include #include @@ -32,6 +33,8 @@ struct WaterMarkInfo { } friend inline bool operator<(const WaterMarkInfo& lhs, const WaterMarkInfo& rhs); + friend inline bool operator==(const WaterMarkInfo& lhs, + const WaterMarkInfo& rhs); size_t count_; WaterMarkCallback cb_; }; @@ -40,6 +43,10 @@ inline bool operator<(const WaterMarkInfo& lhs, const WaterMarkInfo& rhs) { return lhs.count_ < rhs.count_; } +inline bool operator==(const WaterMarkInfo& lhs, const WaterMarkInfo& rhs) { + return lhs.count_ == rhs.count_; +} + typedef std::vector WaterMarkInfos; template @@ -202,16 +209,21 @@ class WorkQueue { void SetHighWaterMark(const WaterMarkInfos &high_water) { tbb::spin_rw_mutex::scoped_lock write_lock(hwater_mutex_, true); + // Eliminate duplicates and sort by converting to set + std::set hwater_set(high_water.begin(), + high_water.end()); hwater_index_ = -1; - high_water_ = high_water; - std::sort(high_water_.begin(), high_water_.end()); + high_water_ = WaterMarkInfos(hwater_set.begin(), hwater_set.end()); } void SetHighWaterMark(const WaterMarkInfo& hwm_info) { tbb::spin_rw_mutex::scoped_lock write_lock(hwater_mutex_, true); + // Eliminate duplicates and sort by converting to set + std::set hwater_set(high_water_.begin(), + high_water_.end()); + hwater_set.insert(hwm_info); hwater_index_ = -1; - high_water_.push_back(hwm_info); - std::sort(high_water_.begin(), high_water_.end()); + high_water_ = WaterMarkInfos(hwater_set.begin(), hwater_set.end()); } void ResetHighWaterMark() { @@ -227,16 +239,21 @@ class WorkQueue { void SetLowWaterMark(const WaterMarkInfos &low_water) { tbb::spin_rw_mutex::scoped_lock write_lock(lwater_mutex_, true); + // Eliminate duplicates and sort by converting to set + std::set lwater_set(low_water.begin(), + low_water.end()); lwater_index_ = -1; - low_water_ = low_water; - std::sort(low_water_.begin(), low_water_.end()); + low_water_ = WaterMarkInfos(lwater_set.begin(), lwater_set.end()); } void SetLowWaterMark(const WaterMarkInfo& lwm_info) { tbb::spin_rw_mutex::scoped_lock write_lock(lwater_mutex_, true); + // Eliminate duplicates and sort by converting to set + std::set lwater_set(low_water_.begin(), + low_water_.end()); + lwater_set.insert(lwm_info); lwater_index_ = -1; - low_water_.push_back(lwm_info); - std::sort(low_water_.begin(), low_water_.end()); + low_water_ = WaterMarkInfos(lwater_set.begin(), lwater_set.end()); } void ResetLowWaterMark() { @@ -526,8 +543,8 @@ class WorkQueue { // Sorted in ascending order WaterMarkInfos high_water_; // When queue count goes above WaterMarkInfos low_water_; // When queue count goes below - tbb::spin_rw_mutex hwater_mutex_; - tbb::spin_rw_mutex lwater_mutex_; + mutable tbb::spin_rw_mutex hwater_mutex_; + mutable tbb::spin_rw_mutex lwater_mutex_; tbb::atomic hwater_index_; tbb::atomic lwater_index_; diff --git a/src/base/test/queue_task_test.cc b/src/base/test/queue_task_test.cc index 83798d2ad2d..fa1715459c2 100644 --- a/src/base/test/queue_task_test.cc +++ b/src/base/test/queue_task_test.cc @@ -8,6 +8,7 @@ #include "testing/gunit.h" #include +#include #include "base/logging.h" #include "base/queue_task.h" #include "base/test/task_test_util.h" @@ -943,6 +944,51 @@ TEST_F(QueueTaskWaterMarkTest, Basic) { EXPECT_TRUE(qcount_.empty()); } +TEST_F(QueueTaskWaterMarkTest, Duplicates) { + // Setup high watermarks + WaterMarkInfo hwm3(12 * 1024, NULL); + work_queue_.SetHighWaterMark(hwm3); + WaterMarkInfo hwm1(4 * 1024, NULL); + work_queue_.SetHighWaterMark(hwm1); + WaterMarkInfo hwm2(8 * 1024, NULL); + work_queue_.SetHighWaterMark(hwm2); + WaterMarkInfo hwm5(8 * 1024, NULL); + work_queue_.SetHighWaterMark(hwm5); + WaterMarkInfo hwm4(4 * 1024, NULL); + work_queue_.SetHighWaterMark(hwm4); + WaterMarkInfo hwm6(12 * 1024, NULL); + work_queue_.SetHighWaterMark(hwm6); + // Verify that no duplicates exist and the watermarks are sorted + WaterMarkInfos expected_hwms = boost::assign::list_of + (WaterMarkInfo(4 * 1024, NULL)) + (WaterMarkInfo(8 * 1024, NULL)) + (WaterMarkInfo(12 * 1024, NULL)); + WaterMarkInfos actual_hwms = work_queue_.GetHighWaterMark(); + EXPECT_EQ(actual_hwms, expected_hwms); + // Setup low watermarks + WaterMarkInfo lwm1(10 * 1024, NULL); + WaterMarkInfo lwm2(6 * 1024, NULL); + WaterMarkInfo lwm3(2 * 1024, NULL); + WaterMarkInfo lwm4(10 * 1024, NULL); + WaterMarkInfo lwm5(6 * 1024, NULL); + WaterMarkInfo lwm6(2 * 1024, NULL); + WaterMarkInfos lwm; + lwm.push_back(lwm1); + lwm.push_back(lwm2); + lwm.push_back(lwm3); + lwm.push_back(lwm4); + lwm.push_back(lwm5); + lwm.push_back(lwm6); + work_queue_.SetLowWaterMark(lwm); + // Verify that no duplicates exist and the watermarks are sorted + WaterMarkInfos expected_lwms = boost::assign::list_of + (WaterMarkInfo(2 * 1024, NULL)) + (WaterMarkInfo(6 * 1024, NULL)) + (WaterMarkInfo(10 * 1024, NULL)); + WaterMarkInfos actual_lwms(work_queue_.GetLowWaterMark()); + EXPECT_EQ(actual_lwms, expected_lwms); +} + int main(int argc, char *argv[]) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();