Skip to content

Commit

Permalink
config-resilience: Handle all rabbitmq producer/consumer reconnects
Browse files Browse the repository at this point in the history
Improve connection handling with rabbit such that
1. The producer and consumer greenlets never die
2. Use context manager for semaphore and handle fail while wait
3. Log appropriately on these events.

Add unit tests to excercise these paths.

Closes-Bug: #1467000
Change-Id: If609a17b97039932d06ab70b40fee6dbdee624f3
(cherry picked from commit eac2914)
  • Loading branch information
Hampapur Ajay authored and sbalineni committed Feb 12, 2016
1 parent 8f3a788 commit 014e76f
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 42 deletions.
170 changes: 170 additions & 0 deletions src/config/api-server/tests/test_crud_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from vnc_api.gen.resource_test import *
import cfgm_common
from cfgm_common import vnc_plugin_base
from cfgm_common import imid

sys.path.append('../common/tests')
from test_utils import *
Expand Down Expand Up @@ -705,6 +706,7 @@ def err_rabbitq_pub(*args, **kwargs):
raise Exception("Faking Rabbit publish failure")

def err_rabbitq_conn(*args, **kwargs):
gevent.sleep(0.1)
raise Exception("Faking RabbitMQ connection failure")

api_server._db_conn._msgbus._producer.publish = err_rabbitq_pub
Expand Down Expand Up @@ -765,6 +767,174 @@ def err_call_async_result(*args, **kwargs):
api_server._db_conn._ifmap_db._mapclient.call_async_result = err_call_async_result
test_obj = self._create_test_object()
self.assertTill(self.ifmap_has_ident, obj=test_obj)

def test_reconnect_to_rabbit(self):
exceptions = [(FakeKombu.Connection.ConnectionException(), 'conn'),
(FakeKombu.Connection.ChannelException(), 'chan'),
(Exception(), 'generic')]

# fake problem on publish to rabbit
# restore, ensure retry and successful publish
for exc_obj, exc_type in exceptions:
obj = VirtualNetwork('%s-pub-%s' %(self.id(), exc_type))
obj.uuid = str(uuid.uuid4())
publish_captured = [False]
def err_on_publish(orig_method, *args, **kwargs):
msg = args[0]
if msg['oper'] == 'CREATE' and msg['uuid'] == obj.uuid:
publish_captured[0] = True
raise exc_obj
return orig_method(*args, **kwargs)

rabbit_producer = self._api_server._db_conn._msgbus._producer
with test_common.patch(rabbit_producer,
'publish', err_on_publish):
self._vnc_lib.virtual_network_create(obj)
self.assertTill(lambda: publish_captured[0] == True)
# unpatch err publish

self.assertTill(self.ifmap_has_ident, obj=obj)
# end exception types on publish

# fake problem on consume from rabbit
# restore, ensure retry and successful consume
for exc_obj, exc_type in exceptions:
obj = VirtualNetwork('%s-sub-%s' %(self.id(), exc_type))
obj.uuid = str(uuid.uuid4())
consume_captured = [False]
consume_test_payload = [None]
rabbit_consumer = self._api_server._db_conn._msgbus._consumer
def err_on_consume(orig_method, *args, **kwargs):
msg = orig_method()
payload = msg.payload
if payload['oper'] == 'UPDATE' and payload['uuid'] == obj.uuid:
if (consume_test_payload[0] == payload):
return msg
consume_captured[0] = True
consume_test_payload[0] = payload
rabbit_consumer.queues.put(payload, None)
raise exc_obj
return msg

with test_common.patch(rabbit_consumer.queues,
'get', err_on_consume):
# create the object to insert 'get' handler,
# update oper will test the error handling
self._vnc_lib.virtual_network_create(obj)
obj.display_name = 'test_update'
self._vnc_lib.virtual_network_update(obj)
self.assertTill(lambda: consume_captured[0] == True)
# unpatch err consume

def ifmap_has_ident_update():
ifmap_id = imid.get_ifmap_id_from_fq_name(obj.get_type(),
obj.get_fq_name())
node = FakeIfmapClient._graph.get(ifmap_id)
if not node:
return False
meta = node.get('links', {}).get('contrail:display-name',
{}).get('meta')
if not meta:
return False
if not 'test_update' in etree.tostring(meta):
return False

return True

self.assertTill(ifmap_has_ident_update)
# end exception types on consume

# fake problem on consume and publish at same time
# restore, ensure retry and successful publish + consume
obj = VirtualNetwork('%s-pub-sub' %(self.id()))
obj.uuid = str(uuid.uuid4())

msgbus = self._api_server._db_conn._msgbus
pub_greenlet = msgbus._publisher_greenlet
sub_greenlet = msgbus._connection_monitor_greenlet
setattr(pub_greenlet, 'unittest', {'name': 'producer'})
setattr(sub_greenlet, 'unittest', {'name': 'consumer'})

consume_captured = [False]
consume_test_payload = [None]
publish_connect_done = [False]
publish_captured = [False]
def err_on_consume(orig_method, *args, **kwargs):
msg = orig_method()
payload = msg.payload
if payload['oper'] == 'UPDATE' and payload['uuid'] == obj.uuid:
if (consume_test_payload[0] == payload):
return msg
consume_captured[0] = True
consume_test_payload[0] = payload
rabbit_consumer = self._api_server._db_conn._msgbus._consumer
rabbit_consumer.queues.put(payload, None)
raise exc_obj
return msg

def block_on_connect(orig_method, *args, **kwargs):
# block consumer till publisher does update,
# fake consumer connect exceptions till publisher connects fine
utvars = getattr(gevent.getcurrent(), 'unittest', None)
if utvars and utvars['name'] == 'producer':
publish_connect_done[0] = True
return orig_method(*args, **kwargs)

while not publish_captured[0]:
gevent.sleep(0.1)

while not publish_connect_done[0]:
gevent.sleep(0.1)
raise Exception('Faking connection fail')

return orig_method(*args, **kwargs)

rabbit_consumer = self._api_server._db_conn._msgbus._consumer
rabbit_conn = self._api_server._db_conn._msgbus._conn
with test_common.patch(rabbit_consumer.queues,
'get', err_on_consume):
with test_common.patch(rabbit_conn,
'connect', block_on_connect):
# create the object to insert 'get' handler,
# update oper will test the error handling
self._vnc_lib.virtual_network_create(obj)
obj.display_name = 'test_update_1'
self._vnc_lib.virtual_network_update(obj)
self.assertTill(lambda: consume_captured[0] == True)

def err_on_publish(orig_method, *args, **kwargs):
msg = args[0]
if msg['oper'] == 'UPDATE' and msg['uuid'] == obj.uuid:
publish_captured[0] = True
raise exc_obj
return orig_method(*args, **kwargs)
rabbit_producer = self._api_server._db_conn._msgbus._producer
with test_common.patch(rabbit_producer,
'publish', err_on_publish):
obj.display_name = 'test_update_2'
self._vnc_lib.virtual_network_update(obj)
self.assertTill(lambda: publish_captured[0] == True)
# unpatch err publish
# unpatch connect
# unpatch err consume

def ifmap_has_update_2():
ifmap_id = imid.get_ifmap_id_from_fq_name(obj.get_type(),
obj.get_fq_name())
node = FakeIfmapClient._graph.get(ifmap_id)
if not node:
return False
meta = node.get('links', {}).get('contrail:display-name',
{}).get('meta')
if meta is None:
return False
if not 'test_update_2' in etree.tostring(meta):
return False

return True

self.assertTill(ifmap_has_update_2)
# end test_reconnect_to_rabbit

def test_handle_trap_on_exception(self):
api_server = test_common.vnc_cfg_api_server.server
Expand Down
9 changes: 6 additions & 3 deletions src/config/common/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,9 @@ def get(self):
# end class Queue

class Connection(object):
class ConnectionException(Exception): pass
class ChannelException(Exception): pass

def __init__(self, *args, **kwargs):
pass
# end __init__
Expand Down Expand Up @@ -729,11 +732,11 @@ def drain_events(self):

@property
def connection_errors(self):
return (Exception,)
return (self.ConnectionException, )

@property
def channel_errors(self):
return (Exception, )
return (self.ChannelException, )
# end class Connection

class Consumer(object):
Expand All @@ -744,8 +747,8 @@ def __init__(self, *args, **kwargs):

def consume(self):
while True:
msg = self.queues.get()
try:
msg = self.queues.get()
for c in self.callbacks:
c(msg.payload, msg)
except Exception:
Expand Down
98 changes: 59 additions & 39 deletions src/config/common/vnc_kombu.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,45 +75,46 @@ def _reconnect(self, delete_old_q=False):
# the connection and releases the lock, so the other one can
# just wait on the lock, till it gets released
self._conn_lock.wait()
return

self._conn_lock.acquire()

msg = "RabbitMQ connection down"
self._logger(msg, level=SandeshLevel.SYS_ERR)
self._update_sandesh_status(ConnectionStatus.DOWN)
self._conn_state = ConnectionStatus.DOWN

self._conn.close()

self._conn.ensure_connection()
self._conn.connect()

self._update_sandesh_status(ConnectionStatus.UP)
self._conn_state = ConnectionStatus.UP
msg = 'RabbitMQ connection ESTABLISHED %s' % repr(self._conn)
self._logger(msg, level=SandeshLevel.SYS_NOTICE)

self._channel = self._conn.channel()
if delete_old_q:
# delete the old queue in first-connect context
# as db-resync would have caught up with history.
try:
bound_q = self._update_queue_obj(self._channel)
bound_q.delete()
except Exception as e:
msg = 'Unable to delete the old ampq queue: %s' %(str(e))
self._logger(msg, level=SandeshLevel.SYS_ERR)

self._consumer = kombu.Consumer(self._channel,
queues=self._update_queue_obj,
callbacks=[self._subscribe])
self._producer = kombu.Producer(self._channel, exchange=self.obj_upd_exchange)

self._conn_lock.release()
if self._conn_state == ConnectionStatus.UP:
return

with self._conn_lock:
msg = "RabbitMQ connection down"
self._logger(msg, level=SandeshLevel.SYS_ERR)
self._update_sandesh_status(ConnectionStatus.DOWN)
self._conn_state = ConnectionStatus.DOWN

self._conn.close()

self._conn.ensure_connection()
self._conn.connect()

self._update_sandesh_status(ConnectionStatus.UP)
self._conn_state = ConnectionStatus.UP
msg = 'RabbitMQ connection ESTABLISHED %s' % repr(self._conn)
self._logger(msg, level=SandeshLevel.SYS_NOTICE)

self._channel = self._conn.channel()
if delete_old_q:
# delete the old queue in first-connect context
# as db-resync would have caught up with history.
try:
bound_q = self._update_queue_obj(self._channel)
bound_q.delete()
except Exception as e:
msg = 'Unable to delete the old ampq queue: %s' %(str(e))
self._logger(msg, level=SandeshLevel.SYS_ERR)

self._consumer = kombu.Consumer(self._channel,
queues=self._update_queue_obj,
callbacks=[self._subscribe])
self._producer = kombu.Producer(self._channel, exchange=self.obj_upd_exchange)
# end _reconnect

def _connection_watch(self):
def _connection_watch(self, connected):
if not connected:
self._reconnect()

self.prepare_to_consume()
while True:
try:
Expand All @@ -123,10 +124,27 @@ def _connection_watch(self):
self._reconnect()
# end _connection_watch

def _connection_watch_forever(self):
connected = True
while True:
try:
self._connection_watch(connected)
except Exception as e:
msg = 'Error in rabbitmq drainer greenlet: %s' %(str(e))
self._logger(msg, level=SandeshLevel.SYS_ERR)
# avoid 'reconnect()' here as that itself might cause exception
connected = False
# end _connection_watch_forever

def _publisher(self):
message = None
connected = True
while True:
try:
if not connected:
self._reconnect()
connected = True

if not message:
# earlier was sent fine, dequeue one more
message = self._publish_queue.get()
Expand All @@ -139,8 +157,10 @@ def _publisher(self):
except self._conn.connection_errors + self._conn.channel_errors as e:
self._reconnect()
except Exception as e:
log_str = "Unknown exception in _publisher greenlet" + str(e)
log_str = "Error in rabbitmq publisher greenlet: %s" %(str(e))
self._logger(log_str, level=SandeshLevel.SYS_ERR)
# avoid 'reconnect()' here as that itself might cause exception
connected = False
# end _publisher

def _subscribe(self, body, message):
Expand All @@ -154,7 +174,7 @@ def _start(self):
self._reconnect(delete_old_q=True)

self._publisher_greenlet = gevent.spawn(self._publisher)
self._connection_monitor_greenlet = gevent.spawn(self._connection_watch)
self._connection_monitor_greenlet = gevent.spawn(self._connection_watch_forever)

def shutdown(self):
self._publisher_greenlet.kill()
Expand Down

0 comments on commit 014e76f

Please sign in to comment.