/
overlay_to_underlay_mapper.py
303 lines (262 loc) · 12.5 KB
/
overlay_to_underlay_mapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
#
# Copyright (c) 2014 Juniper Networks, Inc. All rights reserved.
#
#
# Overlay To Underlay Mapper
#
# Utility to get the Underlay information for the Overlay flow(s).
#
import json
from sandesh.viz.constants import *
from opserver_util import OpServerUtils
class OverlayToUnderlayMapperError(Exception):
    """Root of the exception hierarchy of this module.

    Every exception defined in this module derives from this class, so
    code calling into OverlayToUnderlayMapper only needs to catch this
    one type.
    """
class OverlayToUnderlayMapper(object):
    """Resolve an OverlayToUnderlay flow query into underlay flow data.

    The query is answered in two steps: the overlay where clause is run
    against the FlowRecord table, and every row it returns is turned
    into a where term of a second query against the UFlowData stat
    table. The UFlowData rows are finally renamed back to the field
    names used in the original query.

    Name-mapping and query failures are reported with exceptions derived
    from OverlayToUnderlayMapperError.
    """

    def __init__(self, query_json, analytics_api_ip,
                 analytics_api_port, user, password, logger):
        """Store the query and the analytics-api connection details.

        query_json -- decoded OverlayToUnderlay query (dict) or None.
                      When given, it must carry 'start_time' and
                      'end_time'.
        analytics_api_ip, analytics_api_port -- analytics-api server the
                      table queries are posted to.
        user, password -- credentials passed along with every post.
        logger -- object whose debug() method receives the query and
                  response traces.
        """
        self.query_json = query_json
        self._analytics_api_ip = analytics_api_ip
        self._analytics_api_port = analytics_api_port
        self._user = user
        self._password = password
        self._logger = logger
        # Always define the time attributes so that they exist even when
        # no query was supplied.
        self._start_time = None
        self._end_time = None
        if self.query_json is not None:
            self._start_time = self.query_json['start_time']
            self._end_time = self.query_json['end_time']
            # If the start_time/end_time in the query is specified as
            # relative time, then the actual start_time/end_time for the
            # FlowRecordTable query and UFlowData query would be
            # different. Since the FlowRecordTable is queried first and
            # the result of which is used to query the UFlowData table,
            # the result may not be correct if the start_time/end_time is
            # different for FlowRecord and UFlowData queries. Therefore,
            # convert the relative start/end time to absolute time.
            if not str(self._start_time).isdigit():
                self._start_time = \
                    OpServerUtils.convert_to_utc_timestamp_usec(
                        self._start_time)
            if not str(self._end_time).isdigit():
                self._end_time = \
                    OpServerUtils.convert_to_utc_timestamp_usec(
                        self._end_time)
    # end __init__

    def process_query(self):
        """Process the OverlayToUnderlay Flow query and return the
        response as a JSON string."""
        flow_record_data = self._get_overlay_flow_data()
        uflow_data = self._get_underlay_flow_data(flow_record_data)
        return self._send_response(uflow_data)
    # end process_query

    def _overlay_to_flowrecord_name(self, oname):
        """Return the FlowRecord column name for the overlay field
        `oname`; raise _OverlayToFlowRecordFieldsNameError if there is
        no mapping."""
        try:
            return OverlayToFlowRecordFields[oname]
        except KeyError:
            raise _OverlayToFlowRecordFieldsNameError(oname)
    # end _overlay_to_flowrecord_name

    def _flowrecord_to_uflowdata_name(self, fname):
        """Return the UFlowData column name for the FlowRecord column
        `fname`; raise _FlowRecordToUFlowDataFieldsNameError if there is
        no mapping."""
        try:
            return FlowRecordToUFlowDataFields[fname]
        except KeyError:
            raise _FlowRecordToUFlowDataFieldsNameError(fname)
    # end _flowrecord_to_uflowdata_name

    def _underlay_to_uflowdata_name(self, uname):
        """Return the UFlowData column name for the underlay field
        `uname`; raise _UnderlayToUFlowDataFieldsNameError if there is
        no mapping."""
        try:
            return UnderlayToUFlowDataFields[uname]
        except KeyError:
            raise _UnderlayToUFlowDataFieldsNameError(uname)
    # end _underlay_to_uflowdata_name

    def _overlay_term_to_flowrecord_match(self, match_term):
        """Convert one overlay match term (a dict with 'name', 'value',
        'op' and optionally 'value2') into the attribute dict of an
        OpServerUtils.Match on the corresponding FlowRecord column."""
        fname = self._overlay_to_flowrecord_name(match_term['name'])
        match = OpServerUtils.Match(name=fname,
                                    value=match_term['value'],
                                    op=match_term['op'],
                                    value2=match_term.get('value2'))
        return match.__dict__
    # end _overlay_term_to_flowrecord_match

    def _get_overlay_flow_data(self):
        """Fetch the overlay flow data from the FlowRecord Table.

        Convert the where clause in the OverlayToUnderlay query according
        to the schema defined for the FlowRecord Table. Get the overlay
        flow data [source vrouter, destination vrouter, flowtuple hash,
        encapsulation] from the FlowRecord Table required to query the
        underlay data.

        Returns the 'value' of the FlowRecord query response.
        """
        # process where clause; a missing where clause means no terms
        flow_record_where = []
        for where_and_list in self.query_json.get('where', []):
            flow_record_where_and_list = []
            for match_term in where_and_list:
                flow_record_where_and_list.append(
                    self._overlay_term_to_flowrecord_match(match_term))
                # a suffix is sent as an additional term of the same
                # AND list
                suffix = match_term.get('suffix')
                if suffix is not None:
                    flow_record_where_and_list.append(
                        self._overlay_term_to_flowrecord_match(suffix))
            flow_record_where.append(flow_record_where_and_list)

        # populate the select list
        flow_record_select = [
            FlowRecordNames[FlowRecordFields.FLOWREC_VROUTER_IP],
            FlowRecordNames[FlowRecordFields.FLOWREC_OTHER_VROUTER_IP],
            FlowRecordNames[FlowRecordFields.FLOWREC_UNDERLAY_SPORT],
            FlowRecordNames[FlowRecordFields.FLOWREC_UNDERLAY_PROTO]
        ]

        flow_record_query = OpServerUtils.Query(
            table=FLOW_TABLE,
            start_time=self._start_time,
            end_time=self._end_time,
            select_fields=flow_record_select,
            where=flow_record_where,
            dir=1)
        return self._send_query(json.dumps(flow_record_query.__dict__))
    # end _get_overlay_flow_data

    def _flowrecord_equal_match(self, row, field):
        """Build an EQUAL match on the UFlowData column mapped from the
        FlowRecord field id `field`, using that field's value in `row`."""
        fname = FlowRecordNames[field]
        ufname = self._flowrecord_to_uflowdata_name(fname)
        return OpServerUtils.Match(name=ufname, value=row[fname],
                                   op=OpServerUtils.MatchOp.EQUAL)
    # end _flowrecord_equal_match

    def _get_uflow_data_filter(self):
        """Return the filter clause of the query with the field names
        converted to UFlowData column names, or None if the query has no
        filter.

        A flat list of match terms is wrapped into a single AND list.
        The match terms are copied rather than renamed in place, so the
        caller's query_json is left untouched and the same query can be
        processed more than once.
        """
        query_filter = self.query_json.get('filter')
        if query_filter is None:
            return None
        filter_or_list = list(query_filter)
        if filter_or_list and not isinstance(filter_or_list[0], list):
            filter_or_list = [filter_or_list]
        return [
            [dict(match_term,
                  name=self._underlay_to_uflowdata_name(match_term['name']))
             for match_term in filter_and]
            for filter_and in filter_or_list]
    # end _get_uflow_data_filter

    def _get_underlay_flow_data(self, flow_record_data):
        """Fetch the underlay data from the UFlowData table.

        Construct the Where clause for the UFlowData query from the
        FlowRecord query response. Convert the select clause, sort_fields,
        filter clause in the OverlayToUnderlay query according to the
        schema defined for the UFlowData table.

        flow_record_data -- list of FlowRecord rows (dicts).
        Returns the 'value' of the UFlowData query response, or [] when
        there is nothing to query for.
        """
        if not flow_record_data:
            return []

        # populate where clause for Underlay Flow query
        uflow_data_where = []
        for row in flow_record_data:
            # if any of the column value is None, then skip the row
            if any(col is None for col in row.values()):
                continue
            sip = self._flowrecord_equal_match(
                row, FlowRecordFields.FLOWREC_VROUTER_IP)
            dip = self._flowrecord_equal_match(
                row, FlowRecordFields.FLOWREC_OTHER_VROUTER_IP)
            # the source port is not a term of its own; it is attached
            # as the suffix of the protocol match below
            sport = self._flowrecord_equal_match(
                row, FlowRecordFields.FLOWREC_UNDERLAY_SPORT)
            proto_fname = \
                FlowRecordNames[FlowRecordFields.FLOWREC_UNDERLAY_PROTO]
            ufname = self._flowrecord_to_uflowdata_name(proto_fname)
            # get the protocol from tunnel_type
            val = OpServerUtils.tunnel_type_to_protocol(row[proto_fname])
            protocol = OpServerUtils.Match(
                name=ufname, value=val,
                op=OpServerUtils.MatchOp.EQUAL, suffix=sport)
            uflow_data_where.append(
                [sip.__dict__, dip.__dict__, protocol.__dict__])

        # if the where clause is empty, then no need to send
        # the UFlowData query
        if not uflow_data_where:
            return []

        # populate UFlowData select
        uflow_data_select = [
            self._underlay_to_uflowdata_name(select)
            for select in self.query_json['select_fields']]

        # sort_fields specified in the query?
        uflow_data_sort_fields = None
        if self.query_json.get('sort_fields'):
            uflow_data_sort_fields = [
                self._underlay_to_uflowdata_name(field)
                for field in self.query_json['sort_fields']]
        uflow_data_sort_type = self.query_json.get('sort')

        # does the query contain limit attribute?
        uflow_data_limit = self.query_json.get('limit')

        # add filter if specified
        uflow_data_filter = self._get_uflow_data_filter()

        uflow_data_query = OpServerUtils.Query(
            table='StatTable.UFlowData.flow',
            start_time=self._start_time,
            end_time=self._end_time,
            select_fields=uflow_data_select,
            where=uflow_data_where,
            sort=uflow_data_sort_type,
            sort_fields=uflow_data_sort_fields,
            limit=uflow_data_limit,
            filter=uflow_data_filter)
        return self._send_query(json.dumps(uflow_data_query.__dict__))
    # end _get_underlay_flow_data

    def _send_query(self, query):
        """Post the query to the analytics-api server and return the
        'value' of the response.

        Raises _QueryError if the response cannot be decoded as JSON or
        does not carry a 'value' entry.
        """
        self._logger.debug('Sending query: %s' % (query))
        opserver_url = OpServerUtils.opserver_query_url(
            self._analytics_api_ip, str(self._analytics_api_port))
        resp = OpServerUtils.post_url_http(opserver_url, query, self._user,
                                           self._password, True)
        try:
            resp = json.loads(resp)
            value = resp['value']
        except (TypeError, ValueError, KeyError):
            raise _QueryError(query)
        self._logger.debug('Query response: %s' % str(value))
        return value
    # end _send_query

    def _send_response(self, uflow_data):
        """Convert the UFlowData query response according to the
        schema defined for the OverlayToUnderlayFlowMap table.

        uflow_data -- list of UFlowData rows (dicts).
        Returns a JSON string of the form {"value": [row, ...]} in which
        every row is keyed by the field names of the query's
        select_fields.
        """
        underlay_data = []
        if uflow_data:
            # the field -> column mapping is the same for every row, so
            # resolve it only once
            columns = [(field, self._underlay_to_uflowdata_name(field))
                       for field in self.query_json['select_fields']]
            for row in uflow_data:
                underlay_data.append(
                    dict((field, row[name]) for field, name in columns))
        return json.dumps({'value': underlay_data})
    # end _send_response

# end class OverlayToUnderlayMapper
class _OverlayToFlowRecordFieldsNameError(OverlayToUnderlayMapperError):
    """Raised when an overlay field name has no entry in
    OverlayToFlowRecordFields.

    field -- the name that could not be mapped.
    """

    def __init__(self, field):
        self.field = field

    def __str__(self):
        # Format through a one-element tuple so that a tuple-valued field
        # is rendered as a single value instead of being unpacked as the
        # format arguments (which would raise TypeError).
        return 'No mapping for <%s> in "OverlayToFlowRecordFields"' \
            % (self.field,)
# end class _OverlayToFlowRecordFieldsNameError
class _FlowRecordToUFlowDataFieldsNameError(OverlayToUnderlayMapperError):
    """Raised when a FlowRecord column name has no entry in
    FlowRecordToUFlowDataFields.

    field -- the name that could not be mapped.
    """

    def __init__(self, field):
        self.field = field

    def __str__(self):
        # Format through a one-element tuple so that a tuple-valued field
        # is rendered as a single value instead of being unpacked as the
        # format arguments (which would raise TypeError).
        return 'No mapping for <%s> in "FlowRecordToUFlowDataFields"' \
            % (self.field,)
# end class _FlowRecordToUFlowDataFieldsNameError
class _UnderlayToUFlowDataFieldsNameError(OverlayToUnderlayMapperError):
    """Raised when an underlay field name has no entry in
    UnderlayToUFlowDataFields.

    field -- the name that could not be mapped.
    """

    def __init__(self, field):
        self.field = field

    def __str__(self):
        # Format through a one-element tuple so that a tuple-valued field
        # is rendered as a single value instead of being unpacked as the
        # format arguments (which would raise TypeError).
        return 'No mapping for <%s> in "UnderlayToUFlowDataFields"' \
            % (self.field,)
# end class _UnderlayToUFlowDataFieldsNameError
class _QueryError(OverlayToUnderlayMapperError):
    """Raised when the response to a posted query cannot be decoded as
    JSON or does not carry a 'value' entry.

    query -- the query that was posted.
    """

    def __init__(self, query):
        self.query = query

    def __str__(self):
        # Format through a one-element tuple so that a tuple-valued query
        # is rendered as a single value instead of being unpacked as the
        # format arguments (which would raise TypeError).
        return 'Error in query processing: %s' % (self.query,)
# end class _QueryError