Skip to content

Commit

Permalink
These are the changes in opserver to track redis connection status
Browse files Browse the repository at this point in the history
UVEServer is enhanced to catch exceptions when a redis failure happens, and to remove the affected redis connections.
The connections are recreated when needed.

Also, when a redis connection goes up or down, we update the connection status in the analytics node UVE.

The "Redis" connection type is split into "Redis-UVE" and "Redis-Query".

Change-Id: I6a939f7092e810f7ec3bf9f2f3012239c97dd18d
Partial-Bug: 1459973
  • Loading branch information
anishmehta committed Jul 27, 2015
1 parent f797920 commit 6ab1986
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 88 deletions.
12 changes: 6 additions & 6 deletions src/analytics/OpServerProxy.cc
Expand Up @@ -231,7 +231,7 @@ class OpServerProxy::OpServerImpl {
tbb::mutex::scoped_lock lock(rac_mutex_);
redis_uve_.RedisStatusUpdate(RAC_UP);
}
ConnectionState::GetInstance()->Update(ConnectionType::REDIS,
ConnectionState::GetInstance()->Update(ConnectionType::REDIS_UVE,
"To", ConnectionStatus::UP, to_ops_conn_->Endpoint(),
std::string());
evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::ToOpsConnUpPostProcess, this));
Expand All @@ -248,7 +248,7 @@ class OpServerProxy::OpServerImpl {
//Handle the AUTH callback
redisReply reply = *reinterpret_cast<redisReply*>(r);
if (reply.type != REDIS_REPLY_ERROR) {
ConnectionState::GetInstance()->Update(ConnectionType::REDIS,
ConnectionState::GetInstance()->Update(ConnectionType::REDIS_UVE,
"From", ConnectionStatus::UP, from_ops_conn_->Endpoint(),
std::string());
evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::FromOpsConnUpPostProcess, this));
Expand Down Expand Up @@ -323,7 +323,7 @@ class OpServerProxy::OpServerImpl {
redis_up_ = false;

// Update connection info
ConnectionState::GetInstance()->Update(ConnectionType::REDIS,
ConnectionState::GetInstance()->Update(ConnectionType::REDIS_UVE,
"To", ConnectionStatus::DOWN, to_ops_conn_->Endpoint(),
std::string());
evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::RAC_ConnectProcess,
Expand All @@ -333,7 +333,7 @@ class OpServerProxy::OpServerImpl {
void FromOpsConnDown() {
LOG(DEBUG, "FromOpsConnDown.. DOWN.. Reconnect..");
// Update connection info
ConnectionState::GetInstance()->Update(ConnectionType::REDIS,
ConnectionState::GetInstance()->Update(ConnectionType::REDIS_UVE,
"From", ConnectionStatus::DOWN, from_ops_conn_->Endpoint(),
std::string());
evm_->io_service()->post(boost::bind(&OpServerProxy::OpServerImpl::RAC_ConnectProcess,
Expand Down Expand Up @@ -488,7 +488,7 @@ class OpServerProxy::OpServerImpl {
boost::bind(&OpServerProxy::OpServerImpl::ToOpsConnUp, this),
boost::bind(&OpServerProxy::OpServerImpl::ToOpsConnDown, this)));
// Update connection
ConnectionState::GetInstance()->Update(ConnectionType::REDIS,
ConnectionState::GetInstance()->Update(ConnectionType::REDIS_UVE,
"To", ConnectionStatus::INIT, to_ops_conn_->Endpoint(),
std::string());
to_ops_conn_.get()->RAC_Connect();
Expand All @@ -497,7 +497,7 @@ class OpServerProxy::OpServerImpl {
boost::bind(&OpServerProxy::OpServerImpl::FromOpsConnUp, this),
boost::bind(&OpServerProxy::OpServerImpl::FromOpsConnDown, this)));
// Update connection info
ConnectionState::GetInstance()->Update(ConnectionType::REDIS,
ConnectionState::GetInstance()->Update(ConnectionType::REDIS_UVE,
"From", ConnectionStatus::INIT, from_ops_conn_->Endpoint(),
std::string());
from_ops_conn_.get()->RAC_Connect();
Expand Down
8 changes: 5 additions & 3 deletions src/base/sandesh/process_info.sandesh
Expand Up @@ -10,11 +10,12 @@ enum ConnectionType {
XMPP,
COLLECTOR,
DATABASE,
REDIS,
REDIS_QUERY,
ZOOKEEPER,
DISCOVERY,
APISERVER,
TOR,
REDIS_UVE,
}

const map<ConnectionType, string> ConnectionTypeNames = {
Expand All @@ -23,11 +24,12 @@ const map<ConnectionType, string> ConnectionTypeNames = {
ConnectionType.XMPP : "XMPP",
ConnectionType.COLLECTOR : "Collector",
ConnectionType.DATABASE : "Database",
ConnectionType.REDIS : "Redis",
ConnectionType.REDIS_QUERY : "Redis-Query",
ConnectionType.ZOOKEEPER : "Zookeeper",
ConnectionType.DISCOVERY : "Discovery",
ConnectionType.APISERVER : "ApiServer",
ConnectionType.TOR : "ToR"
ConnectionType.TOR : "ToR",
ConnectionType.REDIS_UVE: "Redis-UVE",
}

enum ConnectionStatus {
Expand Down
32 changes: 16 additions & 16 deletions src/opserver/opserver.py
Expand Up @@ -212,15 +212,15 @@ def redis_query_result(host, port, redis_password, qid):
status = redis_query_status(host, port, redis_password, qid)
except redis.exceptions.ConnectionError:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.DOWN,
message = 'Query[%s] result : Connection Error' % (qid),
server_addrs = ['%s:%d' % (host, port)])
yield bottle.HTTPError(_ERRORS[errno.EIO],
'Failure in connection to the query DB')
except Exception as e:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.DOWN,
message = 'Query[%s] result : Exception: %s' % (qid, str(e)),
server_addrs = ['%s:%d' % (host, port)])
Expand All @@ -239,7 +239,7 @@ def redis_query_result(host, port, redis_password, qid):
else:
yield {}
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
message = None,
status = ConnectionStatus.UP,
server_addrs = ['%s:%d' % (host, port)],
Expand Down Expand Up @@ -309,7 +309,7 @@ def redis_publish(self, msg_type, destination, msg):
redis_inst.publish('analytics', redis_msg)
except redis.exceptions.ConnectionError:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_UVE,
name = 'UVE', status = ConnectionStatus.DOWN,
message = 'Connection Error',
server_addrs = ['%s:%d' % (redis_server[0], \
Expand Down Expand Up @@ -946,7 +946,7 @@ def _query_status(self, request, qid):
qid=qid)
except redis.exceptions.ConnectionError:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.DOWN,
message = 'Query[%s] status : Connection Error' % (qid),
server_addrs = ['%s:%s' % (redis_query_ip, \
Expand All @@ -955,7 +955,7 @@ def _query_status(self, request, qid):
'Failure in connection to the query DB')
except Exception as e:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.DOWN,
message = 'Query[%s] status : Exception %s' % (qid, str(e)),
server_addrs = ['%s:%s' % (redis_query_ip, \
Expand Down Expand Up @@ -992,7 +992,7 @@ def _query_chunk(self, request, qid, chunk_id):
done = True
except redis.exceptions.ConnectionError:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.DOWN,
message = 'Query [%s] chunk #%d : Connection Error' % \
(qid, chunk_id),
Expand All @@ -1002,7 +1002,7 @@ def _query_chunk(self, request, qid, chunk_id):
'Failure in connection to the query DB')
except Exception as e:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.DOWN,
message = 'Query [%s] chunk #%d : Exception %s' % \
(qid, chunk_id, str(e)),
Expand Down Expand Up @@ -1073,7 +1073,7 @@ def _query(self, request):
qid, request.json)
if prg is None:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.DOWN,
message = 'Query[%s] Query Engine not responding' % qid,
server_addrs = ['127.0.0.1' + ':' +
Expand All @@ -1085,7 +1085,7 @@ def _query(self, request):

except redis.exceptions.ConnectionError:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.DOWN,
message = 'Query[%s] Connection Error' % (qid),
server_addrs = ['127.0.0.1' + ':' +
Expand All @@ -1094,7 +1094,7 @@ def _query(self, request):
'Failure in connection to the query DB')
except Exception as e:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.DOWN,
message = 'Query[%s] Exception: %s' % (qid, str(e)),
server_addrs = ['127.0.0.1' + ':' +
Expand Down Expand Up @@ -1186,7 +1186,7 @@ def _sync_query(self, request, qid):

except redis.exceptions.ConnectionError:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.DOWN,
message = 'Sync Query[%s] Connection Error' % qid,
server_addrs = ['127.0.0.1' + ':' +
Expand All @@ -1195,7 +1195,7 @@ def _sync_query(self, request, qid):
'Failure in connection to the query DB')
except Exception as e:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.DOWN,
message = 'Sync Query[%s] Exception: %s' % (qid, str(e)),
server_addrs = ['127.0.0.1' + ':' +
Expand All @@ -1205,7 +1205,7 @@ def _sync_query(self, request, qid):
'Error: %s' % e)
else:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.UP,
message = None,
server_addrs = ['127.0.0.1' + ':' +
Expand Down Expand Up @@ -1279,7 +1279,7 @@ def show_queries(self):
queries['error_queries'] = error_queries_info
except redis.exceptions.ConnectionError:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.DOWN,
message = 'Show queries : Connection Error',
server_addrs = ['127.0.0.1' + ':' +
Expand All @@ -1288,7 +1288,7 @@ def show_queries(self):
'Failure in connection to the query DB')
except Exception as err:
# Update connection info
ConnectionState.update(conn_type = ConnectionType.REDIS,
ConnectionState.update(conn_type = ConnectionType.REDIS_QUERY,
name = 'Query', status = ConnectionStatus.DOWN,
message = 'Show queries : Exception %s' % str(err),
server_addrs = ['127.0.0.1' + ':' +
Expand Down
2 changes: 1 addition & 1 deletion src/opserver/test/utils/analytics_fixture.py
Expand Up @@ -308,7 +308,7 @@ def start(self):
str(self.analytics_fixture.cassandra_port),
'--http_server_port', str(self.http_port),
'--log_file', self._log_file,
'--log_level', "SYS_INFO",
'--log_level', "SYS_DEBUG",
'--rest_api_port', str(self.listen_port)]
if self.analytics_fixture.redis_uves[0].password:
args.append('--redis_password')
Expand Down

0 comments on commit 6ab1986

Please sign in to comment.