Skip to content

Commit

Permalink
Merge "Read FQ-Name table in chunks instead of Get-All-Rows"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Mar 6, 2017
2 parents 58c1fb8 + 473976b commit 8343023
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 42 deletions.
10 changes: 9 additions & 1 deletion src/control-node/control_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ void ControlNode::SetDefaultSchedulingPolicy() {
scheduler->SetPolicy(scheduler->GetTaskId("db::Walker"), walker_policy);

// Policy for cassandra::Reader Task.
TaskPolicy cassadra_reader_policy;
TaskPolicy cassadra_reader_policy = boost::assign::list_of
(TaskExclusion(scheduler->GetTaskId("cassandra::FQNameReader")));
for (int idx = 0; idx < ConfigClientManager::GetNumConfigReader(); ++idx) {
cassadra_reader_policy.push_back(
TaskExclusion(scheduler->GetTaskId("cassandra::ObjectProcessor"), idx));
Expand All @@ -235,4 +236,11 @@ void ControlNode::SetDefaultSchedulingPolicy() {
}
scheduler->SetPolicy(scheduler->GetTaskId("cassandra::ObjectProcessor"),
cassadra_obj_process_policy);

// Policy for cassandra::FQNameReader Task.
TaskPolicy fq_name_reader_policy = boost::assign::list_of
(TaskExclusion(scheduler->GetTaskId("cassandra::Reader")));
scheduler->SetPolicy(scheduler->GetTaskId("cassandra::FQNameReader"),
fq_name_reader_policy);

}
8 changes: 7 additions & 1 deletion src/dns/cmn/dns.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ void Dns::SetTaskSchedulingPolicy() {
exclude_io);

// Policy for cassandra::Reader Task.
TaskPolicy cassadra_reader_policy;
TaskPolicy cassadra_reader_policy = boost::assign::list_of
(TaskExclusion(scheduler->GetTaskId("cassandra::FQNameReader")));
for (int idx = 0; idx < ConfigClientManager::GetNumConfigReader(); ++idx) {
cassadra_reader_policy.push_back(
TaskExclusion(scheduler->GetTaskId("cassandra::ObjectProcessor"), idx));
Expand All @@ -98,5 +99,10 @@ void Dns::SetTaskSchedulingPolicy() {
scheduler->SetPolicy(scheduler->GetTaskId("cassandra::ObjectProcessor"),
cassadra_obj_process_policy);

// Policy for cassandra::FQNameReader Task.
TaskPolicy fq_name_reader_policy = boost::assign::list_of
(TaskExclusion(scheduler->GetTaskId("cassandra::Reader")));
scheduler->SetPolicy(scheduler->GetTaskId("cassandra::FQNameReader"),
fq_name_reader_policy);

}
121 changes: 84 additions & 37 deletions src/ifmap/client/config_cassandra_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ ConfigCassandraClient::ConfigCassandraClient(ConfigClientManager *mgr,
WorkQueue<ObjectProcessReq *>::kMaxSize, 512);
obj_process_queue_.push_back(ObjProcessWorkQType(tmp_work_q));
}

fq_name_reader_.reset(new
TaskTrigger(boost::bind(&ConfigCassandraClient::FQNameReader, this),
TaskScheduler::GetInstance()->GetTaskId("cassandra::FQNameReader"), 0));
}

ConfigCassandraClient::~ConfigCassandraClient() {
Expand Down Expand Up @@ -371,23 +375,21 @@ void ConfigCassandraClient::UpdateCache(const std::string &key,
AddFQNameCache(uuid_str, obj_type, key.substr(0, key.rfind(':')));
}

bool ConfigCassandraClient::ParseFQNameRowGetUUIDList(
const GenDb::ColList &col_list, ObjTypeUUIDList &uuid_list) {
GenDb::Blob dname_blob(boost::get<GenDb::Blob>(col_list.rowkey_[0]));
string obj_type(reinterpret_cast<const char *>(dname_blob.data()),
dname_blob.size());
bool ConfigCassandraClient::ParseFQNameRowGetUUIDList(const string &obj_type,
const GenDb::ColList &col_list, ObjTypeUUIDList &uuid_list,
string *last_column) {
string column_name;
BOOST_FOREACH(const GenDb::NewCol &ncol, col_list.columns_) {
assert(ncol.name->size() == 1);
assert(ncol.value->size() == 1);

const GenDb::DbDataValue &dname(ncol.name->at(0));
assert(dname.which() == GenDb::DB_VALUE_BLOB);
GenDb::Blob dname_blob(boost::get<GenDb::Blob>(dname));
string key(reinterpret_cast<const char *>(dname_blob.data()),
column_name = string(reinterpret_cast<const char *>(dname_blob.data()),
dname_blob.size());
UpdateCache(key, obj_type, uuid_list);
UpdateCache(column_name, obj_type, uuid_list);
}

*last_column = column_name;
return true;
}

Expand Down Expand Up @@ -549,40 +551,85 @@ void ConfigCassandraClient::HandleCassandraConnectionStatus(bool success) {
}
}

bool ConfigCassandraClient::ReadAllFqnTableRows() {
GenDb::ColListVec cl_vec_fq_name;

ObjTypeUUIDList uuid_list;
while (true) {
if (dbif_->Db_GetAllRows(&cl_vec_fq_name, kFqnTableName,
GenDb::DbConsistency::QUORUM)) {
HandleCassandraConnectionStatus(true);
BOOST_FOREACH(const GenDb::ColList &cl_list, cl_vec_fq_name) {
assert(cl_list.rowkey_.size() == 1);
assert(cl_list.rowkey_[0].which() == GenDb::DB_VALUE_BLOB);

if (cl_list.columns_.size()) {
ParseFQNameRowGetUUIDList(cl_list, uuid_list);
}
}
break;
} else {
HandleCassandraConnectionStatus(false);
IFMAP_WARN(IFMapGetRowError, "GetAllRows failed for table. Retry !",
kFqnTableName, "");
sleep(kInitRetryTimeSec);
}
}
return EnqueueUUIDRequest(uuid_list);
}

bool ConfigCassandraClient::EnqueueUUIDRequest(
const ObjTypeUUIDList &uuid_list) {
for (ObjTypeUUIDList::const_iterator it = uuid_list.begin();
it != uuid_list.end(); it++) {
EnqueueUUIDRequest("CREATE", it->first, it->second);
}
return true;
}

bool ConfigCassandraClient::FQNameReader() {
for (ConfigClientManager::ObjectTypeList::const_iterator it =
mgr()->ObjectTypeListToRead().begin();
it != mgr()->ObjectTypeListToRead().end(); it++) {
string column_name;
while (true) {
// Rowkey is obj-type
GenDb::DbDataValueVec key;
key.push_back(GenDb::Blob(reinterpret_cast<const uint8_t *>
(it->c_str()), it->size()));
GenDb::ColumnNameRange crange;
if (!column_name.empty()) {
GenDb::Blob col_filter(reinterpret_cast<const uint8_t *>
(column_name.c_str()), column_name.size());
//
// Start reading the next set of entries from where we ended in
// last read
//
crange.start_ =
boost::assign::list_of(GenDb::DbDataValue(col_filter));
}
//
// Read kNumFQNameEntriesToRead entries at a time.
// In a scaled setup, each object type may have large number of
// entries. So read each obj-type fq-name entries in chunk of
// kNumFQNameEntriesToRead rows at a time
//
crange.count_ = kNumFQNameEntriesToRead;

GenDb::FieldNamesToReadVec field_vec;
field_vec.push_back(boost::make_tuple("key", true, false, false));
field_vec.push_back(boost::make_tuple("column1", false, true, false));

GenDb::ColList col_list;
if (dbif_->Db_GetRow(&col_list, kFqnTableName, key,
GenDb::DbConsistency::QUORUM, crange, field_vec)) {
HandleCassandraConnectionStatus(true);
if (col_list.columns_.size()) {
ObjTypeUUIDList uuid_list;
string last_column;
ParseFQNameRowGetUUIDList(*it, col_list, uuid_list,
&last_column);
//
// If the last_column we read this time is same as
// where we started the current read, move to next obj-type
//
if (last_column == column_name)
break;
EnqueueUUIDRequest(uuid_list);
//
// If we read less than kNumFQNameEntriesToRead entries,
// it means there are no more entries for current obj-type.
// We move to next obj-type
//
if (col_list.columns_.size() < kNumFQNameEntriesToRead)
break;
column_name = last_column;
} else {
// No entries for this obj-type
break;
}
} else {
HandleCassandraConnectionStatus(false);
IFMAP_WARN(IFMapGetRowError, "GetRow failed for table",
kFqnTableName, *it);
sleep(kInitRetryTimeSec);
}
}
}
// At the end of task trigger
for (int i = 0; i < num_workers_; i++) {
ObjectProcessReq *req = new ObjectProcessReq("EndOfConfig","", "");
Enqueue(i, req);
Expand All @@ -593,7 +640,7 @@ bool ConfigCassandraClient::EnqueueUUIDRequest(

bool ConfigCassandraClient::BulkDataSync() {
bulk_sync_status_ = num_workers_;
ReadAllFqnTableRows();
fq_name_reader_->Set();
return true;
}

Expand Down
10 changes: 7 additions & 3 deletions src/ifmap/client/config_cassandra_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class ConfigCassandraClient : public ConfigDbClient {
static const int kInitRetryTimeSec = 5;
static const int kMaxRequestsToYield = 512;
static const int kMaxNumUUIDToRead = 64;
static const int kNumFQNameEntriesToRead = 4096;

typedef boost::scoped_ptr<GenDb::GenDbIf> GenDbIfPtr;
typedef std::pair<std::string, std::string> ObjTypeFQNPair;
Expand Down Expand Up @@ -166,14 +167,16 @@ class ConfigCassandraClient : public ConfigDbClient {
typedef std::map<string, FieldDetailMap> ObjectCacheMap;

typedef boost::shared_ptr<WorkQueue<ObjectProcessReq *> > ObjProcessWorkQType;

void InitRetry();
virtual bool ParseUuidTableRowResponse(const std::string &uuid,
const GenDb::ColList &col_list, CassColumnKVVec *cass_data_vec,
ConfigCassandraParseContext &context);
void AddUuidEntry(const string &uuid);
bool ReadAllFqnTableRows();
bool ParseFQNameRowGetUUIDList(const GenDb::ColList &col_list,
ObjTypeUUIDList &uuid_list);
bool FQNameReader();
bool ParseFQNameRowGetUUIDList(const std::string &obj_type,
const GenDb::ColList &col_list, ObjTypeUUIDList &uuid_list,
std::string *last_column);
bool ConfigReader(int worker_id);
void AddUUIDToRequestList(int worker_id, const string &oper,
const string &obj_type, const string &uuid_str);
Expand Down Expand Up @@ -202,6 +205,7 @@ class ConfigCassandraClient : public ConfigDbClient {
std::vector<UUIDProcessSet> uuid_read_set_;
std::vector<ObjectCacheMap> object_cache_map_;
std::vector<boost::shared_ptr<TaskTrigger> > config_readers_;
boost::scoped_ptr<TaskTrigger> fq_name_reader_;
mutable tbb::spin_rw_mutex rw_mutex_;
FQNameCacheMap fq_name_cache_;
std::vector<ObjProcessWorkQType> obj_process_queue_;
Expand Down
3 changes: 3 additions & 0 deletions src/ifmap/client/config_client_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ ConfigClientManager::ConfigClientManager(EventManager *evm,

bgp_schema_Server_GenerateWrapperPropertyInfo(&wrapper_field_map_);
vnc_cfg_Server_GenerateWrapperPropertyInfo(&wrapper_field_map_);

bgp_schema_Server_GenerateObjectTypeList(&obj_type_to_read_);
vnc_cfg_Server_GenerateObjectTypeList(&obj_type_to_read_);
}

void ConfigClientManager::Initialize() {
Expand Down
6 changes: 6 additions & 0 deletions src/ifmap/client/config_client_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class ConfigClientManager {
static const std::set<std::string> skip_properties;

typedef std::list<struct DBRequest *> RequestList;
typedef std::set<std::string> ObjectTypeList;

ConfigClientManager(EventManager *evm, IFMapServer *ifmap_server,
std::string hostname, std::string module_name,
const IFMapConfigOptions& config_options);
Expand Down Expand Up @@ -75,6 +77,9 @@ class ConfigClientManager {
void WaitForEndOfConfig();
void GetPeerServerInfo(IFMapPeerServerInfoUI *server_info);
void GetClientManagerInfo(ConfigClientManagerInfo &info) const;
const ObjectTypeList &ObjectTypeListToRead() const {
return obj_type_to_read_;
}

private:
typedef std::pair<std::string, std::string> LinkMemberPair;
Expand All @@ -92,6 +97,7 @@ class ConfigClientManager {
boost::scoped_ptr<ConfigAmqpClient> config_amqp_client_;
int thread_count_;
WrapperFieldMap wrapper_field_map_;
ObjectTypeList obj_type_to_read_;

mutable tbb::mutex end_of_rib_sync_mutex_;
tbb::interface5::condition_variable cond_var_;
Expand Down

0 comments on commit 8343023

Please sign in to comment.