diff --git a/src/analytics/contrail-topology/contrail_topology/analytic_client.py b/src/analytics/contrail-topology/contrail_topology/analytic_client.py index 56dc4e21fc3..1555634509f 100644 --- a/src/analytics/contrail-topology/contrail_topology/analytic_client.py +++ b/src/analytics/contrail-topology/contrail_topology/analytic_client.py @@ -3,6 +3,7 @@ # import requests, json from requests.exceptions import ConnectionError +from requests.auth import HTTPBasicAuth class AnalyticApiClient(object): def __init__(self, cfg): @@ -23,7 +24,8 @@ def init_client(self): def _get_url_json(self, url): if url is None: return {} - page = self.client.get(url) + page = self.client.get(url, auth=HTTPBasicAuth( + self.config.admin_user(), self.config.admin_password())) if page.status_code == 200: return json.loads(page.text) raise ConnectionError, "bad request " + url diff --git a/src/analytics/contrail-topology/contrail_topology/config.py b/src/analytics/contrail-topology/contrail_topology/config.py index e54d2e7b902..49f455aef22 100644 --- a/src/analytics/contrail-topology/contrail_topology/config.py +++ b/src/analytics/contrail-topology/contrail_topology/config.py @@ -4,7 +4,8 @@ import argparse, os, ConfigParser, sys, re from pysandesh.sandesh_base import * from pysandesh.gen_py.sandesh.ttypes import SandeshLevel -from sandesh_common.vns.constants import ModuleNames, HttpPortTopology, API_SERVER_DISCOVERY_SERVICE_NAME +from sandesh_common.vns.constants import ModuleNames, HttpPortTopology, \ + API_SERVER_DISCOVERY_SERVICE_NAME, OpServerAdminPort from sandesh_common.vns.ttypes import Module import discoveryclient.client as discovery_client import traceback @@ -69,7 +70,7 @@ def parse(self): defaults = { 'collectors' : None, - 'analytics_api' : ['127.0.0.1:8081'], + 'analytics_api' : ['127.0.0.1:' + str(OpServerAdminPort)], 'log_local' : False, 'log_level' : SandeshLevel.SYS_DEBUG, 'log_category' : '', @@ -224,6 +225,12 @@ def frequency(self): def http_port(self): return self._args.http_server_port + def admin_user(self): + return self._args.admin_user + + def admin_password(self): + return self._args.admin_password + def sandesh_send_rate_limit(self): return self._args.sandesh_send_rate_limit diff --git a/src/api-lib/vnc_api.py b/src/api-lib/vnc_api.py index b29bc61e825..7b1d4dd9b61 100644 --- a/src/api-lib/vnc_api.py +++ b/src/api-lib/vnc_api.py @@ -1044,8 +1044,6 @@ def virtual_network_subnet_ip_count(self, vnobj, subnet_list): #end virtual_network_subnet_ip_count def get_auth_token(self): - if self._auth_token: - return self._auth_token self._headers = self._authenticate(headers=self._headers) return self._auth_token @@ -1149,19 +1147,21 @@ def set_user_roles(self, roles): self._headers['X-API-ROLE'] = (',').join(roles) #end set_user_roles + """ + validate user token. Optionally, check token authorization for an object. + rv {'token_info': , 'permissions': 'RWX'} + """ def obj_perms(self, token, obj_uuid=None): - """ - validate user token. Optionally, check token authorization for an object. - rv {'token_info': , 'permissions': 'RWX'} - """ - query = 'token=%s' % token - if obj_uuid: - query += '&uuid=%s' % obj_uuid + self._headers['X-USER-TOKEN'] = token + query = 'uuid=%s' % obj_uuid if obj_uuid else '' try: - rv = self._request_server(rest.OP_GET, "/obj-perms", data=query) - return json.loads(rv) + rv_json = self._request_server(rest.OP_GET, "/obj-perms", data=query) + rv = json.loads(rv_json) except PermissionDenied: - return None + rv = None + finally: + del self._headers['X-USER-TOKEN'] + return rv # change object ownsership def chown(self, obj_uuid, owner): diff --git a/src/config/api-server/tests/test_crud_basic.py b/src/config/api-server/tests/test_crud_basic.py index 9360411c715..5a92da4e6be 100644 --- a/src/config/api-server/tests/test_crud_basic.py +++ b/src/config/api-server/tests/test_crud_basic.py @@ -1176,14 +1176,14 @@ def test_list_bulk_collection(self): vmi_uuids = [o.uuid for o in vmi_objs] logger.info("Querying VNs by obj_uuids.") - flexmock(self._api_server).should_call('_list_collection').once() + flexmock(self._api_server).should_call('list_bulk_collection_http_post').once() ret_list = self._vnc_lib.resource_list('virtual-network', obj_uuids=vn_uuids) ret_uuids = [ret['uuid'] for ret in ret_list['virtual-networks']] self.assertThat(set(vn_uuids), Equals(set(ret_uuids))) logger.info("Querying RIs by parent_id.") - flexmock(self._api_server).should_call('_list_collection').once() + flexmock(self._api_server).should_call('list_bulk_collection_http_post').once() ret_list = self._vnc_lib.resource_list('routing-instance', parent_id=vn_uuids) ret_uuids = [ret['uuid'] @@ -1192,7 +1192,7 @@ def test_list_bulk_collection(self): Equals(set(ret_uuids) & set(ri_uuids))) logger.info("Querying VMIs by back_ref_id.") - flexmock(self._api_server).should_call('_list_collection').once() + flexmock(self._api_server).should_call('list_bulk_collection_http_post').once() ret_list = self._vnc_lib.resource_list('virtual-machine-interface', back_ref_id=vn_uuids) ret_uuids = [ret['uuid'] @@ -1200,7 +1200,7 @@ def test_list_bulk_collection(self): self.assertThat(set(vmi_uuids), Equals(set(ret_uuids))) logger.info("Querying VMIs by back_ref_id and extra fields.") - flexmock(self._api_server).should_call('_list_collection').once() + flexmock(self._api_server).should_call('list_bulk_collection_http_post').once() ret_list = self._vnc_lib.resource_list('virtual-machine-interface', back_ref_id=vn_uuids, fields=['virtual_network_refs']) @@ -1212,14 +1212,14 @@ def test_list_bulk_collection(self): set(vn_uuids)) logger.info("Querying RIs by parent_id and filter.") - flexmock(self._api_server).should_call('_list_collection').once() + flexmock(self._api_server).should_call('list_bulk_collection_http_post').once() ret_list = self._vnc_lib.resource_list('routing-instance', parent_id=vn_uuids, filters={'display_name':'%s-ri-5' %(self.id())}) self.assertThat(len(ret_list['routing-instances']), Equals(1)) logger.info("Querying VNs by obj_uuids for children+backref fields.") - flexmock(self._api_server).should_call('_list_collection').once() + flexmock(self._api_server).should_call('list_bulk_collection_http_post').once() ret_objs = self._vnc_lib.resource_list('virtual-network', detail=True, obj_uuids=vn_uuids, fields=['routing_instances', 'virtual_machine_interface_back_refs']) diff --git a/src/config/api-server/tests/test_perms2.py b/src/config/api-server/tests/test_perms2.py index 47617860371..2a94f9e0f8b 100644 --- a/src/config/api-server/tests/test_perms2.py +++ b/src/config/api-server/tests/test_perms2.py @@ -112,9 +112,7 @@ def api_acl_name(self): return rg_name def check_perms(self, obj_uuid): - query = 'token=%s&uuid=%s' % (self.vnc_lib.get_auth_token(), obj_uuid) - rv = self.vnc_lib._request_server(rest.OP_GET, "/obj-perms", data=query) - rv = json.loads(rv) + rv = self.vnc_lib.obj_perms(self.vnc_lib.get_auth_token(), obj_uuid) return rv['permissions'] # display resource id-perms diff --git a/src/config/api-server/vnc_cfg_api_server.py b/src/config/api-server/vnc_cfg_api_server.py index 1b4f008f619..e49b9cfa9d7 100644 --- a/src/config/api-server/vnc_cfg_api_server.py +++ b/src/config/api-server/vnc_cfg_api_server.py @@ -1756,10 +1756,10 @@ def documentation_http_get(self, filename): # end documentation_http_get def obj_perms_http_get(self): - if 'token' not in get_request().query: + if 'HTTP_X_USER_TOKEN' not in get_request().environ: raise cfgm_common.exceptions.HttpError( 400, 'User token needed for validation') - user_token = get_request().query.token.encode("ascii") + user_token = get_request().environ['HTTP_X_USER_TOKEN'].encode("ascii") # get permissions in internal context try: diff --git a/src/config/common/analytics_client.py b/src/config/common/analytics_client.py index c5d14a00916..4209330c561 100644 --- a/src/config/common/analytics_client.py +++ b/src/config/common/analytics_client.py @@ -34,12 +34,13 @@ def __init__(self, endpoint, data={}): self.endpoint = endpoint self.data = data - def request(self, path, fqdn_uuid, data=None): + def request(self, path, fqdn_uuid, user_token=None, + data=None): req_data = dict(self.data) if data: req_data.update(data) - req_params = self._get_req_params(data=req_data) + req_params = self._get_req_params(user_token, data=req_data) url = urlparse.urljoin(self.endpoint, path + fqdn_uuid) resp = requests.get(url, **req_params) @@ -51,7 +52,7 @@ def request(self, path, fqdn_uuid, data=None): return resp.json() - def _get_req_params(self, data=None): + def _get_req_params(self, user_token, data=None): req_params = { 'headers': { 'Accept': 'application/json' @@ -59,5 +60,7 @@ def _get_req_params(self, data=None): 'data': data, 'allow_redirects': False, } + if user_token: + req_params['headers']['X-AUTH-TOKEN'] = user_token return req_params diff --git a/src/config/common/tests/tools/install_venv_common.py b/src/config/common/tests/tools/install_venv_common.py index 6c77c839664..b43c8176a49 100644 --- a/src/config/common/tests/tools/install_venv_common.py +++ b/src/config/common/tests/tools/install_venv_common.py @@ -120,7 +120,9 @@ def pip_install(self, find_links, *args): find_links_str = ' '.join('--find-links file://'+x for x in find_links) cmd_array = ['%stools/with_venv.sh' %(os.environ.get('tools_path', '')), 'python', '.venv/bin/pip', 'install', - '--upgrade', '--no-cache-dir'] + '--upgrade'] + if not args[0].startswith('pip'): + cmd_array.extend(['--no-cache-dir']) for link in find_links: cmd_array.extend(['--find-links', 'file://'+link]) self.run_command(cmd_array + list(args), diff --git a/src/config/svc-monitor/svc_monitor/scheduler/vrouter_scheduler.py b/src/config/svc-monitor/svc_monitor/scheduler/vrouter_scheduler.py index e8ff6de6979..fd3c450e126 100644 --- a/src/config/svc-monitor/svc_monitor/scheduler/vrouter_scheduler.py +++ b/src/config/svc-monitor/svc_monitor/scheduler/vrouter_scheduler.py @@ -106,7 +106,8 @@ def query_uve(self, filter_string): path = "/analytics/uves/vrouter/" response_dict = {} try: - response = self._analytics.request(path, filter_string) + response = self._analytics.request(path, filter_string, + user_token=self._vnc_lib.get_auth_token()) for values in response['value']: response_dict[values['name']] = values['value'] except analytics_client.OpenContrailAPIFailed: diff --git a/src/opserver/SConscript b/src/opserver/SConscript index 8def0775596..b0d0877cb2a 100644 --- a/src/opserver/SConscript +++ b/src/opserver/SConscript @@ -13,6 +13,7 @@ OpEnv = BuildEnv.Clone() setup_sources = [ 'setup.py', 'MANIFEST.in', + 'requirements.txt', ] setup_sources_rules = [] @@ -41,7 +42,9 @@ local_sources = [ 'partition_handler.py', 'consistent_schdlr.py', 'gendb_move_tables.py', - 'alarm_notify.py' + 'alarm_notify.py', + 'vnc_cfg_api_client.py', + 'opserver_local.py', ] plugins_sources = [ diff --git a/src/opserver/analytics_db.py b/src/opserver/analytics_db.py index 312fcd5c921..59c1b6a8ade 100644 --- a/src/opserver/analytics_db.py +++ b/src/opserver/analytics_db.py @@ -34,6 +34,7 @@ from cassandra.query import named_tuple_factory from cassandra.query import PreparedStatement, tuple_factory import platform +from opserver_util import OpServerUtils class AnalyticsDb(object): def __init__(self, logger, cassandra_server_list, @@ -603,7 +604,7 @@ def db_purge_thrift(self, purge_cutoff, purge_id): return (total_rows_deleted, purge_error_details) # end db_purge - def get_dbusage_info(self, rest_api_ip, rest_api_port): + def get_dbusage_info(self, ip, port, user, password): """Collects database usage information from all db nodes Returns: A dictionary with db node name as key and db usage in % as value @@ -611,12 +612,17 @@ def get_dbusage_info(self, rest_api_ip, rest_api_port): to_return = {} try: - uve_url = "http://" + rest_api_ip + ":" + str(rest_api_port) + "/analytics/uves/database-nodes?cfilt=DatabaseUsageInfo" - node_dburls = json.loads(urllib2.urlopen(uve_url).read()) + uve_url = "http://" + ip + ":" + str(port) + \ + "/analytics/uves/database-nodes?cfilt=DatabaseUsageInfo" + data = OpServerUtils.get_url_http(uve_url, user, password) + node_dburls = json.loads(data) for node_dburl in node_dburls: - # calculate disk usage percentage for analytics in each cassandra node - db_uve_state = json.loads(urllib2.urlopen(node_dburl['href']).read()) + # calculate disk usage percentage for analytics in each + # cassandra node + db_uve_data = OpServerUtils.get_url_http(node_dburl['href'], + user, password) + db_uve_state = json.loads(db_uve_data) db_usage_in_perc = (100* float(db_uve_state['DatabaseUsageInfo']['database_usage'][0]['analytics_db_size_1k'])/ float(db_uve_state['DatabaseUsageInfo']['database_usage'][0]['disk_space_available_1k'] + diff --git a/src/opserver/flow.py b/src/opserver/flow.py index 152c39b3909..dc948d526cf 100755 --- a/src/opserver/flow.py +++ b/src/opserver/flow.py @@ -76,7 +76,7 @@ def run(self): def parse_args(self): """ Eg. python flow.py --analytics-api-ip 127.0.0.1 - --analytics-api-port 8081 + --analytics-api-port 8181 --vrouter a6s23 --source-vn default-domain:default-project:vn1 --destination-vn default-domain:default-project:vn2 @@ -94,7 +94,7 @@ def parse_args(self): """ defaults = { 'analytics_api_ip': '127.0.0.1', - 'analytics_api_port': '8081', + 'analytics_api_port': '8181', 'start_time': 'now-10m', 'end_time': 'now', 'direction' : 'ingress', @@ -139,6 +139,11 @@ def parse_args(self): help="Show vmi uuid information") parser.add_argument( "--verbose", action="store_true", help="Show internal information") + parser.add_argument( + "--admin-user", help="Name of admin user", default="admin") + parser.add_argument( + "--admin-password", help="Password of admin user", + default="contrail123") self._args = parser.parse_args() try: @@ -332,13 +337,15 @@ def query(self): json.dumps(flow_query.__dict__)) print '' resp = OpServerUtils.post_url_http( - flow_url, json.dumps(flow_query.__dict__)) + flow_url, json.dumps(flow_query.__dict__), self._args.admin_user, + self._args.admin_password) result = {} if resp is not None: resp = json.loads(resp) qid = resp['href'].rsplit('/', 1)[1] result = OpServerUtils.get_query_result( - self._args.analytics_api_ip, self._args.analytics_api_port, qid) + self._args.analytics_api_ip, self._args.analytics_api_port, qid, + self._args.admin_user, self._args.admin_password) return result # end query diff --git a/src/opserver/introspect_util.py b/src/opserver/introspect_util.py index b449a354ace..2cc44366159 100644 --- a/src/opserver/introspect_util.py +++ b/src/opserver/introspect_util.py @@ -2,28 +2,37 @@ # Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. # -import urllib, urllib2 +import urllib import xmltodict import json import requests from lxml import etree import socket - +from requests.auth import HTTPBasicAuth class JsonDrv (object): - def _http_con(self, url): - return urllib2.urlopen(url) - - def load(self, url): - return json.load(self._http_con(url)) - + def load(self, url, user, password): + try: + if user and password: + auth=HTTPBasicAuth(user, password) + else: + auth=None + resp = requests.get(url, auth=auth) + return json.loads(resp.text) + except requests.ConnectionError, e: + print "Socket Connection error : " + str(e) + return None class XmlDrv (object): - def load(self, url): + def load(self, url, user, password): try: - resp = requests.get(url) + if user and password: + auth=HTTPBasicAuth(user, password) + else: + auth=None + resp = requests.get(url, auth=auth) return etree.fromstring(resp.text) except requests.ConnectionError, e: print "Socket Connection error : " + str(e) @@ -54,14 +63,14 @@ def _mk_url_str(self, path, query): return path+query_str return "http://%s:%d/%s%s" % (self._ip, self._port, path, query_str) - def dict_get(self, path='', query=None, drv=None): - try: - if path: - if drv is not None: - return drv().load(self._mk_url_str(path, query)) - return self._drv.load(self._mk_url_str(path, query)) - except urllib2.HTTPError: - return None + def dict_get(self, path='', query=None, drv=None, user=None, + password=None): + if path: + if drv is not None: + return drv().load(self._mk_url_str(path, query), user, + password) + return self._drv.load(self._mk_url_str(path, query), user, + password) # end dict_get diff --git a/src/opserver/log.py b/src/opserver/log.py index f768424bc35..22ff90236bd 100755 --- a/src/opserver/log.py +++ b/src/opserver/log.py @@ -82,7 +82,7 @@ def run(self): def parse_args(self): """ Eg. python log.py --analytics-api-ip 127.0.0.1 - --analytics-api-port 8081 + --analytics-api-port 8181 --source 127.0.0.1 --node-type Control --module bgp | cfgm | vnswad @@ -105,7 +105,7 @@ def parse_args(self): """ defaults = { 'analytics_api_ip': '127.0.0.1', - 'analytics_api_port': '8081', + 'analytics_api_port': '8181', } parser = argparse.ArgumentParser( @@ -158,6 +158,9 @@ def parse_args(self): parser.add_argument("--output-file", "-o", help="redirect output to file") parser.add_argument("--json", help="Dump output as json", action="store_true") parser.add_argument("--all", action="store_true", help=argparse.SUPPRESS) + parser.add_argument("--admin-user", help="Name of admin user", default="admin") + parser.add_argument("--admin-password", help="Password of admin user", + default="contrail123") self._args = parser.parse_args() return 0 # end parse_args @@ -457,13 +460,15 @@ def query(self): print 'Performing query: {0}'.format( json.dumps(messages_query.__dict__)) resp = OpServerUtils.post_url_http( - messages_url, json.dumps(messages_query.__dict__)) + messages_url, json.dumps(messages_query.__dict__), + self._args.admin_user, self._args.admin_password) result = {} if resp is not None: resp = json.loads(resp) qid = resp['href'].rsplit('/', 1)[1] result = OpServerUtils.get_query_result( - self._args.analytics_api_ip, self._args.analytics_api_port, qid) + self._args.analytics_api_ip, self._args.analytics_api_port, qid, + self._args.admin_user, self._args.admin_password) return result # end query diff --git a/src/opserver/opserver.py b/src/opserver/opserver.py index ecb67d5e8e6..34de586ec76 100644 --- a/src/opserver/opserver.py +++ b/src/opserver/opserver.py @@ -48,7 +48,9 @@ from sandesh_common.vns.constants import ModuleNames, CategoryNames,\ ModuleCategoryMap, Module2NodeType, NodeTypeNames, ModuleIds,\ INSTANCE_ID_DEFAULT, COLLECTOR_DISCOVERY_SERVICE_NAME,\ - ANALYTICS_API_SERVER_DISCOVERY_SERVICE_NAME, ALARM_GENERATOR_SERVICE_NAME + ANALYTICS_API_SERVER_DISCOVERY_SERVICE_NAME, ALARM_GENERATOR_SERVICE_NAME, \ + OpServerAdminPort, CLOUD_ADMIN_ROLE, APIAAAModes, \ + AAA_MODE_CLOUD_ADMIN, AAA_MODE_NO_AUTH from sandesh.viz.constants import _TABLES, _OBJECT_TABLES,\ _OBJECT_TABLE_SCHEMA, _OBJECT_TABLE_COLUMN_VALUES, \ _STAT_TABLES, STAT_OBJECTID_FIELD, STAT_VT_PREFIX, \ @@ -70,6 +72,9 @@ from generator_introspect_util import GeneratorIntrospectUtil from stevedore import hook, extension from partition_handler import PartInfo, UveStreamer, UveCacheProcessor +from functools import wraps +from vnc_cfg_api_client import VncCfgApiClient +from opserver_local import LocalApp _ERRORS = { errno.EBADMSG: 400, @@ -389,6 +394,26 @@ def disc_publish(self): self.disc.publish(ANALYTICS_API_SERVER_DISCOVERY_SERVICE_NAME, data) # end disc_publish + def validate_user_token(func): + @wraps(func) + def _impl(self, *f_args, **f_kwargs): + if self._args.auth_conf_info.get('cloud_admin_access_only') and \ + bottle.request.app == bottle.app(): + user_token = bottle.request.headers.get('X-Auth-Token') + if not user_token or not \ + self._vnc_api_client.is_role_cloud_admin(user_token): + raise bottle.HTTPResponse(status = 401, + body = 'Authentication required', + headers = self._reject_auth_headers()) + return func(self, *f_args, **f_kwargs) + return _impl + # end validate_user_token + + def _reject_auth_headers(self): + header_val = 'Keystone uri=\'%s\'' % \ + self._args.auth_conf_info.get('auth_uri') + return { "WWW-Authenticate" : header_val } + def __init__(self, args_str=' '.join(sys.argv[1:])): self.gevs = [] self._args = None @@ -464,6 +489,10 @@ def __init__(self, args_str=' '.join(sys.argv[1:])): body = gevent.queue.Queue() + self._vnc_api_client = None + if self._args.auth_conf_info.get('cloud_admin_access_only'): + self._vnc_api_client = VncCfgApiClient(self._args.auth_conf_info, + self._sandesh, self._logger) self._uvedbstream = UveStreamer(self._logger, None, None, self.get_agp, self._args.redis_password) @@ -738,6 +767,10 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): 'partitions' : 15, 'sandesh_send_rate_limit': SandeshSystem. \ get_sandesh_send_rate_limit(), + 'aaa_mode' : AAA_MODE_CLOUD_ADMIN, + 'api_server' : '127.0.0.1:8082', + 'admin_port' : OpServerAdminPort, + 'cloud_admin_role' : CLOUD_ADMIN_ROLE, } redis_opts = { 'redis_server_port' : 6379, @@ -752,6 +785,14 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): 'cassandra_user' : None, 'cassandra_password' : None, } + keystone_opts = { + 'auth_host': '127.0.0.1', + 'auth_protocol': 'http', + 'auth_port': 35357, + 'admin_user': 'admin', + 'admin_password': 'contrail123', + 'admin_tenant_name': 'default-domain' + } # read contrail-analytics-api own conf file config = None @@ -766,6 +807,8 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): disc_opts.update(dict(config.items('DISCOVERY'))) if 'CASSANDRA' in config.sections(): cassandra_opts.update(dict(config.items('CASSANDRA'))) + if 'KEYSTONE' in config.sections(): + keystone_opts.update(dict(config.items('KEYSTONE'))) # Override with CLI options # Don't surpress add_help here so it will handle -h @@ -779,6 +822,7 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): defaults.update(redis_opts) defaults.update(disc_opts) defaults.update(cassandra_opts) + defaults.update(keystone_opts) defaults.update() parser.set_defaults(**defaults) @@ -850,6 +894,26 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): help="Number of partitions for hashing UVE keys") parser.add_argument("--sandesh_send_rate_limit", type=int, help="Sandesh send rate limit in messages/sec") + parser.add_argument("--cloud_admin_role", + help="Name of cloud-admin role") + parser.add_argument("--aaa_mode", choices=APIAAAModes, + help="AAA mode") + parser.add_argument("--auth_host", + help="IP address of keystone server") + parser.add_argument("--auth_protocol", + help="Keystone authentication protocol") + parser.add_argument("--auth_port", type=int, + help="Keystone server port") + parser.add_argument("--admin_user", + help="Name of keystone admin user") + parser.add_argument("--admin_password", + help="Password of keystone admin user") + parser.add_argument("--admin_tenant_name", + help="Tenant name for keystone admin user") + parser.add_argument("--api_server", + help="Address of VNC API server in ip:port format") + parser.add_argument("--admin_port", + help="Port with local auth for admin access") self._args = parser.parse_args(remaining_argv) if type(self._args.collectors) is str: self._args.collectors = self._args.collectors.split() @@ -857,6 +921,25 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): self._args.redis_uve_list = self._args.redis_uve_list.split() if type(self._args.cassandra_server_list) is str: self._args.cassandra_server_list = self._args.cassandra_server_list.split() + + auth_conf_info = {} + auth_conf_info['admin_user'] = self._args.admin_user + auth_conf_info['admin_password'] = self._args.admin_password + auth_conf_info['admin_tenant_name'] = self._args.admin_tenant_name + auth_conf_info['auth_protocol'] = self._args.auth_protocol + auth_conf_info['auth_host'] = self._args.auth_host + auth_conf_info['auth_port'] = self._args.auth_port + auth_conf_info['auth_uri'] = '%s://%s:%d' % (self._args.auth_protocol, + self._args.auth_host, self._args.auth_port) + auth_conf_info['api_server_use_ssl'] = False + auth_conf_info['cloud_admin_access_only'] = \ + False if self._args.aaa_mode == AAA_MODE_NO_AUTH else True + auth_conf_info['cloud_admin_role'] = self._args.cloud_admin_role + auth_conf_info['admin_port'] = self._args.admin_port + api_server_info = self._args.api_server.split(':') + auth_conf_info['api_server_ip'] = api_server_info[0] + auth_conf_info['api_server_port'] = int(api_server_info[1]) + self._args.auth_conf_info = auth_conf_info # end _parse_args def get_args(self): @@ -925,9 +1008,11 @@ def _serve_streams(self, alarmsonly): ph.start() return body + @validate_user_token def uve_stream(self): return self._serve_streams(False) + @validate_user_token def alarm_stream(self): return self._serve_streams(True) @@ -1077,8 +1162,10 @@ def _query(self, request): if tabl == OVERLAY_TO_UNDERLAY_FLOW_MAP: overlay_to_underlay_map = OverlayToUnderlayMapper( - request.json, self._args.host_ip, - self._args.rest_api_port, self._logger) + request.json, 'localhost', + self._args.auth_conf_info['admin_port'], + self._args.auth_conf_info['admin_user'], + self._args.auth_conf_info['admin_password'], self._logger) try: yield overlay_to_underlay_map.process_query() except OverlayToUnderlayMapperError as e: @@ -1197,17 +1284,19 @@ def _sync_query(self, request, qid): return # end _sync_query + @validate_user_token def query_process(self): self._post_common(bottle.request, None) result = self._query(bottle.request) return result # end query_process + @validate_user_token def query_status_get(self, queryId): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) return self._query_status(bottle.request, queryId) # end query_status_get @@ -1215,15 +1304,16 @@ def query_chunk_get(self, queryId, chunkId): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) return self._query_chunk(bottle.request, queryId, int(chunkId)) # end query_chunk_get + @validate_user_token def show_queries(self): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) queries = {} try: redish = redis.StrictRedis(db=0, host='127.0.0.1', @@ -1349,13 +1439,14 @@ def _uve_http_post_filter_set(req): return filters # end _uve_http_post_filter_set + @validate_user_token def dyn_http_post(self, tables): (ok, result) = self._post_common(bottle.request, None) base_url = bottle.request.urlparts.scheme + \ '://' + bottle.request.urlparts.netloc if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) uve_type = tables uve_tbl = uve_type if uve_type in UVE_MAP: @@ -1405,6 +1496,7 @@ def dyn_http_post(self, tables): yield u']}' # end _uve_alarm_http_post + @validate_user_token def dyn_http_get(self, table, name): # common handling for all resource get (ok, result) = self._get_common(bottle.request) @@ -1412,7 +1504,7 @@ def dyn_http_get(self, table, name): '://' + bottle.request.urlparts.netloc if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) uve_tbl = table if table in UVE_MAP: uve_tbl = UVE_MAP[table] @@ -1457,12 +1549,13 @@ def dyn_http_get(self, table, name): yield json.dumps(rsp) # end dyn_http_get + @validate_user_token def uve_alarm_http_types(self): # common handling for all resource get (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) bottle.response.set_header('Content-Type', 'application/json') ret = {} @@ -1478,12 +1571,13 @@ def uve_alarm_http_types(self): ret[aname] = avalue return json.dumps(ret) + @validate_user_token def alarms_http_get(self): # common handling for all resource get (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) bottle.response.set_header('Content-Type', 'application/json') @@ -1510,12 +1604,13 @@ def alarms_http_get(self): return bottle.HTTPError(_ERRORS[errno.EIO],json.dumps(alms)) # end alarms_http_get + @validate_user_token def dyn_list_http_get(self, tables): # common handling for all resource get (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) arg_line = bottle.request.url.rsplit('/', 1)[1] uve_args = arg_line.split('?') uve_type = tables[:-1] @@ -1554,12 +1649,13 @@ def dyn_list_http_get(self, tables): return json.dumps(uve_links) # end dyn_list_http_get + @validate_user_token def analytics_http_get(self): # common handling for all resource get (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) base_url = bottle.request.urlparts.scheme + '://' + \ bottle.request.urlparts.netloc + '/analytics/' @@ -1569,12 +1665,13 @@ def analytics_http_get(self): return json.dumps(analytics_links) # end analytics_http_get + @validate_user_token def uves_http_get(self): # common handling for all resource get (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) base_url = bottle.request.urlparts.scheme + '://' + \ bottle.request.urlparts.netloc + '/analytics/uves/' @@ -1603,6 +1700,7 @@ def uves_http_get(self): return bottle.HTTPError(_ERRORS[errno.EIO],json.dumps(uvetype_links)) # end _uves_http_get + @validate_user_token def alarms_ack_http_post(self): self._post_common(bottle.request, None) if ('application/json' not in bottle.request.headers['Content-Type']): @@ -1655,6 +1753,7 @@ def alarms_ack_http_post(self): return bottle.HTTPResponse(status=200) # end alarms_ack_http_post + @validate_user_token def send_trace_buffer(self, source, module, instance_id, name): response = {} trace_req = SandeshTraceRequest(name) @@ -1677,11 +1776,12 @@ def send_trace_buffer(self, source, module, instance_id, name): return json.dumps(response) # end send_trace_buffer + @validate_user_token def tables_process(self): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) base_url = bottle.request.urlparts.scheme + '://' + \ bottle.request.urlparts.netloc + '/analytics/table/' @@ -1736,6 +1836,7 @@ def get_purge_cutoff(self, purge_input, start_times): return purge_cutoff #end get_purge_cutoff + @validate_user_token def process_purge_request(self): self._post_common(bottle.request, None) @@ -1880,8 +1981,10 @@ def _auto_purge(self): while True: trigger_purge = False db_node_usage = self._analytics_db.get_dbusage_info( - self._args.rest_api_ip, - self._args.rest_api_port) + 'localhost', + self._args.auth_conf_info['admin_port'], + self._args.auth_conf_info['admin_user'], + self._args.auth_conf_info['admin_password']) self._logger.info("node usage:" + str(db_node_usage) ) self._logger.info("threshold:" + str(self._args.db_purge_threshold)) @@ -1917,8 +2020,7 @@ def _auto_purge(self): gevent.sleep(60*30) # sleep for 30 minutes # end _auto_purge - - + @validate_user_token def _get_analytics_data_start_time(self): analytics_start_time = (self._analytics_db.get_analytics_start_time())[SYSTEM_OBJECT_START_TIME] response = {'analytics_data_start_time': analytics_start_time} @@ -1930,7 +2032,7 @@ def table_process(self, table): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) base_url = bottle.request.urlparts.scheme + '://' + \ bottle.request.urlparts.netloc + '/analytics/table/' + table + '/' @@ -1950,11 +2052,12 @@ def table_process(self, table): return json.dumps(json_links) # end table_process + @validate_user_token def table_schema_process(self, table): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) bottle.response.set_header('Content-Type', 'application/json') for i in range(0, len(self._VIRTUAL_TABLES)): @@ -1965,11 +2068,12 @@ def table_schema_process(self, table): return (json.dumps({})) # end table_schema_process + @validate_user_token def column_values_process(self, table): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) base_url = bottle.request.urlparts.scheme + '://' + \ bottle.request.urlparts.netloc + \ @@ -2034,11 +2138,12 @@ def generator_info(self, table, column): return [] # end generator_info + @validate_user_token def column_process(self, table, column): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) bottle.response.set_header('Content-Type', 'application/json') for i in range(0, len(self._VIRTUAL_TABLES)): @@ -2184,6 +2289,11 @@ def run(self): sp2.start() self.gevs.append(sp2) + if self._vnc_api_client: + self.gevs.append(gevent.spawn(self._vnc_api_client.connect)) + self._local_app = LocalApp(bottle.app(), self._args.auth_conf_info) + self.gevs.append(gevent.spawn(self._local_app.start_http_server)) + try: gevent.joinall(self.gevs) except KeyboardInterrupt: diff --git a/src/opserver/opserver_local.py b/src/opserver/opserver_local.py new file mode 100644 index 00000000000..d5ba564e8dc --- /dev/null +++ b/src/opserver/opserver_local.py @@ -0,0 +1,72 @@ +# +# Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +# + +import gevent +from gevent import monkey +monkey.patch_all() +import bottle +import base64 +from bottle import GeventServer +from gevent.pool import Pool + +def get_bottle_server(pool_size): + """ + Custom gevent pool server for bottle + """ + + class GeventPoolServer(GeventServer): + """ Gevent server with limited pool size + """ + def __init__(self, host='127.0.0.1', port=8181, **options): + super(GeventPoolServer, self ).__init__(host, port, + spawn=Pool(size=pool_size), **options) + + return GeventPoolServer +# end get_bottle_server + +# Open port for access to API server for trouble shooting +class LocalApp(object): + + def __init__(self, app, conf_info): + self._http_host = 'localhost' + self._http_port = conf_info['admin_port'] + self._http_app = bottle.Bottle() + self._http_app.merge(app.routes) + self._http_app.config.local_auth = True + self._http_app.error_handler = app.error_handler + self._conf_info = conf_info + + # 2 decorators below due to change in api between bottle 0.11.6 + # (which insists on global app) vs later (which need on specific + # app) + @self._http_app.hook('before_request') + @bottle.hook('before_request') + def local_auth_check(*args, **kwargs): + if bottle.request.app != self._http_app: + return + # expect header to have something like 'Basic YWJjOmRlZg==' + auth_hdr_val = bottle.request.environ.get('HTTP_AUTHORIZATION') + if not auth_hdr_val: + bottle.abort(401, 'HTTP_AUTHORIZATION header missing') + try: + auth_type, user_passwd = auth_hdr_val.split() + except Exception as e: + bottle.abort(401, 'Auth Exception: %s' %(str(e))) + enc_user_passwd = auth_hdr_val.split()[1] + user_passwd = base64.b64decode(enc_user_passwd) + user, passwd = user_passwd.split(':') + if (not self._conf_info.get('admin_user') == user or + not self._conf_info.get('admin_password') == passwd): + bottle.abort(401, 'Authentication check failed') + + # Add admin role to the request + bottle.request.environ['HTTP_X_ROLE'] = 'admin' + # end __init__ + + def start_http_server(self): + self._http_app.run( + host=self._http_host, port=self._http_port, + server=get_bottle_server(1024)) + # end start_http_server +# end class LocalApp diff --git a/src/opserver/opserver_util.py b/src/opserver/opserver_util.py index 8b6f1240fc3..27eb376551c 100644 --- a/src/opserver/opserver_util.py +++ b/src/opserver/opserver_util.py @@ -27,7 +27,7 @@ class SandeshType(object): SYSTEM = 1 TRACE = 4 - +from requests.auth import HTTPBasicAuth def enum(**enums): return type('Enum', (), enums) @@ -197,7 +197,7 @@ def parse_start_end_time(start_time, end_time, last): # end parse_start_end_time @staticmethod - def post_url_http(url, params, sync=False): + def post_url_http(url, params, user, password, sync=False): if sync: hdrs = OpServerUtils.POST_HEADERS_SYNC stm = False @@ -211,10 +211,12 @@ def post_url_http(url, params, sync=False): if int(pkg_resources.get_distribution("requests").version[0]) != 0: response = requests.post(url, stream=stm, data=params, + auth=HTTPBasicAuth(user, password), headers=hdrs) else: response = requests.post(url, prefetch=pre, data=params, + auth=HTTPBasicAuth(user, password), headers=hdrs) except requests.exceptions.ConnectionError, e: print "Connection to %s failed %s" % (url, str(e)) @@ -227,13 +229,15 @@ def post_url_http(url, params, sync=False): # end post_url_http @staticmethod - def get_url_http(url): + def get_url_http(url, user, password): data = {} try: if int(pkg_resources.get_distribution("requests").version[0]) != 0: - data = requests.get(url, stream=True) + data = requests.get(url, stream=True, + auth=HTTPBasicAuth(user, password)) else: - data = requests.get(url, prefetch=False) + data = requests.get(url, prefetch=False, + auth=HTTPBasicAuth(user, password)) except requests.exceptions.ConnectionError, e: print "Connection to %s failed %s" % (url, str(e)) @@ -268,13 +272,14 @@ def parse_query_result(result): # end parse_query_result @staticmethod - def get_query_result(opserver_ip, opserver_port, qid, time_out=None): + def get_query_result(opserver_ip, opserver_port, qid, user, password, + time_out=None): sleep_interval = 0.5 time_left = time_out while True: url = OpServerUtils.opserver_query_url( opserver_ip, opserver_port) + '/' + qid - resp = OpServerUtils.get_url_http(url) + resp = OpServerUtils.get_url_http(url, user, password) if resp.status_code != 200: yield {} return @@ -297,7 +302,7 @@ def get_query_result(opserver_ip, opserver_port, qid, time_out=None): for chunk in status['chunks']: url = OpServerUtils.opserver_url( opserver_ip, opserver_port) + chunk['href'] - resp = OpServerUtils.get_url_http(url) + resp = OpServerUtils.get_url_http(url, user, password) if resp.status_code != 200: yield {} else: diff --git a/src/opserver/overlay_to_underlay_mapper.py b/src/opserver/overlay_to_underlay_mapper.py index 1635935aee5..6ea982375b0 100644 --- a/src/opserver/overlay_to_underlay_mapper.py +++ b/src/opserver/overlay_to_underlay_mapper.py @@ -27,10 +27,12 @@ class OverlayToUnderlayMapperError(Exception): class OverlayToUnderlayMapper(object): def __init__(self, query_json, analytics_api_ip, - analytics_api_port, logger): + analytics_api_port, user, password, logger): self.query_json = query_json self._analytics_api_ip = analytics_api_ip self._analytics_api_port = analytics_api_port + self._user = user + self._password = password self._logger = logger if self.query_json is not None: self._start_time = self.query_json['start_time'] @@ -233,7 +235,8 @@ def _send_query(self, query): self._logger.debug('Sending query: %s' % (query)) opserver_url = OpServerUtils.opserver_query_url(self._analytics_api_ip, str(self._analytics_api_port)) - resp = OpServerUtils.post_url_http(opserver_url, query, True) + resp = OpServerUtils.post_url_http(opserver_url, query, self._user, + self._password, True) try: resp = json.loads(resp) value = resp['value'] diff --git a/src/opserver/requirements.txt b/src/opserver/requirements.txt index a613123ba3b..e7eaa97824d 100644 --- a/src/opserver/requirements.txt +++ b/src/opserver/requirements.txt @@ -1,9 +1,11 @@ +requests +pycassa lxml -gevent==1.1a2 -geventhttpclient==1.0a +gevent +geventhttpclient redis xmltodict prettytable psutil>=0.6.0 -bottle==0.12.8 +bottle sseclient diff --git a/src/opserver/setup.py b/src/opserver/setup.py index 27ec91b212d..54bc6ef1b18 100644 --- a/src/opserver/setup.py +++ b/src/opserver/setup.py @@ -40,6 +40,12 @@ def run(self): if not re.search('\nOK', ''.join(f.readlines())): os._exit(1) +def requirements(filename): + with open(filename) as f: + lines = f.read().splitlines() + c = re.compile(r'\s*#.*') + return filter(bool, map(lambda y: c.sub('', y).strip(), lines)) + setup( name='opserver', version='0.1dev', @@ -49,15 +55,7 @@ def run(self): zip_safe=False, include_package_data=True, long_description="VNC Analytics API Implementation", - install_requires=[ - 'lxml', - 'gevent', - 'geventhttpclient', - 'redis', - 'xmltodict', - 'prettytable', - 'psutil>=0.4.1' - ], + install_requires=requirements('requirements.txt'), entry_points = { # Please update sandesh/common/vns.sandesh on process name change 'console_scripts' : [ diff --git a/src/opserver/stats.py b/src/opserver/stats.py index c1c85f6d6d7..203078bdc50 100755 --- a/src/opserver/stats.py +++ b/src/opserver/stats.py @@ -57,7 +57,7 @@ def run(self): def parse_args(self): """ Eg. python stats.py --analytics-api-ip 127.0.0.1 - --analytics-api-port 8081 + --analytics-api-port 8181 --table AnalyticsCpuState.cpu_info --where name=a6s40 cpu_info.module_id=Collector --select "T=60 SUM(cpu_info.cpu_share)" @@ -68,7 +68,7 @@ def parse_args(self): """ defaults = { 'analytics_api_ip': '127.0.0.1', - 'analytics_api_port': '8081', + 'analytics_api_port': '8181', 'start_time': 'now-10m', 'end_time': 'now', 'select' : [], @@ -96,6 +96,11 @@ def parse_args(self): "--where", help="List of Where Terms to be ANDed", nargs='+') parser.add_argument( "--sort", help="List of Sort Terms", nargs='+') + parser.add_argument( + "--admin-user", help="Name of admin user", default="admin") + parser.add_argument( + "--admin-password", help="Password of admin user", + default="contrail123") self._args = parser.parse_args() if self._args.table is None and self._args.dtable is None: @@ -132,7 +137,8 @@ def query(self): print json.dumps(query_dict) resp = OpServerUtils.post_url_http( - query_url, json.dumps(query_dict), sync = True) + query_url, json.dumps(query_dict), self._args.admin_user, + self._args.admin_password, sync = True) res = None if resp is not None: diff --git a/src/opserver/test-requirements.txt b/src/opserver/test-requirements.txt index 8e62f6cf868..9376030e977 100644 --- a/src/opserver/test-requirements.txt +++ b/src/opserver/test-requirements.txt @@ -3,7 +3,6 @@ testrepository python-subunit coverage fixtures==1.3.1 -requests>=1.1.0 thrift mock==1.0.1 flexmock==0.9.7 diff --git a/src/opserver/test/test_flow.py b/src/opserver/test/test_flow.py index 30b1d3763d3..04a7da998a3 100755 --- a/src/opserver/test/test_flow.py +++ b/src/opserver/test/test_flow.py @@ -46,8 +46,8 @@ def custom_output(self, outputdict): def setUp(self): self._querier = FlowQuerier() - flexmock(OpServerUtils).should_receive('post_url_http').once().replace_with(lambda x, y: self.custom_post_url_http(x, y)) - flexmock(OpServerUtils).should_receive('get_query_result').once().replace_with(lambda x, y, z: self.custom_get_query_result(x, y, z)) + flexmock(OpServerUtils).should_receive('post_url_http').once().replace_with(lambda x, y, w, z: self.custom_post_url_http(x, y)) + flexmock(OpServerUtils).should_receive('get_query_result').once().replace_with(lambda x, y, z, a, b: self.custom_get_query_result(x, y, z)) flexmock(self._querier).should_receive('output').replace_with(lambda x: self.custom_output(x)) #@unittest.skip("skip test_1_noarg_query") diff --git a/src/opserver/test/test_log.py b/src/opserver/test/test_log.py index a45a1bbc76b..7ee248f2d90 100755 --- a/src/opserver/test/test_log.py +++ b/src/opserver/test/test_log.py @@ -55,8 +55,8 @@ def setUp(self): self.maxDiff = None self._querier = LogQuerier() - flexmock(OpServerUtils).should_receive('post_url_http').replace_with(lambda x, y: self.custom_post_url_http(x, y)) - flexmock(OpServerUtils).should_receive('get_query_result').replace_with(lambda x, y, z: self.custom_get_query_result(x, y, z)) + flexmock(OpServerUtils).should_receive('post_url_http').replace_with(lambda x, y, w, z: self.custom_post_url_http(x, y)) + flexmock(OpServerUtils).should_receive('get_query_result').replace_with(lambda x, y, z, a, b: self.custom_get_query_result(x, y, z)) flexmock(self._querier).should_receive('display').replace_with(lambda x: self.custom_display(x)) diff --git a/src/opserver/test/test_overlay_to_underlay_mapper.py b/src/opserver/test/test_overlay_to_underlay_mapper.py index 1594c2f3864..1edc11e96de 100755 --- a/src/opserver/test/test_overlay_to_underlay_mapper.py +++ b/src/opserver/test/test_overlay_to_underlay_mapper.py @@ -235,7 +235,7 @@ def test_get_overlay_flow_data_noerror(self, mock_send_query, overlay_to_underlay_mapper = \ OverlayToUnderlayMapper( item['input']['overlay_to_underlay_map_query'], - None, None, logging) + None, None, None, None, logging) self.assertEqual(item['output']['flowrecord_data'], overlay_to_underlay_mapper._get_overlay_flow_data()) args, _ = overlay_to_underlay_mapper._send_query.call_args @@ -296,7 +296,7 @@ def test_get_overlay_flow_data_raise_exception(self): for query in queries: overlay_to_underlay_mapper = \ - OverlayToUnderlayMapper(query, None, None, logging) + OverlayToUnderlayMapper(query, None, None, None, None, logging) self.assertRaises(_OverlayToFlowRecordFieldsNameError, overlay_to_underlay_mapper._get_overlay_flow_data) # end test_get_overlay_flow_data_raise_exception @@ -618,7 +618,7 @@ def test_get_underlay_flow_data_noerror(self, mock_send_query, overlay_to_underlay_mapper = \ OverlayToUnderlayMapper( item['input']['overlay_to_underlay_map_query'], - None, None, logging) + None, None, None, None, logging) self.assertEqual(item['output']['uflow_data'], overlay_to_underlay_mapper._get_underlay_flow_data( item['input']['flow_record_data'])) @@ -683,7 +683,7 @@ def test_get_underlay_flow_data_raise_exception(self): for query in queries: overlay_to_underlay_mapper = \ OverlayToUnderlayMapper(query['overlay_to_underlay_map_query'], - None, None, logging) + None, None, None, None, logging) self.assertRaises(_UnderlayToUFlowDataFieldsNameError, overlay_to_underlay_mapper._get_underlay_flow_data, query['flow_record_data']) @@ -696,6 +696,8 @@ def test_send_query_no_error(self, mock_post_url_http): 'input': { 'analytics_api_ip': '10.10.10.1', 'analytics_api_port': 8081, + 'username': 'admin', + 'password': 'admin123', 'query': { 'table': FLOW_TABLE, 'start_time': 'now-10m', 'end_time': 'now-5m', @@ -714,6 +716,8 @@ def test_send_query_no_error(self, mock_post_url_http): 'input': { 'analytics_api_ip': '192.168.10.1', 'analytics_api_port': 8090, + 'username': 'admin', + 'password': 'admin123', 'query': { 'table': 'StatTable.UFlowData.flow', 'start_time': 1416275005000000, @@ -751,11 +755,14 @@ def test_send_query_no_error(self, mock_post_url_http): for item in input_output_list: overlay_to_underlay_mapper = \ OverlayToUnderlayMapper(None, item['input']['analytics_api_ip'], - item['input']['analytics_api_port'], logging) + item['input']['analytics_api_port'], + item['input']['username'], item['input']['password'], + logging) self.assertEqual(overlay_to_underlay_mapper._send_query( item['input']['query']), item['output']['response']['value']) OpServerUtils.post_url_http.assert_called_with( - item['output']['query_url'], item['input']['query'], True) + item['output']['query_url'], item['input']['query'], + item['input']['username'], item['input']['password'], True) # end test_send_query_no_error @mock.patch('opserver.overlay_to_underlay_mapper.OpServerUtils.post_url_http') @@ -810,7 +817,7 @@ def test_send_query_raise_exception(self, mock_post_url_http): for item in queries: overlay_to_underlay_mapper = \ OverlayToUnderlayMapper(None, item['analytics_api_ip'], - item['analytics_api_port'], logging) + item['analytics_api_port'], None, None, logging) self.assertRaises(_QueryError, overlay_to_underlay_mapper._send_query, item['query']) # end test_send_query_raise_exception @@ -888,7 +895,7 @@ def test_send_response_no_error(self): overlay_to_underlay_mapper = \ OverlayToUnderlayMapper( item['input']['overlay_to_underlay_map_query'], - None, None, logging) + None, None, None, None, logging) self.assertEqual(item['output']['underlay_response'], json.loads(overlay_to_underlay_mapper._send_response( item['input']['uflow_data']))) @@ -911,7 +918,7 @@ def test_send_response_raise_exception(self): for item in input_list: overlay_to_underlay_mapper = \ OverlayToUnderlayMapper(item['overlay_to_underlay_map_query'], - None, None, logging) + None, None, None, None, logging) self.assertRaises(_UnderlayToUFlowDataFieldsNameError, overlay_to_underlay_mapper._send_response, item['uflow_data']) # end test_send_response_raise_exception @@ -956,7 +963,7 @@ def test_process_query(self, mock_get_overlay_flow_data, [json.dumps(item['response']) for item in test_data] for item in test_data: overlay_to_underlay_mapper = \ - OverlayToUnderlayMapper(None, None, None, logging) + OverlayToUnderlayMapper(None, None, None, None, None, logging) self.assertEqual(item['response'], json.loads(overlay_to_underlay_mapper.process_query())) overlay_to_underlay_mapper._get_overlay_flow_data.called_with() diff --git a/src/opserver/test/test_stats.py b/src/opserver/test/test_stats.py index c175f4b1661..d6915de2bf0 100755 --- a/src/opserver/test/test_stats.py +++ b/src/opserver/test/test_stats.py @@ -36,7 +36,7 @@ def custom_display(self, result): def setUp(self): self._querier = StatQuerier() - flexmock(OpServerUtils).should_receive('post_url_http').once().replace_with(lambda x, y, **kwargs: self.custom_post_url_http(x, y, kwargs)) + flexmock(OpServerUtils).should_receive('post_url_http').once().replace_with(lambda x, y, w, z, **kwargs: self.custom_post_url_http(x, y, kwargs)) flexmock(self._querier).should_receive('display').replace_with(lambda x: self.custom_display(x)) diff --git a/src/opserver/test/utils/analytics_fixture.py b/src/opserver/test/utils/analytics_fixture.py index 61a185909e6..a15db04cd7f 100644 --- a/src/opserver/test/utils/analytics_fixture.py +++ b/src/opserver/test/utils/analytics_fixture.py @@ -12,7 +12,6 @@ from mockkafka import mockkafka from mockzoo import mockzoo import redis -import urllib2 import copy import os import json @@ -296,7 +295,8 @@ def stop(self): class OpServer(object): def __init__(self, primary_collector, secondary_collector, redis_port, - analytics_fixture, logger, kafka=False, is_dup=False): + analytics_fixture, logger, admin_user, admin_password, + kafka=False, is_dup=False): self.primary_collector = primary_collector self.secondary_collector = secondary_collector self.analytics_fixture = analytics_fixture @@ -314,7 +314,10 @@ def __init__(self, primary_collector, secondary_collector, redis_port, self.hostname = self.hostname+'dup' self._generator_id = self.hostname+':'+NodeTypeNames[NodeType.ANALYTICS]+\ ':'+ModuleNames[Module.OPSERVER]+':0' - self.listen_port = AnalyticsFixture.get_free_port() + self.rest_api_port = AnalyticsFixture.get_free_port() + self.admin_port = AnalyticsFixture.get_free_port() + self.admin_user = admin_user + self.admin_password = admin_password # end __init__ def set_primary_collector(self, collector): @@ -331,7 +334,7 @@ def get_generator_id(self): def start(self): assert(self._instance == None) - self._log_file = '/tmp/opserver.messages.' + str(self.listen_port) + self._log_file = '/tmp/opserver.messages.' + str(self.admin_port) part = "0" if self._kafka: part = "4" @@ -346,7 +349,10 @@ def start(self): '--log_file', self._log_file, '--log_level', "SYS_INFO", '--partitions', part, - '--rest_api_port', str(self.listen_port)] + '--rest_api_port', str(self.rest_api_port), + '--admin_port', str(self.admin_port), + '--admin_user', self.admin_user, + '--admin_password', self.admin_password] if self.analytics_fixture.redis_uves[0].password: args.append('--redis_password') args.append(self.analytics_fixture.redis_uves[0].password) @@ -383,15 +389,16 @@ def verify_setup(self): def stop(self): if self._instance is not None: rcode = self.analytics_fixture.process_stop( - "contrail-analytics-api:%s" % str(self.listen_port), + "contrail-analytics-api:%s" % str(self.admin_port), self._instance, self._log_file, is_py=True) #assert(rcode == 0) self._instance = None # end stop def send_tracebuffer_request(self, src, mod, instance, tracebuf): - vops = VerificationOpsSrv('127.0.0.1', self.listen_port) - res = vops.send_tracebuffer_req(src, mod, instance, tracebuf) + vns = VerificationOpsSrv('127.0.0.1', self.admin_port, + self.admin_user, self.admin_password) + res = vns.send_tracebuffer_req(src, mod, instance, tracebuf) self._logger.info('send_tracebuffer_request: %s' % (str(res))) assert(res['status'] == 'pass') # end send_tracebuffer_request @@ -558,6 +565,8 @@ def stop(self): # end class Zookeeper class AnalyticsFixture(fixtures.Fixture): + ADMIN_USER = 'test' + ADMIN_PASSWORD = 'password' def __init__(self, logger, builddir, redis_port, cassandra_port, ipfix_port = False, sflow_port = False, syslog_port = False, @@ -584,6 +593,8 @@ def __init__(self, logger, builddir, redis_port, cassandra_port, self.cassandra_user = cassandra_user self.cassandra_password = cassandra_password self.zookeeper = None + self.admin_user = AnalyticsFixture.ADMIN_USER + self.admin_password = AnalyticsFixture.ADMIN_PASSWORD def setUp(self): super(AnalyticsFixture, self).setUp() @@ -634,10 +645,11 @@ def setUp(self): opkafka = True self.opserver = OpServer(primary_collector, secondary_collector, self.redis_uves[0].port, - self, self.logger, opkafka) + self, self.logger, self.admin_user, + self.admin_password, opkafka) if not self.opserver.start(): self.logger.error("OpServer did NOT start") - self.opserver_port = self.opserver.listen_port + self.opserver_port = self.get_opserver_port() if self.kafka is not None: self.alarmgen = AlarmGen(primary_collector, secondary_collector, @@ -663,7 +675,7 @@ def get_collectors(self): # end get_collectors def get_opserver_port(self): - return self.opserver.listen_port + return self.opserver.admin_port # end get_opserver_port def verify_on_setup(self): @@ -718,13 +730,8 @@ def verify_opserver_api(self): ''' data = {} url = 'http://127.0.0.1:' + str(self.opserver_port) + '/' - try: - data = urllib2.urlopen(url).read() - except urllib2.HTTPError, e: - self.logger.info("HTTP error: %d" % e.code) - except urllib2.URLError, e: - self.logger.info("Network error: %s" % e.reason.args[1]) - + data = OpServerUtils.get_url_http(url, self.admin_user, + self.admin_password) self.logger.info("Checking OpServer %s" % str(data)) if data == {}: return False @@ -809,7 +816,8 @@ def verify_uvetable_alarm(self, table, name, type, is_set = True, any_of = None) @retry(delay=2, tries=10) def verify_collector_obj_count(self): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_query('ObjectCollectorInfo', start_time='-10m', end_time='now', select_fields=["ObjectLog"], @@ -850,7 +858,8 @@ def verify_generator_list(self, collector, exp_genlist): @retry(delay=1, tries=10) def verify_generator_uve_list(self, exp_gen_list): self.logger.info('verify_generator_uve_list') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) # get generator list gen_list = vns.uve_query('generators', {'cfilt':'ModuleClientState:client_info'}) @@ -869,7 +878,8 @@ def verify_generator_uve_list(self, exp_gen_list): @retry(delay=1, tries=6) def verify_message_table_messagetype(self): self.logger.info("verify_message_table_messagetype") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) # query for CollectorInfo logs res = vns.post_query('MessageTable', start_time='-10m', end_time='now', @@ -892,7 +902,8 @@ def verify_message_table_messagetype(self): @retry(delay=1, tries=6) def verify_message_table_select_uint_type(self): self.logger.info("verify_message_table_select_uint_type") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) # query for CollectorInfo logs res = vns.post_query('MessageTable', start_time='-10m', end_time='now', @@ -915,7 +926,8 @@ def verify_message_table_select_uint_type(self): @retry(delay=1, tries=6) def verify_message_table_moduleid(self): self.logger.info("verify_message_table_moduleid") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) # query for contrail-query-engine logs res_qe = vns.post_query('MessageTable', start_time='-10m', end_time='now', @@ -935,7 +947,8 @@ def verify_message_table_moduleid(self): @retry(delay=1, tries=6) def verify_message_table_where_or(self): self.logger.info("verify_message_table_where_or") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) where_clause1 = "ModuleId = contrail-query-engine" where_clause2 = str("Source =" + socket.gethostname()) res = vns.post_query( @@ -957,7 +970,8 @@ def verify_message_table_where_or(self): @retry(delay=1, tries=6) def verify_message_table_where_and(self): self.logger.info("verify_message_table_where_and") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) where_clause1 = "ModuleId = contrail-query-engine" where_clause2 = str("Source =" + socket.gethostname()) res = vns.post_query( @@ -979,7 +993,8 @@ def verify_message_table_where_and(self): @retry(delay=1, tries=6) def verify_message_table_where_prefix(self): self.logger.info('verify_message_table_where_prefix') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) prefix_key_value_map = {'Source': socket.gethostname()[:-1], 'ModuleId': 'contrail-', 'Messagetype': 'Collector', 'Category': 'Discovery'} @@ -999,7 +1014,8 @@ def verify_message_table_where_prefix(self): @retry(delay=1, tries=6) def verify_message_table_filter(self): self.logger.info("verify_message_table_where_filter") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) where_clause1 = "ModuleId = contrail-query-engine" where_clause2 = str("Source =" + socket.gethostname()) res = vns.post_query('MessageTable', @@ -1031,7 +1047,8 @@ def verify_message_table_filter(self): @retry(delay=1, tries=6) def verify_message_table_filter2(self): self.logger.info("verify_message_table_filter2") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) a_query = Query(table="MessageTable", start_time='now-10m', end_time='now', @@ -1067,7 +1084,8 @@ def verify_message_table_filter2(self): @retry(delay=1, tries=1) def verify_message_table_sort(self): self.logger.info("verify_message_table_sort:Ascending Sort") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) where_clause1 = "ModuleId = contrail-query-engine" where_clause2 = str("Source =" + socket.gethostname()) @@ -1147,7 +1165,8 @@ def verify_message_table_sort(self): def verify_message_table_limit(self): self.logger.info("verify_message_table_limit") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_query('MessageTable', start_time='-10m', end_time='now', select_fields=['ModuleId', 'Messagetype'], @@ -1160,7 +1179,8 @@ def verify_message_table_limit(self): @retry(delay=1, tries=8) def verify_intervn_all(self, gen_obj): self.logger.info("verify_intervn_all") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_query('StatTable.UveVirtualNetworkAgent.vn_stats', start_time='-10m', end_time='now', @@ -1174,7 +1194,8 @@ def verify_intervn_all(self, gen_obj): @retry(delay=1, tries=8) def verify_intervn_sum(self, gen_obj): self.logger.info("verify_intervn_sum") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_query('StatTable.UveVirtualNetworkAgent.vn_stats', start_time='-10m', end_time='now', @@ -1188,7 +1209,8 @@ def verify_intervn_sum(self, gen_obj): @retry(delay=1, tries=10) def verify_flow_samples(self, generator_obj): self.logger.info("verify_flow_samples") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) vrouter = generator_obj._hostname res = vns.post_query('FlowSeriesTable', start_time=str(generator_obj.flow_start_time), @@ -1198,7 +1220,8 @@ def verify_flow_samples(self, generator_obj): if len(res) != generator_obj.num_flow_samples: return False - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) result = vns.post_query('FlowSeriesTable', start_time=str(generator_obj.egress_flow_start_time), end_time=str(generator_obj.egress_flow_end_time), @@ -1213,7 +1236,8 @@ def verify_flow_samples(self, generator_obj): def verify_where_query_prefix(self,generator_obj): self.logger.info('verify where query in FlowSeriesTable') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) vrouter = generator_obj._hostname a_query = Query(table="FlowSeriesTable", start_time=(generator_obj.flow_start_time), @@ -1239,7 +1263,8 @@ def verify_flow_table(self, generator_obj): vrouter = generator_obj._hostname # query flow records self.logger.info('verify_flow_table') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_query('FlowRecordTable', start_time=str(generator_obj.flow_start_time), end_time=str(generator_obj.flow_end_time), @@ -1511,7 +1536,8 @@ def verify_flow_series_aggregation_binning(self, generator_object): generator_obj = generator_object[0] vrouter = generator_obj._hostname self.logger.info('verify_flow_series_aggregation_binning') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) # Helper function for stats aggregation def _aggregate_stats(flow, start_time, end_time): @@ -2021,7 +2047,8 @@ def _aggregate_flows_stats(flows, start_time, end_time): def verify_fieldname_messagetype(self): self.logger.info('Verify stats table for stats name field'); - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port); + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) query = Query(table="StatTable.FieldNames.fields", start_time="now-10m", end_time="now", @@ -2062,7 +2089,8 @@ def verify_collector_redis_uve_connection(self, collector, connected=True): @retry(delay=2, tries=5) def verify_opserver_redis_uve_connection(self, opserver, connected=True): self.logger.info('verify_opserver_redis_uve_connection') - vops = VerificationOpsSrv('127.0.0.1', opserver.http_port) + vops = VerificationOpsSrv('127.0.0.1', opserver.http_port, + self.admin_user, self.admin_password) try: redis_uve = vops.get_redis_uve_info()['RedisUveInfo'] if redis_uve['status'] == 'Connected': @@ -2075,7 +2103,8 @@ def verify_opserver_redis_uve_connection(self, opserver, connected=True): @retry(delay=2, tries=5) def verify_tracebuffer_in_analytics_db(self, src, mod, tracebuf): self.logger.info('verify trace buffer data in analytics db') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) where_clause = [] where_clause.append('Source = ' + src) where_clause.append('ModuleId = ' + mod) @@ -2093,7 +2122,8 @@ def verify_tracebuffer_in_analytics_db(self, src, mod, tracebuf): @retry(delay=1, tries=5) def verify_table_source_module_list(self, exp_src_list, exp_mod_list): self.logger.info('verify source/module list') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) try: src_list = vns.get_table_column_values(COLLECTOR_GLOBAL_TABLE, SOURCE) @@ -2116,7 +2146,8 @@ def verify_table_source_module_list(self, exp_src_list, exp_mod_list): @retry(delay=1, tries=5) def verify_where_query(self): self.logger.info('Verify where query with int type works'); - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port); + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) query = Query(table="StatTable.QueryPerfInfo.query_stats", start_time="now-1h", end_time="now", @@ -2130,7 +2161,8 @@ def verify_where_query(self): def verify_collector_object_log(self, start_time, end_time): self.logger.info('verify_collector_object_log') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port); + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) query = Query(table='ObjectCollectorInfo', start_time=start_time, end_time=end_time, select_fields=['ObjectLog']) @@ -2152,7 +2184,8 @@ def verify_collector_object_log_before_purge(self, start_time, end_time): def verify_database_purge_query(self, json_qstr): self.logger.info('verify database purge query'); - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port); + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_purge_query_json(json_qstr) try: assert(res['status'] == 'started') @@ -2177,7 +2210,8 @@ def verify_collector_object_log_after_purge(self, start_time, end_time): def verify_database_purge_status(self, purge_id): self.logger.info('verify database purge status: purge_id [%s]' % (purge_id)) - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) query = Query(table='StatTable.DatabasePurgeInfo.stats', start_time='now-1m', end_time='now', select_fields=['stats.purge_id', 'stats.purge_status', @@ -2208,7 +2242,8 @@ def verify_database_purge_with_percentage_input(self): def verify_database_purge_support_utc_time_format(self): self.logger.info('verify database purge support utc time format') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) json_qstr = json.dumps({'purge_input': 'now'}) end_time = OpServerUtils.convert_to_utc_timestamp_usec('now') start_time = end_time - 20*60*pow(10,6) @@ -2221,7 +2256,8 @@ def verify_database_purge_support_utc_time_format(self): def verify_database_purge_support_datetime_format(self): self.logger.info('verify database purge support datetime format') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) dt = datetime.datetime.now().strftime("%Y %b %d %H:%M:%S.%f") json_qstr = json.dumps({'purge_input': dt}) end_time = OpServerUtils.convert_to_utc_timestamp_usec(dt) @@ -2235,7 +2271,8 @@ def verify_database_purge_support_datetime_format(self): def verify_database_purge_support_deltatime_format(self): self.logger.info('verify database purge support deltatime format') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) json_qstr = json.dumps({'purge_input': '-1s'}) end_time = OpServerUtils.convert_to_utc_timestamp_usec('-1s') start_time = end_time - 10*60*pow(10,6) @@ -2248,7 +2285,8 @@ def verify_database_purge_support_deltatime_format(self): def verify_database_purge_request_limit(self): self.logger.info('verify database purge request limit') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) json_qstr = json.dumps({'purge_input': 50}) res = vns.post_purge_query_json(json_qstr) self.logger.info(str(res)) @@ -2267,7 +2305,8 @@ def verify_database_purge_request_limit(self): def verify_object_table_sandesh_types(self, table, object_id, exp_msg_types): self.logger.info('verify_object_table_sandesh_types') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_query(table, start_time='-1m', end_time='now', select_fields=['Messagetype', 'ObjectLog', 'SystemLog'], where_clause='ObjectId=%s' % object_id) @@ -2285,7 +2324,8 @@ def verify_object_table_sandesh_types(self, table, object_id, @retry(delay=1, tries=10) def verify_object_value_table_query(self, table, exp_object_values): self.logger.info('verify_object_value_table_query') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_query(table, start_time='-10m', end_time='now', select_fields=['ObjectId'], where_clause='') @@ -2303,7 +2343,8 @@ def verify_object_value_table_query(self, table, exp_object_values): @retry(delay=1, tries=5) def verify_keyword_query(self, line, keywords=[]): self.logger.info('Verify where query with keywords'); - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port); + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) query = Query(table="MessageTable", start_time="now-1h", @@ -2340,7 +2381,8 @@ def verify_fieldname_table(self): to ensure that the 2 entries are present in the table ''' self.logger.info("verify_fieldname_table") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) self.logger.info("VerificationOpsSrv") res = vns.post_query('StatTable.FieldNames.fields', start_time='-1m', @@ -2375,7 +2417,8 @@ def _get_filters_json(self, filters): @retry(delay=1, tries=4) def verify_uve_list(self, table, filts=None, exp_uve_list=[]): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) filters = self._get_filters_url_param(filts) table_query = table+'s' self.logger.info('verify_uve_list: %s:%s' % @@ -2432,7 +2475,8 @@ def _verify_uves(self, exp_uves, actual_uves): @retry(delay=1, tries=4) def verify_get_alarms(self, table, filts=None, exp_uves=None): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) filters = self._get_filters_url_param(filts) self.logger.info('verify_get_alarms: %s' % str(filters)) try: @@ -2446,7 +2490,8 @@ def verify_get_alarms(self, table, filts=None, exp_uves=None): @retry(delay=1, tries=4) def verify_multi_uve_get(self, table, filts=None, exp_uves=None): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) filters = self._get_filters_url_param(filts) table_query = table+'/*' if not filters: @@ -2464,7 +2509,8 @@ def verify_multi_uve_get(self, table, filts=None, exp_uves=None): @retry(delay=1, tries=4) def verify_uve_post(self, table, filts=None, exp_uves=None): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) filter_json = self._get_filters_json(filts) self.logger.info('verify_uve_post: %s: %s' % (table, filter_json)) try: @@ -2478,7 +2524,8 @@ def verify_uve_post(self, table, filts=None, exp_uves=None): @retry(delay=1, tries=5) def verify_alarm_list_include(self, table, filts=None, expected_alarms=[]): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) yfilts = filts or {} yfilts['cfilt'] = ["UVEAlarms"] filters = self._get_filters_url_param(yfilts) @@ -2502,7 +2549,8 @@ def verify_alarm_list_include(self, table, filts=None, expected_alarms=[]): @retry(delay=1, tries=5) def verify_alarm_list_exclude(self, table, filts=None, unexpected_alms=[]): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) yfilts = filts or {} yfilts['cfilt'] = ["UVEAlarms"] filters = self._get_filters_url_param(yfilts) @@ -2544,7 +2592,8 @@ def _verify_alarms(self, exp_alarms, actual_alarms): @retry(delay=1, tries=3) def verify_alarm(self, table, key, expected_alarm): self.logger.info('verify_alarm: %s:%s' % (table, key)) - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) table_query = table+'/'+key filters = {'cfilt':'UVEAlarms'} try: @@ -2589,7 +2638,7 @@ def cleanUp(self): redis_uve.stop() if self.kafka is not None: self.kafka.stop() - + self.zookeeper.stop() super(AnalyticsFixture, self).cleanUp() self.logger.info('cleanUp complete') diff --git a/src/opserver/test/utils/opserver_introspect_utils.py b/src/opserver/test/utils/opserver_introspect_utils.py index 867e4ea2638..60c949f5768 100644 --- a/src/opserver/test/utils/opserver_introspect_utils.py +++ b/src/opserver/test/utils/opserver_introspect_utils.py @@ -15,20 +15,23 @@ from opserver_results import * from opserver.opserver_util import OpServerUtils - class VerificationOpsSrv (IntrospectUtilBase): - - def __init__(self, ip, port=8081): + def __init__(self, ip, port=8181, user='test', + password='password'): super(VerificationOpsSrv, self).__init__(ip, port) + self._user = user + self._password = password def get_ops_vm(self, vm='default-virtual-machine'): - vm_dict = self.dict_get('analytics/uves/virtual-machine/' + vm) + vm_dict = self.dict_get('analytics/uves/virtual-machine/' + vm, + user=self._user, password=self._password) return OpVMResult(vm_dict) def get_ops_vn(self, vn='default-virtual-network'): res = None try: - vn_dict = self.dict_get('analytics/uves/virtual-network/' + vn) + vn_dict = self.dict_get('analytics/uves/virtual-network/' + vn, + user=self._user, password=self._password) res = OpVNResult(vn_dict) except Exception as e: print e @@ -40,7 +43,8 @@ def get_ops_collector(self, col=None): col = socket.gethostname() res = None try: - col_dict = self.dict_get('analytics/uves/analytics-node/' + col) + col_dict = self.dict_get('analytics/uves/analytics-node/' + col, + user=self._user, password=self._password) res = OpCollectorResult(col_dict) except Exception as e: print e @@ -49,22 +53,27 @@ def get_ops_collector(self, col=None): def send_tracebuffer_req(self, src, mod, instance, buf_name): return self.dict_get('analytics/send-tracebuffer/%s/%s/%s/%s' \ - % (src, mod, instance, buf_name)) + % (src, mod, instance, buf_name), user=self._user, + password=self._password) def get_table_column_values(self, table, col_name): return self.dict_get('analytics/table/%s/column-values/%s' \ - % (table, col_name)) + % (table, col_name), user=self._user, + password=self._password) def uve_query(self, table, query): - return self.dict_get('analytics/uves/%s' % (table), query) + return self.dict_get('analytics/uves/%s' % (table), query, + user=self._user, password=self._password) def get_alarms(self, filters): - return self.dict_get('analytics/alarms', filters) + return self.dict_get('analytics/alarms', filters, user=self._user, + password=self._password) def post_uve_request(self, table, json_body): url = 'http://%s:%s/analytics/uves/%s' % (self._ip, str(self._port), table) try: - res = OpServerUtils.post_url_http(url, json_body, sync=True) + res = OpServerUtils.post_url_http(url, json_body, user=self._user, + password=self._password, sync=True) res = json.loads(res) except Exception as e: print 'Error: POST uve request: %s' % str(e) @@ -85,11 +94,13 @@ def post_query_json(self, json_str, sync=True): ''' res = None try: - flows_url = OpServerUtils.opserver_query_url(self._ip, str(self._port)) + flows_url = OpServerUtils.opserver_query_url(self._ip, + str(self._port)) print flows_url print "query is: ", json_str res = [] - resp = OpServerUtils.post_url_http(flows_url, json_str, sync) + resp = OpServerUtils.post_url_http(flows_url, json_str, + self._user, self._password, sync) if sync: if resp is not None: res = json.loads(resp) @@ -98,7 +109,8 @@ def post_query_json(self, json_str, sync=True): if resp is not None: resp = json.loads(resp) qid = resp['href'].rsplit('/', 1)[1] - result = OpServerUtils.get_query_result(self._ip, str(self._port), qid, 30) + result = OpServerUtils.get_query_result(self._ip, + str(self._port), qid, self._user, self._password, 30) for item in result: res.append(item) except Exception as e: @@ -118,7 +130,8 @@ def post_purge_query_json(self, json_str, sync=True): print purge_request_url print "query is: ", json_str resp = OpServerUtils.post_url_http( - purge_request_url, json_str, sync) + purge_request_url, json_str, self._user, self._password, + sync) if resp is not None: res = json.loads(resp) except Exception as e: @@ -144,7 +157,8 @@ def post_query(self, table, start_time=None, end_time=None, print json.dumps(query_dict) res = [] resp = OpServerUtils.post_url_http( - flows_url, json.dumps(query_dict), sync) + flows_url, json.dumps(query_dict), self._user, self._password, + sync) if sync: if resp is not None: res = json.loads(resp) @@ -154,7 +168,8 @@ def post_query(self, table, start_time=None, end_time=None, resp = json.loads(resp) qid = resp['href'].rsplit('/', 1)[1] result = OpServerUtils.get_query_result( - self._ip, str(self._port), qid, 30) + self._ip, str(self._port), qid, self._user, + self._password, 30) for item in result: res.append(item) except Exception as e: diff --git a/src/opserver/vnc_cfg_api_client.py b/src/opserver/vnc_cfg_api_client.py new file mode 100644 index 00000000000..f08ab3f8ea8 --- /dev/null +++ b/src/opserver/vnc_cfg_api_client.py @@ -0,0 +1,81 @@ +# +# Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +# + +import time +import requests +from pysandesh.connection_info import ConnectionState +from pysandesh.gen_py.process_info.ttypes import ConnectionType,\ + ConnectionStatus +from vnc_api import vnc_api + +class VncCfgApiClient(object): + + def __init__(self, conf_info, sandesh_instance, logger): + self._conf_info = conf_info + self._sandesh_instance = sandesh_instance + self._logger = logger + self._vnc_api_client = None + # end __init__ + + def _update_connection_state(self, status, message = ''): + server_addrs = ['%s:%d' % (self._conf_info['api_server_ip'], \ + self._conf_info['api_server_port'])] + ConnectionState.update(conn_type=ConnectionType.APISERVER, name='', + status=status, message=message, server_addrs=server_addrs) + # end _update_connection_state + + def _get_user_token_info(self, user_token): + if self._vnc_api_client: + return self._vnc_api_client.obj_perms(user_token) + else: + self._logger.error('VNC Config API Client NOT FOUND') + return None + # end _get_user_token_info + + def connect(self): + # Retry till API server is up + connected = False + self._update_connection_state(ConnectionStatus.INIT) + while not connected: + try: + self._vnc_api_client = vnc_api.VncApi( + self._conf_info['admin_user'], + self._conf_info['admin_password'], + self._conf_info['admin_tenant_name'], + self._conf_info['api_server_ip'], + self._conf_info['api_server_port'], + api_server_use_ssl=self._conf_info['api_server_use_ssl'], + auth_host=self._conf_info['auth_host'], + auth_port=self._conf_info['auth_port'], + auth_protocol=self._conf_info['auth_protocol']) + connected = True + self._update_connection_state(ConnectionStatus.UP) + except Exception as e: + # Update connection info + self._update_connection_state(ConnectionStatus.DOWN, str(e)) + time.sleep(3) + # end connect + + def is_role_cloud_admin(self, user_token): + result = self._get_user_token_info(user_token) + if not result or not result['token_info']: + self._logger.error( + 'Token info for %s NOT FOUND' % str(user_token)) + return False + # Handle v2 and v3 responses + token_info = result['token_info'] + if 'access' in token_info: + roles_list = [roles['name'] for roles in \ + token_info['access']['user']['roles']] + elif 'token' in token_info: + roles_list = [roles['name'] for roles in \ + token_info['token']['roles']] + else: + self._logger.error('Role info for %s NOT FOUND: %s' % \ + (str(user_token), str(token_info))) + return False + return self._conf_info['cloud_admin_role'] in roles_list + # end is_role_cloud_admin + +# end class VncCfgApiServer diff --git a/src/sandesh/common/vns.sandesh b/src/sandesh/common/vns.sandesh index 4bad4150d5b..d62cc4a7241 100644 --- a/src/sandesh/common/vns.sandesh +++ b/src/sandesh/common/vns.sandesh @@ -102,6 +102,7 @@ const u16 MetadataProxyVrouterAgentPort = 8097 // TCP const u16 VrouterAgentMirrorClientUdpPort = 8097 // UDP const u16 VrouterAgentDnsClientUdpPort = 8098 // UDP const u16 ContrailDnsClientUdpPort = 8092 // UDP +const u16 OpServerAdminPort = 8181 const map ServiceHttpPortMap = { SERVICE_VROUTER_AGENT : HttpPortAgent, @@ -504,3 +505,13 @@ const list ThreadPoolNames = [ NATIVETRANSPORTREQUESTS, COMPACTIONEXECUTOR, ] + +const string CLOUD_ADMIN_ROLE = "admin" + +const string AAA_MODE_NO_AUTH = "no-auth" +const string AAA_MODE_CLOUD_ADMIN = "cloud-admin" + +const list APIAAAModes = [ + AAA_MODE_NO_AUTH, + AAA_MODE_CLOUD_ADMIN, +]