Skip to content

Commit

Permalink
This fix provides the connection end points that
Browse files Browse the repository at this point in the history
the application is not able to reach.
Closes-Bug:#1557174
Change-Id: If2dad6d5feaf2c0e2ca15fbca774ac887785472e
  • Loading branch information
arvindvis committed May 25, 2016
1 parent 15f7d8f commit 0face39
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 24 deletions.
21 changes: 20 additions & 1 deletion src/analytics/main.cc
Expand Up @@ -41,6 +41,9 @@ using namespace boost::asio::ip;
namespace opt = boost::program_options;
using process::ConnectionStateManager;
using process::GetProcessStateCb;
using process::ConnectionType;
using process::ConnectionTypeName;
using process::g_process_info_constants;

static TaskTrigger *collector_info_trigger;
static Timer *collector_info_log_timer;
Expand Down Expand Up @@ -306,11 +309,27 @@ int main(int argc, char *argv[])
// 5. Database global
// 6. Kafka Pub
// 7. Database protobuf if enabled

std::vector<ConnectionTypeName> expected_connections = boost::assign::list_of
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::COLLECTOR)->second, ""))
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::REDIS_UVE)->second, "To"))
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::REDIS_UVE)->second, "From"))
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::DISCOVERY)->second,
g_vns_constants.COLLECTOR_DISCOVERY_SERVICE_NAME))
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::DATABASE)->second,
hostname+":Global"))
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::KAFKA_PUB)->second, kstr));
ConnectionStateManager<NodeStatusUVE, NodeStatus>::
GetInstance()->Init(*a_evm->io_service(),
hostname, module_id, instance_id,
boost::bind(&GetProcessStateCb, _1, _2, _3,
protobuf_server_enabled ? 7 : 6));
expected_connections));

LOG(INFO, "COLLECTOR analytics_data_ttl: " << options.analytics_data_ttl());
LOG(INFO, "COLLECTOR analytics_flow_ttl: " << options.analytics_flow_ttl());
Expand Down
78 changes: 74 additions & 4 deletions src/base/connection_info.cc
Expand Up @@ -129,13 +129,12 @@ std::vector<ConnectionInfo> ConnectionState::GetInfos() const {

void GetProcessStateCb(const std::vector<ConnectionInfo> &cinfos,
ProcessState::type &state, std::string &message,
size_t expected_connections) {
const std::vector<ConnectionTypeName> &expected_connections) {
// Determine if the number of connections is as expected.
size_t num_connections(cinfos.size());
if (num_connections != expected_connections) {
if (num_connections != expected_connections.size()) {
GetConnectionInfoMessage(cinfos, expected_connections, message);
state = ProcessState::NON_FUNCTIONAL;
message = "Number of connections:" + integerToString(num_connections) +
", Expected:" + integerToString(expected_connections);
return;
}
std::string cup(g_process_info_constants.ConnectionStatusNames.
Expand Down Expand Up @@ -169,4 +168,75 @@ void GetProcessStateCb(const std::vector<ConnectionInfo> &cinfos,
return;
}

// Custom find function to compare ConnectionInfo and
// expected connection structures
struct CompareConnections : public std::unary_function<ConnectionTypeName,
bool> {
ConnectionTypeName expected_connection_;
explicit CompareConnections(const ConnectionTypeName exp_connection) :
expected_connection_(exp_connection) {}
bool operator() (const ConnectionInfo cinfo) {
if (expected_connection_.first == cinfo.get_type() &&
expected_connection_.second == cinfo.get_name()) {
return true;
} else {
return false;
}
}
};

void GetConnectionInfoMessage(const std::vector<ConnectionInfo> &cinfos,
const std::vector<ConnectionTypeName> &expected_connections,
std::string &message) {
size_t num_connections(cinfos.size());
message = "Number of connections:" + integerToString(num_connections) +
", Expected:" + integerToString(expected_connections.size());
if (num_connections > expected_connections.size()) {
size_t i = 0;
message += " Extra: ";
// find the extra connection
for (std::vector<ConnectionInfo>::const_iterator it = cinfos.begin();
it != cinfos.end(); it++) {
const ConnectionInfo &cinfo(*it);
ConnectionTypeName con_info(cinfo.get_type(), cinfo.get_name());
std::vector<ConnectionTypeName>::const_iterator position;
position = std::find(expected_connections.begin(),
expected_connections.end(), con_info);
if (position == expected_connections.end()) {
i++;
message += con_info.first;
if (!con_info.second.empty()) {
message += ":" + con_info.second;
}
if (i != num_connections-expected_connections.size()) {
message += ",";
}
}
}
} else {
// find the missing connection
size_t i = 0;
message += " Missing: ";
for (std::vector<ConnectionTypeName>::const_iterator it =
expected_connections.begin(); it != expected_connections.end();
it++) {
std::vector<ConnectionInfo>::const_iterator position;
position = std::find_if(cinfos.begin(), cinfos.end(),
CompareConnections(*it));
// If connection is not found in cinfo, its a missing
// connection
if (position == cinfos.end()) {
i++;
message += it->first;
if (!it->second.empty()) {
message += ":" + it->second;
}
if (i != expected_connections.size() - cinfos.size()) {
message += ",";
}
}
}
}
}

} // namespace process
6 changes: 5 additions & 1 deletion src/base/connection_info.h
Expand Up @@ -27,9 +27,13 @@ typedef boost::asio::ip::tcp::endpoint Endpoint;
typedef boost::function<void (const std::vector<ConnectionInfo> &,
ProcessState::type &, std::string &)> ProcessStateFn;

typedef std::pair<std::string, std::string> ConnectionTypeName;
void GetProcessStateCb(const std::vector<ConnectionInfo> &cinfos,
ProcessState::type &state, std::string &message,
size_t expected_connections);
const std::vector<ConnectionTypeName> &expected_connections);
void GetConnectionInfoMessage(const std::vector<ConnectionInfo> &cinfos,
const std::vector<ConnectionTypeName> &expected_connections,
std::string &message);

// ConnectionState
class ConnectionState {
Expand Down
42 changes: 32 additions & 10 deletions src/base/test/connection_info_test.cc
Expand Up @@ -21,14 +21,18 @@ using process::ConnectionStatus;
using process::ConnectionType;
using process::g_process_info_constants;
using process::GetProcessStateCb;
using process::ConnectionTypeName;

class ConnectionInfoTest : public ::testing::Test {
protected:
static void SetUpTestCase() {
std::vector<ConnectionTypeName> expected_connections = boost::assign::list_of
(ConnectionTypeName("Test", "Test1"))
(ConnectionTypeName("Test", "Test2"));
ConnectionStateManager<NodeStatusTestUVE, NodeStatusTest>::
GetInstance()->Init(*evm_.io_service(), "Test",
"ConnectionInfoTest", "0", boost::bind(
&process::GetProcessStateCb, _1, _2, _3, 2));
&process::GetProcessStateCb, _1, _2, _3, expected_connections));
}
static void TearDownTestCase() {
ConnectionStateManager<NodeStatusTestUVE, NodeStatusTest>::
Expand Down Expand Up @@ -121,23 +125,41 @@ TEST_F(ConnectionInfoTest, Callback) {
UpdateConnInfo("Test1", ConnectionStatus::UP, "Test1 UP", &vcinfo);
ProcessState::type pstate;
std::string message1;
GetProcessStateCb(vcinfo, pstate, message1, 1);
std::vector<ConnectionTypeName> expected_connections = boost::assign::list_of
(ConnectionTypeName("Test", "Test1"));
// Expected connection and conn_info are same
GetProcessStateCb(vcinfo, pstate, message1, expected_connections);
EXPECT_EQ(ProcessState::FUNCTIONAL, pstate);
EXPECT_TRUE(message1.empty());
std::string message2;
GetProcessStateCb(vcinfo, pstate, message2, 2);
expected_connections.push_back(ConnectionTypeName("Test","Test2"));
// Expected connection more than conn_info
GetProcessStateCb(vcinfo, pstate, message2, expected_connections);
EXPECT_EQ(ProcessState::NON_FUNCTIONAL, pstate);
EXPECT_EQ("Number of connections:1, Expected:2", message2);
UpdateConnInfo("Test2", ConnectionStatus::DOWN, "Test2 DOWN", &vcinfo);
EXPECT_EQ("Number of connections:1, Expected:2 Missing: Test:Test2", message2);
// 2 expected connections are more than conn_info
expected_connections.push_back(ConnectionTypeName("Test","Test3"));
std::string message3;
GetProcessStateCb(vcinfo, pstate, message3, 2);
GetProcessStateCb(vcinfo, pstate, message3, expected_connections);
EXPECT_EQ(ProcessState::NON_FUNCTIONAL, pstate);
EXPECT_EQ("Test:Test2 connection down", message3);
UpdateConnInfo("Test3", ConnectionStatus::DOWN, "Test3 DOWN", &vcinfo);
EXPECT_EQ("Number of connections:1, Expected:3 Missing: Test:Test2,Test:Test3", message3);
expected_connections.pop_back();
UpdateConnInfo("Test2", ConnectionStatus::DOWN, "Test2 DOWN", &vcinfo);
std::string message4;
GetProcessStateCb(vcinfo, pstate, message4, 3);
GetProcessStateCb(vcinfo, pstate, message4, expected_connections);
EXPECT_EQ(ProcessState::NON_FUNCTIONAL, pstate);
EXPECT_EQ("Test:Test2 connection down", message4);
UpdateConnInfo("Test3", ConnectionStatus::DOWN, "Test3 DOWN", &vcinfo);
std::string message5;
// More connection in conn_info than expected_connections
GetProcessStateCb(vcinfo, pstate, message5, expected_connections);
EXPECT_EQ(ProcessState::NON_FUNCTIONAL, pstate);
EXPECT_EQ("Number of connections:3, Expected:2 Extra: Test:Test3", message5);
std::string message6;
expected_connections.push_back(ConnectionTypeName("Test","Test3"));
GetProcessStateCb(vcinfo, pstate, message6, expected_connections);
EXPECT_EQ(ProcessState::NON_FUNCTIONAL, pstate);
EXPECT_EQ("Test:Test2, Test:Test3 connection down", message4);
EXPECT_EQ("Test:Test2, Test:Test3 connection down", message6);
}

int main(int argc, char *argv[]) {
Expand Down
22 changes: 20 additions & 2 deletions src/control-node/main.cc
Expand Up @@ -58,6 +58,9 @@ using process::ConnectionInfo;
using process::ConnectionStateManager;
using process::GetProcessStateCb;
using process::ProcessState;
using process::ConnectionType;
using process::ConnectionTypeName;
using process::g_process_info_constants;

static EventManager evm;

Expand Down Expand Up @@ -479,7 +482,7 @@ static void ControlNodeGetProcessStateCb(const BgpServer *bgp_server,
const IFMapManager *ifmap_manager,
const std::vector<ConnectionInfo> &cinfos,
ProcessState::type &state, std::string &message,
size_t expected_connections) {
std::vector<ConnectionTypeName> expected_connections) {
GetProcessStateCb(cinfos, state, message, expected_connections);
if (state == ProcessState::NON_FUNCTIONAL)
return;
Expand Down Expand Up @@ -657,11 +660,26 @@ int main(int argc, char *argv[]) {
// 3. Discovery Server subscribe Collector
// 4. Discovery Server subscribe IfmapServer
// 5. IFMap Server (irond)
std::vector<ConnectionTypeName> expected_connections = boost::assign::list_of
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::DISCOVERY)->second,
g_vns_constants.COLLECTOR_DISCOVERY_SERVICE_NAME))
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::COLLECTOR)->second, ""))
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::DISCOVERY)->second,
g_vns_constants.IFMAP_SERVER_DISCOVERY_SERVICE_NAME))
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::IFMAP)->second, "IFMapServer"))
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::DISCOVERY)->second,
g_vns_constants.XMPP_SERVER_DISCOVERY_SERVICE_NAME));
ConnectionStateManager<NodeStatusUVE, NodeStatus>::GetInstance()->Init(
*evm.io_service(), options.hostname(),
module_name, g_vns_constants.INSTANCE_ID_DEFAULT,
boost::bind(&ControlNodeGetProcessStateCb,
bgp_server.get(), ifmap_manager, _1, _2, _3, 5));
bgp_server.get(), ifmap_manager, _1, _2, _3,
expected_connections));

// Parse discovery server configuration.
DiscoveryServiceClient *ds_client = NULL;
Expand Down
26 changes: 20 additions & 6 deletions src/query_engine/qed.cc
Expand Up @@ -39,6 +39,9 @@ using namespace boost::asio;
using namespace std;
using process::ConnectionStateManager;
using process::GetProcessStateCb;
using process::ConnectionType;
using process::ConnectionTypeName;
using process::g_process_info_constants;
// This is to force qed to wait for a gdbattach
// before proceeding.
// It will make it easier to debug qed during systest
Expand Down Expand Up @@ -222,18 +225,29 @@ main(int argc, char *argv[]) {
// 2. Redis
// 3. Cassandra
// 4. Discovery (if collector list not configured)
int num_expected_connections = 4;
bool use_collector_list = false;
if (options.collectors_configured() || !csf) {
num_expected_connections = 3;
use_collector_list = true;
std::vector<ConnectionTypeName> expected_connections =
boost::assign::list_of
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::DATABASE)->second, ""))
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::REDIS_QUERY)->second, "Query"))
(ConnectionTypeName(g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::COLLECTOR)->second, ""));
bool use_collector_list = true;
if (!options.collectors_configured() && csf) {
// Use discovery to connect to collector
use_collector_list = false;
expected_connections.push_back(ConnectionTypeName(
g_process_info_constants.ConnectionTypeNames.find(
ConnectionType::DISCOVERY)->second,
g_vns_constants.COLLECTOR_DISCOVERY_SERVICE_NAME));
}
ConnectionStateManager<NodeStatusUVE, NodeStatus>::
GetInstance()->Init(*evm.io_service(),
options.hostname(), module_name,
instance_id,
boost::bind(&GetProcessStateCb, _1, _2, _3,
num_expected_connections));
expected_connections));
Sandesh::set_send_rate_limit(options.sandesh_send_rate_limit());
bool success;
// subscribe to the collector service with discovery only if the
Expand Down

0 comments on commit 0face39

Please sign in to comment.