Skip to content

Commit

Permalink
[VNC API Server] Fix UVE sending method on AMQP notifications
Browse files Browse the repository at this point in the history
The method 'dbe_uve_trace' signature of class vnc_db.VncDbClient
was updated [1] but the same method of class vnc_db.VncServerKombuClient
which depends on it was not.
That fix simplifies it to directly calls
vnc_db.VncDbClient.dbe_uve_trace with AMQP message dict which have the
same signature as the method.
Also remove unused methods and factorize notification methods in
vnc_db.VncServerKombuClient class.

[1] https://review.opencontrail.org/#/c/29589/

Change-Id: I14f74952b5bab3494be23930aa840ff8aa3e7247
Closes-Bug: #1673802
  • Loading branch information
Édouard Thuleau committed Mar 21, 2017
1 parent 6d6624d commit 50364d4
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 78 deletions.
8 changes: 4 additions & 4 deletions src/config/api-server/tests/test_crud_basic.py
Expand Up @@ -1744,8 +1744,8 @@ def test_uve_trace_delete_name_from_msg(self):
uve_delete_trace_invoked = []
uuid_to_fq_name_on_delete_invoked = []
def spy_uve_trace(orig_method, *args, **kwargs):
oper = args[0].upper()
obj_uuid = args[2]
oper = kwargs['oper'].upper()
obj_uuid = kwargs['uuid']
if oper == 'DELETE' and obj_uuid == test_obj.uuid:
if not uve_delete_trace_invoked:
uve_delete_trace_invoked.append(True)
Expand All @@ -1756,9 +1756,9 @@ def assert_on_call(*args, **kwargs):
return orig_method(*args, **kwargs)
else:
return orig_method(*args, **kwargs)
with test_common.patch(db_client,
'dbe_uve_trace', spy_uve_trace):
with test_common.patch(db_client, 'dbe_uve_trace', spy_uve_trace):
self._delete_test_object(test_obj)
gevent.sleep(0.01)
self.assert_vnc_db_doesnt_have_ident(test_obj)
self.assertEqual(len(uve_delete_trace_invoked), 1,
'uve_trace not invoked on object delete')
Expand Down
2 changes: 1 addition & 1 deletion src/config/api-server/tests/test_kombu.py
Expand Up @@ -164,7 +164,7 @@ def _publish(args):
self.username, self.password, self.vhost, 0,
False)
gevent.sleep(0)
kc.dbe_create_publish("network", [], {'fq_name': ['vn1']})
kc.dbe_publish('CREATE', 'virtual_network', ['vn1'], {})
_lock.wait()

# check if message is not missed out by publish error
Expand Down
36 changes: 18 additions & 18 deletions src/config/api-server/vnc_cfg_types.py
Expand Up @@ -29,18 +29,18 @@


def _parse_rt(rt):
(prefix, asn, target) = rt.split(':')
if prefix != 'target':
raise ValueError()
target = int(target)
if not asn.isdigit():
try:
netaddr.IPAddress(asn)
except netaddr.core.AddrFormatError:
raise ValueError()
else:
asn = int(asn)
return (prefix, asn, target)
(prefix, asn, target) = rt.split(':')
if prefix != 'target':
raise ValueError()
target = int(target)
if not asn.isdigit():
try:
netaddr.IPAddress(asn)
except netaddr.core.AddrFormatError:
raise ValueError()
else:
asn = int(asn)
return (prefix, asn, target)


class ResourceDbMixin(object):
Expand Down Expand Up @@ -105,7 +105,7 @@ def post_dbe_delete(cls, id, obj_dict, db_conn):
return True, ''

@classmethod
def dbe_create_notification(cls, obj_id):
def dbe_create_notification(cls, db_conn, obj_id):
pass

@classmethod
Expand Down Expand Up @@ -332,7 +332,7 @@ def post_dbe_delete(cls, id, obj_dict, db_conn):


@classmethod
def dbe_create_notification(cls, obj_id):
def dbe_create_notification(cls, db_conn, obj_id):
ok, obj_dict = cls.dbe_read(db_conn, 'floating_ip', obj_id)
if not ok:
return
Expand Down Expand Up @@ -403,7 +403,7 @@ def post_dbe_delete(cls, id, obj_dict, db_conn):


@classmethod
def dbe_create_notification(cls, obj_id):
def dbe_create_notification(cls, db_conn, obj_id):
ok, obj_dict = cls.dbe_read(db_conn, 'alias_ip', obj_id)
if not ok:
return
Expand Down Expand Up @@ -593,7 +593,7 @@ def post_dbe_delete(cls, id, obj_dict, db_conn):
# end post_dbe_delete

@classmethod
def dbe_create_notification(cls, obj_id):
def dbe_create_notification(cls, db_conn, obj_id):
ok, obj_dict = cls.dbe_read(db_conn, 'instance_ip', obj_id)
if not ok:
return
Expand Down Expand Up @@ -1580,7 +1580,7 @@ def subnet_ip_count(cls, vn_fq_name, subnet_list):
# end subnet_ip_count

@classmethod
def dbe_create_notification(cls, obj_id):
def dbe_create_notification(cls, db_conn, obj_id):
cls.addr_mgmt.net_create_notify(obj_id)
# end dbe_create_notification

Expand Down Expand Up @@ -1793,7 +1793,7 @@ def undo():
# end pre_dbe_delete

@classmethod
def dbe_create_notification(cls, obj_id):
def dbe_create_notification(cls, db_conn, obj_id):
cls.addr_mgmt.ipam_create_notify(obj_id)
# end dbe_create_notification

Expand Down
89 changes: 34 additions & 55 deletions src/config/api-server/vnc_db.py
Expand Up @@ -53,7 +53,7 @@ def get_trace_id():
try:
req_id = gevent.getcurrent().trace_request_id
except Exception:
req_id = 'req-%s' %(str(uuid.uuid4()))
req_id = 'req-%s' % str(uuid.uuid4())
gevent.getcurrent().trace_request_id = req_id

return req_id
Expand Down Expand Up @@ -273,26 +273,10 @@ def __init__(self, db_client_mgr, rabbit_ip, rabbit_port,

# end __init__

def prepare_to_consume(self):
self._db_client_mgr.wait_for_resync_done()
# prepare_to_consume

def config_log(self, msg, level):
self._db_client_mgr.config_log(msg, level)
# end config_log

def uuid_to_fq_name(self, uuid):
self._db_client_mgr.uuid_to_fq_name(uuid)
# end uuid_to_fq_name

def dbe_uve_trace(self, oper, typ, uuid, body):
self._db_client_mgr.dbe_uve_trace(oper, typ, uuid, body)
# end dbe_uve_trace

def dbe_oper_publish_pending(self):
return self.num_pending_messages()
# end dbe_oper_publish_pending

@ignore_exceptions
def _generate_msgbus_notify_trace(self, oper_info):
req_id = oper_info.get('request-id',
Expand All @@ -313,9 +297,10 @@ def _dbe_subscribe_callback(self, oper_info):
self.config_log(msg, level=SandeshLevel.SYS_DEBUG)
trace = self._generate_msgbus_notify_trace(oper_info)

self._db_client_mgr.dbe_uve_trace(**oper_info)
if oper_info['oper'] == 'CREATE':
self._dbe_create_notification(oper_info)
if oper_info['oper'] == 'UPDATE':
elif oper_info['oper'] == 'UPDATE':
self._dbe_update_notification(oper_info)
elif oper_info['oper'] == 'DELETE':
self._dbe_delete_notification(oper_info)
Expand All @@ -330,40 +315,36 @@ def _dbe_subscribe_callback(self, oper_info):
sandesh=self._sandesh, error_msg=errmsg)
# end _dbe_subscribe_callback

def dbe_create_publish(self, obj_type, obj_id, obj_dict):
def dbe_publish(self, oper, obj_type, obj_id, fq_name, obj_dict=None):
req_id = get_trace_id()
oper_info = {'request-id': req_id,
'oper': 'CREATE',
'type': obj_type,
'uuid': obj_id,
'fq_name': obj_dict['fq_name']}
oper_info = {
'request-id': req_id,
'oper': oper,
'type': obj_type,
'uuid': obj_id,
'fq_name': fq_name,
}
if obj_dict is not None:
oper_info['obj_dict'] = obj_dict
self.publish(oper_info)
# end dbe_create_publish

def _dbe_create_notification(self, obj_info):
obj_type = obj_info['type']
obj_uuid = obj_info['uuid']
self.dbe_uve_trace("CREATE", obj_type, obj_uuid)

try:
r_class = self._db_client_mgr.get_resource_class(obj_type)
if r_class:
r_class.dbe_create_notification(obj_uuid)
r_class.dbe_create_notification(self._db_client_mgr, obj_uuid)
except Exception as e:
err_msg = ("Failed in dbe_create_notification " + str(e))
self.config_log(err_msg, level=SandeshLevel.SYS_ERR)
raise
# end _dbe_create_notification

def dbe_update_publish(self, obj_type, obj_id):
oper_info = {'oper': 'UPDATE', 'type': obj_type, 'uuid': obj_id}
self.publish(oper_info)
# end dbe_update_publish

def _dbe_update_notification(self, obj_info):
obj_type = obj_info['type']
obj_uuid = obj_info['uuid']
self.dbe_uve_trace("UPDATE", obj_type, obj_uuid)

try:
r_class = self._db_client_mgr.get_resource_class(obj_type)
Expand All @@ -375,17 +356,10 @@ def _dbe_update_notification(self, obj_info):
raise
# end _dbe_update_notification

def dbe_delete_publish(self, obj_type, obj_id, obj_dict):
oper_info = {'oper': 'DELETE', 'type': obj_type, 'uuid': obj_id,
'fq_name': obj_dict['fq_name'], 'obj_dict': obj_dict}
self.publish(oper_info)
# end dbe_delete_publish

def _dbe_delete_notification(self, obj_info):
obj_type = obj_info['type']
obj_uuid = obj_info['uuid']
obj_dict = obj_info['obj_dict']
self.dbe_uve_trace("DELETE", obj_type, obj_uuid, obj_dict)

db_client_mgr = self._db_client_mgr
db_client_mgr._object_db.cache_uuid_to_fq_name_del(obj_uuid)
Expand Down Expand Up @@ -971,13 +945,13 @@ def dbe_alloc(self, obj_type, obj_dict, uuid_requested=None):
return (True, obj_dict['uuid'])
# end dbe_alloc

def dbe_uve_trace(self, oper, type, uuid, obj_dict=None):
def dbe_uve_trace(self, oper, type, uuid, obj_dict=None, **kwargs):
if type not in self._UVEMAP:
return

if obj_dict is None:
try:
(ok, obj_dict) = self._db_client_mgr.dbe_read(type, uuid)
(ok, obj_dict) = self.dbe_read(type, uuid)
if not ok:
return
except NoIdError:
Expand Down Expand Up @@ -1093,12 +1067,13 @@ def wrapper2(self, obj_type, obj_id, obj_dict):

@dbe_trace('create')
@build_shared_index('create')
def dbe_create(self, obj_type, obj_id, obj_dict):
(ok, result) = self._object_db.object_create(obj_type, obj_id, obj_dict)
def dbe_create(self, obj_type, obj_uuid, obj_dict):
(ok, result) = self._object_db.object_create(obj_type, obj_uuid, obj_dict)

if ok:
# publish to msgbus
self._msgbus.dbe_create_publish(obj_type, obj_id, obj_dict)
self._msgbus.dbe_publish('CREATE', obj_type, obj_uuid,
obj_dict['fq_name'], obj_dict)

return (ok, result)
# end dbe_create
Expand Down Expand Up @@ -1145,12 +1120,13 @@ def dbe_is_latest(self, obj_id, tstamp):

@dbe_trace('update')
@build_shared_index('update')
def dbe_update(self, obj_type, obj_id, new_obj_dict):
def dbe_update(self, obj_type, obj_uuid, new_obj_dict):
(ok, cassandra_result) = self._object_db.object_update(
obj_type, obj_id, new_obj_dict)
obj_type, obj_uuid, new_obj_dict)

# publish to message bus (rabbitmq)
self._msgbus.dbe_update_publish(obj_type, obj_id)
fq_name = self.uuid_to_fq_name(obj_uuid)
self._msgbus.dbe_publish('UPDATE', obj_type, obj_uuid, fq_name)

return (ok, cassandra_result)
# end dbe_update
Expand Down Expand Up @@ -1241,12 +1217,13 @@ def dbe_list(self, obj_type, parent_uuids=None, back_ref_uuids=None,
# end dbe_list

@dbe_trace('delete')
def dbe_delete(self, obj_type, obj_id, obj_dict):
def dbe_delete(self, obj_type, obj_uuid, obj_dict):
(ok, cassandra_result) = self._object_db.object_delete(
obj_type, obj_id)
obj_type, obj_uuid)

# publish to message bus (rabbitmq)
self._msgbus.dbe_delete_publish(obj_type, obj_id, obj_dict)
self._msgbus.dbe_publish('DELETE', obj_type, obj_uuid,
obj_dict['fq_name'], obj_dict)

# finally remove mapping in zk
self.dbe_release(obj_type, obj_dict['fq_name'])
Expand All @@ -1259,7 +1236,7 @@ def dbe_release(self, obj_type, obj_fq_name):
# end dbe_release

def dbe_oper_publish_pending(self):
return self._msgbus.dbe_oper_publish_pending()
return self._msgbus.num_pending_messages()
# end dbe_oper_publish_pending

def useragent_kv_store(self, key, value):
Expand Down Expand Up @@ -1350,15 +1327,17 @@ def prop_collection_update(self, obj_type, obj_uuid, updates):
return

self._object_db.prop_collection_update(obj_type, obj_uuid, updates)
self._msgbus.dbe_update_publish(obj_type, obj_uuid)
fq_name = self.uuid_to_fq_name(obj_uuid)
self._msgbus.dbe_publish('UPDATE', obj_type, obj_uuid, fq_name)
return True, ''
# end prop_collection_update

def ref_update(self, obj_type, obj_uuid, ref_obj_type, ref_uuid, ref_data,
operation):
self._object_db.ref_update(obj_type, obj_uuid, ref_obj_type,
ref_uuid, ref_data, operation)
self._msgbus.dbe_update_publish(obj_type, obj_uuid)
ref_uuid, ref_data, operation)
fq_name = self.uuid_to_fq_name(obj_uuid)
self._msgbus.dbe_publish('UPDATE', obj_type, obj_uuid, fq_name)
# ref_update

def ref_relax_for_delete(self, obj_uuid, ref_uuid):
Expand Down

0 comments on commit 50364d4

Please sign in to comment.