Skip to content

Commit

Permalink
Adds ssl support to rabbitmq connections
Browse files Browse the repository at this point in the history
This change adds ssl support to rabbitmq connections in various
components that talk to rabbit.

Change-Id: I05637e341b113382162a58e1064e6decff21c97c
Closes-Bug: #1594490
  • Loading branch information
rtapadar committed Jun 25, 2016
1 parent 4e8a992 commit 2db709e
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/config/api-server/tests/test_kombu.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def _url(self, server,
"skipping because kombu client is older")
def test_url_parsing(self):
check_value = []
def Connection(self, urls):
def Connection(self, urls, **kwargs):
if set(urls) != set(check_value):
raise WrongValueException("expected %s - received %s", str(check_value), str(urls))
else:
Expand Down
5 changes: 5 additions & 0 deletions src/config/api-server/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ def parse_args(args_str):
'ifmap_health_check_interval': '60', # in seconds
'stale_lock_seconds': '5', # lock but no resource past this => stale
'cloud_admin_role': _CLOUD_ADMIN_ROLE,
'rabbit_use_ssl': False,
'kombu_ssl_version': '',
'kombu_ssl_keyfile': '',
'kombu_ssl_certfile': '',
'kombu_ssl_ca_certs': '',
}
# ssl options
secopts = {
Expand Down
11 changes: 10 additions & 1 deletion src/config/api-server/vnc_cfg_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2458,7 +2458,16 @@ def _db_connect(self, reset_config):
rabbit_user, rabbit_password, rabbit_vhost,
rabbit_ha_mode, reset_config,
zk_server, self._args.cluster_id,
cassandra_credential=cred)
cassandra_credential=cred,
rabbit_use_ssl = self._args.rabbit_use_ssl,
kombu_ssl_version =
self._args.kombu_ssl_version,
kombu_ssl_keyfile =
self._args.kombu_ssl_keyfile,
kombu_ssl_certfile =
self._args.kombu_ssl_certfile,
kombu_ssl_ca_certs =
self._args.kombu_ssl_ca_certs)
self._db_conn = db_conn
# end _db_connect

Expand Down
12 changes: 8 additions & 4 deletions src/config/api-server/vnc_cfg_ifmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -986,15 +986,17 @@ def walk(self, fn):

class VncServerKombuClient(VncKombuClient):
def __init__(self, db_client_mgr, rabbit_ip, rabbit_port, ifmap_db,
rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode):
rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode,
**kwargs):
self._db_client_mgr = db_client_mgr
self._sandesh = db_client_mgr._sandesh
self._ifmap_db = ifmap_db
listen_port = db_client_mgr.get_server_port()
q_name = 'vnc_config.%s-%s' %(socket.gethostname(), listen_port)
super(VncServerKombuClient, self).__init__(
rabbit_ip, rabbit_port, rabbit_user, rabbit_password, rabbit_vhost,
rabbit_ha_mode, q_name, self._dbe_subscribe_callback, self.config_log)
rabbit_ha_mode, q_name, self._dbe_subscribe_callback,
self.config_log, **kwargs)

# end __init__

Expand Down Expand Up @@ -1315,7 +1317,8 @@ def __init__(self, api_svr_mgr, ifmap_srv_ip, ifmap_srv_port, uname,
passwd, cass_srv_list,
rabbit_servers, rabbit_port, rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode, reset_config=False,
zk_server_ip=None, db_prefix='', cassandra_credential=None):
zk_server_ip=None, db_prefix='', cassandra_credential=None,
**kwargs):

self._api_svr_mgr = api_svr_mgr
self._sandesh = api_svr_mgr._sandesh
Expand Down Expand Up @@ -1376,7 +1379,8 @@ def cassandra_client_init():
self._msgbus = VncServerKombuClient(self, rabbit_servers,
rabbit_port, self._ifmap_db,
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode)
rabbit_vhost, rabbit_ha_mode,
**kwargs)
# end __init__

def _update_default_quota(self):
Expand Down
52 changes: 46 additions & 6 deletions src/config/common/vnc_kombu.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#
import re
import amqp.exceptions
from distutils.util import strtobool
import kombu
import gevent
import gevent.monkey
Expand All @@ -21,6 +22,7 @@
from pysandesh.gen_py.process_info.ttypes import ConnectionType as ConnType
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel
from cfgm_common import vnc_greenlets
import ssl

__all__ = "VncKombuClient"

Expand All @@ -41,7 +43,8 @@ def sigterm_handler(self):
exit()

def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger):
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger,
**kwargs):
self._rabbit_ip = rabbit_ip
self._rabbit_port = rabbit_port
self._rabbit_user = rabbit_user
Expand All @@ -54,6 +57,7 @@ def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,

self.obj_upd_exchange = kombu.Exchange('vnc_config.object-update', 'fanout',
durable=False)
self._ssl_params = self._fetch_ssl_params(**kwargs)

# Register a handler for SIGTERM so that we can release the lock
# Without it, it can take several minutes before new master is elected
Expand Down Expand Up @@ -206,14 +210,48 @@ def shutdown(self):
def reset(self):
self._publish_queue = Queue()

_SSL_PROTOCOLS = {
"tlsv1": ssl.PROTOCOL_TLSv1,
"sslv23": ssl.PROTOCOL_SSLv23
}

@classmethod
def validate_ssl_version(cls, version):
version = version.lower()
try:
return cls._SSL_PROTOCOLS[version]
except KeyError:
raise RuntimeError('Invalid SSL version: {}'.format(version))

def _fetch_ssl_params(self, **kwargs):
if strtobool(str(kwargs.get('rabbit_use_ssl', False))):
ssl_params = dict()
ssl_version = kwargs.get('kombu_ssl_version', '')
keyfile = kwargs.get('kombu_ssl_keyfile', '')
certfile = kwargs.get('kombu_ssl_certfile', '')
ca_certs = kwargs.get('kombu_ssl_ca_certs', '')
if ssl_version:
ssl_params.update({'ssl_version':
self.validate_ssl_version(ssl_version)})
if keyfile:
ssl_params.update({'keyfile': keyfile})
if certfile:
ssl_params.update({'certfile': certfile})
if ca_certs:
ssl_params.update({'ca_certs': ca_certs})
ssl_params.update({'cert_reqs': ssl.CERT_REQUIRED})
return ssl_params or True
return False

class VncKombuClientV1(VncKombuClientBase):
def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger):
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger,
**kwargs):
super(VncKombuClientV1, self).__init__(rabbit_ip, rabbit_port,
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
q_name, subscribe_cb, logger)
q_name, subscribe_cb, logger,
**kwargs)
self._server_addrs = ["%s:%s" % (self._rabbit_ip, self._rabbit_port)]

self._conn = kombu.Connection(hostname=self._rabbit_ip,
Expand Down Expand Up @@ -247,11 +285,13 @@ def _parse_rabbit_hosts(self, rabbit_hosts):
return ret

def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger):
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger,
**kwargs):
super(VncKombuClientV2, self).__init__(rabbit_hosts, rabbit_port,
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
q_name, subscribe_cb, logger)
q_name, subscribe_cb, logger,
**kwargs)
self._server_addrs = rabbit_hosts.split(',')

_hosts = self._parse_rabbit_hosts(rabbit_hosts)
Expand All @@ -265,7 +305,7 @@ def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password,
self._logger(msg, level=SandeshLevel.SYS_NOTICE)
self._update_sandesh_status(ConnectionStatus.INIT)
self._conn_state = ConnectionStatus.INIT
self._conn = kombu.Connection(self._urls)
self._conn = kombu.Connection(self._urls, ssl=self._ssl_params)
queue_args = {"x-ha-policy": "all"} if rabbit_ha_mode else None
self._update_queue_obj = kombu.Queue(q_name, self.obj_upd_exchange,
durable=False,
Expand Down
16 changes: 15 additions & 1 deletion src/config/device-manager/device_manager/device_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,16 @@ def __init__(self, args=None):
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
q_name, self._vnc_subscribe_callback,
self.config_log)
self.config_log, rabbit_use_ssl =
self._args.rabbit_use_ssl,
kombu_ssl_version =
self._args.kombu_ssl_version,
kombu_ssl_keyfile =
self._args.kombu_ssl_keyfile,
kombu_ssl_certfile =
self._args.kombu_ssl_certfile,
kombu_ssl_ca_certs =
self._args.kombu_ssl_ca_certs)

self._cassandra = DMCassandraDB.getInstance(self, _zookeeper_client)

Expand Down Expand Up @@ -438,6 +447,11 @@ def parse_args(args_str):
'push_delay_max': '100',
'push_delay_enable': 'True',
'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(),
'rabbit_use_ssl': False,
'kombu_ssl_version': '',
'kombu_ssl_keyfile': '',
'kombu_ssl_certfile': '',
'kombu_ssl_ca_certs': '',
}
secopts = {
'use_certs': False,
Expand Down
16 changes: 15 additions & 1 deletion src/config/schema-transformer/to_bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,16 @@ def __init__(self, args=None):
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
q_name, self._vnc_subscribe_callback,
self.config_log)
self.config_log, rabbit_use_ssl =
self._args.rabbit_use_ssl,
kombu_ssl_version =
self._args.kombu_ssl_version,
kombu_ssl_keyfile =
self._args.kombu_ssl_keyfile,
kombu_ssl_certfile =
self._args.kombu_ssl_certfile,
kombu_ssl_ca_certs =
self._args.kombu_ssl_ca_certs)
try:
self._cassandra = SchemaTransformerDB(self, _zookeeper_client)
DBBaseST.init(self, self._sandesh.logger(), self._cassandra)
Expand Down Expand Up @@ -660,6 +669,11 @@ def parse_args(args_str):
'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(),
'bgpaas_port_start': 50000,
'bgpaas_port_end': 50256,
'rabbit_use_ssl': False,
'kombu_ssl_version': '',
'kombu_ssl_keyfile': '',
'kombu_ssl_certfile': '',
'kombu_ssl_ca_certs': '',
}
secopts = {
'use_certs': False,
Expand Down
6 changes: 5 additions & 1 deletion src/config/svc-monitor/svc_monitor/rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ def _connect_rabbit(self):
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
q_name, self._vnc_subscribe_callback,
self.logger.log)
self.logger.log, rabbit_use_ssl = self._args.rabbit_use_ssl,
kombu_ssl_version = self._args.kombu_ssl_version,
kombu_ssl_keyfile = self._args.kombu_ssl_keyfile,
kombu_ssl_certfile = self._args.kombu_ssl_certfile,
kombu_ssl_ca_certs = self._args.kombu_ssl_ca_certs)

def _vnc_subscribe_callback(self, oper_info):
self._db_resync_done.wait()
Expand Down
5 changes: 5 additions & 0 deletions src/config/svc-monitor/svc_monitor/svc_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,11 @@ def parse_args(args_str):
'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(),
'check_service_interval': '60',
'nova_endpoint_type': 'internalURL',
'rabbit_use_ssl': False,
'kombu_ssl_version': '',
'kombu_ssl_keyfile': '',
'kombu_ssl_certfile': '',
'kombu_ssl_ca_certs': '',
}
secopts = {
'use_certs': False,
Expand Down

0 comments on commit 2db709e

Please sign in to comment.