diff --git a/src/analytics/collector.cc b/src/analytics/collector.cc index e8d2bd0a638..994a5607d05 100644 --- a/src/analytics/collector.cc +++ b/src/analytics/collector.cc @@ -81,7 +81,7 @@ const std::vector Collector::kSmQueueWaterMarkInfo Collector::Collector(EventManager *evm, short server_port, DbHandler *db_handler, OpServerProxy *osp, VizCallback cb, std::vector cassandra_ips, - std::vector cassandra_ports, const DbHandler::TtlMap& ttl_map) : + std::vector cassandra_ports, const TtlMap& ttl_map) : SandeshServer(evm), db_handler_(db_handler), osp_(osp), @@ -340,7 +340,7 @@ void Collector::TestDatabaseConnection() { // try to instantiate a new dbif instance for testing db connection testdbif_.reset( GenDb::GenDbIf::GenDbIfImpl( boost::bind(&Collector::TestDbConnErrHandler, this), - cassandra_ips_, cassandra_ports_, 3600, db_handler_->GetName(), true)); + cassandra_ips_, cassandra_ports_, db_handler_->GetName(), true)); if (!testdbif_->Db_Init("analytics::DbHandler", db_task_id_)) { if (dbConnStatus_ != ConnectionStatus::DOWN) { diff --git a/src/analytics/collector.h b/src/analytics/collector.h index 5a0e950ec5f..44e32c0db9b 100644 --- a/src/analytics/collector.h +++ b/src/analytics/collector.h @@ -72,7 +72,7 @@ class Collector : public SandeshServer { Collector(EventManager *evm, short server_port, DbHandler *db_handler, OpServerProxy *osp, VizCallback cb, std::vector cassandra_ips, - std::vector cassandra_ports, const DbHandler::TtlMap& ttl_map); + std::vector cassandra_ports, const TtlMap& ttl_map); virtual ~Collector(); virtual void Shutdown(); virtual void SessionShutdown(); @@ -118,7 +118,7 @@ class Collector : public SandeshServer { std::vector cassandra_ips() { return cassandra_ips_; } std::vector cassandra_ports() { return cassandra_ports_; } - const DbHandler::TtlMap& analytics_ttl_map() { return ttl_map_; } + const TtlMap& analytics_ttl_map() { return ttl_map_; } int db_task_id(); const CollectorStats &GetStats() const { return stats_; } void SendGeneratorStatistics(); @@ -167,7 +167,7 @@ class Collector : public SandeshServer { std::vector cassandra_ips_; std::vector cassandra_ports_; - DbHandler::TtlMap ttl_map_; + TtlMap ttl_map_; int db_task_id_; // SandeshGenerator map diff --git a/src/analytics/db_handler.cc b/src/analytics/db_handler.cc index 04c102db61b..04537ed9ab5 100644 --- a/src/analytics/db_handler.cc +++ b/src/analytics/db_handler.cc @@ -55,13 +55,8 @@ DbHandler::DbHandler(EventManager *evm, std::string name, const TtlMap& ttl_map) : name_(name), drop_level_(SandeshLevel::INVALID), ttl_map_(ttl_map) { - int analytics_ttl = DbHandler::GetTtlFromMap(ttl_map, DbHandler::GLOBAL_TTL); - if (analytics_ttl == -1) { - DB_LOG(ERROR, "Unexpected analytics_ttl value: " << analytics_ttl); - analytics_ttl = 0; - } dbif_.reset(GenDb::GenDbIf::GenDbIfImpl(err_handler, - cassandra_ips, cassandra_ports, analytics_ttl, name, false)); + cassandra_ips, cassandra_ports, name, false)); error_code error; col_name_ = boost::asio::ip::host_name(error); @@ -75,13 +70,23 @@ DbHandler::DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map) : DbHandler::~DbHandler() { } -int DbHandler::GetTtlFromMap(const DbHandler::TtlMap& ttl_map, - DbHandler::TtlType type) { - DbHandler::TtlMap::const_iterator it = ttl_map.find(type); +uint64_t DbHandler::GetTtlInHourFromMap(const TtlMap& ttl_map, + TtlType::type type) { + TtlMap::const_iterator it = ttl_map.find(type); + if (it != ttl_map.end()) { + return it->second; + } else { + return 0; + } +} + +uint64_t DbHandler::GetTtlFromMap(const TtlMap& ttl_map, + TtlType::type type) { + TtlMap::const_iterator it = ttl_map.find(type); if (it != ttl_map.end()) { return it->second*3600; } else { - return -1; + return 0; } } @@ -199,6 +204,41 @@ bool DbHandler::CreateTables() { } } + /* + * add ttls to cassandra to be retrieved by other daemons + */ + { + std::auto_ptr col_list(new GenDb::ColList); + col_list->cfname_ = g_viz_constants.SYSTEM_OBJECT_TABLE; + // Rowkey + GenDb::DbDataValueVec& rowkey = col_list->rowkey_; + rowkey.reserve(1); + rowkey.push_back(g_viz_constants.SYSTEM_OBJECT_ANALYTICS); + // Columns + GenDb::NewColVec& columns = col_list->columns_; + columns.reserve(4); + + GenDb::NewCol *col(new GenDb::NewCol( + g_viz_constants.SYSTEM_OBJECT_FLOW_DATA_TTL, (uint64_t)DbHandler::GetTtlInHourFromMap(ttl_map_, TtlType::FLOWDATA_TTL), 0)); + columns.push_back(col); + + GenDb::NewCol *flow_col(new GenDb::NewCol( + g_viz_constants.SYSTEM_OBJECT_STATS_DATA_TTL, (uint64_t)DbHandler::GetTtlInHourFromMap(ttl_map_, TtlType::STATSDATA_TTL), 0)); + columns.push_back(flow_col); + + GenDb::NewCol *msg_col(new GenDb::NewCol( + g_viz_constants.SYSTEM_OBJECT_CONFIG_AUDIT_TTL, (uint64_t)DbHandler::GetTtlInHourFromMap(ttl_map_, TtlType::CONFIGAUDIT_TTL), 0)); + columns.push_back(msg_col); + + GenDb::NewCol *stat_col(new GenDb::NewCol( + g_viz_constants.SYSTEM_OBJECT_GLOBAL_DATA_TTL, (uint64_t)DbHandler::GetTtlInHourFromMap(ttl_map_, TtlType::GLOBAL_TTL), 0)); + columns.push_back(stat_col); + + if (!dbif_->Db_AddColumnSync(col_list)) { + VIZD_ASSERT(0); + } + } + return true; } @@ -367,9 +407,9 @@ bool DbHandler::MessageIndexTableInsert(const std::string& cfname, GenDb::DbDataValueVec *col_value(new GenDb::DbDataValueVec(1, unm)); int ttl; if (message_type == "VncApiConfigLog") { - ttl = GetTtl(CONFIGAUDIT_TTL); + ttl = GetTtl(TtlType::CONFIGAUDIT_TTL); } else { - ttl = GetTtl(GLOBAL_TTL); + ttl = GetTtl(TtlType::GLOBAL_TTL); } GenDb::NewCol *col(new GenDb::NewCol(col_name, col_value, ttl)); columns.push_back(col); @@ -389,7 +429,7 @@ void DbHandler::MessageTableOnlyInsert(const VizMsg *vmsgp) { uint32_t temp_u32; std::string temp_str; - int ttl = GetTtl(GLOBAL_TTL); + int ttl = GetTtl(TtlType::GLOBAL_TTL); std::auto_ptr col_list(new GenDb::ColList); col_list->cfname_ = g_viz_constants.COLLECTOR_GLOBAL_TABLE; // Rowkey @@ -514,7 +554,7 @@ void DbHandler::MessageTableInsert(const VizMsg *vmsgp) { if ((stype == SandeshType::SYSLOG) || (stype == SandeshType::SYSTEM)) { //Insert only if sandesh type is a SYSTEM LOG or SYSLOG //Insert into the FieldNames stats table entries for Messagetype and Module ID - int ttl = GetTtl(GLOBAL_TTL); + int ttl = GetTtl(TtlType::GLOBAL_TTL); FieldNamesTableInsert(g_viz_constants.COLLECTOR_GLOBAL_TABLE, ":Messagetype", message_type, header.get_Timestamp(), ttl); FieldNamesTableInsert(g_viz_constants.COLLECTOR_GLOBAL_TABLE, @@ -572,6 +612,13 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &o uint64_t ×tamp, const boost::uuids::uuid& unm, const VizMsg *vmsgp) { uint32_t T2(timestamp >> g_viz_constants.RowTimeInBits); uint32_t T1(timestamp & g_viz_constants.RowTimeInMask); + const std::string &message_type(vmsgp->msg->GetMessageType()); + int ttl; + if (message_type == "VncApiConfigLog") { + ttl = GetTtl(TtlType::CONFIGAUDIT_TTL); + } else { + ttl = GetTtl(TtlType::GLOBAL_TTL); + } { uint8_t partition_no = 0; @@ -589,13 +636,6 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &o col_name->push_back(T1); GenDb::DbDataValueVec *col_value(new GenDb::DbDataValueVec(1, unm)); - const std::string &message_type(vmsgp->msg->GetMessageType()); - int ttl; - if (message_type == "VncApiConfigLog") { - ttl = GetTtl(CONFIGAUDIT_TTL); - } else { - ttl = GetTtl(GLOBAL_TTL); - } GenDb::NewCol *col(new GenDb::NewCol(col_name, col_value, ttl)); GenDb::NewColVec& columns = col_list->columns_; columns.reserve(1); @@ -617,7 +657,7 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &o rowkey.push_back(table); GenDb::DbDataValueVec *col_name(new GenDb::DbDataValueVec(1, T1)); GenDb::DbDataValueVec *col_value(new GenDb::DbDataValueVec(1, objectkey_str)); - GenDb::NewCol *col(new GenDb::NewCol(col_name, col_value)); + GenDb::NewCol *col(new GenDb::NewCol(col_name, col_value, ttl)); GenDb::NewColVec& columns = col_list->columns_; columns.reserve(1); columns.push_back(col); @@ -634,12 +674,6 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &o const SandeshHeader &header(vmsgp->msg->GetHeader()); const std::string &message_type(vmsgp->msg->GetMessageType()); //Insert into the FieldNames stats table entries for Messagetype and Module ID - int ttl; - if (message_type == "VncApiConfigLog") { - ttl = GetTtl(CONFIGAUDIT_TTL); - } else { - ttl = GetTtl(GLOBAL_TTL); - } FieldNamesTableInsert(table, ":Objecttype", objectkey_str, timestamp, ttl); FieldNamesTableInsert(table, ":Messagetype", message_type, timestamp, ttl); FieldNamesTableInsert(table, ":ModuleId", header.get_Module(), @@ -812,7 +846,7 @@ DbHandler::StatTableInsert(uint64_t ts, const std::string& statAttr, const TagMap & attribs_tag, const AttribMap & attribs) { - int ttl = GetTtl(STATSDATA_TTL); + int ttl = GetTtl(TtlType::STATSDATA_TTL); StatTableInsertTtl(ts, statName, statAttr, attribs_tag, attribs, ttl); } @@ -959,8 +993,8 @@ boost::uuids::uuid DbHandler::seed_uuid = StringToUuid(std::string("ffffffff-fff static void PopulateFlowRecordTableColumns( const std::vector &frvt, - FlowValueArray &fvalues, GenDb::NewColVec& columns, const DbHandler::TtlMap& ttl_map) { - int ttl = DbHandler::GetTtlFromMap(ttl_map, DbHandler::FLOWDATA_TTL); + FlowValueArray &fvalues, GenDb::NewColVec& columns, const TtlMap& ttl_map) { + int ttl = DbHandler::GetTtlFromMap(ttl_map, TtlType::FLOWDATA_TTL); columns.reserve(frvt.size()); for (std::vector::const_iterator it = frvt.begin(); it != frvt.end(); it++) { @@ -983,7 +1017,7 @@ static void PopulateFlowRecordTableRowKey( } static bool PopulateFlowRecordTable(FlowValueArray &fvalues, - GenDb::GenDbIf *dbif, const DbHandler::TtlMap& ttl_map) { + GenDb::GenDbIf *dbif, const TtlMap& ttl_map) { std::auto_ptr colList(new GenDb::ColList); colList->cfname_ = g_viz_constants.FLOW_TABLE; PopulateFlowRecordTableRowKey(fvalues, colList->rowkey_); @@ -1094,8 +1128,8 @@ static void PopulateFlowIndexTableColumnNames(FlowIndexTableType ftype, static void PopulateFlowIndexTableColumns(FlowIndexTableType ftype, FlowValueArray &fvalues, uint32_t &T1, GenDb::NewColVec &columns, const GenDb::DbDataValueVec &cvalues, - const DbHandler::TtlMap& ttl_map) { - int ttl = DbHandler::GetTtlFromMap(ttl_map, DbHandler::FLOWDATA_TTL); + const TtlMap& ttl_map) { + int ttl = DbHandler::GetTtlFromMap(ttl_map, TtlType::FLOWDATA_TTL); GenDb::DbDataValueVec *names(new GenDb::DbDataValueVec); PopulateFlowIndexTableColumnNames(ftype, fvalues, T1, names); @@ -1107,7 +1141,7 @@ static void PopulateFlowIndexTableColumns(FlowIndexTableType ftype, static bool PopulateFlowIndexTables(FlowValueArray &fvalues, uint32_t &T2, uint32_t &T1, uint8_t partition_no, - GenDb::GenDbIf *dbif, const DbHandler::TtlMap& ttl_map) { + GenDb::GenDbIf *dbif, const TtlMap& ttl_map) { // Populate row key and column values (same for all flow index // tables) GenDb::DbDataValueVec rkey; @@ -1314,7 +1348,7 @@ DbHandlerInitializer::DbHandlerInitializer(EventManager *evm, const std::string &timer_task_name, DbHandlerInitializer::InitializeDoneCb callback, const std::vector &cassandra_ips, - const std::vector &cassandra_ports, const DbHandler::TtlMap& ttl_map) : + const std::vector &cassandra_ports, const TtlMap& ttl_map) : db_name_(db_name), db_task_instance_(db_task_instance), db_handler_(new DbHandler(evm, diff --git a/src/analytics/db_handler.h b/src/analytics/db_handler.h index 07326de739d..c69df612a9e 100644 --- a/src/analytics/db_handler.h +++ b/src/analytics/db_handler.h @@ -31,6 +31,7 @@ #include "sandesh/sandesh.h" #include "viz_message.h" #include "uflow_types.h" +#include "viz_constants.h" class DbHandler { public: @@ -75,15 +76,6 @@ class DbHandler { const Var& value); }; - typedef enum { - INVALID_TTL = 0, - FLOWDATA_TTL = 1, - STATSDATA_TTL = 2, - CONFIGAUDIT_TTL = 3, - GLOBAL_TTL = 4, - } TtlType; - typedef std::map TtlMap; - typedef std::map RuleMap; typedef std::map AttribMap; @@ -96,8 +88,10 @@ class DbHandler { DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map); virtual ~DbHandler(); - static int GetTtlFromMap(const TtlMap& ttl_map, - TtlType type); + static uint64_t GetTtlInHourFromMap(const TtlMap& ttl_map, + TtlType::type type); + static uint64_t GetTtlFromMap(const TtlMap& ttl_map, + TtlType::type type); bool DropMessage(const SandeshHeader &header, const VizMsg *vmsg); bool Init(bool initial, int instance); void UnInit(int instance); @@ -163,7 +157,7 @@ class DbHandler { const std::pair& stag, uint32_t t1, const boost::uuids::uuid& unm, const std::string& jsonline, int ttl); - int GetTtl(TtlType type) { + uint64_t GetTtl(TtlType::type type) { return GetTtlFromMap(ttl_map_, type); } @@ -211,7 +205,7 @@ class DbHandlerInitializer { const std::string &timer_task_name, InitializeDoneCb callback, const std::vector &cassandra_ips, const std::vector &cassandra_ports, - const DbHandler::TtlMap& ttl_map); + const TtlMap& ttl_map); DbHandlerInitializer(EventManager *evm, const std::string &db_name, int db_task_instance, const std::string &timer_task_name, InitializeDoneCb callback, diff --git a/src/analytics/main.cc b/src/analytics/main.cc index 8f9bcbdc833..b95adad16e3 100644 --- a/src/analytics/main.cc +++ b/src/analytics/main.cc @@ -313,14 +313,14 @@ int main(int argc, char *argv[]) options.analytics_statistics_ttl()); LOG(INFO, "COLLECTOR analytics_config_audit_ttl: " << options.analytics_config_audit_ttl()); - DbHandler::TtlMap ttl_map; - ttl_map.insert(std::pair(DbHandler::FLOWDATA_TTL, + TtlMap ttl_map; + ttl_map.insert(std::make_pair(TtlType::FLOWDATA_TTL, options.analytics_flow_ttl())); - ttl_map.insert(std::pair(DbHandler::STATSDATA_TTL, + ttl_map.insert(std::make_pair(TtlType::STATSDATA_TTL, options.analytics_statistics_ttl())); - ttl_map.insert(std::pair - (DbHandler::CONFIGAUDIT_TTL, options.analytics_config_audit_ttl())); - ttl_map.insert(std::pair(DbHandler::GLOBAL_TTL, + ttl_map.insert(std::make_pair(TtlType::CONFIGAUDIT_TTL, + options.analytics_config_audit_ttl())); + ttl_map.insert(std::make_pair(TtlType::GLOBAL_TTL, options.analytics_data_ttl())); VizCollector analytics(a_evm, diff --git a/src/analytics/options.cc b/src/analytics/options.cc index 9ce9f087558..6bfca4fce3f 100644 --- a/src/analytics/options.cc +++ b/src/analytics/options.cc @@ -86,16 +86,16 @@ void Options::Initialize(EventManager &evm, "Listener port of Google Protocol Buffer collector server") ("DEFAULT.analytics_data_ttl", - opt::value()->default_value(g_viz_constants.AnalyticsTTL), + opt::value()->default_value(g_viz_constants.TtlValuesDefault.find(TtlType::GLOBAL_TTL)->second), "global TTL(hours) for analytics data") ("DEFAULT.analytics_config_audit_ttl", - opt::value()->default_value(g_viz_constants.AnalyticsConfigAuditTTL), + opt::value()->default_value(g_viz_constants.TtlValuesDefault.find(TtlType::CONFIGAUDIT_TTL)->second), "global TTL(hours) for analytics config audit data") ("DEFAULT.analytics_statistics_ttl", - opt::value()->default_value(g_viz_constants.AnalyticsStatisticsTTL), + opt::value()->default_value(g_viz_constants.TtlValuesDefault.find(TtlType::STATSDATA_TTL)->second), "global TTL(hours) for analytics stats data") ("DEFAULT.analytics_flow_ttl", - opt::value()->default_value(g_viz_constants.AnalyticsFlowTTL), + opt::value()->default_value(g_viz_constants.TtlValuesDefault.find(TtlType::FLOWDATA_TTL)->second), "global TTL(hours) for analytics flow data") ("DEFAULT.cassandra_server_list", opt::value >()->default_value( @@ -265,22 +265,25 @@ void Options::Process(int argc, char *argv[], } else { collector_protobuf_port_configured_ = false; } - GetOptValue(var_map, analytics_data_ttl_, + GetOptValue(var_map, analytics_data_ttl_, "DEFAULT.analytics_data_ttl"); - GetOptValue(var_map, analytics_config_audit_ttl_, + if (analytics_data_ttl_ == (uint64_t)-1) { + analytics_data_ttl_ = g_viz_constants.TtlValuesDefault.find(TtlType::GLOBAL_TTL)->second; + } + GetOptValue(var_map, analytics_config_audit_ttl_, "DEFAULT.analytics_config_audit_ttl"); - if (analytics_config_audit_ttl_ == -1) { - analytics_config_audit_ttl_ = analytics_data_ttl_; + if (analytics_config_audit_ttl_ == (uint64_t)-1) { + analytics_config_audit_ttl_ = g_viz_constants.TtlValuesDefault.find(TtlType::CONFIGAUDIT_TTL)->second; } - GetOptValue(var_map, analytics_statistics_ttl_, + GetOptValue(var_map, analytics_statistics_ttl_, "DEFAULT.analytics_statistics_ttl"); - if (analytics_statistics_ttl_ == -1) { - analytics_statistics_ttl_ = analytics_data_ttl_; + if (analytics_statistics_ttl_ == (uint64_t)-1) { + analytics_statistics_ttl_ = g_viz_constants.TtlValuesDefault.find(TtlType::STATSDATA_TTL)->second; } - GetOptValue(var_map, analytics_flow_ttl_, + GetOptValue(var_map, analytics_flow_ttl_, "DEFAULT.analytics_flow_ttl"); - if (analytics_flow_ttl_ == -1) { - analytics_flow_ttl_ = analytics_statistics_ttl_; + if (analytics_flow_ttl_ == (uint64_t)-1) { + analytics_flow_ttl_ = g_viz_constants.TtlValuesDefault.find(TtlType::FLOWDATA_TTL)->second; } GetOptValue< vector >(var_map, cassandra_server_list_, diff --git a/src/analytics/options.h b/src/analytics/options.h index bbcca2d4e4d..89d44ab41e4 100644 --- a/src/analytics/options.h +++ b/src/analytics/options.h @@ -49,10 +49,10 @@ class Options { const bool use_syslog() const { return use_syslog_; } const std::string syslog_facility() const { return syslog_facility_; } const bool dup() const { return dup_; } - const int analytics_data_ttl() const { return analytics_data_ttl_; } - const int analytics_flow_ttl() const { return analytics_flow_ttl_; } - const int analytics_statistics_ttl() const { return analytics_statistics_ttl_; } - const int analytics_config_audit_ttl() const { return analytics_config_audit_ttl_; } + const uint64_t analytics_data_ttl() const { return analytics_data_ttl_; } + const uint64_t analytics_flow_ttl() const { return analytics_flow_ttl_; } + const uint64_t analytics_statistics_ttl() const { return analytics_statistics_ttl_; } + const uint64_t analytics_config_audit_ttl() const { return analytics_config_audit_ttl_; } const int syslog_port() const { return syslog_port_; } const int sflow_port() const { return sflow_port_; } const int ipfix_port() const { return ipfix_port_; } @@ -112,10 +112,10 @@ class Options { int ipfix_port_; bool test_mode_; bool dup_; - int analytics_data_ttl_; - int analytics_config_audit_ttl_; - int analytics_flow_ttl_; - int analytics_statistics_ttl_; + uint64_t analytics_data_ttl_; + uint64_t analytics_config_audit_ttl_; + uint64_t analytics_flow_ttl_; + uint64_t analytics_statistics_ttl_; std::vector cassandra_server_list_; std::vector kafka_broker_list_; uint16_t partitions_; diff --git a/src/analytics/protobuf_collector.cc b/src/analytics/protobuf_collector.cc index b0a203da64c..76d8fc0a790 100644 --- a/src/analytics/protobuf_collector.cc +++ b/src/analytics/protobuf_collector.cc @@ -17,7 +17,7 @@ const std::string ProtobufCollector::kDbTaskName("protobuf_collector::Db"); ProtobufCollector::ProtobufCollector(EventManager *evm, uint16_t protobuf_udp_port, const std::vector &cassandra_ips, - const std::vector &cassandra_ports, const DbHandler::TtlMap& ttl_map) : + const std::vector &cassandra_ports, const TtlMap& ttl_map) : db_initializer_(new DbHandlerInitializer(evm, kDbName, kDbTaskInstance, kDbTaskName, boost::bind(&ProtobufCollector::DbInitializeCb, this), cassandra_ips, cassandra_ports, ttl_map)), diff --git a/src/analytics/protobuf_collector.h b/src/analytics/protobuf_collector.h index 28880e911ea..8c8d9d47c12 100644 --- a/src/analytics/protobuf_collector.h +++ b/src/analytics/protobuf_collector.h @@ -17,7 +17,7 @@ class ProtobufCollector { public: ProtobufCollector(EventManager *evm, uint16_t udp_server_port, const std::vector &cassandra_ips, - const std::vector &cassandra_ports, const DbHandler::TtlMap&); + const std::vector &cassandra_ports, const TtlMap&); virtual ~ProtobufCollector(); bool Initialize(); void Shutdown(); diff --git a/src/analytics/test/db_handler_mock.h b/src/analytics/test/db_handler_mock.h index 6d344be8090..81709d9c520 100644 --- a/src/analytics/test/db_handler_mock.h +++ b/src/analytics/test/db_handler_mock.h @@ -11,7 +11,7 @@ class DbHandlerMock : public DbHandler { public: - DbHandlerMock(EventManager *evm, const DbHandler::TtlMap& ttl_map) : + DbHandlerMock(EventManager *evm, const TtlMap& ttl_map) : DbHandler(evm, boost::bind(&DbHandlerMock::StartDbifReinit, this), std::vector(1, "127.0.0.1"), std::vector(1, 9160), "localhost", ttl_map) diff --git a/src/analytics/test/db_handler_test.cc b/src/analytics/test/db_handler_test.cc index b887f0abb55..6c8d976ef46 100644 --- a/src/analytics/test/db_handler_test.cc +++ b/src/analytics/test/db_handler_test.cc @@ -30,10 +30,7 @@ using ::testing::ElementsAreArray; using namespace pugi; using namespace GenDb; -DbHandler::TtlMap ttl_map = boost::assign::map_list_of(DbHandler::FLOWDATA_TTL, 2) - (DbHandler::STATSDATA_TTL, 4) - (DbHandler::CONFIGAUDIT_TTL, 240) - (DbHandler::GLOBAL_TTL, 48); +TtlMap ttl_map = g_viz_constants.TtlValuesDefault; class DbHandlerTest : public ::testing::Test { public: @@ -143,7 +140,7 @@ TEST_F(DbHandlerTest, MessageTableOnlyInsertTest) { GenDb::DbDataValueVec rowkey; rowkey.push_back(unm); - int ttl = ttl_map.find(DbHandler::GLOBAL_TTL)->second; + int ttl = ttl_map.find(TtlType::GLOBAL_TTL)->second; boost::ptr_vector msg_table_expected_vector = boost::assign::ptr_list_of (GenDb::NewCol(g_viz_constants.SOURCE, hdr.get_Source(), ttl)) @@ -186,12 +183,13 @@ TEST_F(DbHandlerTest, MessageIndexTableInsertTest) { hdr.set_Timestamp(UTCTimestampUsec()); boost::uuids::uuid unm(rgen_()); + int ttl = ttl_map.find(TtlType::GLOBAL_TTL)->second; DbDataValueVec *colname(new DbDataValueVec(1, (uint32_t)(hdr.get_Timestamp() & g_viz_constants.RowTimeInMask))); DbDataValueVec *colvalue(new DbDataValueVec(1, unm)); boost::ptr_vector idx_expected_vector = boost::assign::ptr_list_of - (GenDb::NewCol(colname, colvalue)); + (GenDb::NewCol(colname, colvalue, ttl)); GenDb::DbDataValueVec src_idx_rowkey; src_idx_rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits)); @@ -235,24 +233,24 @@ TEST_F(DbHandlerTest, MessageTableInsertTest) { boost::ptr_vector msg_table_expected_vector = boost::assign::ptr_list_of - (GenDb::NewCol(g_viz_constants.SOURCE, hdr.get_Source())) - (GenDb::NewCol(g_viz_constants.NAMESPACE, std::string())) - (GenDb::NewCol(g_viz_constants.MODULE, hdr.get_Module())) - (GenDb::NewCol(g_viz_constants.INSTANCE_ID, hdr.get_InstanceId())) - (GenDb::NewCol(g_viz_constants.NODE_TYPE, hdr.get_NodeType())) + (GenDb::NewCol(g_viz_constants.SOURCE, hdr.get_Source(), 0)) + (GenDb::NewCol(g_viz_constants.NAMESPACE, std::string(), 0)) + (GenDb::NewCol(g_viz_constants.MODULE, hdr.get_Module(), 0)) + (GenDb::NewCol(g_viz_constants.INSTANCE_ID, hdr.get_InstanceId(), 0)) + (GenDb::NewCol(g_viz_constants.NODE_TYPE, hdr.get_NodeType(), 0)) (GenDb::NewCol(g_viz_constants.TIMESTAMP, - static_cast(hdr.get_Timestamp()))) - (GenDb::NewCol(g_viz_constants.CATEGORY, std::string())) + static_cast(hdr.get_Timestamp()), 0)) + (GenDb::NewCol(g_viz_constants.CATEGORY, std::string(), 0)) (GenDb::NewCol(g_viz_constants.LEVEL, - static_cast(0))) - (GenDb::NewCol(g_viz_constants.MESSAGE_TYPE, messagetype)) + static_cast(0), 0)) + (GenDb::NewCol(g_viz_constants.MESSAGE_TYPE, messagetype, 0)) (GenDb::NewCol(g_viz_constants.SEQUENCE_NUM, - static_cast(0))) + static_cast(0), 0)) (GenDb::NewCol(g_viz_constants.VERSION, - static_cast(0))) + static_cast(0), 0)) (GenDb::NewCol(g_viz_constants.SANDESH_TYPE, - static_cast(SandeshType::SYSTEM))) - (GenDb::NewCol(g_viz_constants.DATA, xmlmessage)); + static_cast(SandeshType::SYSTEM), 0)) + (GenDb::NewCol(g_viz_constants.DATA, xmlmessage, 0)); EXPECT_CALL(*dbif_mock(), Db_AddColumnProxy( @@ -269,7 +267,7 @@ TEST_F(DbHandlerTest, MessageTableInsertTest) { DbDataValueVec *colvalue(new DbDataValueVec(1, unm)); boost::ptr_vector idx_expected_vector = boost::assign::ptr_list_of - (GenDb::NewCol(colname, colvalue)); + (GenDb::NewCol(colname, colvalue, 0)); GenDb::DbDataValueVec src_idx_rowkey; src_idx_rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits)); @@ -384,7 +382,7 @@ TEST_F(DbHandlerTest, ObjectTableInsertTest) { DbDataValueVec *colvalue(new DbDataValueVec(1, unm)); boost::ptr_vector expected_vector = boost::assign::ptr_list_of - (GenDb::NewCol(colname, colvalue)); + (GenDb::NewCol(colname, colvalue, 0)); GenDb::DbDataValueVec rowkey; rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits)); @@ -408,7 +406,7 @@ TEST_F(DbHandlerTest, ObjectTableInsertTest) { "ObjectTableInsertTestRowkey")); boost::ptr_vector expected_vector = boost::assign::ptr_list_of - (GenDb::NewCol(colname, colvalue)); + (GenDb::NewCol(colname, colvalue, 0)); GenDb::DbDataValueVec rowkey; rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits)); @@ -435,7 +433,7 @@ TEST_F(DbHandlerTest, ObjectTableInsertTest) { DbDataValueVec *colvalue(new DbDataValueVec(1,"")); boost::ptr_vector expected_vector = boost::assign::ptr_list_of - (GenDb::NewCol(colname, colvalue)); + (GenDb::NewCol(colname, colvalue, 0)); GenDb::DbDataValueVec rowkey; rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits)); @@ -463,7 +461,7 @@ TEST_F(DbHandlerTest, ObjectTableInsertTest) { DbDataValueVec *colvalue(new DbDataValueVec(1,"")); boost::ptr_vector expected_vector = boost::assign::ptr_list_of - (GenDb::NewCol(colname, colvalue)); + (GenDb::NewCol(colname, colvalue, 0)); GenDb::DbDataValueVec rowkey; rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits)); @@ -531,7 +529,7 @@ TEST_F(DbHandlerTest, FlowTableInsertTest) { ocolvalue.push_back((uint16_t)-24590); //dport ocolvalue.push_back(""); //json - int ttl = ttl_map.find(DbHandler::FLOWDATA_TTL)->second; + int ttl = ttl_map.find(TtlType::FLOWDATA_TTL)->second; { GenDb::DbDataValueVec *colname(new GenDb::DbDataValueVec); diff --git a/src/analytics/test/options_test.cc b/src/analytics/test/options_test.cc index 9926bb4ab6f..2ade70c2f8a 100644 --- a/src/analytics/test/options_test.cc +++ b/src/analytics/test/options_test.cc @@ -74,10 +74,10 @@ TEST_F(OptionsTest, NoArguments) { EXPECT_EQ(options_.log_file_size(), 1024*1024); EXPECT_EQ(options_.log_level(), "SYS_NOTICE"); EXPECT_EQ(options_.log_local(), false); - EXPECT_EQ(options_.analytics_data_ttl(), g_viz_constants.AnalyticsTTL); - EXPECT_EQ(options_.analytics_config_audit_ttl(), g_viz_constants.AnalyticsConfigAuditTTL); - EXPECT_EQ(options_.analytics_statistics_ttl(), g_viz_constants.AnalyticsStatisticsTTL); - EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.AnalyticsFlowTTL); + EXPECT_EQ(options_.analytics_data_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::GLOBAL_TTL)->second); + EXPECT_EQ(options_.analytics_config_audit_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::CONFIGAUDIT_TTL)->second); + EXPECT_EQ(options_.analytics_statistics_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::STATSDATA_TTL)->second); + EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::FLOWDATA_TTL)->second); EXPECT_EQ(options_.syslog_port(), -1); EXPECT_EQ(options_.dup(), false); EXPECT_EQ(options_.test_mode(), false); @@ -115,10 +115,10 @@ TEST_F(OptionsTest, DefaultConfFile) { EXPECT_EQ(options_.log_file_size(), 1024*1024); EXPECT_EQ(options_.log_level(), "SYS_NOTICE"); EXPECT_EQ(options_.log_local(), true); - EXPECT_EQ(options_.analytics_data_ttl(), g_viz_constants.AnalyticsTTL); - EXPECT_EQ(options_.analytics_config_audit_ttl(), g_viz_constants.AnalyticsConfigAuditTTL); - EXPECT_EQ(options_.analytics_statistics_ttl(), g_viz_constants.AnalyticsStatisticsTTL); - EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.AnalyticsFlowTTL); + EXPECT_EQ(options_.analytics_data_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::GLOBAL_TTL)->second); + EXPECT_EQ(options_.analytics_config_audit_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::CONFIGAUDIT_TTL)->second); + EXPECT_EQ(options_.analytics_statistics_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::STATSDATA_TTL)->second); + EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::FLOWDATA_TTL)->second); EXPECT_EQ(options_.syslog_port(), -1); EXPECT_EQ(options_.dup(), false); EXPECT_EQ(options_.test_mode(), false); @@ -158,10 +158,10 @@ TEST_F(OptionsTest, OverrideStringFromCommandLine) { EXPECT_EQ(options_.log_file_size(), 1024*1024); EXPECT_EQ(options_.log_level(), "SYS_NOTICE"); EXPECT_EQ(options_.log_local(), true); - EXPECT_EQ(options_.analytics_data_ttl(), g_viz_constants.AnalyticsTTL); - EXPECT_EQ(options_.analytics_config_audit_ttl(), g_viz_constants.AnalyticsConfigAuditTTL); - EXPECT_EQ(options_.analytics_statistics_ttl(), g_viz_constants.AnalyticsStatisticsTTL); - EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.AnalyticsFlowTTL); + EXPECT_EQ(options_.analytics_data_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::GLOBAL_TTL)->second); + EXPECT_EQ(options_.analytics_config_audit_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::CONFIGAUDIT_TTL)->second); + EXPECT_EQ(options_.analytics_statistics_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::STATSDATA_TTL)->second); + EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::FLOWDATA_TTL)->second); EXPECT_EQ(options_.syslog_port(), -1); EXPECT_EQ(options_.dup(), false); EXPECT_EQ(options_.test_mode(), false); @@ -201,10 +201,10 @@ TEST_F(OptionsTest, OverrideBooleanFromCommandLine) { EXPECT_EQ(options_.log_file_size(), 1024*1024); EXPECT_EQ(options_.log_level(), "SYS_NOTICE"); EXPECT_EQ(options_.log_local(), true); - EXPECT_EQ(options_.analytics_data_ttl(), g_viz_constants.AnalyticsTTL); - EXPECT_EQ(options_.analytics_config_audit_ttl(), g_viz_constants.AnalyticsConfigAuditTTL); - EXPECT_EQ(options_.analytics_statistics_ttl(), g_viz_constants.AnalyticsStatisticsTTL); - EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.AnalyticsFlowTTL); + EXPECT_EQ(options_.analytics_data_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::GLOBAL_TTL)->second); + EXPECT_EQ(options_.analytics_config_audit_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::CONFIGAUDIT_TTL)->second); + EXPECT_EQ(options_.analytics_statistics_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::STATSDATA_TTL)->second); + EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::FLOWDATA_TTL)->second); EXPECT_EQ(options_.syslog_port(), -1); EXPECT_EQ(options_.dup(), false); EXPECT_EQ(options_.test_mode(), true); // Overridden from command line. @@ -285,10 +285,10 @@ TEST_F(OptionsTest, CustomConfigFile) { EXPECT_EQ(options_.log_file_size(), 1024); EXPECT_EQ(options_.log_level(), "SYS_DEBUG"); EXPECT_EQ(options_.log_local(), true); - EXPECT_EQ(options_.analytics_data_ttl(), g_viz_constants.AnalyticsTTL); - EXPECT_EQ(options_.analytics_config_audit_ttl(), g_viz_constants.AnalyticsConfigAuditTTL); - EXPECT_EQ(options_.analytics_statistics_ttl(), g_viz_constants.AnalyticsStatisticsTTL); - EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.AnalyticsFlowTTL); + EXPECT_EQ(options_.analytics_data_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::GLOBAL_TTL)->second); + EXPECT_EQ(options_.analytics_config_audit_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::CONFIGAUDIT_TTL)->second); + EXPECT_EQ(options_.analytics_statistics_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::STATSDATA_TTL)->second); + EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::FLOWDATA_TTL)->second); EXPECT_EQ(options_.syslog_port(), 101); EXPECT_EQ(options_.dup(), true); EXPECT_EQ(options_.test_mode(), true); @@ -385,9 +385,9 @@ TEST_F(OptionsTest, CustomConfigFileAndOverrideFromCommandLine) { EXPECT_EQ(options_.log_level(), "SYS_DEBUG"); EXPECT_EQ(options_.log_local(), true); EXPECT_EQ(options_.analytics_data_ttl(), 30); - EXPECT_EQ(options_.analytics_config_audit_ttl(), g_viz_constants.AnalyticsConfigAuditTTL); - EXPECT_EQ(options_.analytics_statistics_ttl(), g_viz_constants.AnalyticsStatisticsTTL); - EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.AnalyticsFlowTTL); + EXPECT_EQ(options_.analytics_config_audit_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::CONFIGAUDIT_TTL)->second); + EXPECT_EQ(options_.analytics_statistics_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::STATSDATA_TTL)->second); + EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::FLOWDATA_TTL)->second); EXPECT_EQ(options_.syslog_port(), 102); EXPECT_EQ(options_.dup(), true); EXPECT_EQ(options_.test_mode(), true); @@ -433,7 +433,7 @@ TEST_F(OptionsTest, MultiTtlOption) { EXPECT_EQ(options_.analytics_data_ttl(), 2); EXPECT_EQ(options_.analytics_config_audit_ttl(), 240); EXPECT_EQ(options_.analytics_statistics_ttl(), 4); - EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.AnalyticsFlowTTL); + EXPECT_EQ(options_.analytics_flow_ttl(), g_viz_constants.TtlValuesDefault.find(TtlType::FLOWDATA_TTL)->second); } int main(int argc, char **argv) { diff --git a/src/analytics/test/syslog_test.cc b/src/analytics/test/syslog_test.cc index bb31f4981c7..72a73538b45 100644 --- a/src/analytics/test/syslog_test.cc +++ b/src/analytics/test/syslog_test.cc @@ -15,10 +15,7 @@ using ::testing::StrEq; using ::testing::Pointee; using ::testing::_; -DbHandler::TtlMap ttl_map = boost::assign::map_list_of(DbHandler::FLOWDATA_TTL, 2) - (DbHandler::STATSDATA_TTL, 4) - (DbHandler::CONFIGAUDIT_TTL, 240) - (DbHandler::GLOBAL_TTL, 48); +TtlMap ttl_map = g_viz_constants.TtlValuesDefault; class SyslogParserTestHelper : public SyslogParser { diff --git a/src/analytics/viz.sandesh b/src/analytics/viz.sandesh index 9a0d5223199..ba0bb483c16 100644 --- a/src/analytics/viz.sandesh +++ b/src/analytics/viz.sandesh @@ -47,6 +47,10 @@ const string SYSTEM_OBJECT_START_TIME = "SystemObjectStartTime" const string SYSTEM_OBJECT_FLOW_START_TIME = "SystemObjectFlowStartTime" const string SYSTEM_OBJECT_STAT_START_TIME = "SystemObjectStatStartTime" const string SYSTEM_OBJECT_MSG_START_TIME = "SystemObjectMsgStartTime" +const string SYSTEM_OBJECT_FLOW_DATA_TTL = "SystemObjectFlowDataTtl" +const string SYSTEM_OBJECT_STATS_DATA_TTL = "SystemObjectStatsDataTtl" +const string SYSTEM_OBJECT_CONFIG_AUDIT_TTL = "SystemObjectConfigAuditTtl" +const string SYSTEM_OBJECT_GLOBAL_DATA_TTL = "SystemObjectGlobalDataTtl" // Master object table which contains all object tables combined const string OBJECT_TABLE = "ObjectTable" @@ -134,6 +138,22 @@ const string STATS_TABLE_BY_DBL_TAG = "StatsTableByDblTagV3" const string STATS_TABLE_BY_STR_TAG = "StatsTableByStrTagV3" const string STATS_TABLE_BY_U64_TAG = "StatsTableByU64TagV3" +enum TtlType { + FLOWDATA_TTL, + STATSDATA_TTL, + CONFIGAUDIT_TTL, + GLOBAL_TTL, +} + +typedef map TtlMap + +const map TtlValuesDefault = { + TtlType.FLOWDATA_TTL : 2, + TtlType.STATSDATA_TTL : 168, + TtlType.CONFIGAUDIT_TTL : 2160, + TtlType.GLOBAL_TTL : 48, +} + const list _FLOW_TABLES = [ FLOW_TABLE_SVN_SIP, FLOW_TABLE_DVN_DIP, @@ -329,12 +349,6 @@ const string ANALYTICS_START = "ANALYTICS_START_TIME" const i32 RowTimeInBits = 23 const i32 RowTimeInMask = 0x7fffff -// analytics data ttl in the db in hours -const i32 AnalyticsTTL = 48 -const i32 AnalyticsConfigAuditTTL = 2160 -const i32 AnalyticsStatisticsTTL = 168 -const i32 AnalyticsFlowTTL = 2 - const map UVE_MAP = { "virtual-network" : VN_TABLE, "virtual-machine" : VM_TABLE, diff --git a/src/analytics/viz_collector.cc b/src/analytics/viz_collector.cc index ea9b1bbef3b..0743229331f 100644 --- a/src/analytics/viz_collector.cc +++ b/src/analytics/viz_collector.cc @@ -34,7 +34,7 @@ VizCollector::VizCollector(EventManager *evm, unsigned short listen_port, const std::string &brokers, int syslog_port, int sflow_port, int ipfix_port, uint16_t partitions, - bool dup, const DbHandler::TtlMap& ttl_map) : + bool dup, const TtlMap& ttl_map) : db_initializer_(new DbHandlerInitializer(evm, DbGlobalName(dup), -1, std::string("collector:DbIf"), boost::bind(&VizCollector::DbInitializeCb, this), diff --git a/src/analytics/viz_collector.h b/src/analytics/viz_collector.h index 17b67b7dae1..ecc3ae3c378 100644 --- a/src/analytics/viz_collector.h +++ b/src/analytics/viz_collector.h @@ -37,7 +37,7 @@ class VizCollector { const std::string &brokers, int syslog_port, int sflow_port, int ipfix_port, uint16_t partitions, - bool dup, const DbHandler::TtlMap &ttlmap); + bool dup, const TtlMap &ttlmap); VizCollector(EventManager *evm, DbHandler *db_handler, Ruleeng *ruleeng, Collector *collector, OpServerProxy *osp); ~VizCollector(); diff --git a/src/analytics/vizd_table_desc.cc b/src/analytics/vizd_table_desc.cc index 6e3d032bd1b..bb5df87a9ba 100644 --- a/src/analytics/vizd_table_desc.cc +++ b/src/analytics/vizd_table_desc.cc @@ -120,6 +120,14 @@ void init_vizd_tables() { GenDb::DbDataType::Unsigned64Type) (g_viz_constants.SYSTEM_OBJECT_STAT_START_TIME, GenDb::DbDataType::Unsigned64Type) + (g_viz_constants.SYSTEM_OBJECT_FLOW_DATA_TTL, + GenDb::DbDataType::Unsigned64Type) + (g_viz_constants.SYSTEM_OBJECT_STATS_DATA_TTL, + GenDb::DbDataType::Unsigned64Type) + (g_viz_constants.SYSTEM_OBJECT_CONFIG_AUDIT_TTL, + GenDb::DbDataType::Unsigned64Type) + (g_viz_constants.SYSTEM_OBJECT_GLOBAL_DATA_TTL, + GenDb::DbDataType::Unsigned64Type) )) (GenDb::NewCf(g_viz_constants.OBJECT_TABLE, boost::assign::list_of diff --git a/src/gendb/cdb_if.cc b/src/gendb/cdb_if.cc index 424cf234e2d..5fa6dac1310 100644 --- a/src/gendb/cdb_if.cc +++ b/src/gendb/cdb_if.cc @@ -619,7 +619,7 @@ class CdbIf::InitTask : public Task { CdbIf::CdbIf(DbErrorHandler errhandler, const std::vector &cassandra_ips, - const std::vector &cassandra_ports, int ttl, + const std::vector &cassandra_ports, std::string name, bool only_sync) : socket_(new TSocketPool(cassandra_ips, cassandra_ports)), transport_(new TFramedTransport(socket_)), @@ -629,7 +629,6 @@ CdbIf::CdbIf(DbErrorHandler errhandler, name_(name), init_task_(NULL), cleanup_task_(NULL), - cassandra_ttl_(ttl), only_sync_(only_sync), task_instance_(-1), prev_task_instance_(-1), @@ -646,7 +645,6 @@ CdbIf::CdbIf(DbErrorHandler errhandler, CdbIf::CdbIf() : init_task_(NULL), cleanup_task_(NULL), - cassandra_ttl_(-1), only_sync_(false), task_instance_(-1), prev_task_instance_(-1), @@ -1332,11 +1330,7 @@ bool CdbIf::Db_AsyncAddColumn(CdbIfColList &cl) { c.__set_value(col_value); // Timestamp and TTL c.__set_timestamp(ts); - if (it->ttl == -1) { - if (cassandra_ttl_) { - c.__set_ttl(cassandra_ttl_); - } - } else if (it->ttl) { + if (it->ttl > 0) { c.__set_ttl(it->ttl); } c_or_sc.__set_column(c); @@ -1357,11 +1351,7 @@ bool CdbIf::Db_AsyncAddColumn(CdbIfColList &cl) { c.__set_value(col_value); // Timestamp and TTL c.__set_timestamp(ts); - if (it->ttl == -1) { - if (cassandra_ttl_) { - c.__set_ttl(cassandra_ttl_); - } - } else if (it->ttl) { + if (it->ttl > 0) { c.__set_ttl(it->ttl); } c_or_sc.__set_column(c); @@ -1437,7 +1427,7 @@ bool CdbIf::ColListFromColumnOrSuper(GenDb::ColList& ret, " Decode FAILED"); continue; } - GenDb::NewCol *col(new GenDb::NewCol(citer->column.name, res)); + GenDb::NewCol *col(new GenDb::NewCol(citer->column.name, res, 0)); columns.push_back(col); } } else if (cf->cftype_ == NewCf::COLUMN_FAMILY_NOSQL) { @@ -1456,7 +1446,7 @@ bool CdbIf::ColListFromColumnOrSuper(GenDb::ColList& ret, CDBIF_LOG_ERR(cfname << ": Column Value Decode FAILED"); continue; } - GenDb::NewCol *col(new GenDb::NewCol(name.release(), value.release())); + GenDb::NewCol *col(new GenDb::NewCol(name.release(), value.release(), 0)); columns.push_back(col); } } diff --git a/src/gendb/cdb_if.h b/src/gendb/cdb_if.h index ab1dd1fe46f..92b4946d11a 100644 --- a/src/gendb/cdb_if.h +++ b/src/gendb/cdb_if.h @@ -23,7 +23,7 @@ class CdbIf : public GenDb::GenDbIf { public: CdbIf(DbErrorHandler, const std::vector&, - const std::vector&, int ttl, std::string name, + const std::vector&, std::string name, bool only_sync); CdbIf(); ~CdbIf(); @@ -255,7 +255,6 @@ class CdbIf : public GenDb::GenDbIf { mutable tbb::mutex cdbq_mutex_; InitTask *init_task_; CleanupTask *cleanup_task_; - int cassandra_ttl_; bool only_sync_; int task_instance_; int prev_task_instance_; diff --git a/src/gendb/gendb_if.cc b/src/gendb/gendb_if.cc index 63547a197df..b21ddb47e9d 100644 --- a/src/gendb/gendb_if.cc +++ b/src/gendb/gendb_if.cc @@ -12,8 +12,8 @@ using namespace GenDb; GenDbIf *GenDbIf::GenDbIfImpl(GenDbIf::DbErrorHandler hdlr, const std::vector &cassandra_ips, const std::vector &cassandra_ports, - int analytics_ttl, std::string name, bool only_sync) { - return (new CdbIf(hdlr, cassandra_ips, cassandra_ports, analytics_ttl, + std::string name, bool only_sync) { + return (new CdbIf(hdlr, cassandra_ips, cassandra_ports, name, only_sync)); } diff --git a/src/gendb/gendb_if.h b/src/gendb/gendb_if.h index 8bc49c9fc2a..0af9d347680 100644 --- a/src/gendb/gendb_if.h +++ b/src/gendb/gendb_if.h @@ -97,10 +97,10 @@ struct NewCf { }; struct NewCol { - NewCol(DbDataValueVec* n, DbDataValueVec* v, int ttl=-1) : + NewCol(DbDataValueVec* n, DbDataValueVec* v, int ttl) : cftype_(NewCf::COLUMN_FAMILY_NOSQL), name(n), value(v), ttl(ttl) {} - NewCol(const std::string& n, const DbDataValue& v, int ttl=-1) : + NewCol(const std::string& n, const DbDataValue& v, int ttl) : cftype_(NewCf::COLUMN_FAMILY_SQL), name(new DbDataValueVec(1, n)), value(new DbDataValueVec(1, v)), ttl(ttl) {} @@ -204,7 +204,7 @@ class GenDbIf { static GenDbIf *GenDbIfImpl(DbErrorHandler hdlr, const std::vector &cassandra_ips, const std::vector &cassandra_ports, - int analytics_ttl, std::string name, bool only_sync); + std::string name, bool only_sync); }; } // namespace GenDb diff --git a/src/gendb/test/gendb_if_test.cc b/src/gendb/test/gendb_if_test.cc index 15932a412cf..2401f43e1aa 100644 --- a/src/gendb/test/gendb_if_test.cc +++ b/src/gendb/test/gendb_if_test.cc @@ -527,25 +527,25 @@ class GenDbTest : public ::testing::Test { TEST_F(GenDbTest, NewColSizeSql) { // SQL - string - GenDb::NewCol tstring_col(tstring_, tstring_); + GenDb::NewCol tstring_col(tstring_, tstring_, 0); EXPECT_EQ(tstring_.length() + tstring_.length(), tstring_col.GetSize()); // SQL - uint64_t - GenDb::NewCol tu64_col(tstring_, tu64_); + GenDb::NewCol tu64_col(tstring_, tu64_, 0); EXPECT_EQ(tstring_.length() + sizeof(tu64_), tu64_col.GetSize()); // SQL - uint32_t - GenDb::NewCol tu32_col(tstring_, tu32_); + GenDb::NewCol tu32_col(tstring_, tu32_, 0); EXPECT_EQ(tstring_.length() + sizeof(tu32_), tu32_col.GetSize()); // SQL - boost::uuids::uuid - GenDb::NewCol tuuid_col(tstring_, tuuid_); + GenDb::NewCol tuuid_col(tstring_, tuuid_, 0); EXPECT_EQ(tstring_.length() + tuuid_.size(), tuuid_col.GetSize()); // SQL - uint8_t - GenDb::NewCol tu8_col(tstring_, tu8_); + GenDb::NewCol tu8_col(tstring_, tu8_, 0); EXPECT_EQ(tstring_.length() + sizeof(tu8_), tu8_col.GetSize()); // SQL - uint16_t - GenDb::NewCol tu16_col(tstring_, tu16_); + GenDb::NewCol tu16_col(tstring_, tu16_, 0); EXPECT_EQ(tstring_.length() + sizeof(tu16_), tu16_col.GetSize()); // SQL - double - GenDb::NewCol tdouble_col(tstring_, tdouble_); + GenDb::NewCol tdouble_col(tstring_, tdouble_, 0); EXPECT_EQ(tstring_.length() + sizeof(tdouble_), tdouble_col.GetSize()); } @@ -573,7 +573,7 @@ static GenDb::NewCol* CreateNewColNoSql(size_t *csize) { PopulateDbDataValueVec(name, csize); GenDb::DbDataValueVec *value(new GenDb::DbDataValueVec); PopulateDbDataValueVec(value, csize); - GenDb::NewCol *nosql_col(new GenDb::NewCol(name, value)); + GenDb::NewCol *nosql_col(new GenDb::NewCol(name, value, 0)); return nosql_col; } @@ -589,7 +589,7 @@ TEST_F(GenDbTest, ColListSize) { size_t expected_size(0); PopulateDbDataValueVec(&colList.rowkey_, &expected_size); // SQL - string - GenDb::NewCol *tstring_col(new GenDb::NewCol(tstring_, tstring_)); + GenDb::NewCol *tstring_col(new GenDb::NewCol(tstring_, tstring_, 0)); expected_size += 2 * tstring_.length(); colList.columns_.push_back(tstring_col); // No SQL diff --git a/src/opserver/analytics_db.py b/src/opserver/analytics_db.py index fb72590ba18..51b0f5dbefe 100644 --- a/src/opserver/analytics_db.py +++ b/src/opserver/analytics_db.py @@ -62,6 +62,39 @@ def _get_sysm(self): return None # end _get_sysm + def _get_analytics_ttls(self): + ret_row = {} + try: + col_family = ColumnFamily(self._pool, SYSTEM_OBJECT_TABLE) + row = col_family.get(SYSTEM_OBJECT_ANALYTICS) + except Exception as e: + self._logger.error("Exception: analytics_start_time Failure %s" % e) + ret_row[SYSTEM_OBJECT_FLOW_DATA_TTL] = AnalyticsFlowTTL + ret_row[SYSTEM_OBJECT_STATS_DATA_TTL] = AnalyticsStatisticsTTL + ret_row[SYSTEM_OBJECT_CONFIG_AUDIT_TTL] = AnalyticsConfigAuditTTL + ret_row[SYSTEM_OBJECT_GLOBAL_DATA_TTL] = AnalyticsTTL + return ret_row + + if (SYSTEM_OBJECT_FLOW_DATA_TTL not in row): + ret_row[SYSTEM_OBJECT_FLOW_DATA_TTL] = AnalyticsFlowTTL + else: + ret_row[SYSTEM_OBJECT_FLOW_DATA_TTL] = row[SYSTEM_OBJECT_FLOW_DATA_TTL] + if (SYSTEM_OBJECT_STATS_DATA_TTL not in row): + ret_row[SYSTEM_OBJECT_STATS_DATA_TTL] = AnalyticsStatisticsTTL + else: + ret_row[SYSTEM_OBJECT_STATS_DATA_TTL] = row[SYSTEM_OBJECT_STATS_DATA_TTL] + if (SYSTEM_OBJECT_CONFIG_AUDIT_TTL not in row): + ret_row[SYSTEM_OBJECT_CONFIG_AUDIT_TTL] = AnalyticsConfigAuditTTL + else: + ret_row[SYSTEM_OBJECT_CONFIG_AUDIT_TTL] = row[SYSTEM_OBJECT_CONFIG_AUDIT_TTL] + if (SYSTEM_OBJECT_GLOBAL_DATA_TTL not in row): + ret_row[SYSTEM_OBJECT_GLOBAL_DATA_TTL] = AnalyticsTTL + else: + ret_row[SYSTEM_OBJECT_GLOBAL_DATA_TTL] = row[SYSTEM_OBJECT_GLOBAL_DATA_TTL] + + return ret_row + # end _get_analytics_ttls + def _get_analytics_start_time(self): try: col_family = ColumnFamily(self._pool, SYSTEM_OBJECT_TABLE) @@ -73,14 +106,22 @@ def _get_analytics_start_time(self): # Initialize the dictionary before returning if (SYSTEM_OBJECT_START_TIME not in row): return None + ret_row = {} + ret_row[SYSTEM_OBJECT_START_TIME] = row[SYSTEM_OBJECT_START_TIME] if (SYSTEM_OBJECT_FLOW_START_TIME not in row): - row[SYSTEM_OBJECT_FLOW_START_TIME] = row[SYSTEM_OBJECT_START_TIME] + ret_row[SYSTEM_OBJECT_FLOW_START_TIME] = row[SYSTEM_OBJECT_START_TIME] + else: + ret_row[SYSTEM_OBJECT_FLOW_START_TIME] = row[SYSTEM_OBJECT_FLOW_START_TIME] if (SYSTEM_OBJECT_STAT_START_TIME not in row): - row[SYSTEM_OBJECT_STAT_START_TIME] = row[SYSTEM_OBJECT_START_TIME] + ret_row[SYSTEM_OBJECT_STAT_START_TIME] = row[SYSTEM_OBJECT_START_TIME] + else: + ret_row[SYSTEM_OBJECT_STAT_START_TIME] = row[SYSTEM_OBJECT_STAT_START_TIME] if (SYSTEM_OBJECT_MSG_START_TIME not in row): - row[SYSTEM_OBJECT_MSG_START_TIME] = row[SYSTEM_OBJECT_START_TIME] + ret_row[SYSTEM_OBJECT_MSG_START_TIME] = row[SYSTEM_OBJECT_START_TIME] + else: + ret_row[SYSTEM_OBJECT_MSG_START_TIME] = row[SYSTEM_OBJECT_MSG_START_TIME] - return row + return ret_row # end _get_analytics_start_time def _update_analytics_start_time(self, start_times): diff --git a/src/opserver/opserver.py b/src/opserver/opserver.py index 3315efc2e4c..c689effab2a 100644 --- a/src/opserver/opserver.py +++ b/src/opserver/opserver.py @@ -783,14 +783,6 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): if 'DISCOVERY' in config.sections(): disc_opts.update(dict(config.items('DISCOVERY'))) - # update ttls - if (defaults['analytics_config_audit_ttl'] == -1): - defaults['analytics_config_audit_ttl'] = defaults['analytics_data_ttl'] - if (defaults['analytics_statistics_ttl'] == -1): - defaults['analytics_statistics_ttl'] = defaults['analytics_data_ttl'] - if (defaults['analytics_flow_ttl'] == -1): - defaults['analytics_flow_ttl'] = defaults['analytics_data_ttl'] - # Override with CLI options # Don't surpress add_help here so it will handle -h @@ -1650,19 +1642,21 @@ def get_purge_cutoff(self, purge_input, start_times): current_time = UTCTimestampUsec() self._logger.error("start times:" + str(start_times)) + + analytics_ttls = self._analytics_db._get_analytics_ttls() analytics_time_range = min( (current_time - start_times[SYSTEM_OBJECT_START_TIME]), - 60*60*1000000*self._args.analytics_data_ttl) + 60*60*1000000*analytics_ttls[SYSTEM_OBJECT_GLOBAL_DATA_TTL]) flow_time_range = min( (current_time - start_times[SYSTEM_OBJECT_FLOW_START_TIME]), - 60*60*1000000*self._args.analytics_flow_ttl) + 60*60*1000000*analytics_ttls[SYSTEM_OBJECT_FLOW_DATA_TTL]) stat_time_range = min( (current_time - start_times[SYSTEM_OBJECT_STAT_START_TIME]), - 60*60*1000000*self._args.analytics_statistics_ttl) + 60*60*1000000*analytics_ttls[SYSTEM_OBJECT_STATS_DATA_TTL]) # currently using config audit TTL for message table (to be changed) msg_time_range = min( (current_time - start_times[SYSTEM_OBJECT_MSG_START_TIME]), - 60*60*1000000*self._args.analytics_config_audit_ttl) + 60*60*1000000*analytics_ttls[SYSTEM_OBJECT_CONFIG_AUDIT_TTL]) purge_cutoff['flow_cutoff'] = int(current_time - (float(100 - purge_input)* float(flow_time_range)/100.0)) diff --git a/src/query_engine/db_query.cc b/src/query_engine/db_query.cc index 5befe63bcc6..9cc3df28656 100644 --- a/src/query_engine/db_query.cc +++ b/src/query_engine/db_query.cc @@ -19,7 +19,7 @@ query_status_t DbQueryUnit::process_query() << " column_start size:" << cr.start_.size() << " column_end size:" << cr.finish_.size()); - if (m_query->is_object_table_query()) + if (m_query->is_object_table_query(m_query->table())) { GenDb::DbDataValue timestamp_start = (uint32_t)0x0; cr.start_.push_back(timestamp_start); @@ -35,9 +35,10 @@ query_status_t DbQueryUnit::process_query() GenDb::DbDataValueVec rowkey; rowkey.push_back(t2); - if (m_query->is_flow_query() || m_query->is_stat_table_query() || - (m_query->is_object_table_query() && - cfname == g_viz_constants.OBJECT_TABLE)) { + if (m_query->is_flow_query(m_query->table()) || + m_query->is_stat_table_query(m_query->table()) || + (m_query->is_object_table_query(m_query->table()) && + cfname == g_viz_constants.OBJECT_TABLE)) { uint8_t partition_no = 0; rowkey.push_back(partition_no); } @@ -91,7 +92,7 @@ query_status_t DbQueryUnit::process_query() query_result_unit_t result_unit; uint32_t t1; - if (m_query->is_stat_table_query()) { + if (m_query->is_stat_table_query(m_query->table())) { assert(i->value->size()==1); assert((i->name->size()==4)||(i->name->size()==3)); try { @@ -99,7 +100,7 @@ query_status_t DbQueryUnit::process_query() } catch (boost::bad_get& ex) { assert(0); } - } else if (m_query->is_flow_query()) { + } else if (m_query->is_flow_query(m_query->table())) { int ts_at = i->name->size() - 2; assert(ts_at >= 0); @@ -130,7 +131,7 @@ query_status_t DbQueryUnit::process_query() } // Add to result vector - if (m_query->is_stat_table_query()) { + if (m_query->is_stat_table_query(m_query->table())) { std::string attribstr; boost::uuids::uuid uuid; diff --git a/src/query_engine/options.cc b/src/query_engine/options.cc index 8e45bbb5318..cc41e045837 100644 --- a/src/query_engine/options.cc +++ b/src/query_engine/options.cc @@ -70,7 +70,7 @@ void Options::Initialize(EventManager &evm, opt::options_description config("Configuration options"); config.add_options() ("DEFAULT.analytics_data_ttl", - opt::value()->default_value(g_viz_constants.AnalyticsTTL), + opt::value()->default_value(0), "global TTL(hours) for analytics data") ("DEFAULT.collectors", opt::value >()->default_value( diff --git a/src/query_engine/qed.cc b/src/query_engine/qed.cc index 3886780af8a..842bf1bbf36 100644 --- a/src/query_engine/qed.cc +++ b/src/query_engine/qed.cc @@ -257,8 +257,7 @@ main(int argc, char *argv[]) { options.redis_port(), options.redis_password(), max_tasks, - options.max_slice(), - options.analytics_data_ttl()); + options.max_slice()); } else { qe = new QueryEngine(&evm, cassandra_ips, @@ -267,9 +266,7 @@ main(int argc, char *argv[]) { options.redis_port(), options.redis_password(), max_tasks, - options.max_slice(), - options.analytics_data_ttl(), - options.start_time()); + options.max_slice()); } (void) qe; diff --git a/src/query_engine/query.cc b/src/query_engine/query.cc index f81bf334ab6..b573579c726 100644 --- a/src/query_engine/query.cc +++ b/src/query_engine/query.cc @@ -119,7 +119,7 @@ PostProcessingQuery::PostProcessingQuery( std::string sort_str(json_sort_fields[i].GetString()); QE_TRACE(DEBUG, sort_str); std::string datatype(m_query->get_column_field_datatype(sort_str)); - if (!m_query->is_stat_table_query()) { + if (!m_query->is_stat_table_query(m_query->table())) { QE_INVALIDARG_ERROR(datatype != std::string("")); } else if (m_query->stats().is_stat_table_static()) { // This is a static StatTable. We can check the schema @@ -367,7 +367,7 @@ void AnalyticsQuery::get_query_details(bool& is_merge_needed, bool& is_map_outpu parse_status = status_details; if (parse_status != 0) return; - if (is_stat_table_query()) { + if (is_stat_table_query(table_)) { is_merge_needed = selectquery_->stats_->IsMergeNeeded(); } else { is_merge_needed = merge_needed; @@ -376,7 +376,7 @@ void AnalyticsQuery::get_query_details(bool& is_merge_needed, bool& is_map_outpu where = wherequery_->json_string_; select = selectquery_->json_string_; post = postprocess_->json_string_; - is_map_output = is_stat_table_query(); + is_map_output = is_stat_table_query(table_); } bool AnalyticsQuery::can_parallelize_query() { @@ -388,8 +388,7 @@ bool AnalyticsQuery::can_parallelize_query() { } void AnalyticsQuery::Init(GenDb::GenDbIf *db_if, std::string qid, - std::map& json_api_data, - uint64_t analytics_start_time) + std::map& json_api_data) { std::map::iterator iter; @@ -405,6 +404,7 @@ void AnalyticsQuery::Init(GenDb::GenDbIf *db_if, std::string qid, sandesh_moduleid = g_vns_constants.ModuleNames.find(Module::QUERY_ENGINE)->second; + { std::stringstream json_string; json_string << " { "; for (std::mapsecond; + } else if (is_flow_query(table_)) { + ttl = ttlmap_.find(TtlType::FLOWDATA_TTL)->second; + } else if (is_object_table_query(table_)) { + ttl = ttlmap_.find(TtlType::CONFIGAUDIT_TTL)->second; + } else { + ttl = ttlmap_.find(TtlType::GLOBAL_TTL)->second; + } + min_start_time = min_start_time-ttl*60*60*1000000; + // Start time { iter = json_api_data.find(QUERY_START_TIME); QE_PARSE_ERROR(iter != json_api_data.end()); req_from_time_ = parse_time(iter->second); QE_TRACE(DEBUG, " from_time is " << req_from_time_); - if (req_from_time_ < analytics_start_time) + if (req_from_time_ < min_start_time) { - from_time_ = analytics_start_time; + from_time_ = min_start_time; QE_TRACE(DEBUG, "updated start_time to:" << from_time_); } else { from_time_ = req_from_time_; @@ -460,11 +475,8 @@ void AnalyticsQuery::Init(GenDb::GenDbIf *db_if, std::string qid, req_end_time_ = parse_time(iter->second); QE_TRACE(DEBUG, " end_time is " << req_end_time_); - if (req_end_time_ < analytics_start_time) { - end_time_ = analytics_start_time; - } else if (req_end_time_ > - (uint64_t)(curr_time.tv_sec*1000000+curr_time.tv_usec)) { - end_time_ = curr_time.tv_sec*1000000+curr_time.tv_usec; + if (req_end_time_ > max_end_time) { + end_time_ = max_end_time; QE_TRACE(DEBUG, "updated end_time to:" << end_time_); } else { end_time_ = req_end_time_; @@ -517,7 +529,7 @@ void AnalyticsQuery::Init(GenDb::GenDbIf *db_if, std::string qid, * ObjectId queries are special, they are requested from Object* tables, * but the values are extrated from g_viz_constants.OBJECT_VALUE_TABLE */ - if (is_object_table_query()) { + if (is_object_table_query(table_)) { if (selectquery_->ObjectIdQuery()) { object_value_key = table_; table_ = g_viz_constants.OBJECT_VALUE_TABLE; @@ -535,7 +547,7 @@ void AnalyticsQuery::Init(GenDb::GenDbIf *db_if, std::string qid, return; } - if (this->is_stat_table_query()) { + if (is_stat_table_query(table_)) { selectquery_->stats_->SetSortOrder(postprocess_->sort_fields); } @@ -901,16 +913,17 @@ query_status_t AnalyticsQuery::process_query() } AnalyticsQuery::AnalyticsQuery(std::string qid, std::map& json_api_data, uint64_t analytics_start_time, + std::string>& json_api_data, const TtlMap &ttlmap, EventManager *evm, std::vector cassandra_ips, std::vector cassandra_ports, int batch, int total_batches): QueryUnit(NULL, this), dbif_(GenDb::GenDbIf::GenDbIfImpl( boost::bind(&AnalyticsQuery::db_err_handler, this), - cassandra_ips, cassandra_ports, 0, "QueryEngine", true)), + cassandra_ips, cassandra_ports, "QueryEngine", true)), filter_qe_logs(true), json_api_data_(json_api_data), + ttlmap_(ttlmap), where_start_(0), select_start_(0), postproc_start_(0), @@ -974,16 +987,17 @@ AnalyticsQuery::AnalyticsQuery(std::string qid, std::mapDb_SetInitDone(true); - Init(dbif, qid, json_api_data, analytics_start_time); + Init(dbif, qid, json_api_data); } AnalyticsQuery::AnalyticsQuery(std::string qid, GenDb::GenDbIf *dbif, std::map json_api_data, - uint64_t analytics_start_time, int batch, int total_batches) : + const TtlMap &ttlmap, int batch, int total_batches) : QueryUnit(NULL, this), dbif_(dbif), query_id(qid), json_api_data_(json_api_data), + ttlmap_(ttlmap), where_start_(0), select_start_(0), postproc_start_(0), @@ -992,13 +1006,12 @@ AnalyticsQuery::AnalyticsQuery(std::string qid, GenDb::GenDbIf *dbif, total_parallel_batches(total_batches), processing_needed(true), stats_(NULL) { - Init(dbif, qid, json_api_data, analytics_start_time); + Init(dbif, qid, json_api_data); } QueryEngine::QueryEngine(EventManager *evm, const std::string & redis_ip, unsigned short redis_port, - const std::string & redis_password, int max_tasks, int max_slice, - uint64_t anal_ttl) : + const std::string & redis_password, int max_tasks, int max_slice) : qosp_(new QEOpServerProxy(evm, this, redis_ip, redis_port, redis_password, max_tasks)), evm_(evm), @@ -1010,22 +1023,17 @@ QueryEngine::QueryEngine(EventManager *evm, // Initialize database connection QE_LOG_NOQID(DEBUG, "Initializing QE without database!"); - uint64_t curr_time = UTCTimestampUsec(); - QE_LOG_NOQID(DEBUG, "Could not find analytics start time"); - uint64_t ttl = anal_ttl*60*60*1000000; - stime = curr_time - ttl; - QE_LOG_NOQID(DEBUG, "set stime to " << stime << "and AnalyticsTTL to " << g_viz_constants.AnalyticsTTL); + ttlmap_ = g_viz_constants.TtlValuesDefault; } QueryEngine::QueryEngine(EventManager *evm, std::vector cassandra_ips, std::vector cassandra_ports, const std::string & redis_ip, unsigned short redis_port, - const std::string & redis_password, int max_tasks, int max_slice, - uint64_t anal_ttl, uint64_t start_time) : + const std::string & redis_password, int max_tasks, int max_slice) : dbif_(GenDb::GenDbIf::GenDbIfImpl( boost::bind(&QueryEngine::db_err_handler, this), - cassandra_ips, cassandra_ports, 0, "QueryEngine", true)), + cassandra_ips, cassandra_ports, "QueryEngine", true)), qosp_(new QEOpServerProxy(evm, this, redis_ip, redis_port, redis_password, max_tasks)), evm_(evm), @@ -1091,17 +1099,21 @@ QueryEngine::QueryEngine(EventManager *evm, sleep(5); } } - if (start_time != 0) { - stime = start_time; - } else { + { bool init_done = false; retries = 0; - while (!init_done && retries < 5) { + while (!init_done && retries < 12) { + init_done = true; + GenDb::ColList col_list; std::string cfname = g_viz_constants.SYSTEM_OBJECT_TABLE; GenDb::DbDataValueVec key; key.push_back(g_viz_constants.SYSTEM_OBJECT_ANALYTICS); + bool ttl_cached[TtlType::GLOBAL_TTL+1]; + for (int ttli=0; ttli<=TtlType::GLOBAL_TTL; ttli++) + ttl_cached[ttli] = false; + if (dbif_->Db_GetRow(col_list, cfname, key)) { for (GenDb::NewColVec::iterator it = col_list.columns_.begin(); it != col_list.columns_.end(); it++) { @@ -1110,28 +1122,50 @@ QueryEngine::QueryEngine(EventManager *evm, col_name = boost::get(it->name->at(0)); } catch (boost::bad_get& ex) { QE_LOG_NOQID(ERROR, __func__ << ": Exception on col_name get"); + break; } - - if (col_name == g_viz_constants.SYSTEM_OBJECT_START_TIME) { - try { - stime = boost::get(it->value->at(0)); - init_done = true; - } catch (boost::bad_get& ex) { - QE_LOG_NOQID(ERROR, __func__ << "Exception for boost::get, what=" << ex.what()); - break; - } + if (col_name == g_viz_constants.SYSTEM_OBJECT_GLOBAL_DATA_TTL) { + try { + ttlmap_.insert(std::make_pair(TtlType::GLOBAL_TTL, boost::get(it->value->at(0)))); + ttl_cached[TtlType::GLOBAL_TTL] = true; + } catch (boost::bad_get& ex) { + QE_LOG_NOQID(ERROR, __func__ << "Exception for boost::get, what=" << ex.what()); + } + } else if (col_name == g_viz_constants.SYSTEM_OBJECT_CONFIG_AUDIT_TTL) { + try { + ttlmap_.insert(std::make_pair(TtlType::CONFIGAUDIT_TTL, boost::get(it->value->at(0)))); + ttl_cached[TtlType::CONFIGAUDIT_TTL] = true; + } catch (boost::bad_get& ex) { + QE_LOG_NOQID(ERROR, __func__ << "Exception for boost::get, what=" << ex.what()); + } + } else if (col_name == g_viz_constants.SYSTEM_OBJECT_STATS_DATA_TTL) { + try { + ttlmap_.insert(std::make_pair(TtlType::STATSDATA_TTL, boost::get(it->value->at(0)))); + ttl_cached[TtlType::STATSDATA_TTL] = true; + } catch (boost::bad_get& ex) { + QE_LOG_NOQID(ERROR, __func__ << "Exception for boost::get, what=" << ex.what()); + } + } else if (col_name == g_viz_constants.SYSTEM_OBJECT_FLOW_DATA_TTL) { + try { + ttlmap_.insert(std::make_pair(TtlType::FLOWDATA_TTL, boost::get(it->value->at(0)))); + ttl_cached[TtlType::FLOWDATA_TTL] = true; + } catch (boost::bad_get& ex) { + QE_LOG_NOQID(ERROR, __func__ << "Exception for boost::get, what=" << ex.what()); + } } } } + for (int ttli=0; ttli<=TtlType::GLOBAL_TTL; ttli++) + if (ttl_cached[ttli] == false) + init_done = false; + retries++; if (!init_done) sleep(5); } if (!init_done) { - uint64_t ttl = anal_ttl*60*60*1000000; - uint64_t curr_time = UTCTimestampUsec(); - stime = curr_time - ttl; - QE_LOG_NOQID(ERROR, __func__ << "setting start_time manually to" << stime); + ttlmap_ = g_viz_constants.TtlValuesDefault; + QE_LOG_NOQID(ERROR, __func__ << "ttls are set manually"); } } dbif_->Db_SetInitDone(true); @@ -1164,7 +1198,7 @@ QueryEngine::QueryPrepare(QueryParams qp, table = string("ObjectCollectorInfo"); } else { - AnalyticsQuery *q = new AnalyticsQuery(qid, qp.terms, stime, evm_, + AnalyticsQuery *q = new AnalyticsQuery(qid, qp.terms, ttlmap_, evm_, cassandra_ips_, cassandra_ports_, 0, qp.maxChunks); chunk_size.clear(); q->get_query_details(need_merge, map_output, chunk_size, @@ -1181,7 +1215,7 @@ QueryEngine::QueryAccumulate(QueryParams qp, QEOpServerProxy::BufferT& output) { QE_TRACE_NOQID(DEBUG, "Creating analytics query object for merge_processing"); - AnalyticsQuery *q = new AnalyticsQuery(qp.qid, qp.terms, stime, evm_, + AnalyticsQuery *q = new AnalyticsQuery(qp.qid, qp.terms, ttlmap_, evm_, cassandra_ips_, cassandra_ports_, 1, qp.maxChunks); QE_TRACE_NOQID(DEBUG, "Calling merge_processing"); bool ret = q->merge_processing(input, output); @@ -1195,7 +1229,7 @@ QueryEngine::QueryFinalMerge(QueryParams qp, QEOpServerProxy::BufferT& output) { QE_TRACE_NOQID(DEBUG, "Creating analytics query object for final_merge_processing"); - AnalyticsQuery *q = new AnalyticsQuery(qp.qid, qp.terms, stime, evm_, + AnalyticsQuery *q = new AnalyticsQuery(qp.qid, qp.terms, ttlmap_, evm_, cassandra_ips_, cassandra_ports_, 1, qp.maxChunks); QE_TRACE_NOQID(DEBUG, "Calling final_merge_processing"); bool ret = q->final_merge_processing(inputs, output); @@ -1208,10 +1242,10 @@ QueryEngine::QueryFinalMerge(QueryParams qp, const std::vector >& inputs, QEOpServerProxy::OutRowMultimapT& output) { QE_TRACE_NOQID(DEBUG, "Creating analytics query object for final_merge_processing"); - AnalyticsQuery *q = new AnalyticsQuery(qp.qid, qp.terms, stime, evm_, + AnalyticsQuery *q = new AnalyticsQuery(qp.qid, qp.terms, ttlmap_, evm_, cassandra_ips_, cassandra_ports_, 1, qp.maxChunks); - if (!q->is_stat_table_query()) { + if (!q->is_stat_table_query(q->table())) { QE_TRACE_NOQID(DEBUG, "MultiMap merge_final is for Stats only"); delete q; return false; @@ -1249,7 +1283,7 @@ QueryEngine::QueryExec(void * handle, QueryParams qp, uint32_t chunk) return true; } - AnalyticsQuery *q = new AnalyticsQuery(qid, qp.terms, stime, evm_, + AnalyticsQuery *q = new AnalyticsQuery(qid, qp.terms, ttlmap_, evm_, cassandra_ips_, cassandra_ports_, chunk, qp.maxChunks); QE_TRACE_NOQID(DEBUG, " Finished parsing and starting processing for QID " << qid << " chunk:" << chunk); @@ -1302,15 +1336,30 @@ std::ostream &operator<<(std::ostream &out, query_result_unit_t& res) return out; } +bool +AnalyticsQuery::is_stat_table_query(const std::string & tname) { + if (tname.compare(0, g_viz_constants.STAT_VT_PREFIX.length(), + g_viz_constants.STAT_VT_PREFIX)) { + return false; + } + return true; +} + +bool AnalyticsQuery::is_flow_query(const std::string & tname) +{ + return ((tname == g_viz_constants.FLOW_SERIES_TABLE) || + (tname == g_viz_constants.FLOW_TABLE)); +} + // validation functions -bool AnalyticsQuery::is_object_table_query() +bool AnalyticsQuery::is_object_table_query(const std::string & tname) { return ( - (this->table_ != g_viz_constants.COLLECTOR_GLOBAL_TABLE) && - (this->table_ != g_viz_constants.FLOW_TABLE) && - (this->table_ != g_viz_constants.FLOW_SERIES_TABLE) && - (this->table_ != g_viz_constants.OBJECT_VALUE_TABLE) && - !is_stat_table_query()); + (tname != g_viz_constants.COLLECTOR_GLOBAL_TABLE) && + (tname != g_viz_constants.FLOW_TABLE) && + (tname != g_viz_constants.FLOW_SERIES_TABLE) && + (tname != g_viz_constants.OBJECT_VALUE_TABLE) && + !is_stat_table_query(tname)); } @@ -1328,7 +1377,7 @@ bool AnalyticsQuery::is_valid_from_field(const std::string& from_field) if (it->first == from_field) return true; } - if (is_stat_table_query()) + if (is_stat_table_query(table_)) return true; return false; @@ -1366,7 +1415,7 @@ bool AnalyticsQuery::is_valid_where_field(const std::string& where_field) } } - if (is_stat_table_query()) { + if (is_stat_table_query(table_)) { AnalyticsQuery *m_query = (AnalyticsQuery *)main_query; if (m_query->stats().is_stat_table_static()) { StatsQuery::column_t cdesc = m_query->stats().get_column_desc(where_field); @@ -1431,121 +1480,6 @@ std::string AnalyticsQuery::get_column_field_datatype( return std::string(""); } -bool AnalyticsQuery::is_flow_query() -{ - return ((this->table_ == g_viz_constants.FLOW_SERIES_TABLE) || - (this->table_ == g_viz_constants.FLOW_TABLE)); -} - -#define UNIT_TEST_MESSAGE_FILTERS - -void QueryEngine::QueryEngine_Test() -{ -#if 0 - GenDb::GenDbIf *db_if = dbif_.get(); - - // Create the query first - std::string qid("TEST-QUERY"); - std::map json_api_data; - -#ifdef UNIT_TEST_MESSAGES - QE_TRACE_NOQID(DEBUG, " Testing messages "); - QE_TRACE_NOQID(DEBUG, " Parsing sample query"); - // Flow-Series Query - json_api_data.insert(std::pair( - "table", "\"MessageTable\"" - )); - - json_api_data.insert(std::pair( - "start_time", "1363918451328671" - )); - json_api_data.insert(std::pair( - "end_time", "1363918651330163" - )); - json_api_data.insert(std::pair( - "where", "[[{\"name\":\"Source\", \"value\":\"127.0.0.1\", \"op\":1} , {\"name\":\"Messagetype\", \"value\":\"UveVirtualMachineConfigTrace\", \"op\":1} ]]" - )); - json_api_data.insert(std::pair( - "select_fields", "[\"Module\", \"Source\", \"Messagetype\"]" - )); - - AnalyticsQuery *q = - new AnalyticsQuery(db_if, qid, json_api_data, "0"); - QE_TRACE_NOQID(DEBUG, " Parsing of messages query done"); - - QE_TRACE_NOQID(DEBUG, " Invoking messages query"); - q->process_query(); - QE_TRACE_NOQID(DEBUG, " Processed messages query"); -#endif - -#ifdef UNIT_TEST_MESSAGE_FILTERS - QE_TRACE_NOQID(DEBUG, " Testing messages "); - QE_TRACE_NOQID(DEBUG, " Parsing sample query"); - // Flow-Series Query - json_api_data.insert(std::pair( - "table", "\"MessageTable\"" - )); - - json_api_data.insert(std::pair( - "start_time", "1365021325382585" - )); - json_api_data.insert(std::pair( - "end_time", "1365025325382585" - )); - json_api_data.insert(std::pair( - "select_fields", "[\"ModuleId\", \"Source\", \"Level\", \"Messagetype\"]" - )); - json_api_data.insert(std::pair("filter", "[{\"name\":\"Level\", \"value\":\"6\", \"op\":5}]")); - -#if 0 - AnalyticsQuery *q = - new AnalyticsQuery(db_if, qid, json_api_data, "0"); -#endif - QE_TRACE_NOQID(DEBUG, " Parsing of messages query done"); - - QE_TRACE_NOQID(DEBUG, " Invoking messages query"); - q->process_query(); - QE_TRACE_NOQID(DEBUG, "# of rows: " << q->final_result->size()); - QE_TRACE_NOQID(DEBUG, " Processed messages query"); -#endif - - -#ifdef UNIT_TEST_FLOW_SERIES - QE_TRACE_NOQID(DEBUG, " Testing flow series"); - QE_TRACE_NOQID(DEBUG, " Parsing sample query"); - // Flow-Series Query - json_api_data.insert(std::pair( - "table", "\"FlowSeriesTable\"" - )); - - json_api_data.insert(std::pair( - "start_time", "1363816971234206" - )); - json_api_data.insert(std::pair( - "end_time", "1363816971284356" - )); - json_api_data.insert(std::pair( - "where", "[[{\"name\":\"sourcevn\", \"value\":\"default-domain:admin:vn0\", \"op\":1},{\"name\":\"destvn\", \"value\":\"default-domain:admin:vn1\", \"op\":1}],[{\"name\":\"protocol\", \"value\":\"17\", \"op\":1},{\"name\":\"dport\", \"value\":\"80\", \"op\":1}]]" - )); - json_api_data.insert(std::pair( - "select_fields", "[\"T=5\", \"sourcevn\", \"destvn\", \"sum(packets)\"]" - )); - json_api_data.insert(std::pair( - "dir", "1" - )); - - AnalyticsQuery *q = - new AnalyticsQuery(db_if, qid, json_api_data, "0"); - QE_TRACE_NOQID(DEBUG, " Parsing of flow series query done"); - - QE_TRACE_NOQID(DEBUG, " Invoking flow series query"); - q->process_query(); - QE_TRACE_NOQID(DEBUG, " Processed flow series query"); -#endif -#endif - -} - std::map< std::string, int > trace_enable_map; void TraceEnable::HandleRequest() const { diff --git a/src/query_engine/query.h b/src/query_engine/query.h index 3c2f87488dc..115bcf6b8ba 100644 --- a/src/query_engine/query.h +++ b/src/query_engine/query.h @@ -727,13 +727,13 @@ class StatsQuery; class AnalyticsQuery: public QueryUnit { public: AnalyticsQuery(std::string qid, std::map& json_api_data, uint64_t analytics_start_time, + std::string>& json_api_data, const TtlMap& ttlmap, EventManager *evm, std::vector cassandra_ips, std::vector cassandra_ports, int batch, int total_batches); AnalyticsQuery(std::string qid, GenDb::GenDbIf *dbif, std::map json_api_data, - uint64_t analytics_start_time, int batch, int total_batches); + const TtlMap& ttlmap, int batch, int total_batches); virtual ~AnalyticsQuery() {} virtual query_status_t process_query(); @@ -760,6 +760,7 @@ class AnalyticsQuery: public QueryUnit { std::map json_api_data_; + TtlMap ttlmap_; uint64_t where_start_; uint64_t select_start_; uint64_t postproc_start_; @@ -826,12 +827,12 @@ const std::vector >& inputs, // validation functions bool is_valid_from_field(const std::string& from_field); - virtual bool is_object_table_query(); - virtual bool is_stat_table_query() { return (stats_.get()!=NULL); } + static bool is_object_table_query(const std::string& tname); + static bool is_stat_table_query(const std::string& tname); + static bool is_flow_query(const std::string& tname); // either flow-series or flow-records query bool is_valid_where_field(const std::string& where_field); bool is_valid_sort_field(const std::string& sort_field); std::string get_column_field_datatype(const std::string& col_field); - virtual bool is_flow_query(); // either flow-series or flow-records query virtual bool is_query_parallelized() { return parallelize_query_; } uint64_t parse_time(const std::string& relative_time); @@ -857,8 +858,7 @@ const std::vector >& inputs, bool parallelize_query_; // Init function void Init(GenDb::GenDbIf *db_if, std::string qid, - std::map& json_api_data, - uint64_t analytics_start_time); + std::map& json_api_data); bool can_parallelize_query(); }; @@ -889,13 +889,12 @@ class QueryEngine { std::vector cassandra_ports, const std::string & redis_ip, unsigned short redis_port, const std::string & redis_password, - int max_tasks, int max_slice, uint64_t anal_ttl, - uint64_t start_time=0); + int max_tasks, int max_slice); QueryEngine(EventManager *evm, const std::string & redis_ip, unsigned short redis_port, const std::string & redis_password, int max_tasks, - int max_slice, uint64_t anal_ttl); + int max_slice); int QueryPrepare(QueryParams qp, @@ -927,13 +926,15 @@ class QueryEngine { void QueryEngine_Test(); void db_err_handler() {}; + + TtlMap& GetTTlMap() { return ttlmap_; } private: boost::scoped_ptr dbif_; boost::scoped_ptr qosp_; EventManager *evm_; std::vector cassandra_ports_; std::vector cassandra_ips_; - + TtlMap ttlmap_; }; #endif diff --git a/src/query_engine/select.cc b/src/query_engine/select.cc index 01930d134d8..d1118ee8dde 100644 --- a/src/query_engine/select.cc +++ b/src/query_engine/select.cc @@ -27,7 +27,7 @@ SelectQuery::SelectQuery(QueryUnit *main_query, // initialize Cassandra related fields if ( (m_query->table() == g_viz_constants.COLLECTOR_GLOBAL_TABLE) || - (m_query->is_object_table_query()) + (m_query->is_object_table_query(m_query->table())) ) { cfname = g_viz_constants.COLLECTOR_GLOBAL_TABLE; @@ -55,7 +55,7 @@ SelectQuery::SelectQuery(QueryUnit *main_query, QE_PARSE_ERROR(json_select_fields.IsArray()); QE_TRACE(DEBUG, "# of select fields is " << json_select_fields.Size()); - if (m_query->is_stat_table_query()) { + if (m_query->is_stat_table_query(m_query->table())) { for (rapidjson::SizeType i = 0; i < json_select_fields.Size(); i++) { select_column_fields.push_back(json_select_fields[i].GetString()); } @@ -78,8 +78,8 @@ SelectQuery::SelectQuery(QueryUnit *main_query, { provide_timeseries = true; select_column_fields.push_back(TIMESTAMP_FIELD); - QE_INVALIDARG_ERROR(m_query->is_flow_query() || - m_query->is_stat_table_query()); + QE_INVALIDARG_ERROR(m_query->is_flow_query(m_query->table()) || + m_query->is_stat_table_query(m_query->table())); } else if (boost::starts_with(json_select_fields[i].GetString(), TIMESTAMP_GRANULARITY)) { @@ -91,19 +91,19 @@ SelectQuery::SelectQuery(QueryUnit *main_query, select_column_fields.push_back(TIMESTAMP_GRANULARITY); QE_INVALIDARG_ERROR( m_query->table() == g_viz_constants.FLOW_SERIES_TABLE || - m_query->is_stat_table_query()); + m_query->is_stat_table_query(m_query->table())); } else if (json_select_fields[i].GetString() == std::string(SELECT_PACKETS)) { agg_stats_t agg_stats_entry = {RAW, PKT_STATS}; agg_stats.push_back(agg_stats_entry); - QE_INVALIDARG_ERROR(m_query->is_flow_query()); + QE_INVALIDARG_ERROR(m_query->is_flow_query(m_query->table())); } else if (json_select_fields[i].GetString() == std::string(SELECT_BYTES)) { agg_stats_t agg_stats_entry = {RAW, BYTE_STATS}; agg_stats.push_back(agg_stats_entry); - QE_INVALIDARG_ERROR(m_query->is_flow_query()); + QE_INVALIDARG_ERROR(m_query->is_flow_query(m_query->table())); } else if (json_select_fields[i].GetString() == std::string(SELECT_SUM_PACKETS)) { @@ -496,7 +496,7 @@ query_status_t SelectQuery::process_query() { } result_->push_back(std::make_pair(cmap, nullmetadata)); } - } else if (m_query->is_stat_table_query()) { + } else if (m_query->is_stat_table_query(m_query->table())) { QE_ASSERT(stats_.get()); // can not handle query result of huge size if (query_result.size() > (size_t)query_result_size_limit) @@ -689,7 +689,7 @@ query_status_t SelectQuery::process_query() { jt != select_column_fields.end(); jt++) { std::map::iterator kt = col_res_map.find(*jt); if (kt == col_res_map.end()) { - if (m_query->is_object_table_query()) { + if (m_query->is_object_table_query(m_query->table())) { if (process_object_query_specific_select_params( *jt, col_res_map, cmap) == false) { // Exit the loop. User is not interested diff --git a/src/query_engine/stats_select.cc b/src/query_engine/stats_select.cc index 7cb0fa3ea39..809d1f9c1fa 100644 --- a/src/query_engine/stats_select.cc +++ b/src/query_engine/stats_select.cc @@ -166,7 +166,7 @@ StatsSelect::StatsSelect(AnalyticsQuery * m_query, ts_period_(0), isT_(false), isTC_(false), isTBC_(false), count_field_() { - QE_ASSERT(main_query->is_stat_table_query()); + QE_ASSERT(main_query->is_stat_table_query(main_query->table())); status_ = false; for (size_t j=0; j(), - uint64_t(0), int(0), int(0)) { + TtlMap(), int(0), int(0)) { } ~AnalyticsQueryMock() { diff --git a/src/query_engine/test/options_test.cc b/src/query_engine/test/options_test.cc index 3773d3d8002..e015a346d6d 100644 --- a/src/query_engine/test/options_test.cc +++ b/src/query_engine/test/options_test.cc @@ -75,7 +75,7 @@ TEST_F(OptionsTest, NoArguments) { EXPECT_EQ(options_.log_file_size(), 1024*1024); EXPECT_EQ(options_.log_level(), "SYS_NOTICE"); EXPECT_EQ(options_.log_local(), false); - EXPECT_EQ(options_.analytics_data_ttl(), ANALYTICS_DATA_TTL_DEFAULT); + EXPECT_EQ(options_.analytics_data_ttl(), 0); EXPECT_EQ(options_.start_time(), 0); EXPECT_EQ(options_.max_tasks(), 0); EXPECT_EQ(options_.max_slice(), 100); @@ -112,7 +112,7 @@ TEST_F(OptionsTest, DefaultConfFile) { EXPECT_EQ(options_.log_file_size(), 1024*1024); EXPECT_EQ(options_.log_level(), "SYS_NOTICE"); EXPECT_EQ(options_.log_local(), true); - EXPECT_EQ(options_.analytics_data_ttl(), ANALYTICS_DATA_TTL_DEFAULT); + EXPECT_EQ(options_.analytics_data_ttl(), 0); EXPECT_EQ(options_.start_time(), 0); EXPECT_EQ(options_.max_tasks(), 0); EXPECT_EQ(options_.max_slice(), 100); @@ -151,7 +151,7 @@ TEST_F(OptionsTest, OverrideStringFromCommandLine) { EXPECT_EQ(options_.log_file_size(), 1024*1024); EXPECT_EQ(options_.log_level(), "SYS_NOTICE"); EXPECT_EQ(options_.log_local(), true); - EXPECT_EQ(options_.analytics_data_ttl(), ANALYTICS_DATA_TTL_DEFAULT); + EXPECT_EQ(options_.analytics_data_ttl(), 0); EXPECT_EQ(options_.start_time(), 0); EXPECT_EQ(options_.max_tasks(), 0); EXPECT_EQ(options_.max_slice(), 100); @@ -190,7 +190,7 @@ TEST_F(OptionsTest, OverrideBooleanFromCommandLine) { EXPECT_EQ(options_.log_file_size(), 1024*1024); EXPECT_EQ(options_.log_level(), "SYS_NOTICE"); EXPECT_EQ(options_.log_local(), true); - EXPECT_EQ(options_.analytics_data_ttl(), ANALYTICS_DATA_TTL_DEFAULT); + EXPECT_EQ(options_.analytics_data_ttl(), 0); EXPECT_EQ(options_.start_time(), 0); EXPECT_EQ(options_.max_tasks(), 0); EXPECT_EQ(options_.max_slice(), 100); @@ -274,7 +274,7 @@ TEST_F(OptionsTest, CustomConfigFile) { EXPECT_EQ(options_.log_file_size(), 1024); EXPECT_EQ(options_.log_level(), "SYS_DEBUG"); EXPECT_EQ(options_.log_local(), true); - EXPECT_EQ(options_.analytics_data_ttl(), ANALYTICS_DATA_TTL_DEFAULT); + EXPECT_EQ(options_.analytics_data_ttl(), 0); EXPECT_EQ(options_.start_time(), 123456); EXPECT_EQ(options_.max_tasks(), 200); EXPECT_EQ(options_.max_slice(), 500); diff --git a/src/query_engine/where_query.cc b/src/query_engine/where_query.cc index 0f802faa986..868e8c9bade 100644 --- a/src/query_engine/where_query.cc +++ b/src/query_engine/where_query.cc @@ -68,7 +68,7 @@ WhereQuery::StatTermParse(QueryUnit *main_query, const rapidjson::Value& where_t std::string& sname, match_op& sop, GenDb::DbDataValue& sval, GenDb::DbDataValue& sval2) { AnalyticsQuery *m_query = (AnalyticsQuery *)main_query; - QE_ASSERT(m_query->is_stat_table_query()); + QE_ASSERT(m_query->is_stat_table_query(m_query->table())); std::string srvalstr, srval2str; @@ -346,7 +346,7 @@ WhereQuery::WhereQuery(const std::string& where_json_string, int direction, db_query->cr.finish_.push_back((uint8_t)0xff); db_query->cr.finish_.push_back((uint16_t)0xffff); - } else if (m_query->is_object_table_query()) { + } else if (m_query->is_object_table_query(m_query->table())) { // These values will encompass all possible ascii strings in their range GenDb::DbDataValue value = "\x1b", value2 = "\x7f"; @@ -486,7 +486,7 @@ WhereQuery::WhereQuery(const std::string& where_json_string, int direction, } } - bool isStat = m_query->is_stat_table_query(); + bool isStat = m_query->is_stat_table_query(m_query->table()); if ((name == g_viz_constants.SOURCE) && (!isStat)) { DbQueryUnit *db_query = new DbQueryUnit(and_node, main_query); @@ -939,7 +939,7 @@ WhereQuery::WhereQuery(const std::string& where_json_string, int direction, } } - if (m_query->is_object_table_query()) + if (m_query->is_object_table_query(m_query->table())) { // object id table query if (!object_id_specified)