diff --git a/src/discovery/disc_server.py b/src/discovery/disc_server.py index de9bfeb9cc6..d1fb17b34de 100644 --- a/src/discovery/disc_server.py +++ b/src/discovery/disc_server.py @@ -75,6 +75,7 @@ def __init__(self, args): 'throttle_subs':0, '503': 0, 'count_lb': 0, + 'auto_lb': 0, } self._ts_use = 1 self.short_ttl_map = {} @@ -616,17 +617,28 @@ def api_subscribe(self): plist = dict((entry['service_id'],entry) for entry in pubs_active) plist_all = dict((entry['service_id'],entry) for entry in pubs) policy = self.get_service_config(service_type, 'policy') + + # Auto load-balance is triggered if enabled and some servers are + # more than 5% off expected average allocation. + load_balance = self.get_service_config(service_type, 'load-balance') + if load_balance: + total_subs = sum([entry['in_use'] for entry in pubs_active]) + avg = total_subs/len(pubs_active) + for service_id, expired in subs: # expired True if service was marked for deletion by LB command # previously published service is gone # force renew for fixed policy since some service may have flapped - entry2 = plist_all.get(service_id, None) entry = plist.get(service_id, None) - if entry is None or expired or policy == 'fixed': + if entry is None or expired or policy == 'fixed' or (load_balance and entry['in_use'] > int(1.05*avg)): self._db_conn.delete_subscription(service_type, client_id, service_id) # delete fixed policy server if expired - if policy == 'fixed' and entry is None and entry2: - self._db_conn.delete_service(entry2) + if policy == 'fixed' and entry is None: + self._db_conn.delete_service(plist_all.get(service_id, None)) + # load-balance one at at time to avoid churn + if load_balance and entry and entry['in_use'] > int(1.05*avg): + self._debug['auto_lb'] += 1 + load_balance = False continue result = entry['info'].copy() result['@publisher-id'] = entry['service_id'] @@ -1103,6 +1115,7 @@ def parse_args(args_str): # per service options default_service_opts = { 'policy': None, + 'load-balance': False, } cassandra_opts = {