Skip to content

Commit

Permalink
changes in api server ifmap interface
Browse files Browse the repository at this point in the history
1. Make the ifmap queue size and message size configurable.
2. Catch and log exceptions in dequeue greenlet
3. Build messages in a list instead of one big string to avoid string copying multiple times

Change-Id: I706478e20c4beff948e50e51286c1abbc9c0c668
Closes-Bug: #1460188
(cherry picked from commit e28383252a8f2c5f1570d291bb1619dc403dbc82)
  • Loading branch information
Sachin Bansal committed May 30, 2015
1 parent eb7ec67 commit 13c018e
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 19 deletions.
8 changes: 8 additions & 0 deletions src/config/api-server/utils.py
Expand Up @@ -34,6 +34,8 @@ def parse_args(args_str):
'admin_port': _ADMIN_PORT,
'ifmap_server_ip': '127.0.0.1',
'ifmap_server_port': "8443",
'ifmap_queue_size': 10000,
'ifmap_max_message_size': 1024*1024,
'collectors': None,
'http_server_port': '8084',
'log_local': True,
Expand Down Expand Up @@ -121,6 +123,12 @@ def parse_args(args_str):
"--ifmap_server_ip", help="IP address of ifmap server")
parser.add_argument(
"--ifmap_server_port", help="Port of ifmap server")
parser.add_argument(
"--ifmap_queue_size", help="Size of the queue that holds pending "
"messages to be sent to ifmap server")
parser.add_argument(
"--ifmap_max_message_size", help="Maximum size of message sent to "
"ifmap server")

# TODO should be from certificate
parser.add_argument(
Expand Down
49 changes: 30 additions & 19 deletions src/config/api-server/vnc_cfg_ifmap.py
Expand Up @@ -101,16 +101,9 @@ def __init__(self, db_client_mgr, ifmap_srv_ip, ifmap_srv_port,
self._username = uname
self._password = passwd
self._ssl_options = ssl_options
self._queue = Queue(100000)
self._dequeue_greenlet = gevent.spawn(self._publish_to_ifmap_dequeue)
self._dequeue_greenlet = None
self._CONTRAIL_XSD = "http://www.contrailsystems.com/vnc_cfg.xsd"
self._IPERMS_NAME = "id-perms"
self._IPERMS_FQ_NAME = "contrail:" + self._IPERMS_NAME
self._SUBNETS_NAME = "contrail:subnets"
self._IPAMS_NAME = "contrail:ipams"
self._SG_RULE_NAME = "contrail:sg_rules"
self._POLICY_ENTRY_NAME = "contrail:policy_entry"

self._NAMESPACES = {
'env': "http://www.w3.org/2003/05/soap-envelope",
'ifmap': "http://www.trustedcomputinggroup.org/2010/IFMAP/2",
Expand All @@ -127,7 +120,7 @@ def __init__(self, db_client_mgr, ifmap_srv_ip, ifmap_srv_port,
server_addrs = ["%s:%s" % (ifmap_srv_ip, ifmap_srv_port)])
self._conn_state = ConnectionStatus.INIT

self._reset_cache()
self._reset()

# Set the signal handler
signal.signal(signal.SIGUSR2, self.handler)
Expand Down Expand Up @@ -166,11 +159,18 @@ def _init_conn(self):
mapclient.set_publisher_id(newSessionResult(result).get_publisher_id())
# end _init_conn

def _reset_cache(self):
def _get_api_server(self):
return self._db_client_mgr._api_svr_mgr
# end _get_api_server

def _reset(self):
# Cache of metas populated in ifmap server. Useful in update to find
# what things to remove in ifmap server
self._id_to_metas = {}
# end _reset_cache
self._queue = Queue(self._get_api_server()._args.ifmap_queue_size)
if self._dequeue_greenlet is None:
self._dequeue_greenlet = gevent.spawn(self._ifmap_dequeue_task)
# end _reset


def _publish_config_root(self):
Expand Down Expand Up @@ -210,10 +210,19 @@ def _publish_to_ifmap_enqueue(self, oper, oper_body, do_trace=True):
self._queue.put((oper, oper_body, do_trace))
# end _publish_to_ifmap_enqueue

def _ifmap_dequeue_task(self):
while True:
try:
self._publish_to_ifmap_dequeue()
except Exception as e:
tb = utils.detailed_traceback()
self.config_log(tb, level=SandeshLevel.SYS_ERROR)

def _publish_to_ifmap_dequeue(self):
while self._queue.peek():
publish_discovery = False
request = ''
requests = []
requests_len = 0
traces = []
while True:
try:
Expand All @@ -224,22 +233,24 @@ def _publish_to_ifmap_dequeue(self):
if do_trace:
trace = self._generate_ifmap_trace(oper, oper_body)
traces.append(trace)
request += oper_body
if len(request) > 1024*1024:
requests.append(oper_body)
requests_len += len(oper_body)
if (requests_len >
self._get_api_server()._args.ifmap_max_message_size):
break
except Empty:
break
ok = True
if request:
ok, msg = self._publish_to_ifmap(request)
if requests:
ok, msg = self._publish_to_ifmap(''.join(requests))
for trace in traces:
if ok:
trace_msg(trace, 'IfmapTraceBuf', self._sandesh)
else:
trace_msg(trace, 'IfmapTraceBuf', self._sandesh,
error_msg=msg)
if publish_discovery and ok:
self._db_client_mgr._api_svr_mgr.publish_ifmap_to_discovery()
self._get_api_server().publish_ifmap_to_discovery()
# end _publish_to_ifmap_dequeue

def _publish_to_ifmap(self, oper_body):
Expand Down Expand Up @@ -292,8 +303,8 @@ def _publish_to_ifmap(self, oper_body):
server_addrs=["%s:%s" % (self._ifmap_srv_ip,
self._ifmap_srv_port)])

self._reset_cache()
self._db_client_mgr._api_svr_mgr.un_publish_ifmap_to_discovery()
self._reset()
self._get_api_server().un_publish_ifmap_to_discovery()
# this will block till connection is re-established
self._init_conn()
self._publish_config_root()
Expand Down
11 changes: 11 additions & 0 deletions src/config/common/utils.py
Expand Up @@ -23,6 +23,17 @@

import urllib
from collections import OrderedDict
import sys
import cgitb
import cStringIO

def detailed_traceback():
buf = cStringIO.StringIO()
cgitb.Hook(format="text", file=buf).handle(sys.exc_info())
tb_txt = buf.getvalue()
buf.close()
return tb_txt
# end detailed_traceback

def encode_string(enc_str, encoding='utf-8'):
"""Encode the string using urllib.quote_plus
Expand Down

0 comments on commit 13c018e

Please sign in to comment.