diff --git a/src/config/api-server/tests/test_kombu.py b/src/config/api-server/tests/test_kombu.py index a470a8abb69..1d54a316717 100644 --- a/src/config/api-server/tests/test_kombu.py +++ b/src/config/api-server/tests/test_kombu.py @@ -89,7 +89,7 @@ def Connection(self, urls, **kwargs): servers, self.port, None, self.username, self.password, - self.vhost, False) + self.vhost, 0, False) @unittest.skipIf(is_kombu_client_v1, "skipping because kombu client is older") @@ -120,7 +120,7 @@ def _drain_events(): servers, self.port, None, self.username, self.password, - self.vhost, False) + self.vhost, 0, False) _lock.wait() kc.shutdown() @@ -156,7 +156,7 @@ def _publish(args): servers, self.port, None, self.username, self.password, - self.vhost, False) + self.vhost, 0, False) gevent.sleep(0) kc.dbe_create_publish("network", [], {}) _lock.wait() diff --git a/src/config/api-server/utils.py b/src/config/api-server/utils.py index 0e9eb3cea5d..d90d047ef72 100644 --- a/src/config/api-server/utils.py +++ b/src/config/api-server/utils.py @@ -66,6 +66,7 @@ def parse_args(args_str): 'rabbit_vhost': None, 'rabbit_ha_mode': False, 'rabbit_max_pending_updates': '4096', + 'rabbit_health_check_interval': '120.0', # in seconds 'cluster_id': '', 'max_requests': 1024, 'region_name': 'RegionOne', @@ -275,6 +276,9 @@ def parse_args(args_str): parser.add_argument( "--rabbit_max_pending_updates", help="Max updates before stateful changes disallowed") + parser.add_argument( + "--rabbit_health_check_interval", + help="Interval seconds between consumer heartbeats to rabbitmq") parser.add_argument( "--cluster_id", help="Used for database keyspace separation") diff --git a/src/config/api-server/vnc_cfg_api_server.py b/src/config/api-server/vnc_cfg_api_server.py index 71782757812..0c3e156f918 100644 --- a/src/config/api-server/vnc_cfg_api_server.py +++ b/src/config/api-server/vnc_cfg_api_server.py @@ -1627,6 +1627,10 @@ def get_ifmap_health_check_interval(self): return float(self._args.ifmap_health_check_interval) # end get_ifmap_health_check_interval + def get_rabbit_health_check_interval(self): + return float(self._args.rabbit_health_check_interval) + # end get_rabbit_health_check_interval + def is_auth_disabled(self): return self._args.auth is None @@ -2466,6 +2470,7 @@ def _parse_args(self, args_str): --disc_server_port 5998 --worker_id 1 --rabbit_max_pending_updates 4096 + --rabbit_health_check_interval 120.0 --cluster_id [--auth keystone] [--ifmap_server_loc diff --git a/src/config/api-server/vnc_cfg_ifmap.py b/src/config/api-server/vnc_cfg_ifmap.py index 4d571459627..6b3e939bc60 100644 --- a/src/config/api-server/vnc_cfg_ifmap.py +++ b/src/config/api-server/vnc_cfg_ifmap.py @@ -997,7 +997,7 @@ def useragent_kv_delete(self, key): class VncServerKombuClient(VncKombuClient): def __init__(self, db_client_mgr, rabbit_ip, rabbit_port, ifmap_db, rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode, - **kwargs): + rabbit_health_check_interval, **kwargs): self._db_client_mgr = db_client_mgr self._sandesh = db_client_mgr._sandesh self._ifmap_db = ifmap_db @@ -1006,7 +1006,8 @@ def __init__(self, db_client_mgr, rabbit_ip, rabbit_port, ifmap_db, super(VncServerKombuClient, self).__init__( rabbit_ip, rabbit_port, rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode, q_name, self._dbe_subscribe_callback, - self.config_log, **kwargs) + self.config_log, heartbeat_seconds=rabbit_health_check_interval, + **kwargs) # end __init__ @@ -1433,10 +1434,11 @@ def cassandra_client_init(): self._zk_db.master_election(cassandra_client_init) self._msgbus = VncServerKombuClient(self, rabbit_servers, - rabbit_port, self._ifmap_db, - rabbit_user, rabbit_password, - rabbit_vhost, rabbit_ha_mode, - **kwargs) + rabbit_port, self._ifmap_db, + rabbit_user, rabbit_password, + rabbit_vhost, rabbit_ha_mode, + api_svr_mgr.get_rabbit_health_check_interval(), + **kwargs) # end __init__ def _update_default_quota(self): diff --git a/src/config/common/vnc_kombu.py b/src/config/common/vnc_kombu.py index bd3f1086d61..23b7a680e85 100644 --- a/src/config/common/vnc_kombu.py +++ b/src/config/common/vnc_kombu.py @@ -43,7 +43,7 @@ def sigterm_handler(self): def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger, - **kwargs): + heartbeat_seconds=0, **kwargs): self._rabbit_ip = rabbit_ip self._rabbit_port = rabbit_port self._rabbit_user = rabbit_user @@ -53,6 +53,7 @@ def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password, self._logger = logger self._publish_queue = Queue() self._conn_lock = Semaphore() + self._heartbeat_seconds = heartbeat_seconds self.obj_upd_exchange = kombu.Exchange('vnc_config.object-update', 'fanout', durable=False) @@ -151,6 +152,18 @@ def _connection_watch_forever(self): connected = False # end _connection_watch_forever + def _connection_heartbeat(self): + while True: + try: + if self._conn.connected: + self._conn.heartbeat_check() + except Exception as e: + msg = 'Error in rabbitmq heartbeat greenlet: %s' %(str(e)) + self._logger(msg, level=SandeshLevel.SYS_ERR) + finally: + gevent.sleep(float(self._heartbeat_seconds/2)) + # end _connection_heartbeat + def _publisher(self): message = None connected = True @@ -194,13 +207,24 @@ def _start(self, client_name): self._connection_monitor_greenlet = vnc_greenlets.VncGreenlet( 'Kombu ' + client_name + '_ConnMon', self._connection_watch_forever) + if self._heartbeat_seconds: + self._connection_heartbeat_greenlet = vnc_greenlets.VncGreenlet( + 'Kombu ' + client_name + '_ConnHeartBeat', + self._connection_heartbeat) + else: + self._connection_heartbeat_greenlet = None def greenlets(self): - return [self._publisher_greenlet, self._connection_monitor_greenlet] + ret = [self._publisher_greenlet, self._connection_monitor_greenlet] + if self._connection_heartbeat_greenlet: + ret.append(self._connection_heartbeat_greenlet) + return ret def shutdown(self): self._publisher_greenlet.kill() self._connection_monitor_greenlet.kill() + if self._connection_heartbeat_greenlet: + self._connection_heartbeat_greenlet.kill() self._producer.close() self._consumer.close() self._delete_queue() @@ -285,12 +309,12 @@ def _parse_rabbit_hosts(self, rabbit_hosts): def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger, - **kwargs): + heartbeat_seconds=0, **kwargs): super(VncKombuClientV2, self).__init__(rabbit_hosts, rabbit_port, rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger, - **kwargs) + heartbeat_seconds, **kwargs) self._server_addrs = rabbit_hosts.split(',') _hosts = self._parse_rabbit_hosts(rabbit_hosts) @@ -304,7 +328,8 @@ def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password, self._logger(msg, level=SandeshLevel.SYS_NOTICE) self._update_sandesh_status(ConnectionStatus.INIT) self._conn_state = ConnectionStatus.INIT - self._conn = kombu.Connection(self._urls, ssl=self._ssl_params) + self._conn = kombu.Connection(self._urls, ssl=self._ssl_params, + heartbeat=heartbeat_seconds) queue_args = {"x-ha-policy": "all"} if rabbit_ha_mode else None self._update_queue_obj = kombu.Queue(q_name, self.obj_upd_exchange, durable=False,