diff --git a/src/libpartition/libpartition.py b/src/libpartition/libpartition.py index 4246863a038..e4159af1b47 100644 --- a/src/libpartition/libpartition.py +++ b/src/libpartition/libpartition.py @@ -1,4 +1,5 @@ from kazoo.client import KazooClient +from kazoo.exceptions import CancelledError import gevent from gevent import Greenlet from consistent_hash import ConsistentHash @@ -45,7 +46,7 @@ def own_change_cb(l): """ def __init__( self, app_name, self_name, cluster_list, max_partition, - partition_update_cb, zk_server): + partition_update_cb, zk_server, logger = None): # Initialize local variables self._zk_server = zk_server @@ -62,7 +63,11 @@ def __init__( raise ValueError('cluster list is missing local server name') # initialize logging and other stuff - logging.basicConfig() + if logger is None: + logging.basicConfig() + self._logger = logging + else: + self._logger = logger self._conn_state = None self._sandesh_connection_info_update(status='INIT', message='') @@ -96,6 +101,8 @@ def __init__( # initialize partition # to lock acquire greenlet dictionary self._part_lock_task_dict = {} + + self._logger.error("initial servers:" + str(self._cluster_list)) # update target partition ownership list for part in range(0, self._max_partition): @@ -122,11 +129,11 @@ def _sandesh_connection_info_update(self, status, message): if (self._conn_state and self._conn_state != ConnectionStatus.DOWN and new_conn_state == ConnectionStatus.DOWN): msg = 'Connection to Zookeeper down: %s' %(message) - logging.error(msg) + self._logger.error(msg) if (self._conn_state and self._conn_state != new_conn_state and new_conn_state == ConnectionStatus.UP): msg = 'Connection to Zookeeper ESTABLISHED' - logging.info(msg) + self._logger.error(msg) self._conn_state = new_conn_state # end _sandesh_connection_info_update @@ -138,26 +145,26 @@ def _acquire_lock(self, part): # lock for the partition l = self._part_locks[part] - while True: - if (l.cancelled == True): - # a lock acquisition is getting cancelled let's wait - logging.info("lock acquisition is getting cancelled, \ - lets wait") - gevent.sleep(1) - else: - break - # go in an infinite loop waiting to acquire the lock - while True: - ret = l.acquire(blocking=False) - if ret == True: - - logging.info("Acquired lock for:" + str(part)) - self._curr_part_ownership_list.append(part) - self._update_cb(self._curr_part_ownership_list) - return ret - else: - gevent.sleep(1) + try: + while True: + ret = l.acquire(blocking=False) + if ret == True: + self._logger.error("Acquired lock for:" + str(part)) + self._curr_part_ownership_list.append(part) + self._update_cb(self._curr_part_ownership_list) + return True + else: + gevent.sleep(1) + except CancelledError: + self._logger.error("Lock acquire cancelled for:" + str(part)) + return False + except Exception as ex: + # TODO: If we have a non-KazooException, the lock object + # may get stuck in the "cancelled" state + self._logger.error("Lock acquire unexpected error!: " + str(ex)) + assert() + return False #end _acquire_lock # get rid of finished spawned tasks from datastructures @@ -178,46 +185,59 @@ def _acquire_partition_ownership(self): # list of partitions for which locks have to be released release_lock_list = [] + self._logger.error("known servers: %s" % self._con_hash.get_all_nodes()) + for part in range(0, self._max_partition): if (part in self._target_part_ownership_list): if (part in self._curr_part_ownership_list): # do nothing, I already have ownership of this partition - logging.info("No need to acquire ownership of:" + + self._logger.error("No need to acquire ownership of:" + str(part)) else: # I need to acquire lock for this partition before I own if (part in self._part_lock_task_dict.keys()): - # do nothing there is already a greenlet running to - # acquire the lock - logging.info("Already a greenlet running to" + try: + self._part_lock_task_dict[part].get(block=False) + except: + # do nothing there is already a greenlet running to + # acquire the lock + self._logger.error("Already a greenlet running to" + " acquire:" + str(part)) + continue + + # Greenlet died without getting ownership. Cleanup + self._logger.error("Cleanup stale greenlet running to" " acquire:" + str(part)) - else: - # launch the greenlet to acquire the loc, k - g = Greenlet.spawn(self._acquire_lock, part) - self._part_lock_task_dict[part] = g + del self._part_lock_task_dict[part] + + self._logger.error("Starting greenlet running to" + " acquire:" + str(part)) + # launch the greenlet to acquire the loc, k + g = Greenlet.spawn(self._acquire_lock, part) + self._part_lock_task_dict[part] = g else: # give up ownership of the partition # cancel any lock acquisition which is ongoing if (part in self._part_lock_task_dict.keys()): - # kill the greenlet trying to get the lock for this - # partition - self._part_lock_task_dict[part].kill() - del self._part_lock_task_dict[part] - - logging.info("canceling lock acquisition going on \ - for:" + str(part)) try: - self._part_locks[part].cancel() + self._part_lock_task_dict[part].get(block=False) except: - pass - + + self._logger.error("canceling lock acquisition going on \ + for:" + str(part)) + # Cancelling the lock should result in killing the gevent + self._part_locks[part].cancel() + self._part_lock_task_dict[part].get(block=True) + + del self._part_lock_task_dict[part] + if (part in self._curr_part_ownership_list): release_lock_list.append(part) self._curr_part_ownership_list.remove(part) updated_curr_ownership = True - logging.info("giving up ownership of:" + str(part)) + self._logger.error("giving up ownership of:" + str(part)) if (updated_curr_ownership is True): # current partition membership was updated call the callback @@ -226,11 +246,11 @@ def _acquire_partition_ownership(self): if (len(release_lock_list) != 0): # release locks which were acquired for part in release_lock_list: - logging.info("release the lock which was acquired:" + \ + self._logger.error("release the lock which was acquired:" + \ str(part)) try: self._part_locks[part].release() - logging.info("fully gave up ownership of:" + str(part)) + self._logger.error("fully gave up ownership of:" + str(part)) except: pass #end _acquire_partition_ownership @@ -252,9 +272,9 @@ def update_cluster_list(self, cluster_list): self._cluster_list)) deleted_servers = list(set(self._cluster_list).difference( new_cluster_list)) - self._cluster_list = cluster_list - logging.info("deleted servers:" + str(deleted_servers)) - logging.info("new servers:" + str(new_servers)) + self._cluster_list = set(cluster_list) + self._logger.error("deleted servers:" + str(deleted_servers)) + self._logger.error("new servers:" + str(new_servers)) # update the hash structure if new_servers: diff --git a/src/opserver/alarmgen.py b/src/opserver/alarmgen.py index 9a7ed81026a..85169f8e83d 100644 --- a/src/opserver/alarmgen.py +++ b/src/opserver/alarmgen.py @@ -394,7 +394,7 @@ def start_libpart(self, ag_list): pc = PartitionClient("alarmgen", self._libpart_name, ag_list, self._conf.partitions(), self.libpart_cb, - ','.join(self._conf.zk_list())) + ','.join(self._conf.zk_list()), self._logger) self._logger.error('Started PC') return pc except Exception as e: