Skip to content

Commit

Permalink
Merge "New greenlet to send messages to ifmap" into R2.20
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed May 29, 2015
2 parents a365f11 + f1ce30a commit 15e99d2
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 93 deletions.
75 changes: 35 additions & 40 deletions src/config/api-server/vnc_cfg_api_server.py
Expand Up @@ -1511,46 +1511,41 @@ def mt_http_put(self):
return {}
# end

def publish(self, publish_api=True, publish_ifmap=True):
def publish_self_to_discovery(self):
# publish API server
if publish_api:
data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.listen_port,
}
self.api_server_task = self._disc.publish(
API_SERVER_DISCOVERY_SERVICE_NAME, data)

data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.listen_port,
}
self.api_server_task = self._disc.publish(
API_SERVER_DISCOVERY_SERVICE_NAME, data)

def publish_ifmap_to_discovery(self):
# publish ifmap server
if publish_ifmap:
data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.ifmap_server_port,
}
self.ifmap_task = self._disc.publish(
IFMAP_SERVER_DISCOVERY_SERVICE_NAME, data)
# end

def un_publish(self, un_publish_api=True, un_publish_ifmap=True):
if un_publish_api:
# un publish api server
data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.listen_port,
}
self._disc.un_publish(
API_SERVER_DISCOVERY_SERVICE_NAME, data)

data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.ifmap_server_port,
}
self.ifmap_task = self._disc.publish(
IFMAP_SERVER_DISCOVERY_SERVICE_NAME, data)
# end publish_ifmap_to_discovery

def un_publish_self_to_discovery(self):
# un publish api server
data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.listen_port,
}
self._disc.un_publish(API_SERVER_DISCOVERY_SERVICE_NAME, data)

def un_publish_ifmap_to_discovery(self):
# un publish ifmap server
if un_publish_ifmap:
data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.ifmap_server_port,
}
self._disc.un_publish(
IFMAP_SERVER_DISCOVERY_SERVICE_NAME, data)

# end un_publish
data = {
'ip-address': self._args.ifmap_server_ip,
'port': self._args.ifmap_server_port,
}
self._disc.un_publish(IFMAP_SERVER_DISCOVERY_SERVICE_NAME, data)
# end un_publish_ifmap_to_discovery

# end class VncApiServer

Expand All @@ -1567,9 +1562,9 @@ def main(args_str=None):
server_port = vnc_api_server.get_server_port()

# Advertise services
if vnc_api_server._args.disc_server_ip and\
vnc_api_server._args.disc_server_port:
vnc_api_server.publish()
if (vnc_api_server._args.disc_server_ip and
vnc_api_server._args.disc_server_port):
vnc_api_server.publish_self_to_discovery()

""" @sigchld
Disable handling of SIG_CHLD for now as every keystone request to validate
Expand Down
108 changes: 55 additions & 53 deletions src/config/api-server/vnc_cfg_ifmap.py
Expand Up @@ -10,7 +10,7 @@
monkey.patch_all()
import gevent
import gevent.event
from gevent.queue import Queue
from gevent.queue import Queue, Empty
import sys
import time
from pprint import pformat
Expand Down Expand Up @@ -101,6 +101,8 @@ def __init__(self, db_client_mgr, ifmap_srv_ip, ifmap_srv_port,
self._username = uname
self._password = passwd
self._ssl_options = ssl_options
self._queue = Queue(100000)
self._dequeue_greenlet = gevent.spawn(self._publish_to_ifmap_dequeue)
self._CONTRAIL_XSD = "http://www.contrailsystems.com/vnc_cfg.xsd"
self._IPERMS_NAME = "id-perms"
self._IPERMS_FQ_NAME = "contrail:" + self._IPERMS_NAME
Expand All @@ -125,7 +127,7 @@ def __init__(self, db_client_mgr, ifmap_srv_ip, ifmap_srv_port,
server_addrs = ["%s:%s" % (ifmap_srv_ip, ifmap_srv_port)])
self._conn_state = ConnectionStatus.INIT

self._reset_cache_and_accumulator()
self._reset_cache()

# Set the signal handler
signal.signal(signal.SIGUSR2, self.handler)
Expand Down Expand Up @@ -164,13 +166,11 @@ def _init_conn(self):
mapclient.set_publisher_id(newSessionResult(result).get_publisher_id())
# end _init_conn

def _reset_cache_and_accumulator(self):
def _reset_cache(self):
# Cache of metas populated in ifmap server. Useful in update to find
# what things to remove in ifmap server
self._id_to_metas = {}
self.accumulator = None
self.accumulated_request_len = 0
# end _reset_cache_and_accumulator
# end _reset_cache


def _publish_config_root(self):
Expand Down Expand Up @@ -202,24 +202,47 @@ def _generate_ifmap_trace(self, oper, body):
return ifmap_trace
# end _generate_ifmap_trace

def publish_accumulated(self):
if self.accumulated_request_len:
upd_str = ''.join(''.join(request)
for request in self.accumulator)
self._publish_to_ifmap('update', upd_str, do_trace=False)
self.accumulator = None
self.accumulated_request_len = 0
# end publish_accumulated

def _publish_to_ifmap(self, oper, oper_body, do_trace=True):
def _publish_to_ifmap_enqueue(self, oper, oper_body, do_trace=True):
# safety check, if we proceed ifmap-server reports error
# asking for update|delete in publish
if not oper_body:
return
self._queue.put((oper, oper_body, do_trace))
# end _publish_to_ifmap_enqueue

if do_trace:
trace = self._generate_ifmap_trace(oper, oper_body)
def _publish_to_ifmap_dequeue(self):
while self._queue.peek():
publish_discovery = False
request = ''
traces = []
while True:
try:
(oper, oper_body, do_trace) = self._queue.get_nowait()
if oper == 'publish_discovery':
publish_discovery = True
break
if do_trace:
trace = self._generate_ifmap_trace(oper, oper_body)
traces.append(trace)
request += oper_body
if len(request) > 1024*1024:
break
except Empty:
break
ok = True
if request:
ok, msg = self._publish_to_ifmap(request)
for trace in traces:
if ok:
trace_msg(trace, 'IfmapTraceBuf', self._sandesh)
else:
trace_msg(trace, 'IfmapTraceBuf', self._sandesh,
error_msg=msg)
if publish_discovery and ok:
self._db_client_mgr._api_svr_mgr.publish_ifmap_to_discovery()
# end _publish_to_ifmap_dequeue

def _publish_to_ifmap(self, oper_body):
try:
not_published = True
retry_count = 0
Expand Down Expand Up @@ -255,15 +278,12 @@ def _publish_to_ifmap(self, oper, oper_body, do_trace=True):
%(retry_count)
self.config_log(log_str, level=SandeshLevel.SYS_ERR)

if do_trace:
trace_msg(trace, 'IfmapTraceBuf', self._sandesh)
return True, resp_xml
except Exception as e:
if (isinstance(e, socket.error) and
self._conn_state != ConnectionStatus.DOWN):
log_str = 'Connection to IFMAP down. Failed to publish %s body %s' %(
oper, oper_body)
if do_trace:
trace_msg(trace, 'IfmapTraceBuf', self._sandesh, error_msg=log_str)
log_str = 'Connection to IFMAP down. Failed to publish %s' %(
oper_body)
self.config_log(log_str, level=SandeshLevel.SYS_ERR)
self._conn_state = ConnectionStatus.DOWN
ConnectionState.update(
Expand All @@ -272,24 +292,18 @@ def _publish_to_ifmap(self, oper, oper_body, do_trace=True):
server_addrs=["%s:%s" % (self._ifmap_srv_ip,
self._ifmap_srv_port)])

self._reset_cache()
self._db_client_mgr._api_svr_mgr.un_publish_ifmap_to_discovery()
# this will block till connection is re-established
self._reset_cache_and_accumulator()
self._db_client_mgr._api_svr_mgr.un_publish(
un_publish_api=False, un_publish_ifmap=True)
self._init_conn()
self._db_client_mgr._api_svr_mgr.publish(
publish_api=False, publish_ifmap=True)
self._publish_config_root()
self._db_client_mgr.db_resync()
return
return False, log_str
else:
log_str = 'Failed to publish %s body %s to ifmap: %s' %(oper,
oper_body, str(e))
if do_trace:
trace_msg(trace, 'IfmapTraceBuf', self._sandesh, error_msg=log_str)
log_str = 'Failed to publish %s to ifmap: %s' %(oper_body,
str(e))
self.config_log(log_str, level=SandeshLevel.SYS_ERR)

raise
return False, log_str
# end _publish_to_ifmap

def _build_request(self, id1_name, id2_name, meta_list, delete=False):
Expand Down Expand Up @@ -317,7 +331,7 @@ def _delete_id_self_meta(self, self_imid, meta_name):
mapclient = self._mapclient

del_str = self._build_request(self_imid, 'self', [meta_name], True)
self._publish_to_ifmap('delete', del_str)
self._publish_to_ifmap_enqueue('delete', del_str)

# del meta from cache and del id if this was last meta
if meta_name:
Expand All @@ -335,7 +349,7 @@ def _delete_id_pair_meta_list(self, id1, meta_list):
for id2, metadata in meta_list:
del_str += self._build_request(id1, id2, [metadata], True)

self._publish_to_ifmap('delete', del_str)
self._publish_to_ifmap_enqueue('delete', del_str)

# del meta,id2 from cache and del id if this was last meta
def _id_to_metas_delete(id1, id2, meta_name):
Expand Down Expand Up @@ -425,18 +439,8 @@ def _publish_update(self, self_imid, update):
else:
self._id_to_metas[id2][meta_name] = [{'meta':m,
'id': self_imid}]
if self.accumulator is not None:
self.accumulator.append(requests)
self.accumulated_request_len += len(requests)
if self.accumulated_request_len >= 1024*1024:
upd_str = ''.join(
''.join(request) for request in self.accumulator)
self._publish_to_ifmap('update', upd_str)
self.accumulator = []
self.accumulated_request_len = 0
else:
upd_str = ''.join(requests)
self._publish_to_ifmap('update', upd_str)
upd_str = ''.join(requests)
self._publish_to_ifmap_enqueue('update', upd_str)
# end _publish_update

def fq_name_to_ifmap_id(self, obj_type, fq_name):
Expand Down Expand Up @@ -1062,13 +1066,11 @@ def _update_default_quota(self):
def db_resync(self):
# Read contents from cassandra and publish to ifmap
mapclient = self._ifmap_db._mapclient
self._ifmap_db.accumulator = []
self._ifmap_db.accumulated_request_len = 0
start_time = datetime.datetime.utcnow()
self._cassandra_db.walk(self._dbe_resync)
self._ifmap_db._publish_to_ifmap_enqueue('publish_discovery', 1)
self.config_log("Cassandra DB walk completed.",
level=SandeshLevel.SYS_INFO)
self._ifmap_db.publish_accumulated()
self._update_default_quota()
end_time = datetime.datetime.utcnow()
msg = "Time elapsed in syncing ifmap: %s" % (str(end_time - start_time))
Expand Down

0 comments on commit 15e99d2

Please sign in to comment.