From 79d47fdd3170c6fed95a7d268733d91521bcf2e5 Mon Sep 17 00:00:00 2001
From: Anish Mehta
Date: Thu, 3 Dec 2015 23:44:20 -0800
Subject: [PATCH] Adding instrumentation to libpartition

Instead of killing the lock acquisition gevent directly, libpartition
now relies on "cancel" to make the gevent finish. Also, the lock
acquisition gevent now relies on catching the CancelledError exception
to discover cancellation, instead of directly examining the lock
structure. This prevents situations where locks get stuck in the
cancelled state.

Change-Id: I31f381c85f22b43f63cf2e3b6d68e4cc894a12af
Partial-Bug: 1428271
Partial-Bug: 1519231
---
 src/libpartition/libpartition.py | 114 ++++++++++++++++++-------------
 src/opserver/alarmgen.py         |   2 +-
 2 files changed, 68 insertions(+), 48 deletions(-)

diff --git a/src/libpartition/libpartition.py b/src/libpartition/libpartition.py
index 4246863a038..e4159af1b47 100644
--- a/src/libpartition/libpartition.py
+++ b/src/libpartition/libpartition.py
@@ -1,4 +1,5 @@
 from kazoo.client import KazooClient
+from kazoo.exceptions import CancelledError
 import gevent
 from gevent import Greenlet
 from consistent_hash import ConsistentHash
@@ -45,7 +46,7 @@ def own_change_cb(l):
     """
     def __init__(
             self, app_name, self_name, cluster_list, max_partition,
-            partition_update_cb, zk_server):
+            partition_update_cb, zk_server, logger = None):
 
         # Initialize local variables
         self._zk_server = zk_server
@@ -62,7 +63,11 @@ def __init__(
             raise ValueError('cluster list is missing local server name')
 
         # initialize logging and other stuff
-        logging.basicConfig()
+        if logger is None:
+            logging.basicConfig()
+            self._logger = logging
+        else:
+            self._logger = logger
         self._conn_state = None
         self._sandesh_connection_info_update(status='INIT', message='')
 
@@ -96,6 +101,8 @@ def __init__(
         # initialize partition
         # to lock acquire greenlet dictionary
         self._part_lock_task_dict = {}
+
+        self._logger.error("initial servers:" + str(self._cluster_list))
 
         # update target partition ownership list
         for part in range(0, self._max_partition):
@@ -122,11 +129,11 @@ def _sandesh_connection_info_update(self, status, message):
         if (self._conn_state and self._conn_state != ConnectionStatus.DOWN and
                 new_conn_state == ConnectionStatus.DOWN):
             msg = 'Connection to Zookeeper down: %s' %(message)
-            logging.error(msg)
+            self._logger.error(msg)
         if (self._conn_state and self._conn_state != new_conn_state and
                 new_conn_state == ConnectionStatus.UP):
             msg = 'Connection to Zookeeper ESTABLISHED'
-            logging.info(msg)
+            self._logger.error(msg)
         self._conn_state = new_conn_state
     # end _sandesh_connection_info_update
 
@@ -138,26 +145,26 @@ def _acquire_lock(self, part):
         # lock for the partition
         l = self._part_locks[part]
 
-        while True:
-            if (l.cancelled == True):
-                # a lock acquisition is getting cancelled let's wait
-                logging.info("lock acquisition is getting cancelled, \
-                    lets wait")
-                gevent.sleep(1)
-            else:
-                break
-        # go in an infinite loop waiting to acquire the lock
-        while True:
-            ret = l.acquire(blocking=False)
-            if ret == True:
-
-                logging.info("Acquired lock for:" + str(part))
-                self._curr_part_ownership_list.append(part)
-                self._update_cb(self._curr_part_ownership_list)
-                return ret
-            else:
-                gevent.sleep(1)
+        try:
+            while True:
+                ret = l.acquire(blocking=False)
+                if ret == True:
+                    self._logger.error("Acquired lock for:" + str(part))
+                    self._curr_part_ownership_list.append(part)
+                    self._update_cb(self._curr_part_ownership_list)
+                    return True
+                else:
+                    gevent.sleep(1)
+        except CancelledError:
+            self._logger.error("Lock acquire cancelled for:" + str(part))
for:" + str(part)) + return False + except Exception as ex: + # TODO: If we have a non-KazooException, the lock object + # may get stuck in the "cancelled" state + self._logger.error("Lock acquire unexpected error!: " + str(ex)) + assert() + return False #end _acquire_lock # get rid of finished spawned tasks from datastructures @@ -178,46 +185,59 @@ def _acquire_partition_ownership(self): # list of partitions for which locks have to be released release_lock_list = [] + self._logger.error("known servers: %s" % self._con_hash.get_all_nodes()) + for part in range(0, self._max_partition): if (part in self._target_part_ownership_list): if (part in self._curr_part_ownership_list): # do nothing, I already have ownership of this partition - logging.info("No need to acquire ownership of:" + + self._logger.error("No need to acquire ownership of:" + str(part)) else: # I need to acquire lock for this partition before I own if (part in self._part_lock_task_dict.keys()): - # do nothing there is already a greenlet running to - # acquire the lock - logging.info("Already a greenlet running to" + try: + self._part_lock_task_dict[part].get(block=False) + except: + # do nothing there is already a greenlet running to + # acquire the lock + self._logger.error("Already a greenlet running to" + " acquire:" + str(part)) + continue + + # Greenlet died without getting ownership. Cleanup + self._logger.error("Cleanup stale greenlet running to" " acquire:" + str(part)) - else: - # launch the greenlet to acquire the loc, k - g = Greenlet.spawn(self._acquire_lock, part) - self._part_lock_task_dict[part] = g + del self._part_lock_task_dict[part] + + self._logger.error("Starting greenlet running to" + " acquire:" + str(part)) + # launch the greenlet to acquire the loc, k + g = Greenlet.spawn(self._acquire_lock, part) + self._part_lock_task_dict[part] = g else: # give up ownership of the partition # cancel any lock acquisition which is ongoing if (part in self._part_lock_task_dict.keys()): - # kill the greenlet trying to get the lock for this - # partition - self._part_lock_task_dict[part].kill() - del self._part_lock_task_dict[part] - - logging.info("canceling lock acquisition going on \ - for:" + str(part)) try: - self._part_locks[part].cancel() + self._part_lock_task_dict[part].get(block=False) except: - pass - + + self._logger.error("canceling lock acquisition going on \ + for:" + str(part)) + # Cancelling the lock should result in killing the gevent + self._part_locks[part].cancel() + self._part_lock_task_dict[part].get(block=True) + + del self._part_lock_task_dict[part] + if (part in self._curr_part_ownership_list): release_lock_list.append(part) self._curr_part_ownership_list.remove(part) updated_curr_ownership = True - logging.info("giving up ownership of:" + str(part)) + self._logger.error("giving up ownership of:" + str(part)) if (updated_curr_ownership is True): # current partition membership was updated call the callback @@ -226,11 +246,11 @@ def _acquire_partition_ownership(self): if (len(release_lock_list) != 0): # release locks which were acquired for part in release_lock_list: - logging.info("release the lock which was acquired:" + \ + self._logger.error("release the lock which was acquired:" + \ str(part)) try: self._part_locks[part].release() - logging.info("fully gave up ownership of:" + str(part)) + self._logger.error("fully gave up ownership of:" + str(part)) except: pass #end _acquire_partition_ownership @@ -252,9 +272,9 @@ def update_cluster_list(self, cluster_list): self._cluster_list)) 
         deleted_servers = list(set(self._cluster_list).difference(
             new_cluster_list))
-        self._cluster_list = cluster_list
-        logging.info("deleted servers:" + str(deleted_servers))
-        logging.info("new servers:" + str(new_servers))
+        self._cluster_list = set(cluster_list)
+        self._logger.error("deleted servers:" + str(deleted_servers))
+        self._logger.error("new servers:" + str(new_servers))
 
         # update the hash structure
         if new_servers:
diff --git a/src/opserver/alarmgen.py b/src/opserver/alarmgen.py
index 9a7ed81026a..85169f8e83d 100644
--- a/src/opserver/alarmgen.py
+++ b/src/opserver/alarmgen.py
@@ -394,7 +394,7 @@ def start_libpart(self, ag_list):
             pc = PartitionClient("alarmgen",
                     self._libpart_name, ag_list,
                     self._conf.partitions(), self.libpart_cb,
-                    ','.join(self._conf.zk_list()))
+                    ','.join(self._conf.zk_list()), self._logger)
             self._logger.error('Started PC')
             return pc
         except Exception as e:
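
For reference, below is a minimal standalone sketch (not part of the change
itself) of the cancel/CancelledError pattern the patch adopts: the acquiring
greenlet polls kazoo's non-blocking Lock.acquire() and treats CancelledError
as the signal to stop, while the owner calls Lock.cancel() and then waits for
the greenlet to finish instead of killing it. The ZooKeeper address
127.0.0.1:2181 and the lock path "/sketch/partition-0" are illustrative
assumptions only.

    import gevent
    from gevent import Greenlet
    from kazoo.client import KazooClient
    from kazoo.exceptions import CancelledError
    from kazoo.handlers.gevent import SequentialGeventHandler

    def acquire_loop(lock, part):
        # Poll for the lock; lock.cancel() from another greenlet surfaces
        # here as CancelledError, so this greenlet unwinds on its own
        # instead of being kill()ed from outside.
        try:
            while True:
                if lock.acquire(blocking=False):
                    print("acquired partition %d" % part)
                    return True
                gevent.sleep(1)
        except CancelledError:
            print("lock acquire cancelled for partition %d" % part)
            return False

    # Assumed ZooKeeper endpoint and lock path; adjust for a real setup.
    zk = KazooClient(hosts="127.0.0.1:2181",
                     handler=SequentialGeventHandler())
    zk.start()
    lock = zk.Lock("/sketch/partition-0", "worker-1")

    task = Greenlet.spawn(acquire_loop, lock, 0)
    gevent.sleep(2)

    # Give up the partition: cancel the pending acquire and wait for the
    # greenlet to finish, mirroring _acquire_partition_ownership() above.
    lock.cancel()
    task.get(block=True)
    zk.stop()

With no contention the acquire succeeds immediately and the later cancel() is
a no-op; when another client already holds "/sketch/partition-0", the cancel
path is exercised and acquire_loop() returns False.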