Skip to content

Commit

Permalink
Create vrouter state cache from polling uve
Browse files Browse the repository at this point in the history
Currently every launch of a netns VM causes fetch of vrouter UVEs.
This is inefficient and the plan is to maintain a local cache for
the same. Also need to add debouncing logic for marking the vrouter
as down as the collectors might return inconsistent data when
there is flux in the system.

Also added fix to pick analytics api client from discovery and not
from configuration as before.

Change-Id: I5c158d152355eaaee3a7b582ba41841f6fe49573
Closes-Bug: #1536821
  • Loading branch information
rrugge committed Mar 30, 2016
1 parent e3fad01 commit afebb2b
Show file tree
Hide file tree
Showing 9 changed files with 477 additions and 471 deletions.
1 change: 1 addition & 0 deletions src/config/svc-monitor/SConscript
Expand Up @@ -37,6 +37,7 @@ sources = [
'svc_monitor/tests/test_vrouter_instance_manager.py',
'svc_monitor/tests/test_virtual_machine_manager.py',
'svc_monitor/tests/test_snat.py',
'svc_monitor/tests/test_common_utils.py',
'svc_monitor/tests/scheduler/__init__.py',
'svc_monitor/tests/scheduler/test_vrouter_schedulers.py',
]
Expand Down
15 changes: 15 additions & 0 deletions src/config/svc-monitor/svc_monitor/config_db.py
Expand Up @@ -226,6 +226,8 @@ class VirtualRouterSM(DBBase):

def __init__(self, uuid, obj_dict=None):
self.uuid = uuid
self.agent_state = False
self.agent_down_count = 0
self.virtual_machines = set()
self.update(obj_dict)
# end __init__
Expand All @@ -246,6 +248,19 @@ def delete(cls, uuid):
obj.update_multiple_refs('virtual_machine', {})
del cls._dict[uuid]
# end delete

def set_agent_state(self, up):
if up:
self.agent_down_count = 0
self.agent_state = True
else:
self.agent_down_count += 1
if not (self.agent_down_count % 3):
self.agent_state = False

def set_netns_version(self, netns_version):
self.netns_version = netns_version

# end VirtualRouterSM


Expand Down
13 changes: 6 additions & 7 deletions src/config/svc-monitor/svc_monitor/instance_manager.py
Expand Up @@ -534,13 +534,12 @@ def _create_svc_vm_port(self, nic, instance_name, si, st,
def _associate_vrouter(self, si, vm):
vrouter_name = None
if not vm.virtual_router:
chosen_vr_fq_name = None
chosen_vr_fq_name = self.vrouter_scheduler.schedule(
si.uuid, vm.uuid)
if chosen_vr_fq_name:
vrouter_name = chosen_vr_fq_name[-1]
chosen_vr = self.vrouter_scheduler.schedule(si, vm)
if chosen_vr:
vr = VirtualRouterSM.get(chosen_vr)
vrouter_name = vr.name
self.logger.log_notice("vrouter %s updated with vm %s" %
(':'.join(chosen_vr_fq_name), vm.name))
(':'.join(vr.fq_name), vm.name))
vm.update()
else:
vr = VirtualRouterSM.get(vm.virtual_router)
Expand Down Expand Up @@ -599,7 +598,7 @@ def check_service(self, si):
service_up = False
else:
vr = VirtualRouterSM.get(vm.virtual_router)
if self.vrouter_scheduler.vrouter_running(vr.name):
if vr.agent_state:
continue
self._vnc_lib.ref_update('virtual-router', vr.uuid,
'virtual-machine', vm.uuid, None, 'DELETE')
Expand Down
222 changes: 109 additions & 113 deletions src/config/svc-monitor/svc_monitor/scheduler/vrouter_scheduler.py
Expand Up @@ -28,18 +28,20 @@
from sandesh_common.vns import constants
from vnc_api.vnc_api import NoIdError

from svc_monitor.config_db import *

from sandesh_common.vns.constants import \
ANALYTICS_API_SERVER_DISCOVERY_SERVICE_NAME as analytics_svc_name

@six.add_metaclass(abc.ABCMeta)
class VRouterScheduler(object):

def __init__(self, vnc_lib, nova_client, args):
def __init__(self, vnc_lib, nova_client, disc, logger, args):
self._vnc_lib = vnc_lib
self._args = args
self._nc = nova_client

# initialize analytics client
endpoint = "http://%s:%s" % (self._args.analytics_server_ip,
self._args.analytics_server_port)
self._analytics = analytics_client.Client(endpoint)
self._disc = disc
self._logger = logger

@abc.abstractmethod
def schedule(self, plugin, context, router_id, candidates=None):
Expand All @@ -51,130 +53,124 @@ def schedule(self, plugin, context, router_id, candidates=None):
"""
pass

def _get_candidates(self, si_uuid, vm_uuid):
"""Return vrouter agents where a service instance virtual machine
could be scheduled.
If a VM of a same service instance is already scheduled on the vrouter,
that vrouter is exclude from the candidates list.
If the VM is already scheduled on a running vrouter, only that vrouter
is return in the candidates list.
"""

# if availability zone configured then use it
vr_list = []
if self._args.netns_availability_zone:
az_list = self._nc.oper('availability_zones', 'list',
'admin', detailed=True)
for az in az_list:
if self._args.netns_availability_zone in str(az):
for host in az.hosts:
vr_list.append({'fq_name':
['default-global-system-config', host]})
else:
vr_list = self._vnc_lib.virtual_routers_list()['virtual-routers']

# check if vrouters are functional and support service instance
vrs_fq_name = [vr['fq_name'] for vr in vr_list
if self.vrouter_running(vr['fq_name'][-1])]
vrs_fq_name = [vr_fq_name for vr_fq_name in vrs_fq_name
if self.vrouter_check_version(
vr_fq_name[-1],
svc_info._VROUTER_NETNS_SUPPORTED_VERSION)]

for vr_fq_name in vrs_fq_name:
try:
vr_obj = self._vnc_lib.virtual_router_read(fq_name=vr_fq_name)
except NoIdError:
vrs_fq_name.remove(vr_fq_name)
def _get_az_vrouter_list(self):
if not self._args.netns_availability_zone:
return None
az_list = self._nc.oper('availability_zones', 'list',
self._args.admin_tenant_name,
detailed=True)
az_vr_list = []
for az in az_list:
if self._args.netns_availability_zone not in str(az):
continue
for vm_ref in vr_obj.get_virtual_machine_refs() or []:
if vm_uuid == vm_ref['uuid']:
return [vr_fq_name]
try:
vm_obj = self._vnc_lib.virtual_machine_read(
id=vm_ref['uuid'])
except NoIdError:
continue
if si_uuid in [si['uuid'] for si in
vm_obj.get_service_instance_refs()]:
vrs_fq_name.remove(vr_fq_name)
continue
return vrs_fq_name

def vrouter_running(self, vrouter_name):
"""Check if a vrouter agent is up and running."""
az_vr_list.extend(az.hosts.keys())

return az_vr_list

def get_analytics_client(self):
try:
sub_obj = self._disc.subscribe(analytics_svc_name, 0)
slist = sub_obj.info
except Exception as ex:
self._logger.log_error('Failed to get analytics api from discovery')
return None
else:
if not slist:
self._logger.log_error('No analytics api client in discovery')
return None
analytics_api = random.choice(slist)
endpoint = "http://%s:%s" % (analytics_api['ip-address'],
str(analytics_api['port']))
return analytics_client.Client(endpoint)

def query_uve(self, filter_string):
path = "/analytics/uves/vrouter/"

fqdn_uuid = "%s?cfilt=VrouterAgent" % vrouter_name
try:
vrouter_agent = self._analytics.request(path, fqdn_uuid)
except analytics_client.OpenContrailAPIFailed:
return False

if 'VrouterAgent' not in vrouter_agent or \
('mode' in vrouter_agent['VrouterAgent'] and \
vrouter_agent['VrouterAgent']['mode'] != \
constants.VrouterAgentTypeMap[
constants.VrouterAgentType.VROUTER_AGENT_EMBEDDED]):
return False

fqdn_uuid = "%s?cfilt=NodeStatus" % vrouter_name
response_dict = {}
try:
node_status = self._analytics.request(path, fqdn_uuid)
response = self._analytics.request(path, filter_string)
for values in response['value']:
response_dict[values['name']] = values['value']
except analytics_client.OpenContrailAPIFailed:
return False
pass
return response_dict

if not node_status or 'NodeStatus' not in node_status or \
'process_status' not in node_status['NodeStatus']:
return False
def vrouters_running(self):
# get az host list
az_vrs = self._get_az_vrouter_list()

for process in node_status['NodeStatus']['process_status']:
if (process['module_id'] == constants.MODULE_VROUTER_AGENT_NAME and
int(process['instance_id']) == 0 and
process['state'] == 'Functional'):
return True
return False
# read all vrouter information
self._analytics = self.get_analytics_client()
if not self._analytics:
return
agents_status = self.query_uve("*?cfilt=NodeStatus:process_status")
vrouters_mode = self.query_uve("*?cfilt=VrouterAgent:mode")

def vrouter_check_version(self, vrouter_name, version):
"""Check the vrouter version is upper or equal to a desired version."""
path = "/analytics/uves/vrouter/"
fqdn_uuid = "%s?cfilt=VrouterAgent" % vrouter_name
for vr in VirtualRouterSM.values():
if az_vrs and vr.name not in az_vrs:
vr.set_agent_state(False)
continue

try:
vrouter_agent = self._analytics.request(path, fqdn_uuid)
except analytics_client.OpenContrailAPIFailed:
return False
if vr.name not in vrouters_mode or vr.name not in agents_status:
vr.set_agent_state(False)
continue

if not vrouter_agent:
return False
try:
vr_mode = vrouters_mode[vr.name]['VrouterAgent']
if (vr_mode['mode'] != constants.VrouterAgentTypeMap[
constants.VrouterAgentType.VROUTER_AGENT_EMBEDDED]):
vr.set_agent_state(False)
continue
except Exception as e:
vr.set_agent_state(False)
continue

try:
build_info = ast.literal_eval(
vrouter_agent['VrouterAgent']['build_info'])
vrouter_version = V(build_info['build-info'][0]['build-version'])
requested_version = V(version)
except KeyError, ValueError:
return False
try:
state_up = False
for vr_status in agents_status[vr.name]['NodeStatus']['process_status'] or []:
if (vr_status['module_id'] != constants.MODULE_VROUTER_AGENT_NAME):
continue
if (int(vr_status['instance_id']) == 0 and
vr_status['state'] == 'Functional'):
vr.set_agent_state(True)
state_up = True
break
if not state_up:
vr.set_agent_state(False)
except Exception as e:
vr.set_agent_state(False)
continue

return vrouter_version >= requested_version
def _get_candidates(self, si, vm):
if vm.virtual_router:
return [vm.virtual_router]

def _bind_vrouter(self, vm_uuid, vr_fq_name):
"""Bind the virtual machine to the vrouter which has been chosen."""
vm_obj = self._vnc_lib.virtual_machine_read(id=vm_uuid)
vr_obj = self._vnc_lib.virtual_router_read(fq_name=vr_fq_name)
vr_obj.add_virtual_machine(vm_obj)
self._vnc_lib.virtual_router_update(vr_obj)
vr_list = VirtualRouterSM._dict.keys()
for vm_id in si.virtual_machines:
if vm_id == vm.uuid:
continue
anti_affinity_vm = VirtualMachineSM.get(vm_id)
if anti_affinity_vm:
try:
vr_list.remove(anti_affinity_vm.virtual_router)
except ValueError:
pass

for vr in VirtualRouterSM.values():
if not vr.agent_state:
try:
vr_list.remove(vr.uuid)
except ValueError:
pass
return vr_list

class RandomScheduler(VRouterScheduler):
"""Randomly allocate a vrouter agent for virtual machine of a service
instance."""

def schedule(self, si_uuid, vm_uuid):
candidates = self._get_candidates(si_uuid, vm_uuid)
def schedule(self, si, vm):
candidates = self._get_candidates(si, vm)
if not candidates:
return
return None
chosen_vrouter = random.choice(candidates)
self._bind_vrouter(vm_uuid, chosen_vrouter)
self._vnc_lib.ref_update('virtual-router', chosen_vrouter,
'virtual-machine', vm.uuid, None, 'ADD')
return chosen_vrouter
5 changes: 4 additions & 1 deletion src/config/svc-monitor/svc_monitor/svc_monitor.py
Expand Up @@ -305,7 +305,7 @@ def post_init(self, vnc_lib, args=None):
self.vrouter_scheduler = importutils.import_object(
self._args.si_netns_scheduler_driver,
self._vnc_lib, self._nova_client,
self._args)
self._disc, self.logger, self._args)

# load virtual machine instance manager
self.vm_manager = importutils.import_object(
Expand Down Expand Up @@ -363,6 +363,7 @@ def post_init(self, vnc_lib, args=None):
self.upgrade()

# check services
self.vrouter_scheduler.vrouters_running()
self.launch_services()

self._db_resync_done.set()
Expand Down Expand Up @@ -907,6 +908,8 @@ def timer_callback(monitor):
if len(vmi_delete_list):
monitor.vm_manager.cleanup_svc_vm_ports(vmi_delete_list)

monitor.vrouter_scheduler.vrouters_running()

# check status of service
si_list = list(ServiceInstanceSM.values())
for si in si_list:
Expand Down

0 comments on commit afebb2b

Please sign in to comment.