From 1f508c3fbaea014a7423d4c8c79207242b4be65d Mon Sep 17 00:00:00 2001 From: Deepinder Setia Date: Wed, 17 Feb 2016 21:30:17 -0800 Subject: [PATCH] Enhance security of discovery server 1) Discovery utility to require admin keystone credentials to perform any actions such as load-balance, set admin state etc. 2) Discovery server to authenticate load-balance and API to update publisher state (admin, operational) by requiring token and validating it against keystone 2) To prevent unauthorized publish or subscribe requests to effect discovery server state (and assuming such requests are coming through load-balancer such ha-proxy), discovery server to apply configured publish and subscribe white-lists to incoming IP addresses as obtained from X-Forwarded-For header. Load-Balancer must be enabled to forward client's real IP address in X-Forwarded-For header to discovery servers. The whitelist configuration in contrail-discovery.conf looks like this: white_list_publish=127.0.0.1 10.84.20.0/24 white_list_subscribe=127.0.0.1 10.84.20.0/24 RHS is list of IP prefixes seperated by white space. If X-Forwarded-For header is missing in incoming publish or subscribe request, white list configuration is ignored. Change-Id: If2bbe1d90ec93f0cf9f29ba8c7e768a6888de41b Partial-Bug: #1546801 --- src/config/common/tests/test_utils.py | 4 + src/config/utils/discovery_cli.py | 70 +++++++++-------- src/discovery/SConscript | 3 +- src/discovery/disc_auth_keystone.py | 40 ++++++++++ src/discovery/disc_server.py | 68 +++++++++++++++++ src/discovery/test-requirements.txt | 1 + src/discovery/tests/test_auth.py | 101 +++++++++++++++++++++++++ src/discovery/tests/test_case.py | 61 ++++++--------- src/discovery/tests/test_white_list.py | 85 +++++++++++++++++++++ 9 files changed, 363 insertions(+), 70 deletions(-) create mode 100644 src/discovery/disc_auth_keystone.py create mode 100644 src/discovery/tests/test_auth.py create mode 100644 src/discovery/tests/test_white_list.py diff --git a/src/config/common/tests/test_utils.py b/src/config/common/tests/test_utils.py index 6c0858f708a..3b76bccd8d9 100644 --- a/src/config/common/tests/test_utils.py +++ b/src/config/common/tests/test_utils.py @@ -991,6 +991,7 @@ def __init__(self, app, conf, *args, **kwargs): auth_protocol = conf['auth_protocol'] auth_host = conf['auth_host'] auth_port = conf['auth_port'] + self.delay_auth_decision = conf['delay_auth_decision'] self.request_uri = '%s://%s:%s' % (auth_protocol, auth_host, auth_port) self.auth_uri = self.request_uri # print 'FakeAuthProtocol init: auth-uri %s, conf %s' % (self.auth_uri, self.conf) @@ -1077,6 +1078,9 @@ def __call__(self, env, start_response): if user_token: # print '****** user token %s ***** ' % user_token pass + elif self.delay_auth_decision: + self._add_headers(env, {'X-Identity-Status': 'Invalid'}) + return self.app(env, start_response) else: # print 'Missing token or Unable to authenticate token' return self._reject_request(env, start_response) diff --git a/src/config/utils/discovery_cli.py b/src/config/utils/discovery_cli.py index b605d352430..3318107b9e2 100644 --- a/src/config/utils/discovery_cli.py +++ b/src/config/utils/discovery_cli.py @@ -14,6 +14,7 @@ EP_DELIM=',' PUBSUB_DELIM=' ' +DEFAULT_HEADERS = {'Content-type': 'application/json; charset="UTF-8"'} def show_usage(): print 'A rule string must be specified for this operation' @@ -225,6 +226,43 @@ def get_ks_var(args, name): server_ip = server[0] server_port = server[1] +# Validate API server information +api_server = args.api_server.split(':') +if len(api_server) != 2: + print 'API server address must be of the form ip:port, '\ + 'for example 127.0.0.1:8082' + sys.exit(1) +api_server_ip = api_server[0] +api_server_port = api_server[1] + +# Validate keystone credentials +conf = {} +for name in ['username', 'password', 'tenant_name']: + val, rsp = get_ks_var(args, name) + if val is None: + print rsp + sys.exit(1) + conf[name] = val + +username = conf['username'] +password = conf['password'] +tenant_name = conf['tenant_name'] + +print 'API Server = ', args.api_server +print 'Discovery Server = ', args.server +print 'Username = ', username +print 'Tenant = ', tenant_name +print '' + +try: + vnc = VncApi(username, password, tenant_name, + api_server[0], api_server[1]) +except Exception as e: + print '*** %s' % str(e) + sys.exit(1) + +headers = DEFAULT_HEADERS.copy() +headers['X-AUTH-TOKEN'] = vnc.get_auth_token() if args.oper_state or args.admin_state or args.oper_state_reason: if not args.service_id or not args.service_type: @@ -240,9 +278,6 @@ def get_ks_var(args, name): data['oper-state-reason'] = args.oper_state_reason if args.admin_state: data['admin-state'] = args.admin_state - headers = { - 'Content-type': 'application/json', - } url = "http://%s:%s/service/%s" % (server_ip, server_port, args.service_id) r = requests.put(url, data=json.dumps(data), headers=headers) if r.status_code != 200: @@ -255,35 +290,11 @@ def get_ks_var(args, name): if args.service_id: print 'Specific service id %s ignored for this operation' % args.service_id url = "http://%s:%s/load-balance/%s" % (server_ip, server_port, args.service_type) - r = requests.post(url) + r = requests.post(url, headers=headers) if r.status_code != 200: print "Operation status %d" % r.status_code sys.exit(0) -# Validate API server information -api_server = args.api_server.split(':') -if len(api_server) != 2: - print 'Discovery server address must be of the form ip:port, '\ - 'for example 127.0.0.1:5998' - sys.exit(1) -api_server_ip = api_server[0] -api_server_port = api_server[1] - -# Validate keystone credentials -conf = {} -for name in ['username', 'password', 'tenant_name']: - val, rsp = get_ks_var(args, name) - if val is None: - print rsp - sys.exit(1) - conf[name] = val - -username = conf['username'] -password = conf['password'] -tenant_name = conf['tenant_name'] - -vnc = VncApi(username, password, tenant_name, - api_server[0], api_server[1]) uuid = args.uuid # transform uuid if needed @@ -295,9 +306,6 @@ def get_ks_var(args, name): print 'Oper = ', args.op print 'Name = %s' % fq_name print 'UUID = %s' % uuid -print 'API Server = ', args.server -print 'Discovery Server = ', args.server -print '' if args.op == 'add-rule': if not args.rule: diff --git a/src/discovery/SConscript b/src/discovery/SConscript index 4fcce9d97ce..7fce19d308a 100644 --- a/src/discovery/SConscript +++ b/src/discovery/SConscript @@ -46,15 +46,14 @@ for file in setup_sources: local_sources = [ '__init__.py', - 'disc_server_zk.py', 'disc_server.py', 'disc_utils.py', 'disc_consts.py', 'disc_exceptions.py', 'client.py', - 'disc_zk.py', 'disc_cassdb.py', 'output.py', + 'disc_auth_keystone.py', ] local_sources_rules = [] for file in local_sources: diff --git a/src/discovery/disc_auth_keystone.py b/src/discovery/disc_auth_keystone.py new file mode 100644 index 00000000000..8ab3cf88b52 --- /dev/null +++ b/src/discovery/disc_auth_keystone.py @@ -0,0 +1,40 @@ +# +# Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. +# +# +# authentication/authorization functionality for discovery server +# + +try: + from keystoneclient.middleware import auth_token +except ImportError: + from keystonemiddleware import auth_token +except Exception: + pass + +class AuthServiceKeystone(object): + + def __init__(self, conf): + self._conf_info = conf + # end __init__ + + # gets called from keystone middleware after token check + def token_valid(self, env, start_response): + status = env.get('HTTP_X_IDENTITY_STATUS') + return True if status != 'Invalid' else False + + def validate_user_token(self, request): + # following config forces keystone middleware to always return the result + # back in HTTP_X_IDENTITY_STATUS env variable + conf_info = self._conf_info.copy() + conf_info['delay_auth_decision'] = True + + auth_middleware = auth_token.AuthProtocol(self.token_valid, conf_info) + return auth_middleware(request.headers.environ, None) + + def is_admin(self, request): + if not self.validate_user_token(request): + return False + roles = request.headers.environ.get('HTTP_X_ROLE', '').split(",") + return 'admin' in [x.lower() for x in roles] +# end class AuthServiceKeystone diff --git a/src/discovery/disc_server.py b/src/discovery/disc_server.py index f56c1f084a2..b1388d4d658 100644 --- a/src/discovery/disc_server.py +++ b/src/discovery/disc_server.py @@ -52,6 +52,7 @@ from gevent.coros import BoundedSemaphore from cfgm_common.rest import LinkObject +import disc_auth_keystone def obj_to_json(obj): # Non-null fields in object get converted to json fields @@ -82,10 +83,15 @@ def __init__(self, args): 'auto_lb': 0, 'db_exc_unknown': 0, 'db_exc_info': '', + 'wl_rejects_pub': 0, + 'wl_rejects_sub': 0, + 'auth_failures': 0, } self._ts_use = 1 self.short_ttl_map = {} self._sem = BoundedSemaphore(1) + self._pub_wl = None + self._sub_wl = None self._base_url = "http://%s:%s" % (self._args.listen_ip_addr, self._args.listen_port) @@ -225,6 +231,28 @@ def __init__(self, args): self._sub_data = {} for (client_id, service_type) in self._db_conn.subscriber_entries(): self.create_sub_data(client_id, service_type) + + # build white list + if self._args.white_list_publish: + self._pub_wl = IPSet() + for prefix in self._args.white_list_publish.split(" "): + self._pub_wl.add(prefix) + if self._args.white_list_subscribe: + self._sub_wl = IPSet() + for prefix in self._args.white_list_subscribe.split(" "): + self._sub_wl.add(prefix) + + self._auth_svc = None + if self._args.auth == 'keystone': + ks_conf = { + 'auth_host': self._args.auth_host, + 'auth_port': self._args.auth_port, + 'auth_protocol': self._args.auth_protocol, + 'admin_user': self._args.admin_user, + 'admin_password': self._args.admin_password, + 'admin_tenant_name': self._args.admin_tenant_name, + } + self._auth_svc = disc_auth_keystone.AuthServiceKeystone(ks_conf) # end __init__ def config_log(self, msg, level): @@ -374,6 +402,15 @@ def error_handler(self, *args, **kwargs): raise return error_handler + # decorator to authenticate request + def authenticate(func): + def wrapper(self, *args, **kwargs): + if self._auth_svc and not self._auth_svc.is_admin(bottle.request): + self._debug['auth_failures'] += 1 + bottle.abort(401, 'Unauthorized') + return func(self, *args, **kwargs) + return wrapper + # 404 forces republish def heartbeat(self, sig): # self.syslog('heartbeat from "%s"' % sig) @@ -425,6 +462,12 @@ def api_heartbeat(self): @db_error_handler def api_publish(self, end_point = None): self._debug['msg_pubs'] += 1 + + source = bottle.request.headers.get('X-Forwarded-For', None) + if source and self._pub_wl and source not in self._pub_wl: + self._debug['wl_rejects_pub'] += 1 + bottle.abort(401, 'Unauthorized request') + ctype = bottle.request.headers['content-type'] json_req = {} try: @@ -670,6 +713,12 @@ def adjust_in_use_list(self, pubs, in_use_list): @db_error_handler def api_subscribe(self): self._debug['msg_subs'] += 1 + + source = bottle.request.headers.get('X-Forwarded-For', None) + if source and self._sub_wl and source not in self._sub_wl: + self._debug['wl_rejects_sub'] += 1 + bottle.abort(401, 'Unauthorized request') + ctype = bottle.request.headers['content-type'] if 'application/json' in ctype: json_req = bottle.request.json @@ -846,6 +895,7 @@ def api_subscribe(self): # on-demand API to load-balance existing subscribers across all currently available # publishers. Needed if publisher gets added or taken down + @authenticate def api_lb_service(self, service_type): if service_type is None: bottle.abort(405, "Missing service") @@ -998,6 +1048,7 @@ def services_json(self, service_type=None): return {'services': rsp} # end services_json + @authenticate def service_http_put(self, id): self.syslog('Update service %s' % (id)) try: @@ -1270,6 +1321,8 @@ def parse_args(args_str): 'logger_class': None, 'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(), 'cluster_id': None, + 'white_list_publish': None, + 'white_list_subscribe': None, } # per service options @@ -1282,6 +1335,14 @@ def parse_args(args_str): 'cassandra_user' : None, 'cassandra_password' : None, } + keystone_opts = { + 'auth_host': '127.0.0.1', + 'auth_port': '35357', + 'auth_protocol': 'http', + 'admin_user': '', + 'admin_password': '', + 'admin_tenant_name': '', + } service_config = {} cassandra_config = {} @@ -1299,11 +1360,15 @@ def parse_args(args_str): if section == "DEFAULTS": defaults.update(dict(config.items("DEFAULTS"))) continue + if 'KEYSTONE' in config.sections(): + keystone_opts.update(dict(config.items("KEYSTONE"))) + continue service_config[ section.lower()] = default_service_opts.copy() service_config[section.lower()].update( dict(config.items(section))) + defaults.update(keystone_opts) parser.set_defaults(**defaults) parser.add_argument( @@ -1384,6 +1449,9 @@ def parse_args(args_str): help="Sandesh send rate limit in messages/sec") parser.add_argument("--cluster_id", help="Used for database keyspace separation") + parser.add_argument( + "--auth", choices=['keystone'], + help="Type of authentication for user-requests") args = parser.parse_args(remaining_argv) args.conf_file = args.conf_file diff --git a/src/discovery/test-requirements.txt b/src/discovery/test-requirements.txt index 38e72d363ca..39cea7bcbef 100644 --- a/src/discovery/test-requirements.txt +++ b/src/discovery/test-requirements.txt @@ -15,3 +15,4 @@ vnc_api discoveryclient sandesh sandesh-common +keystonemiddleware diff --git a/src/discovery/tests/test_auth.py b/src/discovery/tests/test_auth.py new file mode 100644 index 00000000000..d8953d6a1a8 --- /dev/null +++ b/src/discovery/tests/test_auth.py @@ -0,0 +1,101 @@ + +import sys +import time +sys.path.append("../config/common/tests") +from test_utils import * +import fixtures +import testtools +import test_utils +import test_common +import test_case + +import discoveryclient.client as client + +from vnc_api.vnc_api import * +import keystoneclient.v2_0.client as keystone +try: + from keystoneclient.middleware import auth_token +except ImportError: + from keystonemiddleware import auth_token +except Exception: + pass + +subscribe_info = '' +def info_callback(info): + global subscribe_info + subscribe_info = info + # print 'In subscribe callback handler' + # print '%s' % (info) + pass + +def token_from_user_info(user_name, tenant_name, domain_name, role_name, + tenant_id = None): + token_dict = { + 'X-User': user_name, + 'X-User-Name': user_name, + 'X-Project-Name': tenant_name, + 'X-Project-Id': tenant_id or '', + 'X-Domain-Name' : domain_name, + 'X-Role': role_name, + } + rval = json.dumps(token_dict) + # logger.info( '**** Generate token %s ****' % rval) + return rval + +# This is needed for VncApi._authenticate invocation from within Api server. +# We don't have access to user information so we hard code admin credentials. +def ks_admin_authenticate(self, response=None, headers=None): + rval = token_from_user_info('admin', 'admin', 'default-domain', 'admin') + new_headers = {} + new_headers['X-AUTH-TOKEN'] = rval + return new_headers + +class DiscoveryServerTestCase(test_case.DsTestCase): + def setUp(self): + extra_config_knobs = [ + ('DEFAULTS', 'auth', 'keystone'), + ] + extra_mocks = [(auth_token, 'AuthProtocol', + test_utils.FakeAuthProtocol)] + super(DiscoveryServerTestCase, self).setUp(extra_disc_server_mocks=extra_mocks, + extra_disc_server_config_knobs=extra_config_knobs) + + def test_service_admin_state(self): + service_type = 'foobar' + puburl = '/publish/test_discovery' + updurl = '/service/test_discovery' + payload = { + '%s' % service_type: { "ip-addr" : "1.1.1.1", "port" : "1234" }, + 'service-type' : '%s' % service_type, + } + + (code, msg) = self._http_post(puburl, json.dumps(payload)) + self.assertEqual(code, 200) + + (code, msg) = self._http_get('/services.json') + self.assertEqual(code, 200) + response = json.loads(msg) + self.assertEqual(len(response['services']), 1) + + # service is up by default + entry = response['services'][0] + self.assertEqual(entry['admin_state'], 'up') + + payload = { + 'service-type' : service_type, + 'admin-state' : 'down', + } + (code, msg) = self._http_put(updurl, json.dumps(payload)) + self.assertEqual(code, 401) + + # try as non-admin - should fail + headers = {} + headers['X-AUTH-TOKEN'] = token_from_user_info('alice', 'admin', 'default-domain', 'alice-role') + (code, msg) = self._http_put(updurl, json.dumps(payload), http_headers=headers) + self.assertEqual(code, 401) + + # try as admin - should pass + headers = {} + headers['X-AUTH-TOKEN'] = token_from_user_info('admin', 'admin', 'default-domain', 'admin') + (code, msg) = self._http_put(updurl, json.dumps(payload), http_headers=headers) + self.assertEqual(code, 200) diff --git a/src/discovery/tests/test_case.py b/src/discovery/tests/test_case.py index 874d4fc17bf..6a740657075 100644 --- a/src/discovery/tests/test_case.py +++ b/src/discovery/tests/test_case.py @@ -1,9 +1,11 @@ +import gevent import sys import uuid +import socket import inspect import requests sys.path.append("../config/common/tests") -from test_utils import * +import test_utils import test_common from testtools import content, content_type @@ -29,34 +31,14 @@ def __init__(self, *args, **kwargs): ] super(DsTestCase, self).__init__(*args, **kwargs) - self.mocks = [ - (cfgm_common.vnc_cpu_info.CpuInfo, '__init__',stub), - (novaclient.client, 'Client',FakeNovaClient.initialize), + @classmethod + def setUpClass(cls): + # unstub discovery client + cls.mocks = [mock for mock in cls.mocks if mock[0].__name__ != 'DiscoveryClient'] + super(DsTestCase, cls).setUpClass() - (ifmap_client.client, '__init__', FakeIfmapClient.initialize), - (ifmap_client.client, 'call_async_result', FakeIfmapClient.call_async_result), - (ifmap_client.client, 'call', FakeIfmapClient.call), - - (pycassa.system_manager.Connection, '__init__',stub), - (pycassa.system_manager.SystemManager, '__new__',FakeSystemManager), - (pycassa.ConnectionPool, '__init__',stub), - (pycassa.ColumnFamily, '__new__',FakeCF), - (pycassa.util, 'convert_uuid_to_time',Fake_uuid_to_time), - - (kazoo.client.KazooClient, '__new__',FakeKazooClient), - (kazoo.handlers.gevent.SequentialGeventHandler, '__init__',stub), - - (kombu.Connection, '__new__',FakeKombu.Connection), - (kombu.Exchange, '__new__',FakeKombu.Exchange), - (kombu.Queue, '__new__',FakeKombu.Queue), - (kombu.Consumer, '__new__',FakeKombu.Consumer), - (kombu.Producer, '__new__',FakeKombu.Producer), - - (VncApiConfigLog, '__new__',FakeApiConfigLog), - #(VncApiStatsLog, '__new__',FakeVncApiStatsLog) - ] - - def setUp(self, extra_disc_server_config_knobs = None): + def setUp(self, extra_disc_server_config_knobs = None, + extra_disc_server_mocks = None): extra_api_server_config_knobs = [ ('DEFAULTS', 'disc_server_dsa_api', '/api/dsa'), ('DEFAULTS', 'log_level', 'SYS_DEBUG'), @@ -64,18 +46,20 @@ def setUp(self, extra_disc_server_config_knobs = None): super(DsTestCase, self).setUp(extra_config_knobs = extra_api_server_config_knobs) self._disc_server_ip = socket.gethostbyname(socket.gethostname()) - self._disc_server_port = get_free_port() - http_server_port = get_free_port() + self._disc_server_port = test_utils.get_free_port() + http_server_port = test_utils.get_free_port() if extra_disc_server_config_knobs: self._disc_server_config_knobs.extend(extra_disc_server_config_knobs) + if extra_disc_server_mocks: + self.mocks.extend(extra_disc_server_mocks) test_common.setup_mocks(self.mocks) self._disc_server_greenlet = gevent.spawn(test_common.launch_disc_server, self.id(), self._disc_server_ip, self._disc_server_port, http_server_port, self._disc_server_config_knobs) - block_till_port_listened(self._disc_server_ip, self._disc_server_port) + test_utils.block_till_port_listened(self._disc_server_ip, self._disc_server_port) self._disc_server_session = requests.Session() adapter = requests.adapters.HTTPAdapter() self._disc_server_session.mount("http://", adapter) @@ -83,6 +67,7 @@ def setUp(self, extra_disc_server_config_knobs = None): def tearDown(self): + test_utils.CassandraCFs.reset() test_common.kill_disc_server(self._disc_server_greenlet) super(DsTestCase, self).tearDown() @@ -115,11 +100,12 @@ def _http_get(self, uri, query_params=None): return (response.status_code, response.text) #end _http_get - def _http_post(self, uri, body): + def _http_post(self, uri, body, http_headers={}): + http_headers.update(self._http_headers) url = "http://%s:%s%s" % (self._disc_server_ip, self._disc_server_port, uri) - self._add_request_detail('POST', url, headers=self._http_headers, body=body) + self._add_request_detail('POST', url, headers=http_headers, body=body) response = self._disc_server_session.post(url, data=body, - headers=self._http_headers) + headers=http_headers) self._add_detail('Received Response: ' + pformat(response.status_code) + pformat(response.text)) @@ -137,11 +123,12 @@ def _http_delete(self, uri, body): return (response.status_code, response.text) #end _http_delete - def _http_put(self, uri, body): + def _http_put(self, uri, body, http_headers={}): + http_headers.update(self._http_headers) url = "http://%s:%s%s" % (self._disc_server_ip, self._disc_server_port, uri) - self._add_request_detail('PUT', url, headers=self._http_headers, body=body) + self._add_request_detail('PUT', url, headers=http_headers, body=body) response = self._disc_server_session.put(url, data=body, - headers=self._http_headers) + headers=http_headers) self._add_detail('Received Response: ' + pformat(response.status_code) + pformat(response.text)) diff --git a/src/discovery/tests/test_white_list.py b/src/discovery/tests/test_white_list.py new file mode 100644 index 00000000000..e7ac6057842 --- /dev/null +++ b/src/discovery/tests/test_white_list.py @@ -0,0 +1,85 @@ +import sys +import time +sys.path.append("../config/common/tests") +from test_utils import * +import fixtures +import testtools +import test_common +import test_case + +import discoveryclient.client as client + +def info_callback(info, client_id): + # print 'In subscribe callback handler' + print 'client-id %s info %s' % (client_id, info) + pass + +class DiscoveryServerTestCase(test_case.DsTestCase): + def setUp(self): + extra_config_knobs = [ + ('DEFAULTS', 'white_list_publish', '127.0.0.1'), + ('DEFAULTS', 'white_list_subscribe', '127.0.0.1'), + ] + super(DiscoveryServerTestCase, self).setUp(extra_disc_server_config_knobs=extra_config_knobs) + + # simple publish - for sanity + def test_publish_basic(self): + service_type = 'foobar' + payload = { + '%s' % service_type: { "ip-addr" : "1.1.1.1", "port" : "1234" }, + 'service-type' : '%s' % service_type, + } + puburl = '/publish/test_discovery' + (code, msg) = self._http_post(puburl, json.dumps(payload)) + self.assertEqual(code, 200) + + time.sleep(1) + (code, msg) = self._http_get('/services.json') + self.assertEqual(code, 200) + + response = json.loads(msg) + self.assertEqual(len(response['services']), 1) + self.assertEqual(response['services'][0]['service_type'], service_type) + + # Attempt to publish from non white-listed address should fail with 401 error + def test_publish_whitelist(self): + service_type = 'foobar' + headers = {'X-Forwarded-For': "1.1.1.1"} + payload = { + '%s' % service_type: { "ip-addr" : "1.1.1.1", "port" : "1234" }, + 'service-type' : '%s' % service_type, + } + puburl = '/publish/test_discovery' + (code, msg) = self._http_post(puburl, json.dumps(payload), http_headers=headers) + self.assertEqual(code, 401) + + def test_subscribe_basic(self): + service_type = 'foobar' + payload = { + '%s' % service_type: { "ip-addr" : "1.1.1.1", "port" : "1234" }, + 'service-type' : '%s' % service_type, + } + puburl = '/publish/test_discovery' + (code, msg) = self._http_post(puburl, json.dumps(payload)) + self.assertEqual(code, 200) + + # json subscribe request + suburl = "/subscribe" + payload = { + 'service' : '%s' % service_type, + 'instances' : 1, + 'client-type' : 'test-discovery', + 'client' : 'No-X-Forwarded-For-Header', + } + + # should subscribe successfully + (code, msg) = self._http_post(suburl, json.dumps(payload)) + self.assertEqual(code, 200) + response = json.loads(msg) + self.assertEqual(len(response[service_type]), 1) + + # Attempt to subscribe from non white-listed address should fail with 401 error + headers = {'X-Forwarded-For': "1.1.1.1"} + payload['client'] = 'X-Forwarded-For-Header' + (code, msg) = self._http_post(suburl, json.dumps(payload), http_headers=headers) + self.assertEqual(code, 401)