import json
import sys
import logging
import os
import requests
import abc
import six
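@six.add_metaclass(abc.ABCMeta)
class Cert_Manager(object):
    """Base class for drivers that fetch TLS material for HAProxy.

    Each concrete driver implements ``_validate_tls_secret`` and
    ``_populate_tls_pem`` for one backend (``Barbican_Cert_Manager`` and
    ``Generic_Cert_Manager`` in this module). ``_request`` is the shared
    HTTP helper.
    """

    # Seconds to wait for a backend HTTP request before giving up. Without
    # an explicit timeout ``requests`` blocks indefinitely on an endpoint
    # that accepts the connection but never answers.
    request_timeout = 30

    def __init__(self):
        pass

    def _request(self, url, headers=None, body=None, request_type=None):
        """Send one HTTP request and return the ``requests`` response.

        :param url: absolute URL to call.
        :param headers: optional dict of request headers.
        :param body: optional JSON-serialisable payload; it is JSON-encoded
            and sent as the request body for ``PUT`` and ``POST`` only.
        :param request_type: ``'PUT'``, ``'POST'``, or anything else (which
            is treated as ``GET``). Pass it by keyword: positionally the
            third argument is ``body``.
        :returns: the ``requests.Response``, or ``None`` when the request
            could not be completed (connection error, timeout, or a body
            that cannot be JSON-encoded).
        """
        try:
            if request_type == 'PUT':
                return requests.put(url, headers=headers,
                                    data=json.dumps(body),
                                    timeout=self.request_timeout)
            elif request_type == 'POST':
                return requests.post(url, headers=headers,
                                     data=json.dumps(body),
                                     timeout=self.request_timeout)
            return requests.get(url, headers=headers,
                                timeout=self.request_timeout)
        except Exception as e:
            # Report the method, URL and cause; the headers are deliberately
            # not logged because they carry the auth token.
            logging.error("Failed sending %s request to %s: %s",
                          request_type or 'GET', url, str(e))
            return None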
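class Barbican_Cert_Manager(Cert_Manager):
    """Download TLS secrets from Barbican and build HAProxy's PEM text.

    ``tls_container_ref`` arguments are Barbican container URLs. The
    container's ``secret_refs`` are fetched one by one and their plain-text
    payloads concatenated in the order Barbican lists them.
    """

    def __init__(self, identity=None):
        """Create the driver.

        :param identity: object exposing ``barbican_ep`` and ``auth_token``
            (a Keystone identity). With ``None`` every operation fails
            closed (``False``/``None``) instead of contacting Barbican.
        """
        super(Barbican_Cert_Manager, self).__init__()
        self.identity = identity

    def _get_barbican_entity(self, barbican_ep, auth_token,
                             entity_ref, metadata=True):
        """GET one Barbican resource by its reference URL.

        :param barbican_ep: Barbican endpoint. Not used: ``entity_ref`` is
            already an absolute URL; kept for signature compatibility.
        :param auth_token: Keystone token sent as ``X-Auth-Token``.
        :param entity_ref: absolute URL of the container or secret.
        :param metadata: ``True`` to fetch the JSON metadata (returned
            decoded), ``False`` to fetch the secret payload as ``text/plain``.
        :returns: the decoded JSON, the payload text, or ``None`` on any
            failure (transport error, non-2xx status, undecodable JSON).
        """
        accept_data = 'application/json' if metadata else 'text/plain'
        headers = {
            "Accept": accept_data,
            "X-Auth-Token": "%s" % auth_token,
        }
        try:
            # ``request_type`` must be passed by keyword: the third
            # positional parameter of _request is ``body``.
            resp = self._request(entity_ref, headers=headers,
                                 request_type='GET')
            if resp is None:
                return None
            # Any 2xx status is success (range(200, 299) would miss 299).
            if 200 <= resp.status_code < 300:
                return json.loads(resp.text) if metadata else resp.text
            logging.error("%s getting barbican entity %s",
                          resp.text, entity_ref)
        except Exception as e:
            logging.error("%s getting barbican entity %s",
                          str(e), entity_ref)
        return None

    def _validate_tls_secret(self, tls_container_ref):
        """Return ``True`` if every secret in the container is ``text/plain``.

        HAProxy needs the PEM text verbatim, so a secret whose default
        content type is anything else cannot be used. Any lookup failure,
        including a missing identity, yields ``False`` (fail closed).

        :param tls_container_ref: Barbican container URL.
        """
        if not self.identity:
            return False
        try:
            container_detail = self._get_barbican_entity(
                self.identity.barbican_ep,
                self.identity.auth_token,
                entity_ref=tls_container_ref,
                metadata=True)
            if not container_detail:
                return False
            for secret in container_detail['secret_refs']:
                secret_meta_data = self._get_barbican_entity(
                    self.identity.barbican_ep,
                    self.identity.auth_token,
                    entity_ref=secret['secret_ref'],
                    metadata=True)
                # Check for a missing lookup before indexing into it, so the
                # log line says what failed instead of raising on ``None``.
                if not secret_meta_data:
                    logging.error("Could not read metadata for secret %s",
                                  secret['secret_ref'])
                    return False
                content_type = secret_meta_data['content_types']['default']
                if content_type != 'text/plain':
                    logging.error("Invalid secret format: %s", content_type)
                    return False
            return True
        except Exception as e:
            logging.error("%s while validating TLS Container", str(e))
            return False

    def _populate_tls_pem(self, tls_container_ref):
        """Return the concatenated PEM text of all secrets in the container.

        Each secret payload is appended followed by a newline, in the order
        Barbican lists ``secret_refs``.

        :param tls_container_ref: Barbican container URL.
        :returns: the PEM text; ``False`` if the container itself cannot be
            read; ``None`` if there is no identity, a secret cannot be
            fetched, or an exception occurs. Callers should test truthiness.
            A partial PEM is never returned, because HAProxy would otherwise
            be handed a file missing its key or certificate.
        """
        if not self.identity:
            return None
        try:
            container_detail = self._get_barbican_entity(
                self.identity.barbican_ep,
                self.identity.auth_token,
                entity_ref=tls_container_ref,
                metadata=True)
            if not container_detail:
                return False
            parts = []
            for secret in container_detail['secret_refs']:
                secret_detail = self._get_barbican_entity(
                    self.identity.barbican_ep,
                    self.identity.auth_token,
                    entity_ref=secret['secret_ref'],
                    metadata=False)
                if not secret_detail:
                    logging.error("Could not fetch secret %s",
                                  secret['secret_ref'])
                    return None
                parts.append(secret_detail)
                parts.append("\n")
            return ''.join(parts)
        except Exception as e:
            logging.error("%s while populating SSL Pem file", str(e))
            return None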
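class Generic_Cert_Manager(Cert_Manager):
    """Read HAProxy's PEM material from a local file.

    ``tls_container_ref`` is a filesystem path to a file that already holds
    everything HAProxy needs; no remote service is contacted.
    """

    def __init__(self, identity=None):
        """Create the driver.

        :param identity: accepted for interface parity with
            ``Barbican_Cert_Manager``; this driver does not use it.
        """
        super(Generic_Cert_Manager, self).__init__()

    def _validate_tls_secret(self, tls_container_ref):
        """Return ``True`` if ``tls_container_ref`` is a readable regular file.

        A ``None`` reference, a missing path, a directory, or a file the
        current user cannot read all yield ``False``.
        """
        if tls_container_ref is None:
            return False
        return (os.path.isfile(tls_container_ref)
                and os.access(tls_container_ref, os.R_OK))

    def _populate_tls_pem(self, tls_container_ref):
        """Return the file's contents, or ``None`` if it cannot be read.

        Read errors are logged and reported as ``None`` so that this driver
        fails the same way as ``Barbican_Cert_Manager`` rather than raising
        into the caller. The file contents (which may include a private key)
        are never logged.

        :param tls_container_ref: path of the PEM file.
        """
        try:
            with open(tls_container_ref) as tls_container:
                return tls_container.read()
        except (IOError, OSError) as e:
            logging.error("%s while reading TLS file %s",
                          str(e), tls_container_ref)
            return None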