diff --git a/src/container/kube-manager/kube_manager/kube/kube_monitor.py b/src/container/kube-manager/kube_manager/kube/kube_monitor.py index 954603178fc..48559956645 100644 --- a/src/container/kube-manager/kube_manager/kube/kube_monitor.py +++ b/src/container/kube-manager/kube_manager/kube/kube_monitor.py @@ -1,29 +1,28 @@ -import eventlet -import json import sys -import requests import os +import time +import socket +import select +import eventlet import json -from urllib3 import (request,PoolManager) +import requests class KubeMonitor(object): - # Common handle to a generic http connection to api server. - kube_api_conn_handle = None - def __init__(self, args=None, logger=None, q=None, db=None, resource_name='KubeMonitor', beta=False): + self.name = type(self).__name__ self.args = args self.logger = logger self.q = q self.cloud_orchestrator = self.args.orchestrator self.token = self.args.token # valid only for OpenShift self.headers = {} - - if not self.kube_api_conn_handle: - self.kube_api_conn_handle = PoolManager() + self.verify = False + self.timeout = 60 # Per-monitor stream handle to api server. + self.kube_api_resp = None self.kube_api_stream_handle = None # Resource name corresponding to this monitor. @@ -39,6 +38,7 @@ def __init__(self, args=None, logger=None, q=None, db=None, if self.cloud_orchestrator == "openshift": protocol = "https" self.headers = {'Authorization': "Bearer " + self.token} + self.verify = False else: # kubernetes protocol = "http" @@ -51,7 +51,21 @@ def __init__(self, args=None, logger=None, q=None, db=None, # URL to v1-beta1 components to api server. self.beta_url = "%s/apis/extensions/v1beta1" % (self.url) - self.logger.info("KubeMonitor init done."); + if not self._is_kube_api_server_alive(): + msg = "kube_api_service is not available" + self.logger.error("%s - %s" %(self.name, msg)) + raise Exception(msg) + + self.logger.info("%s - KubeMonitor init done." %self.name) + + def _is_kube_api_server_alive(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = sock.connect_ex((self.args.kubernetes_api_server, \ + self.args.kubernetes_api_port)) + if result == 0: + return True + else: + return False def _get_component_url(self): """URL to a component. @@ -80,45 +94,68 @@ def init_monitor(self): # Get the URL to this component. url = self._get_component_url() - # Read existing entries for this component, from api server. - resp = self.kube_api_conn_handle.request('GET', url) - if resp.status != 200: + try: + resp = requests.get(url, headers=self.headers, verify=self.verify) + if resp.status_code != 200: + resp.close() + return + except requests.exceptions.RequestException as e: + self.logger.error("%s - %s" % (self.name, e)) return - initial_entries = json.loads(resp.data)['items'] + initial_entries = resp.json()['items'] + resp.close() if initial_entries: for entry in initial_entries: entry_url = self.get_entry_url(self.url, entry) - resp = self.kube_api_conn_handle.request('GET', entry_url) - if resp.status != 200: + try: + resp = requests.get(entry_url, headers=self.headers, \ + verify=self.verify) + if resp.status_code != 200: + resp.close() + continue + except requests.exceptions.RequestException as e: + self.logger.error("%s - %s" % (self.name, e)) continue try: # Construct the event and initiate processing. - event = {'object':json.loads(resp.data), 'type':'ADDED'} + event = {'object':resp.json(), 'type':'ADDED'} self.process_event(event) except ValueError: self.logger.error("Invalid data read from kube api server :" " %s" % (entry)) + resp.close() def register_monitor(self): """Register this component for notifications from api server. """ - url = self._get_component_url() - - if self.cloud_orchestrator == "openshift": - resp = requests.get(url, params={'watch': 'true'}, stream=True, - headers=self.headers, verify=False) - else: # kubernetes - resp = requests.get(url, params={'watch': 'true'}, stream=True) - - if resp.status_code != 200: + if self.kube_api_resp: + self.kube_api_resp.close() + if not self._is_kube_api_server_alive(): + msg = "kube_api_service is not available" + self.logger.error("%s - %s" %(self.name, msg)) + time.sleep(self.timeout) return - # Get handle to events for this monitor. - self.kube_api_stream_handle = resp.iter_lines(chunk_size=10, - delimiter='\n') - - def get_resource(self, resource_type, resource_name, namespace=None, beta=False): + url = self._get_component_url() + try: + resp = requests.get(url, params={'watch': 'true'}, \ + stream=True, headers=self.headers, \ + verify=self.verify) + if resp.status_code != 200: + resp.close() + return + # Get handle to events for this monitor. + self.kube_api_resp = resp + self.kube_api_stream_handle = resp.iter_lines(chunk_size=256, + delimiter='\n') + self.logger.info("%s - Watches %s" %(self.name, url)) + except requests.exceptions.RequestException as e: + self.logger.error("%s - %s" % (self.name, e)) + + def get_resource(self, resource_type, resource_name, \ + namespace=None, beta=False): + json_data = {} if beta == False: base_url = self.url else: @@ -129,18 +166,19 @@ def get_resource(self, resource_type, resource_name, namespace=None, beta=False) else: url = "%s/namespaces/%s/%s/%s" % (base_url, namespace, resource_type, resource_name) - - if self.cloud_orchestrator == "openshift": - resp = requests.get(url, stream=True, - headers=self.headers, verify=False) - else: # kubernetes - resp = requests.get(url, stream=True) - - if resp.status_code != 200: - return - return json.loads(resp.raw.read()) - - def patch_resource(self, resource_type, resource_name, merge_patch, namespace=None, beta=False): + try: + resp = requests.get(url, stream=True, \ + headers=self.headers, verify=self.verify) + if resp.status_code == 200: + json_data = json.loads(resp.raw.read()) + resp.close() + except requests.exceptions.RequestException as e: + self.logger.error("%s - %s" % (self.name, e)) + + return json_data + + def patch_resource(self, resource_type, resource_name, \ + merge_patch, namespace=None, beta=False): if beta == False: base_url = self.url else: @@ -152,21 +190,33 @@ def patch_resource(self, resource_type, resource_name, merge_patch, namespace=No url = "%s/namespaces/%s/%s/%s" % (base_url, namespace, resource_type, resource_name) - self.headers.update({'Accept': 'application/json', 'Content-Type': 'application/strategic-merge-patch+json'}) - if self.cloud_orchestrator == "openshift": - resp = requests.patch(url, headers=self.headers, data=json.dumps(merge_patch), verify=False) - else: # kubernetes - resp = requests.patch(url, headers=self.headers, data=json.dumps(merge_patch)) + headers = {'Accept': 'application/json/', \ + 'Content-Type': 'application/strategic-merge-patch+json'} + self.headers.update(headers) + + try: + resp = requests.patch(url, headers=self.headers, \ + data=json.dumps(merge_patch), \ + verify=self.verify) + if resp.status_code != 200: + resp.close() + return + except requests.exceptions.RequestException as e: + self.logger.error("%s - %s" % (self.name, e)) - if resp.status_code != 200: - return return resp.iter_lines(chunk_size=10, delimiter='\n') def process(self): """Process available events.""" if not self.kube_api_stream_handle: - self.logger.error("Event handler not found for %s. " - "Cannot process its events." % (self.resource_name)) + self.logger.error("%s - Event handler not found. " + "Cannot process its events." % (self.name)) + return + + resp = self.kube_api_resp + fp = resp.raw._fp.fp + if fp is None: + self.register_monitor() return try: @@ -175,11 +225,14 @@ def process(self): return except StopIteration: return + except requests.exceptions.ChunkedEncodingError as e: + self.logger.error("%s - %s" % (self.name, e)) + return try: self.process_event(json.loads(line)) except ValueError: - print("Invalid JSON data from response stream:%s" % line) + self.logger.error("Invalid JSON data from response stream:%s" % line) def process_event(self, event): """Process an event.""" diff --git a/src/container/kube-manager/kube_manager/kube_manager.py b/src/container/kube-manager/kube_manager/kube_manager.py index 5526ef6b64b..5e16cd7d91e 100644 --- a/src/container/kube-manager/kube_manager/kube_manager.py +++ b/src/container/kube-manager/kube_manager/kube_manager.py @@ -61,7 +61,7 @@ def __init__(self, args=None): kube_api_connected = True except Exception as e: - time.sleep(5) + time.sleep(30) # Register all the known monitors. for monitor in self.monitors.values(): @@ -79,8 +79,8 @@ def launch_timer(self): "in contrail-kubernetes.conf. \ example: kube_timer_interval=60") sys.exit() - self.logger.notice("kube_timer_interval set to %s seconds" % - self.args.kube_timer_interval) + self.logger.info("KubeNetworkManager - kube_timer_interval(%ss)" + %self.args.kube_timer_interval) time.sleep(int(self.args.kube_timer_interval)) while True: gevent.sleep(int(self.args.kube_timer_interval))