
Commit

Merge "config-resilience: Handle all rabbitmq producer/consumer recon…
Browse files Browse the repository at this point in the history
…nects" into R2.22-dev
Zuul authored and opencontrail-ci-admin committed Aug 25, 2015
2 parents fdc8c87 + 86357f8 commit fdfceee
Showing 3 changed files with 235 additions and 42 deletions.
170 changes: 170 additions & 0 deletions src/config/api-server/tests/test_crud_basic.py
@@ -36,6 +36,7 @@
from vnc_api.gen.resource_test import *
import cfgm_common
from cfgm_common import vnc_plugin_base
from cfgm_common import imid

sys.path.append('../common/tests')
from test_utils import *
@@ -699,6 +700,7 @@ def err_rabbitq_pub(*args, **kwargs):
raise Exception("Faking Rabbit publish failure")

def err_rabbitq_conn(*args, **kwargs):
gevent.sleep(0.1)
raise Exception("Faking RabbitMQ connection failure")

api_server._db_conn._msgbus._producer.publish = err_rabbitq_pub
@@ -759,6 +761,174 @@ def err_call_async_result(*args, **kwargs):
api_server._db_conn._ifmap_db._mapclient.call_async_result = err_call_async_result
test_obj = self._create_test_object()
self.assertTill(self.ifmap_has_ident, obj=test_obj)

def test_reconnect_to_rabbit(self):
exceptions = [(FakeKombu.Connection.ConnectionException(), 'conn'),
(FakeKombu.Connection.ChannelException(), 'chan'),
(Exception(), 'generic')]

# fake problem on publish to rabbit
# restore, ensure retry and successful publish
for exc_obj, exc_type in exceptions:
obj = VirtualNetwork('%s-pub-%s' %(self.id(), exc_type))
obj.uuid = str(uuid.uuid4())
publish_captured = [False]
def err_on_publish(orig_method, *args, **kwargs):
msg = args[0]
if msg['oper'] == 'CREATE' and msg['uuid'] == obj.uuid:
publish_captured[0] = True
raise exc_obj
return orig_method(*args, **kwargs)

rabbit_producer = self._api_server._db_conn._msgbus._producer
with test_common.patch(rabbit_producer,
'publish', err_on_publish):
self._vnc_lib.virtual_network_create(obj)
self.assertTill(lambda: publish_captured[0] == True)
# unpatch err publish

self.assertTill(self.ifmap_has_ident, obj=obj)
# end exception types on publish

# fake problem on consume from rabbit
# restore, ensure retry and successful consume
for exc_obj, exc_type in exceptions:
obj = VirtualNetwork('%s-sub-%s' %(self.id(), exc_type))
obj.uuid = str(uuid.uuid4())
consume_captured = [False]
consume_test_payload = [None]
rabbit_consumer = self._api_server._db_conn._msgbus._consumer
def err_on_consume(orig_method, *args, **kwargs):
msg = orig_method()
payload = msg.payload
if payload['oper'] == 'UPDATE' and payload['uuid'] == obj.uuid:
if (consume_test_payload[0] == payload):
return msg
consume_captured[0] = True
consume_test_payload[0] = payload
rabbit_consumer.queues.put(payload, None)
raise exc_obj
return msg

with test_common.patch(rabbit_consumer.queues,
'get', err_on_consume):
# create the object to insert 'get' handler,
# update oper will test the error handling
self._vnc_lib.virtual_network_create(obj)
obj.display_name = 'test_update'
self._vnc_lib.virtual_network_update(obj)
self.assertTill(lambda: consume_captured[0] == True)
# unpatch err consume

def ifmap_has_ident_update():
ifmap_id = imid.get_ifmap_id_from_fq_name(obj.get_type(),
obj.get_fq_name())
node = FakeIfmapClient._graph.get(ifmap_id)
if not node:
return False
meta = node.get('links', {}).get('contrail:display-name',
{}).get('meta')
if not meta:
return False
if not 'test_update' in etree.tostring(meta):
return False

return True

self.assertTill(ifmap_has_ident_update)
# end exception types on consume

# fake problem on consume and publish at same time
# restore, ensure retry and successful publish + consume
obj = VirtualNetwork('%s-pub-sub' %(self.id()))
obj.uuid = str(uuid.uuid4())

msgbus = self._api_server._db_conn._msgbus
pub_greenlet = msgbus._publisher_greenlet
sub_greenlet = msgbus._connection_monitor_greenlet
setattr(pub_greenlet, 'unittest', {'name': 'producer'})
setattr(sub_greenlet, 'unittest', {'name': 'consumer'})

consume_captured = [False]
consume_test_payload = [None]
publish_connect_done = [False]
publish_captured = [False]
def err_on_consume(orig_method, *args, **kwargs):
msg = orig_method()
payload = msg.payload
if payload['oper'] == 'UPDATE' and payload['uuid'] == obj.uuid:
if (consume_test_payload[0] == payload):
return msg
consume_captured[0] = True
consume_test_payload[0] = payload
rabbit_consumer = self._api_server._db_conn._msgbus._consumer
rabbit_consumer.queues.put(payload, None)
raise exc_obj
return msg

def block_on_connect(orig_method, *args, **kwargs):
# block consumer till publisher does update,
# fake consumer connect exceptions till publisher connects fine
utvars = getattr(gevent.getcurrent(), 'unittest', None)
if utvars and utvars['name'] == 'producer':
publish_connect_done[0] = True
return orig_method(*args, **kwargs)

while not publish_captured[0]:
gevent.sleep(0.1)

while not publish_connect_done[0]:
gevent.sleep(0.1)
raise Exception('Faking connection fail')

return orig_method(*args, **kwargs)

rabbit_consumer = self._api_server._db_conn._msgbus._consumer
rabbit_conn = self._api_server._db_conn._msgbus._conn
with test_common.patch(rabbit_consumer.queues,
'get', err_on_consume):
with test_common.patch(rabbit_conn,
'connect', block_on_connect):
# create the object to insert 'get' handler,
# update oper will test the error handling
self._vnc_lib.virtual_network_create(obj)
obj.display_name = 'test_update_1'
self._vnc_lib.virtual_network_update(obj)
self.assertTill(lambda: consume_captured[0] == True)

def err_on_publish(orig_method, *args, **kwargs):
msg = args[0]
if msg['oper'] == 'UPDATE' and msg['uuid'] == obj.uuid:
publish_captured[0] = True
raise exc_obj
return orig_method(*args, **kwargs)
rabbit_producer = self._api_server._db_conn._msgbus._producer
with test_common.patch(rabbit_producer,
'publish', err_on_publish):
obj.display_name = 'test_update_2'
self._vnc_lib.virtual_network_update(obj)
self.assertTill(lambda: publish_captured[0] == True)
# unpatch err publish
# unpatch connect
# unpatch err consume

def ifmap_has_update_2():
ifmap_id = imid.get_ifmap_id_from_fq_name(obj.get_type(),
obj.get_fq_name())
node = FakeIfmapClient._graph.get(ifmap_id)
if not node:
return False
meta = node.get('links', {}).get('contrail:display-name',
{}).get('meta')
if meta is None:
return False
if not 'test_update_2' in etree.tostring(meta):
return False

return True

self.assertTill(ifmap_has_update_2)
# end test_reconnect_to_rabbit

def test_handle_trap_on_exception(self):
api_server = test_common.vnc_cfg_api_server.server
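The tests above inject faults by wrapping a method so that the replacement receives the original callable as its first argument, raises once for the message under test, and otherwise delegates; test_common.patch then restores the original on exit. The real helper lives in the Contrail common test tree and is not part of this diff, so the following is only a minimal sketch of that wrap-and-restore style of context manager (patch_method and its arguments are hypothetical names):

from contextlib import contextmanager

@contextmanager
def patch_method(obj, attr_name, replacement):
    # Illustrative sketch only; the committed tests use test_common.patch.
    # Keep a reference to the original so the replacement can delegate to it.
    orig_method = getattr(obj, attr_name)

    def wrapper(*args, **kwargs):
        # Hand the original to the replacement, mirroring how
        # err_on_publish/err_on_consume above call orig_method(...).
        return replacement(orig_method, *args, **kwargs)

    setattr(obj, attr_name, wrapper)
    try:
        yield
    finally:
        # Always restore, so later assertions exercise the real method again.
        setattr(obj, attr_name, orig_method)

Used this way, the first publish or consume for the targeted uuid raises, the kombu layer reconnects and retries, and the assertTill checks above confirm the object still reaches IF-MAP.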
9 changes: 6 additions & 3 deletions src/config/common/tests/test_utils.py
@@ -642,6 +642,9 @@ def get(self):
# end class Queue

class Connection(object):
class ConnectionException(Exception): pass
class ChannelException(Exception): pass

def __init__(self, *args, **kwargs):
pass
# end __init__
@@ -672,11 +675,11 @@ def drain_events(self):

@property
def connection_errors(self):
return (Exception,)
return (self.ConnectionException, )

@property
def channel_errors(self):
return (Exception, )
return (self.ChannelException, )
# end class Connection

class Consumer(object):
@@ -687,8 +690,8 @@ def __init__(self, *args, **kwargs):

def consume(self):
while True:
msg = self.queues.get()
try:
msg = self.queues.get()
for c in self.callbacks:
c(msg.payload, msg)
except Exception:
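The FakeKombu change above matters because the kombu wrapper in vnc_kombu.py decides whether to reconnect by catching self._conn.connection_errors + self._conn.channel_errors; while the fake returned (Exception,) for both properties, every failure looked like a transport error and the generic-exception branch could never be exercised in tests. A rough sketch of that dispatch, using stand-in names rather than the real VncKombuClient code, is:

class ConnectionException(Exception):
    pass

class ChannelException(Exception):
    pass

def publish_once(publish, message,
                 connection_errors=(ConnectionException,),
                 channel_errors=(ChannelException,)):
    # Stand-in for the except structure in vnc_kombu.py's _publisher loop.
    try:
        publish(message)
    except connection_errors + channel_errors:
        # Transport-level failure: tear down the connection and retry.
        return 'reconnect'
    except Exception:
        # Anything else: log it and keep the greenlet alive.
        return 'log-only'
    return 'ok'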
98 changes: 59 additions & 39 deletions src/config/common/vnc_kombu.py
@@ -64,45 +64,46 @@ def _reconnect(self, delete_old_q=False):
# the connection and releases the lock, so the other one can
# just wait on the lock, till it gets released
self._conn_lock.wait()
return

self._conn_lock.acquire()

msg = "RabbitMQ connection down"
self._logger(msg, level=SandeshLevel.SYS_ERR)
self._update_sandesh_status(ConnectionStatus.DOWN)
self._conn_state = ConnectionStatus.DOWN

self._conn.close()

self._conn.ensure_connection()
self._conn.connect()

self._update_sandesh_status(ConnectionStatus.UP)
self._conn_state = ConnectionStatus.UP
msg = 'RabbitMQ connection ESTABLISHED %s' % repr(self._conn)
self._logger(msg, level=SandeshLevel.SYS_NOTICE)

self._channel = self._conn.channel()
if delete_old_q:
# delete the old queue in first-connect context
# as db-resync would have caught up with history.
try:
bound_q = self._update_queue_obj(self._channel)
bound_q.delete()
except Exception as e:
msg = 'Unable to delete the old ampq queue: %s' %(str(e))
self._logger(msg, level=SandeshLevel.SYS_ERR)

self._consumer = kombu.Consumer(self._channel,
queues=self._update_queue_obj,
callbacks=[self._subscribe])
self._producer = kombu.Producer(self._channel, exchange=self.obj_upd_exchange)

self._conn_lock.release()
if self._conn_state == ConnectionStatus.UP:
return

with self._conn_lock:
msg = "RabbitMQ connection down"
self._logger(msg, level=SandeshLevel.SYS_ERR)
self._update_sandesh_status(ConnectionStatus.DOWN)
self._conn_state = ConnectionStatus.DOWN

self._conn.close()

self._conn.ensure_connection()
self._conn.connect()

self._update_sandesh_status(ConnectionStatus.UP)
self._conn_state = ConnectionStatus.UP
msg = 'RabbitMQ connection ESTABLISHED %s' % repr(self._conn)
self._logger(msg, level=SandeshLevel.SYS_NOTICE)

self._channel = self._conn.channel()
if delete_old_q:
# delete the old queue in first-connect context
# as db-resync would have caught up with history.
try:
bound_q = self._update_queue_obj(self._channel)
bound_q.delete()
except Exception as e:
msg = 'Unable to delete the old ampq queue: %s' %(str(e))
self._logger(msg, level=SandeshLevel.SYS_ERR)

self._consumer = kombu.Consumer(self._channel,
queues=self._update_queue_obj,
callbacks=[self._subscribe])
self._producer = kombu.Producer(self._channel, exchange=self.obj_upd_exchange)
# end _reconnect

def _connection_watch(self):
def _connection_watch(self, connected):
if not connected:
self._reconnect()

self.prepare_to_consume()
while True:
try:
@@ -112,10 +113,27 @@ def _connection_watch(self):
self._reconnect()
# end _connection_watch

def _connection_watch_forever(self):
connected = True
while True:
try:
self._connection_watch(connected)
except Exception as e:
msg = 'Error in rabbitmq drainer greenlet: %s' %(str(e))
self._logger(msg, level=SandeshLevel.SYS_ERR)
# avoid 'reconnect()' here as that itself might cause exception
connected = False
# end _connection_watch_forever

def _publisher(self):
message = None
connected = True
while True:
try:
if not connected:
self._reconnect()
connected = True

if not message:
# earlier was sent fine, dequeue one more
message = self._publish_queue.get()
Expand All @@ -128,8 +146,10 @@ def _publisher(self):
except self._conn.connection_errors + self._conn.channel_errors as e:
self._reconnect()
except Exception as e:
log_str = "Unknown exception in _publisher greenlet" + str(e)
log_str = "Error in rabbitmq publisher greenlet: %s" %(str(e))
self._logger(log_str, level=SandeshLevel.SYS_ERR)
# avoid 'reconnect()' here as that itself might cause exception
connected = False
# end _publisher

def _subscribe(self, body, message):
@@ -143,7 +163,7 @@ def _start(self):
self._reconnect(delete_old_q=True)

self._publisher_greenlet = gevent.spawn(self._publisher)
self._connection_monitor_greenlet = gevent.spawn(self._connection_watch)
self._connection_monitor_greenlet = gevent.spawn(self._connection_watch_forever)

def shutdown(self):
self._publisher_greenlet.kill()
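Two patterns carry the resilience change in vnc_kombu.py: _reconnect is serialized behind _conn_lock with an early return once the connection state is already UP, and both the publisher loop and the new _connection_watch_forever supervisor recover from unexpected exceptions by setting a connected flag and reconnecting at the top of the next iteration, rather than calling _reconnect inside the generic except handler, where a further failure would kill the greenlet. The sketch below condenses that shape; the exact type of _conn_lock is not visible in this diff, so a gevent Semaphore stands in, and ReconnectingClient is a hypothetical stand-in for VncKombuClient:

import gevent
from gevent.lock import Semaphore

class ReconnectingClient(object):
    def __init__(self, conn):
        self._conn = conn
        self._conn_lock = Semaphore()
        self._conn_state = 'DOWN'

    def _reconnect(self):
        # A greenlet that finds the state already UP skips the reconnect;
        # otherwise it waits on the lock so only one reconnect runs at a time.
        if self._conn_state == 'UP':
            return
        with self._conn_lock:
            self._conn_state = 'DOWN'
            self._conn.close()
            self._conn.ensure_connection()
            self._conn.connect()
            self._conn_state = 'UP'

    def _publisher(self):
        connected = True
        while True:
            try:
                if not connected:
                    # Reconnect at the top of the loop, not inside the
                    # except block, so a failing reconnect is simply retried
                    # on the next pass instead of killing the greenlet.
                    self._reconnect()
                    connected = True
                # ... dequeue one message and publish it here ...
                gevent.sleep(0.1)
            except Exception:
                connected = False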
