Skip to content

Commit

Permalink
his fix adds authentication support to REDIS. Analytics, QE
Browse files Browse the repository at this point in the history
     and contrail-api are affected by this change. Here is a
     summary of the changes in the individual files in different
     modules: Analytics: src/analytics/OpServerProxy.cc:

            Change ToOpsConnUp to call Authenticate function before
            calling ToOpsConnUpPostProcess.
            Add callback to handle reply for authentication command
             for both channels.
            If authentication succeeds make a call to
             ToOpsConnUpPostProcess.
            Change some of the API's to include password as argument

    src/analytics/redis_processor_vizd.cc:
    src/analytics/main.cc:
    src/analytics/viz_collector.cc:
      change the api to pass the redis password as well

    src/analytics/test/utils/mockredis/mockredis/mockredis.py:
     Before starting redis instance modify redis.conf to include
      password, if specified
     nclude password argument to the start and stop api's

    src/analytics/redis_connection.cc:
      Removed a condition which required async_connection_context
        should not have any callback set.This was causing a conflict.

    QueryEngine:

    src/query_engine/QEOpServerProxy.cc:

            Places where redisSync commands where getting called, we first
             make a call to AUTH command
            For commands invoked in Async context, introduce 2 new API's
             viz., ConnAuth,ConnUpPrePostProcess, AuthCallbackProcess
             are called to authenticate the context first
            Make changes to existing API like QEOpServerImpl,QEOpServerProxy
             to include password argument

    src/query_engine/qed.cc:
   src/query_engine/options.cc:
    src/query_engine/query.cc:
    src/query_engine/rac_alloc.cc:
            Make changes in existing API to take redis_passwd as
             additional argument
    analytics-api:

    src/opserver/opserver.py:
            redis_query_start, redis_query_status, redis_query_chunk_iter,
             redis_query_chunk, redis_query_result, redis_query_result_dict,
             OpStateServer all make calls to StrictRedis, made change to
             include password to the API

    systemless test:
            src/opserver/test/utils/analytics_fixture.py:
                    change Redis,QueryEngine and Collector class to invoke the
                      daemons with password as argument. change the Fixture API
                      to take password.
            src/opserver/test/analytics_redistest.py:
                    New test file which calls the testcases with password
                      contrail.
   Keyword names have been changed from passwd to password
    Other changes addressed in the comment are taken care as well

    In the case of password conflicts we are going to crash the daemonsa

    Change has been made in analytics_redis_test.py to abstract common
    code
    Closes Bug:#1392113

Change-Id: I707b9a6df9b770ca0f1a396335f190aa780a937e
  • Loading branch information
arvindvis committed Jan 26, 2015
1 parent 34c2af6 commit 181a51d
Show file tree
Hide file tree
Showing 24 changed files with 546 additions and 191 deletions.
129 changes: 96 additions & 33 deletions src/analytics/OpServerProxy.cc
Expand Up @@ -137,24 +137,82 @@ class OpServerProxy::OpServerImpl {

if (!started_) {
RedisProcessorExec::FlushUVEs(redis_uve_.GetIp(),
redis_uve_.GetPort());
redis_uve_.GetPort(),
redis_password_);
started_=true;
}
if (collector_)
collector_->RedisUpdate(true);
}

void ToOpsConnUp() {
LOG(DEBUG, "ToOpsConnUp.. UP");
{
tbb::mutex::scoped_lock lock(rac_mutex_);
redis_uve_.RedisStatusUpdate(RAC_UP);
}
// Update connection info
ConnectionState::GetInstance()->Update(ConnectionType::REDIS,
void toConnectCallbackProcess(const redisAsyncContext *c, void *r, void *privdata) {
//Handle the AUTH callback
redisReply reply = *reinterpret_cast<redisReply*>(r);
if (reply.type != REDIS_REPLY_ERROR) {
{
tbb::mutex::scoped_lock lock(rac_mutex_);
redis_uve_.RedisStatusUpdate(RAC_UP);
}
ConnectionState::GetInstance()->Update(ConnectionType::REDIS,
"To", ConnectionStatus::UP, to_ops_conn_->Endpoint(),
std::string());
evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::ToOpsConnUpPostProcess, this));
evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::ToOpsConnUpPostProcess, this));
return;

} else {
LOG(ERROR,"In connectCallbackProcess.. Error");
assert(reply.type != REDIS_REPLY_ERROR);
}

}

void fromConnectCallbackProcess(const redisAsyncContext *c, void *r, void *privdata) {
//Handle the AUTH callback
redisReply reply = *reinterpret_cast<redisReply*>(r);
if (reply.type != REDIS_REPLY_ERROR) {
ConnectionState::GetInstance()->Update(ConnectionType::REDIS,
"From", ConnectionStatus::UP, from_ops_conn_->Endpoint(),
std::string());
evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::FromOpsConnUpPostProcess, this));
return;

} else {
LOG(ERROR,"In connectCallbackProcess.. Error");
assert(reply.type != REDIS_REPLY_ERROR);
}

}

void ToOpsAuthenticated() {
//Set callback for connection status
to_ops_conn_.get()->SetClientAsyncCmdCb(boost::bind(
&OpServerImpl::toConnectCallbackProcess,
this, _1, _2, _3));
if (!redis_password_.empty()) {
//Call the AUTH command
to_ops_conn_.get()->RedisAsyncCommand(NULL,"AUTH %s",redis_password_.c_str());
} else {
to_ops_conn_.get()->RedisAsyncCommand(NULL,"PING");
}
}

void FromOpsAuthenticated() {
//Set callback for connection status
from_ops_conn_.get()->SetClientAsyncCmdCb(boost::bind(
&OpServerImpl::fromConnectCallbackProcess,
this, _1, _2, _3));
if (!redis_password_.empty()) {
//Call the AUTH command
from_ops_conn_.get()->RedisAsyncCommand(NULL,"AUTH %s",redis_password_.c_str());
} else {
from_ops_conn_.get()->RedisAsyncCommand(NULL,"PING");
}

}

void ToOpsConnUp() {
LOG(DEBUG, "ToOpsConnUp.. UP");
evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::ToOpsAuthenticated, this));
}

void FromOpsConnUpPostProcess() {
Expand All @@ -165,11 +223,7 @@ class OpServerProxy::OpServerImpl {

void FromOpsConnUp() {
LOG(DEBUG, "FromOpsConnUp.. UP");
// Update connection info
ConnectionState::GetInstance()->Update(ConnectionType::REDIS,
"From", ConnectionStatus::UP, from_ops_conn_->Endpoint(),
std::string());
evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::FromOpsConnUpPostProcess, this));
evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::FromOpsAuthenticated, this));
}

void RAC_ConnectProcess(RacConnType type) {
Expand Down Expand Up @@ -302,26 +356,32 @@ class OpServerProxy::OpServerImpl {
return from_ops_conn_;
}

const string get_redis_password() {
return redis_password_;
}

OpServerImpl(EventManager *evm, VizCollector *collector,
const std::string redis_uve_ip,
unsigned short redis_uve_port) :
const std::string redis_uve_ip,
unsigned short redis_uve_port,
const std::string redis_password) :
redis_uve_(redis_uve_ip, redis_uve_port),
evm_(evm),
collector_(collector),
started_(false),
analytics_cb_proc_fn(NULL),
processor_cb_proc_fn(NULL) {
to_ops_conn_.reset(new RedisAsyncConnection(evm_,
redis_uve_ip, redis_uve_port,
processor_cb_proc_fn(NULL),
redis_password_(redis_password) {
to_ops_conn_.reset(new RedisAsyncConnection(evm_,
redis_uve_ip, redis_uve_port,
boost::bind(&OpServerProxy::OpServerImpl::ToOpsConnUp, this),
boost::bind(&OpServerProxy::OpServerImpl::ToOpsConnDown, this)));
// Update connection info
ConnectionState::GetInstance()->Update(ConnectionType::REDIS,
"To", ConnectionStatus::INIT, to_ops_conn_->Endpoint(),
std::string());
to_ops_conn_.get()->RAC_Connect();
from_ops_conn_.reset(new RedisAsyncConnection(evm_,
redis_uve_ip, redis_uve_port,
from_ops_conn_.reset(new RedisAsyncConnection(evm_,
redis_uve_ip, redis_uve_port,
boost::bind(&OpServerProxy::OpServerImpl::FromOpsConnUp, this),
boost::bind(&OpServerProxy::OpServerImpl::FromOpsConnDown, this)));
// Update connection info
Expand All @@ -345,12 +405,15 @@ class OpServerProxy::OpServerImpl {
RedisAsyncConnection::ClientAsyncCmdCbFn analytics_cb_proc_fn;
RedisAsyncConnection::ClientAsyncCmdCbFn processor_cb_proc_fn;
tbb::mutex rac_mutex_;
const std::string redis_password_;
};

OpServerProxy::OpServerProxy(EventManager *evm, VizCollector *collector,
const std::string& redis_uve_ip,
unsigned short redis_uve_port) {
impl_ = new OpServerImpl(evm, collector, redis_uve_ip, redis_uve_port);
unsigned short redis_uve_port,
const std::string& redis_password) {
impl_ = new OpServerImpl(evm, collector, redis_uve_ip, redis_uve_port,
redis_password);
}

OpServerProxy::~OpServerProxy() {
Expand Down Expand Up @@ -398,8 +461,8 @@ OpServerProxy::UVEDelete(const std::string &type,
return ret;
}

bool
OpServerProxy::GetSeq(const string &source, const string &node_type,
bool
OpServerProxy::GetSeq(const string &source, const string &node_type,
const string &module, const string &instance_id,
std::map<std::string,int32_t> & seqReply) {

Expand All @@ -408,21 +471,21 @@ OpServerProxy::GetSeq(const string &source, const string &node_type,

if (!impl_->to_ops_conn()) return false;

return RedisProcessorExec::SyncGetSeq(impl_->redis_uve_.GetIp(),
impl_->redis_uve_.GetPort(), source, node_type, module,
instance_id, seqReply);
return RedisProcessorExec::SyncGetSeq(impl_->redis_uve_.GetIp(),
impl_->redis_uve_.GetPort(), impl_->get_redis_password(),
source, node_type, module, instance_id, seqReply);
}

bool
bool
OpServerProxy::DeleteUVEs(const string &source, const string &module,
const string &node_type, const string &instance_id) {

shared_ptr<RedisAsyncConnection> prac = impl_->to_ops_conn();
if (!(prac && prac->IsConnUp())) return false;

return RedisProcessorExec::SyncDeleteUVEs(impl_->redis_uve_.GetIp(),
impl_->redis_uve_.GetPort(), source, node_type,
module, instance_id);
return RedisProcessorExec::SyncDeleteUVEs(impl_->redis_uve_.GetIp(),
impl_->redis_uve_.GetPort(), impl_->get_redis_password(),source,
node_type, module, instance_id);
}

void
Expand Down
3 changes: 2 additions & 1 deletion src/analytics/OpServerProxy.h
Expand Up @@ -26,7 +26,8 @@ class OpServerProxy {

// To construct this interface, pass in the hostname and port for Redis
OpServerProxy(EventManager *evm, VizCollector *collector,
const std::string& redis_uve_ip, unsigned short redis_uve_port);
const std::string& redis_uve_ip, unsigned short redis_uve_port,
const std::string& redis_uve_password);
OpServerProxy() : impl_(NULL) { }
virtual ~OpServerProxy();

Expand Down
1 change: 1 addition & 0 deletions src/analytics/main.cc
Expand Up @@ -309,6 +309,7 @@ int main(int argc, char *argv[])
cassandra_ports,
string("127.0.0.1"),
options.redis_port(),
options.redis_password(),
options.syslog_port(),
options.sflow_port(),
options.ipfix_port(),
Expand Down
3 changes: 3 additions & 0 deletions src/analytics/options.cc
Expand Up @@ -136,6 +136,8 @@ void Options::Initialize(EventManager &evm,
"Port of Redis-uve server")
("REDIS.server", opt::value<string>()->default_value("127.0.0.1"),
"IP address of Redis Server")
("REDIS.password", opt::value<string>()->default_value(""),
"password for Redis Server")
;

config_file_options_.add(config);
Expand Down Expand Up @@ -265,4 +267,5 @@ void Options::Process(int argc, char *argv[],

GetOptValue<uint16_t>(var_map, redis_port_, "REDIS.port");
GetOptValue<string>(var_map, redis_server_, "REDIS.server");
GetOptValue<string>(var_map, redis_password_, "REDIS.password");
}
2 changes: 2 additions & 0 deletions src/analytics/options.h
Expand Up @@ -30,6 +30,7 @@ class Options {
const uint16_t discovery_port() const { return discovery_port_; }
const std::string redis_server() const { return redis_server_; }
const uint16_t redis_port() const { return redis_port_; }
const std::string redis_password() const { return redis_password_; }
const std::string hostname() const { return hostname_; }
const std::string host_ip() const { return host_ip_; }
const uint16_t http_server_port() const { return http_server_port_; }
Expand Down Expand Up @@ -84,6 +85,7 @@ class Options {
uint16_t discovery_port_;
std::string redis_server_;
uint16_t redis_port_;
std::string redis_password_;
std::string hostname_;
std::string host_ip_;
uint16_t http_server_port_;
Expand Down
2 changes: 0 additions & 2 deletions src/analytics/redis_connection.cc
Expand Up @@ -274,8 +274,6 @@ bool RedisAsyncConnection::SetClientAsyncCmdCb(ClientAsyncCmdCbFn cb_fn) {
if (it == fns_map.end()) {
assert(0);
} else {
if ((!it->second) || (it->second->client_async_cmd_cbfn_ != NULL))
assert(0);
it->second->client_async_cmd_cbfn_ = cb_fn;
}
return true;
Expand Down
57 changes: 51 additions & 6 deletions src/analytics/redis_processor_vizd.cc
Expand Up @@ -125,8 +125,9 @@ RedisProcessorExec::UVEDelete(RedisAsyncConnection * rac, RedisProcessorIf *rpi,


bool
RedisProcessorExec::SyncGetSeq(const std::string & redis_ip, unsigned short redis_port,
const std::string &source, const std::string &node_type,
RedisProcessorExec::SyncGetSeq(const std::string & redis_ip, unsigned short redis_port,
const std::string & redis_password, const std::string &source,
const std::string &node_type,
const std::string &module, const std::string &instance_id,
std::map<std::string,int32_t> & seqReply) {

Expand All @@ -137,7 +138,21 @@ RedisProcessorExec::SyncGetSeq(const std::string & redis_ip, unsigned short redi
<< ":" << module << ":" << instance_id);
redisFree(c);
return false;
}
}

//Authenticate the context with password
if (!redis_password.empty()) {
redisReply * reply = (redisReply *) redisCommand(c, "AUTH %s",
redis_password.c_str());
if (reply->type == REDIS_REPLY_ERROR) {
LOG(ERROR, "Authentication to redis error");
freeReplyObject(reply);
redisFree(c);
return false;
}
freeReplyObject(reply);
}


string lua_scr(reinterpret_cast<char *>(seqnum_lua), seqnum_lua_len);

Expand Down Expand Up @@ -175,7 +190,8 @@ RedisProcessorExec::SyncGetSeq(const std::string & redis_ip, unsigned short redi


bool
RedisProcessorExec::SyncDeleteUVEs(const std::string & redis_ip, unsigned short redis_port,
RedisProcessorExec::SyncDeleteUVEs(const std::string & redis_ip, unsigned short redis_port,
const std::string &redis_password,
const std::string &source, const std::string &node_type,
const std::string &module, const std::string &instance_id) {

Expand All @@ -186,8 +202,22 @@ RedisProcessorExec::SyncDeleteUVEs(const std::string & redis_ip, unsigned short
node_type << ":" << module << ":" << instance_id);
redisFree(c);
return false;
}
}

//Authenticate the context with password
if (!redis_password.empty()) {
std::string & local_password = const_cast<std::string &>(redis_password);
redisReply * reply = (redisReply *) redisCommand(c, "AUTH %s",
local_password.c_str());
if (reply->type == REDIS_REPLY_ERROR) {
LOG(ERROR, "Authentication to redis error");
freeReplyObject(reply);
redisFree(c);
return false;
}
freeReplyObject(reply);
}

string lua_scr(reinterpret_cast<char *>(delrequest_lua), delrequest_lua_len);

redisReply * reply = (redisReply *) redisCommand(c,
Expand Down Expand Up @@ -216,7 +246,8 @@ RedisProcessorExec::SyncDeleteUVEs(const std::string & redis_ip, unsigned short

bool
RedisProcessorExec::FlushUVEs(const std::string & redis_ip,
unsigned short redis_port) {
unsigned short redis_port,
const std::string & redis_password) {
redisContext *c = redisConnect(redis_ip.c_str(), redis_port);

if (c->err) {
Expand All @@ -225,6 +256,20 @@ RedisProcessorExec::FlushUVEs(const std::string & redis_ip,
return false;
}

//Authenticate the context with password
if (!redis_password.empty()) {
std::string & local_password = const_cast<std::string &>(redis_password);
redisReply * reply = (redisReply *) redisCommand(c, "AUTH %s",
local_password.c_str());
if (reply->type == REDIS_REPLY_ERROR) {
LOG(ERROR, "Authentication to redis error");
freeReplyObject(reply);
redisFree(c);
return false;
}
freeReplyObject(reply);
}

string lua_scr(reinterpret_cast<char *>(flushuves_lua), flushuves_lua_len);

redisReply * reply = (redisReply *) redisCommand(c, "EVAL %s 0 %s",
Expand Down
9 changes: 6 additions & 3 deletions src/analytics/redis_processor_vizd.h
Expand Up @@ -34,18 +34,21 @@ class RedisProcessorExec {
const std::string &key, int32_t seq);

static bool
SyncGetSeq(const std::string & redis_ip, unsigned short redis_port,
SyncGetSeq(const std::string & redis_ip, unsigned short redis_port,
const std::string & redis_password,
const std::string &source, const std::string &node_type,
const std::string &module, const std::string &instance_id,
std::map<std::string,int32_t> & seqReply);

static bool
SyncDeleteUVEs(const std::string & redis_ip, unsigned short redis_port,
SyncDeleteUVEs(const std::string & redis_ip, unsigned short redis_port,
const std::string & redis_password,
const std::string &source, const std::string &node_type,
const std::string &module, const std::string &instance_id);

static bool
FlushUVEs(const std::string & redis_ip, unsigned short redis_port);
FlushUVEs(const std::string & redis_ip, unsigned short redis_port,
const std::string & redis_password);
};

class RedisProcessorIf {
Expand Down

0 comments on commit 181a51d

Please sign in to comment.