From b56b31a06a07b8642234a1592906e1b2a2438b37 Mon Sep 17 00:00:00 2001
From: Megh Bhatt
Date: Mon, 22 Feb 2016 23:39:31 -0800
Subject: [PATCH] Distributed locking among contrail-collector instances for creating the cassandra schema

1. Add a synchronous, blocking zookeeper client that uses the zookeeper C
   bindings.
2. Distributed lock recipe - all nodes try to create a znode with their
   unique id as its data. The node whose create succeeds acquires the lock,
   and releases it by deleting the znode. The other nodes get EEXIST and
   keep retrying until they are able to create the znode themselves.
3. Unit tests for the zookeeper client.
4. contrail-collector changes to accept DEFAULT.zookeeper_server_list as a
   command line argument and configuration file parameter.
5. contrail-collector changes to use ZookeeperLock in DbHandler::Initialize.
6. Add the zookeeper and cql tests to the CI unit tests.

Change-Id: I924016bc74d452bb8aeaa01185788e3164e77f40
Partial-Bug: #1542576
---
 ci_unittests.json | 10 +-
 src/SConscript | 3 +-
 src/analytics/SConscript | 3 +-
 src/analytics/db_handler.cc | 36 +-
 src/analytics/db_handler.h | 11 +-
 src/analytics/generator.cc | 2 +-
 src/analytics/main.cc | 8 +-
 src/analytics/options.cc | 7 +
 src/analytics/options.h | 4 +
 src/analytics/protobuf_collector.cc | 2 +-
 src/analytics/test/db_handler_mock.h | 3 +-
 src/analytics/viz_collector.cc | 5 +-
 src/analytics/viz_collector.h | 4 +-
 src/opserver/test/test_analytics_uve.py | 11 +-
 src/opserver/test/utils/analytics_fixture.py | 56 ++-
 src/zookeeper/SConscript | 14 +
 src/zookeeper/test/SConscript | 26 ++
 src/zookeeper/test/zookeeper_client_test.cc | 173 +++++++++
 src/zookeeper/zookeeper_client.cc | 362 +++++++++++++++++++
 src/zookeeper/zookeeper_client.h | 59 +++
 src/zookeeper/zookeeper_client_impl.h | 48 +++
 src/zookeeper/zookeeper_interface.h | 33 ++
 22 files changed, 839 insertions(+), 41 deletions(-)
 create mode 100644 src/zookeeper/SConscript
 create mode 100644 src/zookeeper/test/SConscript
 create mode 100644 src/zookeeper/test/zookeeper_client_test.cc
 create mode 100644 src/zookeeper/zookeeper_client.cc
 create mode 100644 src/zookeeper/zookeeper_client.h
 create mode 100644 src/zookeeper/zookeeper_client_impl.h
 create mode 100644 src/zookeeper/zookeeper_interface.h

diff --git a/ci_unittests.json b/ci_unittests.json index 9405e05d6b6..7f9caee294d 100644 --- a/ci_unittests.json +++ b/ci_unittests.json @@ -5,7 +5,8 @@ "controller/src/bfd", "controller/src/database", "controller/src/opserver", - "controller/src/query_engine" + "controller/src/query_engine", + "controller/src/zookeeper" ], "scons_test_targets" : [ "controller/src/analytics:test", @@ -13,7 +14,9 @@ "controller/src/bfd:test", "controller/src/query_engine:test", "controller/src/database/gendb:test", - "controller/src/database/cassandra/thrift:test" + "controller/src/database/cassandra/thrift:test", + "controller/src/database/cassandra/cql:test", + "controller/src/zookeeper:test" ], "misc_test_targets" : [ ] }, @@ -93,6 +96,7 @@ "controller/src/db", "controller/src/discovery", "controller/src/database", + "controller/src/zookeeper", "controller/src/http", "controller/src/io", "controller/src/net", @@ -111,6 +115,8 @@ "controller/src/discovery:test", "controller/src/database/gendb:test", "controller/src/database/cassandra/thrift:test", + "controller/src/database/cassandra/cql:test", + "controller/src/zookeeper:test", "controller/src/io:test", "controller/src/net:test", "controller/src/schema:test", diff --git a/src/SConscript b/src/SConscript index 047638e70bb..8e4067d3959 100644
--- a/src/SConscript +++ b/src/SConscript @@ -34,7 +34,8 @@ subdirs = [ 'xml', 'xmpp', 'libpartition', - 'nodemgr' + 'nodemgr', + 'zookeeper', ] if sys.platform != 'darwin': diff --git a/src/analytics/SConscript b/src/analytics/SConscript index 40044986c29..e67be0d74ad 100644 --- a/src/analytics/SConscript +++ b/src/analytics/SConscript @@ -53,6 +53,8 @@ AnalyticsEnv.Prepend(LIBS=['cpuinfo', 'pugixml', 'hiredis', 'protobuf', + 'zookeeper_client', + 'zookeeper_mt', 'boost_filesystem', 'boost_program_options']) @@ -77,7 +79,6 @@ libs = MapBuildDir(['sandesh', 'base', 'io', 'net']) -libs.extend('/usr/lib/') AnalyticsEnv.Append(LIBPATH=libs) includes = MapBuildDir(['http/client', 'discovery/client', 'analytics']) diff --git a/src/analytics/db_handler.cc b/src/analytics/db_handler.cc index 14345041b05..cb913c129e8 100644 --- a/src/analytics/db_handler.cc +++ b/src/analytics/db_handler.cc @@ -25,6 +25,7 @@ #else // USE_CASSANDRA_CQL #include #endif // !USE_CASSANDRA_CQL +#include #include "viz_constants.h" #include "vizd_table_desc.h" @@ -65,14 +66,18 @@ DbHandler::DbHandler(EventManager *evm, std::string name, const TtlMap& ttl_map, const std::string& cassandra_user, const std::string& cassandra_password, - bool use_cql) : + bool use_cql, + const std::string &zookeeper_server_list, + bool use_zookeeper) : name_(name), drop_level_(SandeshLevel::INVALID), ttl_map_(ttl_map), use_cql_(use_cql), tablespace_(), gen_partition_no_((uint8_t)g_viz_constants.PARTITION_MIN, - (uint8_t)g_viz_constants.PARTITION_MAX) { + (uint8_t)g_viz_constants.PARTITION_MAX), + zookeeper_server_list_(zookeeper_server_list), + use_zookeeper_(use_zookeeper) { #ifdef USE_CASSANDRA_CQL if (use_cql) { dbif_.reset(new cass::cql::CqlIf(evm, cassandra_ips, @@ -299,7 +304,7 @@ bool DbHandler::Init(bool initial, int instance) { } } -bool DbHandler::Initialize(int instance) { +bool DbHandler::InitializeInternal(int instance) { DB_LOG(DEBUG, "Initializing.."); /* init of vizd table structures */ @@ -326,6 +331,25 @@ bool DbHandler::Initialize(int instance) { return true; } +bool DbHandler::InitializeInternalLocked(int instance) { + // Synchronize creation across nodes using zookeeper + zookeeper::client::ZookeeperClient client(name_.c_str(), + zookeeper_server_list_.c_str()); + zookeeper::client::ZookeeperLock dmutex(&client, "/collector"); + assert(dmutex.Lock()); + bool success(InitializeInternal(instance)); + assert(dmutex.Release()); + return success; +} + +bool DbHandler::Initialize(int instance) { + if (use_zookeeper_) { + return InitializeInternalLocked(instance); + } else { + return InitializeInternal(instance); + } +} + bool DbHandler::Setup(int instance) { DB_LOG(DEBUG, "Setup.."); if (!dbif_->Db_Init("analytics::DbHandler", @@ -1674,13 +1698,15 @@ DbHandlerInitializer::DbHandlerInitializer(EventManager *evm, const std::vector &cassandra_ips, const std::vector &cassandra_ports, const TtlMap& ttl_map, const std::string& cassandra_user, const std::string& cassandra_password, - bool use_cql) : + bool use_cql, const std::string &zookeeper_server_list, + bool use_zookeeper) : db_name_(db_name), db_task_instance_(db_task_instance), db_handler_(new DbHandler(evm, boost::bind(&DbHandlerInitializer::ScheduleInit, this), cassandra_ips, cassandra_ports, db_name, ttl_map, - cassandra_user, cassandra_password, use_cql)), + cassandra_user, cassandra_password, use_cql, + zookeeper_server_list, use_zookeeper)), callback_(callback), db_init_timer_(TimerManager::CreateTimer(*evm->io_service(), db_name + " Db Init Timer", diff --git 
a/src/analytics/db_handler.h b/src/analytics/db_handler.h index bf1817dd727..e9fcd9e5ef7 100644 --- a/src/analytics/db_handler.h +++ b/src/analytics/db_handler.h @@ -88,7 +88,8 @@ class DbHandler { std::string name, const TtlMap& ttl_map, const std::string& cassandra_user, const std::string& cassandra_password, - bool use_cql); + bool use_cql, const std::string &zookeeper_server_list, + bool use_zookeeper); DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map); virtual ~DbHandler(); @@ -156,6 +157,8 @@ class DbHandler { boost::function cb); bool Setup(int instance); bool Initialize(int instance); + bool InitializeInternal(int instance); + bool InitializeInternalLocked(int instance); bool StatTableWrite(uint32_t t2, const std::string& statName, const std::string& statAttr, const std::pair& ptag, @@ -185,6 +188,8 @@ class DbHandler { bool use_cql_; std::string tablespace_; UniformInt8RandomGenerator gen_partition_no_; + std::string zookeeper_server_list_; + bool use_zookeeper_; DISALLOW_COPY_AND_ASSIGN(DbHandler); }; @@ -222,7 +227,9 @@ class DbHandlerInitializer { const TtlMap& ttl_map, const std::string& cassandra_user, const std::string& cassandra_password, - bool use_cql); + bool use_cql, + const std::string &zookeeper_server_list, + bool use_zookeeper); DbHandlerInitializer(EventManager *evm, const std::string &db_name, int db_task_instance, const std::string &timer_task_name, InitializeDoneCb callback, diff --git a/src/analytics/generator.cc b/src/analytics/generator.cc index 7af6302d76d..5c129d9957e 100644 --- a/src/analytics/generator.cc +++ b/src/analytics/generator.cc @@ -108,7 +108,7 @@ SandeshGenerator::SandeshGenerator(Collector * const collector, VizSession *sess source + ":" + node_type + ":" + module + ":" + instance_id, collector->analytics_ttl_map(), collector->cassandra_user(), collector->cassandra_password(), - false)); + false, std::string(), false)); } else { //Use collector db_handler db_handler_ = global_db_handler; diff --git a/src/analytics/main.cc b/src/analytics/main.cc index fbe1bf4c08e..de372971d63 100644 --- a/src/analytics/main.cc +++ b/src/analytics/main.cc @@ -270,6 +270,8 @@ int main(int argc, char *argv[]) copy(cassandra_servers.begin(), cassandra_servers.end(), ostream_iterator(css, " ")); LOG(INFO, "COLLECTOR CASSANDRA SERVERS: " << css.str()); + LOG(INFO, "COLLECTOR ZOOKEEPER SERVERS: " << + options.zookeeper_server_list()); LOG(INFO, "COLLECTOR SYSLOG LISTEN PORT: " << options.syslog_port()); LOG(INFO, "COLLECTOR SFLOW LISTEN PORT: " << options.sflow_port()); LOG(INFO, "COLLECTOR IPFIX LISTEN PORT: " << options.ipfix_port()); @@ -328,6 +330,8 @@ int main(int argc, char *argv[]) //Get Platform info //cql not supported in precise, centos 6.4 6.5 bool use_cql = MiscUtils::IsCqlSupported(); + std::string zookeeper_server_list(options.zookeeper_server_list()); + bool use_zookeeper = !zookeeper_server_list.empty(); VizCollector analytics(a_evm, options.collector_port(), protobuf_server_enabled, @@ -346,7 +350,9 @@ int main(int argc, char *argv[]) options.kafka_prefix(), ttl_map, options.cassandra_user(), options.cassandra_password(), - use_cql); + use_cql, + zookeeper_server_list, + use_zookeeper); #if 0 // initialize python/c++ API diff --git a/src/analytics/options.cc b/src/analytics/options.cc index 5ab99910148..615dd7fa993 100644 --- a/src/analytics/options.cc +++ b/src/analytics/options.cc @@ -76,6 +76,8 @@ void Options::Initialize(EventManager &evm, #endif // !USE_CASSANDRA_CQL default_cassandra_server_list.push_back(default_cassandra_server); + 
string default_zookeeper_server("127.0.0.1:2181"); + vector default_kafka_broker_list; default_kafka_broker_list.push_back(""); @@ -117,6 +119,9 @@ void Options::Initialize(EventManager &evm, opt::value >()->default_value( default_cassandra_server_list, default_cassandra_server), "Cassandra server list") + ("DEFAULT.zookeeper_server_list", + opt::value()->default_value(""), + "Zookeeper server list") ("DEFAULT.kafka_broker_list", opt::value >()->default_value( default_kafka_broker_list, ""), @@ -316,6 +321,8 @@ void Options::Process(int argc, char *argv[], GetOptValue< vector >(var_map, cassandra_server_list_, "DEFAULT.cassandra_server_list"); + GetOptValue(var_map, zookeeper_server_list_, + "DEFAULT.zookeeper_server_list"); GetOptValue< vector >(var_map, kafka_broker_list_, "DEFAULT.kafka_broker_list"); GetOptValue(var_map, partitions_, "DEFAULT.partitions"); diff --git a/src/analytics/options.h b/src/analytics/options.h index e43bc7532ec..a67836b8703 100644 --- a/src/analytics/options.h +++ b/src/analytics/options.h @@ -17,6 +17,9 @@ class Options { const std::vector cassandra_server_list() const { return cassandra_server_list_; } + const std::string zookeeper_server_list() const { + return zookeeper_server_list_; + } const std::vector kafka_broker_list() const { return kafka_broker_list_; } @@ -127,6 +130,7 @@ class Options { uint64_t analytics_flow_ttl_; uint64_t analytics_statistics_ttl_; std::vector cassandra_server_list_; + std::string zookeeper_server_list_; std::vector kafka_broker_list_; uint16_t partitions_; uint32_t sandesh_ratelimit_; diff --git a/src/analytics/protobuf_collector.cc b/src/analytics/protobuf_collector.cc index 58f7c27ab44..fbcf4c6b2b7 100644 --- a/src/analytics/protobuf_collector.cc +++ b/src/analytics/protobuf_collector.cc @@ -24,7 +24,7 @@ ProtobufCollector::ProtobufCollector(EventManager *evm, db_initializer_.reset(new DbHandlerInitializer(evm, kDbName, kDbTaskInstance, kDbTaskName, boost::bind(&ProtobufCollector::DbInitializeCb, this), cassandra_ips, cassandra_ports, ttl_map, cassandra_user, - cassandra_password, false)); + cassandra_password, false, std::string(), false)); db_handler_ = db_initializer_->GetDbHandler(); } else { db_handler_ = global_dbhandler; diff --git a/src/analytics/test/db_handler_mock.h b/src/analytics/test/db_handler_mock.h index 80d9a29db00..da734d15426 100644 --- a/src/analytics/test/db_handler_mock.h +++ b/src/analytics/test/db_handler_mock.h @@ -14,7 +14,8 @@ class DbHandlerMock : public DbHandler { DbHandlerMock(EventManager *evm, const TtlMap& ttl_map) : DbHandler(evm, boost::bind(&DbHandlerMock::StartDbifReinit, this), std::vector(1, "127.0.0.1"), - std::vector(1, 9160), "localhost", ttl_map, "", "", false) + std::vector(1, 9160), "localhost", ttl_map, "", "", false, + "", false) { } void StartDbifReinit() { diff --git a/src/analytics/viz_collector.cc b/src/analytics/viz_collector.cc index 42afa37ce72..eb71c2139e3 100644 --- a/src/analytics/viz_collector.cc +++ b/src/analytics/viz_collector.cc @@ -43,12 +43,13 @@ VizCollector::VizCollector(EventManager *evm, unsigned short listen_port, const std::string &kafka_prefix, const TtlMap& ttl_map, const std::string &cassandra_user, const std::string &cassandra_password, - bool use_cql) : + bool use_cql, const std::string &zookeeper_server_list, + bool use_zookeeper) : db_initializer_(new DbHandlerInitializer(evm, DbGlobalName(dup), -1, std::string("collector:DbIf"), boost::bind(&VizCollector::DbInitializeCb, this), cassandra_ips, cassandra_ports, ttl_map, cassandra_user, - 
cassandra_password, use_cql)), + cassandra_password, use_cql, zookeeper_server_list, use_zookeeper)), osp_(new OpServerProxy(evm, this, redis_uve_ip, redis_uve_port, redis_password, brokers, partitions, kafka_prefix)), ruleeng_(new Ruleeng(db_initializer_->GetDbHandler(), osp_.get())), diff --git a/src/analytics/viz_collector.h b/src/analytics/viz_collector.h index e8c39ff28be..4b112c396d5 100644 --- a/src/analytics/viz_collector.h +++ b/src/analytics/viz_collector.h @@ -40,7 +40,9 @@ class VizCollector { const std::string &kafka_prefix, const TtlMap &ttlmap, const std::string& cassandra_user, const std::string& cassandra_password, - bool use_cql); + bool use_cql, + const std::string &zookeeper_server_list, + bool use_zookeeper); VizCollector(EventManager *evm, DbHandlerPtr db_handler, Ruleeng *ruleeng, Collector *collector, OpServerProxy *osp); diff --git a/src/opserver/test/test_analytics_uve.py b/src/opserver/test/test_analytics_uve.py index 9fbd58425b2..9615d6c2574 100755 --- a/src/opserver/test/test_analytics_uve.py +++ b/src/opserver/test/test_analytics_uve.py @@ -51,14 +51,11 @@ def setUpClass(cls): cls.redis_port = AnalyticsUveTest.get_free_port() mockredis.start_redis(cls.redis_port) - cls.zk_port = AnalyticsUveTest.get_free_port() - mockzoo.start_zoo(cls.zk_port) @classmethod def tearDownClass(cls): mockredis.stop_redis(cls.redis_port) - mockzoo.stop_zoo(cls.zk_port) #@unittest.skip('Skipping non-cassandra test with vizd') def test_00_nocassandra(self): @@ -154,7 +151,7 @@ def test_03_redis_uve_restart(self): vizd_obj = self.useFixture( AnalyticsFixture(logging, builddir, -1, 0, - kafka_zk = self.__class__.zk_port)) + start_kafka = True)) assert vizd_obj.verify_on_setup() collectors = [vizd_obj.get_collector()] @@ -351,7 +348,7 @@ def test_06_alarmgen_basic(self): vizd_obj = self.useFixture( AnalyticsFixture(logging, builddir, self.__class__.redis_port, 0, - kafka_zk = self.__class__.zk_port)) + start_kafka = True)) assert vizd_obj.verify_on_setup() assert(vizd_obj.verify_uvetable_alarm("ObjectCollectorInfo", @@ -458,7 +455,7 @@ def test_07_alarm(self): vizd_obj = self.useFixture( AnalyticsFixture(logging, builddir, -1, 0, collector_ha_test=True, - kafka_zk = self.__class__.zk_port)) + start_kafka = True)) assert vizd_obj.verify_on_setup() # create alarm-generator and attach it to the first collector. 
@@ -572,7 +569,7 @@ def test_08_uve_alarm_filter(self): vizd_obj = self.useFixture( AnalyticsFixture(logging, builddir, -1, 0, - collector_ha_test=True, kafka_zk = self.__class__.zk_port)) + collector_ha_test=True, start_kafka = True)) assert vizd_obj.verify_on_setup() collectors = [vizd_obj.collectors[0].get_addr(), diff --git a/src/opserver/test/utils/analytics_fixture.py b/src/opserver/test/utils/analytics_fixture.py index 636092238f8..801141f8182 100644 --- a/src/opserver/test/utils/analytics_fixture.py +++ b/src/opserver/test/utils/analytics_fixture.py @@ -62,8 +62,9 @@ class Collector(object): def __init__(self, analytics_fixture, redis_uve, logger, ipfix_port = False, sflow_port = False, syslog_port = False, protobuf_port = True, - kafka = None, is_dup=False, - cassandra_user= None, cassandra_password= None): + kafka = None, is_dup = False, + cassandra_user = None, cassandra_password = None, + zookeeper = None): self.analytics_fixture = analytics_fixture if kafka is None: self.kafka_port = None @@ -91,6 +92,7 @@ def __init__(self, analytics_fixture, redis_uve, ':'+ModuleNames[Module.COLLECTOR]+':0' self.cassandra_user = analytics_fixture.cassandra_user self.cassandra_password = analytics_fixture.cassandra_password + self.zk_port = zookeeper.port # end __init__ def get_addr(self): @@ -166,6 +168,8 @@ def start(self): args.append('127.0.0.1:%d' % self.kafka_port) args.append('--DEFAULT.partitions') args.append(str(4)) + args.append('--DEFAULT.zookeeper_server_list') + args.append('127.0.0.1:%d' % self.zk_port) self._logger.info('Setting up Vizd: %s' % (' '.join(args))) ports, self._instance = \ self.analytics_fixture.start_with_ephemeral_ports( @@ -511,9 +515,8 @@ def stop(self): class Kafka(object): def __init__(self, zk_port): self.port = None - self.zk_port = zk_port - self.zk_start = False self.running = False + self.zk_port = zk_port # end __init__ def start(self): @@ -521,10 +524,6 @@ def start(self): self.running = True if not self.port: self.port = AnalyticsFixture.get_free_port() - if not self.zk_port: - self.zk_port = AnalyticsFixture.get_free_port() - mockzoo.start_zoo(self.zk_port) - self.zk_start = True mockkafka.start_kafka(self.zk_port, self.port) # end start @@ -532,18 +531,37 @@ def start(self): def stop(self): if self.running: mockkafka.stop_kafka(self.port) - if self.zk_start: - mockzoo.stop_zoo(self.zk_port) self.running = False #end stop # end class Kafka +class Zookeeper(object): + def __init__(self, zk_port=None): + self.running = False; + self.port = zk_port + # end __init__ + + def start(self): + assert(self.running == False) + if not self.port: + self.port = AnalyticsFixture.get_free_port() + mockzoo.start_zoo(self.port) + self.running = True + # end start + + def stop(self): + if self.running: + mockzoo.stop_zoo(self.port) + self.running = False + # end stop +# end class Zookeeper + class AnalyticsFixture(fixtures.Fixture): def __init__(self, logger, builddir, redis_port, cassandra_port, ipfix_port = False, sflow_port = False, syslog_port = False, protobuf_port = False, noqed=False, collector_ha_test=False, - redis_password=None, kafka_zk=0, + redis_password=None, start_kafka=False, cassandra_user=None, cassandra_password=None): self.builddir = builddir @@ -557,13 +575,14 @@ def __init__(self, logger, builddir, redis_port, cassandra_port, self.noqed = noqed self.collector_ha_test = collector_ha_test self.redis_password = redis_password - self.kafka_zk = kafka_zk + self.start_kafka = start_kafka self.kafka = None self.opserver = None self.query_engine = None 
self.alarmgen = None self.cassandra_user = cassandra_user self.cassandra_password = cassandra_password + self.zookeeper = None def setUp(self): super(AnalyticsFixture, self).setUp() @@ -572,8 +591,11 @@ def setUp(self): self.redis_password)] self.redis_uves[0].start() - if self.kafka_zk: - self.kafka = Kafka(self.kafka_zk) + self.zookeeper = Zookeeper() + self.zookeeper.start() + + if self.start_kafka: + self.kafka = Kafka(self.zookeeper.port) self.kafka.start() self.collectors = [Collector(self, self.redis_uves[0], self.logger, @@ -581,7 +603,8 @@ def setUp(self): sflow_port = self.sflow_port, syslog_port = self.syslog_port, protobuf_port = self.protobuf_port, - kafka = self.kafka)] + kafka = self.kafka, + zookeeper = self.zookeeper)] if not self.collectors[0].start(): self.logger.error("Collector did NOT start") return @@ -599,7 +622,8 @@ def setUp(self): self.collectors.append(Collector(self, self.redis_uves[1], self.logger, kafka = self.kafka, - is_dup=True)) + is_dup = True, + zookeeper = self.zookeeper)) if not self.collectors[1].start(): self.logger.error("Secondary Collector did NOT start") secondary_collector = self.collectors[1].get_addr() diff --git a/src/zookeeper/SConscript b/src/zookeeper/SConscript new file mode 100644 index 00000000000..16563c18953 --- /dev/null +++ b/src/zookeeper/SConscript @@ -0,0 +1,14 @@ +# +# Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +# + +Import('BuildEnv') +ZooEnv = BuildEnv.Clone() + +ZooEnv.Append(CPPPATH = ['/usr/include']) +sources = ['zookeeper_client.cc'] + +libzoo = ZooEnv.Library('zookeeper_client', source = sources) +ZooEnv.Install(ZooEnv['TOP_LIB'], libzoo) + +ZooEnv.SConscript('test/SConscript', exports='ZooEnv', duplicate=0) diff --git a/src/zookeeper/test/SConscript b/src/zookeeper/test/SConscript new file mode 100644 index 00000000000..16a813df157 --- /dev/null +++ b/src/zookeeper/test/SConscript @@ -0,0 +1,26 @@ +# +# Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +# + +Import('ZooEnv') + +env = ZooEnv.Clone() + +def MapBuildDir(list): + return map(lambda x: env['TOP'] + '/' + x, list) + +libs = ['zookeeper_client', 'base', 'zookeeper_mt', 'gunit'] +env.Prepend(LIBS=libs) +libpaths=['base'] +env.Append(LIBPATH = [MapBuildDir(libpaths)]) + +zookeeper_client_test = env.UnitTest('zookeeper_client_test', + ['zookeeper_client_test.cc']) + +test_suite = [ zookeeper_client_test ] +test = env.TestSuite('zookeeper_test_suite', test_suite) +env.Alias('controller/src/zookeeper:test', test) + +flaky_test_suite = [] +flaky_test = env.TestSuite('zookeeper_flaky_test_suite', flaky_test_suite) +env.Alias('controller/src/zookeeper:flaky-test', flaky_test) diff --git a/src/zookeeper/test/zookeeper_client_test.cc b/src/zookeeper/test/zookeeper_client_test.cc new file mode 100644 index 00000000000..89a7673bbbe --- /dev/null +++ b/src/zookeeper/test/zookeeper_client_test.cc @@ -0,0 +1,173 @@ +// +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. 
+// + +#include + +#include +#include +#include +#include + +using ::testing::Return; +using ::testing::WithArgs; +using ::testing::DoAll; +using ::testing::SetArgPointee; +using ::testing::_; +using ::testing::StrEq; + +using namespace zookeeper::client; +using namespace zookeeper::interface; + +class ZookeeperMockInterface : public ZookeeperInterface { + public: + ZookeeperMockInterface() { + } + virtual ~ZookeeperMockInterface() { + } + MOCK_METHOD1(ZooSetDebugLevel, void(ZooLogLevel logLevel)); + MOCK_METHOD6(ZookeeperInit, zhandle_t*(const char *host, watcher_fn fn, + int recv_timeout, const clientid_t *clientid, void *context, + int flags)); + MOCK_METHOD1(ZookeeperClose, int(zhandle_t *zh)); + MOCK_METHOD1(ZooState, int(zhandle_t *zh)); + MOCK_METHOD8(ZooCreate, int(zhandle_t *zh, const char *path, + const char *value, int valuelen, const struct ACL_vector *acl, + int flags, char *path_buffer, int path_buffer_len)); + MOCK_METHOD3(ZooDelete, int(zhandle_t *zh, const char *path, + int version)); + MOCK_METHOD6(ZooGet, int(zhandle_t *zh, const char *path, int watch, + char *buffer, int* buffer_len, struct Stat *stat)); +}; + +class ZookeeperClientTest : public ::testing::Test { + protected: + ZookeeperClientTest() { + } + ~ZookeeperClientTest() { + } + virtual void SetUp() { + } + virtual void TearDown() { + } + ZookeeperClient* CreateClient(impl::ZookeeperClientImpl *impl) { + return new ZookeeperClient(impl); + } + std::string GetLockId(const ZookeeperLock &zk_lock) { + return zk_lock.Id(); + } +}; + +TEST_F(ZookeeperClientTest, Basic) { + ZookeeperMockInterface *zmi(new ZookeeperMockInterface); + EXPECT_CALL(*zmi, ZooSetDebugLevel(_)); + impl::ZookeeperClientImpl *cImpl( + new impl::ZookeeperClientImpl("Test", "127.0.0.1:2181", zmi)); + std::auto_ptr client(CreateClient(cImpl)); + std::string zk_lock_name("/test-lock"); + ZookeeperLock zk_lock(client.get(), zk_lock_name.c_str()); + std::string zk_lock_id(GetLockId(zk_lock)); + int zkh(0xdeadbeef); + zhandle_t *zk_handle = (zhandle_t *)(&zkh); + EXPECT_CALL(*zmi, ZookeeperInit(StrEq("127.0.0.1:2181"), _, _, _, _, _)) + .WillOnce(Return(zk_handle)); + EXPECT_CALL(*zmi, ZooState(zk_handle)) + .WillOnce(Return(ZOO_CONNECTED_STATE)); + EXPECT_CALL(*zmi, ZooCreate(zk_handle, StrEq(zk_lock_name), + StrEq(zk_lock_id), zk_lock_id.length(), _, _, _, _)) + .WillOnce(Return(ZOK)); + EXPECT_TRUE(zk_lock.Lock()); + EXPECT_TRUE(cImpl->IsConnected()); + EXPECT_CALL(*zmi, ZooDelete(zk_handle, StrEq(zk_lock_name), _)) + .WillOnce(Return(ZOK)); + EXPECT_CALL(*zmi, ZookeeperClose(zk_handle)); + EXPECT_TRUE(zk_lock.Release()); + EXPECT_FALSE(cImpl->IsConnected()); +} + +TEST_F(ZookeeperClientTest, ZookeeperInitFail) { + ZookeeperMockInterface *zmi(new ZookeeperMockInterface); + EXPECT_CALL(*zmi, ZooSetDebugLevel(_)); + impl::ZookeeperClientImpl *cImpl( + new impl::ZookeeperClientImpl("Test", "127.0.0.1:2181", zmi)); + std::auto_ptr client(CreateClient(cImpl)); + std::string zk_lock_name("/test-lock"); + ZookeeperLock zk_lock(client.get(), zk_lock_name.c_str()); + std::string zk_lock_id(GetLockId(zk_lock)); + EXPECT_CALL(*zmi, ZookeeperInit(StrEq("127.0.0.1:2181"), _, _, _, _, _)); + EXPECT_FALSE(zk_lock.Lock()); + EXPECT_FALSE(cImpl->IsConnected()); +} + +TEST_F(ZookeeperClientTest, ZooStateConnecting2Connect) { + ZookeeperMockInterface *zmi(new ZookeeperMockInterface); + EXPECT_CALL(*zmi, ZooSetDebugLevel(_)); + impl::ZookeeperClientImpl *cImpl( + new impl::ZookeeperClientImpl("Test", "127.0.0.1:2181", zmi)); + std::auto_ptr 
client(CreateClient(cImpl)); + std::string zk_lock_name("/test-lock"); + ZookeeperLock zk_lock(client.get(), zk_lock_name.c_str()); + std::string zk_lock_id(GetLockId(zk_lock)); + int zkh(0xdeadbeef); + zhandle_t *zk_handle = (zhandle_t *)(&zkh); + EXPECT_CALL(*zmi, ZookeeperInit(StrEq("127.0.0.1:2181"), _, _, _, _, _)) + .WillOnce(Return(zk_handle)); + EXPECT_CALL(*zmi, ZooState(zk_handle)) + .WillOnce(Return(ZOO_CONNECTING_STATE)) + .WillOnce(Return(ZOO_CONNECTED_STATE)); + EXPECT_CALL(*zmi, ZooCreate(zk_handle, StrEq(zk_lock_name), + StrEq(zk_lock_id), zk_lock_id.length(), _, _, _, _)) + .WillOnce(Return(ZOK)); + EXPECT_TRUE(zk_lock.Lock()); + EXPECT_TRUE(cImpl->IsConnected()); + EXPECT_CALL(*zmi, ZooDelete(zk_handle, StrEq(zk_lock_name), _)) + .WillOnce(Return(ZOK)); + EXPECT_CALL(*zmi, ZookeeperClose(zk_handle)); + EXPECT_TRUE(zk_lock.Release()); + EXPECT_FALSE(cImpl->IsConnected()); +} + +ACTION_P(StrCpyToArg0, str) { + std::strcpy(arg0, str); +} + +TEST_F(ZookeeperClientTest, ZooCreateNodeExists) { + ZookeeperMockInterface *zmi(new ZookeeperMockInterface); + EXPECT_CALL(*zmi, ZooSetDebugLevel(_)); + impl::ZookeeperClientImpl *cImpl( + new impl::ZookeeperClientImpl("Test", "127.0.0.1:2181", zmi)); + std::auto_ptr client(CreateClient(cImpl)); + std::string zk_lock_name("/test-lock"); + ZookeeperLock zk_lock(client.get(), zk_lock_name.c_str()); + std::string zk_lock_id(GetLockId(zk_lock)); + int zkh(0xdeadbeef); + zhandle_t *zk_handle = (zhandle_t *)(&zkh); + EXPECT_CALL(*zmi, ZookeeperInit(StrEq("127.0.0.1:2181"), _, _, _, _, _)) + .WillOnce(Return(zk_handle)); + EXPECT_CALL(*zmi, ZooState(zk_handle)) + .WillOnce(Return(ZOO_CONNECTED_STATE)); + EXPECT_CALL(*zmi, ZooCreate(zk_handle, StrEq(zk_lock_name), + StrEq(zk_lock_id), zk_lock_id.length(), _, _, _, _)) + .WillOnce(Return(ZNODEEXISTS)) + .WillOnce(Return(ZNODEEXISTS)); + std::string other_zk_lock_id(zk_lock_id); + other_zk_lock_id += "-other"; + EXPECT_CALL(*zmi, ZooGet(zk_handle, StrEq(zk_lock_name), _, _, _, _)) + .WillOnce(DoAll(WithArgs<3>(StrCpyToArg0(other_zk_lock_id.c_str())), + SetArgPointee<4>((int)other_zk_lock_id.length()), Return(ZOK))) + .WillOnce(DoAll(WithArgs<3>(StrCpyToArg0(zk_lock_id.c_str())), + SetArgPointee<4>((int)zk_lock_id.length()), Return(ZOK))); + EXPECT_TRUE(zk_lock.Lock()); + EXPECT_TRUE(cImpl->IsConnected()); + EXPECT_CALL(*zmi, ZooDelete(zk_handle, StrEq(zk_lock_name), _)) + .WillOnce(Return(ZOK)); + EXPECT_CALL(*zmi, ZookeeperClose(zk_handle)); + EXPECT_TRUE(zk_lock.Release()); + EXPECT_FALSE(cImpl->IsConnected()); +} + +int main(int argc, char **argv) { + LoggingInit(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/zookeeper/zookeeper_client.cc b/src/zookeeper/zookeeper_client.cc new file mode 100644 index 00000000000..552732c9b32 --- /dev/null +++ b/src/zookeeper/zookeeper_client.cc @@ -0,0 +1,362 @@ +// +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. 
+// + +#include +#include +#include + +#include + +#include +#include +#include + +#define ZOO_LOG(_Level, _Msg) \ + do { \ + if (LoggingDisabled()) break; \ + log4cplus::Logger logger = log4cplus::Logger::getRoot(); \ + LOG4CPLUS_##_Level(logger, __func__ << ":" << __FILE__ << ":" << \ + __LINE__ << ": " << _Msg); \ + } while (false) + +#define ZOO_LOG_ERR(_Msg) \ + do { \ + LOG(ERROR, __func__ << ":" << __FILE__ << ":" << __LINE__ << ": " \ + << _Msg); \ + } while (false) + +namespace zookeeper { +namespace interface { + +class ZookeeperCBindings : public ZookeeperInterface { + public: + ZookeeperCBindings() { + } + virtual ~ZookeeperCBindings() { + } + virtual void ZooSetDebugLevel(ZooLogLevel logLevel) { + zoo_set_debug_level(logLevel); + } + virtual zhandle_t* ZookeeperInit(const char *host, watcher_fn fn, + int recv_timeout, const clientid_t *clientid, void *context, + int flags) { + return zookeeper_init(host, fn, recv_timeout, clientid, context, + flags); + } + virtual int ZookeeperClose(zhandle_t *zh) { + return zookeeper_close(zh); + } + virtual int ZooState(zhandle_t *zh) { + return zoo_state(zh); + } + virtual int ZooCreate(zhandle_t *zh, const char *path, const char *value, + int valuelen, const struct ACL_vector *acl, int flags, + char *path_buffer, int path_buffer_len) { + return zoo_create(zh, path, value, valuelen, acl, flags, path_buffer, + path_buffer_len); + } + virtual int ZooDelete(zhandle_t *zh, const char *path, int version) { + return zoo_delete(zh, path, version); + } + virtual int ZooGet(zhandle_t *zh, const char *path, int watch, + char *buffer, int* buffer_len, struct Stat *stat) { + return zoo_get(zh, path, watch, buffer, buffer_len, stat); + } +}; + +} // namespace interface + +namespace client { +namespace impl { + +// ZookeeperClientImpl +ZookeeperClientImpl::ZookeeperClientImpl(const char *hostname, + const char *servers, zookeeper::interface::ZookeeperInterface *zki) : + hostname_(hostname), + servers_(servers), + zk_handle_(NULL), + connected_(false), + zki_(zki) { + // Set loglevel + zki_->ZooSetDebugLevel(ZOO_LOG_LEVEL_DEBUG); +} + +ZookeeperClientImpl::~ZookeeperClientImpl() { +} + +bool ZookeeperClientImpl::Connect() { + while (true) { + zk_handle_ = zki_->ZookeeperInit(servers_.c_str(), + NULL, + kSessionTimeoutMSec_, + NULL, + NULL, + 0); + // Unfortunately, EINVAL is highly overloaded in zookeeper_init + // and can correspond to: + // (1) Empty / invalid 'host' string format. + // (2) Any getaddrinfo error other than EAI_NONAME, + // EAI_NODATA, and EAI_MEMORY are mapped to EINVAL. + // Either way, retrying is not problematic. 
+ if (zk_handle_ == NULL && errno == EINVAL) { + ZOO_LOG(WARN, "zookeeper_init FAILED: (" << errno << + "): retrying in 1 second"); + sleep(1); + continue; + } + if (zk_handle_ == NULL) { + int zerrno(errno); + ZOO_LOG_ERR("zookeeper_init returned NULL zhandle: (" + << zerrno << ")"); + return false; + } + // Block till session is connected + while (!connected_) { + int zstate(zki_->ZooState(zk_handle_)); + if (zstate == ZOO_CONNECTED_STATE) { + connected_ = true; + ZOO_LOG(DEBUG, "Session CONNECTED"); + break; + } else { + ZOO_LOG(DEBUG, "Session NOT CONNECTED: retrying in 1 second"); + sleep(1); + continue; + } + } + break; + } + assert(connected_); + return true; +} + +void ZookeeperClientImpl::Shutdown() { + if (zk_handle_) { + int rc(zki_->ZookeeperClose(zk_handle_)); + if (rc != ZOK) { + int zerrno(errno); + ZOO_LOG(WARN, "zookeeper_close FAILED (" << rc << + "): errno: " << zerrno); + } + zk_handle_ = NULL; + } + connected_ = false; +} + +bool ZookeeperClientImpl::Reconnect() { + Shutdown(); + return Connect(); +} + +bool ZookeeperClientImpl::IsConnected() const { + return connected_; +} + +int ZookeeperClientImpl::CreateNodeSync(const char *path, const char *value, + int *err) { + int rc; + // Session expired state or auth failed state + while ((rc = zki_->ZooCreate(zk_handle_, path, value, strlen(value), + &ZOO_OPEN_ACL_UNSAFE, 0, NULL, -1)) == ZINVALIDSTATE) { + // Reconnect + Reconnect(); + } + if (rc != ZOK) { + *err = errno; + } + return rc; +} + +int ZookeeperClientImpl::GetNodeDataSync(const char *path, char *buf, + int *buf_len, int *err) { + int rc; + // Session expired state or auth failed state + while ((rc = zki_->ZooGet(zk_handle_, path, 0, buf, buf_len, NULL)) == + ZINVALIDSTATE) { + // Reconnect + Reconnect(); + } + if (rc != ZOK) { + *err = errno; + } + return rc; +} + +int ZookeeperClientImpl::DeleteNodeSync(const char *path, int *err) { + int rc; + // Session expired state or auth failed state + while ((rc = zki_->ZooDelete(zk_handle_, path, -1)) == ZINVALIDSTATE) { + // Reconnect + Reconnect(); + } + if (rc != ZOK) { + *err = errno; + } + return rc; +} + +std::string ZookeeperClientImpl::Name() const { + return hostname_; +} + +} // namespace impl + +// ZookeeperClient +ZookeeperClient::ZookeeperClient(const char *hostname, const char *servers) : + impl_(new impl::ZookeeperClientImpl(hostname, servers, + new zookeeper::interface::ZookeeperCBindings)) { +} + +ZookeeperClient::ZookeeperClient(impl::ZookeeperClientImpl *impl) : + impl_(impl) { +} + +ZookeeperClient::~ZookeeperClient() { +} + +// ZookeeperLockImpl +class ZookeeperLock::ZookeeperLockImpl { + public: + ZookeeperLockImpl(impl::ZookeeperClientImpl *clientImpl, + const char *path) : + clientImpl_(clientImpl), + path_(path), + is_acquired_(false) { + // Generate unique ID for data + boost::uuids::uuid ruuid(rgen_()); + id_ = clientImpl_->Name() + "-" + to_string(ruuid); + } + + std::string Id() const { + return id_; + } + + bool Lock() { + ZOO_LOG(INFO, "Trying (" << path_ << "): " << id_); + while (true) { + // Connect if not already done + if (!clientImpl_->IsConnected()) { + bool success(clientImpl_->Connect()); + if (!success) { + ZOO_LOG_ERR("Zookeeper Client Connect FAILED"); + return success; + } + } + // Try creating the znode + int err; + int rc(clientImpl_->CreateNodeSync(path_.c_str(), id_.c_str(), + &err)); + switch (rc) { + case ZOK: { + // We acquired the lock + ZOO_LOG(INFO, "ACQUIRED (" << path_ << "): " << id_); + is_acquired_ = true; + return true; + } + case ZNODEEXISTS: { + // Node 
exists, get node data and check + char buf[256]; + int buf_len(sizeof(buf)); + int zerr; + int zrc(clientImpl_->GetNodeDataSync(path_.c_str(), buf, + &buf_len, &zerr)); + if (zrc == ZOK) { + // Does it match our ID? + std::string mid(buf, buf_len); + if (id_ == mid) { + // We acquired the lock + ZOO_LOG(INFO, "ACQUIRED EEXIST (" << path_ << "): " + << id_); + is_acquired_ = true; + return true; + } + ZOO_LOG(INFO, "EEXIST (" << path_ << "): " << mid << + " , ours: " << id_); + sleep(1); + continue; + } else if (zrc == ZNONODE) { + ZOO_LOG(WARN, "GetNodeDataSync(" << path_ << + "): Data: " << id_ << + ": No Node EXISTS: retrying in 1 second"); + sleep(1); + continue; + } else { + ZOO_LOG_ERR("GetNodeDataSync(" << path_ << "): " << + id_ << ": FAILED: (" << zrc << ") error: " << zerr); + clientImpl_->Shutdown(); + return false; + } + break; + } + default: { + ZOO_LOG_ERR("CreateNodeSync(" << path_ << "): " << id_ + << ": FAILED: (" << rc << ") error: " << err); + clientImpl_->Shutdown(); + return false; + } + } + } + } + + bool Release() { + bool success; + int err, rc; + if (!is_acquired_) { + ZOO_LOG_ERR("(" << path_ << "): " << id_ << + ": Release WITHOUT Lock"); + success = false; + goto cleanup; + } + is_acquired_ = false; + rc = clientImpl_->DeleteNodeSync(path_.c_str(), &err); + if (rc == ZOK) { + ZOO_LOG(INFO, "RELEASED (" << path_ << "): " << id_); + success = true; + goto cleanup; + } else if (rc == ZNONODE) { + ZOO_LOG_ERR("DeleteNodeSync(" << path_ << "): " << id_ << + ": No Node EXISTS(" << err << + "): Possible concurrent execution"); + success = false; + goto cleanup; + } else { + ZOO_LOG_ERR("DeleteNodeSync(" << path_ << "): " << id_ << + ": FAILED (" << rc << "): error " << err); + success = false; + goto cleanup; + } + cleanup: + clientImpl_->Shutdown(); + return success; + } + + private: + impl::ZookeeperClientImpl *clientImpl_; + std::string path_; + bool is_acquired_; + boost::uuids::random_generator rgen_; + std::string id_; +}; + +// ZookeeperLock +ZookeeperLock::ZookeeperLock(ZookeeperClient *client, const char *path) : + impl_(new ZookeeperLockImpl(client->impl_.get(), path)) { +} + +ZookeeperLock::~ZookeeperLock() { +} + +std::string ZookeeperLock::Id() const { + return impl_->Id(); +} + +bool ZookeeperLock::Lock() { + return impl_->Lock(); +} + +bool ZookeeperLock::Release() { + return impl_->Release(); +} + +} // namespace client +} // namespace zookeeper diff --git a/src/zookeeper/zookeeper_client.h b/src/zookeeper/zookeeper_client.h new file mode 100644 index 00000000000..c0c2bf57536 --- /dev/null +++ b/src/zookeeper/zookeeper_client.h @@ -0,0 +1,59 @@ +// +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. 
+// + +#ifndef ZOOKEEPER_ZOOKEEPER_CLIENT_H_ +#define ZOOKEEPER_ZOOKEEPER_CLIENT_H_ + +class ZookeeperClientTest; + +namespace zookeeper { +namespace client { + +// Forward declarations +namespace impl { +class ZookeeperClientImpl; +} // namespace impl + +// +// Blocking, synchronous, non-thread safe Zookeeper client +// +class ZookeeperClient { + public: + ZookeeperClient(const char *hostname, const char *servers); + virtual ~ZookeeperClient(); + + private: + ZookeeperClient(impl::ZookeeperClientImpl *impl); + + friend class ZookeeperLock; + friend class ::ZookeeperClientTest; + + std::auto_ptr<impl::ZookeeperClientImpl> impl_; +}; + +// +// Usage is to first create a ZookeeperClient, and then ZookeeperLock +// for distributed synchronization +// +class ZookeeperLock { + public: + ZookeeperLock(ZookeeperClient *client, const char *path); + virtual ~ZookeeperLock(); + + bool Lock(); + bool Release(); + + private: + class ZookeeperLockImpl; + friend class ::ZookeeperClientTest; + + std::string Id() const; + + std::auto_ptr<ZookeeperLockImpl> impl_; +}; + +} // namespace client +} // namespace zookeeper + +#endif // ZOOKEEPER_ZOOKEEPER_CLIENT_H_ diff --git a/src/zookeeper/zookeeper_client_impl.h b/src/zookeeper/zookeeper_client_impl.h new file mode 100644 index 00000000000..7048b53700e --- /dev/null +++ b/src/zookeeper/zookeeper_client_impl.h @@ -0,0 +1,48 @@ +// +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +// + +#ifndef ZOOKEEPER_ZOOKEEPER_CLIENT_IMPL_H_ +#define ZOOKEEPER_ZOOKEEPER_CLIENT_IMPL_H_ + +#include + +#include + +namespace zookeeper { +namespace client { +namespace impl { + +// +// Blocking, synchronous, non-thread safe Zookeeper client +// +class ZookeeperClientImpl { + public: + ZookeeperClientImpl(const char *hostname, const char *servers, + zookeeper::interface::ZookeeperInterface *zki); + virtual ~ZookeeperClientImpl(); + + bool Connect(); + void Shutdown(); + bool Reconnect(); + bool IsConnected() const; + int CreateNodeSync(const char *path, const char *value, int *err); + int GetNodeDataSync(const char *path, char *buf, int *buf_len, int *err); + int DeleteNodeSync(const char *path, int *err); + std::string Name() const; + + private: + static const int kSessionTimeoutMSec_ = 10000; + + std::string hostname_; + std::string servers_; + zhandle_t *zk_handle_; + bool connected_; + std::auto_ptr<zookeeper::interface::ZookeeperInterface> zki_; +}; + +} // namespace impl +} // namespace client +} // namespace zookeeper + +#endif // ZOOKEEPER_ZOOKEEPER_CLIENT_IMPL_H_ diff --git a/src/zookeeper/zookeeper_interface.h b/src/zookeeper/zookeeper_interface.h new file mode 100644 index 00000000000..a5cf6dd8bde --- /dev/null +++ b/src/zookeeper/zookeeper_interface.h @@ -0,0 +1,33 @@ +// +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
+// + +#ifndef ZOOKEEPER_ZOOKEEPER_INTERFACE_H_ +#define ZOOKEEPER_ZOOKEEPER_INTERFACE_H_ + +#include <zookeeper/zookeeper.h> + +namespace zookeeper { +namespace interface { + +class ZookeeperInterface { + public: + virtual ~ZookeeperInterface() {} + virtual void ZooSetDebugLevel(ZooLogLevel logLevel) = 0; + virtual zhandle_t* ZookeeperInit(const char *host, watcher_fn fn, + int recv_timeout, const clientid_t *clientid, void *context, + int flags) = 0; + virtual int ZookeeperClose(zhandle_t *zh) = 0; + virtual int ZooState(zhandle_t *zh) = 0; + virtual int ZooCreate(zhandle_t *zh, const char *path, const char *value, + int valuelen, const struct ACL_vector *acl, int flags, + char *path_buffer, int path_buffer_len) = 0; + virtual int ZooDelete(zhandle_t *zh, const char *path, int version) = 0; + virtual int ZooGet(zhandle_t *zh, const char *path, int watch, + char *buffer, int* buffer_len, struct Stat *stat) = 0; +}; + +} // namespace interface +} // namespace zookeeper + +#endif // ZOOKEEPER_ZOOKEEPER_INTERFACE_H_
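Usage sketch (not part of the diff above): how a caller is expected to use the new client and lock, modeled on DbHandler::InitializeInternalLocked(). The function name, hostname, server list, and znode path below are illustrative values only; the include path assumes the controller/src include root used elsewhere in this patch.

    #include <string>
    #include <zookeeper/zookeeper_client.h>

    bool CreateSchemaOnce(const std::string &my_name,
                          const std::string &zk_servers) {
        // One client per coordination attempt; the session is established
        // lazily on the first Lock() call.
        zookeeper::client::ZookeeperClient client(my_name.c_str(),
                                                  zk_servers.c_str());
        // Every collector contends on the same znode path. Lock() blocks,
        // retrying until this node's unique id is the one stored in the znode.
        zookeeper::client::ZookeeperLock dlock(&client, "/collector");
        if (!dlock.Lock()) {
            return false;
        }
        bool success = true;  // ... create the cassandra keyspace/tables ...
        // Release() deletes the znode so the next contender's create can
        // succeed, and then shuts the zookeeper session down.
        dlock.Release();
        return success;
    }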
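The new DEFAULT.zookeeper_server_list option can also be supplied through the collector configuration file; leaving it empty (the default) keeps use_zookeeper off, so Initialize() runs unlocked as before. A hypothetical fragment follows; the file path /etc/contrail/contrail-collector.conf is an assumption, only the section and key come from this patch.

    [DEFAULT]
    # Comma-separated host:port pairs, passed through to zookeeper_init().
    zookeeper_server_list=10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181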
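The new unit tests hang off the alias added in src/zookeeper/test/SConscript and are wired into ci_unittests.json, so they can also be run directly with:

    scons controller/src/zookeeper:test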