Skip to content

Commit

Permalink
Distributed locking amongst contrail-collector for creating cassandra…
Browse files Browse the repository at this point in the history
… schema

1. Add synchronous, blocking zookeeper client using the zookeeper C bindings.
2. Distributed lock receipe - all nodes try to create a znode with their
   unique id as data. The node that is able to create gets the lock. The node
   will call release which will delete the znode. Other nodes will get EEXIST
   and they keep on retrying till they are able to create the znode.
3. Unit tests for zookeeper client.
4. contrail-collector changes to accept DEFAULT.zookeeper_server_list as
   command line arguments and configuration file parameter.
5. contrail-collector changes to use ZookeeperLock in DbHandler when doing
   Initialize.
6. Add zookeeper and cql tests to CI unit tests.

Change-Id: I924016bc74d452bb8aeaa01185788e3164e77f40
Partial-Bug: #1542576
  • Loading branch information
Megh Bhatt committed Feb 24, 2016
1 parent 927e287 commit b56b31a
Show file tree
Hide file tree
Showing 22 changed files with 839 additions and 41 deletions.
10 changes: 8 additions & 2 deletions ci_unittests.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
"controller/src/bfd",
"controller/src/database",
"controller/src/opserver",
"controller/src/query_engine"
"controller/src/query_engine",
"controller/src/zookeeper"
],
"scons_test_targets" : [
"controller/src/analytics:test",
"controller/src/opserver:test",
"controller/src/bfd:test",
"controller/src/query_engine:test",
"controller/src/database/gendb:test",
"controller/src/database/cassandra/thrift:test"
"controller/src/database/cassandra/thrift:test",
"controller/src/database/cassandra/cql:test",
"controller/src/zookeeper:test"
],
"misc_test_targets" : [ ]
},
Expand Down Expand Up @@ -93,6 +96,7 @@
"controller/src/db",
"controller/src/discovery",
"controller/src/database",
"controller/src/zookeeper",
"controller/src/http",
"controller/src/io",
"controller/src/net",
Expand All @@ -111,6 +115,8 @@
"controller/src/discovery:test",
"controller/src/database/gendb:test",
"controller/src/database/cassandra/thrift:test",
"controller/src/database/cassandra/cql:test",
"controller/src/zookeeper:test",
"controller/src/io:test",
"controller/src/net:test",
"controller/src/schema:test",
Expand Down
3 changes: 2 additions & 1 deletion src/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ subdirs = [
'xml',
'xmpp',
'libpartition',
'nodemgr'
'nodemgr',
'zookeeper',
]

if sys.platform != 'darwin':
Expand Down
3 changes: 2 additions & 1 deletion src/analytics/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ AnalyticsEnv.Prepend(LIBS=['cpuinfo',
'pugixml',
'hiredis',
'protobuf',
'zookeeper_client',
'zookeeper_mt',
'boost_filesystem',
'boost_program_options'])

Expand All @@ -77,7 +79,6 @@ libs = MapBuildDir(['sandesh',
'base',
'io',
'net'])
libs.extend('/usr/lib/')
AnalyticsEnv.Append(LIBPATH=libs)

includes = MapBuildDir(['http/client', 'discovery/client', 'analytics'])
Expand Down
36 changes: 31 additions & 5 deletions src/analytics/db_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#else // USE_CASSANDRA_CQL
#include <database/cassandra/thrift/thrift_if.h>
#endif // !USE_CASSANDRA_CQL
#include <zookeeper/zookeeper_client.h>

#include "viz_constants.h"
#include "vizd_table_desc.h"
Expand Down Expand Up @@ -65,14 +66,18 @@ DbHandler::DbHandler(EventManager *evm,
std::string name, const TtlMap& ttl_map,
const std::string& cassandra_user,
const std::string& cassandra_password,
bool use_cql) :
bool use_cql,
const std::string &zookeeper_server_list,
bool use_zookeeper) :
name_(name),
drop_level_(SandeshLevel::INVALID),
ttl_map_(ttl_map),
use_cql_(use_cql),
tablespace_(),
gen_partition_no_((uint8_t)g_viz_constants.PARTITION_MIN,
(uint8_t)g_viz_constants.PARTITION_MAX) {
(uint8_t)g_viz_constants.PARTITION_MAX),
zookeeper_server_list_(zookeeper_server_list),
use_zookeeper_(use_zookeeper) {
#ifdef USE_CASSANDRA_CQL
if (use_cql) {
dbif_.reset(new cass::cql::CqlIf(evm, cassandra_ips,
Expand Down Expand Up @@ -299,7 +304,7 @@ bool DbHandler::Init(bool initial, int instance) {
}
}

bool DbHandler::Initialize(int instance) {
bool DbHandler::InitializeInternal(int instance) {
DB_LOG(DEBUG, "Initializing..");

/* init of vizd table structures */
Expand All @@ -326,6 +331,25 @@ bool DbHandler::Initialize(int instance) {
return true;
}

bool DbHandler::InitializeInternalLocked(int instance) {
// Synchronize creation across nodes using zookeeper
zookeeper::client::ZookeeperClient client(name_.c_str(),
zookeeper_server_list_.c_str());
zookeeper::client::ZookeeperLock dmutex(&client, "/collector");
assert(dmutex.Lock());
bool success(InitializeInternal(instance));
assert(dmutex.Release());
return success;
}

bool DbHandler::Initialize(int instance) {
if (use_zookeeper_) {
return InitializeInternalLocked(instance);
} else {
return InitializeInternal(instance);
}
}

bool DbHandler::Setup(int instance) {
DB_LOG(DEBUG, "Setup..");
if (!dbif_->Db_Init("analytics::DbHandler",
Expand Down Expand Up @@ -1674,13 +1698,15 @@ DbHandlerInitializer::DbHandlerInitializer(EventManager *evm,
const std::vector<std::string> &cassandra_ips,
const std::vector<int> &cassandra_ports, const TtlMap& ttl_map,
const std::string& cassandra_user, const std::string& cassandra_password,
bool use_cql) :
bool use_cql, const std::string &zookeeper_server_list,
bool use_zookeeper) :
db_name_(db_name),
db_task_instance_(db_task_instance),
db_handler_(new DbHandler(evm,
boost::bind(&DbHandlerInitializer::ScheduleInit, this),
cassandra_ips, cassandra_ports, db_name, ttl_map,
cassandra_user, cassandra_password, use_cql)),
cassandra_user, cassandra_password, use_cql,
zookeeper_server_list, use_zookeeper)),
callback_(callback),
db_init_timer_(TimerManager::CreateTimer(*evm->io_service(),
db_name + " Db Init Timer",
Expand Down
11 changes: 9 additions & 2 deletions src/analytics/db_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ class DbHandler {
std::string name, const TtlMap& ttl_map,
const std::string& cassandra_user,
const std::string& cassandra_password,
bool use_cql);
bool use_cql, const std::string &zookeeper_server_list,
bool use_zookeeper);
DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map);
virtual ~DbHandler();

Expand Down Expand Up @@ -156,6 +157,8 @@ class DbHandler {
boost::function<void (void)> cb);
bool Setup(int instance);
bool Initialize(int instance);
bool InitializeInternal(int instance);
bool InitializeInternalLocked(int instance);
bool StatTableWrite(uint32_t t2,
const std::string& statName, const std::string& statAttr,
const std::pair<std::string,DbHandler::Var>& ptag,
Expand Down Expand Up @@ -185,6 +188,8 @@ class DbHandler {
bool use_cql_;
std::string tablespace_;
UniformInt8RandomGenerator gen_partition_no_;
std::string zookeeper_server_list_;
bool use_zookeeper_;
DISALLOW_COPY_AND_ASSIGN(DbHandler);
};

Expand Down Expand Up @@ -222,7 +227,9 @@ class DbHandlerInitializer {
const TtlMap& ttl_map,
const std::string& cassandra_user,
const std::string& cassandra_password,
bool use_cql);
bool use_cql,
const std::string &zookeeper_server_list,
bool use_zookeeper);
DbHandlerInitializer(EventManager *evm,
const std::string &db_name, int db_task_instance,
const std::string &timer_task_name, InitializeDoneCb callback,
Expand Down
2 changes: 1 addition & 1 deletion src/analytics/generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ SandeshGenerator::SandeshGenerator(Collector * const collector, VizSession *sess
source + ":" + node_type + ":" +
module + ":" + instance_id, collector->analytics_ttl_map(),
collector->cassandra_user(), collector->cassandra_password(),
false));
false, std::string(), false));
} else {
//Use collector db_handler
db_handler_ = global_db_handler;
Expand Down
8 changes: 7 additions & 1 deletion src/analytics/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ int main(int argc, char *argv[])
copy(cassandra_servers.begin(), cassandra_servers.end(),
ostream_iterator<string>(css, " "));
LOG(INFO, "COLLECTOR CASSANDRA SERVERS: " << css.str());
LOG(INFO, "COLLECTOR ZOOKEEPER SERVERS: " <<
options.zookeeper_server_list());
LOG(INFO, "COLLECTOR SYSLOG LISTEN PORT: " << options.syslog_port());
LOG(INFO, "COLLECTOR SFLOW LISTEN PORT: " << options.sflow_port());
LOG(INFO, "COLLECTOR IPFIX LISTEN PORT: " << options.ipfix_port());
Expand Down Expand Up @@ -328,6 +330,8 @@ int main(int argc, char *argv[])
//Get Platform info
//cql not supported in precise, centos 6.4 6.5
bool use_cql = MiscUtils::IsCqlSupported();
std::string zookeeper_server_list(options.zookeeper_server_list());
bool use_zookeeper = !zookeeper_server_list.empty();
VizCollector analytics(a_evm,
options.collector_port(),
protobuf_server_enabled,
Expand All @@ -346,7 +350,9 @@ int main(int argc, char *argv[])
options.kafka_prefix(),
ttl_map, options.cassandra_user(),
options.cassandra_password(),
use_cql);
use_cql,
zookeeper_server_list,
use_zookeeper);

#if 0
// initialize python/c++ API
Expand Down
7 changes: 7 additions & 0 deletions src/analytics/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ void Options::Initialize(EventManager &evm,
#endif // !USE_CASSANDRA_CQL
default_cassandra_server_list.push_back(default_cassandra_server);

string default_zookeeper_server("127.0.0.1:2181");

vector<string> default_kafka_broker_list;
default_kafka_broker_list.push_back("");

Expand Down Expand Up @@ -117,6 +119,9 @@ void Options::Initialize(EventManager &evm,
opt::value<vector<string> >()->default_value(
default_cassandra_server_list, default_cassandra_server),
"Cassandra server list")
("DEFAULT.zookeeper_server_list",
opt::value<string>()->default_value(""),
"Zookeeper server list")
("DEFAULT.kafka_broker_list",
opt::value<vector<string> >()->default_value(
default_kafka_broker_list, ""),
Expand Down Expand Up @@ -316,6 +321,8 @@ void Options::Process(int argc, char *argv[],

GetOptValue< vector<string> >(var_map, cassandra_server_list_,
"DEFAULT.cassandra_server_list");
GetOptValue<string>(var_map, zookeeper_server_list_,
"DEFAULT.zookeeper_server_list");
GetOptValue< vector<string> >(var_map, kafka_broker_list_,
"DEFAULT.kafka_broker_list");
GetOptValue<uint16_t>(var_map, partitions_, "DEFAULT.partitions");
Expand Down
4 changes: 4 additions & 0 deletions src/analytics/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ class Options {
const std::vector<std::string> cassandra_server_list() const {
return cassandra_server_list_;
}
const std::string zookeeper_server_list() const {
return zookeeper_server_list_;
}
const std::vector<std::string> kafka_broker_list() const {
return kafka_broker_list_;
}
Expand Down Expand Up @@ -127,6 +130,7 @@ class Options {
uint64_t analytics_flow_ttl_;
uint64_t analytics_statistics_ttl_;
std::vector<std::string> cassandra_server_list_;
std::string zookeeper_server_list_;
std::vector<std::string> kafka_broker_list_;
uint16_t partitions_;
uint32_t sandesh_ratelimit_;
Expand Down
2 changes: 1 addition & 1 deletion src/analytics/protobuf_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ProtobufCollector::ProtobufCollector(EventManager *evm,
db_initializer_.reset(new DbHandlerInitializer(evm, kDbName, kDbTaskInstance,
kDbTaskName, boost::bind(&ProtobufCollector::DbInitializeCb, this),
cassandra_ips, cassandra_ports, ttl_map, cassandra_user,
cassandra_password, false));
cassandra_password, false, std::string(), false));
db_handler_ = db_initializer_->GetDbHandler();
} else {
db_handler_ = global_dbhandler;
Expand Down
3 changes: 2 additions & 1 deletion src/analytics/test/db_handler_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ class DbHandlerMock : public DbHandler {
DbHandlerMock(EventManager *evm, const TtlMap& ttl_map) :
DbHandler(evm, boost::bind(&DbHandlerMock::StartDbifReinit, this),
std::vector<std::string>(1, "127.0.0.1"),
std::vector<int>(1, 9160), "localhost", ttl_map, "", "", false)
std::vector<int>(1, 9160), "localhost", ttl_map, "", "", false,
"", false)
{
}
void StartDbifReinit() {
Expand Down
5 changes: 3 additions & 2 deletions src/analytics/viz_collector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,13 @@ VizCollector::VizCollector(EventManager *evm, unsigned short listen_port,
const std::string &kafka_prefix, const TtlMap& ttl_map,
const std::string &cassandra_user,
const std::string &cassandra_password,
bool use_cql) :
bool use_cql, const std::string &zookeeper_server_list,
bool use_zookeeper) :
db_initializer_(new DbHandlerInitializer(evm, DbGlobalName(dup), -1,
std::string("collector:DbIf"),
boost::bind(&VizCollector::DbInitializeCb, this),
cassandra_ips, cassandra_ports, ttl_map, cassandra_user,
cassandra_password, use_cql)),
cassandra_password, use_cql, zookeeper_server_list, use_zookeeper)),
osp_(new OpServerProxy(evm, this, redis_uve_ip, redis_uve_port,
redis_password, brokers, partitions, kafka_prefix)),
ruleeng_(new Ruleeng(db_initializer_->GetDbHandler(), osp_.get())),
Expand Down
4 changes: 3 additions & 1 deletion src/analytics/viz_collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ class VizCollector {
const std::string &kafka_prefix, const TtlMap &ttlmap,
const std::string& cassandra_user,
const std::string& cassandra_password,
bool use_cql);
bool use_cql,
const std::string &zookeeper_server_list,
bool use_zookeeper);
VizCollector(EventManager *evm, DbHandlerPtr db_handler,
Ruleeng *ruleeng,
Collector *collector, OpServerProxy *osp);
Expand Down
11 changes: 4 additions & 7 deletions src/opserver/test/test_analytics_uve.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,11 @@ def setUpClass(cls):

cls.redis_port = AnalyticsUveTest.get_free_port()
mockredis.start_redis(cls.redis_port)
cls.zk_port = AnalyticsUveTest.get_free_port()
mockzoo.start_zoo(cls.zk_port)

@classmethod
def tearDownClass(cls):

mockredis.stop_redis(cls.redis_port)
mockzoo.stop_zoo(cls.zk_port)

#@unittest.skip('Skipping non-cassandra test with vizd')
def test_00_nocassandra(self):
Expand Down Expand Up @@ -154,7 +151,7 @@ def test_03_redis_uve_restart(self):

vizd_obj = self.useFixture(
AnalyticsFixture(logging, builddir, -1, 0,
kafka_zk = self.__class__.zk_port))
start_kafka = True))
assert vizd_obj.verify_on_setup()

collectors = [vizd_obj.get_collector()]
Expand Down Expand Up @@ -351,7 +348,7 @@ def test_06_alarmgen_basic(self):

vizd_obj = self.useFixture(
AnalyticsFixture(logging, builddir, self.__class__.redis_port, 0,
kafka_zk = self.__class__.zk_port))
start_kafka = True))
assert vizd_obj.verify_on_setup()

assert(vizd_obj.verify_uvetable_alarm("ObjectCollectorInfo",
Expand Down Expand Up @@ -458,7 +455,7 @@ def test_07_alarm(self):
vizd_obj = self.useFixture(
AnalyticsFixture(logging, builddir, -1, 0,
collector_ha_test=True,
kafka_zk = self.__class__.zk_port))
start_kafka = True))
assert vizd_obj.verify_on_setup()

# create alarm-generator and attach it to the first collector.
Expand Down Expand Up @@ -572,7 +569,7 @@ def test_08_uve_alarm_filter(self):

vizd_obj = self.useFixture(
AnalyticsFixture(logging, builddir, -1, 0,
collector_ha_test=True, kafka_zk = self.__class__.zk_port))
collector_ha_test=True, start_kafka = True))
assert vizd_obj.verify_on_setup()

collectors = [vizd_obj.collectors[0].get_addr(),
Expand Down

1 comment on commit b56b31a

@suiyueran686
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well ,zookeeper.h is which you missed file in zookeeper folder

Please sign in to comment.