diff --git a/src/discovery/disc_server.py b/src/discovery/disc_server.py index 546db108872..de9bfeb9cc6 100644 --- a/src/discovery/disc_server.py +++ b/src/discovery/disc_server.py @@ -603,7 +603,10 @@ def api_subscribe(self): # handle query for all publishers if count == 0: - r = [entry['info'] for entry in pubs_active] + for entry in pubs_active: + r_dict = entry['info'].copy() + r_dict['@publisher-id'] = entry['service_id'] + r.append(r_dict) response = {'ttl': ttl, service_type: r} if 'application/xml' in ctype: response = xmltodict.unparse({'response': response}) @@ -625,7 +628,8 @@ def api_subscribe(self): if policy == 'fixed' and entry is None and entry2: self._db_conn.delete_service(entry2) continue - result = entry['info'] + result = entry['info'].copy() + result['@publisher-id'] = entry['service_id'] self._db_conn.insert_client( service_type, service_id, client_id, result, ttl) r.append(result) @@ -646,7 +650,8 @@ def api_subscribe(self): # take first 'count' publishers for entry in pubs[:min(count, len(pubs))]: - result = entry['info'] + result = entry['info'].copy() + result['@publisher-id'] = entry['service_id'] r.append(result) self.syslog(' assign service=%s, info=%s' % diff --git a/src/discovery/tests/test_load_balancer.py b/src/discovery/tests/test_load_balancer.py index 6eb6601f10f..3d9f7fe5b45 100644 --- a/src/discovery/tests/test_load_balancer.py +++ b/src/discovery/tests/test_load_balancer.py @@ -9,13 +9,38 @@ import discoveryclient.client as client -def info_callback(info): +def info_callback(info, client_id): # print 'In subscribe callback handler' - # print '%s' % (info) + print 'client-id %s info %s' % (client_id, info) pass +""" +Validate publisher in-use count is reasonable (typically +after load-balance event. Discovery server will try to keep +in-use count with 5% of expected average. To provide some +buffer around server calculations, we allow 10% deviation +""" +def validate_assignment_count(response, context): + services = response['services'] + in_use_counts = {entry['service_id']:entry['in_use'] for entry in services} + print '%s %s' % (context, in_use_counts) + + # only use active pubs + pubs_active = [entry for entry in services if entry['status'] != 'down'] + + # validate + avg = sum([entry['in_use'] for entry in pubs_active])/len(pubs_active) + + # return failure status + return True in [e['in_use'] > int(1.1*avg) for e in pubs_active] class DiscoveryServerTestCase(test_discovery.TestCase, fixtures.TestWithFixtures): + def setUp(self): + extra_config_knobs = [ + ('Collector', 'load-balance', 'True'), + ] + super(DiscoveryServerTestCase, self).setUp(extra_config_knobs=extra_config_knobs) + def test_load_balance(self): # publish 3 instances of service foobar tasks = [] @@ -43,11 +68,11 @@ def test_load_balance(self): service_count = 2 tasks = [] for i in range(subcount): + client_id = "test-load-balance-%d" % i disc = client.DiscoveryClient( - self._disc_server_ip, self._disc_server_port, - "test-load-balance-%d" % i) + self._disc_server_ip, self._disc_server_port, client_id) obj = disc.subscribe( - service_type, service_count, info_callback) + service_type, service_count, info_callback, client_id) tasks.append(obj.task) time.sleep(1) print 'Started %d tasks to subscribe service %s, count %d' \ @@ -115,3 +140,109 @@ def test_load_balance(self): self.assertEqual(len(data), 1) print 'After LB entry %s' % entry self.assertEqual(entry['in_use'], 10) + + def test_active_load_balance(self): + # publish 3 instances of service. Active LB must be enabled! + tasks = [] + service_type = 'Collector' + for i in range(3): + client_type = 'test-discovery' + pub_id = 'test_discovery-%d' % i + pub_data = '%s-%d' % ('collector', i) + disc = client.DiscoveryClient( + self._disc_server_ip, self._disc_server_port, + client_type, pub_id) + task = disc.publish(service_type, pub_data) + tasks.append(task) + + time.sleep(1) + (code, msg) = self._http_get('/services.json') + self.assertEqual(code, 200) + response = json.loads(msg) + self.assertEqual(len(response['services']), 3) + self.assertEqual(response['services'][0]['service_type'], service_type) + failure = validate_assignment_count(response, 'In-use count just after publishing') + self.assertEqual(failure, False) + + # multiple subscribers for 2 instances each + subcount = 20 + service_count = 2 + tasks = [] + for i in range(subcount): + client_id = "test-load-balance-%d" % i + disc = client.DiscoveryClient( + self._disc_server_ip, self._disc_server_port, client_id) + obj = disc.subscribe( + service_type, service_count, info_callback, client_id) + tasks.append(obj.task) + time.sleep(1) + + # validate all clients have subscribed + time.sleep(1) + (code, msg) = self._http_get('/clients.json') + self.assertEqual(code, 200) + response = json.loads(msg) + self.assertEqual(len(response['services']), subcount*service_count) + + (code, msg) = self._http_get('/services.json') + self.assertEqual(code, 200) + response = json.loads(msg) + failure = validate_assignment_count(response, 'In-use count just after initial subscribe') + self.assertEqual(failure, False) + + # start one more publisher + pub_id = 'test_discovery-3' + pub_data = 'collector-3' + pub_url = '/service/%s' % pub_id + disc = client.DiscoveryClient( + self._disc_server_ip, self._disc_server_port, + client_type, pub_id) + task = disc.publish(service_type, pub_data) + tasks.append(task) + + # wait for all TTL to expire before looking at publisher's counters + print 'Waiting for all client TTL to expire (1 min)' + time.sleep(1*60) + + (code, msg) = self._http_get('/services.json') + self.assertEqual(code, 200) + response = json.loads(msg) + failure = validate_assignment_count(response, 'In-use count just after bringing up one more publisher') + self.assertEqual(failure, False) + + # set operational state down - new service + payload = { + 'service-type' : '%s' % service_type, + 'oper-state' : 'down', + } + (code, msg) = self._http_put(pub_url, json.dumps(payload)) + self.assertEqual(code, 200) + + # wait for all TTL to expire before looking at publisher's counters + print 'Waiting for all client TTL to expire (1 min)' + time.sleep(1*60) + + (code, msg) = self._http_get('/services.json') + self.assertEqual(code, 200) + response = json.loads(msg) + failure = validate_assignment_count(response, 'In-use count just after publisher-3 down') + self.assertEqual(failure, False) + + # set operational state up - again + payload = { + 'service-type' : '%s' % service_type, + 'oper-state' : 'up', + } + (code, msg) = self._http_put(pub_url, json.dumps(payload)) + self.assertEqual(code, 200) + + # wait for all TTL to expire before looking at publisher's counters + print 'Waiting for all client TTL to expire (1 min)' + time.sleep(1*60) + + # total subscriptions must be subscount * service_count + (code, msg) = self._http_get('/services.json') + self.assertEqual(code, 200) + response = json.loads(msg) + failure = validate_assignment_count(response, 'In-use count just after publisher-3 up again') + self.assertEqual(failure, False)