From 6bc451579d872307f5beeaeb00262382968eda04 Mon Sep 17 00:00:00 2001 From: sbalineni Date: Tue, 25 Aug 2015 11:22:45 -0700 Subject: [PATCH] DM: Allocate and persist irb address from each subnet of a VN for each PR - Allocate one ip from each subnet of a VN for each PR. - Persist the ip and the association with VN, Subnet, PR in cassandra DB - On DM restart, read the DB state and maintain allocated ip map in local memory - On update/restart, find differences between allocated ip map vs VNC DB state of VN, and take necessary actions - GW IP of the subnet is configured on the MX as the virtual-gateway-address under the IRB IP address Change-Id: Id0bcb4abeae9fb5eadf93ef9091be0b4da50655f Closes-Bug: #1465070 --- src/config/common/vnc_cassandra.py | 27 ++ .../device-manager/device_manager/db.py | 264 +++++++++++++++--- .../device_manager/device_manager.py | 19 +- .../device_manager/physical_router_config.py | 6 +- 4 files changed, 270 insertions(+), 46 deletions(-) diff --git a/src/config/common/vnc_cassandra.py b/src/config/common/vnc_cassandra.py index b3e7400940b..99e52659b5c 100644 --- a/src/config/common/vnc_cassandra.py +++ b/src/config/common/vnc_cassandra.py @@ -91,6 +91,33 @@ def __init__(self, server_list, db_prefix, keyspaces, logger, self._obj_fq_name_cf = self._cf_dict[self._OBJ_FQ_NAME_CF_NAME] # end __init__ + def get_cf(self, func): + return self._cf_dict.get(func) + #end + + def add(self, func, key, value): + try: + self.get_cf(func).insert(key, value) + return True + except: + return False + #end + + def get(self, func, key): + try: + return self.get_cf(func).get(key) + except: + return None + #end + + def delete(self, func, key): + try: + self.get_cf(func).remove(key) + return True + except: + return False + #end + def _update_sandesh_status(self, status, msg=''): ConnectionState.update(conn_type=ConnectionType.DATABASE, name='Cassandra', status=status, message=msg, diff --git a/src/config/device-manager/device_manager/db.py b/src/config/device-manager/device_manager/db.py index e50f276eb41..8136b3cf487 100644 --- a/src/config/device-manager/device_manager/db.py +++ b/src/config/device-manager/device_manager/db.py @@ -11,10 +11,12 @@ from sandesh.dm_introspect import ttypes as sandesh from cfgm_common.vnc_db import DBBase from cfgm_common.uve.physical_router.ttypes import * +from vnc_api.vnc_api import * import copy import socket import gevent from gevent import queue +from cfgm_common.vnc_cassandra import VncCassandraClient class DBBaseDM(DBBase): obj_type = __name__ @@ -100,6 +102,8 @@ def __init__(self, uuid, obj_dict=None): self.config_manager = None self.nc_q = queue.Queue(maxsize=1) self.nc_handler_gl = gevent.spawn(self.nc_handler) + self.vn_ip_map = {} + self.init_cs_state() self.update(obj_dict) self.config_manager = PhysicalRouterConfig( self.management_ip, self.user_credentials, self.vendor, @@ -135,6 +139,7 @@ def delete(cls, uuid): if uuid not in cls._dict: return obj = cls._dict[uuid] + obj._cassandra.delete_pr(uuid) obj.config_manager.delete_bgp_config() obj.uve_send(True) obj.update_single_ref('bgp_router', {}) @@ -171,24 +176,104 @@ def is_valid_ip(self, ip_str): return False #end - def push_config(self): - self.config_manager.reset_bgp_config() - bgp_router = BgpRouterDM.get(self.bgp_router) - if bgp_router: - for peer_uuid, attr in bgp_router.bgp_routers.items(): - peer = BgpRouterDM.get(peer_uuid) - if peer is None: - continue - external = (bgp_router.params['autonomous_system'] != - peer.params['autonomous_system']) - self.config_manager.add_bgp_peer(peer.params['address'], - peer.params, attr, external) - self.config_manager.set_bgp_config(bgp_router.params) - self.config_manager.set_global_routing_options(bgp_router.params) - bgp_router_ips = bgp_router.get_all_bgp_router_ips() - if self.dataplane_ip is not None and self.is_valid_ip(self.dataplane_ip): - self.config_manager.add_dynamic_tunnels(self.dataplane_ip, - GlobalSystemConfigDM.ip_fabric_subnets, bgp_router_ips) + def init_cs_state(self): + vn_subnet_set = self._cassandra.get_pr_vn_set(self.uuid) + for vn_subnet in vn_subnet_set: + ip = self._cassandra.get(self._cassandra._PR_VN_IP_CF, + self.uuid + ':' + vn_subnet) + if ip is not None: + self.vn_ip_map[vn_subnet] = ip['ip_address'] + #end init_cs_state + + def reserve_ip(self, vn_uuid, subnet_prefix): + try: + vn = VirtualNetwork() + vn.set_uuid(vn_uuid) + ip_addr = self._manager._vnc_lib.virtual_network_ip_alloc( + vn, + subnet=subnet_prefix) + if ip_addr: + return ip_addr[0] #ip_alloc default ip count is 1 + except Exception as e: + self._logger.error("Exception: %s" %(str(e))) + return None + #end + + def free_ip(self, vn_uuid, subnet_prefix, ip_addr): + try: + vn = VirtualNetwork() + vn.set_uuid(vn_uuid) + self._manager._vnc_lib.virtual_network_ip_free( + vn, + [ip_addr], + subnet=subnet_prefix) + return True + except Exception as e: + self._logger.error("Exception: %s" %(str(e))) + return False + #end + + def get_vn_irb_ip_map(self): + irb_ips = {} + for vn_subnet, ip_addr in self.vn_ip_map.items(): + (vn_uuid, subnet_prefix) = vn_subnet.split(':') + vn = VirtualNetworkDM.get(vn_uuid) + if vn_uuid not in irb_ips: + irb_ips[vn_uuid] = set() + irb_ips[vn_uuid].add((ip_addr, vn.gateways[subnet_prefix])) + return irb_ips + #end get_vn_irb_ip_map + + def evaluate_vn_irb_ip_map(self, vn_set): + new_vn_ip_set = set() + for vn_uuid in vn_set: + vn = VirtualNetworkDM.get(vn_uuid) + if vn.router_external == True: #dont need irb ip, gateway ip + continue + for subnet_prefix in vn.gateways.keys(): + new_vn_ip_set.add(vn_uuid + ':' + subnet_prefix) + + old_set = set(self.vn_ip_map.keys()) + delete_set = old_set.difference(new_vn_ip_set) + create_set = new_vn_ip_set.difference(old_set) + for vn_subnet in delete_set: + (vn_uuid, subnet_prefix) = vn_subnet.split(':') + ret = self.free_ip(vn_uuid, subnet_prefix, self.vn_ip_map[vn_subnet]) + if ret == False: + self._logger.error("Unable to free ip for vn/subnet/pr \ + (%s/%s/%s)" %(vn_uuid, subnet_prefix, self.uuid)) + ret = self._cassandra.delete(self._cassandra._PR_VN_IP_CF, + self.uuid + ':' + vn_uuid + ':' + subnet_prefix) + if ret == False: + self._logger.error("Unable to free ip from db for vn/subnet/pr \ + (%s/%s/%s)" %(vn_uuid, subnet_prefix, self.uuid)) + continue + self._cassandra.delete_from_pr_map(self.uuid, vn_subnet) + del self.vn_ip_map[vn_subnet] + + for vn_subnet in create_set: + (vn_uuid, subnet_prefix) = vn_subnet.split(':') + (sub, length) = subnet_prefix.split('/') + ip_addr = self.reserve_ip(vn_uuid, subnet_prefix) + if ip_addr is None: + self._logger.error("Unable to allocate ip for vn/subnet/pr \ + (%s/%s/%s)" %(vn_uuid, subnet_prefix, self.uuid)) + continue + ret = self._cassandra.add(self._cassandra._PR_VN_IP_CF, + self.uuid + ':' + vn_uuid + ':' + subnet_prefix, + {'ip_address': ip_addr + '/' + length}) + if ret == False: + self._logger.error("Unable to store ip for vn/subnet/pr \ + (%s/%s/%s)" %(self.uuid, subnet_prefix, self.uuid)) + if self.free_ip(vn_uuid, subnet_prefix, ip_addr) == False: + self._logger.error("Unable to free ip for vn/subnet/pr \ + (%s/%s/%s)" %(self.uuid, subnet_prefix, self.uuid)) + continue + self._cassandra.add_to_pr_map(self.uuid, vn_subnet) + self.vn_ip_map[vn_subnet] = ip_addr + '/' + length + #end evaluate_vn_irb_ip_map + + def get_vn_li_map(self): vn_dict = {} for vn_id in self.virtual_networks: vn_dict[vn_id] = [] @@ -212,6 +297,30 @@ def push_config(self): vn_dict[vn_id].append(li.name) else: vn_dict[vn_id] = [li.name] + #end + + def push_config(self): + self.config_manager.reset_bgp_config() + bgp_router = BgpRouterDM.get(self.bgp_router) + if bgp_router: + for peer_uuid, attr in bgp_router.bgp_routers.items(): + peer = BgpRouterDM.get(peer_uuid) + if peer is None: + continue + external = (bgp_router.params['autonomous_system'] != + peer.params['autonomous_system']) + self.config_manager.add_bgp_peer(peer.params['address'], + peer.params, attr, external) + self.config_manager.set_bgp_config(bgp_router.params) + self.config_manager.set_global_routing_options(bgp_router.params) + bgp_router_ips = bgp_router.get_all_bgp_router_ips() + if self.dataplane_ip is not None and self.is_valid_ip(self.dataplane_ip): + self.config_manager.add_dynamic_tunnels(self.dataplane_ip, + GlobalSystemConfigDM.ip_fabric_subnets, bgp_router_ips) + + vn_dict = self.get_vn_li_map() + self.evaluate_vn_irb_ip_map(set(vn_dict.keys())) + vn_irb_ip_map = self.get_vn_irb_ip_map() for vn_id, interfaces in vn_dict.items(): vn_obj = VirtualNetworkDM.get(vn_id) @@ -236,18 +345,19 @@ def push_config(self): import_set |= ri2.export_targets if vn_obj.router_external == False: + irb_ips = vn_irb_ip_map.get(vn_id, []) self.config_manager.add_routing_instance(vrf_name_l3, import_set, export_set, - vn_obj.prefixes, - vn_obj.gateways, + vn_obj.get_prefixes(), + irb_ips, vn_obj.router_external, ["irb" + "." + str(vn_obj.vn_network_id)]) self.config_manager.add_routing_instance(vrf_name_l2, import_set, export_set, - vn_obj.prefixes, - vn_obj.gateways, + vn_obj.get_prefixes(), + irb_ips, vn_obj.router_external, interfaces, vn_obj.vxlan_vni, @@ -256,8 +366,8 @@ def push_config(self): self.config_manager.add_routing_instance(vrf_name_l3, import_set, export_set, - vn_obj.prefixes, - vn_obj.gateways, + vn_obj.get_prefixes(), + None, vn_obj.router_external, interfaces, vn_obj.vxlan_vni, @@ -619,18 +729,19 @@ def update(self, obj=None): self.virtual_machine_interfaces = set( [vmi['uuid'] for vmi in obj.get('virtual_machine_interface_back_refs', [])]) - self.prefixes = set() - self.gateways = set() + self.gateways = {} for ipam_ref in obj.get('network_ipam_refs', []): for subnet in ipam_ref['attr'].get('ipam_subnets', []): - self.prefixes.add('%s/%d' % (subnet['subnet']['ip_prefix'], - subnet['subnet']['ip_prefix_len']) - ) - self.gateways.add('%s/%d' % (subnet['default_gateway'], - subnet['subnet']['ip_prefix_len']) - ) + prefix = subnet['subnet']['ip_prefix'] + prefix_len = subnet['subnet']['ip_prefix_len'] + self.gateways[prefix + '/' + str(prefix_len)] = \ + subnet.get('default_gateway', '') # end update + def get_prefixes(self): + return set(self.gateways.keys()) + #end get_prefixes + def get_vrf_name(self, vrf_type): #this function must be called only after vn gets its vn_id if self.vn_network_id is None: @@ -741,3 +852,92 @@ def delete(cls, uuid): # end delete # end RoutingInstanceDM +class DMCassandraDB(VncCassandraClient): + _KEYSPACE = 'dm_keyspace' + _PR_VN_IP_CF = 'dm_pr_vn_ip_table' + dm_cassandra_instance = None + + @classmethod + def getInstance(cls, manager): + if cls.dm_cassandra_instance == None: + cls.dm_cassandra_instance = DMCassandraDB(manager) + return cls.dm_cassandra_instance + #end + + def __init__(self, manager): + self._manager = manager + self._args = manager._args + + if self._args.cluster_id: + self._keyspace = '%s_%s' % (self._args.cluster_id, self._KEYSPACE) + else: + self._keyspace = self._KEYSPACE + + keyspaces = { + self._keyspace: [(self._PR_VN_IP_CF, None)]} + + cass_server_list = self._args.cassandra_server_list + cred = None + if self._args.cassandra_user is not None and \ + self._args.cassandra_password is not None: + cred={'username':self._args.cassandra_user, + 'password':self._args.cassandra_password} + + super(DMCassandraDB, self).__init__( + cass_server_list, self._args.cluster_id, keyspaces, + manager.config_log, credential=cred) + + self.pr_vn_ip_map = {} + self.init_pr_map() + #end + + def init_pr_map(self): + cf = self.get_cf(self._PR_VN_IP_CF) + keys = dict(cf.get_range(column_count=0,filter_empty=False)).keys() + for key in keys: + (pr_uuid, vn_subnet_uuid) = key.split(':', 1) + self.add_to_pr_map(pr_uuid, vn_subnet_uuid) + #end + + def add_to_pr_map(self, pr_uuid, vn_subnet): + if pr_uuid in self.pr_vn_ip_map: + self.pr_vn_ip_map[pr_uuid].add(vn_subnet) + else: + self.pr_vn_ip_map[pr_uuid] = set() + self.pr_vn_ip_map[pr_uuid].add(vn_subnet) + #end + + def delete_from_pr_map(self, pr_uuid, vn_subnet): + if pr_uuid in self.pr_vn_ip_map: + self.pr_vn_ip_map[pr_uuid].remove(vn_subnet) + if not self.pr_vn_ip_map[pr_uuid]: + del self.pr_vn_ip_map[pr_uuid] + #end + + def delete_pr(self, pr_uuid): + vn_subnet_set = self.pr_vn_ip_map.get(pr_uuid, set()) + for vn_subnet in vn_subnet_set: + ret = self.delete(self._PR_VN_IP_CF, pr_uuid + ':' + vn_subnet) + if ret == False: + self._logger.error("Unable to free ip from db for vn/pr/subnet \ + (%s/%s)" %(pr_uuid, vn_subnet)) + #end + + def handle_pr_deletes(self, current_pr_set): + cs_pr_set = set(self.pr_vn_ip_map.keys()) + delete_set = cs_pr_set.difference(current_pr_set) + for pr_uuid in delete_set: + self.delete_pr(vn_uuid) + #end + + def get_pr_vn_set(self, pr_uuid): + return self.pr_vn_ip_map.get(pr_uuid, set()) + #end + + @classmethod + def get_db_info(cls): + db_info = [(cls._KEYSPACE, [cls._PR_VN_IP_CF])] + return db_info + # end get_db_info + +#end diff --git a/src/config/device-manager/device_manager/device_manager.py b/src/config/device-manager/device_manager/device_manager.py index ed06656538c..6ec84352fde 100644 --- a/src/config/device-manager/device_manager/device_manager.py +++ b/src/config/device-manager/device_manager/device_manager.py @@ -38,7 +38,7 @@ NodeStatus from db import DBBaseDM, BgpRouterDM, PhysicalRouterDM, PhysicalInterfaceDM, \ LogicalInterfaceDM, VirtualMachineInterfaceDM, VirtualNetworkDM, RoutingInstanceDM, \ - GlobalSystemConfigDM, GlobalVRouterConfigDM, FloatingIpDM, InstanceIpDM + GlobalSystemConfigDM, GlobalVRouterConfigDM, FloatingIpDM, InstanceIpDM, DMCassandraDB from cfgm_common.dependency_tracker import DependencyTracker from sandesh.dm_introspect import ttypes as sandesh @@ -173,16 +173,7 @@ def __init__(self, args=None): q_name, self._vnc_subscribe_callback, self.config_log) - cass_server_list = self._args.cassandra_server_list - cred = None - if self._args.cassandra_user is not None and \ - self._args.cassandra_password is not None: - cred={'username':self._args.cassandra_user, - 'password':self._args.cassandra_password} - self._cassandra = VncCassandraClient(cass_server_list, - self._args.cluster_id, - None, - self.config_log,credential=cred) + self._cassandra = DMCassandraDB.getInstance(self) DBBaseDM.init(self, self._sandesh.logger(), self._cassandra) for obj in GlobalSystemConfigDM.list_obj(): @@ -200,7 +191,11 @@ def __init__(self, args=None): for obj in BgpRouterDM.list_obj(): BgpRouterDM.locate(obj['uuid'], obj) - for obj in PhysicalRouterDM.list_obj(): + pr_obj_list = PhysicalRouterDM.list_obj() + pr_uuid_set = set([pr_obj['uuid'] for pr_obj in pr_obj_list]) + self._cassandra.handle_pr_deletes(pr_uuid_set) + + for obj in pr_obj_list: pr = PhysicalRouterDM.locate(obj['uuid'], obj) li_set = pr.logical_interfaces for pi_id in pr.physical_interfaces: diff --git a/src/config/device-manager/device_manager/physical_router_config.py b/src/config/device-manager/device_manager/physical_router_config.py index 4285ee74aed..44617fd8277 100644 --- a/src/config/device-manager/device_manager/physical_router_config.py +++ b/src/config/device-manager/device_manager/physical_router_config.py @@ -341,9 +341,11 @@ def add_routing_instance(self, ri_name, import_targets, export_targets, etree.SubElement(intf_unit, "name").text = str(network_id) family = etree.SubElement(intf_unit, "family") inet = etree.SubElement(family, "inet") - for gateway in gateways: + for (irb_ip, gateway) in gateways: addr = etree.SubElement(inet, "address") - etree.SubElement(addr, "name").text = gateway + etree.SubElement(addr, "name").text = irb_ip + if len(gateway) and gateway != '0.0.0.0': + etree.SubElement(addr, "virtual-gateway-address").text = gateway lo_intf = etree.SubElement(interfaces_config, "interface") etree.SubElement(lo_intf, "name").text = "lo0"