Skip to content

Commit

Permalink
Code for control-node retrieving config via cassandra.
Browse files Browse the repository at this point in the history
Add code to read one row from cassandra and process the read columns. Also, add
code to convert the incoming column data to a json document that will finally
be fed to the json parser. The conversion is done by the ConfigCass2JsonAdapter
class and handles the quirks in the data stored in cassandra.
Also, add code to read all-rows on startup (bulk sync).

Change-Id: I0ea706554024e314772147fb35c722d008c05fd8
Partial-Bug: #1632470
  • Loading branch information
tkarwa committed Dec 14, 2016
1 parent 9258aaa commit 90867da
Show file tree
Hide file tree
Showing 12 changed files with 299 additions and 34 deletions.
1 change: 1 addition & 0 deletions src/ifmap/client/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ libifmapio = env.Library('ifmapio',
'ifmap_state_machine.cc',
'ifmap_channel.cc',
'peer_server_finder.cc',
'config_cass2json_adapter.cc',
'config_cassandra_client.cc',
'config_client_manager.cc',
'config_db_client.cc',
Expand Down
62 changes: 62 additions & 0 deletions src/ifmap/client/config_cass2json_adapter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
*/

#include "config_cass2json_adapter.h"

#include <iostream>

using namespace std;

const string ConfigCass2JsonAdapter::prop_prefix = "prop:";
const string ConfigCass2JsonAdapter::meta_prefix = "META:";
const string ConfigCass2JsonAdapter::comma_str = ",";

ConfigCass2JsonAdapter::ConfigCass2JsonAdapter(const CassColumnKVVec &cdvec)
: prop_plen_(prop_prefix.size()),
meta_plen_(meta_prefix.size()) {
CreateJsonString(cdvec);
}

// Return true if the caller needs to append a comma. False otherwise.
bool ConfigCass2JsonAdapter::AddOneEntry(const CassColumnKVVec &cdvec, int i) {
// If the key has 'prop:' at the start, remove it.
if (cdvec.at(i).key.substr(0, prop_plen_) == prop_prefix) {
doc_string_ += string(
"\"" + cdvec.at(i).key.substr(prop_plen_) +
"\"" + ": " + cdvec.at(i).value);
} else if (cdvec.at(i).key.substr(0, meta_plen_) == meta_prefix) {
// If the key has 'META:' at the start, ignore the column.
return false;
} else if (cdvec.at(i).key.compare("type") == 0) {
// Prepend the 'type'. This is "our key", with value being the json
// sub-document containing all other columns.
doc_string_ = string("{\n" + cdvec.at(i).value + ":" + "{\n") +
doc_string_;
return false;
} else {
doc_string_ += string("\"" + cdvec.at(i).key + "\"" + ": " +
cdvec.at(i).value);
}
return true;
}

bool ConfigCass2JsonAdapter::CreateJsonString(const CassColumnKVVec &cdvec) {
for (size_t i = 0; i < cdvec.size(); ++i) {
if (AddOneEntry(cdvec, i)) {
doc_string_ += comma_str;
}
}

// Remove the comma after the last entry.
if (doc_string_[doc_string_.size() - 1] == ',') {
doc_string_.erase(doc_string_.size() - 1);
}

// Add one brace to close out the type's value and one to close out the
// whole json document.
doc_string_ += string("\n}\n}");

return true;
}

32 changes: 32 additions & 0 deletions src/ifmap/client/config_cass2json_adapter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
*/

#ifndef ctrlplane_config_cass2json_adapter_h
#define ctrlplane_config_cass2json_adapter_h

#include "json_adapter_data.h"

// The purpose of this class is to convert key-value pairs received from
// cassandra into one single json string.
// The user will pass a vector of key-value while creating the object. The
// constructor will create a json string, which will then be accessible via the
// doc_string() accessor.
class ConfigCass2JsonAdapter {
public:
ConfigCass2JsonAdapter(const CassColumnKVVec &cdvec);
const std::string &doc_string() { return doc_string_; }

private:
static const std::string prop_prefix;
static const std::string meta_prefix;
static const std::string comma_str;
bool CreateJsonString(const CassColumnKVVec &cdvec);
bool AddOneEntry(const CassColumnKVVec &cdvec, int i);

std::string doc_string_;
int prop_plen_;
int meta_plen_;
};

#endif // ctrlplane_config_cass2json_adapter_h
122 changes: 117 additions & 5 deletions src/ifmap/client/config_cassandra_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,27 @@
#include "config_cassandra_client.h"

#include "base/logging.h"
#include "config_cass2json_adapter.h"
#include "io/event_manager.h"
#include "database/cassandra/cql/cql_if.h"
#include "ifmap/ifmap_log.h"
#include "ifmap/ifmap_log_types.h"

#include "sandesh/common/vns_constants.h"

const std::string ConfigCassandraClient::kUuidTableName = "obj_uuid_table";
const std::string ConfigCassandraClient::kFqnTableName = "obj_fq_name_table";
const std::string ConfigCassandraClient::kCassClientTaskId = "CN:CassClient";
#include <boost/foreach.hpp>
#include <boost/uuid/uuid.hpp>

using namespace std;

const string ConfigCassandraClient::kUuidTableName = "obj_uuid_table";
const string ConfigCassandraClient::kFqnTableName = "obj_fq_name_table";
const string ConfigCassandraClient::kCassClientTaskId = "CN:CassClient";

ConfigCassandraClient::ConfigCassandraClient(EventManager *evm,
const IFMapConfigOptions &options)
: ConfigDbClient(options), evm_(evm) {
const IFMapConfigOptions &options,
ConfigJsonParser *in_parser)
: ConfigDbClient(options), evm_(evm), parser_(in_parser) {
dbif_.reset(new cass::cql::CqlIf(evm, config_db_ips(),
GetFirstConfigDbPort(), "", ""));
}
Expand All @@ -34,6 +41,54 @@ void ConfigCassandraClient::InitRetry() {
sleep(kInitRetryTimeSec);
}

bool ConfigCassandraClient::ParseUuidTableRowResponse(const string &uuid,
const GenDb::ColList &col_list, CassColumnKVVec *cass_data_vec) {

string uuid_as_str(string("\"" + uuid + "\""));
cass_data_vec->push_back(JsonAdapterDataType("uuid", uuid_as_str));
BOOST_FOREACH(const GenDb::NewCol &ncol, col_list.columns_) {
assert(ncol.name->size() == 1);
assert(ncol.value->size() == 1);

const GenDb::DbDataValue &dname(ncol.name->at(0));
assert(dname.which() == GenDb::DB_VALUE_BLOB);
GenDb::Blob dname_blob(boost::get<GenDb::Blob>(dname));
string key(reinterpret_cast<const char *>(dname_blob.data()),
dname_blob.size());
std::cout << "key is " << key;

const GenDb::DbDataValue &dvalue(ncol.value->at(0));
assert(dvalue.which() == GenDb::DB_VALUE_STRING);
string value(boost::get<string>(dvalue));
std::cout << " and value is " << value << std::endl;

cass_data_vec->push_back(JsonAdapterDataType(key, value));
}

cout << "Filled in " << cass_data_vec->size() << " entries\n";
return true;
}

bool ConfigCassandraClient::ParseRowAndEnqueueToParser(const string &uuid_key,
const GenDb::ColList &col_list) {
auto_ptr<CassColumnKVVec> cass_data_vec(new CassColumnKVVec());
if (ParseUuidTableRowResponse(uuid_key, col_list, cass_data_vec.get())) {
// Convert column data to json string.
ConfigCass2JsonAdapter *ccja =
new ConfigCass2JsonAdapter(*(cass_data_vec.get()));
cout << "doc-string is\n" << ccja->doc_string() << endl;

// Enqueue ccja to the parser here.
// parser_->Receive(ccja->doc_string().....);
} else {
IFMAP_WARN(IFMapGetRowError, "Parsing row response failed for table",
kUuidTableName);
return false;
}

return true;
}

void ConfigCassandraClient::InitDatabase() {
while (true) {
if (!dbif_->Db_Init()) {
Expand All @@ -58,5 +113,62 @@ void ConfigCassandraClient::InitDatabase() {
}
break;
}
// TODO: remove this after all testing.
//ReadUuidTableRow("e6e5609b-64f8-4238-82e6-163e2ec11d21");
//ReadUuidTableRow("e6e5609b-64f8-4238-82e6-abce2ec1wxyz");
BulkDataSync();
}

bool ConfigCassandraClient::ReadUuidTableRow(const string &uuid_key) {
GenDb::ColList col_list;
GenDb::DbDataValueVec key;

key.push_back(GenDb::Blob(
reinterpret_cast<const uint8_t *>(uuid_key.c_str()), uuid_key.size()));

if (dbif_->Db_GetRow(&col_list, kUuidTableName, key,
GenDb::DbConsistency::QUORUM)) {
if (col_list.columns_.size()) {
ParseRowAndEnqueueToParser(uuid_key, col_list);
}
} else {
IFMAP_WARN(IFMapGetRowError, "GetRow failed for table", kUuidTableName);
return false;
}

return true;
}

bool ConfigCassandraClient::ReadAllUuidTableRows() {
GenDb::ColListVec cl_vec;

if (dbif_->Db_GetAllRows(&cl_vec, kUuidTableName,
GenDb::DbConsistency::QUORUM)) {
cout << "All Rows size " << cl_vec.size() << endl;
int count = 0;
BOOST_FOREACH(const GenDb::ColList &cl_list, cl_vec) {
assert(cl_list.rowkey_.size() == 1);
assert(cl_list.rowkey_[0].which() == GenDb::DB_VALUE_BLOB);
GenDb::Blob brk(boost::get<GenDb::Blob>(cl_list.rowkey_[0]));
string uuid_key(reinterpret_cast<const char *>(brk.data()),
brk.size());
cout << "Row " << ++count << " num-cols "
<< cl_list.columns_.size() << " and key " << uuid_key << endl;
if (cl_list.columns_.size()) {
ParseRowAndEnqueueToParser(uuid_key, cl_list);
}
}
} else {
IFMAP_WARN(IFMapGetRowError, "GetAllRows failed for table",
kUuidTableName);
return false;
}

return true;
}

bool ConfigCassandraClient::BulkDataSync() {
ReadAllUuidTableRows();
return true;
}

21 changes: 17 additions & 4 deletions src/ifmap/client/config_cassandra_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
* Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
*/

#ifndef ctrlplane_cass_config_client_h
#define ctrlplane_cass_config_client_h
#ifndef ctrlplane_config_cass_client_h
#define ctrlplane_config_cass_client_h

#include "config_db_client.h"
#include "config_json_parser.h"
#include "database/gendb_if.h"
#include "json_adapter_data.h"

class EventManager;
class ConfigJsonParser;

/*
* This class has the functionality to interact with the cassandra servers that
Expand All @@ -24,15 +27,25 @@ class ConfigCassandraClient : public ConfigDbClient {

typedef boost::scoped_ptr<GenDb::GenDbIf> GenDbIfPtr;

ConfigCassandraClient(EventManager *evm, const IFMapConfigOptions &options);
ConfigCassandraClient(EventManager *evm, const IFMapConfigOptions &options,
ConfigJsonParser *in_parser);
virtual ~ConfigCassandraClient();
virtual void InitDatabase();
bool ReadUuidTableRow(const std::string &uuid);

private:
void InitRetry();
bool ParseUuidTableRowResponse(const std::string &uuid,
const GenDb::ColList &col_list, CassColumnKVVec *cass_data_vec);
void AddUuidEntry(const string &uuid);
bool BulkDataSync();
bool ReadAllUuidTableRows();
bool ParseRowAndEnqueueToParser(const string &uuid_key,
const GenDb::ColList &col_list);

EventManager *evm_;
GenDbIfPtr dbif_;
ConfigJsonParser *parser_;
};

#endif // ctrlplane_cass_config_client_h
#endif // ctrlplane_config_cass_client_h
13 changes: 10 additions & 3 deletions src/ifmap/client/config_client_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,28 @@

#include "base/task.h"
#include "ifmap/ifmap_config_options.h"
#include "ifmap/ifmap_server.h"
#include "io/event_manager.h"

int ConfigClientManager::thread_count_;

ConfigClientManager::ConfigClientManager(EventManager *evm,
IFMapServer *ifmap_server, const IFMapConfigOptions& config_options)
: evm_(evm), ifmap_server_(ifmap_server) {
config_db_client_.reset(new ConfigCassandraClient(evm, config_options));
thread_count_ = TaskScheduler::GetInstance()->HardwareThreadCount();
: evm_(evm), ifmap_server_(ifmap_server) {
config_json_parser_.reset(new ConfigJsonParser(ifmap_server_->database()));
config_db_client_.reset(new ConfigCassandraClient(evm, config_options,
config_json_parser_.get()));
thread_count_ = TaskScheduler::GetInstance()->HardwareThreadCount();
}

void ConfigClientManager::Initialize() {
config_db_client_->InitDatabase();
}

ConfigJsonParser *ConfigClientManager::config_json_parser() const {
return config_json_parser_.get();
}

ConfigDbClient *ConfigClientManager::config_db_client() const {
return config_db_client_.get();
}
Expand Down
3 changes: 3 additions & 0 deletions src/ifmap/client/config_client_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <boost/scoped_ptr.hpp>

class ConfigDbClient;
class ConfigJsonParser;
class EventManager;
class IFMapServer;
struct IFMapConfigOptions;
Expand All @@ -24,13 +25,15 @@ class ConfigClientManager {
const IFMapConfigOptions& config_options);
void Initialize();
ConfigDbClient *config_db_client() const;
ConfigJsonParser *config_json_parser() const;
bool GetEndOfRibComputed() const;

private:
static int thread_count_;

EventManager *evm_;
IFMapServer *ifmap_server_;
boost::scoped_ptr<ConfigJsonParser> config_json_parser_;
boost::scoped_ptr<ConfigDbClient> config_db_client_;
};

Expand Down
14 changes: 0 additions & 14 deletions src/ifmap/client/config_json_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ using namespace rapidjson;
using namespace std;

ConfigJsonParser::ConfigJsonParser(DB *db) : db_(db) {

}

void ConfigJsonParser::MetadataRegister(const string &metadata,
Expand Down Expand Up @@ -248,20 +247,7 @@ bool ConfigJsonParser::Receive(const string &in_message, bool add_change,
} else {
cout << "No parse error\n";
ParseDocument(document, add_change, origin, &req_list);
//TmpParseDocument(document);
}
return true;
}

// For testing purposes only. Delete before release.
void ConfigJsonParser::TmpParseDocument(const rapidjson::Document &document) {
for (Value::ConstMemberIterator itr = document.MemberBegin();
itr != document.MemberEnd(); ++itr) {
cout << "Key:" << itr->name.GetString();
if (itr->value.IsNull()) cout << endl;
else {
cout << "Value" << endl;
}
}
}

0 comments on commit 90867da

Please sign in to comment.