From 181a51dc4c368a7da73038d34361c1c9b570a6b5 Mon Sep 17 00:00:00 2001 From: arvindvis Date: Sun, 25 Jan 2015 17:29:05 -0800 Subject: [PATCH] his fix adds authentication support to REDIS. Analytics, QE and contrail-api are affected by this change. Here is a summary of the changes in the individual files in different modules: Analytics: src/analytics/OpServerProxy.cc: Change ToOpsConnUp to call Authenticate function before calling ToOpsConnUpPostProcess. Add callback to handle reply for authentication command for both channels. If authentication succeeds make a call to ToOpsConnUpPostProcess. Change some of the API's to include password as argument src/analytics/redis_processor_vizd.cc: src/analytics/main.cc: src/analytics/viz_collector.cc: change the api to pass the redis password as well src/analytics/test/utils/mockredis/mockredis/mockredis.py: Before starting redis instance modify redis.conf to include password, if specified nclude password argument to the start and stop api's src/analytics/redis_connection.cc: Removed a condition which required async_connection_context should not have any callback set.This was causing a conflict. QueryEngine: src/query_engine/QEOpServerProxy.cc: Places where redisSync commands where getting called, we first make a call to AUTH command For commands invoked in Async context, introduce 2 new API's viz., ConnAuth,ConnUpPrePostProcess, AuthCallbackProcess are called to authenticate the context first Make changes to existing API like QEOpServerImpl,QEOpServerProxy to include password argument src/query_engine/qed.cc: src/query_engine/options.cc: src/query_engine/query.cc: src/query_engine/rac_alloc.cc: Make changes in existing API to take redis_passwd as additional argument analytics-api: src/opserver/opserver.py: redis_query_start, redis_query_status, redis_query_chunk_iter, redis_query_chunk, redis_query_result, redis_query_result_dict, OpStateServer all make calls to StrictRedis, made change to include password to the API systemless test: src/opserver/test/utils/analytics_fixture.py: change Redis,QueryEngine and Collector class to invoke the daemons with password as argument. change the Fixture API to take password. src/opserver/test/analytics_redistest.py: New test file which calls the testcases with password contrail. Keyword names have been changed from passwd to password Other changes addressed in the comment are taken care as well In the case of password conflicts we are going to crash the daemonsa Change has been made in analytics_redis_test.py to abstract common code Closes Bug:#1392113 Change-Id: I707b9a6df9b770ca0f1a396335f190aa780a937e --- src/analytics/OpServerProxy.cc | 129 +++++++++---- src/analytics/OpServerProxy.h | 3 +- src/analytics/main.cc | 1 + src/analytics/options.cc | 3 + src/analytics/options.h | 2 + src/analytics/redis_connection.cc | 2 - src/analytics/redis_processor_vizd.cc | 57 +++++- src/analytics/redis_processor_vizd.h | 9 +- .../utils/mockredis/mockredis/mockredis.py | 10 +- src/analytics/viz_collector.cc | 7 +- src/analytics/viz_collector.h | 3 +- src/opserver/opserver.py | 61 +++--- src/opserver/test/SConscript | 1 + src/opserver/test/analytics_redistest.py | 174 ++++++++++++++++++ src/opserver/test/analytics_systest.py | 95 ++-------- src/opserver/test/utils/analytics_fixture.py | 45 +++-- src/opserver/uveserver.py | 25 ++- src/query_engine/QEOpServerProxy.cc | 81 +++++++- src/query_engine/QEOpServerProxy.h | 3 +- src/query_engine/options.cc | 3 + src/query_engine/options.h | 2 + src/query_engine/qed.cc | 2 + src/query_engine/query.cc | 11 +- src/query_engine/query.h | 8 +- 24 files changed, 546 insertions(+), 191 deletions(-) create mode 100755 src/opserver/test/analytics_redistest.py diff --git a/src/analytics/OpServerProxy.cc b/src/analytics/OpServerProxy.cc index 504ba727f58..a9806908d44 100644 --- a/src/analytics/OpServerProxy.cc +++ b/src/analytics/OpServerProxy.cc @@ -137,24 +137,82 @@ class OpServerProxy::OpServerImpl { if (!started_) { RedisProcessorExec::FlushUVEs(redis_uve_.GetIp(), - redis_uve_.GetPort()); + redis_uve_.GetPort(), + redis_password_); started_=true; } if (collector_) collector_->RedisUpdate(true); } - void ToOpsConnUp() { - LOG(DEBUG, "ToOpsConnUp.. UP"); - { - tbb::mutex::scoped_lock lock(rac_mutex_); - redis_uve_.RedisStatusUpdate(RAC_UP); - } - // Update connection info - ConnectionState::GetInstance()->Update(ConnectionType::REDIS, + void toConnectCallbackProcess(const redisAsyncContext *c, void *r, void *privdata) { + //Handle the AUTH callback + redisReply reply = *reinterpret_cast(r); + if (reply.type != REDIS_REPLY_ERROR) { + { + tbb::mutex::scoped_lock lock(rac_mutex_); + redis_uve_.RedisStatusUpdate(RAC_UP); + } + ConnectionState::GetInstance()->Update(ConnectionType::REDIS, "To", ConnectionStatus::UP, to_ops_conn_->Endpoint(), std::string()); - evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::ToOpsConnUpPostProcess, this)); + evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::ToOpsConnUpPostProcess, this)); + return; + + } else { + LOG(ERROR,"In connectCallbackProcess.. Error"); + assert(reply.type != REDIS_REPLY_ERROR); + } + + } + + void fromConnectCallbackProcess(const redisAsyncContext *c, void *r, void *privdata) { + //Handle the AUTH callback + redisReply reply = *reinterpret_cast(r); + if (reply.type != REDIS_REPLY_ERROR) { + ConnectionState::GetInstance()->Update(ConnectionType::REDIS, + "From", ConnectionStatus::UP, from_ops_conn_->Endpoint(), + std::string()); + evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::FromOpsConnUpPostProcess, this)); + return; + + } else { + LOG(ERROR,"In connectCallbackProcess.. Error"); + assert(reply.type != REDIS_REPLY_ERROR); + } + + } + + void ToOpsAuthenticated() { + //Set callback for connection status + to_ops_conn_.get()->SetClientAsyncCmdCb(boost::bind( + &OpServerImpl::toConnectCallbackProcess, + this, _1, _2, _3)); + if (!redis_password_.empty()) { + //Call the AUTH command + to_ops_conn_.get()->RedisAsyncCommand(NULL,"AUTH %s",redis_password_.c_str()); + } else { + to_ops_conn_.get()->RedisAsyncCommand(NULL,"PING"); + } + } + + void FromOpsAuthenticated() { + //Set callback for connection status + from_ops_conn_.get()->SetClientAsyncCmdCb(boost::bind( + &OpServerImpl::fromConnectCallbackProcess, + this, _1, _2, _3)); + if (!redis_password_.empty()) { + //Call the AUTH command + from_ops_conn_.get()->RedisAsyncCommand(NULL,"AUTH %s",redis_password_.c_str()); + } else { + from_ops_conn_.get()->RedisAsyncCommand(NULL,"PING"); + } + + } + + void ToOpsConnUp() { + LOG(DEBUG, "ToOpsConnUp.. UP"); + evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::ToOpsAuthenticated, this)); } void FromOpsConnUpPostProcess() { @@ -165,11 +223,7 @@ class OpServerProxy::OpServerImpl { void FromOpsConnUp() { LOG(DEBUG, "FromOpsConnUp.. UP"); - // Update connection info - ConnectionState::GetInstance()->Update(ConnectionType::REDIS, - "From", ConnectionStatus::UP, from_ops_conn_->Endpoint(), - std::string()); - evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::FromOpsConnUpPostProcess, this)); + evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::FromOpsAuthenticated, this)); } void RAC_ConnectProcess(RacConnType type) { @@ -302,17 +356,23 @@ class OpServerProxy::OpServerImpl { return from_ops_conn_; } + const string get_redis_password() { + return redis_password_; + } + OpServerImpl(EventManager *evm, VizCollector *collector, - const std::string redis_uve_ip, - unsigned short redis_uve_port) : + const std::string redis_uve_ip, + unsigned short redis_uve_port, + const std::string redis_password) : redis_uve_(redis_uve_ip, redis_uve_port), evm_(evm), collector_(collector), started_(false), analytics_cb_proc_fn(NULL), - processor_cb_proc_fn(NULL) { - to_ops_conn_.reset(new RedisAsyncConnection(evm_, - redis_uve_ip, redis_uve_port, + processor_cb_proc_fn(NULL), + redis_password_(redis_password) { + to_ops_conn_.reset(new RedisAsyncConnection(evm_, + redis_uve_ip, redis_uve_port, boost::bind(&OpServerProxy::OpServerImpl::ToOpsConnUp, this), boost::bind(&OpServerProxy::OpServerImpl::ToOpsConnDown, this))); // Update connection info @@ -320,8 +380,8 @@ class OpServerProxy::OpServerImpl { "To", ConnectionStatus::INIT, to_ops_conn_->Endpoint(), std::string()); to_ops_conn_.get()->RAC_Connect(); - from_ops_conn_.reset(new RedisAsyncConnection(evm_, - redis_uve_ip, redis_uve_port, + from_ops_conn_.reset(new RedisAsyncConnection(evm_, + redis_uve_ip, redis_uve_port, boost::bind(&OpServerProxy::OpServerImpl::FromOpsConnUp, this), boost::bind(&OpServerProxy::OpServerImpl::FromOpsConnDown, this))); // Update connection info @@ -345,12 +405,15 @@ class OpServerProxy::OpServerImpl { RedisAsyncConnection::ClientAsyncCmdCbFn analytics_cb_proc_fn; RedisAsyncConnection::ClientAsyncCmdCbFn processor_cb_proc_fn; tbb::mutex rac_mutex_; + const std::string redis_password_; }; OpServerProxy::OpServerProxy(EventManager *evm, VizCollector *collector, const std::string& redis_uve_ip, - unsigned short redis_uve_port) { - impl_ = new OpServerImpl(evm, collector, redis_uve_ip, redis_uve_port); + unsigned short redis_uve_port, + const std::string& redis_password) { + impl_ = new OpServerImpl(evm, collector, redis_uve_ip, redis_uve_port, + redis_password); } OpServerProxy::~OpServerProxy() { @@ -398,8 +461,8 @@ OpServerProxy::UVEDelete(const std::string &type, return ret; } -bool -OpServerProxy::GetSeq(const string &source, const string &node_type, +bool +OpServerProxy::GetSeq(const string &source, const string &node_type, const string &module, const string &instance_id, std::map & seqReply) { @@ -408,21 +471,21 @@ OpServerProxy::GetSeq(const string &source, const string &node_type, if (!impl_->to_ops_conn()) return false; - return RedisProcessorExec::SyncGetSeq(impl_->redis_uve_.GetIp(), - impl_->redis_uve_.GetPort(), source, node_type, module, - instance_id, seqReply); + return RedisProcessorExec::SyncGetSeq(impl_->redis_uve_.GetIp(), + impl_->redis_uve_.GetPort(), impl_->get_redis_password(), + source, node_type, module, instance_id, seqReply); } -bool +bool OpServerProxy::DeleteUVEs(const string &source, const string &module, const string &node_type, const string &instance_id) { shared_ptr prac = impl_->to_ops_conn(); if (!(prac && prac->IsConnUp())) return false; - return RedisProcessorExec::SyncDeleteUVEs(impl_->redis_uve_.GetIp(), - impl_->redis_uve_.GetPort(), source, node_type, - module, instance_id); + return RedisProcessorExec::SyncDeleteUVEs(impl_->redis_uve_.GetIp(), + impl_->redis_uve_.GetPort(), impl_->get_redis_password(),source, + node_type, module, instance_id); } void diff --git a/src/analytics/OpServerProxy.h b/src/analytics/OpServerProxy.h index 704fd8055e1..e67eef4bb67 100644 --- a/src/analytics/OpServerProxy.h +++ b/src/analytics/OpServerProxy.h @@ -26,7 +26,8 @@ class OpServerProxy { // To construct this interface, pass in the hostname and port for Redis OpServerProxy(EventManager *evm, VizCollector *collector, - const std::string& redis_uve_ip, unsigned short redis_uve_port); + const std::string& redis_uve_ip, unsigned short redis_uve_port, + const std::string& redis_uve_password); OpServerProxy() : impl_(NULL) { } virtual ~OpServerProxy(); diff --git a/src/analytics/main.cc b/src/analytics/main.cc index 18adde62831..22106566d35 100644 --- a/src/analytics/main.cc +++ b/src/analytics/main.cc @@ -309,6 +309,7 @@ int main(int argc, char *argv[]) cassandra_ports, string("127.0.0.1"), options.redis_port(), + options.redis_password(), options.syslog_port(), options.sflow_port(), options.ipfix_port(), diff --git a/src/analytics/options.cc b/src/analytics/options.cc index fd3980c2aa2..55d66735076 100644 --- a/src/analytics/options.cc +++ b/src/analytics/options.cc @@ -136,6 +136,8 @@ void Options::Initialize(EventManager &evm, "Port of Redis-uve server") ("REDIS.server", opt::value()->default_value("127.0.0.1"), "IP address of Redis Server") + ("REDIS.password", opt::value()->default_value(""), + "password for Redis Server") ; config_file_options_.add(config); @@ -265,4 +267,5 @@ void Options::Process(int argc, char *argv[], GetOptValue(var_map, redis_port_, "REDIS.port"); GetOptValue(var_map, redis_server_, "REDIS.server"); + GetOptValue(var_map, redis_password_, "REDIS.password"); } diff --git a/src/analytics/options.h b/src/analytics/options.h index 162a88c5566..6e2de53f316 100644 --- a/src/analytics/options.h +++ b/src/analytics/options.h @@ -30,6 +30,7 @@ class Options { const uint16_t discovery_port() const { return discovery_port_; } const std::string redis_server() const { return redis_server_; } const uint16_t redis_port() const { return redis_port_; } + const std::string redis_password() const { return redis_password_; } const std::string hostname() const { return hostname_; } const std::string host_ip() const { return host_ip_; } const uint16_t http_server_port() const { return http_server_port_; } @@ -84,6 +85,7 @@ class Options { uint16_t discovery_port_; std::string redis_server_; uint16_t redis_port_; + std::string redis_password_; std::string hostname_; std::string host_ip_; uint16_t http_server_port_; diff --git a/src/analytics/redis_connection.cc b/src/analytics/redis_connection.cc index a6150fc52be..b891413002a 100644 --- a/src/analytics/redis_connection.cc +++ b/src/analytics/redis_connection.cc @@ -274,8 +274,6 @@ bool RedisAsyncConnection::SetClientAsyncCmdCb(ClientAsyncCmdCbFn cb_fn) { if (it == fns_map.end()) { assert(0); } else { - if ((!it->second) || (it->second->client_async_cmd_cbfn_ != NULL)) - assert(0); it->second->client_async_cmd_cbfn_ = cb_fn; } return true; diff --git a/src/analytics/redis_processor_vizd.cc b/src/analytics/redis_processor_vizd.cc index 15624366652..d3c09590701 100644 --- a/src/analytics/redis_processor_vizd.cc +++ b/src/analytics/redis_processor_vizd.cc @@ -125,8 +125,9 @@ RedisProcessorExec::UVEDelete(RedisAsyncConnection * rac, RedisProcessorIf *rpi, bool -RedisProcessorExec::SyncGetSeq(const std::string & redis_ip, unsigned short redis_port, - const std::string &source, const std::string &node_type, +RedisProcessorExec::SyncGetSeq(const std::string & redis_ip, unsigned short redis_port, + const std::string & redis_password, const std::string &source, + const std::string &node_type, const std::string &module, const std::string &instance_id, std::map & seqReply) { @@ -137,7 +138,21 @@ RedisProcessorExec::SyncGetSeq(const std::string & redis_ip, unsigned short redi << ":" << module << ":" << instance_id); redisFree(c); return false; - } + } + + //Authenticate the context with password + if (!redis_password.empty()) { + redisReply * reply = (redisReply *) redisCommand(c, "AUTH %s", + redis_password.c_str()); + if (reply->type == REDIS_REPLY_ERROR) { + LOG(ERROR, "Authentication to redis error"); + freeReplyObject(reply); + redisFree(c); + return false; + } + freeReplyObject(reply); + } + string lua_scr(reinterpret_cast(seqnum_lua), seqnum_lua_len); @@ -175,7 +190,8 @@ RedisProcessorExec::SyncGetSeq(const std::string & redis_ip, unsigned short redi bool -RedisProcessorExec::SyncDeleteUVEs(const std::string & redis_ip, unsigned short redis_port, +RedisProcessorExec::SyncDeleteUVEs(const std::string & redis_ip, unsigned short redis_port, + const std::string &redis_password, const std::string &source, const std::string &node_type, const std::string &module, const std::string &instance_id) { @@ -186,8 +202,22 @@ RedisProcessorExec::SyncDeleteUVEs(const std::string & redis_ip, unsigned short node_type << ":" << module << ":" << instance_id); redisFree(c); return false; - } + } + //Authenticate the context with password + if (!redis_password.empty()) { + std::string & local_password = const_cast(redis_password); + redisReply * reply = (redisReply *) redisCommand(c, "AUTH %s", + local_password.c_str()); + if (reply->type == REDIS_REPLY_ERROR) { + LOG(ERROR, "Authentication to redis error"); + freeReplyObject(reply); + redisFree(c); + return false; + } + freeReplyObject(reply); + } + string lua_scr(reinterpret_cast(delrequest_lua), delrequest_lua_len); redisReply * reply = (redisReply *) redisCommand(c, @@ -216,7 +246,8 @@ RedisProcessorExec::SyncDeleteUVEs(const std::string & redis_ip, unsigned short bool RedisProcessorExec::FlushUVEs(const std::string & redis_ip, - unsigned short redis_port) { + unsigned short redis_port, + const std::string & redis_password) { redisContext *c = redisConnect(redis_ip.c_str(), redis_port); if (c->err) { @@ -225,6 +256,20 @@ RedisProcessorExec::FlushUVEs(const std::string & redis_ip, return false; } + //Authenticate the context with password + if (!redis_password.empty()) { + std::string & local_password = const_cast(redis_password); + redisReply * reply = (redisReply *) redisCommand(c, "AUTH %s", + local_password.c_str()); + if (reply->type == REDIS_REPLY_ERROR) { + LOG(ERROR, "Authentication to redis error"); + freeReplyObject(reply); + redisFree(c); + return false; + } + freeReplyObject(reply); + } + string lua_scr(reinterpret_cast(flushuves_lua), flushuves_lua_len); redisReply * reply = (redisReply *) redisCommand(c, "EVAL %s 0 %s", diff --git a/src/analytics/redis_processor_vizd.h b/src/analytics/redis_processor_vizd.h index 4c98b41c828..4f3904e91f8 100644 --- a/src/analytics/redis_processor_vizd.h +++ b/src/analytics/redis_processor_vizd.h @@ -34,18 +34,21 @@ class RedisProcessorExec { const std::string &key, int32_t seq); static bool - SyncGetSeq(const std::string & redis_ip, unsigned short redis_port, + SyncGetSeq(const std::string & redis_ip, unsigned short redis_port, + const std::string & redis_password, const std::string &source, const std::string &node_type, const std::string &module, const std::string &instance_id, std::map & seqReply); static bool - SyncDeleteUVEs(const std::string & redis_ip, unsigned short redis_port, + SyncDeleteUVEs(const std::string & redis_ip, unsigned short redis_port, + const std::string & redis_password, const std::string &source, const std::string &node_type, const std::string &module, const std::string &instance_id); static bool - FlushUVEs(const std::string & redis_ip, unsigned short redis_port); + FlushUVEs(const std::string & redis_ip, unsigned short redis_port, + const std::string & redis_password); }; class RedisProcessorIf { diff --git a/src/analytics/test/utils/mockredis/mockredis/mockredis.py b/src/analytics/test/utils/mockredis/mockredis/mockredis.py index 2ae389b7a0a..7c1137161ee 100644 --- a/src/analytics/test/utils/mockredis/mockredis/mockredis.py +++ b/src/analytics/test/utils/mockredis/mockredis/mockredis.py @@ -41,7 +41,7 @@ def redis_version(): ''' -def start_redis(port, exe=None): +def start_redis(port, exe=None, password=None): ''' Client uses this function to start an instance of redis Arguments: @@ -68,13 +68,15 @@ def start_redis(port, exe=None): ("port 6379", "port " + str(port)), ("/var/log/redis_6379.log", redisbase + "log"), ("/var/lib/redis/6379", redisbase + "cache")]) + if password: + replace_string_(redisbase + redis_conf,[("# requirepass foobared","requirepass " + password)]) if exe == None: exe = "redis-server" command = exe + " " + redisbase + redis_conf subprocess.Popen(command.split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - r = redis.StrictRedis(host='localhost', port=port, db=0) + r = redis.StrictRedis(host='localhost', port=port, db=0, password=password) done = False while not done: try: @@ -87,14 +89,14 @@ def start_redis(port, exe=None): logging.info('Redis ready') -def stop_redis(port): +def stop_redis(port, password=None): ''' Client uses this function to stop an instance of redis This will only work for redis instances that were started by this module Arguments: cport : The Client Port for the instance of redis to be stopped ''' - r = redis.StrictRedis(host='localhost', port=port, db=0) + r = redis.StrictRedis(host='localhost', port=port, db=0, password=password) r.shutdown() del r redisbase = "/tmp/redis.%s.%d/" % (os.getenv('USER', 'None'), port) diff --git a/src/analytics/viz_collector.cc b/src/analytics/viz_collector.cc index 04fa545d95c..d4eb2482fec 100644 --- a/src/analytics/viz_collector.cc +++ b/src/analytics/viz_collector.cc @@ -31,13 +31,14 @@ VizCollector::VizCollector(EventManager *evm, unsigned short listen_port, const std::vector &cassandra_ips, const std::vector &cassandra_ports, const std::string &redis_uve_ip, unsigned short redis_uve_port, - int syslog_port, int sflow_port, int ipfix_port, - bool dup, int analytics_ttl) : + const std::string &redis_password, int syslog_port, int sflow_port, + int ipfix_port, bool dup, int analytics_ttl) : db_initializer_(new DbHandlerInitializer(evm, DbGlobalName(dup), -1, std::string("collector:DbIf"), boost::bind(&VizCollector::DbInitializeCb, this), cassandra_ips, cassandra_ports, analytics_ttl)), - osp_(new OpServerProxy(evm, this, redis_uve_ip, redis_uve_port)), + osp_(new OpServerProxy(evm, this, redis_uve_ip, redis_uve_port, + redis_password)), ruleeng_(new Ruleeng(db_initializer_->GetDbHandler(), osp_.get())), collector_(new Collector(evm, listen_port, db_initializer_->GetDbHandler(), osp_.get(), diff --git a/src/analytics/viz_collector.h b/src/analytics/viz_collector.h index 63be79a64f6..de27179b32f 100644 --- a/src/analytics/viz_collector.h +++ b/src/analytics/viz_collector.h @@ -34,7 +34,8 @@ class VizCollector { const std::vector &cassandra_ips, const std::vector &cassandra_ports, const std::string &redis_uve_ip, unsigned short redis_uve_port, - int syslog_port, int sflow_port, int ipfix_port, bool dup=false, + const std::string &redis_password, int syslog_port, int sflow_port, + int ipfix_port, bool dup=false, int analytics_ttl=g_viz_constants.AnalyticsTTL); VizCollector(EventManager *evm, DbHandler *db_handler, Ruleeng *ruleeng, Collector *collector, OpServerProxy *osp); diff --git a/src/opserver/opserver.py b/src/opserver/opserver.py index 0aa30d71d90..5a4e0ee90b8 100644 --- a/src/opserver/opserver.py +++ b/src/opserver/opserver.py @@ -96,8 +96,9 @@ def obj_to_dict(obj): # end obj_to_dict -def redis_query_start(host, port, qid, inp): - redish = redis.StrictRedis(db=0, host=host, port=port) +def redis_query_start(host, port, redis_password, qid, inp): + redish = redis.StrictRedis(db=0, host=host, port=port, + password=redis_password) for key, value in inp.items(): redish.hset("QUERY:" + qid, key, json.dumps(value)) query_metadata = {} @@ -118,8 +119,9 @@ def redis_query_start(host, port, qid, inp): # end redis_query_start -def redis_query_status(host, port, qid): - redish = redis.StrictRedis(db=0, host=host, port=port) +def redis_query_status(host, port, redis_password, qid): + redish = redis.StrictRedis(db=0, host=host, port=port, + password=redis_password) resp = {"progress": 0} chunks = [] # For now, the number of chunks will be always 1 @@ -142,8 +144,9 @@ def redis_query_status(host, port, qid): # end redis_query_status -def redis_query_chunk_iter(host, port, qid, chunk_id): - redish = redis.StrictRedis(db=0, host=host, port=port) +def redis_query_chunk_iter(host, port, redis_password, qid, chunk_id): + redish = redis.StrictRedis(db=0, host=host, port=port, + password=redis_password) iters = 0 fin = False @@ -164,8 +167,8 @@ def redis_query_chunk_iter(host, port, qid, chunk_id): # end redis_query_chunk_iter -def redis_query_chunk(host, port, qid, chunk_id): - res_iter = redis_query_chunk_iter(host, port, qid, chunk_id) +def redis_query_chunk(host, port, redis_password, qid, chunk_id): + res_iter = redis_query_chunk_iter(host, port, redis_password, qid, chunk_id) dli = u'' starter = True @@ -199,9 +202,10 @@ def redis_query_chunk(host, port, qid, chunk_id): # end redis_query_chunk -def redis_query_result(host, port, qid): + +def redis_query_result(host, port, redis_password, qid): try: - status = redis_query_status(host, port, qid) + status = redis_query_status(host, port, redis_password, qid) except redis.exceptions.ConnectionError: # Update connection info ConnectionState.update(conn_type = ConnectionType.REDIS, @@ -225,7 +229,8 @@ def redis_query_result(host, port, qid): if status['progress'] == 100: for chunk in status['chunks']: chunk_id = int(chunk['href'].rsplit('/', 1)[1]) - for gen in redis_query_chunk(host, port, qid, chunk_id): + for gen in redis_query_chunk(host, port, redis_password, qid, + chunk_id): yield gen else: yield {} @@ -238,17 +243,16 @@ def redis_query_result(host, port, qid): return # end redis_query_result +def redis_query_result_dict(host, port, redis_password, qid): -def redis_query_result_dict(host, port, qid): - - stat = redis_query_status(host, port, qid) + stat = redis_query_status(host, port, redis_password, qid) prg = int(stat["progress"]) res = [] if (prg < 0) or (prg == 100): done = False - gen = redis_query_result(host, port, qid) + gen = redis_query_result(host, port, redis_password, qid) result = u'' while not done: try: @@ -276,9 +280,10 @@ def redis_query_info(redish, qid): class OpStateServer(object): - def __init__(self, logger): + def __init__(self, logger, redis_password=None): self._logger = logger self._redis_list = [] + self._redis_password= redis_password # end __init__ def update_redis_list(self, redis_list): @@ -294,7 +299,8 @@ def redis_publish(self, msg_type, destination, msg): # Publish message in the Redis bus for redis_server in self._redis_list: redis_inst = redis.StrictRedis(redis_server[0], - redis_server[1], db=0) + redis_server[1], db=0, + password=self._redis_password) try: redis_inst.publish('analytics', redis_msg) except redis.exceptions.ConnectionError: @@ -488,10 +494,11 @@ def __init__(self): self._post_common = self._http_post_common self._collector_pool = None - self._state_server = OpStateServer(self._logger) + self._state_server = OpStateServer(self._logger, self._args.redis_password) self._uve_server = UVEServer(('127.0.0.1', self._args.redis_server_port), - self._logger) + self._logger, + self._args.redis_password) self._LEVEL_LIST = [] for k in SandeshLevel._VALUES_TO_NAMES: @@ -657,6 +664,7 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): Eg. python opserver.py --host_ip 127.0.0.1 --redis_server_port 6379 --redis_query_port 6379 + --redis_password --collectors 127.0.0.1:8086 --cassandra_server_list 127.0.0.1:9160 --http_server_port 8090 @@ -698,6 +706,7 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): redis_opts = { 'redis_server_port' : 6379, 'redis_query_port' : 6379, + 'redis_password' : None, } disc_opts = { 'disc_server_ip' : None, @@ -735,6 +744,8 @@ def _parse_args(self, args_str=' '.join(sys.argv[1:])): parser.add_argument("--redis_query_port", type=int, help="Redis query port") + parser.add_argument("--redis_password", + help="Redis server password") parser.add_argument("--collectors", help="List of Collector IP addresses in ip:port format", nargs="+") @@ -860,6 +871,7 @@ def _query_status(self, request, qid): try: resp = redis_query_status(host=redis_query_ip, port=int(self._args.redis_query_port), + redis_password=self._args.redis_password, qid=qid) except redis.exceptions.ConnectionError: # Update connection info @@ -899,6 +911,7 @@ def _query_chunk(self, request, qid, chunk_id): done = False gen = redis_query_chunk(host=redis_query_ip, port=int(self._args.redis_query_port), + redis_password=self._args.redis_password, qid=qid, chunk_id=chunk_id) bottle.response.set_header('Content-Type', 'application/json') while not done: @@ -985,6 +998,7 @@ def _query(self, request): prg = redis_query_start('127.0.0.1', int(self._args.redis_query_port), + self._args.redis_password, qid, request.json) if prg is None: # Update connection info @@ -1055,6 +1069,7 @@ def _sync_query(self, request, qid): resp = redis_query_status(host='127.0.0.1', port=int( self._args.redis_query_port), + redis_password=self._args.redis_password, qid=qid) # We want to print progress only if it has changed @@ -1081,6 +1096,7 @@ def _sync_query(self, request, qid): done = False gen = redis_query_result(host='127.0.0.1', port=int(self._args.redis_query_port), + redis_password=self._args.redis_password, qid=qid) bottle.response.set_header('Content-Type', 'application/json') while not done: @@ -1159,7 +1175,8 @@ def show_queries(self): queries = {} try: redish = redis.StrictRedis(db=0, host='127.0.0.1', - port=int(self._args.redis_query_port)) + port=int(self._args.redis_query_port), + password=self._args.redis_password) pending_queries = redish.lrange('QUERYQ', 0, -1) pending_queries_info = [] for query_id in pending_queries: @@ -1176,6 +1193,7 @@ def show_queries(self): status = redis_query_status(host='127.0.0.1', port=int( self._args.redis_query_port), + redis_password=self._args.redis_password, qid=query_id) query_data = redis_query_info(redish, query_id) if status is None: @@ -1641,7 +1659,8 @@ def generator_info(self, table, column): redish = redis.StrictRedis( db=1, host=redis_uve[0], - port=redis_uve[1]) + port=redis_uve[1], + password=self._args.redis_password) try: for key in redish.smembers("NGENERATORS"): source = key.split(':')[0] diff --git a/src/opserver/test/SConscript b/src/opserver/test/SConscript index b40cdf77ff2..13048cf8da7 100644 --- a/src/opserver/test/SConscript +++ b/src/opserver/test/SConscript @@ -20,6 +20,7 @@ local_sources = [ 'analytics_db_test.py', 'overlay_to_underlay_mapper_test.py', # 'analytics_perftest.py', + 'analytics_redistest.py' ] local_sources_rules = [] for file in local_sources: diff --git a/src/opserver/test/analytics_redistest.py b/src/opserver/test/analytics_redistest.py new file mode 100755 index 00000000000..0b0b22b6f72 --- /dev/null +++ b/src/opserver/test/analytics_redistest.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python + +# +# Copyright (c) 2015 Juniper Networks, Inc. All rights reserved. +# + +# +# analytics_redistest.py +# +# System tests for analytics redis +# + +import sys +builddir = sys.path[0] + '/../..' +import threading +threading._DummyThread._Thread__stop = lambda x: 42 +import signal +import gevent +from gevent import monkey +monkey.patch_all() +import os +import unittest +import testtools +import fixtures +import socket +from utils.analytics_fixture import AnalyticsFixture +from mockcassandra import mockcassandra +from mockredis import mockredis +import logging +import time +from opserver.sandesh.viz.constants import * + +logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(levelname)s %(message)s') + +class AnalyticsTest(testtools.TestCase, fixtures.TestWithFixtures): + + @classmethod + def setUpClass(cls): + if (os.getenv('LD_LIBRARY_PATH', '').find('build/lib') < 0): + if (os.getenv('DYLD_LIBRARY_PATH', '').find('build/lib') < 0): + assert(False) + + cls.cassandra_port = AnalyticsTest.get_free_port() + mockcassandra.start_cassandra(cls.cassandra_port) + cls.redis_port = AnalyticsTest.get_free_port() + mockredis.start_redis( + cls.redis_port, builddir+'/testroot/bin/redis-server') + + + @classmethod + def tearDownClass(cls): + mockcassandra.stop_cassandra(cls.cassandra_port) + mockredis.stop_redis(cls.redis_port) + pass + + #@unittest.skip('verify source/module list') + def test_00_table_source_module_list_with_password(self): + ''' + This test verifies /analytics/table//column-values/Source + and /analytics/table/
/column-values/ModuleId + ''' + logging.info('*** test_00_source_module_list ***') + + vizd_obj = self.useFixture( + AnalyticsFixture(logging, builddir, -1, + self.__class__.cassandra_port, + collector_ha_test=True, redis_password='contrail')) + self.verify_source_module(vizd_obj) + return True + #end test_00_table_source_module_list + + #@unittest.skip('verify redis-uve restart') + def test_01_redis_uve_restart_with_password(self): + logging.info('*** test_01_redis_uve_restart ***') + + vizd_obj = self.useFixture( + AnalyticsFixture(logging, + builddir, -1, + self.__class__.cassandra_port,redis_password='contrail')) + self.verify_uve_resync(vizd_obj) + return True + # end test_01_redis_uve_restart + + #@unittest.skip('verify source/module list') + def test_02_table_source_module_list(self): + ''' + This test verifies /analytics/table/
/column-values/Source + and /analytics/table/
/column-values/ModuleId + ''' + logging.info('*** test_02_source_module_list ***') + + vizd_obj = self.useFixture( + AnalyticsFixture(logging, builddir, -1, + self.__class__.cassandra_port, + collector_ha_test=True)) + self.verify_source_module(vizd_obj) + return True + #end test_02_table_source_module_list + + #@unittest.skip('verify redis-uve restart') + def test_03_redis_uve_restart(self): + logging.info('*** test_03_redis_uve_restart ***') + + vizd_obj = self.useFixture( + AnalyticsFixture(logging, + builddir, -1, + self.__class__.cassandra_port)) + self.verify_uve_resync(vizd_obj) + # end test_03_redis_uve_restart + + def verify_source_module(self, vizd_obj): + assert vizd_obj.verify_on_setup() + assert vizd_obj.verify_collector_obj_count() + exp_genlist1 = ['contrail-collector', 'contrail-analytics-api', 'contrail-query-engine'] + assert vizd_obj.verify_generator_list(vizd_obj.collectors[0], + exp_genlist1) + exp_genlist2 = ['contrail-collector'] + assert vizd_obj.verify_generator_list(vizd_obj.collectors[1], + exp_genlist2) + exp_src_list = [col.hostname for col in vizd_obj.collectors] + exp_mod_list = exp_genlist1 + assert vizd_obj.verify_table_source_module_list(exp_src_list, + exp_mod_list) + # stop the second redis_uve instance and verify the src/module list + vizd_obj.redis_uves[1].stop() + exp_src_list = [vizd_obj.collectors[0].hostname] + exp_mod_list = exp_genlist1 + assert vizd_obj.verify_table_source_module_list(exp_src_list, + exp_mod_list) + + def verify_uve_resync(self, vizd_obj): + assert vizd_obj.verify_on_setup() + assert vizd_obj.verify_collector_redis_uve_connection( + vizd_obj.collectors[0]) + assert vizd_obj.verify_opserver_redis_uve_connection( + vizd_obj.opserver) + # verify redis-uve list + host = socket.gethostname() + gen_list = [host+':Analytics:contrail-collector:0', + host+':Analytics:contrail-query-engine:0', + host+':Analytics:contrail-analytics-api:0'] + assert vizd_obj.verify_generator_uve_list(gen_list) + # stop redis-uve + vizd_obj.redis_uves[0].stop() + assert vizd_obj.verify_collector_redis_uve_connection( + vizd_obj.collectors[0], False) + assert vizd_obj.verify_opserver_redis_uve_connection( + vizd_obj.opserver, False) + # start redis-uve and verify that contrail-collector and Opserver are + # connected to the redis-uve + vizd_obj.redis_uves[0].start() + assert vizd_obj.verify_collector_redis_uve_connection( + vizd_obj.collectors[0]) + assert vizd_obj.verify_opserver_redis_uve_connection( + vizd_obj.opserver) + # verify that UVEs are resynced with redis-uve + assert vizd_obj.verify_generator_uve_list(gen_list) + + @staticmethod + def get_free_port(): + cs = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + cs.bind(("", 0)) + cport = cs.getsockname()[1] + cs.close() + return cport + +def _term_handler(*_): + raise IntSignal() + +if __name__ == '__main__': + gevent.signal(signal.SIGINT,_term_handler) + unittest.main(catchbreak=True) + diff --git a/src/opserver/test/analytics_systest.py b/src/opserver/test/analytics_systest.py index d0871b973c3..d0323cb1154 100755 --- a/src/opserver/test/analytics_systest.py +++ b/src/opserver/test/analytics_systest.py @@ -458,84 +458,13 @@ def test_08_send_tracebuffer(self): ModuleNames[Module.COLLECTOR], 'UveTrace') #end test_08_send_tracebuffer - #@unittest.skip('verify source/module list') - def test_09_table_source_module_list(self): - ''' - This test verifies /analytics/table/
/column-values/Source - and /analytics/table/
/column-values/ModuleId - ''' - logging.info('*** test_09_source_module_list ***') - if AnalyticsTest._check_skip_test() is True: - return True - - vizd_obj = self.useFixture( - AnalyticsFixture(logging, builddir, -1, - self.__class__.cassandra_port, - collector_ha_test=True)) - assert vizd_obj.verify_on_setup() - assert vizd_obj.verify_collector_obj_count() - exp_genlist1 = ['contrail-collector', 'contrail-analytics-api', 'contrail-query-engine'] - assert vizd_obj.verify_generator_list(vizd_obj.collectors[0], - exp_genlist1) - exp_genlist2 = ['contrail-collector'] - assert vizd_obj.verify_generator_list(vizd_obj.collectors[1], - exp_genlist2) - exp_src_list = [col.hostname for col in vizd_obj.collectors] - exp_mod_list = exp_genlist1 - assert vizd_obj.verify_table_source_module_list(exp_src_list, - exp_mod_list) - # stop the second redis_uve instance and verify the src/module list - vizd_obj.redis_uves[1].stop() - exp_src_list = [vizd_obj.collectors[0].hostname] - exp_mod_list = exp_genlist1 - assert vizd_obj.verify_table_source_module_list(exp_src_list, - exp_mod_list) - #end test_09_table_source_module_list - - #@unittest.skip('verify redis-uve restart') - def test_10_redis_uve_restart(self): - logging.info('*** test_10_redis_uve_restart ***') - if AnalyticsTest._check_skip_test() is True: - return True - - vizd_obj = self.useFixture( - AnalyticsFixture(logging, builddir, -1, - self.__class__.cassandra_port)) - assert vizd_obj.verify_on_setup() - assert vizd_obj.verify_collector_redis_uve_connection( - vizd_obj.collectors[0]) - assert vizd_obj.verify_opserver_redis_uve_connection( - vizd_obj.opserver) - # verify redis-uve list - host = socket.gethostname() - gen_list = [host+':Analytics:contrail-collector:0', - host+':Analytics:contrail-query-engine:0', - host+':Analytics:contrail-analytics-api:0'] - assert vizd_obj.verify_generator_uve_list(gen_list) - # stop redis-uve - vizd_obj.redis_uves[0].stop() - assert vizd_obj.verify_collector_redis_uve_connection( - vizd_obj.collectors[0], False) - assert vizd_obj.verify_opserver_redis_uve_connection( - vizd_obj.opserver, False) - # start redis-uve and verify that contrail-collector and Opserver are - # connected to the redis-uve - vizd_obj.redis_uves[0].start() - assert vizd_obj.verify_collector_redis_uve_connection( - vizd_obj.collectors[0]) - assert vizd_obj.verify_opserver_redis_uve_connection( - vizd_obj.opserver) - # verify that UVEs are resynced with redis-uve - assert vizd_obj.verify_generator_uve_list(gen_list) - # end test_10_redis_uve_restart - #@unittest.skip(' where queries with different conditions') - def test_11_where_clause_query(self): + def test_09_where_clause_query(self): ''' This test is used to check the working of integer fields in the where query ''' - logging.info("*** test_11_where_clause_query ***") + logging.info("*** test_09_where_clause_query ***") if AnalyticsTest._check_skip_test() is True: return True @@ -559,14 +488,14 @@ def test_11_where_clause_query(self): generator_obj.generate_flow_samples() assert vizd_obj.verify_where_query_prefix(generator_obj) return True; - #end test_11_where_clause_query + #end test_09_where_clause_query #@unittest.skip('verify ObjectValueTable query') - def test_12_verify_object_value_table_query(self): + def test_10_verify_object_value_table_query(self): ''' This test verifies the ObjectValueTable query. ''' - logging.info('*** test_12_verify_object_value_table_query ***') + logging.info('*** test_10_verify_object_value_table_query ***') if AnalyticsTest._check_skip_test() is True: return True @@ -591,15 +520,15 @@ def test_12_verify_object_value_table_query(self): msg_count=1) assert vizd_obj.verify_object_value_table_query(table='ObjectVMTable', exp_object_values=['vm11&>']) - # end test_12_verify_object_table_query + # end test_10_verify_object_table_query #@unittest.skip('verify ObjectTable query') - def test_13_verify_syslog_table_query(self): + def test_11_verify_syslog_table_query(self): ''' This test verifies the Syslog query. ''' import logging.handlers - logging.info('*** test_13_verify_syslog_table_query ***') + logging.info('*** test_11_verify_syslog_table_query ***') if AnalyticsTest._check_skip_test() is True: return True @@ -628,17 +557,17 @@ def test_13_verify_syslog_table_query(self): syslogger.critical(line) assert vizd_obj.verify_keyword_query(line, ['football', 'baseball']) - # end test_13_verify_syslog_table_query + # end test_11_verify_syslog_table_query #@unittest.skip('Skipping Fieldnames table test') - def test_14_fieldname_table(self): + def test_12_fieldname_table(self): ''' This test starts vizd and a python generators that simulates vrouter and sends messages. It uses the test class' cassandra instance.Then it checks that the Field names table got the values. ''' - logging.info("*** test_14_fieldname_table ***") + logging.info("*** test_12_fieldname_table ***") if AnalyticsTest._check_skip_test() is True: return True @@ -656,7 +585,7 @@ def test_14_fieldname_table(self): generator_obj.generate_intervn() assert vizd_obj.verify_fieldname_table() return True - # end test_14_fieldname_table + # end test_12_fieldname_table @staticmethod def get_free_port(): diff --git a/src/opserver/test/utils/analytics_fixture.py b/src/opserver/test/utils/analytics_fixture.py index d5ab0febc37..ed70514572f 100644 --- a/src/opserver/test/utils/analytics_fixture.py +++ b/src/opserver/test/utils/analytics_fixture.py @@ -76,6 +76,9 @@ def __init__(self, analytics_fixture, redis_uve, self._redis_uve = redis_uve self._logger = logger self._is_dup = is_dup + self.redis_password = None + if self._redis_uve.password: + self.redis_password = str(self._redis_uve.password) if self._is_dup is True: self.hostname = self.hostname+'dup' self._generator_id = self.hostname+':'+NodeTypeNames[NodeType.ANALYTICS]+\ @@ -124,6 +127,9 @@ def start(self): '--DEFAULT.syslog_port', str(self.syslog_port), '--DEFAULT.ipfix_port', str(self.ipfix_port), '--DEFAULT.log_file', self._log_file] + if self.redis_password: + args.append('--REDIS.password') + args.append(self.redis_password) if self._is_dup is True: args.append('--DEFAULT.dup') if (self.protobuf_port): @@ -172,7 +178,7 @@ def stop(self): # end class Collector class OpServer(object): - def __init__(self, primary_collector, secondary_collector, redis_port, + def __init__(self, primary_collector, secondary_collector, redis_port, analytics_fixture, logger, is_dup=False): self.primary_collector = primary_collector self.secondary_collector = secondary_collector @@ -183,6 +189,9 @@ def __init__(self, primary_collector, secondary_collector, redis_port, self._instance = None self._logger = logger self._is_dup = is_dup + self.redis_password = None + if self.analytics_fixture.redis_uves[0].password: + self.redis_password = str(self.analytics_fixture.redis_uves[0].password) if self._is_dup is True: self.hostname = self.hostname+'dup' self._generator_id = self.hostname+':'+NodeTypeNames[NodeType.ANALYTICS]+\ @@ -209,13 +218,16 @@ def start(self): args = ['python', self.analytics_fixture.builddir + \ '/analytics_test/bin/contrail-analytics-api', '--redis_server_port', str(self._redis_port), - '--redis_query_port', + '--redis_query_port', str(self.analytics_fixture.redis_uves[0].port), '--cassandra_server_list', '127.0.0.1:' + str(self.analytics_fixture.cassandra_port), '--http_server_port', str(self.http_port), '--log_file', self._log_file, '--rest_api_port', str(self.listen_port)] + if self.analytics_fixture.redis_uves[0].password: + args.append('--redis_password') + args.append(self.analytics_fixture.redis_uves[0].password) args.append('--redis_uve_list') for redis_uve in self.analytics_fixture.redis_uves: args.append('127.0.0.1:'+str(redis_uve.port)) @@ -276,6 +288,9 @@ def __init__(self, primary_collector, secondary_collector, self.hostname = socket.gethostname() self._instance = None self._logger = logger + self.redis_password = None + if self.analytics_fixture.redis_uves[0].password: + self.redis_password = str(self.analytics_fixture.redis_uves[0].password) self._generator_id = self.hostname+':'+NodeTypeNames[NodeType.ANALYTICS]+\ ':'+ModuleNames[Module.QUERY_ENGINE]+':0' # end __init__ @@ -304,6 +319,9 @@ def start(self, analytics_start_time=None): '--DEFAULT.log_local', '--DEFAULT.log_level', 'SYS_DEBUG', '--DEFAULT.log_file', self._log_file, '--DEFAULT.collectors', self.primary_collector] + if self.redis_password: + args.append('--REDIS.password') + args.append(self.redis_password) if self.secondary_collector is not None: args.append('--DEFAULT.collectors') args.append(self.secondary_collector) @@ -345,13 +363,14 @@ def stop(self): # end class QueryEngine class Redis(object): - def __init__(self, port, builddir): + def __init__(self, port, builddir, password=None): self.builddir = builddir self.port = port if self.port == -1: self.use_global = False else: self.use_global = True + self.password = password self.running = False # end __init__ @@ -362,16 +381,17 @@ def start(self): if self.port == -1: self.port = AnalyticsFixture.get_free_port() mockredis.start_redis( - self.port, self.builddir+'/testroot/bin/redis-server') + self.port, self.builddir+'/testroot/bin/redis-server', + self.password) else: - redish = redis.StrictRedis("127.0.0.1", self.port) + redish = redis.StrictRedis("127.0.0.1", self.port, password=self.password) redish.flushall() # end start def stop(self): assert(not self.use_global) if self.running: - mockredis.stop_redis(self.port) + mockredis.stop_redis(self.port, self.password) self.running = False #end stop @@ -381,7 +401,7 @@ class AnalyticsFixture(fixtures.Fixture): def __init__(self, logger, builddir, redis_port, cassandra_port, ipfix_port = False, syslog_port = False, protobuf_port = False, - noqed=False, collector_ha_test=False): + noqed=False, collector_ha_test=False, redis_password=None): self.builddir = builddir self.redis_port = redis_port @@ -392,11 +412,13 @@ def __init__(self, logger, builddir, redis_port, cassandra_port, self.logger = logger self.noqed = noqed self.collector_ha_test = collector_ha_test + self.redis_password = redis_password def setUp(self): super(AnalyticsFixture, self).setUp() - self.redis_uves = [Redis(self.redis_port, self.builddir)] + self.redis_uves = [Redis(self.redis_port, self.builddir, + self.redis_password)] self.redis_uves[0].start() self.opserver = None @@ -404,7 +426,7 @@ def setUp(self): self.collectors = [Collector(self, self.redis_uves[0], self.logger, ipfix_port = self.ipfix_port, syslog_port = self.syslog_port, - protobuf_port = self.protobuf_port)] + protobuf_port = self.protobuf_port)] if not self.collectors[0].start(): self.logger.error("Collector did NOT start") return @@ -413,7 +435,8 @@ def setUp(self): primary_collector = self.collectors[0].get_addr() secondary_collector = None if self.collector_ha_test: - self.redis_uves.append(Redis(-1, self.builddir)) + self.redis_uves.append(Redis(-1, self.builddir, + self.redis_password)) self.redis_uves[1].start() self.collectors.append(Collector(self, self.redis_uves[1], self.logger, is_dup=True)) @@ -1749,7 +1772,7 @@ def verify_keyword_query(self, line, keywords=[]): def verify_generator_list_in_redis(self, redis_uve, exp_gen_list): self.logger.info('Verify generator list in redis') try: - r = redis.StrictRedis(db=1, port=redis_uve.port) + r = redis.StrictRedis(db=1, port=redis_uve.port, password=redis_uve.password) gen_list = r.smembers('NGENERATORS') except Exception as e: self.logger.error('Failed to get generator list from redis - %s' % e) diff --git a/src/opserver/uveserver.py b/src/opserver/uveserver.py index ccf6b22c657..c2ed37cc567 100644 --- a/src/opserver/uveserver.py +++ b/src/opserver/uveserver.py @@ -14,27 +14,32 @@ import xmltodict import redis import datetime +import sys from opserver_util import OpServerUtils import re from gevent.coros import BoundedSemaphore from pysandesh.util import UTCTimestampUsec +from pysandesh.connection_info import ConnectionState class UVEServer(object): - def __init__(self, redis_uve_server, logger): + def __init__(self, redis_uve_server, logger, redis_password=None): self._local_redis_uve = redis_uve_server self._redis_uve_list = [] self._logger = logger self._sem = BoundedSemaphore(1) self._redis = None + self._redis_password = redis_password if self._local_redis_uve: self._redis = redis.StrictRedis(self._local_redis_uve[0], - self._local_redis_uve[1], db=1) + self._local_redis_uve[1], + password=self._redis_password, + db=1) #end __init__ def update_redis_uve_list(self, redis_uve_list): self._redis_uve_list = redis_uve_list - # end update_redis_uve_list + # end update_redis_uve_list def fill_redis_uve_info(self, redis_uve_info): redis_uve_info.ip = self._local_redis_uve[0] @@ -127,6 +132,14 @@ def run(self): self._redis.hset("PREVIOUS:" + key + ":" + typ, attr, vstr) self._redis.delete(value) + except redis.exceptions.ResponseError: + #send redis connection down msg. Coule be bcos of authentication + ConnectionState.update(conn_type = ConnectionType.REDIS, + name = 'UVE', status = ConnectionStatus.DOWN, + message = 'UVE result : Connection Error', + server_addrs = ['%s:%d' % (self._local_redis_uve[0], + self._local_redis_uve[1])]) + sys.exit() except redis.exceptions.ConnectionError: if lck: self._sem.release() @@ -197,7 +210,8 @@ def get_uve(self, key, flat, sfilter=None, mfilter=None, tfilter=None, multi=Fal statdict = {} for redis_uve in self._redis_uve_list: redish = redis.StrictRedis(host=redis_uve[0], - port=redis_uve[1], db=1) + port=redis_uve[1], + password=self._redis_password, db=1) try: qmap = {} for origs in redish.smembers("ORIGINS:" + key): @@ -398,7 +412,8 @@ def get_uve_list(self, key, kfilter, sfilter, patterns.add(self.get_uve_regex(filt)) for redis_uve in self._redis_uve_list: redish = redis.StrictRedis(host=redis_uve[0], - port=redis_uve[1], db=1) + port=redis_uve[1], + password=self._redis_password, db=1) try: for entry in redish.smembers("TABLE:" + key): info = (entry.split(':', 1)[1]).rsplit(':', 5) diff --git a/src/query_engine/QEOpServerProxy.cc b/src/query_engine/QEOpServerProxy.cc index 951079d71ae..b3839a48311 100644 --- a/src/query_engine/QEOpServerProxy.cc +++ b/src/query_engine/QEOpServerProxy.cc @@ -591,6 +591,8 @@ class QEOpServerProxy::QEOpServerImpl { if (!connState_[cnum]) { QE_LOG_NOQID(DEBUG, "ConnUp SetCB" << cnum); + cb_proc_fn_[cnum] = boost::bind(&QEOpServerImpl::CallbackProcess, + this, cnum, _1, _2, _3); conns_[cnum].get()->SetClientAsyncCmdCb(cb_proc_fn_[cnum]); connState_[cnum] = true; } @@ -624,6 +626,18 @@ class QEOpServerProxy::QEOpServerImpl { " . No Redis Connection"); return; } + //Authenticate the context with password + if (!redis_password_.empty()) { + redisReply * reply = (redisReply *) redisCommand(c, "AUTH %s", + redis_password_.c_str()); + if (reply->type == REDIS_REPLY_ERROR) { + QE_LOG_NOQID(ERROR, "Authentication to redis error"); + freeReplyObject(reply); + redisFree(c); + return; + } + freeReplyObject(reply); + } char stat[80]; string key = "REPLY:" + qid; @@ -653,6 +667,19 @@ class QEOpServerProxy::QEOpServerImpl { return; } + //Authenticate the context with password + if ( !redis_password_.empty()) { + redisReply * reply = (redisReply *) redisCommand(c, "AUTH %s", + redis_password_.c_str()); + if (reply->type == REDIS_REPLY_ERROR) { + QE_LOG_NOQID(ERROR, "Authentication to redis error"); + freeReplyObject(reply); + redisFree(c); + return; + } + freeReplyObject(reply); + } + string key = "QUERY:" + qid; redisReply * reply = (redisReply *) redisCommand(c, "hgetall %s", key.c_str()); @@ -756,14 +783,29 @@ class QEOpServerProxy::QEOpServerImpl { } + void ConnUpPrePostProcess(uint8_t cnum) { + //Assign callback for AUTH command + cb_proc_fn_[cnum] = boost::bind(&QEOpServerImpl::ConnectCallbackProcess, + this, cnum, _1, _2, _3); + conns_[cnum].get()->SetClientAsyncCmdCb(cb_proc_fn_[cnum]); + //Send AUTH command + RedisAsyncConnection * rac = conns_[cnum].get(); + if (!redis_password_.empty()) { + RedisAsyncArgCommand(rac, NULL, + list_of(string("AUTH"))(redis_password_.c_str())); + } else { + RedisAsyncArgCommand(rac, NULL, + list_of(string("PING"))); + } + } + void ConnUp(uint8_t cnum) { - QE_LOG_NOQID(DEBUG, "ConnUp.. UP " << cnum); - ConnectionState::GetInstance()->Update(ConnectionType::REDIS, - "Query", ConnectionStatus::UP, conns_[cnum]->Endpoint(), - std::string()); + std::ostringstream ostr; + ostr << "ConnUp.. UP " << cnum; + QE_LOG_NOQID(DEBUG, ostr.str()); qosp_->evm_->io_service()->post( - boost::bind(&QEOpServerImpl::ConnUpPostProcess, - this, cnum)); + boost::bind(&QEOpServerImpl::ConnUpPrePostProcess, + this, cnum)); } void ConnDown(uint8_t cnum) { @@ -776,6 +818,22 @@ class QEOpServerProxy::QEOpServerImpl { conns_[cnum].get())); } + void ConnectCallbackProcess(uint8_t cnum, const redisAsyncContext *c, void *r, void *privdata) { + QE_LOG_NOQID(DEBUG,"In ConnectCallbackProcess.."); + redisReply reply = *reinterpret_cast(r); + if (reply.type != REDIS_REPLY_ERROR) { + ConnectionState::GetInstance()->Update(ConnectionType::REDIS, + "Query", ConnectionStatus::UP, conns_[cnum]->Endpoint(), + std::string()); + qosp_->evm_->io_service()->post( + boost::bind(&QEOpServerImpl::ConnUpPostProcess, + this, cnum)); + }else { + QE_LOG_NOQID(ERROR,"In connectCallbackProcess.. Error"); + QE_ASSERT(reply.type != REDIS_REPLY_ERROR); + } + } + void CallbackProcess(uint8_t cnum, const redisAsyncContext *c, void *r, void *privdata) { //QE_TRACE_NOQID(DEBUG, "Redis CB" << cnum); @@ -827,11 +885,13 @@ class QEOpServerProxy::QEOpServerImpl { } - QEOpServerImpl(const string & redis_host, uint16_t port, QEOpServerProxy * qosp, + QEOpServerImpl(const string & redis_host, uint16_t port, + const string & redis_password, QEOpServerProxy * qosp, int max_tasks) : hostname_(boost::asio::ip::host_name()), redis_host_(redis_host), port_(port), + redis_password_(redis_password), qosp_(qosp), max_tasks_(max_tasks) { for (int i=0; i conns_[kConnections+1]; RedisAsyncConnection::ClientAsyncCmdCbFn cb_proc_fn_[kConnections+1]; @@ -880,10 +941,12 @@ class QEOpServerProxy::QEOpServerImpl { }; QEOpServerProxy::QEOpServerProxy(EventManager *evm, QueryEngine *qe, - const string & hostname, uint16_t port, int max_tasks) : + const string & hostname, uint16_t port, const string & redis_password, + int max_tasks) : evm_(evm), qe_(qe), - impl_(new QEOpServerImpl(hostname, port, this, max_tasks)) {} + impl_(new QEOpServerImpl(hostname, port, redis_password, this, + max_tasks)) {} QEOpServerProxy::~QEOpServerProxy() {} diff --git a/src/query_engine/QEOpServerProxy.h b/src/query_engine/QEOpServerProxy.h index efaf7a9098a..0f0761f0d36 100644 --- a/src/query_engine/QEOpServerProxy.h +++ b/src/query_engine/QEOpServerProxy.h @@ -42,7 +42,8 @@ class QEOpServerProxy { // when the OpServer issues a Query. QEOpServerProxy(EventManager *evm, QueryEngine *qe, const std::string & hostname, - uint16_t port, int max_chunks = nMaxChunks); + uint16_t port, const std::string & redis_password, + int max_chunks = nMaxChunks); virtual ~QEOpServerProxy(); // When the result of a Query is available, the client should diff --git a/src/query_engine/options.cc b/src/query_engine/options.cc index 4a39ed1ce8d..3c1ad5785b6 100644 --- a/src/query_engine/options.cc +++ b/src/query_engine/options.cc @@ -128,6 +128,8 @@ void Options::Initialize(EventManager &evm, "Port of Redis-uve server") ("REDIS.server", opt::value()->default_value("127.0.0.1"), "IP address of Redis Server") + ("REDIS.password", opt::value()->default_value(""), + "password for Redis Server") ; config_file_options_.add(config); @@ -231,4 +233,5 @@ void Options::Process(int argc, char *argv[], GetOptValue(var_map, redis_port_, "REDIS.port"); GetOptValue(var_map, redis_server_, "REDIS.server"); + GetOptValue(var_map, redis_password_, "REDIS.password"); } diff --git a/src/query_engine/options.h b/src/query_engine/options.h index b63eb0f3f06..266ad304182 100644 --- a/src/query_engine/options.h +++ b/src/query_engine/options.h @@ -24,6 +24,7 @@ class Options { const uint16_t discovery_port() const { return discovery_port_; } const std::string redis_server() const { return redis_server_; } const uint16_t redis_port() const { return redis_port_; } + const std::string redis_password() const { return redis_password_; } const std::string hostname() const { return hostname_; } const std::string host_ip() const { return host_ip_; } const uint16_t http_server_port() const { return http_server_port_; } @@ -65,6 +66,7 @@ class Options { uint16_t discovery_port_; std::string redis_server_; uint16_t redis_port_; + std::string redis_password_; std::string hostname_; std::string host_ip_; uint16_t http_server_port_; diff --git a/src/query_engine/qed.cc b/src/query_engine/qed.cc index f217a0d93dd..da0aa9052d9 100644 --- a/src/query_engine/qed.cc +++ b/src/query_engine/qed.cc @@ -263,6 +263,7 @@ main(int argc, char *argv[]) { qe = new QueryEngine(&evm, options.redis_server(), options.redis_port(), + options.redis_password(), max_tasks, options.max_slice(), options.analytics_data_ttl()); @@ -272,6 +273,7 @@ main(int argc, char *argv[]) { cassandra_ports, options.redis_server(), options.redis_port(), + options.redis_password(), max_tasks, options.max_slice(), options.analytics_data_ttl(), diff --git a/src/query_engine/query.cc b/src/query_engine/query.cc index 3bd1fa15754..524aa4d0d5c 100644 --- a/src/query_engine/query.cc +++ b/src/query_engine/query.cc @@ -988,9 +988,10 @@ AnalyticsQuery::AnalyticsQuery(std::string qid, GenDb::GenDbIf *dbif, QueryEngine::QueryEngine(EventManager *evm, const std::string & redis_ip, unsigned short redis_port, - int max_tasks, int max_slice, uint64_t anal_ttl) : + const std::string & redis_password, int max_tasks, int max_slice, + uint64_t anal_ttl) : qosp_(new QEOpServerProxy(evm, - this, redis_ip, redis_port, max_tasks)), + this, redis_ip, redis_port, redis_password, max_tasks)), evm_(evm), cassandra_ports_(0) { @@ -1011,13 +1012,13 @@ QueryEngine::QueryEngine(EventManager *evm, std::vector cassandra_ips, std::vector cassandra_ports, const std::string & redis_ip, unsigned short redis_port, - int max_tasks, int max_slice, uint64_t anal_ttl, - uint64_t start_time) : + const std::string & redis_password, int max_tasks, int max_slice, + uint64_t anal_ttl, uint64_t start_time) : dbif_(GenDb::GenDbIf::GenDbIfImpl( boost::bind(&QueryEngine::db_err_handler, this), cassandra_ips, cassandra_ports, 0, "QueryEngine", true)), qosp_(new QEOpServerProxy(evm, - this, redis_ip, redis_port, max_tasks)), + this, redis_ip, redis_port, redis_password, max_tasks)), evm_(evm), cassandra_ports_(cassandra_ports), cassandra_ips_(cassandra_ips) diff --git a/src/query_engine/query.h b/src/query_engine/query.h index a13c377df9c..3c2f87488dc 100644 --- a/src/query_engine/query.h +++ b/src/query_engine/query.h @@ -887,13 +887,15 @@ class QueryEngine { QueryEngine(EventManager *evm, std::vector cassandra_ips, std::vector cassandra_ports, - const std::string & redis_ip, unsigned short redis_port, - int max_tasks, int max_slice, uint64_t anal_ttl, + const std::string & redis_ip, unsigned short redis_port, + const std::string & redis_password, + int max_tasks, int max_slice, uint64_t anal_ttl, uint64_t start_time=0); QueryEngine(EventManager *evm, const std::string & redis_ip, unsigned short redis_port, - int max_tasks, int max_slice, uint64_t anal_ttl); + const std::string & redis_password, int max_tasks, + int max_slice, uint64_t anal_ttl); int QueryPrepare(QueryParams qp,