Skip to content

Commit

Permalink
Merge "Build a uuid-fqname cache in all config daemons on start up" i…
Browse files Browse the repository at this point in the history
…nto R3.1
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Sep 15, 2016
2 parents c09303c + c6579f3 commit 0166af3
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 56 deletions.
26 changes: 10 additions & 16 deletions src/config/api-server/vnc_cfg_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2631,22 +2631,16 @@ def _db_connect(self, reset_config):
cred = None
if cassandra_user is not None and cassandra_password is not None:
cred = {'username':cassandra_user,'password':cassandra_password}
db_conn = VncDbClient(self, ifmap_ip, ifmap_port, user, passwd,
cass_server_list, rabbit_servers, rabbit_port,
rabbit_user, rabbit_password, rabbit_vhost,
rabbit_ha_mode, reset_config,
zk_server, self._args.cluster_id,
cassandra_credential=cred,
rabbit_use_ssl = self._args.rabbit_use_ssl,
kombu_ssl_version =
self._args.kombu_ssl_version,
kombu_ssl_keyfile =
self._args.kombu_ssl_keyfile,
kombu_ssl_certfile =
self._args.kombu_ssl_certfile,
kombu_ssl_ca_certs =
self._args.kombu_ssl_ca_certs)
self._db_conn = db_conn
self._db_conn = VncDbClient(
self, ifmap_ip, ifmap_port, user, passwd, cass_server_list,
rabbit_servers, rabbit_port, rabbit_user, rabbit_password,
rabbit_vhost, rabbit_ha_mode, reset_config, zk_server,
self._args.cluster_id, cassandra_credential=cred,
rabbit_use_ssl=self._args.rabbit_use_ssl,
kombu_ssl_version=self._args.kombu_ssl_version,
kombu_ssl_keyfile= self._args.kombu_ssl_keyfile,
kombu_ssl_certfile=self._args.kombu_ssl_certfile,
kombu_ssl_ca_certs=self._args.kombu_ssl_ca_certs)
# end _db_connect

def _ensure_id_perms_present(self, obj_uuid, obj_dict):
Expand Down
35 changes: 0 additions & 35 deletions src/config/api-server/vnc_cfg_ifmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -987,41 +987,6 @@ def useragent_kv_delete(self, key):
self._useragent_kv_cf.remove(key)
# end useragent_kv_delete

def walk(self, fn):
type_to_object = {}
for obj_uuid, obj_col in self._obj_uuid_cf.get_range(
columns=['type', 'fq_name']):
try:
obj_type = json.loads(obj_col['type'])
obj_fq_name = json.loads(obj_col['fq_name'])
# prep cache to avoid n/w round-trip in db.read for ref
self.cache_uuid_to_fq_name_add(obj_uuid, obj_fq_name, obj_type)

try:
type_to_object[obj_type].append(obj_uuid)
except KeyError:
type_to_object[obj_type] = [obj_uuid]
except Exception as e:
self.config_log('Error in db walk read %s' %(str(e)),
level=SandeshLevel.SYS_ERR)
continue

walk_results = []
for obj_type, uuid_list in type_to_object.items():
try:
self.config_log('Resync: obj_type %s len %s'
%(obj_type, len(uuid_list)),
level=SandeshLevel.SYS_INFO)
result = fn(obj_type, uuid_list)
if result:
walk_results.append(result)
except Exception as e:
self.config_log('Error in db walk invoke %s' %(str(e)),
level=SandeshLevel.SYS_ERR)
continue

return walk_results
# end walk
# end class VncCassandraClient


Expand Down
50 changes: 45 additions & 5 deletions src/config/common/vnc_cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
from pysandesh.gen_py.process_info.ttypes import ConnectionStatus
from pysandesh.gen_py.process_info.ttypes import ConnectionType as ConnType
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel
from sandesh_common.vns.constants import API_SERVER_KEYSPACE_NAME, \
CASSANDRA_DEFAULT_GC_GRACE_SECONDS
from sandesh_common.vns import constants as vns_constants
import time
from cfgm_common import jsonutils as json
import utils
Expand All @@ -43,7 +42,7 @@ def merge_dict(orig_dict, new_dict):

class VncCassandraClient(object):
# Name to ID mapping keyspace + tables
_UUID_KEYSPACE_NAME = API_SERVER_KEYSPACE_NAME
_UUID_KEYSPACE_NAME = vns_constants.API_SERVER_KEYSPACE_NAME

# TODO describe layout
_OBJ_UUID_CF_NAME = 'obj_uuid_table'
Expand Down Expand Up @@ -111,7 +110,8 @@ def _is_children(column_name):
return column_name[:9] == 'children:'

def __init__(self, server_list, db_prefix, rw_keyspaces, ro_keyspaces,
logger, generate_url=None, reset_config=False, credential=None):
logger, generate_url=None, reset_config=False, credential=None,
walk=True):
self._reset_config = reset_config
self._cache_uuid_to_fq_name = {}
if db_prefix:
Expand All @@ -138,6 +138,8 @@ def __init__(self, server_list, db_prefix, rw_keyspaces, ro_keyspaces,
self._obj_uuid_cf = self._cf_dict[self._OBJ_UUID_CF_NAME]
self._obj_fq_name_cf = self._cf_dict[self._OBJ_FQ_NAME_CF_NAME]
self._obj_shared_cf = self._cf_dict[self._OBJ_SHARED_CF_NAME]
if walk:
self.walk()
# end __init__

def get_cf(self, cf_name):
Expand Down Expand Up @@ -485,7 +487,7 @@ def _cassandra_ensure_keyspace(self, keyspace_name, cf_dict):
# TODO verify only EEXISTS
self._logger("Warning! " + str(e), level=SandeshLevel.SYS_WARN)

gc_grace_sec = CASSANDRA_DEFAULT_GC_GRACE_SECONDS
gc_grace_sec = vns_constants.CASSANDRA_DEFAULT_GC_GRACE_SECONDS

for cf_name in cf_dict:
create_cf_kwargs = cf_dict[cf_name].get('create_cf_args', {})
Expand Down Expand Up @@ -1375,3 +1377,41 @@ def _read_back_ref(self, result, obj_uuid, back_ref_obj_type, back_ref_uuid,

result['%s_back_refs' % (back_ref_obj_type)].append(back_ref_info)
# end _read_back_ref

def walk(self, fn=None):
type_to_object = {}
for obj_uuid, obj_col in self._obj_uuid_cf.get_range(
columns=['type', 'fq_name']):
try:
obj_type = json.loads(obj_col['type'])
obj_fq_name = json.loads(obj_col['fq_name'])
# prep cache to avoid n/w round-trip in db.read for ref
self.cache_uuid_to_fq_name_add(obj_uuid, obj_fq_name, obj_type)

try:
type_to_object[obj_type].append(obj_uuid)
except KeyError:
type_to_object[obj_type] = [obj_uuid]
except Exception as e:
self._logger('Error in db walk read %s' %(str(e)),
level=SandeshLevel.SYS_ERR)
continue

if fn is None:
return []
walk_results = []
for obj_type, uuid_list in type_to_object.items():
try:
self._logger('DB walk: obj_type %s len %s'
%(obj_type, len(uuid_list)),
level=SandeshLevel.SYS_INFO)
result = fn(obj_type, uuid_list)
if result:
walk_results.append(result)
except Exception as e:
self._logger('Error in db walk invoke %s' %(str(e)),
level=SandeshLevel.SYS_ERR)
continue

return walk_results
# end walk
3 changes: 3 additions & 0 deletions src/config/device-manager/device_manager/device_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,8 @@ def _vnc_subscribe_callback(self, oper_info):
if oper_info['oper'] == 'CREATE':
obj_dict = oper_info['obj_dict']
obj_id = obj_dict['uuid']
self._cassandra.cache_uuid_to_fq_name_add(
obj_id, obj_dict['fq_name'], obj_type)
obj = obj_class.locate(obj_id, obj_dict)
dependency_tracker = DependencyTracker(
DBBaseDM.get_obj_type_map(), self._REACTION_MAP)
Expand Down Expand Up @@ -340,6 +342,7 @@ def _vnc_subscribe_callback(self, oper_info):
set(ids))
elif oper_info['oper'] == 'DELETE':
obj_id = oper_info['uuid']
self._cassandra.cache_uuid_to_fq_name_del(obj_id)
obj = obj_class.get(obj_id)
if obj is None:
return
Expand Down
3 changes: 3 additions & 0 deletions src/config/schema-transformer/to_bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ def _vnc_subscribe_callback(self, oper_info):
if oper == 'CREATE':
obj_dict = oper_info['obj_dict']
obj_fq_name = ':'.join(obj_dict['fq_name'])
self._cassandra.cache_uuid_to_fq_name_add(
obj_id, obj_dict['fq_name'], obj_type)
obj = obj_class.locate(obj_fq_name)
if obj is None:
self.config_log('%s id %s fq_name %s not found' % (
Expand Down Expand Up @@ -304,6 +306,7 @@ def _vnc_subscribe_callback(self, oper_info):
set(dependency_tracker.resources[resource]) |
set(ids))
elif oper == 'DELETE':
self._cassandra.cache_uuid_to_fq_name_del(obj_id)
obj = obj_class.get_by_uuid(obj_id)
if obj is None:
return
Expand Down
3 changes: 3 additions & 0 deletions src/config/svc-monitor/svc_monitor/rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def _vnc_subscribe_actions(self, oper_info):
if oper_info['oper'] == 'CREATE':
obj_dict = oper_info['obj_dict']
obj_id = oper_info['uuid']
DBBaseSM._cassandra.cache_uuid_to_fq_name_add(
obj_id, obj_dict['fq_name'], obj_type)
obj = obj_class.locate(obj_id)
if obj is not None:
dependency_tracker = DependencyTracker(
Expand Down Expand Up @@ -90,6 +92,7 @@ def _vnc_subscribe_actions(self, oper_info):
set(ids))
elif oper_info['oper'] == 'DELETE':
obj_id = oper_info['uuid']
DBBaseSM._cassandra.cache_uuid_to_fq_name_del(obj_id)
obj = obj_class.get(obj_id)
if obj is None:
return
Expand Down

0 comments on commit 0166af3

Please sign in to comment.