Skip to content

Commit

Permalink
Merge "Network policy and pod out of order handling"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Feb 1, 2017
2 parents 3cbd162 + 387c68b commit 4aa5e6d
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 123 deletions.
20 changes: 19 additions & 1 deletion src/container/kube-manager/kube_manager/vnc/config_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@

class DBBaseKM(DBBase):
obj_type = __name__
_fq_name_to_uuid = {}

def __init__(self, uuid, obj_dict=None):
self._fq_name_to_uuid[tuple(obj_dict['fq_name'])] = uuid

@classmethod
def get_fq_name_to_uuid(cls, fq_name):
return cls._fq_name_to_uuid.get(tuple(fq_name))

@classmethod
def delete(cls, uuid):
if uuid not in cls._dict:
return
obj = cls._dict[uuid]
del cls._fq_name_to_uuid[tuple(self.fq_name)]

def evaluate(self):
# Implement in the derived class
Expand Down Expand Up @@ -449,7 +464,8 @@ def __init__(self, uuid, obj_dict=None):
self.virtual_machine_interfaces = set()
self.annotations = None
self.rule_entries = None
self.update(obj_dict)
obj_dict = self.update(obj_dict)
super(SecurityGroupKM, self).__init__(uuid, obj_dict)

def update(self, obj=None):
if obj is None:
Expand All @@ -459,13 +475,15 @@ def update(self, obj=None):
self.update_multiple_refs('virtual_machine_interface', obj)
self.annotations = obj.get('annotations', None)
self.rule_entries = obj.get('security_group_entries', None)
return obj

@classmethod
def delete(cls, uuid):
if uuid not in cls._dict:
return
obj = cls._dict[uuid]
obj.update_multiple_refs('virtual_machine_interface', {})
super(SecurityGroupKM, cls).delete(uuid)
del cls._dict[uuid]

class FloatingIpPoolKM(DBBaseKM):
Expand Down
194 changes: 72 additions & 122 deletions src/container/kube-manager/kube_manager/vnc/vnc_network_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,17 @@ def __init__(self, vnc_lib=None, label_cache=None, logger=None):
self.policy_src_label_cache = {}
self.policy_dest_label_cache = {}

def _append_sg_rule(self, sg_obj, sg, sg_rule):
def _select_pods(self, labels):
result = set()
for label in labels.items():
key = self._label_cache._get_key(label)
pod_ids = self._label_cache.pod_label_cache.get(key)
if not pod_ids:
continue
result.update(pod_ids)
return result

def _append_sg_rule(self, sg, sg_rule):
sg_obj = self._vnc_lib.security_group_read(id=sg.uuid)
rules = sg_obj.get_security_group_entries()
if rules is None:
Expand All @@ -39,73 +49,21 @@ def _append_sg_rule(self, sg_obj, sg, sg_rule):
sg_obj.set_security_group_entries(rules)
self._vnc_lib.security_group_update(sg_obj)

def _remove_sg_rule_by_id(self, sg_obj, rule_uuid):
rules = sg_obj.get_security_group_entries()
if rules is None:
self.logger.info("No SecurityGroupRule matching rule_uuid %s, no rules found" % rule_uuid)
return

for sgr in rules.get_policy_rule() or []:
if sgr.rule_uuid == rule_uuid:
self.logger.info("Deleting SecurityGroupRule %s matching rule_uuid %s: " %
sgr.rule_uuid, rule_uuid)
rules.delete_policy_rule(sgr)
sg_obj.set_security_group_entries(rules)
update_needed = True

if update_needed:
sg_obj.set_security_group_entries(rules)
else:
self.logger.info("No SecurityGroupRule matching rule_uuid %s found" % rule_uuid)

def _remove_sg_rule_by_src_address(self, sg_obj, src_address):
def _delete_sg_rules(self, sg_obj, sg, ip_addr):
sg_obj = self._vnc_lib.security_group_read(id=sg.uuid)
rules = sg_obj.get_security_group_entries()
if rules is None:
self.logger.error("No SecurityGroupRules matching src_addr %s, no rules found" % src_address)
return

for sgr in rules.get_policy_rule() or []:
if sgr.src_addresses == src_address:
self.logger.info("Deleting SecurityGroupRule %s matching src_addr %s: " %
sgr.rule_uuid, src_address)
rules.delete_policy_rule(sgr)
update_needed = True
if update_needed:
sg_obj.set_security_group_entries(rules)
else:
self.logger.error("No SecurityGroupRule matching src_addr %s found" % src_address)

def _delete_sg_from_pod(self, sg, pod):
vm = VirtualMachineKM.get(pod)
if not vm:
return

for vmi_id in vm.virtual_machine_interfaces:
# vmi = VirtualMachineInterfaceKM.get(vmi_id)
vmi = self._vnc_lib.virtual_machine_interface_read(id=vmi_id)
if not vmi:
if sgr.ip_addr != ip_addr:
continue
rules.delete_policy_rule(sgr)
update_sg = True

vmi_sg = vmi.get_security_group()
if vmi_sg['uuid'] == sg['uuid']:
vmi.delete_security_group(sg)
self._vnc_lib.virtual_machine_interface_update(vmi)

def _vnc_sg_clear_vmis(self, sg):
vmi_ids = sg.get_virtual_machine_interface_back_refs()
if not vmi_ids:
return
for vmi_id in vmi_ids:
# vmi = VirtualMachineInterfaceKM.get(vmi_id)
vmi = self._vnc_lib.virtual_machine_interface_read(id=vmi_id['uuid'])
vmi.del_security_group(sg)
self._vnc_lib.virtual_machine_interface_update(vmi)

def vnc_network_policy_delete(self, event):
uuid = event['object']['metadata'].get('uid')
sg = self._vnc_lib.security_group_read(id=uuid)
self._vnc_sg_clear_vmis(sg)
self._vnc_lib.security_group_delete(id=uuid)
if update_sg:
sg_obj.set_security_group_entries(rules)
self._vnc_lib.security_group_update(sg_obj)

def _get_rules_from_annotations(self, annotations):
if not annotations:
Expand All @@ -124,7 +82,7 @@ def _add_src_pod_2_policy(self, pod_id, policy_id):
rules = self._get_rules_from_annotations(sg.annotations)
for rule in rules:
ports = rule.get('ports')
self._set_sg_rule(None, sg, pod_id, ports)
self._set_sg_rule(sg, pod_id, ports)

def vnc_pod_add(self, event):
labels = event['object']['metadata']['labels']
Expand All @@ -139,14 +97,12 @@ def vnc_pod_add(self, event):

policy_ids = self.policy_dest_label_cache.get(key, [])
for policy_id in policy_ids:
self._apply_sg_2_pod(policy_id, pod_id)
self._sg_2_pod_link(pod_id, policy_id, 'ADD')

def _delete_src_pod_from_policy(self, pod_id, policy_id):
try:
sg = self._vnc_lib.security_group_read(id=policy_id)
except Exception as e:
self.logger.error("Cannot read security group with UUID " + id)

sg = SecurityGroupKM.get(policy_id)
if not sg:
return
vm = VirtualMachineKM.get(pod_id)
if not vm:
return
Expand All @@ -164,25 +120,7 @@ def _delete_src_pod_from_policy(self, pod_id, policy_id):
if not ip_addr:
return

sgr_uuid = str(uuid.uuid4())
src_addr = AddressType(subnet=SubnetType(ip_addr, 32))
dst_addr = AddressType(security_group='local')
for port in ports:
proto = port['protocol'].lower()
rule = PolicyRuleType(rule_uuid=sgr_uuid, direction='>',
protocol=proto,
src_addresses=[src_addr], src_ports=[PortType(0, 65535)],
dst_addresses=[dst_addr],
dst_ports=[PortType(int(port['port']), int(port['port']))],
ethertype='IPv4')
self._append_sg_rule(sg, rule)

def _delete_dest_pod_from_policy(self, pod_id, policy_id):
try:
sg = self._vnc_lib.security_group_read(id=policy_id)
except Exception as e:
self.logger.error("Cannot read security group with UUID " + id)
self._delete_sg_from_pod(sg, pod_id)
self._delete_sg_rule(sg, ip_addr)

def vnc_pod_delete(self, event):
labels = event['object']['metadata']['labels']
Expand All @@ -197,23 +135,13 @@ def vnc_pod_delete(self, event):

policy_ids = self.policy_dest_label_cache.get(key, [])
for policy_id in policy_ids:
self._delete_dest_pod_from_policy(pod_id, policy_id)
self._sg_2_pod_link(pod_id, policy_id, 'DELETE')

def _select_pods(self, labels):
result = set()
for label in labels.items():
key = self._label_cache._get_key(label)
pod_ids = self._label_cache.pod_label_cache.get(key)
if not pod_ids:
continue
result.update(pod_ids)
return result

def _set_sg_rule(self, sg_obj, sg, src_pod, ports):
def _set_sg_rule(self, sg, src_pod, ports):
vm = VirtualMachineKM.get(src_pod)
if not vm:
return

ip_addr = None
for vmi_id in vm.virtual_machine_interfaces:
vmi = VirtualMachineInterfaceKM.get(vmi_id)
Expand All @@ -239,9 +167,9 @@ def _set_sg_rule(self, sg_obj, sg, src_pod, ports):
dst_addresses=[dst_addr],
dst_ports=[PortType(int(port['port']), int(port['port']))],
ethertype='IPv4')
self._append_sg_rule(sg_obj, sg, rule)
self._append_sg_rule(sg, rule)

def _set_sg_rules(self, sg_obj, sg, event):
def _set_sg_rules(self, sg, event):
update = False
rules = event['object']['spec']['ingress']
for rule in rules:
Expand All @@ -259,13 +187,9 @@ def _set_sg_rules(self, sg_obj, sg, event):
self.policy_src_label_cache, src_label, sg.uuid)
src_pods = self._select_pods(src_labels)
for src_pod in src_pods:
self._set_sg_rule(sg_obj, sg, src_pod, ports)
update = True

if update:
self._vnc_lib.security_group_update(sg)
self._set_sg_rule(sg, src_pod, ports)

def _apply_sg_2_pod(self, sg_id, pod_id):
def _sg_2_pod_link(self, pod_id, sg_id, oper):
vm = VirtualMachineKM.get(pod_id)
if not vm:
return
Expand All @@ -277,10 +201,10 @@ def _apply_sg_2_pod(self, sg_id, pod_id):

try:
self._vnc_lib.ref_update('virtual-machine-interface', vmi_id,
'security-group', sg_id, None, 'ADD')
'security-group', sg_id, None, oper)
except Exception as e:
self.logger.error("Failed to attach SG %s to pod %s" %
(pod_id, sg_id))
self.logger.error("Failed to %s SG %s to pod %s" %
(oper, pod_id, sg_id))

def _apply_sg_2_pods(self, sg, event):
podSelector = event['object']['spec'].get('podSelector', None)
Expand All @@ -299,7 +223,7 @@ def _apply_sg_2_pods(self, sg, event):

pod_ids = self._select_pods(dest_labels)
for pod_id in pod_ids:
self._apply_sg_2_pod(sg.uuid, pod_id)
self._sg_2_pod_link(pod_id, sg.uuid, 'ADD')

def _set_sg_annotations(self, sg_obj, sg, event):
update_sg = False
Expand All @@ -321,10 +245,7 @@ def _set_sg_annotations(self, sg_obj, sg, event):
def _update_sg(self, event, sg):
sg_obj = SecurityGroup()
sg_obj.uuid = sg.uuid

update_sg = False
self._set_sg_rules(sg_obj, sg, event)
update_sg |= self._set_sg_annotations(sg_obj, sg, event)
update_sg = self._set_sg_annotations(sg_obj, sg, event)
if not update_sg:
return sg

Expand All @@ -344,35 +265,64 @@ def _create_sg(self, event, uuid):
parent='domain')
sg_obj = SecurityGroup(name=name, parent_obj=proj_obj)
sg_obj.uuid = uuid

self._set_sg_rules(sg_obj, None, event)
self._set_sg_annotations(sg_obj, None, event)

try:
self._vnc_lib.security_group_create(sg_obj)
except Exception as e:
self.logger.error("Failed to create SG %s" % uuid)
return None
return SecurityGroupKM.locate(uuid)
sg = SecurityGroupKM.locate(uuid)
return sg

def _check_sg_uuid_change(self, event, uuid):
name = event['object']['metadata'].get('name')
namespace = event['object']['metadata'].get('namespace')
sg_fq_name = ['default-domain', namespace, name]
sg_uuid = SecurityGroupKM.get_fq_name_to_uuid(sg_fq_name)
if sg_uuid != uuid:
self.vnc_network_policy_delete(event, sg_uuid)

def vnc_network_policy_add(self, event):
uuid = event['object']['metadata'].get('uid')
sg = SecurityGroupKM.get(uuid)
if not sg:
self._check_sg_uuid_change(event, uuid)
sg = self._create_sg(event, uuid)
else:
sg = self._update_sg(event, sg)

self._set_sg_rules(sg, event)
self._apply_sg_2_pods(sg, event)

def vnc_network_policy_delete(self, event, sg_uuid=None):
if not sg_uuid:
sg_uuid = event['object']['metadata'].get('uid')
sg = SecurityGroupKM.get(sg_uuid)
if not sg:
return

for vmi_id in list(sg.virtual_machine_interfaces):
try:
self._vnc_lib.ref_update('virtual-machine-interface', vmi_id,
'security-group', sg.uuid, None, 'DELETE')
except Exception as e:
self.logger.error("Failed to detach SG %s" % str(e))

self._vnc_lib.security_group_delete(id=sg.uuid)

def process(self, event):
if event['object'].get('kind') == 'NetworkPolicy':
if event['type'] == 'ADDED' or event['type'] == 'MODIFIED':
if event['type'] == 'ADDED':
self.vnc_network_policy_add(event)
elif event['type'] == 'MODIFIED':
self.vnc_network_policy_add(event)
elif event['type'] == 'DELETED':
self.vnc_network_policy_delete(event)

if event['object'].get('kind') == 'Pod':
if event['type'] == 'ADDED' or event['type'] == 'MODIFIED':
if event['type'] == 'ADDED':
self.vnc_pod_add(event)
elif event['type'] == 'MODIFIED':
self.vnc_pod_add(event)
elif event['type'] == 'DELETED':
self.vnc_pod_delete(event)

0 comments on commit 4aa5e6d

Please sign in to comment.