diff --git a/src/config/api-server/tests/test_crud_basic.py b/src/config/api-server/tests/test_crud_basic.py index 20d5eae6a49..8a0d4938a5d 100644 --- a/src/config/api-server/tests/test_crud_basic.py +++ b/src/config/api-server/tests/test_crud_basic.py @@ -1054,7 +1054,11 @@ def test_list_for_coverage(self): vn2_obj = VirtualNetwork( name, display_name=name, id_perms=id_perms, is_shared=True, router_external=False) - self._vnc_lib.virtual_network_create(vn2_obj) + def fake_admin_request(orig_method, *args, **kwargs): + return True + with test_common.patch(self._api_server, + 'is_admin_request', fake_admin_request): + self._vnc_lib.virtual_network_create(vn2_obj) listen_ip = self._api_server_ip listen_port = self._api_server._args.listen_port @@ -1063,17 +1067,13 @@ def test_list_for_coverage(self): url = 'http://%s:%s/virtual-networks?%s' %( listen_ip, listen_port, q_params) - def fake_non_admin_request(orig_method, *args, **kwargs): - return False - with test_common.patch(self._api_server, - 'is_admin_request', fake_non_admin_request): - resp = requests.get(url) - self.assertEqual(resp.status_code, 200) - read_vn_dicts = json.loads(resp.text)['virtual-networks'] - self.assertEqual(len(read_vn_dicts), 1) - self.assertEqual(read_vn_dicts[0]['uuid'], vn1_obj.uuid) - self.assertEqual(read_vn_dicts[0]['is_shared'], True) - self.assertEqual(read_vn_dicts[0]['router_external'], False) + resp = requests.get(url) + self.assertEqual(resp.status_code, 200) + read_vn_dicts = json.loads(resp.text)['virtual-networks'] + self.assertEqual(len(read_vn_dicts), 1) + self.assertEqual(read_vn_dicts[0]['uuid'], vn1_obj.uuid) + self.assertEqual(read_vn_dicts[0]['is_shared'], True) + self.assertEqual(read_vn_dicts[0]['router_external'], False) # end test_list_for_coverage def test_list_with_malformed_filters(self): diff --git a/src/config/api-server/tests/test_perms.py b/src/config/api-server/tests/test_perms.py index c96b9862f38..1663bf2377d 100644 --- a/src/config/api-server/tests/test_perms.py +++ b/src/config/api-server/tests/test_perms.py @@ -24,6 +24,9 @@ import requests import stevedore +import keystoneclient.v2_0.client as keystone +from keystonemiddleware import auth_token + from vnc_api.vnc_api import * import cfgm_common from cfgm_common import vnc_cgitb @@ -34,12 +37,95 @@ import test_common import test_case +from test_perms2 import User, set_perms, vnc_read_obj, vnc_aal_create, vnc_aal_add_rule + logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) +# This is needed for VncApi._authenticate invocation from within Api server. +# We don't have access to user information so we hard code admin credentials. +def ks_admin_authenticate(self, response=None, headers=None): + rval = token_from_user_info('admin', 'admin', 'default-domain', 'cloud-admin') + new_headers = {} + new_headers['X-AUTH-TOKEN'] = rval + return new_headers + +class TestUserVisible(test_case.ApiServerTestCase): + domain_name = 'default-domain' + fqdn = [domain_name] + + @classmethod + def setUpClass(cls): + extra_mocks = [(keystone.Client, + '__new__', test_utils.FakeKeystoneClient), + (vnc_api.vnc_api.VncApi, + '_authenticate', ks_admin_authenticate), + (auth_token, 'AuthProtocol', + test_utils.FakeAuthProtocol)] + extra_config_knobs = [ + ('DEFAULTS', 'aaa_mode', 'rbac'), + ('DEFAULTS', 'cloud_admin_role', 'cloud-admin'), + ('DEFAULTS', 'global_read_only_role', 'read-only-role'), + ('DEFAULTS', 'auth', 'keystone'), + ] + super(TestUserVisible, cls).setUpClass(extra_mocks=extra_mocks, + extra_config_knobs=extra_config_knobs) + + def setUp(self): + super(TestUserVisible, self).setUp() + ip = self._api_server_ip + port = self._api_server_port + kc = keystone.Client(username='admin', password='contrail123', + tenant_name='admin', + auth_url='http://127.0.0.1:5000/v2.0') + + # prepare token before vnc api invokes keystone + self.test = User(ip, port, kc, 'test', 'test123', 'test-role', 'admin-%s' % self.id()) + self.admin = User(ip, port, kc, 'admin', 'contrail123', 'cloud-admin', 'admin-%s' % self.id()) + + def test_user_visible_perms(self): + user = self.test + project_obj = Project(user.project) + project_obj.uuid = user.project_uuid + self.admin.vnc_lib.project_create(project_obj) + + # read projects back + user.project_obj = vnc_read_obj(self.admin.vnc_lib, + 'project', obj_uuid = user.project_uuid) + user.domain_id = user.project_obj.parent_uuid + user.vnc_lib.set_domain_id(user.project_obj.parent_uuid) + + logger.info( 'Change owner of project %s to %s' % (user.project, user.project_uuid)) + set_perms(user.project_obj, owner=user.project_uuid, share = []) + self.admin.vnc_lib.project_update(user.project_obj) + + # allow permission to create all objects + user.proj_rg = vnc_aal_create(self.admin.vnc_lib, user.project_obj) + vnc_aal_add_rule(self.admin.vnc_lib, user.proj_rg, + rule_str = '* %s:CRUD' % user.role) + + ipam_obj = NetworkIpam('ipam-%s' %(self.id()), user.project_obj) + user.vnc_lib.network_ipam_create(ipam_obj) + ipam_sn_v4 = IpamSubnetType(subnet=SubnetType('11.1.1.0', 24)) + kwargs = {'id_perms':{'user_visible': False}} + vn = VirtualNetwork('vn-%s' %(self.id()), user.project_obj, **kwargs) + vn.add_network_ipam(ipam_obj, VnSubnetsType([ipam_sn_v4])) + + #create virtual-network by non-admin user should fail when user_visible -> 'false' + with ExpectedException(BadRequest) as e: + user.vnc_lib.virtual_network_create(vn) + + #create virtual-network by admin user + self.admin.vnc_lib.virtual_network_create(vn) + vn_fq_name = vn.get_fq_name() + + #delete virtual-network by non-admin user should fail when user_visible -> 'false' + with ExpectedException(NoIdError) as e: + user.vnc_lib.virtual_network_delete(fq_name = vn_fq_name) -class TestPermissions(test_case.ApiServerTestCase): - def test_example(self): - pass - # end test_example + #update virtual-network by non-admin user should fail when user_visible -> 'false' + vn.display_name = "test_perms" + with ExpectedException(NoIdError) as e: + user.vnc_lib.virtual_network_update(vn) + #end test_user_visible_perms # class TestPermissions diff --git a/src/config/api-server/vnc_cfg_api_server.py b/src/config/api-server/vnc_cfg_api_server.py index e4f9536e66b..2d833b4f711 100644 --- a/src/config/api-server/vnc_cfg_api_server.py +++ b/src/config/api-server/vnc_cfg_api_server.py @@ -420,6 +420,13 @@ def http_resource_create(self, obj_type): resource_type, r_class = self._validate_resource_type(obj_type) obj_dict = get_request().json[resource_type] + # check visibility + user_visible = (obj_dict.get('id_perms') or {}).get('user_visible', True) + if not user_visible and not self.is_admin_request(): + result = 'This object is not visible by users' + self.config_object_error(None, None, obj_type, 'http_post', result) + raise cfgm_common.exceptions.HttpError(400, result) + self._post_validate(obj_type, obj_dict=obj_dict) fq_name = obj_dict['fq_name'] try: @@ -768,6 +775,13 @@ def http_resource_update(self, obj_type, id): except NoIdError as e: raise cfgm_common.exceptions.HttpError(404, str(e)) + # check visibility + if (not read_result['id_perms'].get('user_visible', True) and + not self.is_admin_request()): + result = 'This object is not visible by users: %s' % id + self.config_object_error(id, None, obj_type, 'http_put', result) + raise cfgm_common.exceptions.HttpError(404, result) + # properties validator ok, result = self._validate_props_in_request(r_class, obj_dict) if not ok: @@ -892,6 +906,13 @@ def http_resource_delete(self, obj_type, id): id, None, obj_type, 'http_delete', read_result) # proceed down to delete the resource + # check visibility + if (not read_result['id_perms'].get('user_visible', True) and + not self.is_admin_request()): + result = 'This object is not visible by users: %s' % id + self.config_object_error(id, None, obj_type, 'http_delete', result) + raise cfgm_common.exceptions.HttpError(404, result) + # common handling for all resource delete parent_obj_type = read_result.get('parent_type') (ok, del_result) = self._delete_common(