diff --git a/src/analytics/collector.cc b/src/analytics/collector.cc index e44fb2ea097..1f84bd1f78c 100644 --- a/src/analytics/collector.cc +++ b/src/analytics/collector.cc @@ -65,34 +65,14 @@ const std::vector Collector::kSmQueueWaterMarkInfo (Collector::kQSizeLowWaterMark, SandeshLevel::INVALID, false, true); Collector::Collector(EventManager *evm, short server_port, - DbHandlerPtr db_handler, OpServerProxy *osp, VizCallback cb, - std::vector cassandra_ips, - std::vector cassandra_ports, const TtlMap& ttl_map, - const std::string &cassandra_user, - const std::string &cassandra_password) : + DbHandlerPtr db_handler, OpServerProxy *osp, VizCallback cb) : SandeshServer(evm), db_handler_(db_handler), osp_(osp), evm_(evm), cb_(cb), - cassandra_ips_(cassandra_ips), - cassandra_ports_(cassandra_ports), - ttl_map_(ttl_map), - db_task_id_(TaskScheduler::GetInstance()->GetTaskId(kDbTask)), - cassandra_user_(cassandra_user), - cassandra_password_(cassandra_password), db_queue_wm_info_(kDbQueueWaterMarkInfo), sm_queue_wm_info_(kSmQueueWaterMarkInfo) { - - dbConnStatus_ = ConnectionStatus::INIT; - - if (!task_policy_set_) { - TaskPolicy db_task_policy = boost::assign::list_of - (TaskExclusion(lifetime_mgr_task_id())); - TaskScheduler::GetInstance()->SetPolicy(db_task_id_, db_task_policy); - task_policy_set_ = true; - } - SandeshServer::Initialize(server_port); Module::type module = Module::COLLECTOR; @@ -103,10 +83,6 @@ Collector::Collector(EventManager *evm, short server_port, Collector::~Collector() { } -int Collector::db_task_id() { - return db_task_id_; -} - void Collector::SessionShutdown() { SandeshServer::SessionShutdown(); diff --git a/src/analytics/collector.h b/src/analytics/collector.h index 59fd971e22a..5d66883f252 100644 --- a/src/analytics/collector.h +++ b/src/analytics/collector.h @@ -68,11 +68,7 @@ class Collector : public SandeshServer { Collector(EventManager *evm, short server_port, DbHandlerPtr db_handler, OpServerProxy *osp, - VizCallback cb, - std::vector cassandra_ips, - std::vector cassandra_ports, const TtlMap& ttl_map, - const std::string& cassandra_user, - const std::string& cassandra_password); + VizCallback cb); virtual ~Collector(); virtual void Shutdown(); virtual void SessionShutdown(); @@ -117,11 +113,6 @@ class Collector : public SandeshServer { static std::string GetSelfIp() { return self_ip_; } static void SetSelfIp(std::string ip) { self_ip_ = ip; } - std::vector cassandra_ips() { return cassandra_ips_; } - std::vector cassandra_ports() { return cassandra_ports_; } - std::string cassandra_user() { return cassandra_user_; } - std::string cassandra_password() { return cassandra_password_; } - const TtlMap& analytics_ttl_map() { return ttl_map_; } int db_task_id(); const CollectorStats &GetStats() const { return stats_; } void SendGeneratorStatistics(); @@ -140,7 +131,6 @@ class Collector : public SandeshServer { virtual void DisconnectSession(SandeshSession *session); private: - ConnectionStatus::type dbConnStatus_; void SetQueueWaterMarkInfo(QueueType::type type, Sandesh::QueueWaterMarkInfo &wm); void ResetQueueWaterMarkInfo(QueueType::type type); @@ -166,12 +156,7 @@ class Collector : public SandeshServer { EventManager * const evm_; VizCallback cb_; - std::vector cassandra_ips_; - std::vector cassandra_ports_; - TtlMap ttl_map_; int db_task_id_; - std::string cassandra_user_; - std::string cassandra_password_; // SandeshGenerator map typedef boost::ptr_map GeneratorMap; diff --git a/src/analytics/db_handler.cc b/src/analytics/db_handler.cc index b0062c2266e..cd29c18e899 100644 --- a/src/analytics/db_handler.cc +++ b/src/analytics/db_handler.cc @@ -65,21 +65,22 @@ DbHandler::DbHandler(EventManager *evm, const std::vector &cassandra_ips, const std::vector &cassandra_ports, std::string name, const TtlMap& ttl_map, - const std::string& cassandra_user, - const std::string& cassandra_password, - const std::string &cassandra_compaction_strategy, + const Options::Cassandra &cassandra_options, const std::string &zookeeper_server_list, bool use_zookeeper, bool disable_all_writes, bool disable_statistics_writes, bool disable_messages_writes, bool disable_messages_keyword_writes, bool use_db_write_options, const DbWriteOptions &db_write_options) : dbif_(new cass::cql::CqlIf(evm, cassandra_ips, - cassandra_ports[0], cassandra_user, cassandra_password)), + cassandra_ports[0], cassandra_options.user_, + cassandra_options.password_)), name_(name), drop_level_(SandeshLevel::INVALID), ttl_map_(ttl_map), tablespace_(g_viz_constants.COLLECTOR_KEYSPACE_CQL), - compaction_strategy_(cassandra_compaction_strategy), + compaction_strategy_(cassandra_options.compaction_strategy_), + flow_tables_compaction_strategy_( + cassandra_options.flow_tables_compaction_strategy_), gen_partition_no_((uint8_t)g_viz_constants.PARTITION_MIN, (uint8_t)g_viz_constants.PARTITION_MAX), zookeeper_server_list_(zookeeper_server_list), @@ -328,7 +329,7 @@ bool DbHandler::CreateTables() { for (std::vector::const_iterator it = vizd_flow_tables.begin(); it != vizd_flow_tables.end(); it++) { - if (!dbif_->Db_AddColumnfamily(*it, compaction_strategy_)) { + if (!dbif_->Db_AddColumnfamily(*it, flow_tables_compaction_strategy_)) { DB_LOG(ERROR, it->cfname_ << " FAILED"); return false; } @@ -1869,8 +1870,7 @@ DbHandlerInitializer::DbHandlerInitializer(EventManager *evm, DbHandlerInitializer::InitializeDoneCb callback, const std::vector &cassandra_ips, const std::vector &cassandra_ports, const TtlMap& ttl_map, - const std::string &cassandra_user, const std::string &cassandra_password, - const std::string &cassandra_compaction_strategy, + const Options::Cassandra &cassandra_options, const std::string &zookeeper_server_list, bool use_zookeeper, bool disable_all_db_writes, bool disable_db_stats_writes, bool disable_db_messages_writes, @@ -1880,8 +1880,7 @@ DbHandlerInitializer::DbHandlerInitializer(EventManager *evm, db_handler_(new DbHandler(evm, boost::bind(&DbHandlerInitializer::ScheduleInit, this), cassandra_ips, cassandra_ports, db_name, ttl_map, - cassandra_user, cassandra_password, cassandra_compaction_strategy, - zookeeper_server_list, use_zookeeper, + cassandra_options, zookeeper_server_list, use_zookeeper, disable_all_db_writes, disable_db_stats_writes, disable_db_messages_writes, disable_db_messages_keyword_writes, true, db_write_options)), diff --git a/src/analytics/db_handler.h b/src/analytics/db_handler.h index 026c15cf0f6..63ea1ebdf70 100644 --- a/src/analytics/db_handler.h +++ b/src/analytics/db_handler.h @@ -92,9 +92,7 @@ class DbHandler { const std::vector &cassandra_ips, const std::vector &cassandra_ports, std::string name, const TtlMap& ttl_map, - const std::string& cassandra_user, - const std::string& cassandra_password, - const std::string& cassandra_compaction_strategy, + const Options::Cassandra &cassandra_options, const std::string &zookeeper_server_list, bool use_zookeeper, bool disable_all_writes, bool disable_stats_writes, bool disable_messages_writes, bool disable_messages_keyword_writes, @@ -254,6 +252,7 @@ class DbHandler { static tbb::mutex fmutex_; std::string tablespace_; std::string compaction_strategy_; + std::string flow_tables_compaction_strategy_; UniformInt8RandomGenerator gen_partition_no_; std::string zookeeper_server_list_; bool use_zookeeper_; @@ -310,10 +309,7 @@ class DbHandlerInitializer { const std::string &timer_task_name, InitializeDoneCb callback, const std::vector &cassandra_ips, const std::vector &cassandra_ports, - const TtlMap& ttl_map, - const std::string& cassandra_user, - const std::string& cassandra_password, - const std::string &cassandra_compaction_strategy, + const TtlMap& ttl_map, const Options::Cassandra &cassandra_options, const std::string &zookeeper_server_list, bool use_zookeeper, bool disable_all_db_writes, bool disable_db_stats_writes, bool disable_db_messages_writes, diff --git a/src/analytics/main.cc b/src/analytics/main.cc index 0039bb94191..ace64945fab 100644 --- a/src/analytics/main.cc +++ b/src/analytics/main.cc @@ -340,9 +340,7 @@ int main(int argc, char *argv[]) options.partitions(), options.dup(), options.kafka_prefix(), - ttl_map, options.cassandra_user(), - options.cassandra_password(), - options.cassandra_compaction_strategy(), + ttl_map, options.get_cassandra_options(), zookeeper_server_list, use_zookeeper, options.disable_all_db_writes(), options.disable_db_statistics_writes(), diff --git a/src/analytics/options.cc b/src/analytics/options.cc index b5aafab522d..b4c1a651191 100644 --- a/src/analytics/options.cc +++ b/src/analytics/options.cc @@ -107,8 +107,12 @@ void Options::Initialize(EventManager &evm, "Cassandra password") ("CASSANDRA.compaction_strategy", opt::value()->default_value( - GenDb::g_gendb_constants.LEVELED_COMPACTION_STRATEGY), - "Cassandra compaction strategy");; + GenDb::g_gendb_constants.SIZE_TIERED_COMPACTION_STRATEGY), + "Cassandra compaction strategy") + ("CASSANDRA.flow_tables.compaction_strategy", + opt::value()->default_value( + GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY), + "Cassandra compaction strategy for flow tables"); // Command line and config file options. opt::options_description config("Configuration options"); @@ -387,6 +391,25 @@ void Options::GetOptValueImpl( } } +static bool ValidateCompactionStrategyOption( + const std::string &compaction_strategy, + const std::string &option) { + if (!((compaction_strategy == + GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) || + (compaction_strategy == + GenDb::g_gendb_constants.LEVELED_COMPACTION_STRATEGY) || + (compaction_strategy == + GenDb::g_gendb_constants.SIZE_TIERED_COMPACTION_STRATEGY))) { + cout << "Invalid " << option << ", please select one of [" << + GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY << ", " << + GenDb::g_gendb_constants.LEVELED_COMPACTION_STRATEGY << ", " << + GenDb::g_gendb_constants.SIZE_TIERED_COMPACTION_STRATEGY << "]" << + endl; + return false; + } + return true; +} + // Process command line options. They can come from a conf file as well. Options // from command line always overrides those that come from the config file. void Options::Process(int argc, char *argv[], @@ -540,22 +563,23 @@ void Options::Process(int argc, char *argv[], GetOptValue(var_map, redis_port_, "REDIS.port"); GetOptValue(var_map, redis_server_, "REDIS.server"); GetOptValue(var_map, redis_password_, "REDIS.password"); - GetOptValue(var_map, cassandra_user_, "CASSANDRA.cassandra_user"); - GetOptValue(var_map, cassandra_password_, "CASSANDRA.cassandra_password"); - GetOptValue(var_map, cassandra_compaction_strategy_, + GetOptValue(var_map, cassandra_options_.user_, + "CASSANDRA.cassandra_user"); + GetOptValue(var_map, cassandra_options_.password_, + "CASSANDRA.cassandra_password"); + GetOptValue(var_map, cassandra_options_.compaction_strategy_, "CASSANDRA.compaction_strategy"); - if (!((cassandra_compaction_strategy_ == - GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) || - (cassandra_compaction_strategy_ == - GenDb::g_gendb_constants.LEVELED_COMPACTION_STRATEGY) || - (cassandra_compaction_strategy_ == - GenDb::g_gendb_constants.SIZE_TIERED_COMPACTION_STRATEGY))) { - cout << "Invalid CASSANDRA.compaction_strategy," << - " please select one of [" << - GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY << ", " << - GenDb::g_gendb_constants.LEVELED_COMPACTION_STRATEGY << ", " << - GenDb::g_gendb_constants.SIZE_TIERED_COMPACTION_STRATEGY << "]" << - endl; + if (!ValidateCompactionStrategyOption( + cassandra_options_.compaction_strategy_, + "CASSANDRA.compaction_strategy")) { + exit(-1); + } + GetOptValue(var_map, + cassandra_options_.flow_tables_compaction_strategy_, + "CASSANDRA.flow_tables.compaction_strategy"); + if (!ValidateCompactionStrategyOption( + cassandra_options_.flow_tables_compaction_strategy_, + "CASSANDRA.flow_tables.compaction_strategy")) { exit(-1); } GetOptValue(var_map, ks_port_, "KEYSTONE.auth_port"); diff --git a/src/analytics/options.h b/src/analytics/options.h index 4b031cc6a28..8a5eb7452d4 100644 --- a/src/analytics/options.h +++ b/src/analytics/options.h @@ -83,9 +83,26 @@ class DbWriteOptions { // Process command line/configuration file options for collector. class Options { public: + struct Cassandra { + Cassandra() : + user_(), + password_(), + compaction_strategy_(), + flow_tables_compaction_strategy_() { + } + + std::string user_; + std::string password_; + std::string compaction_strategy_; + std::string flow_tables_compaction_strategy_; + }; + Options(); bool Parse(EventManager &evm, int argc, char **argv); + const Cassandra get_cassandra_options() const { + return cassandra_options_; + } const std::vector cassandra_server_list() const { return cassandra_server_list_; } @@ -115,11 +132,6 @@ class Options { const std::string redis_server() const { return redis_server_; } const uint16_t redis_port() const { return redis_port_; } const std::string redis_password() const { return redis_password_; } - const std::string cassandra_user() const { return cassandra_user_; } - const std::string cassandra_password() const { return cassandra_password_; } - const std::string cassandra_compaction_strategy() const { - return cassandra_compaction_strategy_; - } const std::string hostname() const { return hostname_; } const std::string host_ip() const { return host_ip_; } const uint16_t http_server_port() const { return http_server_port_; } @@ -194,9 +206,7 @@ class Options { std::string redis_server_; uint16_t redis_port_; std::string redis_password_; - std::string cassandra_user_; - std::string cassandra_password_; - std::string cassandra_compaction_strategy_; + Cassandra cassandra_options_; std::string hostname_; std::string host_ip_; uint16_t http_server_port_; diff --git a/src/analytics/protobuf_collector.cc b/src/analytics/protobuf_collector.cc index 5a40a21f773..55aa75fd3dd 100644 --- a/src/analytics/protobuf_collector.cc +++ b/src/analytics/protobuf_collector.cc @@ -10,42 +10,23 @@ #include "analytics/db_handler.h" #include "analytics/protobuf_collector.h" -const std::string ProtobufCollector::kDbName("Google Protocol Buffer"); -const int ProtobufCollector::kDbTaskInstance(-1); -const std::string ProtobufCollector::kDbTaskName("protobuf_collector::Db"); - ProtobufCollector::ProtobufCollector(EventManager *evm, - uint16_t protobuf_udp_port, - const std::vector &cassandra_ips, - const std::vector &cassandra_ports, const TtlMap& ttl_map, - const std::string& cassandra_user, const std::string& cassandra_password, - DbHandlerPtr global_dbhandler) { - db_handler_ = global_dbhandler; - server_.reset(new protobuf::ProtobufServer(evm, protobuf_udp_port, - boost::bind(&DbHandler::StatTableInsert, db_handler_, - _1, _2, _3, _4, _5, GenDb::GenDbIf::DbAddColumnCb()))); + uint16_t protobuf_udp_port, DbHandlerPtr db_handler) : + server_(new protobuf::ProtobufServer(evm, protobuf_udp_port, + boost::bind(&DbHandler::StatTableInsert, db_handler, + _1, _2, _3, _4, _5, GenDb::GenDbIf::DbAddColumnCb()))) { } ProtobufCollector::~ProtobufCollector() { } bool ProtobufCollector::Initialize() { - if (db_initializer_) { - return db_initializer_->Initialize(); - } - DbInitializeCb(); + server_->Initialize(); return true; } void ProtobufCollector::Shutdown() { server_->Shutdown(); - if (db_initializer_) { - db_initializer_->Shutdown(); - } -} - -void ProtobufCollector::DbInitializeCb() { - server_->Initialize(); } void ProtobufCollector::SendStatistics(const std::string &name) { @@ -58,20 +39,5 @@ void ProtobufCollector::SendStatistics(const std::string &name) { snh->set_tx_socket_stats(v_tx_stats); snh->set_rx_socket_stats(v_rx_stats); snh->set_rx_message_stats(v_rx_msg_stats); - if (db_initializer_) { - // Database statistics - std::vector v_dbti, v_stats_dbti; - GenDb::DbErrors dbe; - db_handler_->GetStats(&v_dbti, &dbe, &v_stats_dbti); - std::vector v_dbe; - v_dbe.push_back(dbe); - uint64_t db_queue_count, db_enqueues; - db_handler_->GetStats(&db_queue_count, &db_enqueues); - snh->set_db_table_info(v_dbti); - snh->set_db_statistics_table_info(v_stats_dbti); - snh->set_db_errors(v_dbe); - snh->set_db_queue_count(db_queue_count); - snh->set_db_enqueues(db_enqueues); - } PROTOBUF_COLLECTOR_STATS_SEND_SANDESH(snh); } diff --git a/src/analytics/protobuf_collector.h b/src/analytics/protobuf_collector.h index f79cff6ebfc..f2249da8c5f 100644 --- a/src/analytics/protobuf_collector.h +++ b/src/analytics/protobuf_collector.h @@ -16,25 +16,13 @@ class DbHandlerInitializer; class ProtobufCollector { public: ProtobufCollector(EventManager *evm, uint16_t udp_server_port, - const std::vector &cassandra_ips, - const std::vector &cassandra_ports, const TtlMap&, - const std::string& cassandra_user, - const std::string& cassandra_password, - DbHandlerPtr global_dbhandler); + DbHandlerPtr db_handler); virtual ~ProtobufCollector(); bool Initialize(); void Shutdown(); void SendStatistics(const std::string &name); private: - void DbInitializeCb(); - - static const std::string kDbName; - static const int kDbTaskInstance; - static const std::string kDbTaskName; - - boost::scoped_ptr db_initializer_; - DbHandlerPtr db_handler_; boost::scoped_ptr server_; }; diff --git a/src/analytics/test/SConscript b/src/analytics/test/SConscript index abb797b3733..796950b4a47 100644 --- a/src/analytics/test/SConscript +++ b/src/analytics/test/SConscript @@ -117,7 +117,7 @@ sflow_parser_test = env.UnitTest('sflow_parser_test', env.Alias('src/analytics:sflow_parser_test', sflow_parser_test) test_suite = [ -#options_test, + options_test, viz_message_test, stat_walker_test, protobuf_test, diff --git a/src/analytics/test/db_handler_mock.h b/src/analytics/test/db_handler_mock.h index 7b875a865be..ba6ad1df3e3 100644 --- a/src/analytics/test/db_handler_mock.h +++ b/src/analytics/test/db_handler_mock.h @@ -14,8 +14,9 @@ class DbHandlerMock : public DbHandler { DbHandlerMock(EventManager *evm, const TtlMap& ttl_map) : DbHandler(evm, boost::bind(&DbHandlerMock::StartDbifReinit, this), std::vector(1, "127.0.0.1"), - std::vector(1, 9160), "localhost", ttl_map, "", "", - "", "", false, false, false, false, false, false, + std::vector(1, 9160), "localhost", ttl_map, + Options::Cassandra(), "", + false, false, false, false, false, false, DbWriteOptions()) { } void StartDbifReinit() { diff --git a/src/analytics/test/options_test.cc b/src/analytics/test/options_test.cc index f3787e6c65f..1b368344a5f 100644 --- a/src/analytics/test/options_test.cc +++ b/src/analytics/test/options_test.cc @@ -33,20 +33,18 @@ class OptionsTest : public ::testing::Test { boost::system::error_code error; hostname_ = host_name(error); host_ip_ = GetHostIp(evm_.io_service(), hostname_); - default_cassandra_server_list_.push_back("127.0.0.1:9160"); - default_conf_files_.push_back("/etc/contrail/contrail-collector.conf"); - default_conf_files_.push_back("/etc/contrail/contrail-database.conf"); + default_cassandra_server_list_.push_back("127.0.0.1:9042"); } virtual void TearDown() { remove("./options_test_collector_config_file.conf"); + remove ("./options_test_cassandra_config_file.conf"); } EventManager evm_; std::string hostname_; std::string host_ip_; vector default_cassandra_server_list_; - vector default_conf_files_; Options options_; }; @@ -59,6 +57,7 @@ TEST_F(OptionsTest, NoArguments) { options_.Parse(evm_, argc, argv); vector expected_conf_files_; expected_conf_files_.push_back("/etc/contrail/contrail-collector.conf"); + expected_conf_files_.push_back("/etc/contrail/contrail-keystone-auth.conf"); TASK_UTIL_EXPECT_VECTOR_EQ(default_cassandra_server_list_, options_.cassandra_server_list()); EXPECT_EQ(options_.redis_server(), "127.0.0.1"); @@ -86,11 +85,12 @@ TEST_F(OptionsTest, NoArguments) { EXPECT_EQ(options_.syslog_port(), -1); EXPECT_EQ(options_.dup(), false); EXPECT_EQ(options_.test_mode(), false); - EXPECT_EQ(options_.sandesh_send_rate_limit(), 0); + EXPECT_EQ(options_.sandesh_send_rate_limit(), + g_sandesh_constants.DEFAULT_SANDESH_SEND_RATELIMIT); EXPECT_EQ(options_.disable_flow_collection(), false); EXPECT_EQ(options_.disable_db_messages_writes(), false); EXPECT_EQ(options_.enable_db_messages_keyword_writes(), false); - EXPECT_EQ(options_.disable_db_stats_writes(), false); + EXPECT_EQ(options_.disable_db_statistics_writes(), false); EXPECT_EQ(options_.disable_all_db_writes(), false); uint16_t protobuf_port(0); EXPECT_FALSE(options_.collector_protobuf_port(&protobuf_port)); @@ -139,7 +139,7 @@ TEST_F(OptionsTest, DefaultConfFile) { EXPECT_EQ(options_.disable_flow_collection(), false); EXPECT_EQ(options_.disable_db_messages_writes(), false); EXPECT_EQ(options_.enable_db_messages_keyword_writes(), false); - EXPECT_EQ(options_.disable_db_stats_writes(), false); + EXPECT_EQ(options_.disable_db_statistics_writes(), false); EXPECT_EQ(options_.disable_all_db_writes(), false); uint16_t protobuf_port(0); EXPECT_FALSE(options_.collector_protobuf_port(&protobuf_port)); @@ -195,14 +195,14 @@ TEST_F(OptionsTest, OverrideStringFromCommandLine) { } TEST_F(OptionsTest, OverrideBooleanFromCommandLine) { - int argc = 4; + int argc = 6; char *argv[argc]; char argv_0[] = "options_test"; char argv_1[] = "--conf_file=controller/src/analytics/contrail-collector.conf"; char argv_2[] = "--DEFAULT.test_mode"; char argv_3[] = "--DEFAULT.disable_flow_collection"; - char argv_4[] = "--DATABASE.disable_all_db_writes"; - char argv_5[] = "--DATABASE.enable_db_messages_keyword_writes"; + char argv_4[] = "--DATABASE.disable_all_writes"; + char argv_5[] = "--DATABASE.enable_message_keyword_writes"; argv[0] = argv_0; argv[1] = argv_1; argv[2] = argv_2; @@ -292,7 +292,9 @@ TEST_F(OptionsTest, CustomConfigFile) { string cassandra_config = "" "[CASSANDRA]\n" "cassandra_user=cassandra1\n" - "cassandra_password=cassandra1\n"; + "cassandra_password=cassandra1\n" + "compaction_strategy=LeveledCompactionStrategy\n" + "flow_tables.compaction_strategy=SizeTieredCompactionStrategy\n"; config_file.open("./options_test_cassandra_config_file.conf"); config_file << cassandra_config; @@ -350,8 +352,13 @@ TEST_F(OptionsTest, CustomConfigFile) { uint16_t protobuf_port(0); EXPECT_TRUE(options_.collector_protobuf_port(&protobuf_port)); EXPECT_EQ(protobuf_port, 3333); - EXPECT_EQ(options_.cassandra_user(), "cassandra1"); - EXPECT_EQ(options_.cassandra_password(), "cassandra1"); + Options::Cassandra cassandra_options(options_.get_cassandra_options()); + EXPECT_EQ(cassandra_options.user_, "cassandra1"); + EXPECT_EQ(cassandra_options.password_, "cassandra1"); + EXPECT_EQ(cassandra_options.compaction_strategy_, + "LeveledCompactionStrategy"); + EXPECT_EQ(cassandra_options.flow_tables_compaction_strategy_, + "SizeTieredCompactionStrategy"); EXPECT_EQ(options_.sandesh_send_rate_limit(), 5); } @@ -455,8 +462,9 @@ TEST_F(OptionsTest, CustomConfigFileAndOverrideFromCommandLine) { TASK_UTIL_EXPECT_VECTOR_EQ(options_.config_file(), input_conf_files); - EXPECT_EQ(options_.cassandra_user(),"cassandra"); - EXPECT_EQ(options_.cassandra_password(),"cassandra"); + Options::Cassandra cassandra_options(options_.get_cassandra_options()); + EXPECT_EQ(cassandra_options.user_,"cassandra"); + EXPECT_EQ(cassandra_options.password_,"cassandra"); EXPECT_EQ(options_.discovery_server(), "1.0.0.1"); EXPECT_EQ(options_.discovery_port(), 100); EXPECT_EQ(options_.hostname(), "test"); diff --git a/src/analytics/viz_collector.cc b/src/analytics/viz_collector.cc index bbe0af31ef0..f1ef9d066f7 100644 --- a/src/analytics/viz_collector.cc +++ b/src/analytics/viz_collector.cc @@ -43,9 +43,7 @@ VizCollector::VizCollector(EventManager *evm, unsigned short listen_port, int syslog_port, int sflow_port, int ipfix_port, uint16_t partitions, bool dup, const std::string &kafka_prefix, const TtlMap& ttl_map, - const std::string &cassandra_user, - const std::string &cassandra_password, - const std::string &cassandra_compaction_strategy, + const Options::Cassandra &cassandra_options, const std::string &zookeeper_server_list, bool use_zookeeper, bool disable_all_db_writes, bool disable_db_stats_writes, bool disable_db_messages_writes, @@ -54,8 +52,7 @@ VizCollector::VizCollector(EventManager *evm, unsigned short listen_port, db_initializer_(new DbHandlerInitializer(evm, DbGlobalName(dup), std::string("collector:DbIf"), boost::bind(&VizCollector::DbInitializeCb, this), - cassandra_ips, cassandra_ports, ttl_map, cassandra_user, - cassandra_password, cassandra_compaction_strategy, + cassandra_ips, cassandra_ports, ttl_map, cassandra_options, zookeeper_server_list, use_zookeeper, disable_all_db_writes, disable_db_stats_writes, disable_db_messages_writes, disable_db_messages_keyword_writes, @@ -65,9 +62,7 @@ VizCollector::VizCollector(EventManager *evm, unsigned short listen_port, ruleeng_(new Ruleeng(db_initializer_->GetDbHandler(), osp_.get())), collector_(new Collector(evm, listen_port, db_initializer_->GetDbHandler(), osp_.get(), - boost::bind(&Ruleeng::rule_execute, ruleeng_.get(), _1, _2, _3, _4), - cassandra_ips, cassandra_ports, ttl_map, cassandra_user, - cassandra_password)), + boost::bind(&Ruleeng::rule_execute, ruleeng_.get(), _1, _2, _3, _4))), syslog_listener_(new SyslogListeners(evm, boost::bind(&Ruleeng::rule_execute, ruleeng_.get(), _1, _2, _3, _4), db_initializer_->GetDbHandler(), syslog_port)), @@ -83,9 +78,7 @@ VizCollector::VizCollector(EventManager *evm, unsigned short listen_port, name_ = boost::asio::ip::host_name(error); if (protobuf_collector_enabled) { protobuf_collector_.reset(new ProtobufCollector(evm, - protobuf_listen_port, cassandra_ips, cassandra_ports, - ttl_map, cassandra_user, cassandra_password, - db_initializer_->GetDbHandler())); + protobuf_listen_port, db_initializer_->GetDbHandler())); } CollectorPublish(); } diff --git a/src/analytics/viz_collector.h b/src/analytics/viz_collector.h index 73cef35080f..d57ec72defd 100644 --- a/src/analytics/viz_collector.h +++ b/src/analytics/viz_collector.h @@ -41,9 +41,7 @@ class VizCollector { int syslog_port, int sflow_port, int ipfix_port, uint16_t partitions, bool dup, const std::string &kafka_prefix, const TtlMap &ttlmap, - const std::string& cassandra_user, - const std::string& cassandra_password, - const std::string &cassandra_compaction_strategy, + const Options::Cassandra &cassandra_options, const std::string &zookeeper_server_list, bool use_zookeeper, bool disable_all_db_writes, bool disable_db_stats_writes, bool disable_db_messages_writes,