diff --git a/src/analytics/db_handler.cc b/src/analytics/db_handler.cc
index 04537ed9ab5..7c02978e656 100644
--- a/src/analytics/db_handler.cc
+++ b/src/analytics/db_handler.cc
@@ -183,20 +183,22 @@ bool DbHandler::CreateTables() {
     GenDb::NewColVec& columns = col_list->columns_;
     columns.reserve(4);
 
+    uint64_t current_tm = UTCTimestampUsec();
+
     GenDb::NewCol *col(new GenDb::NewCol(
-        g_viz_constants.SYSTEM_OBJECT_START_TIME, UTCTimestampUsec(), 0));
+        g_viz_constants.SYSTEM_OBJECT_START_TIME, current_tm, 0));
     columns.push_back(col);
 
     GenDb::NewCol *flow_col(new GenDb::NewCol(
-        g_viz_constants.SYSTEM_OBJECT_FLOW_START_TIME, UTCTimestampUsec(), 0));
+        g_viz_constants.SYSTEM_OBJECT_FLOW_START_TIME, current_tm, 0));
     columns.push_back(flow_col);
 
     GenDb::NewCol *msg_col(new GenDb::NewCol(
-        g_viz_constants.SYSTEM_OBJECT_MSG_START_TIME, UTCTimestampUsec(), 0));
+        g_viz_constants.SYSTEM_OBJECT_MSG_START_TIME, current_tm, 0));
     columns.push_back(msg_col);
 
     GenDb::NewCol *stat_col(new GenDb::NewCol(
-        g_viz_constants.SYSTEM_OBJECT_STAT_START_TIME, UTCTimestampUsec(), 0));
+        g_viz_constants.SYSTEM_OBJECT_STAT_START_TIME, current_tm, 0));
     columns.push_back(stat_col);
 
     if (!dbif_->Db_AddColumnSync(col_list)) {
diff --git a/src/opserver/analytics_database.sandesh b/src/opserver/analytics_database.sandesh
index 2e0e3af9450..0f046215100 100644
--- a/src/opserver/analytics_database.sandesh
+++ b/src/opserver/analytics_database.sandesh
@@ -22,6 +22,7 @@ struct DatabasePurgeStats {
     3: optional u64 rows_deleted
     4: optional u64 duration
     5: string purge_status
+    6: list<string> purge_status_details
 }
 
 struct DatabasePurgeInfo {
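The Python changes below rework db_purge() so that it returns a (rows deleted, error details) pair instead of a bare count, letting the caller publish the accumulated failure strings through the new purge_status_details field. A minimal sketch of the new calling convention; the names match the patch, but the surrounding scaffolding (analytics_db, logger) is a hypothetical stand-in:

    # Hypothetical caller showing the (count, details) contract
    # that db_purge() adopts in this patch.
    total_rows_deleted, purge_error_details = \
        analytics_db.db_purge(purge_cutoff, purge_id)
    if total_rows_deleted < 0:
        # -1 still signals failure; the list now carries the reasons
        for detail in purge_error_details:
            logger.error('purge %s failed: %s' % (purge_id, detail))
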
diff --git a/src/opserver/analytics_db.py b/src/opserver/analytics_db.py
index 51b0f5dbefe..c6ba5c6149e 100644
--- a/src/opserver/analytics_db.py
+++ b/src/opserver/analytics_db.py
@@ -196,21 +196,26 @@ def get_analytics_db_purge_status(self, redis_list):
     def db_purge(self, purge_cutoff, purge_id):
         total_rows_deleted = 0 # total number of rows deleted
+        purge_error_details = []
         if (self._pool == None):
             self.connect_db()
         if not self._pool:
             self._logger.error('Connection to AnalyticsDb has Timed out')
-            return -1
+            purge_error_details.append('Connection to AnalyticsDb has Timed out')
+            return (-1, purge_error_details)
         sysm = self._get_sysm()
         if (sysm == None):
             self._logger.error('Failed to connect SystemManager')
-            return -1
+            purge_error_details.append('Failed to connect SystemManager')
+            return (-1, purge_error_details)
         try:
             table_list = sysm.get_keyspace_column_families(COLLECTOR_KEYSPACE)
         except Exception as e:
             self._logger.error("Exception: Purge_id %s Failed to get "
                 "Analytics Column families %s" % (purge_id, e))
-            return -1
+            purge_error_details.append("Exception: Failed to get "
+                "Analytics Column families %s" % (e))
+            return (-1, purge_error_details)
 
         # delete entries from message table
         msg_table = COLLECTOR_GLOBAL_TABLE
@@ -221,7 +226,9 @@ def db_purge(self, purge_cutoff, purge_id):
         except Exception as e:
             self._logger.error("purge_id %s Failure in fetching "
                 "message table columnfamily %s" % e)
-            return -1
+            purge_error_details.append("Failure in fetching "
+                "message table columnfamily %s" % e)
+            return (-1, purge_error_details)
 
         for table in table_list:
             # purge from index tables
@@ -248,25 +255,33 @@ def db_purge(self, purge_cutoff, purge_id):
             except Exception as e:
                 self._logger.error("purge_id %s Failure in fetching "
                     "the columnfamily %s" % e)
-                return -1
+                purge_error_details.append("Failure in fetching "
+                    "the columnfamily %s" % e)
+                return (-1, purge_error_details)
 
             b = cf.batch()
             try:
                 # get all columns only in case of one message index table
-                if (table is MESSAGE_TABLE_SOURCE):
+                if table == MESSAGE_TABLE_SOURCE:
                     cols_to_fetch = 1000000
                 else:
                     cols_to_fetch = 1
                 for key, cols in cf.get_range(column_count=cols_to_fetch):
-                    t2 = key[0]
+                    # key is of type integer for MESSAGE_TABLE_TIMESTAMP.
+                    # For other tables, key is a composite type with
+                    # first element being timestamp (integer).
+                    if table == MESSAGE_TABLE_TIMESTAMP:
+                        t2 = key
+                    else:
+                        t2 = key[0]
                     # each row will have equivalent of 2^23 = 8388608 usecs
                     row_time = (float(t2)*pow(2, RowTimeInBits))
                     if (row_time < purge_time):
                         per_table_deleted +=1
                         total_rows_deleted +=1
-                        if (table is MESSAGE_TABLE_SOURCE):
+                        if table == MESSAGE_TABLE_SOURCE:
                             # get message table uuids to delete
-                            del_msg_uuids.append(list(cols.values()))
+                            del_msg_uuids.extend(cols.values())
                         try:
                             b.remove(key)
                         except Exception as e:
@@ -292,24 +307,23 @@ def db_purge(self, purge_cutoff, purge_id):
                 except Exception as e:
                     self._logger.error("Exception: Purge_id %s message table "
                         "doesnot have uuid %s" % (purge_id, e))
-
-
+                    purge_error_details.append("Exception: Message table "
+                        "doesnot have uuid %s" % (e))
             except Exception as e:
                 self._logger.error("Exception: Purge_id:%s table:%s "
                     "error: %s" % (purge_id, table, e))
+                purge_error_details.append("Exception: Table:%s "
+                    "error: %s" % (table, e))
                 continue
 
-            self._logger.info("Purge_id %s deleted %d rows from table: %s"
-                % (purge_id, per_table_deleted, table))
+            self._logger.warning("Purge_id %s deleted %d rows from "
+                "table: %s" % (purge_id, per_table_deleted, table))
 
-        self._logger.info("Purge_id %s deleted %d rows from table: %s"
+        self._logger.warning("Purge_id %s deleted %d rows from table: %s"
             % (purge_id, msg_table_deleted, COLLECTOR_GLOBAL_TABLE))
-        # end deleting all relevant UUIDs from message table
-
-
-        self._logger.info("Purge_id %s total rows deleted: %s"
+        self._logger.warning("Purge_id %s total rows deleted: %s"
             % (purge_id, total_rows_deleted))
-        return total_rows_deleted
-    # end purge_data
+        return (total_rows_deleted, purge_error_details)
+    # end db_purge
 
     def get_dbusage_info(self, rest_api_port):
         """Collects database usage information from all db nodes
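The key-decoding fix above leans on each Cassandra row spanning 2^RowTimeInBits microseconds of data. A self-contained sketch of that arithmetic, assuming RowTimeInBits = 23 as the in-line comment states (row_is_purgeable is a hypothetical helper, not part of the patch):

    # Each row key covers 2^23 = 8388608 usecs of data, so a row is
    # purgeable when its start time falls before the purge cutoff.
    RowTimeInBits = 23

    def row_is_purgeable(t2, purge_time):
        # t2 is the timestamp component of the row key: the key itself
        # for MESSAGE_TABLE_TIMESTAMP, key[0] for composite-keyed tables
        row_time = float(t2) * pow(2, RowTimeInBits)
        return row_time < purge_time
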
diff --git a/src/opserver/opserver.py b/src/opserver/opserver.py
index c689effab2a..7b24a17f7da 100644
--- a/src/opserver/opserver.py
+++ b/src/opserver/opserver.py
@@ -1776,7 +1776,8 @@ def db_purge_operation(self, purge_cutoff, purge_id):
         self._analytics_db.number_of_purge_requests += 1
         purge_info.number_of_purge_requests = \
             self._analytics_db.number_of_purge_requests
-        total_rows_deleted = self._analytics_db.db_purge(purge_cutoff, purge_id)
+        total_rows_deleted, purge_stat.purge_status_details = \
+            self._analytics_db.db_purge(purge_cutoff, purge_id)
         self._analytics_db.delete_db_purge_status()
 
         if (total_rows_deleted > 0):
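With the sandesh field and the opserver wiring in place, a successful purge should publish a DatabasePurgeStats entry roughly as follows. This is an illustrative payload only; the field names come from the .sandesh change, while every value shown here is made up:

    # Illustrative stats entry from the DatabasePurgeInfo UVE; the new
    # test below asserts 'success' and an empty purge_status_details.
    db_purge_stats = {
        'purge_id': '4aafbac6-0000-0000-0000-000000000000',  # hypothetical
        'purge_status': 'success',
        'purge_status_details': [],   # populated only on failure
        'rows_deleted': 1234,         # hypothetical count
    }
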
diff --git a/src/opserver/test/utils/analytics_fixture.py b/src/opserver/test/utils/analytics_fixture.py
index ab5a4deee93..838b45513d3 100644
--- a/src/opserver/test/utils/analytics_fixture.py
+++ b/src/opserver/test/utils/analytics_fixture.py
@@ -21,6 +21,7 @@
 from opserver_introspect_utils import VerificationOpsSrv
 from collector_introspect_utils import VerificationCollector
 from alarmgen_introspect_utils import VerificationAlarmGen
+from generator_introspect_utils import VerificationGenerator
 from opserver.sandesh.viz.constants import COLLECTOR_GLOBAL_TABLE, SOURCE, MODULE
 from opserver.opserver_util import OpServerUtils
 from sandesh_common.vns.constants import NodeTypeNames, ModuleNames
@@ -1923,7 +1924,6 @@ def verify_collector_object_log(self, start_time, end_time):
     @retry(delay=1, tries=5)
     def verify_collector_object_log_before_purge(self, start_time, end_time):
         self.logger.info('verify_collector_object_log_before_purge')
-        vns = VerificationOpsSrv('127.0.0.1', self.opserver_port);
         res = self.verify_collector_object_log(start_time, end_time)
         self.logger.info("collector object log before purging: %s" % res)
         if not res:
@@ -1935,14 +1935,18 @@ def verify_database_purge_query(self, json_qstr):
         self.logger.info('verify database purge query');
         vns = VerificationOpsSrv('127.0.0.1', self.opserver_port);
         res = vns.post_purge_query_json(json_qstr)
-        assert(res == 'started')
-        return True
+        try:
+            assert(res['status'] == 'started')
+            purge_id = res['purge_id']
+        except KeyError:
+            assert(False)
+        else:
+            return purge_id
     # end verify_database_purge_query
 
     @retry(delay=2, tries=5)
     def verify_collector_object_log_after_purge(self, start_time, end_time):
         self.logger.info('verify_collector_object_log_after_purge')
-        vns = VerificationOpsSrv('127.0.0.1', self.opserver_port)
         res = self.verify_collector_object_log(start_time, end_time)
         self.logger.info("collector object log after purging: %s" % res)
         if res != []:
@@ -1950,17 +1954,38 @@ def verify_collector_object_log_after_purge(self, start_time, end_time):
             return True
     # end verify_collector_object_log_after_purge
 
+    @retry(delay=2, tries=5)
+    def verify_database_purge_status(self, purge_id):
+        self.logger.info('verify database purge status: purge_id [%s]' %
+            (purge_id))
+        try:
+            ops_introspect = VerificationGenerator('127.0.0.1',
+                self.opserver.http_port)
+            db_purge_uve = ops_introspect.get_uve('DatabasePurgeInfo')
+            db_purge_stats = db_purge_uve['stats'][0]
+        except Exception as e:
+            self.logger.error('Failed to get DatabasePurgeInfo UVE: %s' % (e))
+            return False
+        else:
+            self.logger.info(str(db_purge_stats))
+            if db_purge_stats['purge_id'] != purge_id or \
+                    db_purge_stats['purge_status'] != 'success' or \
+                    db_purge_stats['purge_status_details']:
+                return False
+            return True
+    # end verify_database_purge_status
+
     def verify_database_purge_with_percentage_input(self):
-        self.logger.info('verify database purge query')
-        vns = VerificationOpsSrv('127.0.0.1', self.opserver_port)
+        self.logger.info('verify database purge with percentage input')
         end_time = UTCTimestampUsec()
         start_time = end_time - 10*60*pow(10,6)
         assert(self.verify_collector_object_log_before_purge(start_time, end_time))
         json_qstr = json.dumps({'purge_input': 100})
-        assert(self.verify_database_purge_query(json_qstr))
+        purge_id = self.verify_database_purge_query(json_qstr)
+        assert(self.verify_database_purge_status(purge_id))
         assert(self.verify_collector_object_log_after_purge(start_time, end_time))
         return True
-    # end verify_database_purge_query
+    # end verify_database_purge_with_percentage_input
 
     def verify_database_purge_support_utc_time_format(self):
         self.logger.info('verify database purge support utc time format')
@@ -1969,7 +1994,8 @@ def verify_database_purge_support_utc_time_format(self):
         end_time = OpServerUtils.convert_to_utc_timestamp_usec('now')
         start_time = end_time - 20*60*pow(10,6)
         assert(self.verify_collector_object_log_before_purge(start_time, end_time))
-        assert(self.verify_database_purge_query(json_qstr))
+        purge_id = self.verify_database_purge_query(json_qstr)
+        assert(self.verify_database_purge_status(purge_id))
         assert(self.verify_collector_object_log_after_purge(start_time, end_time))
         return True
     # end verify_database_purge_support_utc_time_format
@@ -1982,7 +2008,8 @@ def verify_database_purge_support_datetime_format(self):
         end_time = OpServerUtils.convert_to_utc_timestamp_usec(dt)
         start_time = end_time - 30*60*pow(10,6)
         assert(self.verify_collector_object_log_before_purge(start_time, end_time))
-        assert(self.verify_database_purge_query(json_qstr))
+        purge_id = self.verify_database_purge_query(json_qstr)
+        assert(self.verify_database_purge_status(purge_id))
         assert(self.verify_collector_object_log_after_purge(start_time, end_time))
         return True
     # end verify_database_purge_support_datetime_format
@@ -1994,7 +2021,8 @@ def verify_database_purge_support_deltatime_format(self):
         end_time = OpServerUtils.convert_to_utc_timestamp_usec('-1s')
         start_time = end_time - 10*60*pow(10,6)
         assert(self.verify_collector_object_log_before_purge(start_time, end_time))
-        assert(self.verify_database_purge_query(json_qstr))
+        purge_id = self.verify_database_purge_query(json_qstr)
+        assert(self.verify_database_purge_status(purge_id))
         assert(self.verify_collector_object_log_after_purge(start_time, end_time))
         return True
     # end verify_database_purge_support_deltatime_format
@@ -2004,11 +2032,16 @@ def verify_database_purge_request_limit(self):
         vns = VerificationOpsSrv('127.0.0.1', self.opserver_port)
         json_qstr = json.dumps({'purge_input': 50})
         res = vns.post_purge_query_json(json_qstr)
-        if (res == 'started'):
+        self.logger.info(str(res))
+        try:
+            assert(res['status'] == 'started')
+            purge_id = res['purge_id']
             res1 = vns.post_purge_query_json(json_qstr)
-            if (res1 == 'running'):
-                return True
-        return False
+            assert(res1['status'] == 'running')
+            assert(res1['purge_id'] == purge_id)
+        except KeyError:
+            assert(False)
+        return True
     # end verify_database_purge_request_limit
 
     @retry(delay=1, tries=5)
diff --git a/src/opserver/test/utils/opserver_introspect_utils.py b/src/opserver/test/utils/opserver_introspect_utils.py
index ba85d37bab1..d8a9b46847b 100644
--- a/src/opserver/test/utils/opserver_introspect_utils.py
+++ b/src/opserver/test/utils/opserver_introspect_utils.py
@@ -135,7 +135,6 @@ def post_purge_query_json(self, json_str, sync=True):
                 purge_request_url, json_str, sync)
             if resp is not None:
                 res = json.loads(resp)
-                res = res['status']
         except Exception as e:
             print str(e)
         finally:
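After this last change, post_purge_query_json() hands back the full parsed JSON response rather than just its 'status' field, which is what lets the fixtures above extract purge_id. A usage sketch; the response shape is inferred from the fixture assertions, not from captured output, and vns stands for a VerificationOpsSrv instance as in the tests:

    # The updated helper now yields the whole response dict, e.g.
    # {'status': 'started', 'purge_id': '<uuid>'} per the fixtures above.
    res = vns.post_purge_query_json(json.dumps({'purge_input': 100}))
    assert res['status'] == 'started'
    purge_id = res['purge_id']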