Skip to content

Commit

Permalink
Send partition info from snmp-collector and topology services
Browse files Browse the repository at this point in the history
Send member list, partition list and the prouter list from
contrail-snmp-collector and contrail-topology

Change-Id: I4db80eb18a77ae1b1f0bf554083185498c335e90
Closes-Bug: #1640959
  • Loading branch information
Sundaresan Rajangam committed Mar 11, 2017
1 parent 26d8424 commit d7e7d71
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 11 deletions.
5 changes: 4 additions & 1 deletion src/analytics/contrail-snmp-collector/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ sources = [
mibs = Glob('mibs/*')

prouter_pkg = env.SandeshGenPy('prouter.sandesh', 'contrail_snmp_collector/sandesh/', False)
snmp_collector_info_pkg = env.SandeshGenPy('snmp_collector_info.sandesh', 'contrail_snmp_collector/sandesh/', False)
nodeinfo_pkg = env.SandeshGenPy('#controller/src/base/sandesh/nodeinfo.sandesh', 'contrail_snmp_collector/sandesh/', False)
cpuinfo_pkg = env.SandeshGenPy('#controller/src/base/sandesh/cpuinfo.sandesh', 'contrail_snmp_collector/sandesh/nodeinfo/', False)
process_info_pkg = env.SandeshGenPy('#controller/src/base/sandesh/process_info.sandesh', 'contrail_snmp_collector/sandesh/nodeinfo/', False)

sdist_depends = [ prouter_pkg, nodeinfo_pkg, cpuinfo_pkg, process_info_pkg ]
sdist_depends = [ prouter_pkg, snmp_collector_info_pkg, nodeinfo_pkg,
cpuinfo_pkg, process_info_pkg ]

version = '0.0.1'
#with open("CHANGES.txt") as f:
Expand Down Expand Up @@ -116,6 +118,7 @@ doc_files += env['BASE_DOC_FILES']
doc_files += env['SANDESH_DOC_FILES']
doc_files += env['ANALYTICS_DOC_FILES']
doc_files += env.SandeshGenDoc('prouter.sandesh')
doc_files += env.SandeshGenDoc('snmp_collector_info.sandesh')

if 'install' in BUILD_TARGETS:
install_cmd = env.Command(None, sources + sdist_depends,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import signal
import random
import hashlib
from sandesh.snmp_collector_info.ttypes import SnmpCollectorInfo, \
SnmpCollectorUVE

class MaxNinTtime(object):
def __init__(self, n, t, default=0):
Expand Down Expand Up @@ -54,6 +56,7 @@ def __init__(self, config):
self._config.random_collectors = random.sample(self._config.collectors(), \
len(self._config.collectors()))
self.uve = SnmpUve(self._config)
self._hostname = socket.gethostname()
self._logger = self.uve.logger()
self.sleep_time()
self._keep_running = True
Expand All @@ -64,6 +67,9 @@ def __init__(self, config):
self._state = 'full_scan' # replace it w/ fsm
self._if_data = None # replace it w/ fsm
self._cleanup = None
self._members = None
self._partitions = None
self._prouters = None

def _make_if_cdata(self, data):
if_cdata = {}
Expand Down Expand Up @@ -228,6 +234,22 @@ def _send_uve(self, d):
self.find_fix_name(data['name'], dev)
self._logger.debug('@send_uve:Processed %d!' % (len(d)))

def _send_snmp_collector_uve(self, members, partitions, prouters):
snmp_collector_info = SnmpCollectorInfo()
if self._members != members:
self._members = members
snmp_collector_info.members = members
if self._partitions != partitions:
self._partitions = partitions
snmp_collector_info.partitions = partitions
if self._prouters != prouters:
self._prouters = prouters
snmp_collector_info.prouters = prouters
if snmp_collector_info != SnmpCollectorInfo():
snmp_collector_info.name = self._hostname
SnmpCollectorUVE(data=snmp_collector_info).send()
# end _send_snmp_collector_uve

def _del_uves(self, l):
with self._sem:
for dev in l:
Expand Down Expand Up @@ -300,10 +322,12 @@ def run(self):
while self._keep_running:
self._logger.debug('@run: ittr(%d)' % i)
if constnt_schdlr.schedule(self._config.devices()):
members = constnt_schdlr.members()
partitions = constnt_schdlr.partitions()
prouters = map(lambda x: x.name, constnt_schdlr.work_items())
self._send_snmp_collector_uve(members, partitions, prouters)
sleep_time = self.do_work(i, constnt_schdlr.work_items())
self._logger.debug('done work %s' % str(
map(lambda x: x.name,
constnt_schdlr.work_items())))
self._logger.debug('done work %s' % str(prouters))
i += 1
gevent.sleep(sleep_time)
else:
Expand Down
27 changes: 27 additions & 0 deletions src/analytics/contrail-snmp-collector/snmp_collector_info.sandesh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2017 Juniper Networks, Inc. All rights reserved.
*/

/**
* Definitions of structures used by contrail-snmp-collector to send
* information to the contrail-collector
*/

/**
* stores snmp collector information
*/
struct SnmpCollectorInfo {
1: string name (key="ObjectCollectorInfo")
2: optional bool deleted
3: list<string> members
4: list<string> partitions
5: list<string> prouters
}

/**
* @description: uve to send snmp collector information
* @object: analytics-node
*/
uve sandesh SnmpCollectorUVE {
1: SnmpCollectorInfo data
}
4 changes: 3 additions & 1 deletion src/analytics/contrail-topology/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ sources = [
]

src_sandesh = env.SandeshGenPy('link.sandesh', 'contrail_topology/sandesh/', False)
topology_info_pkg = env.SandeshGenPy('topology_info.sandesh', 'contrail_topology/sandesh/', False)
cpuinfo_pkg = env.SandeshGenPy('#controller/src/base/sandesh/cpuinfo.sandesh', 'contrail_topology/sandesh/nodeinfo/', False)
process_info_pkg = env.SandeshGenPy('#controller/src/base/sandesh/process_info.sandesh', 'contrail_topology/sandesh/nodeinfo/', False)
nodeinfo_pkg = env.SandeshGenPy('#controller/src/base/sandesh/nodeinfo.sandesh', 'contrail_topology/sandesh/', False)
Expand All @@ -36,7 +37,7 @@ version = '0.0.1'
# version = f.read().split(",")[0][1:]

sdist_depends = [ nodeinfo_pkg, cpuinfo_pkg, process_info_pkg,
derived_stats_pkg]
derived_stats_pkg, topology_info_pkg ]
cd_cmd = 'cd ' + Dir('.').path + ' && '
sdist_gen = env.Command('dist/contrail-topology-' \
+ version + '.tar.gz',
Expand Down Expand Up @@ -114,6 +115,7 @@ doc_files += env['BASE_DOC_FILES']
doc_files += env['SANDESH_DOC_FILES']
doc_files += env['ANALYTICS_DOC_FILES']
doc_files += env.SandeshGenDoc('link.sandesh')
doc_files += env.SandeshGenDoc('topology_info.sandesh')

if 'install' in BUILD_TARGETS:
install_cmd = env.Command(None, sources + src_sandesh,
Expand Down
29 changes: 28 additions & 1 deletion src/analytics/contrail-topology/contrail_topology/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import signal
import random
import hashlib
from sandesh.topology_info.ttypes import TopologyInfo, TopologyUVE

class PRouter(object):
def __init__(self, name, data):
Expand All @@ -21,7 +22,7 @@ def __init__(self, name, data):
class Controller(object):
def __init__(self, config):
self._config = config
self._me = socket.gethostname() + ':' + str(os.getpid())
self._hostname = socket.gethostname()
self.analytic_api = AnalyticApiClient(self._config)
self._config.random_collectors = self._config.collectors()
self._chksum = ""
Expand All @@ -30,9 +31,13 @@ def __init__(self, config):
self._config.random_collectors = random.sample(self._config.collectors(), \
len(self._config.collectors()))
self.uve = LinkUve(self._config)
self._logger = self.uve.logger()
self.sleep_time()
self._keep_running = True
self._vnc = None
self._members = None
self._partitions = None
self._prouters = None

def stop(self):
self._keep_running = False
Expand Down Expand Up @@ -128,6 +133,22 @@ def _chk_lnk(self, pre, index):
return d['ifOperStatus'] == 1
return False

def _send_topology_uve(self, members, partitions, prouters):
topology_info = TopologyInfo()
if self._members != members:
self._members = members
topology_info.members = members
if self._partitions != partitions:
self._partitions = partitions
topology_info.partitions = partitions
if self._prouters != prouters:
self._prouters = prouters
topology_info.prouters = prouters
if topology_info != TopologyInfo():
topology_info.name = self._hostname
TopologyUVE(data=topology_info).send()
# end _send_topology_uve

def bms_links(self, prouter, ifm):
if not self._vnc:
try:
Expand Down Expand Up @@ -317,11 +338,17 @@ def run(self):
self.uve._moduleid,
zookeeper=self._config.zookeeper_server(),
delete_hndlr=self._del_uves,
logger=self._logger,
cluster_id=self._config.cluster_id())

while self._keep_running:
self.scan_data()
if self.constnt_schdlr.schedule(self.prouters):
members = self.constnt_schdlr.members()
partitions = self.constnt_schdlr.partitions()
prouters = map(lambda x: x.name,
self.constnt_schdlr.work_items())
self._send_topology_uve(members, partitions, prouters)
try:
with self._sem:
self.compute()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,13 @@ def __init__(self, conf):
self._instance_id,
staticmethod(ConnectionState.get_process_state_cb),
NodeStatusUVE, NodeStatus, self.table)
self._logger = sandesh_global.logger()
# end __init__

def logger(self):
return self._logger
# end logger

def send(self, data):
pprint.pprint(data)
for prouter in data:
Expand Down
27 changes: 27 additions & 0 deletions src/analytics/contrail-topology/topology_info.sandesh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2017 Juniper Networks, Inc. All rights reserved.
*/

/**
* Definitions of structures used by contrail-snmp-collector to send
* information to the contrail-collector
*/

/**
* stores topology information
*/
struct TopologyInfo {
1: string name (key="ObjectCollectorInfo")
2: optional bool deleted
3: list<string> members
4: list<string> partitions
5: list<string> prouters
}

/**
* @description: uve to send topology information
* @object: analytics-node
*/
uve sandesh TopologyUVE {
1: TopologyInfo data
}
18 changes: 13 additions & 5 deletions src/opserver/consistent_schdlr.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ def schedule(self, items, lock_timeout=30):
self._items2name(items))
return ret

def members(self):
return list(self._con_hash.nodes)

def partitions(self):
return list(self._pc)

def work_items(self):
return sum(self._partitions.values(), [])

Expand Down Expand Up @@ -171,25 +177,27 @@ def _supress_log(self, *s):
def _consistent_hash(self, members):
if self._con_hash is None:
self._con_hash = ConsistentHash(members)
self._supress_log('members:', self._con_hash.nodes)
self._logger.error('members: %s' % (str(self._con_hash.nodes)))
cur, updtd = set(self._con_hash.nodes), set(members)
if cur != updtd:
newm = updtd - cur
rmvd = cur - updtd
if newm:
self._supress_log('new workers:', newm)
self._logger.error('new members: %s' % (str(newm)))
self._con_hash.add_nodes(list(newm))
if rmvd:
self._supress_log('workers left:', rmvd)
self._logger.error('members left: %s' % (str(rmvd)))
self._con_hash.del_nodes(list(rmvd))
return self._con_hash

def _consistent_hash_get_node(self, members, partition):
return self._consistent_hash(members).get_node(partition)

def _partitioner_func(self, identifier, members, _partitions):
return [p for p in _partitions \
if self._consistent_hash_get_node(members, p) == identifier]
partitions = [p for p in _partitions \
if self._consistent_hash_get_node(members, p) == identifier]
self._logger.error('partitions: %s' % (str(partitions)))
return partitions

def _release(self):
old = set(self._pc)
Expand Down

0 comments on commit d7e7d71

Please sign in to comment.