Skip to content

Commit

Permalink
Merge "Adds ssl support to rabbitmq connections"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Jun 29, 2016
2 parents cff2ad9 + 2db709e commit d0ac986
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/config/api-server/tests/test_kombu.py
Expand Up @@ -69,7 +69,7 @@ def _url(self, server,
"skipping because kombu client is older")
def test_url_parsing(self):
check_value = []
def Connection(self, urls):
def Connection(self, urls, **kwargs):
if set(urls) != set(check_value):
raise WrongValueException("expected %s - received %s", str(check_value), str(urls))
else:
Expand Down
5 changes: 5 additions & 0 deletions src/config/api-server/utils.py
Expand Up @@ -73,6 +73,11 @@ def parse_args(args_str):
'ifmap_health_check_interval': '60', # in seconds
'stale_lock_seconds': '5', # lock but no resource past this => stale
'cloud_admin_role': _CLOUD_ADMIN_ROLE,
'rabbit_use_ssl': False,
'kombu_ssl_version': '',
'kombu_ssl_keyfile': '',
'kombu_ssl_certfile': '',
'kombu_ssl_ca_certs': '',
}
# ssl options
secopts = {
Expand Down
11 changes: 10 additions & 1 deletion src/config/api-server/vnc_cfg_api_server.py
Expand Up @@ -2583,7 +2583,16 @@ def _db_connect(self, reset_config):
rabbit_user, rabbit_password, rabbit_vhost,
rabbit_ha_mode, reset_config,
zk_server, self._args.cluster_id,
cassandra_credential=cred)
cassandra_credential=cred,
rabbit_use_ssl = self._args.rabbit_use_ssl,
kombu_ssl_version =
self._args.kombu_ssl_version,
kombu_ssl_keyfile =
self._args.kombu_ssl_keyfile,
kombu_ssl_certfile =
self._args.kombu_ssl_certfile,
kombu_ssl_ca_certs =
self._args.kombu_ssl_ca_certs)
self._db_conn = db_conn
# end _db_connect

Expand Down
12 changes: 8 additions & 4 deletions src/config/api-server/vnc_cfg_ifmap.py
Expand Up @@ -986,15 +986,17 @@ def walk(self, fn):

class VncServerKombuClient(VncKombuClient):
def __init__(self, db_client_mgr, rabbit_ip, rabbit_port, ifmap_db,
rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode):
rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode,
**kwargs):
self._db_client_mgr = db_client_mgr
self._sandesh = db_client_mgr._sandesh
self._ifmap_db = ifmap_db
listen_port = db_client_mgr.get_server_port()
q_name = 'vnc_config.%s-%s' %(socket.gethostname(), listen_port)
super(VncServerKombuClient, self).__init__(
rabbit_ip, rabbit_port, rabbit_user, rabbit_password, rabbit_vhost,
rabbit_ha_mode, q_name, self._dbe_subscribe_callback, self.config_log)
rabbit_ha_mode, q_name, self._dbe_subscribe_callback,
self.config_log, **kwargs)

# end __init__

Expand Down Expand Up @@ -1315,7 +1317,8 @@ def __init__(self, api_svr_mgr, ifmap_srv_ip, ifmap_srv_port, uname,
passwd, cass_srv_list,
rabbit_servers, rabbit_port, rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode, reset_config=False,
zk_server_ip=None, db_prefix='', cassandra_credential=None):
zk_server_ip=None, db_prefix='', cassandra_credential=None,
**kwargs):

self._api_svr_mgr = api_svr_mgr
self._sandesh = api_svr_mgr._sandesh
Expand Down Expand Up @@ -1369,7 +1372,8 @@ def cassandra_client_init():
self._msgbus = VncServerKombuClient(self, rabbit_servers,
rabbit_port, self._ifmap_db,
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode)
rabbit_vhost, rabbit_ha_mode,
**kwargs)
# end __init__

def _update_default_quota(self):
Expand Down
52 changes: 46 additions & 6 deletions src/config/common/vnc_kombu.py
Expand Up @@ -3,6 +3,7 @@
#
import re
import amqp.exceptions
from distutils.util import strtobool
import kombu
import gevent
import gevent.monkey
Expand All @@ -21,6 +22,7 @@
from pysandesh.gen_py.process_info.ttypes import ConnectionType as ConnType
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel
from cfgm_common import vnc_greenlets
import ssl

__all__ = "VncKombuClient"

Expand All @@ -41,7 +43,8 @@ def sigterm_handler(self):
exit()

def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger):
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger,
**kwargs):
self._rabbit_ip = rabbit_ip
self._rabbit_port = rabbit_port
self._rabbit_user = rabbit_user
Expand All @@ -54,6 +57,7 @@ def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,

self.obj_upd_exchange = kombu.Exchange('vnc_config.object-update', 'fanout',
durable=False)
self._ssl_params = self._fetch_ssl_params(**kwargs)

# Register a handler for SIGTERM so that we can release the lock
# Without it, it can take several minutes before new master is elected
Expand Down Expand Up @@ -206,14 +210,48 @@ def shutdown(self):
def reset(self):
self._publish_queue = Queue()

_SSL_PROTOCOLS = {
"tlsv1": ssl.PROTOCOL_TLSv1,
"sslv23": ssl.PROTOCOL_SSLv23
}

@classmethod
def validate_ssl_version(cls, version):
version = version.lower()
try:
return cls._SSL_PROTOCOLS[version]
except KeyError:
raise RuntimeError('Invalid SSL version: {}'.format(version))

def _fetch_ssl_params(self, **kwargs):
if strtobool(str(kwargs.get('rabbit_use_ssl', False))):
ssl_params = dict()
ssl_version = kwargs.get('kombu_ssl_version', '')
keyfile = kwargs.get('kombu_ssl_keyfile', '')
certfile = kwargs.get('kombu_ssl_certfile', '')
ca_certs = kwargs.get('kombu_ssl_ca_certs', '')
if ssl_version:
ssl_params.update({'ssl_version':
self.validate_ssl_version(ssl_version)})
if keyfile:
ssl_params.update({'keyfile': keyfile})
if certfile:
ssl_params.update({'certfile': certfile})
if ca_certs:
ssl_params.update({'ca_certs': ca_certs})
ssl_params.update({'cert_reqs': ssl.CERT_REQUIRED})
return ssl_params or True
return False

class VncKombuClientV1(VncKombuClientBase):
def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger):
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger,
**kwargs):
super(VncKombuClientV1, self).__init__(rabbit_ip, rabbit_port,
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
q_name, subscribe_cb, logger)
q_name, subscribe_cb, logger,
**kwargs)
self._server_addrs = ["%s:%s" % (self._rabbit_ip, self._rabbit_port)]

self._conn = kombu.Connection(hostname=self._rabbit_ip,
Expand Down Expand Up @@ -247,11 +285,13 @@ def _parse_rabbit_hosts(self, rabbit_hosts):
return ret

def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger):
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger,
**kwargs):
super(VncKombuClientV2, self).__init__(rabbit_hosts, rabbit_port,
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
q_name, subscribe_cb, logger)
q_name, subscribe_cb, logger,
**kwargs)
self._server_addrs = rabbit_hosts.split(',')

_hosts = self._parse_rabbit_hosts(rabbit_hosts)
Expand All @@ -265,7 +305,7 @@ def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password,
self._logger(msg, level=SandeshLevel.SYS_NOTICE)
self._update_sandesh_status(ConnectionStatus.INIT)
self._conn_state = ConnectionStatus.INIT
self._conn = kombu.Connection(self._urls)
self._conn = kombu.Connection(self._urls, ssl=self._ssl_params)
queue_args = {"x-ha-policy": "all"} if rabbit_ha_mode else None
self._update_queue_obj = kombu.Queue(q_name, self.obj_upd_exchange,
durable=False,
Expand Down
16 changes: 15 additions & 1 deletion src/config/device-manager/device_manager/device_manager.py
Expand Up @@ -210,7 +210,16 @@ def __init__(self, args=None):
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
q_name, self._vnc_subscribe_callback,
self.config_log)
self.config_log, rabbit_use_ssl =
self._args.rabbit_use_ssl,
kombu_ssl_version =
self._args.kombu_ssl_version,
kombu_ssl_keyfile =
self._args.kombu_ssl_keyfile,
kombu_ssl_certfile =
self._args.kombu_ssl_certfile,
kombu_ssl_ca_certs =
self._args.kombu_ssl_ca_certs)

self._cassandra = DMCassandraDB.getInstance(self, _zookeeper_client)

Expand Down Expand Up @@ -438,6 +447,11 @@ def parse_args(args_str):
'push_delay_max': '100',
'push_delay_enable': 'True',
'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(),
'rabbit_use_ssl': False,
'kombu_ssl_version': '',
'kombu_ssl_keyfile': '',
'kombu_ssl_certfile': '',
'kombu_ssl_ca_certs': '',
}
secopts = {
'use_certs': False,
Expand Down
16 changes: 15 additions & 1 deletion src/config/schema-transformer/to_bgp.py
Expand Up @@ -212,7 +212,16 @@ def __init__(self, args=None):
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
q_name, self._vnc_subscribe_callback,
self.config_log)
self.config_log, rabbit_use_ssl =
self._args.rabbit_use_ssl,
kombu_ssl_version =
self._args.kombu_ssl_version,
kombu_ssl_keyfile =
self._args.kombu_ssl_keyfile,
kombu_ssl_certfile =
self._args.kombu_ssl_certfile,
kombu_ssl_ca_certs =
self._args.kombu_ssl_ca_certs)
try:
self._cassandra = SchemaTransformerDB(self, _zookeeper_client)
DBBaseST.init(self, self._sandesh.logger(), self._cassandra)
Expand Down Expand Up @@ -669,6 +678,11 @@ def parse_args(args_str):
'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(),
'bgpaas_port_start': 50000,
'bgpaas_port_end': 50256,
'rabbit_use_ssl': False,
'kombu_ssl_version': '',
'kombu_ssl_keyfile': '',
'kombu_ssl_certfile': '',
'kombu_ssl_ca_certs': '',
}
secopts = {
'use_certs': False,
Expand Down
6 changes: 5 additions & 1 deletion src/config/svc-monitor/svc_monitor/rabbit.py
Expand Up @@ -31,7 +31,11 @@ def _connect_rabbit(self):
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
q_name, self._vnc_subscribe_callback,
self.logger.log)
self.logger.log, rabbit_use_ssl = self._args.rabbit_use_ssl,
kombu_ssl_version = self._args.kombu_ssl_version,
kombu_ssl_keyfile = self._args.kombu_ssl_keyfile,
kombu_ssl_certfile = self._args.kombu_ssl_certfile,
kombu_ssl_ca_certs = self._args.kombu_ssl_ca_certs)

def _vnc_subscribe_callback(self, oper_info):
self._db_resync_done.wait()
Expand Down
5 changes: 5 additions & 0 deletions src/config/svc-monitor/svc_monitor/svc_monitor.py
Expand Up @@ -694,6 +694,11 @@ def parse_args(args_str):
'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(),
'check_service_interval': '60',
'nova_endpoint_type': 'internalURL',
'rabbit_use_ssl': False,
'kombu_ssl_version': '',
'kombu_ssl_keyfile': '',
'kombu_ssl_certfile': '',
'kombu_ssl_ca_certs': '',
}
secopts = {
'use_certs': False,
Expand Down

0 comments on commit d0ac986

Please sign in to comment.