Skip to content

Commit

Permalink
Merge "DM: Allocate and persist irb address from each subnet of a VN …
Browse files Browse the repository at this point in the history
…for each PR"
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Aug 26, 2015
2 parents 9a866fe + 6bc4515 commit f2d7c32
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 46 deletions.
27 changes: 27 additions & 0 deletions src/config/common/vnc_cassandra.py
Expand Up @@ -91,6 +91,33 @@ def __init__(self, server_list, db_prefix, keyspaces, logger,
self._obj_fq_name_cf = self._cf_dict[self._OBJ_FQ_NAME_CF_NAME]
# end __init__

def get_cf(self, func):
return self._cf_dict.get(func)
#end

def add(self, func, key, value):
try:
self.get_cf(func).insert(key, value)
return True
except:
return False
#end

def get(self, func, key):
try:
return self.get_cf(func).get(key)
except:
return None
#end

def delete(self, func, key):
try:
self.get_cf(func).remove(key)
return True
except:
return False
#end

def _update_sandesh_status(self, status, msg=''):
ConnectionState.update(conn_type=ConnectionType.DATABASE,
name='Cassandra', status=status, message=msg,
Expand Down
264 changes: 232 additions & 32 deletions src/config/device-manager/device_manager/db.py
Expand Up @@ -11,10 +11,12 @@
from sandesh.dm_introspect import ttypes as sandesh
from cfgm_common.vnc_db import DBBase
from cfgm_common.uve.physical_router.ttypes import *
from vnc_api.vnc_api import *
import copy
import socket
import gevent
from gevent import queue
from cfgm_common.vnc_cassandra import VncCassandraClient

class DBBaseDM(DBBase):
obj_type = __name__
Expand Down Expand Up @@ -100,6 +102,8 @@ def __init__(self, uuid, obj_dict=None):
self.config_manager = None
self.nc_q = queue.Queue(maxsize=1)
self.nc_handler_gl = gevent.spawn(self.nc_handler)
self.vn_ip_map = {}
self.init_cs_state()
self.update(obj_dict)
self.config_manager = PhysicalRouterConfig(
self.management_ip, self.user_credentials, self.vendor,
Expand Down Expand Up @@ -135,6 +139,7 @@ def delete(cls, uuid):
if uuid not in cls._dict:
return
obj = cls._dict[uuid]
obj._cassandra.delete_pr(uuid)
obj.config_manager.delete_bgp_config()
obj.uve_send(True)
obj.update_single_ref('bgp_router', {})
Expand Down Expand Up @@ -171,24 +176,104 @@ def is_valid_ip(self, ip_str):
return False
#end

def push_config(self):
self.config_manager.reset_bgp_config()
bgp_router = BgpRouterDM.get(self.bgp_router)
if bgp_router:
for peer_uuid, attr in bgp_router.bgp_routers.items():
peer = BgpRouterDM.get(peer_uuid)
if peer is None:
continue
external = (bgp_router.params['autonomous_system'] !=
peer.params['autonomous_system'])
self.config_manager.add_bgp_peer(peer.params['address'],
peer.params, attr, external)
self.config_manager.set_bgp_config(bgp_router.params)
self.config_manager.set_global_routing_options(bgp_router.params)
bgp_router_ips = bgp_router.get_all_bgp_router_ips()
if self.dataplane_ip is not None and self.is_valid_ip(self.dataplane_ip):
self.config_manager.add_dynamic_tunnels(self.dataplane_ip,
GlobalSystemConfigDM.ip_fabric_subnets, bgp_router_ips)
def init_cs_state(self):
vn_subnet_set = self._cassandra.get_pr_vn_set(self.uuid)
for vn_subnet in vn_subnet_set:
ip = self._cassandra.get(self._cassandra._PR_VN_IP_CF,
self.uuid + ':' + vn_subnet)
if ip is not None:
self.vn_ip_map[vn_subnet] = ip['ip_address']
#end init_cs_state

def reserve_ip(self, vn_uuid, subnet_prefix):
try:
vn = VirtualNetwork()
vn.set_uuid(vn_uuid)
ip_addr = self._manager._vnc_lib.virtual_network_ip_alloc(
vn,
subnet=subnet_prefix)
if ip_addr:
return ip_addr[0] #ip_alloc default ip count is 1
except Exception as e:
self._logger.error("Exception: %s" %(str(e)))
return None
#end

def free_ip(self, vn_uuid, subnet_prefix, ip_addr):
try:
vn = VirtualNetwork()
vn.set_uuid(vn_uuid)
self._manager._vnc_lib.virtual_network_ip_free(
vn,
[ip_addr],
subnet=subnet_prefix)
return True
except Exception as e:
self._logger.error("Exception: %s" %(str(e)))
return False
#end

def get_vn_irb_ip_map(self):
irb_ips = {}
for vn_subnet, ip_addr in self.vn_ip_map.items():
(vn_uuid, subnet_prefix) = vn_subnet.split(':')
vn = VirtualNetworkDM.get(vn_uuid)
if vn_uuid not in irb_ips:
irb_ips[vn_uuid] = set()
irb_ips[vn_uuid].add((ip_addr, vn.gateways[subnet_prefix]))
return irb_ips
#end get_vn_irb_ip_map

def evaluate_vn_irb_ip_map(self, vn_set):
new_vn_ip_set = set()
for vn_uuid in vn_set:
vn = VirtualNetworkDM.get(vn_uuid)
if vn.router_external == True: #dont need irb ip, gateway ip
continue
for subnet_prefix in vn.gateways.keys():
new_vn_ip_set.add(vn_uuid + ':' + subnet_prefix)

old_set = set(self.vn_ip_map.keys())
delete_set = old_set.difference(new_vn_ip_set)
create_set = new_vn_ip_set.difference(old_set)
for vn_subnet in delete_set:
(vn_uuid, subnet_prefix) = vn_subnet.split(':')
ret = self.free_ip(vn_uuid, subnet_prefix, self.vn_ip_map[vn_subnet])
if ret == False:
self._logger.error("Unable to free ip for vn/subnet/pr \
(%s/%s/%s)" %(vn_uuid, subnet_prefix, self.uuid))
ret = self._cassandra.delete(self._cassandra._PR_VN_IP_CF,
self.uuid + ':' + vn_uuid + ':' + subnet_prefix)
if ret == False:
self._logger.error("Unable to free ip from db for vn/subnet/pr \
(%s/%s/%s)" %(vn_uuid, subnet_prefix, self.uuid))
continue
self._cassandra.delete_from_pr_map(self.uuid, vn_subnet)
del self.vn_ip_map[vn_subnet]

for vn_subnet in create_set:
(vn_uuid, subnet_prefix) = vn_subnet.split(':')
(sub, length) = subnet_prefix.split('/')
ip_addr = self.reserve_ip(vn_uuid, subnet_prefix)
if ip_addr is None:
self._logger.error("Unable to allocate ip for vn/subnet/pr \
(%s/%s/%s)" %(vn_uuid, subnet_prefix, self.uuid))
continue
ret = self._cassandra.add(self._cassandra._PR_VN_IP_CF,
self.uuid + ':' + vn_uuid + ':' + subnet_prefix,
{'ip_address': ip_addr + '/' + length})
if ret == False:
self._logger.error("Unable to store ip for vn/subnet/pr \
(%s/%s/%s)" %(self.uuid, subnet_prefix, self.uuid))
if self.free_ip(vn_uuid, subnet_prefix, ip_addr) == False:
self._logger.error("Unable to free ip for vn/subnet/pr \
(%s/%s/%s)" %(self.uuid, subnet_prefix, self.uuid))
continue
self._cassandra.add_to_pr_map(self.uuid, vn_subnet)
self.vn_ip_map[vn_subnet] = ip_addr + '/' + length
#end evaluate_vn_irb_ip_map

def get_vn_li_map(self):
vn_dict = {}
for vn_id in self.virtual_networks:
vn_dict[vn_id] = []
Expand All @@ -212,6 +297,30 @@ def push_config(self):
vn_dict[vn_id].append(li.name)
else:
vn_dict[vn_id] = [li.name]
#end

def push_config(self):
self.config_manager.reset_bgp_config()
bgp_router = BgpRouterDM.get(self.bgp_router)
if bgp_router:
for peer_uuid, attr in bgp_router.bgp_routers.items():
peer = BgpRouterDM.get(peer_uuid)
if peer is None:
continue
external = (bgp_router.params['autonomous_system'] !=
peer.params['autonomous_system'])
self.config_manager.add_bgp_peer(peer.params['address'],
peer.params, attr, external)
self.config_manager.set_bgp_config(bgp_router.params)
self.config_manager.set_global_routing_options(bgp_router.params)
bgp_router_ips = bgp_router.get_all_bgp_router_ips()
if self.dataplane_ip is not None and self.is_valid_ip(self.dataplane_ip):
self.config_manager.add_dynamic_tunnels(self.dataplane_ip,
GlobalSystemConfigDM.ip_fabric_subnets, bgp_router_ips)

vn_dict = self.get_vn_li_map()
self.evaluate_vn_irb_ip_map(set(vn_dict.keys()))
vn_irb_ip_map = self.get_vn_irb_ip_map()

for vn_id, interfaces in vn_dict.items():
vn_obj = VirtualNetworkDM.get(vn_id)
Expand All @@ -236,18 +345,19 @@ def push_config(self):
import_set |= ri2.export_targets

if vn_obj.router_external == False:
irb_ips = vn_irb_ip_map.get(vn_id, [])
self.config_manager.add_routing_instance(vrf_name_l3,
import_set,
export_set,
vn_obj.prefixes,
vn_obj.gateways,
vn_obj.get_prefixes(),
irb_ips,
vn_obj.router_external,
["irb" + "." + str(vn_obj.vn_network_id)])
self.config_manager.add_routing_instance(vrf_name_l2,
import_set,
export_set,
vn_obj.prefixes,
vn_obj.gateways,
vn_obj.get_prefixes(),
irb_ips,
vn_obj.router_external,
interfaces,
vn_obj.vxlan_vni,
Expand All @@ -256,8 +366,8 @@ def push_config(self):
self.config_manager.add_routing_instance(vrf_name_l3,
import_set,
export_set,
vn_obj.prefixes,
vn_obj.gateways,
vn_obj.get_prefixes(),
None,
vn_obj.router_external,
interfaces,
vn_obj.vxlan_vni,
Expand Down Expand Up @@ -619,18 +729,19 @@ def update(self, obj=None):
self.virtual_machine_interfaces = set(
[vmi['uuid'] for vmi in
obj.get('virtual_machine_interface_back_refs', [])])
self.prefixes = set()
self.gateways = set()
self.gateways = {}
for ipam_ref in obj.get('network_ipam_refs', []):
for subnet in ipam_ref['attr'].get('ipam_subnets', []):
self.prefixes.add('%s/%d' % (subnet['subnet']['ip_prefix'],
subnet['subnet']['ip_prefix_len'])
)
self.gateways.add('%s/%d' % (subnet['default_gateway'],
subnet['subnet']['ip_prefix_len'])
)
prefix = subnet['subnet']['ip_prefix']
prefix_len = subnet['subnet']['ip_prefix_len']
self.gateways[prefix + '/' + str(prefix_len)] = \
subnet.get('default_gateway', '')
# end update

def get_prefixes(self):
return set(self.gateways.keys())
#end get_prefixes

def get_vrf_name(self, vrf_type):
#this function must be called only after vn gets its vn_id
if self.vn_network_id is None:
Expand Down Expand Up @@ -741,3 +852,92 @@ def delete(cls, uuid):
# end delete
# end RoutingInstanceDM

class DMCassandraDB(VncCassandraClient):
_KEYSPACE = 'dm_keyspace'
_PR_VN_IP_CF = 'dm_pr_vn_ip_table'
dm_cassandra_instance = None

@classmethod
def getInstance(cls, manager):
if cls.dm_cassandra_instance == None:
cls.dm_cassandra_instance = DMCassandraDB(manager)
return cls.dm_cassandra_instance
#end

def __init__(self, manager):
self._manager = manager
self._args = manager._args

if self._args.cluster_id:
self._keyspace = '%s_%s' % (self._args.cluster_id, self._KEYSPACE)
else:
self._keyspace = self._KEYSPACE

keyspaces = {
self._keyspace: [(self._PR_VN_IP_CF, None)]}

cass_server_list = self._args.cassandra_server_list
cred = None
if self._args.cassandra_user is not None and \
self._args.cassandra_password is not None:
cred={'username':self._args.cassandra_user,
'password':self._args.cassandra_password}

super(DMCassandraDB, self).__init__(
cass_server_list, self._args.cluster_id, keyspaces,
manager.config_log, credential=cred)

self.pr_vn_ip_map = {}
self.init_pr_map()
#end

def init_pr_map(self):
cf = self.get_cf(self._PR_VN_IP_CF)
keys = dict(cf.get_range(column_count=0,filter_empty=False)).keys()
for key in keys:
(pr_uuid, vn_subnet_uuid) = key.split(':', 1)
self.add_to_pr_map(pr_uuid, vn_subnet_uuid)
#end

def add_to_pr_map(self, pr_uuid, vn_subnet):
if pr_uuid in self.pr_vn_ip_map:
self.pr_vn_ip_map[pr_uuid].add(vn_subnet)
else:
self.pr_vn_ip_map[pr_uuid] = set()
self.pr_vn_ip_map[pr_uuid].add(vn_subnet)
#end

def delete_from_pr_map(self, pr_uuid, vn_subnet):
if pr_uuid in self.pr_vn_ip_map:
self.pr_vn_ip_map[pr_uuid].remove(vn_subnet)
if not self.pr_vn_ip_map[pr_uuid]:
del self.pr_vn_ip_map[pr_uuid]
#end

def delete_pr(self, pr_uuid):
vn_subnet_set = self.pr_vn_ip_map.get(pr_uuid, set())
for vn_subnet in vn_subnet_set:
ret = self.delete(self._PR_VN_IP_CF, pr_uuid + ':' + vn_subnet)
if ret == False:
self._logger.error("Unable to free ip from db for vn/pr/subnet \
(%s/%s)" %(pr_uuid, vn_subnet))
#end

def handle_pr_deletes(self, current_pr_set):
cs_pr_set = set(self.pr_vn_ip_map.keys())
delete_set = cs_pr_set.difference(current_pr_set)
for pr_uuid in delete_set:
self.delete_pr(vn_uuid)
#end

def get_pr_vn_set(self, pr_uuid):
return self.pr_vn_ip_map.get(pr_uuid, set())
#end

@classmethod
def get_db_info(cls):
db_info = [(cls._KEYSPACE, [cls._PR_VN_IP_CF])]
return db_info
# end get_db_info

#end

0 comments on commit f2d7c32

Please sign in to comment.