Skip to content

Commit

Permalink
Schema transformer uses rabbitmq instead of ifmap
Browse files Browse the repository at this point in the history
Schema transformer now uses rabbitmq instead of ifmap.  This allows us to use
rabbitmq as central message bus across all config node daemons. Messaging
structure is simpler since rabbitmq only sends uuid of the objects being
created/updated/deleted.

Also, schema transformer now uses dependency tracker to track dependencies among
various objects.

Closes-Bug: 1470621

Change-Id: I8c35a09f11faac8d965c4556d189431f0bac47ea
(cherry picked from commit a6a73ee2575d07be1a36cb9140b6142808c391b0)
  • Loading branch information
Sachin Bansal committed Sep 14, 2015
1 parent f68e562 commit d760565
Show file tree
Hide file tree
Showing 10 changed files with 3,322 additions and 3,410 deletions.
15 changes: 9 additions & 6 deletions src/config/common/dependency_tracker.py
Expand Up @@ -4,10 +4,13 @@

"""
This file contains implementation of dependency tracker
for physical router configuration manager
for contrail config daemons
"""


# This class tracks dependencies among different objects based on a reaction map.
# Objects could be derived from DBBase. Each object has an object_type and the
# mapping from object_type to the class is specified using object_class_map
class DependencyTracker(object):

def __init__(self, object_class_map, reaction_map):
Expand All @@ -16,21 +19,21 @@ def __init__(self, object_class_map, reaction_map):
self.resources = {}
# end __init__

def _add_resource(self, obj_type, obj_uuid):
def _add_resource(self, obj_type, obj_key):
if obj_type in self.resources:
if obj_uuid in self.resources[obj_type]:
if obj_key in self.resources[obj_type]:
# already visited
return False
self.resources[obj_type].append(obj_uuid)
self.resources[obj_type].append(obj_key)
else:
self.resources[obj_type] = [obj_uuid]
self.resources[obj_type] = [obj_key]
return True
# end _add_resource

def evaluate(self, obj_type, obj, from_type='self'):
if obj_type not in self._reaction_map:
return
if not self._add_resource(obj_type, obj.uuid):
if not self._add_resource(obj_type, obj.get_key()):
return

for ref_type in self._reaction_map[obj_type][from_type]:
Expand Down
2 changes: 0 additions & 2 deletions src/config/common/tests/test_common.py
Expand Up @@ -176,8 +176,6 @@ def kill_svc_monitor(glet):

def kill_schema_transformer(glet):
glet.kill()
to_bgp.transformer.ssrc_task.kill()
to_bgp.transformer.arc_task.kill()
to_bgp.transformer.reset()

def launch_schema_transformer(api_server_ip, api_server_port):
Expand Down
149 changes: 98 additions & 51 deletions src/config/common/vnc_db.py
Expand Up @@ -18,6 +18,10 @@ class DBBase(object):
_cassandra = None
_manager = None

# objects in the database could be indexed by uuid or fq-name
# set _indexed_by_name to True in the derived class to use fq-name as index
_indexed_by_name = False

@classmethod
def init(cls, manager, logger, cassandra):
cls._logger = logger
Expand Down Expand Up @@ -45,16 +49,16 @@ def items(cls):

@classmethod
def get(cls, key):
if key in cls._dict:
return cls._dict[key]
return None
return cls._dict.get(key)
# end get

@classmethod
def locate(cls, key, *args):
if key not in cls._dict:
try:
cls._dict[key] = cls(key, *args)
obj = cls(key, *args)
cls._dict[key] = obj
return obj
except NoIdError as e:
cls._logger.debug(
"Exception %s while creating %s for %s",
Expand All @@ -63,10 +67,17 @@ def locate(cls, key, *args):
return cls._dict[key]
# end locate

def delete_obj(self):
# Override in derived class to provide additional functionality
pass

@classmethod
def delete(cls, key):
if key in cls._dict:
del cls._dict[key]
obj = cls.get(key)
if obj is None:
return
obj.delete_obj()
del cls._dict[key]
# end delete

def get_ref_uuid_from_dict(self, obj_dict, ref_name):
Expand All @@ -75,6 +86,12 @@ def get_ref_uuid_from_dict(self, obj_dict, ref_name):
else:
return None

def get_key(self):
if self._indexed_by_name:
return self.name
return self.uuid
# end get_key

def add_ref(self, ref_type, ref):
if hasattr(self, ref_type):
setattr(self, ref_type, ref)
Expand All @@ -91,77 +108,103 @@ def delete_ref(self, ref_type, ref):
ref_set.discard(ref)
# end delete_ref

def add_to_parent(self, obj_dict):
self.parent_type = obj_dict.get('parent_type')
self.parent_id = obj_dict.get('parent_uuid')
if not self.parent_type or not self.parent_id:
def add_to_parent(self, obj):
if isinstance(obj, dict):
self.parent_type = obj.get('parent_type')
else:
self.parent_type = obj.get_parent_type()
if self._indexed_by_name:
if isinstance(obj, dict):
fq_name = obj_dict.get('fq_name', [])
if fq_name:
self.parent_key = ':'.join(fq_name[:-1])
else:
return
else:
self.parent_key = obj.get_parent_fq_name_str()
else:
if isinstance(obj, dict):
self.parent_key = obj.get('parent_uuid')
else:
self.parent_key = obj.get_parent_uuid
if not self.parent_type or not self.parent_key:
return
p_obj = self.get_obj_type_map()[self.parent_type].get(self.parent_id)
p_obj = self.get_obj_type_map()[self.parent_type].get(self.parent_key)
if p_obj is not None:
p_obj.add_ref(self.obj_type, self.uuid)
p_obj.add_ref(self.obj_type, self.get_key())
# end

def remove_from_parent(self):
if not self.parent_type or not self.parent_id:
if not self.parent_type or not self.parent_key:
return
p_obj = self.get_obj_type_map()[self.parent_type].get(self.parent_id)
p_obj = self.get_obj_type_map()[self.parent_type].get(self.parent_key)
if p_obj is not None:
p_obj.delete_ref(self.obj_type, self.uuid)
p_obj.delete_ref(self.obj_type, self.get_key())

def update_single_ref(self, ref_type, obj):
refs = obj.get(ref_type+'_refs') or obj.get(ref_type+'_back_refs')
if refs:
def _get_ref_key(self, ref):
if self._indexed_by_name:
key = ':'.join(ref['to'])
else:
try:
new_id = refs[0]['uuid']
key = ref['uuid']
except KeyError:
fq_name = refs[0]['to']
new_id = self._cassandra.fq_name_to_uuid(ref_type, fq_name)
fq_name = ref['to']
key = self._cassandra.fq_name_to_uuid(ref_type, fq_name)
return key
# end _get_ref_key

def update_single_ref(self, ref_type, obj):
if isinstance(obj, dict):
refs = obj.get(ref_type+'_refs') or obj.get(ref_type+'_back_refs')
else:
new_id = None
old_id = getattr(self, ref_type, None)
if old_id == new_id:
refs = (getattr(obj, ref_type+'_refs', None) or
getattr(obj, ref_type+'_back_refs', None))

if refs:
new_key = self._get_ref_key(refs[0])
else:
new_key = None
old_key = getattr(self, ref_type, None)
if old_key == new_key:
return
ref_obj = self.get_obj_type_map()[ref_type].get(old_id)
ref_obj = self.get_obj_type_map()[ref_type].get(old_key)
if ref_obj is not None:
ref_obj.delete_ref(self.obj_type, self.uuid)
ref_obj = self.get_obj_type_map()[ref_type].get(new_id)
ref_obj.delete_ref(self.obj_type, self.get_key())
ref_obj = self.get_obj_type_map()[ref_type].get(new_key)
if ref_obj is not None:
ref_obj.add_ref(self.obj_type, self.uuid)
setattr(self, ref_type, new_id)
ref_obj.add_ref(self.obj_type, self.get_key())
setattr(self, ref_type, new_key)
# end update_single_ref

def set_children(self, ref_type, obj):
refs = obj.get(ref_type+'s')
new_refs = set()
for ref in refs or []:
try:
new_id = ref['uuid']
except KeyError:
fq_name = ref['to']
new_id = self._cassandra.fq_name_to_uuid(ref_type, fq_name)
new_refs.add(new_id)
new_key = self._get_ref_key(ref)
new_refs.add(new_key)
setattr(self, ref_type+'s', new_refs)
# end

def update_multiple_refs(self, ref_type, obj):
refs = obj.get(ref_type+'_refs') or obj.get(ref_type+'_back_refs')
if isinstance(obj, dict):
refs = obj.get(ref_type+'_refs') or obj.get(ref_type+'_back_refs')
else:
refs = (getattr(obj, ref_type+'_refs', None) or
getattr(obj, ref_type+'_back_refs', None))

new_refs = set()
for ref in refs or []:
try:
new_id = ref['uuid']
except KeyError:
fq_name = ref['to']
new_id = self._cassandra.fq_name_to_uuid(ref_type, fq_name)
new_refs.add(new_id)
new_key = self._get_ref_key(ref)
new_refs.add(new_key)
old_refs = getattr(self, ref_type+'s')
for ref_id in old_refs - new_refs:
ref_obj = self.get_obj_type_map()[ref_type].get(ref_id)
for ref_key in old_refs - new_refs:
ref_obj = self.get_obj_type_map()[ref_type].get(ref_key)
if ref_obj is not None:
ref_obj.delete_ref(self.obj_type, self.uuid)
for ref_id in new_refs - old_refs:
ref_obj = self.get_obj_type_map()[ref_type].get(ref_id)
ref_obj.delete_ref(self.obj_type, self.get_key())
for ref_key in new_refs - old_refs:
ref_obj = self.get_obj_type_map()[ref_type].get(ref_key)
if ref_obj is not None:
ref_obj.add_ref(self.obj_type, self.uuid)
ref_obj.add_ref(self.obj_type, self.get_key())
setattr(self, ref_type+'s', new_refs)
# end update_multiple_refs

Expand All @@ -174,17 +217,21 @@ def read_obj(self, uuid, obj_type=None):
return objs[0]
# end read_obj

@classmethod
def vnc_obj_from_dict(cls, obj_type, obj_dict):
cls = obj_type_to_vnc_class(obj_type, __name__)
return cls.from_dict(**obj_dict)

def read_vnc_obj(self, uuid=None, fq_name=None, obj_type=None):
if uuid is None and fq_name is None:
raise NoIdError('')
obj_type = obj_type or self.obj_type
if uuid is None:
if isinstance(fq_name, str):
if isinstance(fq_name, basestring):
fq_name = fq_name.split(':')
uuid = self._cassandra.fq_name_to_uuid(obj_type, fq_name)
obj_dict = self.read_obj(uuid, obj_type)
cls = obj_type_to_vnc_class(obj_type, __name__)
obj = cls.from_dict(**obj_dict)
obj = self.vnc_obj_from_dict(obj_type, obj_dict)
obj.clear_pending_updates()
return obj
# end read_vnc_obj
Expand Down
3 changes: 3 additions & 0 deletions src/config/common/vnc_kombu.py
Expand Up @@ -165,6 +165,9 @@ def _start(self):
self._publisher_greenlet = gevent.spawn(self._publisher)
self._connection_monitor_greenlet = gevent.spawn(self._connection_watch_forever)

def greenlets(self):
return [self._publisher_greenlet, self._connection_monitor_greenlet]

def shutdown(self):
self._publisher_greenlet.kill()
self._connection_monitor_greenlet.kill()
Expand Down
4 changes: 1 addition & 3 deletions src/config/device-manager/device_manager/device_manager.py
Expand Up @@ -227,9 +227,7 @@ def __init__(self, args=None):
pr.set_config_state()

self._db_resync_done.set()
while 1:
# Just wait indefinitely
time.sleep(5)
gevent.joinall(self._vnc_kombu.greenlets())
# end __init__

def connection_state_update(self, status, message=None):
Expand Down
1 change: 1 addition & 0 deletions src/config/schema-transformer/SConscript
Expand Up @@ -27,6 +27,7 @@ for file in setup_sources:
local_sources = [
'__init__.py',
'to_bgp.py',
'config_db.py',
'db.py',
'ifmap_view.py',
]
Expand Down

0 comments on commit d760565

Please sign in to comment.