Skip to content

Commit

Permalink
Adding instrumentation to libpartition
Browse files Browse the repository at this point in the history
Instead of killing the lock acquisition gevent directly,
libpartition now relies on "cancel" to make the gevent finish.

Also, the lock acquisition gevent now relies on catching the
CancelledError exception to discover cancellation, instead of directly
examining the lock structure.
This prevents situations where locks are stuck in the cancelled state.

Change-Id: I31f381c85f22b43f63cf2e3b6d68e4cc894a12af
Partial-Bug: 1428271
Partial-Bug: 1519231
  • Loading branch information
anishmehta committed Dec 5, 2015
1 parent 4a6bdc5 commit 79d47fd
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 48 deletions.
114 changes: 67 additions & 47 deletions src/libpartition/libpartition.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from kazoo.client import KazooClient
from kazoo.exceptions import CancelledError
import gevent
from gevent import Greenlet
from consistent_hash import ConsistentHash
Expand Down Expand Up @@ -45,7 +46,7 @@ def own_change_cb(l):
"""
def __init__(
self, app_name, self_name, cluster_list, max_partition,
partition_update_cb, zk_server):
partition_update_cb, zk_server, logger = None):

# Initialize local variables
self._zk_server = zk_server
Expand All @@ -62,7 +63,11 @@ def __init__(
raise ValueError('cluster list is missing local server name')

# initialize logging and other stuff
logging.basicConfig()
if logger is None:
logging.basicConfig()
self._logger = logging
else:
self._logger = logger
self._conn_state = None
self._sandesh_connection_info_update(status='INIT', message='')

Expand Down Expand Up @@ -96,6 +101,8 @@ def __init__(

# initialize partition # to lock acquire greenlet dictionary
self._part_lock_task_dict = {}

self._logger.error("initial servers:" + str(self._cluster_list))

# update target partition ownership list
for part in range(0, self._max_partition):
Expand All @@ -122,11 +129,11 @@ def _sandesh_connection_info_update(self, status, message):
if (self._conn_state and self._conn_state != ConnectionStatus.DOWN and
new_conn_state == ConnectionStatus.DOWN):
msg = 'Connection to Zookeeper down: %s' %(message)
logging.error(msg)
self._logger.error(msg)
if (self._conn_state and self._conn_state != new_conn_state and
new_conn_state == ConnectionStatus.UP):
msg = 'Connection to Zookeeper ESTABLISHED'
logging.info(msg)
self._logger.error(msg)

self._conn_state = new_conn_state
# end _sandesh_connection_info_update
Expand All @@ -138,26 +145,26 @@ def _acquire_lock(self, part):
# lock for the partition
l = self._part_locks[part]

while True:
if (l.cancelled == True):
# a lock acquisition is getting cancelled let's wait
logging.info("lock acquisition is getting cancelled, \
lets wait")
gevent.sleep(1)
else:
break

# go in an infinite loop waiting to acquire the lock
while True:
ret = l.acquire(blocking=False)
if ret == True:

logging.info("Acquired lock for:" + str(part))
self._curr_part_ownership_list.append(part)
self._update_cb(self._curr_part_ownership_list)
return ret
else:
gevent.sleep(1)
try:
while True:
ret = l.acquire(blocking=False)
if ret == True:
self._logger.error("Acquired lock for:" + str(part))
self._curr_part_ownership_list.append(part)
self._update_cb(self._curr_part_ownership_list)
return True
else:
gevent.sleep(1)
except CancelledError:
self._logger.error("Lock acquire cancelled for:" + str(part))
return False
except Exception as ex:
# TODO: If we have a non-KazooException, the lock object
# may get stuck in the "cancelled" state
self._logger.error("Lock acquire unexpected error!: " + str(ex))
assert()
return False
#end _acquire_lock

# get rid of finished spawned tasks from datastructures
Expand All @@ -178,46 +185,59 @@ def _acquire_partition_ownership(self):
# list of partitions for which locks have to be released
release_lock_list = []

self._logger.error("known servers: %s" % self._con_hash.get_all_nodes())

for part in range(0, self._max_partition):
if (part in self._target_part_ownership_list):
if (part in self._curr_part_ownership_list):
# do nothing, I already have ownership of this partition
logging.info("No need to acquire ownership of:" +
self._logger.error("No need to acquire ownership of:" +
str(part))
else:
# I need to acquire lock for this partition before I own
if (part in self._part_lock_task_dict.keys()):
# do nothing there is already a greenlet running to
# acquire the lock
logging.info("Already a greenlet running to"
try:
self._part_lock_task_dict[part].get(block=False)
except:
# do nothing there is already a greenlet running to
# acquire the lock
self._logger.error("Already a greenlet running to"
" acquire:" + str(part))
continue

# Greenlet died without getting ownership. Cleanup
self._logger.error("Cleanup stale greenlet running to"
" acquire:" + str(part))
else:
# launch the greenlet to acquire the loc, k
g = Greenlet.spawn(self._acquire_lock, part)
self._part_lock_task_dict[part] = g
del self._part_lock_task_dict[part]

self._logger.error("Starting greenlet running to"
" acquire:" + str(part))
# launch the greenlet to acquire the loc, k
g = Greenlet.spawn(self._acquire_lock, part)
self._part_lock_task_dict[part] = g

else:
# give up ownership of the partition

# cancel any lock acquisition which is ongoing
if (part in self._part_lock_task_dict.keys()):
# kill the greenlet trying to get the lock for this
# partition
self._part_lock_task_dict[part].kill()
del self._part_lock_task_dict[part]

logging.info("canceling lock acquisition going on \
for:" + str(part))
try:
self._part_locks[part].cancel()
self._part_lock_task_dict[part].get(block=False)
except:
pass


self._logger.error("canceling lock acquisition going on \
for:" + str(part))
# Cancelling the lock should result in killing the gevent
self._part_locks[part].cancel()
self._part_lock_task_dict[part].get(block=True)

del self._part_lock_task_dict[part]

if (part in self._curr_part_ownership_list):
release_lock_list.append(part)
self._curr_part_ownership_list.remove(part)
updated_curr_ownership = True
logging.info("giving up ownership of:" + str(part))
self._logger.error("giving up ownership of:" + str(part))

if (updated_curr_ownership is True):
# current partition membership was updated call the callback
Expand All @@ -226,11 +246,11 @@ def _acquire_partition_ownership(self):
if (len(release_lock_list) != 0):
# release locks which were acquired
for part in release_lock_list:
logging.info("release the lock which was acquired:" + \
self._logger.error("release the lock which was acquired:" + \
str(part))
try:
self._part_locks[part].release()
logging.info("fully gave up ownership of:" + str(part))
self._logger.error("fully gave up ownership of:" + str(part))
except:
pass
#end _acquire_partition_ownership
Expand All @@ -252,9 +272,9 @@ def update_cluster_list(self, cluster_list):
self._cluster_list))
deleted_servers = list(set(self._cluster_list).difference(
new_cluster_list))
self._cluster_list = cluster_list
logging.info("deleted servers:" + str(deleted_servers))
logging.info("new servers:" + str(new_servers))
self._cluster_list = set(cluster_list)
self._logger.error("deleted servers:" + str(deleted_servers))
self._logger.error("new servers:" + str(new_servers))

# update the hash structure
if new_servers:
Expand Down
2 changes: 1 addition & 1 deletion src/opserver/alarmgen.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ def start_libpart(self, ag_list):
pc = PartitionClient("alarmgen",
self._libpart_name, ag_list,
self._conf.partitions(), self.libpart_cb,
','.join(self._conf.zk_list()))
','.join(self._conf.zk_list()), self._logger)
self._logger.error('Started PC')
return pc
except Exception as e:
Expand Down

0 comments on commit 79d47fd

Please sign in to comment.