Skip to content

Commit

Permalink
Analytics-Nodemgr and contrail-alarm-gen will now connect to any coll…
Browse files Browse the repository at this point in the history
…ector (as per discovery)

instead of only connecting to the local collector. This help get alarm correctly on collector failure

The FieldNames table is now poppulated for stats and flows.
Also, we only write to it once every T2 per name/value combination

Adding support for redis-HA. When alarmgen reconnects to redis,
restart partitions

Change-Id: I21485b765c3c49759f20c5b308198141789ec06c
Closes-Bug: 1512539
Closes-Bug: 1512537
Closes-Bug: 1512536
Closes-Bug: 1512532
Closes-Bug: 1513409
  • Loading branch information
anishmehta committed Nov 16, 2015
1 parent 4cf884e commit d408fb9
Show file tree
Hide file tree
Showing 13 changed files with 321 additions and 179 deletions.
8 changes: 8 additions & 0 deletions src/analytics/OpServerProxy.cc
Expand Up @@ -31,6 +31,7 @@
#include "viz_sandesh.h"
#include "viz_collector.h"

using std::map;
using std::string;
using boost::shared_ptr;
using boost::assign::list_of;
Expand Down Expand Up @@ -635,6 +636,7 @@ OpServerProxy::UVENotif(const std::string &type,
const std::string &module,
const std::string &instance_id,
const std::string &table, const std::string &barekey,
const std::map<std::string,std::string>& value,
bool deleted) {

std::string key = table + ":" + barekey;
Expand Down Expand Up @@ -703,6 +705,12 @@ OpServerProxy::UVENotif(const std::string &type,
} else {
rapidjson::Value val(rapidjson::kObjectType);
val.SetObject();
for (map<string,string>::const_iterator it = value.begin();
it != value.end(); it++) {
rapidjson::Value sval(rapidjson::kStringType);
sval.SetString((it->second).c_str());
val.AddMember(it->first.c_str(), sval, dd.GetAllocator());
}
dd.AddMember("value", val, dd.GetAllocator());
}

Expand Down
1 change: 1 addition & 0 deletions src/analytics/OpServerProxy.h
Expand Up @@ -49,6 +49,7 @@ class OpServerProxy {
const std::string &module,
const std::string &instance_id,
const std::string &table, const std::string &barekey,
const std::map<std::string,std::string>& value,
bool deleted);

// Use this to delete the object when the deleted attribute is set
Expand Down
122 changes: 96 additions & 26 deletions src/analytics/db_handler.cc
Expand Up @@ -57,7 +57,7 @@ DbHandler::DbHandler(EventManager *evm,
const std::string& cassandra_user,
const std::string& cassandra_password) :
name_(name),
drop_level_(SandeshLevel::INVALID), ttl_map_(ttl_map) {
drop_level_(SandeshLevel::INVALID), ttl_map_(ttl_map), field_cache_t2_(0) {
dbif_.reset(new ThriftIf(err_handler,
cassandra_ips, cassandra_ports, name, false,
cassandra_user, cassandra_password));
Expand All @@ -68,7 +68,7 @@ DbHandler::DbHandler(EventManager *evm,

DbHandler::DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map) :
dbif_(dbif),
ttl_map_(ttl_map) {
ttl_map_(ttl_map), field_cache_t2_(0) {
}

DbHandler::~DbHandler() {
Expand Down Expand Up @@ -561,33 +561,63 @@ void DbHandler::MessageTableInsert(const VizMsg *vmsgp) {
//Insert only if sandesh type is a SYSTEM LOG or SYSLOG
//Insert into the FieldNames stats table entries for Messagetype and Module ID
int ttl = GetTtl(TtlType::GLOBAL_TTL);
FieldNamesTableInsert(g_viz_constants.COLLECTOR_GLOBAL_TABLE,
":Messagetype", message_type, header.get_Timestamp(), ttl);
FieldNamesTableInsert(g_viz_constants.COLLECTOR_GLOBAL_TABLE,
":ModuleId", header.get_Module(), header.get_Timestamp(), ttl);
FieldNamesTableInsert(g_viz_constants.COLLECTOR_GLOBAL_TABLE,
":Source", header.get_Source(), header.get_Timestamp(), ttl);
FieldNamesTableInsert(header.get_Timestamp(),
g_viz_constants.COLLECTOR_GLOBAL_TABLE,
":Messagetype", message_type, ttl);
FieldNamesTableInsert(header.get_Timestamp(),
g_viz_constants.COLLECTOR_GLOBAL_TABLE,
":ModuleId", header.get_Module(), ttl);
FieldNamesTableInsert(header.get_Timestamp(),
g_viz_constants.COLLECTOR_GLOBAL_TABLE,
":Source", header.get_Source(), ttl);
}
}

/*
* This function takes field name and field value as arguments and inserts
* into the FieldNames stats table
*/
void DbHandler::FieldNamesTableInsert(const std::string& table_prefix,
const std::string& field_name, const std::string& field_val,
uint64_t timestamp, int ttl) {
void DbHandler::FieldNamesTableInsert(uint64_t timestamp,
const std::string& table_prefix,
const std::string& field_name, const std::string& field_val, int ttl) {
/*
* Insert the message types in the stat table
* Construct the atttributes,attrib_tags before inserting
* to the StatTableInsert
*/
uint32_t temp_u32 = timestamp >> g_viz_constants.RowTimeInBits;
std::string table_name(table_prefix);
table_name.append(field_name);

/* Check if fieldname and value were already seen in this T2
We only need to record them if they have NOT been seen yet */
bool record = false;
std::string fc_entry(table_name);
fc_entry.append(":");
fc_entry.append(field_val);
{
tbb::mutex::scoped_lock lock(smutex_);
if (temp_u32 > field_cache_t2_) {
field_cache_set_.clear();
field_cache_t2_ = temp_u32;
}
if (temp_u32 == field_cache_t2_) {
if (field_cache_set_.find(fc_entry) == field_cache_set_.end()) {
field_cache_set_.insert(fc_entry);
record = true;
}
} else {
/* This is an old time-stamp */
record = true;
}
}

if (!record) return;

DbHandler::TagMap tmap;
DbHandler::AttribMap amap;
DbHandler::Var pv;
DbHandler::AttribMap attribs;
std::string table_name(table_prefix);
table_name.append(field_name);
pv = table_name;
tmap.insert(make_pair("name", make_pair(pv, amap)));
attribs.insert(make_pair(string("name"), pv));
Expand Down Expand Up @@ -682,11 +712,14 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &o
const SandeshHeader &header(vmsgp->msg->GetHeader());
const std::string &message_type(vmsgp->msg->GetMessageType());
//Insert into the FieldNames stats table entries for Messagetype and Module ID
FieldNamesTableInsert(table, ":Objecttype", objectkey_str, timestamp, ttl);
FieldNamesTableInsert(table, ":Messagetype", message_type, timestamp, ttl);
FieldNamesTableInsert(table, ":ModuleId", header.get_Module(),
timestamp, ttl);
FieldNamesTableInsert(table, ":Source", header.get_Source(), timestamp, ttl);
FieldNamesTableInsert(timestamp,
table, ":ObjectId", objectkey_str, ttl);
FieldNamesTableInsert(timestamp,
table, ":Messagetype", message_type, ttl);
FieldNamesTableInsert(timestamp,
table, ":ModuleId", header.get_Module(), ttl);
FieldNamesTableInsert(timestamp,
table, ":Source", header.get_Source(), ttl);
}
}

Expand Down Expand Up @@ -944,6 +977,16 @@ DbHandler::StatTableInsertTtl(uint64_t ts,
pair<string,DbHandler::Var> ptag;
ptag.first = it->first;
ptag.second = it->second.first;

/* Record in the fieldNames table if we have a string tag,
and if we are not recording a fieldNames stats entry itself */
if ((ptag.second.type == DbHandler::STRING) &&
(statName.compare("FieldNames") != 0)) {
FieldNamesTableInsert(ts, std::string("StatTable.") +
statName + "." + statAttr,
std::string(":") + ptag.first, ptag.second.str, ttl);
}

if (it->second.second.empty()) {
pair<string,DbHandler::Var> stag;
StatTableWrite(temp_u32, statName, statAttr,
Expand Down Expand Up @@ -1004,7 +1047,8 @@ boost::uuids::uuid DbHandler::seed_uuid = StringToUuid(std::string("ffffffff-fff

static void PopulateFlowRecordTableColumns(
const std::vector<FlowRecordFields::type> &frvt,
FlowValueArray &fvalues, GenDb::NewColVec& columns, const TtlMap& ttl_map) {
FlowValueArray &fvalues, GenDb::NewColVec& columns, const TtlMap& ttl_map,
boost::function<void (const std::string&,const std::string&,int)> fncb) {
int ttl = DbHandler::GetTtlFromMap(ttl_map, TtlType::FLOWDATA_TTL);
columns.reserve(frvt.size());
for (std::vector<FlowRecordFields::type>::const_iterator it = frvt.begin();
Expand All @@ -1014,6 +1058,13 @@ static void PopulateFlowRecordTableColumns(
GenDb::NewCol *col(new GenDb::NewCol(
g_viz_constants.FlowRecordNames[(*it)], db_value, ttl));
columns.push_back(col);
if ((*it==FlowRecordFields::FLOWREC_VROUTER)||
(*it==FlowRecordFields::FLOWREC_SOURCEVN)||
(*it==FlowRecordFields::FLOWREC_DESTVN)) {
std::string sval = boost::get<std::string>(db_value);
fncb(std::string(":")+g_viz_constants.FlowRecordNames[(*it)],
sval, ttl);
}
}
}
}
Expand All @@ -1028,12 +1079,13 @@ static void PopulateFlowRecordTableRowKey(
}

static bool PopulateFlowRecordTable(FlowValueArray &fvalues,
GenDb::GenDbIf *dbif, const TtlMap& ttl_map) {
GenDb::GenDbIf *dbif, const TtlMap& ttl_map,
boost::function<void (const std::string&,const std::string&,int)> fncb) {
std::auto_ptr<GenDb::ColList> colList(new GenDb::ColList);
colList->cfname_ = g_viz_constants.FLOW_TABLE;
PopulateFlowRecordTableRowKey(fvalues, colList->rowkey_);
PopulateFlowRecordTableColumns(FlowRecordTableColumns, fvalues,
colList->columns_, ttl_map);
colList->columns_, ttl_map, fncb);
return dbif->Db_AddColumn(colList);
}

Expand Down Expand Up @@ -1082,13 +1134,21 @@ static const std::string& FlowIndexTable2String(FlowIndexTableType ttype) {

static void PopulateFlowIndexTableColumnValues(
const std::vector<FlowRecordFields::type> &frvt,
FlowValueArray &fvalues, GenDb::DbDataValueVec &cvalues) {
FlowValueArray &fvalues, GenDb::DbDataValueVec &cvalues, int ttl,
boost::function<void (const std::string&,const std::string&,int)> fncb) {
cvalues.reserve(frvt.size());
for (std::vector<FlowRecordFields::type>::const_iterator it = frvt.begin();
it != frvt.end(); it++) {
GenDb::DbDataValue &db_value(fvalues[(*it)]);
if (db_value.which() != GenDb::DB_VALUE_BLANK) {
cvalues.push_back(db_value);
if ((*it==FlowRecordFields::FLOWREC_VROUTER)||
(*it==FlowRecordFields::FLOWREC_SOURCEVN)||
(*it==FlowRecordFields::FLOWREC_DESTVN)) {
std::string sval = boost::get<std::string>(db_value);
fncb(std::string(":")+g_viz_constants.FlowRecordNames[(*it)],
sval, ttl);
}
}
}
}
Expand Down Expand Up @@ -1152,14 +1212,16 @@ static void PopulateFlowIndexTableColumns(FlowIndexTableType ftype,

static bool PopulateFlowIndexTables(FlowValueArray &fvalues,
uint32_t &T2, uint32_t &T1, uint8_t partition_no,
GenDb::GenDbIf *dbif, const TtlMap& ttl_map) {
GenDb::GenDbIf *dbif, const TtlMap& ttl_map,
boost::function<void (const std::string&,const std::string&,int)> fncb) {
// Populate row key and column values (same for all flow index
// tables)
GenDb::DbDataValueVec rkey;
PopulateFlowIndexTableRowKey(fvalues, T2, partition_no, rkey);
GenDb::DbDataValueVec cvalues;
int ttl = DbHandler::GetTtlFromMap(ttl_map, TtlType::FLOWDATA_TTL);
PopulateFlowIndexTableColumnValues(FlowIndexTableColumnValues, fvalues,
cvalues);
cvalues, ttl, fncb);
// Populate the Flow Index Tables
for (int tid = FLOW_INDEX_TABLE_MIN;
tid < FLOW_INDEX_TABLE_MAX_PLUS_1; ++tid) {
Expand Down Expand Up @@ -1286,7 +1348,12 @@ bool DbHandler::FlowSampleAdd(const pugi::xml_node& flow_sample,
// Partition no
uint8_t partition_no = 0;
// Populate Flow Record Table
if (!PopulateFlowRecordTable(flow_entry_values, dbif_.get(), ttl_map_)) {

boost::function<void (const std::string&,const std::string&,int)> fncb =
boost::bind(&DbHandler::FieldNamesTableInsert,
this, timestamp, g_viz_constants.FLOW_TABLE, _1,_2,_3);
if (!PopulateFlowRecordTable(flow_entry_values, dbif_.get(), ttl_map_,
fncb)) {
DB_LOG(ERROR, "Populating FlowRecordTable FAILED");
}
GenDb::DbDataValue &diff_bytes(
Expand All @@ -1295,10 +1362,13 @@ bool DbHandler::FlowSampleAdd(const pugi::xml_node& flow_sample,
flow_entry_values[FlowRecordFields::FLOWREC_DIFF_PACKETS]);
// Populate Flow Index Tables only if FLOWREC_DIFF_BYTES and
// FLOWREC_DIFF_PACKETS are present
boost::function<void (const std::string&,const std::string&,int)> fncb2 =
boost::bind(&DbHandler::FieldNamesTableInsert,
this, timestamp, g_viz_constants.FLOW_SERIES_TABLE, _1,_2,_3);
if (diff_bytes.which() != GenDb::DB_VALUE_BLANK &&
diff_packets.which() != GenDb::DB_VALUE_BLANK) {
if (!PopulateFlowIndexTables(flow_entry_values, T2, T1, partition_no,
dbif_.get(), ttl_map_)) {
dbif_.get(), ttl_map_, fncb2)) {
DB_LOG(ERROR, "Populating FlowIndexTables FAILED");
}
}
Expand Down
10 changes: 6 additions & 4 deletions src/analytics/db_handler.h
Expand Up @@ -104,9 +104,9 @@ class DbHandler {
const boost::uuids::uuid& unm, const std::string keyword);
virtual void MessageTableInsert(const VizMsg *vmsgp);
void MessageTableOnlyInsert(const VizMsg *vmsgp);
void FieldNamesTableInsert(const std::string& table_name,
const std::string& field_name, const std::string& field_val,
uint64_t timestamp, int ttl);
void FieldNamesTableInsert(uint64_t timestamp,
const std::string& table_name,
const std::string& field_name, const std::string& field_val, int ttl);
void GetRuleMap(RuleMap& rulemap);

void ObjectTableInsert(const std::string &table, const std::string &rowkey,
Expand Down Expand Up @@ -175,7 +175,9 @@ class DbHandler {
GenDb::DbTableStatistics stable_stats_;
mutable tbb::mutex smutex_;
TtlMap ttl_map_;

uint32_t field_cache_t2_;
std::set<std::string> field_cache_set_;

DISALLOW_COPY_AND_ASSIGN(DbHandler);
};

Expand Down
13 changes: 10 additions & 3 deletions src/analytics/ruleeng.cc
Expand Up @@ -559,6 +559,7 @@ bool Ruleeng::handle_uve_publish(const pugi::xml_node& parent,
return false;
}

map<string,string> emap, vmap;
if (deleted) {
if (!osp_->UVEDelete(object.name(), source, node_type, module,
instance_id, key, seq, is_alarm)) {
Expand All @@ -571,7 +572,7 @@ bool Ruleeng::handle_uve_publish(const pugi::xml_node& parent,
}
LOG(DEBUG, __func__ << " Deleted " << key);
osp_->UVENotif(object.name(),
source, node_type, module, instance_id, table, barekey, deleted);
source, node_type, module, instance_id, table, barekey, emap ,deleted);
return true;
}

Expand All @@ -585,6 +586,7 @@ bool Ruleeng::handle_uve_publish(const pugi::xml_node& parent,
if (strcmp(tempstr, "")) {
continue;
}
vmap.insert(make_pair(node.name(), ostr.str()));
tempstr = node.attribute("aggtype").value();
if (strcmp(tempstr, "")) {
agg = std::string(tempstr);
Expand Down Expand Up @@ -636,8 +638,13 @@ bool Ruleeng::handle_uve_publish(const pugi::xml_node& parent,
}

// Publish on the Kafka bus that this UVE has changed
osp_->UVENotif(object.name(),
source, node_type, module, instance_id, table, barekey, deleted);
if (!strcmp(object.name(), "UVEAlarms")) {
osp_->UVENotif(object.name(),
source, node_type, module, instance_id, table, barekey, vmap, deleted);
} else {
osp_->UVENotif(object.name(),
source, node_type, module, instance_id, table, barekey, emap, deleted);
}
return true;
}

Expand Down

0 comments on commit d408fb9

Please sign in to comment.