diff --git a/src/database/cassandra/cql/cql_if.cc b/src/database/cassandra/cql/cql_if.cc index 7f6ee08a81f..bed15156218 100644 --- a/src/database/cassandra/cql/cql_if.cc +++ b/src/database/cassandra/cql/cql_if.cc @@ -20,6 +20,7 @@ #include #include #include +#include #define CQLIF_LOG(_Level, _Msg) \ do { \ @@ -39,98 +40,6 @@ namespace cass { namespace cql { namespace impl { -// CQL Library Shared Pointers to handle library free calls -template -struct Deleter; - -template<> -struct Deleter { - void operator()(CassCluster *ptr) { - if (ptr != NULL) { - cass_cluster_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(CassSession* ptr) { - if (ptr != NULL) { - cass_session_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(CassFuture* ptr) { - if (ptr != NULL) { - cass_future_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(CassStatement* ptr) { - if (ptr != NULL) { - cass_statement_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(const CassResult* ptr) { - if (ptr != NULL) { - cass_result_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(CassIterator* ptr) { - if (ptr != NULL) { - cass_iterator_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(const CassPrepared* ptr) { - if (ptr != NULL) { - cass_prepared_free(ptr); - } - } -}; - -template<> -struct Deleter { - void operator()(const CassSchemaMeta* ptr) { - if (ptr != NULL) { - cass_schema_meta_free(ptr); - } - } -}; - -template -class CassSharedPtr : public boost::shared_ptr { -public: - explicit CassSharedPtr(T* ptr = NULL) - : boost::shared_ptr(ptr, Deleter()) {} -}; - -typedef CassSharedPtr CassClusterPtr; -typedef CassSharedPtr CassSessionPtr; -typedef CassSharedPtr CassFuturePtr; -typedef CassSharedPtr CassStatementPtr; -typedef CassSharedPtr CassResultPtr; -typedef CassSharedPtr CassIteratorPtr; -typedef CassSharedPtr CassPreparedPtr; -typedef CassSharedPtr CassSchemaMetaPtr; - // CassString convenience structure struct CassString { CassString() : @@ -286,45 +195,47 @@ class CassQueryPrinter : public boost::static_visitor<> { // class CassStatementIndexBinder : public boost::static_visitor<> { public: - CassStatementIndexBinder(CassStatement *statement) : + CassStatementIndexBinder(interface::CassLibrary *cci, + CassStatement *statement) : + cci_(cci), statement_(statement) { } void operator()(const boost::blank &tblank, size_t index) const { assert(false && "CassStatement bind to boost::blank not supported"); } void operator()(const std::string &tstring, size_t index) const { - CassError rc(cass_statement_bind_string_n(statement_, index, + CassError rc(cci_->CassStatementBindStringN(statement_, index, tstring.c_str(), tstring.length())); assert(rc == CASS_OK); } void operator()(const boost::uuids::uuid &tuuid, size_t index) const { CassUuid cuuid; decode_uuid((char *)&tuuid, &cuuid); - CassError rc(cass_statement_bind_uuid(statement_, index, cuuid)); + CassError rc(cci_->CassStatementBindUuid(statement_, index, cuuid)); assert(rc == CASS_OK); } void operator()(const uint8_t &tu8, size_t index) const { - CassError rc(cass_statement_bind_int32(statement_, index, tu8)); + CassError rc(cci_->CassStatementBindInt32(statement_, index, tu8)); assert(rc == CASS_OK); } void operator()(const uint16_t &tu16, size_t index) const { - CassError rc(cass_statement_bind_int32(statement_, index, tu16)); + CassError rc(cci_->CassStatementBindInt32(statement_, index, tu16)); assert(rc == CASS_OK); } void operator()(const uint32_t &tu32, size_t index) const { assert(tu32 <= (uint32_t)std::numeric_limits::max()); - CassError rc(cass_statement_bind_int32(statement_, index, + CassError rc(cci_->CassStatementBindInt32(statement_, index, (cass_int32_t)tu32)); assert(rc == CASS_OK); } void operator()(const uint64_t &tu64, size_t index) const { assert(tu64 <= (uint64_t)std::numeric_limits::max()); - CassError rc(cass_statement_bind_int64(statement_, index, + CassError rc(cci_->CassStatementBindInt64(statement_, index, (cass_int64_t)tu64)); assert(rc == CASS_OK); } void operator()(const double &tdouble, size_t index) const { - CassError rc(cass_statement_bind_double(statement_, index, + CassError rc(cci_->CassStatementBindDouble(statement_, index, (cass_double_t)tdouble)); assert(rc == CASS_OK); } @@ -332,62 +243,65 @@ class CassStatementIndexBinder : public boost::static_visitor<> { CassInet cinet; if (tipaddr.is_v4()) { boost::asio::ip::address_v4 tv4(tipaddr.to_v4()); - cinet = cass_inet_init_v4(tv4.to_bytes().c_array()); + cinet = cci_->CassInetInitV4(tv4.to_bytes().c_array()); } else { boost::asio::ip::address_v6 tv6(tipaddr.to_v6()); - cinet = cass_inet_init_v6(tv6.to_bytes().c_array()); + cinet = cci_->CassInetInitV6(tv6.to_bytes().c_array()); } - CassError rc(cass_statement_bind_inet(statement_, index, + CassError rc(cci_->CassStatementBindInet(statement_, index, cinet)); assert(rc == CASS_OK); } + interface::CassLibrary *cci_; CassStatement *statement_; }; class CassStatementNameBinder : public boost::static_visitor<> { public: - CassStatementNameBinder(CassStatement *statement) : + CassStatementNameBinder(interface::CassLibrary *cci, + CassStatement *statement) : + cci_(cci), statement_(statement) { } void operator()(const boost::blank &tblank, const char *name) const { assert(false && "CassStatement bind to boost::blank not supported"); } void operator()(const std::string &tstring, const char *name) const { - CassError rc(cass_statement_bind_string_by_name_n(statement_, name, + CassError rc(cci_->CassStatementBindStringByNameN(statement_, name, strlen(name), tstring.c_str(), tstring.length())); assert(rc == CASS_OK); } void operator()(const boost::uuids::uuid &tuuid, const char *name) const { CassUuid cuuid; decode_uuid((char *)&tuuid, &cuuid); - CassError rc(cass_statement_bind_uuid_by_name(statement_, name, + CassError rc(cci_->CassStatementBindUuidByName(statement_, name, cuuid)); assert(rc == CASS_OK); } void operator()(const uint8_t &tu8, const char *name) const { - CassError rc(cass_statement_bind_int32_by_name(statement_, name, + CassError rc(cci_->CassStatementBindInt32ByName(statement_, name, tu8)); assert(rc == CASS_OK); } void operator()(const uint16_t &tu16, const char *name) const { - CassError rc(cass_statement_bind_int32_by_name(statement_, name, + CassError rc(cci_->CassStatementBindInt32ByName(statement_, name, tu16)); assert(rc == CASS_OK); } void operator()(const uint32_t &tu32, const char *name) const { assert(tu32 <= (uint32_t)std::numeric_limits::max()); - CassError rc(cass_statement_bind_int32_by_name(statement_, name, + CassError rc(cci_->CassStatementBindInt32ByName(statement_, name, (cass_int32_t)tu32)); assert(rc == CASS_OK); } void operator()(const uint64_t &tu64, const char *name) const { assert(tu64 <= (uint64_t)std::numeric_limits::max()); - CassError rc(cass_statement_bind_int64_by_name(statement_, name, + CassError rc(cci_->CassStatementBindInt64ByName(statement_, name, (cass_int64_t)tu64)); assert(rc == CASS_OK); } void operator()(const double &tdouble, const char *name) const { - CassError rc(cass_statement_bind_double_by_name(statement_, name, + CassError rc(cci_->CassStatementBindDoubleByName(statement_, name, (cass_double_t)tdouble)); assert(rc == CASS_OK); } @@ -395,15 +309,16 @@ class CassStatementNameBinder : public boost::static_visitor<> { CassInet cinet; if (tipaddr.is_v4()) { boost::asio::ip::address_v4 tv4(tipaddr.to_v4()); - cinet = cass_inet_init_v4(tv4.to_bytes().c_array()); + cinet = cci_->CassInetInitV4(tv4.to_bytes().c_array()); } else { boost::asio::ip::address_v6 tv6(tipaddr.to_v6()); - cinet = cass_inet_init_v6(tv6.to_bytes().c_array()); + cinet = cci_->CassInetInitV6(tv6.to_bytes().c_array()); } - CassError rc(cass_statement_bind_inet_by_name(statement_, name, + CassError rc(cci_->CassStatementBindInetByName(statement_, name, cinet)); assert(rc == CASS_OK); } + interface::CassLibrary *cci_; CassStatement *statement_; }; @@ -679,9 +594,10 @@ std::string DynamicCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf) { // Cf2CassPrepareBind // -bool StaticCf2CassPrepareBind(CassStatement *statement, +bool StaticCf2CassPrepareBind(interface::CassLibrary *cci, + CassStatement *statement, const GenDb::ColList *v_columns) { - CassStatementNameBinder values_binder(statement); + CassStatementNameBinder values_binder(cci, statement); // Row keys const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_); int rk_size(rkeys.size()); @@ -713,15 +629,16 @@ bool StaticCf2CassPrepareBind(CassStatement *statement, cttl = column.ttl; idx++; } - CassError rc(cass_statement_bind_int32(statement, idx++, + CassError rc(cci->CassStatementBindInt32(statement, idx++, (cass_int32_t)cttl)); assert(rc == CASS_OK); return true; } -bool DynamicCf2CassPrepareBind(CassStatement *statement, +bool DynamicCf2CassPrepareBind(interface::CassLibrary *cci, + CassStatement *statement, const GenDb::ColList *v_columns) { - CassStatementIndexBinder values_binder(statement); + CassStatementIndexBinder values_binder(cci, statement); // Row keys const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_); int rk_size(rkeys.size()); @@ -746,7 +663,7 @@ bool DynamicCf2CassPrepareBind(CassStatement *statement, boost::apply_visitor(boost::bind(values_binder, _1, idx++), cvalues[0]); } - CassError rc(cass_statement_bind_int32(statement, idx++, + CassError rc(cci->CassStatementBindInt32(statement, idx++, (cass_int32_t)column.ttl)); assert(rc == CASS_OK); return true; @@ -826,21 +743,22 @@ std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable( return CassSelectFromTableInternal(table, rkeys, ck_range); } -static GenDb::DbDataValue CassValue2DbDataValue(const CassValue *cvalue) { - CassValueType cvtype(cass_value_type(cvalue)); +static GenDb::DbDataValue CassValue2DbDataValue( + interface::CassLibrary *cci, const CassValue *cvalue) { + CassValueType cvtype(cci->GetCassValueType(cvalue)); switch (cvtype) { case CASS_VALUE_TYPE_ASCII: case CASS_VALUE_TYPE_VARCHAR: case CASS_VALUE_TYPE_TEXT: { CassString ctstring; - CassError rc(cass_value_get_string(cvalue, &ctstring.data, + CassError rc(cci->CassValueGetString(cvalue, &ctstring.data, &ctstring.length)); assert(rc == CASS_OK); return std::string(ctstring.data, ctstring.length); } case CASS_VALUE_TYPE_UUID: { CassUuid ctuuid; - CassError rc(cass_value_get_uuid(cvalue, &ctuuid)); + CassError rc(cci->CassValueGetUuid(cvalue, &ctuuid)); assert(rc == CASS_OK); boost::uuids::uuid u; encode_uuid((char *)&u, ctuuid); @@ -848,37 +766,37 @@ static GenDb::DbDataValue CassValue2DbDataValue(const CassValue *cvalue) { } case CASS_VALUE_TYPE_DOUBLE: { cass_double_t ctdouble; - CassError rc(cass_value_get_double(cvalue, &ctdouble)); + CassError rc(cci->CassValueGetDouble(cvalue, &ctdouble)); assert(rc == CASS_OK); return (double)ctdouble; } case CASS_VALUE_TYPE_TINY_INT: { cass_int8_t ct8; - CassError rc(cass_value_get_int8(cvalue, &ct8)); + CassError rc(cci->CassValueGetInt8(cvalue, &ct8)); assert(rc == CASS_OK); return (uint8_t)ct8; } case CASS_VALUE_TYPE_SMALL_INT: { cass_int16_t ct16; - CassError rc(cass_value_get_int16(cvalue, &ct16)); + CassError rc(cci->CassValueGetInt16(cvalue, &ct16)); assert(rc == CASS_OK); return (uint16_t)ct16; } case CASS_VALUE_TYPE_INT: { cass_int32_t ct32; - CassError rc(cass_value_get_int32(cvalue, &ct32)); + CassError rc(cci->CassValueGetInt32(cvalue, &ct32)); assert(rc == CASS_OK); return (uint32_t)ct32; } case CASS_VALUE_TYPE_BIGINT: { cass_int64_t ct64; - CassError rc(cass_value_get_int64(cvalue, &ct64)); + CassError rc(cci->CassValueGetInt64(cvalue, &ct64)); assert(rc == CASS_OK); return (uint64_t)ct64; } case CASS_VALUE_TYPE_INET: { CassInet ctinet; - CassError rc(cass_value_get_inet(cvalue, &ctinet)); + CassError rc(cci->CassValueGetInet(cvalue, &ctinet)); assert(rc == CASS_OK); IpAddress ipaddr; if (ctinet.address_length == CASS_INET_V4_LENGTH) { @@ -906,75 +824,71 @@ static GenDb::DbDataValue CassValue2DbDataValue(const CassValue *cvalue) { } } -static bool PrepareSync(CassSession *session, const char* query, +static bool PrepareSync(interface::CassLibrary *cci, + CassSession *session, const char* query, CassPreparedPtr *prepared) { CQLIF_LOG(DEBUG, "PrepareSync: " << query); - CassFuturePtr future(cass_session_prepare(session, query)); - cass_future_wait(future.get()); + CassFuturePtr future(cci->CassSessionPrepare(session, query), cci); + cci->CassFutureWait(future.get()); - CassError rc(cass_future_error_code(future.get())); + CassError rc(cci->CassFutureErrorCode(future.get())); if (rc != CASS_OK) { CassString err; - cass_future_error_message(future.get(), &err.data, &err.length); + cci->CassFutureErrorMessage(future.get(), &err.data, &err.length); CQLIF_LOG_ERR("PrepareSync: " << query << " FAILED: " << err.data); } else { - *prepared = CassPreparedPtr(cass_future_get_prepared(future.get())); + *prepared = CassPreparedPtr(cci->CassFutureGetPrepared(future.get()), + cci); } return rc == CASS_OK; } -static bool ExecuteQuerySyncInternal(CassSession *session, +static bool ExecuteQuerySyncInternal(interface::CassLibrary *cci, + CassSession *session, CassStatement *qstatement, CassResultPtr *result, CassConsistency consistency) { - cass_statement_set_consistency(qstatement, consistency); - CassFuturePtr future(cass_session_execute(session, qstatement)); - cass_future_wait(future.get()); + cci->CassStatementSetConsistency(qstatement, consistency); + CassFuturePtr future(cci->CassSessionExecute(session, qstatement), cci); + cci->CassFutureWait(future.get()); - CassError rc(cass_future_error_code(future.get())); + CassError rc(cci->CassFutureErrorCode(future.get())); if (rc != CASS_OK) { CassString err; - cass_future_error_message(future.get(), &err.data, &err.length); + cci->CassFutureErrorMessage(future.get(), &err.data, &err.length); CQLIF_LOG_ERR("SyncQuery: FAILED: " << err.data); } else { if (result) { - *result = CassResultPtr(cass_future_get_result(future.get())); + *result = CassResultPtr(cci->CassFutureGetResult(future.get()), + cci); } } return rc == CASS_OK; } -static bool ExecuteQuerySync(CassSession *session, const char *query, - CassConsistency consistency) { +static bool ExecuteQuerySync(interface::CassLibrary *cci, + CassSession *session, const char *query, CassConsistency consistency) { CQLIF_LOG(DEBUG, "SyncQuery: " << query); - CassStatementPtr statement(cass_statement_new(query, 0)); - return ExecuteQuerySyncInternal(session, statement.get(), NULL, + CassStatementPtr statement(cci->CassStatementNew(query, 0), cci); + return ExecuteQuerySyncInternal(cci, session, statement.get(), NULL, consistency); } -static bool ExecuteQueryResultSync(CassSession *session, const char *query, +static bool ExecuteQueryResultSync(interface::CassLibrary *cci, + CassSession *session, const char *query, CassResultPtr *result, CassConsistency consistency) { CQLIF_LOG(DEBUG, "SyncQuery: " << query); - CassStatementPtr statement(cass_statement_new(query, 0)); - return ExecuteQuerySyncInternal(session, statement.get(), result, + CassStatementPtr statement(cci->CassStatementNew(query, 0), cci); + return ExecuteQuerySyncInternal(cci, session, statement.get(), result, consistency); } -static bool ExecuteQueryStatementSync(CassSession *session, - CassStatement *statement, CassConsistency consistency) { - return ExecuteQuerySyncInternal(session, statement, NULL, consistency); +static bool ExecuteQueryStatementSync(interface::CassLibrary *cci, + CassSession *session, CassStatement *statement, + CassConsistency consistency) { + return ExecuteQuerySyncInternal(cci, session, statement, NULL, + consistency); } -typedef boost::function CassAsyncQueryCallback; - -struct CassAsyncQueryContext { - CassAsyncQueryContext(const char *query_id, CassAsyncQueryCallback cb) : - query_id_(query_id), - cb_(cb) { - } - std::string query_id_; - CassAsyncQueryCallback cb_; -}; - static GenDb::DbOpResult::type CassError2DbOpResult(CassError rc) { switch (rc) { case CASS_OK: @@ -992,10 +906,11 @@ static void OnExecuteQueryAsync(CassFuture *future, void *data) { assert(data); std::auto_ptr ctx( boost::reinterpret_pointer_cast(data)); - CassError rc(cass_future_error_code(future)); + interface::CassLibrary *cci(ctx->cci_); + CassError rc(cci->CassFutureErrorCode(future)); if (rc != CASS_OK) { CassString err; - cass_future_error_message(future, &err.data, &err.length); + cci->CassFutureErrorMessage(future, &err.data, &err.length); CQLIF_LOG_ERR("AsyncQuery: " << ctx->query_id_ << " FAILED: " << err.data); } @@ -1003,59 +918,64 @@ static void OnExecuteQueryAsync(CassFuture *future, void *data) { ctx->cb_(db_rc); } -static void ExecuteQueryAsyncInternal(CassSession *session, +static void ExecuteQueryAsyncInternal(interface::CassLibrary *cci, + CassSession *session, const char *qid, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb) { - cass_statement_set_consistency(qstatement, consistency); - CassFuturePtr future(cass_session_execute(session, qstatement)); - std::auto_ptr ctx(new CassAsyncQueryContext(qid, cb)); - cass_future_set_callback(future.get(), OnExecuteQueryAsync, ctx.release()); + cci->CassStatementSetConsistency(qstatement, consistency); + CassFuturePtr future(cci->CassSessionExecute(session, qstatement), cci); + std::auto_ptr ctx( + new CassAsyncQueryContext(qid, cb, cci)); + cci->CassFutureSetCallback(future.get(), OnExecuteQueryAsync, ctx.release()); } -static void ExecuteQueryAsync(CassSession *session, const char *query, +static void ExecuteQueryAsync(interface::CassLibrary *cci, + CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb) { CQLIF_LOG(DEBUG, "AsyncQuery: " << query); - CassStatementPtr statement(cass_statement_new(query, 0)); - ExecuteQueryAsyncInternal(session, query, statement.get(), consistency, - cb); + CassStatementPtr statement(cci->CassStatementNew(query, 0), cci); + ExecuteQueryAsyncInternal(cci, session, query, statement.get(), + consistency, cb); } -static void ExecuteQueryStatementAsync(CassSession *session, +static void ExecuteQueryStatementAsync(interface::CassLibrary *cci, + CassSession *session, const char *query_id, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb) { - ExecuteQueryAsyncInternal(session, query_id, qstatement, consistency, + ExecuteQueryAsyncInternal(cci, session, query_id, qstatement, consistency, cb); } -static bool DynamicCfGetResultSync(CassSession *session, const char *query, +static bool DynamicCfGetResultSync(interface::CassLibrary *cci, + CassSession *session, const char *query, size_t rk_count, size_t ck_count, CassConsistency consistency, GenDb::NewColVec *v_columns) { - CassResultPtr result; - bool success(ExecuteQueryResultSync(session, query, &result, + CassResultPtr result(NULL, cci); + bool success(ExecuteQueryResultSync(cci, session, query, &result, consistency)); if (!success) { return success; } // Row iterator - CassIteratorPtr riterator(cass_iterator_from_result(result.get())); - while (cass_iterator_next(riterator.get())) { - const CassRow *row(cass_iterator_get_row(riterator.get())); + CassIteratorPtr riterator(cci->CassIteratorFromResult(result.get()), cci); + while (cci->CassIteratorNext(riterator.get())) { + const CassRow *row(cci->CassIteratorGetRow(riterator.get())); // Iterate over columns - size_t ccount(cass_result_column_count(result.get())); + size_t ccount(cci->CassResultColumnCount(result.get())); // Clustering key GenDb::DbDataValueVec *cnames(new GenDb::DbDataValueVec); for (size_t i = rk_count; i < rk_count + ck_count; i++) { - const CassValue *cvalue(cass_row_get_column(row, i)); + const CassValue *cvalue(cci->CassRowGetColumn(row, i)); assert(cvalue); - GenDb::DbDataValue db_value(CassValue2DbDataValue(cvalue)); + GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue)); cnames->push_back(db_value); } // Values GenDb::DbDataValueVec *values(new GenDb::DbDataValueVec); for (size_t i = rk_count + ck_count; i < ccount; i++) { - const CassValue *cvalue(cass_row_get_column(row, i)); + const CassValue *cvalue(cci->CassRowGetColumn(row, i)); assert(cvalue); - GenDb::DbDataValue db_value(CassValue2DbDataValue(cvalue)); + GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue)); values->push_back(db_value); } GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0)); @@ -1064,28 +984,29 @@ static bool DynamicCfGetResultSync(CassSession *session, const char *query, return success; } -static bool StaticCfGetResultSync(CassSession *session, const char *query, +static bool StaticCfGetResultSync(interface::CassLibrary *cci, + CassSession *session, const char *query, CassConsistency consistency, GenDb::NewColVec *v_columns) { - CassResultPtr result; - bool success(ExecuteQueryResultSync(session, query, &result, + CassResultPtr result(NULL, cci); + bool success(ExecuteQueryResultSync(cci, session, query, &result, consistency)); if (!success) { return success; } // Row iterator - CassIteratorPtr riterator(cass_iterator_from_result(result.get())); - while (cass_iterator_next(riterator.get())) { - const CassRow *row(cass_iterator_get_row(riterator.get())); + CassIteratorPtr riterator(cci->CassIteratorFromResult(result.get()), cci); + while (cci->CassIteratorNext(riterator.get())) { + const CassRow *row(cci->CassIteratorGetRow(riterator.get())); // Iterate over columns - size_t ccount(cass_result_column_count(result.get())); + size_t ccount(cci->CassResultColumnCount(result.get())); for (size_t i = 0; i < ccount; i++) { CassString cname; - CassError rc(cass_result_column_name(result.get(), i, &cname.data, + CassError rc(cci->CassResultColumnName(result.get(), i, &cname.data, &cname.length)); assert(rc == CASS_OK); - const CassValue *cvalue(cass_row_get_column(row, i)); + const CassValue *cvalue(cci->CassRowGetColumn(row, i)); assert(cvalue); - GenDb::DbDataValue db_value(CassValue2DbDataValue(cvalue)); + GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue)); if (db_value.which() == GenDb::DB_VALUE_BLANK) { continue; } @@ -1097,22 +1018,23 @@ static bool StaticCfGetResultSync(CassSession *session, const char *query, return success; } -static bool SyncFutureWait(CassFuture *future) { - cass_future_wait(future); - CassError rc(cass_future_error_code(future)); +static bool SyncFutureWait(interface::CassLibrary *cci, + CassFuture *future) { + cci->CassFutureWait(future); + CassError rc(cci->CassFutureErrorCode(future)); if (rc != CASS_OK) { CassString err; - cass_future_error_message(future, &err.data, &err.length); + cci->CassFutureErrorMessage(future, &err.data, &err.length); CQLIF_LOG_ERR("SyncWait: FAILED: " << err.data); } return rc == CASS_OK; } -static const CassTableMeta * GetCassTableMeta(const CassSchemaMeta *schema_meta, +static const CassTableMeta * GetCassTableMeta( + interface::CassLibrary *cci, const CassSchemaMeta *schema_meta, const std::string &keyspace, const std::string &table, bool log_error) { const CassKeyspaceMeta *keyspace_meta( - cass_schema_meta_keyspace_by_name(schema_meta, - keyspace.c_str())); + cci->CassSchemaMetaKeyspaceByName(schema_meta, keyspace.c_str())); if (keyspace_meta == NULL) { if (log_error) { CQLIF_LOG_ERR("No keyspace schema: Keyspace: " << keyspace << @@ -1123,8 +1045,7 @@ static const CassTableMeta * GetCassTableMeta(const CassSchemaMeta *schema_meta, std::string table_lower(table); boost::algorithm::to_lower(table_lower); const CassTableMeta *table_meta( - cass_keyspace_meta_table_by_name(keyspace_meta, - table_lower.c_str())); + cci->CassKeyspaceMetaTableByName(keyspace_meta, table_lower.c_str())); if (table_meta == NULL) { if (log_error) { CQLIF_LOG_ERR("No table schema: Keyspace: " << keyspace << @@ -1135,17 +1056,18 @@ static const CassTableMeta * GetCassTableMeta(const CassSchemaMeta *schema_meta, return table_meta; } -static bool IsCassTableMetaPresent(CassSession *session, +static bool IsCassTableMetaPresent(interface::CassLibrary *cci, + CassSession *session, const std::string &keyspace, const std::string &table) { - impl::CassSchemaMetaPtr schema_meta(cass_session_get_schema_meta( - session)); + impl::CassSchemaMetaPtr schema_meta(cci->CassSessionGetSchemaMeta( + session), cci); if (schema_meta.get() == NULL) { CQLIF_LOG(DEBUG, "No schema meta: Keyspace: " << keyspace << ", Table: " << table); return false; } bool log_error(false); - const CassTableMeta *table_meta(impl::GetCassTableMeta( + const CassTableMeta *table_meta(impl::GetCassTableMeta(cci, schema_meta.get(), keyspace, table, log_error)); if (table_meta == NULL) { return false; @@ -1153,41 +1075,44 @@ static bool IsCassTableMetaPresent(CassSession *session, return true; } -static bool GetCassTableClusteringKeyCount(CassSession *session, - const std::string &keyspace, const std::string &table, size_t *ck_count) { - impl::CassSchemaMetaPtr schema_meta(cass_session_get_schema_meta( - session)); +static bool GetCassTableClusteringKeyCount( + interface::CassLibrary *cci, + CassSession *session, const std::string &keyspace, + const std::string &table, size_t *ck_count) { + impl::CassSchemaMetaPtr schema_meta(cci->CassSessionGetSchemaMeta( + session), cci); if (schema_meta.get() == NULL) { CQLIF_LOG_ERR("No schema meta: Keyspace: " << keyspace << ", Table: " << table); return false; } bool log_error(true); - const CassTableMeta *table_meta(impl::GetCassTableMeta( + const CassTableMeta *table_meta(impl::GetCassTableMeta(cci, schema_meta.get(), keyspace, table, log_error)); if (table_meta == NULL) { return false; } - *ck_count = cass_table_meta_clustering_key_count(table_meta); + *ck_count = cci->CassTableMetaClusteringKeyCount(table_meta); return true; } -static bool GetCassTablePartitionKeyCount(CassSession *session, +static bool GetCassTablePartitionKeyCount( + interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *rk_count) { - impl::CassSchemaMetaPtr schema_meta(cass_session_get_schema_meta( - session)); + impl::CassSchemaMetaPtr schema_meta(cci->CassSessionGetSchemaMeta( + session), cci); if (schema_meta.get() == NULL) { CQLIF_LOG_ERR("No schema meta: Keyspace: " << keyspace << ", Table: " << table); return false; } bool log_error(true); - const CassTableMeta *table_meta(impl::GetCassTableMeta( + const CassTableMeta *table_meta(impl::GetCassTableMeta(cci, schema_meta.get(), keyspace, table, log_error)); if (table_meta == NULL) { return false; } - *rk_count = cass_table_meta_partition_key_count(table_meta); + *rk_count = cci->CassTableMetaPartitionKeyCount(table_meta); return true; } @@ -1251,485 +1176,452 @@ static void CassLibraryLog(const CassLogMessage* message, void *data) { } // namespace impl // -// CqlIf::CqlIfImpl +// CqlIfImpl // -class CqlIf::CqlIfImpl { - public: - CqlIfImpl(EventManager *evm, - const std::vector &cassandra_ips, - int cassandra_port, - const std::string &cassandra_user, - const std::string &cassandra_password) : - evm_(evm), - cluster_(cass_cluster_new()), - session_(cass_session_new()), - reconnect_timer_(TimerManager::CreateTimer(*evm->io_service(), - "CqlIfImpl Reconnect Timer", - TaskScheduler::GetInstance()->GetTaskId(kTaskName), - kTaskInstance)), - connect_cb_(NULL), - disconnect_cb_(NULL), - keyspace_(), - io_thread_count_(2) { - // Set session state to INIT - session_state_ = SessionState::INIT; - // Set contact points and port - std::string contact_points(boost::algorithm::join(cassandra_ips, ",")); - cass_cluster_set_contact_points(cluster_.get(), contact_points.c_str()); - cass_cluster_set_port(cluster_.get(), cassandra_port); - // Set credentials for plain text authentication - if (!cassandra_user.empty() && !cassandra_password.empty()) { - cass_cluster_set_credentials(cluster_.get(), cassandra_user.c_str(), - cassandra_password.c_str()); - } - // Set number of IO threads to half the number of cores - cass_cluster_set_num_threads_io(cluster_.get(), io_thread_count_); - cass_cluster_set_pending_requests_high_water_mark(cluster_.get(), 10000); - cass_cluster_set_pending_requests_low_water_mark(cluster_.get(), 5000); - cass_cluster_set_write_bytes_high_water_mark(cluster_.get(), 128000); - cass_cluster_set_write_bytes_low_water_mark(cluster_.get(), 96000); +CqlIfImpl::CqlIfImpl(EventManager *evm, + const std::vector &cassandra_ips, + int cassandra_port, + const std::string &cassandra_user, + const std::string &cassandra_password, + interface::CassLibrary *cci) : + evm_(evm), + cci_(cci), + cluster_(cci_->CassClusterNew(), cci_), + session_(cci_->CassSessionNew(), cci_), + reconnect_timer_(TimerManager::CreateTimer(*evm->io_service(), + "CqlIfImpl Reconnect Timer", + TaskScheduler::GetInstance()->GetTaskId(kTaskName), + kTaskInstance)), + connect_cb_(NULL), + disconnect_cb_(NULL), + keyspace_(), + io_thread_count_(2) { + // Set session state to INIT + session_state_ = SessionState::INIT; + // Set contact points and port + std::string contact_points(boost::algorithm::join(cassandra_ips, ",")); + cci_->CassClusterSetContactPoints(cluster_.get(), contact_points.c_str()); + cci_->CassClusterSetPort(cluster_.get(), cassandra_port); + // Set credentials for plain text authentication + if (!cassandra_user.empty() && !cassandra_password.empty()) { + cci_->CassClusterSetCredentials(cluster_.get(), cassandra_user.c_str(), + cassandra_password.c_str()); + } + // Set number of IO threads to half the number of cores + cci_->CassClusterSetNumThreadsIo(cluster_.get(), io_thread_count_); + cci_->CassClusterSetPendingRequestsHighWaterMark(cluster_.get(), 10000); + cci_->CassClusterSetPendingRequestsLowWaterMark(cluster_.get(), 5000); + cci_->CassClusterSetWriteBytesHighWaterMark(cluster_.get(), 128000); + cci_->CassClusterSetWriteBytesLowWaterMark(cluster_.get(), 96000); +} + +CqlIfImpl::~CqlIfImpl() { + assert(session_state_ == SessionState::INIT || + session_state_ == SessionState::DISCONNECTED); + TimerManager::DeleteTimer(reconnect_timer_); + reconnect_timer_ = NULL; +} + +bool CqlIfImpl::CreateKeyspaceIfNotExistsSync(const std::string &keyspace, + const std::string &replication_factor, CassConsistency consistency) { + if (session_state_ != SessionState::CONNECTED) { + return false; } - - virtual ~CqlIfImpl() { - assert(session_state_ == SessionState::INIT || - session_state_ == SessionState::DISCONNECTED); - TimerManager::DeleteTimer(reconnect_timer_); - reconnect_timer_ = NULL; + char buf[512]; + int n(snprintf(buf, sizeof(buf), kQCreateKeyspaceIfNotExists, + keyspace.c_str(), replication_factor.c_str())); + if (n < 0 || n >= (int)sizeof(buf)) { + CQLIF_LOG_ERR("FAILED (" << n << "): Keyspace: " << + keyspace << ", RF: " << replication_factor); + return false; } + return impl::ExecuteQuerySync(cci_, session_.get(), buf, consistency); +} - bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace, - const std::string &replication_factor, CassConsistency consistency) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - char buf[512]; - int n(snprintf(buf, sizeof(buf), kQCreateKeyspaceIfNotExists, - keyspace.c_str(), replication_factor.c_str())); - if (n < 0 || n >= (int)sizeof(buf)) { - CQLIF_LOG_ERR("FAILED (" << n << "): Keyspace: " << - keyspace << ", RF: " << replication_factor); - return false; - } - return impl::ExecuteQuerySync(session_.get(), buf, consistency); +bool CqlIfImpl::UseKeyspaceSync(const std::string &keyspace, + CassConsistency consistency) { + if (session_state_ != SessionState::CONNECTED) { + return false; } - - bool UseKeyspaceSync(const std::string &keyspace, - CassConsistency consistency) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - char buf[512]; - int n(snprintf(buf, sizeof(buf), kQUseKeyspace, keyspace.c_str())); - if (n < 0 || n >= (int)sizeof(buf)) { - CQLIF_LOG_ERR("FAILED (" << n << "): Keyspace: " << - keyspace); - return false; - } - bool success(impl::ExecuteQuerySync(session_.get(), buf, - consistency)); - if (!success) { - return false; - } - // Update keyspace - keyspace_ = keyspace; - return success; + char buf[512]; + int n(snprintf(buf, sizeof(buf), kQUseKeyspace, keyspace.c_str())); + if (n < 0 || n >= (int)sizeof(buf)) { + CQLIF_LOG_ERR("FAILED (" << n << "): Keyspace: " << + keyspace); + return false; } - - bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, - CassConsistency consistency) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - // There are two types of tables - Static (SQL) and Dynamic (NOSQL) - // column family. Static column family has more or less fixed rows, - // and dynamic column family has wide rows - std::string query; - switch (cf.cftype_) { - case GenDb::NewCf::COLUMN_FAMILY_SQL: - query = impl::StaticCf2CassCreateTableIfNotExists(cf); - break; - case GenDb::NewCf::COLUMN_FAMILY_NOSQL: - query = impl::DynamicCf2CassCreateTableIfNotExists(cf); - break; - default: - return false; - } - return impl::ExecuteQuerySync(session_.get(), query.c_str(), - consistency); + bool success(impl::ExecuteQuerySync(cci_, session_.get(), buf, + consistency)); + if (!success) { + return false; } + // Update keyspace + keyspace_ = keyspace; + return success; +} - bool LocatePrepareInsertIntoTable(const GenDb::NewCf &cf) { - const std::string &table_name(cf.cfname_); - impl::CassPreparedPtr prepared; - // Check if the prepared statement exists - if (GetPrepareInsertIntoTable(table_name, &prepared)) { - return true; - } - bool success(PrepareInsertIntoTableSync(cf, &prepared)); - if (!success) { - return success; - } - // Store the prepared statement into the map - tbb::mutex::scoped_lock lock(map_mutex_); - success = (insert_prepared_map_.insert( - std::make_pair(table_name, prepared))).second; - assert(success); - return success; +bool CqlIfImpl::CreateTableIfNotExistsSync(const GenDb::NewCf &cf, + CassConsistency consistency) { + if (session_state_ != SessionState::CONNECTED) { + return false; + } + // There are two types of tables - Static (SQL) and Dynamic (NOSQL) + // column family. Static column family has more or less fixed rows, + // and dynamic column family has wide rows + std::string query; + switch (cf.cftype_) { + case GenDb::NewCf::COLUMN_FAMILY_SQL: + query = impl::StaticCf2CassCreateTableIfNotExists(cf); + break; + case GenDb::NewCf::COLUMN_FAMILY_NOSQL: + query = impl::DynamicCf2CassCreateTableIfNotExists(cf); + break; + default: + return false; } + return impl::ExecuteQuerySync(cci_, session_.get(), query.c_str(), + consistency); +} - bool GetPrepareInsertIntoTable(const std::string &table_name, - impl::CassPreparedPtr *prepared) const { - tbb::mutex::scoped_lock lock(map_mutex_); - CassPreparedMapType::const_iterator it( - insert_prepared_map_.find(table_name)); - if (it == insert_prepared_map_.end()) { - return false; - } - *prepared = it->second; +bool CqlIfImpl::LocatePrepareInsertIntoTable(const GenDb::NewCf &cf) { + const std::string &table_name(cf.cfname_); + impl::CassPreparedPtr prepared(NULL, cci_); + // Check if the prepared statement exists + if (GetPrepareInsertIntoTable(table_name, &prepared)) { return true; } - - bool IsTablePresent(const GenDb::NewCf &cf) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - return impl::IsCassTableMetaPresent(session_.get(), keyspace_, - cf.cfname_); + bool success(PrepareInsertIntoTableSync(cf, &prepared)); + if (!success) { + return success; } + // Store the prepared statement into the map + tbb::mutex::scoped_lock lock(map_mutex_); + success = (insert_prepared_map_.insert( + std::make_pair(table_name, prepared))).second; + assert(success); + return success; +} - bool IsTableStatic(const std::string &table) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - size_t ck_count; - assert(impl::GetCassTableClusteringKeyCount(session_.get(), keyspace_, - table, &ck_count)); - return ck_count == 0; +bool CqlIfImpl::GetPrepareInsertIntoTable(const std::string &table_name, + impl::CassPreparedPtr *prepared) const { + tbb::mutex::scoped_lock lock(map_mutex_); + CassPreparedMapType::const_iterator it( + insert_prepared_map_.find(table_name)); + if (it == insert_prepared_map_.end()) { + return false; } + *prepared = it->second; + return true; +} - bool IsTableDynamic(const std::string &table) { - return !IsTableStatic(table); +bool CqlIfImpl::IsTablePresent(const GenDb::NewCf &cf) { + if (session_state_ != SessionState::CONNECTED) { + return false; } + return impl::IsCassTableMetaPresent(cci_, session_.get(), keyspace_, + cf.cfname_); +} - bool InsertIntoTableSync(std::auto_ptr v_columns, - CassConsistency consistency) { - return InsertIntoTableInternal(v_columns, consistency, true, NULL); +bool CqlIfImpl::IsTableStatic(const std::string &table) { + if (session_state_ != SessionState::CONNECTED) { + return false; } + size_t ck_count; + assert(impl::GetCassTableClusteringKeyCount(cci_, session_.get(), + keyspace_, table, &ck_count)); + return ck_count == 0; +} - bool InsertIntoTableAsync(std::auto_ptr v_columns, - CassConsistency consistency, impl::CassAsyncQueryCallback cb) { - return InsertIntoTableInternal(v_columns, consistency, false, cb); - } +bool CqlIfImpl::IsTableDynamic(const std::string &table) { + return !IsTableStatic(table); +} - bool InsertIntoTablePrepareAsync(std::auto_ptr v_columns, - CassConsistency consistency, impl::CassAsyncQueryCallback cb) { - return InsertIntoTablePrepareInternal(v_columns, consistency, false, - cb); - } +bool CqlIfImpl::InsertIntoTableSync(std::auto_ptr v_columns, + CassConsistency consistency) { + return InsertIntoTableInternal(v_columns, consistency, true, NULL); +} - bool IsInsertIntoTablePrepareSupported(const std::string &table) { - return IsTableDynamic(table); - } +bool CqlIfImpl::InsertIntoTableAsync(std::auto_ptr v_columns, + CassConsistency consistency, impl::CassAsyncQueryCallback cb) { + return InsertIntoTableInternal(v_columns, consistency, false, cb); +} - bool SelectFromTableSync(const std::string &cfname, - const GenDb::DbDataValueVec &rkey, CassConsistency consistency, - GenDb::NewColVec *out) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - std::string query(impl::PartitionKey2CassSelectFromTable(cfname, - rkey)); - if (IsTableStatic(cfname)) { - return impl::StaticCfGetResultSync(session_.get(), - query.c_str(), consistency, out); - } else { - size_t rk_count; - assert(impl::GetCassTablePartitionKeyCount(session_.get(), - keyspace_, cfname, &rk_count)); - size_t ck_count; - assert(impl::GetCassTableClusteringKeyCount(session_.get(), - keyspace_, cfname, &ck_count)); - return impl::DynamicCfGetResultSync(session_.get(), - query.c_str(), rk_count, ck_count, consistency, out); - } - } +bool CqlIfImpl::InsertIntoTablePrepareAsync(std::auto_ptr v_columns, + CassConsistency consistency, impl::CassAsyncQueryCallback cb) { + return InsertIntoTablePrepareInternal(v_columns, consistency, false, + cb); +} - bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname, - const GenDb::DbDataValueVec &rkey, - const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, - GenDb::NewColVec *out) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - std::string query( - impl::PartitionKeyAndClusteringKeyRange2CassSelectFromTable(cfname, - rkey, ck_range)); - assert(IsTableDynamic(cfname)); +bool CqlIfImpl::IsInsertIntoTablePrepareSupported(const std::string &table) { + return IsTableDynamic(table); +} + +bool CqlIfImpl::SelectFromTableSync(const std::string &cfname, + const GenDb::DbDataValueVec &rkey, CassConsistency consistency, + GenDb::NewColVec *out) { + if (session_state_ != SessionState::CONNECTED) { + return false; + } + std::string query(impl::PartitionKey2CassSelectFromTable(cfname, + rkey)); + if (IsTableStatic(cfname)) { + return impl::StaticCfGetResultSync(cci_, session_.get(), + query.c_str(), consistency, out); + } else { size_t rk_count; - assert(impl::GetCassTablePartitionKeyCount(session_.get(), + assert(impl::GetCassTablePartitionKeyCount(cci_, session_.get(), keyspace_, cfname, &rk_count)); size_t ck_count; - assert(impl::GetCassTableClusteringKeyCount(session_.get(), + assert(impl::GetCassTableClusteringKeyCount(cci_, session_.get(), keyspace_, cfname, &ck_count)); - return impl::DynamicCfGetResultSync(session_.get(), + return impl::DynamicCfGetResultSync(cci_, session_.get(), query.c_str(), rk_count, ck_count, consistency, out); } +} - void ConnectAsync() { - session_state_ = SessionState::CONNECT_PENDING; - impl::CassFuturePtr future(cass_session_connect(session_.get(), - cluster_.get())); - if (connect_cb_.empty()) { - connect_cb_ = boost::bind(&CqlIfImpl::ConnectCallbackProcess, - this, _1); - } - cass_future_set_callback(future.get(), ConnectCallback, this); +bool CqlIfImpl::SelectFromTableClusteringKeyRangeSync(const std::string &cfname, + const GenDb::DbDataValueVec &rkey, + const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, + GenDb::NewColVec *out) { + if (session_state_ != SessionState::CONNECTED) { + return false; } - - bool ConnectSync() { - impl::CassFuturePtr future(cass_session_connect(session_.get(), - cluster_.get())); - bool success(impl::SyncFutureWait(future.get())); - if (success) { - session_state_ = SessionState::CONNECTED; - CQLIF_LOG(INFO, "ConnectSync Done"); - } else { - CQLIF_LOG_ERR("ConnectSync FAILED"); - } - return success; - } - - void DisconnectAsync() { - // Close all session and pending queries - session_state_ = SessionState::DISCONNECT_PENDING; - impl::CassFuturePtr future(cass_session_close(session_.get())); - if (disconnect_cb_.empty()) { - disconnect_cb_ = boost::bind(&CqlIfImpl::DisconnectCallbackProcess, - this, _1); - } - cass_future_set_callback(future.get(), DisconnectCallback, this); + std::string query( + impl::PartitionKeyAndClusteringKeyRange2CassSelectFromTable(cfname, + rkey, ck_range)); + assert(IsTableDynamic(cfname)); + size_t rk_count; + assert(impl::GetCassTablePartitionKeyCount(cci_, session_.get(), + keyspace_, cfname, &rk_count)); + size_t ck_count; + assert(impl::GetCassTableClusteringKeyCount(cci_, session_.get(), + keyspace_, cfname, &ck_count)); + return impl::DynamicCfGetResultSync(cci_, session_.get(), + query.c_str(), rk_count, ck_count, consistency, out); +} + +void CqlIfImpl::ConnectAsync() { + session_state_ = SessionState::CONNECT_PENDING; + impl::CassFuturePtr future(cci_->CassSessionConnect(session_.get(), + cluster_.get()), cci_); + if (connect_cb_.empty()) { + connect_cb_ = boost::bind(&CqlIfImpl::ConnectCallbackProcess, + this, _1); + } + cci_->CassFutureSetCallback(future.get(), ConnectCallback, this); +} + +bool CqlIfImpl::ConnectSync() { + impl::CassFuturePtr future(cci_->CassSessionConnect(session_.get(), + cluster_.get()), cci_); + bool success(impl::SyncFutureWait(cci_, future.get())); + if (success) { + session_state_ = SessionState::CONNECTED; + CQLIF_LOG(INFO, "ConnectSync Done"); + } else { + CQLIF_LOG_ERR("ConnectSync FAILED"); } + return success; +} - bool DisconnectSync() { - // Close all session and pending queries - impl::CassFuturePtr future(cass_session_close(session_.get())); - bool success(impl::SyncFutureWait(future.get())); - if (success) { - session_state_ = SessionState::DISCONNECTED; - CQLIF_LOG(INFO, "DisconnectSync Done"); - } else { - CQLIF_LOG_ERR("DisconnectSync FAILED"); - } - return success; +void CqlIfImpl::DisconnectAsync() { + // Close all session and pending queries + session_state_ = SessionState::DISCONNECT_PENDING; + impl::CassFuturePtr future(cci_->CassSessionClose(session_.get()), cci_); + if (disconnect_cb_.empty()) { + disconnect_cb_ = boost::bind(&CqlIfImpl::DisconnectCallbackProcess, + this, _1); } + cci_->CassFutureSetCallback(future.get(), DisconnectCallback, this); +} - void GetMetrics(Metrics *metrics) const { - CassMetrics cass_metrics; - cass_session_get_metrics(session_.get(), &cass_metrics); - // Requests - metrics->requests.min = cass_metrics.requests.min; - metrics->requests.max = cass_metrics.requests.max; - metrics->requests.mean = cass_metrics.requests.mean; - metrics->requests.stddev = cass_metrics.requests.stddev; - metrics->requests.median = cass_metrics.requests.median; - metrics->requests.percentile_75th = - cass_metrics.requests.percentile_75th; - metrics->requests.percentile_95th = - cass_metrics.requests.percentile_95th; - metrics->requests.percentile_98th = - cass_metrics.requests.percentile_98th; - metrics->requests.percentile_99th = - cass_metrics.requests.percentile_99th; - metrics->requests.percentile_999th = - cass_metrics.requests.percentile_999th; - metrics->requests.mean_rate = cass_metrics.requests.mean_rate; - metrics->requests.one_minute_rate = - cass_metrics.requests.one_minute_rate; - metrics->requests.five_minute_rate = - cass_metrics.requests.five_minute_rate; - metrics->requests.fifteen_minute_rate = - cass_metrics.requests.fifteen_minute_rate; - // Stats - metrics->stats.total_connections = - cass_metrics.stats.total_connections; - metrics->stats.available_connections = - cass_metrics.stats.available_connections; - metrics->stats.exceeded_pending_requests_water_mark = - cass_metrics.stats.exceeded_pending_requests_water_mark; - metrics->stats.exceeded_write_bytes_water_mark = - cass_metrics.stats.exceeded_write_bytes_water_mark; - // Errors - metrics->errors.connection_timeouts = - cass_metrics.errors.connection_timeouts; - metrics->errors.pending_request_timeouts = - cass_metrics.errors.pending_request_timeouts; - metrics->errors.request_timeouts = - cass_metrics.errors.request_timeouts; - } - - private: - typedef boost::function ConnectCbFn; - typedef boost::function DisconnectCbFn; - - static void ConnectCallback(CassFuture *future, void *data) { - CqlIfImpl *impl_ = (CqlIfImpl *)data; - impl_->connect_cb_(future); - } - - static void DisconnectCallback(CassFuture *future, void *data) { - CqlIfImpl *impl_ = (CqlIfImpl *)data; - impl_->disconnect_cb_(future); - } - - bool ReconnectTimerExpired() { - ConnectAsync(); - return false; +bool CqlIfImpl::DisconnectSync() { + // Close all session and pending queries + impl::CassFuturePtr future(cci_->CassSessionClose(session_.get()), cci_); + bool success(impl::SyncFutureWait(cci_, future.get())); + if (success) { + session_state_ = SessionState::DISCONNECTED; + CQLIF_LOG(INFO, "DisconnectSync Done"); + } else { + CQLIF_LOG_ERR("DisconnectSync FAILED"); } + return success; +} - void ReconnectTimerErrorHandler(std::string error_name, - std::string error_message) { - CQLIF_LOG_ERR(error_name << " " << error_message); +void CqlIfImpl::GetMetrics(Metrics *metrics) const { + CassMetrics cass_metrics; + cci_->CassSessionGetMetrics(session_.get(), &cass_metrics); + // Requests + metrics->requests.min = cass_metrics.requests.min; + metrics->requests.max = cass_metrics.requests.max; + metrics->requests.mean = cass_metrics.requests.mean; + metrics->requests.stddev = cass_metrics.requests.stddev; + metrics->requests.median = cass_metrics.requests.median; + metrics->requests.percentile_75th = + cass_metrics.requests.percentile_75th; + metrics->requests.percentile_95th = + cass_metrics.requests.percentile_95th; + metrics->requests.percentile_98th = + cass_metrics.requests.percentile_98th; + metrics->requests.percentile_99th = + cass_metrics.requests.percentile_99th; + metrics->requests.percentile_999th = + cass_metrics.requests.percentile_999th; + metrics->requests.mean_rate = cass_metrics.requests.mean_rate; + metrics->requests.one_minute_rate = + cass_metrics.requests.one_minute_rate; + metrics->requests.five_minute_rate = + cass_metrics.requests.five_minute_rate; + metrics->requests.fifteen_minute_rate = + cass_metrics.requests.fifteen_minute_rate; + // Stats + metrics->stats.total_connections = + cass_metrics.stats.total_connections; + metrics->stats.available_connections = + cass_metrics.stats.available_connections; + metrics->stats.exceeded_pending_requests_water_mark = + cass_metrics.stats.exceeded_pending_requests_water_mark; + metrics->stats.exceeded_write_bytes_water_mark = + cass_metrics.stats.exceeded_write_bytes_water_mark; + // Errors + metrics->errors.connection_timeouts = + cass_metrics.errors.connection_timeouts; + metrics->errors.pending_request_timeouts = + cass_metrics.errors.pending_request_timeouts; + metrics->errors.request_timeouts = + cass_metrics.errors.request_timeouts; +} + +void CqlIfImpl::ConnectCallback(CassFuture *future, void *data) { + CqlIfImpl *impl_ = (CqlIfImpl *)data; + impl_->connect_cb_(future); +} + +void CqlIfImpl::DisconnectCallback(CassFuture *future, void *data) { + CqlIfImpl *impl_ = (CqlIfImpl *)data; + impl_->disconnect_cb_(future); +} + +bool CqlIfImpl::ReconnectTimerExpired() { + ConnectAsync(); + return false; +} + +void CqlIfImpl::ReconnectTimerErrorHandler(std::string error_name, + std::string error_message) { + CQLIF_LOG_ERR(error_name << " " << error_message); +} + +void CqlIfImpl::ConnectCallbackProcess(CassFuture *future) { + CassError code(cci_->CassFutureErrorCode(future)); + if (code != CASS_OK) { + impl::CassString err; + cci_->CassFutureErrorMessage(future, &err.data, &err.length); + CQLIF_LOG(INFO, err.data); + // Start a timer to reconnect + reconnect_timer_->Start(kReconnectInterval, + boost::bind(&CqlIfImpl::ReconnectTimerExpired, this), + boost::bind(&CqlIfImpl::ReconnectTimerErrorHandler, this, + _1, _2)); + return; } + session_state_ = SessionState::CONNECTED; +} - void ConnectCallbackProcess(CassFuture *future) { - CassError code(cass_future_error_code(future)); - if (code != CASS_OK) { - impl::CassString err; - cass_future_error_message(future, &err.data, &err.length); - CQLIF_LOG(INFO, err.data); - // Start a timer to reconnect - reconnect_timer_->Start(kReconnectInterval, - boost::bind(&CqlIfImpl::ReconnectTimerExpired, this), - boost::bind(&CqlIfImpl::ReconnectTimerErrorHandler, this, - _1, _2)); - return; - } - session_state_ = SessionState::CONNECTED; +void CqlIfImpl::DisconnectCallbackProcess(CassFuture *future) { + CassError code(cci_->CassFutureErrorCode(future)); + if (code != CASS_OK) { + impl::CassString err; + cci_->CassFutureErrorMessage(future, &err.data, &err.length); + CQLIF_LOG_ERR(err.data); } + session_state_ = SessionState::DISCONNECTED; +} - void DisconnectCallbackProcess(CassFuture *future) { - CassError code(cass_future_error_code(future)); - if (code != CASS_OK) { - impl::CassString err; - cass_future_error_message(future, &err.data, &err.length); - CQLIF_LOG_ERR(err.data); - } - session_state_ = SessionState::DISCONNECTED; +bool CqlIfImpl::InsertIntoTableInternal(std::auto_ptr v_columns, + CassConsistency consistency, bool sync, + impl::CassAsyncQueryCallback cb) { + if (session_state_ != SessionState::CONNECTED) { + return false; } - - bool InsertIntoTableInternal(std::auto_ptr v_columns, - CassConsistency consistency, bool sync, - impl::CassAsyncQueryCallback cb) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - std::string query; - if (IsTableStatic(v_columns->cfname_)) { - query = impl::StaticCf2CassInsertIntoTable(v_columns.get()); - } else { - query = impl::DynamicCf2CassInsertIntoTable(v_columns.get()); - } - if (sync) { - return impl::ExecuteQuerySync(session_.get(), query.c_str(), - consistency); - } else { - impl::ExecuteQueryAsync(session_.get(), query.c_str(), - consistency, cb); - return true; - } + std::string query; + if (IsTableStatic(v_columns->cfname_)) { + query = impl::StaticCf2CassInsertIntoTable(v_columns.get()); + } else { + query = impl::DynamicCf2CassInsertIntoTable(v_columns.get()); } - - bool PrepareInsertIntoTableSync(const GenDb::NewCf &cf, - impl::CassPreparedPtr *prepared) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - std::string query; - switch (cf.cftype_) { - case GenDb::NewCf::COLUMN_FAMILY_SQL: - query = impl::StaticCf2CassPrepareInsertIntoTable(cf); - break; - case GenDb::NewCf::COLUMN_FAMILY_NOSQL: - query = impl::DynamicCf2CassPrepareInsertIntoTable(cf); - break; - default: - return false; - } - return impl::PrepareSync(session_.get(), query.c_str(), - prepared); + if (sync) { + return impl::ExecuteQuerySync(cci_, session_.get(), query.c_str(), + consistency); + } else { + impl::ExecuteQueryAsync(cci_, session_.get(), query.c_str(), + consistency, cb); + return true; } +} - bool InsertIntoTablePrepareInternal(std::auto_ptr v_columns, - CassConsistency consistency, bool sync, - impl::CassAsyncQueryCallback cb) { - if (session_state_ != SessionState::CONNECTED) { - return false; - } - impl::CassPreparedPtr prepared; - bool success(GetPrepareInsertIntoTable(v_columns->cfname_, &prepared)); - if (!success) { - CQLIF_LOG_ERR("CassPrepared statement NOT found: " << - v_columns->cfname_); - return false; - } - impl::CassStatementPtr qstatement(cass_prepared_bind(prepared.get())); - if (IsTableStatic(v_columns->cfname_)) { - success = impl::StaticCf2CassPrepareBind(qstatement.get(), - v_columns.get()); - } else { - success = impl::DynamicCf2CassPrepareBind(qstatement.get(), - v_columns.get()); - } - if (!success) { - return false; - } - if (sync) { - return impl::ExecuteQueryStatementSync(session_.get(), - qstatement.get(), consistency); - } else { - std::string qid("Prepare: " + v_columns->cfname_); - impl::ExecuteQueryStatementAsync(session_.get(), qid.c_str(), - qstatement.get(), consistency, cb); - return true; - } +bool CqlIfImpl::PrepareInsertIntoTableSync(const GenDb::NewCf &cf, + impl::CassPreparedPtr *prepared) { + if (session_state_ != SessionState::CONNECTED) { + return false; + } + std::string query; + switch (cf.cftype_) { + case GenDb::NewCf::COLUMN_FAMILY_SQL: + query = impl::StaticCf2CassPrepareInsertIntoTable(cf); + break; + case GenDb::NewCf::COLUMN_FAMILY_NOSQL: + query = impl::DynamicCf2CassPrepareInsertIntoTable(cf); + break; + default: + return false; } + return impl::PrepareSync(cci_, session_.get(), query.c_str(), + prepared); +} - static const char * kQCreateKeyspaceIfNotExists; - static const char * kQUseKeyspace; - static const char * kTaskName; - static const int kTaskInstance = -1; - static const int kReconnectInterval = 5 * 1000; - - struct SessionState { - enum type { - INIT, - CONNECT_PENDING, - CONNECTED, - DISCONNECT_PENDING, - DISCONNECTED, - }; - }; - - EventManager *evm_; - impl::CassClusterPtr cluster_; - impl::CassSessionPtr session_; - tbb::atomic session_state_; - Timer *reconnect_timer_; - ConnectCbFn connect_cb_; - DisconnectCbFn disconnect_cb_; - std::string keyspace_; - int io_thread_count_; - typedef boost::unordered_map - CassPreparedMapType; - CassPreparedMapType insert_prepared_map_; - mutable tbb::mutex map_mutex_; -}; +bool CqlIfImpl::InsertIntoTablePrepareInternal( + std::auto_ptr v_columns, + CassConsistency consistency, bool sync, + impl::CassAsyncQueryCallback cb) { + if (session_state_ != SessionState::CONNECTED) { + return false; + } + impl::CassPreparedPtr prepared(NULL, cci_); + bool success(GetPrepareInsertIntoTable(v_columns->cfname_, &prepared)); + if (!success) { + CQLIF_LOG_ERR("CassPrepared statement NOT found: " << + v_columns->cfname_); + return false; + } + impl::CassStatementPtr qstatement(cci_->CassPreparedBind(prepared.get()), + cci_); + if (IsTableStatic(v_columns->cfname_)) { + success = impl::StaticCf2CassPrepareBind(cci_, qstatement.get(), + v_columns.get()); + } else { + success = impl::DynamicCf2CassPrepareBind(cci_, qstatement.get(), + v_columns.get()); + } + if (!success) { + return false; + } + if (sync) { + return impl::ExecuteQueryStatementSync(cci_, session_.get(), + qstatement.get(), consistency); + } else { + std::string qid("Prepare: " + v_columns->cfname_); + impl::ExecuteQueryStatementAsync(cci_, session_.get(), qid.c_str(), + qstatement.get(), consistency, cb); + return true; + } +} -const char * CqlIf::CqlIfImpl::kQCreateKeyspaceIfNotExists( +const char * CqlIfImpl::kQCreateKeyspaceIfNotExists( "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH " "replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %s }"); -const char * CqlIf::CqlIfImpl::kQUseKeyspace("USE \"%s\""); -const char * CqlIf::CqlIfImpl::kTaskName("CqlIfImpl::Task"); +const char * CqlIfImpl::kQUseKeyspace("USE \"%s\""); +const char * CqlIfImpl::kTaskName("CqlIfImpl::Task"); // // CqlIf @@ -1739,14 +1631,14 @@ CqlIf::CqlIf(EventManager *evm, int cassandra_port, const std::string &cassandra_user, const std::string &cassandra_password) : - impl_(NULL), + cci_(new interface::CassDatastaxLibrary), + impl_(new CqlIfImpl(evm, cassandra_ips, cassandra_port, + cassandra_user, cassandra_password, cci_.get())), use_prepared_for_insert_(true) { // Setup library logging - cass_log_set_level(impl::Log4Level2CassLogLevel( + cci_->CassLogSetLevel(impl::Log4Level2CassLogLevel( log4cplus::Logger::getRoot().getLogLevel())); - cass_log_set_callback(impl::CassLibraryLog, NULL); - impl_ = new CqlIfImpl(evm, cassandra_ips, cassandra_port, - cassandra_user, cassandra_password); + cci_->CassLogSetCallback(impl::CassLibraryLog, NULL); initialized_ = false; BOOST_FOREACH(const std::string &cassandra_ip, cassandra_ips) { boost::system::error_code ec; @@ -1761,9 +1653,6 @@ CqlIf::CqlIf() : impl_(NULL) { } CqlIf::~CqlIf() { - if (impl_) { - delete impl_; - } } // Init/Uninit @@ -2063,5 +1952,359 @@ std::vector CqlIf::Db_GetEndpoints() const { return endpoints_; } +namespace interface { + +// +// CassDatastaxLibrary +// +CassDatastaxLibrary::CassDatastaxLibrary() { +} + +CassDatastaxLibrary::~CassDatastaxLibrary() { +} + +// CassCluster +CassCluster* CassDatastaxLibrary::CassClusterNew() { + return cass_cluster_new(); +} + +void CassDatastaxLibrary::CassClusterFree(CassCluster* cluster) { + cass_cluster_free(cluster); +} + +CassError CassDatastaxLibrary::CassClusterSetContactPoints( + CassCluster* cluster, const char* contact_points) { + return cass_cluster_set_contact_points(cluster, contact_points); +} + +CassError CassDatastaxLibrary::CassClusterSetPort(CassCluster* cluster, + int port) { + return cass_cluster_set_port(cluster, port); +} + +void CassDatastaxLibrary::CassClusterSetCredentials(CassCluster* cluster, + const char* username, const char* password) { + cass_cluster_set_credentials(cluster, username, password); +} + +CassError CassDatastaxLibrary::CassClusterSetNumThreadsIo(CassCluster* cluster, + unsigned num_threads) { + return cass_cluster_set_num_threads_io(cluster, num_threads); +} + +CassError CassDatastaxLibrary::CassClusterSetPendingRequestsHighWaterMark( + CassCluster* cluster, unsigned num_requests) { + return cass_cluster_set_pending_requests_high_water_mark(cluster, + num_requests); +} + +CassError CassDatastaxLibrary::CassClusterSetPendingRequestsLowWaterMark( + CassCluster* cluster, unsigned num_requests) { + return cass_cluster_set_pending_requests_low_water_mark(cluster, + num_requests); +} + +CassError CassDatastaxLibrary::CassClusterSetWriteBytesHighWaterMark( + CassCluster* cluster, unsigned num_bytes) { + return cass_cluster_set_write_bytes_high_water_mark(cluster, num_bytes); +} + +CassError CassDatastaxLibrary::CassClusterSetWriteBytesLowWaterMark( + CassCluster* cluster, unsigned num_bytes) { + return cass_cluster_set_write_bytes_low_water_mark(cluster, num_bytes); +} + +// CassSession +CassSession* CassDatastaxLibrary::CassSessionNew() { + return cass_session_new(); +} + +void CassDatastaxLibrary::CassSessionFree(CassSession* session) { + cass_session_free(session); +} + +CassFuture* CassDatastaxLibrary::CassSessionConnect(CassSession* session, + const CassCluster* cluster) { + return cass_session_connect(session, cluster); +} + +CassFuture* CassDatastaxLibrary::CassSessionClose(CassSession* session) { + return cass_session_close(session); +} + +CassFuture* CassDatastaxLibrary::CassSessionExecute(CassSession* session, + const CassStatement* statement) { + return cass_session_execute(session, statement); +} + +const CassSchemaMeta* CassDatastaxLibrary::CassSessionGetSchemaMeta( + const CassSession* session) { + return cass_session_get_schema_meta(session); +} + +CassFuture* CassDatastaxLibrary::CassSessionPrepare(CassSession* session, + const char* query) { + return cass_session_prepare(session, query); +} + +void CassDatastaxLibrary::CassSessionGetMetrics(const CassSession* session, + CassMetrics* output) { + cass_session_get_metrics(session, output); +} + +// CassSchema +void CassDatastaxLibrary::CassSchemaMetaFree( + const CassSchemaMeta* schema_meta) { + cass_schema_meta_free(schema_meta); +} + +const CassKeyspaceMeta* CassDatastaxLibrary::CassSchemaMetaKeyspaceByName( + const CassSchemaMeta* schema_meta, const char* keyspace) { + return cass_schema_meta_keyspace_by_name(schema_meta, keyspace); +} + +const CassTableMeta* CassDatastaxLibrary::CassKeyspaceMetaTableByName( + const CassKeyspaceMeta* keyspace_meta, const char* table) { + return cass_keyspace_meta_table_by_name(keyspace_meta, table); +} + +size_t CassDatastaxLibrary::CassTableMetaPartitionKeyCount( + const CassTableMeta* table_meta) { + return cass_table_meta_partition_key_count(table_meta); +} + +size_t CassDatastaxLibrary::CassTableMetaClusteringKeyCount( + const CassTableMeta* table_meta) { + return cass_table_meta_clustering_key_count(table_meta); +} + +// CassFuture +void CassDatastaxLibrary::CassFutureFree(CassFuture* future) { + cass_future_free(future); +} + +CassError CassDatastaxLibrary::CassFutureSetCallback(CassFuture* future, + CassFutureCallback callback, void* data) { + return cass_future_set_callback(future, callback, data); +} + +void CassDatastaxLibrary::CassFutureWait(CassFuture* future) { + cass_future_wait(future); +} + +const CassResult* CassDatastaxLibrary::CassFutureGetResult( + CassFuture* future) { + return cass_future_get_result(future); +} + +void CassDatastaxLibrary::CassFutureErrorMessage(CassFuture* future, + const char** message, size_t* message_length) { + cass_future_error_message(future, message, message_length); +} + +CassError CassDatastaxLibrary::CassFutureErrorCode(CassFuture* future) { + return cass_future_error_code(future); +} + +const CassPrepared* CassDatastaxLibrary::CassFutureGetPrepared( + CassFuture* future) { + return cass_future_get_prepared(future); +} + +// CassResult +void CassDatastaxLibrary::CassResultFree(const CassResult* result) { + cass_result_free(result); +} + +size_t CassDatastaxLibrary::CassResultColumnCount(const CassResult* result) { + return cass_result_column_count(result); +} + +CassError CassDatastaxLibrary::CassResultColumnName(const CassResult *result, + size_t index, const char** name, size_t* name_length) { + return cass_result_column_name(result, index, name, name_length); +} + +// CassIterator +void CassDatastaxLibrary::CassIteratorFree(CassIterator* iterator) { + cass_iterator_free(iterator); +} + +CassIterator* CassDatastaxLibrary::CassIteratorFromResult( + const CassResult* result) { + return cass_iterator_from_result(result); +} + +cass_bool_t CassDatastaxLibrary::CassIteratorNext(CassIterator* iterator) { + return cass_iterator_next(iterator); +} + +const CassRow* CassDatastaxLibrary::CassIteratorGetRow( + const CassIterator* iterator) { + return cass_iterator_get_row(iterator); +} + +// CassStatement +CassStatement* CassDatastaxLibrary::CassStatementNew(const char* query, + size_t parameter_count) { + return cass_statement_new(query, parameter_count); +} + +void CassDatastaxLibrary::CassStatementFree(CassStatement* statement) { + cass_statement_free(statement); +} + +CassError CassDatastaxLibrary::CassStatementSetConsistency( + CassStatement* statement, CassConsistency consistency) { + return cass_statement_set_consistency(statement, consistency); +} + +CassError CassDatastaxLibrary::CassStatementBindStringN( + CassStatement* statement, + size_t index, const char* value, size_t value_length) { + return cass_statement_bind_string_n(statement, index, value, value_length); +} + +CassError CassDatastaxLibrary::CassStatementBindInt32(CassStatement* statement, + size_t index, cass_int32_t value) { + return cass_statement_bind_int32(statement, index, value); +} + +CassError CassDatastaxLibrary::CassStatementBindInt64(CassStatement* statement, + size_t index, cass_int64_t value) { + return cass_statement_bind_int64(statement, index, value); +} + +CassError CassDatastaxLibrary::CassStatementBindUuid(CassStatement* statement, + size_t index, CassUuid value) { + return cass_statement_bind_uuid(statement, index, value); +} + +CassError CassDatastaxLibrary::CassStatementBindDouble( + CassStatement* statement, size_t index, cass_double_t value) { + return cass_statement_bind_double(statement, index, value); +} + +CassError CassDatastaxLibrary::CassStatementBindInet(CassStatement* statement, + size_t index, CassInet value) { + return cass_statement_bind_inet(statement, index, value); +} + +CassError CassDatastaxLibrary::CassStatementBindStringByNameN( + CassStatement* statement, + const char* name, size_t name_length, const char* value, + size_t value_length) { + return cass_statement_bind_string_by_name_n(statement, name, name_length, + value, value_length); +} + +CassError CassDatastaxLibrary::CassStatementBindInt32ByName( + CassStatement* statement, const char* name, cass_int32_t value) { + return cass_statement_bind_int32_by_name(statement, name, value); +} + +CassError CassDatastaxLibrary::CassStatementBindInt64ByName( + CassStatement* statement, const char* name, cass_int64_t value) { + return cass_statement_bind_int64_by_name(statement, name, value); +} + +CassError CassDatastaxLibrary::CassStatementBindUuidByName( + CassStatement* statement, const char* name, CassUuid value) { + return cass_statement_bind_uuid_by_name(statement, name, value); +} + +CassError CassDatastaxLibrary::CassStatementBindDoubleByName( + CassStatement* statement, const char* name, cass_double_t value) { + return cass_statement_bind_double_by_name(statement, name, value); +} + +CassError CassDatastaxLibrary::CassStatementBindInetByName( + CassStatement* statement, const char* name, CassInet value) { + return cass_statement_bind_inet_by_name(statement, name, value); +} + +// CassPrepare +void CassDatastaxLibrary::CassPreparedFree(const CassPrepared* prepared) { + cass_prepared_free(prepared); +} + +CassStatement* CassDatastaxLibrary::CassPreparedBind( + const CassPrepared* prepared) { + return cass_prepared_bind(prepared); +} + +// CassValue +CassValueType CassDatastaxLibrary::GetCassValueType(const CassValue* value) { + return cass_value_type(value); +} + +CassError CassDatastaxLibrary::CassValueGetString(const CassValue* value, + const char** output, size_t* output_size) { + return cass_value_get_string(value, output, output_size); +} + +CassError CassDatastaxLibrary::CassValueGetInt8(const CassValue* value, + cass_int8_t* output) { + return cass_value_get_int8(value, output); +} + +CassError CassDatastaxLibrary::CassValueGetInt16(const CassValue* value, + cass_int16_t* output) { + return cass_value_get_int16(value, output); +} + +CassError CassDatastaxLibrary::CassValueGetInt32(const CassValue* value, + cass_int32_t* output) { + return cass_value_get_int32(value, output); +} + +CassError CassDatastaxLibrary::CassValueGetInt64(const CassValue* value, + cass_int64_t* output) { + return cass_value_get_int64(value, output); +} + +CassError CassDatastaxLibrary::CassValueGetUuid(const CassValue* value, + CassUuid* output) { + return cass_value_get_uuid(value, output); +} + +CassError CassDatastaxLibrary::CassValueGetDouble(const CassValue* value, + cass_double_t* output) { + return cass_value_get_double(value, output); +} + +CassError CassDatastaxLibrary::CassValueGetInet(const CassValue* value, + CassInet* output) { + return cass_value_get_inet(value, output); +} + +// CassInet +CassInet CassDatastaxLibrary::CassInetInitV4( + const cass_uint8_t* address) { + return cass_inet_init_v4(address); +} + +CassInet CassDatastaxLibrary::CassInetInitV6( + const cass_uint8_t* address) { + return cass_inet_init_v6(address); +} + +// CassRow +const CassValue* CassDatastaxLibrary::CassRowGetColumn(const CassRow* row, + size_t index) { + return cass_row_get_column(row, index); +} + +// CassLog +void CassDatastaxLibrary::CassLogSetLevel(CassLogLevel log_level) { + cass_log_set_level(log_level); +} + +void CassDatastaxLibrary::CassLogSetCallback(CassLogCallback callback, + void* data) { + cass_log_set_callback(callback, data); +} + +} // namespace interface } // namespace cql } // namespace cass diff --git a/src/database/cassandra/cql/cql_if.h b/src/database/cassandra/cql/cql_if.h index bf075945b43..a2933df6692 100644 --- a/src/database/cassandra/cql/cql_if.h +++ b/src/database/cassandra/cql/cql_if.h @@ -13,6 +13,8 @@ #include #include #include +#include +#include class EventManager; @@ -68,7 +70,7 @@ class CqlIf : public GenDb::GenDbIf { virtual bool Db_GetCumulativeStats(std::vector *vdbti, GenDb::DbErrors *dbe) const; virtual void Db_GetCqlMetrics(Metrics *metrics) const; - virtual void Db_GetCqlStats(cass::cql::DbStats *db_stats) const; + virtual void Db_GetCqlStats(DbStats *db_stats) const; // Connection virtual std::vector Db_GetEndpoints() const; @@ -91,8 +93,8 @@ class CqlIf : public GenDb::GenDbIf { uint64_t num_reads); void IncrementErrors(GenDb::IfErrors::Type err_type); - class CqlIfImpl; - CqlIfImpl *impl_; + boost::scoped_ptr cci_; + boost::scoped_ptr impl_; tbb::atomic initialized_; std::vector endpoints_; mutable tbb::mutex stats_mutex_; diff --git a/src/database/cassandra/cql/cql_if_impl.h b/src/database/cassandra/cql/cql_if_impl.h index 0a630e6f429..465501218a9 100644 --- a/src/database/cassandra/cql/cql_if_impl.h +++ b/src/database/cassandra/cql/cql_if_impl.h @@ -1,5 +1,5 @@ // -// Copyright (c) 2015 Juniper Networks, Inc. All rights reserved. +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. // #ifndef DATABASE_CASSANDRA_CQL_CQL_IF_IMPL_H_ @@ -7,7 +7,16 @@ #include +#include + +#include + +#include +#include + #include +#include +#include namespace cass { namespace cql { @@ -25,7 +34,241 @@ std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable( const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &crange); +// CQL Library Shared Pointers to handle library free calls +template +struct Deleter; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(CassCluster *ptr) { + if (ptr != NULL) { + cci_->CassClusterFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(CassSession* ptr) { + if (ptr != NULL) { + cci_->CassSessionFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(CassFuture* ptr) { + if (ptr != NULL) { + cci_->CassFutureFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(CassStatement* ptr) { + if (ptr != NULL) { + cci_->CassStatementFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(const CassResult* ptr) { + if (ptr != NULL) { + cci_->CassResultFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(CassIterator* ptr) { + if (ptr != NULL) { + cci_->CassIteratorFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(const CassPrepared* ptr) { + if (ptr != NULL) { + cci_->CassPreparedFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template<> +struct Deleter { + Deleter(interface::CassLibrary *cci) : + cci_(cci) {} + void operator()(const CassSchemaMeta* ptr) { + if (ptr != NULL) { + cci_->CassSchemaMetaFree(ptr); + } + } + interface::CassLibrary *cci_; +}; + +template +class CassSharedPtr : public boost::shared_ptr { + public: + CassSharedPtr(T* ptr, interface::CassLibrary *cci) : + boost::shared_ptr(ptr, Deleter(cci)) {} +}; + +typedef CassSharedPtr CassClusterPtr; +typedef CassSharedPtr CassSessionPtr; +typedef CassSharedPtr CassFuturePtr; +typedef CassSharedPtr CassStatementPtr; +typedef CassSharedPtr CassResultPtr; +typedef CassSharedPtr CassIteratorPtr; +typedef CassSharedPtr CassPreparedPtr; +typedef CassSharedPtr CassSchemaMetaPtr; + +typedef boost::function CassAsyncQueryCallback; + +struct CassAsyncQueryContext { + CassAsyncQueryContext(const char *query_id, CassAsyncQueryCallback cb, + interface::CassLibrary *cci) : + query_id_(query_id), + cb_(cb), + cci_(cci) { + } + std::string query_id_; + CassAsyncQueryCallback cb_; + interface::CassLibrary *cci_; +}; + } // namespace impl + +// +// CqlIfImpl +// +class CqlIfImpl { + public: + CqlIfImpl(EventManager *evm, + const std::vector &cassandra_ips, + int cassandra_port, + const std::string &cassandra_user, + const std::string &cassandra_password, + interface::CassLibrary *cci); + virtual ~CqlIfImpl(); + + bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace, + const std::string &replication_factor, CassConsistency consistency); + bool UseKeyspaceSync(const std::string &keyspace, + CassConsistency consistency); + + bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, + CassConsistency consistency); + bool LocatePrepareInsertIntoTable(const GenDb::NewCf &cf); + bool IsTablePresent(const GenDb::NewCf &cf); + bool IsTableStatic(const std::string &table); + bool IsTableDynamic(const std::string &table); + + bool InsertIntoTableSync(std::auto_ptr v_columns, + CassConsistency consistency); + bool InsertIntoTableAsync(std::auto_ptr v_columns, + CassConsistency consistency, impl::CassAsyncQueryCallback cb); + bool InsertIntoTablePrepareAsync(std::auto_ptr v_columns, + CassConsistency consistency, impl::CassAsyncQueryCallback cb); + bool IsInsertIntoTablePrepareSupported(const std::string &table); + + bool SelectFromTableSync(const std::string &cfname, + const GenDb::DbDataValueVec &rkey, CassConsistency consistency, + GenDb::NewColVec *out); + bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname, + const GenDb::DbDataValueVec &rkey, + const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, + GenDb::NewColVec *out); + + void ConnectAsync(); + bool ConnectSync(); + void DisconnectAsync(); + bool DisconnectSync(); + + void GetMetrics(Metrics *metrics) const; + + private: + typedef boost::function ConnectCbFn; + typedef boost::function DisconnectCbFn; + + static void ConnectCallback(CassFuture *future, void *data); + static void DisconnectCallback(CassFuture *future, void *data); + bool ReconnectTimerExpired(); + void ReconnectTimerErrorHandler(std::string error_name, + std::string error_message); + void ConnectCallbackProcess(CassFuture *future); + void DisconnectCallbackProcess(CassFuture *future); + + bool InsertIntoTableInternal(std::auto_ptr v_columns, + CassConsistency consistency, bool sync, + impl::CassAsyncQueryCallback cb); + bool GetPrepareInsertIntoTable(const std::string &table_name, + impl::CassPreparedPtr *prepared) const; + bool PrepareInsertIntoTableSync(const GenDb::NewCf &cf, + impl::CassPreparedPtr *prepared); + bool InsertIntoTablePrepareInternal(std::auto_ptr v_columns, + CassConsistency consistency, bool sync, + impl::CassAsyncQueryCallback cb); + + static const char * kQCreateKeyspaceIfNotExists; + static const char * kQUseKeyspace; + static const char * kTaskName; + static const int kTaskInstance = -1; + static const int kReconnectInterval = 5 * 1000; + + struct SessionState { + enum type { + INIT, + CONNECT_PENDING, + CONNECTED, + DISCONNECT_PENDING, + DISCONNECTED, + }; + }; + + EventManager *evm_; + interface::CassLibrary *cci_; + impl::CassClusterPtr cluster_; + impl::CassSessionPtr session_; + tbb::atomic session_state_; + Timer *reconnect_timer_; + ConnectCbFn connect_cb_; + DisconnectCbFn disconnect_cb_; + std::string keyspace_; + int io_thread_count_; + typedef boost::unordered_map + CassPreparedMapType; + CassPreparedMapType insert_prepared_map_; + mutable tbb::mutex map_mutex_; +}; + } // namespace cql } // namespace cass diff --git a/src/database/cassandra/cql/cql_lib_if.h b/src/database/cassandra/cql/cql_lib_if.h new file mode 100644 index 00000000000..b094aad5d4a --- /dev/null +++ b/src/database/cassandra/cql/cql_lib_if.h @@ -0,0 +1,305 @@ +// +// Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +// + +#ifndef DATABASE_CASSANDRA_CQL_CQL_LIB_IF_H_ +#define DATABASE_CASSANDRA_CQL_CQL_LIB_IF_H_ + +#include + +namespace cass { +namespace cql { +namespace interface { + +class CassLibrary { + public: + virtual ~CassLibrary() {} + + // CassCluster + virtual CassCluster* CassClusterNew() = 0; + virtual void CassClusterFree(CassCluster* cluster) = 0; + virtual CassError CassClusterSetContactPoints(CassCluster* cluster, + const char* contact_points) = 0; + virtual CassError CassClusterSetPort(CassCluster* cluster, + int port) = 0; + virtual void CassClusterSetCredentials(CassCluster* cluster, + const char* username, const char* password) = 0; + virtual CassError CassClusterSetNumThreadsIo(CassCluster* cluster, + unsigned num_threads) = 0; + virtual CassError CassClusterSetPendingRequestsHighWaterMark( + CassCluster* cluster, unsigned num_requests) = 0; + virtual CassError CassClusterSetPendingRequestsLowWaterMark( + CassCluster* cluster, unsigned num_requests) = 0; + virtual CassError CassClusterSetWriteBytesHighWaterMark( + CassCluster* cluster, unsigned num_bytes) = 0; + virtual CassError CassClusterSetWriteBytesLowWaterMark( + CassCluster* cluster, unsigned num_bytes) = 0; + + // CassSession + virtual CassSession* CassSessionNew() = 0; + virtual void CassSessionFree(CassSession* session) = 0; + virtual CassFuture* CassSessionConnect(CassSession* session, + const CassCluster* cluster) = 0; + virtual CassFuture* CassSessionClose(CassSession* session) = 0; + virtual CassFuture* CassSessionExecute(CassSession* session, + const CassStatement* statement) = 0; + virtual const CassSchemaMeta* CassSessionGetSchemaMeta( + const CassSession* session) = 0; + virtual CassFuture* CassSessionPrepare(CassSession* session, + const char* query) = 0; + virtual void CassSessionGetMetrics(const CassSession* session, + CassMetrics* output) = 0; + + // CassSchema + virtual void CassSchemaMetaFree(const CassSchemaMeta* schema_meta) = 0; + virtual const CassKeyspaceMeta* CassSchemaMetaKeyspaceByName( + const CassSchemaMeta* schema_meta, const char* keyspace) = 0; + virtual const CassTableMeta* CassKeyspaceMetaTableByName( + const CassKeyspaceMeta* keyspace_meta, const char* table) = 0; + virtual size_t CassTableMetaPartitionKeyCount( + const CassTableMeta* table_meta) = 0; + virtual size_t CassTableMetaClusteringKeyCount( + const CassTableMeta* table_meta) = 0; + + // CassFuture + virtual void CassFutureFree(CassFuture* future) = 0; + virtual CassError CassFutureSetCallback(CassFuture* future, + CassFutureCallback callback, void* data) = 0; + virtual void CassFutureWait(CassFuture* future) = 0; + virtual const CassResult* CassFutureGetResult(CassFuture* future) = 0; + virtual void CassFutureErrorMessage(CassFuture* future, + const char** message, size_t* message_length) = 0; + virtual CassError CassFutureErrorCode(CassFuture* future) = 0; + virtual const CassPrepared* CassFutureGetPrepared(CassFuture* future) = 0; + + // CassResult + virtual void CassResultFree(const CassResult* result) = 0; + virtual size_t CassResultColumnCount(const CassResult* result) = 0; + virtual CassError CassResultColumnName(const CassResult *result, + size_t index, const char** name, size_t* name_length) = 0; + + // CassIterator + virtual void CassIteratorFree(CassIterator* iterator) = 0; + virtual CassIterator* CassIteratorFromResult( + const CassResult* result) = 0; + virtual cass_bool_t CassIteratorNext(CassIterator* iterator) = 0; + virtual const CassRow* CassIteratorGetRow( + const CassIterator* iterator) = 0; + + // CassStatement + virtual CassStatement* CassStatementNew(const char* query, + size_t parameter_count) = 0; + virtual void CassStatementFree(CassStatement* statement) = 0; + virtual CassError CassStatementSetConsistency(CassStatement* statement, + CassConsistency consistency) = 0; + virtual CassError CassStatementBindStringN(CassStatement* statement, + size_t index, const char* value, size_t value_length) = 0; + virtual CassError CassStatementBindInt32(CassStatement* statement, + size_t index, cass_int32_t value) = 0; + virtual CassError CassStatementBindInt64(CassStatement* statement, + size_t index, cass_int64_t value) = 0; + virtual CassError CassStatementBindUuid(CassStatement* statement, + size_t index, CassUuid value) = 0; + virtual CassError CassStatementBindDouble(CassStatement* statement, + size_t index, cass_double_t value) = 0; + virtual CassError CassStatementBindInet(CassStatement* statement, + size_t index, CassInet value) = 0; + virtual CassError CassStatementBindStringByNameN(CassStatement* statement, + const char* name, size_t name_length, const char* value, + size_t value_length) = 0; + virtual CassError CassStatementBindInt32ByName(CassStatement* statement, + const char* name, cass_int32_t value) = 0; + virtual CassError CassStatementBindInt64ByName(CassStatement* statement, + const char* name, cass_int64_t value) = 0; + virtual CassError CassStatementBindUuidByName(CassStatement* statement, + const char* name, CassUuid value) = 0; + virtual CassError CassStatementBindDoubleByName(CassStatement* statement, + const char* name, cass_double_t value) = 0; + virtual CassError CassStatementBindInetByName(CassStatement* statement, + const char* name, CassInet value) = 0; + + // CassPrepare + virtual void CassPreparedFree(const CassPrepared* prepared) = 0; + virtual CassStatement* CassPreparedBind(const CassPrepared* prepared) = 0; + + // CassValue + virtual CassValueType GetCassValueType(const CassValue* value) = 0; + virtual CassError CassValueGetString(const CassValue* value, + const char** output, size_t* output_size) = 0; + virtual CassError CassValueGetInt8(const CassValue* value, + cass_int8_t* output) = 0; + virtual CassError CassValueGetInt16(const CassValue* value, + cass_int16_t* output) = 0; + virtual CassError CassValueGetInt32(const CassValue* value, + cass_int32_t* output) = 0; + virtual CassError CassValueGetInt64(const CassValue* value, + cass_int64_t* output) = 0; + virtual CassError CassValueGetUuid(const CassValue* value, + CassUuid* output) = 0; + virtual CassError CassValueGetDouble(const CassValue* value, + cass_double_t* output) = 0; + virtual CassError CassValueGetInet(const CassValue* value, + CassInet* output) = 0; + + // CassInet + virtual CassInet CassInetInitV4(const cass_uint8_t* address) = 0; + virtual CassInet CassInetInitV6(const cass_uint8_t* address) = 0; + + // CassRow + virtual const CassValue* CassRowGetColumn(const CassRow* row, + size_t index) = 0; + + // CassLog + virtual void CassLogSetLevel(CassLogLevel log_level) = 0; + virtual void CassLogSetCallback(CassLogCallback callback, void* data) = 0; +}; + +class CassDatastaxLibrary : public CassLibrary { + public: + CassDatastaxLibrary(); + virtual ~CassDatastaxLibrary(); + + // CassCluster + virtual CassCluster* CassClusterNew(); + virtual void CassClusterFree(CassCluster* cluster); + virtual CassError CassClusterSetContactPoints(CassCluster* cluster, + const char* contact_points); + virtual CassError CassClusterSetPort(CassCluster* cluster, + int port); + virtual void CassClusterSetCredentials(CassCluster* cluster, + const char* username, const char* password); + virtual CassError CassClusterSetNumThreadsIo(CassCluster* cluster, + unsigned num_threads); + virtual CassError CassClusterSetPendingRequestsHighWaterMark( + CassCluster* cluster, unsigned num_requests); + virtual CassError CassClusterSetPendingRequestsLowWaterMark( + CassCluster* cluster, unsigned num_requests); + virtual CassError CassClusterSetWriteBytesHighWaterMark( + CassCluster* cluster, unsigned num_bytes); + virtual CassError CassClusterSetWriteBytesLowWaterMark( + CassCluster* cluster, unsigned num_bytes); + + // CassSession + virtual CassSession* CassSessionNew(); + virtual void CassSessionFree(CassSession* session); + virtual CassFuture* CassSessionConnect(CassSession* session, + const CassCluster* cluster); + virtual CassFuture* CassSessionClose(CassSession* session); + virtual CassFuture* CassSessionExecute(CassSession* session, + const CassStatement* statement); + virtual const CassSchemaMeta* CassSessionGetSchemaMeta( + const CassSession* session); + virtual CassFuture* CassSessionPrepare(CassSession* session, + const char* query); + virtual void CassSessionGetMetrics(const CassSession* session, + CassMetrics* output); + + // CassSchema + virtual void CassSchemaMetaFree(const CassSchemaMeta* schema_meta); + virtual const CassKeyspaceMeta* CassSchemaMetaKeyspaceByName( + const CassSchemaMeta* schema_meta, const char* keyspace); + virtual const CassTableMeta* CassKeyspaceMetaTableByName( + const CassKeyspaceMeta* keyspace_meta, const char* table); + virtual size_t CassTableMetaPartitionKeyCount( + const CassTableMeta* table_meta); + virtual size_t CassTableMetaClusteringKeyCount( + const CassTableMeta* table_meta); + + // CassFuture + virtual void CassFutureFree(CassFuture* future); + virtual CassError CassFutureSetCallback(CassFuture* future, + CassFutureCallback callback, void* data); + virtual void CassFutureWait(CassFuture* future); + virtual const CassResult* CassFutureGetResult(CassFuture* future); + virtual void CassFutureErrorMessage(CassFuture* future, + const char** message, size_t* message_length); + virtual CassError CassFutureErrorCode(CassFuture* future); + virtual const CassPrepared* CassFutureGetPrepared(CassFuture* future); + + // CassResult + virtual void CassResultFree(const CassResult* result); + virtual size_t CassResultColumnCount(const CassResult* result); + virtual CassError CassResultColumnName(const CassResult *result, + size_t index, const char** name, size_t* name_length); + + // CassIterator + virtual void CassIteratorFree(CassIterator* iterator); + virtual CassIterator* CassIteratorFromResult( + const CassResult* result); + virtual cass_bool_t CassIteratorNext(CassIterator* iterator); + virtual const CassRow* CassIteratorGetRow( + const CassIterator* iterator); + + // CassStatement + virtual CassStatement* CassStatementNew(const char* query, + size_t parameter_count); + virtual void CassStatementFree(CassStatement* statement); + virtual CassError CassStatementSetConsistency(CassStatement* statement, + CassConsistency consistency); + virtual CassError CassStatementBindStringN(CassStatement* statement, + size_t index, const char* value, size_t value_length); + virtual CassError CassStatementBindInt32(CassStatement* statement, + size_t index, cass_int32_t value); + virtual CassError CassStatementBindInt64(CassStatement* statement, + size_t index, cass_int64_t value); + virtual CassError CassStatementBindUuid(CassStatement* statement, + size_t index, CassUuid value); + virtual CassError CassStatementBindDouble(CassStatement* statement, + size_t index, cass_double_t value); + virtual CassError CassStatementBindInet(CassStatement* statement, + size_t index, CassInet value); + virtual CassError CassStatementBindStringByNameN(CassStatement* statement, + const char* name, size_t name_length, const char* value, + size_t value_length); + virtual CassError CassStatementBindInt32ByName(CassStatement* statement, + const char* name, cass_int32_t value); + virtual CassError CassStatementBindInt64ByName(CassStatement* statement, + const char* name, cass_int64_t value); + virtual CassError CassStatementBindUuidByName(CassStatement* statement, + const char* name, CassUuid value); + virtual CassError CassStatementBindDoubleByName(CassStatement* statement, + const char* name, cass_double_t value); + virtual CassError CassStatementBindInetByName(CassStatement* statement, + const char* name, CassInet value); + + // CassPrepare + virtual void CassPreparedFree(const CassPrepared* prepared); + virtual CassStatement* CassPreparedBind(const CassPrepared* prepared); + + // CassValue + virtual CassValueType GetCassValueType(const CassValue* value); + virtual CassError CassValueGetString(const CassValue* value, + const char** output, size_t* output_size); + virtual CassError CassValueGetInt8(const CassValue* value, + cass_int8_t* output); + virtual CassError CassValueGetInt16(const CassValue* value, + cass_int16_t* output); + virtual CassError CassValueGetInt32(const CassValue* value, + cass_int32_t* output); + virtual CassError CassValueGetInt64(const CassValue* value, + cass_int64_t* output); + virtual CassError CassValueGetUuid(const CassValue* value, + CassUuid* output); + virtual CassError CassValueGetDouble(const CassValue* value, + cass_double_t* output); + virtual CassError CassValueGetInet(const CassValue* value, + CassInet* output); + + // CassInet + virtual CassInet CassInetInitV4(const cass_uint8_t* address); + virtual CassInet CassInetInitV6(const cass_uint8_t* address); + + // CassRow + virtual const CassValue* CassRowGetColumn(const CassRow* row, + size_t index); + + // CassLog + virtual void CassLogSetLevel(CassLogLevel log_level); + virtual void CassLogSetCallback(CassLogCallback callback, void* data); +}; + +} // namespace interface +} // namespace cql +} // namespace cass + +#endif // DATABASE_CASSANDRA_CQL_CQL_LIB_IF_H_