Skip to content

Commit

Permalink
Populate correct server address in process connection info for database
Browse files Browse the repository at this point in the history
1. Modify the GenDb interface to return a list of connected database
   endpoints instead of a single host and port since with the CQL
   driver we have connections to all the database endpoints.
2. Corresponding changes in ConnectionState::Update to populate the
   connection info entry with a list of server addresses.

Change-Id: I864825a635a7d5889b952ae2a93f771db0690cc8
Closes-Bug: #1541655
  • Loading branch information
Megh Bhatt committed Feb 4, 2016
1 parent b196c43 commit 7e78d34
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 71 deletions.
7 changes: 2 additions & 5 deletions src/analytics/collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,9 @@ void Collector::TestDatabaseConnection() {
if (connect_status_change)
{
// update connection status
boost::system::error_code ec;
boost::asio::ip::address db_addr(boost::asio::ip::address::from_string(
db_handler_->GetHost(), ec));
boost::asio::ip::tcp::endpoint db_endpoint(db_addr, db_handler_->GetPort());
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
DbGlobalName(false), dbConnStatus_, db_endpoint, std::string());
DbGlobalName(false), dbConnStatus_, testdbif_->Db_GetEndpoints(),
std::string());
}
#endif // !USE_CASSANDRA_CQL
}
Expand Down
19 changes: 6 additions & 13 deletions src/analytics/db_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,9 @@ uint64_t DbHandler::GetTtlFromMap(const TtlMap& ttl_map,
std::string DbHandler::GetName() const {
return name_;
}
std::string DbHandler::GetHost() const {
return dbif_->Db_GetHost();
}

int DbHandler::GetPort() const {
return dbif_->Db_GetPort();
std::vector<boost::asio::ip::tcp::endpoint> DbHandler::GetEndpoints() const {
return dbif_->Db_GetEndpoints();
}

bool DbHandler::DropMessage(const SandeshHeader &header,
Expand Down Expand Up @@ -1626,21 +1623,17 @@ bool DbHandlerInitializer::Initialize() {
boost::system::error_code ec;
if (!db_handler_->Init(true, db_task_instance_)) {
// Update connection info
boost::asio::ip::address db_addr(boost::asio::ip::address::from_string(
db_handler_->GetHost(), ec));
boost::asio::ip::tcp::endpoint db_endpoint(db_addr, db_handler_->GetPort());
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
db_name_, ConnectionStatus::DOWN, db_endpoint, std::string());
db_name_, ConnectionStatus::DOWN, db_handler_->GetEndpoints(),
std::string());
LOG(DEBUG, db_name_ << ": Db Initialization FAILED");
ScheduleInit();
return false;
}
// Update connection info
boost::asio::ip::address db_addr(boost::asio::ip::address::from_string(
db_handler_->GetHost(), ec));
boost::asio::ip::tcp::endpoint db_endpoint(db_addr, db_handler_->GetPort());
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
db_name_, ConnectionStatus::UP, db_endpoint, std::string());
db_name_, ConnectionStatus::UP, db_handler_->GetEndpoints(),
std::string());

if (callback_) {
callback_();
Expand Down
3 changes: 1 addition & 2 deletions src/analytics/db_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ class DbHandler {
void SetDbQueueWaterMarkInfo(Sandesh::QueueWaterMarkInfo &wm,
boost::function<void (void)> defer_undefer_cb);
void ResetDbQueueWaterMarkInfo();
std::string GetHost() const;
int GetPort() const;
std::vector<boost::asio::ip::tcp::endpoint> GetEndpoints() const;
std::string GetName() const;
bool UseCql() const;

Expand Down
32 changes: 24 additions & 8 deletions src/base/connection_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/address.hpp>
#include <boost/tuple/tuple_comparison.hpp>
#include <boost/foreach.hpp>

#include "base/string_util.h"
#include "base/sandesh/process_info_constants.h"
Expand Down Expand Up @@ -43,22 +44,24 @@ void ConnectionState::Update() {
}
}

void ConnectionState::Update(ConnectionType::type ctype,
void ConnectionState::UpdateInternal(ConnectionType::type ctype,
const std::string &name, ConnectionStatus::type status,
Endpoint server, std::string message) {
const std::vector<Endpoint> &servers, std::string message) {
// Populate key
ConnectionInfoKey key(ctype, name);
// Populate info
ConnectionInfo info;
info.set_type(
g_process_info_constants.ConnectionTypeNames.find(ctype)->second);
info.set_name(name);
boost::system::error_code ec;
std::string saddr(server.address().to_string(ec));
int sport(server.port());
std::string server_address(saddr + ":" + integerToString(sport));
std::vector<std::string> server_addrs = boost::assign::list_of
(server_address);
std::vector<std::string> server_addrs;
BOOST_FOREACH(const Endpoint &server, servers) {
boost::system::error_code ec;
std::string saddr(server.address().to_string(ec));
int sport(server.port());
std::string server_address(saddr + ":" + integerToString(sport));
server_addrs.push_back(server_address);
}
info.set_server_addrs(server_addrs);
info.set_status(
g_process_info_constants.ConnectionStatusNames.find(status)->second);
Expand All @@ -78,6 +81,19 @@ void ConnectionState::Update(ConnectionType::type ctype,
}
}

void ConnectionState::Update(ConnectionType::type ctype,
const std::string &name, ConnectionStatus::type status,
const std::vector<Endpoint> &servers, std::string message) {
UpdateInternal(ctype, name, status, servers, message);
}

void ConnectionState::Update(ConnectionType::type ctype,
const std::string &name, ConnectionStatus::type status,
Endpoint server, std::string message) {
UpdateInternal(ctype, name, status, boost::assign::list_of(server),
message);
}

void ConnectionState::Delete(ConnectionType::type ctype,
const std::string &name) {
// Construct key
Expand Down
7 changes: 7 additions & 0 deletions src/base/connection_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,17 @@ class ConnectionState {
void Update(ConnectionType::type ctype, const std::string &name,
ConnectionStatus::type status, Endpoint server,
std::string message);
void Update(ConnectionType::type ctype, const std::string &name,
ConnectionStatus::type status, const std::vector<Endpoint> &servers,
std::string message);
void Delete(ConnectionType::type ctype, const std::string &name);
std::vector<ConnectionInfo> GetInfos() const;

private:
void UpdateInternal(ConnectionType::type ctype,
const std::string &name, ConnectionStatus::type status,
const std::vector<Endpoint> &servers, std::string message);

template <typename UVEType, typename UVEDataType> friend class
ConnectionStateManager;

Expand Down
17 changes: 9 additions & 8 deletions src/database/cassandra/cql/cql_if.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,13 @@ CqlIf::CqlIf(EventManager *evm,
impl_ = new CqlIfImpl(evm, cassandra_ips, cassandra_port,
cassandra_user, cassandra_password);
initialized_ = false;
BOOST_FOREACH(const std::string &cassandra_ip, cassandra_ips) {
boost::system::error_code ec;
boost::asio::ip::address cassandra_addr(
boost::asio::ip::address::from_string(cassandra_ip, ec));
GenDb::Endpoint endpoint(cassandra_addr, cassandra_port);
endpoints_.push_back(endpoint);
}
}

CqlIf::CqlIf() : impl_(NULL) {
Expand Down Expand Up @@ -1268,14 +1275,8 @@ bool CqlIf::Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
}

// Connection
std::string CqlIf::Db_GetHost() const {
//return impl_->Db_GetHost();
return std::string();
}

int CqlIf::Db_GetPort() const {
//return impl_->Db_GetPort();
return -1;
std::vector<GenDb::Endpoint> CqlIf::Db_GetEndpoints() const {
return endpoints_;
}

} // namespace cql
Expand Down
4 changes: 2 additions & 2 deletions src/database/cassandra/cql/cql_if.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ class CqlIf : public GenDb::GenDbIf {
virtual bool Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
GenDb::DbErrors *dbe);
// Connection
virtual std::string Db_GetHost() const;
virtual int Db_GetPort() const;
virtual std::vector<GenDb::Endpoint> Db_GetEndpoints() const;

private:
class CqlIfImpl;
CqlIfImpl *impl_;
tbb::atomic<bool> initialized_;
std::vector<GenDb::Endpoint> endpoints_;
};

} // namespace cql
Expand Down
8 changes: 2 additions & 6 deletions src/database/cassandra/thrift/thrift_if.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,6 @@ bool ThriftIf::Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
}

// Connection
std::string ThriftIf::Db_GetHost() const {
return impl_->Db_GetHost();
}

int ThriftIf::Db_GetPort() const {
return impl_->Db_GetPort();
std::vector<GenDb::Endpoint> ThriftIf::Db_GetEndpoints() const {
return impl_->Db_GetEndpoints();
}
3 changes: 1 addition & 2 deletions src/database/cassandra/thrift/thrift_if.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ class ThriftIf : public GenDb::GenDbIf {
virtual bool Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
GenDb::DbErrors *dbe);
// Connection
virtual std::string Db_GetHost() const;
virtual int Db_GetPort() const;
virtual std::vector<GenDb::Endpoint> Db_GetEndpoints() const;

private:
ThriftIfImpl *impl_;
Expand Down
14 changes: 6 additions & 8 deletions src/database/cassandra/thrift/thrift_if_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -989,16 +989,14 @@ void ThriftIfImpl::Db_Uninit(const std::string& task_id, int task_instance) {
Db_UninitUnlocked(task_id, task_instance);
}

std::string ThriftIfImpl::Db_GetHost() const {
std::vector<GenDb::Endpoint> ThriftIfImpl::Db_GetEndpoints() const {
boost::shared_ptr<TSocket> tsocket =
boost::dynamic_pointer_cast<TSocket>(socket_);
return tsocket->getHost();
}

int ThriftIfImpl::Db_GetPort() const {
boost::shared_ptr<TSocket> tsocket =
boost::dynamic_pointer_cast<TSocket>(socket_);
return tsocket->getPort();
boost::system::error_code ec;
boost::asio::ip::address addr(boost::asio::ip::address::from_string(
tsocket->getHost(), ec));
GenDb::Endpoint endpoint(addr, tsocket->getPort());
return boost::assign::list_of(endpoint);
}

void ThriftIfImpl::Db_SetQueueWaterMarkInternal(ThriftIfQueue *queue,
Expand Down
3 changes: 1 addition & 2 deletions src/database/cassandra/thrift/thrift_if_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ class ThriftIfImpl {
virtual bool Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
GenDb::DbErrors *dbe);
// Connection
virtual std::string Db_GetHost() const;
virtual int Db_GetPort() const;
virtual std::vector<GenDb::Endpoint> Db_GetEndpoints() const;

private:
friend class ThriftIfTest;
Expand Down
6 changes: 4 additions & 2 deletions src/database/gendb_if.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <boost/scoped_ptr.hpp>
#include <database/gendb_types.h>
#include <net/address.h>
#include <boost/asio/ip/tcp.hpp>

namespace GenDb {

Expand Down Expand Up @@ -182,6 +183,8 @@ struct ColumnNameRange {
uint32_t count_;
};

typedef boost::asio::ip::tcp::endpoint Endpoint;

class GenDbIf {
public:
typedef boost::function<void(void)> DbErrorHandler;
Expand Down Expand Up @@ -224,8 +227,7 @@ class GenDbIf {
virtual bool Db_GetStats(std::vector<DbTableInfo> *vdbti,
DbErrors *dbe) = 0;
// Connection
virtual std::string Db_GetHost() const = 0;
virtual int Db_GetPort() const = 0;
virtual std::vector<Endpoint> Db_GetEndpoints() const = 0;
};

} // namespace GenDb
Expand Down
21 changes: 8 additions & 13 deletions src/query_engine/query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1071,17 +1071,16 @@ AnalyticsQuery::AnalyticsQuery(std::string qid, std::map<std::string,
this->status_details = EIO;
}
}
boost::asio::ip::address db_addr(boost::asio::ip::address::from_string(
dbif_->Db_GetHost(), ec));
boost::asio::ip::tcp::endpoint db_endpoint(db_addr, dbif_->Db_GetPort());
if (this->status_details != 0) {
// Update connection info
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
std::string(), ConnectionStatus::DOWN, db_endpoint, std::string());
std::string(), ConnectionStatus::DOWN, dbif_->Db_GetEndpoints(),
std::string());
} else {
// Update connection info
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
std::string(), ConnectionStatus::UP, db_endpoint, std::string());
std::string(), ConnectionStatus::UP, dbif_->Db_GetEndpoints(),
std::string());
}
dbif_->Db_SetInitDone(true);
Init(qid, json_api_data);
Expand Down Expand Up @@ -1222,11 +1221,9 @@ QueryEngine::QueryEngine(EventManager *evm,
std::stringstream ss;
ss << "initialization of database failed. retrying " << retries++ << " time";
// Update connection info
boost::asio::ip::address db_addr(boost::asio::ip::address::from_string(
dbif_->Db_GetHost(), ec));
boost::asio::ip::tcp::endpoint db_endpoint(db_addr, dbif_->Db_GetPort());
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
std::string(), ConnectionStatus::DOWN, db_endpoint, std::string());
std::string(), ConnectionStatus::DOWN,
dbif_->Db_GetEndpoints(), std::string());
Q_E_LOG_LOG("QeInit", SandeshLevel::SYS_WARN, ss.str());
dbif_->Db_Uninit("qe::DbHandler", -1);
sleep(5);
Expand Down Expand Up @@ -1303,11 +1300,9 @@ QueryEngine::QueryEngine(EventManager *evm,
}
dbif_->Db_SetInitDone(true);
// Update connection info
boost::asio::ip::address db_addr(boost::asio::ip::address::from_string(
dbif_->Db_GetHost(), ec));
boost::asio::ip::tcp::endpoint db_endpoint(db_addr, dbif_->Db_GetPort());
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
std::string(), ConnectionStatus::UP, db_endpoint, std::string());
std::string(), ConnectionStatus::UP, dbif_->Db_GetEndpoints(),
std::string());
}

QueryEngine::~QueryEngine() {
Expand Down

0 comments on commit 7e78d34

Please sign in to comment.