Skip to content

Commit

Permalink
Merge "1) Fix bug in processing of keystone section in discovery conf…
Browse files Browse the repository at this point in the history
…ig file 2) Allow auto load balance even when client sends service-in-use-list 3) Fix bug in handling of boolean service options"
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Mar 2, 2016
2 parents a424785 + 23115bc commit af61e5a
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 43 deletions.
75 changes: 32 additions & 43 deletions src/discovery/disc_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ def __init__(self, args):
'db_upd_hb': 0,
'throttle_subs':0,
'503': 0,
'count_lb': 0,
'auto_lb': 0,
'lb_count': 0,
'lb_auto': 0,
'db_exc_unknown': 0,
'db_exc_info': '',
'wl_rejects_pub': 0,
Expand Down Expand Up @@ -802,54 +802,38 @@ def api_subscribe(self):
if count == 0:
count = len(pubs_active)

# if subscriber in-use-list present, forget previous assignments
if len(inuse_list) and subs:
for service_id, expired in subs:
self._db_conn.delete_subscription(service_type, client_id, service_id)
subs = None
# Auto load-balance is triggered if enabled and some servers are
# more than 5% off expected average allocation.
load_balance = self.get_service_config(service_type, 'load-balance')
if load_balance:
total_subs = sum([entry['in_use'] for entry in pubs_active])
avg = total_subs/len(pubs_active)

for service_id in inuse_list:
entry = plist.get(service_id, None)
if entry is None:
continue
msg = ' in-service-list assign service=%s' % entry['service_id']
m = sandesh.dsSubscribe(msg=msg, sandesh=self._sandesh)
m.trace_msg(name='dsSubscribeTraceBuf', sandesh=self._sandesh)
self.syslog("%s %s" % (cid, msg))
expiry_dict = dict((service_id,expiry) for service_id, expiry in subs or [])
policy = self.get_service_config(service_type, 'policy')

if assign:
assign -= 1
self._db_conn.insert_client(service_type, entry['service_id'],
client_id, entry['info'], ttl)
r.append(entry)
count -= 1
assigned_sid.add(service_id)
# if subscriber in-use-list present, forget previous assignments
if len(inuse_list):
subs = [(service_id, expiry_dict.get(service_id, False)) for service_id in inuse_list]

if subs and count:
policy = self.get_service_config(service_type, 'policy')

# Auto load-balance is triggered if enabled and some servers are
# more than 5% off expected average allocation.
load_balance = self.get_service_config(service_type, 'load-balance')
if load_balance:
total_subs = sum([entry['in_use'] for entry in pubs_active])
avg = total_subs/len(pubs_active)

for service_id, expired in subs:
# expired True if service was marked for deletion by LB command
# previously published service is gone
# force renew for fixed policy since some service may have flapped
entry = plist.get(service_id, None)
if entry is None or expired or policy == 'fixed' or (load_balance and entry['in_use'] > int(1.05*avg)):
self._db_conn.delete_subscription(service_type, client_id, service_id)
# delete fixed policy server if expired
if policy == 'fixed' and entry is None:
self._db_conn.delete_service(plist_all.get(service_id, None))
# load-balance one at a time to avoid churn
if load_balance and entry and entry['in_use'] > int(1.05*avg):
self._debug['auto_lb'] += 1
self._debug['lb_auto'] += 1
load_balance = False
continue
msg = ' subs service=%s, assign=%d, count=%d' % (service_id, assign, count)
m = sandesh.dsSubscribe(msg=msg, sandesh=self._sandesh)
m.trace_msg(name='dsSubscribeTraceBuf', sandesh=self._sandesh)
self.syslog("%s %s" % (cid, msg))

if assign:
assign -= 1
self._db_conn.insert_client(
Expand Down Expand Up @@ -919,8 +903,8 @@ def api_lb_service(self, service_type):
if clients is None:
return

self.syslog('Initial load-balance server-list: %s, avg-per-pub %d, clients %d' \
% (lb_list, avg_per_pub, len(clients)))
self.syslog('%s: Initial load-balance server-list: %s, avg-per-pub %d, clients %d' \
% (service_type, lb_list, avg_per_pub, len(clients)))

"""
Walk through all subscribers and mark one publisher per subscriber down
Expand All @@ -934,10 +918,11 @@ def api_lb_service(self, service_type):
for client in clients:
(service_type, client_id, service_id, mtime, ttl) = client
if client_id not in clients_lb_done and service_id in lb_list and lb_list[service_id] > 0:
self.syslog('expire client=%s, service=%s, ttl=%d' % (client_id, service_id, ttl))
self._db_conn.mark_delete_subscription(service_type, client_id, service_id)
clients_lb_done.append(client_id)
lb_list[service_id] -= 1
self._debug['count_lb'] += 1
self._debug['lb_count'] += 1
return {}
# end api_lb_service

Expand Down Expand Up @@ -1334,6 +1319,7 @@ def parse_args(args_str):
'policy': None,
'load-balance': False,
}
service_bool_opts = ['load-balance']

cassandra_opts = {
'cassandra_user' : None,
Expand Down Expand Up @@ -1364,13 +1350,16 @@ def parse_args(args_str):
if section == "DEFAULTS":
defaults.update(dict(config.items("DEFAULTS")))
continue
if 'KEYSTONE' in config.sections():
if section == "KEYSTONE":
keystone_opts.update(dict(config.items("KEYSTONE")))
continue
service_config[
section.lower()] = default_service_opts.copy()
service_config[section.lower()].update(
dict(config.items(section)))
service = section.lower()
service_config[service] = default_service_opts.copy()
opt_dict = dict(config.items(section))
service_config[service].update(opt_dict)
for opt in service_bool_opts:
if opt in opt_dict:
service_config[service][opt] = config.getboolean(section, opt)

defaults.update(keystone_opts)
parser.set_defaults(**defaults)
Expand Down
88 changes: 88 additions & 0 deletions src/discovery/tests/test_load_balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,91 @@ def test_load_balance_min_instances(self):
response = json.loads(msg)
failure = validate_assignment_count(response, 'In-use count after clients with min_instances 2')
self.assertEqual(failure, False)

def test_load_balance_siul(self):
    """Exercise load balancing when subscribers send a service-in-use-list.

    Steps:
      1. Publish two instances of the service and confirm both are listed.
      2. Have 20 clients subscribe for 2 instances each, every request
         carrying a service-in-use-list naming both publishers.
      3. Check the assignment counts via validate_assignment_count.
      4. Start a third publisher and confirm it is listed with in_use == 0
         while the total in-use count is unchanged.
      5. Issue the load-balance command, re-subscribe every client and
         check the assignment counts again.
    """
    # publish 2 instances
    # NOTE(review): publish tasks are collected but never read here;
    # presumably the list only keeps them referenced -- confirm.
    tasks = []
    service_type = 'SvcLoadBalance'
    # Bound once up front: it is needed again for the third publisher,
    # not only inside the initial publish loop.
    client_type = 'test-discovery'
    pubcount = 2
    for i in range(pubcount):
        pub_id = 'test_discovery-%d' % i
        pub_data = {service_type: '%s-%d' % (service_type, i)}
        disc = client.DiscoveryClient(
            self._disc_server_ip, self._disc_server_port,
            client_type, pub_id)
        task = disc.publish(service_type, pub_data)
        tasks.append(task)

    time.sleep(1)
    (code, msg) = self._http_get('/services.json')
    self.assertEqual(code, 200)

    response = json.loads(msg)
    self.assertEqual(len(response['services']), pubcount)
    self.assertEqual(response['services'][0]['service_type'], service_type)

    # multiple subscribers for 2 instances each
    subcount = 20
    service_count = 2
    suburl = "/subscribe"
    payload = {
        'service': '%s' % service_type,
        'instances': service_count,
        'client-type': 'Vrouter-Agent',
        'service-in-use-list': {
            'publisher-id': ["test_discovery-0", 'test_discovery-1'],
        },
    }
    for i in range(subcount):
        payload['client'] = "ut-client-%d" % i
        (code, msg) = self._http_post(suburl, json.dumps(payload))
        self.assertEqual(code, 200)
        response = json.loads(msg)
        self.assertEqual(len(response[service_type]), service_count)

    # validate both publishers are assigned fairly
    time.sleep(1)
    (code, msg) = self._http_get('/services.json')
    self.assertEqual(code, 200)
    response = json.loads(msg)
    failure = validate_assignment_count(
        response, 'In-use count after clients with service-in-use-list')
    self.assertEqual(failure, False)

    # start one more publisher
    pub_id = 'test_discovery-2'
    pub_data = {service_type: '%s-2' % service_type}
    disc = client.DiscoveryClient(
        self._disc_server_ip, self._disc_server_port,
        client_type, pub_id)
    task = disc.publish(service_type, pub_data)
    tasks.append(task)
    pubcount += 1

    # verify new publisher is up
    (code, msg) = self._http_get('/services.json')
    self.assertEqual(code, 200)
    response = json.loads(msg)
    self.assertEqual(len(response['services']), pubcount)
    subs = sum([item['in_use'] for item in response['services']])
    self.assertEqual(subs, subcount*service_count)

    # verify newly added in-use count is 0
    new_service_id = '%s:%s' % (pub_id, service_type)
    data = [item for item in response['services']
            if item['service_id'] == new_service_id]
    # Assert the match count before indexing so that a missing publisher
    # is reported as an assertion failure rather than an IndexError.
    self.assertEqual(len(data), 1)
    self.assertEqual(data[0]['in_use'], 0)

    # Issue load-balance command
    (code, msg) = self._http_post('/load-balance/%s' % service_type, '')
    self.assertEqual(code, 200)

    # Re-subscribe every client after the load-balance command.
    for i in range(subcount):
        payload['client'] = "ut-client-%d" % i
        (code, msg) = self._http_post(suburl, json.dumps(payload))
        self.assertEqual(code, 200)

    (code, msg) = self._http_get('/services.json')
    self.assertEqual(code, 200)
    response = json.loads(msg)
    failure = validate_assignment_count(
        response, 'In-use count after LB command')
    self.assertEqual(failure, False)

0 comments on commit af61e5a

Please sign in to comment.