Skip to content

Commit

Permalink
New greenlet to send messages to ifmap
Browse files Browse the repository at this point in the history
With this change, we no longer send ifmap messages in the same context as
dequeuing them from rabbitmq. We now queue them up in a separate internal queue
and a new greenlet is launched to dequeue from there. This allows faster
dequeuing of rabbitmq, and the dequeue greenlet can also send messages in bulk.

Partial-Bug: 1432735

Change-Id: I57e7c0deee5f883bc959c2928c6f473b7dc042da
(cherry picked from commit 19dc001)
  • Loading branch information
Sachin Bansal committed May 28, 2015
1 parent 81b1966 commit f1ce30a
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 93 deletions.
75 changes: 35 additions & 40 deletions src/config/api-server/vnc_cfg_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1511,46 +1511,41 @@ def mt_http_put(self):
return {}
# end

def publish(self, publish_api=True, publish_ifmap=True):
def publish_self_to_discovery(self):
# publish API server
if publish_api:
data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.listen_port,
}
self.api_server_task = self._disc.publish(
API_SERVER_DISCOVERY_SERVICE_NAME, data)

data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.listen_port,
}
self.api_server_task = self._disc.publish(
API_SERVER_DISCOVERY_SERVICE_NAME, data)

def publish_ifmap_to_discovery(self):
# publish ifmap server
if publish_ifmap:
data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.ifmap_server_port,
}
self.ifmap_task = self._disc.publish(
IFMAP_SERVER_DISCOVERY_SERVICE_NAME, data)
# end

def un_publish(self, un_publish_api=True, un_publish_ifmap=True):
if un_publish_api:
# un publish api server
data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.listen_port,
}
self._disc.un_publish(
API_SERVER_DISCOVERY_SERVICE_NAME, data)

data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.ifmap_server_port,
}
self.ifmap_task = self._disc.publish(
IFMAP_SERVER_DISCOVERY_SERVICE_NAME, data)
# end publish_ifmap_to_discovery

def un_publish_self_to_discovery(self):
    """Withdraw this api-server's endpoint from the discovery service."""
    # un publish api server
    # NOTE(review): advertises ifmap_server_ip as the api-server address —
    # looks odd; confirm against publish_self_to_discovery's intent.
    endpoint = {
        'ip-address': self._args.ifmap_server_ip,
        'port': self._args.listen_port,
    }
    self._disc.un_publish(API_SERVER_DISCOVERY_SERVICE_NAME, endpoint)

def un_publish_ifmap_to_discovery(self):
# un publish ifmap server
if un_publish_ifmap:
data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.ifmap_server_port,
}
self._disc.un_publish(
IFMAP_SERVER_DISCOVERY_SERVICE_NAME, data)

# end un_publish
data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.ifmap_server_port,
}
self._disc.un_publish(IFMAP_SERVER_DISCOVERY_SERVICE_NAME, data)
# end un_publish_ifmap_to_discovery

# end class VncApiServer

Expand All @@ -1567,9 +1562,9 @@ def main(args_str=None):
server_port = vnc_api_server.get_server_port()

# Advertise services
if vnc_api_server._args.disc_server_ip and\
vnc_api_server._args.disc_server_port:
vnc_api_server.publish()
if (vnc_api_server._args.disc_server_ip and
vnc_api_server._args.disc_server_port):
vnc_api_server.publish_self_to_discovery()

""" @sigchld
Disable handling of SIG_CHLD for now as every keystone request to validate
Expand Down
108 changes: 55 additions & 53 deletions src/config/api-server/vnc_cfg_ifmap.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
monkey.patch_all()
import gevent
import gevent.event
from gevent.queue import Queue
from gevent.queue import Queue, Empty
import sys
import time
from pprint import pformat
Expand Down Expand Up @@ -101,6 +101,8 @@ def __init__(self, db_client_mgr, ifmap_srv_ip, ifmap_srv_port,
self._username = uname
self._password = passwd
self._ssl_options = ssl_options
self._queue = Queue(100000)
self._dequeue_greenlet = gevent.spawn(self._publish_to_ifmap_dequeue)
self._CONTRAIL_XSD = "http://www.contrailsystems.com/vnc_cfg.xsd"
self._IPERMS_NAME = "id-perms"
self._IPERMS_FQ_NAME = "contrail:" + self._IPERMS_NAME
Expand All @@ -125,7 +127,7 @@ def __init__(self, db_client_mgr, ifmap_srv_ip, ifmap_srv_port,
server_addrs = ["%s:%s" % (ifmap_srv_ip, ifmap_srv_port)])
self._conn_state = ConnectionStatus.INIT

self._reset_cache_and_accumulator()
self._reset_cache()

# Set the signal handler
signal.signal(signal.SIGUSR2, self.handler)
Expand Down Expand Up @@ -164,13 +166,11 @@ def _init_conn(self):
mapclient.set_publisher_id(newSessionResult(result).get_publisher_id())
# end _init_conn

def _reset_cache_and_accumulator(self):
def _reset_cache(self):
# Cache of metas populated in ifmap server. Useful in update to find
# what things to remove in ifmap server
self._id_to_metas = {}
self.accumulator = None
self.accumulated_request_len = 0
# end _reset_cache_and_accumulator
# end _reset_cache


def _publish_config_root(self):
Expand Down Expand Up @@ -202,24 +202,47 @@ def _generate_ifmap_trace(self, oper, body):
return ifmap_trace
# end _generate_ifmap_trace

def publish_accumulated(self):
if self.accumulated_request_len:
upd_str = ''.join(''.join(request)
for request in self.accumulator)
self._publish_to_ifmap('update', upd_str, do_trace=False)
self.accumulator = None
self.accumulated_request_len = 0
# end publish_accumulated

def _publish_to_ifmap(self, oper, oper_body, do_trace=True):
def _publish_to_ifmap_enqueue(self, oper, oper_body, do_trace=True):
# safety check, if we proceed ifmap-server reports error
# asking for update|delete in publish
if not oper_body:
return
self._queue.put((oper, oper_body, do_trace))
# end _publish_to_ifmap_enqueue

if do_trace:
trace = self._generate_ifmap_trace(oper, oper_body)
def _publish_to_ifmap_dequeue(self):
    """Greenlet body: drain the internal queue and publish to ifmap in bulk.

    Spawned in __init__ and intended to run for the process lifetime.
    Coalesces queued operation bodies into one request of up to ~1MB per
    publish, so rabbitmq dequeuing is not blocked on per-message ifmap
    round trips.
    """
    # peek() blocks until at least one item is queued, so the outer loop
    # sleeps while idle instead of spinning.  Every queued item is a
    # non-empty tuple, hence truthy.
    while self._queue.peek():
        publish_discovery = False
        request = ''
        traces = []
        # Inner loop: grab everything immediately available, stopping at
        # ~1MB of accumulated body, on the 'publish_discovery' sentinel,
        # or when the queue momentarily empties.
        while True:
            try:
                (oper, oper_body, do_trace) = self._queue.get_nowait()
                if oper == 'publish_discovery':
                    # Sentinel enqueued by db_resync after the cassandra
                    # walk: flush what we have, then (below) advertise
                    # the ifmap service to discovery.
                    publish_discovery = True
                    break
                if do_trace:
                    trace = self._generate_ifmap_trace(oper, oper_body)
                    traces.append(trace)
                request += oper_body
                if len(request) > 1024*1024:
                    break
            except Empty:
                break
        ok = True
        if request:
            ok, msg = self._publish_to_ifmap(request)
        # Traces are emitted only after the publish attempt so that a
        # failure can be recorded with its error message.
        for trace in traces:
            if ok:
                trace_msg(trace, 'IfmapTraceBuf', self._sandesh)
            else:
                trace_msg(trace, 'IfmapTraceBuf', self._sandesh,
                          error_msg=msg)
        if publish_discovery and ok:
            # presumably _api_svr_mgr is the owning VncApiServer — verify
            self._db_client_mgr._api_svr_mgr.publish_ifmap_to_discovery()
# end _publish_to_ifmap_dequeue

def _publish_to_ifmap(self, oper_body):
try:
not_published = True
retry_count = 0
Expand Down Expand Up @@ -255,15 +278,12 @@ def _publish_to_ifmap(self, oper, oper_body, do_trace=True):
%(retry_count)
self.config_log(log_str, level=SandeshLevel.SYS_ERR)

if do_trace:
trace_msg(trace, 'IfmapTraceBuf', self._sandesh)
return True, resp_xml
except Exception as e:
if (isinstance(e, socket.error) and
self._conn_state != ConnectionStatus.DOWN):
log_str = 'Connection to IFMAP down. Failed to publish %s body %s' %(
oper, oper_body)
if do_trace:
trace_msg(trace, 'IfmapTraceBuf', self._sandesh, error_msg=log_str)
log_str = 'Connection to IFMAP down. Failed to publish %s' %(
oper_body)
self.config_log(log_str, level=SandeshLevel.SYS_ERR)
self._conn_state = ConnectionStatus.DOWN
ConnectionState.update(
Expand All @@ -272,24 +292,18 @@ def _publish_to_ifmap(self, oper, oper_body, do_trace=True):
server_addrs=["%s:%s" % (self._ifmap_srv_ip,
self._ifmap_srv_port)])

self._reset_cache()
self._db_client_mgr._api_svr_mgr.un_publish_ifmap_to_discovery()
# this will block till connection is re-established
self._reset_cache_and_accumulator()
self._db_client_mgr._api_svr_mgr.un_publish(
un_publish_api=False, un_publish_ifmap=True)
self._init_conn()
self._db_client_mgr._api_svr_mgr.publish(
publish_api=False, publish_ifmap=True)
self._publish_config_root()
self._db_client_mgr.db_resync()
return
return False, log_str
else:
log_str = 'Failed to publish %s body %s to ifmap: %s' %(oper,
oper_body, str(e))
if do_trace:
trace_msg(trace, 'IfmapTraceBuf', self._sandesh, error_msg=log_str)
log_str = 'Failed to publish %s to ifmap: %s' %(oper_body,
str(e))
self.config_log(log_str, level=SandeshLevel.SYS_ERR)

raise
return False, log_str
# end _publish_to_ifmap

def _build_request(self, id1_name, id2_name, meta_list, delete=False):
Expand Down Expand Up @@ -317,7 +331,7 @@ def _delete_id_self_meta(self, self_imid, meta_name):
mapclient = self._mapclient

del_str = self._build_request(self_imid, 'self', [meta_name], True)
self._publish_to_ifmap('delete', del_str)
self._publish_to_ifmap_enqueue('delete', del_str)

# del meta from cache and del id if this was last meta
if meta_name:
Expand All @@ -335,7 +349,7 @@ def _delete_id_pair_meta_list(self, id1, meta_list):
for id2, metadata in meta_list:
del_str += self._build_request(id1, id2, [metadata], True)

self._publish_to_ifmap('delete', del_str)
self._publish_to_ifmap_enqueue('delete', del_str)

# del meta,id2 from cache and del id if this was last meta
def _id_to_metas_delete(id1, id2, meta_name):
Expand Down Expand Up @@ -425,18 +439,8 @@ def _publish_update(self, self_imid, update):
else:
self._id_to_metas[id2][meta_name] = [{'meta':m,
'id': self_imid}]
if self.accumulator is not None:
self.accumulator.append(requests)
self.accumulated_request_len += len(requests)
if self.accumulated_request_len >= 1024*1024:
upd_str = ''.join(
''.join(request) for request in self.accumulator)
self._publish_to_ifmap('update', upd_str)
self.accumulator = []
self.accumulated_request_len = 0
else:
upd_str = ''.join(requests)
self._publish_to_ifmap('update', upd_str)
upd_str = ''.join(requests)
self._publish_to_ifmap_enqueue('update', upd_str)
# end _publish_update

def fq_name_to_ifmap_id(self, obj_type, fq_name):
Expand Down Expand Up @@ -1062,13 +1066,11 @@ def _update_default_quota(self):
def db_resync(self):
# Read contents from cassandra and publish to ifmap
mapclient = self._ifmap_db._mapclient
self._ifmap_db.accumulator = []
self._ifmap_db.accumulated_request_len = 0
start_time = datetime.datetime.utcnow()
self._cassandra_db.walk(self._dbe_resync)
self._ifmap_db._publish_to_ifmap_enqueue('publish_discovery', 1)
self.config_log("Cassandra DB walk completed.",
level=SandeshLevel.SYS_INFO)
self._ifmap_db.publish_accumulated()
self._update_default_quota()
end_time = datetime.datetime.utcnow()
msg = "Time elapsed in syncing ifmap: %s" % (str(end_time - start_time))
Expand Down

0 comments on commit f1ce30a

Please sign in to comment.