Skip to content

Commit

Permalink
Replace UVE Cache map with tbb::concurrent_hash_map.
Browse files Browse the repository at this point in the history
Closes-Bug:1582078

Change-Id: I772c9210f02313a583e3597f4c777e628f620d46
  • Loading branch information
anishmehta committed May 16, 2016
1 parent 1290913 commit f013815
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 51 deletions.
19 changes: 15 additions & 4 deletions compiler/generate/t_cpp_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ void t_cpp_generator::init_generator() {
#ifndef SANDESH
"#include <TApplicationException.h>" << endl <<
#else
"#include <tbb/atomic.h>" << endl <<
"#include <boost/shared_ptr.hpp>" << endl <<
"#include <sandesh/derived_stats.h>" << endl <<
"#include <boost/pointer_cast.hpp>" << endl <<
Expand Down Expand Up @@ -1603,7 +1604,12 @@ void t_cpp_generator::generate_sandesh_member_init_list(ofstream& out,
*/
void t_cpp_generator::generate_sandesh_seqnum(ofstream& out,
t_sandesh* tsandesh) {
indent(out) << "static uint32_t lseqnum_;" << endl;
if (((t_base_type *)tsandesh->get_type())->is_sandesh_uve() ||
((t_base_type *)tsandesh->get_type())->is_sandesh_alarm()) {
indent(out) << "static tbb::atomic<uint32_t> lseqnum_;" << endl;
} else {
indent(out) << "static uint32_t lseqnum_;" << endl;
}
}

/**
Expand Down Expand Up @@ -3178,7 +3184,12 @@ void t_cpp_generator::generate_static_const_string_definition(ofstream& out,

void t_cpp_generator::generate_sandesh_static_seqnum_def(ofstream& out,
t_sandesh* tsandesh) {
out << "uint32_t " << tsandesh->get_name() << "::lseqnum_ = 1;" << endl;
if (((t_base_type *)tsandesh->get_type())->is_sandesh_uve() ||
((t_base_type *)tsandesh->get_type())->is_sandesh_alarm()) {
out << "tbb::atomic<uint32_t> " << tsandesh->get_name() << "::lseqnum_;" << endl;
} else {
out << "uint32_t " << tsandesh->get_name() << "::lseqnum_ = 1;" << endl;
}
}

void t_cpp_generator::generate_sandesh_static_versionsig_def(ofstream& out,
Expand Down Expand Up @@ -3757,11 +3768,11 @@ void t_cpp_generator::generate_sandesh_uve_creator(
indent(out) << type_name((*f_iter)->get_type()) <<
" & cdata = const_cast<" << type_name((*f_iter)->get_type()) <<
" &>(data);" << endl;
indent(out) << "uint32_t msg_seqno;" << endl;
indent(out) << "uint32_t msg_seqno = lseqnum_.fetch_and_increment() + 1;" << endl;
indent(out) << "if (!table.empty()) cdata.table_ = table;" << endl;

indent(out) << "if (uvemap" << sname <<
".UpdateUVE(cdata, msg_seqno = lseqnum_++)) {" << endl;
".UpdateUVE(cdata, msg_seqno)) {" << endl;
indent_up();
indent(out) << sname << " *snh = new " << sname << "(msg_seqno, cdata);" << endl;
indent(out) << "snh->Dispatch();" << endl;
Expand Down
104 changes: 61 additions & 43 deletions library/cpp/sandesh_uve.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <sandesh/sandesh_types.h>
#include <sandesh/sandesh.h>
#include <tbb/mutex.h>
#include <tbb/concurrent_hash_map.h>
#include <boost/functional/hash.hpp>

class SandeshUVEPerTypeMap;

Expand Down Expand Up @@ -98,15 +100,24 @@ template<typename T, typename U, int P>
class SandeshUVEPerTypeMapImpl : public SandeshUVEPerTypeMap {
public:

struct HashCompare {
static size_t hash( const std::string& key ) { return boost::hash_value(key); }
static bool equal( const std::string& key1, const std::string& key2 ) { return ( key1 == key2 ); }
};

struct UVEMapEntry {
UVEMapEntry(std::string table, uint32_t seqnum):
data(table), seqno(seqnum) {
}
U data;
uint32_t seqno;
};
typedef boost::ptr_map<std::string, UVEMapEntry> uve_type_map;
typedef std::map<std::string, uve_type_map> uve_map;

// The key is the table name
typedef boost::ptr_map<std::string, UVEMapEntry> uve_table_map;

// The key is the UVE-Key
typedef tbb::concurrent_hash_map<std::string, uve_table_map, HashCompare > uve_cmap;

SandeshUVEPerTypeMapImpl(char const * u_name) : uve_name_(u_name) {
SandeshUVETypeMaps::RegisterType(u_name, this, P);
Expand All @@ -116,60 +127,71 @@ class SandeshUVEPerTypeMapImpl : public SandeshUVEPerTypeMap {
// the generator to the collector.
// It updates the cache.
bool UpdateUVE(U& data, uint32_t seqnum) {
tbb::mutex::scoped_lock lock(uve_mutex_);
bool send = false;
tbb::mutex::scoped_lock lock;
const std::string &table = data.table_;
assert(!table.empty());
const std::string &s = data.get_name();
typename uve_map::iterator omapentry = map_.find(table);
if (omapentry == map_.end()) {
omapentry = map_.insert(std::make_pair(table, uve_type_map())).first;

// If we are going to erase, we need a global lock
// to coordinate with iterators
// To prevent deadlock, we always acquire global lock
// before accessor
if (data.get_deleted()) {
lock.acquire(uve_mutex_);
}
typename uve_cmap::accessor a;

// Ensure that the entry exists, and we have an accessor to it
while (true) {
if (cmap_.find(a, s)) break;
else {
if (cmap_.insert(a, s)) break;
}
}
typename uve_type_map::iterator imapentry =
omapentry->second.find(s);
if (imapentry == omapentry->second.end()) {

typename uve_table_map::iterator imapentry = a->second.find(table);
if (imapentry == a->second.end()) {
std::auto_ptr<UVEMapEntry> ume(new UVEMapEntry(data.table_, seqnum));
send = T::UpdateUVE(data, ume->data);
omapentry->second.insert(s, ume);
imapentry = a->second.insert(table, ume).first;
} else {
if (imapentry->second->data.get_deleted()) {
omapentry->second.erase(imapentry);
std::auto_ptr<UVEMapEntry> ume(new UVEMapEntry(
data.table_, seqnum));
send = T::UpdateUVE(data, ume->data);
omapentry->second.insert(s, ume);
} else {
send = T::UpdateUVE(data, imapentry->second->data);
imapentry->second->seqno = seqnum;
}
send = T::UpdateUVE(data, imapentry->second->data);
imapentry->second->seqno = seqnum;
}
if (data.get_deleted()) {
a->second.erase(imapentry);
if (a->second.empty()) cmap_.erase(a);
lock.release();
}

return send;
}

// This function can be used by the Sandesh Session state machine
// to get the seq num of the last message sent for this SandeshUVE type
uint32_t TypeSeq(void) {
tbb::mutex::scoped_lock lock(uve_mutex_);
return T::lseqnum();
}

uint32_t SyncUVE(const std::string &table,
SandeshUVE::SendType st,
const uint32_t seqno,
const std::string &ctx) const {
// Global lock is needed for iterator
tbb::mutex::scoped_lock lock(uve_mutex_);
uint32_t count = 0;

for (typename uve_map::const_iterator oit = map_.begin();
oit != map_.end(); oit++) {
if (!table.empty() && oit->first != table)
continue;
// Send all entries that are newer than the given sequence number
for (typename uve_type_map::const_iterator uit =
oit->second.begin(); uit != oit->second.end(); uit++) {
(const_cast<SandeshUVEPerTypeMapImpl<T,U,P> *>(this))->cmap_.rehash();
for (typename uve_cmap::const_iterator git = cmap_.begin();
git != cmap_.end(); git++) {
typename uve_cmap::const_accessor a;
if (!cmap_.find(a, git->first)) continue;
for (typename uve_table_map::const_iterator uit = a->second.begin();
uit != a->second.end(); uit++) {
if (!table.empty() && uit->first != table) continue;
if ((seqno < uit->second->seqno) || (seqno == 0)) {
if (ctx.empty()) {
SANDESH_LOG(DEBUG, __func__ << " Syncing " <<
SANDESH_LOG(INFO, __func__ << " Syncing " << uit->first <<
" val " << uit->second->data.log() <<
" seq " << uit->second->seqno);
}
Expand All @@ -184,28 +206,24 @@ class SandeshUVEPerTypeMapImpl : public SandeshUVEPerTypeMap {

bool SendUVE(const std::string& table, const std::string& name,
const std::string& ctx) const {
tbb::mutex::scoped_lock lock(uve_mutex_);

for (typename uve_map::const_iterator oit = map_.begin();
oit != map_.end(); oit++) {
if (!table.empty() && oit->first != table)
continue;
typename uve_type_map::const_iterator uve_entry =
oit->second.find(name);
if (uve_entry != oit->second.end()) {

bool sent = false;
typename uve_cmap::const_accessor a;
if (cmap_.find(a, name)) {
for (typename uve_table_map::const_iterator uve_entry = a->second.begin();
uve_entry != a->second.end(); uve_entry++) {
if (!table.empty() && uve_entry->first != table) continue;
sent = true;
T::Send(uve_entry->second->data,
(ctx.empty() ? SandeshUVE::ST_INTROSPECT : SandeshUVE::ST_SYNC),
uve_entry->second->seqno, ctx);
return true;
}
}
return false;
return sent;
}

private:

uve_map map_;
uve_cmap cmap_;
const std::string uve_name_;
mutable tbb::mutex uve_mutex_;
};
Expand Down
8 changes: 4 additions & 4 deletions library/cpp/test/sandesh_message_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ class SandeshUVEAlarmTest : public ::testing::Test {
"<tsm type=\"map\" identifier=\"5\"><map key=\"string\" value=\"i32\" size=\"2\"><element>j2</element><element>17</element><element>j3</element><element>27</element></map></tsm>");
break;
}
case 13:
case 14:
{
EXPECT_EQ(3, header.get_SequenceNum());
EXPECT_EQ(SandeshType::ALARM, header.get_Type());
Expand All @@ -582,7 +582,7 @@ class SandeshUVEAlarmTest : public ::testing::Test {
EXPECT_STREQ(expected_xml, message.c_str());
break;
}
case 14:
case 13:
{
EXPECT_EQ(2, header.get_SequenceNum());
EXPECT_EQ(SandeshType::ALARM, header.get_Type());
Expand Down Expand Up @@ -614,7 +614,7 @@ class SandeshUVEAlarmTest : public ::testing::Test {
"<tsm type=\"map\" identifier=\"5\"><map key=\"string\" value=\"i32\" size=\"2\"><element>j2</element><element>17</element><element>j3</element><element>27</element></map></tsm>");
break;
}
case 17:
case 18:
{
EXPECT_EQ(6, header.get_SequenceNum());
EXPECT_EQ(SandeshType::UVE, header.get_Type());
Expand All @@ -626,7 +626,7 @@ class SandeshUVEAlarmTest : public ::testing::Test {
"<name type=\"string\" identifier=\"1\" key=\"ObjectCollectorInfo\">uve2</name>");
break;
}
case 18:
case 17:
{
EXPECT_EQ(2, header.get_SequenceNum());
EXPECT_EQ(SandeshType::UVE, header.get_Type());
Expand Down

0 comments on commit f013815

Please sign in to comment.