Merge "Adding instrumentation to libpartition"
Zuul authored and opencontrail-ci-admin committed Dec 7, 2015
2 parents 161462c + 79d47fd commit 2295294
Showing 2 changed files with 68 additions and 48 deletions.
114 changes: 67 additions & 47 deletions src/libpartition/libpartition.py
@@ -1,4 +1,5 @@
from kazoo.client import KazooClient
from kazoo.exceptions import CancelledError
import gevent
from gevent import Greenlet
from consistent_hash import ConsistentHash
@@ -45,7 +46,7 @@ def own_change_cb(l):
"""
def __init__(
self, app_name, self_name, cluster_list, max_partition,
partition_update_cb, zk_server):
partition_update_cb, zk_server, logger = None):

# Initialize local variables
self._zk_server = zk_server
@@ -62,7 +63,11 @@ def __init__(
raise ValueError('cluster list is missing local server name')

# initialize logging and other stuff
logging.basicConfig()
if logger is None:
logging.basicConfig()
self._logger = logging
else:
self._logger = logger
self._conn_state = None
self._sandesh_connection_info_update(status='INIT', message='')
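
The fallback above works because the logging module exposes module-level error()/info() functions with the same shape as the Logger methods this class calls, so self._logger can hold either one. A minimal standalone sketch of the pattern (the helper name is hypothetical):

import logging

def resolve_logger(logger=None):
    # Fall back to the logging module itself: logging.error/logging.info
    # mirror the Logger.error/Logger.info methods used by PartitionClient.
    if logger is None:
        logging.basicConfig()
        return logging
    return logger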

@@ -96,6 +101,8 @@ def __init__(

# initialize partition # to lock acquire greenlet dictionary
self._part_lock_task_dict = {}

self._logger.error("initial servers:" + str(self._cluster_list))

# update target partition ownership list
for part in range(0, self._max_partition):
@@ -122,11 +129,11 @@ def _sandesh_connection_info_update(self, status, message):
if (self._conn_state and self._conn_state != ConnectionStatus.DOWN and
new_conn_state == ConnectionStatus.DOWN):
msg = 'Connection to Zookeeper down: %s' %(message)
logging.error(msg)
self._logger.error(msg)
if (self._conn_state and self._conn_state != new_conn_state and
new_conn_state == ConnectionStatus.UP):
msg = 'Connection to Zookeeper ESTABLISHED'
logging.info(msg)
self._logger.error(msg)

self._conn_state = new_conn_state
# end _sandesh_connection_info_update
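
The two guards above fire only on state transitions (non-DOWN to DOWN, and any change landing on UP), not on every call. A generic edge-detector sketch, not from this commit, with stand-in constants for the sandesh ConnectionStatus enum:

DOWN, UP = 'DOWN', 'UP'  # stand-ins; the real code uses ConnectionStatus

class ConnWatcher(object):
    def __init__(self, logger):
        self._state = None
        self._logger = logger

    def update(self, new_state, message=''):
        # log only when the state actually flips
        if self._state and self._state != DOWN and new_state == DOWN:
            self._logger.error('Connection to Zookeeper down: %s' % message)
        if self._state and self._state != new_state and new_state == UP:
            self._logger.error('Connection to Zookeeper ESTABLISHED')
        self._state = new_state
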
@@ -138,26 +145,26 @@ def _acquire_lock(self, part):
# lock for the partition
l = self._part_locks[part]

while True:
if (l.cancelled == True):
# a lock acquisition is getting cancelled let's wait
logging.info("lock acquisition is getting cancelled, \
lets wait")
gevent.sleep(1)
else:
break

# go in an infinite loop waiting to acquire the lock
while True:
ret = l.acquire(blocking=False)
if ret == True:

logging.info("Acquired lock for:" + str(part))
self._curr_part_ownership_list.append(part)
self._update_cb(self._curr_part_ownership_list)
return ret
else:
gevent.sleep(1)
try:
while True:
ret = l.acquire(blocking=False)
if ret == True:
self._logger.error("Acquired lock for:" + str(part))
self._curr_part_ownership_list.append(part)
self._update_cb(self._curr_part_ownership_list)
return True
else:
gevent.sleep(1)
except CancelledError:
self._logger.error("Lock acquire cancelled for:" + str(part))
return False
except Exception as ex:
# TODO: If we have a non-KazooException, the lock object
# may get stuck in the "cancelled" state
self._logger.error("Lock acquire unexpected error!: " + str(ex))
assert()
return False
#end _acquire_lock

# get rid of finished spawned tasks from datastructures
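
For reference, the control flow the rewritten _acquire_lock relies on: kazoo's Lock.cancel() flags the lock so that the next acquire() call raises CancelledError, which unwinds a polling greenlet cleanly instead of kill()ing it. A minimal standalone sketch, not part of this commit; it assumes a ZooKeeper server at 127.0.0.1:2181 and that another contender already holds the hypothetical /demo/part-0 lock:

import gevent
from kazoo.client import KazooClient
from kazoo.exceptions import CancelledError
from kazoo.handlers.gevent import SequentialGeventHandler

zk = KazooClient(hosts='127.0.0.1:2181', handler=SequentialGeventHandler())
zk.start()
lock = zk.Lock('/demo/part-0', 'contender-1')

def waiter():
    try:
        while True:
            if lock.acquire(blocking=False):
                return True
            gevent.sleep(1)
    except CancelledError:
        # raised by acquire() once lock.cancel() has been called
        return False

g = gevent.spawn(waiter)
gevent.sleep(2)
lock.cancel()
print(g.get(block=True))  # False: the acquisition was cancelled
zk.stop()
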
@@ -178,46 +185,59 @@ def _acquire_partition_ownership(self):
# list of partitions for which locks have to be released
release_lock_list = []

self._logger.error("known servers: %s" % self._con_hash.get_all_nodes())

for part in range(0, self._max_partition):
if (part in self._target_part_ownership_list):
if (part in self._curr_part_ownership_list):
# do nothing, I already have ownership of this partition
logging.info("No need to acquire ownership of:" +
self._logger.error("No need to acquire ownership of:" +
str(part))
else:
# I need to acquire lock for this partition before I own
if (part in self._part_lock_task_dict.keys()):
# do nothing there is already a greenlet running to
# acquire the lock
logging.info("Already a greenlet running to"
try:
self._part_lock_task_dict[part].get(block=False)
except:
# do nothing there is already a greenlet running to
# acquire the lock
self._logger.error("Already a greenlet running to"
" acquire:" + str(part))
continue

# Greenlet died without getting ownership. Cleanup
self._logger.error("Cleanup stale greenlet running to"
" acquire:" + str(part))
else:
# launch the greenlet to acquire the lock
g = Greenlet.spawn(self._acquire_lock, part)
self._part_lock_task_dict[part] = g
del self._part_lock_task_dict[part]

self._logger.error("Starting greenlet running to"
" acquire:" + str(part))
# launch the greenlet to acquire the lock
g = Greenlet.spawn(self._acquire_lock, part)
self._part_lock_task_dict[part] = g

else:
# give up ownership of the partition

# cancel any lock acquisition which is ongoing
if (part in self._part_lock_task_dict.keys()):
# kill the greenlet trying to get the lock for this
# partition
self._part_lock_task_dict[part].kill()
del self._part_lock_task_dict[part]

logging.info("canceling lock acquisition going on \
for:" + str(part))
try:
self._part_locks[part].cancel()
self._part_lock_task_dict[part].get(block=False)
except:
pass


self._logger.error("canceling lock acquisition going on \
for:" + str(part))
# Cancelling the lock should result in killing the gevent
self._part_locks[part].cancel()
self._part_lock_task_dict[part].get(block=True)

del self._part_lock_task_dict[part]

if (part in self._curr_part_ownership_list):
release_lock_list.append(part)
self._curr_part_ownership_list.remove(part)
updated_curr_ownership = True
logging.info("giving up ownership of:" + str(part))
self._logger.error("giving up ownership of:" + str(part))

if (updated_curr_ownership is True):
# current partition membership was updated call the callback
@@ -226,11 +246,11 @@ def _acquire_partition_ownership(self):
if (len(release_lock_list) != 0):
# release locks which were acquired
for part in release_lock_list:
logging.info("release the lock which was acquired:" + \
self._logger.error("release the lock which was acquired:" + \
str(part))
try:
self._part_locks[part].release()
logging.info("fully gave up ownership of:" + str(part))
self._logger.error("fully gave up ownership of:" + str(part))
except:
pass
#end _acquire_partition_ownership
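
The stale-greenlet handling above leans on gevent semantics: Greenlet.get(block=False) raises gevent.Timeout while the greenlet is still running, and once it has exited it returns the result (or re-raises the greenlet's exception). A standalone sketch, independent of this commit:

import gevent
from gevent import Greenlet, Timeout

def fake_acquire():
    gevent.sleep(3)
    return False  # e.g. the lock acquisition was cancelled

g = Greenlet.spawn(fake_acquire)
try:
    result = g.get(block=False)
except Timeout:
    print("still running; leave it alone")
else:
    print("exited with", result)  # safe to clean up and respawn
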
@@ -252,9 +272,9 @@ def update_cluster_list(self, cluster_list):
self._cluster_list))
deleted_servers = list(set(self._cluster_list).difference(
new_cluster_list))
self._cluster_list = cluster_list
logging.info("deleted servers:" + str(deleted_servers))
logging.info("new servers:" + str(new_servers))
self._cluster_list = set(cluster_list)
self._logger.error("deleted servers:" + str(deleted_servers))
self._logger.error("new servers:" + str(new_servers))

# update the hash structure
if new_servers:
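
The membership delta above is plain set arithmetic; a tiny illustration with hypothetical server addresses:

current = set(['10.1.1.1', '10.1.1.2'])
incoming = ['10.1.1.2', '10.1.1.3']
new_servers = list(set(incoming).difference(current))  # ['10.1.1.3']
deleted_servers = list(current.difference(incoming))   # ['10.1.1.1']
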
2 changes: 1 addition & 1 deletion src/opserver/alarmgen.py
@@ -394,7 +394,7 @@ def start_libpart(self, ag_list):
pc = PartitionClient("alarmgen",
self._libpart_name, ag_list,
self._conf.partitions(), self.libpart_cb,
','.join(self._conf.zk_list()))
','.join(self._conf.zk_list()), self._logger)
self._logger.error('Started PC')
return pc
except Exception as e:
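
With the new signature, the logger argument only needs Logger-style error()/info() methods, so a plain standard-library logger also works in place of the sandesh logger passed here. A hypothetical invocation (module path, host names, partition count, and ZooKeeper address are made up for illustration):

import logging
from libpartition.libpartition import PartitionClient

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('alarmgen.libpartition')

def on_parts_changed(parts):
    logger.info('now own partitions: %s', parts)

pc = PartitionClient('alarmgen', 'host-1', ['host-1', 'host-2'],
                     15, on_parts_changed, '127.0.0.1:2181', logger)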
