-
Notifications
You must be signed in to change notification settings - Fork 42
/
sandesh_state_machine.py
474 lines (421 loc) · 19.9 KB
/
sandesh_state_machine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
#
# Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
#
#
# Sandesh State Machine
#
import gevent
from fysom import Fysom
from work_queue import WorkQueue
from sandesh_logger import SandeshLogger
from sandesh_session import SandeshSession
class State(object):
    """String names of the Sandesh client FSM states (used as Fysom states)."""

    # Parked: not connected; waits for the idle-hold timer, a start event or
    # a collector change before moving to Connect.
    _IDLE = 'Idle'
    # No active collector is known; only a collector change moves on.
    _DISCONNECT = 'Disconnect'

    # Connection attempts: first towards the active collector, then the backup.
    _CONNECT = 'Connect'
    _CONNECT_TO_BACKUP = 'ConnectToBackup'

    # TCP is up but the collector has not yet answered the control message.
    _CLIENT_INIT = 'ClientInit'
    # Control message received; the session is fully usable.
    _ESTABLISHED = 'Established'
#end class State
class Event(object):
    """An input to the Sandesh client FSM, plus optional context.

    The ``_EV_*`` class attributes are the event names. An instance carries
    one of them in ``event`` together with whatever the producer attached.
    ``EvSandeshUVESend`` is dispatched directly by the state machine rather
    than through its transition table.
    """

    # Administrative control
    _EV_START = 'EvStart'
    _EV_STOP = 'EvStop'

    # Timers
    _EV_IDLE_HOLD_TIMER_EXPIRED = 'EvIdleHoldTimerExpired'
    _EV_CONNECT_TIMER_EXPIRED = 'EvConnectTimerExpired'

    # Collector discovery
    _EV_COLLECTOR_UNKNOWN = 'EvCollectorUnknown'
    _EV_BACKUP_COLLECTOR_UNKNOWN = 'EvBackupCollectorUnknown'
    _EV_COLLECTOR_CHANGE = 'EvCollectorChange'

    # TCP session notifications
    _EV_TCP_CONNECTED = 'EvTcpConnected'
    _EV_TCP_CONNECT_FAIL = 'EvTcpConnectFail'
    _EV_TCP_CLOSE = 'EvTcpClose'

    # Sandesh messages
    _EV_SANDESH_CTRL_MESSAGE_RECV = 'EvSandeshCtrlMessageRecv'
    _EV_SANDESH_UVE_SEND = 'EvSandeshUVESend'

    def __init__(self, event, session=None, msg=None, source=None,
                 primary_collector=None, secondary_collector=None):
        """Store the event name and its optional context.

        :param event: one of the ``_EV_*`` names.
        :param session: session the event relates to, if any.
        :param msg: attached message payload, if any.
        :param source: originator of ``msg`` (e.g. the collector), if any.
        :param primary_collector: new primary collector (collector change).
        :param secondary_collector: new secondary collector (collector change).
        """
        self.event, self.session, self.msg = event, session, msg
        self.source = source
        (self.primary_collector,
         self.secondary_collector) = (primary_collector, secondary_collector)
    #end __init__
#end class Event
class SandeshStateMachine(object):
    """Connection state machine of a Sandesh generator towards its collector.

    States are the ``State`` names and inputs are ``Event`` objects. Every
    event is handed to ``enqueue_event``, which pushes it on a ``WorkQueue``
    whose consumer callback is ``_dequeue_event``; that method applies it to
    a Fysom FSM whose ``on<State>`` callbacks are the ``_on_*`` closures
    built in ``__init__``.
    """

    _IDLE_HOLD_TIME = 5 # in seconds; delay before leaving Idle to reconnect
    _CONNECT_TIME = 30 # in seconds; budget for Connect/ConnectToBackup/ClientInit

    def __init__(self, connection, logger, primary_collector,
                 secondary_collector):
        """Create the FSM in state Idle; call initialize() to start it.

        :param connection: owning connection object; used for
            sandesh_instance(), primary_collector(), secondary_collector(),
            set_collector(), reset_collector(), handle_initialized(),
            handle_sandesh_ctrl_msg(), handle_sandesh_uve_msg() and
            _receive_sandesh_msg.
        :param logger: logger providing info()/error().
        :param primary_collector: initial active collector as "host:port",
            or None if unknown.
        :param secondary_collector: initial backup collector as "host:port",
            or None if unknown.
        """

        def _update_connection_state(e, status):
            # Imported lazily; presumably to avoid an import cycle (unverified).
            from connection_info import ConnectionState
            from gen_py.process_info.ttypes import ConnectionType
            collector_addr = e.sm._active_collector
            if collector_addr is None:
                collector_addr = ''
            ConnectionState.update(conn_type=ConnectionType.COLLECTOR,
                                   name='',
                                   status=status,
                                   server_addrs=[collector_addr],
                                   message='%s to %s on %s' % (e.src, e.dst, e.event))
        #end _update_connection_state

        def _connection_state_up(e):
            from gen_py.process_info.ttypes import ConnectionStatus
            _update_connection_state(e, ConnectionStatus.UP)
        #end _connection_state_up

        def _connection_state_down(e):
            from gen_py.process_info.ttypes import ConnectionStatus
            _update_connection_state(e, ConnectionStatus.DOWN)
        #end _connection_state_down

        def _connection_state_init(e):
            from gen_py.process_info.ttypes import ConnectionStatus
            _update_connection_state(e, ConnectionStatus.INIT)
        #end _connection_state_init

        def _on_idle(e):
            if e.sm._connect_timer is not None:
                e.sm._cancel_connect_timer()
            # Reset active and backup collector
            e.sm._active_collector = e.sm._connection.primary_collector()
            e.sm._backup_collector = e.sm._connection.secondary_collector()
            # clean up existing connection
            e.sm._delete_session()
            # Only schedule a reconnect when not administratively disabled.
            if not e.sm._disable:
                e.sm._start_idle_hold_timer()
            # update connection state
            _connection_state_down(e)
        #end _on_idle

        def _on_disconnect(e):
            # update connection state
            _connection_state_down(e)
        #end _on_disconnect

        def _on_connect(e):
            if e.sm._idle_hold_timer is not None:
                e.sm._cancel_idle_hold_timer()
            e.sm._connection.reset_collector()
            # clean up existing connection
            e.sm._delete_session()
            if e.sm._active_collector is not None:
                # update connection state
                _connection_state_init(e)
                e.sm._create_session()
                e.sm._start_connect_timer()
                e.sm._session.connect()
            else:
                e.sm.enqueue_event(Event(event=Event._EV_COLLECTOR_UNKNOWN))
        #end _on_connect

        def _on_connect_to_backup(e):
            if e.sm._connect_timer is not None:
                e.sm._cancel_connect_timer()
            # clean up existing connection
            e.sm._delete_session()
            # try to connect to the backup collector, if known
            if e.sm._backup_collector is not None:
                # Swap roles so the backup becomes the active collector.
                e.sm._active_collector, e.sm._backup_collector = \
                    e.sm._backup_collector, e.sm._active_collector
                # update connection state
                _connection_state_init(e)
                e.sm._create_session()
                e.sm._start_connect_timer()
                e.sm._session.connect()
            else:
                e.sm.enqueue_event(Event(event=Event._EV_BACKUP_COLLECTOR_UNKNOWN))
        #end _on_connect_to_backup

        def _on_client_init(e):
            e.sm._connects += 1
            gevent.spawn(e.sm._session.read)
            e.sm._connection.handle_initialized(e.sm._connects)
            e.sm._connection.sandesh_instance().send_generator_info()
            # update connection state
            _connection_state_init(e)
        #end _on_client_init

        def _on_established(e):
            e.sm._cancel_connect_timer()
            e.sm._connection.set_collector(e.sm_event.source)
            e.sm._connection.handle_sandesh_ctrl_msg(e.sm_event.msg)
            e.sm._connection.sandesh_instance().send_generator_info()
            # update connection state
            _connection_state_up(e)
        #end _on_established

        # FSM - Fysom. The initial EvStart is deferred: nothing happens until
        # initialize() enqueues it.
        self._fsm = Fysom({
            'initial': {'state': State._IDLE,
                        'event': Event._EV_START,
                        'defer': True
                       },
            'events': [
                # _IDLE
                {'name': Event._EV_IDLE_HOLD_TIMER_EXPIRED,
                 'src': State._IDLE,
                 'dst': State._CONNECT
                },
                {'name': Event._EV_COLLECTOR_CHANGE,
                 'src': State._IDLE,
                 'dst': State._CONNECT
                },
                {'name': Event._EV_START,
                 'src': State._IDLE,
                 'dst': State._CONNECT
                },

                # _DISCONNECT
                {'name': Event._EV_COLLECTOR_CHANGE,
                 'src': State._DISCONNECT,
                 'dst': State._CONNECT
                },
                {'name': Event._EV_STOP,
                 'src': State._DISCONNECT,
                 'dst': State._IDLE
                },

                # _CONNECT
                {'name': Event._EV_COLLECTOR_UNKNOWN,
                 'src': State._CONNECT,
                 'dst': State._DISCONNECT
                },
                {'name': Event._EV_TCP_CONNECT_FAIL,
                 'src': State._CONNECT,
                 'dst': State._CONNECT_TO_BACKUP
                },
                {'name': Event._EV_CONNECT_TIMER_EXPIRED,
                 'src': State._CONNECT,
                 'dst': State._CONNECT_TO_BACKUP
                },
                {'name': Event._EV_COLLECTOR_CHANGE,
                 'src': State._CONNECT,
                 'dst': State._IDLE
                },
                {'name': Event._EV_TCP_CONNECTED,
                 'src': State._CONNECT,
                 'dst': State._CLIENT_INIT
                },
                {'name': Event._EV_STOP,
                 'src': State._CONNECT,
                 'dst': State._IDLE
                },

                # _CONNECT_TO_BACKUP
                {'name': Event._EV_BACKUP_COLLECTOR_UNKNOWN,
                 'src': State._CONNECT_TO_BACKUP,
                 'dst': State._IDLE
                },
                {'name': Event._EV_TCP_CONNECT_FAIL,
                 'src': State._CONNECT_TO_BACKUP,
                 'dst': State._IDLE
                },
                {'name': Event._EV_CONNECT_TIMER_EXPIRED,
                 'src': State._CONNECT_TO_BACKUP,
                 'dst': State._IDLE
                },
                {'name': Event._EV_COLLECTOR_CHANGE,
                 'src': State._CONNECT_TO_BACKUP,
                 'dst': State._IDLE
                },
                {'name': Event._EV_TCP_CONNECTED,
                 'src': State._CONNECT_TO_BACKUP,
                 'dst': State._CLIENT_INIT
                },
                {'name': Event._EV_STOP,
                 'src': State._CONNECT_TO_BACKUP,
                 'dst': State._IDLE
                },

                # _CLIENT_INIT
                {'name': Event._EV_CONNECT_TIMER_EXPIRED,
                 'src': State._CLIENT_INIT,
                 'dst': State._IDLE
                },
                {'name': Event._EV_TCP_CLOSE,
                 'src': State._CLIENT_INIT,
                 'dst': State._IDLE
                },
                {'name': Event._EV_COLLECTOR_CHANGE,
                 'src': State._CLIENT_INIT,
                 'dst': State._IDLE
                },
                {'name': Event._EV_SANDESH_CTRL_MESSAGE_RECV,
                 'src': State._CLIENT_INIT,
                 'dst': State._ESTABLISHED
                },
                {'name': Event._EV_STOP,
                 'src': State._CLIENT_INIT,
                 'dst': State._IDLE
                },

                # _ESTABLISHED
                {'name': Event._EV_TCP_CLOSE,
                 'src': State._ESTABLISHED,
                 'dst': State._CONNECT_TO_BACKUP
                },
                {'name': Event._EV_STOP,
                 'src': State._ESTABLISHED,
                 'dst': State._IDLE
                },
                {'name': Event._EV_COLLECTOR_CHANGE,
                 'src': State._ESTABLISHED,
                 'dst': State._CONNECT
                }
            ],
            'callbacks': {
                'on' + State._IDLE: _on_idle,
                'on' + State._CONNECT: _on_connect,
                'on' + State._CONNECT_TO_BACKUP: _on_connect_to_backup,
                'on' + State._CLIENT_INIT: _on_client_init,
                'on' + State._ESTABLISHED: _on_established,
            }
        })

        self._connection = connection
        self._session = None
        self._connects = 0
        self._disable = False
        self._idle_hold_timer = None
        self._connect_timer = None
        self._active_collector = primary_collector
        self._backup_collector = secondary_collector
        self._logger = logger
        self._event_queue = WorkQueue(self._dequeue_event,
                                      self._is_ready_to_dequeue_event)
    #end __init__

    # Public functions

    def initialize(self):
        """Fire the deferred initial EvStart so the FSM starts connecting."""
        self.enqueue_event(Event(event=Event._EV_START))
    #end initialize

    def session(self):
        """Return the current SandeshSession, or None."""
        return self._session
    #end session

    def state(self):
        """Return the current FSM state name."""
        return self._fsm.current
    #end state

    def shutdown(self):
        """Administratively stop: move to Idle and do not reconnect."""
        self._disable = True
        self.enqueue_event(Event(event=Event._EV_STOP))
    #end shutdown

    def set_admin_state(self, down):
        """Disable (down == True) or re-enable the collector connection."""
        if down == True:
            self._disable = True
            self.enqueue_event(Event(event=Event._EV_STOP))
        else:
            self._disable = False
            self.enqueue_event(Event(event=Event._EV_START))
    #end set_admin_state

    def connect_count(self):
        """Return how many times ClientInit has been entered."""
        return self._connects
    #end connect_count

    def active_collector(self):
        return self._active_collector
    #end active_collector

    def backup_collector(self):
        return self._backup_collector
    #end backup_collector

    def enqueue_event(self, event):
        """Queue an Event; it is applied later by _dequeue_event."""
        self._event_queue.enqueue(event)
    #end enqueue_event

    def on_session_event(self, session, event):
        """Translate a SandeshSession notification into an FSM event."""
        if session is not self._session:
            self._logger.error("Ignore session event [%d] received for old session" % (event))
            return
        if SandeshSession.SESSION_ESTABLISHED == event:
            self._logger.info("Session Event: TCP Connected")
            self.enqueue_event(Event(event=Event._EV_TCP_CONNECTED,
                                     session=session))
        elif SandeshSession.SESSION_ERROR == event:
            self._logger.error("Session Event: TCP Connect Fail")
            self.enqueue_event(Event(event=Event._EV_TCP_CONNECT_FAIL,
                                     session=session))
        elif SandeshSession.SESSION_CLOSE == event:
            self._logger.error("Session Event: TCP Connection Closed")
            self.enqueue_event(Event(event=Event._EV_TCP_CLOSE,
                                     session=session))
        else:
            self._logger.error("Received unknown session event [%d]" % (event))
    #end on_session_event

    def on_sandesh_ctrl_msg_receive(self, session, sandesh_ctrl, collector):
        """Handle the collector's control reply.

        On success an EvSandeshCtrlMessageRecv is queued; on failure the
        current session (if any) is closed so the FSM retries later.
        """
        if sandesh_ctrl.success == True:
            self.enqueue_event(Event(event=Event._EV_SANDESH_CTRL_MESSAGE_RECV,
                                     session=session,
                                     msg=sandesh_ctrl,
                                     source=collector))
        else:
            # Negotiation with the Collector failed, reset the
            # connection and retry after sometime.
            self._logger.error("Negotiation with the Collector %s failed." % (collector))
            # The session may already have been torn down by the FSM.
            if self._session is not None:
                self._session.close()
    #end on_sandesh_ctrl_msg_receive

    def on_sandesh_uve_msg_send(self, sandesh_uve):
        """Queue a UVE for sending through the FSM."""
        self.enqueue_event(Event(event=Event._EV_SANDESH_UVE_SEND,
                                 msg=sandesh_uve))
    #end on_sandesh_uve_msg_send

    # Private functions

    @staticmethod
    def _parse_collector_address(collector):
        """Split "host:port" into (host, int(port)).

        The port is taken after the last ':' so IPv6 literals work, and an
        enclosing "[...]" around the host is removed.
        """
        host, _, port = collector.rpartition(':')
        if host.startswith('[') and host.endswith(']'):
            host = host[1:-1]
        return (host, int(port))
    #end _parse_collector_address

    def _create_session(self):
        """Create a SandeshSession towards the active collector."""
        assert self._session is None
        collector = self._parse_collector_address(self._active_collector)
        self._session = SandeshSession(self._connection.sandesh_instance(),
                                       collector,
                                       self.on_session_event,
                                       self._connection._receive_sandesh_msg)
    #end _create_session

    def _delete_session(self):
        """Close and forget the current session, if any."""
        if self._session:
            self._session.close()
            self._session = None
            self._connection.reset_collector()
    #end _delete_session

    def _start_idle_hold_timer(self):
        if self._idle_hold_timer is None:
            if self._IDLE_HOLD_TIME:
                self._idle_hold_timer = gevent.spawn_later(self._IDLE_HOLD_TIME,
                    self._idle_hold_timer_expiry_handler)
            else:
                # Zero hold time: expire immediately.
                self.enqueue_event(Event(event=Event._EV_IDLE_HOLD_TIMER_EXPIRED))
    #end _start_idle_hold_timer

    def _cancel_idle_hold_timer(self):
        if self._idle_hold_timer is not None:
            gevent.kill(self._idle_hold_timer)
            self._idle_hold_timer = None
    #end _cancel_idle_hold_timer

    def _idle_hold_timer_expiry_handler(self):
        self._idle_hold_timer = None
        self.enqueue_event(Event(event=Event._EV_IDLE_HOLD_TIMER_EXPIRED))
    #end _idle_hold_timer_expiry_handler

    def _start_connect_timer(self):
        if self._connect_timer is None:
            # Bind the timer to the current session so a late expiry for a
            # replaced session is dropped by _dequeue_event.
            self._connect_timer = gevent.spawn_later(self._CONNECT_TIME,
                self._connect_timer_expiry_handler,
                self._session)
    #end _start_connect_timer

    def _cancel_connect_timer(self):
        if self._connect_timer is not None:
            gevent.kill(self._connect_timer)
            self._connect_timer = None
    #end _cancel_connect_timer

    def _connect_timer_expiry_handler(self, session):
        self._connect_timer = None
        self.enqueue_event(Event(event=Event._EV_CONNECT_TIMER_EXPIRED,
                                 session=session))
    #end _connect_timer_expiry_handler

    def _is_ready_to_dequeue_event(self):
        """WorkQueue readiness predicate; the FSM is always ready."""
        return True
    #end _is_ready_to_dequeue_event

    def _log_event(self, event):
        """Return False for UVE sends while Established (too chatty to log)."""
        if self._fsm.current == State._ESTABLISHED and \
                event.event == Event._EV_SANDESH_UVE_SEND:
            return False
        return True
    #end _log_event

    def _dequeue_event(self, event):
        """Apply one queued event (WorkQueue consumer callback)."""
        if self._log_event(event):
            self._logger.info("Processing event[%s] in state[%s]"
                              % (event.event, self._fsm.current))
        # Drop events tagged with a session that has since been replaced.
        if event.session is not None and event.session is not self._session:
            self._logger.info("Ignore event [%s] received for old session"
                              % (event.event))
            return
        if event.event == Event._EV_COLLECTOR_CHANGE:
            old_active_collector = self._active_collector
            self._active_collector = event.primary_collector
            self._backup_collector = event.secondary_collector
            if old_active_collector == self._active_collector:
                self._logger.info("No change in active collector. Ignore event [%s]"
                                  % (event.event))
                return
        # While administratively disabled the FSM parks in Idle; a stale
        # idle-hold timer or a collector change must not restart it.
        if self._disable and self._fsm.current == State._IDLE and \
                event.event in (Event._EV_IDLE_HOLD_TIMER_EXPIRED,
                                Event._EV_COLLECTOR_CHANGE):
            self._logger.info("Admin down. Ignore event [%s] in state[%s]"
                              % (event.event, self._fsm.current))
            return
        if event.event == Event._EV_SANDESH_UVE_SEND:
            if self._fsm.current == State._ESTABLISHED or \
                    self._fsm.current == State._CLIENT_INIT:
                self._connection.handle_sandesh_uve_msg(event.msg)
            else:
                self._connection.sandesh_instance().msg_stats().update_tx_drop_stats(
                    event.msg.__class__.__name__, 0)
                self._logger.info("Discarding event[%s] in state[%s]"
                                  % (event.event, self._fsm.current))
        elif event.event == Event._EV_SANDESH_CTRL_MESSAGE_RECV and \
                self._fsm.current == State._ESTABLISHED:
            # Control messages after establishment bypass the FSM.
            self._connection.handle_sandesh_ctrl_msg(event.msg)
        elif self._fsm.cannot(event.event):
            self._logger.info("Unconsumed event[%s] in state[%s]"
                              % (event.event, self._fsm.current))
        else:
            prev_state = self.state()
            getattr(self._fsm, event.event)(sm=self, sm_event=event)
            # Log state transition
            self._logger.info("Sandesh Client: Event[%s] => State[%s] -> State[%s]"
                              % (event.event, prev_state, self.state()))
    #end _dequeue_event
#end class SandeshStateMachine