Skip to content

Commit

Permalink
Merge "This commit add database connection test functionality to the …
Browse files Browse the repository at this point in the history
…contrail-collector. Every 60 seconds we connect to Cassandra database and test if connection is successful. Based on this test result we update database connection state. This is same as calling SendGeneratorStatistics every 60 seconds."
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Apr 8, 2015
2 parents 12bfeb3 + c27d31f commit c780373
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 8 deletions.
62 changes: 62 additions & 0 deletions src/analytics/collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@
#include <boost/lexical_cast.hpp>
#include <boost/bind.hpp>
#include <boost/assign.hpp>
#include <boost/assign/list_of.hpp>
#include <boost/asio/ip/host_name.hpp>
#include <boost/array.hpp>
#include <boost/uuid/name_generator.hpp>

#include "base/logging.h"
#include "base/task.h"
#include "base/parse_object.h"
#include <base/connection_info.h>
#include "io/event_manager.h"

#include <sandesh/sandesh_types.h>
Expand All @@ -36,6 +41,12 @@ using std::map;
using std::vector;
using boost::shared_ptr;
using namespace boost::assign;
using std::pair;
using boost::system::error_code;
using process::ConnectionState;
using process::ConnectionType;
using process::ConnectionStatus;


std::string Collector::prog_name_;
std::string Collector::self_ip_;
Expand Down Expand Up @@ -83,6 +94,8 @@ Collector::Collector(EventManager *evm, short server_port,
db_queue_wm_info_(kDbQueueWaterMarkInfo),
sm_queue_wm_info_(kSmQueueWaterMarkInfo) {

dbConnStatus_ = ConnectionStatus::INIT;

if (!task_policy_set_) {
TaskPolicy db_task_policy = boost::assign::list_of
(TaskExclusion(lifetime_mgr_task_id()));
Expand Down Expand Up @@ -306,6 +319,55 @@ void Collector::DisconnectSession(SandeshSession *session) {
gen->DisconnectSession(vsession);
}

std::string Collector::DbGlobalName(bool dup) {
std::string name;
error_code error;
if (dup)
name = boost::asio::ip::host_name(error) + "dup" + ":" + "Global";
else
name = boost::asio::ip::host_name(error) + ":" + "Global";

return name;
}

void Collector::TestDbConnErrHandler() {
}

void Collector::TestDatabaseConnection() {
bool connect_status_change = false;
boost::scoped_ptr<GenDb::GenDbIf> testdbif_; // for testing db connection

// try to instantiate a new dbif instance for testing db connection
testdbif_.reset( GenDb::GenDbIf::GenDbIfImpl(
boost::bind(&Collector::TestDbConnErrHandler, this),
cassandra_ips_, cassandra_ports_, 3600, db_handler_->GetName(), false));

if (!testdbif_->Db_Init("analytics::DbHandler", db_task_id_)) {
if (dbConnStatus_ != ConnectionStatus::DOWN) {
LOG(ERROR, "Connection to DB FAILED");
dbConnStatus_ = ConnectionStatus::DOWN;
connect_status_change = true;
}
} else {
if (dbConnStatus_ != ConnectionStatus::UP) {
LOG(ERROR, "Connection to DB Established/Re-Established");
dbConnStatus_ = ConnectionStatus::UP;
connect_status_change = true;
}
}

if (connect_status_change)
{
// update connection status
boost::system::error_code ec;
boost::asio::ip::address db_addr(boost::asio::ip::address::from_string(
db_handler_->GetHost(), ec));
boost::asio::ip::tcp::endpoint db_endpoint(db_addr, db_handler_->GetPort());
ConnectionState::GetInstance()->Update(ConnectionType::DATABASE,
DbGlobalName(false), dbConnStatus_, db_endpoint, std::string());
}
}

void Collector::SendGeneratorStatistics() {
tbb::mutex::scoped_lock lock(gen_map_mutex_);
for (GeneratorMap::iterator gm_it = gen_map_.begin();
Expand Down
22 changes: 22 additions & 0 deletions src/analytics/collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,24 @@
#include <string>
#include "collector_uve_types.h"
#include "db_handler.h"
#include "base/logging.h"
#include "base/task.h"
#include "base/parse_object.h"
#include <base/connection_info.h>
#include "io/event_manager.h"
#include "base/sandesh/process_info_types.h"

#include <sandesh/sandesh_types.h>
#include <sandesh/sandesh.h>
#include <sandesh/sandesh_constants.h>
#include <sandesh/sandesh_ctrl_types.h>
#include <sandesh/sandesh_uve_types.h>
#include <sandesh/sandesh_statistics.h>
#include <sandesh/sandesh_session.h>
#include <sandesh/sandesh_connection.h>
using process::ConnectionState;
using process::ConnectionType;
using process::ConnectionStatus;

class DbHandler;
class OpServerProxy;
Expand Down Expand Up @@ -104,6 +122,8 @@ class Collector : public SandeshServer {
int db_task_id();
const CollectorStats &GetStats() const { return stats_; }
void SendGeneratorStatistics();
void TestDatabaseConnection();
void TestDbConnErrHandler();

static void SetDiscoveryServiceClient(DiscoveryServiceClient *ds) {
ds_client_ = ds;
Expand All @@ -113,11 +133,13 @@ class Collector : public SandeshServer {
return ds_client_;
}

std::string DbGlobalName(bool dup=false);
protected:
virtual TcpSession *AllocSession(Socket *socket);
virtual void DisconnectSession(SandeshSession *session);

private:
ConnectionStatus::type dbConnStatus_;
void SetQueueWaterMarkInfo(QueueType::type type,
Sandesh::QueueWaterMarkInfo &wm);
void ResetQueueWaterMarkInfo(QueueType::type type);
Expand Down
3 changes: 3 additions & 0 deletions src/analytics/db_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ int DbHandler::GetTtlFromMap(const DbHandler::TtlMap& ttl_map,
}
}

std::string DbHandler::GetName() const {
return name_;
}
std::string DbHandler::GetHost() const {
return dbif_->Db_GetHost();
}
Expand Down
1 change: 1 addition & 0 deletions src/analytics/db_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ class DbHandler {
void ResetDbQueueWaterMarkInfo();
std::string GetHost() const;
int GetPort() const;
std::string GetName() const;

private:
bool CreateTables();
Expand Down
2 changes: 2 additions & 0 deletions src/analytics/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ bool CollectorInfoLogger(VizSandeshContext &ctx) {

analytics->SendGeneratorStatistics();

analytics->TestDatabaseConnection();

collector_info_log_timer->Cancel();
collector_info_log_timer->Start(60*1000, boost::bind(&CollectorInfoLogTimer),
NULL);
Expand Down
15 changes: 7 additions & 8 deletions src/analytics/viz_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,7 @@ VizCollector::~VizCollector() {
}

std::string VizCollector::DbGlobalName(bool dup) {
std::string name;
error_code error;
if (dup)
name = boost::asio::ip::host_name(error) + "dup" + ":" + "Global";
else
name = boost::asio::ip::host_name(error) + ":" + "Global";

return name;
return collector_->DbGlobalName(dup);
}

bool VizCollector::SendRemote(const string& destination,
Expand Down Expand Up @@ -182,3 +175,9 @@ void VizCollector::SendGeneratorStatistics() {
collector_->SendGeneratorStatistics();
}
}

void VizCollector::TestDatabaseConnection() {
if (collector_) {
collector_->TestDatabaseConnection();
}
}
1 change: 1 addition & 0 deletions src/analytics/viz_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class VizCollector {
}
void SendProtobufCollectorStatistics();
void SendGeneratorStatistics();
void TestDatabaseConnection();

private:
std::string DbGlobalName(bool dup=false);
Expand Down

0 comments on commit c780373

Please sign in to comment.