From 3c7f5e51badb2fb9e6fa76cb20d22c50a6e85820 Mon Sep 17 00:00:00 2001 From: Rudra Rugge Date: Wed, 15 Feb 2017 11:39:52 -0800 Subject: [PATCH] Network policy multi-label support Add support to handle multiple labels as selectors for pods. Change-Id: Ib68d543c8ea4e273e4ee02db51cf8ededd107594 Partial-Bug: 1654454 --- .../kube_manager/vnc/config_db.py | 34 ++++ .../kube_manager/vnc/label_cache.py | 2 - .../kube_manager/vnc/vnc_network_policy.py | 146 +++++++----------- .../kube-manager/kube_manager/vnc/vnc_pod.py | 68 +++++++- 4 files changed, 147 insertions(+), 103 deletions(-) diff --git a/src/container/kube-manager/kube_manager/vnc/config_db.py b/src/container/kube-manager/kube_manager/vnc/config_db.py index b910078f224..2e0f9d349d6 100644 --- a/src/container/kube-manager/kube_manager/vnc/config_db.py +++ b/src/container/kube-manager/kube_manager/vnc/config_db.py @@ -482,6 +482,10 @@ def __init__(self, uuid, obj_dict=None): self.virtual_machine_interfaces = set() self.annotations = None self.rule_entries = None + self.src_ns_selector = None + self.src_pod_selector = None + self.dst_pod_selector = None + self.dst_ports = None obj_dict = self.update(obj_dict) super(SecurityGroupKM, self).__init__(uuid, obj_dict) @@ -492,9 +496,39 @@ def update(self, obj=None): self.fq_name = obj['fq_name'] self.update_multiple_refs('virtual_machine_interface', obj) self.annotations = obj.get('annotations', None) + self._set_selectors(self.annotations) self.rule_entries = obj.get('security_group_entries', None) return obj + def _set_selectors(self, annotations): + if not annotations: + return + for kvp in annotations.get('key_value_pair', []): + if kvp.get('key') == 'spec': + break + specjson = json.loads(kvp.get('value')) + + pod_selector = specjson.get('podSelector') + if pod_selector: + self.dst_pod_selector = pod_selector.get('matchLabels') + + ingress = specjson.get('ingress') + if not ingress: + return + for rule in ingress: + self.dst_ports = rule.get('ports') + from_rule = rule.get('from') + if not from_rule: + continue + for item in from_rule or []: + ns_selector = item.get('namespaceSelector') + if ns_selector: + self.src_ns_selector = ns_selector.get('matchLabels') + continue + pod_selector = item.get('podSelector') + if pod_selector: + self.src_pod_selector = pod_selector.get('matchLabels') + @classmethod def delete(cls, uuid): if uuid not in cls._dict: diff --git a/src/container/kube-manager/kube_manager/vnc/label_cache.py b/src/container/kube-manager/kube_manager/vnc/label_cache.py index 681c2d83523..0cd7b9e74cc 100644 --- a/src/container/kube-manager/kube_manager/vnc/label_cache.py +++ b/src/container/kube-manager/kube_manager/vnc/label_cache.py @@ -25,5 +25,3 @@ def _remove_label(self, key, cache, label, uuid): cache[key].remove(uuid) except KeyError: pass - - cache[key] = set() diff --git a/src/container/kube-manager/kube_manager/vnc/vnc_network_policy.py b/src/container/kube-manager/kube_manager/vnc/vnc_network_policy.py index 2415ae2121f..951e2b196e3 100644 --- a/src/container/kube-manager/kube_manager/vnc/vnc_network_policy.py +++ b/src/container/kube-manager/kube_manager/vnc/vnc_network_policy.py @@ -22,14 +22,20 @@ def __init__(self, vnc_lib=None, label_cache=None, logger=None): self.policy_src_label_cache = {} self.policy_dest_label_cache = {} - def _select_pods(self, labels): + def _select_pods(self, labels, pod_set=None): result = set() for label in labels.items(): key = self._label_cache._get_key(label) pod_ids = self._label_cache.pod_label_cache.get(key) if not pod_ids: continue - result.update(pod_ids) + if not result: + result = pod_ids.copy() + else: + result.intersection_update(pod_ids) + + if pod_set: + result.intersection_update(pod_set) return result def _add_sg_rule(self, sg, sg_rule): @@ -65,73 +71,39 @@ def _delete_sg_rule(self, sg_uuid, rule_uuid): sg_obj.set_security_group_entries(rules) self._vnc_lib.security_group_update(sg_obj) - def _get_src_labels_from_annotations(self, annotations): - if not annotations: - return - for kvp in annotations.get('key_value_pair', []): - if kvp.get('key') != 'spec': + def _add_src_pod_2_policy(self, pod_id, policy_list): + for policy_id in policy_list: + sg = SecurityGroupKM.get(policy_id) + if not sg: continue - specjson = json.loads(kvp.get('value')) - return specjson['ingress'] + if self._select_pods(sg.src_pod_selector, {pod_id}): + self._set_sg_rule(sg, pod_id) - def _get_dst_labels_from_annotations(self, annotations): - if not annotations: - return - for kvp in annotations.get('key_value_pair', []): - if kvp.get('key') != 'spec': + def _add_dst_pod_2_policy(self, pod_id, policy_list): + for policy_id in policy_list: + sg = SecurityGroupKM.get(policy_id) + if not sg: continue - specjson = json.loads(kvp.get('value')) - return specjson['podSelector']['matchLabels'] - - def _add_src_pod_2_policy(self, pod_id, policy_id): - sg = SecurityGroupKM.get(policy_id) - if not sg: - return - - rules = self._get_src_labels_from_annotations(sg.annotations) - for rule in rules: - ports = rule.get('ports') - self._set_sg_rule(sg, pod_id, ports) + if self._select_pods(sg.dst_pod_selector, {pod_id}): + self._sg_2_pod_link(pod_id, policy_id, 'ADD') def vnc_pod_add(self, event): - labels = event['object']['metadata'].get('labels', {}) + labels = event['object']['metadata']['labels'] pod_id = event['object']['metadata']['uid'] + policy_src_list = set() + policy_dst_list = set() for label in labels.items(): key = self._label_cache._get_key(label) policy_ids = self.policy_src_label_cache.get(key, []) - for policy_id in policy_ids: - self._add_src_pod_2_policy(pod_id, policy_id) + policy_src_list.update(policy_ids) policy_ids = self.policy_dest_label_cache.get(key, []) - for policy_id in policy_ids: - self._sg_2_pod_link(pod_id, policy_id, 'ADD') + policy_dst_list.update(policy_ids) - def _get_label_diff(self, old_labels, new_labels): - if old_labels == new_labels: - return None - - diff = dict() - added = {} - removed = {} - changed = {} - keys = set(old_labels.keys()) | set(new_labels.keys()) - for k in keys: - if k not in old_labels.keys(): - added[k] = new_labels[k] - continue - if k not in new_labels.keys(): - removed[k] = old_labels[k] - continue - if old_labels[k] == new_labels[k]: - continue - changed[k] = old_labels[k] - - diff['added'] = added - diff['removed'] = removed - diff['changed'] = changed - return diff + self._add_src_pod_2_policy(pod_id, policy_src_list) + self._add_dst_pod_2_policy(pod_id, policy_dst_list) def _check_deleted_labels(self, deleted_labels, vm): if not deleted_labels: @@ -146,9 +118,9 @@ def _check_deleted_labels(self, deleted_labels, vm): # check if pod link to be deleted for sg_id in vmi.security_groups: sg = SecurityGroupKM.get(sg_id) - if not sg: + if not sg or not sg.dst_pod_selector: continue - dst_labels = self._get_dst_labels_from_annotations(sg.annotations) + dst_labels = sg.dst_pod_selector if set(deleted_labels.keys()).intersection(set(dst_labels.keys())): self._sg_2_pod_link(vm.uuid, sg_id, 'DELETE') @@ -173,9 +145,9 @@ def _check_changed_labels(self, changed_labels, vm): # check if pod link to be deleted for sg_id in vmi.security_groups: sg = SecurityGroupKM.get(sg_id) - if not sg: + if not sg or not sg.dst_pod_selector: continue - dst_labels = self._get_dst_labels_from_annotations(sg.annotations) + dst_labels = sg.dst_pod_selector if set(changed_labels.keys()).intersection(set(dst_labels.keys())): self._sg_2_pod_link(vm.uuid, sg_id, 'DELETE') @@ -187,22 +159,22 @@ def _check_changed_labels(self, changed_labels, vm): for policy_id in policy_ids: self._delete_sg_rule(policy_id, vm.uuid) - def vnc_pod_update(self, event): + def vnc_pod_update(self, event, label_diff): + if not label_diff: + return + labels = event['object']['metadata']['labels'] pod_id = event['object']['metadata']['uid'] vm = VirtualMachineKM.get(pod_id) if not vm: return - diff = self._get_label_diff(vm.pod_labels, labels) - if not diff: - return - self._check_deleted_labels(diff['removed'], vm) - self._check_changed_labels(diff['changed'], vm) + self._check_deleted_labels(label_diff['removed'], vm) + self._check_changed_labels(label_diff['changed'], vm) self.vnc_pod_add(event) def vnc_pod_delete(self, event): - labels = event['object']['metadata'].get('labels', {}) + labels = event['object']['metadata']['labels'] pod_id = event['object']['metadata']['uid'] for label in labels.items(): @@ -212,7 +184,7 @@ def vnc_pod_delete(self, event): for policy_id in policy_ids: self._delete_sg_rule(policy_id, pod_id) - def _set_sg_rule(self, sg, src_pod, ports): + def _set_sg_rule(self, sg, src_pod): vm = VirtualMachineKM.get(src_pod) if not vm: return @@ -233,7 +205,7 @@ def _set_sg_rule(self, sg, src_pod, ports): sgr_uuid = src_pod src_addr = AddressType(subnet=SubnetType(ip_addr, 32)) dst_addr = AddressType(security_group='local') - for port in ports: + for port in sg.dst_ports: proto = port['protocol'].lower() rule = PolicyRuleType(rule_uuid=sgr_uuid, direction='>', protocol=proto, @@ -245,24 +217,15 @@ def _set_sg_rule(self, sg, src_pod, ports): self._add_sg_rule(sg, rule) def _set_sg_rules(self, sg, event): - update = False - rules = event['object']['spec']['ingress'] - for rule in rules: - ports = rule['ports'] - src_selectors = rule['from'] - for src_selector in src_selectors: - podSelector = src_selector.get('podSelector', None) - if not podSelector: - continue - - src_labels = podSelector['matchLabels'] - for src_label in src_labels.items(): - key = self._label_cache._get_key(src_label) - self._label_cache._locate_label(key, - self.policy_src_label_cache, src_label, sg.uuid) - src_pods = self._select_pods(src_labels) - for src_pod in src_pods: - self._set_sg_rule(sg, src_pod, ports) + if not sg.src_pod_selector: + return + for src_label in sg.src_pod_selector.items(): + key = self._label_cache._get_key(src_label) + self._label_cache._locate_label(key, + self.policy_src_label_cache, src_label, sg.uuid) + src_pods = self._select_pods(sg.src_pod_selector) + for src_pod in src_pods: + self._set_sg_rule(sg, src_pod) def _sg_2_pod_link(self, pod_id, sg_id, oper): vm = VirtualMachineKM.get(pod_id) @@ -298,21 +261,16 @@ def _sg_2_pod_link(self, pod_id, sg_id, oper): (default_oper, pod_id, sg_uuid)) def _apply_sg_2_pods(self, sg, event): - podSelector = event['object']['spec'].get('podSelector', None) - if not podSelector: - return - - dest_labels = podSelector['matchLabels'] - if not dest_labels: + if not sg.dst_pod_selector: return uuid = event['object']['metadata'].get('uid') - for dest_label in dest_labels.items(): + for dest_label in sg.dst_pod_selector.items(): key = self._label_cache._get_key(dest_label) self._label_cache._locate_label(key, self.policy_dest_label_cache, dest_label, uuid) - pod_ids = self._select_pods(dest_labels) + pod_ids = self._select_pods(sg.dst_pod_selector) for pod_id in pod_ids: self._sg_2_pod_link(pod_id, sg.uuid, 'ADD') diff --git a/src/container/kube-manager/kube_manager/vnc/vnc_pod.py b/src/container/kube-manager/kube_manager/vnc/vnc_pod.py index 1f208a87734..cefe063855f 100644 --- a/src/container/kube-manager/kube_manager/vnc/vnc_pod.py +++ b/src/container/kube-manager/kube_manager/vnc/vnc_pod.py @@ -24,6 +24,52 @@ def __init__(self, vnc_lib=None, label_cache=None, service_mgr=None, self._queue = queue self._service_fip_pool = svc_fip_pool + def _get_label_diff(self, new_labels, vm): + old_labels = vm.pod_labels + if old_labels == new_labels: + return None + + diff = dict() + added = {} + removed = {} + changed = {} + keys = set(old_labels.keys()) | set(new_labels.keys()) + for k in keys: + if k not in old_labels.keys(): + added[k] = new_labels[k] + continue + if k not in new_labels.keys(): + removed[k] = old_labels[k] + continue + if old_labels[k] == new_labels[k]: + continue + changed[k] = old_labels[k] + + diff['added'] = added + diff['removed'] = removed + diff['changed'] = changed + return diff + + def _set_label_to_pod_cache(self, new_labels, vm): + for label in new_labels.items(): + key = self._label_cache._get_key(label) + self._label_cache._locate_label(key, + self._label_cache.pod_label_cache, label, vm.uuid) + vm.pod_labels = new_labels + + def _clear_label_to_pod_cache(self, vm): + if not vm.pod_labels: + return + for label in vm.pod_labels.items() or []: + key = self._label_cache._get_key(label) + self._label_cache._remove_label(key, + self._label_cache.pod_label_cache, label, vm.uuid) + vm.pod_labels = None + + def _update_label_to_pod_cache(self, new_labels, vm): + self._clear_label_to_pod_cache(vm) + self._set_label_to_pod_cache(new_labels, vm) + def _get_network(self, pod_id, pod_namespace): """ Get network corresponding to this namesapce. @@ -122,7 +168,6 @@ def _create_vm(self, pod_id, pod_name, labels): except RefsExistError: vm_obj = self._vnc_lib.virtual_machine_read(id=pod_id) vm = VirtualMachineKM.locate(vm_obj.uuid) - vm.pod_labels = labels return vm_obj def _link_vm_to_node(self, vm_obj, pod_node): @@ -147,7 +192,7 @@ def _check_pod_uuid_change(self, pod_uuid, pod_name, pod_namespace): def vnc_pod_add(self, pod_id, pod_name, pod_namespace, pod_node, labels): vm = VirtualMachineKM.get(pod_id) if vm: - vm.pod_labels = labels + self._set_label_to_pod_cache(labels, vm) return if not vm: self._check_pod_uuid_change(pod_id, pod_name, pod_namespace) @@ -162,6 +207,16 @@ def vnc_pod_add(self, pod_id, pod_name, pod_namespace, pod_node, labels): self._link_vm_to_node(vm_obj, pod_node) + def vnc_pod_update(self, pod_id, labels): + label_diff = None + vm = VirtualMachineKM.get(pod_id) + if vm: + label_diff = self._get_label_diff(labels, vm) + if not label_diff: + return label_diff + self._update_label_to_pod_cache(labels, vm) + return label_diff + def vnc_port_delete(self, vmi_id): vmi = VirtualMachineInterfaceKM.get(vmi_id) if not vmi: @@ -189,6 +244,8 @@ def vnc_pod_delete(self, pod_id): if not vm: return + self._clear_label_to_pod_cache(vm) + if vm.virtual_router: self._vnc_lib.ref_update('virtual-router', vm.virtual_router, 'virtual-machine', vm.uuid, None, 'DELETE') @@ -201,8 +258,6 @@ def vnc_pod_delete(self, pod_id): except NoIdError: pass - return - def _create_pod_event(self, event_type, pod_id, vm_obj): event = {} object = {} @@ -257,9 +312,8 @@ def process(self, event): host_network = event['object']['spec'].get('hostNetwork') if host_network: return - self._network_policy_mgr.vnc_pod_update(event) - self.vnc_pod_add(pod_id, pod_name, pod_namespace, - pod_node, labels) + label_diff = self.vnc_pod_update(pod_id, labels) + self._network_policy_mgr.vnc_pod_update(event, label_diff) elif event['type'] == 'DELETED': self.vnc_pod_delete(pod_id) self._network_policy_mgr.vnc_pod_delete(event)