-
Notifications
You must be signed in to change notification settings - Fork 42
/
sandesh_connection.py
180 lines (157 loc) · 7.19 KB
/
sandesh_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#
# Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
#
#
# Sandesh Connection
#
import gevent
import os
from transport import TTransport
from protocol import TXMLProtocol
from sandesh_session import SandeshSession, SandeshReader
from sandesh_state_machine import SandeshStateMachine, Event
from sandesh_uve import SandeshUVETypeMaps
from gen_py.sandesh.constants import *
class SandeshConnection(object):
    """Connection between a sandesh instance and its collector(s).

    The connection owns a SandeshStateMachine (created and initialized in
    the constructor), remembers the configured primary and secondary
    collector, and forwards received sandesh messages either to the state
    machine (control messages) or to the client (everything else).
    """

    def __init__(self, sandesh_instance, client, primary_collector,
                 secondary_collector, discovery_client):
        """Create the connection and start its state machine.

        sandesh_instance    -- object providing logger(), msg_stats(),
                               source_id(), module(), node_type() and
                               instance_id(), as used by this class.
        client              -- object that receives handle_sandesh_msg(),
                               handle_sandesh_ctrl_msg() and send_sandesh().
        primary_collector   -- primary collector, or None.
        secondary_collector -- secondary collector, or None.
        discovery_client    -- object with subscribe(), or None. It is only
                               used when primary_collector is None.
        """
        self._sandesh_instance = sandesh_instance
        self._logger = sandesh_instance.logger()
        self._client = client
        self._primary_collector = primary_collector
        self._secondary_collector = secondary_collector
        # Collector name. Updated upon receiving the control message
        # from the Collector during connection negotiation.
        self._collector = None
        self._admin_down = False
        self._state_machine = SandeshStateMachine(self, self._logger,
                                                  primary_collector,
                                                  secondary_collector)
        self._state_machine.initialize()
        from sandesh_common.vns.constants import \
            COLLECTOR_DISCOVERY_SERVICE_NAME
        # Without a statically configured primary collector, ask the
        # discovery service for collectors; updates are delivered to
        # _handle_collector_update().
        # NOTE(review): the literal 2 is presumably the number of collector
        # instances requested -- confirm against the discovery client API.
        if primary_collector is None and discovery_client is not None:
            discovery_client.subscribe(COLLECTOR_DISCOVERY_SERVICE_NAME, 2,
                                       self._handle_collector_update)
    #end __init__

    # Public methods

    def session(self):
        """Return the state machine's current session."""
        return self._state_machine.session()
    #end session

    def statemachine(self):
        """Return the SandeshStateMachine owned by this connection."""
        return self._state_machine
    #end statemachine

    def sandesh_instance(self):
        """Return the sandesh instance this connection was created with."""
        return self._sandesh_instance
    #end sandesh_instance

    def server(self):
        """Return the state machine's active collector."""
        return self._state_machine.active_collector()
    #end server

    def primary_collector(self):
        """Return the currently recorded primary collector (or None)."""
        return self._primary_collector
    #end primary_collector

    def secondary_collector(self):
        """Return the currently recorded secondary collector (or None)."""
        return self._secondary_collector
    #end secondary_collector

    def collector(self):
        """Return the collector name, or None if it has not been set."""
        return self._collector
    #end collector

    def set_collector(self, collector):
        """Record the collector name."""
        self._collector = collector
    #end set_collector

    def reset_collector(self):
        """Forget the collector name."""
        self._collector = None
    #end reset_collector

    def state(self):
        """Return the state machine's current state."""
        return self._state_machine.state()
    #end state

    def handle_initialized(self, count):
        """Send the client-to-server sandesh control message.

        The message carries the key of every entry in the sandesh
        instance's global UVE map, the process id, and the identity of the
        sandesh instance (source id, module, node type, instance id).

        count -- forwarded unchanged as the third constructor argument of
                 SandeshCtrlClientToServer.
        """
        uve_global_map = \
            self._sandesh_instance._uve_type_maps.get_uve_global_map()
        # keys() wrapped in list() yields the same list on Python 2 and
        # also works on Python 3, where dict.iterkeys() no longer exists.
        uve_types = list(uve_global_map.keys())
        from gen_py.sandesh_ctrl.ttypes import SandeshCtrlClientToServer
        ctrl_msg = SandeshCtrlClientToServer(
            self._sandesh_instance.source_id(),
            self._sandesh_instance.module(), count, uve_types, os.getpid(), 0,
            self._sandesh_instance.node_type(),
            self._sandesh_instance.instance_id())
        self._logger.debug(
            'Send sandesh control message. uve type count # %d'
            % (len(uve_types)))
        ctrl_msg.request('ctrl', sandesh=self._sandesh_instance)
    #end handle_initialized

    def handle_sandesh_ctrl_msg(self, ctrl_msg):
        """Pass a sandesh control message on to the client."""
        self._client.handle_sandesh_ctrl_msg(ctrl_msg)
    #end handle_sandesh_ctrl_msg

    def handle_sandesh_uve_msg(self, uve_msg):
        """Hand a UVE message to the client for sending."""
        self._client.send_sandesh(uve_msg)
    #end handle_sandesh_uve_msg

    def set_admin_state(self, down):
        """Set the admin state; the state machine is told only on change."""
        if self._admin_down != down:
            self._admin_down = down
            self._state_machine.set_admin_state(down)
    #end set_admin_state

    # Private methods

    def _handle_collector_update(self, collector_info):
        """Apply a collector list received from the discovery service.

        collector_info -- None (ignored), or a sequence of mappings each
                          holding 'ip-address' and 'port' string values.
                          Entry 0 becomes the primary collector and entry 1
                          the secondary; a missing entry maps to None.

        Both endpoints are decoded before anything is stored. If either
        entry lacks a key, an error is logged and the previously recorded
        collectors are left untouched, so the connection never disagrees
        with what the state machine was last told. When the decoded pair
        differs from the recorded one, it is stored and an
        _EV_COLLECTOR_CHANGE event is enqueued on the state machine.
        """
        if collector_info is None:
            return
        self._logger.info('Received discovery update %s for collector service' \
                          % (str(collector_info)))
        try:
            new_primary = None
            if len(collector_info) > 0:
                new_primary = collector_info[0]['ip-address'] \
                    + ':' + collector_info[0]['port']
            new_secondary = None
            if len(collector_info) > 1:
                new_secondary = collector_info[1]['ip-address'] \
                    + ':' + collector_info[1]['port']
        except KeyError:
            self._logger.error('Failed to decode collector info from the discovery service')
            return
        if (new_primary == self._primary_collector) and \
                (new_secondary == self._secondary_collector):
            # Nothing changed; do not disturb the state machine.
            return
        self._primary_collector = new_primary
        self._secondary_collector = new_secondary
        self._state_machine.enqueue_event(Event(
            event = Event._EV_COLLECTOR_CHANGE,
            primary_collector = self._primary_collector,
            secondary_collector = self._secondary_collector))
    #end _handle_collector_update

    def _receive_sandesh_msg(self, session, msg):
        """Dispatch one received sandesh message.

        session -- session the message arrived on; passed through to the
                   state machine for control messages.
        msg     -- the encoded message, header included.

        Messages whose header cannot be decoded, control messages with an
        unexpected name, and control messages whose body fails to decode
        are counted as receive drops and logged. A valid
        SandeshCtrlServerToClient message is counted as received and given
        to the state machine. Every other message goes to the client with
        the header stripped.
        """
        (hdr, hdr_len, sandesh_name) = \
            SandeshReader.extract_sandesh_header(msg)
        if sandesh_name is None:
            self._sandesh_instance.msg_stats().update_rx_drop_stats(
                '__UNKNOWN__', len(msg))
            self._logger.error('Failed to decode sandesh header for "%s"' % (msg))
            return
        if not (hdr.Hints & SANDESH_CONTROL_HINT):
            # Ordinary sandesh message: the client decodes the body itself.
            self._logger.debug('Received sandesh message [%s]' % (sandesh_name))
            self._client.handle_sandesh_msg(sandesh_name,
                                            msg[hdr_len:], len(msg))
            return
        self._logger.debug('Received sandesh control message [%s]' % (sandesh_name))
        if sandesh_name != 'SandeshCtrlServerToClient':
            self._sandesh_instance.msg_stats().update_rx_drop_stats(
                sandesh_name, len(msg))
            self._logger.error('Invalid sandesh control message [%s]' % (sandesh_name))
            return
        # Decode the control message body (everything after the header)
        # using the XML protocol over an in-memory transport.
        transport = TTransport.TMemoryBuffer(msg[hdr_len:])
        protocol_factory = TXMLProtocol.TXMLProtocolFactory()
        protocol = protocol_factory.getProtocol(transport)
        from gen_py.sandesh_ctrl.ttypes import SandeshCtrlServerToClient
        sandesh_ctrl_msg = SandeshCtrlServerToClient()
        if sandesh_ctrl_msg.read(protocol) == -1:
            self._sandesh_instance.msg_stats().update_rx_drop_stats(
                sandesh_name, len(msg))
            self._logger.error('Failed to decode sandesh control message "%s"' %(msg))
        else:
            self._sandesh_instance.msg_stats().update_rx_stats(
                sandesh_name, len(msg))
            self._state_machine.on_sandesh_ctrl_msg_receive(
                session, sandesh_ctrl_msg, hdr.Source)
    #end _receive_sandesh_msg

#end class SandeshConnection