From 13c018e77bacd9a5f933827400060fa065d6da38 Mon Sep 17 00:00:00 2001 From: Sachin Bansal Date: Fri, 29 May 2015 12:32:41 -0700 Subject: [PATCH] changes in api server ifmap interface 1. Make the ifmap queue size and message size configurable. 2. Catch and log exceptions in dequeue greenlet 3. Build messages in a list instead of one big string to avoid string copying multiple times Change-Id: I706478e20c4beff948e50e51286c1abbc9c0c668 Closes-Bug: #1460188 (cherry picked from commit e28383252a8f2c5f1570d291bb1619dc403dbc82) --- src/config/api-server/utils.py | 8 +++++ src/config/api-server/vnc_cfg_ifmap.py | 49 ++++++++++++++++---------- src/config/common/utils.py | 11 ++++++ 3 files changed, 49 insertions(+), 19 deletions(-) diff --git a/src/config/api-server/utils.py b/src/config/api-server/utils.py index 39ac3b9586c..7aa3e561304 100644 --- a/src/config/api-server/utils.py +++ b/src/config/api-server/utils.py @@ -34,6 +34,8 @@ def parse_args(args_str): 'admin_port': _ADMIN_PORT, 'ifmap_server_ip': '127.0.0.1', 'ifmap_server_port': "8443", + 'ifmap_queue_size': 10000, + 'ifmap_max_message_size': 1024*1024, 'collectors': None, 'http_server_port': '8084', 'log_local': True, @@ -121,6 +123,12 @@ def parse_args(args_str): "--ifmap_server_ip", help="IP address of ifmap server") parser.add_argument( "--ifmap_server_port", help="Port of ifmap server") + parser.add_argument( + "--ifmap_queue_size", help="Size of the queue that holds pending " + "messages to be sent to ifmap server") + parser.add_argument( + "--ifmap_max_message_size", help="Maximum size of message sent to " + "ifmap server") # TODO should be from certificate parser.add_argument( diff --git a/src/config/api-server/vnc_cfg_ifmap.py b/src/config/api-server/vnc_cfg_ifmap.py index 3e63d3a563f..a8201581412 100644 --- a/src/config/api-server/vnc_cfg_ifmap.py +++ b/src/config/api-server/vnc_cfg_ifmap.py @@ -101,16 +101,9 @@ def __init__(self, db_client_mgr, ifmap_srv_ip, ifmap_srv_port, self._username = uname self._password = passwd self._ssl_options = ssl_options - self._queue = Queue(100000) - self._dequeue_greenlet = gevent.spawn(self._publish_to_ifmap_dequeue) + self._dequeue_greenlet = None self._CONTRAIL_XSD = "http://www.contrailsystems.com/vnc_cfg.xsd" self._IPERMS_NAME = "id-perms" - self._IPERMS_FQ_NAME = "contrail:" + self._IPERMS_NAME - self._SUBNETS_NAME = "contrail:subnets" - self._IPAMS_NAME = "contrail:ipams" - self._SG_RULE_NAME = "contrail:sg_rules" - self._POLICY_ENTRY_NAME = "contrail:policy_entry" - self._NAMESPACES = { 'env': "http://www.w3.org/2003/05/soap-envelope", 'ifmap': "http://www.trustedcomputinggroup.org/2010/IFMAP/2", @@ -127,7 +120,7 @@ def __init__(self, db_client_mgr, ifmap_srv_ip, ifmap_srv_port, server_addrs = ["%s:%s" % (ifmap_srv_ip, ifmap_srv_port)]) self._conn_state = ConnectionStatus.INIT - self._reset_cache() + self._reset() # Set the signal handler signal.signal(signal.SIGUSR2, self.handler) @@ -166,11 +159,18 @@ def _init_conn(self): mapclient.set_publisher_id(newSessionResult(result).get_publisher_id()) # end _init_conn - def _reset_cache(self): + def _get_api_server(self): + return self._db_client_mgr._api_svr_mgr + # end _get_api_server + + def _reset(self): # Cache of metas populated in ifmap server. Useful in update to find # what things to remove in ifmap server self._id_to_metas = {} - # end _reset_cache + self._queue = Queue(self._get_api_server()._args.ifmap_queue_size) + if self._dequeue_greenlet is None: + self._dequeue_greenlet = gevent.spawn(self._ifmap_dequeue_task) + # end _reset def _publish_config_root(self): @@ -210,10 +210,19 @@ def _publish_to_ifmap_enqueue(self, oper, oper_body, do_trace=True): self._queue.put((oper, oper_body, do_trace)) # end _publish_to_ifmap_enqueue + def _ifmap_dequeue_task(self): + while True: + try: + self._publish_to_ifmap_dequeue() + except Exception as e: + tb = utils.detailed_traceback() + self.config_log(tb, level=SandeshLevel.SYS_ERROR) + def _publish_to_ifmap_dequeue(self): while self._queue.peek(): publish_discovery = False - request = '' + requests = [] + requests_len = 0 traces = [] while True: try: @@ -224,14 +233,16 @@ def _publish_to_ifmap_dequeue(self): if do_trace: trace = self._generate_ifmap_trace(oper, oper_body) traces.append(trace) - request += oper_body - if len(request) > 1024*1024: + requests.append(oper_body) + requests_len += len(oper_body) + if (requests_len > + self._get_api_server()._args.ifmap_max_message_size): break except Empty: break ok = True - if request: - ok, msg = self._publish_to_ifmap(request) + if requests: + ok, msg = self._publish_to_ifmap(''.join(requests)) for trace in traces: if ok: trace_msg(trace, 'IfmapTraceBuf', self._sandesh) @@ -239,7 +250,7 @@ def _publish_to_ifmap_dequeue(self): trace_msg(trace, 'IfmapTraceBuf', self._sandesh, error_msg=msg) if publish_discovery and ok: - self._db_client_mgr._api_svr_mgr.publish_ifmap_to_discovery() + self._get_api_server().publish_ifmap_to_discovery() # end _publish_to_ifmap_dequeue def _publish_to_ifmap(self, oper_body): @@ -292,8 +303,8 @@ def _publish_to_ifmap(self, oper_body): server_addrs=["%s:%s" % (self._ifmap_srv_ip, self._ifmap_srv_port)]) - self._reset_cache() - self._db_client_mgr._api_svr_mgr.un_publish_ifmap_to_discovery() + self._reset() + self._get_api_server().un_publish_ifmap_to_discovery() # this will block till connection is re-established self._init_conn() self._publish_config_root() diff --git a/src/config/common/utils.py b/src/config/common/utils.py index 0c143ff45b4..a0239127abb 100644 --- a/src/config/common/utils.py +++ b/src/config/common/utils.py @@ -23,6 +23,17 @@ import urllib from collections import OrderedDict +import sys +import cgitb +import cStringIO + +def detailed_traceback(): + buf = cStringIO.StringIO() + cgitb.Hook(format="text", file=buf).handle(sys.exc_info()) + tb_txt = buf.getvalue() + buf.close() + return tb_txt +# end detailed_traceback def encode_string(enc_str, encoding='utf-8'): """Encode the string using urllib.quote_plus