diff --git a/src/analytics/collector.cc b/src/analytics/collector.cc index 0124296f80e..af8d5eea88a 100644 --- a/src/analytics/collector.cc +++ b/src/analytics/collector.cc @@ -593,10 +593,8 @@ void Collector::GetSmQueueWaterMarkInfo( GetQueueWaterMarkInfo(QueueType::Sm, wm_info); } - -static void SendQueueParamsError(std::string estr, const std::string &context) { - // SandeshGenerator is required, send error - QueueParamsError *eresp(new QueueParamsError); +static void SendCollectorError(std::string estr, const std::string &context) { + CollectorError *eresp(new CollectorError); eresp->set_context(context); eresp->set_error(estr); eresp->Response(); @@ -607,7 +605,7 @@ static Collector* ExtractCollectorFromRequest(SandeshContext *vscontext, VizSandeshContext *vsc = dynamic_cast(vscontext); if (!vsc) { - SendQueueParamsError("Sandesh client context NOT PRESENT", + SendCollectorError("Sandesh client context NOT PRESENT", context); return NULL; } @@ -636,7 +634,7 @@ static void SendQueueParamsResponse(Collector::QueueType::type type, void DbQueueParamsSet::HandleRequest() const { if (!(__isset.high && __isset.drop_level && __isset.queue_count)) { - SendQueueParamsError("Please specify all parameters", context()); + SendCollectorError("Please specify all parameters", context()); return; } Collector *collector = ExtractCollectorFromRequest(client_context(), @@ -652,7 +650,7 @@ void DbQueueParamsSet::HandleRequest() const { void SmQueueParamsSet::HandleRequest() const { if (!(__isset.high && __isset.drop_level && __isset.queue_count)) { - SendQueueParamsError("Please specify all parameters", context()); + SendCollectorError("Please specify all parameters", context()); return; } Collector *collector = ExtractCollectorFromRequest(client_context(), @@ -738,3 +736,49 @@ void FlowCollectionStatusRequest::HandleRequest() const { // Send response SendFlowCollectionStatusResponse(context()); } + +static DbHandlerPtr ExtractDbHandlerFromRequest(SandeshContext *vscontext, + const std::string &context) { + VizSandeshContext *vsc = + dynamic_cast(vscontext); + if (!vsc) { + SendCollectorError("Sandesh client context NOT PRESENT", + context); + return DbHandlerPtr(); + } + return vsc->Analytics()->GetDbHandler(); +} + +static void SendDatabaseWritesStatusResponse(SandeshContext *vscontext, std::string context) { + DbHandlerPtr dbh(ExtractDbHandlerFromRequest(vscontext, context)); + DatabaseWritesStatusResponse *dwsr(new DatabaseWritesStatusResponse); + dwsr->set_disable_all(dbh->IsAllWritesDisabled()); + dwsr->set_disable_statistics(dbh->IsStatisticsWritesDisabled()); + dwsr->set_disable_messages(dbh->IsMessagesWritesDisabled()); + dwsr->set_disable_flows(Sandesh::IsFlowCollectionDisabled()); + dwsr->set_context(context); + dwsr->Response(); +} + +void DisableDatabaseWritesRequest::HandleRequest() const { + DbHandlerPtr dbh(ExtractDbHandlerFromRequest(client_context(), context())); + if (__isset.disable_all) { + dbh->DisableAllWrites(get_disable_all()); + } + if (__isset.disable_statistics) { + dbh->DisableStatisticsWrites(get_disable_statistics()); + } + if (__isset.disable_messages) { + dbh->DisableMessagesWrites(get_disable_messages()); + } + if (__isset.disable_flows) { + Sandesh::DisableFlowCollection(get_disable_flows()); + } + // Send response + SendDatabaseWritesStatusResponse(client_context(), context()); +} + +void DatabaseWritesStatusRequest::HandleRequest() const { + // Send response + SendDatabaseWritesStatusResponse(client_context(), context()); +} diff --git a/src/analytics/collector_uve.sandesh b/src/analytics/collector_uve.sandesh index d97b96e91b0..29f00072952 100644 --- a/src/analytics/collector_uve.sandesh +++ b/src/analytics/collector_uve.sandesh @@ -288,10 +288,6 @@ struct QueueParams { 3: string drop_level; } -response sandesh QueueParamsError { - 1: string error; -} - /** * @description: sandesh response to return State Machine Queue params */ @@ -330,3 +326,24 @@ request sandesh FlowCollectionStatusRequest { response sandesh FlowCollectionStatusResponse { 1: bool disable } + +response sandesh CollectorError { + 1: string error +} + +request sandesh DisableDatabaseWritesRequest { + 1: optional bool disable_all + 2: optional bool disable_statistics + 3: optional bool disable_messages + 4: optional bool disable_flows +} + +request sandesh DatabaseWritesStatusRequest { +} + +response sandesh DatabaseWritesStatusResponse { + 1: bool disable_all + 2: bool disable_statistics + 3: bool disable_messages + 4: bool disable_flows +} diff --git a/src/analytics/db_handler.cc b/src/analytics/db_handler.cc index 9d2d134ef11..0060682db38 100644 --- a/src/analytics/db_handler.cc +++ b/src/analytics/db_handler.cc @@ -29,10 +29,12 @@ #include "viz_constants.h" #include "vizd_table_desc.h" +#include "viz_collector.h" #include "collector.h" #include "db_handler.h" #include "parser_util.h" #include "db_handler_impl.h" +#include "viz_sandesh.h" #define DB_LOG(_Level, _Msg) \ do { \ @@ -69,18 +71,23 @@ DbHandler::DbHandler(EventManager *evm, std::string name, const TtlMap& ttl_map, const std::string& cassandra_user, const std::string& cassandra_password, - bool use_cql, - const std::string &zookeeper_server_list, - bool use_zookeeper) : + const std::string &cassandra_compaction_strategy, + bool use_cql, const std::string &zookeeper_server_list, + bool use_zookeeper, bool disable_all_writes, + bool disable_statistics_writes, bool disable_messages_writes) : name_(name), drop_level_(SandeshLevel::INVALID), ttl_map_(ttl_map), use_cql_(use_cql), tablespace_(), + compaction_strategy_(cassandra_compaction_strategy), gen_partition_no_((uint8_t)g_viz_constants.PARTITION_MIN, (uint8_t)g_viz_constants.PARTITION_MAX), zookeeper_server_list_(zookeeper_server_list), - use_zookeeper_(use_zookeeper) { + use_zookeeper_(use_zookeeper), + disable_all_writes_(disable_all_writes), + disable_statistics_writes_(disable_statistics_writes), + disable_messages_writes_(disable_messages_writes) { #ifdef USE_CASSANDRA_CQL if (use_cql) { dbif_.reset(new cass::cql::CqlIf(evm, cassandra_ips, @@ -104,7 +111,10 @@ DbHandler::DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map) : dbif_(dbif), ttl_map_(ttl_map), gen_partition_no_((uint8_t)g_viz_constants.PARTITION_MIN, - (uint8_t)g_viz_constants.PARTITION_MAX) { + (uint8_t)g_viz_constants.PARTITION_MAX), + disable_all_writes_(false), + disable_statistics_writes_(false), + disable_messages_writes_(false) { } DbHandler::~DbHandler() { @@ -166,7 +176,7 @@ void DbHandler::SetDropLevel(size_t queue_count, SandeshLevel::type level, bool DbHandler::CreateTables() { for (std::vector::const_iterator it = vizd_tables.begin(); it != vizd_tables.end(); it++) { - if (!dbif_->Db_AddColumnfamily(*it)) { + if (!dbif_->Db_AddColumnfamily(*it, compaction_strategy_)) { DB_LOG(ERROR, it->cfname_ << " FAILED"); return false; } @@ -174,7 +184,7 @@ bool DbHandler::CreateTables() { for (std::vector::const_iterator it = vizd_flow_tables.begin(); it != vizd_flow_tables.end(); it++) { - if (!dbif_->Db_AddColumnfamily(*it)) { + if (!dbif_->Db_AddColumnfamily(*it, compaction_strategy_)) { DB_LOG(ERROR, it->cfname_ << " FAILED"); return false; } @@ -182,7 +192,7 @@ bool DbHandler::CreateTables() { for (std::vector::const_iterator it = vizd_stat_tables.begin(); it != vizd_stat_tables.end(); it++) { - if (!dbif_->Db_AddColumnfamily(*it)) { + if (!dbif_->Db_AddColumnfamily(*it, compaction_strategy_)) { DB_LOG(ERROR, it->cfname_ << " FAILED"); return false; } @@ -395,6 +405,30 @@ bool DbHandler::UseCql() const { return use_cql_; } +bool DbHandler::IsAllWritesDisabled() const { + return disable_all_writes_; +} + +bool DbHandler::IsStatisticsWritesDisabled() const { + return disable_statistics_writes_; +} + +bool DbHandler::IsMessagesWritesDisabled() const { + return disable_messages_writes_; +} + +void DbHandler::DisableAllWrites(bool disable) { + disable_all_writes_ = disable; +} + +void DbHandler::DisableStatisticsWrites(bool disable) { + disable_statistics_writes_ = disable; +} + +void DbHandler::DisableMessagesWrites(bool disable) { + disable_messages_writes_ = disable; +} + void DbHandler::SetDbQueueWaterMarkInfo(Sandesh::QueueWaterMarkInfo &wm, boost::function defer_undefer_cb) { dbif_->Db_SetQueueWaterMark(boost::get<2>(wm), @@ -462,8 +496,17 @@ bool DbHandler::GetCqlStats(cass::cql::DbStats *stats) const { return true; } +bool DbHandler::InsertIntoDb(std::auto_ptr col_list, + GenDb::GenDbIf::DbAddColumnCb db_cb) { + if (IsAllWritesDisabled()) { + return true; + } + return dbif_->Db_AddColumn(col_list, db_cb); +} + bool DbHandler::AllowMessageTableInsert(const SandeshHeader &header) { - return header.get_Type() != SandeshType::FLOW; + return !IsMessagesWritesDisabled() && !IsAllWritesDisabled() && + (header.get_Type() != SandeshType::FLOW); } bool DbHandler::MessageIndexTableInsert(const std::string& cfname, @@ -558,7 +601,7 @@ bool DbHandler::MessageIndexTableInsert(const std::string& cfname, GenDb::NewCol *col(new GenDb::NewCol(col_name, col_value, ttl)); columns.push_back(col); #endif - if (!dbif_->Db_AddColumn(col_list, db_cb)) { + if (!InsertIntoDb(col_list, db_cb)) { DB_LOG(ERROR, "Addition of message: " << message_type << ", message UUID: " << unm << " to table: " << cfname << " FAILED"); @@ -644,7 +687,7 @@ void DbHandler::MessageTableOnlyInsert(const VizMsg *vmsgp, columns.push_back(new GenDb::NewCol(g_viz_constants.DATA, vmsgp->msg->ExtractMessage(), ttl)); - if (!dbif_->Db_AddColumn(col_list, db_cb)) { + if (!InsertIntoDb(col_list, db_cb)) { DB_LOG(ERROR, "Addition of message: " << message_type << ", message UUID: " << vmsgp->unm << " COLUMN FAILED"); return; @@ -819,6 +862,9 @@ void DbHandler::GetRuleMap(RuleMap& rulemap) { void DbHandler::ObjectTableInsert(const std::string &table, const std::string &objectkey_str, uint64_t ×tamp, const boost::uuids::uuid& unm, const VizMsg *vmsgp, GenDb::GenDbIf::DbAddColumnCb db_cb) { + if (IsMessagesWritesDisabled() || IsAllWritesDisabled()) { + return; + } uint32_t T2(timestamp >> g_viz_constants.RowTimeInBits); uint32_t T1(timestamp & g_viz_constants.RowTimeInMask); const std::string &message_type(vmsgp->msg->GetMessageType()); @@ -849,7 +895,7 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &o GenDb::NewColVec& columns = col_list->columns_; columns.reserve(1); columns.push_back(col); - if (!dbif_->Db_AddColumn(col_list, db_cb)) { + if (!InsertIntoDb(col_list, db_cb)) { DB_LOG(ERROR, "Addition of " << objectkey_str << ", message UUID " << unm << " into table " << table << " FAILED"); @@ -870,7 +916,7 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &o GenDb::NewColVec& columns = col_list->columns_; columns.reserve(1); columns.push_back(col); - if (!dbif_->Db_AddColumn(col_list, db_cb)) { + if (!InsertIntoDb(col_list, db_cb)) { DB_LOG(ERROR, "Addition of " << objectkey_str << ", message UUID " << unm << " " << table << " into table " << g_viz_constants.OBJECT_VALUE_TABLE << " FAILED"); @@ -1010,7 +1056,7 @@ bool DbHandler::StatTableWrite(uint32_t t2, GenDb::NewCol *col(new GenDb::NewCol(col_name, col_value, ttl)); columns.push_back(col); - if (!dbif_->Db_AddColumn(col_list, db_cb)) { + if (!InsertIntoDb(col_list, db_cb)) { DB_LOG(ERROR, "Addition of " << statName << ", " << statAttr << " tag " << ptag.first << ":" << stag.first << " into table " << @@ -1032,6 +1078,9 @@ DbHandler::StatTableInsert(uint64_t ts, const TagMap & attribs_tag, const AttribMap & attribs, GenDb::GenDbIf::DbAddColumnCb db_cb) { + if (IsAllWritesDisabled() || IsStatisticsWritesDisabled()) { + return; + } int ttl = GetTtl(TtlType::STATSDATA_TTL); StatTableInsertTtl(ts, statName, statAttr, attribs_tag, attribs, ttl, db_cb); @@ -1232,14 +1281,14 @@ static void PopulateFlowRecordTableRowKey( } static bool PopulateFlowRecordTable(FlowValueArray &fvalues, - GenDb::GenDbIf *dbif, const TtlMap& ttl_map, - FlowFieldValuesCb fncb, GenDb::GenDbIf::DbAddColumnCb db_cb) { + DbInsertCb db_insert_cb, const TtlMap& ttl_map, + FlowFieldValuesCb fncb) { std::auto_ptr colList(new GenDb::ColList); colList->cfname_ = g_viz_constants.FLOW_TABLE; PopulateFlowRecordTableRowKey(fvalues, colList->rowkey_); PopulateFlowRecordTableColumns(FlowRecordTableColumns, fvalues, colList->columns_, ttl_map, fncb); - return dbif->Db_AddColumn(colList, db_cb); + return db_insert_cb(colList); } static const std::vector FlowIndexTableColumnValues = @@ -1436,8 +1485,8 @@ static void PopulateFlowIndexTableColumns(FlowIndexTableType ftype, static bool PopulateFlowIndexTables(const FlowValueArray &fvalues, const uint32_t &T2, const uint32_t &T1, uint8_t partition_no, - GenDb::GenDbIf *dbif, const TtlMap& ttl_map, - FlowFieldValuesCb fncb, GenDb::GenDbIf::DbAddColumnCb db_cb) { + DbInsertCb db_insert_cb, const TtlMap& ttl_map, + FlowFieldValuesCb fncb) { // Populate row key and column values (same for all flow index // tables) GenDb::DbDataValueVec rkey; @@ -1455,7 +1504,7 @@ static bool PopulateFlowIndexTables(const FlowValueArray &fvalues, colList->rowkey_ = rkey; PopulateFlowIndexTableColumns(fitt, fvalues, T1, &colList->columns_, cvalues, ttl_map); - if (!dbif->Db_AddColumn(colList, db_cb)) { + if (!db_insert_cb(colList)) { LOG(ERROR, "Populating " << FlowIndexTable2String(fitt) << " FAILED"); } @@ -1616,8 +1665,10 @@ bool DbHandler::FlowSampleAdd(const pugi::xml_node& flow_sample, FlowFieldValuesCb fncb = boost::bind(&DbHandler::FieldNamesTableInsert, this, timestamp, g_viz_constants.FLOW_TABLE, _1, _2, _3, db_cb); - if (!PopulateFlowRecordTable(flow_entry_values, dbif_.get(), ttl_map_, - fncb, db_cb)) { + DbInsertCb db_insert_cb = + boost::bind(&DbHandler::InsertIntoDb, this, _1, db_cb); + if (!PopulateFlowRecordTable(flow_entry_values, db_insert_cb, ttl_map_, + fncb)) { DB_LOG(ERROR, "Populating FlowRecordTable FAILED"); } GenDb::DbDataValue &diff_bytes( @@ -1633,7 +1684,7 @@ bool DbHandler::FlowSampleAdd(const pugi::xml_node& flow_sample, if (diff_bytes.which() != GenDb::DB_VALUE_BLANK && diff_packets.which() != GenDb::DB_VALUE_BLANK) { if (!PopulateFlowIndexTables(flow_entry_values, T2, T1, partition_no, - dbif_.get(), ttl_map_, fncb2, db_cb)) { + db_insert_cb, ttl_map_, fncb2)) { DB_LOG(ERROR, "Populating FlowIndexTables FAILED"); } } @@ -1718,16 +1769,20 @@ DbHandlerInitializer::DbHandlerInitializer(EventManager *evm, DbHandlerInitializer::InitializeDoneCb callback, const std::vector &cassandra_ips, const std::vector &cassandra_ports, const TtlMap& ttl_map, - const std::string& cassandra_user, const std::string& cassandra_password, + const std::string &cassandra_user, const std::string &cassandra_password, + const std::string &cassandra_compaction_strategy, bool use_cql, const std::string &zookeeper_server_list, - bool use_zookeeper) : + bool use_zookeeper, bool disable_all_db_writes, + bool disable_db_stats_writes, bool disable_db_messages_writes) : db_name_(db_name), db_task_instance_(db_task_instance), db_handler_(new DbHandler(evm, boost::bind(&DbHandlerInitializer::ScheduleInit, this), cassandra_ips, cassandra_ports, db_name, ttl_map, - cassandra_user, cassandra_password, use_cql, - zookeeper_server_list, use_zookeeper)), + cassandra_user, cassandra_password, cassandra_compaction_strategy, + use_cql, zookeeper_server_list, use_zookeeper, + disable_all_db_writes, disable_db_stats_writes, + disable_db_messages_writes)), callback_(callback), db_init_timer_(TimerManager::CreateTimer(*evm->io_service(), db_name + " Db Init Timer", diff --git a/src/analytics/db_handler.h b/src/analytics/db_handler.h index 9d172f36254..b6ebf542955 100644 --- a/src/analytics/db_handler.h +++ b/src/analytics/db_handler.h @@ -88,8 +88,10 @@ class DbHandler { std::string name, const TtlMap& ttl_map, const std::string& cassandra_user, const std::string& cassandra_password, + const std::string& cassandra_compaction_strategy, bool use_cql, const std::string &zookeeper_server_list, - bool use_zookeeper); + bool use_zookeeper, bool disable_all_writes, bool disable_stats_writes, + bool disable_messages_writes); DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map); virtual ~DbHandler(); @@ -135,6 +137,12 @@ class DbHandler { std::vector GetEndpoints() const; std::string GetName() const; bool UseCql() const; + bool IsAllWritesDisabled() const; + bool IsStatisticsWritesDisabled() const; + bool IsMessagesWritesDisabled() const; + void DisableAllWrites(bool disable); + void DisableStatisticsWrites(bool disable); + void DisableMessagesWrites(bool disable); private: void StatTableInsertTtl(uint64_t ts, @@ -174,9 +182,10 @@ class DbHandler { uint64_t GetTtl(TtlType::type type) { return GetTtlFromMap(ttl_map_, type); } + bool InsertIntoDb(std::auto_ptr col_list, + GenDb::GenDbIf::DbAddColumnCb db_cb); boost::scoped_ptr dbif_; - // Random generator for UUIDs ThreadSafeUuidGenerator umn_gen_; std::string name_; @@ -194,9 +203,13 @@ class DbHandler { static tbb::mutex fmutex_; bool use_cql_; std::string tablespace_; + std::string compaction_strategy_; UniformInt8RandomGenerator gen_partition_no_; std::string zookeeper_server_list_; bool use_zookeeper_; + bool disable_all_writes_; + bool disable_statistics_writes_; + bool disable_messages_writes_; bool CanRecordDataForT2(uint32_t, std::string); friend class DbHandlerTest; DISALLOW_COPY_AND_ASSIGN(DbHandler); @@ -234,11 +247,13 @@ class DbHandlerInitializer { const std::vector &cassandra_ips, const std::vector &cassandra_ports, const TtlMap& ttl_map, - const std::string& cassandra_user, - const std::string& cassandra_password, + const std::string &cassandra_user, + const std::string &cassandra_password, + const std::string &cassandra_compaction_strategy, bool use_cql, const std::string &zookeeper_server_list, - bool use_zookeeper); + bool use_zookeeper, bool disable_all_db_writes, + bool disable_db_stats_writes, bool disable_db_messages_writes); DbHandlerInitializer(EventManager *evm, const std::string &db_name, int db_task_instance, const std::string &timer_task_name, InitializeDoneCb callback, diff --git a/src/analytics/db_handler_impl.h b/src/analytics/db_handler_impl.h index 257c1605447..e2725eca7c3 100644 --- a/src/analytics/db_handler_impl.h +++ b/src/analytics/db_handler_impl.h @@ -18,6 +18,7 @@ typedef boost::array FlowFieldValuesCb; +typedef boost::function)> DbInsertCb; void PopulateFlowIndexTableColumnValues( const std::vector &frvt, diff --git a/src/analytics/generator.cc b/src/analytics/generator.cc index e135c86af14..98817fc7cf9 100644 --- a/src/analytics/generator.cc +++ b/src/analytics/generator.cc @@ -111,7 +111,8 @@ SandeshGenerator::SandeshGenerator(Collector * const collector, VizSession *sess source + ":" + node_type + ":" + module + ":" + instance_id, collector->analytics_ttl_map(), collector->cassandra_user(), collector->cassandra_password(), - false, std::string(), false)); + std::string(), false, std::string(), + false, false, false, false)); } else { //Use collector db_handler db_handler_ = global_db_handler; diff --git a/src/analytics/main.cc b/src/analytics/main.cc index 4eed245496e..b2fc991229d 100644 --- a/src/analytics/main.cc +++ b/src/analytics/main.cc @@ -370,9 +370,12 @@ int main(int argc, char *argv[]) options.kafka_prefix(), ttl_map, options.cassandra_user(), options.cassandra_password(), + options.cassandra_compaction_strategy(), use_cql, zookeeper_server_list, - use_zookeeper); + use_zookeeper, options.disable_all_db_writes(), + options.disable_db_statistics_writes(), + options.disable_db_messages_writes()); #if 0 // initialize python/c++ API diff --git a/src/analytics/options.cc b/src/analytics/options.cc index 615dd7fa993..36bb7dc611e 100644 --- a/src/analytics/options.cc +++ b/src/analytics/options.cc @@ -14,6 +14,7 @@ #include "base/util.h" #include "net/address_util.h" #include "viz_constants.h" +#include using namespace std; using namespace boost::asio::ip; @@ -82,12 +83,16 @@ void Options::Initialize(EventManager &evm, default_kafka_broker_list.push_back(""); // Command line and config file options. - opt::options_description cassandra_config("cassandra Configuration options"); + opt::options_description cassandra_config("Cassandra Configuration options"); cassandra_config.add_options() - ("CASSANDRA.cassandra_user",opt::value()->default_value(""), + ("CASSANDRA.cassandra_user", opt::value()->default_value(""), "Cassandra user name") - ("CASSANDRA.cassandra_password",opt::value()->default_value(""), - "Cassandra password"); + ("CASSANDRA.cassandra_password", opt::value()->default_value(""), + "Cassandra password") + ("CASSANDRA.compaction_strategy", + opt::value()->default_value( + GenDb::g_gendb_constants.LEVELED_COMPACTION_STRATEGY), + "Cassandra compaction strategy");; // Command line and config file options. opt::options_description config("Configuration options"); @@ -178,6 +183,15 @@ void Options::Initialize(EventManager &evm, ("DEFAULT.disable_flow_collection", opt::bool_switch(&disable_flow_collection_), "Disable flow message collection") + ("DATABASE.disable_all_writes", + opt::bool_switch(&disable_all_db_writes_), + "Disable all writes to the database") + ("DATABASE.disable_statistics_writes", + opt::bool_switch(&disable_db_stats_writes_), + "Disable statistics writes to the database") + ("DATABASE.disable_message_writes", + opt::bool_switch(&disable_db_messages_writes_), + "Disable message writes to the database") ("DISCOVERY.port", opt::value()->default_value( default_discovery_port), @@ -354,4 +368,20 @@ void Options::Process(int argc, char *argv[], GetOptValue(var_map, redis_password_, "REDIS.password"); GetOptValue(var_map, cassandra_user_, "CASSANDRA.cassandra_user"); GetOptValue(var_map, cassandra_password_, "CASSANDRA.cassandra_password"); + GetOptValue(var_map, cassandra_compaction_strategy_, + "CASSANDRA.compaction_strategy"); + if (!((cassandra_compaction_strategy_ == + GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) || + (cassandra_compaction_strategy_ == + GenDb::g_gendb_constants.LEVELED_COMPACTION_STRATEGY) || + (cassandra_compaction_strategy_ == + GenDb::g_gendb_constants.SIZE_TIERED_COMPACTION_STRATEGY))) { + cout << "Invalid CASSANDRA.compaction_strategy," << + " please select one of [" << + GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY << ", " << + GenDb::g_gendb_constants.LEVELED_COMPACTION_STRATEGY << ", " << + GenDb::g_gendb_constants.SIZE_TIERED_COMPACTION_STRATEGY << "]" << + endl; + exit(-1); + } } diff --git a/src/analytics/options.h b/src/analytics/options.h index a67836b8703..00ae2052c13 100644 --- a/src/analytics/options.h +++ b/src/analytics/options.h @@ -42,6 +42,9 @@ class Options { const std::string redis_password() const { return redis_password_; } const std::string cassandra_user() const { return cassandra_user_; } const std::string cassandra_password() const { return cassandra_password_; } + const std::string cassandra_compaction_strategy() const { + return cassandra_compaction_strategy_; + } const std::string hostname() const { return hostname_; } const std::string host_ip() const { return host_ip_; } const uint16_t http_server_port() const { return http_server_port_; } @@ -67,6 +70,9 @@ class Options { const bool test_mode() const { return test_mode_; } const uint32_t sandesh_send_rate_limit() const { return sandesh_ratelimit_; } const bool disable_flow_collection() const { return disable_flow_collection_; } + const bool disable_all_db_writes() const { return disable_all_db_writes_; } + const bool disable_db_statistics_writes() const { return disable_db_stats_writes_; } + const bool disable_db_messages_writes() const { return disable_db_messages_writes_; } private: template @@ -106,6 +112,7 @@ class Options { std::string redis_password_; std::string cassandra_user_; std::string cassandra_password_; + std::string cassandra_compaction_strategy_; std::string hostname_; std::string host_ip_; uint16_t http_server_port_; @@ -135,6 +142,9 @@ class Options { uint16_t partitions_; uint32_t sandesh_ratelimit_; bool disable_flow_collection_; + bool disable_all_db_writes_; + bool disable_db_stats_writes_; + bool disable_db_messages_writes_; boost::program_options::options_description config_file_options_; }; diff --git a/src/analytics/protobuf_collector.cc b/src/analytics/protobuf_collector.cc index b260b4e0187..739465294c8 100644 --- a/src/analytics/protobuf_collector.cc +++ b/src/analytics/protobuf_collector.cc @@ -24,7 +24,8 @@ ProtobufCollector::ProtobufCollector(EventManager *evm, db_initializer_.reset(new DbHandlerInitializer(evm, kDbName, kDbTaskInstance, kDbTaskName, boost::bind(&ProtobufCollector::DbInitializeCb, this), cassandra_ips, cassandra_ports, ttl_map, cassandra_user, - cassandra_password, false, std::string(), false)); + cassandra_password, std::string(), false, std::string(), false, + false, false, false)); db_handler_ = db_initializer_->GetDbHandler(); } else { db_handler_ = global_dbhandler; diff --git a/src/analytics/test/db_handler_mock.h b/src/analytics/test/db_handler_mock.h index 0e6ed017c23..fd6fee0cbf1 100644 --- a/src/analytics/test/db_handler_mock.h +++ b/src/analytics/test/db_handler_mock.h @@ -14,8 +14,8 @@ class DbHandlerMock : public DbHandler { DbHandlerMock(EventManager *evm, const TtlMap& ttl_map) : DbHandler(evm, boost::bind(&DbHandlerMock::StartDbifReinit, this), std::vector(1, "127.0.0.1"), - std::vector(1, 9160), "localhost", ttl_map, "", "", false, - "", false) + std::vector(1, 9160), "localhost", ttl_map, "", "", "", false, + "", false, false, false, false) { } void StartDbifReinit() { diff --git a/src/analytics/viz_collector.cc b/src/analytics/viz_collector.cc index 94d1a460594..33dd50e9ebd 100644 --- a/src/analytics/viz_collector.cc +++ b/src/analytics/viz_collector.cc @@ -43,13 +43,18 @@ VizCollector::VizCollector(EventManager *evm, unsigned short listen_port, const std::string &kafka_prefix, const TtlMap& ttl_map, const std::string &cassandra_user, const std::string &cassandra_password, + const std::string &cassandra_compaction_strategy, bool use_cql, const std::string &zookeeper_server_list, - bool use_zookeeper) : + bool use_zookeeper, bool disable_all_db_writes, + bool disable_db_stats_writes, bool disable_db_messages_writes) : db_initializer_(new DbHandlerInitializer(evm, DbGlobalName(dup), -1, std::string("collector:DbIf"), boost::bind(&VizCollector::DbInitializeCb, this), cassandra_ips, cassandra_ports, ttl_map, cassandra_user, - cassandra_password, use_cql, zookeeper_server_list, use_zookeeper)), + cassandra_password, cassandra_compaction_strategy, use_cql, + zookeeper_server_list, use_zookeeper, + disable_all_db_writes, disable_db_stats_writes, + disable_db_messages_writes)), osp_(new OpServerProxy(evm, this, redis_uve_ip, redis_uve_port, redis_password, brokers, partitions, kafka_prefix)), ruleeng_(new Ruleeng(db_initializer_->GetDbHandler(), osp_.get())), diff --git a/src/analytics/viz_collector.h b/src/analytics/viz_collector.h index de5bc6b7497..a33e4bfde1d 100644 --- a/src/analytics/viz_collector.h +++ b/src/analytics/viz_collector.h @@ -40,9 +40,11 @@ class VizCollector { const std::string &kafka_prefix, const TtlMap &ttlmap, const std::string& cassandra_user, const std::string& cassandra_password, + const std::string &cassandra_compaction_strategy, bool use_cql, const std::string &zookeeper_server_list, - bool use_zookeeper); + bool use_zookeeper, bool disable_all_db_writes, + bool disable_db_stats_writes, bool disable_db_messages_writes); VizCollector(EventManager *evm, DbHandlerPtr db_handler, Ruleeng *ruleeng, Collector *collector, OpServerProxy *osp); diff --git a/src/base/test/queue_task_test.cc b/src/base/test/queue_task_test.cc index 4c270e9b213..667dc69b445 100644 --- a/src/base/test/queue_task_test.cc +++ b/src/base/test/queue_task_test.cc @@ -208,7 +208,7 @@ class QueueTaskTest : public ::testing::Test { tbb::atomic shutdown_test_exit_callback_sleep_; }; -TEST_F(QueueTaskTest, StartRunnerBasic) +TEST_F(QueueTaskTest, StartRunnerBasic) { TaskScheduler *scheduler = TaskScheduler::GetInstance(); // Always do start runner work_queue_.SetStartRunnerFunc( @@ -250,10 +250,10 @@ TEST_F(QueueTaskTest, StartRunnerBasic) work_queue_.Enqueue(enqueue_counter++); work_queue_.Enqueue(enqueue_counter++); task_util::WaitForIdle(1); - EXPECT_EQ(3, work_queue_.max_queue_len()); + EXPECT_EQ(4, work_queue_.max_queue_len()); // Add test case for bounded workq work_queue_.SetBounded(true); - work_queue_.size_(4); + work_queue_.SetSize(4); work_queue_.Enqueue(enqueue_counter++); work_queue_.Enqueue(enqueue_counter++); work_queue_.Enqueue(enqueue_counter++); diff --git a/src/database/cassandra/cql/cql_if.cc b/src/database/cassandra/cql/cql_if.cc index e4d4c67432d..14e9847d1d9 100644 --- a/src/database/cassandra/cql/cql_if.cc +++ b/src/database/cassandra/cql/cql_if.cc @@ -404,16 +404,17 @@ class CassStatementNameBinder : public boost::static_visitor<> { CassStatement *statement_; }; -static const std::string kQCompactionStrategy( +static const char * kQCompactionStrategy( "compaction = {'class': " - "'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}"); + "'org.apache.cassandra.db.compaction.%s'}"); static const std::string kQGCGraceSeconds("gc_grace_seconds = 0"); // // Cf2CassCreateTableIfNotExists // -std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf) { +std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, + const std::string &compaction_strategy) { std::ostringstream query; // Table name query << "CREATE TABLE IF NOT EXISTS " << cf.cfname_ << " "; @@ -430,12 +431,17 @@ std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf) { query << ", \"" << column.first << "\" " << DbDataType2CassType(column.second); } - query << ") WITH " << kQCompactionStrategy << " AND " << + char cbuf[512]; + int n(snprintf(cbuf, sizeof(cbuf), kQCompactionStrategy, + compaction_strategy.c_str())); + assert(!(n < 0 || n >= (int)sizeof(cbuf))); + query << ") WITH " << std::string(cbuf) << " AND " << kQGCGraceSeconds; return query.str(); } -std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf) { +std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, + const std::string &compaction_strategy) { std::ostringstream query; // Table name query << "CREATE TABLE IF NOT EXISTS " << cf.cfname_ << " ("; @@ -487,7 +493,11 @@ std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf) { } query << "column" << cnum; } - query << ")) WITH " << kQCompactionStrategy << " AND " << + char cbuf[512]; + int n(snprintf(cbuf, sizeof(cbuf), kQCompactionStrategy, + compaction_strategy.c_str())); + assert(!(n < 0 || n >= (int)sizeof(cbuf))); + query << ")) WITH " << std::string(cbuf) << " AND " << kQGCGraceSeconds; return query.str(); } @@ -1333,7 +1343,7 @@ class CqlIf::CqlIfImpl { } bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, - CassConsistency consistency) { + const std::string &compaction_strategy, CassConsistency consistency) { if (session_state_ != SessionState::CONNECTED) { return false; } @@ -1343,10 +1353,12 @@ class CqlIf::CqlIfImpl { std::string query; switch (cf.cftype_) { case GenDb::NewCf::COLUMN_FAMILY_SQL: - query = impl::StaticCf2CassCreateTableIfNotExists(cf); + query = impl::StaticCf2CassCreateTableIfNotExists(cf, + compaction_strategy); break; case GenDb::NewCf::COLUMN_FAMILY_NOSQL: - query = impl::DynamicCf2CassCreateTableIfNotExists(cf); + query = impl::DynamicCf2CassCreateTableIfNotExists(cf, + compaction_strategy); break; default: return false; @@ -1808,9 +1820,11 @@ bool CqlIf::Db_SetTablespace(const std::string &tablespace) { } // Column family -bool CqlIf::Db_AddColumnfamily(const GenDb::NewCf &cf) { +bool CqlIf::Db_AddColumnfamily(const GenDb::NewCf &cf, + const std::string &compaction_strategy) { bool success( - impl_->CreateTableIfNotExistsSync(cf, CASS_CONSISTENCY_QUORUM)); + impl_->CreateTableIfNotExistsSync(cf, compaction_strategy, + CASS_CONSISTENCY_QUORUM)); if (!success) { IncrementTableWriteFailStats(cf.cfname_); IncrementErrors(GenDb::IfErrors::ERR_WRITE_COLUMN_FAMILY); diff --git a/src/database/cassandra/cql/cql_if.h b/src/database/cassandra/cql/cql_if.h index bf075945b43..d317dec9b2e 100644 --- a/src/database/cassandra/cql/cql_if.h +++ b/src/database/cassandra/cql/cql_if.h @@ -39,7 +39,8 @@ class CqlIf : public GenDb::GenDbIf { virtual bool Db_AddSetTablespace(const std::string &tablespace, const std::string &replication_factor = "1"); // Column family - virtual bool Db_AddColumnfamily(const GenDb::NewCf &cf); + virtual bool Db_AddColumnfamily(const GenDb::NewCf &cf, + const std::string &compaction_strategy); virtual bool Db_UseColumnfamily(const GenDb::NewCf &cf); // Column virtual bool Db_AddColumn(std::auto_ptr cl); diff --git a/src/database/gendb.sandesh b/src/database/gendb.sandesh index dfd755a5ce0..e22ecac0656 100644 --- a/src/database/gendb.sandesh +++ b/src/database/gendb.sandesh @@ -22,6 +22,10 @@ enum DbDataType { BlobType = 12, // blob / bytes } +const string DATE_TIERED_COMPACTION_STRATEGY = "DateTieredCompactionStrategy"; +const string LEVELED_COMPACTION_STRATEGY = "LeveledCompactionStrategy"; +const string SIZE_TIERED_COMPACTION_STRATEGY = "SizeTieredCompactionStrategy"; + struct DbTableInfo { 1: string table_name 2: u64 reads diff --git a/src/database/gendb_if.h b/src/database/gendb_if.h index 8065e524d2a..57caf6087d8 100644 --- a/src/database/gendb_if.h +++ b/src/database/gendb_if.h @@ -170,7 +170,8 @@ class GenDbIf { virtual bool Db_AddSetTablespace(const std::string& tablespace, const std::string& replication_factor="1") = 0; // Column family - virtual bool Db_AddColumnfamily(const NewCf& cf) = 0; + virtual bool Db_AddColumnfamily(const NewCf& cf, + const std::string &compaction_strategy) = 0; virtual bool Db_UseColumnfamily(const NewCf& cf) = 0; // Column virtual bool Db_AddColumn(std::auto_ptr cl) = 0;