diff --git a/src/config/common/tests/test_utils.py b/src/config/common/tests/test_utils.py index 6c0858f708a..3b76bccd8d9 100644 --- a/src/config/common/tests/test_utils.py +++ b/src/config/common/tests/test_utils.py @@ -991,6 +991,7 @@ def __init__(self, app, conf, *args, **kwargs): auth_protocol = conf['auth_protocol'] auth_host = conf['auth_host'] auth_port = conf['auth_port'] + self.delay_auth_decision = conf['delay_auth_decision'] self.request_uri = '%s://%s:%s' % (auth_protocol, auth_host, auth_port) self.auth_uri = self.request_uri # print 'FakeAuthProtocol init: auth-uri %s, conf %s' % (self.auth_uri, self.conf) @@ -1077,6 +1078,9 @@ def __call__(self, env, start_response): if user_token: # print '****** user token %s ***** ' % user_token pass + elif self.delay_auth_decision: + self._add_headers(env, {'X-Identity-Status': 'Invalid'}) + return self.app(env, start_response) else: # print 'Missing token or Unable to authenticate token' return self._reject_request(env, start_response) diff --git a/src/config/utils/discovery_cli.py b/src/config/utils/discovery_cli.py index b605d352430..3318107b9e2 100644 --- a/src/config/utils/discovery_cli.py +++ b/src/config/utils/discovery_cli.py @@ -14,6 +14,7 @@ EP_DELIM=',' PUBSUB_DELIM=' ' +DEFAULT_HEADERS = {'Content-type': 'application/json; charset="UTF-8"'} def show_usage(): print 'A rule string must be specified for this operation' @@ -225,6 +226,43 @@ def get_ks_var(args, name): server_ip = server[0] server_port = server[1] +# Validate API server information +api_server = args.api_server.split(':') +if len(api_server) != 2: + print 'API server address must be of the form ip:port, '\ + 'for example 127.0.0.1:8082' + sys.exit(1) +api_server_ip = api_server[0] +api_server_port = api_server[1] + +# Validate keystone credentials +conf = {} +for name in ['username', 'password', 'tenant_name']: + val, rsp = get_ks_var(args, name) + if val is None: + print rsp + sys.exit(1) + conf[name] = val + +username = conf['username'] +password = conf['password'] +tenant_name = conf['tenant_name'] + +print 'API Server = ', args.api_server +print 'Discovery Server = ', args.server +print 'Username = ', username +print 'Tenant = ', tenant_name +print '' + +try: + vnc = VncApi(username, password, tenant_name, + api_server[0], api_server[1]) +except Exception as e: + print '*** %s' % str(e) + sys.exit(1) + +headers = DEFAULT_HEADERS.copy() +headers['X-AUTH-TOKEN'] = vnc.get_auth_token() if args.oper_state or args.admin_state or args.oper_state_reason: if not args.service_id or not args.service_type: @@ -240,9 +278,6 @@ def get_ks_var(args, name): data['oper-state-reason'] = args.oper_state_reason if args.admin_state: data['admin-state'] = args.admin_state - headers = { - 'Content-type': 'application/json', - } url = "http://%s:%s/service/%s" % (server_ip, server_port, args.service_id) r = requests.put(url, data=json.dumps(data), headers=headers) if r.status_code != 200: @@ -255,35 +290,11 @@ def get_ks_var(args, name): if args.service_id: print 'Specific service id %s ignored for this operation' % args.service_id url = "http://%s:%s/load-balance/%s" % (server_ip, server_port, args.service_type) - r = requests.post(url) + r = requests.post(url, headers=headers) if r.status_code != 200: print "Operation status %d" % r.status_code sys.exit(0) -# Validate API server information -api_server = args.api_server.split(':') -if len(api_server) != 2: - print 'Discovery server address must be of the form ip:port, '\ - 'for example 127.0.0.1:5998' - sys.exit(1) -api_server_ip = api_server[0] -api_server_port = api_server[1] - -# Validate keystone credentials -conf = {} -for name in ['username', 'password', 'tenant_name']: - val, rsp = get_ks_var(args, name) - if val is None: - print rsp - sys.exit(1) - conf[name] = val - -username = conf['username'] -password = conf['password'] -tenant_name = conf['tenant_name'] - -vnc = VncApi(username, password, tenant_name, - api_server[0], api_server[1]) uuid = args.uuid # transform uuid if needed @@ -295,9 +306,6 @@ def get_ks_var(args, name): print 'Oper = ', args.op print 'Name = %s' % fq_name print 'UUID = %s' % uuid -print 'API Server = ', args.server -print 'Discovery Server = ', args.server -print '' if args.op == 'add-rule': if not args.rule: diff --git a/src/discovery/SConscript b/src/discovery/SConscript index 4fcce9d97ce..7fce19d308a 100644 --- a/src/discovery/SConscript +++ b/src/discovery/SConscript @@ -46,15 +46,14 @@ for file in setup_sources: local_sources = [ '__init__.py', - 'disc_server_zk.py', 'disc_server.py', 'disc_utils.py', 'disc_consts.py', 'disc_exceptions.py', 'client.py', - 'disc_zk.py', 'disc_cassdb.py', 'output.py', + 'disc_auth_keystone.py', ] local_sources_rules = [] for file in local_sources: diff --git a/src/discovery/disc_auth_keystone.py b/src/discovery/disc_auth_keystone.py new file mode 100644 index 00000000000..8ab3cf88b52 --- /dev/null +++ b/src/discovery/disc_auth_keystone.py @@ -0,0 +1,40 @@ +# +# Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. +# +# +# authentication/authorization functionality for discovery server +# + +try: + from keystoneclient.middleware import auth_token +except ImportError: + from keystonemiddleware import auth_token +except Exception: + pass + +class AuthServiceKeystone(object): + + def __init__(self, conf): + self._conf_info = conf + # end __init__ + + # gets called from keystone middleware after token check + def token_valid(self, env, start_response): + status = env.get('HTTP_X_IDENTITY_STATUS') + return True if status != 'Invalid' else False + + def validate_user_token(self, request): + # following config forces keystone middleware to always return the result + # back in HTTP_X_IDENTITY_STATUS env variable + conf_info = self._conf_info.copy() + conf_info['delay_auth_decision'] = True + + auth_middleware = auth_token.AuthProtocol(self.token_valid, conf_info) + return auth_middleware(request.headers.environ, None) + + def is_admin(self, request): + if not self.validate_user_token(request): + return False + roles = request.headers.environ.get('HTTP_X_ROLE', '').split(",") + return 'admin' in [x.lower() for x in roles] +# end class AuthServiceKeystone diff --git a/src/discovery/disc_server.py b/src/discovery/disc_server.py index f56c1f084a2..b1388d4d658 100644 --- a/src/discovery/disc_server.py +++ b/src/discovery/disc_server.py @@ -52,6 +52,7 @@ from gevent.coros import BoundedSemaphore from cfgm_common.rest import LinkObject +import disc_auth_keystone def obj_to_json(obj): # Non-null fields in object get converted to json fields @@ -82,10 +83,15 @@ def __init__(self, args): 'auto_lb': 0, 'db_exc_unknown': 0, 'db_exc_info': '', + 'wl_rejects_pub': 0, + 'wl_rejects_sub': 0, + 'auth_failures': 0, } self._ts_use = 1 self.short_ttl_map = {} self._sem = BoundedSemaphore(1) + self._pub_wl = None + self._sub_wl = None self._base_url = "http://%s:%s" % (self._args.listen_ip_addr, self._args.listen_port) @@ -225,6 +231,28 @@ def __init__(self, args): self._sub_data = {} for (client_id, service_type) in self._db_conn.subscriber_entries(): self.create_sub_data(client_id, service_type) + + # build white list + if self._args.white_list_publish: + self._pub_wl = IPSet() + for prefix in self._args.white_list_publish.split(" "): + self._pub_wl.add(prefix) + if self._args.white_list_subscribe: + self._sub_wl = IPSet() + for prefix in self._args.white_list_subscribe.split(" "): + self._sub_wl.add(prefix) + + self._auth_svc = None + if self._args.auth == 'keystone': + ks_conf = { + 'auth_host': self._args.auth_host, + 'auth_port': self._args.auth_port, + 'auth_protocol': self._args.auth_protocol, + 'admin_user': self._args.admin_user, + 'admin_password': self._args.admin_password, + 'admin_tenant_name': self._args.admin_tenant_name, + } + self._auth_svc = disc_auth_keystone.AuthServiceKeystone(ks_conf) # end __init__ def config_log(self, msg, level): @@ -374,6 +402,15 @@ def error_handler(self, *args, **kwargs): raise return error_handler + # decorator to authenticate request + def authenticate(func): + def wrapper(self, *args, **kwargs): + if self._auth_svc and not self._auth_svc.is_admin(bottle.request): + self._debug['auth_failures'] += 1 + bottle.abort(401, 'Unauthorized') + return func(self, *args, **kwargs) + return wrapper + # 404 forces republish def heartbeat(self, sig): # self.syslog('heartbeat from "%s"' % sig) @@ -425,6 +462,12 @@ def api_heartbeat(self): @db_error_handler def api_publish(self, end_point = None): self._debug['msg_pubs'] += 1 + + source = bottle.request.headers.get('X-Forwarded-For', None) + if source and self._pub_wl and source not in self._pub_wl: + self._debug['wl_rejects_pub'] += 1 + bottle.abort(401, 'Unauthorized request') + ctype = bottle.request.headers['content-type'] json_req = {} try: @@ -670,6 +713,12 @@ def adjust_in_use_list(self, pubs, in_use_list): @db_error_handler def api_subscribe(self): self._debug['msg_subs'] += 1 + + source = bottle.request.headers.get('X-Forwarded-For', None) + if source and self._sub_wl and source not in self._sub_wl: + self._debug['wl_rejects_sub'] += 1 + bottle.abort(401, 'Unauthorized request') + ctype = bottle.request.headers['content-type'] if 'application/json' in ctype: json_req = bottle.request.json @@ -846,6 +895,7 @@ def api_subscribe(self): # on-demand API to load-balance existing subscribers across all currently available # publishers. Needed if publisher gets added or taken down + @authenticate def api_lb_service(self, service_type): if service_type is None: bottle.abort(405, "Missing service") @@ -998,6 +1048,7 @@ def services_json(self, service_type=None): return {'services': rsp} # end services_json + @authenticate def service_http_put(self, id): self.syslog('Update service %s' % (id)) try: @@ -1270,6 +1321,8 @@ def parse_args(args_str): 'logger_class': None, 'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(), 'cluster_id': None, + 'white_list_publish': None, + 'white_list_subscribe': None, } # per service options @@ -1282,6 +1335,14 @@ def parse_args(args_str): 'cassandra_user' : None, 'cassandra_password' : None, } + keystone_opts = { + 'auth_host': '127.0.0.1', + 'auth_port': '35357', + 'auth_protocol': 'http', + 'admin_user': '', + 'admin_password': '', + 'admin_tenant_name': '', + } service_config = {} cassandra_config = {} @@ -1299,11 +1360,15 @@ def parse_args(args_str): if section == "DEFAULTS": defaults.update(dict(config.items("DEFAULTS"))) continue + if 'KEYSTONE' in config.sections(): + keystone_opts.update(dict(config.items("KEYSTONE"))) + continue service_config[ section.lower()] = default_service_opts.copy() service_config[section.lower()].update( dict(config.items(section))) + defaults.update(keystone_opts) parser.set_defaults(**defaults) parser.add_argument( @@ -1384,6 +1449,9 @@ def parse_args(args_str): help="Sandesh send rate limit in messages/sec") parser.add_argument("--cluster_id", help="Used for database keyspace separation") + parser.add_argument( + "--auth", choices=['keystone'], + help="Type of authentication for user-requests") args = parser.parse_args(remaining_argv) args.conf_file = args.conf_file diff --git a/src/discovery/test-requirements.txt b/src/discovery/test-requirements.txt index 38e72d363ca..39cea7bcbef 100644 --- a/src/discovery/test-requirements.txt +++ b/src/discovery/test-requirements.txt @@ -15,3 +15,4 @@ vnc_api discoveryclient sandesh sandesh-common +keystonemiddleware diff --git a/src/discovery/tests/test_auth.py b/src/discovery/tests/test_auth.py new file mode 100644 index 00000000000..d8953d6a1a8 --- /dev/null +++ b/src/discovery/tests/test_auth.py @@ -0,0 +1,101 @@ + +import sys +import time +sys.path.append("../config/common/tests") +from test_utils import * +import fixtures +import testtools +import test_utils +import test_common +import test_case + +import discoveryclient.client as client + +from vnc_api.vnc_api import * +import keystoneclient.v2_0.client as keystone +try: + from keystoneclient.middleware import auth_token +except ImportError: + from keystonemiddleware import auth_token +except Exception: + pass + +subscribe_info = '' +def info_callback(info): + global subscribe_info + subscribe_info = info + # print 'In subscribe callback handler' + # print '%s' % (info) + pass + +def token_from_user_info(user_name, tenant_name, domain_name, role_name, + tenant_id = None): + token_dict = { + 'X-User': user_name, + 'X-User-Name': user_name, + 'X-Project-Name': tenant_name, + 'X-Project-Id': tenant_id or '', + 'X-Domain-Name' : domain_name, + 'X-Role': role_name, + } + rval = json.dumps(token_dict) + # logger.info( '**** Generate token %s ****' % rval) + return rval + +# This is needed for VncApi._authenticate invocation from within Api server. +# We don't have access to user information so we hard code admin credentials. +def ks_admin_authenticate(self, response=None, headers=None): + rval = token_from_user_info('admin', 'admin', 'default-domain', 'admin') + new_headers = {} + new_headers['X-AUTH-TOKEN'] = rval + return new_headers + +class DiscoveryServerTestCase(test_case.DsTestCase): + def setUp(self): + extra_config_knobs = [ + ('DEFAULTS', 'auth', 'keystone'), + ] + extra_mocks = [(auth_token, 'AuthProtocol', + test_utils.FakeAuthProtocol)] + super(DiscoveryServerTestCase, self).setUp(extra_disc_server_mocks=extra_mocks, + extra_disc_server_config_knobs=extra_config_knobs) + + def test_service_admin_state(self): + service_type = 'foobar' + puburl = '/publish/test_discovery' + updurl = '/service/test_discovery' + payload = { + '%s' % service_type: { "ip-addr" : "1.1.1.1", "port" : "1234" }, + 'service-type' : '%s' % service_type, + } + + (code, msg) = self._http_post(puburl, json.dumps(payload)) + self.assertEqual(code, 200) + + (code, msg) = self._http_get('/services.json') + self.assertEqual(code, 200) + response = json.loads(msg) + self.assertEqual(len(response['services']), 1) + + # service is up by default + entry = response['services'][0] + self.assertEqual(entry['admin_state'], 'up') + + payload = { + 'service-type' : service_type, + 'admin-state' : 'down', + } + (code, msg) = self._http_put(updurl, json.dumps(payload)) + self.assertEqual(code, 401) + + # try as non-admin - should fail + headers = {} + headers['X-AUTH-TOKEN'] = token_from_user_info('alice', 'admin', 'default-domain', 'alice-role') + (code, msg) = self._http_put(updurl, json.dumps(payload), http_headers=headers) + self.assertEqual(code, 401) + + # try as admin - should pass + headers = {} + headers['X-AUTH-TOKEN'] = token_from_user_info('admin', 'admin', 'default-domain', 'admin') + (code, msg) = self._http_put(updurl, json.dumps(payload), http_headers=headers) + self.assertEqual(code, 200) diff --git a/src/discovery/tests/test_case.py b/src/discovery/tests/test_case.py index 874d4fc17bf..6a740657075 100644 --- a/src/discovery/tests/test_case.py +++ b/src/discovery/tests/test_case.py @@ -1,9 +1,11 @@ +import gevent import sys import uuid +import socket import inspect import requests sys.path.append("../config/common/tests") -from test_utils import * +import test_utils import test_common from testtools import content, content_type @@ -29,34 +31,14 @@ def __init__(self, *args, **kwargs): ] super(DsTestCase, self).__init__(*args, **kwargs) - self.mocks = [ - (cfgm_common.vnc_cpu_info.CpuInfo, '__init__',stub), - (novaclient.client, 'Client',FakeNovaClient.initialize), + @classmethod + def setUpClass(cls): + # unstub discovery client + cls.mocks = [mock for mock in cls.mocks if mock[0].__name__ != 'DiscoveryClient'] + super(DsTestCase, cls).setUpClass() - (ifmap_client.client, '__init__', FakeIfmapClient.initialize), - (ifmap_client.client, 'call_async_result', FakeIfmapClient.call_async_result), - (ifmap_client.client, 'call', FakeIfmapClient.call), - - (pycassa.system_manager.Connection, '__init__',stub), - (pycassa.system_manager.SystemManager, '__new__',FakeSystemManager), - (pycassa.ConnectionPool, '__init__',stub), - (pycassa.ColumnFamily, '__new__',FakeCF), - (pycassa.util, 'convert_uuid_to_time',Fake_uuid_to_time), - - (kazoo.client.KazooClient, '__new__',FakeKazooClient), - (kazoo.handlers.gevent.SequentialGeventHandler, '__init__',stub), - - (kombu.Connection, '__new__',FakeKombu.Connection), - (kombu.Exchange, '__new__',FakeKombu.Exchange), - (kombu.Queue, '__new__',FakeKombu.Queue), - (kombu.Consumer, '__new__',FakeKombu.Consumer), - (kombu.Producer, '__new__',FakeKombu.Producer), - - (VncApiConfigLog, '__new__',FakeApiConfigLog), - #(VncApiStatsLog, '__new__',FakeVncApiStatsLog) - ] - - def setUp(self, extra_disc_server_config_knobs = None): + def setUp(self, extra_disc_server_config_knobs = None, + extra_disc_server_mocks = None): extra_api_server_config_knobs = [ ('DEFAULTS', 'disc_server_dsa_api', '/api/dsa'), ('DEFAULTS', 'log_level', 'SYS_DEBUG'), @@ -64,18 +46,20 @@ def setUp(self, extra_disc_server_config_knobs = None): super(DsTestCase, self).setUp(extra_config_knobs = extra_api_server_config_knobs) self._disc_server_ip = socket.gethostbyname(socket.gethostname()) - self._disc_server_port = get_free_port() - http_server_port = get_free_port() + self._disc_server_port = test_utils.get_free_port() + http_server_port = test_utils.get_free_port() if extra_disc_server_config_knobs: self._disc_server_config_knobs.extend(extra_disc_server_config_knobs) + if extra_disc_server_mocks: + self.mocks.extend(extra_disc_server_mocks) test_common.setup_mocks(self.mocks) self._disc_server_greenlet = gevent.spawn(test_common.launch_disc_server, self.id(), self._disc_server_ip, self._disc_server_port, http_server_port, self._disc_server_config_knobs) - block_till_port_listened(self._disc_server_ip, self._disc_server_port) + test_utils.block_till_port_listened(self._disc_server_ip, self._disc_server_port) self._disc_server_session = requests.Session() adapter = requests.adapters.HTTPAdapter() self._disc_server_session.mount("http://", adapter) @@ -83,6 +67,7 @@ def setUp(self, extra_disc_server_config_knobs = None): def tearDown(self): + test_utils.CassandraCFs.reset() test_common.kill_disc_server(self._disc_server_greenlet) super(DsTestCase, self).tearDown() @@ -115,11 +100,12 @@ def _http_get(self, uri, query_params=None): return (response.status_code, response.text) #end _http_get - def _http_post(self, uri, body): + def _http_post(self, uri, body, http_headers={}): + http_headers.update(self._http_headers) url = "http://%s:%s%s" % (self._disc_server_ip, self._disc_server_port, uri) - self._add_request_detail('POST', url, headers=self._http_headers, body=body) + self._add_request_detail('POST', url, headers=http_headers, body=body) response = self._disc_server_session.post(url, data=body, - headers=self._http_headers) + headers=http_headers) self._add_detail('Received Response: ' + pformat(response.status_code) + pformat(response.text)) @@ -137,11 +123,12 @@ def _http_delete(self, uri, body): return (response.status_code, response.text) #end _http_delete - def _http_put(self, uri, body): + def _http_put(self, uri, body, http_headers={}): + http_headers.update(self._http_headers) url = "http://%s:%s%s" % (self._disc_server_ip, self._disc_server_port, uri) - self._add_request_detail('PUT', url, headers=self._http_headers, body=body) + self._add_request_detail('PUT', url, headers=http_headers, body=body) response = self._disc_server_session.put(url, data=body, - headers=self._http_headers) + headers=http_headers) self._add_detail('Received Response: ' + pformat(response.status_code) + pformat(response.text)) diff --git a/src/discovery/tests/test_white_list.py b/src/discovery/tests/test_white_list.py new file mode 100644 index 00000000000..e7ac6057842 --- /dev/null +++ b/src/discovery/tests/test_white_list.py @@ -0,0 +1,85 @@ +import sys +import time +sys.path.append("../config/common/tests") +from test_utils import * +import fixtures +import testtools +import test_common +import test_case + +import discoveryclient.client as client + +def info_callback(info, client_id): + # print 'In subscribe callback handler' + print 'client-id %s info %s' % (client_id, info) + pass + +class DiscoveryServerTestCase(test_case.DsTestCase): + def setUp(self): + extra_config_knobs = [ + ('DEFAULTS', 'white_list_publish', '127.0.0.1'), + ('DEFAULTS', 'white_list_subscribe', '127.0.0.1'), + ] + super(DiscoveryServerTestCase, self).setUp(extra_disc_server_config_knobs=extra_config_knobs) + + # simple publish - for sanity + def test_publish_basic(self): + service_type = 'foobar' + payload = { + '%s' % service_type: { "ip-addr" : "1.1.1.1", "port" : "1234" }, + 'service-type' : '%s' % service_type, + } + puburl = '/publish/test_discovery' + (code, msg) = self._http_post(puburl, json.dumps(payload)) + self.assertEqual(code, 200) + + time.sleep(1) + (code, msg) = self._http_get('/services.json') + self.assertEqual(code, 200) + + response = json.loads(msg) + self.assertEqual(len(response['services']), 1) + self.assertEqual(response['services'][0]['service_type'], service_type) + + # Attempt to publish from non white-listed address should fail with 401 error + def test_publish_whitelist(self): + service_type = 'foobar' + headers = {'X-Forwarded-For': "1.1.1.1"} + payload = { + '%s' % service_type: { "ip-addr" : "1.1.1.1", "port" : "1234" }, + 'service-type' : '%s' % service_type, + } + puburl = '/publish/test_discovery' + (code, msg) = self._http_post(puburl, json.dumps(payload), http_headers=headers) + self.assertEqual(code, 401) + + def test_subscribe_basic(self): + service_type = 'foobar' + payload = { + '%s' % service_type: { "ip-addr" : "1.1.1.1", "port" : "1234" }, + 'service-type' : '%s' % service_type, + } + puburl = '/publish/test_discovery' + (code, msg) = self._http_post(puburl, json.dumps(payload)) + self.assertEqual(code, 200) + + # json subscribe request + suburl = "/subscribe" + payload = { + 'service' : '%s' % service_type, + 'instances' : 1, + 'client-type' : 'test-discovery', + 'client' : 'No-X-Forwarded-For-Header', + } + + # should subscribe successfully + (code, msg) = self._http_post(suburl, json.dumps(payload)) + self.assertEqual(code, 200) + response = json.loads(msg) + self.assertEqual(len(response[service_type]), 1) + + # Attempt to subscribe from non white-listed address should fail with 401 error + headers = {'X-Forwarded-For': "1.1.1.1"} + payload['client'] = 'X-Forwarded-For-Header' + (code, msg) = self._http_post(suburl, json.dumps(payload), http_headers=headers) + self.assertEqual(code, 401)