Skip to content

Commit

Permalink
Merge "Includes the following changes . add new fields in SYSTEM_OBJE…
Browse files Browse the repository at this point in the history
…CT_TABLE to store _ttls by collector, they will be retrieved by analytics-api and query-engine . query-engine code changes to take care of time range calcuations using all 4 types of ttls based on the query . TtlMap definition is moved from db_handler.h to viz.sandesh to be used in collector, analytics-api and query-engine. . cdb_if will not have any default ttl, it expects ttl to be given in column add Closes-Bug: #1503093, #1503401" into R2.20
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Oct 26, 2015
2 parents 10e61e1 + 39bd619 commit 105352b
Show file tree
Hide file tree
Showing 34 changed files with 418 additions and 413 deletions.
4 changes: 2 additions & 2 deletions src/analytics/collector.cc
Expand Up @@ -81,7 +81,7 @@ const std::vector<Sandesh::QueueWaterMarkInfo> Collector::kSmQueueWaterMarkInfo
Collector::Collector(EventManager *evm, short server_port,
DbHandler *db_handler, OpServerProxy *osp, VizCallback cb,
std::vector<std::string> cassandra_ips,
std::vector<int> cassandra_ports, const DbHandler::TtlMap& ttl_map) :
std::vector<int> cassandra_ports, const TtlMap& ttl_map) :
SandeshServer(evm),
db_handler_(db_handler),
osp_(osp),
Expand Down Expand Up @@ -340,7 +340,7 @@ void Collector::TestDatabaseConnection() {
// try to instantiate a new dbif instance for testing db connection
testdbif_.reset( GenDb::GenDbIf::GenDbIfImpl(
boost::bind(&Collector::TestDbConnErrHandler, this),
cassandra_ips_, cassandra_ports_, 3600, db_handler_->GetName(), true));
cassandra_ips_, cassandra_ports_, db_handler_->GetName(), true));

if (!testdbif_->Db_Init("analytics::DbHandler", db_task_id_)) {
if (dbConnStatus_ != ConnectionStatus::DOWN) {
Expand Down
6 changes: 3 additions & 3 deletions src/analytics/collector.h
Expand Up @@ -72,7 +72,7 @@ class Collector : public SandeshServer {
Collector(EventManager *evm, short server_port,
DbHandler *db_handler, OpServerProxy *osp, VizCallback cb,
std::vector<std::string> cassandra_ips,
std::vector<int> cassandra_ports, const DbHandler::TtlMap& ttl_map);
std::vector<int> cassandra_ports, const TtlMap& ttl_map);
virtual ~Collector();
virtual void Shutdown();
virtual void SessionShutdown();
Expand Down Expand Up @@ -118,7 +118,7 @@ class Collector : public SandeshServer {

std::vector<std::string> cassandra_ips() { return cassandra_ips_; }
std::vector<int> cassandra_ports() { return cassandra_ports_; }
const DbHandler::TtlMap& analytics_ttl_map() { return ttl_map_; }
const TtlMap& analytics_ttl_map() { return ttl_map_; }
int db_task_id();
const CollectorStats &GetStats() const { return stats_; }
void SendGeneratorStatistics();
Expand Down Expand Up @@ -167,7 +167,7 @@ class Collector : public SandeshServer {

std::vector<std::string> cassandra_ips_;
std::vector<int> cassandra_ports_;
DbHandler::TtlMap ttl_map_;
TtlMap ttl_map_;
int db_task_id_;

// SandeshGenerator map
Expand Down
106 changes: 70 additions & 36 deletions src/analytics/db_handler.cc
Expand Up @@ -55,13 +55,8 @@ DbHandler::DbHandler(EventManager *evm,
std::string name, const TtlMap& ttl_map) :
name_(name),
drop_level_(SandeshLevel::INVALID), ttl_map_(ttl_map) {
int analytics_ttl = DbHandler::GetTtlFromMap(ttl_map, DbHandler::GLOBAL_TTL);
if (analytics_ttl == -1) {
DB_LOG(ERROR, "Unexpected analytics_ttl value: " << analytics_ttl);
analytics_ttl = 0;
}
dbif_.reset(GenDb::GenDbIf::GenDbIfImpl(err_handler,
cassandra_ips, cassandra_ports, analytics_ttl, name, false));
cassandra_ips, cassandra_ports, name, false));

error_code error;
col_name_ = boost::asio::ip::host_name(error);
Expand All @@ -75,13 +70,23 @@ DbHandler::DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map) :
DbHandler::~DbHandler() {
}

int DbHandler::GetTtlFromMap(const DbHandler::TtlMap& ttl_map,
DbHandler::TtlType type) {
DbHandler::TtlMap::const_iterator it = ttl_map.find(type);
uint64_t DbHandler::GetTtlInHourFromMap(const TtlMap& ttl_map,
TtlType::type type) {
TtlMap::const_iterator it = ttl_map.find(type);
if (it != ttl_map.end()) {
return it->second;
} else {
return 0;
}
}

uint64_t DbHandler::GetTtlFromMap(const TtlMap& ttl_map,
TtlType::type type) {
TtlMap::const_iterator it = ttl_map.find(type);
if (it != ttl_map.end()) {
return it->second*3600;
} else {
return -1;
return 0;
}
}

Expand Down Expand Up @@ -199,6 +204,41 @@ bool DbHandler::CreateTables() {
}
}

/*
* add ttls to cassandra to be retrieved by other daemons
*/
{
std::auto_ptr<GenDb::ColList> col_list(new GenDb::ColList);
col_list->cfname_ = g_viz_constants.SYSTEM_OBJECT_TABLE;
// Rowkey
GenDb::DbDataValueVec& rowkey = col_list->rowkey_;
rowkey.reserve(1);
rowkey.push_back(g_viz_constants.SYSTEM_OBJECT_ANALYTICS);
// Columns
GenDb::NewColVec& columns = col_list->columns_;
columns.reserve(4);

GenDb::NewCol *col(new GenDb::NewCol(
g_viz_constants.SYSTEM_OBJECT_FLOW_DATA_TTL, (uint64_t)DbHandler::GetTtlInHourFromMap(ttl_map_, TtlType::FLOWDATA_TTL), 0));
columns.push_back(col);

GenDb::NewCol *flow_col(new GenDb::NewCol(
g_viz_constants.SYSTEM_OBJECT_STATS_DATA_TTL, (uint64_t)DbHandler::GetTtlInHourFromMap(ttl_map_, TtlType::STATSDATA_TTL), 0));
columns.push_back(flow_col);

GenDb::NewCol *msg_col(new GenDb::NewCol(
g_viz_constants.SYSTEM_OBJECT_CONFIG_AUDIT_TTL, (uint64_t)DbHandler::GetTtlInHourFromMap(ttl_map_, TtlType::CONFIGAUDIT_TTL), 0));
columns.push_back(msg_col);

GenDb::NewCol *stat_col(new GenDb::NewCol(
g_viz_constants.SYSTEM_OBJECT_GLOBAL_DATA_TTL, (uint64_t)DbHandler::GetTtlInHourFromMap(ttl_map_, TtlType::GLOBAL_TTL), 0));
columns.push_back(stat_col);

if (!dbif_->Db_AddColumnSync(col_list)) {
VIZD_ASSERT(0);
}
}

return true;
}

Expand Down Expand Up @@ -367,9 +407,9 @@ bool DbHandler::MessageIndexTableInsert(const std::string& cfname,
GenDb::DbDataValueVec *col_value(new GenDb::DbDataValueVec(1, unm));
int ttl;
if (message_type == "VncApiConfigLog") {
ttl = GetTtl(CONFIGAUDIT_TTL);
ttl = GetTtl(TtlType::CONFIGAUDIT_TTL);
} else {
ttl = GetTtl(GLOBAL_TTL);
ttl = GetTtl(TtlType::GLOBAL_TTL);
}
GenDb::NewCol *col(new GenDb::NewCol(col_name, col_value, ttl));
columns.push_back(col);
Expand All @@ -389,7 +429,7 @@ void DbHandler::MessageTableOnlyInsert(const VizMsg *vmsgp) {
uint32_t temp_u32;
std::string temp_str;

int ttl = GetTtl(GLOBAL_TTL);
int ttl = GetTtl(TtlType::GLOBAL_TTL);
std::auto_ptr<GenDb::ColList> col_list(new GenDb::ColList);
col_list->cfname_ = g_viz_constants.COLLECTOR_GLOBAL_TABLE;
// Rowkey
Expand Down Expand Up @@ -514,7 +554,7 @@ void DbHandler::MessageTableInsert(const VizMsg *vmsgp) {
if ((stype == SandeshType::SYSLOG) || (stype == SandeshType::SYSTEM)) {
//Insert only if sandesh type is a SYSTEM LOG or SYSLOG
//Insert into the FieldNames stats table entries for Messagetype and Module ID
int ttl = GetTtl(GLOBAL_TTL);
int ttl = GetTtl(TtlType::GLOBAL_TTL);
FieldNamesTableInsert(g_viz_constants.COLLECTOR_GLOBAL_TABLE,
":Messagetype", message_type, header.get_Timestamp(), ttl);
FieldNamesTableInsert(g_viz_constants.COLLECTOR_GLOBAL_TABLE,
Expand Down Expand Up @@ -572,6 +612,13 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &o
uint64_t &timestamp, const boost::uuids::uuid& unm, const VizMsg *vmsgp) {
uint32_t T2(timestamp >> g_viz_constants.RowTimeInBits);
uint32_t T1(timestamp & g_viz_constants.RowTimeInMask);
const std::string &message_type(vmsgp->msg->GetMessageType());
int ttl;
if (message_type == "VncApiConfigLog") {
ttl = GetTtl(TtlType::CONFIGAUDIT_TTL);
} else {
ttl = GetTtl(TtlType::GLOBAL_TTL);
}

{
uint8_t partition_no = 0;
Expand All @@ -589,13 +636,6 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &o
col_name->push_back(T1);

GenDb::DbDataValueVec *col_value(new GenDb::DbDataValueVec(1, unm));
const std::string &message_type(vmsgp->msg->GetMessageType());
int ttl;
if (message_type == "VncApiConfigLog") {
ttl = GetTtl(CONFIGAUDIT_TTL);
} else {
ttl = GetTtl(GLOBAL_TTL);
}
GenDb::NewCol *col(new GenDb::NewCol(col_name, col_value, ttl));
GenDb::NewColVec& columns = col_list->columns_;
columns.reserve(1);
Expand All @@ -617,7 +657,7 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &o
rowkey.push_back(table);
GenDb::DbDataValueVec *col_name(new GenDb::DbDataValueVec(1, T1));
GenDb::DbDataValueVec *col_value(new GenDb::DbDataValueVec(1, objectkey_str));
GenDb::NewCol *col(new GenDb::NewCol(col_name, col_value));
GenDb::NewCol *col(new GenDb::NewCol(col_name, col_value, ttl));
GenDb::NewColVec& columns = col_list->columns_;
columns.reserve(1);
columns.push_back(col);
Expand All @@ -634,12 +674,6 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &o
const SandeshHeader &header(vmsgp->msg->GetHeader());
const std::string &message_type(vmsgp->msg->GetMessageType());
//Insert into the FieldNames stats table entries for Messagetype and Module ID
int ttl;
if (message_type == "VncApiConfigLog") {
ttl = GetTtl(CONFIGAUDIT_TTL);
} else {
ttl = GetTtl(GLOBAL_TTL);
}
FieldNamesTableInsert(table, ":Objecttype", objectkey_str, timestamp, ttl);
FieldNamesTableInsert(table, ":Messagetype", message_type, timestamp, ttl);
FieldNamesTableInsert(table, ":ModuleId", header.get_Module(),
Expand Down Expand Up @@ -812,7 +846,7 @@ DbHandler::StatTableInsert(uint64_t ts,
const std::string& statAttr,
const TagMap & attribs_tag,
const AttribMap & attribs) {
int ttl = GetTtl(STATSDATA_TTL);
int ttl = GetTtl(TtlType::STATSDATA_TTL);
StatTableInsertTtl(ts, statName, statAttr, attribs_tag, attribs, ttl);
}

Expand Down Expand Up @@ -959,8 +993,8 @@ boost::uuids::uuid DbHandler::seed_uuid = StringToUuid(std::string("ffffffff-fff

static void PopulateFlowRecordTableColumns(
const std::vector<FlowRecordFields::type> &frvt,
FlowValueArray &fvalues, GenDb::NewColVec& columns, const DbHandler::TtlMap& ttl_map) {
int ttl = DbHandler::GetTtlFromMap(ttl_map, DbHandler::FLOWDATA_TTL);
FlowValueArray &fvalues, GenDb::NewColVec& columns, const TtlMap& ttl_map) {
int ttl = DbHandler::GetTtlFromMap(ttl_map, TtlType::FLOWDATA_TTL);
columns.reserve(frvt.size());
for (std::vector<FlowRecordFields::type>::const_iterator it = frvt.begin();
it != frvt.end(); it++) {
Expand All @@ -983,7 +1017,7 @@ static void PopulateFlowRecordTableRowKey(
}

static bool PopulateFlowRecordTable(FlowValueArray &fvalues,
GenDb::GenDbIf *dbif, const DbHandler::TtlMap& ttl_map) {
GenDb::GenDbIf *dbif, const TtlMap& ttl_map) {
std::auto_ptr<GenDb::ColList> colList(new GenDb::ColList);
colList->cfname_ = g_viz_constants.FLOW_TABLE;
PopulateFlowRecordTableRowKey(fvalues, colList->rowkey_);
Expand Down Expand Up @@ -1094,8 +1128,8 @@ static void PopulateFlowIndexTableColumnNames(FlowIndexTableType ftype,
static void PopulateFlowIndexTableColumns(FlowIndexTableType ftype,
FlowValueArray &fvalues, uint32_t &T1,
GenDb::NewColVec &columns, const GenDb::DbDataValueVec &cvalues,
const DbHandler::TtlMap& ttl_map) {
int ttl = DbHandler::GetTtlFromMap(ttl_map, DbHandler::FLOWDATA_TTL);
const TtlMap& ttl_map) {
int ttl = DbHandler::GetTtlFromMap(ttl_map, TtlType::FLOWDATA_TTL);

GenDb::DbDataValueVec *names(new GenDb::DbDataValueVec);
PopulateFlowIndexTableColumnNames(ftype, fvalues, T1, names);
Expand All @@ -1107,7 +1141,7 @@ static void PopulateFlowIndexTableColumns(FlowIndexTableType ftype,

static bool PopulateFlowIndexTables(FlowValueArray &fvalues,
uint32_t &T2, uint32_t &T1, uint8_t partition_no,
GenDb::GenDbIf *dbif, const DbHandler::TtlMap& ttl_map) {
GenDb::GenDbIf *dbif, const TtlMap& ttl_map) {
// Populate row key and column values (same for all flow index
// tables)
GenDb::DbDataValueVec rkey;
Expand Down Expand Up @@ -1314,7 +1348,7 @@ DbHandlerInitializer::DbHandlerInitializer(EventManager *evm,
const std::string &timer_task_name,
DbHandlerInitializer::InitializeDoneCb callback,
const std::vector<std::string> &cassandra_ips,
const std::vector<int> &cassandra_ports, const DbHandler::TtlMap& ttl_map) :
const std::vector<int> &cassandra_ports, const TtlMap& ttl_map) :
db_name_(db_name),
db_task_instance_(db_task_instance),
db_handler_(new DbHandler(evm,
Expand Down
20 changes: 7 additions & 13 deletions src/analytics/db_handler.h
Expand Up @@ -31,6 +31,7 @@
#include "sandesh/sandesh.h"
#include "viz_message.h"
#include "uflow_types.h"
#include "viz_constants.h"

class DbHandler {
public:
Expand Down Expand Up @@ -75,15 +76,6 @@ class DbHandler {
const Var& value);
};

typedef enum {
INVALID_TTL = 0,
FLOWDATA_TTL = 1,
STATSDATA_TTL = 2,
CONFIGAUDIT_TTL = 3,
GLOBAL_TTL = 4,
} TtlType;
typedef std::map<TtlType, int> TtlMap;

typedef std::map<std::string, std::string> RuleMap;

typedef std::map<std::string, Var > AttribMap;
Expand All @@ -96,8 +88,10 @@ class DbHandler {
DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map);
virtual ~DbHandler();

static int GetTtlFromMap(const TtlMap& ttl_map,
TtlType type);
static uint64_t GetTtlInHourFromMap(const TtlMap& ttl_map,
TtlType::type type);
static uint64_t GetTtlFromMap(const TtlMap& ttl_map,
TtlType::type type);
bool DropMessage(const SandeshHeader &header, const VizMsg *vmsg);
bool Init(bool initial, int instance);
void UnInit(int instance);
Expand Down Expand Up @@ -163,7 +157,7 @@ class DbHandler {
const std::pair<std::string,DbHandler::Var>& stag,
uint32_t t1, const boost::uuids::uuid& unm,
const std::string& jsonline, int ttl);
int GetTtl(TtlType type) {
uint64_t GetTtl(TtlType::type type) {
return GetTtlFromMap(ttl_map_, type);
}

Expand Down Expand Up @@ -211,7 +205,7 @@ class DbHandlerInitializer {
const std::string &timer_task_name, InitializeDoneCb callback,
const std::vector<std::string> &cassandra_ips,
const std::vector<int> &cassandra_ports,
const DbHandler::TtlMap& ttl_map);
const TtlMap& ttl_map);
DbHandlerInitializer(EventManager *evm,
const std::string &db_name, int db_task_instance,
const std::string &timer_task_name, InitializeDoneCb callback,
Expand Down
12 changes: 6 additions & 6 deletions src/analytics/main.cc
Expand Up @@ -313,14 +313,14 @@ int main(int argc, char *argv[])
options.analytics_statistics_ttl());
LOG(INFO, "COLLECTOR analytics_config_audit_ttl: " <<
options.analytics_config_audit_ttl());
DbHandler::TtlMap ttl_map;
ttl_map.insert(std::pair<DbHandler::TtlType, int>(DbHandler::FLOWDATA_TTL,
TtlMap ttl_map;
ttl_map.insert(std::make_pair(TtlType::FLOWDATA_TTL,
options.analytics_flow_ttl()));
ttl_map.insert(std::pair<DbHandler::TtlType, int>(DbHandler::STATSDATA_TTL,
ttl_map.insert(std::make_pair(TtlType::STATSDATA_TTL,
options.analytics_statistics_ttl()));
ttl_map.insert(std::pair<DbHandler::TtlType, int>
(DbHandler::CONFIGAUDIT_TTL, options.analytics_config_audit_ttl()));
ttl_map.insert(std::pair<DbHandler::TtlType, int>(DbHandler::GLOBAL_TTL,
ttl_map.insert(std::make_pair(TtlType::CONFIGAUDIT_TTL,
options.analytics_config_audit_ttl()));
ttl_map.insert(std::make_pair(TtlType::GLOBAL_TTL,
options.analytics_data_ttl()));

VizCollector analytics(a_evm,
Expand Down

0 comments on commit 105352b

Please sign in to comment.