Skip to content

Commit

Permalink
Merge "Distributed locking amongst contrail-collector for creating ca…
Browse files Browse the repository at this point in the history
…ssandra schema"
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Feb 29, 2016
2 parents f14892a + b56b31a commit 8f7a47e
Show file tree
Hide file tree
Showing 22 changed files with 839 additions and 41 deletions.
10 changes: 8 additions & 2 deletions ci_unittests.json
Expand Up @@ -5,15 +5,18 @@
"controller/src/bfd",
"controller/src/database",
"controller/src/opserver",
"controller/src/query_engine"
"controller/src/query_engine",
"controller/src/zookeeper"
],
"scons_test_targets" : [
"controller/src/analytics:test",
"controller/src/opserver:test",
"controller/src/bfd:test",
"controller/src/query_engine:test",
"controller/src/database/gendb:test",
"controller/src/database/cassandra/thrift:test"
"controller/src/database/cassandra/thrift:test",
"controller/src/database/cassandra/cql:test",
"controller/src/zookeeper:test"
],
"misc_test_targets" : [ ]
},
Expand Down Expand Up @@ -93,6 +96,7 @@
"controller/src/db",
"controller/src/discovery",
"controller/src/database",
"controller/src/zookeeper",
"controller/src/http",
"controller/src/io",
"controller/src/net",
Expand All @@ -111,6 +115,8 @@
"controller/src/discovery:test",
"controller/src/database/gendb:test",
"controller/src/database/cassandra/thrift:test",
"controller/src/database/cassandra/cql:test",
"controller/src/zookeeper:test",
"controller/src/io:test",
"controller/src/net:test",
"controller/src/schema:test",
Expand Down
3 changes: 2 additions & 1 deletion src/SConscript
Expand Up @@ -34,7 +34,8 @@ subdirs = [
'xml',
'xmpp',
'libpartition',
'nodemgr'
'nodemgr',
'zookeeper',
]

if sys.platform != 'darwin':
Expand Down
3 changes: 2 additions & 1 deletion src/analytics/SConscript
Expand Up @@ -53,6 +53,8 @@ AnalyticsEnv.Prepend(LIBS=['cpuinfo',
'pugixml',
'hiredis',
'protobuf',
'zookeeper_client',
'zookeeper_mt',
'boost_filesystem',
'boost_program_options'])

Expand All @@ -77,7 +79,6 @@ libs = MapBuildDir(['sandesh',
'base',
'io',
'net'])
libs.extend('/usr/lib/')
AnalyticsEnv.Append(LIBPATH=libs)

includes = MapBuildDir(['http/client', 'discovery/client', 'analytics'])
Expand Down
36 changes: 31 additions & 5 deletions src/analytics/db_handler.cc
Expand Up @@ -25,6 +25,7 @@
#else // USE_CASSANDRA_CQL
#include <database/cassandra/thrift/thrift_if.h>
#endif // !USE_CASSANDRA_CQL
#include <zookeeper/zookeeper_client.h>

#include "viz_constants.h"
#include "vizd_table_desc.h"
Expand Down Expand Up @@ -65,14 +66,18 @@ DbHandler::DbHandler(EventManager *evm,
std::string name, const TtlMap& ttl_map,
const std::string& cassandra_user,
const std::string& cassandra_password,
bool use_cql) :
bool use_cql,
const std::string &zookeeper_server_list,
bool use_zookeeper) :
name_(name),
drop_level_(SandeshLevel::INVALID),
ttl_map_(ttl_map),
use_cql_(use_cql),
tablespace_(),
gen_partition_no_((uint8_t)g_viz_constants.PARTITION_MIN,
(uint8_t)g_viz_constants.PARTITION_MAX) {
(uint8_t)g_viz_constants.PARTITION_MAX),
zookeeper_server_list_(zookeeper_server_list),
use_zookeeper_(use_zookeeper) {
#ifdef USE_CASSANDRA_CQL
if (use_cql) {
dbif_.reset(new cass::cql::CqlIf(evm, cassandra_ips,
Expand Down Expand Up @@ -299,7 +304,7 @@ bool DbHandler::Init(bool initial, int instance) {
}
}

bool DbHandler::Initialize(int instance) {
bool DbHandler::InitializeInternal(int instance) {
DB_LOG(DEBUG, "Initializing..");

/* init of vizd table structures */
Expand All @@ -326,6 +331,25 @@ bool DbHandler::Initialize(int instance) {
return true;
}

bool DbHandler::InitializeInternalLocked(int instance) {
// Synchronize creation across nodes using zookeeper
zookeeper::client::ZookeeperClient client(name_.c_str(),
zookeeper_server_list_.c_str());
zookeeper::client::ZookeeperLock dmutex(&client, "/collector");
assert(dmutex.Lock());
bool success(InitializeInternal(instance));
assert(dmutex.Release());
return success;
}

bool DbHandler::Initialize(int instance) {
if (use_zookeeper_) {
return InitializeInternalLocked(instance);
} else {
return InitializeInternal(instance);
}
}

bool DbHandler::Setup(int instance) {
DB_LOG(DEBUG, "Setup..");
if (!dbif_->Db_Init("analytics::DbHandler",
Expand Down Expand Up @@ -1674,13 +1698,15 @@ DbHandlerInitializer::DbHandlerInitializer(EventManager *evm,
const std::vector<std::string> &cassandra_ips,
const std::vector<int> &cassandra_ports, const TtlMap& ttl_map,
const std::string& cassandra_user, const std::string& cassandra_password,
bool use_cql) :
bool use_cql, const std::string &zookeeper_server_list,
bool use_zookeeper) :
db_name_(db_name),
db_task_instance_(db_task_instance),
db_handler_(new DbHandler(evm,
boost::bind(&DbHandlerInitializer::ScheduleInit, this),
cassandra_ips, cassandra_ports, db_name, ttl_map,
cassandra_user, cassandra_password, use_cql)),
cassandra_user, cassandra_password, use_cql,
zookeeper_server_list, use_zookeeper)),
callback_(callback),
db_init_timer_(TimerManager::CreateTimer(*evm->io_service(),
db_name + " Db Init Timer",
Expand Down
11 changes: 9 additions & 2 deletions src/analytics/db_handler.h
Expand Up @@ -88,7 +88,8 @@ class DbHandler {
std::string name, const TtlMap& ttl_map,
const std::string& cassandra_user,
const std::string& cassandra_password,
bool use_cql);
bool use_cql, const std::string &zookeeper_server_list,
bool use_zookeeper);
DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map);
virtual ~DbHandler();

Expand Down Expand Up @@ -156,6 +157,8 @@ class DbHandler {
boost::function<void (void)> cb);
bool Setup(int instance);
bool Initialize(int instance);
bool InitializeInternal(int instance);
bool InitializeInternalLocked(int instance);
bool StatTableWrite(uint32_t t2,
const std::string& statName, const std::string& statAttr,
const std::pair<std::string,DbHandler::Var>& ptag,
Expand Down Expand Up @@ -185,6 +188,8 @@ class DbHandler {
bool use_cql_;
std::string tablespace_;
UniformInt8RandomGenerator gen_partition_no_;
std::string zookeeper_server_list_;
bool use_zookeeper_;
DISALLOW_COPY_AND_ASSIGN(DbHandler);
};

Expand Down Expand Up @@ -222,7 +227,9 @@ class DbHandlerInitializer {
const TtlMap& ttl_map,
const std::string& cassandra_user,
const std::string& cassandra_password,
bool use_cql);
bool use_cql,
const std::string &zookeeper_server_list,
bool use_zookeeper);
DbHandlerInitializer(EventManager *evm,
const std::string &db_name, int db_task_instance,
const std::string &timer_task_name, InitializeDoneCb callback,
Expand Down
2 changes: 1 addition & 1 deletion src/analytics/generator.cc
Expand Up @@ -108,7 +108,7 @@ SandeshGenerator::SandeshGenerator(Collector * const collector, VizSession *sess
source + ":" + node_type + ":" +
module + ":" + instance_id, collector->analytics_ttl_map(),
collector->cassandra_user(), collector->cassandra_password(),
false));
false, std::string(), false));
} else {
//Use collector db_handler
db_handler_ = global_db_handler;
Expand Down
8 changes: 7 additions & 1 deletion src/analytics/main.cc
Expand Up @@ -270,6 +270,8 @@ int main(int argc, char *argv[])
copy(cassandra_servers.begin(), cassandra_servers.end(),
ostream_iterator<string>(css, " "));
LOG(INFO, "COLLECTOR CASSANDRA SERVERS: " << css.str());
LOG(INFO, "COLLECTOR ZOOKEEPER SERVERS: " <<
options.zookeeper_server_list());
LOG(INFO, "COLLECTOR SYSLOG LISTEN PORT: " << options.syslog_port());
LOG(INFO, "COLLECTOR SFLOW LISTEN PORT: " << options.sflow_port());
LOG(INFO, "COLLECTOR IPFIX LISTEN PORT: " << options.ipfix_port());
Expand Down Expand Up @@ -328,6 +330,8 @@ int main(int argc, char *argv[])
//Get Platform info
//cql not supported in precise, centos 6.4 6.5
bool use_cql = MiscUtils::IsCqlSupported();
std::string zookeeper_server_list(options.zookeeper_server_list());
bool use_zookeeper = !zookeeper_server_list.empty();
VizCollector analytics(a_evm,
options.collector_port(),
protobuf_server_enabled,
Expand All @@ -346,7 +350,9 @@ int main(int argc, char *argv[])
options.kafka_prefix(),
ttl_map, options.cassandra_user(),
options.cassandra_password(),
use_cql);
use_cql,
zookeeper_server_list,
use_zookeeper);

#if 0
// initialize python/c++ API
Expand Down
7 changes: 7 additions & 0 deletions src/analytics/options.cc
Expand Up @@ -76,6 +76,8 @@ void Options::Initialize(EventManager &evm,
#endif // !USE_CASSANDRA_CQL
default_cassandra_server_list.push_back(default_cassandra_server);

string default_zookeeper_server("127.0.0.1:2181");

vector<string> default_kafka_broker_list;
default_kafka_broker_list.push_back("");

Expand Down Expand Up @@ -117,6 +119,9 @@ void Options::Initialize(EventManager &evm,
opt::value<vector<string> >()->default_value(
default_cassandra_server_list, default_cassandra_server),
"Cassandra server list")
("DEFAULT.zookeeper_server_list",
opt::value<string>()->default_value(""),
"Zookeeper server list")
("DEFAULT.kafka_broker_list",
opt::value<vector<string> >()->default_value(
default_kafka_broker_list, ""),
Expand Down Expand Up @@ -316,6 +321,8 @@ void Options::Process(int argc, char *argv[],

GetOptValue< vector<string> >(var_map, cassandra_server_list_,
"DEFAULT.cassandra_server_list");
GetOptValue<string>(var_map, zookeeper_server_list_,
"DEFAULT.zookeeper_server_list");
GetOptValue< vector<string> >(var_map, kafka_broker_list_,
"DEFAULT.kafka_broker_list");
GetOptValue<uint16_t>(var_map, partitions_, "DEFAULT.partitions");
Expand Down
4 changes: 4 additions & 0 deletions src/analytics/options.h
Expand Up @@ -17,6 +17,9 @@ class Options {
const std::vector<std::string> cassandra_server_list() const {
return cassandra_server_list_;
}
const std::string zookeeper_server_list() const {
return zookeeper_server_list_;
}
const std::vector<std::string> kafka_broker_list() const {
return kafka_broker_list_;
}
Expand Down Expand Up @@ -127,6 +130,7 @@ class Options {
uint64_t analytics_flow_ttl_;
uint64_t analytics_statistics_ttl_;
std::vector<std::string> cassandra_server_list_;
std::string zookeeper_server_list_;
std::vector<std::string> kafka_broker_list_;
uint16_t partitions_;
uint32_t sandesh_ratelimit_;
Expand Down
2 changes: 1 addition & 1 deletion src/analytics/protobuf_collector.cc
Expand Up @@ -24,7 +24,7 @@ ProtobufCollector::ProtobufCollector(EventManager *evm,
db_initializer_.reset(new DbHandlerInitializer(evm, kDbName, kDbTaskInstance,
kDbTaskName, boost::bind(&ProtobufCollector::DbInitializeCb, this),
cassandra_ips, cassandra_ports, ttl_map, cassandra_user,
cassandra_password, false));
cassandra_password, false, std::string(), false));
db_handler_ = db_initializer_->GetDbHandler();
} else {
db_handler_ = global_dbhandler;
Expand Down
3 changes: 2 additions & 1 deletion src/analytics/test/db_handler_mock.h
Expand Up @@ -14,7 +14,8 @@ class DbHandlerMock : public DbHandler {
DbHandlerMock(EventManager *evm, const TtlMap& ttl_map) :
DbHandler(evm, boost::bind(&DbHandlerMock::StartDbifReinit, this),
std::vector<std::string>(1, "127.0.0.1"),
std::vector<int>(1, 9160), "localhost", ttl_map, "", "", false)
std::vector<int>(1, 9160), "localhost", ttl_map, "", "", false,
"", false)
{
}
void StartDbifReinit() {
Expand Down
5 changes: 3 additions & 2 deletions src/analytics/viz_collector.cc
Expand Up @@ -43,12 +43,13 @@ VizCollector::VizCollector(EventManager *evm, unsigned short listen_port,
const std::string &kafka_prefix, const TtlMap& ttl_map,
const std::string &cassandra_user,
const std::string &cassandra_password,
bool use_cql) :
bool use_cql, const std::string &zookeeper_server_list,
bool use_zookeeper) :
db_initializer_(new DbHandlerInitializer(evm, DbGlobalName(dup), -1,
std::string("collector:DbIf"),
boost::bind(&VizCollector::DbInitializeCb, this),
cassandra_ips, cassandra_ports, ttl_map, cassandra_user,
cassandra_password, use_cql)),
cassandra_password, use_cql, zookeeper_server_list, use_zookeeper)),
osp_(new OpServerProxy(evm, this, redis_uve_ip, redis_uve_port,
redis_password, brokers, partitions, kafka_prefix)),
ruleeng_(new Ruleeng(db_initializer_->GetDbHandler(), osp_.get())),
Expand Down
4 changes: 3 additions & 1 deletion src/analytics/viz_collector.h
Expand Up @@ -40,7 +40,9 @@ class VizCollector {
const std::string &kafka_prefix, const TtlMap &ttlmap,
const std::string& cassandra_user,
const std::string& cassandra_password,
bool use_cql);
bool use_cql,
const std::string &zookeeper_server_list,
bool use_zookeeper);
VizCollector(EventManager *evm, DbHandlerPtr db_handler,
Ruleeng *ruleeng,
Collector *collector, OpServerProxy *osp);
Expand Down
11 changes: 4 additions & 7 deletions src/opserver/test/test_analytics_uve.py
Expand Up @@ -51,14 +51,11 @@ def setUpClass(cls):

cls.redis_port = AnalyticsUveTest.get_free_port()
mockredis.start_redis(cls.redis_port)
cls.zk_port = AnalyticsUveTest.get_free_port()
mockzoo.start_zoo(cls.zk_port)

@classmethod
def tearDownClass(cls):

mockredis.stop_redis(cls.redis_port)
mockzoo.stop_zoo(cls.zk_port)

#@unittest.skip('Skipping non-cassandra test with vizd')
def test_00_nocassandra(self):
Expand Down Expand Up @@ -154,7 +151,7 @@ def test_03_redis_uve_restart(self):

vizd_obj = self.useFixture(
AnalyticsFixture(logging, builddir, -1, 0,
kafka_zk = self.__class__.zk_port))
start_kafka = True))
assert vizd_obj.verify_on_setup()

collectors = [vizd_obj.get_collector()]
Expand Down Expand Up @@ -351,7 +348,7 @@ def test_06_alarmgen_basic(self):

vizd_obj = self.useFixture(
AnalyticsFixture(logging, builddir, self.__class__.redis_port, 0,
kafka_zk = self.__class__.zk_port))
start_kafka = True))
assert vizd_obj.verify_on_setup()

assert(vizd_obj.verify_uvetable_alarm("ObjectCollectorInfo",
Expand Down Expand Up @@ -458,7 +455,7 @@ def test_07_alarm(self):
vizd_obj = self.useFixture(
AnalyticsFixture(logging, builddir, -1, 0,
collector_ha_test=True,
kafka_zk = self.__class__.zk_port))
start_kafka = True))
assert vizd_obj.verify_on_setup()

# create alarm-generator and attach it to the first collector.
Expand Down Expand Up @@ -572,7 +569,7 @@ def test_08_uve_alarm_filter(self):

vizd_obj = self.useFixture(
AnalyticsFixture(logging, builddir, -1, 0,
collector_ha_test=True, kafka_zk = self.__class__.zk_port))
collector_ha_test=True, start_kafka = True))
assert vizd_obj.verify_on_setup()

collectors = [vizd_obj.collectors[0].get_addr(),
Expand Down

0 comments on commit 8f7a47e

Please sign in to comment.