Skip to content

Commit

Permalink
Let Analytics services connect to any collector based on discovery
Browse files Browse the repository at this point in the history
Presently, the analytics services such as analytics-api, query-engine,
snmp-collector and topology always connect to the local collector. If the
local collector service is down, these services do not connect to
another active collector service, even if one is present.

Side-effect of this behavior:
If the collector service is down, then the PRouterEntry UVEs and the
PRouterLinkEntry UVEs originated by the snmp-collector and the topology
service respectively would be missing from the /analytics/uves/prouter
output.

Fix:
analytics-api, query-engine, snmp-collector and topology should get the
collector list from the discovery service instead of connecting to the
local collector service.

Change-Id: I62074e8516f4e7eb88ac1d1c727067e87578794f
Partial-Bug: #1528770
  • Loading branch information
Sundaresan Rajangam committed Dec 26, 2015
1 parent e593abd commit 90a75d5
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 24 deletions.
Expand Up @@ -79,7 +79,7 @@ def parse(self):
args, remaining_argv = conf_parser.parse_known_args(self._argv.split())

defaults = {
'collectors' : ['127.0.0.1:8086'],
'collectors' : None,
'log_local' : False,
'log_level' : SandeshLevel.SYS_DEBUG,
'log_category' : '',
Expand Down
22 changes: 20 additions & 2 deletions src/analytics/contrail-topology/contrail_topology/config.py
Expand Up @@ -4,14 +4,17 @@
import argparse, os, ConfigParser, sys, re
from pysandesh.sandesh_base import *
from pysandesh.gen_py.sandesh.ttypes import SandeshLevel
from sandesh_common.vns.constants import HttpPortTopology
from sandesh_common.vns.constants import ModuleNames, HttpPortTopology
from sandesh_common.vns.ttypes import Module
import discoveryclient.client as discovery_client

class CfgParser(object):
CONF_DEFAULT_PATH = '/etc/contrail/contrail-topology.conf'
def __init__(self, argv):
self._args = None
self.__pat = None
self._argv = argv or ' '.join(sys.argv[1:])
self._disc = None

def parse(self):
'''
Expand Down Expand Up @@ -61,7 +64,7 @@ def parse(self):
args, remaining_argv = conf_parser.parse_known_args(self._argv.split())

defaults = {
'collectors' : ['127.0.0.1:8086'],
'collectors' : None,
'analytics_api' : ['127.0.0.1:8081'],
'log_local' : False,
'log_level' : SandeshLevel.SYS_DEBUG,
Expand All @@ -74,6 +77,10 @@ def parse(self):
'zookeeper' : '127.0.0.1:2181',
'sandesh_send_rate_limit': SandeshSystem.get_sandesh_send_rate_limit(),
}
disc_opts = {
'disc_server_ip' : '127.0.0.1',
'disc_server_port' : 5998,
}

config = None
if args.conf_file:
Expand All @@ -82,6 +89,8 @@ def parse(self):
config.read(args.conf_file)
if 'DEFAULTS' in config.sections():
defaults.update(dict(config.items("DEFAULTS")))
if 'DISCOVERY' in config.sections():
disc_opts.update(dict(config.items('DISCOVERY')))
# Override with CLI options
# Don't suppress add_help here so it will handle -h
parser = argparse.ArgumentParser(
Expand All @@ -92,6 +101,7 @@ def parse(self):
# Don't mess with format of description
formatter_class=argparse.RawDescriptionHelpFormatter,
)
defaults.update(disc_opts)
parser.set_defaults(**defaults)
parser.add_argument("--analytics_api",
help="List of analytics-api IP addresses in ip:port format",
Expand Down Expand Up @@ -121,6 +131,10 @@ def parse(self):
help="introspect server port")
parser.add_argument("--zookeeper",
help="ip:port of zookeeper server")
parser.add_argument("--disc_server_ip",
help="Discovery Server IP address")
parser.add_argument("--disc_server_port", type=int,
help="Discovery Server port")
parser.add_argument("--sandesh_send_rate_limit", type=int,
help="Sandesh send rate limit in messages/sec.")
self._args = parser.parse_args(remaining_argv)
Expand All @@ -130,6 +144,10 @@ def parse(self):
self._args.analytics_api = self._args.analytics_api.split()

self._args.config_sections = config
self._disc = discovery_client.DiscoveryClient(
self._args.disc_server_ip,
self._args.disc_server_port,
ModuleNames[Module.CONTRAIL_TOPOLOGY])

def _pat(self):
if self.__pat is None:
Expand Down
Expand Up @@ -28,7 +28,8 @@ def __init__(self, conf):
self._conf.collectors(),
self._node_type_name,
self._conf.http_port(),
['contrail_topology.sandesh'])
['contrail_topology.sandesh'],
self._conf._disc)
sandesh_global.set_logging_params(
enable_local_log=self._conf.log_local(),
category=self._conf.log_category(),
Expand All @@ -40,8 +41,7 @@ def __init__(self, conf):
self._instance_id,
staticmethod(ConnectionState.get_process_state_cb),
NodeStatusUVE, NodeStatus)

# generator_init()
# end __init__

def send(self, data):
pprint.pprint(data)
Expand Down
27 changes: 10 additions & 17 deletions src/opserver/opserver.py
Expand Up @@ -58,6 +58,7 @@
from sandesh.analytics.ttypes import *
from sandesh.analytics.cpuinfo.ttypes import ProcessCpuInfo
from sandesh.discovery.ttypes import CollectorTrace
import discoveryclient.client as discovery_client
from opserver_util import OpServerUtils
from opserver_util import ServicePoller
from cpuinfo import CpuInfoData
Expand Down Expand Up @@ -393,29 +394,16 @@ class OpServer(object):
* ``/analytics/operation/database-purge``:
"""
def disc_publish(self):
try:
import discoveryclient.client as client
except:
try:
# TODO: Try importing from the server. This should go away..
import discovery.client as client
except:
raise Exception('Could not get Discovery Client')

data = {
'ip-address': self._args.host_ip,
'port': self._args.rest_api_port,
}
self.disc = client.DiscoveryClient(
self._args.disc_server_ip,
self._args.disc_server_port,
ModuleNames[Module.OPSERVER])
self.disc.set_sandesh(self._sandesh)
self._logger.info("Disc Publish to %s : %d - %s"
% (self._args.disc_server_ip,
self._args.disc_server_port, str(data)))
self.disc.publish(ANALYTICS_API_SERVER_DISCOVERY_SERVICE_NAME, data)
# end
# end disc_publish

def __init__(self, args_str=' '.join(sys.argv[1:])):
self.gevs = []
Expand Down Expand Up @@ -449,11 +437,17 @@ def __init__(self, args_str=' '.join(sys.argv[1:])):
if self._args.sandesh_send_rate_limit is not None:
SandeshSystem.set_sandesh_send_rate_limit( \
self._args.sandesh_send_rate_limit)
self.disc = None
if self._args.disc_server_ip:
self.disc = discovery_client.DiscoveryClient(
self._args.disc_server_ip,
self._args.disc_server_port,
ModuleNames[Module.OPSERVER])
self._sandesh.init_generator(
self._moduleid, self._hostname, self._node_type_name,
self._instance_id, self._args.collectors, 'opserver_context',
int(self._args.http_server_port), ['opserver.sandesh'],
logger_class=self._args.logger_class,
self.disc, logger_class=self._args.logger_class,
logger_config_file=self._args.logging_conf)
self._sandesh.set_logging_params(
enable_local_log=self._args.log_local,
Expand Down Expand Up @@ -523,7 +517,6 @@ def __init__(self, args_str=' '.join(sys.argv[1:])):
dict((ModuleNames[k], [CategoryNames[ce] for ce in v])
for k, v in ModuleCategoryMap.iteritems())

self.disc = None
self.agp = {}
if self._usecache:
ConnectionState.update(conn_type = ConnectionType.UVEPARTITIONS,
Expand Down Expand Up @@ -727,7 +720,7 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])):

defaults = {
'host_ip' : "127.0.0.1",
'collectors' : ['127.0.0.1:8086'],
'collectors' : None,
'cassandra_server_list' : ['127.0.0.1:9160'],
'http_server_port' : 8090,
'rest_api_port' : 8081,
Expand Down
4 changes: 3 additions & 1 deletion src/query_engine/qed.cc
Expand Up @@ -219,11 +219,13 @@ main(int argc, char *argv[]) {
// 1. Collector client
// 2. Redis
// 3. Cassandra
// 4. Discovery (if collector list not configured)
ConnectionStateManager<NodeStatusUVE, NodeStatus>::
GetInstance()->Init(*evm.io_service(),
options.hostname(), module_name,
instance_id,
boost::bind(&GetProcessStateCb, _1, _2, _3, 3));
boost::bind(&GetProcessStateCb, _1, _2, _3,
options.collector_server_list().size() ? 3 : 4));
Sandesh::set_send_rate_limit(options.sandesh_send_rate_limit());
bool success(Sandesh::InitGenerator(
module_name,
Expand Down

0 comments on commit 90a75d5

Please sign in to comment.