Merge "Instead of publishing each AlarmPartition as a service, we pub…
Browse files Browse the repository at this point in the history
…lish partition information as part of the Alarm service"
Zuul authored and opencontrail-ci-admin committed Nov 27, 2015
2 parents 4bc0e16 + 079188d commit 31e945f
Showing 4 changed files with 49 additions and 42 deletions.
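
Taken together, the change replaces the per-partition AlarmPartition discovery records with a single AlarmGenerator record whose 'partitions' field is a JSON-encoded map of owned partition number to acquisition time. Below is a minimal sketch of that payload shape as implied by the diff; the build_payload and decode_payload helpers are illustrative only and are not part of the source tree.

import json

def build_payload(host_ip, instance_id, redis_port, workerset):
    """workerset maps partition number -> acquisition time (usec)."""
    return {
        'ip-address': host_ip,
        'instance-id': instance_id,
        'redis-port': str(redis_port),
        'partitions': json.dumps(workerset),
    }

def decode_payload(elem):
    """Yield (partition, claim) pairs the way opserver's disc_agp unpacks them."""
    parts = json.loads(elem['partitions'])
    for partstr, acq_time in parts.items():
        yield int(partstr), (elem['instance-id'], elem['ip-address'],
                             acq_time, int(elem['redis-port']))

payload = build_payload('10.0.0.1', '0', 6379,
                        {0: 1448600000000000, 1: 1448600000100000})
print(dict(decode_payload(payload)))
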
39 changes: 29 additions & 10 deletions src/opserver/alarmgen.py
@@ -222,21 +222,23 @@ def __init__(self, conf, test_logger=None):
self._instance_id = self._conf.worker_id()

self.disc = None
self._libpart_name = self._hostname + ":" + self._instance_id
self._libpart_name = self._conf.host_ip() + ":" + self._instance_id
self._libpart = None
self._partset = set()
if self._conf.discovery()['server']:
self._max_out_rows = 20
data = {
'ip-address': self._hostname ,
'port': self._instance_id
}
self.disc = client.DiscoveryClient(
self._conf.discovery()['server'],
self._conf.discovery()['port'],
ModuleNames[Module.ALARM_GENERATOR])
print("Disc Publish to %s : %s"
% (str(self._conf.discovery()), str(data)))
data = {
'ip-address': self._conf.host_ip(),
'instance-id': self._instance_id,
'redis-port': str(self._conf.redis_server_port()),
'partitions': json.dumps({})
}
print("Disc Publish to %s : %s" \
% (str(self._conf.discovery()), str(data)))
self.disc.publish(ALARM_GENERATOR_SERVICE_NAME, data)

is_collector = True
@@ -574,7 +576,25 @@ def run_uve_processing(self):
"""

lredis = None
oldworkerset = {}
while True:
workerset = {}
for part in self._workers.keys():
if self._workers[part]._up:
workerset[part] = self._workers[part].acq_time()
if workerset != oldworkerset:
if self.disc:
data = {
'ip-address': self._conf.host_ip(),
'instance-id': self._instance_id,
'redis-port': str(self._conf.redis_server_port()),
'partitions': json.dumps(workerset)
}
self._logger.error("Disc Publish to %s : %s"
% (str(self._conf.discovery()), str(data)))
self.disc.publish(ALARM_GENERATOR_SERVICE_NAME, data)
oldworkerset = workerset

for part in self._uveqf.keys():
self._logger.error("Stop UVE processing for %d:%d" % \
(part, self._uveqf[part]))
@@ -1010,8 +1030,7 @@ def partition_change(self, partno, enl):
self.handle_uve_notifq, self._conf.host_ip(),
self.handle_resource_check,
self._instance_id,
self._conf.redis_server_port(),
cdisc)
self._conf.redis_server_port())
ph.start()
self._workers[partno] = ph
self._uvestats[partno] = {}
@@ -1270,7 +1289,7 @@ def disc_cb_ag(self, alist):
newlist = []
for elem in alist:
ipaddr = elem["ip-address"]
inst = elem["port"]
inst = elem["instance-id"]
newlist.append(ipaddr + ":" + inst)

# We should always include ourselves in the list of memebers
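
The run_uve_processing() change above follows a publish-on-change pattern: the owned-partition map is rebuilt on every pass of the loop, but the discovery record is republished only when the map differs from the last one published. A condensed sketch of that pattern, assuming a workers dict of objects exposing _up and acq_time() as in the diff; refresh_partition_publish is an illustrative helper, not code from the tree.

import json

def refresh_partition_publish(workers, publish, oldworkerset):
    """Republish the JSON-encoded partition map only when it changes.

    workers: {partno: worker}, each worker exposing _up and acq_time().
    publish: callable that sends the encoded map to discovery.
    Returns the map to remember for the next pass.
    """
    workerset = {part: w.acq_time() for part, w in workers.items() if w._up}
    if workerset != oldworkerset:
        publish(json.dumps(workerset))
    return workerset
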
29 changes: 16 additions & 13 deletions src/opserver/opserver.py
@@ -48,7 +48,7 @@
from sandesh_common.vns.constants import ModuleNames, CategoryNames,\
ModuleCategoryMap, Module2NodeType, NodeTypeNames, ModuleIds,\
INSTANCE_ID_DEFAULT, COLLECTOR_DISCOVERY_SERVICE_NAME,\
ANALYTICS_API_SERVER_DISCOVERY_SERVICE_NAME, ALARM_PARTITION_SERVICE_NAME
ANALYTICS_API_SERVER_DISCOVERY_SERVICE_NAME, ALARM_GENERATOR_SERVICE_NAME
from sandesh.viz.constants import _TABLES, _OBJECT_TABLES,\
_OBJECT_TABLE_SCHEMA, _OBJECT_TABLE_COLUMN_VALUES, \
_STAT_TABLES, STAT_OBJECTID_FIELD, STAT_VT_PREFIX, \
@@ -2210,18 +2210,21 @@ def disc_cb(self, clist):
def disc_agp(self, clist):
new_agp = {}
for elem in clist:
if int(elem['acq-time']) == 0:
continue
pi = PartInfo(instance_id = elem['instance-id'],
ip_address = elem['ip-address'],
acq_time = int(elem['acq-time']),
port = int(elem['port']))
partno = int(elem['partition'])
if partno not in new_agp:
new_agp[partno] = pi
else:
if pi.acq_time > new_agp[partno].acq_time:
instance_id = elem['instance-id']
port = int(elem['redis-port'])
ip_address = elem['ip-address']
parts = json.loads(elem['partitions'])
for partstr,acq_time in parts.iteritems():
partno = int(partstr)
pi = PartInfo(instance_id = instance_id,
ip_address = ip_address,
acq_time = acq_time,
port = port)
if partno not in new_agp:
new_agp[partno] = pi
else:
if pi.acq_time > new_agp[partno].acq_time:
new_agp[partno] = pi
if len(new_agp) == self._args.partitions and \
len(self.agp) != self._args.partitions:
ConnectionState.update(conn_type = ConnectionType.UVEPARTITIONS,
@@ -2254,7 +2257,7 @@ def run(self):
self.gevs.append(sp)

sp2 = ServicePoller(self._logger, CollectorTrace, \
self.disc, ALARM_PARTITION_SERVICE_NAME, \
self.disc, ALARM_GENERATOR_SERVICE_NAME, \
self.disc_agp, self._sandesh)
sp2.start()
self.gevs.append(sp2)
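
On the OpServer side, disc_agp() now flattens every generator's 'partitions' map into one partition table, and when two generators claim the same partition the claim with the newer acquisition time wins. A standalone sketch of that merge rule, operating on already-decoded (partno, acq_time, owner) tuples; merge_partition_claims is an illustrative name, not from the tree.

def merge_partition_claims(claims):
    """Keep, per partition, the claim with the most recent acquisition time."""
    table = {}
    for partno, acq_time, owner in claims:
        current = table.get(partno)
        if current is None or acq_time > current[0]:
            table[partno] = (acq_time, owner)
    return table

# Partition 1 is claimed twice; the later acquisition time wins.
merged = merge_partition_claims([(0, 100, 'alarmgen-a'),
                                 (1, 100, 'alarmgen-a'),
                                 (1, 250, 'alarmgen-b')])
assert merged[1][1] == 'alarmgen-b'
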
22 changes: 4 additions & 18 deletions src/opserver/partition_handler.py
@@ -15,7 +15,6 @@
import struct
import socket
import discoveryclient.client as client
from sandesh_common.vns.constants import ALARM_PARTITION_SERVICE_NAME
from pysandesh.util import UTCTimestampUsec
import select
import redis
@@ -640,7 +639,7 @@ class UveStreamProc(PartitionHandler):
# rport : redis server port
# disc : discovery client to publish to
def __init__(self, brokers, partition, uve_topic, logger, callback,
host_ip, rsc, aginst, rport, disc = None):
host_ip, rsc, aginst, rport):
super(UveStreamProc, self).__init__(brokers, "workers",
uve_topic, logger, False)
self._uvedb = {}
@@ -653,8 +652,8 @@ def __init__(self, brokers, partition, uve_topic, logger, callback,
self.disc_rset = set()
self._resource_cb = rsc
self._aginst = aginst
self._disc = disc
self._acq_time = UTCTimestampUsec()
self._up = True
self._rport = rport

def reset_acq_time(self):
@@ -706,28 +705,15 @@ def stop_partition(self, kcoll=None):
else:
# If all collectors are being cleared, clear resoures too
self.disc_rset = set()
if self._disc:
# TODO: Unpublish instead of setting acq-time to 0
data = { 'instance-id' : self._aginst,
'partition' : str(self._partno),
'ip-address': self._host_ip,
'acq-time': "0",
'port':str(self._rport)}
self._disc.publish(ALARM_PARTITION_SERVICE_NAME, data)
self._up = False

return partdb

def start_partition(self, cbdb):
''' This function loads the initial UVE database.
for the partition
'''
if self._disc:
data = { 'instance-id' : self._aginst,
'partition' : str(self._partno),
'ip-address': self._host_ip,
'acq-time': str(self._acq_time),
'port':str(self._rport)}
self._disc.publish(ALARM_PARTITION_SERVICE_NAME, data)
self._up = True
self._logger.error("Starting part %d collectors %s" % \
(self._partno, str(cbdb.keys())))
uves = {}
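
With the per-partition publish gone, UveStreamProc keeps only local state: an _up flag toggled by start_partition()/stop_partition() and the acquisition timestamp, which the parent AlarmGen reads when it builds the published map. A minimal stand-in for that reduced worker state, with a plain time.time()-based timestamp standing in for pysandesh's UTCTimestampUsec.

import time

class PartitionWorkerState(object):
    """Illustrative stand-in for the state UveStreamProc now tracks itself."""

    def __init__(self, partno):
        self._partno = partno
        self._acq_time = int(time.time() * 1e6)  # microseconds since epoch
        self._up = True

    def acq_time(self):
        return self._acq_time

    def stop_partition(self):
        # No discovery unpublish here: the parent sees _up == False and
        # simply drops this partition from the next map it publishes.
        self._up = False

    def start_partition(self):
        self._up = True
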
1 change: 0 additions & 1 deletion src/sandesh/common/vns.sandesh
@@ -451,7 +451,6 @@ const string COLLECTOR_DISCOVERY_SERVICE_NAME = "Collector"
const string API_SERVER_DISCOVERY_SERVICE_NAME = "ApiServer"
const string ANALYTICS_API_SERVER_DISCOVERY_SERVICE_NAME = "OpServer"
const string ALARM_GENERATOR_SERVICE_NAME = "AlarmGenerator"
const string ALARM_PARTITION_SERVICE_NAME = "AlarmPartition"
const string IFMAP_SERVER_DISCOVERY_SERVICE_NAME = "IfmapServer"
const string XMPP_SERVER_DISCOVERY_SERVICE_NAME = "xmpp-server"
const string DNS_SERVER_DISCOVERY_SERVICE_NAME = "dns-server"
