from kazoo.client import KazooClient
from kazoo.exceptions import CancelledError
import gevent
from gevent import Greenlet
from consistent_hash import ConsistentHash
import logging
""" Partition Library
This library provides functionality to implement partition sharing between
cluster nodes
"""
class PartitionClient(object):
""" Client Class for the Partition Library
Example usage:
---------------------
import libpartition
from libpartition.libpartition import PartitionClient
def own_change_cb(l):
print "ownership change:" + str(l)
c = PartitionClient("test", "s1", ["s1", "s2", "s3"], 32,
own_change_cb, "zookeeper_s1")
    # do some real work now
if (c.own_partition(1)):
...... do something with partition #1 .....
.........
...
c.update_cluster_list(["s1", "s2"])
...
----------------------
You should not call any partition library routine from within the
callback function
Args:
app_name(str): Name of the app for which partition cluster is used
self_name(str): Name of the local cluster node (can be ip address)
cluster_list(list): List of all the nodes in the cluster including
local node
        max_partition(int): Partition space always goes from 0..max_partition-1
        partition_update_cb: Callback function invoked when the partition
            ownership list is updated.
zk_server(str): <zookeeper server>:<zookeeper server port>
"""
def __init__(
self, app_name, self_name, cluster_list, max_partition,
partition_update_cb, zk_server, logger = None):
# Initialize local variables
self._zk_server = zk_server
self._cluster_list = set(cluster_list)
self._max_partition = max_partition
self._update_cb = partition_update_cb
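        # _curr_part_ownership_list holds partitions whose locks this node
        # currently holds; _target_part_ownership_list holds partitions the
        # consistent hash assigns to this node and that it should try to own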
self._curr_part_ownership_list = []
self._target_part_ownership_list = []
self._con_hash = ConsistentHash(cluster_list)
self._name = self_name
# some sanity check
        if self._name not in cluster_list:
raise ValueError('cluster list is missing local server name')
# initialize logging and other stuff
if logger is None:
logging.basicConfig()
self._logger = logging
else:
self._logger = logger
self._conn_state = None
self._sandesh_connection_info_update(status='INIT', message='')
# connect to zookeeper
self._zk = KazooClient(zk_server)
while True:
try:
self._zk.start()
break
            except gevent.Timeout as e:
# Update connection info
self._sandesh_connection_info_update(status='DOWN',
message=str(e))
gevent.sleep(1)
            # ZooKeeper may also throw an exception due to a delay in master election
except Exception as e:
# Update connection info
self._sandesh_connection_info_update(status='DOWN',
message=str(e))
gevent.sleep(1)
# Update connection info
self._sandesh_connection_info_update(status='UP', message='')
# Done connecting to ZooKeeper
# create a lock array to contain locks for each partition
self._part_locks = []
for part in range(0, self._max_partition):
lockpath = "/lockpath/"+ app_name + "/" + str(part)
l = self._zk.Lock(lockpath, self._name)
self._part_locks.append(l)
        # dictionary mapping partition number to the greenlet acquiring its lock
self._part_lock_task_dict = {}
self._logger.error("initial servers:" + str(self._cluster_list))
# update target partition ownership list
for part in range(0, self._max_partition):
if (self._con_hash.get_node(str(part)) == self._name):
self._target_part_ownership_list.append(part)
# update current ownership list
self._acquire_partition_ownership()
#end __init__
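    # Report the ZooKeeper connection state ('INIT'/'UP'/'DOWN') to the
    # sandesh connection-state infrastructure and log state transitions.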
def _sandesh_connection_info_update(self, status, message):
from pysandesh.connection_info import ConnectionState
from pysandesh.gen_py.process_info.ttypes import ConnectionStatus, \
ConnectionType
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel
new_conn_state = getattr(ConnectionStatus, status)
ConnectionState.update(conn_type = ConnectionType.ZOOKEEPER,
name = 'Zookeeper', status = new_conn_state,
message = message,
server_addrs = self._zk_server.split(','))
if (self._conn_state and self._conn_state != ConnectionStatus.DOWN and
new_conn_state == ConnectionStatus.DOWN):
msg = 'Connection to Zookeeper down: %s' %(message)
self._logger.error(msg)
if (self._conn_state and self._conn_state != new_conn_state and
new_conn_state == ConnectionStatus.UP):
msg = 'Connection to Zookeeper ESTABLISHED'
self._logger.error(msg)
self._conn_state = new_conn_state
# end _sandesh_connection_info_update
# following routine is the greenlet task function to acquire the lock
# for a partition
def _acquire_lock(self, part):
# lock for the partition
l = self._part_locks[part]
# go in an infinite loop waiting to acquire the lock
try:
while True:
ret = l.acquire(blocking=False)
if ret == True:
self._logger.error("Acquired lock for:" + str(part))
self._curr_part_ownership_list.append(part)
self._update_cb(self._curr_part_ownership_list)
return True
else:
gevent.sleep(1)
except CancelledError:
self._logger.error("Lock acquire cancelled for:" + str(part))
return False
except Exception as ex:
# TODO: If we have a non-KazooException, the lock object
# may get stuck in the "cancelled" state
self._logger.error("Lock acquire unexpected error!: " + str(ex))
            # This exception should get propagated to the main thread
raise SystemExit
return False
#end _acquire_lock
    # get rid of finished spawned tasks from data structures
def _cleanup_greenlets(self):
        # copy the keys so entries can be deleted while iterating
        for part in list(self._part_lock_task_dict.keys()):
if (self._part_lock_task_dict[part].ready()):
del self._part_lock_task_dict[part]
#end _cleanup_greenlets
# following routine launches tasks to acquire partition locks
def _acquire_partition_ownership(self):
# cleanup any finished greenlets
self._cleanup_greenlets()
# this variable will help us decide if we need to call callback
updated_curr_ownership = False
# list of partitions for which locks have to be released
release_lock_list = []
self._logger.info("known servers: %s" % self._con_hash.get_all_nodes())
for part in range(0, self._max_partition):
if (part in self._target_part_ownership_list):
if (part in self._curr_part_ownership_list):
# do nothing, I already have ownership of this partition
self._logger.info("No need to acquire ownership of:" +
str(part))
else:
# I need to acquire lock for this partition before I own
if (part in self._part_lock_task_dict.keys()):
try:
self._part_lock_task_dict[part].get(block=False)
except:
# do nothing there is already a greenlet running to
# acquire the lock
self._logger.error("Already a greenlet running to"
" acquire:" + str(part))
continue
# Greenlet died without getting ownership. Cleanup
self._logger.error("Cleanup stale greenlet running to"
" acquire:" + str(part))
del self._part_lock_task_dict[part]
self._logger.error("Starting greenlet running to"
" acquire:" + str(part))
                    # launch the greenlet to acquire the lock
g = Greenlet.spawn(self._acquire_lock, part)
self._part_lock_task_dict[part] = g
else:
# give up ownership of the partition
# cancel any lock acquisition which is ongoing
if (part in self._part_lock_task_dict.keys()):
try:
self._part_lock_task_dict[part].get(block=False)
except:
self._logger.error("canceling lock acquisition going on \
for:" + str(part))
# Cancelling the lock should result in killing the gevent
self._part_locks[part].cancel()
self._part_lock_task_dict[part].get(block=True)
del self._part_lock_task_dict[part]
if (part in self._curr_part_ownership_list):
release_lock_list.append(part)
self._curr_part_ownership_list.remove(part)
updated_curr_ownership = True
self._logger.error("giving up ownership of:" + str(part))
if (updated_curr_ownership is True):
            # current partition membership was updated; call the callback
self._update_cb(self._curr_part_ownership_list)
if (len(release_lock_list) != 0):
# release locks which were acquired
for part in release_lock_list:
self._logger.error("release the lock which was acquired:" + \
str(part))
try:
self._part_locks[part].release()
self._logger.error("fully gave up ownership of:" + str(part))
except:
pass
#end _acquire_partition_ownership
def update_cluster_list(self, cluster_list):
""" Updates the cluster node list
Args:
cluster_list(list): New list of names of the nodes in
the cluster
Returns:
None
"""
# some sanity check
        if self._name not in cluster_list:
raise ValueError('cluster list is missing local server name')
new_cluster_list = set(cluster_list)
new_servers = list(new_cluster_list.difference(
self._cluster_list))
deleted_servers = list(set(self._cluster_list).difference(
new_cluster_list))
self._cluster_list = set(cluster_list)
# update the hash structure
if new_servers:
self._logger.error("new servers:" + str(new_servers))
self._con_hash.add_nodes(new_servers)
if deleted_servers:
self._logger.error("deleted servers:" + str(deleted_servers))
self._con_hash.del_nodes(deleted_servers)
# update target partition ownership list
self._target_part_ownership_list = []
for part in range(0, self._max_partition):
if (self._con_hash.get_node(str(part)) == self._name):
if not (part in self._target_part_ownership_list):
self._target_part_ownership_list.append(part)
# update current ownership list
self._acquire_partition_ownership()
#end update_cluster_list
def own_partition(self, part_no):
""" Returns ownership information of a partition
Args:
part_no(int) : Partition no
Returns:
True if partition is owned by the local node
False if partition is not owned by the local node
"""
return part_no in self._curr_part_ownership_list
#end own_partition
def close(self):
""" Closes any connections and frees up any data structures
Args:
Returns:
None
"""
# clean up greenlets
for part in self._part_lock_task_dict.keys():
try:
self._part_lock_task_dict[part].kill()
except:
pass
# close zookeeper
try:
self._zk.stop()
except:
pass
try:
self._zk.close()
except:
pass
#end close
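
# Minimal usage sketch (not part of the library): it assumes a ZooKeeper
# server reachable at 127.0.0.1:2181, and the app/node names below are
# invented for illustration, mirroring the example in the class docstring.
if __name__ == '__main__':
    def ownership_change_cb(owned_parts):
        # invoked whenever the set of partitions owned by this node changes
        print("ownership change: " + str(owned_parts))

    client = PartitionClient("example_app", "node1", ["node1", "node2"],
                             8, ownership_change_cb, "127.0.0.1:2181")
    # give the lock-acquisition greenlets a chance to run
    gevent.sleep(5)
    if client.own_partition(1):
        print("this node currently owns partition 1")
    # shrink the cluster to just the local node; partitions get rebalanced
    client.update_cluster_list(["node1"])
    gevent.sleep(5)
    client.close()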