/
verification_util.py
232 lines (193 loc) · 7.28 KB
/
verification_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import os
import json
import urllib2
import requests
import logging as LOG
from lxml import etree
LOG.basicConfig(format='%(levelname)s: %(message)s', level=LOG.INFO)
class JsonDrv (object):
_DEFAULT_HEADERS = {
'Content-type': 'application/json; charset="UTF-8"',
}
_DEFAULT_AUTHN_URL = "/v2.0/tokens"
def __init__(self, vub, logger=LOG, args=None, use_admin_auth=False):
self.log = logger
self._vub = vub
self._headers = None
self._args = args
self._use_admin_auth = use_admin_auth
def _auth(self):
if self._args:
if os.getenv('OS_AUTH_URL'):
url = os.getenv('OS_AUTH_URL') + '/tokens'
else:
url = "%s://%s:%s%s" % (self._args.auth_protocol,
self._args.auth_ip,
self._args.auth_port,
self._DEFAULT_AUTHN_URL)
insecure = bool(os.getenv('OS_INSECURE',True))
verify = not insecure
self._authn_body = \
'{"auth":{"passwordCredentials":{"username": "%s", "password": "%s"}, "tenantName":"%s"}}' % (
self._args.admin_username if self._use_admin_auth else self._args.stack_user,
self._args.admin_password if self._use_admin_auth else self._args.stack_password,
self._args.admin_tenant if self._use_admin_auth else self._args.project_name)
response = requests.post(url, data=self._authn_body,
headers=self._DEFAULT_HEADERS,
verify=verify)
if response.status_code == 200:
# plan is to re-issue original request with new token
authn_content = json.loads(response.text)
self._auth_token = authn_content['access']['token']['id']
self._headers = {'X-AUTH-TOKEN': self._auth_token}
return
raise RuntimeError('Authentication Failure')
def load(self, url, retry=True):
self.log.debug("Requesting: %s", url)
resp = requests.get(url, headers=self._headers)
if resp.status_code == 401:
if retry:
self._auth()
return self.load(url, False)
if resp.status_code == 200:
return json.loads(resp.text)
self.log.debug("Response Code: %d" % resp.status_code)
return None
class XmlDrv (object):
def __init__(self, vub, logger=LOG, args=None, use_admin_auth=False):
self.log = logger
self._vub = vub
if args:
pass
def load(self, url):
try:
self.log.debug("Requesting: %s", url)
resp = requests.get(url)
return etree.fromstring(resp.text)
except requests.ConnectionError, e:
self.log.error("Socket Connection error: %s", str(e))
return None
class VerificationUtilBase (object):
def __init__(self, ip, port, drv=JsonDrv, logger=LOG, args=None, use_admin_auth=False):
self.log = logger
self._ip = ip
self._port = port
self._drv = drv(self, logger=logger, args=args, use_admin_auth=use_admin_auth)
self._force_refresh = False
def get_force_refresh(self):
return self._force_refresh
def set_force_refresh(self, force=False):
self._force_refresh = force
return self.get_force_refresh()
def _mk_url_str(self, path=''):
if path.startswith('http:'):
return path
return "http://%s:%s/%s" % (self._ip, str(self._port), path)
def dict_get(self, path=''):
try:
if path:
return self._drv.load(self._mk_url_str(path))
except urllib2.HTTPError:
return None
# end dict_get
def elem2dict(node, alist=False):
d = list() if alist else dict()
for e in node.iterchildren():
#key = e.tag.split('}')[1] if '}' in e.tag else e.tag
if e.tag == 'list':
value = elem2dict(e, alist=True)
else:
value = e.text if e.text else elem2dict(e)
if type(d) == type(list()):
d.append(value)
else:
d[e.tag] = value
return d
class Result (dict):
def __init__(self, d={}):
super(Result, self).__init__()
if type(d) is not dict and hasattr(d, 'tag'):
d = elem2dict(d)
self.update(d)
def xpath(self, *plist):
''' basic path '''
d = self
try:
for p in plist:
d = d[p]
return d
except KeyError, e:
return None
class EtreeToDict(object):
"""Converts the xml etree to dictionary/list of dictionary."""
def __init__(self, xpath):
self.xpath = xpath
self.xml_list = ['policy-rule']
def _handle_list(self, elems):
"""Handles the list object in etree."""
a_list = []
for elem in elems.getchildren():
rval = self._get_one(elem, a_list)
if 'element' in rval.keys():
a_list.append(rval['element'])
elif 'list' in rval.keys():
a_list.append(rval['list'])
else:
a_list.append(rval)
if not a_list:
return None
return a_list
def _get_one(self, xp, a_list=None):
"""Recrusively looks for the entry in etree and converts to dictionary.
Returns a dictionary.
"""
val = {}
child = xp.getchildren()
if not child:
val.update({xp.tag: xp.text})
return val
for elem in child:
if elem.tag == 'list':
val.update({xp.tag: self._handle_list(elem)})
if elem.tag == 'data':
# Remove CDATA; if present
text = elem.text.replace("<![CDATA[<", "<").strip("]]>")
nxml = etree.fromstring(text)
rval = self._get_one(nxml, a_list)
else:
rval = self._get_one(elem, a_list)
if elem.tag in self.xml_list:
val.update({xp.tag: self._handle_list(xp)})
if elem.tag in rval.keys():
val.update({elem.tag: rval[elem.tag]})
elif 'SandeshData' in elem.tag:
val.update({xp.tag: rval})
else:
val.update({elem.tag: rval})
return val
def get_all_entry(self, path):
"""All entries in the etree is converted to the dictionary
Returns the list of dictionary/didctionary.
"""
xps = path.xpath(self.xpath)
if not xps:
# sometime ./xpath dosen't work; work around
# should debug to find the root cause.
xps = path.xpath(self.xpath.strip('.'))
if type(xps) is not list:
return self._get_one(xps)
val = []
for xp in xps:
val.append(self._get_one(xp))
if len(val) == 1:
return val[0]
return val
def find_entry(self, path, match):
"""Looks for a particular entry in the etree.
Returns the element looked for/None.
"""
xp = path.xpath(self.xpath)
f = filter(lambda x: x.text == match, xp)
if len(f):
return f[0].text
return None