Skip to content

Commit

Permalink
Merge "Allowing flow data that is older than latest T2 value in cache…
Browse files Browse the repository at this point in the history
…, causes lot of cassandra writes to the stats table. This fixes tries to reduce that by having another extra cache which stores the penultimate T2 value along with final value. Any flow data with T2 older than the penultimate value are going to be dropped. This allows only 16 seconds of flow data that can arrive delayed. Closes-Bug:1557763"
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Mar 29, 2016
2 parents bb273d5 + 3b50b5e commit fd4f550
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 16 deletions.
57 changes: 42 additions & 15 deletions src/analytics/db_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ using process::ConnectionType;
using process::ConnectionStatus;

uint32_t DbHandler::field_cache_t2_ = 0;
std::set<std::string> DbHandler::field_cache_set_;
std::set<std::string> DbHandler::field_cache_set_[2];
uint32_t DbHandler::field_cache_old_t2_ = 0;
uint8_t DbHandler::old_t2_index_ = 0;
uint8_t DbHandler::new_t2_index_ = 1;
tbb::mutex DbHandler::fmutex_;

DbHandler::DbHandler(EventManager *evm,
Expand Down Expand Up @@ -720,27 +723,16 @@ void DbHandler::FieldNamesTableInsert(uint64_t timestamp,
std::string table_name(table_prefix);
table_name.append(field_name);

/* Check if fieldname and value were already seen in this T2
/* Check if fieldname and value were already seen in this T2;
2 caches are mainted one for last T2 and T2-1.
We only need to record them if they have NOT been seen yet */
bool record = false;
std::string fc_entry(table_name);
fc_entry.append(":");
fc_entry.append(field_val);
{
tbb::mutex::scoped_lock lock(fmutex_);
if (temp_u32 > field_cache_t2_) {
field_cache_set_.clear();
field_cache_t2_ = temp_u32;
}
if (temp_u32 == field_cache_t2_) {
if (field_cache_set_.find(fc_entry) == field_cache_set_.end()) {
field_cache_set_.insert(fc_entry);
record = true;
}
} else {
/* This is an old time-stamp */
record = true;
}
record = CanRecordDataForT2(temp_u32, fc_entry);
}

if (!record) return;
Expand All @@ -767,6 +759,41 @@ void DbHandler::FieldNamesTableInsert(uint64_t timestamp,

}

/*
* This function checks if the data can be recorded or not
* for the given t2. If t2 corresponding to the data is
* older than field_cache_old_t2_ and field_cache_t2_
* it is ignored
*/
bool DbHandler::CanRecordDataForT2(uint32_t temp_u32, std::string fc_entry) {
bool record = false;
if (temp_u32 > field_cache_t2_) {
// swap old and new index; clear the old cache
old_t2_index_ = new_t2_index_;
new_t2_index_ = (new_t2_index_ == 1)?0:1;
field_cache_old_t2_ = field_cache_t2_;
field_cache_set_[new_t2_index_].clear();
field_cache_t2_ = temp_u32;
} else if (temp_u32 > field_cache_old_t2_ && temp_u32 != field_cache_t2_){
field_cache_set_[old_t2_index_].clear();
field_cache_old_t2_ = temp_u32;
}
// Record only if not found in last or last but one T2 cache.
if (temp_u32 == field_cache_t2_) {
if (field_cache_set_[new_t2_index_].find(fc_entry) ==
field_cache_set_[new_t2_index_].end()) {
field_cache_set_[new_t2_index_].insert(fc_entry);
record = true;
}
} else if (temp_u32 == field_cache_old_t2_) {
if (field_cache_set_[old_t2_index_].find(fc_entry) ==
field_cache_set_[old_t2_index_].end()) {
field_cache_set_[old_t2_index_].insert(fc_entry);
record = true;
}
}
return record;
}
void DbHandler::GetRuleMap(RuleMap& rulemap) {
}

Expand Down
7 changes: 6 additions & 1 deletion src/analytics/db_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,18 @@ class DbHandler {
mutable tbb::mutex smutex_;
TtlMap ttl_map_;
static uint32_t field_cache_t2_;
static std::set<std::string> field_cache_set_;
static std::set<std::string> field_cache_set_[2];
static uint32_t field_cache_old_t2_;
static uint8_t old_t2_index_;
static uint8_t new_t2_index_;
static tbb::mutex fmutex_;
bool use_cql_;
std::string tablespace_;
UniformInt8RandomGenerator gen_partition_no_;
std::string zookeeper_server_list_;
bool use_zookeeper_;
bool CanRecordDataForT2(uint32_t, std::string);
friend class DbHandlerTest;
DISALLOW_COPY_AND_ASSIGN(DbHandler);
};

Expand Down
104 changes: 104 additions & 0 deletions src/analytics/test/db_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ using namespace GenDb;

TtlMap ttl_map = g_viz_constants.TtlValuesDefault;

struct DbHandlerCacheParam {
uint32_t field_cache_t2_;
std::set<std::string> field_cache_set_[2];
uint32_t field_cache_old_t2_;
uint8_t old_t2_index_;
uint8_t new_t2_index_;
};

class DbHandlerTest : public ::testing::Test {
public:
DbHandlerTest() :
Expand Down Expand Up @@ -74,6 +82,20 @@ class DbHandlerTest : public ::testing::Test {
return db_handler_;
}

struct DbHandlerCacheParam GetDbHandlerCacheParam() {
db_handler_cache_param_.field_cache_t2_ = DbHandler::field_cache_t2_;
db_handler_cache_param_.field_cache_set_[0] = DbHandler::field_cache_set_[0];
db_handler_cache_param_.field_cache_set_[1] = DbHandler::field_cache_set_[1];
db_handler_cache_param_.field_cache_old_t2_ = DbHandler::field_cache_old_t2_;
db_handler_cache_param_.old_t2_index_ = DbHandler::old_t2_index_;
db_handler_cache_param_.new_t2_index_ = DbHandler::new_t2_index_;
return db_handler_cache_param_;
}

bool WriteToCache(uint32_t temp_t2, std::string fc_entry) {
return db_handler()->CanRecordDataForT2(temp_t2, fc_entry);
}

protected:
class SandeshXMLMessageTest : public SandeshXMLMessage {
public:
Expand Down Expand Up @@ -132,6 +154,7 @@ class DbHandlerTest : public ::testing::Test {
ThriftIfMock *dbif_mock_;
#endif // !USE_CASSANDRA_CQL
DbHandlerPtr db_handler_;
DbHandlerCacheParam db_handler_cache_param_;
};


Expand Down Expand Up @@ -1172,6 +1195,87 @@ TEST_F(DbHandlerTest, FlowTableInsertTest) {
}
}

TEST_F(DbHandlerTest, CanRecordDataForT2Test) {
uint32_t t1 = GetDbHandlerCacheParam().field_cache_t2_ + 2;
std::string fc_entry("tabname:vn1");
bool ret = WriteToCache(t1, fc_entry);
struct DbHandlerCacheParam cache_param = GetDbHandlerCacheParam();
// All the field_cache_t2 should be updated
EXPECT_EQ(t1, cache_param.field_cache_t2_);
EXPECT_EQ(t1-2, cache_param.field_cache_old_t2_);
std::set<std::string> new_test_cache;
new_test_cache.insert(fc_entry);
EXPECT_THAT(new_test_cache, ::testing::ContainerEq(
cache_param.field_cache_set_[cache_param.new_t2_index_]));
EXPECT_EQ(true, ret);
// only field_cache_old_t2_ should be updated
// t2 is older to field_cache_new_t2_ but
// t2 is newer to field_cache_old_t2_
uint32_t t2 = cache_param.field_cache_old_t2_+1;
fc_entry = "tabname:vn2";
ret = WriteToCache(t2, fc_entry);
cache_param = GetDbHandlerCacheParam();
std::set<std::string> new_test_cache1;
new_test_cache1.insert(fc_entry);
EXPECT_EQ(t1, cache_param.field_cache_t2_);
EXPECT_EQ(t2, cache_param.field_cache_old_t2_);
EXPECT_THAT(new_test_cache1, ::testing::ContainerEq(
cache_param.field_cache_set_[cache_param.old_t2_index_]));
EXPECT_EQ(true, ret);
// None should be updated and the function returns false
uint32_t t3 = cache_param.field_cache_old_t2_ - 1;
fc_entry = "tabname:vn2";
ret = WriteToCache(t3, fc_entry);
cache_param = GetDbHandlerCacheParam();
EXPECT_EQ(t1, cache_param.field_cache_t2_);
EXPECT_EQ(t2, cache_param.field_cache_old_t2_);
EXPECT_EQ(false, ret);
// New entry with t2=field_cache_t2_ should return true
fc_entry="tabname:vn3";
ret = WriteToCache(t1, fc_entry);
new_test_cache.insert(fc_entry);
cache_param = GetDbHandlerCacheParam();
EXPECT_THAT(new_test_cache, ::testing::ContainerEq(
cache_param.field_cache_set_[cache_param.new_t2_index_]));
EXPECT_EQ(true, ret);
// New entry with t2=field_cache_old_t2_ should return true
fc_entry = "tabname:vn3";
ret = WriteToCache(t2, fc_entry);
new_test_cache1.insert(fc_entry);
cache_param = GetDbHandlerCacheParam();
EXPECT_THAT(new_test_cache1, ::testing::ContainerEq(
cache_param.field_cache_set_[cache_param.old_t2_index_]));
EXPECT_EQ(true, ret);
// New entry with t2=field_cache_t2_ with existing value should return false
fc_entry="tabname:vn3";
ret = WriteToCache(t1, fc_entry);
new_test_cache.insert(fc_entry);
cache_param = GetDbHandlerCacheParam();
EXPECT_THAT(new_test_cache, ::testing::ContainerEq(
cache_param.field_cache_set_[cache_param.new_t2_index_]));
EXPECT_EQ(false, ret);
// New entry with t2=field_cache_old_t2_ with existing value should return
// false
fc_entry = "tabname:vn3";
ret = WriteToCache(t2, fc_entry);
new_test_cache1.insert(fc_entry);
cache_param = GetDbHandlerCacheParam();
EXPECT_THAT(new_test_cache1, ::testing::ContainerEq(
cache_param.field_cache_set_[cache_param.old_t2_index_]));
EXPECT_EQ(false, ret);
// New entry with t2>field_cache_t2_ with existing value should return true
t1 += 1;
fc_entry = "tabname:vn3";
ret = WriteToCache(t1, fc_entry);
new_test_cache.clear();
new_test_cache.insert(fc_entry);
cache_param = GetDbHandlerCacheParam();
EXPECT_EQ(t1, cache_param.field_cache_t2_);
EXPECT_THAT(new_test_cache, ::testing::ContainerEq(
cache_param.field_cache_set_[cache_param.new_t2_index_]));
EXPECT_EQ(true, ret);
}

class FlowTableTest: public ::testing::Test {
};

Expand Down

0 comments on commit fd4f550

Please sign in to comment.