Skip to content

Commit

Permalink
Merge "Initial code for mesos-manager - defined a mesos-manager serve…
Browse files Browse the repository at this point in the history
…r - fixed mesos server code to run - handle CNI input data"
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Dec 7, 2016
2 parents a7a9bc7 + 28935e8 commit 580e31c
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 21 deletions.
22 changes: 14 additions & 8 deletions src/container/mesos-manager/contrail-mesos.conf
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
[MESOS]
listen_ip_addr=10.84.13.35
listen_port=6991
pod_subnets=
service_subnets=

[VNC]
vnc_endpoint_ip = 10.138.0.2
vnc_endpoint_port = 8082
admin_user = admin
admin_password = admin
admin_tenant = admin
rabbit_server=10.138.0.2:5672
cassandra_server_list=10.138.0.2:9160
vnc_endpoint_ip=127.0.0.1
vnc_endpoint_port=8082
admin_user=admin
admin_password=admin
admin_tenant=admin
rabbit_server=127.0.0.1:5672
cassandra_server_list=127.0.0.1:9160

[DEFAULTS]
disc_server_ip=localhost
disc_server_ip=127.0.0.1
disc_server_port=5998
log_local=1
log_level=SYS_DEBUG
Expand Down
8 changes: 8 additions & 0 deletions src/container/mesos-manager/mesos_manager/common/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
#

import argparse
import ConfigParser
import sys

from pysandesh.sandesh_base import Sandesh, SandeshSystem
import mesos_consts
from sandesh_common.vns.constants import HttpPortMesosManager


Expand All @@ -16,6 +18,8 @@ def parse_args():
args, remaining_argv = conf_parser.parse_known_args(sys.argv)

defaults = {
'listen_ip_addr': mesos_consts._WEB_HOST,
'listen_port': mesos_consts._WEB_PORT,
'http_server_port': HttpPortMesosManager,
'worker_id': '0',
'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(),
Expand All @@ -40,6 +44,10 @@ def parse_args():
'kombu_ssl_keyfile': '',
'kombu_ssl_certfile': '',
'kombu_ssl_ca_certs': '',
'cassandra_server_ip': mesos_consts._CASSANDRA_HOST,
'cassandra_server_port': mesos_consts._CASSANDRA_PORT,
'cassandra_max_retries': mesos_consts._CASSANDRA_MAX_RETRIES,
'cassandra_timeout': mesos_consts._CASSANDRA_TIMEOUT,
'cassandra_user': None,
'cassandra_password': None,
'cluster_id': '',
Expand Down
16 changes: 8 additions & 8 deletions src/container/mesos-manager/mesos_manager/common/logger.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
#
Expand All @@ -8,10 +7,11 @@
"""

import logging
import socket

from cfgm_common.uve.nodeinfo.ttypes import NodeStatusUVE, NodeStatus
import discoveryclient.client as client
from mesos_manager.sandesh.mesos_manager import ttypes as sandesh
from sandesh.mesos_manager import ttypes as sandesh
from pysandesh.connection_info import ConnectionState
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel
from pysandesh.sandesh_base import Sandesh, SandeshSystem
Expand Down Expand Up @@ -155,12 +155,12 @@ def sandesh_init(self):

# Initialize Sandesh generator.
self._sandesh.init_generator(
self.module.name, self.module.hostname,
self.module.node_type_name, self.module.instance_id,
self.module['name'], self.module['hostname'],
self.module['node_type_name'], self.module['instance_id'],
self._args.collectors, 'mesos_manager_context',
int(self._args.http_server_port),
['cfgm_common', 'mesos_manager.sandesh'],
self.module.discovery, logger_class=self._args.logger_class,
self.module['discovery'], logger_class=self._args.logger_class,
logger_config_file=self._args.logging_conf)

# Set Sandesh logging params.
Expand All @@ -172,8 +172,8 @@ def sandesh_init(self):
syslog_facility=self._args.syslog_facility)

# Connect to collector.
ConnectionState.init(self._sandesh, self.module.hostname,
self.module.name, self.module.instance_id,
ConnectionState.init(self._sandesh, self.module['hostname'],
self.module['name'], self.module['instance_id'],
staticmethod(ConnectionState.get_process_state_cb),
NodeStatusUVE, NodeStatus, self.module.table)
NodeStatusUVE, NodeStatus, self.module['table'])

14 changes: 14 additions & 0 deletions src/container/mesos-manager/mesos_manager/mesos_consts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#
# Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
#

_WEB_HOST = '127.0.0.1'
_WEB_PORT = 6991
_CASSANDRA_HOST = '127.0.0.1'
_CASSANDRA_PORT = 9160
_CASSANDRA_MAX_RETRIES = 5
_CASSANDRA_TIMEOUT = 0.5
_ZK_HOST = '127.0.0.1'
_ZK_PORT = 2181
DEFAULT_VERSION='1.0'

20 changes: 15 additions & 5 deletions src/container/mesos-manager/mesos_manager/mesos_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,46 @@
Mesos network manager
"""

import socket
import gevent
from gevent.queue import Queue

import common.args as mesos_args
import common.logger as logger
from vnc_api.vnc_api import *
import common.logger
from cfgm_common import vnc_cgitb
import vnc.vnc_mesos as vnc_mesos
import mesos_server as mserver


class MesosNetworkManager(object):
def __init__(self, args=None):
self.args = args
self.logger = logger.MesosManagerLogger(args)
self.q = Queue()

self.logger = common.logger.MesosManagerLogger(args)

self.vnc = vnc_mesos.VncMesos(args=self.args,
logger=self.logger, q=self.q)

self.mserver = mserver.MesosServer(args=self.args,
logger=self.logger)
# end __init__

def start_tasks(self):
self.logger.info("Starting all tasks.")

gevent.joinall([
gevent.spawn(self.vnc.vnc_process),
gevent.spawn(self.mserver.start_server),
])
# end start_tasks

# end class MesosNetworkManager

def main():
vnc_cgitb.enable(format='text')
args = mesos_args.parse_args()
mesos_nw_mgr = MesosNetworkManager(args)
mesos_nw_mgr.start_tasks()


if __name__ == '__main__':
main()
194 changes: 194 additions & 0 deletions src/container/mesos-manager/mesos_manager/mesos_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
#
# Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
#

"""
MESOS CNI Server
"""

import bottle
import json
from cfgm_common.rest import LinkObject
from vnc_api.vnc_api import *

class MesosServer(object):

def __init__(self, args=None, logger=None, q=None):
self._args = args
self.logger = logger

self._homepage_links = []
self._cni_data = {}

self._base_url = "http://%s:%s" % (self._args.listen_ip_addr,
self._args.listen_port)
self._pipe_start_app = None
bottle.route('/', 'GET', self.homepage_http_get)

# Add CNI information
bottle.route('/add_cni_info', 'POST', self.add_cni_info)
self._homepage_links.append(
LinkObject(
'action',
self._base_url , '/add_cni_info', 'Add CNI information'))

# Get CNI information
bottle.route('/get_cni_info', 'GET', self.get_cni_info_all)
self._homepage_links.append(
LinkObject(
'action',
self._base_url , '/get_cni_info', 'get all CNI information'))

# get a specific CNI information
bottle.route('/get_cni_info/<container_id>', 'GET', self.get_cni_info_all)

# show config
bottle.route('/config', 'GET', self.show_config)
self._homepage_links.append(
LinkObject(
'action',
self._base_url , '/config', 'show cni config'))

# show debug
bottle.route('/stats', 'GET', self.show_stats)
self._homepage_links.append(
LinkObject(
'action',
self._base_url , '/stats', 'show cni debug stats'))

# cleanup
bottle.route('/cleanup', 'GET', self.cleanup_http_get)
self._homepage_links.append(LinkObject('action',
self._base_url , '/cleanup', 'Purge deleted cni'))

if not self._pipe_start_app:
self._pipe_start_app = bottle.app()

def process_cni_data(self, container_id, data):
event = json.loads(data)
print event
self.q.put(event)
pass

def create_cni_data(self, container_id, data):
if not container_id in self._cni_data:
self._cni_data[container_id] = {}
self._cni_data[container_id] = data
self.process_cni_data(container_id, data)
return self._cni_data[container_id]
# end

def delete_cni_data(self, container_id):
if container_id in self._cni_data:
del self._cni_data[container_id]
# end

def get_cni_data(self, container_id, service_type):
if container_id in self._cni_data:
return self._cni_data[container_id]
return None
# end

# Public Methods
def get_args(self):
return self._args
# end get_args

def get_ip_addr(self):
return self._args.listen_ip_addr
# end get_ip_addr

def get_port(self):
return self._args.listen_port
# end get_port

def get_pipe_start_app(self):
return self._pipe_start_app
# end get_pipe_start_app

def add_cni_info(self):
json_req = {}
ctype = bottle.request.headers['content-type']
try:
if 'application/json' in ctype:
data = bottle.request.json
elif 'application/xml' in ctype:
data = xmltodict.parse(bottle.request.body.read())
except Exception as e:
self.syslog('Unable to parse publish request')
self.syslog(bottle.request.body.buf)
bottle.abort(415, 'Unable to parse publish request')

for key, value in data.items():
json_req[key] = value
print json_req

cid = json_req['cid']
self.create_cni_data(cid, json_req)

return json_req
# end add_cni_info

def get_cni_info_all(self):
return self._cni_data
# end get_cni_info_all

# purge expired cni
def cleanup_http_get(self):
return "Cleanup"
# end cleanup_http_get

def show_config(self):
rsp = ""

rsp += '<table border="1" cellpadding="1" cellspacing="0">\n'
rsp += '<tr><th colspan="2">Defaults CONFIGG</th></tr>'
for k in sorted(self._args.__dict__.iterkeys()):
v = self._args.__dict__[k]
rsp += '<tr><td>%s</td><td>%s</td></tr>' % (k, v)
rsp += '</table>'
rsp += '<br>'

return rsp
# end show_config

def show_stats(self):

rsp = ""
rsp += ' <table border="1" cellpadding="1" cellspacing="0">\n'
for k in sorted(stats.iterkeys()):
rsp += ' <tr>\n'
rsp += ' <td>%s</td>\n' % (k)
rsp += ' <td>STATSSS</td>\n'
rsp += ' </tr>\n'
return rsp
# end show_stats

def homepage_http_get(self):
json_links = []
url = bottle.request.url[:-1]
for link in self._homepage_links:
json_links.append({'link': link.to_dict(with_url=url)})

json_body = \
{"href": self._base_url,
"links": json_links
}

return json_body
# end homepage_http_get

def start_server(self):
self.logger.info("Starting server.")

pipe_start_app = self.get_pipe_start_app()

try:
bottle.run(app=pipe_start_app, host=self.get_ip_addr(),
port=self.get_port(), server='gevent')
except Exception as e:
import pdb; pdb.set_trace()
self.cleanup()
# start_server

# end class MesosServer

0 comments on commit 580e31c

Please sign in to comment.