Skip to content

Commit

Permalink
Merge "Add CQL implementation of Db_GetMultiRow() needed for query en…
Browse files Browse the repository at this point in the history
…gine Partial-Bug: #1522930 Depends-On: I80cbc408fea5e5fb5cbf69e9bec0833629a20677 Change-Id: I0f47e5e29d193cc7a911131808052472921d0285"
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Dec 14, 2015
2 parents 9c03e2d + fa0ec06 commit 2c11c33
Show file tree
Hide file tree
Showing 12 changed files with 394 additions and 60 deletions.
250 changes: 208 additions & 42 deletions src/database/cassandra/cql/cql_if.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,9 @@ std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns) {
return query.str();
}

std::string PartitionKey2CassSelectFromTable(const std::string &table,
const GenDb::DbDataValueVec &rkeys) {
static std::string CassSelectFromTableInternal(const std::string &table,
const GenDb::DbDataValueVec &rkeys,
const GenDb::ColumnNameRange &ck_range) {
std::ostringstream query;
// Table
query << "SELECT * FROM " << table << " WHERE ";
Expand All @@ -404,9 +405,63 @@ std::string PartitionKey2CassSelectFromTable(const std::string &table,
}
boost::apply_visitor(cprinter, rkeys[i]);
}
if (!ck_range.IsEmpty()) {
if (!ck_range.start_.empty()) {
int ck_start_size(ck_range.start_.size());
std::ostringstream start_ss;
start_ss << " >= (";
GenDb::DbDataValueCqlPrinter start_vprinter(start_ss);
query << " AND (";
for (int i = 0; i < ck_start_size; i++) {
if (i) {
query << ", ";
start_ss << ", ";
}
int cnum(i + 1);
query << "column" << cnum;
boost::apply_visitor(start_vprinter, ck_range.start_[i]);
}
query << ")";
start_ss << ")";
query << start_ss.str();
}
if (!ck_range.finish_.empty()) {
int ck_finish_size(ck_range.finish_.size());
std::ostringstream finish_ss;
finish_ss << " <= (";
GenDb::DbDataValueCqlPrinter finish_vprinter(finish_ss);
query << " AND (";
for (int i = 0; i < ck_finish_size; i++) {
if (i) {
query << ", ";
finish_ss << ", ";
}
int cnum(i + 1);
query << "column" << cnum;
boost::apply_visitor(finish_vprinter, ck_range.finish_[i]);
}
query << ")";
finish_ss << ")";
query << finish_ss.str();
}
if (ck_range.count_) {
query << " LIMIT " << ck_range.count_;
}
}
return query.str();
}

std::string PartitionKey2CassSelectFromTable(const std::string &table,
const GenDb::DbDataValueVec &rkeys) {
return CassSelectFromTableInternal(table, rkeys, GenDb::ColumnNameRange());
}

std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(
const std::string &table, const GenDb::DbDataValueVec &rkeys,
const GenDb::ColumnNameRange &ck_range) {
return CassSelectFromTableInternal(table, rkeys, ck_range);
}

static GenDb::DbDataValue CassValue2DbDataValue(const CassValue *cvalue) {
CassValueType cvtype(cass_value_type(cvalue));
switch (cvtype) {
Expand Down Expand Up @@ -489,8 +544,8 @@ static bool ExecuteQuerySync(CassSession *session, const char *query,
return ExecuteQuerySyncInternal(session, query, NULL, consistency);
}

static bool StaticCfGetResultSync(CassSession *session,
const char *query, CassConsistency consistency,
static bool DynamicCfGetResultSync(CassSession *session, const char *query,
size_t rk_count, size_t ck_count, CassConsistency consistency,
GenDb::NewColVec *v_columns) {
CassResultPtr result;
bool success(ExecuteQuerySyncInternal(session, query, &result,
Expand All @@ -500,6 +555,43 @@ static bool StaticCfGetResultSync(CassSession *session,
}
// Row iterator
CassIteratorPtr riterator(cass_iterator_from_result(result.get()));
while (cass_iterator_next(riterator.get())) {
const CassRow *row(cass_iterator_get_row(riterator.get()));
// Iterate over columns
size_t ccount(cass_result_column_count(result.get()));
assert(ccount == rk_count + ck_count + 1);
// Clustering key
GenDb::DbDataValueVec *cnames(new GenDb::DbDataValueVec);
for (size_t i = rk_count; i < rk_count + ck_count; i++) {
const CassValue *cvalue(cass_row_get_column(row, i));
assert(cvalue);
GenDb::DbDataValue db_value(CassValue2DbDataValue(cvalue));
cnames->push_back(db_value);
}
// Values
GenDb::DbDataValueVec *values(new GenDb::DbDataValueVec);
for (size_t i = rk_count + ck_count; i < ccount; i++) {
const CassValue *cvalue(cass_row_get_column(row, i));
assert(cvalue);
GenDb::DbDataValue db_value(CassValue2DbDataValue(cvalue));
values->push_back(db_value);
}
GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0));
v_columns->push_back(column);
}
return success;
}

static bool StaticCfGetResultSync(CassSession *session, const char *query,
CassConsistency consistency, GenDb::NewColVec *v_columns) {
CassResultPtr result;
bool success(ExecuteQuerySyncInternal(session, query, &result,
consistency));
if (!success) {
return success;
}
// Row iterator
CassIteratorPtr riterator(cass_iterator_from_result(result.get()));
while (cass_iterator_next(riterator.get())) {
const CassRow *row(cass_iterator_get_row(riterator.get()));
// Iterate over columns
Expand Down Expand Up @@ -531,6 +623,65 @@ static bool SyncFutureWait(CassFuture *future) {
return rc == CASS_OK;
}

static const CassTableMeta * GetCassTableMeta(const CassSchemaMeta *schema_meta,
const std::string &keyspace, const std::string &table) {
const CassKeyspaceMeta *keyspace_meta(
cass_schema_meta_keyspace_by_name(schema_meta,
keyspace.c_str()));
if (keyspace_meta == NULL) {
CQLIF_LOG_ERR("No keyspace schema: Keyspace: " << keyspace <<
", Table: " << table);
return NULL;
}
std::string table_lower(table);
boost::algorithm::to_lower(table_lower);
const CassTableMeta *table_meta(
cass_keyspace_meta_table_by_name(keyspace_meta,
table_lower.c_str()));
if (table_meta == NULL) {
CQLIF_LOG_ERR("No table schema: Keyspace: " << keyspace <<
", Table: " << table_lower);
return NULL;
}
return table_meta;
}

static bool GetCassTableClusteringKeyCount(CassSession *session,
const std::string &keyspace, const std::string &table, size_t *ck_count) {
impl::CassSchemaMetaPtr schema_meta(cass_session_get_schema_meta(
session));
if (schema_meta.get() == NULL) {
CQLIF_LOG_ERR("No schema meta: Keyspace: " << keyspace <<
", Table: " << table);
return false;
}
const CassTableMeta *table_meta(impl::GetCassTableMeta(
schema_meta.get(), keyspace, table));
if (table_meta == NULL) {
return false;
}
*ck_count = cass_table_meta_clustering_key_count(table_meta);
return true;
}

static bool GetCassTablePartitionKeyCount(CassSession *session,
const std::string &keyspace, const std::string &table, size_t *rk_count) {
impl::CassSchemaMetaPtr schema_meta(cass_session_get_schema_meta(
session));
if (schema_meta.get() == NULL) {
CQLIF_LOG_ERR("No schema meta: Keyspace: " << keyspace <<
", Table: " << table);
return false;
}
const CassTableMeta *table_meta(impl::GetCassTableMeta(
schema_meta.get(), keyspace, table));
if (table_meta == NULL) {
return false;
}
*rk_count = cass_table_meta_partition_key_count(table_meta);
return true;
}

static log4cplus::LogLevel Cass2log4Level(CassLogLevel clevel) {
switch (clevel) {
case CASS_LOG_DISABLED:
Expand Down Expand Up @@ -684,7 +835,8 @@ class CqlIf::CqlIfImpl {

bool IsTableStatic(const std::string &table) {
size_t ck_count;
assert(GetTableClusteringKeyCount(keyspace_, table, &ck_count));
assert(impl::GetCassTableClusteringKeyCount(session_.get(), keyspace_,
table, &ck_count));
return ck_count == 0;
}

Expand All @@ -707,11 +859,39 @@ class CqlIf::CqlIfImpl {
bool SelectFromTableSync(const std::string &cfname,
const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
GenDb::NewColVec *out) {
std::string query(impl::PartitionKey2CassSelectFromTable(cfname,
rkey));
assert(IsTableStatic(cfname));
return impl::StaticCfGetResultSync(session_.get(),
query.c_str(), consistency, out);
std::string query(impl::PartitionKey2CassSelectFromTable(cfname,
rkey));
if (IsTableStatic(cfname)) {
return impl::StaticCfGetResultSync(session_.get(),
query.c_str(), consistency, out);
} else {
size_t rk_count;
assert(impl::GetCassTablePartitionKeyCount(session_.get(),
keyspace_, cfname, &rk_count));
size_t ck_count;
assert(impl::GetCassTableClusteringKeyCount(session_.get(),
keyspace_, cfname, &ck_count));
return impl::DynamicCfGetResultSync(session_.get(),
query.c_str(), rk_count, ck_count, consistency, out);
}
}

bool SelectFromTableSync(const std::string &cfname,
const GenDb::DbDataValueVec &rkey,
const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
GenDb::NewColVec *out) {
std::string query(
impl::PartitionKeyAndClusteringKeyRange2CassSelectFromTable(cfname,
rkey, ck_range));
assert(IsTableDynamic(cfname));
size_t rk_count;
assert(impl::GetCassTablePartitionKeyCount(session_.get(),
keyspace_, cfname, &rk_count));
size_t ck_count;
assert(impl::GetCassTableClusteringKeyCount(session_.get(),
keyspace_, cfname, &ck_count));
return impl::DynamicCfGetResultSync(session_.get(),
query.c_str(), rk_count, ck_count, consistency, out);
}

void ConnectAsync() {
Expand Down Expand Up @@ -812,37 +992,6 @@ class CqlIf::CqlIfImpl {
session_state_ = SessionState::DISCONNECTED;
}

bool GetTableClusteringKeyCount(const std::string &keyspace,
const std::string &table, size_t *ck_count) {
impl::CassSchemaMetaPtr schema_meta(cass_session_get_schema_meta(
session_.get()));
if (schema_meta.get() == NULL) {
CQLIF_LOG_ERR("No schema meta: Keyspace: " << keyspace <<
", Table: " << table);
return false;
}
const CassKeyspaceMeta *keyspace_meta(
cass_schema_meta_keyspace_by_name(schema_meta.get(),
keyspace.c_str()));
if (keyspace_meta == NULL) {
CQLIF_LOG_ERR("No keyspace schema: Keyspace: " << keyspace <<
", Table: " << table);
return false;
}
std::string table_lower(table);
boost::algorithm::to_lower(table_lower);
const CassTableMeta *table_meta(
cass_keyspace_meta_table_by_name(keyspace_meta,
table_lower.c_str()));
if (table_meta == NULL) {
CQLIF_LOG_ERR("No table schema: Keyspace: " << keyspace <<
", Table: " << table_lower);
return false;
}
*ck_count = cass_table_meta_clustering_key_count(table_meta);
return true;
}

static const char * kQCreateKeyspaceIfNotExists;
static const char * kQUseKeyspace;
static const char * kTaskName;
Expand Down Expand Up @@ -961,9 +1110,26 @@ bool CqlIf::Db_GetRow(GenDb::ColList *out, const std::string &cfname,
&out->columns_);
}

bool CqlIf::Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname,
const std::vector<GenDb::DbDataValueVec> &v_rowkey) {
return Db_GetMultiRow(out, cfname, v_rowkey, GenDb::ColumnNameRange());
}

bool CqlIf::Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname,
const std::vector<GenDb::DbDataValueVec> &v_rowkey,
GenDb::ColumnNameRange *crange) {
const GenDb::ColumnNameRange &crange) {
BOOST_FOREACH(const GenDb::DbDataValueVec &rkey, v_rowkey) {
std::auto_ptr<GenDb::ColList> v_columns(new GenDb::ColList);
bool success(impl_->SelectFromTableSync(cfname, rkey, crange,
CASS_CONSISTENCY_ONE, &v_columns->columns_));
if (!success) {
CQLIF_LOG_ERR("SELECT FROM Table: " << cfname << " Partition Key: "
<< GenDb::DbDataValueVecToString(rkey) << " Clustering Key Range: " <<
crange.ToString() << " FAILED");
return false;
}
out->push_back(v_columns.release());
}
return true;
}

Expand Down
7 changes: 5 additions & 2 deletions src/database/cassandra/cql/cql_if.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ class CqlIf : public GenDb::GenDbIf {
const GenDb::DbDataValueVec &rowkey);
virtual bool Db_GetMultiRow(GenDb::ColListVec *out,
const std::string &cfname,
const std::vector<GenDb::DbDataValueVec> &key,
GenDb::ColumnNameRange *crange_ptr = NULL);
const std::vector<GenDb::DbDataValueVec> &v_rowkey);
virtual bool Db_GetMultiRow(GenDb::ColListVec *out,
const std::string &cfname,
const std::vector<GenDb::DbDataValueVec> &v_rowkey,
const GenDb::ColumnNameRange &crange);
// Queue
virtual bool Db_GetQueueStats(uint64_t *queue_count,
uint64_t *enqueues) const;
Expand Down
3 changes: 3 additions & 0 deletions src/database/cassandra/cql/cql_if_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns);
std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns);
std::string PartitionKey2CassSelectFromTable(const std::string &table,
const GenDb::DbDataValueVec &rkeys);
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(
const std::string &table, const GenDb::DbDataValueVec &rkeys,
const GenDb::ColumnNameRange &crange);

} // namespace impl
} // namespace cql
Expand Down

0 comments on commit 2c11c33

Please sign in to comment.