From 9ac37bfc31c46d8e6ceae1eadb9f2170c9d99f5d Mon Sep 17 00:00:00 2001 From: Megh Bhatt Date: Sun, 6 Nov 2016 22:47:56 -0800 Subject: [PATCH] Options to control database writes from contrail-collector COMMIT 1: Add database/cassandra options to contrail-collector 1. Add disable_all_writes, disable_statistics_writes, disable_messages_writes options under DATABASE section to control writes to database from collector. 2. Add introspect commands to disable/enable these options. 3. Add compaction_strategy option under CASSANDRA section to control the compaction strategy to be used for analytics tables created by the collector. Closes-Bug: #1638088 COMMIT 2: Fix build error with queue_task_test Closes-Bug: 1596630 (cherry picked from commit d285623cd5ba8783bd389114babab55c077907db) (cherry picked from commit c5da4cf09399a6983a0b7fc44220c6b6c3ed3c6a) COMMIT 3: Fix cql_if_test Add missing compaction_strategy arguments to Static/DynamicCfCreate called from cql_if_test Closes-Bug: #1641260 (cherry picked from commit 8d5498f53454b51b73b1f6df117b3d1c8648d364) COMMIT 4: Disable message keyword writes Add a DATABASE.enable_db_message_keyword_writes flag to control message keyword table writes, which are now disabled by default Closes-Bug: #1641261 (cherry picked from commit fc8a43f56f4c5355b73479e83e5ce716962f8049) Change-Id: Ic9afa3935e0e87f5317fb4a015069cd669c0621f --- src/analytics/collector.cc | 62 ++++++- src/analytics/collector_uve.sandesh | 27 ++- src/analytics/db_handler.cc | 173 +++++++++++++----- src/analytics/db_handler.h | 31 +++- src/analytics/db_handler_impl.h | 1 + src/analytics/generator.cc | 3 +- src/analytics/main.cc | 6 +- src/analytics/options.cc | 42 ++++- src/analytics/options.h | 14 ++ src/analytics/protobuf_collector.cc | 3 +- src/analytics/test/db_handler_mock.h | 4 +- src/analytics/test/options_test.cc | 14 ++ src/analytics/viz_collector.cc | 10 +- src/analytics/viz_collector.h | 5 +- src/base/test/queue_task_test.cc | 6 +- src/database/cassandra/cql/cql_if.cc | 36 ++-- src/database/cassandra/cql/cql_if.h | 3 +- src/database/cassandra/cql/cql_if_impl.h | 6 +- .../cassandra/cql/test/cql_if_test.cc | 14 +- src/database/gendb.sandesh | 4 + src/database/gendb_if.h | 3 +- src/opserver/test/utils/analytics_fixture.py | 3 +- 22 files changed, 371 insertions(+), 99 deletions(-) diff --git a/src/analytics/collector.cc b/src/analytics/collector.cc index 0124296f80e..2dda50c8952 100644 --- a/src/analytics/collector.cc +++ b/src/analytics/collector.cc @@ -593,10 +593,8 @@ void Collector::GetSmQueueWaterMarkInfo( GetQueueWaterMarkInfo(QueueType::Sm, wm_info); } - -static void SendQueueParamsError(std::string estr, const std::string &context) { - // SandeshGenerator is required, send error - QueueParamsError *eresp(new QueueParamsError); +static void SendCollectorError(std::string estr, const std::string &context) { + CollectorError *eresp(new CollectorError); eresp->set_context(context); eresp->set_error(estr); eresp->Response(); @@ -607,7 +605,7 @@ static Collector* ExtractCollectorFromRequest(SandeshContext *vscontext, VizSandeshContext *vsc = dynamic_cast(vscontext); if (!vsc) { - SendQueueParamsError("Sandesh client context NOT PRESENT", + SendCollectorError("Sandesh client context NOT PRESENT", context); return NULL; } @@ -636,7 +634,7 @@ static void SendQueueParamsResponse(Collector::QueueType::type type, void DbQueueParamsSet::HandleRequest() const { if (!(__isset.high && __isset.drop_level && __isset.queue_count)) { - SendQueueParamsError("Please specify all parameters", context()); + SendCollectorError("Please specify all parameters", context()); return; } Collector *collector = ExtractCollectorFromRequest(client_context(), @@ -652,7 +650,7 @@ void DbQueueParamsSet::HandleRequest() const { void SmQueueParamsSet::HandleRequest() const { if (!(__isset.high && __isset.drop_level && __isset.queue_count)) { - SendQueueParamsError("Please specify all parameters", context()); + SendCollectorError("Please specify all parameters", context()); return; } Collector *collector = ExtractCollectorFromRequest(client_context(), @@ -738,3 +736,53 @@ void FlowCollectionStatusRequest::HandleRequest() const { // Send response SendFlowCollectionStatusResponse(context()); } + +static DbHandlerPtr ExtractDbHandlerFromRequest(SandeshContext *vscontext, + const std::string &context) { + VizSandeshContext *vsc = + dynamic_cast(vscontext); + if (!vsc) { + SendCollectorError("Sandesh client context NOT PRESENT", + context); + return DbHandlerPtr(); + } + return vsc->Analytics()->GetDbHandler(); +} + +static void SendDatabaseWritesStatusResponse(SandeshContext *vscontext, std::string context) { + DbHandlerPtr dbh(ExtractDbHandlerFromRequest(vscontext, context)); + DatabaseWritesStatusResponse *dwsr(new DatabaseWritesStatusResponse); + dwsr->set_disable_all(dbh->IsAllWritesDisabled()); + dwsr->set_disable_statistics(dbh->IsStatisticsWritesDisabled()); + dwsr->set_disable_messages(dbh->IsMessagesWritesDisabled()); + dwsr->set_disable_messages_keyword(dbh->IsMessagesKeywordWritesDisabled()); + dwsr->set_disable_flows(Sandesh::IsFlowCollectionDisabled()); + dwsr->set_context(context); + dwsr->Response(); +} + +void DisableDatabaseWritesRequest::HandleRequest() const { + DbHandlerPtr dbh(ExtractDbHandlerFromRequest(client_context(), context())); + if (__isset.disable_all) { + dbh->DisableAllWrites(get_disable_all()); + } + if (__isset.disable_statistics) { + dbh->DisableStatisticsWrites(get_disable_statistics()); + } + if (__isset.disable_messages) { + dbh->DisableMessagesWrites(get_disable_messages()); + } + if (__isset.disable_messages_keyword) { + dbh->DisableMessagesKeywordWrites(get_disable_messages_keyword()); + } + if (__isset.disable_flows) { + Sandesh::DisableFlowCollection(get_disable_flows()); + } + // Send response + SendDatabaseWritesStatusResponse(client_context(), context()); +} + +void DatabaseWritesStatusRequest::HandleRequest() const { + // Send response + SendDatabaseWritesStatusResponse(client_context(), context()); +} diff --git a/src/analytics/collector_uve.sandesh b/src/analytics/collector_uve.sandesh index d97b96e91b0..f9b49e86b26 100644 --- a/src/analytics/collector_uve.sandesh +++ b/src/analytics/collector_uve.sandesh @@ -288,10 +288,6 @@ struct QueueParams { 3: string drop_level; } -response sandesh QueueParamsError { - 1: string error; -} - /** * @description: sandesh response to return State Machine Queue params */ @@ -330,3 +326,26 @@ request sandesh FlowCollectionStatusRequest { response sandesh FlowCollectionStatusResponse { 1: bool disable } + +response sandesh CollectorError { + 1: string error +} + +request sandesh DisableDatabaseWritesRequest { + 1: optional bool disable_all + 2: optional bool disable_statistics + 3: optional bool disable_messages + 4: optional bool disable_flows + 5: optional bool disable_messages_keyword +} + +request sandesh DatabaseWritesStatusRequest { +} + +response sandesh DatabaseWritesStatusResponse { + 1: bool disable_all + 2: bool disable_statistics + 3: bool disable_messages + 4: bool disable_flows + 5: bool disable_messages_keyword +} diff --git a/src/analytics/db_handler.cc b/src/analytics/db_handler.cc index 9d2d134ef11..9c78aa04b63 100644 --- a/src/analytics/db_handler.cc +++ b/src/analytics/db_handler.cc @@ -29,10 +29,12 @@ #include "viz_constants.h" #include "vizd_table_desc.h" +#include "viz_collector.h" #include "collector.h" #include "db_handler.h" #include "parser_util.h" #include "db_handler_impl.h" +#include "viz_sandesh.h" #define DB_LOG(_Level, _Msg) \ do { \ @@ -69,18 +71,25 @@ DbHandler::DbHandler(EventManager *evm, std::string name, const TtlMap& ttl_map, const std::string& cassandra_user, const std::string& cassandra_password, - bool use_cql, - const std::string &zookeeper_server_list, - bool use_zookeeper) : + const std::string &cassandra_compaction_strategy, + bool use_cql, const std::string &zookeeper_server_list, + bool use_zookeeper, bool disable_all_writes, + bool disable_statistics_writes, bool disable_messages_writes, + bool disable_messages_keyword_writes) : name_(name), drop_level_(SandeshLevel::INVALID), ttl_map_(ttl_map), use_cql_(use_cql), tablespace_(), + compaction_strategy_(cassandra_compaction_strategy), gen_partition_no_((uint8_t)g_viz_constants.PARTITION_MIN, (uint8_t)g_viz_constants.PARTITION_MAX), zookeeper_server_list_(zookeeper_server_list), - use_zookeeper_(use_zookeeper) { + use_zookeeper_(use_zookeeper), + disable_all_writes_(disable_all_writes), + disable_statistics_writes_(disable_statistics_writes), + disable_messages_writes_(disable_messages_writes), + disable_messages_keyword_writes_(disable_messages_keyword_writes) { #ifdef USE_CASSANDRA_CQL if (use_cql) { dbif_.reset(new cass::cql::CqlIf(evm, cassandra_ips, @@ -104,7 +113,11 @@ DbHandler::DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map) : dbif_(dbif), ttl_map_(ttl_map), gen_partition_no_((uint8_t)g_viz_constants.PARTITION_MIN, - (uint8_t)g_viz_constants.PARTITION_MAX) { + (uint8_t)g_viz_constants.PARTITION_MAX), + disable_all_writes_(false), + disable_statistics_writes_(false), + disable_messages_writes_(false), + disable_messages_keyword_writes_(false) { } DbHandler::~DbHandler() { @@ -166,7 +179,7 @@ void DbHandler::SetDropLevel(size_t queue_count, SandeshLevel::type level, bool DbHandler::CreateTables() { for (std::vector::const_iterator it = vizd_tables.begin(); it != vizd_tables.end(); it++) { - if (!dbif_->Db_AddColumnfamily(*it)) { + if (!dbif_->Db_AddColumnfamily(*it, compaction_strategy_)) { DB_LOG(ERROR, it->cfname_ << " FAILED"); return false; } @@ -174,7 +187,7 @@ bool DbHandler::CreateTables() { for (std::vector::const_iterator it = vizd_flow_tables.begin(); it != vizd_flow_tables.end(); it++) { - if (!dbif_->Db_AddColumnfamily(*it)) { + if (!dbif_->Db_AddColumnfamily(*it, compaction_strategy_)) { DB_LOG(ERROR, it->cfname_ << " FAILED"); return false; } @@ -182,7 +195,7 @@ bool DbHandler::CreateTables() { for (std::vector::const_iterator it = vizd_stat_tables.begin(); it != vizd_stat_tables.end(); it++) { - if (!dbif_->Db_AddColumnfamily(*it)) { + if (!dbif_->Db_AddColumnfamily(*it, compaction_strategy_)) { DB_LOG(ERROR, it->cfname_ << " FAILED"); return false; } @@ -395,6 +408,38 @@ bool DbHandler::UseCql() const { return use_cql_; } +bool DbHandler::IsAllWritesDisabled() const { + return disable_all_writes_; +} + +bool DbHandler::IsStatisticsWritesDisabled() const { + return disable_statistics_writes_; +} + +bool DbHandler::IsMessagesWritesDisabled() const { + return disable_messages_writes_; +} + +bool DbHandler::IsMessagesKeywordWritesDisabled() const { + return disable_messages_keyword_writes_; +} + +void DbHandler::DisableAllWrites(bool disable) { + disable_all_writes_ = disable; +} + +void DbHandler::DisableStatisticsWrites(bool disable) { + disable_statistics_writes_ = disable; +} + +void DbHandler::DisableMessagesWrites(bool disable) { + disable_messages_writes_ = disable; +} + +void DbHandler::DisableMessagesKeywordWrites(bool disable) { + disable_messages_keyword_writes_ = disable; +} + void DbHandler::SetDbQueueWaterMarkInfo(Sandesh::QueueWaterMarkInfo &wm, boost::function defer_undefer_cb) { dbif_->Db_SetQueueWaterMark(boost::get<2>(wm), @@ -462,8 +507,17 @@ bool DbHandler::GetCqlStats(cass::cql::DbStats *stats) const { return true; } +bool DbHandler::InsertIntoDb(std::auto_ptr col_list, + GenDb::GenDbIf::DbAddColumnCb db_cb) { + if (IsAllWritesDisabled()) { + return true; + } + return dbif_->Db_AddColumn(col_list, db_cb); +} + bool DbHandler::AllowMessageTableInsert(const SandeshHeader &header) { - return header.get_Type() != SandeshType::FLOW; + return !IsMessagesWritesDisabled() && !IsAllWritesDisabled() && + (header.get_Type() != SandeshType::FLOW); } bool DbHandler::MessageIndexTableInsert(const std::string& cfname, @@ -558,7 +612,7 @@ bool DbHandler::MessageIndexTableInsert(const std::string& cfname, GenDb::NewCol *col(new GenDb::NewCol(col_name, col_value, ttl)); columns.push_back(col); #endif - if (!dbif_->Db_AddColumn(col_list, db_cb)) { + if (!InsertIntoDb(col_list, db_cb)) { DB_LOG(ERROR, "Addition of message: " << message_type << ", message UUID: " << unm << " to table: " << cfname << " FAILED"); @@ -644,37 +698,23 @@ void DbHandler::MessageTableOnlyInsert(const VizMsg *vmsgp, columns.push_back(new GenDb::NewCol(g_viz_constants.DATA, vmsgp->msg->ExtractMessage(), ttl)); - if (!dbif_->Db_AddColumn(col_list, db_cb)) { + if (!InsertIntoDb(col_list, db_cb)) { DB_LOG(ERROR, "Addition of message: " << message_type << ", message UUID: " << vmsgp->unm << " COLUMN FAILED"); return; } } -void DbHandler::MessageTableInsert(const VizMsg *vmsgp, +void DbHandler::MessageTableKeywordInsert(const VizMsg *vmsgp, GenDb::GenDbIf::DbAddColumnCb db_cb) { + if (IsMessagesKeywordWritesDisabled() || + IsAllWritesDisabled()) { + return; + } + LineParser::WordListType words; const SandeshHeader &header(vmsgp->msg->GetHeader()); const std::string &message_type(vmsgp->msg->GetMessageType()); - - if (!AllowMessageTableInsert(header)) - return; - - MessageTableOnlyInsert(vmsgp, db_cb); - - MessageIndexTableInsert(g_viz_constants.MESSAGE_TABLE_SOURCE, header, - message_type, vmsgp->unm, "", db_cb); - MessageIndexTableInsert(g_viz_constants.MESSAGE_TABLE_MODULE_ID, header, - message_type, vmsgp->unm, "", db_cb); - MessageIndexTableInsert(g_viz_constants.MESSAGE_TABLE_CATEGORY, header, - message_type, vmsgp->unm, "", db_cb); - MessageIndexTableInsert(g_viz_constants.MESSAGE_TABLE_MESSAGE_TYPE, header, - message_type, vmsgp->unm, "", db_cb); - MessageIndexTableInsert(g_viz_constants.MESSAGE_TABLE_TIMESTAMP, header, - message_type, vmsgp->unm, "", db_cb); - const SandeshType::type &stype(header.get_Type()); - - LineParser::WordListType words; if (stype == SandeshType::SYSTEM || stype == SandeshType::UVE || stype == SandeshType::OBJECT) { const SandeshXMLMessage *sxmsg = @@ -696,6 +736,32 @@ void DbHandler::MessageTableInsert(const VizMsg *vmsgp, if (!r) DB_LOG(ERROR, "Failed to parse:"); } +} + +void DbHandler::MessageTableInsert(const VizMsg *vmsgp, + GenDb::GenDbIf::DbAddColumnCb db_cb) { + const SandeshHeader &header(vmsgp->msg->GetHeader()); + const std::string &message_type(vmsgp->msg->GetMessageType()); + + if (!AllowMessageTableInsert(header)) + return; + + MessageTableOnlyInsert(vmsgp, db_cb); + + MessageIndexTableInsert(g_viz_constants.MESSAGE_TABLE_SOURCE, header, + message_type, vmsgp->unm, "", db_cb); + MessageIndexTableInsert(g_viz_constants.MESSAGE_TABLE_MODULE_ID, header, + message_type, vmsgp->unm, "", db_cb); + MessageIndexTableInsert(g_viz_constants.MESSAGE_TABLE_CATEGORY, header, + message_type, vmsgp->unm, "", db_cb); + MessageIndexTableInsert(g_viz_constants.MESSAGE_TABLE_MESSAGE_TYPE, header, + message_type, vmsgp->unm, "", db_cb); + MessageIndexTableInsert(g_viz_constants.MESSAGE_TABLE_TIMESTAMP, header, + message_type, vmsgp->unm, "", db_cb); + + MessageTableKeywordInsert(vmsgp, db_cb); + + const SandeshType::type &stype(header.get_Type()); /* * Insert the message types,module_id in the stat table @@ -819,6 +885,9 @@ void DbHandler::GetRuleMap(RuleMap& rulemap) { void DbHandler::ObjectTableInsert(const std::string &table, const std::string &objectkey_str, uint64_t ×tamp, const boost::uuids::uuid& unm, const VizMsg *vmsgp, GenDb::GenDbIf::DbAddColumnCb db_cb) { + if (IsMessagesWritesDisabled() || IsAllWritesDisabled()) { + return; + } uint32_t T2(timestamp >> g_viz_constants.RowTimeInBits); uint32_t T1(timestamp & g_viz_constants.RowTimeInMask); const std::string &message_type(vmsgp->msg->GetMessageType()); @@ -849,7 +918,7 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &o GenDb::NewColVec& columns = col_list->columns_; columns.reserve(1); columns.push_back(col); - if (!dbif_->Db_AddColumn(col_list, db_cb)) { + if (!InsertIntoDb(col_list, db_cb)) { DB_LOG(ERROR, "Addition of " << objectkey_str << ", message UUID " << unm << " into table " << table << " FAILED"); @@ -870,7 +939,7 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &o GenDb::NewColVec& columns = col_list->columns_; columns.reserve(1); columns.push_back(col); - if (!dbif_->Db_AddColumn(col_list, db_cb)) { + if (!InsertIntoDb(col_list, db_cb)) { DB_LOG(ERROR, "Addition of " << objectkey_str << ", message UUID " << unm << " " << table << " into table " << g_viz_constants.OBJECT_VALUE_TABLE << " FAILED"); @@ -1010,7 +1079,7 @@ bool DbHandler::StatTableWrite(uint32_t t2, GenDb::NewCol *col(new GenDb::NewCol(col_name, col_value, ttl)); columns.push_back(col); - if (!dbif_->Db_AddColumn(col_list, db_cb)) { + if (!InsertIntoDb(col_list, db_cb)) { DB_LOG(ERROR, "Addition of " << statName << ", " << statAttr << " tag " << ptag.first << ":" << stag.first << " into table " << @@ -1032,6 +1101,9 @@ DbHandler::StatTableInsert(uint64_t ts, const TagMap & attribs_tag, const AttribMap & attribs, GenDb::GenDbIf::DbAddColumnCb db_cb) { + if (IsAllWritesDisabled() || IsStatisticsWritesDisabled()) { + return; + } int ttl = GetTtl(TtlType::STATSDATA_TTL); StatTableInsertTtl(ts, statName, statAttr, attribs_tag, attribs, ttl, db_cb); @@ -1232,14 +1304,14 @@ static void PopulateFlowRecordTableRowKey( } static bool PopulateFlowRecordTable(FlowValueArray &fvalues, - GenDb::GenDbIf *dbif, const TtlMap& ttl_map, - FlowFieldValuesCb fncb, GenDb::GenDbIf::DbAddColumnCb db_cb) { + DbInsertCb db_insert_cb, const TtlMap& ttl_map, + FlowFieldValuesCb fncb) { std::auto_ptr colList(new GenDb::ColList); colList->cfname_ = g_viz_constants.FLOW_TABLE; PopulateFlowRecordTableRowKey(fvalues, colList->rowkey_); PopulateFlowRecordTableColumns(FlowRecordTableColumns, fvalues, colList->columns_, ttl_map, fncb); - return dbif->Db_AddColumn(colList, db_cb); + return db_insert_cb(colList); } static const std::vector FlowIndexTableColumnValues = @@ -1436,8 +1508,8 @@ static void PopulateFlowIndexTableColumns(FlowIndexTableType ftype, static bool PopulateFlowIndexTables(const FlowValueArray &fvalues, const uint32_t &T2, const uint32_t &T1, uint8_t partition_no, - GenDb::GenDbIf *dbif, const TtlMap& ttl_map, - FlowFieldValuesCb fncb, GenDb::GenDbIf::DbAddColumnCb db_cb) { + DbInsertCb db_insert_cb, const TtlMap& ttl_map, + FlowFieldValuesCb fncb) { // Populate row key and column values (same for all flow index // tables) GenDb::DbDataValueVec rkey; @@ -1455,7 +1527,7 @@ static bool PopulateFlowIndexTables(const FlowValueArray &fvalues, colList->rowkey_ = rkey; PopulateFlowIndexTableColumns(fitt, fvalues, T1, &colList->columns_, cvalues, ttl_map); - if (!dbif->Db_AddColumn(colList, db_cb)) { + if (!db_insert_cb(colList)) { LOG(ERROR, "Populating " << FlowIndexTable2String(fitt) << " FAILED"); } @@ -1616,8 +1688,10 @@ bool DbHandler::FlowSampleAdd(const pugi::xml_node& flow_sample, FlowFieldValuesCb fncb = boost::bind(&DbHandler::FieldNamesTableInsert, this, timestamp, g_viz_constants.FLOW_TABLE, _1, _2, _3, db_cb); - if (!PopulateFlowRecordTable(flow_entry_values, dbif_.get(), ttl_map_, - fncb, db_cb)) { + DbInsertCb db_insert_cb = + boost::bind(&DbHandler::InsertIntoDb, this, _1, db_cb); + if (!PopulateFlowRecordTable(flow_entry_values, db_insert_cb, ttl_map_, + fncb)) { DB_LOG(ERROR, "Populating FlowRecordTable FAILED"); } GenDb::DbDataValue &diff_bytes( @@ -1633,7 +1707,7 @@ bool DbHandler::FlowSampleAdd(const pugi::xml_node& flow_sample, if (diff_bytes.which() != GenDb::DB_VALUE_BLANK && diff_packets.which() != GenDb::DB_VALUE_BLANK) { if (!PopulateFlowIndexTables(flow_entry_values, T2, T1, partition_no, - dbif_.get(), ttl_map_, fncb2, db_cb)) { + db_insert_cb, ttl_map_, fncb2)) { DB_LOG(ERROR, "Populating FlowIndexTables FAILED"); } } @@ -1718,16 +1792,21 @@ DbHandlerInitializer::DbHandlerInitializer(EventManager *evm, DbHandlerInitializer::InitializeDoneCb callback, const std::vector &cassandra_ips, const std::vector &cassandra_ports, const TtlMap& ttl_map, - const std::string& cassandra_user, const std::string& cassandra_password, + const std::string &cassandra_user, const std::string &cassandra_password, + const std::string &cassandra_compaction_strategy, bool use_cql, const std::string &zookeeper_server_list, - bool use_zookeeper) : + bool use_zookeeper, bool disable_all_db_writes, + bool disable_db_stats_writes, bool disable_db_messages_writes, + bool disable_db_messages_keyword_writes) : db_name_(db_name), db_task_instance_(db_task_instance), db_handler_(new DbHandler(evm, boost::bind(&DbHandlerInitializer::ScheduleInit, this), cassandra_ips, cassandra_ports, db_name, ttl_map, - cassandra_user, cassandra_password, use_cql, - zookeeper_server_list, use_zookeeper)), + cassandra_user, cassandra_password, cassandra_compaction_strategy, + use_cql, zookeeper_server_list, use_zookeeper, + disable_all_db_writes, disable_db_stats_writes, + disable_db_messages_writes, disable_db_messages_keyword_writes)), callback_(callback), db_init_timer_(TimerManager::CreateTimer(*evm->io_service(), db_name + " Db Init Timer", diff --git a/src/analytics/db_handler.h b/src/analytics/db_handler.h index 9d172f36254..49f88e86c65 100644 --- a/src/analytics/db_handler.h +++ b/src/analytics/db_handler.h @@ -88,8 +88,10 @@ class DbHandler { std::string name, const TtlMap& ttl_map, const std::string& cassandra_user, const std::string& cassandra_password, + const std::string& cassandra_compaction_strategy, bool use_cql, const std::string &zookeeper_server_list, - bool use_zookeeper); + bool use_zookeeper, bool disable_all_writes, bool disable_stats_writes, + bool disable_messages_writes, bool disable_messages_keyword_writes); DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map); virtual ~DbHandler(); @@ -135,8 +137,18 @@ class DbHandler { std::vector GetEndpoints() const; std::string GetName() const; bool UseCql() const; + bool IsAllWritesDisabled() const; + bool IsStatisticsWritesDisabled() const; + bool IsMessagesWritesDisabled() const; + bool IsMessagesKeywordWritesDisabled() const; + void DisableAllWrites(bool disable); + void DisableStatisticsWrites(bool disable); + void DisableMessagesWrites(bool disable); + void DisableMessagesKeywordWrites(bool disable); private: + void MessageTableKeywordInsert(const VizMsg *vmsgp, + GenDb::GenDbIf::DbAddColumnCb db_cb); void StatTableInsertTtl(uint64_t ts, const std::string& statName, const std::string& statAttr, @@ -174,9 +186,10 @@ class DbHandler { uint64_t GetTtl(TtlType::type type) { return GetTtlFromMap(ttl_map_, type); } + bool InsertIntoDb(std::auto_ptr col_list, + GenDb::GenDbIf::DbAddColumnCb db_cb); boost::scoped_ptr dbif_; - // Random generator for UUIDs ThreadSafeUuidGenerator umn_gen_; std::string name_; @@ -194,9 +207,14 @@ class DbHandler { static tbb::mutex fmutex_; bool use_cql_; std::string tablespace_; + std::string compaction_strategy_; UniformInt8RandomGenerator gen_partition_no_; std::string zookeeper_server_list_; bool use_zookeeper_; + bool disable_all_writes_; + bool disable_statistics_writes_; + bool disable_messages_writes_; + bool disable_messages_keyword_writes_; bool CanRecordDataForT2(uint32_t, std::string); friend class DbHandlerTest; DISALLOW_COPY_AND_ASSIGN(DbHandler); @@ -234,11 +252,14 @@ class DbHandlerInitializer { const std::vector &cassandra_ips, const std::vector &cassandra_ports, const TtlMap& ttl_map, - const std::string& cassandra_user, - const std::string& cassandra_password, + const std::string &cassandra_user, + const std::string &cassandra_password, + const std::string &cassandra_compaction_strategy, bool use_cql, const std::string &zookeeper_server_list, - bool use_zookeeper); + bool use_zookeeper, bool disable_all_db_writes, + bool disable_db_stats_writes, bool disable_db_messages_writes, + bool disable_db_messages_keyword_writes); DbHandlerInitializer(EventManager *evm, const std::string &db_name, int db_task_instance, const std::string &timer_task_name, InitializeDoneCb callback, diff --git a/src/analytics/db_handler_impl.h b/src/analytics/db_handler_impl.h index 257c1605447..e2725eca7c3 100644 --- a/src/analytics/db_handler_impl.h +++ b/src/analytics/db_handler_impl.h @@ -18,6 +18,7 @@ typedef boost::array FlowFieldValuesCb; +typedef boost::function)> DbInsertCb; void PopulateFlowIndexTableColumnValues( const std::vector &frvt, diff --git a/src/analytics/generator.cc b/src/analytics/generator.cc index e135c86af14..d3169550063 100644 --- a/src/analytics/generator.cc +++ b/src/analytics/generator.cc @@ -111,7 +111,8 @@ SandeshGenerator::SandeshGenerator(Collector * const collector, VizSession *sess source + ":" + node_type + ":" + module + ":" + instance_id, collector->analytics_ttl_map(), collector->cassandra_user(), collector->cassandra_password(), - false, std::string(), false)); + std::string(), false, std::string(), + false, false, false, false, false)); } else { //Use collector db_handler db_handler_ = global_db_handler; diff --git a/src/analytics/main.cc b/src/analytics/main.cc index 4eed245496e..9745597c12a 100644 --- a/src/analytics/main.cc +++ b/src/analytics/main.cc @@ -370,9 +370,13 @@ int main(int argc, char *argv[]) options.kafka_prefix(), ttl_map, options.cassandra_user(), options.cassandra_password(), + options.cassandra_compaction_strategy(), use_cql, zookeeper_server_list, - use_zookeeper); + use_zookeeper, options.disable_all_db_writes(), + options.disable_db_statistics_writes(), + options.disable_db_messages_writes(), + !options.enable_db_messages_keyword_writes()); #if 0 // initialize python/c++ API diff --git a/src/analytics/options.cc b/src/analytics/options.cc index 52e78a0ab7e..16ea91aae99 100644 --- a/src/analytics/options.cc +++ b/src/analytics/options.cc @@ -14,6 +14,7 @@ #include "base/util.h" #include "net/address_util.h" #include "viz_constants.h" +#include using namespace std; using namespace boost::asio::ip; @@ -82,12 +83,16 @@ void Options::Initialize(EventManager &evm, default_kafka_broker_list.push_back(""); // Command line and config file options. - opt::options_description cassandra_config("cassandra Configuration options"); + opt::options_description cassandra_config("Cassandra Configuration options"); cassandra_config.add_options() - ("CASSANDRA.cassandra_user",opt::value()->default_value(""), + ("CASSANDRA.cassandra_user", opt::value()->default_value(""), "Cassandra user name") - ("CASSANDRA.cassandra_password",opt::value()->default_value(""), - "Cassandra password"); + ("CASSANDRA.cassandra_password", opt::value()->default_value(""), + "Cassandra password") + ("CASSANDRA.compaction_strategy", + opt::value()->default_value( + GenDb::g_gendb_constants.LEVELED_COMPACTION_STRATEGY), + "Cassandra compaction strategy");; // Command line and config file options. opt::options_description config("Configuration options"); @@ -178,6 +183,19 @@ void Options::Initialize(EventManager &evm, ("DEFAULT.disable_flow_collection", opt::bool_switch(&disable_flow_collection_), "Disable flow message collection") + ("DATABASE.disable_all_writes", + opt::bool_switch(&disable_all_db_writes_), + "Disable all writes to the database") + ("DATABASE.disable_statistics_writes", + opt::bool_switch(&disable_db_stats_writes_), + "Disable statistics writes to the database") + ("DATABASE.disable_message_writes", + opt::bool_switch(&disable_db_messages_writes_), + "Disable message writes to the database") + ("DATABASE.enable_message_keyword_writes", + opt::bool_switch(&enable_db_messages_keyword_writes_)-> + default_value(false), + "Enable message keyword writes to the database") ("DISCOVERY.port", opt::value()->default_value( default_discovery_port), @@ -354,4 +372,20 @@ void Options::Process(int argc, char *argv[], GetOptValue(var_map, redis_password_, "REDIS.password"); GetOptValue(var_map, cassandra_user_, "CASSANDRA.cassandra_user"); GetOptValue(var_map, cassandra_password_, "CASSANDRA.cassandra_password"); + GetOptValue(var_map, cassandra_compaction_strategy_, + "CASSANDRA.compaction_strategy"); + if (!((cassandra_compaction_strategy_ == + GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) || + (cassandra_compaction_strategy_ == + GenDb::g_gendb_constants.LEVELED_COMPACTION_STRATEGY) || + (cassandra_compaction_strategy_ == + GenDb::g_gendb_constants.SIZE_TIERED_COMPACTION_STRATEGY))) { + cout << "Invalid CASSANDRA.compaction_strategy," << + " please select one of [" << + GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY << ", " << + GenDb::g_gendb_constants.LEVELED_COMPACTION_STRATEGY << ", " << + GenDb::g_gendb_constants.SIZE_TIERED_COMPACTION_STRATEGY << "]" << + endl; + exit(-1); + } } diff --git a/src/analytics/options.h b/src/analytics/options.h index a67836b8703..a566d09e92a 100644 --- a/src/analytics/options.h +++ b/src/analytics/options.h @@ -42,6 +42,9 @@ class Options { const std::string redis_password() const { return redis_password_; } const std::string cassandra_user() const { return cassandra_user_; } const std::string cassandra_password() const { return cassandra_password_; } + const std::string cassandra_compaction_strategy() const { + return cassandra_compaction_strategy_; + } const std::string hostname() const { return hostname_; } const std::string host_ip() const { return host_ip_; } const uint16_t http_server_port() const { return http_server_port_; } @@ -67,6 +70,12 @@ class Options { const bool test_mode() const { return test_mode_; } const uint32_t sandesh_send_rate_limit() const { return sandesh_ratelimit_; } const bool disable_flow_collection() const { return disable_flow_collection_; } + const bool disable_all_db_writes() const { return disable_all_db_writes_; } + const bool disable_db_statistics_writes() const { return disable_db_stats_writes_; } + const bool disable_db_messages_writes() const { return disable_db_messages_writes_; } + const bool enable_db_messages_keyword_writes() const { + return enable_db_messages_keyword_writes_; + } private: template @@ -106,6 +115,7 @@ class Options { std::string redis_password_; std::string cassandra_user_; std::string cassandra_password_; + std::string cassandra_compaction_strategy_; std::string hostname_; std::string host_ip_; uint16_t http_server_port_; @@ -135,6 +145,10 @@ class Options { uint16_t partitions_; uint32_t sandesh_ratelimit_; bool disable_flow_collection_; + bool disable_all_db_writes_; + bool disable_db_stats_writes_; + bool disable_db_messages_writes_; + bool enable_db_messages_keyword_writes_; boost::program_options::options_description config_file_options_; }; diff --git a/src/analytics/protobuf_collector.cc b/src/analytics/protobuf_collector.cc index b260b4e0187..b5840eb22c0 100644 --- a/src/analytics/protobuf_collector.cc +++ b/src/analytics/protobuf_collector.cc @@ -24,7 +24,8 @@ ProtobufCollector::ProtobufCollector(EventManager *evm, db_initializer_.reset(new DbHandlerInitializer(evm, kDbName, kDbTaskInstance, kDbTaskName, boost::bind(&ProtobufCollector::DbInitializeCb, this), cassandra_ips, cassandra_ports, ttl_map, cassandra_user, - cassandra_password, false, std::string(), false)); + cassandra_password, std::string(), false, std::string(), false, + false, false, false, false)); db_handler_ = db_initializer_->GetDbHandler(); } else { db_handler_ = global_dbhandler; diff --git a/src/analytics/test/db_handler_mock.h b/src/analytics/test/db_handler_mock.h index 0e6ed017c23..a656fff3a1a 100644 --- a/src/analytics/test/db_handler_mock.h +++ b/src/analytics/test/db_handler_mock.h @@ -14,8 +14,8 @@ class DbHandlerMock : public DbHandler { DbHandlerMock(EventManager *evm, const TtlMap& ttl_map) : DbHandler(evm, boost::bind(&DbHandlerMock::StartDbifReinit, this), std::vector(1, "127.0.0.1"), - std::vector(1, 9160), "localhost", ttl_map, "", "", false, - "", false) + std::vector(1, 9160), "localhost", ttl_map, "", "", "", false, + "", false, false, false, false, false) { } void StartDbifReinit() { diff --git a/src/analytics/test/options_test.cc b/src/analytics/test/options_test.cc index d638737312c..f3787e6c65f 100644 --- a/src/analytics/test/options_test.cc +++ b/src/analytics/test/options_test.cc @@ -88,6 +88,10 @@ TEST_F(OptionsTest, NoArguments) { EXPECT_EQ(options_.test_mode(), false); EXPECT_EQ(options_.sandesh_send_rate_limit(), 0); EXPECT_EQ(options_.disable_flow_collection(), false); + EXPECT_EQ(options_.disable_db_messages_writes(), false); + EXPECT_EQ(options_.enable_db_messages_keyword_writes(), false); + EXPECT_EQ(options_.disable_db_stats_writes(), false); + EXPECT_EQ(options_.disable_all_db_writes(), false); uint16_t protobuf_port(0); EXPECT_FALSE(options_.collector_protobuf_port(&protobuf_port)); } @@ -133,6 +137,10 @@ TEST_F(OptionsTest, DefaultConfFile) { EXPECT_EQ(options_.test_mode(), false); EXPECT_EQ(options_.sandesh_send_rate_limit(), 100); EXPECT_EQ(options_.disable_flow_collection(), false); + EXPECT_EQ(options_.disable_db_messages_writes(), false); + EXPECT_EQ(options_.enable_db_messages_keyword_writes(), false); + EXPECT_EQ(options_.disable_db_stats_writes(), false); + EXPECT_EQ(options_.disable_all_db_writes(), false); uint16_t protobuf_port(0); EXPECT_FALSE(options_.collector_protobuf_port(&protobuf_port)); } @@ -193,10 +201,14 @@ TEST_F(OptionsTest, OverrideBooleanFromCommandLine) { char argv_1[] = "--conf_file=controller/src/analytics/contrail-collector.conf"; char argv_2[] = "--DEFAULT.test_mode"; char argv_3[] = "--DEFAULT.disable_flow_collection"; + char argv_4[] = "--DATABASE.disable_all_db_writes"; + char argv_5[] = "--DATABASE.enable_db_messages_keyword_writes"; argv[0] = argv_0; argv[1] = argv_1; argv[2] = argv_2; argv[3] = argv_3; + argv[4] = argv_4; + argv[5] = argv_5; options_.Parse(evm_, argc, argv); vector passed_conf_files; @@ -230,6 +242,8 @@ TEST_F(OptionsTest, OverrideBooleanFromCommandLine) { EXPECT_EQ(options_.dup(), false); EXPECT_EQ(options_.test_mode(), true); // Overridden from command line. EXPECT_EQ(options_.disable_flow_collection(), true); // Overridden from command line. + EXPECT_EQ(options_.disable_all_db_writes(), true); // Overridden from command line. + EXPECT_EQ(options_.enable_db_messages_keyword_writes(), true); // Overriden from command line. uint16_t protobuf_port(0); EXPECT_FALSE(options_.collector_protobuf_port(&protobuf_port)); } diff --git a/src/analytics/viz_collector.cc b/src/analytics/viz_collector.cc index 94d1a460594..5494aeeaa0d 100644 --- a/src/analytics/viz_collector.cc +++ b/src/analytics/viz_collector.cc @@ -43,13 +43,19 @@ VizCollector::VizCollector(EventManager *evm, unsigned short listen_port, const std::string &kafka_prefix, const TtlMap& ttl_map, const std::string &cassandra_user, const std::string &cassandra_password, + const std::string &cassandra_compaction_strategy, bool use_cql, const std::string &zookeeper_server_list, - bool use_zookeeper) : + bool use_zookeeper, bool disable_all_db_writes, + bool disable_db_stats_writes, bool disable_db_messages_writes, + bool disable_db_messages_keyword_writes) : db_initializer_(new DbHandlerInitializer(evm, DbGlobalName(dup), -1, std::string("collector:DbIf"), boost::bind(&VizCollector::DbInitializeCb, this), cassandra_ips, cassandra_ports, ttl_map, cassandra_user, - cassandra_password, use_cql, zookeeper_server_list, use_zookeeper)), + cassandra_password, cassandra_compaction_strategy, use_cql, + zookeeper_server_list, use_zookeeper, + disable_all_db_writes, disable_db_stats_writes, + disable_db_messages_writes, disable_db_messages_keyword_writes)), osp_(new OpServerProxy(evm, this, redis_uve_ip, redis_uve_port, redis_password, brokers, partitions, kafka_prefix)), ruleeng_(new Ruleeng(db_initializer_->GetDbHandler(), osp_.get())), diff --git a/src/analytics/viz_collector.h b/src/analytics/viz_collector.h index de5bc6b7497..15e5bd6c1f4 100644 --- a/src/analytics/viz_collector.h +++ b/src/analytics/viz_collector.h @@ -40,9 +40,12 @@ class VizCollector { const std::string &kafka_prefix, const TtlMap &ttlmap, const std::string& cassandra_user, const std::string& cassandra_password, + const std::string &cassandra_compaction_strategy, bool use_cql, const std::string &zookeeper_server_list, - bool use_zookeeper); + bool use_zookeeper, bool disable_all_db_writes, + bool disable_db_stats_writes, bool disable_db_messages_writes, + bool disable_db_messages_keyword_writes); VizCollector(EventManager *evm, DbHandlerPtr db_handler, Ruleeng *ruleeng, Collector *collector, OpServerProxy *osp); diff --git a/src/base/test/queue_task_test.cc b/src/base/test/queue_task_test.cc index 4c270e9b213..667dc69b445 100644 --- a/src/base/test/queue_task_test.cc +++ b/src/base/test/queue_task_test.cc @@ -208,7 +208,7 @@ class QueueTaskTest : public ::testing::Test { tbb::atomic shutdown_test_exit_callback_sleep_; }; -TEST_F(QueueTaskTest, StartRunnerBasic) +TEST_F(QueueTaskTest, StartRunnerBasic) { TaskScheduler *scheduler = TaskScheduler::GetInstance(); // Always do start runner work_queue_.SetStartRunnerFunc( @@ -250,10 +250,10 @@ TEST_F(QueueTaskTest, StartRunnerBasic) work_queue_.Enqueue(enqueue_counter++); work_queue_.Enqueue(enqueue_counter++); task_util::WaitForIdle(1); - EXPECT_EQ(3, work_queue_.max_queue_len()); + EXPECT_EQ(4, work_queue_.max_queue_len()); // Add test case for bounded workq work_queue_.SetBounded(true); - work_queue_.size_(4); + work_queue_.SetSize(4); work_queue_.Enqueue(enqueue_counter++); work_queue_.Enqueue(enqueue_counter++); work_queue_.Enqueue(enqueue_counter++); diff --git a/src/database/cassandra/cql/cql_if.cc b/src/database/cassandra/cql/cql_if.cc index e4d4c67432d..14e9847d1d9 100644 --- a/src/database/cassandra/cql/cql_if.cc +++ b/src/database/cassandra/cql/cql_if.cc @@ -404,16 +404,17 @@ class CassStatementNameBinder : public boost::static_visitor<> { CassStatement *statement_; }; -static const std::string kQCompactionStrategy( +static const char * kQCompactionStrategy( "compaction = {'class': " - "'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}"); + "'org.apache.cassandra.db.compaction.%s'}"); static const std::string kQGCGraceSeconds("gc_grace_seconds = 0"); // // Cf2CassCreateTableIfNotExists // -std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf) { +std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, + const std::string &compaction_strategy) { std::ostringstream query; // Table name query << "CREATE TABLE IF NOT EXISTS " << cf.cfname_ << " "; @@ -430,12 +431,17 @@ std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf) { query << ", \"" << column.first << "\" " << DbDataType2CassType(column.second); } - query << ") WITH " << kQCompactionStrategy << " AND " << + char cbuf[512]; + int n(snprintf(cbuf, sizeof(cbuf), kQCompactionStrategy, + compaction_strategy.c_str())); + assert(!(n < 0 || n >= (int)sizeof(cbuf))); + query << ") WITH " << std::string(cbuf) << " AND " << kQGCGraceSeconds; return query.str(); } -std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf) { +std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, + const std::string &compaction_strategy) { std::ostringstream query; // Table name query << "CREATE TABLE IF NOT EXISTS " << cf.cfname_ << " ("; @@ -487,7 +493,11 @@ std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf) { } query << "column" << cnum; } - query << ")) WITH " << kQCompactionStrategy << " AND " << + char cbuf[512]; + int n(snprintf(cbuf, sizeof(cbuf), kQCompactionStrategy, + compaction_strategy.c_str())); + assert(!(n < 0 || n >= (int)sizeof(cbuf))); + query << ")) WITH " << std::string(cbuf) << " AND " << kQGCGraceSeconds; return query.str(); } @@ -1333,7 +1343,7 @@ class CqlIf::CqlIfImpl { } bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, - CassConsistency consistency) { + const std::string &compaction_strategy, CassConsistency consistency) { if (session_state_ != SessionState::CONNECTED) { return false; } @@ -1343,10 +1353,12 @@ class CqlIf::CqlIfImpl { std::string query; switch (cf.cftype_) { case GenDb::NewCf::COLUMN_FAMILY_SQL: - query = impl::StaticCf2CassCreateTableIfNotExists(cf); + query = impl::StaticCf2CassCreateTableIfNotExists(cf, + compaction_strategy); break; case GenDb::NewCf::COLUMN_FAMILY_NOSQL: - query = impl::DynamicCf2CassCreateTableIfNotExists(cf); + query = impl::DynamicCf2CassCreateTableIfNotExists(cf, + compaction_strategy); break; default: return false; @@ -1808,9 +1820,11 @@ bool CqlIf::Db_SetTablespace(const std::string &tablespace) { } // Column family -bool CqlIf::Db_AddColumnfamily(const GenDb::NewCf &cf) { +bool CqlIf::Db_AddColumnfamily(const GenDb::NewCf &cf, + const std::string &compaction_strategy) { bool success( - impl_->CreateTableIfNotExistsSync(cf, CASS_CONSISTENCY_QUORUM)); + impl_->CreateTableIfNotExistsSync(cf, compaction_strategy, + CASS_CONSISTENCY_QUORUM)); if (!success) { IncrementTableWriteFailStats(cf.cfname_); IncrementErrors(GenDb::IfErrors::ERR_WRITE_COLUMN_FAMILY); diff --git a/src/database/cassandra/cql/cql_if.h b/src/database/cassandra/cql/cql_if.h index bf075945b43..d317dec9b2e 100644 --- a/src/database/cassandra/cql/cql_if.h +++ b/src/database/cassandra/cql/cql_if.h @@ -39,7 +39,8 @@ class CqlIf : public GenDb::GenDbIf { virtual bool Db_AddSetTablespace(const std::string &tablespace, const std::string &replication_factor = "1"); // Column family - virtual bool Db_AddColumnfamily(const GenDb::NewCf &cf); + virtual bool Db_AddColumnfamily(const GenDb::NewCf &cf, + const std::string &compaction_strategy); virtual bool Db_UseColumnfamily(const GenDb::NewCf &cf); // Column virtual bool Db_AddColumn(std::auto_ptr cl); diff --git a/src/database/cassandra/cql/cql_if_impl.h b/src/database/cassandra/cql/cql_if_impl.h index 0a630e6f429..dca3979e61f 100644 --- a/src/database/cassandra/cql/cql_if_impl.h +++ b/src/database/cassandra/cql/cql_if_impl.h @@ -13,8 +13,10 @@ namespace cass { namespace cql { namespace impl { -std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf); -std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf); +std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, + const std::string &compaction_strategy); +std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, + const std::string &compaction_strategy); std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns); std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns); std::string StaticCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf); diff --git a/src/database/cassandra/cql/test/cql_if_test.cc b/src/database/cassandra/cql/test/cql_if_test.cc index 4456b6ef942..53ede427d7f 100644 --- a/src/database/cassandra/cql/test/cql_if_test.cc +++ b/src/database/cassandra/cql/test/cql_if_test.cc @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -68,7 +69,8 @@ TEST_F(CqlIfTest, StaticCfCreateTable) { ("columnJ", GenDb::DbDataType::InetType) ("columnK", GenDb::DbDataType::IntegerType)); std::string actual_qstring( - cass::cql::impl::StaticCf2CassCreateTableIfNotExists(static_cf)); + cass::cql::impl::StaticCf2CassCreateTableIfNotExists(static_cf, + GenDb::g_gendb_constants.SIZE_TIERED_COMPACTION_STRATEGY)); std::string expected_qstring( "CREATE TABLE IF NOT EXISTS StaticCf (" "key uuid PRIMARY KEY, " @@ -84,7 +86,7 @@ TEST_F(CqlIfTest, StaticCfCreateTable) { "\"columnJ\" inet, " "\"columnK\" varint) " "WITH compaction = {'class': " - "'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'} " + "'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'} " "AND gc_grace_seconds = 0"); EXPECT_EQ(expected_qstring, actual_qstring); } @@ -100,7 +102,8 @@ TEST_F(CqlIfTest, DynamicCfCreateTable) { boost::assign::list_of // column value validation class (GenDb::DbDataType::LexicalUUIDType)); std::string actual_qstring( - cass::cql::impl::DynamicCf2CassCreateTableIfNotExists(dynamic_cf)); + cass::cql::impl::DynamicCf2CassCreateTableIfNotExists(dynamic_cf, + GenDb::g_gendb_constants.LEVELED_COMPACTION_STRATEGY)); std::string expected_qstring( "CREATE TABLE IF NOT EXISTS DynamicCf (" "key int, " @@ -131,7 +134,8 @@ TEST_F(CqlIfTest, DynamicCfCreateTable) { boost::assign::list_of // column value validation class (GenDb::DbDataType::UTF8Type)); std::string actual_qstring1( - cass::cql::impl::DynamicCf2CassCreateTableIfNotExists(dynamic_cf1)); + cass::cql::impl::DynamicCf2CassCreateTableIfNotExists(dynamic_cf1, + GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY)); std::string expected_qstring1( "CREATE TABLE IF NOT EXISTS DynamicCf1 (" "key ascii, " @@ -162,7 +166,7 @@ TEST_F(CqlIfTest, DynamicCfCreateTable) { "column1, column2, column3, column4, column5, column6, column7, " "column8, column9, column10, column11)) " "WITH compaction = {'class': " - "'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'} " + "'org.apache.cassandra.db.compaction.DateTieredCompactionStrategy'} " "AND gc_grace_seconds = 0"); EXPECT_EQ(expected_qstring1, actual_qstring1); } diff --git a/src/database/gendb.sandesh b/src/database/gendb.sandesh index dfd755a5ce0..e22ecac0656 100644 --- a/src/database/gendb.sandesh +++ b/src/database/gendb.sandesh @@ -22,6 +22,10 @@ enum DbDataType { BlobType = 12, // blob / bytes } +const string DATE_TIERED_COMPACTION_STRATEGY = "DateTieredCompactionStrategy"; +const string LEVELED_COMPACTION_STRATEGY = "LeveledCompactionStrategy"; +const string SIZE_TIERED_COMPACTION_STRATEGY = "SizeTieredCompactionStrategy"; + struct DbTableInfo { 1: string table_name 2: u64 reads diff --git a/src/database/gendb_if.h b/src/database/gendb_if.h index 8065e524d2a..57caf6087d8 100644 --- a/src/database/gendb_if.h +++ b/src/database/gendb_if.h @@ -170,7 +170,8 @@ class GenDbIf { virtual bool Db_AddSetTablespace(const std::string& tablespace, const std::string& replication_factor="1") = 0; // Column family - virtual bool Db_AddColumnfamily(const NewCf& cf) = 0; + virtual bool Db_AddColumnfamily(const NewCf& cf, + const std::string &compaction_strategy) = 0; virtual bool Db_UseColumnfamily(const NewCf& cf) = 0; // Column virtual bool Db_AddColumn(std::auto_ptr cl) = 0; diff --git a/src/opserver/test/utils/analytics_fixture.py b/src/opserver/test/utils/analytics_fixture.py index a15db04cd7f..12e0b805cee 100644 --- a/src/opserver/test/utils/analytics_fixture.py +++ b/src/opserver/test/utils/analytics_fixture.py @@ -145,7 +145,8 @@ def start(self): '--DEFAULT.ipfix_port', str(self.ipfix_port), '--DEFAULT.sflow_port', str(self.sflow_port), '--DEFAULT.log_level', 'SYS_DEBUG', - '--DEFAULT.log_file', self._log_file] + '--DEFAULT.log_file', self._log_file, + '--DATABASE.enable_message_keyword_writes'] if self.redis_password: args.append('--REDIS.password') args.append(self.redis_password)