From 2db709eb78b3b9e5a32aa8c707c5b75dd395219e Mon Sep 17 00:00:00 2001 From: Rudrajit Tapadar Date: Fri, 24 Jun 2016 11:02:02 -0700 Subject: [PATCH] Adds ssl support to rabbitmq connections This change adds ssl support to rabbitmq connections in various components that talk to rabbit. Change-Id: I05637e341b113382162a58e1064e6decff21c97c Closes-Bug: #1594490 --- src/config/api-server/tests/test_kombu.py | 2 +- src/config/api-server/utils.py | 5 ++ src/config/api-server/vnc_cfg_api_server.py | 11 +++- src/config/api-server/vnc_cfg_ifmap.py | 12 +++-- src/config/common/vnc_kombu.py | 52 ++++++++++++++++--- .../device_manager/device_manager.py | 16 +++++- src/config/schema-transformer/to_bgp.py | 16 +++++- src/config/svc-monitor/svc_monitor/rabbit.py | 6 ++- .../svc-monitor/svc_monitor/svc_monitor.py | 5 ++ 9 files changed, 110 insertions(+), 15 deletions(-) diff --git a/src/config/api-server/tests/test_kombu.py b/src/config/api-server/tests/test_kombu.py index 99fa4fef516..f56aeadffc1 100644 --- a/src/config/api-server/tests/test_kombu.py +++ b/src/config/api-server/tests/test_kombu.py @@ -69,7 +69,7 @@ def _url(self, server, "skipping because kombu client is older") def test_url_parsing(self): check_value = [] - def Connection(self, urls): + def Connection(self, urls, **kwargs): if set(urls) != set(check_value): raise WrongValueException("expected %s - received %s", str(check_value), str(urls)) else: diff --git a/src/config/api-server/utils.py b/src/config/api-server/utils.py index d90fd15c798..0e9eb3cea5d 100644 --- a/src/config/api-server/utils.py +++ b/src/config/api-server/utils.py @@ -73,6 +73,11 @@ def parse_args(args_str): 'ifmap_health_check_interval': '60', # in seconds 'stale_lock_seconds': '5', # lock but no resource past this => stale 'cloud_admin_role': _CLOUD_ADMIN_ROLE, + 'rabbit_use_ssl': False, + 'kombu_ssl_version': '', + 'kombu_ssl_keyfile': '', + 'kombu_ssl_certfile': '', + 'kombu_ssl_ca_certs': '', } # ssl options secopts = { diff --git a/src/config/api-server/vnc_cfg_api_server.py b/src/config/api-server/vnc_cfg_api_server.py index 35896fba2f2..5ec3cc0b2d4 100644 --- a/src/config/api-server/vnc_cfg_api_server.py +++ b/src/config/api-server/vnc_cfg_api_server.py @@ -2458,7 +2458,16 @@ def _db_connect(self, reset_config): rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode, reset_config, zk_server, self._args.cluster_id, - cassandra_credential=cred) + cassandra_credential=cred, + rabbit_use_ssl = self._args.rabbit_use_ssl, + kombu_ssl_version = + self._args.kombu_ssl_version, + kombu_ssl_keyfile = + self._args.kombu_ssl_keyfile, + kombu_ssl_certfile = + self._args.kombu_ssl_certfile, + kombu_ssl_ca_certs = + self._args.kombu_ssl_ca_certs) self._db_conn = db_conn # end _db_connect diff --git a/src/config/api-server/vnc_cfg_ifmap.py b/src/config/api-server/vnc_cfg_ifmap.py index 8d9737506ce..ee8d2e0909b 100644 --- a/src/config/api-server/vnc_cfg_ifmap.py +++ b/src/config/api-server/vnc_cfg_ifmap.py @@ -986,7 +986,8 @@ def walk(self, fn): class VncServerKombuClient(VncKombuClient): def __init__(self, db_client_mgr, rabbit_ip, rabbit_port, ifmap_db, - rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode): + rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode, + **kwargs): self._db_client_mgr = db_client_mgr self._sandesh = db_client_mgr._sandesh self._ifmap_db = ifmap_db @@ -994,7 +995,8 @@ def __init__(self, db_client_mgr, rabbit_ip, rabbit_port, ifmap_db, q_name = 'vnc_config.%s-%s' %(socket.gethostname(), listen_port) super(VncServerKombuClient, self).__init__( rabbit_ip, rabbit_port, rabbit_user, rabbit_password, rabbit_vhost, - rabbit_ha_mode, q_name, self._dbe_subscribe_callback, self.config_log) + rabbit_ha_mode, q_name, self._dbe_subscribe_callback, + self.config_log, **kwargs) # end __init__ @@ -1315,7 +1317,8 @@ def __init__(self, api_svr_mgr, ifmap_srv_ip, ifmap_srv_port, uname, passwd, cass_srv_list, rabbit_servers, rabbit_port, rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode, reset_config=False, - zk_server_ip=None, db_prefix='', cassandra_credential=None): + zk_server_ip=None, db_prefix='', cassandra_credential=None, + **kwargs): self._api_svr_mgr = api_svr_mgr self._sandesh = api_svr_mgr._sandesh @@ -1376,7 +1379,8 @@ def cassandra_client_init(): self._msgbus = VncServerKombuClient(self, rabbit_servers, rabbit_port, self._ifmap_db, rabbit_user, rabbit_password, - rabbit_vhost, rabbit_ha_mode) + rabbit_vhost, rabbit_ha_mode, + **kwargs) # end __init__ def _update_default_quota(self): diff --git a/src/config/common/vnc_kombu.py b/src/config/common/vnc_kombu.py index 5e5f6ae1e32..58ccaff9d0d 100644 --- a/src/config/common/vnc_kombu.py +++ b/src/config/common/vnc_kombu.py @@ -3,6 +3,7 @@ # import re import amqp.exceptions +from distutils.util import strtobool import kombu import gevent import gevent.monkey @@ -21,6 +22,7 @@ from pysandesh.gen_py.process_info.ttypes import ConnectionType as ConnType from pysandesh.gen_py.sandesh.ttypes import SandeshLevel from cfgm_common import vnc_greenlets +import ssl __all__ = "VncKombuClient" @@ -41,7 +43,8 @@ def sigterm_handler(self): exit() def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password, - rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger): + rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger, + **kwargs): self._rabbit_ip = rabbit_ip self._rabbit_port = rabbit_port self._rabbit_user = rabbit_user @@ -54,6 +57,7 @@ def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password, self.obj_upd_exchange = kombu.Exchange('vnc_config.object-update', 'fanout', durable=False) + self._ssl_params = self._fetch_ssl_params(**kwargs) # Register a handler for SIGTERM so that we can release the lock # Without it, it can take several minutes before new master is elected @@ -206,14 +210,48 @@ def shutdown(self): def reset(self): self._publish_queue = Queue() + _SSL_PROTOCOLS = { + "tlsv1": ssl.PROTOCOL_TLSv1, + "sslv23": ssl.PROTOCOL_SSLv23 + } + + @classmethod + def validate_ssl_version(cls, version): + version = version.lower() + try: + return cls._SSL_PROTOCOLS[version] + except KeyError: + raise RuntimeError('Invalid SSL version: {}'.format(version)) + + def _fetch_ssl_params(self, **kwargs): + if strtobool(str(kwargs.get('rabbit_use_ssl', False))): + ssl_params = dict() + ssl_version = kwargs.get('kombu_ssl_version', '') + keyfile = kwargs.get('kombu_ssl_keyfile', '') + certfile = kwargs.get('kombu_ssl_certfile', '') + ca_certs = kwargs.get('kombu_ssl_ca_certs', '') + if ssl_version: + ssl_params.update({'ssl_version': + self.validate_ssl_version(ssl_version)}) + if keyfile: + ssl_params.update({'keyfile': keyfile}) + if certfile: + ssl_params.update({'certfile': certfile}) + if ca_certs: + ssl_params.update({'ca_certs': ca_certs}) + ssl_params.update({'cert_reqs': ssl.CERT_REQUIRED}) + return ssl_params or True + return False class VncKombuClientV1(VncKombuClientBase): def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password, - rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger): + rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger, + **kwargs): super(VncKombuClientV1, self).__init__(rabbit_ip, rabbit_port, rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode, - q_name, subscribe_cb, logger) + q_name, subscribe_cb, logger, + **kwargs) self._server_addrs = ["%s:%s" % (self._rabbit_ip, self._rabbit_port)] self._conn = kombu.Connection(hostname=self._rabbit_ip, @@ -247,11 +285,13 @@ def _parse_rabbit_hosts(self, rabbit_hosts): return ret def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password, - rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger): + rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger, + **kwargs): super(VncKombuClientV2, self).__init__(rabbit_hosts, rabbit_port, rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode, - q_name, subscribe_cb, logger) + q_name, subscribe_cb, logger, + **kwargs) self._server_addrs = rabbit_hosts.split(',') _hosts = self._parse_rabbit_hosts(rabbit_hosts) @@ -265,7 +305,7 @@ def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password, self._logger(msg, level=SandeshLevel.SYS_NOTICE) self._update_sandesh_status(ConnectionStatus.INIT) self._conn_state = ConnectionStatus.INIT - self._conn = kombu.Connection(self._urls) + self._conn = kombu.Connection(self._urls, ssl=self._ssl_params) queue_args = {"x-ha-policy": "all"} if rabbit_ha_mode else None self._update_queue_obj = kombu.Queue(q_name, self.obj_upd_exchange, durable=False, diff --git a/src/config/device-manager/device_manager/device_manager.py b/src/config/device-manager/device_manager/device_manager.py index dfcd81fb628..006ebd64991 100644 --- a/src/config/device-manager/device_manager/device_manager.py +++ b/src/config/device-manager/device_manager/device_manager.py @@ -210,7 +210,16 @@ def __init__(self, args=None): rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode, q_name, self._vnc_subscribe_callback, - self.config_log) + self.config_log, rabbit_use_ssl = + self._args.rabbit_use_ssl, + kombu_ssl_version = + self._args.kombu_ssl_version, + kombu_ssl_keyfile = + self._args.kombu_ssl_keyfile, + kombu_ssl_certfile = + self._args.kombu_ssl_certfile, + kombu_ssl_ca_certs = + self._args.kombu_ssl_ca_certs) self._cassandra = DMCassandraDB.getInstance(self, _zookeeper_client) @@ -438,6 +447,11 @@ def parse_args(args_str): 'push_delay_max': '100', 'push_delay_enable': 'True', 'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(), + 'rabbit_use_ssl': False, + 'kombu_ssl_version': '', + 'kombu_ssl_keyfile': '', + 'kombu_ssl_certfile': '', + 'kombu_ssl_ca_certs': '', } secopts = { 'use_certs': False, diff --git a/src/config/schema-transformer/to_bgp.py b/src/config/schema-transformer/to_bgp.py index b5cacfa7c2b..e2637e6342c 100644 --- a/src/config/schema-transformer/to_bgp.py +++ b/src/config/schema-transformer/to_bgp.py @@ -208,7 +208,16 @@ def __init__(self, args=None): rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode, q_name, self._vnc_subscribe_callback, - self.config_log) + self.config_log, rabbit_use_ssl = + self._args.rabbit_use_ssl, + kombu_ssl_version = + self._args.kombu_ssl_version, + kombu_ssl_keyfile = + self._args.kombu_ssl_keyfile, + kombu_ssl_certfile = + self._args.kombu_ssl_certfile, + kombu_ssl_ca_certs = + self._args.kombu_ssl_ca_certs) try: self._cassandra = SchemaTransformerDB(self, _zookeeper_client) DBBaseST.init(self, self._sandesh.logger(), self._cassandra) @@ -660,6 +669,11 @@ def parse_args(args_str): 'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(), 'bgpaas_port_start': 50000, 'bgpaas_port_end': 50256, + 'rabbit_use_ssl': False, + 'kombu_ssl_version': '', + 'kombu_ssl_keyfile': '', + 'kombu_ssl_certfile': '', + 'kombu_ssl_ca_certs': '', } secopts = { 'use_certs': False, diff --git a/src/config/svc-monitor/svc_monitor/rabbit.py b/src/config/svc-monitor/svc_monitor/rabbit.py index a75fe7655fa..95884b7a3fd 100644 --- a/src/config/svc-monitor/svc_monitor/rabbit.py +++ b/src/config/svc-monitor/svc_monitor/rabbit.py @@ -31,7 +31,11 @@ def _connect_rabbit(self): rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode, q_name, self._vnc_subscribe_callback, - self.logger.log) + self.logger.log, rabbit_use_ssl = self._args.rabbit_use_ssl, + kombu_ssl_version = self._args.kombu_ssl_version, + kombu_ssl_keyfile = self._args.kombu_ssl_keyfile, + kombu_ssl_certfile = self._args.kombu_ssl_certfile, + kombu_ssl_ca_certs = self._args.kombu_ssl_ca_certs) def _vnc_subscribe_callback(self, oper_info): self._db_resync_done.wait() diff --git a/src/config/svc-monitor/svc_monitor/svc_monitor.py b/src/config/svc-monitor/svc_monitor/svc_monitor.py index ab0391924a7..f8c06f1711b 100644 --- a/src/config/svc-monitor/svc_monitor/svc_monitor.py +++ b/src/config/svc-monitor/svc_monitor/svc_monitor.py @@ -694,6 +694,11 @@ def parse_args(args_str): 'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(), 'check_service_interval': '60', 'nova_endpoint_type': 'internalURL', + 'rabbit_use_ssl': False, + 'kombu_ssl_version': '', + 'kombu_ssl_keyfile': '', + 'kombu_ssl_certfile': '', + 'kombu_ssl_ca_certs': '', } secopts = { 'use_certs': False,