diff --git a/src/analytics/OpServerProxy.cc b/src/analytics/OpServerProxy.cc
index d2b5e17b0f6..81c534a8d23 100644
--- a/src/analytics/OpServerProxy.cc
+++ b/src/analytics/OpServerProxy.cc
@@ -31,6 +31,7 @@
 #include "viz_sandesh.h"
 #include "viz_collector.h"
 
+using std::map;
 using std::string;
 using boost::shared_ptr;
 using boost::assign::list_of;
@@ -635,6 +636,7 @@ OpServerProxy::UVENotif(const std::string &type,
         const std::string &module, const std::string &instance_id,
         const std::string &table, const std::string &barekey,
+        const std::map<std::string, std::string>& value,
         bool deleted) {
 
     std::string key = table + ":" + barekey;
@@ -703,6 +705,12 @@ OpServerProxy::UVENotif(const std::string &type,
     } else {
         rapidjson::Value val(rapidjson::kObjectType);
         val.SetObject();
+        for (map<string, string>::const_iterator it = value.begin();
+                it != value.end(); it++) {
+            rapidjson::Value sval(rapidjson::kStringType);
+            sval.SetString((it->second).c_str());
+            val.AddMember(it->first.c_str(), sval, dd.GetAllocator());
+        }
         dd.AddMember("value", val, dd.GetAllocator());
     }
 
diff --git a/src/analytics/OpServerProxy.h b/src/analytics/OpServerProxy.h
index 5ec30a94338..6ca487399ac 100644
--- a/src/analytics/OpServerProxy.h
+++ b/src/analytics/OpServerProxy.h
@@ -49,6 +49,7 @@ class OpServerProxy {
             const std::string &module, const std::string &instance_id,
             const std::string &table, const std::string &barekey,
+            const std::map<std::string, std::string>& value,
             bool deleted);
 
     // Use this to delete the object when the deleted attribute is set
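
[reviewer note] With this change the Kafka notification for a UVE can carry
the UVE's struct contents: each (struct name, JSON string) pair from `value`
is added under the "value" member of the message document. A minimal sketch
of what a consumer of that payload might do, assuming only what the code
above shows (the "value" member; surrounding fields such as "key" are
assumptions here):

    import json

    def on_uve_notif(raw_msg):
        notif = json.loads(raw_msg)
        # "value" maps struct names to their JSON-encoded contents
        for struct_name, contents in notif.get("value", {}).items():
            print(notif.get("key"), struct_name, contents)
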
diff --git a/src/analytics/db_handler.cc b/src/analytics/db_handler.cc
index e4d82bead63..3b0edca16ce 100644
--- a/src/analytics/db_handler.cc
+++ b/src/analytics/db_handler.cc
@@ -57,7 +57,7 @@ DbHandler::DbHandler(EventManager *evm,
         const std::string& cassandra_user,
         const std::string& cassandra_password) :
     name_(name),
-    drop_level_(SandeshLevel::INVALID), ttl_map_(ttl_map) {
+    drop_level_(SandeshLevel::INVALID), ttl_map_(ttl_map), field_cache_t2_(0) {
     dbif_.reset(new ThriftIf(err_handler,
         cassandra_ips, cassandra_ports, name, false,
         cassandra_user, cassandra_password));
@@ -68,7 +68,7 @@ DbHandler::DbHandler(EventManager *evm,
 DbHandler::DbHandler(GenDb::GenDbIf *dbif, const TtlMap& ttl_map) :
     dbif_(dbif),
-    ttl_map_(ttl_map) {
+    ttl_map_(ttl_map), field_cache_t2_(0) {
 }
 
 DbHandler::~DbHandler() {
@@ -561,12 +561,15 @@ void DbHandler::MessageTableInsert(const VizMsg *vmsgp) {
         //Insert only if sandesh type is a SYSTEM LOG or SYSLOG
         //Insert into the FieldNames stats table entries for Messagetype and Module ID
         int ttl = GetTtl(TtlType::GLOBAL_TTL);
-        FieldNamesTableInsert(g_viz_constants.COLLECTOR_GLOBAL_TABLE,
-            ":Messagetype", message_type, header.get_Timestamp(), ttl);
-        FieldNamesTableInsert(g_viz_constants.COLLECTOR_GLOBAL_TABLE,
-            ":ModuleId", header.get_Module(), header.get_Timestamp(), ttl);
-        FieldNamesTableInsert(g_viz_constants.COLLECTOR_GLOBAL_TABLE,
-            ":Source", header.get_Source(), header.get_Timestamp(), ttl);
+        FieldNamesTableInsert(header.get_Timestamp(),
+            g_viz_constants.COLLECTOR_GLOBAL_TABLE,
+            ":Messagetype", message_type, ttl);
+        FieldNamesTableInsert(header.get_Timestamp(),
+            g_viz_constants.COLLECTOR_GLOBAL_TABLE,
+            ":ModuleId", header.get_Module(), ttl);
+        FieldNamesTableInsert(header.get_Timestamp(),
+            g_viz_constants.COLLECTOR_GLOBAL_TABLE,
+            ":Source", header.get_Source(), ttl);
     }
 }
 
@@ -574,20 +577,47 @@ void DbHandler::MessageTableInsert(const VizMsg *vmsgp) {
  * This function takes field name and field value as arguments and inserts
  * into the FieldNames stats table
  */
-void 
-DbHandler::FieldNamesTableInsert(const std::string& table_prefix,
-    const std::string& field_name, const std::string& field_val,
-    uint64_t timestamp, int ttl) {
+void DbHandler::FieldNamesTableInsert(uint64_t timestamp,
+    const std::string& table_prefix,
+    const std::string& field_name, const std::string& field_val, int ttl) {
     /*
      * Insert the message types in the stat table
      * Construct the attributes, attrib_tags before inserting
      * to the StatTableInsert
      */
+    uint32_t temp_u32 = timestamp >> g_viz_constants.RowTimeInBits;
+    std::string table_name(table_prefix);
+    table_name.append(field_name);
+
+    /* Check if fieldname and value were already seen in this T2;
+       we only need to record them if they have NOT been seen yet */
+    bool record = false;
+    std::string fc_entry(table_name);
+    fc_entry.append(":");
+    fc_entry.append(field_val);
+    {
+        tbb::mutex::scoped_lock lock(smutex_);
+        if (temp_u32 > field_cache_t2_) {
+            field_cache_set_.clear();
+            field_cache_t2_ = temp_u32;
+        }
+        if (temp_u32 == field_cache_t2_) {
+            if (field_cache_set_.find(fc_entry) == field_cache_set_.end()) {
+                field_cache_set_.insert(fc_entry);
+                record = true;
+            }
+        } else {
+            /* This is an old time-stamp */
+            record = true;
+        }
+    }
+
+    if (!record) return;
+
     DbHandler::TagMap tmap;
     DbHandler::AttribMap amap;
     DbHandler::Var pv;
     DbHandler::AttribMap attribs;
-    std::string table_name(table_prefix);
-    table_name.append(field_name);
     pv = table_name;
     tmap.insert(make_pair("name", make_pair(pv, amap)));
     attribs.insert(make_pair(string("name"), pv));
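
[reviewer note] The block above is the core of this commit: FieldNames rows
are deduplicated per T2 epoch (timestamp >> RowTimeInBits), so a given
table:field:value entry is written to Cassandra at most once per epoch
instead of once per message. The same idea as a standalone Python sketch
(names and the shift width are illustrative, not taken from the tree):

    ROW_TIME_IN_BITS = 23  # assumed stand-in for g_viz_constants.RowTimeInBits

    class FieldNameDeduper(object):
        def __init__(self):
            self.cache_t2 = 0
            self.cache = set()

        def should_record(self, timestamp_us, entry):
            t2 = timestamp_us >> ROW_TIME_IN_BITS
            if t2 > self.cache_t2:       # new epoch: reset the cache
                self.cache.clear()
                self.cache_t2 = t2
            if t2 < self.cache_t2:       # old timestamp: always record
                return True
            if entry in self.cache:      # already seen in this epoch
                return False
            self.cache.add(entry)
            return True
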
@@ -682,11 +712,14 @@ void DbHandler::ObjectTableInsert(const std::string &table, const std::string &objectkey_str,
         const SandeshHeader &header(vmsgp->msg->GetHeader());
         const std::string &message_type(vmsgp->msg->GetMessageType());
         //Insert into the FieldNames stats table entries for Messagetype and Module ID
-        FieldNamesTableInsert(table, ":Objecttype", objectkey_str, timestamp, ttl);
-        FieldNamesTableInsert(table, ":Messagetype", message_type, timestamp, ttl);
-        FieldNamesTableInsert(table, ":ModuleId", header.get_Module(),
-            timestamp, ttl);
-        FieldNamesTableInsert(table, ":Source", header.get_Source(), timestamp, ttl);
+        FieldNamesTableInsert(timestamp,
+            table, ":ObjectId", objectkey_str, ttl);
+        FieldNamesTableInsert(timestamp,
+            table, ":Messagetype", message_type, ttl);
+        FieldNamesTableInsert(timestamp,
+            table, ":ModuleId", header.get_Module(), ttl);
+        FieldNamesTableInsert(timestamp,
+            table, ":Source", header.get_Source(), ttl);
     }
 }
 
@@ -944,6 +977,16 @@ DbHandler::StatTableInsertTtl(uint64_t ts,
         pair<string, DbHandler::Var> ptag;
         ptag.first = it->first;
         ptag.second = it->second.first;
+
+        /* Record in the fieldNames table if we have a string tag,
+           and if we are not recording a fieldNames stats entry itself */
+        if ((ptag.second.type == DbHandler::STRING) &&
+                (statName.compare("FieldNames") != 0)) {
+            FieldNamesTableInsert(ts, std::string("StatTable.") +
+                statName + "." + statAttr,
+                std::string(":") + ptag.first, ptag.second.str, ttl);
+        }
+
         if (it->second.second.empty()) {
             pair<string, DbHandler::Var> stag;
             StatTableWrite(temp_u32, statName, statAttr,
@@ -1004,7 +1047,8 @@ boost::uuids::uuid DbHandler::seed_uuid = StringToUuid(std::string("ffffffff-fff
 
 static void PopulateFlowRecordTableColumns(
     const std::vector<FlowRecordFields::type> &frvt,
-    FlowValueArray &fvalues, GenDb::NewColVec& columns, const TtlMap& ttl_map) {
+    FlowValueArray &fvalues, GenDb::NewColVec& columns, const TtlMap& ttl_map,
+    boost::function<void(const std::string&, const std::string&, int)> fncb) {
     int ttl = DbHandler::GetTtlFromMap(ttl_map, TtlType::FLOWDATA_TTL);
     columns.reserve(frvt.size());
     for (std::vector<FlowRecordFields::type>::const_iterator it = frvt.begin();
@@ -1014,6 +1058,13 @@ static void PopulateFlowRecordTableColumns(
             GenDb::NewCol *col(new GenDb::NewCol(
                 g_viz_constants.FlowRecordNames[(*it)], db_value, ttl));
             columns.push_back(col);
+            if ((*it==FlowRecordFields::FLOWREC_VROUTER)||
+                (*it==FlowRecordFields::FLOWREC_SOURCEVN)||
+                (*it==FlowRecordFields::FLOWREC_DESTVN)) {
+                std::string sval = boost::get<std::string>(db_value);
+                fncb(std::string(":")+g_viz_constants.FlowRecordNames[(*it)],
+                    sval, ttl);
+            }
         }
     }
 }
@@ -1028,12 +1079,13 @@ static void PopulateFlowRecordTableRowKey(
 }
 
 static bool PopulateFlowRecordTable(FlowValueArray &fvalues,
-    GenDb::GenDbIf *dbif, const TtlMap& ttl_map) {
+    GenDb::GenDbIf *dbif, const TtlMap& ttl_map,
+    boost::function<void(const std::string&, const std::string&, int)> fncb) {
     std::auto_ptr<GenDb::ColList> colList(new GenDb::ColList);
     colList->cfname_ = g_viz_constants.FLOW_TABLE;
     PopulateFlowRecordTableRowKey(fvalues, colList->rowkey_);
     PopulateFlowRecordTableColumns(FlowRecordTableColumns, fvalues,
-        colList->columns_, ttl_map);
+        colList->columns_, ttl_map, fncb);
     return dbif->Db_AddColumn(colList);
 }
 
@@ -1082,13 +1134,21 @@ static const std::string& FlowIndexTable2String(FlowIndexTableType ttype) {
 
 static void PopulateFlowIndexTableColumnValues(
     const std::vector<FlowRecordFields::type> &frvt,
-    FlowValueArray &fvalues, GenDb::DbDataValueVec &cvalues) {
+    FlowValueArray &fvalues, GenDb::DbDataValueVec &cvalues, int ttl,
+    boost::function<void(const std::string&, const std::string&, int)> fncb) {
     cvalues.reserve(frvt.size());
     for (std::vector<FlowRecordFields::type>::const_iterator it = frvt.begin();
          it != frvt.end(); it++) {
         GenDb::DbDataValue &db_value(fvalues[(*it)]);
         if (db_value.which() != GenDb::DB_VALUE_BLANK) {
             cvalues.push_back(db_value);
+            if ((*it==FlowRecordFields::FLOWREC_VROUTER)||
+                (*it==FlowRecordFields::FLOWREC_SOURCEVN)||
+                (*it==FlowRecordFields::FLOWREC_DESTVN)) {
+                std::string sval = boost::get<std::string>(db_value);
+                fncb(std::string(":")+g_viz_constants.FlowRecordNames[(*it)],
+                    sval, ttl);
+            }
         }
     }
 }
@@ -1152,14 +1212,16 @@ static void PopulateFlowIndexTableColumns(FlowIndexTableType ftype,
 
 static bool PopulateFlowIndexTables(FlowValueArray &fvalues, uint32_t &T2,
     uint32_t &T1, uint8_t partition_no,
-    GenDb::GenDbIf *dbif, const TtlMap& ttl_map) {
+    GenDb::GenDbIf *dbif, const TtlMap& ttl_map,
+    boost::function<void(const std::string&, const std::string&, int)> fncb) {
     // Populate row key and column values (same for all flow index
     // tables)
     GenDb::DbDataValueVec rkey;
     PopulateFlowIndexTableRowKey(fvalues, T2, partition_no, rkey);
     GenDb::DbDataValueVec cvalues;
+    int ttl = DbHandler::GetTtlFromMap(ttl_map, TtlType::FLOWDATA_TTL);
     PopulateFlowIndexTableColumnValues(FlowIndexTableColumnValues, fvalues,
-        cvalues);
+        cvalues, ttl, fncb);
     // Populate the Flow Index Tables
     for (int tid = FLOW_INDEX_TABLE_MIN;
             tid < FLOW_INDEX_TABLE_MAX_PLUS_1; ++tid) {
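
[reviewer note] The helpers above are static functions with no access to
DbHandler, so the FieldNames writer is threaded through as a boost::function
callback with the timestamp and table name already bound (see the
boost::bind calls in FlowSampleAdd below). The same currying pattern in
Python, purely for illustration:

    from functools import partial

    def field_names_table_insert(timestamp, table, field, value, ttl):
        print(timestamp, table, field, value, ttl)

    # bind timestamp and table up front, as boost::bind does with
    # (this, timestamp, FLOW_TABLE, _1, _2, _3)
    fncb = partial(field_names_table_insert, 1445000000000000, "FlowRecordTable")
    fncb(":vrouter", "a6s23", 7200)  # remaining args: field, value, ttl
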
@@ -1286,7 +1348,12 @@ bool DbHandler::FlowSampleAdd(const pugi::xml_node& flow_sample,
     // Partition no
     uint8_t partition_no = 0;
     // Populate Flow Record Table
-    if (!PopulateFlowRecordTable(flow_entry_values, dbif_.get(), ttl_map_)) {
+
+    boost::function<void(const std::string&, const std::string&, int)> fncb =
+        boost::bind(&DbHandler::FieldNamesTableInsert,
+            this, timestamp, g_viz_constants.FLOW_TABLE, _1, _2, _3);
+    if (!PopulateFlowRecordTable(flow_entry_values, dbif_.get(), ttl_map_,
+            fncb)) {
         DB_LOG(ERROR, "Populating FlowRecordTable FAILED");
     }
     GenDb::DbDataValue &diff_bytes(
@@ -1295,10 +1362,13 @@ bool DbHandler::FlowSampleAdd(const pugi::xml_node& flow_sample,
         flow_entry_values[FlowRecordFields::FLOWREC_DIFF_PACKETS]);
     // Populate Flow Index Tables only if FLOWREC_DIFF_BYTES and
     // FLOWREC_DIFF_PACKETS are present
+    boost::function<void(const std::string&, const std::string&, int)> fncb2 =
+        boost::bind(&DbHandler::FieldNamesTableInsert,
+            this, timestamp, g_viz_constants.FLOW_SERIES_TABLE, _1, _2, _3);
     if (diff_bytes.which() != GenDb::DB_VALUE_BLANK &&
         diff_packets.which() != GenDb::DB_VALUE_BLANK) {
         if (!PopulateFlowIndexTables(flow_entry_values, T2, T1, partition_no,
-                dbif_.get(), ttl_map_)) {
+                dbif_.get(), ttl_map_, fncb2)) {
             DB_LOG(ERROR, "Populating FlowIndexTables FAILED");
         }
     }
diff --git a/src/analytics/db_handler.h b/src/analytics/db_handler.h
index 58698b67b6d..27c86830749 100644
--- a/src/analytics/db_handler.h
+++ b/src/analytics/db_handler.h
@@ -104,9 +104,9 @@ class DbHandler {
         const boost::uuids::uuid& unm, const std::string keyword);
     virtual void MessageTableInsert(const VizMsg *vmsgp);
     void MessageTableOnlyInsert(const VizMsg *vmsgp);
-    void FieldNamesTableInsert(const std::string& table_name,
-        const std::string& field_name, const std::string& field_val,
-        uint64_t timestamp, int ttl);
+    void FieldNamesTableInsert(uint64_t timestamp,
+        const std::string& table_name,
+        const std::string& field_name, const std::string& field_val, int ttl);
     void GetRuleMap(RuleMap& rulemap);
 
     void ObjectTableInsert(const std::string &table, const std::string &rowkey,
@@ -175,7 +175,9 @@ class DbHandler {
     GenDb::DbTableStatistics stable_stats_;
     mutable tbb::mutex smutex_;
     TtlMap ttl_map_;
-
+    uint32_t field_cache_t2_;
+    std::set<std::string> field_cache_set_;
+
     DISALLOW_COPY_AND_ASSIGN(DbHandler);
 };
 
diff --git a/src/analytics/ruleeng.cc b/src/analytics/ruleeng.cc
index 1cd0df2157d..2ee5f9c7efb 100644
--- a/src/analytics/ruleeng.cc
+++ b/src/analytics/ruleeng.cc
@@ -559,6 +559,7 @@ bool Ruleeng::handle_uve_publish(const pugi::xml_node& parent,
         return false;
     }
 
+    map<string, string> emap, vmap;
     if (deleted) {
         if (!osp_->UVEDelete(object.name(), source, node_type, module,
                 instance_id, key, seq, is_alarm)) {
@@ -571,7 +572,7 @@ bool Ruleeng::handle_uve_publish(const pugi::xml_node& parent,
         }
         LOG(DEBUG, __func__ << " Deleted " << key);
         osp_->UVENotif(object.name(),
-            source, node_type, module, instance_id, table, barekey, deleted);
+            source, node_type, module, instance_id, table, barekey, emap, deleted);
         return true;
     }
 
@@ -585,6 +586,7 @@ bool Ruleeng::handle_uve_publish(const pugi::xml_node& parent,
         if (strcmp(tempstr, "")) {
             continue;
         }
+        vmap.insert(make_pair(node.name(), ostr.str()));
         tempstr = node.attribute("aggtype").value();
         if (strcmp(tempstr, "")) {
             agg = std::string(tempstr);
@@ -636,8 +638,13 @@ bool Ruleeng::handle_uve_publish(const pugi::xml_node& parent,
     }
 
     // Publish on the Kafka bus that this UVE has changed
-    osp_->UVENotif(object.name(),
-        source, node_type, module, instance_id, table, barekey, deleted);
+    if (!strcmp(object.name(), "UVEAlarms")) {
+        osp_->UVENotif(object.name(),
+            source, node_type, module, instance_id, table, barekey, vmap, deleted);
+    } else {
+        osp_->UVENotif(object.name(),
+            source, node_type, module, instance_id, table, barekey, emap, deleted);
+    }
     return true;
 }
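
[reviewer note] Note the asymmetry introduced above: only "UVEAlarms" UVEs
have their struct contents (vmap) published on the Kafka bus; every other
UVE type sends the empty map, so consumers still fetch those contents from
redis. Hypothetical payload shapes, assuming the JSON layout sketched in
OpServerProxy.cc:

    notif_alarm = {"key": "ObjectVNTable:vn1",
                   "value": {"UVEAlarms": "{...alarm contents...}"}}
    notif_other = {"key": "ObjectVNTable:vn1",
                   "value": {}}   # contents must be read back from redis
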
diff --git a/src/analytics/test/db_handler_test.cc b/src/analytics/test/db_handler_test.cc
index d16990a34e4..f3218c32ff1 100644
--- a/src/analytics/test/db_handler_test.cc
+++ b/src/analytics/test/db_handler_test.cc
@@ -424,18 +424,6 @@ TEST_F(DbHandlerTest, ObjectTableInsertTest) {
     }
 
     {
-        boost::uuids::uuid unm_allf = StringToUuid(std::string("ffffffff-ffff-ffff-ffff-ffffffffffff"));
-        DbDataValueVec *colname(new DbDataValueVec);
-        colname->reserve(4);
-        colname->push_back("ObjectTableInsertTest:Objecttype");
-        colname->push_back("");
-        colname->push_back((uint32_t)0);
-        colname->push_back(unm_allf);
-        DbDataValueVec *colvalue(new DbDataValueVec(1,""));
-        boost::ptr_vector<GenDb::NewCol> expected_vector =
-            boost::assign::ptr_list_of<GenDb::NewCol>
-            (GenDb::NewCol(colname, colvalue, 0));
-
         GenDb::DbDataValueVec rowkey;
         rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits));
         rowkey.push_back((uint8_t)0);
@@ -452,18 +440,6 @@ TEST_F(DbHandlerTest, ObjectTableInsertTest) {
     }
 
     {
-        boost::uuids::uuid unm_allf = StringToUuid(std::string("ffffffff-ffff-ffff-ffff-ffffffffffff"));
-        DbDataValueVec *colname(new DbDataValueVec);
-        colname->reserve(4);
-        colname->push_back(hdr.get_Source());
-        colname->push_back("");
-        colname->push_back((uint32_t)0);
-        colname->push_back(unm_allf);
-        DbDataValueVec *colvalue(new DbDataValueVec(1,""));
-        boost::ptr_vector<GenDb::NewCol> expected_vector =
-            boost::assign::ptr_list_of<GenDb::NewCol>
-            (GenDb::NewCol(colname, colvalue, 0));
-
         GenDb::DbDataValueVec rowkey;
         rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits));
         rowkey.push_back((uint8_t)0);
@@ -492,7 +468,6 @@ TEST_F(DbHandlerTest, FlowTableInsertTest) {
     hdr.set_Source("127.0.0.1");
     std::string messagetype("");
     std::vector<std::pair<std::string, std::vector<FlowDataIpv4> > > flow_msgs;
-
     // Flow sandesh with single flow sample
     {
         std::string xmlmessage = "555788e0-513c-4351-8711-3fc481cf2eb40default-domain:demo:vn1-1062731011default-domain:demo:vn0-106273126765201-2459004430130-664a-4b89-9287-39d71f35120758745ee7-d616-4e59-b8f7-96f896487c9f0000";
@@ -548,6 +523,38 @@ TEST_F(DbHandlerTest, FlowTableInsertTest) {
         flowdata_list.push_back(flow_data2);
         flow_msgs.push_back(std::make_pair(xmlmessage, flowdata_list));
     }
+    // FieldNames will be called 7 times each for FlowTable and FlowSeriesTable
+    // This dataset has 3 unique svn, 3 unique dvn, and a single vrouter
+    {
+        GenDb::DbDataValueVec rowkey;
+        rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits));
+        rowkey.push_back((uint8_t)0);
+        rowkey.push_back("FieldNames");
+        rowkey.push_back("fields");
+        rowkey.push_back("name");
+        EXPECT_CALL(*dbif_mock(),
+            Db_AddColumnProxy(
+                Pointee(
+                    AllOf(Field(&GenDb::ColList::cfname_, g_viz_constants.STATS_TABLE_BY_STR_TAG),
+                        Field(&GenDb::ColList::rowkey_, rowkey),_))))
+            .Times(14)
+            .WillRepeatedly(Return(true));
+    }
+    {
+        GenDb::DbDataValueVec rowkey;
+        rowkey.push_back((uint32_t)(hdr.get_Timestamp() >> g_viz_constants.RowTimeInBits));
+        rowkey.push_back((uint8_t)0);
+        rowkey.push_back("FieldNames");
+        rowkey.push_back("fields");
+        rowkey.push_back("Source");
+        EXPECT_CALL(*dbif_mock(),
+            Db_AddColumnProxy(
+                Pointee(
+                    AllOf(Field(&GenDb::ColList::cfname_, g_viz_constants.STATS_TABLE_BY_STR_TAG),
+                        Field(&GenDb::ColList::rowkey_, rowkey),_))))
+            .Times(14)
+            .WillRepeatedly(Return(true));
+    }
 
     std::vector<std::pair<std::string, std::vector<FlowDataIpv4> > >::
         const_iterator fit;
@@ -560,7 +567,6 @@ TEST_F(DbHandlerTest, FlowTableInsertTest) {
         std::vector<FlowDataIpv4>::const_iterator dit;
         for (dit = fit->second.begin(); dit != fit->second.end(); dit++) {
             boost::uuids::uuid flowu = StringToUuid(dit->get_flowuuid());
-
             // set expectations for FLOW_TABLE
             {
                 GenDb::DbDataValueVec rowkey;
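
[reviewer note] The Times(14) figures follow from the comment above: 3
unique source VNs + 3 unique destination VNs + 1 vrouter = 7 FieldNames
entries, each written once via the FlowRecordTable callback and once via
the FlowSeriesTable callback:

    (3 svn + 3 dvn + 1 vrouter) * 2 tables = 14 Db_AddColumnProxy calls
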
diff --git a/src/nodemgr/main.py b/src/nodemgr/main.py
index 0083fb48682..06d57f2c2df 100755
--- a/src/nodemgr/main.py
+++ b/src/nodemgr/main.py
@@ -76,7 +76,6 @@ def main(args_str=' '.join(sys.argv[1:])):
     node_type = args.nodetype
     if (node_type == 'contrail-analytics'):
         config_file = '/etc/contrail/contrail-analytics-nodemgr.conf'
-        default['collectors'] = ['127.0.0.1:8086']
     elif (node_type == 'contrail-config'):
         config_file = '/etc/contrail/contrail-config-nodemgr.conf'
     elif (node_type == 'contrail-control'):
diff --git a/src/opserver/alarmgen.py b/src/opserver/alarmgen.py
index 291eb00856d..11e569afbeb 100644
--- a/src/opserver/alarmgen.py
+++ b/src/opserver/alarmgen.py
@@ -218,6 +218,25 @@ def __init__(self, conf, test_logger=None):
         self._node_type_name = NodeTypeNames[node_type]
         self._hostname = socket.gethostname()
         self._instance_id = self._conf.worker_id()
+
+        self.disc = None
+        self._libpart_name = self._hostname + ":" + self._instance_id
+        self._libpart = None
+        self._partset = set()
+        if self._conf.discovery()['server']:
+            self._max_out_rows = 20
+            data = {
+                'ip-address': self._hostname,
+                'port': self._instance_id
+            }
+            self.disc = client.DiscoveryClient(
+                self._conf.discovery()['server'],
+                self._conf.discovery()['port'],
+                ModuleNames[Module.ALARM_GENERATOR])
+            print("Disc Publish to %s : %s"
+                  % (str(self._conf.discovery()), str(data)))
+            self.disc.publish(ALARM_GENERATOR_SERVICE_NAME, data)
+
         is_collector = True
         if test_logger is not None:
             is_collector = False
@@ -233,6 +252,7 @@ def __init__(self, conf, test_logger=None):
             self._conf.http_port(),
             ['opserver.sandesh', 'sandesh'],
             host_ip=self._conf.host_ip(),
+            discovery_client=self.disc,
             connect_to_collector = is_collector)
         if test_logger is not None:
             self._logger = test_logger
@@ -287,28 +307,8 @@ def __init__(self, conf, test_logger=None):
         self._us = UVEServer(None, self._logger,
                 self._conf.redis_password())
 
-        self._workers = {}
-        self._uvestats = {}
-        self._uveq = {}
-        self._uveqf = {}
-
-        self.disc = None
-        self._libpart_name = self._hostname + ":" + self._instance_id
-        self._libpart = None
-        self._partset = set()
-        if self._conf.discovery()['server']:
-            data = {
-                'ip-address': self._hostname ,
-                'port': self._instance_id
-            }
-            self.disc = client.DiscoveryClient(
-                self._conf.discovery()['server'],
-                self._conf.discovery()['port'],
-                ModuleNames[Module.ALARM_GENERATOR])
-            self._logger.info("Disc Publish to %s : %s"
-                % (str(self._conf.discovery()), str(data)))
-            self.disc.publish(ALARM_GENERATOR_SERVICE_NAME, data)
-        else:
+        if not self.disc:
+            self._max_out_rows = 2
             # If there is no discovery service, use fixed redis_uve list
             redis_uve_list = []
             try:
@@ -317,13 +317,18 @@ def __init__(self, conf, test_logger=None):
                     redis_elem = (redis_ip_port[0], int(redis_ip_port[1]),0)
                     redis_uve_list.append(redis_elem)
             except Exception as e:
-                self._logger.error('Failed to parse redis_uve_list: %s' % e)
+                print('Failed to parse redis_uve_list: %s' % e)
             else:
                 self._us.update_redis_uve_list(redis_uve_list)
 
             # If there is no discovery service, use fixed alarmgen list
             self._libpart = self.start_libpart(self._conf.alarmgen_list())
 
+        self._workers = {}
+        self._uvestats = {}
+        self._uveq = {}
+        self._uveqf = {}
+
         PartitionOwnershipReq.handle_request = self.handle_PartitionOwnershipReq
         PartitionStatusReq.handle_request = self.handle_PartitionStatusReq
         UVETableAlarmReq.handle_request = self.handle_UVETableAlarmReq
@@ -446,6 +451,37 @@ def handle_resource_check(self, part, current_inst, msgs):
 
         return disc_instances, coll_delete, chg_res
 
+    def reconnect_agg_uve(self, lredis):
+        self._logger.error("Connected to Redis for Agg")
+        lredis.ping()
+        for pp in self._workers.keys():
+            self._workers[pp].reset_acq_time()
+            self._workers[pp].kill(\
+                    RuntimeError('UVE Proc failed'),
+                    block=False)
+            self.clear_agg_uve(lredis,
+                self._instance_id,
+                pp,
+                self._workers[pp].acq_time())
+            self.stop_uve_partition(pp)
+        for part in self._uveq.keys():
+            del self._uveq[part]
+
+    def clear_agg_uve(self, redish, inst, part, acq_time):
+        self._logger.error("Agg %s reset part %d, acq %d" % \
+            (inst, part, acq_time))
+        ppe2 = redish.pipeline()
+        ppe2.hdel("AGPARTS:%s" % inst, part)
+        ppe2.smembers("AGPARTKEYS:%s:%d" % (inst, part))
+        pperes2 = ppe2.execute()
+        ppe3 = redish.pipeline()
+        # Remove all contents for this AG-Partition
+        for elem in pperes2[-1]:
+            ppe3.delete("AGPARTVALUES:%s:%d:%s" % (inst, part, elem))
+        ppe3.delete("AGPARTKEYS:%s:%d" % (inst, part))
+        ppe3.hset("AGPARTS:%s" % inst, part, acq_time)
+        pperes3 = ppe3.execute()
+
     def send_agg_uve(self, redish, inst, part, acq_time, rows):
         """ This function writes aggregated UVEs to redis
@@ -456,6 +492,8 @@ def send_agg_uve(self, redish, inst, part, acq_time, rows):
             The key and typename information is also published
             on a redis channel
         """
+        if not redish:
+            assert()
         old_acq_time = redish.hget("AGPARTS:%s" % inst, part)
         if old_acq_time is None:
             self._logger.error("Agg %s part %d new" % (inst, part))
@@ -465,17 +503,7 @@ def send_agg_uve(self, redish, inst, part, acq_time, rows):
             if int(old_acq_time) != acq_time:
                 self._logger.error("Agg %s stale info part %d, acqs %d,%d" % \
                     (inst, part, int(old_acq_time), acq_time))
-                ppe2 = redish.pipeline()
-                ppe2.hdel("AGPARTS:%s" % inst, part)
-                ppe2.smembers("AGPARTKEYS:%s:%d" % (inst, part))
-                pperes2 = ppe2.execute()
-                ppe3 = redish.pipeline()
-                # Remove all contents for this AG-Partition
-                for elem in pperes2[-1]:
-                    ppe3.delete("AGPARTVALUES:%s:%d:%s" % (inst, part, elem))
-                ppe3.delete("AGPARTKEYS:%s:%d" % (inst, part))
-                ppe3.hset("AGPARTS:%s" % inst, part, acq_time)
-                pperes3 = ppe3.execute()
+                self.clear_agg_uve(redish, inst, part, acq_time)
 
         pub_list = []
         ppe = redish.pipeline()
@@ -486,15 +514,18 @@ def send_agg_uve(self, redish, inst, part, acq_time, rows):
             key = row.key
             pub_list.append({"key":key,"type":typ})
             if typ is None:
+                self._logger.debug("Agg remove part %d, key %s" % (part,key))
                 # The entire contents of the UVE should be removed
                 ppe.srem("AGPARTKEYS:%s:%d" % (inst, part), key)
                 ppe.delete("AGPARTVALUES:%s:%d:%s" % (inst, part, key))
             else:
                 if row.val is None:
+                    self._logger.debug("Agg remove part %d, key %s, type %s" % (part,key,typ))
                    # Remove the given struct from the UVE
                     ppe.hdel("AGPARTVALUES:%s:%d:%s" % (inst, part, key), typ)
                     check_keys.add(key)
                 else:
+                    self._logger.debug("Agg update part %d, key %s, type %s" % (part,key,typ))
                     ppe.sadd("AGPARTKEYS:%s:%d" % (inst, part), key)
                     ppe.hset("AGPARTVALUES:%s:%d:%s" % (inst, part, key),
                         typ, vjson)
@@ -526,6 +557,7 @@ def send_agg_uve(self, redish, inst, part, acq_time, rows):
             redish.publish('AGPARTPUB:%s:%d' % (inst, part),
                 json.dumps(pub_list))
         if retry:
+            self._logger.error("Agg unexpected rows %s" % str(rows))
             assert()
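
[reviewer note] clear_agg_uve() factors the partition reset out of
send_agg_uve() so reconnect_agg_uve() can reuse it. The redis key layout it
manipulates, as used above: AGPARTS:<inst> is a hash of partition ->
acquisition time, AGPARTKEYS:<inst>:<part> is a set of UVE keys, and
AGPARTVALUES:<inst>:<part>:<key> is a hash of type -> value. The same
cleanup as a standalone redis-py sketch:

    import redis

    def clear_part(r, inst, part, acq_time):
        keys = r.smembers("AGPARTKEYS:%s:%d" % (inst, part))
        pipe = r.pipeline()
        for k in keys:
            # drop the per-UVE type->value hashes
            pipe.delete("AGPARTVALUES:%s:%d:%s" % (inst, part, k))
        pipe.delete("AGPARTKEYS:%s:%d" % (inst, part))
        # record the new acquisition time for this partition
        pipe.hset("AGPARTS:%s" % inst, part, acq_time)
        pipe.execute()
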
 
     def run_uve_processing(self):
@@ -538,10 +570,6 @@ def run_uve_processing(self):
            set should not grow in an unbounded manner (like a queue can)
         """
-        if self.disc:
-            max_out_rows = 20
-        else:
-            max_out_rows = 2
         lredis = None
         while True:
             for part in self._uveqf.keys():
@@ -552,41 +580,41 @@ def run_uve_processing(self):
                 if part in self._uveq:
                     del self._uveq[part]
             prev = time.time()
-            gevs = {}
-            pendingset = {}
-            for part in self._uveq.keys():
-                if not len(self._uveq[part]):
-                    continue
-                self._logger.info("UVE Process for %d" % part)
-
-                # Allow the partition handlers to queue new UVEs without
-                # interfering with the work of processing the current UVEs
-                pendingset[part] = copy.deepcopy(self._uveq[part])
-                self._uveq[part] = {}
-
-                gevs[part] = gevent.spawn(self.handle_uve_notif,part,\
-                    pendingset[part])
-            if len(gevs):
-                gevent.joinall(gevs.values())
-                for part in gevs.keys():
-                    # If UVE processing failed, requeue the working set
-                    outp = gevs[part].get()
-                    if outp is None:
-                        self._logger.error("UVE Process failed for %d" % part)
-                        self.handle_uve_notifq(part, pendingset[part])
-                    elif not part in self._workers:
-                        self._logger.error(
-                            "Part %d is gone, cannot process UVEs" % part)
-                    else:
-                        acq_time = self._workers[part].acq_time()
-                        try:
-                            if lredis is None:
-                                lredis = redis.StrictRedis(
-                                    host="127.0.0.1",
-                                    port=self._conf.redis_server_port(),
-                                    password=self._conf.redis_password(),
-                                    db=2)
-
+            try:
+                if lredis is None:
+                    lredis = redis.StrictRedis(
+                        host="127.0.0.1",
+                        port=self._conf.redis_server_port(),
+                        password=self._conf.redis_password(),
+                        db=7)
+                    self.reconnect_agg_uve(lredis)
+                gevs = {}
+                pendingset = {}
+                for part in self._uveq.keys():
+                    if not len(self._uveq[part]):
+                        continue
+                    self._logger.info("UVE Process for %d" % part)
+
+                    # Allow the partition handlers to queue new UVEs without
+                    # interfering with the work of processing the current UVEs
+                    pendingset[part] = copy.deepcopy(self._uveq[part])
+                    self._uveq[part] = {}
+
+                    gevs[part] = gevent.spawn(self.handle_uve_notif,part,\
+                        pendingset[part])
+                if len(gevs):
+                    gevent.joinall(gevs.values())
+                    for part in gevs.keys():
+                        # If UVE processing failed, requeue the working set
+                        outp = gevs[part].get()
+                        if outp is None:
+                            self._logger.error("UVE Process failed for %d" % part)
+                            self.handle_uve_notifq(part, pendingset[part])
+                        elif not part in self._workers:
+                            self._logger.error(
+                                "Part %d is gone, cannot process UVEs" % part)
+                        else:
+                            acq_time = self._workers[part].acq_time()
                             if len(outp):
                                 rows = []
                                 for ku,vu in outp.iteritems():
@@ -594,7 +622,7 @@ def run_uve_processing(self):
                                         # This message has no type!
                                         # Its used to indicate a delete of the entire UVE
                                         rows.append(OutputRow(key=ku, typ=None, val=None))
-                                        if len(rows) >= max_out_rows:
+                                        if len(rows) >= self._max_out_rows:
                                             self.send_agg_uve(lredis,
                                                 self._instance_id,
                                                 part,
@@ -604,7 +632,7 @@ def run_uve_processing(self):
                                         continue
                                     for kt,vt in vu.iteritems():
                                         rows.append(OutputRow(key=ku, typ=kt, val=vt))
-                                        if len(rows) >= max_out_rows:
+                                        if len(rows) >= self._max_out_rows:
                                             self.send_agg_uve(lredis,
                                                 self._instance_id,
                                                 part,
@@ -620,16 +648,14 @@ def run_uve_processing(self):
                                         rows)
                                     rows[:] = []
 
-                        except Exception as ex:
-                            template = "Exception {0} in uve proc. Arguments:\n{1!r}"
-                            messag = template.format(type(ex).__name__, ex.args)
-                            self._logger.error("%s : traceback %s" % \
-                                (messag, traceback.format_exc()))
-                            lredis = None
-                            # We need to requeue
-                            self.handle_uve_notifq(part, pendingset[part])
-                            gevent.sleep(1)
-
+            except Exception as ex:
+                template = "Exception {0} in uve proc. Arguments:\n{1!r}"
+                messag = template.format(type(ex).__name__, ex.args)
+                self._logger.error("%s : traceback %s" % \
+                    (messag, traceback.format_exc()))
+                lredis = None
+                gevent.sleep(1)
+
             curr = time.time()
             if (curr - prev) < 0.5:
                 gevent.sleep(0.5 - (curr - prev))
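
[reviewer note] The whole processing pass now sits in one try block: any
redis failure drops lredis, and the next pass reconnects and replays state
via reconnect_agg_uve(). Rows are flushed whenever the batch reaches
self._max_out_rows (20 with discovery, 2 without). The batching pattern,
reduced to its essentials:

    def flush_in_batches(rows, max_out_rows, send):
        batch = []
        for row in rows:
            batch.append(row)
            if len(batch) >= max_out_rows:
                send(batch)
                batch[:] = []   # reuse the list, as the code above does
        if batch:
            send(batch)         # final partial batch
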
diff --git a/src/opserver/alarmgen_cfg.py b/src/opserver/alarmgen_cfg.py
index 319181c1e9c..ab8d1b0e022 100644
--- a/src/opserver/alarmgen_cfg.py
+++ b/src/opserver/alarmgen_cfg.py
@@ -19,7 +19,6 @@ def parse(self):
             --log_file
             --use_syslog
             --syslog_facility LOG_USER
-            --collectors 127.0.0.1:8086
             --disc_server_ip 127.0.0.1
             --disc_server_port 5998
             --worker_id 0
@@ -52,7 +51,7 @@ def parse(self):
         defaults = {
             'host_ip'           : '127.0.0.1',
-            'collectors'        : ['127.0.0.1:8086'],
+            'collectors'        : [],
             'kafka_broker_list' : ['127.0.0.1:9092'],
             'log_local'         : False,
             'log_level'         : SandeshLevel.SYS_DEBUG,
diff --git a/src/opserver/partition_handler.py b/src/opserver/partition_handler.py
index 75aa21d666e..298ffa2bf24 100644
--- a/src/opserver/partition_handler.py
+++ b/src/opserver/partition_handler.py
@@ -111,7 +111,7 @@ def _get_uve_content(self, table, barekeys, tfilter, ackfilter, keysonly):
                         host=pi.ip_address,
                         port=pi.port,
                         password=self._rpass,
-                        db=2)
+                        db=7)
                 ppe = lredis.pipeline()
                 luves = list(uveparts[pkey])
                 for elem in luves:
@@ -342,7 +342,7 @@ def _run(self):
                     host=self._pi.ip_address,
                     port=self._pi.port,
                     password=self._rpass,
-                    db=2)
+                    db=7)
                 pb = lredis.pubsub()
                 inst = self._pi.instance_id
                 part = self._partno
@@ -587,7 +587,7 @@ def _run(self):
 
             # start reading from last previously processed message
             if mi != None:
-                consumer.seek(0,1)
+                consumer.seek(-1,1)
             else:
                 consumer.seek(0,0)
 
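[reviewer note] The seek change uses the old kafka-python SimpleConsumer
semantics, where seek(offset, whence) mirrors os.lseek (whence 0 = from the
head, 1 = relative to the current offset, 2 = from the tail). Stepping back
one message means the last previously processed message is re-consumed
after a reconnect instead of being skipped:

    if mi is not None:
        consumer.seek(-1, 1)   # replay the last processed message
    else:
        consumer.seek(0, 0)    # no checkpoint: start from the beginning
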
@@ -657,6 +657,9 @@ def __init__(self, brokers, partition, uve_topic, logger, callback,
         self._acq_time = UTCTimestampUsec()
         self._rport = rport
 
+    def reset_acq_time(self):
+        self._acq_time = UTCTimestampUsec()
+
     def acq_time(self):
         return self._acq_time
 
@@ -672,28 +675,11 @@ def resource_check(self, msgs):
                 if len(chg_res):
                     self.start_partition(chg_res)
             self.disc_rset = newset
-        if self._disc:
-            data = { 'instance-id' : self._aginst,
-                'partition' : str(self._partno),
-                'ip-address': self._host_ip,
-                'acq-time': str(self._acq_time),
-                'port':str(self._rport)}
-            self._disc.publish(ALARM_PARTITION_SERVICE_NAME, data)
 
     def stop_partition(self, kcoll=None):
         clist = []
         if not kcoll:
             clist = self._uvedb.keys()
-            # If all collectors are being cleared, clear resoures too
-            self.disc_rset = set()
-            if self._disc:
-                # TODO: Unpublish instead of setting acq-time to 0
-                data = { 'instance-id' : self._aginst,
-                    'partition' : str(self._partno),
-                    'ip-address': self._host_ip,
-                    'acq-time': "0",
-                    'port':str(self._rport)}
-                self._disc.publish(ALARM_PARTITION_SERVICE_NAME, data)
         else:
             clist = [kcoll]
         self._logger.error("Stopping part %d collectors %s" % \
@@ -715,13 +701,33 @@ def stop_partition(self, kcoll=None):
                 del self._uvedb[coll]
         self._logger.error("Stopping part %d UVEs %s" % \
                 (self._partno,str(chg.keys())))
-        self._callback(self._partno, chg)
+        if kcoll:
+            self._callback(self._partno, chg)
+        else:
+            # If all collectors are being cleared, clear resources too
+            self.disc_rset = set()
+            if self._disc:
+                # TODO: Unpublish instead of setting acq-time to 0
+                data = { 'instance-id' : self._aginst,
+                    'partition' : str(self._partno),
+                    'ip-address': self._host_ip,
+                    'acq-time': "0",
+                    'port':str(self._rport)}
+                self._disc.publish(ALARM_PARTITION_SERVICE_NAME, data)
+
         return partdb
 
     def start_partition(self, cbdb):
         ''' This function loads the initial UVE database
            for the partition
        '''
+        if self._disc:
+            data = { 'instance-id' : self._aginst,
+                'partition' : str(self._partno),
+                'ip-address': self._host_ip,
+                'acq-time': str(self._acq_time),
+                'port':str(self._rport)}
+            self._disc.publish(ALARM_PARTITION_SERVICE_NAME, data)
         self._logger.error("Starting part %d collectors %s" % \
                           (self._partno, str(cbdb.keys())))
         uves = {}
@@ -738,13 +744,14 @@ def start_partition(self, cbdb):
                         self._uvedb[kcoll][kgen][tab][rkey] = {}
 
                     if not kk in uves:
-                        uves[kk] = {}
+                        uves[kk] = None
                     for typ, contents in gen[kk].iteritems():
                         self._uvedb[kcoll][kgen][tab][rkey][typ] = {}
                         self._uvedb[kcoll][kgen][tab][rkey][typ]["c"] = 0
                         self._uvedb[kcoll][kgen][tab][rkey][typ]["u"] = \
                                 uuid.uuid1(self._ip_code)
-                        uves[kk][typ] = contents
+                        # TODO: for loading only specific types:
+                        #       uves[kk][typ] = contents
 
         self._logger.error("Starting part %d UVEs %s" % \
                           (self._partno, str(uves.keys())))
diff --git a/src/opserver/test/test_alarm.py b/src/opserver/test/test_alarm.py
index 9c6b2672364..2279ab9b476 100755
--- a/src/opserver/test/test_alarm.py
+++ b/src/opserver/test/test_alarm.py
@@ -267,6 +267,8 @@ def tearDown(self):
         self._agtask.kill()
 
+    @mock.patch('opserver.alarmgen.Controller.reconnect_agg_uve')
+    @mock.patch('opserver.alarmgen.Controller.clear_agg_uve')
     @mock.patch('opserver.alarmgen.Controller.send_agg_uve')
     @mock.patch.object(UVEServer, 'get_part')
     @mock.patch.object(UVEServer, 'get_uve')
@@ -275,7 +277,8 @@ def tearDown(self):
     # Test partition shutdown as well
     def test_00_init(self,
             mock_SimpleConsumer,
-            mock_get_uve, mock_get_part, mock_send_agg_uve):
+            mock_get_uve, mock_get_part,
+            mock_send_agg_uve, mock_clear_agg_uve, mock_reconnect_agg_uve):
 
         m_get_part = Mock_get_part()
         m_get_part[(1,("127.0.0.1",0,0))] = "127.0.0.1:0", \
@@ -303,6 +306,8 @@ def test_00_init(self,
             self._ag.ptab_info, False))
 
+    @mock.patch('opserver.alarmgen.Controller.reconnect_agg_uve')
+    @mock.patch('opserver.alarmgen.Controller.clear_agg_uve')
     @mock.patch('opserver.alarmgen.Controller.send_agg_uve')
     @mock.patch.object(UVEServer, 'get_part')
     @mock.patch.object(UVEServer, 'get_uve')
@@ -311,7 +316,8 @@ def test_00_init(self,
     # Also test for deletion of a boot-strapped UVE
     def test_01_rxmsg(self,
             mock_SimpleConsumer,
-            mock_get_uve, mock_get_part, mock_send_agg_uve):
+            mock_get_uve, mock_get_part,
+            mock_send_agg_uve, mock_clear_agg_uve, mock_reconnect_agg_uve):
 
         m_get_part = Mock_get_part()
         m_get_part[(1,("127.0.0.1",0,0))] = "127.0.0.1:0", \
@@ -340,6 +346,8 @@ def test_01_rxmsg(self,
         self.assertTrue(self.checker_exact(\
             self._ag.ptab_info[1]["ObjectYY"]["uve2"].values(), {"type2" : {"yy": 1}}))
 
+    @mock.patch('opserver.alarmgen.Controller.reconnect_agg_uve')
+    @mock.patch('opserver.alarmgen.Controller.clear_agg_uve')
     @mock.patch('opserver.alarmgen.Controller.send_agg_uve')
     @mock.patch.object(UVEServer, 'get_part')
     @mock.patch.object(UVEServer, 'get_uve')
@@ -348,7 +356,8 @@ def test_01_rxmsg(self,
     # Also test collector shutdown
     def test_02_collectorha(self,
             mock_SimpleConsumer,
-            mock_get_uve, mock_get_part, mock_send_agg_uve):
+            mock_get_uve, mock_get_part,
+            mock_send_agg_uve, mock_clear_agg_uve, mock_reconnect_agg_uve):
 
         m_get_part = Mock_get_part()
         m_get_part[(1,("127.0.0.1",0,0))] = "127.0.0.1:0", \
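
[reviewer note] The two new @mock.patch decorators are stacked on top of
send_agg_uve's, and the test signatures grow two parameters at the end;
mock.patch decorators apply bottom-up, so the innermost (lowest) patch is
passed first and the topmost patch last. Minimal illustration:

    import mock

    @mock.patch('os.remove')   # outermost decorator -> last mock argument
    @mock.patch('os.rename')   # innermost decorator -> first mock argument
    def test_something(mock_rename, mock_remove):
        pass
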
"contrail-alarm-gen", ["http"], args, None, False) self.http_port = ports["http"] - for part in range(0,self.partitions): - assert(self.analytics_fixture.set_alarmgen_partition(part,1) == 'true') + if self.http_port: + for part in range(0,self.partitions): + self.analytics_fixture.set_alarmgen_partition(part,1) return self.verify_setup() # end start @@ -2227,7 +2228,7 @@ def verify_fieldname_table(self): start_time='-1m', end_time='now', select_fields=['fields.value'], - where_clause = 'name=ObjectVNTable:Objecttype') + where_clause = 'name=ObjectVNTable:ObjectId') self.logger.info(str(res)) #Verify that 2 different n/w are present vn0 and vn1 assert(len(res)==2) @@ -2583,15 +2584,22 @@ def start_with_ephemeral_ports(self, modname, pnames, args, preexec, try: line = pipein.readline()[:-1] port = int(line) - self.logger.info("Found %s_port %d" % (k, port)) + self.logger.info("Found %s_port %d for %s" % (k, port, modname)) tries = -1 except Exception as e: - self.logger.info("No %s_port found" % k) + self.logger.info("No %s_port found for %s" % (k, modname)) gevent.sleep(1) tries = tries - 1 pipein.close() os.unlink(pipe_name) pmap[k] = port + if not instance.poll() is None: + (p_out, p_err) = instance.communicate() + rcode = instance.returncode + self.logger.info('%s returned %d at startup!' % (modname,rcode)) + self.logger.info('%s terminated stdout: %s' % (modname, p_out)) + self.logger.info('%s terminated stderr: %s' % (modname, p_err)) + return pmap, instance @staticmethod diff --git a/src/opserver/uveserver.py b/src/opserver/uveserver.py index f270bce550b..c60fd1aa3b1 100644 --- a/src/opserver/uveserver.py +++ b/src/opserver/uveserver.py @@ -380,7 +380,7 @@ def get_uve(self, key, flat, filters=None, base_url=None): failures = True else: self._redis_inst_up(r_inst, redish) - self._logger.debug("Computed %s as %s" % (key,str(rsp))) + self._logger.debug("Computed %s as %s" % (key,rsp.keys())) return failures, rsp # end get_uve