Merge "[VNC API server] Improving list resources"
Zuul authored and opencontrail-ci-admin committed Jun 3, 2016
2 parents 822364f + 6835c37 commit 9f25523
Showing 2 changed files with 105 additions and 57 deletions.
2 changes: 1 addition & 1 deletion src/config/api-server/vnc_cfg_types.py
@@ -394,7 +394,7 @@ def pre_dbe_update(cls, id, fq_name, obj_dict, db_conn,

ok, result = cls.dbe_read(db_conn, 'virtual-network',
vn_uuid,
obj_fields=['network_ipam-refs'])
obj_fields=['network_ipam_refs'])
if not ok:
return ok, result
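
The one-line fix above corrects a field name: resource type names use hyphens ('virtual-network') while object field names use underscores, so the misspelled 'network_ipam-refs' silently matched nothing and the read returned no ipam refs. A sketch of the naming convention (values illustrative):

    res_type = 'virtual-network'
    obj_type = res_type.replace('-', '_')  # 'virtual_network'
    ref_field = 'network_ipam_refs'        # refs from a VN to network-ipam objects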

160 changes: 104 additions & 56 deletions src/config/common/vnc_cassandra.py
@@ -24,7 +24,20 @@
import re
from operator import itemgetter
import itertools
from collections import OrderedDict
import sys
from collections import Mapping


def merge_dict(orig_dict, new_dict):
for key, value in new_dict.iteritems():
if isinstance(value, Mapping):
orig_dict[key] = merge_dict(orig_dict.get(key, {}), value)
elif isinstance(value, list):
            # concatenate lists; list.append mutates in place and returns None
            orig_dict[key] = orig_dict.get(key, []) + value
else:
orig_dict[key] = new_dict[key]
return orig_dict


class VncCassandraClient(object):
# Name to ID mapping keyspace + tables
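
For context, a minimal sketch of how the new merge_dict helper combines pycassa result rows (sample data invented):

    # Nested mappings merge recursively instead of overwriting each other.
    rows = {'uuid-1': {'prop:display_name': '"vn1"'}}
    merge_dict(rows, {'uuid-1': {'prop:id_perms': '{}'},
                      'uuid-2': {'fq_name': '["vn2"]'}})
    # rows == {'uuid-1': {'prop:display_name': '"vn1"', 'prop:id_perms': '{}'},
    #          'uuid-2': {'fq_name': '["vn2"]'}}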
@@ -102,56 +115,62 @@ def add(self, cf_name, key, value):
return False
#end

def get(self, cf_name, key, columns=[], start='', finish=''):
result = OrderedDict()

for column in columns:
col_res = self.multiget(cf_name,
[key],
start=column,
finish=column)
if key in col_res:
result.update(col_res[key])

if start or finish:
col_res = self.multiget(cf_name,
[key],
start=start,
finish=finish)
if key in col_res:
result.update(col_res[key])

if not columns and not start and not finish:
col_res = self.multiget(cf_name, [key])
if key in col_res:
result.update(col_res[key])

return result

def multiget(self, cf_name, keys, start='', finish='', timestamp=False):
results = OrderedDict()
def get(self, cf_name, key, columns=None, start='', finish=''):
result = self.multiget(cf_name,
[key],
columns=columns,
start=start,
finish=finish)
return result.get(key)

def multiget(self, cf_name, keys, columns=None, start='', finish='',
timestamp=False):
_thrift_limit_size = 10000
results = {}
cf = self.get_cf(cf_name)

for key in keys:
row = results.get(key, OrderedDict())
for col, val in cf.xget(key,
if not columns or start or finish:
for key in keys:
rows = dict(cf.xget(key,
column_start=start,
column_finish=finish,
include_timestamp=timestamp):
include_timestamp=timestamp))
if rows:
results[key] = rows

if columns:
if len(keys) * len(columns) < _thrift_limit_size:
rows = cf.multiget(keys,
columns=columns,
include_timestamp=timestamp)
merge_dict(results, rows)
else:
for key in keys:
for column_chunk in [columns[x:x+(_thrift_limit_size - 1)] for x in
xrange(0, len(columns), _thrift_limit_size - 1)]:
try:
cols = cf.get(key,
columns=column_chunk,
include_timestamp=timestamp)
except pycassa.NotFoundException:
continue
results.setdefault(key, {})
results[key].update(cols)

for key in results:
for col, val in results[key].items():
try:
if timestamp:
row[col] = (json.loads(val[0]), val[1])
results[key][col] = (json.loads(val[0]), val[1])
else:
row[col] = json.loads(val)
results[key][col] = json.loads(val)
except ValueError as e:
msg = ("Cannot json load the value of cf: %s, key:%s "
"(error: %s). Use it as is: %s" %
(cf_name, key, str(e),
val if not timestamp else val[0]))
self._logger(msg, level=SandeshLevel.SYS_WARN)
row[col] = val
if row:
results[key] = row
results[key][col] = val

return results
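
To illustrate the reworked read path (the client instance and column family name below are illustrative): get is now a thin wrapper over multiget, and when an explicit column list is passed, multiget keeps each thrift request under the page limit by chunking the columns:

    # Hypothetical call sites; db is a VncCassandraClient.
    row = db.get('obj_uuid_table', 'uuid-1',
                 columns=['type', 'fq_name'])          # one row dict, or None
    rows = db.multiget('obj_uuid_table', ['uuid-1', 'uuid-2'],
                       start='prop:', finish='prop;')  # range scan via xget

    # The chunking arithmetic in isolation: slices of at most
    # _thrift_limit_size - 1 columns per request.
    limit = 10000
    columns = ['col%d' % i for i in xrange(25000)]
    chunks = [columns[x:x + (limit - 1)]
              for x in xrange(0, len(columns), limit - 1)]
    # len(chunks) == 3, with sizes 9999, 9999 and 5002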

@@ -171,7 +190,7 @@ def get_range(self, cf_name):
#end

def get_one_col(self, cf_name, key, column):
col = self.multiget(cf_name, [key], start=column, finish=column)
col = self.multiget(cf_name, [key], columns=[column])
if key not in col:
raise NoIdError(key)
elif len(col[key]) > 1:
@@ -315,10 +334,11 @@ def _update_sandesh_status(self, status, msg=''):

def _handle_exceptions(self, func):
def wrapper(*args, **kwargs):
if func.__name__ in ['get', 'multiget']:
if (sys._getframe(1).f_code.co_name != 'multiget' and
func.__name__ in ['get', 'multiget']):
msg = ("It is not recommended to use 'get' or 'multiget' "
"pycassa methods with precaution. It's better to use "
"'xget' or 'get_range' methods")
"pycassa methods. It's better to use 'xget' or "
"'get_range' methods due to thrift limitations")
self._logger(msg, level=SandeshLevel.SYS_WARN)
try:
if self._conn_state != ConnectionStatus.UP:
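
A minimal sketch of the frame-inspection trick in the new condition: sys._getframe(1).f_code.co_name is the name of the direct caller, so the warning is suppressed when the wrapped get/multiget is invoked from inside the client's own multiget (function names here are illustrative):

    import sys

    def callee():
        # Name of the function one frame up the call stack.
        return sys._getframe(1).f_code.co_name

    def caller():
        return callee()

    print caller()  # prints 'caller'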
@@ -448,8 +468,9 @@ def _cassandra_init_conn_pools(self):

for (cf, _) in cf_list:
self._cf_dict[cf] = ColumnFamily(
pool, cf, read_consistency_level = rd_consistency,
write_consistency_level = wr_consistency)
pool, cf, read_consistency_level=rd_consistency,
write_consistency_level=wr_consistency,
dict_class=dict)

ConnectionState.update(conn_type = ConnType.DATABASE,
name = 'Cassandra', status = ConnectionStatus.UP, message = '',
@@ -569,21 +590,34 @@ def object_read(self, res_type, obj_uuids, field_names=None):
# if field_names=None, all fields will be read/returned
obj_type = res_type.replace('-', '_')
obj_class = self._get_resource_class(obj_type)
obj_uuid_cf = self._obj_uuid_cf

# optimize for common case of reading non-backref, non-children fields
# ignoring columns starting from 'b' and 'c' - significant performance
# impact in scaled setting. e.g. read of project
columns = set([])
column_start = ''
if (field_names and
not (set(field_names) & (obj_class.backref_fields |
obj_class.children_fields))):
column_finish = ''
if (field_names is None or
(set(field_names) & (obj_class.backref_fields |
obj_class.children_fields))):
# at least one backref/children field is needed
column_start = ''
elif not set(field_names) & (obj_class.ref_fields):
# specific props have been asked for; fetch exactly those
column_start = 'parent:'
column_finish = 'parent;'
columns = set(['type', 'fq_name', 'parent_type'])
for fname in field_names:
if fname in obj_class.prop_fields:
columns.add('prop:' + fname)
else:
# ignore reading backref + children columns
column_start = 'd'

obj_rows = self.multiget(self._OBJ_UUID_CF_NAME,
obj_uuids,
columns=list(columns),
start=column_start,
finish=column_finish,
timestamp=True)

if not obj_rows:
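
The column_start/column_finish values above rely on Cassandra's lexicographic column ordering: the slice 'parent:'..'parent;' selects exactly the parent columns (';' sorts immediately after ':'), while start='d' skips every 'backref:' and 'children:' column. A rough illustration with invented column names:

    cols = sorted(['backref:vn:u1', 'children:ri:u2', 'fq_name',
                   'parent:project:u3', 'prop:display_name', 'type'])
    parent_only = [c for c in cols if 'parent:' <= c < 'parent;']
    # ['parent:project:u3']
    skip_backref_children = [c for c in cols if c >= 'd']
    # ['fq_name', 'parent:project:u3', 'prop:display_name', 'type']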
@@ -634,11 +668,12 @@ def object_read(self, res_type, obj_uuids, field_names=None):
if prop_name not in result:
result[prop_name] = {wrapper_field: []}
result[prop_name][wrapper_field].append(
obj_cols[col_name][0])
(obj_cols[col_name][0], prop_elem_position))
else:
if prop_name not in result:
result[prop_name] = []
result[prop_name].append(obj_cols[col_name][0])
result[prop_name].append((obj_cols[col_name][0],
prop_elem_position))

if self._re_match_children.match(col_name):
(_, child_type, child_uuid) = col_name.split(':')
@@ -682,6 +717,21 @@ def object_read(self, res_type, obj_uuids, field_names=None):
[child.pop('tstamp') for child in result[child_field]]
# for all children

# Ordering property lists and maps by position attribute
for prop_name in (obj_class.prop_list_fields |
obj_class.prop_map_fields):
if prop_name not in result:
continue
if isinstance(result[prop_name], list):
result[prop_name] = [el[0] for el in
sorted(result[prop_name],
key=itemgetter(1))]
elif isinstance(result[prop_name], dict):
wrapper, unsorted_list = result[prop_name].popitem()
result[prop_name][wrapper] = [el[0] for el in
sorted(unsorted_list,
key=itemgetter(1))]

results.append(result)
# end for all rows
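
A compact sketch of the new ordering step: list and map property elements are collected as (value, position) pairs while the columns are read, then sorted by position once the row is assembled (sample data invented):

    from operator import itemgetter

    pairs = [('b', 2), ('a', 1), ('c', 0)]  # (value, prop_elem_position)
    ordered = [el[0] for el in sorted(pairs, key=itemgetter(1))]
    # ordered == ['c', 'a', 'b']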

@@ -838,7 +888,7 @@ def object_update(self, res_type, obj_uuid, new_obj_dict,
# end object_update

def object_list(self, res_type, parent_uuids=None, back_ref_uuids=None,
obj_uuids=None, count=False, filters=None):
obj_uuids=None, count=False, filters=None):
obj_type = res_type.replace('-', '_')
obj_class = self._get_resource_class(obj_type)

@@ -849,10 +899,10 @@ def filter_rows(coll_infos, filters=None):
return coll_infos

filtered_infos = {}
columns = ['prop:%s' % filter_key for filter_key in filters]
rows = self.multiget(self._OBJ_UUID_CF_NAME,
coll_infos.keys(),
start='prop:',
finish='prop;')
columns=columns)
for obj_uuid, properties in rows.items():
# give chance for zk heartbeat/ping
gevent.sleep(0)
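
With this change, filter_rows fetches only the property columns named in the filters rather than scanning the whole 'prop:' range; for example (filter key invented):

    filters = {'display_name': ['vn1']}
    columns = ['prop:%s' % filter_key for filter_key in filters]
    # columns == ['prop:display_name']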
@@ -920,7 +970,6 @@ def filter_rows_parent_anchor(sort=False):

if back_ref_uuids:
# go from anchor to backrefs
obj_uuid_cf = self._obj_uuid_cf
col_start = 'backref:%s:' %(obj_type)
col_fin = 'backref:%s;' %(obj_type)

@@ -952,7 +1001,6 @@ def filter_rows_backref_anchor():
children_fq_names_uuids.extend(filter_rows_backref_anchor())

if not parent_uuids and not back_ref_uuids:
obj_uuid_cf = self._obj_uuid_cf
if obj_uuids:
# exact objects specified
def filter_rows_object_list():
