Skip to content

Commit

Permalink
Merge "Send publisher ID in subscribe response to allow a subscriber …
Browse files Browse the repository at this point in the history
…to reflect it back in future subscribe request. This is to indicate which publisher a client is using."
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Dec 4, 2015
2 parents 58d5d6d + 15f1e26 commit 42c7657
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 8 deletions.
11 changes: 8 additions & 3 deletions src/discovery/disc_server.py
Expand Up @@ -603,7 +603,10 @@ def api_subscribe(self):

# handle query for all publishers
if count == 0:
r = [entry['info'] for entry in pubs_active]
for entry in pubs_active:
r_dict = entry['info'].copy()
r_dict['@publisher-id'] = entry['service_id']
r.append(r_dict)
response = {'ttl': ttl, service_type: r}
if 'application/xml' in ctype:
response = xmltodict.unparse({'response': response})
Expand All @@ -625,7 +628,8 @@ def api_subscribe(self):
if policy == 'fixed' and entry is None and entry2:
self._db_conn.delete_service(entry2)
continue
result = entry['info']
result = entry['info'].copy()
result['@publisher-id'] = entry['service_id']
self._db_conn.insert_client(
service_type, service_id, client_id, result, ttl)
r.append(result)
Expand All @@ -646,7 +650,8 @@ def api_subscribe(self):

# take first 'count' publishers
for entry in pubs[:min(count, len(pubs))]:
result = entry['info']
result = entry['info'].copy()
result['@publisher-id'] = entry['service_id']
r.append(result)

self.syslog(' assign service=%s, info=%s' %
Expand Down
141 changes: 136 additions & 5 deletions src/discovery/tests/test_load_balancer.py
Expand Up @@ -9,13 +9,38 @@

import discoveryclient.client as client

def info_callback(info):
def info_callback(info, client_id):
# print 'In subscribe callback handler'
# print '%s' % (info)
print 'client-id %s info %s' % (client_id, info)
pass

"""
Validate publisher in-use count is reasonable (typically
after load-balance event. Discovery server will try to keep
in-use count with 5% of expected average. To provide some
buffer around server calculations, we allow 10% deviation
"""
def validate_assignment_count(response, context):
services = response['services']
in_use_counts = {entry['service_id']:entry['in_use'] for entry in services}
print '%s %s' % (context, in_use_counts)

# only use active pubs
pubs_active = [entry for entry in services if entry['status'] != 'down']

# validate
avg = sum([entry['in_use'] for entry in pubs_active])/len(pubs_active)

# return failure status
return True in [e['in_use'] > int(1.1*avg) for e in pubs_active]

class DiscoveryServerTestCase(test_discovery.TestCase, fixtures.TestWithFixtures):
def setUp(self):
extra_config_knobs = [
('Collector', 'load-balance', 'True'),
]
super(DiscoveryServerTestCase, self).setUp(extra_config_knobs=extra_config_knobs)

def test_load_balance(self):
# publish 3 instances of service foobar
tasks = []
Expand Down Expand Up @@ -43,11 +68,11 @@ def test_load_balance(self):
service_count = 2
tasks = []
for i in range(subcount):
client_id = "test-load-balance-%d" % i
disc = client.DiscoveryClient(
self._disc_server_ip, self._disc_server_port,
"test-load-balance-%d" % i)
self._disc_server_ip, self._disc_server_port, client_id)
obj = disc.subscribe(
service_type, service_count, info_callback)
service_type, service_count, info_callback, client_id)
tasks.append(obj.task)
time.sleep(1)
print 'Started %d tasks to subscribe service %s, count %d' \
Expand Down Expand Up @@ -115,3 +140,109 @@ def test_load_balance(self):
self.assertEqual(len(data), 1)
print 'After LB entry %s' % entry
self.assertEqual(entry['in_use'], 10)

def test_active_load_balance(self):
# publish 3 instances of service. Active LB must be enabled!
tasks = []
service_type = 'Collector'
for i in range(3):
client_type = 'test-discovery'
pub_id = 'test_discovery-%d' % i
pub_data = '%s-%d' % ('collector', i)
disc = client.DiscoveryClient(
self._disc_server_ip, self._disc_server_port,
client_type, pub_id)
task = disc.publish(service_type, pub_data)
tasks.append(task)

time.sleep(1)
(code, msg) = self._http_get('/services.json')
self.assertEqual(code, 200)
response = json.loads(msg)
self.assertEqual(len(response['services']), 3)
self.assertEqual(response['services'][0]['service_type'], service_type)
failure = validate_assignment_count(response, 'In-use count just after publishing')
self.assertEqual(failure, False)

# multiple subscribers for 2 instances each
subcount = 20
service_count = 2
tasks = []
for i in range(subcount):
client_id = "test-load-balance-%d" % i
disc = client.DiscoveryClient(
self._disc_server_ip, self._disc_server_port, client_id)
obj = disc.subscribe(
service_type, service_count, info_callback, client_id)
tasks.append(obj.task)
time.sleep(1)

# validate all clients have subscribed
time.sleep(1)
(code, msg) = self._http_get('/clients.json')
self.assertEqual(code, 200)
response = json.loads(msg)
self.assertEqual(len(response['services']), subcount*service_count)

(code, msg) = self._http_get('/services.json')
self.assertEqual(code, 200)
response = json.loads(msg)
failure = validate_assignment_count(response, 'In-use count just after initial subscribe')
self.assertEqual(failure, False)

# start one more publisher
pub_id = 'test_discovery-3'
pub_data = 'collector-3'
pub_url = '/service/%s' % pub_id
disc = client.DiscoveryClient(
self._disc_server_ip, self._disc_server_port,
client_type, pub_id)
task = disc.publish(service_type, pub_data)
tasks.append(task)

# wait for all TTL to expire before looking at publisher's counters
print 'Waiting for all client TTL to expire (1 min)'
time.sleep(1*60)

(code, msg) = self._http_get('/services.json')
self.assertEqual(code, 200)
response = json.loads(msg)
failure = validate_assignment_count(response, 'In-use count just after bringing up one more publisher')
self.assertEqual(failure, False)

# set operational state down - new service
payload = {
'service-type' : '%s' % service_type,
'oper-state' : 'down',
}
(code, msg) = self._http_put(pub_url, json.dumps(payload))
self.assertEqual(code, 200)

# wait for all TTL to expire before looking at publisher's counters
print 'Waiting for all client TTL to expire (1 min)'
time.sleep(1*60)

(code, msg) = self._http_get('/services.json')
self.assertEqual(code, 200)
response = json.loads(msg)
failure = validate_assignment_count(response, 'In-use count just after publisher-3 down')
self.assertEqual(failure, False)

# set operational state up - again
payload = {
'service-type' : '%s' % service_type,
'oper-state' : 'up',
}
(code, msg) = self._http_put(pub_url, json.dumps(payload))
self.assertEqual(code, 200)

# wait for all TTL to expire before looking at publisher's counters
print 'Waiting for all client TTL to expire (1 min)'
time.sleep(1*60)

# total subscriptions must be subscount * service_count
(code, msg) = self._http_get('/services.json')
self.assertEqual(code, 200)
response = json.loads(msg)
failure = validate_assignment_count(response, 'In-use count just after publisher-3 up again')
self.assertEqual(failure, False)

0 comments on commit 42c7657

Please sign in to comment.