Skip to content

Commit

Permalink
Merge "Populate correct server address in process connection info for…
Browse files Browse the repository at this point in the history
… database"
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Feb 5, 2016
2 parents 72003c1 + 7e78d34 commit 51b140a
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 71 deletions.
7 changes: 2 additions & 5 deletions src/analytics/collector.cc
Expand Up @@ -358,12 +358,9 @@ void Collector::TestDatabaseConnection() {
if (connect_status_change)
{
// update connection status
boost::system::error_code ec;
boost::asio::ip::address db_addr(boost::asio::ip::address::from_string(
db_handler_->GetHost(), ec));
boost::asio::ip::tcp::endpoint db_endpoint(db_addr, db_handler_->GetPort());
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
DbGlobalName(false), dbConnStatus_, db_endpoint, std::string());
DbGlobalName(false), dbConnStatus_, testdbif_->Db_GetEndpoints(),
std::string());
}
#endif // !USE_CASSANDRA_CQL
}
Expand Down
19 changes: 6 additions & 13 deletions src/analytics/db_handler.cc
Expand Up @@ -125,12 +125,9 @@ uint64_t DbHandler::GetTtlFromMap(const TtlMap& ttl_map,
std::string DbHandler::GetName() const {
return name_;
}
std::string DbHandler::GetHost() const {
return dbif_->Db_GetHost();
}

int DbHandler::GetPort() const {
return dbif_->Db_GetPort();
std::vector<boost::asio::ip::tcp::endpoint> DbHandler::GetEndpoints() const {
return dbif_->Db_GetEndpoints();
}

bool DbHandler::DropMessage(const SandeshHeader &header,
Expand Down Expand Up @@ -1676,21 +1673,17 @@ bool DbHandlerInitializer::Initialize() {
boost::system::error_code ec;
if (!db_handler_->Init(true, db_task_instance_)) {
// Update connection info
boost::asio::ip::address db_addr(boost::asio::ip::address::from_string(
db_handler_->GetHost(), ec));
boost::asio::ip::tcp::endpoint db_endpoint(db_addr, db_handler_->GetPort());
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
db_name_, ConnectionStatus::DOWN, db_endpoint, std::string());
db_name_, ConnectionStatus::DOWN, db_handler_->GetEndpoints(),
std::string());
LOG(DEBUG, db_name_ << ": Db Initialization FAILED");
ScheduleInit();
return false;
}
// Update connection info
boost::asio::ip::address db_addr(boost::asio::ip::address::from_string(
db_handler_->GetHost(), ec));
boost::asio::ip::tcp::endpoint db_endpoint(db_addr, db_handler_->GetPort());
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
db_name_, ConnectionStatus::UP, db_endpoint, std::string());
db_name_, ConnectionStatus::UP, db_handler_->GetEndpoints(),
std::string());

if (callback_) {
callback_();
Expand Down
3 changes: 1 addition & 2 deletions src/analytics/db_handler.h
Expand Up @@ -144,8 +144,7 @@ class DbHandler {
void SetDbQueueWaterMarkInfo(Sandesh::QueueWaterMarkInfo &wm,
boost::function<void (void)> defer_undefer_cb);
void ResetDbQueueWaterMarkInfo();
std::string GetHost() const;
int GetPort() const;
std::vector<boost::asio::ip::tcp::endpoint> GetEndpoints() const;
std::string GetName() const;
bool UseCql() const;

Expand Down
32 changes: 24 additions & 8 deletions src/base/connection_info.cc
Expand Up @@ -7,6 +7,7 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/address.hpp>
#include <boost/tuple/tuple_comparison.hpp>
#include <boost/foreach.hpp>

#include "base/string_util.h"
#include "base/sandesh/process_info_constants.h"
Expand Down Expand Up @@ -43,22 +44,24 @@ void ConnectionState::Update() {
}
}

void ConnectionState::Update(ConnectionType::type ctype,
void ConnectionState::UpdateInternal(ConnectionType::type ctype,
const std::string &name, ConnectionStatus::type status,
Endpoint server, std::string message) {
const std::vector<Endpoint> &servers, std::string message) {
// Populate key
ConnectionInfoKey key(ctype, name);
// Populate info
ConnectionInfo info;
info.set_type(
g_process_info_constants.ConnectionTypeNames.find(ctype)->second);
info.set_name(name);
boost::system::error_code ec;
std::string saddr(server.address().to_string(ec));
int sport(server.port());
std::string server_address(saddr + ":" + integerToString(sport));
std::vector<std::string> server_addrs = boost::assign::list_of
(server_address);
std::vector<std::string> server_addrs;
BOOST_FOREACH(const Endpoint &server, servers) {
boost::system::error_code ec;
std::string saddr(server.address().to_string(ec));
int sport(server.port());
std::string server_address(saddr + ":" + integerToString(sport));
server_addrs.push_back(server_address);
}
info.set_server_addrs(server_addrs);
info.set_status(
g_process_info_constants.ConnectionStatusNames.find(status)->second);
Expand All @@ -78,6 +81,19 @@ void ConnectionState::Update(ConnectionType::type ctype,
}
}

void ConnectionState::Update(ConnectionType::type ctype,
const std::string &name, ConnectionStatus::type status,
const std::vector<Endpoint> &servers, std::string message) {
UpdateInternal(ctype, name, status, servers, message);
}

void ConnectionState::Update(ConnectionType::type ctype,
const std::string &name, ConnectionStatus::type status,
Endpoint server, std::string message) {
UpdateInternal(ctype, name, status, boost::assign::list_of(server),
message);
}

void ConnectionState::Delete(ConnectionType::type ctype,
const std::string &name) {
// Construct key
Expand Down
7 changes: 7 additions & 0 deletions src/base/connection_info.h
Expand Up @@ -39,10 +39,17 @@ class ConnectionState {
void Update(ConnectionType::type ctype, const std::string &name,
ConnectionStatus::type status, Endpoint server,
std::string message);
void Update(ConnectionType::type ctype, const std::string &name,
ConnectionStatus::type status, const std::vector<Endpoint> &servers,
std::string message);
void Delete(ConnectionType::type ctype, const std::string &name);
std::vector<ConnectionInfo> GetInfos() const;

private:
void UpdateInternal(ConnectionType::type ctype,
const std::string &name, ConnectionStatus::type status,
const std::vector<Endpoint> &servers, std::string message);

template <typename UVEType, typename UVEDataType> friend class
ConnectionStateManager;

Expand Down
17 changes: 9 additions & 8 deletions src/database/cassandra/cql/cql_if.cc
Expand Up @@ -1136,6 +1136,13 @@ CqlIf::CqlIf(EventManager *evm,
impl_ = new CqlIfImpl(evm, cassandra_ips, cassandra_port,
cassandra_user, cassandra_password);
initialized_ = false;
BOOST_FOREACH(const std::string &cassandra_ip, cassandra_ips) {
boost::system::error_code ec;
boost::asio::ip::address cassandra_addr(
boost::asio::ip::address::from_string(cassandra_ip, ec));
GenDb::Endpoint endpoint(cassandra_addr, cassandra_port);
endpoints_.push_back(endpoint);
}
}

CqlIf::CqlIf() : impl_(NULL) {
Expand Down Expand Up @@ -1275,14 +1282,8 @@ bool CqlIf::Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
}

// Connection
std::string CqlIf::Db_GetHost() const {
//return impl_->Db_GetHost();
return std::string();
}

int CqlIf::Db_GetPort() const {
//return impl_->Db_GetPort();
return -1;
std::vector<GenDb::Endpoint> CqlIf::Db_GetEndpoints() const {
return endpoints_;
}

} // namespace cql
Expand Down
4 changes: 2 additions & 2 deletions src/database/cassandra/cql/cql_if.h
Expand Up @@ -62,13 +62,13 @@ class CqlIf : public GenDb::GenDbIf {
virtual bool Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
GenDb::DbErrors *dbe);
// Connection
virtual std::string Db_GetHost() const;
virtual int Db_GetPort() const;
virtual std::vector<GenDb::Endpoint> Db_GetEndpoints() const;

private:
class CqlIfImpl;
CqlIfImpl *impl_;
tbb::atomic<bool> initialized_;
std::vector<GenDb::Endpoint> endpoints_;
};

} // namespace cql
Expand Down
8 changes: 2 additions & 6 deletions src/database/cassandra/thrift/thrift_if.cc
Expand Up @@ -111,10 +111,6 @@ bool ThriftIf::Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
}

// Connection
std::string ThriftIf::Db_GetHost() const {
return impl_->Db_GetHost();
}

int ThriftIf::Db_GetPort() const {
return impl_->Db_GetPort();
std::vector<GenDb::Endpoint> ThriftIf::Db_GetEndpoints() const {
return impl_->Db_GetEndpoints();
}
3 changes: 1 addition & 2 deletions src/database/cassandra/thrift/thrift_if.h
Expand Up @@ -53,8 +53,7 @@ class ThriftIf : public GenDb::GenDbIf {
virtual bool Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
GenDb::DbErrors *dbe);
// Connection
virtual std::string Db_GetHost() const;
virtual int Db_GetPort() const;
virtual std::vector<GenDb::Endpoint> Db_GetEndpoints() const;

private:
ThriftIfImpl *impl_;
Expand Down
14 changes: 6 additions & 8 deletions src/database/cassandra/thrift/thrift_if_impl.cc
Expand Up @@ -991,16 +991,14 @@ void ThriftIfImpl::Db_Uninit(const std::string& task_id, int task_instance) {
Db_UninitUnlocked(task_id, task_instance);
}

std::string ThriftIfImpl::Db_GetHost() const {
std::vector<GenDb::Endpoint> ThriftIfImpl::Db_GetEndpoints() const {
boost::shared_ptr<TSocket> tsocket =
boost::dynamic_pointer_cast<TSocket>(socket_);
return tsocket->getHost();
}

int ThriftIfImpl::Db_GetPort() const {
boost::shared_ptr<TSocket> tsocket =
boost::dynamic_pointer_cast<TSocket>(socket_);
return tsocket->getPort();
boost::system::error_code ec;
boost::asio::ip::address addr(boost::asio::ip::address::from_string(
tsocket->getHost(), ec));
GenDb::Endpoint endpoint(addr, tsocket->getPort());
return boost::assign::list_of(endpoint);
}

void ThriftIfImpl::Db_SetQueueWaterMarkInternal(ThriftIfQueue *queue,
Expand Down
3 changes: 1 addition & 2 deletions src/database/cassandra/thrift/thrift_if_impl.h
Expand Up @@ -60,8 +60,7 @@ class ThriftIfImpl {
virtual bool Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
GenDb::DbErrors *dbe);
// Connection
virtual std::string Db_GetHost() const;
virtual int Db_GetPort() const;
virtual std::vector<GenDb::Endpoint> Db_GetEndpoints() const;

private:
friend class ThriftIfTest;
Expand Down
6 changes: 4 additions & 2 deletions src/database/gendb_if.h
Expand Up @@ -16,6 +16,7 @@
#include <boost/scoped_ptr.hpp>
#include <database/gendb_types.h>
#include <net/address.h>
#include <boost/asio/ip/tcp.hpp>

namespace GenDb {

Expand Down Expand Up @@ -182,6 +183,8 @@ struct ColumnNameRange {
uint32_t count_;
};

typedef boost::asio::ip::tcp::endpoint Endpoint;

class GenDbIf {
public:
typedef boost::function<void(void)> DbErrorHandler;
Expand Down Expand Up @@ -224,8 +227,7 @@ class GenDbIf {
virtual bool Db_GetStats(std::vector<DbTableInfo> *vdbti,
DbErrors *dbe) = 0;
// Connection
virtual std::string Db_GetHost() const = 0;
virtual int Db_GetPort() const = 0;
virtual std::vector<Endpoint> Db_GetEndpoints() const = 0;
};

} // namespace GenDb
Expand Down
21 changes: 8 additions & 13 deletions src/query_engine/query.cc
Expand Up @@ -1071,17 +1071,16 @@ AnalyticsQuery::AnalyticsQuery(std::string qid, std::map<std::string,
this->status_details = EIO;
}
}
boost::asio::ip::address db_addr(boost::asio::ip::address::from_string(
dbif_->Db_GetHost(), ec));
boost::asio::ip::tcp::endpoint db_endpoint(db_addr, dbif_->Db_GetPort());
if (this->status_details != 0) {
// Update connection info
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
std::string(), ConnectionStatus::DOWN, db_endpoint, std::string());
std::string(), ConnectionStatus::DOWN, dbif_->Db_GetEndpoints(),
std::string());
} else {
// Update connection info
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
std::string(), ConnectionStatus::UP, db_endpoint, std::string());
std::string(), ConnectionStatus::UP, dbif_->Db_GetEndpoints(),
std::string());
}
dbif_->Db_SetInitDone(true);
Init(qid, json_api_data);
Expand Down Expand Up @@ -1222,11 +1221,9 @@ QueryEngine::QueryEngine(EventManager *evm,
std::stringstream ss;
ss << "initialization of database failed. retrying " << retries++ << " time";
// Update connection info
boost::asio::ip::address db_addr(boost::asio::ip::address::from_string(
dbif_->Db_GetHost(), ec));
boost::asio::ip::tcp::endpoint db_endpoint(db_addr, dbif_->Db_GetPort());
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
std::string(), ConnectionStatus::DOWN, db_endpoint, std::string());
std::string(), ConnectionStatus::DOWN,
dbif_->Db_GetEndpoints(), std::string());
Q_E_LOG_LOG("QeInit", SandeshLevel::SYS_WARN, ss.str());
dbif_->Db_Uninit("qe::DbHandler", -1);
sleep(5);
Expand Down Expand Up @@ -1303,11 +1300,9 @@ QueryEngine::QueryEngine(EventManager *evm,
}
dbif_->Db_SetInitDone(true);
// Update connection info
boost::asio::ip::address db_addr(boost::asio::ip::address::from_string(
dbif_->Db_GetHost(), ec));
boost::asio::ip::tcp::endpoint db_endpoint(db_addr, dbif_->Db_GetPort());
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
std::string(), ConnectionStatus::UP, db_endpoint, std::string());
std::string(), ConnectionStatus::UP, dbif_->Db_GetEndpoints(),
std::string());
}

QueryEngine::~QueryEngine() {
Expand Down

0 comments on commit 51b140a

Please sign in to comment.