From f1ce30a663c34677db67cd399d2d56f4bf91394c Mon Sep 17 00:00:00 2001 From: Sachin Bansal Date: Fri, 15 May 2015 17:04:30 -0700 Subject: [PATCH] New greenlet to send messages to ifmap With this change, we no longer send ifmap message in the same context as dequeueing them from rabbitmq. We now queue them up in a separate internal queue and a new greenlet is launched to dequeue from there. This allows faster dequeuing of rabbitmq and also the dequeue greenlet can send messages in bulk. Partial-Bug: 1432735 Change-Id: I57e7c0deee5f883bc959c2928c6f473b7dc042da (cherry picked from commit 19dc001f35b1ad2c3843efa566a45bf572cdf2da) --- src/config/api-server/vnc_cfg_api_server.py | 75 +++++++------- src/config/api-server/vnc_cfg_ifmap.py | 108 ++++++++++---------- 2 files changed, 90 insertions(+), 93 deletions(-) diff --git a/src/config/api-server/vnc_cfg_api_server.py b/src/config/api-server/vnc_cfg_api_server.py index a9a9725635f..473886f7739 100644 --- a/src/config/api-server/vnc_cfg_api_server.py +++ b/src/config/api-server/vnc_cfg_api_server.py @@ -1511,46 +1511,41 @@ def mt_http_put(self): return {} # end - def publish(self, publish_api=True, publish_ifmap=True): + def publish_self_to_discovery(self): # publish API server - if publish_api: - data = { - 'ip-address': self._args.ifmap_server_ip, - 'port': self._args.listen_port, - } - self.api_server_task = self._disc.publish( - API_SERVER_DISCOVERY_SERVICE_NAME, data) - + data = { + 'ip-address': self._args.ifmap_server_ip, + 'port': self._args.listen_port, + } + self.api_server_task = self._disc.publish( + API_SERVER_DISCOVERY_SERVICE_NAME, data) + + def publish_ifmap_to_discovery(self): # publish ifmap server - if publish_ifmap: - data = { - 'ip-address': self._args.ifmap_server_ip, - 'port': self._args.ifmap_server_port, - } - self.ifmap_task = self._disc.publish( - IFMAP_SERVER_DISCOVERY_SERVICE_NAME, data) - # end - - def un_publish(self, un_publish_api=True, un_publish_ifmap=True): - if un_publish_api: - # un publish api server - data = { - 'ip-address': self._args.ifmap_server_ip, - 'port': self._args.listen_port, - } - self._disc.un_publish( - API_SERVER_DISCOVERY_SERVICE_NAME, data) - + data = { + 'ip-address': self._args.ifmap_server_ip, + 'port': self._args.ifmap_server_port, + } + self.ifmap_task = self._disc.publish( + IFMAP_SERVER_DISCOVERY_SERVICE_NAME, data) + # end publish_ifmap_to_discovery + + def un_publish_self_to_discovery(self): + # un publish api server + data = { + 'ip-address': self._args.ifmap_server_ip, + 'port': self._args.listen_port, + } + self._disc.un_publish(API_SERVER_DISCOVERY_SERVICE_NAME, data) + + def un_publish_ifmap_to_discovery(self): # un publish ifmap server - if un_publish_ifmap: - data = { - 'ip-address': self._args.ifmap_server_ip, - 'port': self._args.ifmap_server_port, - } - self._disc.un_publish( - IFMAP_SERVER_DISCOVERY_SERVICE_NAME, data) - - # end un_publish + data = { + 'ip-address': self._args.ifmap_server_ip, + 'port': self._args.ifmap_server_port, + } + self._disc.un_publish(IFMAP_SERVER_DISCOVERY_SERVICE_NAME, data) + # end un_publish_ifmap_to_discovery # end class VncApiServer @@ -1567,9 +1562,9 @@ def main(args_str=None): server_port = vnc_api_server.get_server_port() # Advertise services - if vnc_api_server._args.disc_server_ip and\ - vnc_api_server._args.disc_server_port: - vnc_api_server.publish() + if (vnc_api_server._args.disc_server_ip and + vnc_api_server._args.disc_server_port): + vnc_api_server.publish_self_to_discovery() """ @sigchld Disable handling of SIG_CHLD for now as every keystone request to validate diff --git a/src/config/api-server/vnc_cfg_ifmap.py b/src/config/api-server/vnc_cfg_ifmap.py index cbe1527f2b6..3e63d3a563f 100644 --- a/src/config/api-server/vnc_cfg_ifmap.py +++ b/src/config/api-server/vnc_cfg_ifmap.py @@ -10,7 +10,7 @@ monkey.patch_all() import gevent import gevent.event -from gevent.queue import Queue +from gevent.queue import Queue, Empty import sys import time from pprint import pformat @@ -101,6 +101,8 @@ def __init__(self, db_client_mgr, ifmap_srv_ip, ifmap_srv_port, self._username = uname self._password = passwd self._ssl_options = ssl_options + self._queue = Queue(100000) + self._dequeue_greenlet = gevent.spawn(self._publish_to_ifmap_dequeue) self._CONTRAIL_XSD = "http://www.contrailsystems.com/vnc_cfg.xsd" self._IPERMS_NAME = "id-perms" self._IPERMS_FQ_NAME = "contrail:" + self._IPERMS_NAME @@ -125,7 +127,7 @@ def __init__(self, db_client_mgr, ifmap_srv_ip, ifmap_srv_port, server_addrs = ["%s:%s" % (ifmap_srv_ip, ifmap_srv_port)]) self._conn_state = ConnectionStatus.INIT - self._reset_cache_and_accumulator() + self._reset_cache() # Set the signal handler signal.signal(signal.SIGUSR2, self.handler) @@ -164,13 +166,11 @@ def _init_conn(self): mapclient.set_publisher_id(newSessionResult(result).get_publisher_id()) # end _init_conn - def _reset_cache_and_accumulator(self): + def _reset_cache(self): # Cache of metas populated in ifmap server. Useful in update to find # what things to remove in ifmap server self._id_to_metas = {} - self.accumulator = None - self.accumulated_request_len = 0 - # end _reset_cache_and_accumulator + # end _reset_cache def _publish_config_root(self): @@ -202,24 +202,47 @@ def _generate_ifmap_trace(self, oper, body): return ifmap_trace # end _generate_ifmap_trace - def publish_accumulated(self): - if self.accumulated_request_len: - upd_str = ''.join(''.join(request) - for request in self.accumulator) - self._publish_to_ifmap('update', upd_str, do_trace=False) - self.accumulator = None - self.accumulated_request_len = 0 - # end publish_accumulated - - def _publish_to_ifmap(self, oper, oper_body, do_trace=True): + def _publish_to_ifmap_enqueue(self, oper, oper_body, do_trace=True): # safety check, if we proceed ifmap-server reports error # asking for update|delete in publish if not oper_body: return + self._queue.put((oper, oper_body, do_trace)) + # end _publish_to_ifmap_enqueue - if do_trace: - trace = self._generate_ifmap_trace(oper, oper_body) + def _publish_to_ifmap_dequeue(self): + while self._queue.peek(): + publish_discovery = False + request = '' + traces = [] + while True: + try: + (oper, oper_body, do_trace) = self._queue.get_nowait() + if oper == 'publish_discovery': + publish_discovery = True + break + if do_trace: + trace = self._generate_ifmap_trace(oper, oper_body) + traces.append(trace) + request += oper_body + if len(request) > 1024*1024: + break + except Empty: + break + ok = True + if request: + ok, msg = self._publish_to_ifmap(request) + for trace in traces: + if ok: + trace_msg(trace, 'IfmapTraceBuf', self._sandesh) + else: + trace_msg(trace, 'IfmapTraceBuf', self._sandesh, + error_msg=msg) + if publish_discovery and ok: + self._db_client_mgr._api_svr_mgr.publish_ifmap_to_discovery() + # end _publish_to_ifmap_dequeue + def _publish_to_ifmap(self, oper_body): try: not_published = True retry_count = 0 @@ -255,15 +278,12 @@ def _publish_to_ifmap(self, oper, oper_body, do_trace=True): %(retry_count) self.config_log(log_str, level=SandeshLevel.SYS_ERR) - if do_trace: - trace_msg(trace, 'IfmapTraceBuf', self._sandesh) + return True, resp_xml except Exception as e: if (isinstance(e, socket.error) and self._conn_state != ConnectionStatus.DOWN): - log_str = 'Connection to IFMAP down. Failed to publish %s body %s' %( - oper, oper_body) - if do_trace: - trace_msg(trace, 'IfmapTraceBuf', self._sandesh, error_msg=log_str) + log_str = 'Connection to IFMAP down. Failed to publish %s' %( + oper_body) self.config_log(log_str, level=SandeshLevel.SYS_ERR) self._conn_state = ConnectionStatus.DOWN ConnectionState.update( @@ -272,24 +292,18 @@ def _publish_to_ifmap(self, oper, oper_body, do_trace=True): server_addrs=["%s:%s" % (self._ifmap_srv_ip, self._ifmap_srv_port)]) + self._reset_cache() + self._db_client_mgr._api_svr_mgr.un_publish_ifmap_to_discovery() # this will block till connection is re-established - self._reset_cache_and_accumulator() - self._db_client_mgr._api_svr_mgr.un_publish( - un_publish_api=False, un_publish_ifmap=True) self._init_conn() - self._db_client_mgr._api_svr_mgr.publish( - publish_api=False, publish_ifmap=True) self._publish_config_root() self._db_client_mgr.db_resync() - return + return False, log_str else: - log_str = 'Failed to publish %s body %s to ifmap: %s' %(oper, - oper_body, str(e)) - if do_trace: - trace_msg(trace, 'IfmapTraceBuf', self._sandesh, error_msg=log_str) + log_str = 'Failed to publish %s to ifmap: %s' %(oper_body, + str(e)) self.config_log(log_str, level=SandeshLevel.SYS_ERR) - - raise + return False, log_str # end _publish_to_ifmap def _build_request(self, id1_name, id2_name, meta_list, delete=False): @@ -317,7 +331,7 @@ def _delete_id_self_meta(self, self_imid, meta_name): mapclient = self._mapclient del_str = self._build_request(self_imid, 'self', [meta_name], True) - self._publish_to_ifmap('delete', del_str) + self._publish_to_ifmap_enqueue('delete', del_str) # del meta from cache and del id if this was last meta if meta_name: @@ -335,7 +349,7 @@ def _delete_id_pair_meta_list(self, id1, meta_list): for id2, metadata in meta_list: del_str += self._build_request(id1, id2, [metadata], True) - self._publish_to_ifmap('delete', del_str) + self._publish_to_ifmap_enqueue('delete', del_str) # del meta,id2 from cache and del id if this was last meta def _id_to_metas_delete(id1, id2, meta_name): @@ -425,18 +439,8 @@ def _publish_update(self, self_imid, update): else: self._id_to_metas[id2][meta_name] = [{'meta':m, 'id': self_imid}] - if self.accumulator is not None: - self.accumulator.append(requests) - self.accumulated_request_len += len(requests) - if self.accumulated_request_len >= 1024*1024: - upd_str = ''.join( - ''.join(request) for request in self.accumulator) - self._publish_to_ifmap('update', upd_str) - self.accumulator = [] - self.accumulated_request_len = 0 - else: - upd_str = ''.join(requests) - self._publish_to_ifmap('update', upd_str) + upd_str = ''.join(requests) + self._publish_to_ifmap_enqueue('update', upd_str) # end _publish_update def fq_name_to_ifmap_id(self, obj_type, fq_name): @@ -1062,13 +1066,11 @@ def _update_default_quota(self): def db_resync(self): # Read contents from cassandra and publish to ifmap mapclient = self._ifmap_db._mapclient - self._ifmap_db.accumulator = [] - self._ifmap_db.accumulated_request_len = 0 start_time = datetime.datetime.utcnow() self._cassandra_db.walk(self._dbe_resync) + self._ifmap_db._publish_to_ifmap_enqueue('publish_discovery', 1) self.config_log("Cassandra DB walk completed.", level=SandeshLevel.SYS_INFO) - self._ifmap_db.publish_accumulated() self._update_default_quota() end_time = datetime.datetime.utcnow() msg = "Time elapsed in syncing ifmap: %s" % (str(end_time - start_time))