Skip to content

Commit

Permalink
Merge "Code for control-node retrieving config via cassandra."
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Dec 20, 2016
2 parents f0b7ea8 + 90867da commit 7baa34b
Show file tree
Hide file tree
Showing 12 changed files with 299 additions and 34 deletions.
1 change: 1 addition & 0 deletions src/ifmap/client/SConscript
Expand Up @@ -21,6 +21,7 @@ libifmapio = env.Library('ifmapio',
'ifmap_state_machine.cc',
'ifmap_channel.cc',
'peer_server_finder.cc',
'config_cass2json_adapter.cc',
'config_cassandra_client.cc',
'config_client_manager.cc',
'config_db_client.cc',
Expand Down
62 changes: 62 additions & 0 deletions src/ifmap/client/config_cass2json_adapter.cc
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
*/

#include "config_cass2json_adapter.h"

#include <iostream>

using namespace std;

const string ConfigCass2JsonAdapter::prop_prefix = "prop:";
const string ConfigCass2JsonAdapter::meta_prefix = "META:";
const string ConfigCass2JsonAdapter::comma_str = ",";

ConfigCass2JsonAdapter::ConfigCass2JsonAdapter(const CassColumnKVVec &cdvec)
: prop_plen_(prop_prefix.size()),
meta_plen_(meta_prefix.size()) {
CreateJsonString(cdvec);
}

// Return true if the caller needs to append a comma. False otherwise.
bool ConfigCass2JsonAdapter::AddOneEntry(const CassColumnKVVec &cdvec, int i) {
// If the key has 'prop:' at the start, remove it.
if (cdvec.at(i).key.substr(0, prop_plen_) == prop_prefix) {
doc_string_ += string(
"\"" + cdvec.at(i).key.substr(prop_plen_) +
"\"" + ": " + cdvec.at(i).value);
} else if (cdvec.at(i).key.substr(0, meta_plen_) == meta_prefix) {
// If the key has 'META:' at the start, ignore the column.
return false;
} else if (cdvec.at(i).key.compare("type") == 0) {
// Prepend the 'type'. This is "our key", with value being the json
// sub-document containing all other columns.
doc_string_ = string("{\n" + cdvec.at(i).value + ":" + "{\n") +
doc_string_;
return false;
} else {
doc_string_ += string("\"" + cdvec.at(i).key + "\"" + ": " +
cdvec.at(i).value);
}
return true;
}

bool ConfigCass2JsonAdapter::CreateJsonString(const CassColumnKVVec &cdvec) {
for (size_t i = 0; i < cdvec.size(); ++i) {
if (AddOneEntry(cdvec, i)) {
doc_string_ += comma_str;
}
}

// Remove the comma after the last entry.
if (doc_string_[doc_string_.size() - 1] == ',') {
doc_string_.erase(doc_string_.size() - 1);
}

// Add one brace to close out the type's value and one to close out the
// whole json document.
doc_string_ += string("\n}\n}");

return true;
}

32 changes: 32 additions & 0 deletions src/ifmap/client/config_cass2json_adapter.h
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
*/

#ifndef ctrlplane_config_cass2json_adapter_h
#define ctrlplane_config_cass2json_adapter_h

#include "json_adapter_data.h"

// The purpose of this class is to convert key-value pairs received from
// cassandra into one single json string.
// The user will pass a vector of key-value while creating the object. The
// constructor will create a json string, which will then be accessible via the
// doc_string() accessor.
class ConfigCass2JsonAdapter {
public:
ConfigCass2JsonAdapter(const CassColumnKVVec &cdvec);
const std::string &doc_string() { return doc_string_; }

private:
static const std::string prop_prefix;
static const std::string meta_prefix;
static const std::string comma_str;
bool CreateJsonString(const CassColumnKVVec &cdvec);
bool AddOneEntry(const CassColumnKVVec &cdvec, int i);

std::string doc_string_;
int prop_plen_;
int meta_plen_;
};

#endif // ctrlplane_config_cass2json_adapter_h
122 changes: 117 additions & 5 deletions src/ifmap/client/config_cassandra_client.cc
Expand Up @@ -5,20 +5,27 @@
#include "config_cassandra_client.h"

#include "base/logging.h"
#include "config_cass2json_adapter.h"
#include "io/event_manager.h"
#include "database/cassandra/cql/cql_if.h"
#include "ifmap/ifmap_log.h"
#include "ifmap/ifmap_log_types.h"

#include "sandesh/common/vns_constants.h"

const std::string ConfigCassandraClient::kUuidTableName = "obj_uuid_table";
const std::string ConfigCassandraClient::kFqnTableName = "obj_fq_name_table";
const std::string ConfigCassandraClient::kCassClientTaskId = "CN:CassClient";
#include <boost/foreach.hpp>
#include <boost/uuid/uuid.hpp>

using namespace std;

const string ConfigCassandraClient::kUuidTableName = "obj_uuid_table";
const string ConfigCassandraClient::kFqnTableName = "obj_fq_name_table";
const string ConfigCassandraClient::kCassClientTaskId = "CN:CassClient";

ConfigCassandraClient::ConfigCassandraClient(EventManager *evm,
const IFMapConfigOptions &options)
: ConfigDbClient(options), evm_(evm) {
const IFMapConfigOptions &options,
ConfigJsonParser *in_parser)
: ConfigDbClient(options), evm_(evm), parser_(in_parser) {
dbif_.reset(new cass::cql::CqlIf(evm, config_db_ips(),
GetFirstConfigDbPort(), "", ""));
}
Expand All @@ -34,6 +41,54 @@ void ConfigCassandraClient::InitRetry() {
sleep(kInitRetryTimeSec);
}

bool ConfigCassandraClient::ParseUuidTableRowResponse(const string &uuid,
const GenDb::ColList &col_list, CassColumnKVVec *cass_data_vec) {

string uuid_as_str(string("\"" + uuid + "\""));
cass_data_vec->push_back(JsonAdapterDataType("uuid", uuid_as_str));
BOOST_FOREACH(const GenDb::NewCol &ncol, col_list.columns_) {
assert(ncol.name->size() == 1);
assert(ncol.value->size() == 1);

const GenDb::DbDataValue &dname(ncol.name->at(0));
assert(dname.which() == GenDb::DB_VALUE_BLOB);
GenDb::Blob dname_blob(boost::get<GenDb::Blob>(dname));
string key(reinterpret_cast<const char *>(dname_blob.data()),
dname_blob.size());
std::cout << "key is " << key;

const GenDb::DbDataValue &dvalue(ncol.value->at(0));
assert(dvalue.which() == GenDb::DB_VALUE_STRING);
string value(boost::get<string>(dvalue));
std::cout << " and value is " << value << std::endl;

cass_data_vec->push_back(JsonAdapterDataType(key, value));
}

cout << "Filled in " << cass_data_vec->size() << " entries\n";
return true;
}

bool ConfigCassandraClient::ParseRowAndEnqueueToParser(const string &uuid_key,
const GenDb::ColList &col_list) {
auto_ptr<CassColumnKVVec> cass_data_vec(new CassColumnKVVec());
if (ParseUuidTableRowResponse(uuid_key, col_list, cass_data_vec.get())) {
// Convert column data to json string.
ConfigCass2JsonAdapter *ccja =
new ConfigCass2JsonAdapter(*(cass_data_vec.get()));
cout << "doc-string is\n" << ccja->doc_string() << endl;

// Enqueue ccja to the parser here.
// parser_->Receive(ccja->doc_string().....);
} else {
IFMAP_WARN(IFMapGetRowError, "Parsing row response failed for table",
kUuidTableName);
return false;
}

return true;
}

void ConfigCassandraClient::InitDatabase() {
while (true) {
if (!dbif_->Db_Init()) {
Expand All @@ -58,5 +113,62 @@ void ConfigCassandraClient::InitDatabase() {
}
break;
}
// TODO: remove this after all testing.
//ReadUuidTableRow("e6e5609b-64f8-4238-82e6-163e2ec11d21");
//ReadUuidTableRow("e6e5609b-64f8-4238-82e6-abce2ec1wxyz");
BulkDataSync();
}

bool ConfigCassandraClient::ReadUuidTableRow(const string &uuid_key) {
GenDb::ColList col_list;
GenDb::DbDataValueVec key;

key.push_back(GenDb::Blob(
reinterpret_cast<const uint8_t *>(uuid_key.c_str()), uuid_key.size()));

if (dbif_->Db_GetRow(&col_list, kUuidTableName, key,
GenDb::DbConsistency::QUORUM)) {
if (col_list.columns_.size()) {
ParseRowAndEnqueueToParser(uuid_key, col_list);
}
} else {
IFMAP_WARN(IFMapGetRowError, "GetRow failed for table", kUuidTableName);
return false;
}

return true;
}

bool ConfigCassandraClient::ReadAllUuidTableRows() {
GenDb::ColListVec cl_vec;

if (dbif_->Db_GetAllRows(&cl_vec, kUuidTableName,
GenDb::DbConsistency::QUORUM)) {
cout << "All Rows size " << cl_vec.size() << endl;
int count = 0;
BOOST_FOREACH(const GenDb::ColList &cl_list, cl_vec) {
assert(cl_list.rowkey_.size() == 1);
assert(cl_list.rowkey_[0].which() == GenDb::DB_VALUE_BLOB);
GenDb::Blob brk(boost::get<GenDb::Blob>(cl_list.rowkey_[0]));
string uuid_key(reinterpret_cast<const char *>(brk.data()),
brk.size());
cout << "Row " << ++count << " num-cols "
<< cl_list.columns_.size() << " and key " << uuid_key << endl;
if (cl_list.columns_.size()) {
ParseRowAndEnqueueToParser(uuid_key, cl_list);
}
}
} else {
IFMAP_WARN(IFMapGetRowError, "GetAllRows failed for table",
kUuidTableName);
return false;
}

return true;
}

bool ConfigCassandraClient::BulkDataSync() {
ReadAllUuidTableRows();
return true;
}

21 changes: 17 additions & 4 deletions src/ifmap/client/config_cassandra_client.h
Expand Up @@ -2,13 +2,16 @@
* Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
*/

#ifndef ctrlplane_cass_config_client_h
#define ctrlplane_cass_config_client_h
#ifndef ctrlplane_config_cass_client_h
#define ctrlplane_config_cass_client_h

#include "config_db_client.h"
#include "config_json_parser.h"
#include "database/gendb_if.h"
#include "json_adapter_data.h"

class EventManager;
class ConfigJsonParser;

/*
* This class has the functionality to interact with the cassandra servers that
Expand All @@ -24,15 +27,25 @@ class ConfigCassandraClient : public ConfigDbClient {

typedef boost::scoped_ptr<GenDb::GenDbIf> GenDbIfPtr;

ConfigCassandraClient(EventManager *evm, const IFMapConfigOptions &options);
ConfigCassandraClient(EventManager *evm, const IFMapConfigOptions &options,
ConfigJsonParser *in_parser);
virtual ~ConfigCassandraClient();
virtual void InitDatabase();
bool ReadUuidTableRow(const std::string &uuid);

private:
void InitRetry();
bool ParseUuidTableRowResponse(const std::string &uuid,
const GenDb::ColList &col_list, CassColumnKVVec *cass_data_vec);
void AddUuidEntry(const string &uuid);
bool BulkDataSync();
bool ReadAllUuidTableRows();
bool ParseRowAndEnqueueToParser(const string &uuid_key,
const GenDb::ColList &col_list);

EventManager *evm_;
GenDbIfPtr dbif_;
ConfigJsonParser *parser_;
};

#endif // ctrlplane_cass_config_client_h
#endif // ctrlplane_config_cass_client_h
13 changes: 10 additions & 3 deletions src/ifmap/client/config_client_manager.cc
Expand Up @@ -7,21 +7,28 @@

#include "base/task.h"
#include "ifmap/ifmap_config_options.h"
#include "ifmap/ifmap_server.h"
#include "io/event_manager.h"

int ConfigClientManager::thread_count_;

ConfigClientManager::ConfigClientManager(EventManager *evm,
IFMapServer *ifmap_server, const IFMapConfigOptions& config_options)
: evm_(evm), ifmap_server_(ifmap_server) {
config_db_client_.reset(new ConfigCassandraClient(evm, config_options));
thread_count_ = TaskScheduler::GetInstance()->HardwareThreadCount();
: evm_(evm), ifmap_server_(ifmap_server) {
config_json_parser_.reset(new ConfigJsonParser(ifmap_server_->database()));
config_db_client_.reset(new ConfigCassandraClient(evm, config_options,
config_json_parser_.get()));
thread_count_ = TaskScheduler::GetInstance()->HardwareThreadCount();
}

void ConfigClientManager::Initialize() {
config_db_client_->InitDatabase();
}

ConfigJsonParser *ConfigClientManager::config_json_parser() const {
return config_json_parser_.get();
}

ConfigDbClient *ConfigClientManager::config_db_client() const {
return config_db_client_.get();
}
Expand Down
3 changes: 3 additions & 0 deletions src/ifmap/client/config_client_manager.h
Expand Up @@ -8,6 +8,7 @@
#include <boost/scoped_ptr.hpp>

class ConfigDbClient;
class ConfigJsonParser;
class EventManager;
class IFMapServer;
struct IFMapConfigOptions;
Expand All @@ -24,13 +25,15 @@ class ConfigClientManager {
const IFMapConfigOptions& config_options);
void Initialize();
ConfigDbClient *config_db_client() const;
ConfigJsonParser *config_json_parser() const;
bool GetEndOfRibComputed() const;

private:
static int thread_count_;

EventManager *evm_;
IFMapServer *ifmap_server_;
boost::scoped_ptr<ConfigJsonParser> config_json_parser_;
boost::scoped_ptr<ConfigDbClient> config_db_client_;
};

Expand Down
14 changes: 0 additions & 14 deletions src/ifmap/client/config_json_parser.cc
Expand Up @@ -11,7 +11,6 @@ using namespace rapidjson;
using namespace std;

ConfigJsonParser::ConfigJsonParser(DB *db) : db_(db) {

}

void ConfigJsonParser::MetadataRegister(const string &metadata,
Expand Down Expand Up @@ -248,20 +247,7 @@ bool ConfigJsonParser::Receive(const string &in_message, bool add_change,
} else {
cout << "No parse error\n";
ParseDocument(document, add_change, origin, &req_list);
//TmpParseDocument(document);
}
return true;
}

// For testing purposes only. Delete before release.
void ConfigJsonParser::TmpParseDocument(const rapidjson::Document &document) {
for (Value::ConstMemberIterator itr = document.MemberBegin();
itr != document.MemberEnd(); ++itr) {
cout << "Key:" << itr->name.GetString();
if (itr->value.IsNull()) cout << endl;
else {
cout << "Value" << endl;
}
}
}

0 comments on commit 7baa34b

Please sign in to comment.