diff --git a/ci_unittests.json b/ci_unittests.json index 9405e05d6b6..7f9caee294d 100644 --- a/ci_unittests.json +++ b/ci_unittests.json @@ -5,7 +5,8 @@ "controller/src/bfd", "controller/src/database", "controller/src/opserver", - "controller/src/query_engine" + "controller/src/query_engine", + "controller/src/zookeeper" ], "scons_test_targets" : [ "controller/src/analytics:test", @@ -13,7 +14,9 @@ "controller/src/bfd:test", "controller/src/query_engine:test", "controller/src/database/gendb:test", - "controller/src/database/cassandra/thrift:test" + "controller/src/database/cassandra/thrift:test", + "controller/src/database/cassandra/cql:test", + "controller/src/zookeeper:test" ], "misc_test_targets" : [ ] }, @@ -93,6 +96,7 @@ "controller/src/db", "controller/src/discovery", "controller/src/database", + "controller/src/zookeeper", "controller/src/http", "controller/src/io", "controller/src/net", @@ -111,6 +115,8 @@ "controller/src/discovery:test", "controller/src/database/gendb:test", "controller/src/database/cassandra/thrift:test", + "controller/src/database/cassandra/cql:test", + "controller/src/zookeeper:test", "controller/src/io:test", "controller/src/net:test", "controller/src/schema:test", diff --git a/src/SConscript b/src/SConscript index 047638e70bb..8e4067d3959 100644 --- a/src/SConscript +++ b/src/SConscript @@ -34,7 +34,8 @@ subdirs = [ 'xml', 'xmpp', 'libpartition', - 'nodemgr' + 'nodemgr', + 'zookeeper', ] if sys.platform != 'darwin': diff --git a/src/analytics/SConscript b/src/analytics/SConscript index 40044986c29..e67be0d74ad 100644 --- a/src/analytics/SConscript +++ b/src/analytics/SConscript @@ -53,6 +53,8 @@ AnalyticsEnv.Prepend(LIBS=['cpuinfo', 'pugixml', 'hiredis', 'protobuf', + 'zookeeper_client', + 'zookeeper_mt', 'boost_filesystem', 'boost_program_options']) @@ -77,7 +79,6 @@ libs = MapBuildDir(['sandesh', 'base', 'io', 'net']) -libs.extend('/usr/lib/') AnalyticsEnv.Append(LIBPATH=libs) includes = MapBuildDir(['http/client', 
'discovery/client', 'analytics']) diff --git a/src/analytics/db_handler.cc b/src/analytics/db_handler.cc index 14345041b05..cb913c129e8 100644 --- a/src/analytics/db_handler.cc +++ b/src/analytics/db_handler.cc @@ -25,6 +25,7 @@ #else // USE_CASSANDRA_CQL #include #endif // !USE_CASSANDRA_CQL +#include #include "viz_constants.h" #include "vizd_table_desc.h" @@ -65,14 +66,18 @@ DbHandler::DbHandler(EventManager *evm, std::string name, const TtlMap& ttl_map, const std::string& cassandra_user, const std::string& cassandra_password, - bool use_cql) : + bool use_cql, + const std::string &zookeeper_server_list, + bool use_zookeeper) : name_(name), drop_level_(SandeshLevel::INVALID), ttl_map_(ttl_map), use_cql_(use_cql), tablespace_(), gen_partition_no_((uint8_t)g_viz_constants.PARTITION_MIN, - (uint8_t)g_viz_constants.PARTITION_MAX) { + (uint8_t)g_viz_constants.PARTITION_MAX), + zookeeper_server_list_(zookeeper_server_list), + use_zookeeper_(use_zookeeper) { #ifdef USE_CASSANDRA_CQL if (use_cql) { dbif_.reset(new cass::cql::CqlIf(evm, cassandra_ips, @@ -299,7 +304,7 @@ bool DbHandler::Init(bool initial, int instance) { } } -bool DbHandler::Initialize(int instance) { +bool DbHandler::InitializeInternal(int instance) { DB_LOG(DEBUG, "Initializing.."); /* init of vizd table structures */ @@ -326,6 +331,36 @@ bool DbHandler::Initialize(int instance) { return true; } +// Runs InitializeInternal() while holding the "/collector" zookeeper lock. +// Returns its result; a failed lock asserts, or yields false without asserts. +bool DbHandler::InitializeInternalLocked(int instance) { + // Synchronize creation across nodes using zookeeper + zookeeper::client::ZookeeperClient client(name_.c_str(), + zookeeper_server_list_.c_str()); + zookeeper::client::ZookeeperLock dmutex(&client, "/collector"); + // Lock() and Release() have side effects, so they are called outside + // assert(): with NDEBUG defined assert() expands to nothing and the + // lock would never be taken or released + const bool locked(dmutex.Lock()); + assert(locked); + if (!locked) { + return false; + } + const bool success(InitializeInternal(instance)); + const bool released(dmutex.Release()); + assert(released); + (void)released; + return success; +} + +bool DbHandler::Initialize(int instance) { + if (use_zookeeper_) { + return InitializeInternalLocked(instance); + } else { + return InitializeInternal(instance); + } +} + bool DbHandler::Setup(int instance) { DB_LOG(DEBUG, 
"Setup.."); if (!dbif_->Db_Init("analytics::DbHandler", @@ -1674,13 +1698,15 @@ DbHandlerInitializer::DbHandlerInitializer(EventManager *evm, const std::vector &cassandra_ips, const std::vector &cassandra_ports, const TtlMap& ttl_map, const std::string& cassandra_user, const std::string& cassandra_password, - bool use_cql) : + bool use_cql, const std::string &zookeeper_server_list, + bool use_zookeeper) : db_name_(db_name), db_task_instance_(db_task_instance), db_handler_(new DbHandler(evm, boost::bind(&DbHandlerInitializer::ScheduleInit, this), cassandra_ips, cassandra_ports, db_name, ttl_map, - cassandra_user, cassandra_password, use_cql)), + cassandra_user, cassandra_password, use_cql, + zookeeper_server_list, use_zookeeper)), callback_(callback), db_init_timer_(TimerManager::CreateTimer(*evm->io_service(), db_name + " Db Init Timer", diff --git a/src/analytics/db_handler.h b/src/analytics/db_handler.h index bf1817dd727..e9fcd9e5ef7 100644 --- a/src/analytics/db_handler.h +++ b/src/analytics/db_handler.h @@ -88,7 +88,8 @@ class DbHandler { std::string name, const TtlMap& ttl_map, const std::string& cassandra_user, const std::string& cassandra_password, - bool use_cql); + bool use_cql, const std::string &zookeeper_server_list, + bool use_zookeeper); DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map); virtual ~DbHandler(); @@ -156,6 +157,8 @@ class DbHandler { boost::function cb); bool Setup(int instance); bool Initialize(int instance); + bool InitializeInternal(int instance); + bool InitializeInternalLocked(int instance); bool StatTableWrite(uint32_t t2, const std::string& statName, const std::string& statAttr, const std::pair& ptag, @@ -185,6 +188,8 @@ class DbHandler { bool use_cql_; std::string tablespace_; UniformInt8RandomGenerator gen_partition_no_; + std::string zookeeper_server_list_; + bool use_zookeeper_; DISALLOW_COPY_AND_ASSIGN(DbHandler); }; @@ -222,7 +227,9 @@ class DbHandlerInitializer { const TtlMap& ttl_map, const std::string& cassandra_user, 
const std::string& cassandra_password, - bool use_cql); + bool use_cql, + const std::string &zookeeper_server_list, + bool use_zookeeper); DbHandlerInitializer(EventManager *evm, const std::string &db_name, int db_task_instance, const std::string &timer_task_name, InitializeDoneCb callback, diff --git a/src/analytics/generator.cc b/src/analytics/generator.cc index 7af6302d76d..5c129d9957e 100644 --- a/src/analytics/generator.cc +++ b/src/analytics/generator.cc @@ -108,7 +108,7 @@ SandeshGenerator::SandeshGenerator(Collector * const collector, VizSession *sess source + ":" + node_type + ":" + module + ":" + instance_id, collector->analytics_ttl_map(), collector->cassandra_user(), collector->cassandra_password(), - false)); + false, std::string(), false)); } else { //Use collector db_handler db_handler_ = global_db_handler; diff --git a/src/analytics/main.cc b/src/analytics/main.cc index fbe1bf4c08e..de372971d63 100644 --- a/src/analytics/main.cc +++ b/src/analytics/main.cc @@ -270,6 +270,8 @@ int main(int argc, char *argv[]) copy(cassandra_servers.begin(), cassandra_servers.end(), ostream_iterator(css, " ")); LOG(INFO, "COLLECTOR CASSANDRA SERVERS: " << css.str()); + LOG(INFO, "COLLECTOR ZOOKEEPER SERVERS: " << + options.zookeeper_server_list()); LOG(INFO, "COLLECTOR SYSLOG LISTEN PORT: " << options.syslog_port()); LOG(INFO, "COLLECTOR SFLOW LISTEN PORT: " << options.sflow_port()); LOG(INFO, "COLLECTOR IPFIX LISTEN PORT: " << options.ipfix_port()); @@ -328,6 +330,8 @@ int main(int argc, char *argv[]) //Get Platform info //cql not supported in precise, centos 6.4 6.5 bool use_cql = MiscUtils::IsCqlSupported(); + std::string zookeeper_server_list(options.zookeeper_server_list()); + bool use_zookeeper = !zookeeper_server_list.empty(); VizCollector analytics(a_evm, options.collector_port(), protobuf_server_enabled, @@ -346,7 +350,9 @@ int main(int argc, char *argv[]) options.kafka_prefix(), ttl_map, options.cassandra_user(), options.cassandra_password(), - use_cql); + 
use_cql, + zookeeper_server_list, + use_zookeeper); #if 0 // initialize python/c++ API diff --git a/src/analytics/options.cc b/src/analytics/options.cc index 5ab99910148..615dd7fa993 100644 --- a/src/analytics/options.cc +++ b/src/analytics/options.cc @@ -76,6 +76,8 @@ void Options::Initialize(EventManager &evm, #endif // !USE_CASSANDRA_CQL default_cassandra_server_list.push_back(default_cassandra_server); + string default_zookeeper_server("127.0.0.1:2181"); + vector default_kafka_broker_list; default_kafka_broker_list.push_back(""); @@ -117,6 +119,9 @@ void Options::Initialize(EventManager &evm, opt::value >()->default_value( default_cassandra_server_list, default_cassandra_server), "Cassandra server list") + ("DEFAULT.zookeeper_server_list", + opt::value()->default_value(""), + "Zookeeper server list") ("DEFAULT.kafka_broker_list", opt::value >()->default_value( default_kafka_broker_list, ""), @@ -316,6 +321,8 @@ void Options::Process(int argc, char *argv[], GetOptValue< vector >(var_map, cassandra_server_list_, "DEFAULT.cassandra_server_list"); + GetOptValue(var_map, zookeeper_server_list_, + "DEFAULT.zookeeper_server_list"); GetOptValue< vector >(var_map, kafka_broker_list_, "DEFAULT.kafka_broker_list"); GetOptValue(var_map, partitions_, "DEFAULT.partitions"); diff --git a/src/analytics/options.h b/src/analytics/options.h index e43bc7532ec..a67836b8703 100644 --- a/src/analytics/options.h +++ b/src/analytics/options.h @@ -17,6 +17,9 @@ class Options { const std::vector cassandra_server_list() const { return cassandra_server_list_; } + const std::string zookeeper_server_list() const { + return zookeeper_server_list_; + } const std::vector kafka_broker_list() const { return kafka_broker_list_; } @@ -127,6 +130,7 @@ class Options { uint64_t analytics_flow_ttl_; uint64_t analytics_statistics_ttl_; std::vector cassandra_server_list_; + std::string zookeeper_server_list_; std::vector kafka_broker_list_; uint16_t partitions_; uint32_t sandesh_ratelimit_; diff --git 
a/src/analytics/protobuf_collector.cc b/src/analytics/protobuf_collector.cc index 58f7c27ab44..fbcf4c6b2b7 100644 --- a/src/analytics/protobuf_collector.cc +++ b/src/analytics/protobuf_collector.cc @@ -24,7 +24,7 @@ ProtobufCollector::ProtobufCollector(EventManager *evm, db_initializer_.reset(new DbHandlerInitializer(evm, kDbName, kDbTaskInstance, kDbTaskName, boost::bind(&ProtobufCollector::DbInitializeCb, this), cassandra_ips, cassandra_ports, ttl_map, cassandra_user, - cassandra_password, false)); + cassandra_password, false, std::string(), false)); db_handler_ = db_initializer_->GetDbHandler(); } else { db_handler_ = global_dbhandler; diff --git a/src/analytics/test/db_handler_mock.h b/src/analytics/test/db_handler_mock.h index 80d9a29db00..da734d15426 100644 --- a/src/analytics/test/db_handler_mock.h +++ b/src/analytics/test/db_handler_mock.h @@ -14,7 +14,8 @@ class DbHandlerMock : public DbHandler { DbHandlerMock(EventManager *evm, const TtlMap& ttl_map) : DbHandler(evm, boost::bind(&DbHandlerMock::StartDbifReinit, this), std::vector(1, "127.0.0.1"), - std::vector(1, 9160), "localhost", ttl_map, "", "", false) + std::vector(1, 9160), "localhost", ttl_map, "", "", false, + "", false) { } void StartDbifReinit() { diff --git a/src/analytics/viz_collector.cc b/src/analytics/viz_collector.cc index 42afa37ce72..eb71c2139e3 100644 --- a/src/analytics/viz_collector.cc +++ b/src/analytics/viz_collector.cc @@ -43,12 +43,13 @@ VizCollector::VizCollector(EventManager *evm, unsigned short listen_port, const std::string &kafka_prefix, const TtlMap& ttl_map, const std::string &cassandra_user, const std::string &cassandra_password, - bool use_cql) : + bool use_cql, const std::string &zookeeper_server_list, + bool use_zookeeper) : db_initializer_(new DbHandlerInitializer(evm, DbGlobalName(dup), -1, std::string("collector:DbIf"), boost::bind(&VizCollector::DbInitializeCb, this), cassandra_ips, cassandra_ports, ttl_map, cassandra_user, - cassandra_password, use_cql)), + 
cassandra_password, use_cql, zookeeper_server_list, use_zookeeper)), osp_(new OpServerProxy(evm, this, redis_uve_ip, redis_uve_port, redis_password, brokers, partitions, kafka_prefix)), ruleeng_(new Ruleeng(db_initializer_->GetDbHandler(), osp_.get())), diff --git a/src/analytics/viz_collector.h b/src/analytics/viz_collector.h index e8c39ff28be..4b112c396d5 100644 --- a/src/analytics/viz_collector.h +++ b/src/analytics/viz_collector.h @@ -40,7 +40,9 @@ class VizCollector { const std::string &kafka_prefix, const TtlMap &ttlmap, const std::string& cassandra_user, const std::string& cassandra_password, - bool use_cql); + bool use_cql, + const std::string &zookeeper_server_list, + bool use_zookeeper); VizCollector(EventManager *evm, DbHandlerPtr db_handler, Ruleeng *ruleeng, Collector *collector, OpServerProxy *osp); diff --git a/src/opserver/test/test_analytics_uve.py b/src/opserver/test/test_analytics_uve.py index 9fbd58425b2..9615d6c2574 100755 --- a/src/opserver/test/test_analytics_uve.py +++ b/src/opserver/test/test_analytics_uve.py @@ -51,14 +51,11 @@ def setUpClass(cls): cls.redis_port = AnalyticsUveTest.get_free_port() mockredis.start_redis(cls.redis_port) - cls.zk_port = AnalyticsUveTest.get_free_port() - mockzoo.start_zoo(cls.zk_port) @classmethod def tearDownClass(cls): mockredis.stop_redis(cls.redis_port) - mockzoo.stop_zoo(cls.zk_port) #@unittest.skip('Skipping non-cassandra test with vizd') def test_00_nocassandra(self): @@ -154,7 +151,7 @@ def test_03_redis_uve_restart(self): vizd_obj = self.useFixture( AnalyticsFixture(logging, builddir, -1, 0, - kafka_zk = self.__class__.zk_port)) + start_kafka = True)) assert vizd_obj.verify_on_setup() collectors = [vizd_obj.get_collector()] @@ -351,7 +348,7 @@ def test_06_alarmgen_basic(self): vizd_obj = self.useFixture( AnalyticsFixture(logging, builddir, self.__class__.redis_port, 0, - kafka_zk = self.__class__.zk_port)) + start_kafka = True)) assert vizd_obj.verify_on_setup() 
assert(vizd_obj.verify_uvetable_alarm("ObjectCollectorInfo", @@ -458,7 +455,7 @@ def test_07_alarm(self): vizd_obj = self.useFixture( AnalyticsFixture(logging, builddir, -1, 0, collector_ha_test=True, - kafka_zk = self.__class__.zk_port)) + start_kafka = True)) assert vizd_obj.verify_on_setup() # create alarm-generator and attach it to the first collector. @@ -572,7 +569,7 @@ def test_08_uve_alarm_filter(self): vizd_obj = self.useFixture( AnalyticsFixture(logging, builddir, -1, 0, - collector_ha_test=True, kafka_zk = self.__class__.zk_port)) + collector_ha_test=True, start_kafka = True)) assert vizd_obj.verify_on_setup() collectors = [vizd_obj.collectors[0].get_addr(), diff --git a/src/opserver/test/utils/analytics_fixture.py b/src/opserver/test/utils/analytics_fixture.py index 636092238f8..801141f8182 100644 --- a/src/opserver/test/utils/analytics_fixture.py +++ b/src/opserver/test/utils/analytics_fixture.py @@ -62,8 +62,9 @@ class Collector(object): def __init__(self, analytics_fixture, redis_uve, logger, ipfix_port = False, sflow_port = False, syslog_port = False, protobuf_port = True, - kafka = None, is_dup=False, - cassandra_user= None, cassandra_password= None): + kafka = None, is_dup = False, + cassandra_user = None, cassandra_password = None, + zookeeper = None): self.analytics_fixture = analytics_fixture if kafka is None: self.kafka_port = None @@ -91,6 +92,8 @@ def __init__(self, analytics_fixture, redis_uve, ':'+ModuleNames[Module.COLLECTOR]+':0' self.cassandra_user = analytics_fixture.cassandra_user self.cassandra_password = analytics_fixture.cassandra_password + # zookeeper is optional (defaults to None): keep no port in that case + self.zk_port = zookeeper.port if zookeeper is not None else None # end __init__ def get_addr(self): @@ -166,6 +169,9 @@ def start(self): args.append('127.0.0.1:%d' % self.kafka_port) args.append('--DEFAULT.partitions') args.append(str(4)) + # Pass the zookeeper server list only when a zookeeper port is known + args.extend(['--DEFAULT.zookeeper_server_list', + '127.0.0.1:%d' % self.zk_port] if self.zk_port else []) self._logger.info('Setting up Vizd: %s' % (' '.join(args))) ports, self._instance = \ 
self.analytics_fixture.start_with_ephemeral_ports( @@ -511,9 +515,8 @@ def stop(self): class Kafka(object): def __init__(self, zk_port): self.port = None - self.zk_port = zk_port - self.zk_start = False self.running = False + self.zk_port = zk_port # end __init__ def start(self): @@ -521,10 +524,6 @@ def start(self): self.running = True if not self.port: self.port = AnalyticsFixture.get_free_port() - if not self.zk_port: - self.zk_port = AnalyticsFixture.get_free_port() - mockzoo.start_zoo(self.zk_port) - self.zk_start = True mockkafka.start_kafka(self.zk_port, self.port) # end start @@ -532,18 +531,37 @@ def start(self): def stop(self): if self.running: mockkafka.stop_kafka(self.port) - if self.zk_start: - mockzoo.stop_zoo(self.zk_port) self.running = False #end stop # end class Kafka +class Zookeeper(object): + def __init__(self, zk_port=None): + self.running = False; + self.port = zk_port + # end __init__ + + def start(self): + assert(self.running == False) + if not self.port: + self.port = AnalyticsFixture.get_free_port() + mockzoo.start_zoo(self.port) + self.running = True + # end start + + def stop(self): + if self.running: + mockzoo.stop_zoo(self.port) + self.running = False + # end stop +# end class Zookeeper + class AnalyticsFixture(fixtures.Fixture): def __init__(self, logger, builddir, redis_port, cassandra_port, ipfix_port = False, sflow_port = False, syslog_port = False, protobuf_port = False, noqed=False, collector_ha_test=False, - redis_password=None, kafka_zk=0, + redis_password=None, start_kafka=False, cassandra_user=None, cassandra_password=None): self.builddir = builddir @@ -557,13 +575,14 @@ def __init__(self, logger, builddir, redis_port, cassandra_port, self.noqed = noqed self.collector_ha_test = collector_ha_test self.redis_password = redis_password - self.kafka_zk = kafka_zk + self.start_kafka = start_kafka self.kafka = None self.opserver = None self.query_engine = None self.alarmgen = None self.cassandra_user = cassandra_user 
self.cassandra_password = cassandra_password + self.zookeeper = None def setUp(self): super(AnalyticsFixture, self).setUp() @@ -572,8 +591,11 @@ def setUp(self): self.redis_password)] self.redis_uves[0].start() - if self.kafka_zk: - self.kafka = Kafka(self.kafka_zk) + self.zookeeper = Zookeeper() + self.zookeeper.start() + + if self.start_kafka: + self.kafka = Kafka(self.zookeeper.port) self.kafka.start() self.collectors = [Collector(self, self.redis_uves[0], self.logger, @@ -581,7 +603,8 @@ def setUp(self): sflow_port = self.sflow_port, syslog_port = self.syslog_port, protobuf_port = self.protobuf_port, - kafka = self.kafka)] + kafka = self.kafka, + zookeeper = self.zookeeper)] if not self.collectors[0].start(): self.logger.error("Collector did NOT start") return @@ -599,7 +622,8 @@ def setUp(self): self.collectors.append(Collector(self, self.redis_uves[1], self.logger, kafka = self.kafka, - is_dup=True)) + is_dup = True, + zookeeper = self.zookeeper)) if not self.collectors[1].start(): self.logger.error("Secondary Collector did NOT start") secondary_collector = self.collectors[1].get_addr() diff --git a/src/zookeeper/SConscript b/src/zookeeper/SConscript new file mode 100644 index 00000000000..16563c18953 --- /dev/null +++ b/src/zookeeper/SConscript @@ -0,0 +1,14 @@ +# +# Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +# + +Import('BuildEnv') +ZooEnv = BuildEnv.Clone() + +ZooEnv.Append(CPPPATH = ['/usr/include']) +sources = ['zookeeper_client.cc'] + +libzoo = ZooEnv.Library('zookeeper_client', source = sources) +ZooEnv.Install(ZooEnv['TOP_LIB'], libzoo) + +ZooEnv.SConscript('test/SConscript', exports='ZooEnv', duplicate=0) diff --git a/src/zookeeper/test/SConscript b/src/zookeeper/test/SConscript new file mode 100644 index 00000000000..16a813df157 --- /dev/null +++ b/src/zookeeper/test/SConscript @@ -0,0 +1,26 @@ +# +# Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. 
+# + +Import('ZooEnv') + +env = ZooEnv.Clone() + +def MapBuildDir(list): + return map(lambda x: env['TOP'] + '/' + x, list) + +libs = ['zookeeper_client', 'base', 'zookeeper_mt', 'gunit'] +env.Prepend(LIBS=libs) +libpaths=['base'] +env.Append(LIBPATH = [MapBuildDir(libpaths)]) + +zookeeper_client_test = env.UnitTest('zookeeper_client_test', + ['zookeeper_client_test.cc']) + +test_suite = [ zookeeper_client_test ] +test = env.TestSuite('zookeeper_test_suite', test_suite) +env.Alias('controller/src/zookeeper:test', test) + +flaky_test_suite = [] +flaky_test = env.TestSuite('zookeeper_flaky_test_suite', flaky_test_suite) +env.Alias('controller/src/zookeeper:flaky-test', flaky_test) diff --git a/src/zookeeper/test/zookeeper_client_test.cc b/src/zookeeper/test/zookeeper_client_test.cc new file mode 100644 index 00000000000..89a7673bbbe --- /dev/null +++ b/src/zookeeper/test/zookeeper_client_test.cc @@ -0,0 +1,173 @@ +// +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +// + +#include + +#include +#include +#include +#include + +using ::testing::Return; +using ::testing::WithArgs; +using ::testing::DoAll; +using ::testing::SetArgPointee; +using ::testing::_; +using ::testing::StrEq; + +using namespace zookeeper::client; +using namespace zookeeper::interface; + +class ZookeeperMockInterface : public ZookeeperInterface { + public: + ZookeeperMockInterface() { + } + virtual ~ZookeeperMockInterface() { + } + MOCK_METHOD1(ZooSetDebugLevel, void(ZooLogLevel logLevel)); + MOCK_METHOD6(ZookeeperInit, zhandle_t*(const char *host, watcher_fn fn, + int recv_timeout, const clientid_t *clientid, void *context, + int flags)); + MOCK_METHOD1(ZookeeperClose, int(zhandle_t *zh)); + MOCK_METHOD1(ZooState, int(zhandle_t *zh)); + MOCK_METHOD8(ZooCreate, int(zhandle_t *zh, const char *path, + const char *value, int valuelen, const struct ACL_vector *acl, + int flags, char *path_buffer, int path_buffer_len)); + MOCK_METHOD3(ZooDelete, int(zhandle_t *zh, const char *path, + 
int version)); + MOCK_METHOD6(ZooGet, int(zhandle_t *zh, const char *path, int watch, + char *buffer, int* buffer_len, struct Stat *stat)); +}; + +class ZookeeperClientTest : public ::testing::Test { + protected: + ZookeeperClientTest() { + } + ~ZookeeperClientTest() { + } + virtual void SetUp() { + } + virtual void TearDown() { + } + ZookeeperClient* CreateClient(impl::ZookeeperClientImpl *impl) { + return new ZookeeperClient(impl); + } + std::string GetLockId(const ZookeeperLock &zk_lock) { + return zk_lock.Id(); + } +}; + +TEST_F(ZookeeperClientTest, Basic) { + ZookeeperMockInterface *zmi(new ZookeeperMockInterface); + EXPECT_CALL(*zmi, ZooSetDebugLevel(_)); + impl::ZookeeperClientImpl *cImpl( + new impl::ZookeeperClientImpl("Test", "127.0.0.1:2181", zmi)); + std::auto_ptr client(CreateClient(cImpl)); + std::string zk_lock_name("/test-lock"); + ZookeeperLock zk_lock(client.get(), zk_lock_name.c_str()); + std::string zk_lock_id(GetLockId(zk_lock)); + int zkh(0xdeadbeef); + zhandle_t *zk_handle = (zhandle_t *)(&zkh); + EXPECT_CALL(*zmi, ZookeeperInit(StrEq("127.0.0.1:2181"), _, _, _, _, _)) + .WillOnce(Return(zk_handle)); + EXPECT_CALL(*zmi, ZooState(zk_handle)) + .WillOnce(Return(ZOO_CONNECTED_STATE)); + EXPECT_CALL(*zmi, ZooCreate(zk_handle, StrEq(zk_lock_name), + StrEq(zk_lock_id), zk_lock_id.length(), _, _, _, _)) + .WillOnce(Return(ZOK)); + EXPECT_TRUE(zk_lock.Lock()); + EXPECT_TRUE(cImpl->IsConnected()); + EXPECT_CALL(*zmi, ZooDelete(zk_handle, StrEq(zk_lock_name), _)) + .WillOnce(Return(ZOK)); + EXPECT_CALL(*zmi, ZookeeperClose(zk_handle)); + EXPECT_TRUE(zk_lock.Release()); + EXPECT_FALSE(cImpl->IsConnected()); +} + +TEST_F(ZookeeperClientTest, ZookeeperInitFail) { + ZookeeperMockInterface *zmi(new ZookeeperMockInterface); + EXPECT_CALL(*zmi, ZooSetDebugLevel(_)); + impl::ZookeeperClientImpl *cImpl( + new impl::ZookeeperClientImpl("Test", "127.0.0.1:2181", zmi)); + std::auto_ptr client(CreateClient(cImpl)); + std::string zk_lock_name("/test-lock"); + 
ZookeeperLock zk_lock(client.get(), zk_lock_name.c_str()); + std::string zk_lock_id(GetLockId(zk_lock)); + EXPECT_CALL(*zmi, ZookeeperInit(StrEq("127.0.0.1:2181"), _, _, _, _, _)); + EXPECT_FALSE(zk_lock.Lock()); + EXPECT_FALSE(cImpl->IsConnected()); +} + +TEST_F(ZookeeperClientTest, ZooStateConnecting2Connect) { + ZookeeperMockInterface *zmi(new ZookeeperMockInterface); + EXPECT_CALL(*zmi, ZooSetDebugLevel(_)); + impl::ZookeeperClientImpl *cImpl( + new impl::ZookeeperClientImpl("Test", "127.0.0.1:2181", zmi)); + std::auto_ptr client(CreateClient(cImpl)); + std::string zk_lock_name("/test-lock"); + ZookeeperLock zk_lock(client.get(), zk_lock_name.c_str()); + std::string zk_lock_id(GetLockId(zk_lock)); + int zkh(0xdeadbeef); + zhandle_t *zk_handle = (zhandle_t *)(&zkh); + EXPECT_CALL(*zmi, ZookeeperInit(StrEq("127.0.0.1:2181"), _, _, _, _, _)) + .WillOnce(Return(zk_handle)); + EXPECT_CALL(*zmi, ZooState(zk_handle)) + .WillOnce(Return(ZOO_CONNECTING_STATE)) + .WillOnce(Return(ZOO_CONNECTED_STATE)); + EXPECT_CALL(*zmi, ZooCreate(zk_handle, StrEq(zk_lock_name), + StrEq(zk_lock_id), zk_lock_id.length(), _, _, _, _)) + .WillOnce(Return(ZOK)); + EXPECT_TRUE(zk_lock.Lock()); + EXPECT_TRUE(cImpl->IsConnected()); + EXPECT_CALL(*zmi, ZooDelete(zk_handle, StrEq(zk_lock_name), _)) + .WillOnce(Return(ZOK)); + EXPECT_CALL(*zmi, ZookeeperClose(zk_handle)); + EXPECT_TRUE(zk_lock.Release()); + EXPECT_FALSE(cImpl->IsConnected()); +} + +ACTION_P(StrCpyToArg0, str) { + std::strcpy(arg0, str); +} + +TEST_F(ZookeeperClientTest, ZooCreateNodeExists) { + ZookeeperMockInterface *zmi(new ZookeeperMockInterface); + EXPECT_CALL(*zmi, ZooSetDebugLevel(_)); + impl::ZookeeperClientImpl *cImpl( + new impl::ZookeeperClientImpl("Test", "127.0.0.1:2181", zmi)); + std::auto_ptr client(CreateClient(cImpl)); + std::string zk_lock_name("/test-lock"); + ZookeeperLock zk_lock(client.get(), zk_lock_name.c_str()); + std::string zk_lock_id(GetLockId(zk_lock)); + int zkh(0xdeadbeef); + zhandle_t *zk_handle = 
(zhandle_t *)(&zkh); + EXPECT_CALL(*zmi, ZookeeperInit(StrEq("127.0.0.1:2181"), _, _, _, _, _)) + .WillOnce(Return(zk_handle)); + EXPECT_CALL(*zmi, ZooState(zk_handle)) + .WillOnce(Return(ZOO_CONNECTED_STATE)); + EXPECT_CALL(*zmi, ZooCreate(zk_handle, StrEq(zk_lock_name), + StrEq(zk_lock_id), zk_lock_id.length(), _, _, _, _)) + .WillOnce(Return(ZNODEEXISTS)) + .WillOnce(Return(ZNODEEXISTS)); + std::string other_zk_lock_id(zk_lock_id); + other_zk_lock_id += "-other"; + EXPECT_CALL(*zmi, ZooGet(zk_handle, StrEq(zk_lock_name), _, _, _, _)) + .WillOnce(DoAll(WithArgs<3>(StrCpyToArg0(other_zk_lock_id.c_str())), + SetArgPointee<4>((int)other_zk_lock_id.length()), Return(ZOK))) + .WillOnce(DoAll(WithArgs<3>(StrCpyToArg0(zk_lock_id.c_str())), + SetArgPointee<4>((int)zk_lock_id.length()), Return(ZOK))); + EXPECT_TRUE(zk_lock.Lock()); + EXPECT_TRUE(cImpl->IsConnected()); + EXPECT_CALL(*zmi, ZooDelete(zk_handle, StrEq(zk_lock_name), _)) + .WillOnce(Return(ZOK)); + EXPECT_CALL(*zmi, ZookeeperClose(zk_handle)); + EXPECT_TRUE(zk_lock.Release()); + EXPECT_FALSE(cImpl->IsConnected()); +} + +int main(int argc, char **argv) { + LoggingInit(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/zookeeper/zookeeper_client.cc b/src/zookeeper/zookeeper_client.cc new file mode 100644 index 00000000000..552732c9b32 --- /dev/null +++ b/src/zookeeper/zookeeper_client.cc @@ -0,0 +1,362 @@ +// +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. 
+// + +#include +#include +#include + +#include + +#include +#include +#include + +#define ZOO_LOG(_Level, _Msg) \ + do { \ + if (LoggingDisabled()) break; \ + log4cplus::Logger logger = log4cplus::Logger::getRoot(); \ + LOG4CPLUS_##_Level(logger, __func__ << ":" << __FILE__ << ":" << \ + __LINE__ << ": " << _Msg); \ + } while (false) + +#define ZOO_LOG_ERR(_Msg) \ + do { \ + LOG(ERROR, __func__ << ":" << __FILE__ << ":" << __LINE__ << ": " \ + << _Msg); \ + } while (false) + +namespace zookeeper { +namespace interface { + +class ZookeeperCBindings : public ZookeeperInterface { + public: + ZookeeperCBindings() { + } + virtual ~ZookeeperCBindings() { + } + virtual void ZooSetDebugLevel(ZooLogLevel logLevel) { + zoo_set_debug_level(logLevel); + } + virtual zhandle_t* ZookeeperInit(const char *host, watcher_fn fn, + int recv_timeout, const clientid_t *clientid, void *context, + int flags) { + return zookeeper_init(host, fn, recv_timeout, clientid, context, + flags); + } + virtual int ZookeeperClose(zhandle_t *zh) { + return zookeeper_close(zh); + } + virtual int ZooState(zhandle_t *zh) { + return zoo_state(zh); + } + virtual int ZooCreate(zhandle_t *zh, const char *path, const char *value, + int valuelen, const struct ACL_vector *acl, int flags, + char *path_buffer, int path_buffer_len) { + return zoo_create(zh, path, value, valuelen, acl, flags, path_buffer, + path_buffer_len); + } + virtual int ZooDelete(zhandle_t *zh, const char *path, int version) { + return zoo_delete(zh, path, version); + } + virtual int ZooGet(zhandle_t *zh, const char *path, int watch, + char *buffer, int* buffer_len, struct Stat *stat) { + return zoo_get(zh, path, watch, buffer, buffer_len, stat); + } +}; + +} // namespace interface + +namespace client { +namespace impl { + +// ZookeeperClientImpl +ZookeeperClientImpl::ZookeeperClientImpl(const char *hostname, + const char *servers, zookeeper::interface::ZookeeperInterface *zki) : + hostname_(hostname), + servers_(servers), + 
zk_handle_(NULL), + connected_(false), + zki_(zki) { + // Set loglevel + zki_->ZooSetDebugLevel(ZOO_LOG_LEVEL_DEBUG); +} + +ZookeeperClientImpl::~ZookeeperClientImpl() { +} + +bool ZookeeperClientImpl::Connect() { + while (true) { + zk_handle_ = zki_->ZookeeperInit(servers_.c_str(), + NULL, + kSessionTimeoutMSec_, + NULL, + NULL, + 0); + // Unfortunately, EINVAL is highly overloaded in zookeeper_init + // and can correspond to: + // (1) Empty / invalid 'host' string format. + // (2) Any getaddrinfo error other than EAI_NONAME, + // EAI_NODATA, and EAI_MEMORY are mapped to EINVAL. + // Either way, retrying is not problematic. + if (zk_handle_ == NULL && errno == EINVAL) { + ZOO_LOG(WARN, "zookeeper_init FAILED: (" << errno << + "): retrying in 1 second"); + sleep(1); + continue; + } + if (zk_handle_ == NULL) { + int zerrno(errno); + ZOO_LOG_ERR("zookeeper_init returned NULL zhandle: (" + << zerrno << ")"); + return false; + } + // Block till session is connected + while (!connected_) { + int zstate(zki_->ZooState(zk_handle_)); + if (zstate == ZOO_CONNECTED_STATE) { + connected_ = true; + ZOO_LOG(DEBUG, "Session CONNECTED"); + break; + } else { + ZOO_LOG(DEBUG, "Session NOT CONNECTED: retrying in 1 second"); + sleep(1); + continue; + } + } + break; + } + assert(connected_); + return true; +} + +void ZookeeperClientImpl::Shutdown() { + if (zk_handle_) { + int rc(zki_->ZookeeperClose(zk_handle_)); + if (rc != ZOK) { + int zerrno(errno); + ZOO_LOG(WARN, "zookeeper_close FAILED (" << rc << + "): errno: " << zerrno); + } + zk_handle_ = NULL; + } + connected_ = false; +} + +bool ZookeeperClientImpl::Reconnect() { + Shutdown(); + return Connect(); +} + +bool ZookeeperClientImpl::IsConnected() const { + return connected_; +} + +int ZookeeperClientImpl::CreateNodeSync(const char *path, const char *value, + int *err) { + int rc; + // Session expired state or auth failed state + while ((rc = zki_->ZooCreate(zk_handle_, path, value, strlen(value), + &ZOO_OPEN_ACL_UNSAFE, 0, 
NULL, -1)) == ZINVALIDSTATE) { + // Reconnect + Reconnect(); + } + if (rc != ZOK) { + *err = errno; + } + return rc; +} + +int ZookeeperClientImpl::GetNodeDataSync(const char *path, char *buf, + int *buf_len, int *err) { + int rc; + // Session expired state or auth failed state + while ((rc = zki_->ZooGet(zk_handle_, path, 0, buf, buf_len, NULL)) == + ZINVALIDSTATE) { + // Reconnect + Reconnect(); + } + if (rc != ZOK) { + *err = errno; + } + return rc; +} + +int ZookeeperClientImpl::DeleteNodeSync(const char *path, int *err) { + int rc; + // Session expired state or auth failed state + while ((rc = zki_->ZooDelete(zk_handle_, path, -1)) == ZINVALIDSTATE) { + // Reconnect + Reconnect(); + } + if (rc != ZOK) { + *err = errno; + } + return rc; +} + +std::string ZookeeperClientImpl::Name() const { + return hostname_; +} + +} // namespace impl + +// ZookeeperClient +ZookeeperClient::ZookeeperClient(const char *hostname, const char *servers) : + impl_(new impl::ZookeeperClientImpl(hostname, servers, + new zookeeper::interface::ZookeeperCBindings)) { +} + +ZookeeperClient::ZookeeperClient(impl::ZookeeperClientImpl *impl) : + impl_(impl) { +} + +ZookeeperClient::~ZookeeperClient() { +} + +// ZookeeperLockImpl +class ZookeeperLock::ZookeeperLockImpl { + public: + ZookeeperLockImpl(impl::ZookeeperClientImpl *clientImpl, + const char *path) : + clientImpl_(clientImpl), + path_(path), + is_acquired_(false) { + // Generate unique ID for data + boost::uuids::uuid ruuid(rgen_()); + id_ = clientImpl_->Name() + "-" + to_string(ruuid); + } + + std::string Id() const { + return id_; + } + + bool Lock() { + ZOO_LOG(INFO, "Trying (" << path_ << "): " << id_); + while (true) { + // Connect if not already done + if (!clientImpl_->IsConnected()) { + bool success(clientImpl_->Connect()); + if (!success) { + ZOO_LOG_ERR("Zookeeper Client Connect FAILED"); + return success; + } + } + // Try creating the znode + int err; + int rc(clientImpl_->CreateNodeSync(path_.c_str(), id_.c_str(), + 
&err)); + switch (rc) { + case ZOK: { + // We acquired the lock + ZOO_LOG(INFO, "ACQUIRED (" << path_ << "): " << id_); + is_acquired_ = true; + return true; + } + case ZNODEEXISTS: { + // Node exists, get node data and check + char buf[256]; + int buf_len(sizeof(buf)); + int zerr; + int zrc(clientImpl_->GetNodeDataSync(path_.c_str(), buf, + &buf_len, &zerr)); + if (zrc == ZOK) { + // Does it match our ID? + std::string mid(buf, buf_len); + if (id_ == mid) { + // We acquired the lock + ZOO_LOG(INFO, "ACQUIRED EEXIST (" << path_ << "): " + << id_); + is_acquired_ = true; + return true; + } + ZOO_LOG(INFO, "EEXIST (" << path_ << "): " << mid << + " , ours: " << id_); + sleep(1); + continue; + } else if (zrc == ZNONODE) { + ZOO_LOG(WARN, "GetNodeDataSync(" << path_ << + "): Data: " << id_ << + ": No Node EXISTS: retrying in 1 second"); + sleep(1); + continue; + } else { + ZOO_LOG_ERR("GetNodeDataSync(" << path_ << "): " << + id_ << ": FAILED: (" << zrc << ") error: " << zerr); + clientImpl_->Shutdown(); + return false; + } + break; + } + default: { + ZOO_LOG_ERR("CreateNodeSync(" << path_ << "): " << id_ + << ": FAILED: (" << rc << ") error: " << err); + clientImpl_->Shutdown(); + return false; + } + } + } + } + + bool Release() { + bool success; + int err, rc; + if (!is_acquired_) { + ZOO_LOG_ERR("(" << path_ << "): " << id_ << + ": Release WITHOUT Lock"); + success = false; + goto cleanup; + } + is_acquired_ = false; + rc = clientImpl_->DeleteNodeSync(path_.c_str(), &err); + if (rc == ZOK) { + ZOO_LOG(INFO, "RELEASED (" << path_ << "): " << id_); + success = true; + goto cleanup; + } else if (rc == ZNONODE) { + ZOO_LOG_ERR("DeleteNodeSync(" << path_ << "): " << id_ << + ": No Node EXISTS(" << err << + "): Possible concurrent execution"); + success = false; + goto cleanup; + } else { + ZOO_LOG_ERR("DeleteNodeSync(" << path_ << "): " << id_ << + ": FAILED (" << rc << "): error " << err); + success = false; + goto cleanup; + } + cleanup: + clientImpl_->Shutdown(); + 
return success; + } + + private: + impl::ZookeeperClientImpl *clientImpl_; + std::string path_; + bool is_acquired_; + boost::uuids::random_generator rgen_; + std::string id_; +}; + +// ZookeeperLock +ZookeeperLock::ZookeeperLock(ZookeeperClient *client, const char *path) : + impl_(new ZookeeperLockImpl(client->impl_.get(), path)) { +} + +ZookeeperLock::~ZookeeperLock() { +} + +std::string ZookeeperLock::Id() const { + return impl_->Id(); +} + +bool ZookeeperLock::Lock() { + return impl_->Lock(); +} + +bool ZookeeperLock::Release() { + return impl_->Release(); +} + +} // namespace client +} // namespace zookeeper diff --git a/src/zookeeper/zookeeper_client.h b/src/zookeeper/zookeeper_client.h new file mode 100644 index 00000000000..c0c2bf57536 --- /dev/null +++ b/src/zookeeper/zookeeper_client.h @@ -0,0 +1,59 @@ +// +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +// + +#ifndef ZOOKEEPER_ZOOKEEPER_CLIENT_H_ +#define ZOOKEEPER_ZOOKEEPER_CLIENT_H_ + +class ZookeeperClientTest; + +namespace zookeeper { +namespace client { + +// Forward declarations +namespace impl { +class ZookeeperClientImpl; +} // namespace impl + +// +// Blocking, synchronous, non-thread safe Zookeeper client +// +class ZookeeperClient { + public: + ZookeeperClient(const char *hostname, const char *servers); + virtual ~ZookeeperClient(); + + private: + ZookeeperClient(impl::ZookeeperClientImpl *impl); + + friend class ZookeeperLock; + friend class ::ZookeeperClientTest; + + std::auto_ptr impl_; +}; + +// +// Usage is to first create a ZookeeperClient, and then ZookeeperLock +// for distributed synchronization +// +class ZookeeperLock { + public: + ZookeeperLock(ZookeeperClient *client, const char *path); + virtual ~ZookeeperLock(); + + bool Lock(); + bool Release(); + + private: + class ZookeeperLockImpl; + friend class ::ZookeeperClientTest; + + std::string Id() const; + + std::auto_ptr impl_; +}; + +} // namespace client +} // namespace zookeeper + +#endif // 
ZOOKEEPER_ZOOKEEPER_CLIENT_H_ diff --git a/src/zookeeper/zookeeper_client_impl.h b/src/zookeeper/zookeeper_client_impl.h new file mode 100644 index 00000000000..7048b53700e --- /dev/null +++ b/src/zookeeper/zookeeper_client_impl.h @@ -0,0 +1,48 @@ +// +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +// + +#ifndef ZOOKEEPER_ZOOKEEPER_CLIENT_IMPL_H_ +#define ZOOKEEPER_ZOOKEEPER_CLIENT_IMPL_H_ + +#include + +#include + +namespace zookeeper { +namespace client { +namespace impl { + +// +// Blocking, synchronous, non-thread safe Zookeeper client +// +class ZookeeperClientImpl { + public: + ZookeeperClientImpl(const char *hostname, const char *servers, + zookeeper::interface::ZookeeperInterface *zki); + virtual ~ZookeeperClientImpl(); + + bool Connect(); + void Shutdown(); + bool Reconnect(); + bool IsConnected() const; + int CreateNodeSync(const char *path, const char *value, int *err); + int GetNodeDataSync(const char *path, char *buf, int *buf_len, int *err); + int DeleteNodeSync(const char *path, int *err); + std::string Name() const; + + private: + static const int kSessionTimeoutMSec_ = 10000; + + std::string hostname_; + std::string servers_; + zhandle_t *zk_handle_; + bool connected_; + std::auto_ptr zki_; +}; + +} // namespace impl +} // namespace client +} // namespace zookeeper + +#endif // ZOOKEEPER_ZOOKEEPER_CLIENT_IMPL_H_ diff --git a/src/zookeeper/zookeeper_interface.h b/src/zookeeper/zookeeper_interface.h new file mode 100644 index 00000000000..a5cf6dd8bde --- /dev/null +++ b/src/zookeeper/zookeeper_interface.h @@ -0,0 +1,33 @@ +// +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. 
+// + +#ifndef ZOOKEEPER_ZOOKEEPER_INTERFACE_H_ +#define ZOOKEEPER_ZOOKEEPER_INTERFACE_H_ + +#include + +namespace zookeeper { +namespace interface { + +class ZookeeperInterface { + public: + virtual ~ZookeeperInterface() {} + virtual void ZooSetDebugLevel(ZooLogLevel logLevel) = 0; + virtual zhandle_t* ZookeeperInit(const char *host, watcher_fn fn, + int recv_timeout, const clientid_t *clientid, void *context, + int flags) = 0; + virtual int ZookeeperClose(zhandle_t *zh) = 0; + virtual int ZooState(zhandle_t *zh) = 0; + virtual int ZooCreate(zhandle_t *zh, const char *path, const char *value, + int valuelen, const struct ACL_vector *acl, int flags, + char *path_buffer, int path_buffer_len) = 0; + virtual int ZooDelete(zhandle_t *zh, const char *path, int version) = 0; + virtual int ZooGet(zhandle_t *zh, const char *path, int watch, + char *buffer, int* buffer_len, struct Stat *stat) = 0; +}; + +} // namespace interface +} // namespace zookeeper + +#endif // ZOOKEEPER_ZOOKEEPER_INTERFACE_H_