This commit does two things:
a) Modifies the database purge UVE to include error/warning strings related to
the last purge operation (see the example after this commit message).

b) Makes sure all analytics start times are the same when the system first
starts. Previously, UTCTimestampUsec() was read separately for each start-time
column, so the recorded values could differ slightly.

Closes-Bug: 1451619
Change-Id: I68a78f6c8e120fafd771bfb84e89b06b49f3bbfd
(cherry picked from commit 2015613)
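
As a rough illustration of item (a), the DatabasePurgeInfo UVE's stats entry
now carries a purge_status_details list of error/warning strings alongside the
existing fields. The field names in this sketch are taken from the
analytics_database.sandesh and test changes further below; the values are made
up for the example.

    # Hypothetical DatabasePurgeStats entry after this change.  Field names
    # come from analytics_database.sandesh and the tests below; the values
    # are illustrative only.
    purge_stats = {
        "purge_id": "d6f1a2b3-example-purge-id",   # illustrative id
        "rows_deleted": 0,
        "duration": 1200000,                       # illustrative value
        "purge_status": "failure",
        "purge_status_details": [
            "Connection to AnalyticsDb has Timed out",
        ],
    }
    print(purge_stats["purge_status_details"])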

Fix database purge code in analytics-api

This patch fixes the following issues in the db_purge() method.

1) The key type for MESSAGE_TABLE_TIMESTAMP differs from that of the
   other message index tables.

   pycassaShell output:

   >>> MESSAGE_TABLE_TIMESTAMP.key_validation_class
   'IntegerType'

   >>> MESSAGE_TABLE_SOURCE.key_validation_class
   'CompositeType(IntegerType, UTF8Type)'

   >>> MESSAGE_TABLE_MESSAGE_TYPE.key_validation_class
   'CompositeType(IntegerType, UTF8Type)'

   The purge function always expects a composite key and tries to extract
   the first element (the timestamp) from it. For MESSAGE_TABLE_TIMESTAMP,
   however, the key is a plain integer, so the extraction raises an
   exception. As a side effect, data in MESSAGE_TABLE_TIMESTAMP is never
   purged.

2) The object identity check on the table name fails.
   => if (table is MESSAGE_TABLE_SOURCE):
   The check above always fails because the table name returned by Cassandra
   and the MESSAGE_TABLE_SOURCE constant are equal but distinct string
   objects. As a result, data is never purged from the MessageTable.
   Replaced the object identity checks with equality checks (both fixes are
   sketched in the example below).
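
   A minimal, self-contained Python sketch of the two fixes follows. It is
   not the actual db_purge() code: the table-name values are assumptions
   mirroring the viz constants used above, and the key layouts are inferred
   from the key_validation_class output.

    # Minimal sketch of the two fixes; not the real db_purge() implementation.
    # Table-name values are assumptions mirroring the viz constants above.
    MESSAGE_TABLE_TIMESTAMP = "MessageTableTimestamp"
    MESSAGE_TABLE_SOURCE = "MessageTableSource"

    def row_timestamp(table, key):
        # Fix 1: MESSAGE_TABLE_TIMESTAMP rows are keyed by a plain integer,
        # while the other message index tables use a composite key whose
        # first element is the timestamp.
        if table == MESSAGE_TABLE_TIMESTAMP:
            return key
        return key[0]

    def needs_uuid_collection(table):
        # Fix 2: 'is' checks object identity, so two equal strings built
        # from different objects can fail it; '==' compares values, which
        # is what the purge logic actually needs.
        return table == MESSAGE_TABLE_SOURCE

    print(row_timestamp(MESSAGE_TABLE_TIMESTAMP, 163840))            # 163840
    print(row_timestamp(MESSAGE_TABLE_SOURCE, (163840, "host1")))    # 163840
    print(needs_uuid_collection("MessageTable" + "Source"))          # True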

Change-Id: I51d619ac5275acf737094b7cdf36bb3d462fcf81
Closes-Bug: #1487966
(cherry picked from commit 95077da)
chandanjuniper authored and Sundaresan Rajangam committed Oct 29, 2015
1 parent 062460a commit 2a333c8
Showing 6 changed files with 91 additions and 41 deletions.
10 changes: 6 additions & 4 deletions src/analytics/db_handler.cc
@@ -178,20 +178,22 @@ bool DbHandler::CreateTables() {
GenDb::NewColVec& columns = col_list->columns_;
columns.reserve(4);

uint64_t current_tm = UTCTimestampUsec();

GenDb::NewCol *col(new GenDb::NewCol(
g_viz_constants.SYSTEM_OBJECT_START_TIME, UTCTimestampUsec(), 0));
g_viz_constants.SYSTEM_OBJECT_START_TIME, current_tm, 0));
columns.push_back(col);

GenDb::NewCol *flow_col(new GenDb::NewCol(
g_viz_constants.SYSTEM_OBJECT_FLOW_START_TIME, UTCTimestampUsec(), 0));
g_viz_constants.SYSTEM_OBJECT_FLOW_START_TIME, current_tm, 0));
columns.push_back(flow_col);

GenDb::NewCol *msg_col(new GenDb::NewCol(
g_viz_constants.SYSTEM_OBJECT_MSG_START_TIME, UTCTimestampUsec(), 0));
g_viz_constants.SYSTEM_OBJECT_MSG_START_TIME, current_tm, 0));
columns.push_back(msg_col);

GenDb::NewCol *stat_col(new GenDb::NewCol(
g_viz_constants.SYSTEM_OBJECT_STAT_START_TIME, UTCTimestampUsec(), 0));
g_viz_constants.SYSTEM_OBJECT_STAT_START_TIME, current_tm, 0));
columns.push_back(stat_col);

if (!dbif_->Db_AddColumnSync(col_list)) {
1 change: 1 addition & 0 deletions src/opserver/analytics_database.sandesh
@@ -22,6 +22,7 @@ struct DatabasePurgeStats {
3: optional u64 rows_deleted
4: optional u64 duration
5: string purge_status
6: list<string> purge_status_details
}

struct DatabasePurgeInfo {
54 changes: 34 additions & 20 deletions src/opserver/analytics_db.py
@@ -155,21 +155,26 @@ def get_analytics_db_purge_status(self, redis_list):

def db_purge(self, purge_cutoff, purge_id):
total_rows_deleted = 0 # total number of rows deleted
purge_error_details = []
if (self._pool == None):
self.connect_db()
if not self._pool:
self._logger.error('Connection to AnalyticsDb has Timed out')
return -1
purge_error_details.append('Connection to AnalyticsDb has Timed out')
return (-1, purge_error_details)
sysm = self._get_sysm()
if (sysm == None):
self._logger.error('Failed to connect SystemManager')
return -1
purge_error_details.append('Failed to connect SystemManager')
return (-1, purge_error_details)
try:
table_list = sysm.get_keyspace_column_families(COLLECTOR_KEYSPACE)
except Exception as e:
self._logger.error("Exception: Purge_id %s Failed to get "
"Analytics Column families %s" % (purge_id, e))
return -1
purge_error_details.append("Exception: Failed to get "
"Analytics Column families %s" % (e))
return (-1, purge_error_details)

# delete entries from message table
msg_table = COLLECTOR_GLOBAL_TABLE
@@ -180,7 +185,9 @@ def db_purge(self, purge_cutoff, purge_id):
except Exception as e:
self._logger.error("purge_id %s Failure in fetching "
"message table columnfamily %s" % e)
return -1
purge_error_details.append("Failure in fetching "
"message table columnfamily %s" % e)
return (-1, purge_error_details)

for table in table_list:
# purge from index tables
@@ -207,25 +214,33 @@ def db_purge(self, purge_cutoff, purge_id):
except Exception as e:
self._logger.error("purge_id %s Failure in fetching "
"the columnfamily %s" % e)
return -1
purge_error_details.append("Failure in fetching "
"the columnfamily %s" % e)
return (-1, purge_error_details)
b = cf.batch()
try:
# get all columns only in case of one message index table
if (table is MESSAGE_TABLE_SOURCE):
if table == MESSAGE_TABLE_SOURCE:
cols_to_fetch = 1000000
else:
cols_to_fetch = 1

for key, cols in cf.get_range(column_count=cols_to_fetch):
t2 = key[0]
# key is of type integer for MESSAGE_TABLE_TIMESTAMP.
# For other tables, key is a composite type with
# first element being timestamp (integer).
if table == MESSAGE_TABLE_TIMESTAMP:
t2 = key
else:
t2 = key[0]
# each row will have equivalent of 2^23 = 8388608 usecs
row_time = (float(t2)*pow(2, RowTimeInBits))
if (row_time < purge_time):
per_table_deleted +=1
total_rows_deleted +=1
if (table is MESSAGE_TABLE_SOURCE):
if table == MESSAGE_TABLE_SOURCE:
# get message table uuids to delete
del_msg_uuids.append(list(cols.values()))
del_msg_uuids.extend(cols.values())
try:
b.remove(key)
except Exception as e:
@@ -251,24 +266,23 @@ def db_purge(self, purge_cutoff, purge_id):
except Exception as e:
self._logger.error("Exception: Purge_id %s message table "
"doesnot have uuid %s" % (purge_id, e))


purge_error_details.append("Exception: Message table "
"doesnot have uuid %s" % (e))
except Exception as e:
self._logger.error("Exception: Purge_id:%s table:%s "
"error: %s" % (purge_id, table, e))
purge_error_details.append("Exception: Table:%s "
"error: %s" % (table, e))
continue
self._logger.info("Purge_id %s deleted %d rows from table: %s"
% (purge_id, per_table_deleted, table))
self._logger.warning("Purge_id %s deleted %d rows from "
"table: %s" % (purge_id, per_table_deleted, table))

self._logger.info("Purge_id %s deleted %d rows from table: %s"
self._logger.warning("Purge_id %s deleted %d rows from table: %s"
% (purge_id, msg_table_deleted, COLLECTOR_GLOBAL_TABLE))
# end deleting all relevant UUIDs from message table


self._logger.info("Purge_id %s total rows deleted: %s"
self._logger.warning("Purge_id %s total rows deleted: %s"
% (purge_id, total_rows_deleted))
return total_rows_deleted
# end purge_data
return (total_rows_deleted, purge_error_details)
# end db_purge

def get_dbusage_info(self, rest_api_port):
"""Collects database usage information from all db nodes
3 changes: 2 additions & 1 deletion src/opserver/opserver.py
@@ -1782,7 +1782,8 @@ def db_purge_operation(self, purge_cutoff, purge_id):
self._analytics_db.number_of_purge_requests += 1
purge_info.number_of_purge_requests = \
self._analytics_db.number_of_purge_requests
total_rows_deleted = self._analytics_db.db_purge(purge_cutoff, purge_id)
total_rows_deleted, purge_stat.purge_status_details = \
self._analytics_db.db_purge(purge_cutoff, purge_id)
self._analytics_db.delete_db_purge_status()

if (total_rows_deleted > 0):
63 changes: 48 additions & 15 deletions src/opserver/test/utils/analytics_fixture.py
@@ -21,6 +21,7 @@
from opserver_introspect_utils import VerificationOpsSrv
from collector_introspect_utils import VerificationCollector
from alarmgen_introspect_utils import VerificationAlarmGen
from generator_introspect_utils import VerificationGenerator
from opserver.sandesh.viz.constants import COLLECTOR_GLOBAL_TABLE, SOURCE, MODULE
from opserver.opserver_util import OpServerUtils
from sandesh_common.vns.constants import NodeTypeNames, ModuleNames
@@ -1923,7 +1924,6 @@ def verify_collector_object_log(self, start_time, end_time):
@retry(delay=1, tries=5)
def verify_collector_object_log_before_purge(self, start_time, end_time):
self.logger.info('verify_collector_object_log_before_purge')
vns = VerificationOpsSrv('127.0.0.1', self.opserver_port);
res = self.verify_collector_object_log(start_time, end_time)
self.logger.info("collector object log before purging: %s" % res)
if not res:
@@ -1935,32 +1935,57 @@ def verify_database_purge_query(self, json_qstr):
self.logger.info('verify database purge query');
vns = VerificationOpsSrv('127.0.0.1', self.opserver_port);
res = vns.post_purge_query_json(json_qstr)
assert(res == 'started')
return True
try:
assert(res['status'] == 'started')
purge_id = res['purge_id']
except KeyError:
assert(False)
else:
return purge_id
# end verify_database_purge_query

@retry(delay=2, tries=5)
def verify_collector_object_log_after_purge(self, start_time, end_time):
self.logger.info('verify_collector_object_log_after_purge')
vns = VerificationOpsSrv('127.0.0.1', self.opserver_port)
res = self.verify_collector_object_log(start_time, end_time)
self.logger.info("collector object log after purging: %s" % res)
if res != []:
return False
return True
# end verify_collector_object_log_after_purge

@retry(delay=2, tries=5)
def verify_database_purge_status(self, purge_id):
self.logger.info('verify database purge status: purge_id [%s]' %
(purge_id))
try:
ops_introspect = VerificationGenerator('127.0.0.1',
self.opserver.http_port)
db_purge_uve = ops_introspect.get_uve('DatabasePurgeInfo')
db_purge_stats = db_purge_uve['stats'][0]
except Exception as e:
self.logger.error('Failed to get DatabasePurgeInfo UVE: %s' % (e))
return False
else:
self.logger.info(str(db_purge_stats))
if db_purge_stats['purge_id'] != purge_id or \
db_purge_stats['purge_status'] != 'success' or \
db_purge_stats['purge_status_details']:
return False
return True
# end verify_database_purge_status

def verify_database_purge_with_percentage_input(self):
self.logger.info('verify database purge query')
vns = VerificationOpsSrv('127.0.0.1', self.opserver_port)
self.logger.info('verify database purge with percentage input')
end_time = UTCTimestampUsec()
start_time = end_time - 10*60*pow(10,6)
assert(self.verify_collector_object_log_before_purge(start_time, end_time))
json_qstr = json.dumps({'purge_input': 100})
assert(self.verify_database_purge_query(json_qstr))
purge_id = self.verify_database_purge_query(json_qstr)
assert(self.verify_database_purge_status(purge_id))
assert(self.verify_collector_object_log_after_purge(start_time, end_time))
return True
# end verify_database_purge_query
# end verify_database_purge_with_percentage_input

def verify_database_purge_support_utc_time_format(self):
self.logger.info('verify database purge support utc time format')
@@ -1969,7 +1994,8 @@ def verify_database_purge_support_utc_time_format(self):
end_time = OpServerUtils.convert_to_utc_timestamp_usec('now')
start_time = end_time - 20*60*pow(10,6)
assert(self.verify_collector_object_log_before_purge(start_time, end_time))
assert(self.verify_database_purge_query(json_qstr))
purge_id = self.verify_database_purge_query(json_qstr)
assert(self.verify_database_purge_status(purge_id))
assert(self.verify_collector_object_log_after_purge(start_time, end_time))
return True
# end verify_database_purge_support_utc_time_format
@@ -1982,7 +2008,8 @@ def verify_database_purge_support_datetime_format(self):
end_time = OpServerUtils.convert_to_utc_timestamp_usec(dt)
start_time = end_time - 30*60*pow(10,6)
assert(self.verify_collector_object_log_before_purge(start_time, end_time))
assert(self.verify_database_purge_query(json_qstr))
purge_id = self.verify_database_purge_query(json_qstr)
assert(self.verify_database_purge_status(purge_id))
assert(self.verify_collector_object_log_after_purge(start_time, end_time))
return True
# end verify_database_purge_support_datetime_format
@@ -1994,7 +2021,8 @@ def verify_database_purge_support_deltatime_format(self):
end_time = OpServerUtils.convert_to_utc_timestamp_usec('-1s')
start_time = end_time - 10*60*pow(10,6)
assert(self.verify_collector_object_log_before_purge(start_time, end_time))
assert(self.verify_database_purge_query(json_qstr))
purge_id = self.verify_database_purge_query(json_qstr)
assert(self.verify_database_purge_status(purge_id))
assert(self.verify_collector_object_log_after_purge(start_time, end_time))
return True
# end verify_database_purge_support_deltatime_format
@@ -2004,11 +2032,16 @@ def verify_database_purge_request_limit(self):
vns = VerificationOpsSrv('127.0.0.1', self.opserver_port)
json_qstr = json.dumps({'purge_input': 50})
res = vns.post_purge_query_json(json_qstr)
if (res == 'started'):
self.logger.info(str(res))
try:
assert(res['status'] == 'started')
purge_id = res['purge_id']
res1 = vns.post_purge_query_json(json_qstr)
if (res1 == 'running'):
return True
return False
assert(res1['status'] == 'running')
assert(res1['purge_id'] == purge_id)
except KeyError:
assert(False)
return True
# end verify_database_purge_request_limit

@retry(delay=1, tries=5)
1 change: 0 additions & 1 deletion src/opserver/test/utils/opserver_introspect_utils.py
@@ -135,7 +135,6 @@ def post_purge_query_json(self, json_str, sync=True):
purge_request_url, json_str, sync)
if resp is not None:
res = json.loads(resp)
res = res['status']
except Exception as e:
print str(e)
finally:
