Skip to content

Commit

Permalink
Merge "Enable (and strobe) heartbeat connections on rabbitmq consumer…
Browse files Browse the repository at this point in the history
… connection. Defaults to 120s (strobe every 60s)."
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Aug 26, 2016
2 parents 113ba3e + f5df227 commit b725910
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 14 deletions.
6 changes: 3 additions & 3 deletions src/config/api-server/tests/test_kombu.py
Expand Up @@ -89,7 +89,7 @@ def Connection(self, urls, **kwargs):
servers, self.port,
None, self.username,
self.password,
self.vhost, False)
self.vhost, 0, False)

@unittest.skipIf(is_kombu_client_v1,
"skipping because kombu client is older")
Expand Down Expand Up @@ -120,7 +120,7 @@ def _drain_events():
servers, self.port,
None, self.username,
self.password,
self.vhost, False)
self.vhost, 0, False)
_lock.wait()
kc.shutdown()

Expand Down Expand Up @@ -156,7 +156,7 @@ def _publish(args):
servers, self.port,
None, self.username,
self.password,
self.vhost, False)
self.vhost, 0, False)
gevent.sleep(0)
kc.dbe_create_publish("network", [], {})
_lock.wait()
Expand Down
4 changes: 4 additions & 0 deletions src/config/api-server/utils.py
Expand Up @@ -66,6 +66,7 @@ def parse_args(args_str):
'rabbit_vhost': None,
'rabbit_ha_mode': False,
'rabbit_max_pending_updates': '4096',
'rabbit_health_check_interval': '120.0', # in seconds
'cluster_id': '',
'max_requests': 1024,
'region_name': 'RegionOne',
Expand Down Expand Up @@ -275,6 +276,9 @@ def parse_args(args_str):
parser.add_argument(
"--rabbit_max_pending_updates",
help="Max updates before stateful changes disallowed")
parser.add_argument(
"--rabbit_health_check_interval",
help="Interval seconds between consumer heartbeats to rabbitmq")
parser.add_argument(
"--cluster_id",
help="Used for database keyspace separation")
Expand Down
5 changes: 5 additions & 0 deletions src/config/api-server/vnc_cfg_api_server.py
Expand Up @@ -1627,6 +1627,10 @@ def get_ifmap_health_check_interval(self):
return float(self._args.ifmap_health_check_interval)
# end get_ifmap_health_check_interval

def get_rabbit_health_check_interval(self):
return float(self._args.rabbit_health_check_interval)
# end get_rabbit_health_check_interval

def is_auth_disabled(self):
return self._args.auth is None

Expand Down Expand Up @@ -2466,6 +2470,7 @@ def _parse_args(self, args_str):
--disc_server_port 5998
--worker_id 1
--rabbit_max_pending_updates 4096
--rabbit_health_check_interval 120.0
--cluster_id <testbed-name>
[--auth keystone]
[--ifmap_server_loc
Expand Down
14 changes: 8 additions & 6 deletions src/config/api-server/vnc_cfg_ifmap.py
Expand Up @@ -997,7 +997,7 @@ def useragent_kv_delete(self, key):
class VncServerKombuClient(VncKombuClient):
def __init__(self, db_client_mgr, rabbit_ip, rabbit_port, ifmap_db,
rabbit_user, rabbit_password, rabbit_vhost, rabbit_ha_mode,
**kwargs):
rabbit_health_check_interval, **kwargs):
self._db_client_mgr = db_client_mgr
self._sandesh = db_client_mgr._sandesh
self._ifmap_db = ifmap_db
Expand All @@ -1006,7 +1006,8 @@ def __init__(self, db_client_mgr, rabbit_ip, rabbit_port, ifmap_db,
super(VncServerKombuClient, self).__init__(
rabbit_ip, rabbit_port, rabbit_user, rabbit_password, rabbit_vhost,
rabbit_ha_mode, q_name, self._dbe_subscribe_callback,
self.config_log, **kwargs)
self.config_log, heartbeat_seconds=rabbit_health_check_interval,
**kwargs)

# end __init__

Expand Down Expand Up @@ -1433,10 +1434,11 @@ def cassandra_client_init():
self._zk_db.master_election(cassandra_client_init)

self._msgbus = VncServerKombuClient(self, rabbit_servers,
rabbit_port, self._ifmap_db,
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
**kwargs)
rabbit_port, self._ifmap_db,
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
api_svr_mgr.get_rabbit_health_check_interval(),
**kwargs)
# end __init__

def _update_default_quota(self):
Expand Down
35 changes: 30 additions & 5 deletions src/config/common/vnc_kombu.py
Expand Up @@ -43,7 +43,7 @@ def sigterm_handler(self):

def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger,
**kwargs):
heartbeat_seconds=0, **kwargs):
self._rabbit_ip = rabbit_ip
self._rabbit_port = rabbit_port
self._rabbit_user = rabbit_user
Expand All @@ -53,6 +53,7 @@ def __init__(self, rabbit_ip, rabbit_port, rabbit_user, rabbit_password,
self._logger = logger
self._publish_queue = Queue()
self._conn_lock = Semaphore()
self._heartbeat_seconds = heartbeat_seconds

self.obj_upd_exchange = kombu.Exchange('vnc_config.object-update', 'fanout',
durable=False)
Expand Down Expand Up @@ -151,6 +152,18 @@ def _connection_watch_forever(self):
connected = False
# end _connection_watch_forever

def _connection_heartbeat(self):
while True:
try:
if self._conn.connected:
self._conn.heartbeat_check()
except Exception as e:
msg = 'Error in rabbitmq heartbeat greenlet: %s' %(str(e))
self._logger(msg, level=SandeshLevel.SYS_ERR)
finally:
gevent.sleep(float(self._heartbeat_seconds/2))
# end _connection_heartbeat

def _publisher(self):
message = None
connected = True
Expand Down Expand Up @@ -194,13 +207,24 @@ def _start(self, client_name):
self._connection_monitor_greenlet = vnc_greenlets.VncGreenlet(
'Kombu ' + client_name + '_ConnMon',
self._connection_watch_forever)
if self._heartbeat_seconds:
self._connection_heartbeat_greenlet = vnc_greenlets.VncGreenlet(
'Kombu ' + client_name + '_ConnHeartBeat',
self._connection_heartbeat)
else:
self._connection_heartbeat_greenlet = None

def greenlets(self):
return [self._publisher_greenlet, self._connection_monitor_greenlet]
ret = [self._publisher_greenlet, self._connection_monitor_greenlet]
if self._connection_heartbeat_greenlet:
ret.append(self._connection_heartbeat_greenlet)
return ret

def shutdown(self):
self._publisher_greenlet.kill()
self._connection_monitor_greenlet.kill()
if self._connection_heartbeat_greenlet:
self._connection_heartbeat_greenlet.kill()
self._producer.close()
self._consumer.close()
self._delete_queue()
Expand Down Expand Up @@ -285,12 +309,12 @@ def _parse_rabbit_hosts(self, rabbit_hosts):

def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode, q_name, subscribe_cb, logger,
**kwargs):
heartbeat_seconds=0, **kwargs):
super(VncKombuClientV2, self).__init__(rabbit_hosts, rabbit_port,
rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode,
q_name, subscribe_cb, logger,
**kwargs)
heartbeat_seconds, **kwargs)
self._server_addrs = rabbit_hosts.split(',')

_hosts = self._parse_rabbit_hosts(rabbit_hosts)
Expand All @@ -304,7 +328,8 @@ def __init__(self, rabbit_hosts, rabbit_port, rabbit_user, rabbit_password,
self._logger(msg, level=SandeshLevel.SYS_NOTICE)
self._update_sandesh_status(ConnectionStatus.INIT)
self._conn_state = ConnectionStatus.INIT
self._conn = kombu.Connection(self._urls, ssl=self._ssl_params)
self._conn = kombu.Connection(self._urls, ssl=self._ssl_params,
heartbeat=heartbeat_seconds)
queue_args = {"x-ha-policy": "all"} if rabbit_ha_mode else None
self._update_queue_obj = kombu.Queue(q_name, self.obj_upd_exchange,
durable=False,
Expand Down

0 comments on commit b725910

Please sign in to comment.