Skip to content

Commit

Permalink
C++ sandesh client library changes to accept more than 2 collectors
Browse files Browse the repository at this point in the history
Presently, the contrail services may be either provisoned with the
collector list or it gets the collector list from discovery service. In
either case, the sandesh client library doesn't accept more than 2
collectors.
With the removal of discovery service, all the contrail services that
connects to collector would be provisioned with the collector list.
Therefore, it is necessary that the sandesh client library accepts
collector list without any limitation.
With this patch, InitGenerator() can accept collector list > 2 and there
is the notion of primary and secondary collector has been removed. The
sandesh client would connect to the first collector in the list and upon
connection failure/closure would connect to the next collector in the list
and so on. It is expected that the sandesh clients would randomize the
collector list before passing it to InitGenerator() and
ReConfigCollectors().
Renamed EvDiscUpdate event to EvCollectorUpdate. EvCollectorUpdate would
be enqueued upon receiving either discovery update or the
ReConfigCollectors() [called if the process receives SIGHUP to indicate
change in the collector list]
This patch doesn't remove the code that subscribes for the collector
service with discovery as this would break the functionality without the
provisioning/controller changes.

Change-Id: Ib84df4a3103e5483a9f3c7cb8ba5f3d034651058
Closes-Bug: #1641846
  • Loading branch information
Sundaresan Rajangam committed Nov 17, 2016
1 parent f9c7f96 commit 77724ee
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 244 deletions.
3 changes: 3 additions & 0 deletions library/common/sandesh_uve.sandesh
Expand Up @@ -226,6 +226,9 @@ struct SandeshClientInfo {
4: optional u32 http_port
5: optional u64 start_time
6: optional string collector_name
11: optional string collector_ip
12: optional list<string> collector_list
// primary and secondary fields are deprecated
7: optional string primary
8: optional string secondary
9: optional io.SocketIOStats rx_socket_stats
Expand Down
2 changes: 2 additions & 0 deletions library/cpp/SConscript
Expand Up @@ -103,6 +103,7 @@ libsandesh = env.Library(target = 'sandesh',
'sandesh_uve.cc',
'sandesh_message_builder.cc',
'sandesh_statistics.cc',
'sandesh_util.cc',
'protocol/TXMLProtocol.cpp',
'transport/TFDTransport.cpp',
'transport/TSimpleFileTransport.cpp',
Expand All @@ -128,6 +129,7 @@ env.Install(env['TOP_INCLUDE'] + '/sandesh', 'sandesh_connection.h')
env.Install(env['TOP_INCLUDE'] + '/sandesh', 'sandesh_statistics.h')
env.Install(env['TOP_INCLUDE'] + '/sandesh', 'request_pipeline.h')
env.Install(env['TOP_INCLUDE'] + '/sandesh', 'sandesh_message_builder.h')
env.Install(env['TOP_INCLUDE'] + '/sandesh', 'sandesh_util.h')
env.Install(env['TOP_INCLUDE'] + '/sandesh', SandeshGenHdrs)
env.Install(env['TOP_INCLUDE'] + '/sandesh', SandeshTraceGenHdrs)
env.Install(env['TOP_INCLUDE'] + '/sandesh/protocol', 'protocol/TProtocol.h')
Expand Down
55 changes: 13 additions & 42 deletions library/cpp/sandesh.cc
Expand Up @@ -9,7 +9,7 @@
//

#include <boost/bind.hpp>
#include <boost/tokenizer.hpp>
#include <boost/foreach.hpp>
#include <base/logging.h>
#include <base/parse_object.h>
#include <base/queue_task.h>
Expand Down Expand Up @@ -102,7 +102,8 @@ void Sandesh::InitClient(EventManager *evm, Endpoint server, bool periodicuve) {
connect_to_collector_);
// Create and initialize the client
assert(client_ == NULL);
client_ = new SandeshClient(evm, server, Endpoint(), 0, periodicuve);
std::vector<Endpoint> collector_endpoints = boost::assign::list_of(server);
client_ = new SandeshClient(evm, collector_endpoints, 0, periodicuve);
client_->Initiate();
}

Expand Down Expand Up @@ -202,59 +203,29 @@ bool Sandesh::ConnectToCollector(const std::string &collector_ip,
return true;
}

void Sandesh::ReConfigCollectors(std::vector<std::string> list) {
void Sandesh::ReConfigCollectors(const std::vector<std::string>& collector_list) {
if (client_) {
client_->ReConfigCollectors(list);
client_->ReConfigCollectors(collector_list);
}
}


static bool make_endpoint(TcpServer::Endpoint& ep,const std::string& epstr) {

typedef boost::tokenizer<boost::char_separator<char> > tokenizer;
boost::char_separator<char> sep(":");

tokenizer tokens(epstr, sep);
tokenizer::iterator it = tokens.begin();
std::string sip(*it);
++it;
std::string sport(*it);
int port;
stringToInteger(sport, port);
boost::system::error_code ec;
address addr = address::from_string(sip, ec);
if (ec) {
SANDESH_LOG(ERROR, __func__ << ": Invalid collector address: " <<
sip << " Error: " << ec);
return false;
}
ep = TcpServer::Endpoint(addr, port);
return true;
}

bool Sandesh::InitClient(EventManager *evm,
const std::vector<std::string> &collectors,
CollectorSubFn csf) {
connect_to_collector_ = true;
SANDESH_LOG(INFO, "SANDESH: CONNECT TO COLLECTOR: " <<
connect_to_collector_);

Endpoint primary = Endpoint();
Endpoint secondary = Endpoint();

if (collectors.size()!=0) {
if (!make_endpoint(primary, collectors[0])) {
std::vector<Endpoint> collector_endpoints;
BOOST_FOREACH(const std::string &collector, collectors) {
Endpoint ep;
if (!MakeEndpoint(&ep, collector)) {
SANDESH_LOG(ERROR, __func__ << ": Invalid collector address: " <<
collector);
return false;
}
if (collectors.size()>1) {
if (!make_endpoint(secondary, collectors[1])) {
return false;
}
}
collector_endpoints.push_back(ep);
}

client_ = new SandeshClient(evm,
primary, secondary, csf, true);
client_ = new SandeshClient(evm, collector_endpoints, csf, true);
client_->Initiate();
return true;
}
Expand Down
3 changes: 2 additions & 1 deletion library/cpp/sandesh.h
Expand Up @@ -90,6 +90,7 @@
#include <base/queue_task.h>
#include <base/string_util.h>
#include <base/time_util.h>
#include <sandesh/sandesh_util.h>
#include <sandesh/sandesh_types.h>
#include <sandesh/protocol/TProtocol.h>
#include <sandesh/transport/TBufferTransports.h>
Expand Down Expand Up @@ -186,7 +187,7 @@ class Sandesh {
SandeshContext *client_context = NULL);
static bool ConnectToCollector(const std::string &collector_ip,
int collector_port, bool periodicuve = false);
static void ReConfigCollectors(std::vector<std::string> list);
static void ReConfigCollectors(const std::vector<std::string>& collector_list);
static void Uninit();

// Disk Usage
Expand Down
97 changes: 36 additions & 61 deletions library/cpp/sandesh_client.cc
Expand Up @@ -10,6 +10,7 @@

#include <boost/bind.hpp>
#include <boost/assign.hpp>
#include <boost/foreach.hpp>

#include <base/task_annotations.h>
#include <io/event_manager.h>
Expand All @@ -30,6 +31,7 @@
#include <sandesh/common/vns_constants.h>
#include "sandesh_client.h"
#include "sandesh_uve.h"
#include "sandesh_util.h"

using boost::asio::ip::address;
using namespace boost::asio;
Expand All @@ -53,21 +55,18 @@ const std::vector<Sandesh::QueueWaterMarkInfo>
(2500, SandeshLevel::INVALID, false, false);

SandeshClient::SandeshClient(EventManager *evm,
Endpoint primary, Endpoint secondary, Sandesh::CollectorSubFn csf, bool periodicuve)
const std::vector<Endpoint> &collectors, Sandesh::CollectorSubFn csf,
bool periodicuve)
: TcpServer(evm),
sm_task_instance_(kSMTaskInstance),
sm_task_id_(TaskScheduler::GetInstance()->GetTaskId(kSMTask)),
session_task_instance_(kSessionTaskInstance),
session_writer_task_id_(TaskScheduler::GetInstance()->GetTaskId(kSessionWriterTask)),
session_reader_task_id_(TaskScheduler::GetInstance()->GetTaskId(kSessionReaderTask)),
primary_(primary),
secondary_(secondary),
collectors_(collectors),
csf_(csf),
sm_(SandeshClientSM::CreateClientSM(evm, this, sm_task_instance_, sm_task_id_, periodicuve)),
session_wm_info_(kSessionWaterMarkInfo) {
SANDESH_LOG(INFO,"primary " << primary_);
SANDESH_LOG(INFO,"secondary " << secondary_);

// Set task policy for exclusion between state machine and session tasks since
// session delete happens in state machine task
if (!task_policy_set_) {
Expand All @@ -82,69 +81,40 @@ SandeshClient::SandeshClient(EventManager *evm,
SandeshClient::~SandeshClient() {}

void SandeshClient::CollectorHandler(std::vector<DSResponse> resp) {

Endpoint primary = Endpoint();
Endpoint secondary = Endpoint();

std::vector<Endpoint> collectors;
if (resp.size()>=1) {
primary = resp[0].ep;
SANDESH_LOG(INFO, "DiscUpdate for primary " << primary);
collectors.push_back(resp[0].ep);
SANDESH_LOG(INFO, "Discovery update for collector #1 " << resp[0].ep);
}
if (resp.size()>=2) {
secondary = resp[1].ep;
SANDESH_LOG(INFO, "DiscUpdate for secondary " << secondary);
collectors.push_back(resp[1].ep);
SANDESH_LOG(INFO, "Discovery update for collector #2 " << resp[1].ep);
}
if (primary!=Endpoint()) {
sm_->SetCandidates(primary, secondary);
if (collectors.size()) {
sm_->SetCollectors(collectors);
}
}

void SandeshClient::ReConfigCollectors(std::vector<std::string> collector_list) {

Endpoint primary = Endpoint();
Endpoint secondary = Endpoint();

std::vector<std::string> ep;
uint32_t port;
address addr;
boost::system::error_code ec;
if (collector_list.size()>=1) {
boost::split(ep, collector_list[0], boost::is_any_of(":"));
void SandeshClient::ReConfigCollectors(
const std::vector<std::string>& collector_list) {
std::vector<Endpoint> collector_endpoints;

addr = address::from_string(ep[0], ec);
if (ec) {
SANDESH_LOG(ERROR, "ReConfig for primary failed Error: " << ec);
BOOST_FOREACH(const std::string& collector, collector_list) {
Endpoint ep;
if (!MakeEndpoint(&ep, collector)) {
SANDESH_LOG(ERROR, __func__ << ": Invalid collector address: " <<
collector);
return;
}
primary.address(addr);
port = strtoul(ep[1].c_str(), NULL, 0);
primary.port(port);

SANDESH_LOG(INFO, "ReConfig for primary " << primary);
}
if (collector_list.size()>=2) {
boost::split(ep, collector_list[1], boost::is_any_of(":"));

addr = address::from_string(ep[0], ec);
if (ec) {
SANDESH_LOG(ERROR, "ReConfig for secondary failed Error: " << ec);
return;
}
secondary.address(addr);
port = strtoul(ep[1].c_str(), NULL, 0);
secondary.port(port);

SANDESH_LOG(INFO, "ReConfig for secondary " << secondary);
}
if (primary!=Endpoint()) {
sm_->SetCandidates(primary, secondary);
collector_endpoints.push_back(ep);
}
sm_->SetCollectors(collector_endpoints);
}

void SandeshClient::Initiate() {
sm_->SetAdminState(false);
if (primary_ != Endpoint())
sm_->SetCandidates(primary_,secondary_);
if (collectors_.size())
sm_->SetCollectors(collectors_);
// subscribe for the collector service only if the collector list
// is not provided by the generator.
else if (csf_ != 0) {
Expand Down Expand Up @@ -304,7 +274,8 @@ static uint64_t client_start_time;

void SandeshClient::SendUVE(int count,
const string & stateName, const string & server,
Endpoint primary, Endpoint secondary) {
const Endpoint & server_ip,
const std::vector<TcpServer::Endpoint> & collector_eps) {
ModuleClientState mcs;
mcs.set_name(Sandesh::source() + ":" + Sandesh::node_type() +
":" + Sandesh::module() + ":" + Sandesh::instance_id());
Expand All @@ -313,18 +284,22 @@ void SandeshClient::SendUVE(int count,
client_start_time = UTCTimestampUsec();
client_start = true;
}
std::ostringstream pri,sec;
pri << primary;
sec << secondary;

sci.set_start_time(client_start_time);
sci.set_successful_connections(count);
sci.set_pid(getpid());
sci.set_http_port(Sandesh::http_port());
sci.set_status(stateName);
sci.set_collector_name(server);
sci.set_primary(pri.str());
sci.set_secondary(sec.str());
std::ostringstream collector_ip;
collector_ip << server_ip;
sci.set_collector_ip(collector_ip.str());
std::vector<std::string> collectors;
BOOST_FOREACH(const TcpServer::Endpoint& ep, collector_eps) {
std::ostringstream collector_ip;
collector_ip << ep;
collectors.push_back(collector_ip.str());
}
sci.set_collector_list(collectors);
// Sandesh client socket statistics
SocketIOStats rx_stats;
GetRxSocketStats(rx_stats);
Expand Down
9 changes: 4 additions & 5 deletions library/cpp/sandesh_client.h
Expand Up @@ -38,8 +38,7 @@ class SandeshHeader;
class SandeshClient : public TcpServer, public SandeshClientSM::Mgr {
public:

SandeshClient(EventManager *evm, Endpoint primary,
Endpoint secondary = Endpoint(),
SandeshClient(EventManager *evm, const std::vector<Endpoint> &collectors,
Sandesh::CollectorSubFn csf = 0,
bool periodicuve = false);

Expand All @@ -62,7 +61,7 @@ class SandeshClient : public TcpServer, public SandeshClientSM::Mgr {
const uint32_t header_offset);
void SendUVE(int count,
const std::string & stateName, const std::string & server,
Endpoint primary, Endpoint secondary);
const Endpoint & server_ip, const std::vector<Endpoint> & collector_eps);

bool SendSandesh(Sandesh *snh);

Expand Down Expand Up @@ -91,7 +90,7 @@ class SandeshClient : public TcpServer, public SandeshClientSM::Mgr {
void ResetSessionWaterMarkInfo();
void GetSessionWaterMarkInfo(
std::vector<Sandesh::QueueWaterMarkInfo> &scwm_info) const;
void ReConfigCollectors(std::vector<std::string>);
void ReConfigCollectors(const std::vector<std::string>&);

friend class CollectorInfoRequest;
protected:
Expand All @@ -110,7 +109,7 @@ class SandeshClient : public TcpServer, public SandeshClientSM::Mgr {
int session_task_instance_;
int session_writer_task_id_;
int session_reader_task_id_;
Endpoint primary_, secondary_;
std::vector<Endpoint> collectors_;
Sandesh::CollectorSubFn csf_;
boost::scoped_ptr<SandeshClientSM> sm_;
std::vector<Sandesh::QueueWaterMarkInfo> session_wm_info_;
Expand Down

0 comments on commit 77724ee

Please sign in to comment.