@@ -15,7 +15,8 @@ class TestBasic(test_case.NeutronBackendTestCase):
15
15
def read_resource (self , url_pfx , id ):
16
16
context = {'operation' : 'READ' ,
17
17
'user_id' : '' ,
18
- 'roles' : '' }
18
+ 'roles' : '' ,
19
+ 'is_admin' : True }
19
20
data = {'fields' : None ,
20
21
'id' : id }
21
22
body = {'context' : context , 'data' : data }
@@ -137,7 +138,7 @@ def list_resource(url_pfx):
137
138
# for collections that are objects in contrail model
138
139
for (objects , res_url_pfx , res_xlate_name ) in collection_types :
139
140
res_dicts = list_resource (res_url_pfx )
140
- present_ids = [r ['id' ] for r in res_dicts ]
141
+ present_ids = [r ['id' ] for r in res_dicts ]
141
142
for obj in objects :
142
143
self .assertIn (obj .uuid , present_ids )
143
144
@@ -153,11 +154,11 @@ def err_on_object_2(orig_method, res_obj, *args, **kwargs):
153
154
with test_common .patch (
154
155
neutron_db_obj , res_xlate_name , err_on_object_2 ):
155
156
res_dicts = list_resource (res_url_pfx )
156
- present_ids = [r ['id' ] for r in res_dicts ]
157
+ present_ids = [r ['id' ] for r in res_dicts ]
157
158
self .assertNotIn (objects [2 ].uuid , present_ids )
158
159
159
160
res_dicts = list_resource (res_url_pfx )
160
- present_ids = [r ['id' ] for r in res_dicts ]
161
+ present_ids = [r ['id' ] for r in res_dicts ]
161
162
for obj in objects :
162
163
self .assertIn (obj .uuid , present_ids )
163
164
# end for collections that are objects in contrail model
@@ -176,7 +177,7 @@ def err_on_sn2(orig_method, subnet_vnc, *args, **kwargs):
176
177
with test_common .patch (
177
178
neutron_db_obj , '_subnet_vnc_to_neutron' , err_on_sn2 ):
178
179
res_dicts = list_resource ('subnet' )
179
- present_ids = [r ['id' ] for r in res_dicts ]
180
+ present_ids = [r ['id' ] for r in res_dicts ]
180
181
self .assertNotIn (sn2_id , present_ids )
181
182
# end test_list_with_inconsistent_members
182
183
@@ -287,6 +288,102 @@ def test_port_bindings(self):
287
288
self .assertTrue (isinstance (port_dict ['binding:profile' ], dict ))
288
289
self .assertTrue (isinstance (port_dict ['binding:host_id' ], basestring ))
289
290
# end test_port_bindings
291
+
292
+ def test_sg_rules_delete_when_peer_group_deleted_on_read_sg (self ):
293
+ sg1_obj = vnc_api .SecurityGroup ('sg1-%s' % (self .id ()))
294
+ self ._vnc_lib .security_group_create (sg1_obj )
295
+ sg1_obj = self ._vnc_lib .security_group_read (sg1_obj .fq_name )
296
+ sg2_obj = vnc_api .SecurityGroup ('sg2-%s' % (self .id ()))
297
+ self ._vnc_lib .security_group_create (sg2_obj )
298
+ sg2_obj = self ._vnc_lib .security_group_read (sg2_obj .fq_name )
299
+ sgr_uuid = str (uuid .uuid4 ())
300
+ local = [vnc_api .AddressType (security_group = 'local' )]
301
+ remote = [vnc_api .AddressType (security_group = sg2_obj .get_fq_name_str ())]
302
+ sgr_obj = vnc_api .PolicyRuleType (rule_uuid = sgr_uuid ,
303
+ direction = '>' ,
304
+ protocol = 'any' ,
305
+ src_addresses = remote ,
306
+ src_ports = [vnc_api .PortType (0 , 255 )],
307
+ dst_addresses = local ,
308
+ dst_ports = [vnc_api .PortType (0 , 255 )],
309
+ ethertype = 'IPv4' )
310
+ rules = vnc_api .PolicyEntriesType ([sgr_obj ])
311
+ sg1_obj .set_security_group_entries (rules )
312
+ self ._vnc_lib .security_group_update (sg1_obj )
313
+
314
+ self ._vnc_lib .security_group_delete (fq_name = sg2_obj .fq_name )
315
+
316
+ sg_dict = self .read_resource ('security_group' , sg1_obj .uuid )
317
+ sgr = [rule ['id' ] for rule in sg_dict .get ('security_group_rules' , [])]
318
+ self .assertNotIn (sgr_uuid , sgr )
319
+ sg1_obj = self ._vnc_lib .security_group_read (sg1_obj .fq_name )
320
+ sgr = [rule .rule_uuid for rule in
321
+ sg1_obj .get_security_group_entries ().get_policy_rule () or []]
322
+ self .assertIn (sgr_uuid , sgr )
323
+
324
+ def test_sg_rules_delete_when_peer_group_deleted_on_read_rule (self ):
325
+ sg1_obj = vnc_api .SecurityGroup ('sg1-%s' % (self .id ()))
326
+ self ._vnc_lib .security_group_create (sg1_obj )
327
+ sg1_obj = self ._vnc_lib .security_group_read (sg1_obj .fq_name )
328
+ sg2_obj = vnc_api .SecurityGroup ('sg2-%s' % (self .id ()))
329
+ self ._vnc_lib .security_group_create (sg2_obj )
330
+ sg2_obj = self ._vnc_lib .security_group_read (sg2_obj .fq_name )
331
+ sgr_uuid = str (uuid .uuid4 ())
332
+ local = [vnc_api .AddressType (security_group = 'local' )]
333
+ remote = [vnc_api .AddressType (
334
+ security_group = sg2_obj .get_fq_name_str ())]
335
+ sgr_obj = vnc_api .PolicyRuleType (rule_uuid = sgr_uuid ,
336
+ direction = '>' ,
337
+ protocol = 'any' ,
338
+ src_addresses = remote ,
339
+ src_ports = [vnc_api .PortType (0 , 255 )],
340
+ dst_addresses = local ,
341
+ dst_ports = [vnc_api .PortType (0 , 255 )],
342
+ ethertype = 'IPv4' )
343
+ rules = vnc_api .PolicyEntriesType ([sgr_obj ])
344
+ sg1_obj .set_security_group_entries (rules )
345
+ self ._vnc_lib .security_group_update (sg1_obj )
346
+
347
+ self ._vnc_lib .security_group_delete (fq_name = sg2_obj .fq_name )
348
+
349
+ with ExpectedException (webtest .app .AppError ):
350
+ self .read_resource ('security_group_rule' , sgr_uuid )
351
+ sg1_obj = self ._vnc_lib .security_group_read (sg1_obj .fq_name )
352
+ sgr = [rule .rule_uuid for rule in
353
+ sg1_obj .get_security_group_entries ().get_policy_rule () or []]
354
+ self .assertIn (sgr_uuid , sgr )
355
+
356
+ def test_sg_rules_delete_when_peer_group_deleted_on_list_rules (self ):
357
+ sg1_obj = vnc_api .SecurityGroup ('sg1-%s' % (self .id ()))
358
+ self ._vnc_lib .security_group_create (sg1_obj )
359
+ sg1_obj = self ._vnc_lib .security_group_read (sg1_obj .fq_name )
360
+ sg2_obj = vnc_api .SecurityGroup ('sg2-%s' % (self .id ()))
361
+ self ._vnc_lib .security_group_create (sg2_obj )
362
+ sg2_obj = self ._vnc_lib .security_group_read (sg2_obj .fq_name )
363
+ sgr_uuid = str (uuid .uuid4 ())
364
+ local = [vnc_api .AddressType (security_group = 'local' )]
365
+ remote = [vnc_api .AddressType (
366
+ security_group = sg2_obj .get_fq_name_str ())]
367
+ sgr_obj = vnc_api .PolicyRuleType (rule_uuid = sgr_uuid ,
368
+ direction = '>' ,
369
+ protocol = 'any' ,
370
+ src_addresses = remote ,
371
+ src_ports = [vnc_api .PortType (0 , 255 )],
372
+ dst_addresses = local ,
373
+ dst_ports = [vnc_api .PortType (0 , 255 )],
374
+ ethertype = 'IPv4' )
375
+ rules = vnc_api .PolicyEntriesType ([sgr_obj ])
376
+ sg1_obj .set_security_group_entries (rules )
377
+ self ._vnc_lib .security_group_update (sg1_obj )
378
+
379
+ self ._vnc_lib .security_group_delete (fq_name = sg2_obj .fq_name )
380
+
381
+ sgr_dict = self .list_resource ('security_group_rule' )
382
+ self .assertNotIn (sgr_uuid , [rule ['id' ] for rule in sgr_dict ])
383
+ sg1_obj = self ._vnc_lib .security_group_read (sg1_obj .fq_name )
384
+ sgr = [rule .rule_uuid for rule in
385
+ sg1_obj .get_security_group_entries ().get_policy_rule () or []]
386
+ self .assertIn (sgr_uuid , sgr )
290
387
# end class TestBasic
291
388
292
389
class TestExtraFieldsPresenceByKnob (test_case .NeutronBackendTestCase ):
0 commit comments