From 78be0bb6aebfc69188abfe2541f71827ea6319e7 Mon Sep 17 00:00:00 2001 From: Megh Bhatt Date: Fri, 9 Sep 2016 17:01:57 -0700 Subject: [PATCH] Add CassLibrary interface to CQL driver Add CassLibrary interface to CQL driver so that all calls to Datastax CPP CQL driver are made via the interface and hence it becomes easier to test the CqlIfImpl class. CassDatastaxLibrary provides the implementation of the CassLibrary interface. Change-Id: I11dafe65dc474d51af3a28a24c93d392ba317972 Closes-Bug: #1622042 --- src/database/cassandra/cql/cql_if.cc | 1559 +++++++++++++--------- src/database/cassandra/cql/cql_if.h | 8 +- src/database/cassandra/cql/cql_if_impl.h | 245 +++- src/database/cassandra/cql/cql_lib_if.h | 305 +++++ 4 files changed, 1455 insertions(+), 662 deletions(-) create mode 100644 src/database/cassandra/cql/cql_lib_if.h diff --git a/src/database/cassandra/cql/cql_if.cc b/src/database/cassandra/cql/cql_if.cc index 7f6ee08a81f..bed15156218 100644 --- a/src/database/cassandra/cql/cql_if.cc +++ b/src/database/cassandra/cql/cql_if.cc @@ -20,6 +20,7 @@ #include #include #include +#include #define CQLIF_LOG(_Level, _Msg) \ do { \ @@ -39,98 +40,6 @@ namespace cass { namespace cql { namespace impl { -// CQL Library Shared Pointers to handle library free calls -template -struct Deleter; - -template<> -struct Deleter { - void operator()(CassCluster *ptr) { - if (ptr != NULL) { - cass_cluster_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(CassSession* ptr) { - if (ptr != NULL) { - cass_session_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(CassFuture* ptr) { - if (ptr != NULL) { - cass_future_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(CassStatement* ptr) { - if (ptr != NULL) { - cass_statement_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(const CassResult* ptr) { - if (ptr != NULL) { - cass_result_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(CassIterator* ptr) { - if (ptr != NULL) { - cass_iterator_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(const CassPrepared* ptr) { - if (ptr != NULL) { - cass_prepared_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(const CassSchemaMeta* ptr) { - if (ptr != NULL) { - cass_schema_meta_free(ptr); - } - } -}; - -template -class CassSharedPtr : public boost::shared_ptr { -public: - explicit CassSharedPtr(T* ptr = NULL) - : boost::shared_ptr(ptr, Deleter()) {} -}; - -typedef CassSharedPtr CassClusterPtr; -typedef CassSharedPtr CassSessionPtr; -typedef CassSharedPtr CassFuturePtr; -typedef CassSharedPtr CassStatementPtr; -typedef CassSharedPtr CassResultPtr; -typedef CassSharedPtr CassIteratorPtr; -typedef CassSharedPtr CassPreparedPtr; -typedef CassSharedPtr CassSchemaMetaPtr; - // CassString convenience structure struct CassString { CassString() : @@ -286,45 +195,47 @@ class CassQueryPrinter : public boost::static_visitor<> { // class CassStatementIndexBinder : public boost::static_visitor<> { public: - CassStatementIndexBinder(CassStatement *statement) : + CassStatementIndexBinder(interface::CassLibrary *cci, + CassStatement *statement) : + cci_(cci), statement_(statement) { } void operator()(const boost::blank &tblank, size_t index) const { assert(false && "CassStatement bind to boost::blank not supported"); } void operator()(const std::string &tstring, size_t index) const { - CassError rc(cass_statement_bind_string_n(statement_, index, + CassError rc(cci_->CassStatementBindStringN(statement_, index, tstring.c_str(), tstring.length())); assert(rc == CASS_OK); } void operator()(const boost::uuids::uuid &tuuid, size_t index) const { CassUuid cuuid; decode_uuid((char *)&tuuid, &cuuid); - CassError rc(cass_statement_bind_uuid(statement_, index, cuuid)); + CassError rc(cci_->CassStatementBindUuid(statement_, index, cuuid)); assert(rc == CASS_OK); } void operator()(const uint8_t &tu8, size_t index) const { - CassError rc(cass_statement_bind_int32(statement_, index, tu8)); + CassError rc(cci_->CassStatementBindInt32(statement_, index, tu8)); assert(rc == CASS_OK); } void operator()(const uint16_t &tu16, size_t index) const { - CassError rc(cass_statement_bind_int32(statement_, index, tu16)); + CassError rc(cci_->CassStatementBindInt32(statement_, index, tu16)); assert(rc == CASS_OK); } void operator()(const uint32_t &tu32, size_t index) const { assert(tu32 <= (uint32_t)std::numeric_limits::max()); - CassError rc(cass_statement_bind_int32(statement_, index, + CassError rc(cci_->CassStatementBindInt32(statement_, index, (cass_int32_t)tu32)); assert(rc == CASS_OK); } void operator()(const uint64_t &tu64, size_t index) const { assert(tu64 <= (uint64_t)std::numeric_limits::max()); - CassError rc(cass_statement_bind_int64(statement_, index, + CassError rc(cci_->CassStatementBindInt64(statement_, index, (cass_int64_t)tu64)); assert(rc == CASS_OK); } void operator()(const double &tdouble, size_t index) const { - CassError rc(cass_statement_bind_double(statement_, index, + CassError rc(cci_->CassStatementBindDouble(statement_, index, (cass_double_t)tdouble)); assert(rc == CASS_OK); } @@ -332,62 +243,65 @@ class CassStatementIndexBinder : public boost::static_visitor<> { CassInet cinet; if (tipaddr.is_v4()) { boost::asio::ip::address_v4 tv4(tipaddr.to_v4()); - cinet = cass_inet_init_v4(tv4.to_bytes().c_array()); + cinet = cci_->CassInetInitV4(tv4.to_bytes().c_array()); } else { boost::asio::ip::address_v6 tv6(tipaddr.to_v6()); - cinet = cass_inet_init_v6(tv6.to_bytes().c_array()); + cinet = cci_->CassInetInitV6(tv6.to_bytes().c_array()); } - CassError rc(cass_statement_bind_inet(statement_, index, + CassError rc(cci_->CassStatementBindInet(statement_, index, cinet)); assert(rc == CASS_OK); } + interface::CassLibrary *cci_; CassStatement *statement_; }; class CassStatementNameBinder : public boost::static_visitor<> { public: - CassStatementNameBinder(CassStatement *statement) : + CassStatementNameBinder(interface::CassLibrary *cci, + CassStatement *statement) : + cci_(cci), statement_(statement) { } void operator()(const boost::blank &tblank, const char *name) const { assert(false && "CassStatement bind to boost::blank not supported"); } void operator()(const std::string &tstring, const char *name) const { - CassError rc(cass_statement_bind_string_by_name_n(statement_, name, + CassError rc(cci_->CassStatementBindStringByNameN(statement_, name, strlen(name), tstring.c_str(), tstring.length())); assert(rc == CASS_OK); } void operator()(const boost::uuids::uuid &tuuid, const char *name) const { CassUuid cuuid; decode_uuid((char *)&tuuid, &cuuid); - CassError rc(cass_statement_bind_uuid_by_name(statement_, name, + CassError rc(cci_->CassStatementBindUuidByName(statement_, name, cuuid)); assert(rc == CASS_OK); } void operator()(const uint8_t &tu8, const char *name) const { - CassError rc(cass_statement_bind_int32_by_name(statement_, name, + CassError rc(cci_->CassStatementBindInt32ByName(statement_, name, tu8)); assert(rc == CASS_OK); } void operator()(const uint16_t &tu16, const char *name) const { - CassError rc(cass_statement_bind_int32_by_name(statement_, name, + CassError rc(cci_->CassStatementBindInt32ByName(statement_, name, tu16)); assert(rc == CASS_OK); } void operator()(const uint32_t &tu32, const char *name) const { assert(tu32 <= (uint32_t)std::numeric_limits::max()); - CassError rc(cass_statement_bind_int32_by_name(statement_, name, + CassError rc(cci_->CassStatementBindInt32ByName(statement_, name, (cass_int32_t)tu32)); assert(rc == CASS_OK); } void operator()(const uint64_t &tu64, const char *name) const { assert(tu64 <= (uint64_t)std::numeric_limits::max()); - CassError rc(cass_statement_bind_int64_by_name(statement_, name, + CassError rc(cci_->CassStatementBindInt64ByName(statement_, name, (cass_int64_t)tu64)); assert(rc == CASS_OK); } void operator()(const double &tdouble, const char *name) const { - CassError rc(cass_statement_bind_double_by_name(statement_, name, + CassError rc(cci_->CassStatementBindDoubleByName(statement_, name, (cass_double_t)tdouble)); assert(rc == CASS_OK); } @@ -395,15 +309,16 @@ class CassStatementNameBinder : public boost::static_visitor<> { CassInet cinet; if (tipaddr.is_v4()) { boost::asio::ip::address_v4 tv4(tipaddr.to_v4()); - cinet = cass_inet_init_v4(tv4.to_bytes().c_array()); + cinet = cci_->CassInetInitV4(tv4.to_bytes().c_array()); } else { boost::asio::ip::address_v6 tv6(tipaddr.to_v6()); - cinet = cass_inet_init_v6(tv6.to_bytes().c_array()); + cinet = cci_->CassInetInitV6(tv6.to_bytes().c_array()); } - CassError rc(cass_statement_bind_inet_by_name(statement_, name, + CassError rc(cci_->CassStatementBindInetByName(statement_, name, cinet)); assert(rc == CASS_OK); } + interface::CassLibrary *cci_; CassStatement *statement_; }; @@ -679,9 +594,10 @@ std::string DynamicCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf) { // Cf2CassPrepareBind // -bool StaticCf2CassPrepareBind(CassStatement *statement, +bool StaticCf2CassPrepareBind(interface::CassLibrary *cci, + CassStatement *statement, const GenDb::ColList *v_columns) { - CassStatementNameBinder values_binder(statement); + CassStatementNameBinder values_binder(cci, statement); // Row keys const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_); int rk_size(rkeys.size()); @@ -713,15 +629,16 @@ bool StaticCf2CassPrepareBind(CassStatement *statement, cttl = column.ttl; idx++; } - CassError rc(cass_statement_bind_int32(statement, idx++, + CassError rc(cci->CassStatementBindInt32(statement, idx++, (cass_int32_t)cttl)); assert(rc == CASS_OK); return true; } -bool DynamicCf2CassPrepareBind(CassStatement *statement, +bool DynamicCf2CassPrepareBind(interface::CassLibrary *cci, + CassStatement *statement, const GenDb::ColList *v_columns) { - CassStatementIndexBinder values_binder(statement); + CassStatementIndexBinder values_binder(cci, statement); // Row keys const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_); int rk_size(rkeys.size()); @@ -746,7 +663,7 @@ bool DynamicCf2CassPrepareBind(CassStatement *statement, boost::apply_visitor(boost::bind(values_binder, _1, idx++), cvalues[0]); } - CassError rc(cass_statement_bind_int32(statement, idx++, + CassError rc(cci->CassStatementBindInt32(statement, idx++, (cass_int32_t)column.ttl)); assert(rc == CASS_OK); return true; @@ -826,21 +743,22 @@ std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable( return CassSelectFromTableInternal(table, rkeys, ck_range); } -static GenDb::DbDataValue CassValue2DbDataValue(const CassValue *cvalue) { - CassValueType cvtype(cass_value_type(cvalue)); +static GenDb::DbDataValue CassValue2DbDataValue( + interface::CassLibrary *cci, const CassValue *cvalue) { + CassValueType cvtype(cci->GetCassValueType(cvalue)); switch (cvtype) { case CASS_VALUE_TYPE_ASCII: case CASS_VALUE_TYPE_VARCHAR: case CASS_VALUE_TYPE_TEXT: { CassString ctstring; - CassError rc(cass_value_get_string(cvalue, &ctstring.data, + CassError rc(cci->CassValueGetString(cvalue, &ctstring.data, &ctstring.length)); assert(rc == CASS_OK); return std::string(ctstring.data, ctstring.length); } case CASS_VALUE_TYPE_UUID: { CassUuid ctuuid; - CassError rc(cass_value_get_uuid(cvalue, &ctuuid)); + CassError rc(cci->CassValueGetUuid(cvalue, &ctuuid)); assert(rc == CASS_OK); boost::uuids::uuid u; encode_uuid((char *)&u, ctuuid); @@ -848,37 +766,37 @@ static GenDb::DbDataValue CassValue2DbDataValue(const CassValue *cvalue) { } case CASS_VALUE_TYPE_DOUBLE: { cass_double_t ctdouble; - CassError rc(cass_value_get_double(cvalue, &ctdouble)); + CassError rc(cci->CassValueGetDouble(cvalue, &ctdouble)); assert(rc == CASS_OK); return (double)ctdouble; } case CASS_VALUE_TYPE_TINY_INT: { cass_int8_t ct8; - CassError rc(cass_value_get_int8(cvalue, &ct8)); + CassError rc(cci->CassValueGetInt8(cvalue, &ct8)); assert(rc == CASS_OK); return (uint8_t)ct8; } case CASS_VALUE_TYPE_SMALL_INT: { cass_int16_t ct16; - CassError rc(cass_value_get_int16(cvalue, &ct16)); + CassError rc(cci->CassValueGetInt16(cvalue, &ct16)); assert(rc == CASS_OK); return (uint16_t)ct16; } case CASS_VALUE_TYPE_INT: { cass_int32_t ct32; - CassError rc(cass_value_get_int32(cvalue, &ct32)); + CassError rc(cci->CassValueGetInt32(cvalue, &ct32)); assert(rc == CASS_OK); return (uint32_t)ct32; } case CASS_VALUE_TYPE_BIGINT: { cass_int64_t ct64; - CassError rc(cass_value_get_int64(cvalue, &ct64)); + CassError rc(cci->CassValueGetInt64(cvalue, &ct64)); assert(rc == CASS_OK); return (uint64_t)ct64; } case CASS_VALUE_TYPE_INET: { CassInet ctinet; - CassError rc(cass_value_get_inet(cvalue, &ctinet)); + CassError rc(cci->CassValueGetInet(cvalue, &ctinet)); assert(rc == CASS_OK); IpAddress ipaddr; if (ctinet.address_length == CASS_INET_V4_LENGTH) { @@ -906,75 +824,71 @@ static GenDb::DbDataValue CassValue2DbDataValue(const CassValue *cvalue) { } } -static bool PrepareSync(CassSession *session, const char* query, +static bool PrepareSync(interface::CassLibrary *cci, + CassSession *session, const char* query, CassPreparedPtr *prepared) { CQLIF_LOG(DEBUG, "PrepareSync: " << query); - CassFuturePtr future(cass_session_prepare(session, query)); - cass_future_wait(future.get()); + CassFuturePtr future(cci->CassSessionPrepare(session, query), cci); + cci->CassFutureWait(future.get()); - CassError rc(cass_future_error_code(future.get())); + CassError rc(cci->CassFutureErrorCode(future.get())); if (rc != CASS_OK) { CassString err; - cass_future_error_message(future.get(), &err.data, &err.length); + cci->CassFutureErrorMessage(future.get(), &err.data, &err.length); CQLIF_LOG_ERR("PrepareSync: " << query << " FAILED: " << err.data); } else { - *prepared = CassPreparedPtr(cass_future_get_prepared(future.get())); + *prepared = CassPreparedPtr(cci->CassFutureGetPrepared(future.get()), + cci); } return rc == CASS_OK; } -static bool ExecuteQuerySyncInternal(CassSession *session, +static bool ExecuteQuerySyncInternal(interface::CassLibrary *cci, + CassSession *session, CassStatement *qstatement, CassResultPtr *result, CassConsistency consistency) { - cass_statement_set_consistency(qstatement, consistency); - CassFuturePtr future(cass_session_execute(session, qstatement)); - cass_future_wait(future.get()); + cci->CassStatementSetConsistency(qstatement, consistency); + CassFuturePtr future(cci->CassSessionExecute(session, qstatement), cci); + cci->CassFutureWait(future.get()); - CassError rc(cass_future_error_code(future.get())); + CassError rc(cci->CassFutureErrorCode(future.get())); if (rc != CASS_OK) { CassString err; - cass_future_error_message(future.get(), &err.data, &err.length); + cci->CassFutureErrorMessage(future.get(), &err.data, &err.length); CQLIF_LOG_ERR("SyncQuery: FAILED: " << err.data); } else { if (result) { - *result = CassResultPtr(cass_future_get_result(future.get())); + *result = CassResultPtr(cci->CassFutureGetResult(future.get()), + cci); } } return rc == CASS_OK; } -static bool ExecuteQuerySync(CassSession *session, const char *query, - CassConsistency consistency) { +static bool ExecuteQuerySync(interface::CassLibrary *cci, + CassSession *session, const char *query, CassConsistency consistency) { CQLIF_LOG(DEBUG, "SyncQuery: " << query); - CassStatementPtr statement(cass_statement_new(query, 0)); - return ExecuteQuerySyncInternal(session, statement.get(), NULL, + CassStatementPtr statement(cci->CassStatementNew(query, 0), cci); + return ExecuteQuerySyncInternal(cci, session, statement.get(), NULL, consistency); } -static bool ExecuteQueryResultSync(CassSession *session, const char *query, +static bool ExecuteQueryResultSync(interface::CassLibrary *cci, + CassSession *session, const char *query, CassResultPtr *result, CassConsistency consistency) { CQLIF_LOG(DEBUG, "SyncQuery: " << query); - CassStatementPtr statement(cass_statement_new(query, 0)); - return ExecuteQuerySyncInternal(session, statement.get(), result, + CassStatementPtr statement(cci->CassStatementNew(query, 0), cci); + return ExecuteQuerySyncInternal(cci, session, statement.get(), result, consistency); } -static bool ExecuteQueryStatementSync(CassSession *session, - CassStatement *statement, CassConsistency consistency) { - return ExecuteQuerySyncInternal(session, statement, NULL, consistency); +static bool ExecuteQueryStatementSync(interface::CassLibrary *cci, + CassSession *session, CassStatement *statement, + CassConsistency consistency) { + return ExecuteQuerySyncInternal(cci, session, statement, NULL, + consistency); } -typedef boost::function CassAsyncQueryCallback; - -struct CassAsyncQueryContext { - CassAsyncQueryContext(const char *query_id, CassAsyncQueryCallback cb) : - query_id_(query_id), - cb_(cb) { - } - std::string query_id_; - CassAsyncQueryCallback cb_; -}; - static GenDb::DbOpResult::type CassError2DbOpResult(CassError rc) { switch (rc) { case CASS_OK: @@ -992,10 +906,11 @@ static void OnExecuteQueryAsync(CassFuture *future, void *data) { assert(data); std::auto_ptr ctx( boost::reinterpret_pointer_cast(data)); - CassError rc(cass_future_error_code(future)); + interface::CassLibrary *cci(ctx->cci_); + CassError rc(cci->CassFutureErrorCode(future)); if (rc != CASS_OK) { CassString err; - cass_future_error_message(future, &err.data, &err.length); + cci->CassFutureErrorMessage(future, &err.data, &err.length); CQLIF_LOG_ERR("AsyncQuery: " << ctx->query_id_ << " FAILED: " << err.data); } @@ -1003,59 +918,64 @@ static void OnExecuteQueryAsync(CassFuture *future, void *data) { ctx->cb_(db_rc); } -static void ExecuteQueryAsyncInternal(CassSession *session, +static void ExecuteQueryAsyncInternal(interface::CassLibrary *cci, + CassSession *session, const char *qid, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb) { - cass_statement_set_consistency(qstatement, consistency); - CassFuturePtr future(cass_session_execute(session, qstatement)); - std::auto_ptr ctx(new CassAsyncQueryContext(qid, cb)); - cass_future_set_callback(future.get(), OnExecuteQueryAsync, ctx.release()); + cci->CassStatementSetConsistency(qstatement, consistency); + CassFuturePtr future(cci->CassSessionExecute(session, qstatement), cci); + std::auto_ptr ctx( + new CassAsyncQueryContext(qid, cb, cci)); + cci->CassFutureSetCallback(future.get(), OnExecuteQueryAsync, ctx.release()); } -static void ExecuteQueryAsync(CassSession *session, const char *query, +static void ExecuteQueryAsync(interface::CassLibrary *cci, + CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb) { CQLIF_LOG(DEBUG, "AsyncQuery: " << query); - CassStatementPtr statement(cass_statement_new(query, 0)); - ExecuteQueryAsyncInternal(session, query, statement.get(), consistency, - cb); + CassStatementPtr statement(cci->CassStatementNew(query, 0), cci); + ExecuteQueryAsyncInternal(cci, session, query, statement.get(), + consistency, cb); } -static void ExecuteQueryStatementAsync(CassSession *session, +static void ExecuteQueryStatementAsync(interface::CassLibrary *cci, + CassSession *session, const char *query_id, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb) { - ExecuteQueryAsyncInternal(session, query_id, qstatement, consistency, + ExecuteQueryAsyncInternal(cci, session, query_id, qstatement, consistency, cb); } -static bool DynamicCfGetResultSync(CassSession *session, const char *query, +static bool DynamicCfGetResultSync(interface::CassLibrary *cci, + CassSession *session, const char *query, size_t rk_count, size_t ck_count, CassConsistency consistency, GenDb::NewColVec *v_columns) { - CassResultPtr result; - bool success(ExecuteQueryResultSync(session, query, &result, + CassResultPtr result(NULL, cci); + bool success(ExecuteQueryResultSync(cci, session, query, &result, consistency)); if (!success) { return success; } // Row iterator - CassIteratorPtr riterator(cass_iterator_from_result(result.get())); - while (cass_iterator_next(riterator.get())) { - const CassRow *row(cass_iterator_get_row(riterator.get())); + CassIteratorPtr riterator(cci->CassIteratorFromResult(result.get()), cci); + while (cci->CassIteratorNext(riterator.get())) { + const CassRow *row(cci->CassIteratorGetRow(riterator.get())); // Iterate over columns - size_t ccount(cass_result_column_count(result.get())); + size_t ccount(cci->CassResultColumnCount(result.get())); // Clustering key GenDb::DbDataValueVec *cnames(new GenDb::DbDataValueVec); for (size_t i = rk_count; i < rk_count + ck_count; i++) { - const CassValue *cvalue(cass_row_get_column(row, i)); + const CassValue *cvalue(cci->CassRowGetColumn(row, i)); assert(cvalue); - GenDb::DbDataValue db_value(CassValue2DbDataValue(cvalue)); + GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue)); cnames->push_back(db_value); } // Values GenDb::DbDataValueVec *values(new GenDb::DbDataValueVec); for (size_t i = rk_count + ck_count; i < ccount; i++) { - const CassValue *cvalue(cass_row_get_column(row, i)); + const CassValue *cvalue(cci->CassRowGetColumn(row, i)); assert(cvalue); - GenDb::DbDataValue db_value(CassValue2DbDataValue(cvalue)); + GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue)); values->push_back(db_value); } GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0)); @@ -1064,28 +984,29 @@ static bool DynamicCfGetResultSync(CassSession *session, const char *query, return success; } -static bool StaticCfGetResultSync(CassSession *session, const char *query, +static bool StaticCfGetResultSync(interface::CassLibrary *cci, + CassSession *session, const char *query, CassConsistency consistency, GenDb::NewColVec *v_columns) { - CassResultPtr result; - bool success(ExecuteQueryResultSync(session, query, &result, + CassResultPtr result(NULL, cci); + bool success(ExecuteQueryResultSync(cci, session, query, &result, consistency)); if (!success) { return success; } // Row iterator - CassIteratorPtr riterator(cass_iterator_from_result(result.get())); - while (cass_iterator_next(riterator.get())) { - const CassRow *row(cass_iterator_get_row(riterator.get())); + CassIteratorPtr riterator(cci->CassIteratorFromResult(result.get()), cci); + while (cci->CassIteratorNext(riterator.get())) { + const CassRow *row(cci->CassIteratorGetRow(riterator.get())); // Iterate over columns - size_t ccount(cass_result_column_count(result.get())); + size_t ccount(cci->CassResultColumnCount(result.get())); for (size_t i = 0; i < ccount; i++) { CassString cname; - CassError rc(cass_result_column_name(result.get(), i, &cname.data, + CassError rc(cci->CassResultColumnName(result.get(), i, &cname.data, &cname.length)); assert(rc == CASS_OK); - const CassValue *cvalue(cass_row_get_column(row, i)); + const CassValue *cvalue(cci->CassRowGetColumn(row, i)); assert(cvalue); - GenDb::DbDataValue db_value(CassValue2DbDataValue(cvalue)); + GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue)); if (db_value.which() == GenDb::DB_VALUE_BLANK) { continue; } @@ -1097,22 +1018,23 @@ static bool StaticCfGetResultSync(CassSession *session, const char *query, return success; } -static bool SyncFutureWait(CassFuture *future) { - cass_future_wait(future); - CassError rc(cass_future_error_code(future)); +static bool SyncFutureWait(interface::CassLibrary *cci, + CassFuture *future) { + cci->CassFutureWait(future); + CassError rc(cci->CassFutureErrorCode(future)); if (rc != CASS_OK) { CassString err; - cass_future_error_message(future, &err.data, &err.length); + cci->CassFutureErrorMessage(future, &err.data, &err.length); CQLIF_LOG_ERR("SyncWait: FAILED: " << err.data); } return rc == CASS_OK; } -static const CassTableMeta * GetCassTableMeta(const CassSchemaMeta *schema_meta, +static const CassTableMeta * GetCassTableMeta( + interface::CassLibrary *cci, const CassSchemaMeta *schema_meta, const std::string &keyspace, const std::string &table, bool log_error) { const CassKeyspaceMeta *keyspace_meta( - cass_schema_meta_keyspace_by_name(schema_meta, - keyspace.c_str())); + cci->CassSchemaMetaKeyspaceByName(schema_meta, keyspace.c_str())); if (keyspace_meta == NULL) { if (log_error) { CQLIF_LOG_ERR("No keyspace schema: Keyspace: " << keyspace << @@ -1123,8 +1045,7 @@ static const CassTableMeta * GetCassTableMeta(const CassSchemaMeta *schema_meta, std::string table_lower(table); boost::algorithm::to_lower(table_lower); const CassTableMeta *table_meta( - cass_keyspace_meta_table_by_name(keyspace_meta, - table_lower.c_str())); + cci->CassKeyspaceMetaTableByName(keyspace_meta, table_lower.c_str())); if (table_meta == NULL) { if (log_error) { CQLIF_LOG_ERR("No table schema: Keyspace: " << keyspace << @@ -1135,17 +1056,18 @@ static const CassTableMeta * GetCassTableMeta(const CassSchemaMeta *schema_meta, return table_meta; } -static bool IsCassTableMetaPresent(CassSession *session, +static bool IsCassTableMetaPresent(interface::CassLibrary *cci, + CassSession *session, const std::string &keyspace, const std::string &table) { - impl::CassSchemaMetaPtr schema_meta(cass_session_get_schema_meta( - session)); + impl::CassSchemaMetaPtr schema_meta(cci->CassSessionGetSchemaMeta( + session), cci); if (schema_meta.get() == NULL) { CQLIF_LOG(DEBUG, "No schema meta: Keyspace: " << keyspace << ", Table: " << table); return false; } bool log_error(false); - const CassTableMeta *table_meta(impl::GetCassTableMeta( + const CassTableMeta *table_meta(impl::GetCassTableMeta(cci, schema_meta.get(), keyspace, table, log_error)); if (table_meta == NULL) { return false; @@ -1153,41 +1075,44 @@ static bool IsCassTableMetaPresent(CassSession *session, return true; } -static bool GetCassTableClusteringKeyCount(CassSession *session, - const std::string &keyspace, const std::string &table, size_t *ck_count) { - impl::CassSchemaMetaPtr schema_meta(cass_session_get_schema_meta( - session)); +static bool GetCassTableClusteringKeyCount( + interface::CassLibrary *cci, + CassSession *session, const std::string &keyspace, + const std::string &table, size_t *ck_count) { + impl::CassSchemaMetaPtr schema_meta(cci->CassSessionGetSchemaMeta( + session), cci); if (schema_meta.get() == NULL) { CQLIF_LOG_ERR("No schema meta: Keyspace: " << keyspace << ", Table: " << table); return false; } bool log_error(true); - const CassTableMeta *table_meta(impl::GetCassTableMeta( + const CassTableMeta *table_meta(impl::GetCassTableMeta(cci, schema_meta.get(), keyspace, table, log_error)); if (table_meta == NULL) { return false; } - *ck_count = cass_table_meta_clustering_key_count(table_meta); + *ck_count = cci->CassTableMetaClusteringKeyCount(table_meta); return true; } -static bool GetCassTablePartitionKeyCount(CassSession *session, +static bool GetCassTablePartitionKeyCount( + interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *rk_count) { - impl::CassSchemaMetaPtr schema_meta(cass_session_get_schema_meta( - session)); + impl::CassSchemaMetaPtr schema_meta(cci->CassSessionGetSchemaMeta( + session), cci); if (schema_meta.get() == NULL) { CQLIF_LOG_ERR("No schema meta: Keyspace: " << keyspace << ", Table: " << table); return false; } bool log_error(true); - const CassTableMeta *table_meta(impl::GetCassTableMeta( + const CassTableMeta *table_meta(impl::GetCassTableMeta(cci, schema_meta.get(), keyspace, table, log_error)); if (table_meta == NULL) { return false; } - *rk_count = cass_table_meta_partition_key_count(table_meta); + *rk_count = cci->CassTableMetaPartitionKeyCount(table_meta); return true; } @@ -1251,485 +1176,452 @@ static void CassLibraryLog(const CassLogMessage* message, void *data) { } // namespace impl // -// CqlIf::CqlIfImpl +// CqlIfImpl // -class CqlIf::CqlIfImpl { - public: - CqlIfImpl(EventManager *evm, - const std::vector &cassandra_ips, - int cassandra_port, - const std::string &cassandra_user, - const std::string &cassandra_password) : - evm_(evm), - cluster_(cass_cluster_new()), - session_(cass_session_new()), - reconnect_timer_(TimerManager::CreateTimer(*evm->io_service(), - "CqlIfImpl Reconnect Timer", - TaskScheduler::GetInstance()->GetTaskId(kTaskName), - kTaskInstance)), - connect_cb_(NULL), - disconnect_cb_(NULL), - keyspace_(), - io_thread_count_(2) { - // Set session state to INIT - session_state_ = SessionState::INIT; - // Set contact points and port - std::string contact_points(boost::algorithm::join(cassandra_ips, ",")); - cass_cluster_set_contact_points(cluster_.get(), contact_points.c_str()); - cass_cluster_set_port(cluster_.get(), cassandra_port); - // Set credentials for plain text authentication - if (!cassandra_user.empty() && !cassandra_password.empty()) { - cass_cluster_set_credentials(cluster_.get(), cassandra_user.c_str(), - cassandra_password.c_str()); - } - // Set number of IO threads to half the number of cores - cass_cluster_set_num_threads_io(cluster_.get(), io_thread_count_); - cass_cluster_set_pending_requests_high_water_mark(cluster_.get(), 10000); - cass_cluster_set_pending_requests_low_water_mark(cluster_.get(), 5000); - cass_cluster_set_write_bytes_high_water_mark(cluster_.get(), 128000); - cass_cluster_set_write_bytes_low_water_mark(cluster_.get(), 96000); +CqlIfImpl::CqlIfImpl(EventManager *evm, + const std::vector &cassandra_ips, + int cassandra_port, + const std::string &cassandra_user, + const std::string &cassandra_password, + interface::CassLibrary *cci) : + evm_(evm), + cci_(cci), + cluster_(cci_->CassClusterNew(), cci_), + session_(cci_->CassSessionNew(), cci_), + reconnect_timer_(TimerManager::CreateTimer(*evm->io_service(), + "CqlIfImpl Reconnect Timer", + TaskScheduler::GetInstance()->GetTaskId(kTaskName), + kTaskInstance)), + connect_cb_(NULL), + disconnect_cb_(NULL), + keyspace_(), + io_thread_count_(2) { + // Set session state to INIT + session_state_ = SessionState::INIT; + // Set contact points and port + std::string contact_points(boost::algorithm::join(cassandra_ips, ",")); + cci_->CassClusterSetContactPoints(cluster_.get(), contact_points.c_str()); + cci_->CassClusterSetPort(cluster_.get(), cassandra_port); + // Set credentials for plain text authentication + if (!cassandra_user.empty() && !cassandra_password.empty()) { + cci_->CassClusterSetCredentials(cluster_.get(), cassandra_user.c_str(), + cassandra_password.c_str()); + } + // Set number of IO threads to half the number of cores + cci_->CassClusterSetNumThreadsIo(cluster_.get(), io_thread_count_); + cci_->CassClusterSetPendingRequestsHighWaterMark(cluster_.get(), 10000); + cci_->CassClusterSetPendingRequestsLowWaterMark(cluster_.get(), 5000); + cci_->CassClusterSetWriteBytesHighWaterMark(cluster_.get(), 128000); + cci_->CassClusterSetWriteBytesLowWaterMark(cluster_.get(), 96000); +} + +CqlIfImpl::~CqlIfImpl() { + assert(session_state_ == SessionState::INIT || + session_state_ == SessionState::DISCONNECTED); + TimerManager::DeleteTimer(reconnect_timer_); + reconnect_timer_ = NULL; +} + +bool CqlIfImpl::CreateKeyspaceIfNotExistsSync(const std::string &keyspace, + const std::string &replication_factor, CassConsistency consistency) { + if (session_state_ != SessionState::CONNECTED) { + return false; } - - virtual ~CqlIfImpl() { - assert(session_state_ == SessionState::INIT || - session_state_ == SessionState::DISCONNECTED); - TimerManager::DeleteTimer(reconnect_timer_); - reconnect_timer_ = NULL; + char buf[512]; + int n(snprintf(buf, sizeof(buf), kQCreateKeyspaceIfNotExists, + keyspace.c_str(), replication_factor.c_str())); + if (n < 0 || n >= (int)sizeof(buf)) { + CQLIF_LOG_ERR("FAILED (" << n << "): Keyspace: " << + keyspace << ", RF: " << replication_factor); + return false; } + return impl::ExecuteQuerySync(cci_, session_.get(), buf, consistency); +} - bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace, - const std::string &replication_factor, CassConsistency consistency) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - char buf[512]; - int n(snprintf(buf, sizeof(buf), kQCreateKeyspaceIfNotExists, - keyspace.c_str(), replication_factor.c_str())); - if (n < 0 || n >= (int)sizeof(buf)) { - CQLIF_LOG_ERR("FAILED (" << n << "): Keyspace: " << - keyspace << ", RF: " << replication_factor); - return false; - } - return impl::ExecuteQuerySync(session_.get(), buf, consistency); +bool CqlIfImpl::UseKeyspaceSync(const std::string &keyspace, + CassConsistency consistency) { + if (session_state_ != SessionState::CONNECTED) { + return false; } - - bool UseKeyspaceSync(const std::string &keyspace, - CassConsistency consistency) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - char buf[512]; - int n(snprintf(buf, sizeof(buf), kQUseKeyspace, keyspace.c_str())); - if (n < 0 || n >= (int)sizeof(buf)) { - CQLIF_LOG_ERR("FAILED (" << n << "): Keyspace: " << - keyspace); - return false; - } - bool success(impl::ExecuteQuerySync(session_.get(), buf, - consistency)); - if (!success) { - return false; - } - // Update keyspace - keyspace_ = keyspace; - return success; + char buf[512]; + int n(snprintf(buf, sizeof(buf), kQUseKeyspace, keyspace.c_str())); + if (n < 0 || n >= (int)sizeof(buf)) { + CQLIF_LOG_ERR("FAILED (" << n << "): Keyspace: " << + keyspace); + return false; } - - bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, - CassConsistency consistency) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - // There are two types of tables - Static (SQL) and Dynamic (NOSQL) - // column family. Static column family has more or less fixed rows, - // and dynamic column family has wide rows - std::string query; - switch (cf.cftype_) { - case GenDb::NewCf::COLUMN_FAMILY_SQL: - query = impl::StaticCf2CassCreateTableIfNotExists(cf); - break; - case GenDb::NewCf::COLUMN_FAMILY_NOSQL: - query = impl::DynamicCf2CassCreateTableIfNotExists(cf); - break; - default: - return false; - } - return impl::ExecuteQuerySync(session_.get(), query.c_str(), - consistency); + bool success(impl::ExecuteQuerySync(cci_, session_.get(), buf, + consistency)); + if (!success) { + return false; } + // Update keyspace + keyspace_ = keyspace; + return success; +} - bool LocatePrepareInsertIntoTable(const GenDb::NewCf &cf) { - const std::string &table_name(cf.cfname_); - impl::CassPreparedPtr prepared; - // Check if the prepared statement exists - if (GetPrepareInsertIntoTable(table_name, &prepared)) { - return true; - } - bool success(PrepareInsertIntoTableSync(cf, &prepared)); - if (!success) { - return success; - } - // Store the prepared statement into the map - tbb::mutex::scoped_lock lock(map_mutex_); - success = (insert_prepared_map_.insert( - std::make_pair(table_name, prepared))).second; - assert(success); - return success; +bool CqlIfImpl::CreateTableIfNotExistsSync(const GenDb::NewCf &cf, + CassConsistency consistency) { + if (session_state_ != SessionState::CONNECTED) { + return false; + } + // There are two types of tables - Static (SQL) and Dynamic (NOSQL) + // column family. Static column family has more or less fixed rows, + // and dynamic column family has wide rows + std::string query; + switch (cf.cftype_) { + case GenDb::NewCf::COLUMN_FAMILY_SQL: + query = impl::StaticCf2CassCreateTableIfNotExists(cf); + break; + case GenDb::NewCf::COLUMN_FAMILY_NOSQL: + query = impl::DynamicCf2CassCreateTableIfNotExists(cf); + break; + default: + return false; } + return impl::ExecuteQuerySync(cci_, session_.get(), query.c_str(), + consistency); +} - bool GetPrepareInsertIntoTable(const std::string &table_name, - impl::CassPreparedPtr *prepared) const { - tbb::mutex::scoped_lock lock(map_mutex_); - CassPreparedMapType::const_iterator it( - insert_prepared_map_.find(table_name)); - if (it == insert_prepared_map_.end()) { - return false; - } - *prepared = it->second; +bool CqlIfImpl::LocatePrepareInsertIntoTable(const GenDb::NewCf &cf) { + const std::string &table_name(cf.cfname_); + impl::CassPreparedPtr prepared(NULL, cci_); + // Check if the prepared statement exists + if (GetPrepareInsertIntoTable(table_name, &prepared)) { return true; } - - bool IsTablePresent(const GenDb::NewCf &cf) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - return impl::IsCassTableMetaPresent(session_.get(), keyspace_, - cf.cfname_); + bool success(PrepareInsertIntoTableSync(cf, &prepared)); + if (!success) { + return success; } + // Store the prepared statement into the map + tbb::mutex::scoped_lock lock(map_mutex_); + success = (insert_prepared_map_.insert( + std::make_pair(table_name, prepared))).second; + assert(success); + return success; +} - bool IsTableStatic(const std::string &table) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - size_t ck_count; - assert(impl::GetCassTableClusteringKeyCount(session_.get(), keyspace_, - table, &ck_count)); - return ck_count == 0; +bool CqlIfImpl::GetPrepareInsertIntoTable(const std::string &table_name, + impl::CassPreparedPtr *prepared) const { + tbb::mutex::scoped_lock lock(map_mutex_); + CassPreparedMapType::const_iterator it( + insert_prepared_map_.find(table_name)); + if (it == insert_prepared_map_.end()) { + return false; } + *prepared = it->second; + return true; +} - bool IsTableDynamic(const std::string &table) { - return !IsTableStatic(table); +bool CqlIfImpl::IsTablePresent(const GenDb::NewCf &cf) { + if (session_state_ != SessionState::CONNECTED) { + return false; } + return impl::IsCassTableMetaPresent(cci_, session_.get(), keyspace_, + cf.cfname_); +} - bool InsertIntoTableSync(std::auto_ptr v_columns, - CassConsistency consistency) { - return InsertIntoTableInternal(v_columns, consistency, true, NULL); +bool CqlIfImpl::IsTableStatic(const std::string &table) { + if (session_state_ != SessionState::CONNECTED) { + return false; } + size_t ck_count; + assert(impl::GetCassTableClusteringKeyCount(cci_, session_.get(), + keyspace_, table, &ck_count)); + return ck_count == 0; +} - bool InsertIntoTableAsync(std::auto_ptr v_columns, - CassConsistency consistency, impl::CassAsyncQueryCallback cb) { - return InsertIntoTableInternal(v_columns, consistency, false, cb); - } +bool CqlIfImpl::IsTableDynamic(const std::string &table) { + return !IsTableStatic(table); +} - bool InsertIntoTablePrepareAsync(std::auto_ptr v_columns, - CassConsistency consistency, impl::CassAsyncQueryCallback cb) { - return InsertIntoTablePrepareInternal(v_columns, consistency, false, - cb); - } +bool CqlIfImpl::InsertIntoTableSync(std::auto_ptr v_columns, + CassConsistency consistency) { + return InsertIntoTableInternal(v_columns, consistency, true, NULL); +} - bool IsInsertIntoTablePrepareSupported(const std::string &table) { - return IsTableDynamic(table); - } +bool CqlIfImpl::InsertIntoTableAsync(std::auto_ptr v_columns, + CassConsistency consistency, impl::CassAsyncQueryCallback cb) { + return InsertIntoTableInternal(v_columns, consistency, false, cb); +} - bool SelectFromTableSync(const std::string &cfname, - const GenDb::DbDataValueVec &rkey, CassConsistency consistency, - GenDb::NewColVec *out) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - std::string query(impl::PartitionKey2CassSelectFromTable(cfname, - rkey)); - if (IsTableStatic(cfname)) { - return impl::StaticCfGetResultSync(session_.get(), - query.c_str(), consistency, out); - } else { - size_t rk_count; - assert(impl::GetCassTablePartitionKeyCount(session_.get(), - keyspace_, cfname, &rk_count)); - size_t ck_count; - assert(impl::GetCassTableClusteringKeyCount(session_.get(), - keyspace_, cfname, &ck_count)); - return impl::DynamicCfGetResultSync(session_.get(), - query.c_str(), rk_count, ck_count, consistency, out); - } - } +bool CqlIfImpl::InsertIntoTablePrepareAsync(std::auto_ptr v_columns, + CassConsistency consistency, impl::CassAsyncQueryCallback cb) { + return InsertIntoTablePrepareInternal(v_columns, consistency, false, + cb); +} - bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname, - const GenDb::DbDataValueVec &rkey, - const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, - GenDb::NewColVec *out) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - std::string query( - impl::PartitionKeyAndClusteringKeyRange2CassSelectFromTable(cfname, - rkey, ck_range)); - assert(IsTableDynamic(cfname)); +bool CqlIfImpl::IsInsertIntoTablePrepareSupported(const std::string &table) { + return IsTableDynamic(table); +} + +bool CqlIfImpl::SelectFromTableSync(const std::string &cfname, + const GenDb::DbDataValueVec &rkey, CassConsistency consistency, + GenDb::NewColVec *out) { + if (session_state_ != SessionState::CONNECTED) { + return false; + } + std::string query(impl::PartitionKey2CassSelectFromTable(cfname, + rkey)); + if (IsTableStatic(cfname)) { + return impl::StaticCfGetResultSync(cci_, session_.get(), + query.c_str(), consistency, out); + } else { size_t rk_count; - assert(impl::GetCassTablePartitionKeyCount(session_.get(), + assert(impl::GetCassTablePartitionKeyCount(cci_, session_.get(), keyspace_, cfname, &rk_count)); size_t ck_count; - assert(impl::GetCassTableClusteringKeyCount(session_.get(), + assert(impl::GetCassTableClusteringKeyCount(cci_, session_.get(), keyspace_, cfname, &ck_count)); - return impl::DynamicCfGetResultSync(session_.get(), + return impl::DynamicCfGetResultSync(cci_, session_.get(), query.c_str(), rk_count, ck_count, consistency, out); } +} - void ConnectAsync() { - session_state_ = SessionState::CONNECT_PENDING; - impl::CassFuturePtr future(cass_session_connect(session_.get(), - cluster_.get())); - if (connect_cb_.empty()) { - connect_cb_ = boost::bind(&CqlIfImpl::ConnectCallbackProcess, - this, _1); - } - cass_future_set_callback(future.get(), ConnectCallback, this); +bool CqlIfImpl::SelectFromTableClusteringKeyRangeSync(const std::string &cfname, + const GenDb::DbDataValueVec &rkey, + const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, + GenDb::NewColVec *out) { + if (session_state_ != SessionState::CONNECTED) { + return false; } - - bool ConnectSync() { - impl::CassFuturePtr future(cass_session_connect(session_.get(), - cluster_.get())); - bool success(impl::SyncFutureWait(future.get())); - if (success) { - session_state_ = SessionState::CONNECTED; - CQLIF_LOG(INFO, "ConnectSync Done"); - } else { - CQLIF_LOG_ERR("ConnectSync FAILED"); - } - return success; - } - - void DisconnectAsync() { - // Close all session and pending queries - session_state_ = SessionState::DISCONNECT_PENDING; - impl::CassFuturePtr future(cass_session_close(session_.get())); - if (disconnect_cb_.empty()) { - disconnect_cb_ = boost::bind(&CqlIfImpl::DisconnectCallbackProcess, - this, _1); - } - cass_future_set_callback(future.get(), DisconnectCallback, this); + std::string query( + impl::PartitionKeyAndClusteringKeyRange2CassSelectFromTable(cfname, + rkey, ck_range)); + assert(IsTableDynamic(cfname)); + size_t rk_count; + assert(impl::GetCassTablePartitionKeyCount(cci_, session_.get(), + keyspace_, cfname, &rk_count)); + size_t ck_count; + assert(impl::GetCassTableClusteringKeyCount(cci_, session_.get(), + keyspace_, cfname, &ck_count)); + return impl::DynamicCfGetResultSync(cci_, session_.get(), + query.c_str(), rk_count, ck_count, consistency, out); +} + +void CqlIfImpl::ConnectAsync() { + session_state_ = SessionState::CONNECT_PENDING; + impl::CassFuturePtr future(cci_->CassSessionConnect(session_.get(), + cluster_.get()), cci_); + if (connect_cb_.empty()) { + connect_cb_ = boost::bind(&CqlIfImpl::ConnectCallbackProcess, + this, _1); + } + cci_->CassFutureSetCallback(future.get(), ConnectCallback, this); +} + +bool CqlIfImpl::ConnectSync() { + impl::CassFuturePtr future(cci_->CassSessionConnect(session_.get(), + cluster_.get()), cci_); + bool success(impl::SyncFutureWait(cci_, future.get())); + if (success) { + session_state_ = SessionState::CONNECTED; + CQLIF_LOG(INFO, "ConnectSync Done"); + } else { + CQLIF_LOG_ERR("ConnectSync FAILED"); } + return success; +} - bool DisconnectSync() { - // Close all session and pending queries - impl::CassFuturePtr future(cass_session_close(session_.get())); - bool success(impl::SyncFutureWait(future.get())); - if (success) { - session_state_ = SessionState::DISCONNECTED; - CQLIF_LOG(INFO, "DisconnectSync Done"); - } else { - CQLIF_LOG_ERR("DisconnectSync FAILED"); - } - return success; +void CqlIfImpl::DisconnectAsync() { + // Close all session and pending queries + session_state_ = SessionState::DISCONNECT_PENDING; + impl::CassFuturePtr future(cci_->CassSessionClose(session_.get()), cci_); + if (disconnect_cb_.empty()) { + disconnect_cb_ = boost::bind(&CqlIfImpl::DisconnectCallbackProcess, + this, _1); } + cci_->CassFutureSetCallback(future.get(), DisconnectCallback, this); +} - void GetMetrics(Metrics *metrics) const { - CassMetrics cass_metrics; - cass_session_get_metrics(session_.get(), &cass_metrics); - // Requests - metrics->requests.min = cass_metrics.requests.min; - metrics->requests.max = cass_metrics.requests.max; - metrics->requests.mean = cass_metrics.requests.mean; - metrics->requests.stddev = cass_metrics.requests.stddev; - metrics->requests.median = cass_metrics.requests.median; - metrics->requests.percentile_75th = - cass_metrics.requests.percentile_75th; - metrics->requests.percentile_95th = - cass_metrics.requests.percentile_95th; - metrics->requests.percentile_98th = - cass_metrics.requests.percentile_98th; - metrics->requests.percentile_99th = - cass_metrics.requests.percentile_99th; - metrics->requests.percentile_999th = - cass_metrics.requests.percentile_999th; - metrics->requests.mean_rate = cass_metrics.requests.mean_rate; - metrics->requests.one_minute_rate = - cass_metrics.requests.one_minute_rate; - metrics->requests.five_minute_rate = - cass_metrics.requests.five_minute_rate; - metrics->requests.fifteen_minute_rate = - cass_metrics.requests.fifteen_minute_rate; - // Stats - metrics->stats.total_connections = - cass_metrics.stats.total_connections; - metrics->stats.available_connections = - cass_metrics.stats.available_connections; - metrics->stats.exceeded_pending_requests_water_mark = - cass_metrics.stats.exceeded_pending_requests_water_mark; - metrics->stats.exceeded_write_bytes_water_mark = - cass_metrics.stats.exceeded_write_bytes_water_mark; - // Errors - metrics->errors.connection_timeouts = - cass_metrics.errors.connection_timeouts; - metrics->errors.pending_request_timeouts = - cass_metrics.errors.pending_request_timeouts; - metrics->errors.request_timeouts = - cass_metrics.errors.request_timeouts; - } - - private: - typedef boost::function ConnectCbFn; - typedef boost::function DisconnectCbFn; - - static void ConnectCallback(CassFuture *future, void *data) { - CqlIfImpl *impl_ = (CqlIfImpl *)data; - impl_->connect_cb_(future); - } - - static void DisconnectCallback(CassFuture *future, void *data) { - CqlIfImpl *impl_ = (CqlIfImpl *)data; - impl_->disconnect_cb_(future); - } - - bool ReconnectTimerExpired() { - ConnectAsync(); - return false; +bool CqlIfImpl::DisconnectSync() { + // Close all session and pending queries + impl::CassFuturePtr future(cci_->CassSessionClose(session_.get()), cci_); + bool success(impl::SyncFutureWait(cci_, future.get())); + if (success) { + session_state_ = SessionState::DISCONNECTED; + CQLIF_LOG(INFO, "DisconnectSync Done"); + } else { + CQLIF_LOG_ERR("DisconnectSync FAILED"); } + return success; +} - void ReconnectTimerErrorHandler(std::string error_name, - std::string error_message) { - CQLIF_LOG_ERR(error_name << " " << error_message); +void CqlIfImpl::GetMetrics(Metrics *metrics) const { + CassMetrics cass_metrics; + cci_->CassSessionGetMetrics(session_.get(), &cass_metrics); + // Requests + metrics->requests.min = cass_metrics.requests.min; + metrics->requests.max = cass_metrics.requests.max; + metrics->requests.mean = cass_metrics.requests.mean; + metrics->requests.stddev = cass_metrics.requests.stddev; + metrics->requests.median = cass_metrics.requests.median; + metrics->requests.percentile_75th = + cass_metrics.requests.percentile_75th; + metrics->requests.percentile_95th = + cass_metrics.requests.percentile_95th; + metrics->requests.percentile_98th = + cass_metrics.requests.percentile_98th; + metrics->requests.percentile_99th = + cass_metrics.requests.percentile_99th; + metrics->requests.percentile_999th = + cass_metrics.requests.percentile_999th; + metrics->requests.mean_rate = cass_metrics.requests.mean_rate; + metrics->requests.one_minute_rate = + cass_metrics.requests.one_minute_rate; + metrics->requests.five_minute_rate = + cass_metrics.requests.five_minute_rate; + metrics->requests.fifteen_minute_rate = + cass_metrics.requests.fifteen_minute_rate; + // Stats + metrics->stats.total_connections = + cass_metrics.stats.total_connections; + metrics->stats.available_connections = + cass_metrics.stats.available_connections; + metrics->stats.exceeded_pending_requests_water_mark = + cass_metrics.stats.exceeded_pending_requests_water_mark; + metrics->stats.exceeded_write_bytes_water_mark = + cass_metrics.stats.exceeded_write_bytes_water_mark; + // Errors + metrics->errors.connection_timeouts = + cass_metrics.errors.connection_timeouts; + metrics->errors.pending_request_timeouts = + cass_metrics.errors.pending_request_timeouts; + metrics->errors.request_timeouts = + cass_metrics.errors.request_timeouts; +} + +void CqlIfImpl::ConnectCallback(CassFuture *future, void *data) { + CqlIfImpl *impl_ = (CqlIfImpl *)data; + impl_->connect_cb_(future); +} + +void CqlIfImpl::DisconnectCallback(CassFuture *future, void *data) { + CqlIfImpl *impl_ = (CqlIfImpl *)data; + impl_->disconnect_cb_(future); +} + +bool CqlIfImpl::ReconnectTimerExpired() { + ConnectAsync(); + return false; +} + +void CqlIfImpl::ReconnectTimerErrorHandler(std::string error_name, + std::string error_message) { + CQLIF_LOG_ERR(error_name << " " << error_message); +} + +void CqlIfImpl::ConnectCallbackProcess(CassFuture *future) { + CassError code(cci_->CassFutureErrorCode(future)); + if (code != CASS_OK) { + impl::CassString err; + cci_->CassFutureErrorMessage(future, &err.data, &err.length); + CQLIF_LOG(INFO, err.data); + // Start a timer to reconnect + reconnect_timer_->Start(kReconnectInterval, + boost::bind(&CqlIfImpl::ReconnectTimerExpired, this), + boost::bind(&CqlIfImpl::ReconnectTimerErrorHandler, this, + _1, _2)); + return; } + session_state_ = SessionState::CONNECTED; +} - void ConnectCallbackProcess(CassFuture *future) { - CassError code(cass_future_error_code(future)); - if (code != CASS_OK) { - impl::CassString err; - cass_future_error_message(future, &err.data, &err.length); - CQLIF_LOG(INFO, err.data); - // Start a timer to reconnect - reconnect_timer_->Start(kReconnectInterval, - boost::bind(&CqlIfImpl::ReconnectTimerExpired, this), - boost::bind(&CqlIfImpl::ReconnectTimerErrorHandler, this, - _1, _2)); - return; - } - session_state_ = SessionState::CONNECTED; +void CqlIfImpl::DisconnectCallbackProcess(CassFuture *future) { + CassError code(cci_->CassFutureErrorCode(future)); + if (code != CASS_OK) { + impl::CassString err; + cci_->CassFutureErrorMessage(future, &err.data, &err.length); + CQLIF_LOG_ERR(err.data); } + session_state_ = SessionState::DISCONNECTED; +} - void DisconnectCallbackProcess(CassFuture *future) { - CassError code(cass_future_error_code(future)); - if (code != CASS_OK) { - impl::CassString err; - cass_future_error_message(future, &err.data, &err.length); - CQLIF_LOG_ERR(err.data); - } - session_state_ = SessionState::DISCONNECTED; +bool CqlIfImpl::InsertIntoTableInternal(std::auto_ptr v_columns, + CassConsistency consistency, bool sync, + impl::CassAsyncQueryCallback cb) { + if (session_state_ != SessionState::CONNECTED) { + return false; } - - bool InsertIntoTableInternal(std::auto_ptr v_columns, - CassConsistency consistency, bool sync, - impl::CassAsyncQueryCallback cb) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - std::string query; - if (IsTableStatic(v_columns->cfname_)) { - query = impl::StaticCf2CassInsertIntoTable(v_columns.get()); - } else { - query = impl::DynamicCf2CassInsertIntoTable(v_columns.get()); - } - if (sync) { - return impl::ExecuteQuerySync(session_.get(), query.c_str(), - consistency); - } else { - impl::ExecuteQueryAsync(session_.get(), query.c_str(), - consistency, cb); - return true; - } + std::string query; + if (IsTableStatic(v_columns->cfname_)) { + query = impl::StaticCf2CassInsertIntoTable(v_columns.get()); + } else { + query = impl::DynamicCf2CassInsertIntoTable(v_columns.get()); } - - bool PrepareInsertIntoTableSync(const GenDb::NewCf &cf, - impl::CassPreparedPtr *prepared) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - std::string query; - switch (cf.cftype_) { - case GenDb::NewCf::COLUMN_FAMILY_SQL: - query = impl::StaticCf2CassPrepareInsertIntoTable(cf); - break; - case GenDb::NewCf::COLUMN_FAMILY_NOSQL: - query = impl::DynamicCf2CassPrepareInsertIntoTable(cf); - break; - default: - return false; - } - return impl::PrepareSync(session_.get(), query.c_str(), - prepared); + if (sync) { + return impl::ExecuteQuerySync(cci_, session_.get(), query.c_str(), + consistency); + } else { + impl::ExecuteQueryAsync(cci_, session_.get(), query.c_str(), + consistency, cb); + return true; } +} - bool InsertIntoTablePrepareInternal(std::auto_ptr v_columns, - CassConsistency consistency, bool sync, - impl::CassAsyncQueryCallback cb) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - impl::CassPreparedPtr prepared; - bool success(GetPrepareInsertIntoTable(v_columns->cfname_, &prepared)); - if (!success) { - CQLIF_LOG_ERR("CassPrepared statement NOT found: " << - v_columns->cfname_); - return false; - } - impl::CassStatementPtr qstatement(cass_prepared_bind(prepared.get())); - if (IsTableStatic(v_columns->cfname_)) { - success = impl::StaticCf2CassPrepareBind(qstatement.get(), - v_columns.get()); - } else { - success = impl::DynamicCf2CassPrepareBind(qstatement.get(), - v_columns.get()); - } - if (!success) { - return false; - } - if (sync) { - return impl::ExecuteQueryStatementSync(session_.get(), - qstatement.get(), consistency); - } else { - std::string qid("Prepare: " + v_columns->cfname_); - impl::ExecuteQueryStatementAsync(session_.get(), qid.c_str(), - qstatement.get(), consistency, cb); - return true; - } +bool CqlIfImpl::PrepareInsertIntoTableSync(const GenDb::NewCf &cf, + impl::CassPreparedPtr *prepared) { + if (session_state_ != SessionState::CONNECTED) { + return false; + } + std::string query; + switch (cf.cftype_) { + case GenDb::NewCf::COLUMN_FAMILY_SQL: + query = impl::StaticCf2CassPrepareInsertIntoTable(cf); + break; + case GenDb::NewCf::COLUMN_FAMILY_NOSQL: + query = impl::DynamicCf2CassPrepareInsertIntoTable(cf); + break; + default: + return false; } + return impl::PrepareSync(cci_, session_.get(), query.c_str(), + prepared); +} - static const char * kQCreateKeyspaceIfNotExists; - static const char * kQUseKeyspace; - static const char * kTaskName; - static const int kTaskInstance = -1; - static const int kReconnectInterval = 5 * 1000; - - struct SessionState { - enum type { - INIT, - CONNECT_PENDING, - CONNECTED, - DISCONNECT_PENDING, - DISCONNECTED, - }; - }; - - EventManager *evm_; - impl::CassClusterPtr cluster_; - impl::CassSessionPtr session_; - tbb::atomic session_state_; - Timer *reconnect_timer_; - ConnectCbFn connect_cb_; - DisconnectCbFn disconnect_cb_; - std::string keyspace_; - int io_thread_count_; - typedef boost::unordered_map - CassPreparedMapType; - CassPreparedMapType insert_prepared_map_; - mutable tbb::mutex map_mutex_; -}; +bool CqlIfImpl::InsertIntoTablePrepareInternal( + std::auto_ptr v_columns, + CassConsistency consistency, bool sync, + impl::CassAsyncQueryCallback cb) { + if (session_state_ != SessionState::CONNECTED) { + return false; + } + impl::CassPreparedPtr prepared(NULL, cci_); + bool success(GetPrepareInsertIntoTable(v_columns->cfname_, &prepared)); + if (!success) { + CQLIF_LOG_ERR("CassPrepared statement NOT found: " << + v_columns->cfname_); + return false; + } + impl::CassStatementPtr qstatement(cci_->CassPreparedBind(prepared.get()), + cci_); + if (IsTableStatic(v_columns->cfname_)) { + success = impl::StaticCf2CassPrepareBind(cci_, qstatement.get(), + v_columns.get()); + } else { + success = impl::DynamicCf2CassPrepareBind(cci_, qstatement.get(), + v_columns.get()); + } + if (!success) { + return false; + } + if (sync) { + return impl::ExecuteQueryStatementSync(cci_, session_.get(), + qstatement.get(), consistency); + } else { + std::string qid("Prepare: " + v_columns->cfname_); + impl::ExecuteQueryStatementAsync(cci_, session_.get(), qid.c_str(), + qstatement.get(), consistency, cb); + return true; + } +} -const char * CqlIf::CqlIfImpl::kQCreateKeyspaceIfNotExists( +const char * CqlIfImpl::kQCreateKeyspaceIfNotExists( "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH " "replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %s }"); -const char * CqlIf::CqlIfImpl::kQUseKeyspace("USE \"%s\""); -const char * CqlIf::CqlIfImpl::kTaskName("CqlIfImpl::Task"); +const char * CqlIfImpl::kQUseKeyspace("USE \"%s\""); +const char * CqlIfImpl::kTaskName("CqlIfImpl::Task"); // // CqlIf @@ -1739,14 +1631,14 @@ CqlIf::CqlIf(EventManager *evm, int cassandra_port, const std::string &cassandra_user, const std::string &cassandra_password) : - impl_(NULL), + cci_(new interface::CassDatastaxLibrary), + impl_(new CqlIfImpl(evm, cassandra_ips, cassandra_port, + cassandra_user, cassandra_password, cci_.get())), use_prepared_for_insert_(true) { // Setup library logging - cass_log_set_level(impl::Log4Level2CassLogLevel( + cci_->CassLogSetLevel(impl::Log4Level2CassLogLevel( log4cplus::Logger::getRoot().getLogLevel())); - cass_log_set_callback(impl::CassLibraryLog, NULL); - impl_ = new CqlIfImpl(evm, cassandra_ips, cassandra_port, - cassandra_user, cassandra_password); + cci_->CassLogSetCallback(impl::CassLibraryLog, NULL); initialized_ = false; BOOST_FOREACH(const std::string &cassandra_ip, cassandra_ips) { boost::system::error_code ec; @@ -1761,9 +1653,6 @@ CqlIf::CqlIf() : impl_(NULL) { } CqlIf::~CqlIf() { - if (impl_) { - delete impl_; - } } // Init/Uninit @@ -2063,5 +1952,359 @@ std::vector CqlIf::Db_GetEndpoints() const { return endpoints_; } +namespace interface { + +// +// CassDatastaxLibrary +// +CassDatastaxLibrary::CassDatastaxLibrary() { +} + +CassDatastaxLibrary::~CassDatastaxLibrary() { +} + +// CassCluster +CassCluster* CassDatastaxLibrary::CassClusterNew() { + return cass_cluster_new(); +} + +void CassDatastaxLibrary::CassClusterFree(CassCluster* cluster) { + cass_cluster_free(cluster); +} + +CassError CassDatastaxLibrary::CassClusterSetContactPoints( + CassCluster* cluster, const char* contact_points) { + return cass_cluster_set_contact_points(cluster, contact_points); +} + +CassError CassDatastaxLibrary::CassClusterSetPort(CassCluster* cluster, + int port) { + return cass_cluster_set_port(cluster, port); +} + +void CassDatastaxLibrary::CassClusterSetCredentials(CassCluster* cluster, + const char* username, const char* password) { + cass_cluster_set_credentials(cluster, username, password); +} + +CassError CassDatastaxLibrary::CassClusterSetNumThreadsIo(CassCluster* cluster, + unsigned num_threads) { + return cass_cluster_set_num_threads_io(cluster, num_threads); +} + +CassError CassDatastaxLibrary::CassClusterSetPendingRequestsHighWaterMark( + CassCluster* cluster, unsigned num_requests) { + return cass_cluster_set_pending_requests_high_water_mark(cluster, + num_requests); +} + +CassError CassDatastaxLibrary::CassClusterSetPendingRequestsLowWaterMark( + CassCluster* cluster, unsigned num_requests) { + return cass_cluster_set_pending_requests_low_water_mark(cluster, + num_requests); +} + +CassError CassDatastaxLibrary::CassClusterSetWriteBytesHighWaterMark( + CassCluster* cluster, unsigned num_bytes) { + return cass_cluster_set_write_bytes_high_water_mark(cluster, num_bytes); +} + +CassError CassDatastaxLibrary::CassClusterSetWriteBytesLowWaterMark( + CassCluster* cluster, unsigned num_bytes) { + return cass_cluster_set_write_bytes_low_water_mark(cluster, num_bytes); +} + +// CassSession +CassSession* CassDatastaxLibrary::CassSessionNew() { + return cass_session_new(); +} + +void CassDatastaxLibrary::CassSessionFree(CassSession* session) { + cass_session_free(session); +} + +CassFuture* CassDatastaxLibrary::CassSessionConnect(CassSession* session, + const CassCluster* cluster) { + return cass_session_connect(session, cluster); +} + +CassFuture* CassDatastaxLibrary::CassSessionClose(CassSession* session) { + return cass_session_close(session); +} + +CassFuture* CassDatastaxLibrary::CassSessionExecute(CassSession* session, + const CassStatement* statement) { + return cass_session_execute(session, statement); +} + +const CassSchemaMeta* CassDatastaxLibrary::CassSessionGetSchemaMeta( + const CassSession* session) { + return cass_session_get_schema_meta(session); +} + +CassFuture* CassDatastaxLibrary::CassSessionPrepare(CassSession* session, + const char* query) { + return cass_session_prepare(session, query); +} + +void CassDatastaxLibrary::CassSessionGetMetrics(const CassSession* session, + CassMetrics* output) { + cass_session_get_metrics(session, output); +} + +// CassSchema +void CassDatastaxLibrary::CassSchemaMetaFree( + const CassSchemaMeta* schema_meta) { + cass_schema_meta_free(schema_meta); +} + +const CassKeyspaceMeta* CassDatastaxLibrary::CassSchemaMetaKeyspaceByName( + const CassSchemaMeta* schema_meta, const char* keyspace) { + return cass_schema_meta_keyspace_by_name(schema_meta, keyspace); +} + +const CassTableMeta* CassDatastaxLibrary::CassKeyspaceMetaTableByName( + const CassKeyspaceMeta* keyspace_meta, const char* table) { + return cass_keyspace_meta_table_by_name(keyspace_meta, table); +} + +size_t CassDatastaxLibrary::CassTableMetaPartitionKeyCount( + const CassTableMeta* table_meta) { + return cass_table_meta_partition_key_count(table_meta); +} + +size_t CassDatastaxLibrary::CassTableMetaClusteringKeyCount( + const CassTableMeta* table_meta) { + return cass_table_meta_clustering_key_count(table_meta); +} + +// CassFuture +void CassDatastaxLibrary::CassFutureFree(CassFuture* future) { + cass_future_free(future); +} + +CassError CassDatastaxLibrary::CassFutureSetCallback(CassFuture* future, + CassFutureCallback callback, void* data) { + return cass_future_set_callback(future, callback, data); +} + +void CassDatastaxLibrary::CassFutureWait(CassFuture* future) { + cass_future_wait(future); +} + +const CassResult* CassDatastaxLibrary::CassFutureGetResult( + CassFuture* future) { + return cass_future_get_result(future); +} + +void CassDatastaxLibrary::CassFutureErrorMessage(CassFuture* future, + const char** message, size_t* message_length) { + cass_future_error_message(future, message, message_length); +} + +CassError CassDatastaxLibrary::CassFutureErrorCode(CassFuture* future) { + return cass_future_error_code(future); +} + +const CassPrepared* CassDatastaxLibrary::CassFutureGetPrepared( + CassFuture* future) { + return cass_future_get_prepared(future); +} + +// CassResult +void CassDatastaxLibrary::CassResultFree(const CassResult* result) { + cass_result_free(result); +} + +size_t CassDatastaxLibrary::CassResultColumnCount(const CassResult* result) { + return cass_result_column_count(result); +} + +CassError CassDatastaxLibrary::CassResultColumnName(const CassResult *result, + size_t index, const char** name, size_t* name_length) { + return cass_result_column_name(result, index, name, name_length); +} + +// CassIterator +void CassDatastaxLibrary::CassIteratorFree(CassIterator* iterator) { + cass_iterator_free(iterator); +} + +CassIterator* CassDatastaxLibrary::CassIteratorFromResult( + const CassResult* result) { + return cass_iterator_from_result(result); +} + +cass_bool_t CassDatastaxLibrary::CassIteratorNext(CassIterator* iterator) { + return cass_iterator_next(iterator); +} + +const CassRow* CassDatastaxLibrary::CassIteratorGetRow( + const CassIterator* iterator) { + return cass_iterator_get_row(iterator); +} + +// CassStatement +CassStatement* CassDatastaxLibrary::CassStatementNew(const char* query, + size_t parameter_count) { + return cass_statement_new(query, parameter_count); +} + +void CassDatastaxLibrary::CassStatementFree(CassStatement* statement) { + cass_statement_free(statement); +} + +CassError CassDatastaxLibrary::CassStatementSetConsistency( + CassStatement* statement, CassConsistency consistency) { + return cass_statement_set_consistency(statement, consistency); +} + +CassError CassDatastaxLibrary::CassStatementBindStringN( + CassStatement* statement, + size_t index, const char* value, size_t value_length) { + return cass_statement_bind_string_n(statement, index, value, value_length); +} + +CassError CassDatastaxLibrary::CassStatementBindInt32(CassStatement* statement, + size_t index, cass_int32_t value) { + return cass_statement_bind_int32(statement, index, value); +} + +CassError CassDatastaxLibrary::CassStatementBindInt64(CassStatement* statement, + size_t index, cass_int64_t value) { + return cass_statement_bind_int64(statement, index, value); +} + +CassError CassDatastaxLibrary::CassStatementBindUuid(CassStatement* statement, + size_t index, CassUuid value) { + return cass_statement_bind_uuid(statement, index, value); +} + +CassError CassDatastaxLibrary::CassStatementBindDouble( + CassStatement* statement, size_t index, cass_double_t value) { + return cass_statement_bind_double(statement, index, value); +} + +CassError CassDatastaxLibrary::CassStatementBindInet(CassStatement* statement, + size_t index, CassInet value) { + return cass_statement_bind_inet(statement, index, value); +} + +CassError CassDatastaxLibrary::CassStatementBindStringByNameN( + CassStatement* statement, + const char* name, size_t name_length, const char* value, + size_t value_length) { + return cass_statement_bind_string_by_name_n(statement, name, name_length, + value, value_length); +} + +CassError CassDatastaxLibrary::CassStatementBindInt32ByName( + CassStatement* statement, const char* name, cass_int32_t value) { + return cass_statement_bind_int32_by_name(statement, name, value); +} + +CassError CassDatastaxLibrary::CassStatementBindInt64ByName( + CassStatement* statement, const char* name, cass_int64_t value) { + return cass_statement_bind_int64_by_name(statement, name, value); +} + +CassError CassDatastaxLibrary::CassStatementBindUuidByName( + CassStatement* statement, const char* name, CassUuid value) { + return cass_statement_bind_uuid_by_name(statement, name, value); +} + +CassError CassDatastaxLibrary::CassStatementBindDoubleByName( + CassStatement* statement, const char* name, cass_double_t value) { + return cass_statement_bind_double_by_name(statement, name, value); +} + +CassError CassDatastaxLibrary::CassStatementBindInetByName( + CassStatement* statement, const char* name, CassInet value) { + return cass_statement_bind_inet_by_name(statement, name, value); +} + +// CassPrepare +void CassDatastaxLibrary::CassPreparedFree(const CassPrepared* prepared) { + cass_prepared_free(prepared); +} + +CassStatement* CassDatastaxLibrary::CassPreparedBind( + const CassPrepared* prepared) { + return cass_prepared_bind(prepared); +} + +// CassValue +CassValueType CassDatastaxLibrary::GetCassValueType(const CassValue* value) { + return cass_value_type(value); +} + +CassError CassDatastaxLibrary::CassValueGetString(const CassValue* value, + const char** output, size_t* output_size) { + return cass_value_get_string(value, output, output_size); +} + +CassError CassDatastaxLibrary::CassValueGetInt8(const CassValue* value, + cass_int8_t* output) { + return cass_value_get_int8(value, output); +} + +CassError CassDatastaxLibrary::CassValueGetInt16(const CassValue* value, + cass_int16_t* output) { + return cass_value_get_int16(value, output); +} + +CassError CassDatastaxLibrary::CassValueGetInt32(const CassValue* value, + cass_int32_t* output) { + return cass_value_get_int32(value, output); +} + +CassError CassDatastaxLibrary::CassValueGetInt64(const CassValue* value, + cass_int64_t* output) { + return cass_value_get_int64(value, output); +} + +CassError CassDatastaxLibrary::CassValueGetUuid(const CassValue* value, + CassUuid* output) { + return cass_value_get_uuid(value, output); +} + +CassError CassDatastaxLibrary::CassValueGetDouble(const CassValue* value, + cass_double_t* output) { + return cass_value_get_double(value, output); +} + +CassError CassDatastaxLibrary::CassValueGetInet(const CassValue* value, + CassInet* output) { + return cass_value_get_inet(value, output); +} + +// CassInet +CassInet CassDatastaxLibrary::CassInetInitV4( + const cass_uint8_t* address) { + return cass_inet_init_v4(address); +} + +CassInet CassDatastaxLibrary::CassInetInitV6( + const cass_uint8_t* address) { + return cass_inet_init_v6(address); +} + +// CassRow +const CassValue* CassDatastaxLibrary::CassRowGetColumn(const CassRow* row, + size_t index) { + return cass_row_get_column(row, index); +} + +// CassLog +void CassDatastaxLibrary::CassLogSetLevel(CassLogLevel log_level) { + cass_log_set_level(log_level); +} + +void CassDatastaxLibrary::CassLogSetCallback(CassLogCallback callback, + void* data) { + cass_log_set_callback(callback, data); +} + +} // namespace interface } // namespace cql } // namespace cass diff --git a/src/database/cassandra/cql/cql_if.h b/src/database/cassandra/cql/cql_if.h index bf075945b43..a2933df6692 100644 --- a/src/database/cassandra/cql/cql_if.h +++ b/src/database/cassandra/cql/cql_if.h @@ -13,6 +13,8 @@ #include #include #include +#include +#include class EventManager; @@ -68,7 +70,7 @@ class CqlIf : public GenDb::GenDbIf { virtual bool Db_GetCumulativeStats(std::vector *vdbti, GenDb::DbErrors *dbe) const; virtual void Db_GetCqlMetrics(Metrics *metrics) const; - virtual void Db_GetCqlStats(cass::cql::DbStats *db_stats) const; + virtual void Db_GetCqlStats(DbStats *db_stats) const; // Connection virtual std::vector Db_GetEndpoints() const; @@ -91,8 +93,8 @@ class CqlIf : public GenDb::GenDbIf { uint64_t num_reads); void IncrementErrors(GenDb::IfErrors::Type err_type); - class CqlIfImpl; - CqlIfImpl *impl_; + boost::scoped_ptr cci_; + boost::scoped_ptr impl_; tbb::atomic initialized_; std::vector endpoints_; mutable tbb::mutex stats_mutex_; diff --git a/src/database/cassandra/cql/cql_if_impl.h b/src/database/cassandra/cql/cql_if_impl.h index 0a630e6f429..465501218a9 100644 --- a/src/database/cassandra/cql/cql_if_impl.h +++ b/src/database/cassandra/cql/cql_if_impl.h @@ -1,5 +1,5 @@ // -// Copyright (c) 2015 Juniper Networks, Inc. All rights reserved. +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. // #ifndef DATABASE_CASSANDRA_CQL_CQL_IF_IMPL_H_ @@ -7,7 +7,16 @@ #include +#include + +#include + +#include +#include + #include +#include +#include namespace cass { namespace cql { @@ -25,7 +34,241 @@ std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable( const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &crange); +// CQL Library Shared Pointers to handle library free calls +template +struct Deleter; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(CassCluster *ptr) { + if (ptr != NULL) { + cci_->CassClusterFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(CassSession* ptr) { + if (ptr != NULL) { + cci_->CassSessionFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(CassFuture* ptr) { + if (ptr != NULL) { + cci_->CassFutureFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(CassStatement* ptr) { + if (ptr != NULL) { + cci_->CassStatementFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(const CassResult* ptr) { + if (ptr != NULL) { + cci_->CassResultFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(CassIterator* ptr) { + if (ptr != NULL) { + cci_->CassIteratorFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(const CassPrepared* ptr) { + if (ptr != NULL) { + cci_->CassPreparedFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(const CassSchemaMeta* ptr) { + if (ptr != NULL) { + cci_->CassSchemaMetaFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template +class CassSharedPtr : public boost::shared_ptr { + public: + CassSharedPtr(T* ptr, interface::CassLibrary *cci) : + boost::shared_ptr(ptr, Deleter(cci)) {} +}; + +typedef CassSharedPtr CassClusterPtr; +typedef CassSharedPtr CassSessionPtr; +typedef CassSharedPtr CassFuturePtr; +typedef CassSharedPtr CassStatementPtr; +typedef CassSharedPtr CassResultPtr; +typedef CassSharedPtr CassIteratorPtr; +typedef CassSharedPtr CassPreparedPtr; +typedef CassSharedPtr CassSchemaMetaPtr; + +typedef boost::function CassAsyncQueryCallback; + +struct CassAsyncQueryContext { + CassAsyncQueryContext(const char *query_id, CassAsyncQueryCallback cb, + interface::CassLibrary *cci) : + query_id_(query_id), + cb_(cb), + cci_(cci) { + } + std::string query_id_; + CassAsyncQueryCallback cb_; + interface::CassLibrary *cci_; +}; + } // namespace impl + +// +// CqlIfImpl +// +class CqlIfImpl { + public: + CqlIfImpl(EventManager *evm, + const std::vector &cassandra_ips, + int cassandra_port, + const std::string &cassandra_user, + const std::string &cassandra_password, + interface::CassLibrary *cci); + virtual ~CqlIfImpl(); + + bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace, + const std::string &replication_factor, CassConsistency consistency); + bool UseKeyspaceSync(const std::string &keyspace, + CassConsistency consistency); + + bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, + CassConsistency consistency); + bool LocatePrepareInsertIntoTable(const GenDb::NewCf &cf); + bool IsTablePresent(const GenDb::NewCf &cf); + bool IsTableStatic(const std::string &table); + bool IsTableDynamic(const std::string &table); + + bool InsertIntoTableSync(std::auto_ptr v_columns, + CassConsistency consistency); + bool InsertIntoTableAsync(std::auto_ptr v_columns, + CassConsistency consistency, impl::CassAsyncQueryCallback cb); + bool InsertIntoTablePrepareAsync(std::auto_ptr v_columns, + CassConsistency consistency, impl::CassAsyncQueryCallback cb); + bool IsInsertIntoTablePrepareSupported(const std::string &table); + + bool SelectFromTableSync(const std::string &cfname, + const GenDb::DbDataValueVec &rkey, CassConsistency consistency, + GenDb::NewColVec *out); + bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname, + const GenDb::DbDataValueVec &rkey, + const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, + GenDb::NewColVec *out); + + void ConnectAsync(); + bool ConnectSync(); + void DisconnectAsync(); + bool DisconnectSync(); + + void GetMetrics(Metrics *metrics) const; + + private: + typedef boost::function ConnectCbFn; + typedef boost::function DisconnectCbFn; + + static void ConnectCallback(CassFuture *future, void *data); + static void DisconnectCallback(CassFuture *future, void *data); + bool ReconnectTimerExpired(); + void ReconnectTimerErrorHandler(std::string error_name, + std::string error_message); + void ConnectCallbackProcess(CassFuture *future); + void DisconnectCallbackProcess(CassFuture *future); + + bool InsertIntoTableInternal(std::auto_ptr v_columns, + CassConsistency consistency, bool sync, + impl::CassAsyncQueryCallback cb); + bool GetPrepareInsertIntoTable(const std::string &table_name, + impl::CassPreparedPtr *prepared) const; + bool PrepareInsertIntoTableSync(const GenDb::NewCf &cf, + impl::CassPreparedPtr *prepared); + bool InsertIntoTablePrepareInternal(std::auto_ptr v_columns, + CassConsistency consistency, bool sync, + impl::CassAsyncQueryCallback cb); + + static const char * kQCreateKeyspaceIfNotExists; + static const char * kQUseKeyspace; + static const char * kTaskName; + static const int kTaskInstance = -1; + static const int kReconnectInterval = 5 * 1000; + + struct SessionState { + enum type { + INIT, + CONNECT_PENDING, + CONNECTED, + DISCONNECT_PENDING, + DISCONNECTED, + }; + }; + + EventManager *evm_; + interface::CassLibrary *cci_; + impl::CassClusterPtr cluster_; + impl::CassSessionPtr session_; + tbb::atomic session_state_; + Timer *reconnect_timer_; + ConnectCbFn connect_cb_; + DisconnectCbFn disconnect_cb_; + std::string keyspace_; + int io_thread_count_; + typedef boost::unordered_map + CassPreparedMapType; + CassPreparedMapType insert_prepared_map_; + mutable tbb::mutex map_mutex_; +}; + } // namespace cql } // namespace cass diff --git a/src/database/cassandra/cql/cql_lib_if.h b/src/database/cassandra/cql/cql_lib_if.h new file mode 100644 index 00000000000..b094aad5d4a --- /dev/null +++ b/src/database/cassandra/cql/cql_lib_if.h @@ -0,0 +1,305 @@ +// +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +// + +#ifndef DATABASE_CASSANDRA_CQL_CQL_LIB_IF_H_ +#define DATABASE_CASSANDRA_CQL_CQL_LIB_IF_H_ + +#include + +namespace cass { +namespace cql { +namespace interface { + +class CassLibrary { + public: + virtual ~CassLibrary() {} + + // CassCluster + virtual CassCluster* CassClusterNew() = 0; + virtual void CassClusterFree(CassCluster* cluster) = 0; + virtual CassError CassClusterSetContactPoints(CassCluster* cluster, + const char* contact_points) = 0; + virtual CassError CassClusterSetPort(CassCluster* cluster, + int port) = 0; + virtual void CassClusterSetCredentials(CassCluster* cluster, + const char* username, const char* password) = 0; + virtual CassError CassClusterSetNumThreadsIo(CassCluster* cluster, + unsigned num_threads) = 0; + virtual CassError CassClusterSetPendingRequestsHighWaterMark( + CassCluster* cluster, unsigned num_requests) = 0; + virtual CassError CassClusterSetPendingRequestsLowWaterMark( + CassCluster* cluster, unsigned num_requests) = 0; + virtual CassError CassClusterSetWriteBytesHighWaterMark( + CassCluster* cluster, unsigned num_bytes) = 0; + virtual CassError CassClusterSetWriteBytesLowWaterMark( + CassCluster* cluster, unsigned num_bytes) = 0; + + // CassSession + virtual CassSession* CassSessionNew() = 0; + virtual void CassSessionFree(CassSession* session) = 0; + virtual CassFuture* CassSessionConnect(CassSession* session, + const CassCluster* cluster) = 0; + virtual CassFuture* CassSessionClose(CassSession* session) = 0; + virtual CassFuture* CassSessionExecute(CassSession* session, + const CassStatement* statement) = 0; + virtual const CassSchemaMeta* CassSessionGetSchemaMeta( + const CassSession* session) = 0; + virtual CassFuture* CassSessionPrepare(CassSession* session, + const char* query) = 0; + virtual void CassSessionGetMetrics(const CassSession* session, + CassMetrics* output) = 0; + + // CassSchema + virtual void CassSchemaMetaFree(const CassSchemaMeta* schema_meta) = 0; + virtual const CassKeyspaceMeta* CassSchemaMetaKeyspaceByName( + const CassSchemaMeta* schema_meta, const char* keyspace) = 0; + virtual const CassTableMeta* CassKeyspaceMetaTableByName( + const CassKeyspaceMeta* keyspace_meta, const char* table) = 0; + virtual size_t CassTableMetaPartitionKeyCount( + const CassTableMeta* table_meta) = 0; + virtual size_t CassTableMetaClusteringKeyCount( + const CassTableMeta* table_meta) = 0; + + // CassFuture + virtual void CassFutureFree(CassFuture* future) = 0; + virtual CassError CassFutureSetCallback(CassFuture* future, + CassFutureCallback callback, void* data) = 0; + virtual void CassFutureWait(CassFuture* future) = 0; + virtual const CassResult* CassFutureGetResult(CassFuture* future) = 0; + virtual void CassFutureErrorMessage(CassFuture* future, + const char** message, size_t* message_length) = 0; + virtual CassError CassFutureErrorCode(CassFuture* future) = 0; + virtual const CassPrepared* CassFutureGetPrepared(CassFuture* future) = 0; + + // CassResult + virtual void CassResultFree(const CassResult* result) = 0; + virtual size_t CassResultColumnCount(const CassResult* result) = 0; + virtual CassError CassResultColumnName(const CassResult *result, + size_t index, const char** name, size_t* name_length) = 0; + + // CassIterator + virtual void CassIteratorFree(CassIterator* iterator) = 0; + virtual CassIterator* CassIteratorFromResult( + const CassResult* result) = 0; + virtual cass_bool_t CassIteratorNext(CassIterator* iterator) = 0; + virtual const CassRow* CassIteratorGetRow( + const CassIterator* iterator) = 0; + + // CassStatement + virtual CassStatement* CassStatementNew(const char* query, + size_t parameter_count) = 0; + virtual void CassStatementFree(CassStatement* statement) = 0; + virtual CassError CassStatementSetConsistency(CassStatement* statement, + CassConsistency consistency) = 0; + virtual CassError CassStatementBindStringN(CassStatement* statement, + size_t index, const char* value, size_t value_length) = 0; + virtual CassError CassStatementBindInt32(CassStatement* statement, + size_t index, cass_int32_t value) = 0; + virtual CassError CassStatementBindInt64(CassStatement* statement, + size_t index, cass_int64_t value) = 0; + virtual CassError CassStatementBindUuid(CassStatement* statement, + size_t index, CassUuid value) = 0; + virtual CassError CassStatementBindDouble(CassStatement* statement, + size_t index, cass_double_t value) = 0; + virtual CassError CassStatementBindInet(CassStatement* statement, + size_t index, CassInet value) = 0; + virtual CassError CassStatementBindStringByNameN(CassStatement* statement, + const char* name, size_t name_length, const char* value, + size_t value_length) = 0; + virtual CassError CassStatementBindInt32ByName(CassStatement* statement, + const char* name, cass_int32_t value) = 0; + virtual CassError CassStatementBindInt64ByName(CassStatement* statement, + const char* name, cass_int64_t value) = 0; + virtual CassError CassStatementBindUuidByName(CassStatement* statement, + const char* name, CassUuid value) = 0; + virtual CassError CassStatementBindDoubleByName(CassStatement* statement, + const char* name, cass_double_t value) = 0; + virtual CassError CassStatementBindInetByName(CassStatement* statement, + const char* name, CassInet value) = 0; + + // CassPrepare + virtual void CassPreparedFree(const CassPrepared* prepared) = 0; + virtual CassStatement* CassPreparedBind(const CassPrepared* prepared) = 0; + + // CassValue + virtual CassValueType GetCassValueType(const CassValue* value) = 0; + virtual CassError CassValueGetString(const CassValue* value, + const char** output, size_t* output_size) = 0; + virtual CassError CassValueGetInt8(const CassValue* value, + cass_int8_t* output) = 0; + virtual CassError CassValueGetInt16(const CassValue* value, + cass_int16_t* output) = 0; + virtual CassError CassValueGetInt32(const CassValue* value, + cass_int32_t* output) = 0; + virtual CassError CassValueGetInt64(const CassValue* value, + cass_int64_t* output) = 0; + virtual CassError CassValueGetUuid(const CassValue* value, + CassUuid* output) = 0; + virtual CassError CassValueGetDouble(const CassValue* value, + cass_double_t* output) = 0; + virtual CassError CassValueGetInet(const CassValue* value, + CassInet* output) = 0; + + // CassInet + virtual CassInet CassInetInitV4(const cass_uint8_t* address) = 0; + virtual CassInet CassInetInitV6(const cass_uint8_t* address) = 0; + + // CassRow + virtual const CassValue* CassRowGetColumn(const CassRow* row, + size_t index) = 0; + + // CassLog + virtual void CassLogSetLevel(CassLogLevel log_level) = 0; + virtual void CassLogSetCallback(CassLogCallback callback, void* data) = 0; +}; + +class CassDatastaxLibrary : public CassLibrary { + public: + CassDatastaxLibrary(); + virtual ~CassDatastaxLibrary(); + + // CassCluster + virtual CassCluster* CassClusterNew(); + virtual void CassClusterFree(CassCluster* cluster); + virtual CassError CassClusterSetContactPoints(CassCluster* cluster, + const char* contact_points); + virtual CassError CassClusterSetPort(CassCluster* cluster, + int port); + virtual void CassClusterSetCredentials(CassCluster* cluster, + const char* username, const char* password); + virtual CassError CassClusterSetNumThreadsIo(CassCluster* cluster, + unsigned num_threads); + virtual CassError CassClusterSetPendingRequestsHighWaterMark( + CassCluster* cluster, unsigned num_requests); + virtual CassError CassClusterSetPendingRequestsLowWaterMark( + CassCluster* cluster, unsigned num_requests); + virtual CassError CassClusterSetWriteBytesHighWaterMark( + CassCluster* cluster, unsigned num_bytes); + virtual CassError CassClusterSetWriteBytesLowWaterMark( + CassCluster* cluster, unsigned num_bytes); + + // CassSession + virtual CassSession* CassSessionNew(); + virtual void CassSessionFree(CassSession* session); + virtual CassFuture* CassSessionConnect(CassSession* session, + const CassCluster* cluster); + virtual CassFuture* CassSessionClose(CassSession* session); + virtual CassFuture* CassSessionExecute(CassSession* session, + const CassStatement* statement); + virtual const CassSchemaMeta* CassSessionGetSchemaMeta( + const CassSession* session); + virtual CassFuture* CassSessionPrepare(CassSession* session, + const char* query); + virtual void CassSessionGetMetrics(const CassSession* session, + CassMetrics* output); + + // CassSchema + virtual void CassSchemaMetaFree(const CassSchemaMeta* schema_meta); + virtual const CassKeyspaceMeta* CassSchemaMetaKeyspaceByName( + const CassSchemaMeta* schema_meta, const char* keyspace); + virtual const CassTableMeta* CassKeyspaceMetaTableByName( + const CassKeyspaceMeta* keyspace_meta, const char* table); + virtual size_t CassTableMetaPartitionKeyCount( + const CassTableMeta* table_meta); + virtual size_t CassTableMetaClusteringKeyCount( + const CassTableMeta* table_meta); + + // CassFuture + virtual void CassFutureFree(CassFuture* future); + virtual CassError CassFutureSetCallback(CassFuture* future, + CassFutureCallback callback, void* data); + virtual void CassFutureWait(CassFuture* future); + virtual const CassResult* CassFutureGetResult(CassFuture* future); + virtual void CassFutureErrorMessage(CassFuture* future, + const char** message, size_t* message_length); + virtual CassError CassFutureErrorCode(CassFuture* future); + virtual const CassPrepared* CassFutureGetPrepared(CassFuture* future); + + // CassResult + virtual void CassResultFree(const CassResult* result); + virtual size_t CassResultColumnCount(const CassResult* result); + virtual CassError CassResultColumnName(const CassResult *result, + size_t index, const char** name, size_t* name_length); + + // CassIterator + virtual void CassIteratorFree(CassIterator* iterator); + virtual CassIterator* CassIteratorFromResult( + const CassResult* result); + virtual cass_bool_t CassIteratorNext(CassIterator* iterator); + virtual const CassRow* CassIteratorGetRow( + const CassIterator* iterator); + + // CassStatement + virtual CassStatement* CassStatementNew(const char* query, + size_t parameter_count); + virtual void CassStatementFree(CassStatement* statement); + virtual CassError CassStatementSetConsistency(CassStatement* statement, + CassConsistency consistency); + virtual CassError CassStatementBindStringN(CassStatement* statement, + size_t index, const char* value, size_t value_length); + virtual CassError CassStatementBindInt32(CassStatement* statement, + size_t index, cass_int32_t value); + virtual CassError CassStatementBindInt64(CassStatement* statement, + size_t index, cass_int64_t value); + virtual CassError CassStatementBindUuid(CassStatement* statement, + size_t index, CassUuid value); + virtual CassError CassStatementBindDouble(CassStatement* statement, + size_t index, cass_double_t value); + virtual CassError CassStatementBindInet(CassStatement* statement, + size_t index, CassInet value); + virtual CassError CassStatementBindStringByNameN(CassStatement* statement, + const char* name, size_t name_length, const char* value, + size_t value_length); + virtual CassError CassStatementBindInt32ByName(CassStatement* statement, + const char* name, cass_int32_t value); + virtual CassError CassStatementBindInt64ByName(CassStatement* statement, + const char* name, cass_int64_t value); + virtual CassError CassStatementBindUuidByName(CassStatement* statement, + const char* name, CassUuid value); + virtual CassError CassStatementBindDoubleByName(CassStatement* statement, + const char* name, cass_double_t value); + virtual CassError CassStatementBindInetByName(CassStatement* statement, + const char* name, CassInet value); + + // CassPrepare + virtual void CassPreparedFree(const CassPrepared* prepared); + virtual CassStatement* CassPreparedBind(const CassPrepared* prepared); + + // CassValue + virtual CassValueType GetCassValueType(const CassValue* value); + virtual CassError CassValueGetString(const CassValue* value, + const char** output, size_t* output_size); + virtual CassError CassValueGetInt8(const CassValue* value, + cass_int8_t* output); + virtual CassError CassValueGetInt16(const CassValue* value, + cass_int16_t* output); + virtual CassError CassValueGetInt32(const CassValue* value, + cass_int32_t* output); + virtual CassError CassValueGetInt64(const CassValue* value, + cass_int64_t* output); + virtual CassError CassValueGetUuid(const CassValue* value, + CassUuid* output); + virtual CassError CassValueGetDouble(const CassValue* value, + cass_double_t* output); + virtual CassError CassValueGetInet(const CassValue* value, + CassInet* output); + + // CassInet + virtual CassInet CassInetInitV4(const cass_uint8_t* address); + virtual CassInet CassInetInitV6(const cass_uint8_t* address); + + // CassRow + virtual const CassValue* CassRowGetColumn(const CassRow* row, + size_t index); + + // CassLog + virtual void CassLogSetLevel(CassLogLevel log_level); + virtual void CassLogSetCallback(CassLogCallback callback, void* data); +}; + +} // namespace interface +} // namespace cql +} // namespace cass + +#endif // DATABASE_CASSANDRA_CQL_CQL_LIB_IF_H_