Skip to content

Commit

Permalink
Merge "Create vrouter state cache from polling uve" into R2.21.x
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Apr 14, 2016
2 parents 39355b8 + 616fdd5 commit 04c0e9c
Show file tree
Hide file tree
Showing 9 changed files with 478 additions and 472 deletions.
1 change: 1 addition & 0 deletions src/config/svc-monitor/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ sources = [
'svc_monitor/tests/test_vrouter_instance_manager.py',
'svc_monitor/tests/test_virtual_machine_manager.py',
'svc_monitor/tests/test_snat.py',
'svc_monitor/tests/test_common_utils.py',
'svc_monitor/tests/scheduler/__init__.py',
'svc_monitor/tests/scheduler/test_vrouter_schedulers.py',
]
Expand Down
15 changes: 15 additions & 0 deletions src/config/svc-monitor/svc_monitor/config_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ class VirtualRouterSM(DBBase):

def __init__(self, uuid, obj_dict=None):
self.uuid = uuid
self.agent_state = False
self.agent_down_count = 0
self.virtual_machines = set()
self.update(obj_dict)
# end __init__
Expand All @@ -246,6 +248,19 @@ def delete(cls, uuid):
obj.update_multiple_refs('virtual_machine', {})
del cls._dict[uuid]
# end delete

def set_agent_state(self, up):
if up:
self.agent_down_count = 0
self.agent_state = True
else:
self.agent_down_count += 1
if not (self.agent_down_count % 3):
self.agent_state = False

def set_netns_version(self, netns_version):
self.netns_version = netns_version

# end VirtualRouterSM


Expand Down
15 changes: 7 additions & 8 deletions src/config/svc-monitor/svc_monitor/instance_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,13 +542,12 @@ def _create_svc_vm_port(self, nic, instance_name, si, st,
def _associate_vrouter(self, si, vm):
vrouter_name = None
if not vm.virtual_router:
chosen_vr_fq_name = None
chosen_vr_fq_name = self.vrouter_scheduler.schedule(
si.uuid, vm.uuid)
if chosen_vr_fq_name:
vrouter_name = chosen_vr_fq_name[-1]
self.logger.log_info("VRouter %s updated with VM %s" %
(':'.join(chosen_vr_fq_name), vm.name))
chosen_vr = self.vrouter_scheduler.schedule(si, vm)
if chosen_vr:
vr = VirtualRouterSM.get(chosen_vr)
vrouter_name = vr.name
self.logger.log_notice("vrouter %s updated with vm %s" %
(':'.join(vr.fq_name), vm.name))
vm.update()
else:
vr = VirtualRouterSM.get(vm.virtual_router)
Expand Down Expand Up @@ -607,7 +606,7 @@ def check_service(self, si):
service_up = False
else:
vr = VirtualRouterSM.get(vm.virtual_router)
if self.vrouter_scheduler.vrouter_running(vr.name):
if vr.agent_state:
continue
self._vnc_lib.ref_update('virtual-router', vr.uuid,
'virtual-machine', vm.uuid, None, 'DELETE')
Expand Down
222 changes: 109 additions & 113 deletions src/config/svc-monitor/svc_monitor/scheduler/vrouter_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,20 @@
from sandesh_common.vns import constants
from vnc_api.vnc_api import NoIdError

from svc_monitor.config_db import *

from sandesh_common.vns.constants import \
ANALYTICS_API_SERVER_DISCOVERY_SERVICE_NAME as analytics_svc_name

@six.add_metaclass(abc.ABCMeta)
class VRouterScheduler(object):

def __init__(self, vnc_lib, nova_client, args):
def __init__(self, vnc_lib, nova_client, disc, logger, args):
self._vnc_lib = vnc_lib
self._args = args
self._nc = nova_client

# initialize analytics client
endpoint = "http://%s:%s" % (self._args.analytics_server_ip,
self._args.analytics_server_port)
self._analytics = analytics_client.Client(endpoint)
self._disc = disc
self._logger = logger

@abc.abstractmethod
def schedule(self, plugin, context, router_id, candidates=None):
Expand All @@ -51,130 +53,124 @@ def schedule(self, plugin, context, router_id, candidates=None):
"""
pass

def _get_candidates(self, si_uuid, vm_uuid):
"""Return vrouter agents where a service instance virtual machine
could be scheduled.
If a VM of a same service instance is already scheduled on the vrouter,
that vrouter is exclude from the candidates list.
If the VM is already scheduled on a running vrouter, only that vrouter
is return in the candidates list.
"""

# if availability zone configured then use it
vr_list = []
if self._args.netns_availability_zone:
az_list = self._nc.oper('availability_zones', 'list',
'admin', detailed=True)
for az in az_list:
if self._args.netns_availability_zone in str(az):
for host in az.hosts:
vr_list.append({'fq_name':
['default-global-system-config', host]})
else:
vr_list = self._vnc_lib.virtual_routers_list()['virtual-routers']

# check if vrouters are functional and support service instance
vrs_fq_name = [vr['fq_name'] for vr in vr_list
if self.vrouter_running(vr['fq_name'][-1])]
vrs_fq_name = [vr_fq_name for vr_fq_name in vrs_fq_name
if self.vrouter_check_version(
vr_fq_name[-1],
svc_info._VROUTER_NETNS_SUPPORTED_VERSION)]

for vr_fq_name in vrs_fq_name:
try:
vr_obj = self._vnc_lib.virtual_router_read(fq_name=vr_fq_name)
except NoIdError:
vrs_fq_name.remove(vr_fq_name)
def _get_az_vrouter_list(self):
if not self._args.netns_availability_zone:
return None
az_list = self._nc.oper('availability_zones', 'list',
self._args.admin_tenant_name,
detailed=True)
az_vr_list = []
for az in az_list:
if self._args.netns_availability_zone not in str(az):
continue
for vm_ref in vr_obj.get_virtual_machine_refs() or []:
if vm_uuid == vm_ref['uuid']:
return [vr_fq_name]
try:
vm_obj = self._vnc_lib.virtual_machine_read(
id=vm_ref['uuid'])
except NoIdError:
continue
if si_uuid in [si['uuid'] for si in
vm_obj.get_service_instance_refs()]:
vrs_fq_name.remove(vr_fq_name)
continue
return vrs_fq_name

def vrouter_running(self, vrouter_name):
"""Check if a vrouter agent is up and running."""
az_vr_list.extend(az.hosts.keys())

return az_vr_list

def get_analytics_client(self):
try:
sub_obj = self._disc.subscribe(analytics_svc_name, 0)
slist = sub_obj.info
except Exception as ex:
self._logger.log_error('Failed to get analytics api from discovery')
return None
else:
if not slist:
self._logger.log_error('No analytics api client in discovery')
return None
analytics_api = random.choice(slist)
endpoint = "http://%s:%s" % (analytics_api['ip-address'],
str(analytics_api['port']))
return analytics_client.Client(endpoint)

def query_uve(self, filter_string):
path = "/analytics/uves/vrouter/"

fqdn_uuid = "%s?cfilt=VrouterAgent" % vrouter_name
try:
vrouter_agent = self._analytics.request(path, fqdn_uuid)
except analytics_client.OpenContrailAPIFailed:
return False

if 'VrouterAgent' not in vrouter_agent or \
('mode' in vrouter_agent['VrouterAgent'] and \
vrouter_agent['VrouterAgent']['mode'] != \
constants.VrouterAgentTypeMap[
constants.VrouterAgentType.VROUTER_AGENT_EMBEDDED]):
return False

fqdn_uuid = "%s?cfilt=NodeStatus" % vrouter_name
response_dict = {}
try:
node_status = self._analytics.request(path, fqdn_uuid)
response = self._analytics.request(path, filter_string)
for values in response['value']:
response_dict[values['name']] = values['value']
except analytics_client.OpenContrailAPIFailed:
return False
pass
return response_dict

if not node_status or 'NodeStatus' not in node_status or \
'process_status' not in node_status['NodeStatus']:
return False
def vrouters_running(self):
# get az host list
az_vrs = self._get_az_vrouter_list()

for process in node_status['NodeStatus']['process_status']:
if (process['module_id'] == constants.MODULE_VROUTER_AGENT_NAME and
int(process['instance_id']) == 0 and
process['state'] == 'Functional'):
return True
return False
# read all vrouter information
self._analytics = self.get_analytics_client()
if not self._analytics:
return
agents_status = self.query_uve("*?cfilt=NodeStatus:process_status")
vrouters_mode = self.query_uve("*?cfilt=VrouterAgent:mode")

def vrouter_check_version(self, vrouter_name, version):
"""Check the vrouter version is upper or equal to a desired version."""
path = "/analytics/uves/vrouter/"
fqdn_uuid = "%s?cfilt=VrouterAgent" % vrouter_name
for vr in VirtualRouterSM.values():
if az_vrs and vr.name not in az_vrs:
vr.set_agent_state(False)
continue

try:
vrouter_agent = self._analytics.request(path, fqdn_uuid)
except analytics_client.OpenContrailAPIFailed:
return False
if vr.name not in vrouters_mode or vr.name not in agents_status:
vr.set_agent_state(False)
continue

if not vrouter_agent:
return False
try:
vr_mode = vrouters_mode[vr.name]['VrouterAgent']
if (vr_mode['mode'] != constants.VrouterAgentTypeMap[
constants.VrouterAgentType.VROUTER_AGENT_EMBEDDED]):
vr.set_agent_state(False)
continue
except Exception as e:
vr.set_agent_state(False)
continue

try:
build_info = ast.literal_eval(
vrouter_agent['VrouterAgent']['build_info'])
vrouter_version = V(build_info['build-info'][0]['build-version'])
requested_version = V(version)
except KeyError, ValueError:
return False
try:
state_up = False
for vr_status in agents_status[vr.name]['NodeStatus']['process_status'] or []:
if (vr_status['module_id'] != constants.MODULE_VROUTER_AGENT_NAME):
continue
if (int(vr_status['instance_id']) == 0 and
vr_status['state'] == 'Functional'):
vr.set_agent_state(True)
state_up = True
break
if not state_up:
vr.set_agent_state(False)
except Exception as e:
vr.set_agent_state(False)
continue

return vrouter_version >= requested_version
def _get_candidates(self, si, vm):
if vm.virtual_router:
return [vm.virtual_router]

def _bind_vrouter(self, vm_uuid, vr_fq_name):
"""Bind the virtual machine to the vrouter which has been chosen."""
vm_obj = self._vnc_lib.virtual_machine_read(id=vm_uuid)
vr_obj = self._vnc_lib.virtual_router_read(fq_name=vr_fq_name)
vr_obj.add_virtual_machine(vm_obj)
self._vnc_lib.virtual_router_update(vr_obj)
vr_list = VirtualRouterSM._dict.keys()
for vm_id in si.virtual_machines:
if vm_id == vm.uuid:
continue
anti_affinity_vm = VirtualMachineSM.get(vm_id)
if anti_affinity_vm:
try:
vr_list.remove(anti_affinity_vm.virtual_router)
except ValueError:
pass

for vr in VirtualRouterSM.values():
if not vr.agent_state:
try:
vr_list.remove(vr.uuid)
except ValueError:
pass
return vr_list

class RandomScheduler(VRouterScheduler):
"""Randomly allocate a vrouter agent for virtual machine of a service
instance."""

def schedule(self, si_uuid, vm_uuid):
candidates = self._get_candidates(si_uuid, vm_uuid)
def schedule(self, si, vm):
candidates = self._get_candidates(si, vm)
if not candidates:
return
return None
chosen_vrouter = random.choice(candidates)
self._bind_vrouter(vm_uuid, chosen_vrouter)
self._vnc_lib.ref_update('virtual-router', chosen_vrouter,
'virtual-machine', vm.uuid, None, 'ADD')
return chosen_vrouter
5 changes: 4 additions & 1 deletion src/config/svc-monitor/svc_monitor/svc_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def post_init(self, vnc_lib, args=None):
self.vrouter_scheduler = importutils.import_object(
self._args.si_netns_scheduler_driver,
self._vnc_lib, self._nova_client,
self._args)
self._disc, self.logger, self._args)

# load virtual machine instance manager
self.vm_manager = importutils.import_object(
Expand Down Expand Up @@ -383,6 +383,7 @@ def post_init(self, vnc_lib, args=None):
self.upgrade()

# check services
self.vrouter_scheduler.vrouters_running()
self.launch_services()

self._db_resync_done.set()
Expand Down Expand Up @@ -915,6 +916,8 @@ def timer_callback(monitor):
for vm in vm_delete_list:
monitor._delete_service_instance(vm)

monitor.vrouter_scheduler.vrouters_running()

# check status of service
si_list = list(ServiceInstanceSM.values())
for si in si_list:
Expand Down

0 comments on commit 04c0e9c

Please sign in to comment.