Skip to content

Commit

Permalink
Merge "changes in api server ifmap interface"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Jun 1, 2015
2 parents 99681d2 + 58fb616 commit 3083ec2
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 19 deletions.
8 changes: 8 additions & 0 deletions src/config/api-server/utils.py
Expand Up @@ -34,6 +34,8 @@ def parse_args(args_str):
'admin_port': _ADMIN_PORT,
'ifmap_server_ip': '127.0.0.1',
'ifmap_server_port': "8443",
'ifmap_queue_size': 10000,
'ifmap_max_message_size': 1024*1024,
'collectors': None,
'http_server_port': '8084',
'log_local': True,
Expand Down Expand Up @@ -122,6 +124,12 @@ def parse_args(args_str):
"--ifmap_server_ip", help="IP address of ifmap server")
parser.add_argument(
"--ifmap_server_port", help="Port of ifmap server")
parser.add_argument(
"--ifmap_queue_size", help="Size of the queue that holds pending "
"messages to be sent to ifmap server")
parser.add_argument(
"--ifmap_max_message_size", help="Maximum size of message sent to "
"ifmap server")

# TODO should be from certificate
parser.add_argument(
Expand Down
49 changes: 30 additions & 19 deletions src/config/api-server/vnc_cfg_ifmap.py
Expand Up @@ -102,16 +102,9 @@ def __init__(self, db_client_mgr, ifmap_srv_ip, ifmap_srv_port,
self._username = uname
self._password = passwd
self._ssl_options = ssl_options
self._queue = Queue(100000)
self._dequeue_greenlet = gevent.spawn(self._publish_to_ifmap_dequeue)
self._dequeue_greenlet = None
self._CONTRAIL_XSD = "http://www.contrailsystems.com/vnc_cfg.xsd"
self._IPERMS_NAME = "id-perms"
self._IPERMS_FQ_NAME = "contrail:" + self._IPERMS_NAME
self._SUBNETS_NAME = "contrail:subnets"
self._IPAMS_NAME = "contrail:ipams"
self._SG_RULE_NAME = "contrail:sg_rules"
self._POLICY_ENTRY_NAME = "contrail:policy_entry"

self._NAMESPACES = {
'env': "http://www.w3.org/2003/05/soap-envelope",
'ifmap': "http://www.trustedcomputinggroup.org/2010/IFMAP/2",
Expand All @@ -128,7 +121,7 @@ def __init__(self, db_client_mgr, ifmap_srv_ip, ifmap_srv_port,
server_addrs = ["%s:%s" % (ifmap_srv_ip, ifmap_srv_port)])
self._conn_state = ConnectionStatus.INIT

self._reset_cache()
self._reset()

# Set the signal handler
signal.signal(signal.SIGUSR2, self.handler)
Expand Down Expand Up @@ -167,11 +160,18 @@ def _init_conn(self):
mapclient.set_publisher_id(newSessionResult(result).get_publisher_id())
# end _init_conn

def _reset_cache(self):
def _get_api_server(self):
return self._db_client_mgr._api_svr_mgr
# end _get_api_server

def _reset(self):
# Cache of metas populated in ifmap server. Useful in update to find
# what things to remove in ifmap server
self._id_to_metas = {}
# end _reset_cache
self._queue = Queue(self._get_api_server()._args.ifmap_queue_size)
if self._dequeue_greenlet is None:
self._dequeue_greenlet = gevent.spawn(self._ifmap_dequeue_task)
# end _reset


def _publish_config_root(self):
Expand Down Expand Up @@ -211,10 +211,19 @@ def _publish_to_ifmap_enqueue(self, oper, oper_body, do_trace=True):
self._queue.put((oper, oper_body, do_trace))
# end _publish_to_ifmap_enqueue

def _ifmap_dequeue_task(self):
while True:
try:
self._publish_to_ifmap_dequeue()
except Exception as e:
tb = utils.detailed_traceback()
self.config_log(tb, level=SandeshLevel.SYS_ERROR)

def _publish_to_ifmap_dequeue(self):
while self._queue.peek():
publish_discovery = False
request = ''
requests = []
requests_len = 0
traces = []
while True:
try:
Expand All @@ -225,22 +234,24 @@ def _publish_to_ifmap_dequeue(self):
if do_trace:
trace = self._generate_ifmap_trace(oper, oper_body)
traces.append(trace)
request += oper_body
if len(request) > 1024*1024:
requests.append(oper_body)
requests_len += len(oper_body)
if (requests_len >
self._get_api_server()._args.ifmap_max_message_size):
break
except Empty:
break
ok = True
if request:
ok, msg = self._publish_to_ifmap(request)
if requests:
ok, msg = self._publish_to_ifmap(''.join(requests))
for trace in traces:
if ok:
trace_msg(trace, 'IfmapTraceBuf', self._sandesh)
else:
trace_msg(trace, 'IfmapTraceBuf', self._sandesh,
error_msg=msg)
if publish_discovery and ok:
self._db_client_mgr._api_svr_mgr.publish_ifmap_to_discovery()
self._get_api_server().publish_ifmap_to_discovery()
# end _publish_to_ifmap_dequeue

def _publish_to_ifmap(self, oper_body):
Expand Down Expand Up @@ -294,8 +305,8 @@ def _publish_to_ifmap(self, oper_body):
server_addrs=["%s:%s" % (self._ifmap_srv_ip,
self._ifmap_srv_port)])

self._reset_cache()
self._db_client_mgr._api_svr_mgr.un_publish_ifmap_to_discovery()
self._reset()
self._get_api_server().un_publish_ifmap_to_discovery()
# this will block till connection is re-established
self._init_conn()
self._publish_config_root()
Expand Down
11 changes: 11 additions & 0 deletions src/config/common/utils.py
Expand Up @@ -23,6 +23,17 @@

import urllib
from collections import OrderedDict
import sys
import cgitb
import cStringIO

def detailed_traceback():
buf = cStringIO.StringIO()
cgitb.Hook(format="text", file=buf).handle(sys.exc_info())
tb_txt = buf.getvalue()
buf.close()
return tb_txt
# end detailed_traceback

def encode_string(enc_str, encoding='utf-8'):
"""Encode the string using urllib.quote_plus
Expand Down

0 comments on commit 3083ec2

Please sign in to comment.