Partial-Bug: #1522597, Server Manager support for tls certificate and…
… key distribution - committing cert utility files to avoid package dependency Change-Id: I69ac4026b81aa06616a045798e011671e89e67b9
Showing
2 changed files
with
205 additions
and
0 deletions.
@@ -0,0 +1,114 @@ | ||
#!/usr/bin/python | ||
|
||
# vim: tabstop=4 shiftwidth=4 softtabstop=4 | ||
""" | ||
Name : server_mgr_cert_utils.py | ||
Author : Prasad Miriyala | ||
Description : Cert utility | ||
""" | ||
import os | ||
import logging | ||
import subprocess | ||
import sys | ||
|
||
__version__ = '1.0' | ||
|
||
log = logging.getLogger('smgrcerts') | ||
log.setLevel(logging.DEBUG) | ||
|
||
class CertsLogger(object): | ||
@staticmethod | ||
def initialize_logger(log_file='smgrcerts.log', log_level=40, stdout=True): | ||
log = logging.getLogger('smgrcerts') | ||
file_h = logging.FileHandler(log_file) | ||
file_h.setLevel(logging.DEBUG) | ||
long_format = '[%(asctime)-15s: %(filename)s:%(lineno)s:%(funcName)s: %(levelname)s] %(message)s' | ||
file_formatter = logging.Formatter(long_format) | ||
file_h.setFormatter(file_formatter) | ||
log.addHandler(file_h) | ||
if not stdout: | ||
return | ||
stream_h = logging.StreamHandler(sys.stdout) | ||
stream_h.setLevel(log_level) | ||
short_format = '[%(asctime)-15s: %(funcName)s] %(message)s' | ||
stream_formatter = logging.Formatter(short_format) | ||
stream_h.setFormatter(stream_formatter) | ||
log.addHandler(stream_h) | ||
|
||
class Cmd(object): | ||
@staticmethod | ||
def local_exec(cmd, error_on_fail=False): | ||
exit_status = 1 | ||
log.info('[localhost]: %s' % cmd) | ||
proc = subprocess.Popen(cmd, shell=True, close_fds=True, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||
stdin=subprocess.PIPE) | ||
stdout, stderr = proc.communicate() | ||
if proc.returncode != 0: | ||
exit_status = 0 | ||
log.error(stdout) | ||
log.error(stderr) | ||
if error_on_fail: | ||
raise RuntimeError('Command (%s) Failed' % cmd) | ||
return exit_status, stdout, stderr | ||
|
||
class Cert(object): | ||
@staticmethod | ||
def generate_private_key(location, method='rsa', numbits=2048, force=False): | ||
exit_status = 1 | ||
if not force: | ||
if os.path.isfile(location): | ||
return exit_status | ||
exit_status, stdout, stderr = \ | ||
Cmd.local_exec('openssl genrsa -out %s' % (location), error_on_fail=True) | ||
return exit_status | ||
|
||
@staticmethod | ||
def generate_csr(location, private_key, subj='/', force=False): | ||
exit_status = 1 | ||
if not force: | ||
if os.path.isfile(location): | ||
return exit_status | ||
exit_status, stdout, stderr = \ | ||
Cmd.local_exec('openssl req -new -key %s -out %s -subj %s' % (private_key, location, subj), | ||
error_on_fail=True) | ||
return exit_status | ||
|
||
|
||
@staticmethod | ||
def generate_cert(location, key, root_pem='', csr='', | ||
force=False, self_signed=False, subj='/', | ||
days=3640, method='rsa', numbits=4096): | ||
exit_status = 1 | ||
if not force: | ||
if os.path.isfile(location): | ||
return exit_status | ||
if self_signed: | ||
cmd = 'openssl req -x509 -new -nodes -key %s -days %s -out %s -subj %s' % \ | ||
(key, days, location, subj) | ||
else: | ||
cmd = 'openssl x509 -req -in %s -CA %s -CAkey %s -CAcreateserial -out %s -days %s' % \ | ||
(csr, root_pem, key, location, days) | ||
exit_stats, stdout, stderr = Cmd.local_exec(cmd, error_on_fail=True) | ||
return exit_status | ||
|
||
|
||
if __name__ == '__main__': | ||
log.info('Executing: %s' % " ".join(sys.argv)) | ||
# update log level and log file | ||
log_level = [logging.ERROR, logging.WARN, \ | ||
logging.INFO, logging.DEBUG] | ||
CertsLogger.initialize_logger(log_file='smgrcerts.log', | ||
log_level=log_level[3], stdout=True) | ||
# test code | ||
Cert.generate_private_key('test.key') | ||
Cert.generate_cert('test.pem', 'test.key', self_signed=True) | ||
Cert.generate_private_key('server.key') | ||
Cert.generate_csr('server.csr', 'server.key', subj='test') | ||
Cert.generate_cert('server.pem', 'test.key', 'test.pem', 'server.csr') | ||
|
||
|
||
|
||
|
||
|
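Review bot: `Cmd.local_exec` runs every command through `shell=True`, and `generate_csr`/`generate_cert` splice `subj`, `location`, `key` and `csr` into that string unquoted, so a subject or path containing a space, a quote or a shell metacharacter changes the openssl command line instead of reaching openssl as a single argument. `generate_private_key` also never passes `numbits` (or `method`) to `openssl genrsa -out %s`, so the key size is whatever the installed openssl defaults to rather than the 2048 the signature advertises. Passing an argv list with `shell=False` removes both the quoting problem and the need to escape; the two callers in server_mgr_certs.py (`mkdir -p …` and `hostname -f`) would then need the same list form. Separately, `exit_stats, stdout, stderr = Cmd.local_exec(cmd, error_on_fail=True)` in `generate_cert` assigns a misspelt name, so that method always returns the initial `exit_status` of 1.
```python
@staticmethod
def local_exec(cmd, error_on_fail=False):
    # cmd is an argv list; without a shell, arguments containing spaces or
    # metacharacters reach the program verbatim instead of being re-parsed
    exit_status = 1
    log.info('[localhost]: %s' % ' '.join(cmd))
    proc = subprocess.Popen(cmd, shell=False, close_fds=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            stdin=subprocess.PIPE)
    # … unchanged: communicate(), returncode check, logging, return …

@staticmethod
def generate_private_key(location, method='rsa', numbits=2048, force=False):
    # … unchanged: force / os.path.isfile guard …
    # numbits is now honoured; previously genrsa ran with the openssl default
    exit_status, stdout, stderr = \
        Cmd.local_exec(['openssl', 'genrsa', '-out', location, str(numbits)],
                       error_on_fail=True)
    return exit_status

@staticmethod
def generate_csr(location, private_key, subj='/', force=False):
    # … unchanged: force / os.path.isfile guard …
    exit_status, stdout, stderr = \
        Cmd.local_exec(['openssl', 'req', '-new', '-key', private_key,
                        '-out', location, '-subj', subj],
                       error_on_fail=True)
    return exit_status
```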
@@ -0,0 +1,91 @@ | ||
#!/usr/bin/env python | ||
|
||
# vim: tabstop=4 shiftwidth=4 softtabstop=4 | ||
""" | ||
Name : server_mgr_certs.py | ||
Author : Prasad Miriyala | ||
Description : server manager certs | ||
""" | ||
from server_mgr_cert_utils import * | ||
|
||
__version__ = '1.0' | ||
|
||
_DEF_CERT_LOCATION = '/etc/contrail_smgr/puppet/ssl/' | ||
_DEF_CERT_LOG = '/var/log/contrail-server-manager/smgrcerts.log' | ||
|
||
class ServerMgrCerts(): | ||
def __init__(self, cert_location=_DEF_CERT_LOCATION, log_file=_DEF_CERT_LOG, | ||
log_level = logging.DEBUG, | ||
db=None): | ||
Cmd.local_exec('mkdir -p %s' % (cert_location), error_on_fail=True) | ||
self._smgr_cert_location = cert_location | ||
self._smgr_ca_private_key = None | ||
self._smgr_ca_cert = None | ||
CertsLogger.initialize_logger(log_file=log_file, | ||
log_level=log_level) | ||
|
||
def create_sm_ca_cert(self, force=False): | ||
sm_ca_private_key = self._smgr_cert_location + 'sm_ca.key' | ||
sm_ca_cert = self._smgr_cert_location + 'sm_ca.cert' | ||
if not force and os.path.isfile(sm_ca_private_key) and os.path.isfile(sm_ca_cert): | ||
self._smgr_ca_private_key = sm_ca_private_key | ||
self._smgr_ca_cert = sm_ca_cert | ||
return sm_ca_private_key, sm_ca_cert | ||
Cert.generate_private_key(sm_ca_private_key, force=force) | ||
self._smgr_ca_private_key = sm_ca_private_key | ||
exit_code, fqdn, _ = Cmd.local_exec('hostname -f') | ||
subject = '/CN=' + fqdn | ||
Cert.generate_cert(sm_ca_cert, sm_ca_private_key, self_signed=True, subj=subject, force=force) | ||
self._smgr_ca_cert = sm_ca_cert | ||
return sm_ca_private_key, sm_ca_cert | ||
|
||
def create_server_cert(self, server, force=False): | ||
server_private_key = self._smgr_cert_location + server['id'] + '.key' | ||
server_csr = self._smgr_cert_location + server['id'] + '.csr' | ||
server_pem = self._smgr_cert_location + server['id'] + '.pem' | ||
if not force and os.path.isfile(server_private_key) and os.path.isfile(server_pem): | ||
return server_private_key, server_csr, server_pem | ||
subject = '/CN=' + server['id'] | ||
Cert.generate_private_key(server_private_key, force=force) | ||
Cert.generate_csr(server_csr, server_private_key, subj=subject, force=force) | ||
Cert.generate_cert(server_pem, self._smgr_ca_private_key, root_pem=self._smgr_ca_cert, | ||
csr=server_csr, force=force) | ||
return server_private_key, server_csr, server_pem | ||
|
||
def delete_server_cert(self, server): | ||
server_private_key = self._smgr_cert_location + server['id'] + '.key' | ||
server_csr = self._smgr_cert_location + server['id'] + '.csr' | ||
server_pem = self._smgr_cert_location + server['id'] + '.pem' | ||
if os.path.isfile(server_private_key): | ||
os.remove(server_private_key) | ||
if os.path.isfile(server_csr): | ||
os.remove(server_csr) | ||
if os.path.isfile(server_pem): | ||
os.remove(server_pem) | ||
|
||
|
||
if __name__ == '__main__': | ||
# test cases | ||
sm_certs = ServerMgrCerts(os.path.expanduser('./'), | ||
os.path.expanduser('./smgrcerts.log')) | ||
sm_private_key, sm_cert = sm_certs.create_sm_ca_cert() | ||
server = {'id':'server1'} | ||
server_private_key, _, server_cert = sm_certs.create_server_cert(server) | ||
server = {'id':'server2'} | ||
server_private_key, _, server_cert = sm_certs.create_server_cert(server, force=True) | ||
sm_private_key, sm_cert = sm_certs.create_sm_ca_cert(force=True) | ||
server = {'id':'server1'} | ||
server_private_key, _, server_cert = sm_certs.create_server_cert(server, force=True) | ||
server = {'id':'server2'} | ||
server_private_key, _, server_cert = sm_certs.create_server_cert(server, force=True) | ||
server = {'id':'server1'} | ||
sm_certs.delete_server_cert(server) | ||
server = {'id':'server2'} | ||
sm_certs.delete_server_cert(server) | ||
server = {'id':'server1'} | ||
server_private_key, _, server_cert = sm_certs.create_server_cert(server) | ||
server = {'id':'server2'} | ||
server_private_key, _, server_cert = sm_certs.create_server_cert(server) | ||
|
||
|
||
|
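Review bot: `create_server_cert` and `delete_server_cert` build file paths by concatenating `server['id']` onto the cert directory and then hand those paths, plus `'/CN=' + server['id']`, to `Cmd.local_exec`, which runs them through a shell; an id such as `../x` (constructed example) or one containing shell metacharacters escapes `_smgr_cert_location`, and `delete_server_cert` would then `os.remove` a file outside it. A single helper that validates the id against a hostname-like allowlist and returns the three paths keeps both methods consistent (it needs `import re`, which neither this file nor the star-imported utils module provides). `create_sm_ca_cert` also uses the raw stdout of `hostname -f`, which ends in a newline, so the subject should be `subject = '/CN=' + fqdn.strip()`, and because `exit_code` on that line is ignored a failed lookup silently produces a bare `/CN=`. `create_server_cert` assumes `create_sm_ca_cert` has already populated `self._smgr_ca_private_key` and `self._smgr_ca_cert`; while they are still `None` the `-CA`/`-CAkey` arguments become the literal string `None`, so a guard that raises before calling `Cert.generate_cert` would make the ordering explicit.
```python
_SERVER_ID_RE = re.compile(r'^[A-Za-z0-9][A-Za-z0-9._-]*$')

def _server_paths(self, server):
    # server['id'] becomes a file name and a shell argument; reject anything
    # beyond a plain hostname-like token so it cannot leave the cert
    # directory or alter the openssl command line
    server_id = server['id']
    if not _SERVER_ID_RE.match(server_id):
        raise ValueError('invalid server id: %r' % (server_id,))
    base = self._smgr_cert_location + server_id
    return base + '.key', base + '.csr', base + '.pem'

def create_server_cert(self, server, force=False):
    server_private_key, server_csr, server_pem = self._server_paths(server)
    # … unchanged: isfile guard, subject, key/csr/cert generation, return …

def delete_server_cert(self, server):
    server_private_key, server_csr, server_pem = self._server_paths(server)
    # … unchanged: the three isfile / os.remove pairs …
```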