From 761ffd96941cd9ec2f670675fbe553080c4790ec Mon Sep 17 00:00:00 2001 From: Megh Bhatt Date: Wed, 8 Jun 2016 18:21:34 -0700 Subject: [PATCH] 1. Add option for cloud admin access only for analytics REST API Allow cloud admin role access only for analytics REST API controlled via --cloud_admin_access_only currently defaulted to False but will default to True once provisioning changes are done. contrail-analytics-api will validate role from the X-Auth-Token header via vnc_api/contrail-api. For debug/administration a localhost bound port 8181 - --admin_port is provided that requires basic HTTP access authentication. Clients of analytics REST API - contrail-flows, contrail-logs, contrail-stats, contrail-topology are changed to use admin port. contrail-svc-monitor is changed to use auth token. Conflicts: src/opserver/SConscript Partial-Bug: #1461175 (cherry picked from commit 5492f71383123fea8240ca265e125aee28d9349f) 2. Rename cloud_admin_access_only to multi_tenancy in contrail-analytics-api Closes-Bug: #1461175 (cherry picked from commit 36df0991a47068bcb6af8cd219e416e2ca60d4cd) 3. for bool option, a conversion from string to bool is required. Closes-Bug: #1595044 (cherry picked from commit 1d6b81bccf5a7aee39fbb60bd25152e1b8726206) 4. Change cloud admin role name to "cloud-admin" from "admin" for analytics API access Closes-Bug: #1600699 (cherry picked from commit 8c131016252a22c52cdfab8042571598818f82c3) 5. Rename multi_tenancy to aaa_mode for analytics API Handle keystone v2 and v3 token infos returned by VNC API. Enable cloud-admin-only aaa_mode by default Change analytics DB and underlay to overlay mapper to use local admin port when querying opserver Do not cache auth_token in vnc lib Closes-Bug: #1599654 (cherry picked from commit a2a7c9248b3d9830d491ab6baf7d21bd9aa64ff6) 6. Changes to bring analytics authenticated access in sync with config 1. Rename aaa_mode value cloud-admin-only to cloud-admin 2. 
CLOUD_ADMIN_ROLE defaults to admin instead of cloud-admin Partial-Bug: #1607563 (cherry picked from commit 42db6e38e55bc2410297a99c2af3bea03faa938c) 7. Fix missing import of OpServerUtils in analytics_db.py Closes-Bug: #1609054 (cherry picked from commit cf5f0567c9bb03e83cd83515b775d2018e668d0c) 8. Remove aaa_mode value cloud-admin-only Closes-Bug: #1609987 9. Keep on trying to create VNC API client from analytics API The gevent that creates the VNC API client was exiting due to authentication failure exception. Changed code to handle all exceptions and keep on trying to create the API client. The node status will show the API connection down in case we are not able to create the VNC API client. Closes-Bug: #1611158 (cherry picked from commit 8072aa5ffd37e4082d7ae9697020a6160e8d2682) 10. Keystone middleware doesn't like if token is unicode. It must be converted to string before validation. Fixes-Bug: #1604773 (cherry picked from commit 18df64367eb5468bbca403aef4f2d22d02be4636) 11. Change the obj-perms API to pass in the user token in HTTP headers With PKI tokens, when user token was passed in query parameters for obj-perms API the token was getting truncated. Changed the API to accept user token in X-USER-TOKEN HTTP header. Closes-Bug: #1614376 12. 1. Called once check moved from _list_collection to list_bulk_collection_http_post, due to refactoring bug. 2. Removed the local API server teardown for class TestPermissions 3. Projects within class TestPermissions appended with self.id(), to create unique Project for each testcase. 
Closes-Bug: 1555323 (cherry picked from commit a8ac59a0c8f08d8fc8e6f33abc52c292753dd1a3) Change-Id: Ia6bb36b37a86b33d87f304e9c784fa6fd780222b --- .../contrail_topology/analytic_client.py | 4 +- .../contrail_topology/config.py | 11 +- src/api-lib/vnc_api.py | 24 +-- .../api-server/tests/test_crud_basic.py | 12 +- src/config/api-server/tests/test_perms2.py | 4 +- src/config/api-server/vnc_cfg_api_server.py | 4 +- src/config/common/analytics_client.py | 9 +- .../common/tests/tools/install_venv_common.py | 4 +- .../scheduler/vrouter_scheduler.py | 3 +- src/opserver/SConscript | 5 +- src/opserver/analytics_db.py | 16 +- src/opserver/flow.py | 15 +- src/opserver/introspect_util.py | 45 +++-- src/opserver/log.py | 13 +- src/opserver/opserver.py | 154 +++++++++++++--- src/opserver/opserver_local.py | 72 ++++++++ src/opserver/opserver_util.py | 21 ++- src/opserver/overlay_to_underlay_mapper.py | 7 +- src/opserver/requirements.txt | 8 +- src/opserver/setup.py | 16 +- src/opserver/stats.py | 12 +- src/opserver/test-requirements.txt | 1 - src/opserver/test/test_flow.py | 4 +- src/opserver/test/test_log.py | 4 +- .../test/test_overlay_to_underlay_mapper.py | 27 ++- src/opserver/test/test_stats.py | 2 +- src/opserver/test/utils/analytics_fixture.py | 171 +++++++++++------- .../test/utils/opserver_introspect_utils.py | 49 +++-- src/opserver/vnc_cfg_api_client.py | 81 +++++++++ src/sandesh/common/vns.sandesh | 11 ++ 30 files changed, 605 insertions(+), 204 deletions(-) create mode 100644 src/opserver/opserver_local.py create mode 100644 src/opserver/vnc_cfg_api_client.py diff --git a/src/analytics/contrail-topology/contrail_topology/analytic_client.py b/src/analytics/contrail-topology/contrail_topology/analytic_client.py index 56dc4e21fc3..1555634509f 100644 --- a/src/analytics/contrail-topology/contrail_topology/analytic_client.py +++ b/src/analytics/contrail-topology/contrail_topology/analytic_client.py @@ -3,6 +3,7 @@ # import requests, json from requests.exceptions import 
ConnectionError +from requests.auth import HTTPBasicAuth class AnalyticApiClient(object): def __init__(self, cfg): @@ -23,7 +24,8 @@ def init_client(self): def _get_url_json(self, url): if url is None: return {} - page = self.client.get(url) + page = self.client.get(url, auth=HTTPBasicAuth( + self.config.admin_user(), self.config.admin_password())) if page.status_code == 200: return json.loads(page.text) raise ConnectionError, "bad request " + url diff --git a/src/analytics/contrail-topology/contrail_topology/config.py b/src/analytics/contrail-topology/contrail_topology/config.py index e54d2e7b902..49f455aef22 100644 --- a/src/analytics/contrail-topology/contrail_topology/config.py +++ b/src/analytics/contrail-topology/contrail_topology/config.py @@ -4,7 +4,8 @@ import argparse, os, ConfigParser, sys, re from pysandesh.sandesh_base import * from pysandesh.gen_py.sandesh.ttypes import SandeshLevel -from sandesh_common.vns.constants import ModuleNames, HttpPortTopology, API_SERVER_DISCOVERY_SERVICE_NAME +from sandesh_common.vns.constants import ModuleNames, HttpPortTopology, \ + API_SERVER_DISCOVERY_SERVICE_NAME, OpServerAdminPort from sandesh_common.vns.ttypes import Module import discoveryclient.client as discovery_client import traceback @@ -69,7 +70,7 @@ def parse(self): defaults = { 'collectors' : None, - 'analytics_api' : ['127.0.0.1:8081'], + 'analytics_api' : ['127.0.0.1:' + str(OpServerAdminPort)], 'log_local' : False, 'log_level' : SandeshLevel.SYS_DEBUG, 'log_category' : '', @@ -224,6 +225,12 @@ def frequency(self): def http_port(self): return self._args.http_server_port + def admin_user(self): + return self._args.admin_user + + def admin_password(self): + return self._args.admin_password + def sandesh_send_rate_limit(self): return self._args.sandesh_send_rate_limit diff --git a/src/api-lib/vnc_api.py b/src/api-lib/vnc_api.py index b29bc61e825..7b1d4dd9b61 100644 --- a/src/api-lib/vnc_api.py +++ b/src/api-lib/vnc_api.py @@ -1044,8 +1044,6 @@ def 
virtual_network_subnet_ip_count(self, vnobj, subnet_list): #end virtual_network_subnet_ip_count def get_auth_token(self): - if self._auth_token: - return self._auth_token self._headers = self._authenticate(headers=self._headers) return self._auth_token @@ -1149,19 +1147,21 @@ def set_user_roles(self, roles): self._headers['X-API-ROLE'] = (',').join(roles) #end set_user_roles + """ + validate user token. Optionally, check token authorization for an object. + rv {'token_info': , 'permissions': 'RWX'} + """ def obj_perms(self, token, obj_uuid=None): - """ - validate user token. Optionally, check token authorization for an object. - rv {'token_info': , 'permissions': 'RWX'} - """ - query = 'token=%s' % token - if obj_uuid: - query += '&uuid=%s' % obj_uuid + self._headers['X-USER-TOKEN'] = token + query = 'uuid=%s' % obj_uuid if obj_uuid else '' try: - rv = self._request_server(rest.OP_GET, "/obj-perms", data=query) - return json.loads(rv) + rv_json = self._request_server(rest.OP_GET, "/obj-perms", data=query) + rv = json.loads(rv_json) except PermissionDenied: - return None + rv = None + finally: + del self._headers['X-USER-TOKEN'] + return rv # change object ownsership def chown(self, obj_uuid, owner): diff --git a/src/config/api-server/tests/test_crud_basic.py b/src/config/api-server/tests/test_crud_basic.py index 9360411c715..5a92da4e6be 100644 --- a/src/config/api-server/tests/test_crud_basic.py +++ b/src/config/api-server/tests/test_crud_basic.py @@ -1176,14 +1176,14 @@ def test_list_bulk_collection(self): vmi_uuids = [o.uuid for o in vmi_objs] logger.info("Querying VNs by obj_uuids.") - flexmock(self._api_server).should_call('_list_collection').once() + flexmock(self._api_server).should_call('list_bulk_collection_http_post').once() ret_list = self._vnc_lib.resource_list('virtual-network', obj_uuids=vn_uuids) ret_uuids = [ret['uuid'] for ret in ret_list['virtual-networks']] self.assertThat(set(vn_uuids), Equals(set(ret_uuids))) logger.info("Querying RIs by 
parent_id.") - flexmock(self._api_server).should_call('_list_collection').once() + flexmock(self._api_server).should_call('list_bulk_collection_http_post').once() ret_list = self._vnc_lib.resource_list('routing-instance', parent_id=vn_uuids) ret_uuids = [ret['uuid'] @@ -1192,7 +1192,7 @@ def test_list_bulk_collection(self): Equals(set(ret_uuids) & set(ri_uuids))) logger.info("Querying VMIs by back_ref_id.") - flexmock(self._api_server).should_call('_list_collection').once() + flexmock(self._api_server).should_call('list_bulk_collection_http_post').once() ret_list = self._vnc_lib.resource_list('virtual-machine-interface', back_ref_id=vn_uuids) ret_uuids = [ret['uuid'] @@ -1200,7 +1200,7 @@ def test_list_bulk_collection(self): self.assertThat(set(vmi_uuids), Equals(set(ret_uuids))) logger.info("Querying VMIs by back_ref_id and extra fields.") - flexmock(self._api_server).should_call('_list_collection').once() + flexmock(self._api_server).should_call('list_bulk_collection_http_post').once() ret_list = self._vnc_lib.resource_list('virtual-machine-interface', back_ref_id=vn_uuids, fields=['virtual_network_refs']) @@ -1212,14 +1212,14 @@ def test_list_bulk_collection(self): set(vn_uuids)) logger.info("Querying RIs by parent_id and filter.") - flexmock(self._api_server).should_call('_list_collection').once() + flexmock(self._api_server).should_call('list_bulk_collection_http_post').once() ret_list = self._vnc_lib.resource_list('routing-instance', parent_id=vn_uuids, filters={'display_name':'%s-ri-5' %(self.id())}) self.assertThat(len(ret_list['routing-instances']), Equals(1)) logger.info("Querying VNs by obj_uuids for children+backref fields.") - flexmock(self._api_server).should_call('_list_collection').once() + flexmock(self._api_server).should_call('list_bulk_collection_http_post').once() ret_objs = self._vnc_lib.resource_list('virtual-network', detail=True, obj_uuids=vn_uuids, fields=['routing_instances', 'virtual_machine_interface_back_refs']) diff --git 
a/src/config/api-server/tests/test_perms2.py b/src/config/api-server/tests/test_perms2.py index 47617860371..2a94f9e0f8b 100644 --- a/src/config/api-server/tests/test_perms2.py +++ b/src/config/api-server/tests/test_perms2.py @@ -112,9 +112,7 @@ def api_acl_name(self): return rg_name def check_perms(self, obj_uuid): - query = 'token=%s&uuid=%s' % (self.vnc_lib.get_auth_token(), obj_uuid) - rv = self.vnc_lib._request_server(rest.OP_GET, "/obj-perms", data=query) - rv = json.loads(rv) + rv = self.vnc_lib.obj_perms(self.vnc_lib.get_auth_token(), obj_uuid) return rv['permissions'] # display resource id-perms diff --git a/src/config/api-server/vnc_cfg_api_server.py b/src/config/api-server/vnc_cfg_api_server.py index 1b4f008f619..e49b9cfa9d7 100644 --- a/src/config/api-server/vnc_cfg_api_server.py +++ b/src/config/api-server/vnc_cfg_api_server.py @@ -1756,10 +1756,10 @@ def documentation_http_get(self, filename): # end documentation_http_get def obj_perms_http_get(self): - if 'token' not in get_request().query: + if 'HTTP_X_USER_TOKEN' not in get_request().environ: raise cfgm_common.exceptions.HttpError( 400, 'User token needed for validation') - user_token = get_request().query.token.encode("ascii") + user_token = get_request().environ['HTTP_X_USER_TOKEN'].encode("ascii") # get permissions in internal context try: diff --git a/src/config/common/analytics_client.py b/src/config/common/analytics_client.py index c5d14a00916..4209330c561 100644 --- a/src/config/common/analytics_client.py +++ b/src/config/common/analytics_client.py @@ -34,12 +34,13 @@ def __init__(self, endpoint, data={}): self.endpoint = endpoint self.data = data - def request(self, path, fqdn_uuid, data=None): + def request(self, path, fqdn_uuid, user_token=None, + data=None): req_data = dict(self.data) if data: req_data.update(data) - req_params = self._get_req_params(data=req_data) + req_params = self._get_req_params(user_token, data=req_data) url = urlparse.urljoin(self.endpoint, path + fqdn_uuid) resp 
= requests.get(url, **req_params) @@ -51,7 +52,7 @@ def request(self, path, fqdn_uuid, data=None): return resp.json() - def _get_req_params(self, data=None): + def _get_req_params(self, user_token, data=None): req_params = { 'headers': { 'Accept': 'application/json' @@ -59,5 +60,7 @@ def _get_req_params(self, data=None): 'data': data, 'allow_redirects': False, } + if user_token: + req_params['headers']['X-AUTH-TOKEN'] = user_token return req_params diff --git a/src/config/common/tests/tools/install_venv_common.py b/src/config/common/tests/tools/install_venv_common.py index 6c77c839664..b43c8176a49 100644 --- a/src/config/common/tests/tools/install_venv_common.py +++ b/src/config/common/tests/tools/install_venv_common.py @@ -120,7 +120,9 @@ def pip_install(self, find_links, *args): find_links_str = ' '.join('--find-links file://'+x for x in find_links) cmd_array = ['%stools/with_venv.sh' %(os.environ.get('tools_path', '')), 'python', '.venv/bin/pip', 'install', - '--upgrade', '--no-cache-dir'] + '--upgrade'] + if not args[0].startswith('pip'): + cmd_array.extend(['--no-cache-dir']) for link in find_links: cmd_array.extend(['--find-links', 'file://'+link]) self.run_command(cmd_array + list(args), diff --git a/src/config/svc-monitor/svc_monitor/scheduler/vrouter_scheduler.py b/src/config/svc-monitor/svc_monitor/scheduler/vrouter_scheduler.py index e8ff6de6979..fd3c450e126 100644 --- a/src/config/svc-monitor/svc_monitor/scheduler/vrouter_scheduler.py +++ b/src/config/svc-monitor/svc_monitor/scheduler/vrouter_scheduler.py @@ -106,7 +106,8 @@ def query_uve(self, filter_string): path = "/analytics/uves/vrouter/" response_dict = {} try: - response = self._analytics.request(path, filter_string) + response = self._analytics.request(path, filter_string, + user_token=self._vnc_lib.get_auth_token()) for values in response['value']: response_dict[values['name']] = values['value'] except analytics_client.OpenContrailAPIFailed: diff --git a/src/opserver/SConscript 
b/src/opserver/SConscript index 8def0775596..b0d0877cb2a 100644 --- a/src/opserver/SConscript +++ b/src/opserver/SConscript @@ -13,6 +13,7 @@ OpEnv = BuildEnv.Clone() setup_sources = [ 'setup.py', 'MANIFEST.in', + 'requirements.txt', ] setup_sources_rules = [] @@ -41,7 +42,9 @@ local_sources = [ 'partition_handler.py', 'consistent_schdlr.py', 'gendb_move_tables.py', - 'alarm_notify.py' + 'alarm_notify.py', + 'vnc_cfg_api_client.py', + 'opserver_local.py', ] plugins_sources = [ diff --git a/src/opserver/analytics_db.py b/src/opserver/analytics_db.py index 312fcd5c921..59c1b6a8ade 100644 --- a/src/opserver/analytics_db.py +++ b/src/opserver/analytics_db.py @@ -34,6 +34,7 @@ from cassandra.query import named_tuple_factory from cassandra.query import PreparedStatement, tuple_factory import platform +from opserver_util import OpServerUtils class AnalyticsDb(object): def __init__(self, logger, cassandra_server_list, @@ -603,7 +604,7 @@ def db_purge_thrift(self, purge_cutoff, purge_id): return (total_rows_deleted, purge_error_details) # end db_purge - def get_dbusage_info(self, rest_api_ip, rest_api_port): + def get_dbusage_info(self, ip, port, user, password): """Collects database usage information from all db nodes Returns: A dictionary with db node name as key and db usage in % as value @@ -611,12 +612,17 @@ def get_dbusage_info(self, rest_api_ip, rest_api_port): to_return = {} try: - uve_url = "http://" + rest_api_ip + ":" + str(rest_api_port) + "/analytics/uves/database-nodes?cfilt=DatabaseUsageInfo" - node_dburls = json.loads(urllib2.urlopen(uve_url).read()) + uve_url = "http://" + ip + ":" + str(port) + \ + "/analytics/uves/database-nodes?cfilt=DatabaseUsageInfo" + data = OpServerUtils.get_url_http(uve_url, user, password) + node_dburls = json.loads(data) for node_dburl in node_dburls: - # calculate disk usage percentage for analytics in each cassandra node - db_uve_state = json.loads(urllib2.urlopen(node_dburl['href']).read()) + # calculate disk usage percentage 
for analytics in each + # cassandra node + db_uve_data = OpServerUtils.get_url_http(node_dburl['href'], + user, password) + db_uve_state = json.loads(db_uve_data) db_usage_in_perc = (100* float(db_uve_state['DatabaseUsageInfo']['database_usage'][0]['analytics_db_size_1k'])/ float(db_uve_state['DatabaseUsageInfo']['database_usage'][0]['disk_space_available_1k'] + diff --git a/src/opserver/flow.py b/src/opserver/flow.py index 152c39b3909..dc948d526cf 100755 --- a/src/opserver/flow.py +++ b/src/opserver/flow.py @@ -76,7 +76,7 @@ def run(self): def parse_args(self): """ Eg. python flow.py --analytics-api-ip 127.0.0.1 - --analytics-api-port 8081 + --analytics-api-port 8181 --vrouter a6s23 --source-vn default-domain:default-project:vn1 --destination-vn default-domain:default-project:vn2 @@ -94,7 +94,7 @@ def parse_args(self): """ defaults = { 'analytics_api_ip': '127.0.0.1', - 'analytics_api_port': '8081', + 'analytics_api_port': '8181', 'start_time': 'now-10m', 'end_time': 'now', 'direction' : 'ingress', @@ -139,6 +139,11 @@ def parse_args(self): help="Show vmi uuid information") parser.add_argument( "--verbose", action="store_true", help="Show internal information") + parser.add_argument( + "--admin-user", help="Name of admin user", default="admin") + parser.add_argument( + "--admin-password", help="Password of admin user", + default="contrail123") self._args = parser.parse_args() try: @@ -332,13 +337,15 @@ def query(self): json.dumps(flow_query.__dict__)) print '' resp = OpServerUtils.post_url_http( - flow_url, json.dumps(flow_query.__dict__)) + flow_url, json.dumps(flow_query.__dict__), self._args.admin_user, + self._args.admin_password) result = {} if resp is not None: resp = json.loads(resp) qid = resp['href'].rsplit('/', 1)[1] result = OpServerUtils.get_query_result( - self._args.analytics_api_ip, self._args.analytics_api_port, qid) + self._args.analytics_api_ip, self._args.analytics_api_port, qid, + self._args.admin_user, self._args.admin_password) return result 
# end query diff --git a/src/opserver/introspect_util.py b/src/opserver/introspect_util.py index b449a354ace..2cc44366159 100644 --- a/src/opserver/introspect_util.py +++ b/src/opserver/introspect_util.py @@ -2,28 +2,37 @@ # Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. # -import urllib, urllib2 +import urllib import xmltodict import json import requests from lxml import etree import socket - +from requests.auth import HTTPBasicAuth class JsonDrv (object): - def _http_con(self, url): - return urllib2.urlopen(url) - - def load(self, url): - return json.load(self._http_con(url)) - + def load(self, url, user, password): + try: + if user and password: + auth=HTTPBasicAuth(user, password) + else: + auth=None + resp = requests.get(url, auth=auth) + return json.loads(resp.text) + except requests.ConnectionError, e: + print "Socket Connection error : " + str(e) + return None class XmlDrv (object): - def load(self, url): + def load(self, url, user, password): try: - resp = requests.get(url) + if user and password: + auth=HTTPBasicAuth(user, password) + else: + auth=None + resp = requests.get(url, auth=auth) return etree.fromstring(resp.text) except requests.ConnectionError, e: print "Socket Connection error : " + str(e) @@ -54,14 +63,14 @@ def _mk_url_str(self, path, query): return path+query_str return "http://%s:%d/%s%s" % (self._ip, self._port, path, query_str) - def dict_get(self, path='', query=None, drv=None): - try: - if path: - if drv is not None: - return drv().load(self._mk_url_str(path, query)) - return self._drv.load(self._mk_url_str(path, query)) - except urllib2.HTTPError: - return None + def dict_get(self, path='', query=None, drv=None, user=None, + password=None): + if path: + if drv is not None: + return drv().load(self._mk_url_str(path, query), user, + password) + return self._drv.load(self._mk_url_str(path, query), user, + password) # end dict_get diff --git a/src/opserver/log.py b/src/opserver/log.py index f768424bc35..22ff90236bd 100755 
--- a/src/opserver/log.py +++ b/src/opserver/log.py @@ -82,7 +82,7 @@ def run(self): def parse_args(self): """ Eg. python log.py --analytics-api-ip 127.0.0.1 - --analytics-api-port 8081 + --analytics-api-port 8181 --source 127.0.0.1 --node-type Control --module bgp | cfgm | vnswad @@ -105,7 +105,7 @@ def parse_args(self): """ defaults = { 'analytics_api_ip': '127.0.0.1', - 'analytics_api_port': '8081', + 'analytics_api_port': '8181', } parser = argparse.ArgumentParser( @@ -158,6 +158,9 @@ def parse_args(self): parser.add_argument("--output-file", "-o", help="redirect output to file") parser.add_argument("--json", help="Dump output as json", action="store_true") parser.add_argument("--all", action="store_true", help=argparse.SUPPRESS) + parser.add_argument("--admin-user", help="Name of admin user", default="admin") + parser.add_argument("--admin-password", help="Password of admin user", + default="contrail123") self._args = parser.parse_args() return 0 # end parse_args @@ -457,13 +460,15 @@ def query(self): print 'Performing query: {0}'.format( json.dumps(messages_query.__dict__)) resp = OpServerUtils.post_url_http( - messages_url, json.dumps(messages_query.__dict__)) + messages_url, json.dumps(messages_query.__dict__), + self._args.admin_user, self._args.admin_password) result = {} if resp is not None: resp = json.loads(resp) qid = resp['href'].rsplit('/', 1)[1] result = OpServerUtils.get_query_result( - self._args.analytics_api_ip, self._args.analytics_api_port, qid) + self._args.analytics_api_ip, self._args.analytics_api_port, qid, + self._args.admin_user, self._args.admin_password) return result # end query diff --git a/src/opserver/opserver.py b/src/opserver/opserver.py index ecb67d5e8e6..34de586ec76 100644 --- a/src/opserver/opserver.py +++ b/src/opserver/opserver.py @@ -48,7 +48,9 @@ from sandesh_common.vns.constants import ModuleNames, CategoryNames,\ ModuleCategoryMap, Module2NodeType, NodeTypeNames, ModuleIds,\ INSTANCE_ID_DEFAULT, 
COLLECTOR_DISCOVERY_SERVICE_NAME,\ - ANALYTICS_API_SERVER_DISCOVERY_SERVICE_NAME, ALARM_GENERATOR_SERVICE_NAME + ANALYTICS_API_SERVER_DISCOVERY_SERVICE_NAME, ALARM_GENERATOR_SERVICE_NAME, \ + OpServerAdminPort, CLOUD_ADMIN_ROLE, APIAAAModes, \ + AAA_MODE_CLOUD_ADMIN, AAA_MODE_NO_AUTH from sandesh.viz.constants import _TABLES, _OBJECT_TABLES,\ _OBJECT_TABLE_SCHEMA, _OBJECT_TABLE_COLUMN_VALUES, \ _STAT_TABLES, STAT_OBJECTID_FIELD, STAT_VT_PREFIX, \ @@ -70,6 +72,9 @@ from generator_introspect_util import GeneratorIntrospectUtil from stevedore import hook, extension from partition_handler import PartInfo, UveStreamer, UveCacheProcessor +from functools import wraps +from vnc_cfg_api_client import VncCfgApiClient +from opserver_local import LocalApp _ERRORS = { errno.EBADMSG: 400, @@ -389,6 +394,26 @@ def disc_publish(self): self.disc.publish(ANALYTICS_API_SERVER_DISCOVERY_SERVICE_NAME, data) # end disc_publish + def validate_user_token(func): + @wraps(func) + def _impl(self, *f_args, **f_kwargs): + if self._args.auth_conf_info.get('cloud_admin_access_only') and \ + bottle.request.app == bottle.app(): + user_token = bottle.request.headers.get('X-Auth-Token') + if not user_token or not \ + self._vnc_api_client.is_role_cloud_admin(user_token): + raise bottle.HTTPResponse(status = 401, + body = 'Authentication required', + headers = self._reject_auth_headers()) + return func(self, *f_args, **f_kwargs) + return _impl + # end validate_user_token + + def _reject_auth_headers(self): + header_val = 'Keystone uri=\'%s\'' % \ + self._args.auth_conf_info.get('auth_uri') + return { "WWW-Authenticate" : header_val } + def __init__(self, args_str=' '.join(sys.argv[1:])): self.gevs = [] self._args = None @@ -464,6 +489,10 @@ def __init__(self, args_str=' '.join(sys.argv[1:])): body = gevent.queue.Queue() + self._vnc_api_client = None + if self._args.auth_conf_info.get('cloud_admin_access_only'): + self._vnc_api_client = VncCfgApiClient(self._args.auth_conf_info, + self._sandesh, 
self._logger) self._uvedbstream = UveStreamer(self._logger, None, None, self.get_agp, self._args.redis_password) @@ -738,6 +767,10 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): 'partitions' : 15, 'sandesh_send_rate_limit': SandeshSystem. \ get_sandesh_send_rate_limit(), + 'aaa_mode' : AAA_MODE_CLOUD_ADMIN, + 'api_server' : '127.0.0.1:8082', + 'admin_port' : OpServerAdminPort, + 'cloud_admin_role' : CLOUD_ADMIN_ROLE, } redis_opts = { 'redis_server_port' : 6379, @@ -752,6 +785,14 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): 'cassandra_user' : None, 'cassandra_password' : None, } + keystone_opts = { + 'auth_host': '127.0.0.1', + 'auth_protocol': 'http', + 'auth_port': 35357, + 'admin_user': 'admin', + 'admin_password': 'contrail123', + 'admin_tenant_name': 'default-domain' + } # read contrail-analytics-api own conf file config = None @@ -766,6 +807,8 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): disc_opts.update(dict(config.items('DISCOVERY'))) if 'CASSANDRA' in config.sections(): cassandra_opts.update(dict(config.items('CASSANDRA'))) + if 'KEYSTONE' in config.sections(): + keystone_opts.update(dict(config.items('KEYSTONE'))) # Override with CLI options # Don't surpress add_help here so it will handle -h @@ -779,6 +822,7 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): defaults.update(redis_opts) defaults.update(disc_opts) defaults.update(cassandra_opts) + defaults.update(keystone_opts) defaults.update() parser.set_defaults(**defaults) @@ -850,6 +894,26 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): help="Number of partitions for hashing UVE keys") parser.add_argument("--sandesh_send_rate_limit", type=int, help="Sandesh send rate limit in messages/sec") + parser.add_argument("--cloud_admin_role", + help="Name of cloud-admin role") + parser.add_argument("--aaa_mode", choices=APIAAAModes, + help="AAA mode") + parser.add_argument("--auth_host", + help="IP address of keystone server") + 
parser.add_argument("--auth_protocol", + help="Keystone authentication protocol") + parser.add_argument("--auth_port", type=int, + help="Keystone server port") + parser.add_argument("--admin_user", + help="Name of keystone admin user") + parser.add_argument("--admin_password", + help="Password of keystone admin user") + parser.add_argument("--admin_tenant_name", + help="Tenant name for keystone admin user") + parser.add_argument("--api_server", + help="Address of VNC API server in ip:port format") + parser.add_argument("--admin_port", + help="Port with local auth for admin access") self._args = parser.parse_args(remaining_argv) if type(self._args.collectors) is str: self._args.collectors = self._args.collectors.split() @@ -857,6 +921,25 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): self._args.redis_uve_list = self._args.redis_uve_list.split() if type(self._args.cassandra_server_list) is str: self._args.cassandra_server_list = self._args.cassandra_server_list.split() + + auth_conf_info = {} + auth_conf_info['admin_user'] = self._args.admin_user + auth_conf_info['admin_password'] = self._args.admin_password + auth_conf_info['admin_tenant_name'] = self._args.admin_tenant_name + auth_conf_info['auth_protocol'] = self._args.auth_protocol + auth_conf_info['auth_host'] = self._args.auth_host + auth_conf_info['auth_port'] = self._args.auth_port + auth_conf_info['auth_uri'] = '%s://%s:%d' % (self._args.auth_protocol, + self._args.auth_host, self._args.auth_port) + auth_conf_info['api_server_use_ssl'] = False + auth_conf_info['cloud_admin_access_only'] = \ + False if self._args.aaa_mode == AAA_MODE_NO_AUTH else True + auth_conf_info['cloud_admin_role'] = self._args.cloud_admin_role + auth_conf_info['admin_port'] = self._args.admin_port + api_server_info = self._args.api_server.split(':') + auth_conf_info['api_server_ip'] = api_server_info[0] + auth_conf_info['api_server_port'] = int(api_server_info[1]) + self._args.auth_conf_info = auth_conf_info # end 
_parse_args def get_args(self): @@ -925,9 +1008,11 @@ def _serve_streams(self, alarmsonly): ph.start() return body + @validate_user_token def uve_stream(self): return self._serve_streams(False) + @validate_user_token def alarm_stream(self): return self._serve_streams(True) @@ -1077,8 +1162,10 @@ def _query(self, request): if tabl == OVERLAY_TO_UNDERLAY_FLOW_MAP: overlay_to_underlay_map = OverlayToUnderlayMapper( - request.json, self._args.host_ip, - self._args.rest_api_port, self._logger) + request.json, 'localhost', + self._args.auth_conf_info['admin_port'], + self._args.auth_conf_info['admin_user'], + self._args.auth_conf_info['admin_password'], self._logger) try: yield overlay_to_underlay_map.process_query() except OverlayToUnderlayMapperError as e: @@ -1197,17 +1284,19 @@ def _sync_query(self, request, qid): return # end _sync_query + @validate_user_token def query_process(self): self._post_common(bottle.request, None) result = self._query(bottle.request) return result # end query_process + @validate_user_token def query_status_get(self, queryId): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) return self._query_status(bottle.request, queryId) # end query_status_get @@ -1215,15 +1304,16 @@ def query_chunk_get(self, queryId, chunkId): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) return self._query_chunk(bottle.request, queryId, int(chunkId)) # end query_chunk_get + @validate_user_token def show_queries(self): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) queries = {} try: redish = redis.StrictRedis(db=0, host='127.0.0.1', @@ -1349,13 +1439,14 @@ def _uve_http_post_filter_set(req): return filters # end _uve_http_post_filter_set + @validate_user_token def dyn_http_post(self, tables): (ok, result) = self._post_common(bottle.request, 
None) base_url = bottle.request.urlparts.scheme + \ '://' + bottle.request.urlparts.netloc if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) uve_type = tables uve_tbl = uve_type if uve_type in UVE_MAP: @@ -1405,6 +1496,7 @@ def dyn_http_post(self, tables): yield u']}' # end _uve_alarm_http_post + @validate_user_token def dyn_http_get(self, table, name): # common handling for all resource get (ok, result) = self._get_common(bottle.request) @@ -1412,7 +1504,7 @@ def dyn_http_get(self, table, name): '://' + bottle.request.urlparts.netloc if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) uve_tbl = table if table in UVE_MAP: uve_tbl = UVE_MAP[table] @@ -1457,12 +1549,13 @@ def dyn_http_get(self, table, name): yield json.dumps(rsp) # end dyn_http_get + @validate_user_token def uve_alarm_http_types(self): # common handling for all resource get (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) bottle.response.set_header('Content-Type', 'application/json') ret = {} @@ -1478,12 +1571,13 @@ def uve_alarm_http_types(self): ret[aname] = avalue return json.dumps(ret) + @validate_user_token def alarms_http_get(self): # common handling for all resource get (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) bottle.response.set_header('Content-Type', 'application/json') @@ -1510,12 +1604,13 @@ def alarms_http_get(self): return bottle.HTTPError(_ERRORS[errno.EIO],json.dumps(alms)) # end alarms_http_get + @validate_user_token def dyn_list_http_get(self, tables): # common handling for all resource get (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) arg_line = bottle.request.url.rsplit('/', 1)[1] uve_args = arg_line.split('?') uve_type = tables[:-1] @@ -1554,12 +1649,13 @@ def dyn_list_http_get(self, tables): 
return json.dumps(uve_links) # end dyn_list_http_get + @validate_user_token def analytics_http_get(self): # common handling for all resource get (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) base_url = bottle.request.urlparts.scheme + '://' + \ bottle.request.urlparts.netloc + '/analytics/' @@ -1569,12 +1665,13 @@ def analytics_http_get(self): return json.dumps(analytics_links) # end analytics_http_get + @validate_user_token def uves_http_get(self): # common handling for all resource get (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) base_url = bottle.request.urlparts.scheme + '://' + \ bottle.request.urlparts.netloc + '/analytics/uves/' @@ -1603,6 +1700,7 @@ def uves_http_get(self): return bottle.HTTPError(_ERRORS[errno.EIO],json.dumps(uvetype_links)) # end _uves_http_get + @validate_user_token def alarms_ack_http_post(self): self._post_common(bottle.request, None) if ('application/json' not in bottle.request.headers['Content-Type']): @@ -1655,6 +1753,7 @@ def alarms_ack_http_post(self): return bottle.HTTPResponse(status=200) # end alarms_ack_http_post + @validate_user_token def send_trace_buffer(self, source, module, instance_id, name): response = {} trace_req = SandeshTraceRequest(name) @@ -1677,11 +1776,12 @@ def send_trace_buffer(self, source, module, instance_id, name): return json.dumps(response) # end send_trace_buffer + @validate_user_token def tables_process(self): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) base_url = bottle.request.urlparts.scheme + '://' + \ bottle.request.urlparts.netloc + '/analytics/table/' @@ -1736,6 +1836,7 @@ def get_purge_cutoff(self, purge_input, start_times): return purge_cutoff #end get_purge_cutoff + @validate_user_token def process_purge_request(self): self._post_common(bottle.request, 
None) @@ -1880,8 +1981,10 @@ def _auto_purge(self): while True: trigger_purge = False db_node_usage = self._analytics_db.get_dbusage_info( - self._args.rest_api_ip, - self._args.rest_api_port) + 'localhost', + self._args.auth_conf_info['admin_port'], + self._args.auth_conf_info['admin_user'], + self._args.auth_conf_info['admin_password']) self._logger.info("node usage:" + str(db_node_usage) ) self._logger.info("threshold:" + str(self._args.db_purge_threshold)) @@ -1917,8 +2020,7 @@ def _auto_purge(self): gevent.sleep(60*30) # sleep for 30 minutes # end _auto_purge - - + @validate_user_token def _get_analytics_data_start_time(self): analytics_start_time = (self._analytics_db.get_analytics_start_time())[SYSTEM_OBJECT_START_TIME] response = {'analytics_data_start_time': analytics_start_time} @@ -1930,7 +2032,7 @@ def table_process(self, table): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) base_url = bottle.request.urlparts.scheme + '://' + \ bottle.request.urlparts.netloc + '/analytics/table/' + table + '/' @@ -1950,11 +2052,12 @@ def table_process(self, table): return json.dumps(json_links) # end table_process + @validate_user_token def table_schema_process(self, table): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) bottle.response.set_header('Content-Type', 'application/json') for i in range(0, len(self._VIRTUAL_TABLES)): @@ -1965,11 +2068,12 @@ def table_schema_process(self, table): return (json.dumps({})) # end table_schema_process + @validate_user_token def column_values_process(self, table): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) base_url = bottle.request.urlparts.scheme + '://' + \ bottle.request.urlparts.netloc + \ @@ -2034,11 +2138,12 @@ def generator_info(self, table, column): return [] # end generator_info + 
@validate_user_token def column_process(self, table, column): (ok, result) = self._get_common(bottle.request) if not ok: (code, msg) = result - abort(code, msg) + bottle.abort(code, msg) bottle.response.set_header('Content-Type', 'application/json') for i in range(0, len(self._VIRTUAL_TABLES)): @@ -2184,6 +2289,11 @@ def run(self): sp2.start() self.gevs.append(sp2) + if self._vnc_api_client: + self.gevs.append(gevent.spawn(self._vnc_api_client.connect)) + self._local_app = LocalApp(bottle.app(), self._args.auth_conf_info) + self.gevs.append(gevent.spawn(self._local_app.start_http_server)) + try: gevent.joinall(self.gevs) except KeyboardInterrupt: diff --git a/src/opserver/opserver_local.py b/src/opserver/opserver_local.py new file mode 100644 index 00000000000..d5ba564e8dc --- /dev/null +++ b/src/opserver/opserver_local.py @@ -0,0 +1,72 @@ +# +# Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +# + +import gevent +from gevent import monkey +monkey.patch_all() +import bottle +import base64 +from bottle import GeventServer +from gevent.pool import Pool + +def get_bottle_server(pool_size): + """ + Custom gevent pool server for bottle + """ + + class GeventPoolServer(GeventServer): + """ Gevent server with limited pool size + """ + def __init__(self, host='127.0.0.1', port=8181, **options): + super(GeventPoolServer, self ).__init__(host, port, + spawn=Pool(size=pool_size), **options) + + return GeventPoolServer +# end get_bottle_server + +# Open port for access to API server for trouble shooting +class LocalApp(object): + + def __init__(self, app, conf_info): + self._http_host = 'localhost' + self._http_port = conf_info['admin_port'] + self._http_app = bottle.Bottle() + self._http_app.merge(app.routes) + self._http_app.config.local_auth = True + self._http_app.error_handler = app.error_handler + self._conf_info = conf_info + + # 2 decorators below due to change in api between bottle 0.11.6 + # (which insists on global app) vs later (which need on specific 
+ # app) + @self._http_app.hook('before_request') + @bottle.hook('before_request') + def local_auth_check(*args, **kwargs): + if bottle.request.app != self._http_app: + return + # expect header to have something like 'Basic YWJjOmRlZg==' + auth_hdr_val = bottle.request.environ.get('HTTP_AUTHORIZATION') + if not auth_hdr_val: + bottle.abort(401, 'HTTP_AUTHORIZATION header missing') + try: + auth_type, user_passwd = auth_hdr_val.split() + except Exception as e: + bottle.abort(401, 'Auth Exception: %s' %(str(e))) + enc_user_passwd = auth_hdr_val.split()[1] + user_passwd = base64.b64decode(enc_user_passwd) + user, passwd = user_passwd.split(':') + if (not self._conf_info.get('admin_user') == user or + not self._conf_info.get('admin_password') == passwd): + bottle.abort(401, 'Authentication check failed') + + # Add admin role to the request + bottle.request.environ['HTTP_X_ROLE'] = 'admin' + # end __init__ + + def start_http_server(self): + self._http_app.run( + host=self._http_host, port=self._http_port, + server=get_bottle_server(1024)) + # end start_http_server +# end class LocalApp diff --git a/src/opserver/opserver_util.py b/src/opserver/opserver_util.py index 8b6f1240fc3..27eb376551c 100644 --- a/src/opserver/opserver_util.py +++ b/src/opserver/opserver_util.py @@ -27,7 +27,7 @@ class SandeshType(object): SYSTEM = 1 TRACE = 4 - +from requests.auth import HTTPBasicAuth def enum(**enums): return type('Enum', (), enums) @@ -197,7 +197,7 @@ def parse_start_end_time(start_time, end_time, last): # end parse_start_end_time @staticmethod - def post_url_http(url, params, sync=False): + def post_url_http(url, params, user, password, sync=False): if sync: hdrs = OpServerUtils.POST_HEADERS_SYNC stm = False @@ -211,10 +211,12 @@ def post_url_http(url, params, sync=False): if int(pkg_resources.get_distribution("requests").version[0]) != 0: response = requests.post(url, stream=stm, data=params, + auth=HTTPBasicAuth(user, password), headers=hdrs) else: response = 
requests.post(url, prefetch=pre, data=params, + auth=HTTPBasicAuth(user, password), headers=hdrs) except requests.exceptions.ConnectionError, e: print "Connection to %s failed %s" % (url, str(e)) @@ -227,13 +229,15 @@ def post_url_http(url, params, sync=False): # end post_url_http @staticmethod - def get_url_http(url): + def get_url_http(url, user, password): data = {} try: if int(pkg_resources.get_distribution("requests").version[0]) != 0: - data = requests.get(url, stream=True) + data = requests.get(url, stream=True, + auth=HTTPBasicAuth(user, password)) else: - data = requests.get(url, prefetch=False) + data = requests.get(url, prefetch=False, + auth=HTTPBasicAuth(user, password)) except requests.exceptions.ConnectionError, e: print "Connection to %s failed %s" % (url, str(e)) @@ -268,13 +272,14 @@ def parse_query_result(result): # end parse_query_result @staticmethod - def get_query_result(opserver_ip, opserver_port, qid, time_out=None): + def get_query_result(opserver_ip, opserver_port, qid, user, password, + time_out=None): sleep_interval = 0.5 time_left = time_out while True: url = OpServerUtils.opserver_query_url( opserver_ip, opserver_port) + '/' + qid - resp = OpServerUtils.get_url_http(url) + resp = OpServerUtils.get_url_http(url, user, password) if resp.status_code != 200: yield {} return @@ -297,7 +302,7 @@ def get_query_result(opserver_ip, opserver_port, qid, time_out=None): for chunk in status['chunks']: url = OpServerUtils.opserver_url( opserver_ip, opserver_port) + chunk['href'] - resp = OpServerUtils.get_url_http(url) + resp = OpServerUtils.get_url_http(url, user, password) if resp.status_code != 200: yield {} else: diff --git a/src/opserver/overlay_to_underlay_mapper.py b/src/opserver/overlay_to_underlay_mapper.py index 1635935aee5..6ea982375b0 100644 --- a/src/opserver/overlay_to_underlay_mapper.py +++ b/src/opserver/overlay_to_underlay_mapper.py @@ -27,10 +27,12 @@ class OverlayToUnderlayMapperError(Exception): class 
OverlayToUnderlayMapper(object): def __init__(self, query_json, analytics_api_ip, - analytics_api_port, logger): + analytics_api_port, user, password, logger): self.query_json = query_json self._analytics_api_ip = analytics_api_ip self._analytics_api_port = analytics_api_port + self._user = user + self._password = password self._logger = logger if self.query_json is not None: self._start_time = self.query_json['start_time'] @@ -233,7 +235,8 @@ def _send_query(self, query): self._logger.debug('Sending query: %s' % (query)) opserver_url = OpServerUtils.opserver_query_url(self._analytics_api_ip, str(self._analytics_api_port)) - resp = OpServerUtils.post_url_http(opserver_url, query, True) + resp = OpServerUtils.post_url_http(opserver_url, query, self._user, + self._password, True) try: resp = json.loads(resp) value = resp['value'] diff --git a/src/opserver/requirements.txt b/src/opserver/requirements.txt index a613123ba3b..e7eaa97824d 100644 --- a/src/opserver/requirements.txt +++ b/src/opserver/requirements.txt @@ -1,9 +1,11 @@ +requests +pycassa lxml -gevent==1.1a2 -geventhttpclient==1.0a +gevent +geventhttpclient redis xmltodict prettytable psutil>=0.6.0 -bottle==0.12.8 +bottle sseclient diff --git a/src/opserver/setup.py b/src/opserver/setup.py index 27ec91b212d..54bc6ef1b18 100644 --- a/src/opserver/setup.py +++ b/src/opserver/setup.py @@ -40,6 +40,12 @@ def run(self): if not re.search('\nOK', ''.join(f.readlines())): os._exit(1) +def requirements(filename): + with open(filename) as f: + lines = f.read().splitlines() + c = re.compile(r'\s*#.*') + return filter(bool, map(lambda y: c.sub('', y).strip(), lines)) + setup( name='opserver', version='0.1dev', @@ -49,15 +55,7 @@ def run(self): zip_safe=False, include_package_data=True, long_description="VNC Analytics API Implementation", - install_requires=[ - 'lxml', - 'gevent', - 'geventhttpclient', - 'redis', - 'xmltodict', - 'prettytable', - 'psutil>=0.4.1' - ], + install_requires=requirements('requirements.txt'), 
entry_points = { # Please update sandesh/common/vns.sandesh on process name change 'console_scripts' : [ diff --git a/src/opserver/stats.py b/src/opserver/stats.py index c1c85f6d6d7..203078bdc50 100755 --- a/src/opserver/stats.py +++ b/src/opserver/stats.py @@ -57,7 +57,7 @@ def run(self): def parse_args(self): """ Eg. python stats.py --analytics-api-ip 127.0.0.1 - --analytics-api-port 8081 + --analytics-api-port 8181 --table AnalyticsCpuState.cpu_info --where name=a6s40 cpu_info.module_id=Collector --select "T=60 SUM(cpu_info.cpu_share)" @@ -68,7 +68,7 @@ def parse_args(self): """ defaults = { 'analytics_api_ip': '127.0.0.1', - 'analytics_api_port': '8081', + 'analytics_api_port': '8181', 'start_time': 'now-10m', 'end_time': 'now', 'select' : [], @@ -96,6 +96,11 @@ def parse_args(self): "--where", help="List of Where Terms to be ANDed", nargs='+') parser.add_argument( "--sort", help="List of Sort Terms", nargs='+') + parser.add_argument( + "--admin-user", help="Name of admin user", default="admin") + parser.add_argument( + "--admin-password", help="Password of admin user", + default="contrail123") self._args = parser.parse_args() if self._args.table is None and self._args.dtable is None: @@ -132,7 +137,8 @@ def query(self): print json.dumps(query_dict) resp = OpServerUtils.post_url_http( - query_url, json.dumps(query_dict), sync = True) + query_url, json.dumps(query_dict), self._args.admin_user, + self._args.admin_password, sync = True) res = None if resp is not None: diff --git a/src/opserver/test-requirements.txt b/src/opserver/test-requirements.txt index 8e62f6cf868..9376030e977 100644 --- a/src/opserver/test-requirements.txt +++ b/src/opserver/test-requirements.txt @@ -3,7 +3,6 @@ testrepository python-subunit coverage fixtures==1.3.1 -requests>=1.1.0 thrift mock==1.0.1 flexmock==0.9.7 diff --git a/src/opserver/test/test_flow.py b/src/opserver/test/test_flow.py index 30b1d3763d3..04a7da998a3 100755 --- a/src/opserver/test/test_flow.py +++ 
b/src/opserver/test/test_flow.py @@ -46,8 +46,8 @@ def custom_output(self, outputdict): def setUp(self): self._querier = FlowQuerier() - flexmock(OpServerUtils).should_receive('post_url_http').once().replace_with(lambda x, y: self.custom_post_url_http(x, y)) - flexmock(OpServerUtils).should_receive('get_query_result').once().replace_with(lambda x, y, z: self.custom_get_query_result(x, y, z)) + flexmock(OpServerUtils).should_receive('post_url_http').once().replace_with(lambda x, y, w, z: self.custom_post_url_http(x, y)) + flexmock(OpServerUtils).should_receive('get_query_result').once().replace_with(lambda x, y, z, a, b: self.custom_get_query_result(x, y, z)) flexmock(self._querier).should_receive('output').replace_with(lambda x: self.custom_output(x)) #@unittest.skip("skip test_1_noarg_query") diff --git a/src/opserver/test/test_log.py b/src/opserver/test/test_log.py index a45a1bbc76b..7ee248f2d90 100755 --- a/src/opserver/test/test_log.py +++ b/src/opserver/test/test_log.py @@ -55,8 +55,8 @@ def setUp(self): self.maxDiff = None self._querier = LogQuerier() - flexmock(OpServerUtils).should_receive('post_url_http').replace_with(lambda x, y: self.custom_post_url_http(x, y)) - flexmock(OpServerUtils).should_receive('get_query_result').replace_with(lambda x, y, z: self.custom_get_query_result(x, y, z)) + flexmock(OpServerUtils).should_receive('post_url_http').replace_with(lambda x, y, w, z: self.custom_post_url_http(x, y)) + flexmock(OpServerUtils).should_receive('get_query_result').replace_with(lambda x, y, z, a, b: self.custom_get_query_result(x, y, z)) flexmock(self._querier).should_receive('display').replace_with(lambda x: self.custom_display(x)) diff --git a/src/opserver/test/test_overlay_to_underlay_mapper.py b/src/opserver/test/test_overlay_to_underlay_mapper.py index 1594c2f3864..1edc11e96de 100755 --- a/src/opserver/test/test_overlay_to_underlay_mapper.py +++ b/src/opserver/test/test_overlay_to_underlay_mapper.py @@ -235,7 +235,7 @@ def 
test_get_overlay_flow_data_noerror(self, mock_send_query, overlay_to_underlay_mapper = \ OverlayToUnderlayMapper( item['input']['overlay_to_underlay_map_query'], - None, None, logging) + None, None, None, None, logging) self.assertEqual(item['output']['flowrecord_data'], overlay_to_underlay_mapper._get_overlay_flow_data()) args, _ = overlay_to_underlay_mapper._send_query.call_args @@ -296,7 +296,7 @@ def test_get_overlay_flow_data_raise_exception(self): for query in queries: overlay_to_underlay_mapper = \ - OverlayToUnderlayMapper(query, None, None, logging) + OverlayToUnderlayMapper(query, None, None, None, None, logging) self.assertRaises(_OverlayToFlowRecordFieldsNameError, overlay_to_underlay_mapper._get_overlay_flow_data) # end test_get_overlay_flow_data_raise_exception @@ -618,7 +618,7 @@ def test_get_underlay_flow_data_noerror(self, mock_send_query, overlay_to_underlay_mapper = \ OverlayToUnderlayMapper( item['input']['overlay_to_underlay_map_query'], - None, None, logging) + None, None, None, None, logging) self.assertEqual(item['output']['uflow_data'], overlay_to_underlay_mapper._get_underlay_flow_data( item['input']['flow_record_data'])) @@ -683,7 +683,7 @@ def test_get_underlay_flow_data_raise_exception(self): for query in queries: overlay_to_underlay_mapper = \ OverlayToUnderlayMapper(query['overlay_to_underlay_map_query'], - None, None, logging) + None, None, None, None, logging) self.assertRaises(_UnderlayToUFlowDataFieldsNameError, overlay_to_underlay_mapper._get_underlay_flow_data, query['flow_record_data']) @@ -696,6 +696,8 @@ def test_send_query_no_error(self, mock_post_url_http): 'input': { 'analytics_api_ip': '10.10.10.1', 'analytics_api_port': 8081, + 'username': 'admin', + 'password': 'admin123', 'query': { 'table': FLOW_TABLE, 'start_time': 'now-10m', 'end_time': 'now-5m', @@ -714,6 +716,8 @@ def test_send_query_no_error(self, mock_post_url_http): 'input': { 'analytics_api_ip': '192.168.10.1', 'analytics_api_port': 8090, + 'username': 
'admin', + 'password': 'admin123', 'query': { 'table': 'StatTable.UFlowData.flow', 'start_time': 1416275005000000, @@ -751,11 +755,14 @@ def test_send_query_no_error(self, mock_post_url_http): for item in input_output_list: overlay_to_underlay_mapper = \ OverlayToUnderlayMapper(None, item['input']['analytics_api_ip'], - item['input']['analytics_api_port'], logging) + item['input']['analytics_api_port'], + item['input']['username'], item['input']['password'], + logging) self.assertEqual(overlay_to_underlay_mapper._send_query( item['input']['query']), item['output']['response']['value']) OpServerUtils.post_url_http.assert_called_with( - item['output']['query_url'], item['input']['query'], True) + item['output']['query_url'], item['input']['query'], + item['input']['username'], item['input']['password'], True) # end test_send_query_no_error @mock.patch('opserver.overlay_to_underlay_mapper.OpServerUtils.post_url_http') @@ -810,7 +817,7 @@ def test_send_query_raise_exception(self, mock_post_url_http): for item in queries: overlay_to_underlay_mapper = \ OverlayToUnderlayMapper(None, item['analytics_api_ip'], - item['analytics_api_port'], logging) + item['analytics_api_port'], None, None, logging) self.assertRaises(_QueryError, overlay_to_underlay_mapper._send_query, item['query']) # end test_send_query_raise_exception @@ -888,7 +895,7 @@ def test_send_response_no_error(self): overlay_to_underlay_mapper = \ OverlayToUnderlayMapper( item['input']['overlay_to_underlay_map_query'], - None, None, logging) + None, None, None, None, logging) self.assertEqual(item['output']['underlay_response'], json.loads(overlay_to_underlay_mapper._send_response( item['input']['uflow_data']))) @@ -911,7 +918,7 @@ def test_send_response_raise_exception(self): for item in input_list: overlay_to_underlay_mapper = \ OverlayToUnderlayMapper(item['overlay_to_underlay_map_query'], - None, None, logging) + None, None, None, None, logging) self.assertRaises(_UnderlayToUFlowDataFieldsNameError, 
overlay_to_underlay_mapper._send_response, item['uflow_data']) # end test_send_response_raise_exception @@ -956,7 +963,7 @@ def test_process_query(self, mock_get_overlay_flow_data, [json.dumps(item['response']) for item in test_data] for item in test_data: overlay_to_underlay_mapper = \ - OverlayToUnderlayMapper(None, None, None, logging) + OverlayToUnderlayMapper(None, None, None, None, None, logging) self.assertEqual(item['response'], json.loads(overlay_to_underlay_mapper.process_query())) overlay_to_underlay_mapper._get_overlay_flow_data.called_with() diff --git a/src/opserver/test/test_stats.py b/src/opserver/test/test_stats.py index c175f4b1661..d6915de2bf0 100755 --- a/src/opserver/test/test_stats.py +++ b/src/opserver/test/test_stats.py @@ -36,7 +36,7 @@ def custom_display(self, result): def setUp(self): self._querier = StatQuerier() - flexmock(OpServerUtils).should_receive('post_url_http').once().replace_with(lambda x, y, **kwargs: self.custom_post_url_http(x, y, kwargs)) + flexmock(OpServerUtils).should_receive('post_url_http').once().replace_with(lambda x, y, w, z, **kwargs: self.custom_post_url_http(x, y, kwargs)) flexmock(self._querier).should_receive('display').replace_with(lambda x: self.custom_display(x)) diff --git a/src/opserver/test/utils/analytics_fixture.py b/src/opserver/test/utils/analytics_fixture.py index 61a185909e6..a15db04cd7f 100644 --- a/src/opserver/test/utils/analytics_fixture.py +++ b/src/opserver/test/utils/analytics_fixture.py @@ -12,7 +12,6 @@ from mockkafka import mockkafka from mockzoo import mockzoo import redis -import urllib2 import copy import os import json @@ -296,7 +295,8 @@ def stop(self): class OpServer(object): def __init__(self, primary_collector, secondary_collector, redis_port, - analytics_fixture, logger, kafka=False, is_dup=False): + analytics_fixture, logger, admin_user, admin_password, + kafka=False, is_dup=False): self.primary_collector = primary_collector self.secondary_collector = secondary_collector 
self.analytics_fixture = analytics_fixture @@ -314,7 +314,10 @@ def __init__(self, primary_collector, secondary_collector, redis_port, self.hostname = self.hostname+'dup' self._generator_id = self.hostname+':'+NodeTypeNames[NodeType.ANALYTICS]+\ ':'+ModuleNames[Module.OPSERVER]+':0' - self.listen_port = AnalyticsFixture.get_free_port() + self.rest_api_port = AnalyticsFixture.get_free_port() + self.admin_port = AnalyticsFixture.get_free_port() + self.admin_user = admin_user + self.admin_password = admin_password # end __init__ def set_primary_collector(self, collector): @@ -331,7 +334,7 @@ def get_generator_id(self): def start(self): assert(self._instance == None) - self._log_file = '/tmp/opserver.messages.' + str(self.listen_port) + self._log_file = '/tmp/opserver.messages.' + str(self.admin_port) part = "0" if self._kafka: part = "4" @@ -346,7 +349,10 @@ def start(self): '--log_file', self._log_file, '--log_level', "SYS_INFO", '--partitions', part, - '--rest_api_port', str(self.listen_port)] + '--rest_api_port', str(self.rest_api_port), + '--admin_port', str(self.admin_port), + '--admin_user', self.admin_user, + '--admin_password', self.admin_password] if self.analytics_fixture.redis_uves[0].password: args.append('--redis_password') args.append(self.analytics_fixture.redis_uves[0].password) @@ -383,15 +389,16 @@ def verify_setup(self): def stop(self): if self._instance is not None: rcode = self.analytics_fixture.process_stop( - "contrail-analytics-api:%s" % str(self.listen_port), + "contrail-analytics-api:%s" % str(self.admin_port), self._instance, self._log_file, is_py=True) #assert(rcode == 0) self._instance = None # end stop def send_tracebuffer_request(self, src, mod, instance, tracebuf): - vops = VerificationOpsSrv('127.0.0.1', self.listen_port) - res = vops.send_tracebuffer_req(src, mod, instance, tracebuf) + vns = VerificationOpsSrv('127.0.0.1', self.admin_port, + self.admin_user, self.admin_password) + res = vns.send_tracebuffer_req(src, mod, instance, 
tracebuf) self._logger.info('send_tracebuffer_request: %s' % (str(res))) assert(res['status'] == 'pass') # end send_tracebuffer_request @@ -558,6 +565,8 @@ def stop(self): # end class Zookeeper class AnalyticsFixture(fixtures.Fixture): + ADMIN_USER = 'test' + ADMIN_PASSWORD = 'password' def __init__(self, logger, builddir, redis_port, cassandra_port, ipfix_port = False, sflow_port = False, syslog_port = False, @@ -584,6 +593,8 @@ def __init__(self, logger, builddir, redis_port, cassandra_port, self.cassandra_user = cassandra_user self.cassandra_password = cassandra_password self.zookeeper = None + self.admin_user = AnalyticsFixture.ADMIN_USER + self.admin_password = AnalyticsFixture.ADMIN_PASSWORD def setUp(self): super(AnalyticsFixture, self).setUp() @@ -634,10 +645,11 @@ def setUp(self): opkafka = True self.opserver = OpServer(primary_collector, secondary_collector, self.redis_uves[0].port, - self, self.logger, opkafka) + self, self.logger, self.admin_user, + self.admin_password, opkafka) if not self.opserver.start(): self.logger.error("OpServer did NOT start") - self.opserver_port = self.opserver.listen_port + self.opserver_port = self.get_opserver_port() if self.kafka is not None: self.alarmgen = AlarmGen(primary_collector, secondary_collector, @@ -663,7 +675,7 @@ def get_collectors(self): # end get_collectors def get_opserver_port(self): - return self.opserver.listen_port + return self.opserver.admin_port # end get_opserver_port def verify_on_setup(self): @@ -718,13 +730,8 @@ def verify_opserver_api(self): ''' data = {} url = 'http://127.0.0.1:' + str(self.opserver_port) + '/' - try: - data = urllib2.urlopen(url).read() - except urllib2.HTTPError, e: - self.logger.info("HTTP error: %d" % e.code) - except urllib2.URLError, e: - self.logger.info("Network error: %s" % e.reason.args[1]) - + data = OpServerUtils.get_url_http(url, self.admin_user, + self.admin_password) self.logger.info("Checking OpServer %s" % str(data)) if data == {}: return False @@ -809,7 +816,8 
@@ def verify_uvetable_alarm(self, table, name, type, is_set = True, any_of = None) @retry(delay=2, tries=10) def verify_collector_obj_count(self): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_query('ObjectCollectorInfo', start_time='-10m', end_time='now', select_fields=["ObjectLog"], @@ -850,7 +858,8 @@ def verify_generator_list(self, collector, exp_genlist): @retry(delay=1, tries=10) def verify_generator_uve_list(self, exp_gen_list): self.logger.info('verify_generator_uve_list') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) # get generator list gen_list = vns.uve_query('generators', {'cfilt':'ModuleClientState:client_info'}) @@ -869,7 +878,8 @@ def verify_generator_uve_list(self, exp_gen_list): @retry(delay=1, tries=6) def verify_message_table_messagetype(self): self.logger.info("verify_message_table_messagetype") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) # query for CollectorInfo logs res = vns.post_query('MessageTable', start_time='-10m', end_time='now', @@ -892,7 +902,8 @@ def verify_message_table_messagetype(self): @retry(delay=1, tries=6) def verify_message_table_select_uint_type(self): self.logger.info("verify_message_table_select_uint_type") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) # query for CollectorInfo logs res = vns.post_query('MessageTable', start_time='-10m', end_time='now', @@ -915,7 +926,8 @@ def verify_message_table_select_uint_type(self): @retry(delay=1, tries=6) def verify_message_table_moduleid(self): self.logger.info("verify_message_table_moduleid") - vns = 
VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) # query for contrail-query-engine logs res_qe = vns.post_query('MessageTable', start_time='-10m', end_time='now', @@ -935,7 +947,8 @@ def verify_message_table_moduleid(self): @retry(delay=1, tries=6) def verify_message_table_where_or(self): self.logger.info("verify_message_table_where_or") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) where_clause1 = "ModuleId = contrail-query-engine" where_clause2 = str("Source =" + socket.gethostname()) res = vns.post_query( @@ -957,7 +970,8 @@ def verify_message_table_where_or(self): @retry(delay=1, tries=6) def verify_message_table_where_and(self): self.logger.info("verify_message_table_where_and") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) where_clause1 = "ModuleId = contrail-query-engine" where_clause2 = str("Source =" + socket.gethostname()) res = vns.post_query( @@ -979,7 +993,8 @@ def verify_message_table_where_and(self): @retry(delay=1, tries=6) def verify_message_table_where_prefix(self): self.logger.info('verify_message_table_where_prefix') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) prefix_key_value_map = {'Source': socket.gethostname()[:-1], 'ModuleId': 'contrail-', 'Messagetype': 'Collector', 'Category': 'Discovery'} @@ -999,7 +1014,8 @@ def verify_message_table_where_prefix(self): @retry(delay=1, tries=6) def verify_message_table_filter(self): self.logger.info("verify_message_table_where_filter") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + 
self.admin_user, self.admin_password) where_clause1 = "ModuleId = contrail-query-engine" where_clause2 = str("Source =" + socket.gethostname()) res = vns.post_query('MessageTable', @@ -1031,7 +1047,8 @@ def verify_message_table_filter(self): @retry(delay=1, tries=6) def verify_message_table_filter2(self): self.logger.info("verify_message_table_filter2") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) a_query = Query(table="MessageTable", start_time='now-10m', end_time='now', @@ -1067,7 +1084,8 @@ def verify_message_table_filter2(self): @retry(delay=1, tries=1) def verify_message_table_sort(self): self.logger.info("verify_message_table_sort:Ascending Sort") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) where_clause1 = "ModuleId = contrail-query-engine" where_clause2 = str("Source =" + socket.gethostname()) @@ -1147,7 +1165,8 @@ def verify_message_table_sort(self): def verify_message_table_limit(self): self.logger.info("verify_message_table_limit") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_query('MessageTable', start_time='-10m', end_time='now', select_fields=['ModuleId', 'Messagetype'], @@ -1160,7 +1179,8 @@ def verify_message_table_limit(self): @retry(delay=1, tries=8) def verify_intervn_all(self, gen_obj): self.logger.info("verify_intervn_all") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_query('StatTable.UveVirtualNetworkAgent.vn_stats', start_time='-10m', end_time='now', @@ -1174,7 +1194,8 @@ def verify_intervn_all(self, gen_obj): @retry(delay=1, tries=8) def 
verify_intervn_sum(self, gen_obj): self.logger.info("verify_intervn_sum") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_query('StatTable.UveVirtualNetworkAgent.vn_stats', start_time='-10m', end_time='now', @@ -1188,7 +1209,8 @@ def verify_intervn_sum(self, gen_obj): @retry(delay=1, tries=10) def verify_flow_samples(self, generator_obj): self.logger.info("verify_flow_samples") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) vrouter = generator_obj._hostname res = vns.post_query('FlowSeriesTable', start_time=str(generator_obj.flow_start_time), @@ -1198,7 +1220,8 @@ def verify_flow_samples(self, generator_obj): if len(res) != generator_obj.num_flow_samples: return False - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) result = vns.post_query('FlowSeriesTable', start_time=str(generator_obj.egress_flow_start_time), end_time=str(generator_obj.egress_flow_end_time), @@ -1213,7 +1236,8 @@ def verify_flow_samples(self, generator_obj): def verify_where_query_prefix(self,generator_obj): self.logger.info('verify where query in FlowSeriesTable') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) vrouter = generator_obj._hostname a_query = Query(table="FlowSeriesTable", start_time=(generator_obj.flow_start_time), @@ -1239,7 +1263,8 @@ def verify_flow_table(self, generator_obj): vrouter = generator_obj._hostname # query flow records self.logger.info('verify_flow_table') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) 
res = vns.post_query('FlowRecordTable', start_time=str(generator_obj.flow_start_time), end_time=str(generator_obj.flow_end_time), @@ -1511,7 +1536,8 @@ def verify_flow_series_aggregation_binning(self, generator_object): generator_obj = generator_object[0] vrouter = generator_obj._hostname self.logger.info('verify_flow_series_aggregation_binning') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) # Helper function for stats aggregation def _aggregate_stats(flow, start_time, end_time): @@ -2021,7 +2047,8 @@ def _aggregate_flows_stats(flows, start_time, end_time): def verify_fieldname_messagetype(self): self.logger.info('Verify stats table for stats name field'); - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port); + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) query = Query(table="StatTable.FieldNames.fields", start_time="now-10m", end_time="now", @@ -2062,7 +2089,8 @@ def verify_collector_redis_uve_connection(self, collector, connected=True): @retry(delay=2, tries=5) def verify_opserver_redis_uve_connection(self, opserver, connected=True): self.logger.info('verify_opserver_redis_uve_connection') - vops = VerificationOpsSrv('127.0.0.1', opserver.http_port) + vops = VerificationOpsSrv('127.0.0.1', opserver.http_port, + self.admin_user, self.admin_password) try: redis_uve = vops.get_redis_uve_info()['RedisUveInfo'] if redis_uve['status'] == 'Connected': @@ -2075,7 +2103,8 @@ def verify_opserver_redis_uve_connection(self, opserver, connected=True): @retry(delay=2, tries=5) def verify_tracebuffer_in_analytics_db(self, src, mod, tracebuf): self.logger.info('verify trace buffer data in analytics db') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) where_clause = [] where_clause.append('Source = 
' + src) where_clause.append('ModuleId = ' + mod) @@ -2093,7 +2122,8 @@ def verify_tracebuffer_in_analytics_db(self, src, mod, tracebuf): @retry(delay=1, tries=5) def verify_table_source_module_list(self, exp_src_list, exp_mod_list): self.logger.info('verify source/module list') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) try: src_list = vns.get_table_column_values(COLLECTOR_GLOBAL_TABLE, SOURCE) @@ -2116,7 +2146,8 @@ def verify_table_source_module_list(self, exp_src_list, exp_mod_list): @retry(delay=1, tries=5) def verify_where_query(self): self.logger.info('Verify where query with int type works'); - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port); + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) query = Query(table="StatTable.QueryPerfInfo.query_stats", start_time="now-1h", end_time="now", @@ -2130,7 +2161,8 @@ def verify_where_query(self): def verify_collector_object_log(self, start_time, end_time): self.logger.info('verify_collector_object_log') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port); + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) query = Query(table='ObjectCollectorInfo', start_time=start_time, end_time=end_time, select_fields=['ObjectLog']) @@ -2152,7 +2184,8 @@ def verify_collector_object_log_before_purge(self, start_time, end_time): def verify_database_purge_query(self, json_qstr): self.logger.info('verify database purge query'); - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port); + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_purge_query_json(json_qstr) try: assert(res['status'] == 'started') @@ -2177,7 +2210,8 @@ def verify_collector_object_log_after_purge(self, start_time, end_time): def verify_database_purge_status(self, 
purge_id): self.logger.info('verify database purge status: purge_id [%s]' % (purge_id)) - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) query = Query(table='StatTable.DatabasePurgeInfo.stats', start_time='now-1m', end_time='now', select_fields=['stats.purge_id', 'stats.purge_status', @@ -2208,7 +2242,8 @@ def verify_database_purge_with_percentage_input(self): def verify_database_purge_support_utc_time_format(self): self.logger.info('verify database purge support utc time format') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) json_qstr = json.dumps({'purge_input': 'now'}) end_time = OpServerUtils.convert_to_utc_timestamp_usec('now') start_time = end_time - 20*60*pow(10,6) @@ -2221,7 +2256,8 @@ def verify_database_purge_support_utc_time_format(self): def verify_database_purge_support_datetime_format(self): self.logger.info('verify database purge support datetime format') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) dt = datetime.datetime.now().strftime("%Y %b %d %H:%M:%S.%f") json_qstr = json.dumps({'purge_input': dt}) end_time = OpServerUtils.convert_to_utc_timestamp_usec(dt) @@ -2235,7 +2271,8 @@ def verify_database_purge_support_datetime_format(self): def verify_database_purge_support_deltatime_format(self): self.logger.info('verify database purge support deltatime format') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) json_qstr = json.dumps({'purge_input': '-1s'}) end_time = OpServerUtils.convert_to_utc_timestamp_usec('-1s') start_time = end_time - 10*60*pow(10,6) @@ -2248,7 +2285,8 @@ def 
verify_database_purge_support_deltatime_format(self): def verify_database_purge_request_limit(self): self.logger.info('verify database purge request limit') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) json_qstr = json.dumps({'purge_input': 50}) res = vns.post_purge_query_json(json_qstr) self.logger.info(str(res)) @@ -2267,7 +2305,8 @@ def verify_database_purge_request_limit(self): def verify_object_table_sandesh_types(self, table, object_id, exp_msg_types): self.logger.info('verify_object_table_sandesh_types') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_query(table, start_time='-1m', end_time='now', select_fields=['Messagetype', 'ObjectLog', 'SystemLog'], where_clause='ObjectId=%s' % object_id) @@ -2285,7 +2324,8 @@ def verify_object_table_sandesh_types(self, table, object_id, @retry(delay=1, tries=10) def verify_object_value_table_query(self, table, exp_object_values): self.logger.info('verify_object_value_table_query') - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) res = vns.post_query(table, start_time='-10m', end_time='now', select_fields=['ObjectId'], where_clause='') @@ -2303,7 +2343,8 @@ def verify_object_value_table_query(self, table, exp_object_values): @retry(delay=1, tries=5) def verify_keyword_query(self, line, keywords=[]): self.logger.info('Verify where query with keywords'); - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port); + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) query = Query(table="MessageTable", start_time="now-1h", @@ -2340,7 +2381,8 @@ def verify_fieldname_table(self): to ensure that the 2 entries are present in the table 
''' self.logger.info("verify_fieldname_table") - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) self.logger.info("VerificationOpsSrv") res = vns.post_query('StatTable.FieldNames.fields', start_time='-1m', @@ -2375,7 +2417,8 @@ def _get_filters_json(self, filters): @retry(delay=1, tries=4) def verify_uve_list(self, table, filts=None, exp_uve_list=[]): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) filters = self._get_filters_url_param(filts) table_query = table+'s' self.logger.info('verify_uve_list: %s:%s' % @@ -2432,7 +2475,8 @@ def _verify_uves(self, exp_uves, actual_uves): @retry(delay=1, tries=4) def verify_get_alarms(self, table, filts=None, exp_uves=None): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) filters = self._get_filters_url_param(filts) self.logger.info('verify_get_alarms: %s' % str(filters)) try: @@ -2446,7 +2490,8 @@ def verify_get_alarms(self, table, filts=None, exp_uves=None): @retry(delay=1, tries=4) def verify_multi_uve_get(self, table, filts=None, exp_uves=None): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) filters = self._get_filters_url_param(filts) table_query = table+'/*' if not filters: @@ -2464,7 +2509,8 @@ def verify_multi_uve_get(self, table, filts=None, exp_uves=None): @retry(delay=1, tries=4) def verify_uve_post(self, table, filts=None, exp_uves=None): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) filter_json = self._get_filters_json(filts) self.logger.info('verify_uve_post: %s: 
%s' % (table, filter_json)) try: @@ -2478,7 +2524,8 @@ def verify_uve_post(self, table, filts=None, exp_uves=None): @retry(delay=1, tries=5) def verify_alarm_list_include(self, table, filts=None, expected_alarms=[]): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) yfilts = filts or {} yfilts['cfilt'] = ["UVEAlarms"] filters = self._get_filters_url_param(yfilts) @@ -2502,7 +2549,8 @@ def verify_alarm_list_include(self, table, filts=None, expected_alarms=[]): @retry(delay=1, tries=5) def verify_alarm_list_exclude(self, table, filts=None, unexpected_alms=[]): - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) yfilts = filts or {} yfilts['cfilt'] = ["UVEAlarms"] filters = self._get_filters_url_param(yfilts) @@ -2544,7 +2592,8 @@ def _verify_alarms(self, exp_alarms, actual_alarms): @retry(delay=1, tries=3) def verify_alarm(self, table, key, expected_alarm): self.logger.info('verify_alarm: %s:%s' % (table, key)) - vns = VerificationOpsSrv('127.0.0.1', self.opserver_port) + vns = VerificationOpsSrv('127.0.0.1', self.opserver_port, + self.admin_user, self.admin_password) table_query = table+'/'+key filters = {'cfilt':'UVEAlarms'} try: @@ -2589,7 +2638,7 @@ def cleanUp(self): redis_uve.stop() if self.kafka is not None: self.kafka.stop() - + self.zookeeper.stop() super(AnalyticsFixture, self).cleanUp() self.logger.info('cleanUp complete') diff --git a/src/opserver/test/utils/opserver_introspect_utils.py b/src/opserver/test/utils/opserver_introspect_utils.py index 867e4ea2638..60c949f5768 100644 --- a/src/opserver/test/utils/opserver_introspect_utils.py +++ b/src/opserver/test/utils/opserver_introspect_utils.py @@ -15,20 +15,23 @@ from opserver_results import * from opserver.opserver_util import OpServerUtils - class VerificationOpsSrv (IntrospectUtilBase): - 
- def __init__(self, ip, port=8081): + def __init__(self, ip, port=8181, user='test', + password='password'): super(VerificationOpsSrv, self).__init__(ip, port) + self._user = user + self._password = password def get_ops_vm(self, vm='default-virtual-machine'): - vm_dict = self.dict_get('analytics/uves/virtual-machine/' + vm) + vm_dict = self.dict_get('analytics/uves/virtual-machine/' + vm, + user=self._user, password=self._password) return OpVMResult(vm_dict) def get_ops_vn(self, vn='default-virtual-network'): res = None try: - vn_dict = self.dict_get('analytics/uves/virtual-network/' + vn) + vn_dict = self.dict_get('analytics/uves/virtual-network/' + vn, + user=self._user, password=self._password) res = OpVNResult(vn_dict) except Exception as e: print e @@ -40,7 +43,8 @@ def get_ops_collector(self, col=None): col = socket.gethostname() res = None try: - col_dict = self.dict_get('analytics/uves/analytics-node/' + col) + col_dict = self.dict_get('analytics/uves/analytics-node/' + col, + user=self._user, password=self._password) res = OpCollectorResult(col_dict) except Exception as e: print e @@ -49,22 +53,27 @@ def get_ops_collector(self, col=None): def send_tracebuffer_req(self, src, mod, instance, buf_name): return self.dict_get('analytics/send-tracebuffer/%s/%s/%s/%s' \ - % (src, mod, instance, buf_name)) + % (src, mod, instance, buf_name), user=self._user, + password=self._password) def get_table_column_values(self, table, col_name): return self.dict_get('analytics/table/%s/column-values/%s' \ - % (table, col_name)) + % (table, col_name), user=self._user, + password=self._password) def uve_query(self, table, query): - return self.dict_get('analytics/uves/%s' % (table), query) + return self.dict_get('analytics/uves/%s' % (table), query, + user=self._user, password=self._password) def get_alarms(self, filters): - return self.dict_get('analytics/alarms', filters) + return self.dict_get('analytics/alarms', filters, user=self._user, + password=self._password) def 
post_uve_request(self, table, json_body): url = 'http://%s:%s/analytics/uves/%s' % (self._ip, str(self._port), table) try: - res = OpServerUtils.post_url_http(url, json_body, sync=True) + res = OpServerUtils.post_url_http(url, json_body, user=self._user, + password=self._password, sync=True) res = json.loads(res) except Exception as e: print 'Error: POST uve request: %s' % str(e) @@ -85,11 +94,13 @@ def post_query_json(self, json_str, sync=True): ''' res = None try: - flows_url = OpServerUtils.opserver_query_url(self._ip, str(self._port)) + flows_url = OpServerUtils.opserver_query_url(self._ip, + str(self._port)) print flows_url print "query is: ", json_str res = [] - resp = OpServerUtils.post_url_http(flows_url, json_str, sync) + resp = OpServerUtils.post_url_http(flows_url, json_str, + self._user, self._password, sync) if sync: if resp is not None: res = json.loads(resp) @@ -98,7 +109,8 @@ def post_query_json(self, json_str, sync=True): if resp is not None: resp = json.loads(resp) qid = resp['href'].rsplit('/', 1)[1] - result = OpServerUtils.get_query_result(self._ip, str(self._port), qid, 30) + result = OpServerUtils.get_query_result(self._ip, + str(self._port), qid, self._user, self._password, 30) for item in result: res.append(item) except Exception as e: @@ -118,7 +130,8 @@ def post_purge_query_json(self, json_str, sync=True): print purge_request_url print "query is: ", json_str resp = OpServerUtils.post_url_http( - purge_request_url, json_str, sync) + purge_request_url, json_str, self._user, self._password, + sync) if resp is not None: res = json.loads(resp) except Exception as e: @@ -144,7 +157,8 @@ def post_query(self, table, start_time=None, end_time=None, print json.dumps(query_dict) res = [] resp = OpServerUtils.post_url_http( - flows_url, json.dumps(query_dict), sync) + flows_url, json.dumps(query_dict), self._user, self._password, + sync) if sync: if resp is not None: res = json.loads(resp) @@ -154,7 +168,8 @@ def post_query(self, table, 
start_time=None, end_time=None, resp = json.loads(resp) qid = resp['href'].rsplit('/', 1)[1] result = OpServerUtils.get_query_result( - self._ip, str(self._port), qid, 30) + self._ip, str(self._port), qid, self._user, + self._password, 30) for item in result: res.append(item) except Exception as e: diff --git a/src/opserver/vnc_cfg_api_client.py b/src/opserver/vnc_cfg_api_client.py new file mode 100644 index 00000000000..f08ab3f8ea8 --- /dev/null +++ b/src/opserver/vnc_cfg_api_client.py @@ -0,0 +1,81 @@ +# +# Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. +# + +import time +import requests +from pysandesh.connection_info import ConnectionState +from pysandesh.gen_py.process_info.ttypes import ConnectionType,\ + ConnectionStatus +from vnc_api import vnc_api + +class VncCfgApiClient(object): + + def __init__(self, conf_info, sandesh_instance, logger): + self._conf_info = conf_info + self._sandesh_instance = sandesh_instance + self._logger = logger + self._vnc_api_client = None + # end __init__ + + def _update_connection_state(self, status, message = ''): + server_addrs = ['%s:%d' % (self._conf_info['api_server_ip'], \ + self._conf_info['api_server_port'])] + ConnectionState.update(conn_type=ConnectionType.APISERVER, name='', + status=status, message=message, server_addrs=server_addrs) + # end _update_connection_state + + def _get_user_token_info(self, user_token): + if self._vnc_api_client: + return self._vnc_api_client.obj_perms(user_token) + else: + self._logger.error('VNC Config API Client NOT FOUND') + return None + # end _get_user_token_info + + def connect(self): + # Retry till API server is up + connected = False + self._update_connection_state(ConnectionStatus.INIT) + while not connected: + try: + self._vnc_api_client = vnc_api.VncApi( + self._conf_info['admin_user'], + self._conf_info['admin_password'], + self._conf_info['admin_tenant_name'], + self._conf_info['api_server_ip'], + self._conf_info['api_server_port'], + 
api_server_use_ssl=self._conf_info['api_server_use_ssl'], + auth_host=self._conf_info['auth_host'], + auth_port=self._conf_info['auth_port'], + auth_protocol=self._conf_info['auth_protocol']) + connected = True + self._update_connection_state(ConnectionStatus.UP) + except Exception as e: + # Update connection info + self._update_connection_state(ConnectionStatus.DOWN, str(e)) + time.sleep(3) + # end connect + + def is_role_cloud_admin(self, user_token): + result = self._get_user_token_info(user_token) + if not result or not result['token_info']: + self._logger.error( + 'Token info for %s NOT FOUND' % str(user_token)) + return False + # Handle v2 and v3 responses + token_info = result['token_info'] + if 'access' in token_info: + roles_list = [roles['name'] for roles in \ + token_info['access']['user']['roles']] + elif 'token' in token_info: + roles_list = [roles['name'] for roles in \ + token_info['token']['roles']] + else: + self._logger.error('Role info for %s NOT FOUND: %s' % \ + (str(user_token), str(token_info))) + return False + return self._conf_info['cloud_admin_role'] in roles_list + # end is_role_cloud_admin + +# end class VncCfgApiServer diff --git a/src/sandesh/common/vns.sandesh b/src/sandesh/common/vns.sandesh index 4bad4150d5b..d62cc4a7241 100644 --- a/src/sandesh/common/vns.sandesh +++ b/src/sandesh/common/vns.sandesh @@ -102,6 +102,7 @@ const u16 MetadataProxyVrouterAgentPort = 8097 // TCP const u16 VrouterAgentMirrorClientUdpPort = 8097 // UDP const u16 VrouterAgentDnsClientUdpPort = 8098 // UDP const u16 ContrailDnsClientUdpPort = 8092 // UDP +const u16 OpServerAdminPort = 8181 const map ServiceHttpPortMap = { SERVICE_VROUTER_AGENT : HttpPortAgent, @@ -504,3 +505,13 @@ const list ThreadPoolNames = [ NATIVETRANSPORTREQUESTS, COMPACTIONEXECUTOR, ] + +const string CLOUD_ADMIN_ROLE = "admin" + +const string AAA_MODE_NO_AUTH = "no-auth" +const string AAA_MODE_CLOUD_ADMIN = "cloud-admin" + +const list APIAAAModes = [ + AAA_MODE_NO_AUTH, + 
AAA_MODE_CLOUD_ADMIN, +]