diff --git a/src/config/vnc_openstack/vnc_openstack/neutron_plugin_db.py b/src/config/vnc_openstack/vnc_openstack/neutron_plugin_db.py index 235c2229b68..da6708da5ff 100644 --- a/src/config/vnc_openstack/vnc_openstack/neutron_plugin_db.py +++ b/src/config/vnc_openstack/vnc_openstack/neutron_plugin_db.py @@ -1088,16 +1088,18 @@ def _security_group_rule_vnc_to_neutron(self, sg_id, sg_rule, sg_obj=None): addr.get_subnet().get_ip_prefix_len()) elif addr.get_security_group(): if (addr.get_security_group() != 'any' and - addr.get_security_group() != 'local'): + addr.get_security_group() != 'local'): remote_sg = addr.get_security_group() - try: - if remote_sg != ':'.join(sg_obj.get_fq_name()): - remote_sg_obj = self._vnc_lib.security_group_read(fq_name_str=remote_sg) - else: - remote_sg_obj = sg_obj - remote_sg_uuid = remote_sg_obj.uuid - except NoIdError: - pass + if remote_sg != ':'.join(sg_obj.get_fq_name()): + try: + remote_sg_uuid = self._vnc_lib.fq_name_to_id( + 'security-group', remote_sg.split(':')) + except NoIdError: + # Filter rule out as the remote security group does not + # exist anymore + return sgr_q_dict + else: + remote_sg_uuid = sg_obj.uuid sgr_q_dict['id'] = sg_rule.get_rule_uuid() sgr_q_dict['tenant_id'] = sg_obj.parent_uuid.replace('-', '') @@ -4018,9 +4020,11 @@ def security_group_rule_read(self, context, sgr_id): sg_obj, sg_rule = self._security_group_rule_find(sgr_id, project_uuid) if sg_obj and sg_rule: - return self._security_group_rule_vnc_to_neutron(sg_obj.uuid, - sg_rule, sg_obj) - + sgr_info = self._security_group_rule_vnc_to_neutron(sg_obj.uuid, + sg_rule, + sg_obj) + if sgr_info: + return sgr_info self._raise_contrail_exception('SecurityGroupRuleNotFound', id=sgr_id) #end security_group_rule_read @@ -4049,10 +4053,10 @@ def security_group_rules_read(self, sg_id, sg_obj=None): return for sg_rule in sgr_entries.get_policy_rule(): - sg_info = self._security_group_rule_vnc_to_neutron(sg_obj.uuid, - sg_rule, - sg_obj) - sg_rules.append(sg_info) + sgr_info = self._security_group_rule_vnc_to_neutron( + sg_obj.uuid, sg_rule, sg_obj) + if sgr_info: + sg_rules.append(sgr_info) except NoIdError: self._raise_contrail_exception('SecurityGroupNotFound', id=sg_id) diff --git a/src/config/vnc_openstack/vnc_openstack/tests/test_basic.py b/src/config/vnc_openstack/vnc_openstack/tests/test_basic.py index 90967ac7ea9..8e68170cca7 100644 --- a/src/config/vnc_openstack/vnc_openstack/tests/test_basic.py +++ b/src/config/vnc_openstack/vnc_openstack/tests/test_basic.py @@ -15,7 +15,8 @@ class TestBasic(test_case.NeutronBackendTestCase): def read_resource(self, url_pfx, id): context = {'operation': 'READ', 'user_id': '', - 'roles': ''} + 'roles': '', + 'is_admin': True} data = {'fields': None, 'id': id} body = {'context': context, 'data': data} @@ -137,7 +138,7 @@ def list_resource(url_pfx): # for collections that are objects in contrail model for (objects, res_url_pfx, res_xlate_name) in collection_types: res_dicts = list_resource(res_url_pfx) - present_ids = [r['id'] for r in res_dicts] + present_ids = [r['id'] for r in res_dicts] for obj in objects: self.assertIn(obj.uuid, present_ids) @@ -153,11 +154,11 @@ def err_on_object_2(orig_method, res_obj, *args, **kwargs): with test_common.patch( neutron_db_obj, res_xlate_name, err_on_object_2): res_dicts = list_resource(res_url_pfx) - present_ids = [r['id'] for r in res_dicts] + present_ids = [r['id'] for r in res_dicts] self.assertNotIn(objects[2].uuid, present_ids) res_dicts = list_resource(res_url_pfx) - present_ids = [r['id'] for r in res_dicts] + present_ids = [r['id'] for r in res_dicts] for obj in objects: self.assertIn(obj.uuid, present_ids) # end for collections that are objects in contrail model @@ -176,7 +177,7 @@ def err_on_sn2(orig_method, subnet_vnc, *args, **kwargs): with test_common.patch( neutron_db_obj, '_subnet_vnc_to_neutron', err_on_sn2): res_dicts = list_resource('subnet') - present_ids = [r['id'] for r in res_dicts] + present_ids = [r['id'] for r in res_dicts] self.assertNotIn(sn2_id, present_ids) # end test_list_with_inconsistent_members @@ -287,6 +288,102 @@ def test_port_bindings(self): self.assertTrue(isinstance(port_dict['binding:profile'], dict)) self.assertTrue(isinstance(port_dict['binding:host_id'], basestring)) # end test_port_bindings + + def test_sg_rules_delete_when_peer_group_deleted_on_read_sg(self): + sg1_obj = vnc_api.SecurityGroup('sg1-%s' %(self.id())) + self._vnc_lib.security_group_create(sg1_obj) + sg1_obj = self._vnc_lib.security_group_read(sg1_obj.fq_name) + sg2_obj = vnc_api.SecurityGroup('sg2-%s' %(self.id())) + self._vnc_lib.security_group_create(sg2_obj) + sg2_obj = self._vnc_lib.security_group_read(sg2_obj.fq_name) + sgr_uuid = str(uuid.uuid4()) + local = [vnc_api.AddressType(security_group='local')] + remote = [vnc_api.AddressType(security_group=sg2_obj.get_fq_name_str())] + sgr_obj = vnc_api.PolicyRuleType(rule_uuid=sgr_uuid, + direction='>', + protocol='any', + src_addresses=remote, + src_ports=[vnc_api.PortType(0, 255)], + dst_addresses=local, + dst_ports=[vnc_api.PortType(0, 255)], + ethertype='IPv4') + rules = vnc_api.PolicyEntriesType([sgr_obj]) + sg1_obj.set_security_group_entries(rules) + self._vnc_lib.security_group_update(sg1_obj) + + self._vnc_lib.security_group_delete(fq_name=sg2_obj.fq_name) + + sg_dict = self.read_resource('security_group', sg1_obj.uuid) + sgr = [rule['id'] for rule in sg_dict.get('security_group_rules', [])] + self.assertNotIn(sgr_uuid, sgr) + sg1_obj = self._vnc_lib.security_group_read(sg1_obj.fq_name) + sgr = [rule.rule_uuid for rule in + sg1_obj.get_security_group_entries().get_policy_rule() or []] + self.assertIn(sgr_uuid, sgr) + + def test_sg_rules_delete_when_peer_group_deleted_on_read_rule(self): + sg1_obj = vnc_api.SecurityGroup('sg1-%s' %(self.id())) + self._vnc_lib.security_group_create(sg1_obj) + sg1_obj = self._vnc_lib.security_group_read(sg1_obj.fq_name) + sg2_obj = vnc_api.SecurityGroup('sg2-%s' %(self.id())) + self._vnc_lib.security_group_create(sg2_obj) + sg2_obj = self._vnc_lib.security_group_read(sg2_obj.fq_name) + sgr_uuid = str(uuid.uuid4()) + local = [vnc_api.AddressType(security_group='local')] + remote = [vnc_api.AddressType( + security_group=sg2_obj.get_fq_name_str())] + sgr_obj = vnc_api.PolicyRuleType(rule_uuid=sgr_uuid, + direction='>', + protocol='any', + src_addresses=remote, + src_ports=[vnc_api.PortType(0, 255)], + dst_addresses=local, + dst_ports=[vnc_api.PortType(0, 255)], + ethertype='IPv4') + rules = vnc_api.PolicyEntriesType([sgr_obj]) + sg1_obj.set_security_group_entries(rules) + self._vnc_lib.security_group_update(sg1_obj) + + self._vnc_lib.security_group_delete(fq_name=sg2_obj.fq_name) + + with ExpectedException(webtest.app.AppError): + self.read_resource('security_group_rule', sgr_uuid) + sg1_obj = self._vnc_lib.security_group_read(sg1_obj.fq_name) + sgr = [rule.rule_uuid for rule in + sg1_obj.get_security_group_entries().get_policy_rule() or []] + self.assertIn(sgr_uuid, sgr) + + def test_sg_rules_delete_when_peer_group_deleted_on_list_rules(self): + sg1_obj = vnc_api.SecurityGroup('sg1-%s' %(self.id())) + self._vnc_lib.security_group_create(sg1_obj) + sg1_obj = self._vnc_lib.security_group_read(sg1_obj.fq_name) + sg2_obj = vnc_api.SecurityGroup('sg2-%s' %(self.id())) + self._vnc_lib.security_group_create(sg2_obj) + sg2_obj = self._vnc_lib.security_group_read(sg2_obj.fq_name) + sgr_uuid = str(uuid.uuid4()) + local = [vnc_api.AddressType(security_group='local')] + remote = [vnc_api.AddressType( + security_group=sg2_obj.get_fq_name_str())] + sgr_obj = vnc_api.PolicyRuleType(rule_uuid=sgr_uuid, + direction='>', + protocol='any', + src_addresses=remote, + src_ports=[vnc_api.PortType(0, 255)], + dst_addresses=local, + dst_ports=[vnc_api.PortType(0, 255)], + ethertype='IPv4') + rules = vnc_api.PolicyEntriesType([sgr_obj]) + sg1_obj.set_security_group_entries(rules) + self._vnc_lib.security_group_update(sg1_obj) + + self._vnc_lib.security_group_delete(fq_name=sg2_obj.fq_name) + + sgr_dict = self.list_resource('security_group_rule') + self.assertNotIn(sgr_uuid, [rule['id'] for rule in sgr_dict]) + sg1_obj = self._vnc_lib.security_group_read(sg1_obj.fq_name) + sgr = [rule.rule_uuid for rule in + sg1_obj.get_security_group_entries().get_policy_rule() or []] + self.assertIn(sgr_uuid, sgr) # end class TestBasic class TestExtraFieldsPresenceByKnob(test_case.NeutronBackendTestCase):