Skip to content

Commit

Permalink
Merge "CQL driver enhancements"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Dec 1, 2016
2 parents 8bd171a + 5fc35bd commit c28f53e
Show file tree
Hide file tree
Showing 6 changed files with 576 additions and 13 deletions.
194 changes: 181 additions & 13 deletions src/database/cassandra/cql/cql_if.cc
Expand Up @@ -725,15 +725,15 @@ static std::string CassSelectFromTableInternal(const std::string &table,
const GenDb::ColumnNameRange &ck_range) {
std::ostringstream query;
// Table
query << "SELECT * FROM " << table << " WHERE ";
query << "SELECT * FROM " << table;
int rk_size(rkeys.size());
CassQueryPrinter cprinter(query);
for (int i = 0; i < rk_size; i++) {
if (i) {
int key_num(i + 1);
query << " AND key" << key_num << "=";
} else {
query << "key=";
query << " WHERE key=";
}
boost::apply_visitor(cprinter, rkeys[i]);
}
Expand Down Expand Up @@ -794,6 +794,11 @@ std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(
return CassSelectFromTableInternal(table, rkeys, ck_range);
}

std::string CassSelectFromTable(const std::string &table) {
return CassSelectFromTableInternal(table, GenDb::DbDataValueVec(),
GenDb::ColumnNameRange());
}

static GenDb::DbDataValue CassValue2DbDataValue(
interface::CassLibrary *cci, const CassValue *cvalue) {
CassValueType cvtype(cci->GetCassValueType(cvalue));
Expand Down Expand Up @@ -960,7 +965,7 @@ static GenDb::DbOpResult::type CassError2DbOpResult(CassError rc) {
}
}

static void GetDynamicCfResult(interface::CassLibrary *cci,
static void DynamicCfGetResult(interface::CassLibrary *cci,
CassResultPtr *result, size_t rk_count,
size_t ck_count, GenDb::NewColVec *v_columns) {
// Row iterator
Expand Down Expand Up @@ -990,7 +995,60 @@ static void GetDynamicCfResult(interface::CassLibrary *cci,
}
}

static void GetStaticCfResult(interface::CassLibrary *cci,
void DynamicCfGetResult(interface::CassLibrary *cci,
CassResultPtr *result, size_t rk_count,
size_t ck_count, GenDb::ColListVec *v_col_list) {
std::auto_ptr<GenDb::ColList> col_list;
// Row iterator
CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
while (cci->CassIteratorNext(riterator.get())) {
const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
// Iterate over columns
size_t ccount(cci->CassResultColumnCount(result->get()));
// Partiiton key
GenDb::DbDataValueVec rkey;
for (size_t i = 0; i < rk_count; i++) {
const CassValue *cvalue(cci->CassRowGetColumn(row, i));
assert(cvalue);
GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
rkey.push_back(db_value);
}
// Clustering key
GenDb::DbDataValueVec *cnames(new GenDb::DbDataValueVec);
for (size_t i = rk_count; i < rk_count + ck_count; i++) {
const CassValue *cvalue(cci->CassRowGetColumn(row, i));
assert(cvalue);
GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
cnames->push_back(db_value);
}
// Values
GenDb::DbDataValueVec *values(new GenDb::DbDataValueVec);
for (size_t i = rk_count + ck_count; i < ccount; i++) {
const CassValue *cvalue(cci->CassRowGetColumn(row, i));
assert(cvalue);
GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
values->push_back(db_value);
}
GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0));
// Do we need a new ColList?
if (!col_list.get()) {
col_list.reset(new GenDb::ColList);
col_list->rowkey_ = rkey;
}
if (rkey != col_list->rowkey_) {
v_col_list->push_back(col_list.release());
col_list.reset(new GenDb::ColList);
col_list->rowkey_ = rkey;
}
GenDb::NewColVec *v_columns(&col_list->columns_);
v_columns->push_back(column);
}
if (col_list.get()) {
v_col_list->push_back(col_list.release());
}
}

static void StaticCfGetResult(interface::CassLibrary *cci,
CassResultPtr *result, GenDb::NewColVec *v_columns) {
// Row iterator
CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
Expand All @@ -1016,6 +1074,55 @@ static void GetStaticCfResult(interface::CassLibrary *cci,
}
}

void StaticCfGetResult(interface::CassLibrary *cci,
CassResultPtr *result, size_t rk_count, GenDb::ColListVec *v_col_list) {
std::auto_ptr<GenDb::ColList> col_list;
// Row iterator
CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
while (cci->CassIteratorNext(riterator.get())) {
const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
// Iterate over columns
size_t ccount(cci->CassResultColumnCount(result->get()));
// Partiiton key
GenDb::DbDataValueVec rkey;
for (size_t i = 0; i < rk_count; i++) {
const CassValue *cvalue(cci->CassRowGetColumn(row, i));
assert(cvalue);
GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
rkey.push_back(db_value);
}
// Do we need a new ColList?
if (!col_list.get()) {
col_list.reset(new GenDb::ColList);
col_list->rowkey_ = rkey;
}
if (rkey != col_list->rowkey_) {
v_col_list->push_back(col_list.release());
col_list.reset(new GenDb::ColList);
col_list->rowkey_ = rkey;
}
GenDb::NewColVec *v_columns(&col_list->columns_);
for (size_t i = 0; i < ccount; i++) {
CassString cname;
CassError rc(cci->CassResultColumnName(result->get(), i,
&cname.data, &cname.length));
assert(rc == CASS_OK);
const CassValue *cvalue(cci->CassRowGetColumn(row, i));
assert(cvalue);
GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
if (db_value.which() == GenDb::DB_VALUE_BLANK) {
continue;
}
GenDb::NewCol *column(new GenDb::NewCol(
std::string(cname.data, cname.length), db_value, 0));
v_columns->push_back(column);
}
}
if (col_list.get()) {
v_col_list->push_back(col_list.release());
}
}

static void OnExecuteQueryAsync(CassFuture *future, void *data) {
assert(data);
std::auto_ptr<CassAsyncQueryContext> ctx(
Expand All @@ -1036,16 +1143,16 @@ static void OnExecuteQueryAsync(CassFuture *future, void *data) {
CassResultPtr result(cci->CassFutureGetResult(future), cci);
// In case of select parse the results
if (cci->CassResultColumnCount(result.get())) {
std::auto_ptr<GenDb::ColList> collist(new GenDb::ColList);
collist->cfname_ = rctx->cf_name_;
collist->rowkey_ = rctx->row_key_;
std::auto_ptr<GenDb::ColList> col_list(new GenDb::ColList);
col_list->cfname_ = rctx->cf_name_;
col_list->rowkey_ = rctx->row_key_;
if (rctx->is_dynamic_cf_) {
GetDynamicCfResult(cci, &result, rctx->rk_count_,
rctx->ck_count_, &collist->columns_);
DynamicCfGetResult(cci, &result, rctx->rk_count_,
rctx->ck_count_, &col_list->columns_);
} else {
GetStaticCfResult(cci, &result, &collist->columns_);
StaticCfGetResult(cci, &result, &col_list->columns_);
}
ctx->cb_(db_rc, collist);
ctx->cb_(db_rc, col_list);
return;
}
}
Expand Down Expand Up @@ -1110,7 +1217,21 @@ static bool DynamicCfGetResultSync(interface::CassLibrary *cci,
if (!success) {
return success;
}
GetDynamicCfResult(cci, &result, rk_count, ck_count, v_columns);
DynamicCfGetResult(cci, &result, rk_count, ck_count, v_columns);
return success;
}

static bool DynamicCfGetResultSync(interface::CassLibrary *cci,
CassSession *session, const char *query,
size_t rk_count, size_t ck_count, CassConsistency consistency,
GenDb::ColListVec *v_col_list) {
CassResultPtr result(NULL, cci);
bool success(ExecuteQueryResultSync(cci, session, query, &result,
consistency));
if (!success) {
return success;
}
DynamicCfGetResult(cci, &result, rk_count, ck_count, v_col_list);
return success;
}

Expand All @@ -1134,7 +1255,20 @@ static bool StaticCfGetResultSync(interface::CassLibrary *cci,
if (!success) {
return success;
}
GetStaticCfResult(cci, &result, v_columns);
StaticCfGetResult(cci, &result, v_columns);
return success;
}

static bool StaticCfGetResultSync(interface::CassLibrary *cci,
CassSession *session, const char *query, size_t rk_count,
CassConsistency consistency, GenDb::ColListVec *v_col_list) {
CassResultPtr result(NULL, cci);
bool success(ExecuteQueryResultSync(cci, session, query, &result,
consistency));
if (!success) {
return success;
}
StaticCfGetResult(cci, &result, rk_count, v_col_list);
return success;
}

Expand Down Expand Up @@ -1565,6 +1699,27 @@ bool CqlIfImpl::SelectFromTableSync(const std::string &cfname,
}
}

bool CqlIfImpl::SelectFromTableSync(const std::string &cfname,
CassConsistency consistency, GenDb::ColListVec *out) {
if (session_state_ != SessionState::CONNECTED) {
return false;
}
std::string query(impl::CassSelectFromTable(cfname));
size_t rk_count;
assert(impl::GetCassTablePartitionKeyCount(cci_, session_.get(),
keyspace_, cfname, &rk_count));
if (IsTableStatic(cfname)) {
return impl::StaticCfGetResultSync(cci_, session_.get(),
query.c_str(), rk_count, consistency, out);
} else {
size_t ck_count;
assert(impl::GetCassTableClusteringKeyCount(cci_, session_.get(),
keyspace_, cfname, &ck_count));
return impl::DynamicCfGetResultSync(cci_, session_.get(),
query.c_str(), rk_count, ck_count, consistency, out);
}
}

bool CqlIfImpl::SelectFromTableClusteringKeyRangeSync(const std::string &cfname,
const GenDb::DbDataValueVec &rkey,
const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
Expand Down Expand Up @@ -2155,6 +2310,19 @@ bool CqlIf::Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname,
return true;
}

bool CqlIf::Db_GetAllRows(GenDb::ColListVec *out, const std::string &cfname,
GenDb::DbConsistency::type dconsistency) {
CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
bool success(impl_->SelectFromTableSync(cfname, consistency, out));
if (!success) {
IncrementTableReadFailStats(cfname);
IncrementErrors(GenDb::IfErrors::ERR_READ_COLUMN);
return success;
}
IncrementTableReadStats(cfname);
return success;
}

// Queue
bool CqlIf::Db_GetQueueStats(uint64_t *queue_count,
uint64_t *enqueues) const {
Expand Down
2 changes: 2 additions & 0 deletions src/database/cassandra/cql/cql_if.h
Expand Up @@ -79,6 +79,8 @@ class CqlIf : public GenDb::GenDbIf {
const GenDb::ColumnNameRange &crange,
GenDb::DbConsistency::type dconsistency, int task_id,
int task_instance, GenDb::GenDbIf::DbGetRowCb cb);
virtual bool Db_GetAllRows(GenDb::ColListVec *out,
const std::string &cfname, GenDb::DbConsistency::type dconsistency);
// Queue
virtual bool Db_GetQueueStats(uint64_t *queue_count,
uint64_t *enqueues) const;
Expand Down
10 changes: 10 additions & 0 deletions src/database/cassandra/cql/cql_if_impl.h
Expand Up @@ -30,6 +30,7 @@ std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns);
std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns);
std::string StaticCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf);
std::string DynamicCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf);
std::string CassSelectFromTable(const std::string &table);
std::string PartitionKey2CassSelectFromTable(const std::string &table,
const GenDb::DbDataValueVec &rkeys);
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(
Expand Down Expand Up @@ -186,6 +187,13 @@ struct CassAsyncQueryContext {
boost::scoped_ptr<CassQueryResultContext> result_ctx_;
};

void DynamicCfGetResult(interface::CassLibrary *cci,
CassResultPtr *result, size_t rk_count,
size_t ck_count, GenDb::ColListVec *v_col_list);
void StaticCfGetResult(interface::CassLibrary *cci,
CassResultPtr *result, size_t rk_count,
GenDb::ColListVec *v_col_list);

} // namespace impl

//
Expand Down Expand Up @@ -224,6 +232,8 @@ class CqlIfImpl {
bool SelectFromTableSync(const std::string &cfname,
const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
GenDb::NewColVec *out);
bool SelectFromTableSync(const std::string &cfname,
CassConsistency consistency, GenDb::ColListVec *out);
bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname,
const GenDb::DbDataValueVec &rkey,
const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
Expand Down

0 comments on commit c28f53e

Please sign in to comment.