diff --git a/src/analytics/collector.cc b/src/analytics/collector.cc index a041f9422a6..67969d479d5 100644 --- a/src/analytics/collector.cc +++ b/src/analytics/collector.cc @@ -358,12 +358,9 @@ void Collector::TestDatabaseConnection() { if (connect_status_change) { // update connection status - boost::system::error_code ec; - boost::asio::ip::address db_addr(boost::asio::ip::address::from_string( - db_handler_->GetHost(), ec)); - boost::asio::ip::tcp::endpoint db_endpoint(db_addr, db_handler_->GetPort()); ConnectionState::GetInstance()->Update(ConnectionType::DATABASE, - DbGlobalName(false), dbConnStatus_, db_endpoint, std::string()); + DbGlobalName(false), dbConnStatus_, testdbif_->Db_GetEndpoints(), + std::string()); } #endif // !USE_CASSANDRA_CQL } diff --git a/src/analytics/db_handler.cc b/src/analytics/db_handler.cc index d4e9a23ffb3..db1f41f9357 100644 --- a/src/analytics/db_handler.cc +++ b/src/analytics/db_handler.cc @@ -125,12 +125,9 @@ uint64_t DbHandler::GetTtlFromMap(const TtlMap& ttl_map, std::string DbHandler::GetName() const { return name_; } -std::string DbHandler::GetHost() const { - return dbif_->Db_GetHost(); -} -int DbHandler::GetPort() const { - return dbif_->Db_GetPort(); +std::vector DbHandler::GetEndpoints() const { + return dbif_->Db_GetEndpoints(); } bool DbHandler::DropMessage(const SandeshHeader &header, @@ -1676,21 +1673,17 @@ bool DbHandlerInitializer::Initialize() { boost::system::error_code ec; if (!db_handler_->Init(true, db_task_instance_)) { // Update connection info - boost::asio::ip::address db_addr(boost::asio::ip::address::from_string( - db_handler_->GetHost(), ec)); - boost::asio::ip::tcp::endpoint db_endpoint(db_addr, db_handler_->GetPort()); ConnectionState::GetInstance()->Update(ConnectionType::DATABASE, - db_name_, ConnectionStatus::DOWN, db_endpoint, std::string()); + db_name_, ConnectionStatus::DOWN, db_handler_->GetEndpoints(), + std::string()); LOG(DEBUG, db_name_ << ": Db Initialization FAILED"); ScheduleInit(); return false; } // Update connection info - boost::asio::ip::address db_addr(boost::asio::ip::address::from_string( - db_handler_->GetHost(), ec)); - boost::asio::ip::tcp::endpoint db_endpoint(db_addr, db_handler_->GetPort()); ConnectionState::GetInstance()->Update(ConnectionType::DATABASE, - db_name_, ConnectionStatus::UP, db_endpoint, std::string()); + db_name_, ConnectionStatus::UP, db_handler_->GetEndpoints(), + std::string()); if (callback_) { callback_(); diff --git a/src/analytics/db_handler.h b/src/analytics/db_handler.h index 348364d25cd..ce1fe75f816 100644 --- a/src/analytics/db_handler.h +++ b/src/analytics/db_handler.h @@ -144,8 +144,7 @@ class DbHandler { void SetDbQueueWaterMarkInfo(Sandesh::QueueWaterMarkInfo &wm, boost::function defer_undefer_cb); void ResetDbQueueWaterMarkInfo(); - std::string GetHost() const; - int GetPort() const; + std::vector GetEndpoints() const; std::string GetName() const; bool UseCql() const; diff --git a/src/base/connection_info.cc b/src/base/connection_info.cc index 90e61c86344..d94c830ee66 100644 --- a/src/base/connection_info.cc +++ b/src/base/connection_info.cc @@ -7,6 +7,7 @@ #include #include #include +#include #include "base/string_util.h" #include "base/sandesh/process_info_constants.h" @@ -43,9 +44,9 @@ void ConnectionState::Update() { } } -void ConnectionState::Update(ConnectionType::type ctype, +void ConnectionState::UpdateInternal(ConnectionType::type ctype, const std::string &name, ConnectionStatus::type status, - Endpoint server, std::string message) { + const std::vector &servers, std::string message) { // Populate key ConnectionInfoKey key(ctype, name); // Populate info @@ -53,12 +54,14 @@ void ConnectionState::Update(ConnectionType::type ctype, info.set_type( g_process_info_constants.ConnectionTypeNames.find(ctype)->second); info.set_name(name); - boost::system::error_code ec; - std::string saddr(server.address().to_string(ec)); - int sport(server.port()); - std::string server_address(saddr + ":" + integerToString(sport)); - std::vector server_addrs = boost::assign::list_of - (server_address); + std::vector server_addrs; + BOOST_FOREACH(const Endpoint &server, servers) { + boost::system::error_code ec; + std::string saddr(server.address().to_string(ec)); + int sport(server.port()); + std::string server_address(saddr + ":" + integerToString(sport)); + server_addrs.push_back(server_address); + } info.set_server_addrs(server_addrs); info.set_status( g_process_info_constants.ConnectionStatusNames.find(status)->second); @@ -78,6 +81,19 @@ void ConnectionState::Update(ConnectionType::type ctype, } } +void ConnectionState::Update(ConnectionType::type ctype, + const std::string &name, ConnectionStatus::type status, + const std::vector &servers, std::string message) { + UpdateInternal(ctype, name, status, servers, message); +} + +void ConnectionState::Update(ConnectionType::type ctype, + const std::string &name, ConnectionStatus::type status, + Endpoint server, std::string message) { + UpdateInternal(ctype, name, status, boost::assign::list_of(server), + message); +} + void ConnectionState::Delete(ConnectionType::type ctype, const std::string &name) { // Construct key diff --git a/src/base/connection_info.h b/src/base/connection_info.h index 9d198f5dee7..14ee31db747 100644 --- a/src/base/connection_info.h +++ b/src/base/connection_info.h @@ -39,10 +39,17 @@ class ConnectionState { void Update(ConnectionType::type ctype, const std::string &name, ConnectionStatus::type status, Endpoint server, std::string message); + void Update(ConnectionType::type ctype, const std::string &name, + ConnectionStatus::type status, const std::vector &servers, + std::string message); void Delete(ConnectionType::type ctype, const std::string &name); std::vector GetInfos() const; private: + void UpdateInternal(ConnectionType::type ctype, + const std::string &name, ConnectionStatus::type status, + const std::vector &servers, std::string message); + template friend class ConnectionStateManager; diff --git a/src/database/cassandra/cql/cql_if.cc b/src/database/cassandra/cql/cql_if.cc index 4077b844615..8c5ffdb0fef 100644 --- a/src/database/cassandra/cql/cql_if.cc +++ b/src/database/cassandra/cql/cql_if.cc @@ -1136,6 +1136,13 @@ CqlIf::CqlIf(EventManager *evm, impl_ = new CqlIfImpl(evm, cassandra_ips, cassandra_port, cassandra_user, cassandra_password); initialized_ = false; + BOOST_FOREACH(const std::string &cassandra_ip, cassandra_ips) { + boost::system::error_code ec; + boost::asio::ip::address cassandra_addr( + boost::asio::ip::address::from_string(cassandra_ip, ec)); + GenDb::Endpoint endpoint(cassandra_addr, cassandra_port); + endpoints_.push_back(endpoint); + } } CqlIf::CqlIf() : impl_(NULL) { @@ -1275,14 +1282,8 @@ bool CqlIf::Db_GetStats(std::vector *vdbti, } // Connection -std::string CqlIf::Db_GetHost() const { - //return impl_->Db_GetHost(); - return std::string(); -} - -int CqlIf::Db_GetPort() const { - //return impl_->Db_GetPort(); - return -1; +std::vector CqlIf::Db_GetEndpoints() const { + return endpoints_; } } // namespace cql diff --git a/src/database/cassandra/cql/cql_if.h b/src/database/cassandra/cql/cql_if.h index 7818471694d..8cc537eb8f8 100644 --- a/src/database/cassandra/cql/cql_if.h +++ b/src/database/cassandra/cql/cql_if.h @@ -62,13 +62,13 @@ class CqlIf : public GenDb::GenDbIf { virtual bool Db_GetStats(std::vector *vdbti, GenDb::DbErrors *dbe); // Connection - virtual std::string Db_GetHost() const; - virtual int Db_GetPort() const; + virtual std::vector Db_GetEndpoints() const; private: class CqlIfImpl; CqlIfImpl *impl_; tbb::atomic initialized_; + std::vector endpoints_; }; } // namespace cql diff --git a/src/database/cassandra/thrift/thrift_if.cc b/src/database/cassandra/thrift/thrift_if.cc index 35dd6e8b1c8..5d93397612c 100644 --- a/src/database/cassandra/thrift/thrift_if.cc +++ b/src/database/cassandra/thrift/thrift_if.cc @@ -111,10 +111,6 @@ bool ThriftIf::Db_GetStats(std::vector *vdbti, } // Connection -std::string ThriftIf::Db_GetHost() const { - return impl_->Db_GetHost(); -} - -int ThriftIf::Db_GetPort() const { - return impl_->Db_GetPort(); +std::vector ThriftIf::Db_GetEndpoints() const { + return impl_->Db_GetEndpoints(); } diff --git a/src/database/cassandra/thrift/thrift_if.h b/src/database/cassandra/thrift/thrift_if.h index 7c10bd99849..1e6a7f440ac 100644 --- a/src/database/cassandra/thrift/thrift_if.h +++ b/src/database/cassandra/thrift/thrift_if.h @@ -53,8 +53,7 @@ class ThriftIf : public GenDb::GenDbIf { virtual bool Db_GetStats(std::vector *vdbti, GenDb::DbErrors *dbe); // Connection - virtual std::string Db_GetHost() const; - virtual int Db_GetPort() const; + virtual std::vector Db_GetEndpoints() const; private: ThriftIfImpl *impl_; diff --git a/src/database/cassandra/thrift/thrift_if_impl.cc b/src/database/cassandra/thrift/thrift_if_impl.cc index 16ee4ffaa85..2781fb10782 100644 --- a/src/database/cassandra/thrift/thrift_if_impl.cc +++ b/src/database/cassandra/thrift/thrift_if_impl.cc @@ -991,16 +991,14 @@ void ThriftIfImpl::Db_Uninit(const std::string& task_id, int task_instance) { Db_UninitUnlocked(task_id, task_instance); } -std::string ThriftIfImpl::Db_GetHost() const { +std::vector ThriftIfImpl::Db_GetEndpoints() const { boost::shared_ptr tsocket = boost::dynamic_pointer_cast(socket_); - return tsocket->getHost(); -} - -int ThriftIfImpl::Db_GetPort() const { - boost::shared_ptr tsocket = - boost::dynamic_pointer_cast(socket_); - return tsocket->getPort(); + boost::system::error_code ec; + boost::asio::ip::address addr(boost::asio::ip::address::from_string( + tsocket->getHost(), ec)); + GenDb::Endpoint endpoint(addr, tsocket->getPort()); + return boost::assign::list_of(endpoint); } void ThriftIfImpl::Db_SetQueueWaterMarkInternal(ThriftIfQueue *queue, diff --git a/src/database/cassandra/thrift/thrift_if_impl.h b/src/database/cassandra/thrift/thrift_if_impl.h index 962726860c8..975f3f5c312 100644 --- a/src/database/cassandra/thrift/thrift_if_impl.h +++ b/src/database/cassandra/thrift/thrift_if_impl.h @@ -60,8 +60,7 @@ class ThriftIfImpl { virtual bool Db_GetStats(std::vector *vdbti, GenDb::DbErrors *dbe); // Connection - virtual std::string Db_GetHost() const; - virtual int Db_GetPort() const; + virtual std::vector Db_GetEndpoints() const; private: friend class ThriftIfTest; diff --git a/src/database/gendb_if.h b/src/database/gendb_if.h index 2034927b1aa..f89865542cd 100644 --- a/src/database/gendb_if.h +++ b/src/database/gendb_if.h @@ -16,6 +16,7 @@ #include #include #include +#include namespace GenDb { @@ -182,6 +183,8 @@ struct ColumnNameRange { uint32_t count_; }; +typedef boost::asio::ip::tcp::endpoint Endpoint; + class GenDbIf { public: typedef boost::function DbErrorHandler; @@ -224,8 +227,7 @@ class GenDbIf { virtual bool Db_GetStats(std::vector *vdbti, DbErrors *dbe) = 0; // Connection - virtual std::string Db_GetHost() const = 0; - virtual int Db_GetPort() const = 0; + virtual std::vector Db_GetEndpoints() const = 0; }; } // namespace GenDb diff --git a/src/query_engine/query.cc b/src/query_engine/query.cc index 4b00d5d1d1b..4ad8fbdfef9 100644 --- a/src/query_engine/query.cc +++ b/src/query_engine/query.cc @@ -1071,17 +1071,16 @@ AnalyticsQuery::AnalyticsQuery(std::string qid, std::mapstatus_details = EIO; } } - boost::asio::ip::address db_addr(boost::asio::ip::address::from_string( - dbif_->Db_GetHost(), ec)); - boost::asio::ip::tcp::endpoint db_endpoint(db_addr, dbif_->Db_GetPort()); if (this->status_details != 0) { // Update connection info ConnectionState::GetInstance()->Update(ConnectionType::DATABASE, - std::string(), ConnectionStatus::DOWN, db_endpoint, std::string()); + std::string(), ConnectionStatus::DOWN, dbif_->Db_GetEndpoints(), + std::string()); } else { // Update connection info ConnectionState::GetInstance()->Update(ConnectionType::DATABASE, - std::string(), ConnectionStatus::UP, db_endpoint, std::string()); + std::string(), ConnectionStatus::UP, dbif_->Db_GetEndpoints(), + std::string()); } dbif_->Db_SetInitDone(true); Init(qid, json_api_data); @@ -1222,11 +1221,9 @@ QueryEngine::QueryEngine(EventManager *evm, std::stringstream ss; ss << "initialization of database failed. retrying " << retries++ << " time"; // Update connection info - boost::asio::ip::address db_addr(boost::asio::ip::address::from_string( - dbif_->Db_GetHost(), ec)); - boost::asio::ip::tcp::endpoint db_endpoint(db_addr, dbif_->Db_GetPort()); ConnectionState::GetInstance()->Update(ConnectionType::DATABASE, - std::string(), ConnectionStatus::DOWN, db_endpoint, std::string()); + std::string(), ConnectionStatus::DOWN, + dbif_->Db_GetEndpoints(), std::string()); Q_E_LOG_LOG("QeInit", SandeshLevel::SYS_WARN, ss.str()); dbif_->Db_Uninit("qe::DbHandler", -1); sleep(5); @@ -1303,11 +1300,9 @@ QueryEngine::QueryEngine(EventManager *evm, } dbif_->Db_SetInitDone(true); // Update connection info - boost::asio::ip::address db_addr(boost::asio::ip::address::from_string( - dbif_->Db_GetHost(), ec)); - boost::asio::ip::tcp::endpoint db_endpoint(db_addr, dbif_->Db_GetPort()); ConnectionState::GetInstance()->Update(ConnectionType::DATABASE, - std::string(), ConnectionStatus::UP, db_endpoint, std::string()); + std::string(), ConnectionStatus::UP, dbif_->Db_GetEndpoints(), + std::string()); } QueryEngine::~QueryEngine() {