From 43564524c9ba4642e6ea5b25e56a4ffe7834e999 Mon Sep 17 00:00:00 2001
From: Deepinder Setia
Date: Tue, 24 Nov 2015 17:03:19 -0800
Subject: [PATCH] Support live load-balancing for specific service type

Auto load-balance is triggered when some publishers carry more than 5%
above the expected average allocation. To enable it, set load-balance to
True in the service's section of the discovery configuration file. For
example:

...
[Collector]
load-balance=True
...

The live load-balancing knob is not global; it must be set per service
type as needed. A standalone sketch of the rebalance check follows the
diff.

Change-Id: I6189f1875bedfbdd2b43a85d665fd6ad1e33a2c3
Closes-Bug: #1484293
---
 src/discovery/disc_server.py              |  21 +++-
 src/discovery/tests/test_load_balancer.py | 141 +++++++++++++++++++++-
 2 files changed, 153 insertions(+), 9 deletions(-)

diff --git a/src/discovery/disc_server.py b/src/discovery/disc_server.py
index 546db108872..bee0d5e35b1 100644
--- a/src/discovery/disc_server.py
+++ b/src/discovery/disc_server.py
@@ -75,6 +75,7 @@ def __init__(self, args):
             'throttle_subs':0,
             '503': 0,
             'count_lb': 0,
+            'auto_lb': 0,
         }
         self._ts_use = 1
         self.short_ttl_map = {}
@@ -613,17 +614,28 @@ def api_subscribe(self):
         plist = dict((entry['service_id'],entry) for entry in pubs_active)
         plist_all = dict((entry['service_id'],entry) for entry in pubs)
         policy = self.get_service_config(service_type, 'policy')
+
+        # Auto load-balance kicks in when enabled and some publishers carry
+        # more than 5% above the expected average allocation.
+        load_balance = self.get_service_config(service_type, 'load-balance')
+        if load_balance and pubs_active:
+            total_subs = sum([entry['in_use'] for entry in pubs_active])
+            avg = total_subs/len(pubs_active)
+
         for service_id, expired in subs:
             # expired True if service was marked for deletion by LB command
             # previously published service is gone
             # force renew for fixed policy since some service may have flapped
-            entry2 = plist_all.get(service_id, None)
             entry = plist.get(service_id, None)
-            if entry is None or expired or policy == 'fixed':
+            if entry is None or expired or policy == 'fixed' or (load_balance and entry['in_use'] > int(1.05*avg)):
                 self._db_conn.delete_subscription(service_type, client_id, service_id)
                 # delete fixed policy server if expired
-                if policy == 'fixed' and entry is None and entry2:
-                    self._db_conn.delete_service(entry2)
+                if policy == 'fixed' and entry is None and service_id in plist_all:
+                    self._db_conn.delete_service(plist_all[service_id])
+                # load-balance one subscription at a time to avoid churn
+                if load_balance and entry and entry['in_use'] > int(1.05*avg):
+                    self._debug['auto_lb'] += 1
+                    load_balance = False
                 continue
             result = entry['info']
             self._db_conn.insert_client(
@@ -1098,6 +1110,7 @@ def parse_args(args_str):
     # per service options
     default_service_opts = {
         'policy': None,
+        'load-balance': False,
     }
 
     cassandra_opts = {
diff --git a/src/discovery/tests/test_load_balancer.py b/src/discovery/tests/test_load_balancer.py
index 6eb6601f10f..3d9f7fe5b45 100644
--- a/src/discovery/tests/test_load_balancer.py
+++ b/src/discovery/tests/test_load_balancer.py
@@ -9,13 +9,38 @@
 import discoveryclient.client as client
 
 
-def info_callback(info):
+def info_callback(info, client_id):
     # print 'In subscribe callback handler'
-    # print '%s' % (info)
+    print 'client-id %s info %s' % (client_id, info)
     pass
 
+def validate_assignment_count(response, context):
+    """
+    Validate that publisher in-use counts are reasonable, typically after
+    a load-balance event. The discovery server tries to keep each in-use
+    count within 5% of the expected average; to allow some buffer around
+    the server's calculation, this check tolerates a 10% deviation.
+    """
+    services = response['services']
+    in_use_counts = {entry['service_id']: entry['in_use'] for entry in services}
+    print '%s %s' % (context, in_use_counts)
+
+    # only consider active publishers
+    pubs_active = [entry for entry in services if entry['status'] != 'down']
+
+    # expected average across active publishers
+    avg = sum([entry['in_use'] for entry in pubs_active])/len(pubs_active)
+
+    # return True (failure) if any publisher exceeds the 10% threshold
+    return any(e['in_use'] > int(1.1*avg) for e in pubs_active)
 
 class DiscoveryServerTestCase(test_discovery.TestCase, fixtures.TestWithFixtures):
+    def setUp(self):
+        extra_config_knobs = [
+            ('Collector', 'load-balance', 'True'),
+        ]
+        super(DiscoveryServerTestCase, self).setUp(extra_config_knobs=extra_config_knobs)
+
     def test_load_balance(self):
         # publish 3 instances of service foobar
         tasks = []
@@ -43,11 +68,11 @@ def test_load_balance(self):
         service_count = 2
         tasks = []
         for i in range(subcount):
+            client_id = "test-load-balance-%d" % i
             disc = client.DiscoveryClient(
-                self._disc_server_ip, self._disc_server_port,
-                "test-load-balance-%d" % i)
+                self._disc_server_ip, self._disc_server_port, client_id)
             obj = disc.subscribe(
-                service_type, service_count, info_callback)
+                service_type, service_count, info_callback, client_id)
             tasks.append(obj.task)
             time.sleep(1)
         print 'Started %d tasks to subscribe service %s, count %d' \
@@ -115,3 +140,109 @@ def test_load_balance(self):
         self.assertEqual(len(data), 1)
         print 'After LB entry %s' % entry
         self.assertEqual(entry['in_use'], 10)
+
+    def test_active_load_balance(self):
+        # publish 3 instances of the service; live LB is enabled via setUp
+        tasks = []
+        service_type = 'Collector'
+        for i in range(3):
+            client_type = 'test-discovery'
+            pub_id = 'test_discovery-%d' % i
+            pub_data = '%s-%d' % ('collector', i)
+            disc = client.DiscoveryClient(
+                self._disc_server_ip, self._disc_server_port,
+                client_type, pub_id)
+            task = disc.publish(service_type, pub_data)
+            tasks.append(task)
+
+        time.sleep(1)
+        (code, msg) = self._http_get('/services.json')
+        self.assertEqual(code, 200)
+        response = json.loads(msg)
+        self.assertEqual(len(response['services']), 3)
+        self.assertEqual(response['services'][0]['service_type'], service_type)
+        failure = validate_assignment_count(response, 'In-use count just after publishing')
+        self.assertEqual(failure, False)
+
+        # multiple subscribers, each subscribing to 2 instances
+        subcount = 20
+        service_count = 2
+        tasks = []
+        for i in range(subcount):
+            client_id = "test-load-balance-%d" % i
+            disc = client.DiscoveryClient(
+                self._disc_server_ip, self._disc_server_port, client_id)
+            obj = disc.subscribe(
+                service_type, service_count, info_callback, client_id)
+            tasks.append(obj.task)
+            time.sleep(1)
+
+        # validate all clients have subscribed
+        time.sleep(1)
+        (code, msg) = self._http_get('/clients.json')
+        self.assertEqual(code, 200)
+        response = json.loads(msg)
+        self.assertEqual(len(response['services']), subcount*service_count)
+
+        (code, msg) = self._http_get('/services.json')
+        self.assertEqual(code, 200)
+        response = json.loads(msg)
+        failure = validate_assignment_count(response, 'In-use count just after initial subscribe')
+        self.assertEqual(failure, False)
+
+        # start one more publisher
+        pub_id = 'test_discovery-3'
+        pub_data = 'collector-3'
+        pub_url = '/service/%s' % pub_id
+        disc = client.DiscoveryClient(
+            self._disc_server_ip, self._disc_server_port,
+            client_type, pub_id)
+        task = disc.publish(service_type, pub_data)
+        tasks.append(task)
+
+        # wait for all TTLs to expire before looking at publishers' counters
+        print 'Waiting for all client TTL to expire (1 min)'
+        time.sleep(1*60)
+
+        (code, msg) = self._http_get('/services.json')
+        self.assertEqual(code, 200)
+        response = json.loads(msg)
+        failure = validate_assignment_count(response, 'In-use count just after bringing up one more publisher')
+        self.assertEqual(failure, False)
+
+        # set operational state of the new publisher (publisher-3) down
+        payload = {
+            'service-type': service_type,
+            'oper-state': 'down',
+        }
+        (code, msg) = self._http_put(pub_url, json.dumps(payload))
+        self.assertEqual(code, 200)
+
+        # wait for all TTLs to expire before looking at publishers' counters
+        print 'Waiting for all client TTL to expire (1 min)'
+        time.sleep(1*60)
+
+        (code, msg) = self._http_get('/services.json')
+        self.assertEqual(code, 200)
+        response = json.loads(msg)
+        failure = validate_assignment_count(response, 'In-use count just after publisher-3 down')
+        self.assertEqual(failure, False)
+
+        # set operational state of publisher-3 back up
+        payload = {
+            'service-type': service_type,
+            'oper-state': 'up',
+        }
+        (code, msg) = self._http_put(pub_url, json.dumps(payload))
+        self.assertEqual(code, 200)
+
+        # wait for all TTLs to expire before looking at publishers' counters
+        print 'Waiting for all client TTL to expire (1 min)'
+        time.sleep(1*60)
+
+        # in-use counts should re-balance now that publisher-3 is back up
+        (code, msg) = self._http_get('/services.json')
+        self.assertEqual(code, 200)
+        response = json.loads(msg)
+        failure = validate_assignment_count(response, 'In-use count just after publisher-3 up again')
+        self.assertEqual(failure, False)
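
For reference, a minimal standalone sketch of the rebalance check applied in
api_subscribe() above. The helper name pick_publisher_to_rebalance and the
plain in-use-count dict are illustrative only and not part of the patch; the
integer average, the 5% over-average threshold, and the one-at-a-time
behaviour mirror the diff.

# Illustrative sketch only; not part of the patch. A subscription is dropped
# (forcing the client to re-resolve) when its publisher carries more than 5%
# above the integer average in-use count, and at most one subscription is
# rebalanced per request to limit churn.

def pick_publisher_to_rebalance(in_use_counts):
    """in_use_counts: service_id -> in-use count for active publishers."""
    if not in_use_counts:
        return None
    avg = sum(in_use_counts.values()) // len(in_use_counts)  # integer average
    threshold = int(1.05 * avg)                              # 5% head-room
    for service_id, in_use in in_use_counts.items():
        if in_use > threshold:
            return service_id                                # one at a time
    return None

if __name__ == '__main__':
    # 40 subscriptions over three publishers: avg = 13, threshold = 13,
    # so only 'collector-0' (18 in use) is picked for rebalancing.
    print(pick_publisher_to_rebalance(
        {'collector-0': 18, 'collector-1': 12, 'collector-2': 10}))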