Skip to content

Commit

Permalink
Merge "Added check_visibility logic for CRUD operations."
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Nov 15, 2016
2 parents 671e0d6 + 51318dc commit 7f69ecd
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 16 deletions.
24 changes: 12 additions & 12 deletions src/config/api-server/tests/test_crud_basic.py
Expand Up @@ -1054,7 +1054,11 @@ def test_list_for_coverage(self):
vn2_obj = VirtualNetwork(
name, display_name=name, id_perms=id_perms,
is_shared=True, router_external=False)
self._vnc_lib.virtual_network_create(vn2_obj)
def fake_admin_request(orig_method, *args, **kwargs):
return True
with test_common.patch(self._api_server,
'is_admin_request', fake_admin_request):
self._vnc_lib.virtual_network_create(vn2_obj)

listen_ip = self._api_server_ip
listen_port = self._api_server._args.listen_port
Expand All @@ -1063,17 +1067,13 @@ def test_list_for_coverage(self):
url = 'http://%s:%s/virtual-networks?%s' %(
listen_ip, listen_port, q_params)

def fake_non_admin_request(orig_method, *args, **kwargs):
return False
with test_common.patch(self._api_server,
'is_admin_request', fake_non_admin_request):
resp = requests.get(url)
self.assertEqual(resp.status_code, 200)
read_vn_dicts = json.loads(resp.text)['virtual-networks']
self.assertEqual(len(read_vn_dicts), 1)
self.assertEqual(read_vn_dicts[0]['uuid'], vn1_obj.uuid)
self.assertEqual(read_vn_dicts[0]['is_shared'], True)
self.assertEqual(read_vn_dicts[0]['router_external'], False)
resp = requests.get(url)
self.assertEqual(resp.status_code, 200)
read_vn_dicts = json.loads(resp.text)['virtual-networks']
self.assertEqual(len(read_vn_dicts), 1)
self.assertEqual(read_vn_dicts[0]['uuid'], vn1_obj.uuid)
self.assertEqual(read_vn_dicts[0]['is_shared'], True)
self.assertEqual(read_vn_dicts[0]['router_external'], False)
# end test_list_for_coverage

def test_list_with_malformed_filters(self):
Expand Down
94 changes: 90 additions & 4 deletions src/config/api-server/tests/test_perms.py
Expand Up @@ -24,6 +24,9 @@
import requests
import stevedore

import keystoneclient.v2_0.client as keystone
from keystonemiddleware import auth_token

from vnc_api.vnc_api import *
import cfgm_common
from cfgm_common import vnc_cgitb
Expand All @@ -34,12 +37,95 @@
import test_common
import test_case

from test_perms2 import User, set_perms, vnc_read_obj, vnc_aal_create, vnc_aal_add_rule

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# This is needed for VncApi._authenticate invocation from within Api server.
# We don't have access to user information so we hard code admin credentials.
def ks_admin_authenticate(self, response=None, headers=None):
rval = token_from_user_info('admin', 'admin', 'default-domain', 'cloud-admin')
new_headers = {}
new_headers['X-AUTH-TOKEN'] = rval
return new_headers

class TestUserVisible(test_case.ApiServerTestCase):
domain_name = 'default-domain'
fqdn = [domain_name]

@classmethod
def setUpClass(cls):
extra_mocks = [(keystone.Client,
'__new__', test_utils.FakeKeystoneClient),
(vnc_api.vnc_api.VncApi,
'_authenticate', ks_admin_authenticate),
(auth_token, 'AuthProtocol',
test_utils.FakeAuthProtocol)]
extra_config_knobs = [
('DEFAULTS', 'aaa_mode', 'rbac'),
('DEFAULTS', 'cloud_admin_role', 'cloud-admin'),
('DEFAULTS', 'global_read_only_role', 'read-only-role'),
('DEFAULTS', 'auth', 'keystone'),
]
super(TestUserVisible, cls).setUpClass(extra_mocks=extra_mocks,
extra_config_knobs=extra_config_knobs)

def setUp(self):
super(TestUserVisible, self).setUp()
ip = self._api_server_ip
port = self._api_server_port
kc = keystone.Client(username='admin', password='contrail123',
tenant_name='admin',
auth_url='http://127.0.0.1:5000/v2.0')

# prepare token before vnc api invokes keystone
self.test = User(ip, port, kc, 'test', 'test123', 'test-role', 'admin-%s' % self.id())
self.admin = User(ip, port, kc, 'admin', 'contrail123', 'cloud-admin', 'admin-%s' % self.id())

def test_user_visible_perms(self):
user = self.test
project_obj = Project(user.project)
project_obj.uuid = user.project_uuid
self.admin.vnc_lib.project_create(project_obj)

# read projects back
user.project_obj = vnc_read_obj(self.admin.vnc_lib,
'project', obj_uuid = user.project_uuid)
user.domain_id = user.project_obj.parent_uuid
user.vnc_lib.set_domain_id(user.project_obj.parent_uuid)

logger.info( 'Change owner of project %s to %s' % (user.project, user.project_uuid))
set_perms(user.project_obj, owner=user.project_uuid, share = [])
self.admin.vnc_lib.project_update(user.project_obj)

# allow permission to create all objects
user.proj_rg = vnc_aal_create(self.admin.vnc_lib, user.project_obj)
vnc_aal_add_rule(self.admin.vnc_lib, user.proj_rg,
rule_str = '* %s:CRUD' % user.role)

ipam_obj = NetworkIpam('ipam-%s' %(self.id()), user.project_obj)
user.vnc_lib.network_ipam_create(ipam_obj)
ipam_sn_v4 = IpamSubnetType(subnet=SubnetType('11.1.1.0', 24))
kwargs = {'id_perms':{'user_visible': False}}
vn = VirtualNetwork('vn-%s' %(self.id()), user.project_obj, **kwargs)
vn.add_network_ipam(ipam_obj, VnSubnetsType([ipam_sn_v4]))

#create virtual-network by non-admin user should fail when user_visible -> 'false'
with ExpectedException(BadRequest) as e:
user.vnc_lib.virtual_network_create(vn)

#create virtual-network by admin user
self.admin.vnc_lib.virtual_network_create(vn)
vn_fq_name = vn.get_fq_name()

#delete virtual-network by non-admin user should fail when user_visible -> 'false'
with ExpectedException(NoIdError) as e:
user.vnc_lib.virtual_network_delete(fq_name = vn_fq_name)

class TestPermissions(test_case.ApiServerTestCase):
def test_example(self):
pass
# end test_example
#update virtual-network by non-admin user should fail when user_visible -> 'false'
vn.display_name = "test_perms"
with ExpectedException(NoIdError) as e:
user.vnc_lib.virtual_network_update(vn)
#end test_user_visible_perms
# class TestPermissions
21 changes: 21 additions & 0 deletions src/config/api-server/vnc_cfg_api_server.py
Expand Up @@ -420,6 +420,13 @@ def http_resource_create(self, obj_type):
resource_type, r_class = self._validate_resource_type(obj_type)
obj_dict = get_request().json[resource_type]

# check visibility
user_visible = (obj_dict.get('id_perms') or {}).get('user_visible', True)
if not user_visible and not self.is_admin_request():
result = 'This object is not visible by users'
self.config_object_error(None, None, obj_type, 'http_post', result)
raise cfgm_common.exceptions.HttpError(400, result)

self._post_validate(obj_type, obj_dict=obj_dict)
fq_name = obj_dict['fq_name']
try:
Expand Down Expand Up @@ -768,6 +775,13 @@ def http_resource_update(self, obj_type, id):
except NoIdError as e:
raise cfgm_common.exceptions.HttpError(404, str(e))

# check visibility
if (not read_result['id_perms'].get('user_visible', True) and
not self.is_admin_request()):
result = 'This object is not visible by users: %s' % id
self.config_object_error(id, None, obj_type, 'http_put', result)
raise cfgm_common.exceptions.HttpError(404, result)

# properties validator
ok, result = self._validate_props_in_request(r_class, obj_dict)
if not ok:
Expand Down Expand Up @@ -892,6 +906,13 @@ def http_resource_delete(self, obj_type, id):
id, None, obj_type, 'http_delete', read_result)
# proceed down to delete the resource

# check visibility
if (not read_result['id_perms'].get('user_visible', True) and
not self.is_admin_request()):
result = 'This object is not visible by users: %s' % id
self.config_object_error(id, None, obj_type, 'http_delete', result)
raise cfgm_common.exceptions.HttpError(404, result)

# common handling for all resource delete
parent_obj_type = read_result.get('parent_type')
(ok, del_result) = self._delete_common(
Expand Down

0 comments on commit 7f69ecd

Please sign in to comment.