Skip to content

Commit

Permalink
Enable (and strobe) heartbeat connections on rabbitmq
Browse files Browse the repository at this point in the history
consumer connection. Defaults to 120s (strobe every 60s).

Closes-Bug: 1602071

Change-Id: Ib2e1f7d2b103c4846f60c39c07c5815bd5218f7d
  • Loading branch information
Hampapur Ajay committed Aug 23, 2016
1 parent 6a1592c commit f5df227
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 14 deletions.
6 changes: 3 additions & 3 deletions src/config/api-server/tests/test_kombu.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def Connection(self, urls, **kwargs):
servers, self.port,
None, self.username,
self.password,
self.vhost, False)
self.vhost, 0, False)

@unittest.skipIf(is_kombu_client_v1,
"skipping because kombu client is older")
Expand Down Expand Up @@ -120,7 +120,7 @@ def _drain_events():
servers, self.port,
None, self.username,
self.password,
self.vhost, False)
self.vhost, 0, False)
_lock.wait()
kc.shutdown()

Expand Down Expand Up @@ -156,7 +156,7 @@ def _publish(args):
servers, self.port,
None, self.username,
self.password,
self.vhost, False)
self.vhost, 0, False)
gevent.sleep(0)
kc.dbe_create_publish("network", [], {})
_lock.wait()
Expand Down
4 changes: 4 additions & 0 deletions src/config/api-server/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def parse_args(args_str):
'rabbit_vhost': None,
'rabbit_ha_mode': False,
'rabbit_max_pending_updates': '4096',
'rabbit_health_check_interval': '120.0', # in seconds
'cluster_id': '',
'max_requests': 1024,
'region_name': 'RegionOne',
Expand Down Expand Up @@ -275,6 +276,9 @@ def parse_args(args_str):
parser.add_argument(
"--rabbit_max_pending_updates",
help="Max updates before stateful changes disallowed")
parser.add_argument(
"--rabbit_health_check_interval",
help="Interval seconds between consumer heartbeats to rabbitmq")
parser.add_argument(
"--cluster_id",
help="Used for database keyspace separation")
Expand Down
5 changes: 5 additions & 0 deletions src/config/api-server/vnc_cfg_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1627,6 +1627,10 @@ def get_ifmap_health_check_interval(self):
return float(self._args.ifmap_health_check_interval)
# end get_ifmap_health_check_interval

def get_rabbit_health_check_interval(self):
return float(self._args.rabbit_health_check_interval)
# end get_rabbit_health_check_interval

def is_auth_disabled(self):
return self._args.auth is None

Expand Down Expand Up @@ -2466,6 +2470,7 @@ def _parse_args(self, args_str):
--disc_server_port 5998
--worker_id 1
--rabbit_max_pending_updates 4096
--rabbit_health_check_interval 120.0
--cluster_id <testbed-name>
[--auth keystone]
[--ifmap_server_loc
Expand Down
14 changes: 8 additions & 6 deletions src/config/api-server/vnc_cfg_ifmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ def useragent_kv_delete(self, key):
class VncServerKombuClient(VncKombuClient):
def __init__(self, db_client_mgr, rabbit_ip, rabbit_port, ifmap_db,
rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode,
**kwargs):
rabbit_health_check_interval, **kwargs):
self._db_client_mgr = db_client_mgr
self._sandesh = db_client_mgr._sandesh
self._ifmap_db = ifmap_db
Expand All @@ -1006,7 +1006,8 @@ def __init__(self, db_client_mgr, rabbit_ip, rabbit_port, ifmap_db,
super(VncServerKombuClient, self).__init__(
rabbit_ip, rabbit_port, rabbit_user, rabbit_password, rabbit_vhost,
rabbit_ha_mode, q_name, self._dbe_subscribe_callback,
self.config_log, **kwargs)
self.config_log, heartbeat_seconds=rabbit_health_check_interval,
**kwargs)

# end __init__

Expand Down Expand Up @@ -1433,10 +1434,11 @@ def cassandra_client_init():
self._zk_db.master_election(cassandra_client_init)

self._msgbus = VncServerKombuClient(self, rabbit_servers,
rabbit_port, self._ifmap_db,
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
**kwargs)
rabbit_port, self._ifmap_db,
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
api_svr_mgr.get_rabbit_health_check_interval(),
**kwargs)
# end __init__

def _update_default_quota(self):
Expand Down
35 changes: 30 additions & 5 deletions src/config/common/vnc_kombu.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def sigterm_handler(self):

def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger,
**kwargs):
heartbeat_seconds=0, **kwargs):
self._rabbit_ip = rabbit_ip
self._rabbit_port = rabbit_port
self._rabbit_user = rabbit_user
Expand All @@ -53,6 +53,7 @@ def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,
self._logger = logger
self._publish_queue = Queue()
self._conn_lock = Semaphore()
self._heartbeat_seconds = heartbeat_seconds

self.obj_upd_exchange = kombu.Exchange('vnc_config.object-update', 'fanout',
durable=False)
Expand Down Expand Up @@ -151,6 +152,18 @@ def _connection_watch_forever(self):
connected = False
# end _connection_watch_forever

def _connection_heartbeat(self):
while True:
try:
if self._conn.connected:
self._conn.heartbeat_check()
except Exception as e:
msg = 'Error in rabbitmq heartbeat greenlet: %s' %(str(e))
self._logger(msg, level=SandeshLevel.SYS_ERR)
finally:
gevent.sleep(float(self._heartbeat_seconds/2))
# end _connection_heartbeat

def _publisher(self):
message = None
connected = True
Expand Down Expand Up @@ -194,13 +207,24 @@ def _start(self, client_name):
self._connection_monitor_greenlet = vnc_greenlets.VncGreenlet(
'Kombu ' + client_name + '_ConnMon',
self._connection_watch_forever)
if self._heartbeat_seconds:
self._connection_heartbeat_greenlet = vnc_greenlets.VncGreenlet(
'Kombu ' + client_name + '_ConnHeartBeat',
self._connection_heartbeat)
else:
self._connection_heartbeat_greenlet = None

def greenlets(self):
return [self._publisher_greenlet, self._connection_monitor_greenlet]
ret = [self._publisher_greenlet, self._connection_monitor_greenlet]
if self._connection_heartbeat_greenlet:
ret.append(self._connection_heartbeat_greenlet)
return ret

def shutdown(self):
self._publisher_greenlet.kill()
self._connection_monitor_greenlet.kill()
if self._connection_heartbeat_greenlet:
self._connection_heartbeat_greenlet.kill()
self._producer.close()
self._consumer.close()
self._delete_queue()
Expand Down Expand Up @@ -285,12 +309,12 @@ def _parse_rabbit_hosts(self, rabbit_hosts):

def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger,
**kwargs):
heartbeat_seconds=0, **kwargs):
super(VncKombuClientV2, self).__init__(rabbit_hosts, rabbit_port,
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
q_name, subscribe_cb, logger,
**kwargs)
heartbeat_seconds, **kwargs)
self._server_addrs = rabbit_hosts.split(',')

_hosts = self._parse_rabbit_hosts(rabbit_hosts)
Expand All @@ -304,7 +328,8 @@ def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password,
self._logger(msg, level=SandeshLevel.SYS_NOTICE)
self._update_sandesh_status(ConnectionStatus.INIT)
self._conn_state = ConnectionStatus.INIT
self._conn = kombu.Connection(self._urls, ssl=self._ssl_params)
self._conn = kombu.Connection(self._urls, ssl=self._ssl_params,
heartbeat=heartbeat_seconds)
queue_args = {"x-ha-policy": "all"} if rabbit_ha_mode else None
self._update_queue_obj = kombu.Queue(q_name, self.obj_upd_exchange,
durable=False,
Expand Down

0 comments on commit f5df227

Please sign in to comment.