Skip to content

Commit

Permalink
Merge "[VNC API server] Optimize sending UVEs during initializing"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Jun 29, 2016
2 parents b93c783 + 7c2b1e3 commit cff2ad9
Showing 1 changed file with 65 additions and 92 deletions.
157 changes: 65 additions & 92 deletions src/config/api-server/vnc_cfg_ifmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -1321,24 +1321,17 @@ def __init__(self, api_svr_mgr, ifmap_srv_ip, ifmap_srv_port, uname,
self._sandesh = api_svr_mgr._sandesh

self._UVEMAP = {
"virtual_network" : "ObjectVNTable",
"service_instance" : "ObjectSITable",
"virtual_router" : "ObjectVRouter",
"analytics_node" : "ObjectCollectorInfo",
"database_node" : "ObjectDatabaseInfo",
"config_node" : "ObjectConfigNode",
"service_chain" : "ServiceChain",
"physical_router" : "ObjectPRouter",
"virtual_network" : ("ObjectVNTable", False),
"service_instance" : ("ObjectSITable", False),
"virtual_router" : ("ObjectVRouter", True),
"analytics_node" : ("ObjectCollectorInfo", True),
"database_node" : ("ObjectDatabaseInfo", True),
"config_node" : ("ObjectConfigNode", True),
"service_chain" : ("ServiceChain", False),
"physical_router" : ("ObjectPRouter", True),
"bgp_router": ("ObjectBgpRouter", True),
}

self._UVEGLOBAL = set([
"virtual_router",
"analytics_node",
"database_node",
"config_node",
"physical_router"
])

# certificate auth
ssl_options = None
if api_svr_mgr._args.use_certs:
Expand Down Expand Up @@ -1505,10 +1498,10 @@ def match_uuid(self, obj_dict, obj_uuid):
return False
# end match_uuid

def update_subnet_uuid(self, vn_dict, do_update=False):
def update_subnet_uuid(self, vn_dict):
vn_uuid = vn_dict.get('uuid')

def _read_subnet_uuid(subnet):
def _locate_subnet_uuid(subnet):
if vn_uuid is None:
return None
pfx = subnet['subnet']['ip_prefix']
Expand All @@ -1519,23 +1512,17 @@ def _read_subnet_uuid(subnet):
try:
return self.useragent_kv_retrieve(subnet_key)
except NoUserAgentKey:
return None
return str(uuid.uuid4())

ipam_refs = vn_dict.get('network_ipam_refs', [])
updated = False
for ipam in ipam_refs:
vnsn = ipam['attr']
subnets = vnsn['ipam_subnets']
for subnet in subnets:
if subnet.get('subnet_uuid'):
continue

subnet_uuid = _read_subnet_uuid(subnet) or str(uuid.uuid4())
subnet['subnet_uuid'] = subnet_uuid
if not updated:
updated = True

if updated and do_update:
for ipam in vn_dict.get('network_ipam_refs', []):
subnets = ipam['attr']['ipam_subnets']
for subnet in [subnet for subnet in subnets
if 'subnet_uuid' not in subnet]:
subnet['subnet_uuid'] = _locate_subnet_uuid(subnet)
updated = True

if updated:
self._cassandra_db.object_update('virtual_network', vn_uuid,
vn_dict)
# end update_subnet_uuid
Expand All @@ -1545,21 +1532,17 @@ def update_bgp_router_type(self, obj_dict):
if router_type is not set.
"""
router_params = obj_dict['bgp_router_parameters']
if not router_params.get('router_type'):
if 'router_type' not in router_params:
router_type = 'router'
if router_params['vendor'] == 'contrail':
router_type = 'control-node'
router_params.update({'router_type': router_type})
obj_dict.update({'bgp_router_parameters': router_params})
obj_uuid = obj_dict.get('uuid')
self._cassandra_db.object_update('bgp_router', obj_uuid, obj_dict)
# end update_bgp_router_type

def iip_update_subnet_uuid(self, iip_dict):
""" Set the subnet uuid as instance-ip attribute """
if iip_dict.get('subnet_uuid'):
return

for vn_ref in iip_dict.get('virtual_network_refs', []):
(ok, results) = self._cassandra_db.object_read(
'virtual_network', [vn_ref['uuid']],
Expand All @@ -1586,31 +1569,33 @@ def _dbe_resync(self, obj_type, obj_uuids):
obj_fields = list(obj_class.prop_fields) + list(obj_class.ref_fields)
(ok, obj_dicts) = self._cassandra_db.object_read(
obj_type, obj_uuids, field_names=obj_fields)
uve_trace_list = []
for obj_dict in obj_dicts:
try:
obj_uuid = obj_dict['uuid']
self.dbe_uve_trace("RESYNC", obj_type, obj_uuid, obj_dict)
# TODO remove backward compat (use RT instead of VN->LR ref)
if (obj_type == 'virtual_network' and
'logical_router_refs' in obj_dict):
for router in obj_dict['logical_router_refs']:
self._cassandra_db._delete_ref(None, obj_type, obj_uuid,
uve_trace_list.append(("RESYNC", obj_type, obj_uuid, obj_dict))

if obj_type == 'virtual_network':
# TODO remove backward compat (use RT instead of VN->LR ref)
for router in obj_dict.get('logical_router_refs', []):
self._cassandra_db._delete_ref(None,
obj_type,
obj_uuid,
'logical_router',
router['uuid'])
if 'network_ipam_refs' in obj_dict:
self.update_subnet_uuid(obj_dict)

# create new perms if upgrading
if obj_dict.get('perms2') is None:
if 'perms2' not in obj_dict:
self._cassandra_db.update_perms2(obj_uuid)

if (obj_type == 'virtual_network' and
'network_ipam_refs' in obj_dict):
self.update_subnet_uuid(obj_dict, do_update=True)

if (obj_type == 'bgp_router' and
'bgp_router_parameters' in obj_dict):
'bgp_router_parameters' in obj_dict and
'router_type' not in obj_dict['bgp_router_parameters']):
self.update_bgp_router_type(obj_dict)

if obj_type == 'instance_ip':
if obj_type == 'instance_ip' and 'subnet_uuid' not in obj_dict:
self.iip_update_subnet_uuid(obj_dict)

# Ifmap alloc
Expand All @@ -1633,6 +1618,12 @@ def _dbe_resync(self, obj_type, obj_uuids):
self.config_log(tb, level=SandeshLevel.SYS_ERR)
continue
# end for all objects

# Send UVEs resync with a pool of workers
uve_workers = gevent.pool.Group()
def format_args_for_dbe_uve_trace(args):
return self.dbe_uve_trace(*args)
uve_workers.map(format_args_for_dbe_uve_trace, uve_trace_list)
# end _dbe_resync


Expand Down Expand Up @@ -1703,52 +1694,34 @@ def dbe_alloc(self, obj_type, obj_dict, uuid_requested=None):
# end dbe_alloc

def dbe_uve_trace(self, oper, type, uuid, obj_dict):
oo = {}
oo['uuid'] = uuid
if oper.upper() == 'DELETE':
oo['name'] = obj_dict['fq_name']
else:
oo['name'] = self.uuid_to_fq_name(uuid)
oo['value'] = obj_dict
oo['type'] = type
if type not in self._UVEMAP:
return

oper = oper.upper()
req_id = get_trace_id()
if 'fq_name' not in obj_dict:
obj_dict['fq_name'] = self.uuid_to_fq_name(uuid)
obj_json = {k: json.dumps(obj_dict[k]) for k in obj_dict or {}}

db_trace = DBRequestTrace(request_id=req_id)
db_trace.operation = oper
db_trace.body = "name=" + str(oo['name']) + " type=" + type + " value=" + json.dumps(obj_dict)
trace_msg(db_trace, 'DBUVERequestTraceBuf', self._sandesh)

attr_contents = None
emap = {}
if oo['value']:
for ck,cv in oo['value'].iteritems():
emap[ck] = json.dumps(cv)

utype = oo['type']
urawkey = ':'.join(oo['name'])
ukey = None
utab = None
if utype in self._UVEMAP:
utab = self._UVEMAP[utype]
if utype in self._UVEGLOBAL:
ukey = urawkey.split(":",1)[1]
else:
ukey = urawkey
elif utype == 'bgp_router':
utab = "ObjectBgpRouter"
ukey = urawkey.rsplit(":",1)[1]
db_trace.body = "name=%s type=%s value=%s" % (obj_dict['fq_name'],
type,
json.dumps(obj_dict))
uve_table, global_uve = self._UVEMAP[type]
if global_uve:
uve_name = obj_dict['fq_name'][-1]
else:
return

if oper.upper() == 'DELETE':
cc = ContrailConfig(name=ukey, elements=emap, deleted=True)
else:
cc = ContrailConfig(name=ukey, elements=emap)

cfg_msg = ContrailConfigTrace(data=cc, table=utab,
sandesh=self._sandesh)
cfg_msg.send(sandesh=self._sandesh)
# end dbe_uve_trace
uve_name = ':'.join(obj_dict['fq_name'])
contrail_config = ContrailConfig(name=uve_name,
elements=obj_json,
deleted=oper=='DELETE')
contrail_config_msg = ContrailConfigTrace(data=contrail_config,
table=uve_table,
sandesh=self._sandesh)

contrail_config_msg.send(sandesh=self._sandesh)
trace_msg(db_trace, 'DBUVERequestTraceBuf', self._sandesh)

def dbe_trace(oper):
def wrapper1(func):
Expand Down

0 comments on commit cff2ad9

Please sign in to comment.