Skip to content

Commit

Permalink
Merge "Fixed connection issue with kube_api_server"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Feb 9, 2017
2 parents b0a5cbf + d7812d3 commit c1d88f5
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 57 deletions.
161 changes: 107 additions & 54 deletions src/container/kube-manager/kube_manager/kube/kube_monitor.py
@@ -1,29 +1,28 @@
import eventlet
import json
import sys
import requests
import os
import time
import socket
import select
import eventlet
import json
from urllib3 import (request,PoolManager)
import requests

class KubeMonitor(object):

# Common handle to a generic http connection to api server.
kube_api_conn_handle = None

def __init__(self, args=None, logger=None, q=None, db=None,
resource_name='KubeMonitor', beta=False):
self.name = type(self).__name__
self.args = args
self.logger = logger
self.q = q
self.cloud_orchestrator = self.args.orchestrator
self.token = self.args.token # valid only for OpenShift
self.headers = {}

if not self.kube_api_conn_handle:
self.kube_api_conn_handle = PoolManager()
self.verify = False
self.timeout = 60

# Per-monitor stream handle to api server.
self.kube_api_resp = None
self.kube_api_stream_handle = None

# Resource name corresponding to this monitor.
Expand All @@ -39,6 +38,7 @@ def __init__(self, args=None, logger=None, q=None, db=None,
if self.cloud_orchestrator == "openshift":
protocol = "https"
self.headers = {'Authorization': "Bearer " + self.token}
self.verify = False
else: # kubernetes
protocol = "http"

Expand All @@ -51,7 +51,21 @@ def __init__(self, args=None, logger=None, q=None, db=None,
# URL to v1-beta1 components to api server.
self.beta_url = "%s/apis/extensions/v1beta1" % (self.url)

self.logger.info("KubeMonitor init done.");
if not self._is_kube_api_server_alive():
msg = "kube_api_service is not available"
self.logger.error("%s - %s" %(self.name, msg))
raise Exception(msg)

self.logger.info("%s - KubeMonitor init done." %self.name)

def _is_kube_api_server_alive(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex((self.args.kubernetes_api_server, \
self.args.kubernetes_api_port))
if result == 0:
return True
else:
return False

def _get_component_url(self):
"""URL to a component.
Expand Down Expand Up @@ -80,45 +94,68 @@ def init_monitor(self):
# Get the URL to this component.
url = self._get_component_url()

# Read existing entries for this component, from api server.
resp = self.kube_api_conn_handle.request('GET', url)
if resp.status != 200:
try:
resp = requests.get(url, headers=self.headers, verify=self.verify)
if resp.status_code != 200:
resp.close()
return
except requests.exceptions.RequestException as e:
self.logger.error("%s - %s" % (self.name, e))
return

initial_entries = json.loads(resp.data)['items']
initial_entries = resp.json()['items']
resp.close()
if initial_entries:
for entry in initial_entries:
entry_url = self.get_entry_url(self.url, entry)
resp = self.kube_api_conn_handle.request('GET', entry_url)
if resp.status != 200:
try:
resp = requests.get(entry_url, headers=self.headers, \
verify=self.verify)
if resp.status_code != 200:
resp.close()
continue
except requests.exceptions.RequestException as e:
self.logger.error("%s - %s" % (self.name, e))
continue
try:
# Construct the event and initiate processing.
event = {'object':json.loads(resp.data), 'type':'ADDED'}
event = {'object':resp.json(), 'type':'ADDED'}
self.process_event(event)
except ValueError:
self.logger.error("Invalid data read from kube api server :"
" %s" % (entry))
resp.close()

def register_monitor(self):
"""Register this component for notifications from api server.
"""
url = self._get_component_url()

if self.cloud_orchestrator == "openshift":
resp = requests.get(url, params={'watch': 'true'}, stream=True,
headers=self.headers, verify=False)
else: # kubernetes
resp = requests.get(url, params={'watch': 'true'}, stream=True)

if resp.status_code != 200:
if self.kube_api_resp:
self.kube_api_resp.close()
if not self._is_kube_api_server_alive():
msg = "kube_api_service is not available"
self.logger.error("%s - %s" %(self.name, msg))
time.sleep(self.timeout)
return

# Get handle to events for this monitor.
self.kube_api_stream_handle = resp.iter_lines(chunk_size=10,
delimiter='\n')

def get_resource(self, resource_type, resource_name, namespace=None, beta=False):
url = self._get_component_url()
try:
resp = requests.get(url, params={'watch': 'true'}, \
stream=True, headers=self.headers, \
verify=self.verify)
if resp.status_code != 200:
resp.close()
return
# Get handle to events for this monitor.
self.kube_api_resp = resp
self.kube_api_stream_handle = resp.iter_lines(chunk_size=256,
delimiter='\n')
self.logger.info("%s - Watches %s" %(self.name, url))
except requests.exceptions.RequestException as e:
self.logger.error("%s - %s" % (self.name, e))

def get_resource(self, resource_type, resource_name, \
namespace=None, beta=False):
json_data = {}
if beta == False:
base_url = self.url
else:
Expand All @@ -129,18 +166,19 @@ def get_resource(self, resource_type, resource_name, namespace=None, beta=False)
else:
url = "%s/namespaces/%s/%s/%s" % (base_url, namespace,
resource_type, resource_name)

if self.cloud_orchestrator == "openshift":
resp = requests.get(url, stream=True,
headers=self.headers, verify=False)
else: # kubernetes
resp = requests.get(url, stream=True)

if resp.status_code != 200:
return
return json.loads(resp.raw.read())

def patch_resource(self, resource_type, resource_name, merge_patch, namespace=None, beta=False):
try:
resp = requests.get(url, stream=True, \
headers=self.headers, verify=self.verify)
if resp.status_code == 200:
json_data = json.loads(resp.raw.read())
resp.close()
except requests.exceptions.RequestException as e:
self.logger.error("%s - %s" % (self.name, e))

return json_data

def patch_resource(self, resource_type, resource_name, \
merge_patch, namespace=None, beta=False):
if beta == False:
base_url = self.url
else:
Expand All @@ -152,21 +190,33 @@ def patch_resource(self, resource_type, resource_name, merge_patch, namespace=No
url = "%s/namespaces/%s/%s/%s" % (base_url, namespace,
resource_type, resource_name)

self.headers.update({'Accept': 'application/json', 'Content-Type': 'application/strategic-merge-patch+json'})
if self.cloud_orchestrator == "openshift":
resp = requests.patch(url, headers=self.headers, data=json.dumps(merge_patch), verify=False)
else: # kubernetes
resp = requests.patch(url, headers=self.headers, data=json.dumps(merge_patch))
headers = {'Accept': 'application/json/', \
'Content-Type': 'application/strategic-merge-patch+json'}
self.headers.update(headers)

try:
resp = requests.patch(url, headers=self.headers, \
data=json.dumps(merge_patch), \
verify=self.verify)
if resp.status_code != 200:
resp.close()
return
except requests.exceptions.RequestException as e:
self.logger.error("%s - %s" % (self.name, e))

if resp.status_code != 200:
return
return resp.iter_lines(chunk_size=10, delimiter='\n')

def process(self):
"""Process available events."""
if not self.kube_api_stream_handle:
self.logger.error("Event handler not found for %s. "
"Cannot process its events." % (self.resource_name))
self.logger.error("%s - Event handler not found. "
"Cannot process its events." % (self.name))
return

resp = self.kube_api_resp
fp = resp.raw._fp.fp
if fp is None:
self.register_monitor()
return

try:
Expand All @@ -175,11 +225,14 @@ def process(self):
return
except StopIteration:
return
except requests.exceptions.ChunkedEncodingError as e:
self.logger.error("%s - %s" % (self.name, e))
return

try:
self.process_event(json.loads(line))
except ValueError:
print("Invalid JSON data from response stream:%s" % line)
self.logger.error("Invalid JSON data from response stream:%s" % line)

def process_event(self, event):
"""Process an event."""
Expand Down
6 changes: 3 additions & 3 deletions src/container/kube-manager/kube_manager/kube_manager.py
Expand Up @@ -61,7 +61,7 @@ def __init__(self, args=None):
kube_api_connected = True

except Exception as e:
time.sleep(5)
time.sleep(30)

# Register all the known monitors.
for monitor in self.monitors.values():
Expand All @@ -79,8 +79,8 @@ def launch_timer(self):
"in contrail-kubernetes.conf. \
example: kube_timer_interval=60")
sys.exit()
self.logger.notice("kube_timer_interval set to %s seconds" %
self.args.kube_timer_interval)
self.logger.info("KubeNetworkManager - kube_timer_interval(%ss)"
%self.args.kube_timer_interval)
time.sleep(int(self.args.kube_timer_interval))
while True:
gevent.sleep(int(self.args.kube_timer_interval))
Expand Down

0 comments on commit c1d88f5

Please sign in to comment.