From 77724eec5589b2895e94224a2e3ea4a1d67610f7 Mon Sep 17 00:00:00 2001 From: Sundaresan Rajangam Date: Tue, 15 Nov 2016 07:34:35 -0800 Subject: [PATCH] C++ sandesh client library changes to accept more than 2 collectors Presently, the contrail services may be either provisoned with the collector list or it gets the collector list from discovery service. In either case, the sandesh client library doesn't accept more than 2 collectors. With the removal of discovery service, all the contrail services that connects to collector would be provisioned with the collector list. Therefore, it is necessary that the sandesh client library accepts collector list without any limitation. With this patch, InitGenerator() can accept collector list > 2 and there is the notion of primary and secondary collector has been removed. The sandesh client would connect to the first collector in the list and upon connection failure/closure would connect to the next collector in the list and so on. It is expected that the sandesh clients would randomize the collector list before passing it to InitGenerator() and ReConfigCollectors(). Renamed EvDiscUpdate event to EvCollectorUpdate. EvCollectorUpdate would be enqueued upon receiving either discovery update or the ReConfigCollectors() [called if the process receives SIGHUP to indicate change in the collector list] This patch doesn't remove the code that subscribes for the collector service with discovery as this would break the functionality without the provisioning/controller changes. Change-Id: Ib84df4a3103e5483a9f3c7cb8ba5f3d034651058 Closes-Bug: #1641846 --- library/common/sandesh_uve.sandesh | 3 + library/cpp/SConscript | 2 + library/cpp/sandesh.cc | 55 ++----- library/cpp/sandesh.h | 3 +- library/cpp/sandesh_client.cc | 97 +++++-------- library/cpp/sandesh_client.h | 9 +- library/cpp/sandesh_client_sm.cc | 185 ++++++++++-------------- library/cpp/sandesh_client_sm.h | 8 +- library/cpp/sandesh_client_sm_priv.h | 17 ++- library/cpp/sandesh_util.cc | 35 +++++ library/cpp/sandesh_util.h | 16 ++ library/cpp/test/sandesh_client_test.cc | 110 ++++++++++++-- library/cpp/test/sandesh_test_common.cc | 31 ++++ library/cpp/test/sandesh_test_common.h | 1 + 14 files changed, 328 insertions(+), 244 deletions(-) create mode 100644 library/cpp/sandesh_util.cc create mode 100644 library/cpp/sandesh_util.h diff --git a/library/common/sandesh_uve.sandesh b/library/common/sandesh_uve.sandesh index 40138539..e3a5c264 100644 --- a/library/common/sandesh_uve.sandesh +++ b/library/common/sandesh_uve.sandesh @@ -226,6 +226,9 @@ struct SandeshClientInfo { 4: optional u32 http_port 5: optional u64 start_time 6: optional string collector_name + 11: optional string collector_ip + 12: optional list collector_list + // primary and secondary fields are deprecated 7: optional string primary 8: optional string secondary 9: optional io.SocketIOStats rx_socket_stats diff --git a/library/cpp/SConscript b/library/cpp/SConscript index af138f19..9c31a7ec 100644 --- a/library/cpp/SConscript +++ b/library/cpp/SConscript @@ -103,6 +103,7 @@ libsandesh = env.Library(target = 'sandesh', 'sandesh_uve.cc', 'sandesh_message_builder.cc', 'sandesh_statistics.cc', + 'sandesh_util.cc', 'protocol/TXMLProtocol.cpp', 'transport/TFDTransport.cpp', 'transport/TSimpleFileTransport.cpp', @@ -128,6 +129,7 @@ env.Install(env['TOP_INCLUDE'] + '/sandesh', 'sandesh_connection.h') env.Install(env['TOP_INCLUDE'] + '/sandesh', 'sandesh_statistics.h') env.Install(env['TOP_INCLUDE'] + '/sandesh', 'request_pipeline.h') env.Install(env['TOP_INCLUDE'] + '/sandesh', 'sandesh_message_builder.h') +env.Install(env['TOP_INCLUDE'] + '/sandesh', 'sandesh_util.h') env.Install(env['TOP_INCLUDE'] + '/sandesh', SandeshGenHdrs) env.Install(env['TOP_INCLUDE'] + '/sandesh', SandeshTraceGenHdrs) env.Install(env['TOP_INCLUDE'] + '/sandesh/protocol', 'protocol/TProtocol.h') diff --git a/library/cpp/sandesh.cc b/library/cpp/sandesh.cc index 680439f4..dfdfe27f 100644 --- a/library/cpp/sandesh.cc +++ b/library/cpp/sandesh.cc @@ -9,7 +9,7 @@ // #include -#include +#include #include #include #include @@ -102,7 +102,8 @@ void Sandesh::InitClient(EventManager *evm, Endpoint server, bool periodicuve) { connect_to_collector_); // Create and initialize the client assert(client_ == NULL); - client_ = new SandeshClient(evm, server, Endpoint(), 0, periodicuve); + std::vector collector_endpoints = boost::assign::list_of(server); + client_ = new SandeshClient(evm, collector_endpoints, 0, periodicuve); client_->Initiate(); } @@ -202,59 +203,29 @@ bool Sandesh::ConnectToCollector(const std::string &collector_ip, return true; } -void Sandesh::ReConfigCollectors(std::vector list) { +void Sandesh::ReConfigCollectors(const std::vector& collector_list) { if (client_) { - client_->ReConfigCollectors(list); + client_->ReConfigCollectors(collector_list); } } - -static bool make_endpoint(TcpServer::Endpoint& ep,const std::string& epstr) { - - typedef boost::tokenizer > tokenizer; - boost::char_separator sep(":"); - - tokenizer tokens(epstr, sep); - tokenizer::iterator it = tokens.begin(); - std::string sip(*it); - ++it; - std::string sport(*it); - int port; - stringToInteger(sport, port); - boost::system::error_code ec; - address addr = address::from_string(sip, ec); - if (ec) { - SANDESH_LOG(ERROR, __func__ << ": Invalid collector address: " << - sip << " Error: " << ec); - return false; - } - ep = TcpServer::Endpoint(addr, port); - return true; -} - bool Sandesh::InitClient(EventManager *evm, const std::vector &collectors, CollectorSubFn csf) { connect_to_collector_ = true; SANDESH_LOG(INFO, "SANDESH: CONNECT TO COLLECTOR: " << connect_to_collector_); - - Endpoint primary = Endpoint(); - Endpoint secondary = Endpoint(); - - if (collectors.size()!=0) { - if (!make_endpoint(primary, collectors[0])) { + std::vector collector_endpoints; + BOOST_FOREACH(const std::string &collector, collectors) { + Endpoint ep; + if (!MakeEndpoint(&ep, collector)) { + SANDESH_LOG(ERROR, __func__ << ": Invalid collector address: " << + collector); return false; } - if (collectors.size()>1) { - if (!make_endpoint(secondary, collectors[1])) { - return false; - } - } + collector_endpoints.push_back(ep); } - - client_ = new SandeshClient(evm, - primary, secondary, csf, true); + client_ = new SandeshClient(evm, collector_endpoints, csf, true); client_->Initiate(); return true; } diff --git a/library/cpp/sandesh.h b/library/cpp/sandesh.h index dc2f4fa6..f39b5da1 100644 --- a/library/cpp/sandesh.h +++ b/library/cpp/sandesh.h @@ -90,6 +90,7 @@ #include #include #include +#include #include #include #include @@ -186,7 +187,7 @@ class Sandesh { SandeshContext *client_context = NULL); static bool ConnectToCollector(const std::string &collector_ip, int collector_port, bool periodicuve = false); - static void ReConfigCollectors(std::vector list); + static void ReConfigCollectors(const std::vector& collector_list); static void Uninit(); // Disk Usage diff --git a/library/cpp/sandesh_client.cc b/library/cpp/sandesh_client.cc index 8dacaf40..53048017 100644 --- a/library/cpp/sandesh_client.cc +++ b/library/cpp/sandesh_client.cc @@ -10,6 +10,7 @@ #include #include +#include #include #include @@ -30,6 +31,7 @@ #include #include "sandesh_client.h" #include "sandesh_uve.h" +#include "sandesh_util.h" using boost::asio::ip::address; using namespace boost::asio; @@ -53,21 +55,18 @@ const std::vector (2500, SandeshLevel::INVALID, false, false); SandeshClient::SandeshClient(EventManager *evm, - Endpoint primary, Endpoint secondary, Sandesh::CollectorSubFn csf, bool periodicuve) + const std::vector &collectors, Sandesh::CollectorSubFn csf, + bool periodicuve) : TcpServer(evm), sm_task_instance_(kSMTaskInstance), sm_task_id_(TaskScheduler::GetInstance()->GetTaskId(kSMTask)), session_task_instance_(kSessionTaskInstance), session_writer_task_id_(TaskScheduler::GetInstance()->GetTaskId(kSessionWriterTask)), session_reader_task_id_(TaskScheduler::GetInstance()->GetTaskId(kSessionReaderTask)), - primary_(primary), - secondary_(secondary), + collectors_(collectors), csf_(csf), sm_(SandeshClientSM::CreateClientSM(evm, this, sm_task_instance_, sm_task_id_, periodicuve)), session_wm_info_(kSessionWaterMarkInfo) { - SANDESH_LOG(INFO,"primary " << primary_); - SANDESH_LOG(INFO,"secondary " << secondary_); - // Set task policy for exclusion between state machine and session tasks since // session delete happens in state machine task if (!task_policy_set_) { @@ -82,69 +81,40 @@ SandeshClient::SandeshClient(EventManager *evm, SandeshClient::~SandeshClient() {} void SandeshClient::CollectorHandler(std::vector resp) { - - Endpoint primary = Endpoint(); - Endpoint secondary = Endpoint(); - + std::vector collectors; if (resp.size()>=1) { - primary = resp[0].ep; - SANDESH_LOG(INFO, "DiscUpdate for primary " << primary); + collectors.push_back(resp[0].ep); + SANDESH_LOG(INFO, "Discovery update for collector #1 " << resp[0].ep); } if (resp.size()>=2) { - secondary = resp[1].ep; - SANDESH_LOG(INFO, "DiscUpdate for secondary " << secondary); + collectors.push_back(resp[1].ep); + SANDESH_LOG(INFO, "Discovery update for collector #2 " << resp[1].ep); } - if (primary!=Endpoint()) { - sm_->SetCandidates(primary, secondary); + if (collectors.size()) { + sm_->SetCollectors(collectors); } } -void SandeshClient::ReConfigCollectors(std::vector collector_list) { - - Endpoint primary = Endpoint(); - Endpoint secondary = Endpoint(); - - std::vector ep; - uint32_t port; - address addr; - boost::system::error_code ec; - if (collector_list.size()>=1) { - boost::split(ep, collector_list[0], boost::is_any_of(":")); +void SandeshClient::ReConfigCollectors( + const std::vector& collector_list) { + std::vector collector_endpoints; - addr = address::from_string(ep[0], ec); - if (ec) { - SANDESH_LOG(ERROR, "ReConfig for primary failed Error: " << ec); + BOOST_FOREACH(const std::string& collector, collector_list) { + Endpoint ep; + if (!MakeEndpoint(&ep, collector)) { + SANDESH_LOG(ERROR, __func__ << ": Invalid collector address: " << + collector); return; } - primary.address(addr); - port = strtoul(ep[1].c_str(), NULL, 0); - primary.port(port); - - SANDESH_LOG(INFO, "ReConfig for primary " << primary); - } - if (collector_list.size()>=2) { - boost::split(ep, collector_list[1], boost::is_any_of(":")); - - addr = address::from_string(ep[0], ec); - if (ec) { - SANDESH_LOG(ERROR, "ReConfig for secondary failed Error: " << ec); - return; - } - secondary.address(addr); - port = strtoul(ep[1].c_str(), NULL, 0); - secondary.port(port); - - SANDESH_LOG(INFO, "ReConfig for secondary " << secondary); - } - if (primary!=Endpoint()) { - sm_->SetCandidates(primary, secondary); + collector_endpoints.push_back(ep); } + sm_->SetCollectors(collector_endpoints); } void SandeshClient::Initiate() { sm_->SetAdminState(false); - if (primary_ != Endpoint()) - sm_->SetCandidates(primary_,secondary_); + if (collectors_.size()) + sm_->SetCollectors(collectors_); // subscribe for the collector service only if the collector list // is not provided by the generator. else if (csf_ != 0) { @@ -304,7 +274,8 @@ static uint64_t client_start_time; void SandeshClient::SendUVE(int count, const string & stateName, const string & server, - Endpoint primary, Endpoint secondary) { + const Endpoint & server_ip, + const std::vector & collector_eps) { ModuleClientState mcs; mcs.set_name(Sandesh::source() + ":" + Sandesh::node_type() + ":" + Sandesh::module() + ":" + Sandesh::instance_id()); @@ -313,18 +284,22 @@ void SandeshClient::SendUVE(int count, client_start_time = UTCTimestampUsec(); client_start = true; } - std::ostringstream pri,sec; - pri << primary; - sec << secondary; - sci.set_start_time(client_start_time); sci.set_successful_connections(count); sci.set_pid(getpid()); sci.set_http_port(Sandesh::http_port()); sci.set_status(stateName); sci.set_collector_name(server); - sci.set_primary(pri.str()); - sci.set_secondary(sec.str()); + std::ostringstream collector_ip; + collector_ip << server_ip; + sci.set_collector_ip(collector_ip.str()); + std::vector collectors; + BOOST_FOREACH(const TcpServer::Endpoint& ep, collector_eps) { + std::ostringstream collector_ip; + collector_ip << ep; + collectors.push_back(collector_ip.str()); + } + sci.set_collector_list(collectors); // Sandesh client socket statistics SocketIOStats rx_stats; GetRxSocketStats(rx_stats); diff --git a/library/cpp/sandesh_client.h b/library/cpp/sandesh_client.h index 65fc8e80..5425adb4 100644 --- a/library/cpp/sandesh_client.h +++ b/library/cpp/sandesh_client.h @@ -38,8 +38,7 @@ class SandeshHeader; class SandeshClient : public TcpServer, public SandeshClientSM::Mgr { public: - SandeshClient(EventManager *evm, Endpoint primary, - Endpoint secondary = Endpoint(), + SandeshClient(EventManager *evm, const std::vector &collectors, Sandesh::CollectorSubFn csf = 0, bool periodicuve = false); @@ -62,7 +61,7 @@ class SandeshClient : public TcpServer, public SandeshClientSM::Mgr { const uint32_t header_offset); void SendUVE(int count, const std::string & stateName, const std::string & server, - Endpoint primary, Endpoint secondary); + const Endpoint & server_ip, const std::vector & collector_eps); bool SendSandesh(Sandesh *snh); @@ -91,7 +90,7 @@ class SandeshClient : public TcpServer, public SandeshClientSM::Mgr { void ResetSessionWaterMarkInfo(); void GetSessionWaterMarkInfo( std::vector &scwm_info) const; - void ReConfigCollectors(std::vector); + void ReConfigCollectors(const std::vector&); friend class CollectorInfoRequest; protected: @@ -110,7 +109,7 @@ class SandeshClient : public TcpServer, public SandeshClientSM::Mgr { int session_task_instance_; int session_writer_task_id_; int session_reader_task_id_; - Endpoint primary_, secondary_; + std::vector collectors_; Sandesh::CollectorSubFn csf_; boost::scoped_ptr sm_; std::vector session_wm_info_; diff --git a/library/cpp/sandesh_client_sm.cc b/library/cpp/sandesh_client_sm.cc index d4b1d927..da84667d 100644 --- a/library/cpp/sandesh_client_sm.cc +++ b/library/cpp/sandesh_client_sm.cc @@ -78,14 +78,14 @@ struct EvStop : sc::event { bool enq_; }; -struct EvDiscUpdate : sc::event { - EvDiscUpdate(TcpServer::Endpoint active, TcpServer::Endpoint backup) : - active_(active), backup_(backup) { +struct EvCollectorUpdate : sc::event { + EvCollectorUpdate(const std::vector &collectors) : + collectors_(collectors) { } static const char * Name() { - return "EvDiscUpdate"; + return "EvCollectorUpdate"; } - TcpServer::Endpoint active_,backup_; + std::vector collectors_; }; struct EvIdleHoldTimerExpired : sc::event { @@ -215,7 +215,7 @@ struct Idle : public sc::state { sc::custom_reaction, sc::custom_reaction, sc::custom_reaction, - sc::custom_reaction, + sc::custom_reaction, ReleaseSandesh::reaction, DeleteTcpSession::reaction > reactions; @@ -259,40 +259,26 @@ struct Idle : public sc::state { return discard_event(); } - sc::result react(const EvDiscUpdate &event) { + sc::result react(const EvCollectorUpdate &event) { SandeshClientSMImpl *state_machine = &context(); - state_machine->DiscUpdate( - SandeshClientSM::IDLE, true, - event.active_, event.backup_); + state_machine->CollectorUpdate(event.collectors_); return discard_event(); } sc::result react(const EvIdleHoldTimerExpired &event) { SandeshClientSMImpl *state_machine = &context(); - if (state_machine->DiscUpdate( - SandeshClientSM::IDLE, false, - TcpServer::Endpoint(), TcpServer::Endpoint())) { - // Update connection info - ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR, - std::string(), ConnectionStatus::INIT, - state_machine->server(), - state_machine->StateName() + " : " + event.Name() + - " -> Connect"); - return transit(); - } // Update connection info ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR, - std::string(), ConnectionStatus::DOWN, + std::string(), ConnectionStatus::INIT, state_machine->server(), - state_machine->StateName() + " : " + event.Name() + - " -> Disconnect"); - return transit(); + state_machine->StateName() + " : " + event.Name() + " -> Connect"); + return transit(); } }; struct Disconnect : public sc::state { typedef mpl::list< TransitToIdle::reaction, - sc::custom_reaction, + sc::custom_reaction, ReleaseSandesh::reaction, DeleteTcpSession::reaction > reactions; @@ -304,20 +290,16 @@ struct Disconnect : public sc::state { state_machine->SendUVE(); } - sc::result react(const EvDiscUpdate &event) { + sc::result react(const EvCollectorUpdate &event) { SandeshClientSMImpl *state_machine = &context(); - if (state_machine->DiscUpdate( - SandeshClientSM::DISCONNECT, true, - event.active_, event.backup_)) { - // Update connection info - ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR, - std::string(), ConnectionStatus::INIT, - state_machine->server(), - state_machine->StateName() + " : " + event.Name() + - " -> Connect"); - return transit(); - } - return discard_event(); + state_machine->CollectorUpdate(event.collectors_); + // Update connection info + ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR, + std::string(), ConnectionStatus::INIT, + state_machine->server(), + state_machine->StateName() + " : " + event.Name() + + " -> Connect"); + return transit(); } }; @@ -328,7 +310,7 @@ struct Connect : public sc::state { sc::custom_reaction, sc::custom_reaction, sc::custom_reaction, - sc::custom_reaction, + sc::custom_reaction, ReleaseSandesh::reaction, DeleteTcpSession::reaction > reactions; @@ -354,9 +336,7 @@ struct Connect : public sc::state { sc::result react(const EvTcpConnectFail &event) { SandeshClientSMImpl *state_machine = &context(); SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name()); - state_machine->DiscUpdate( - SandeshClientSM::CONNECT, false, - TcpServer::Endpoint(), TcpServer::Endpoint()); + state_machine->CollectorChange(); return ToIdle(state_machine, event.Name()); } @@ -364,10 +344,7 @@ struct Connect : public sc::state { sc::result react(const EvConnectTimerExpired &event) { SandeshClientSMImpl *state_machine = &context(); SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name()); - // Flip the active and backup collector - state_machine->DiscUpdate( - SandeshClientSM::CONNECT, false, - TcpServer::Endpoint(), TcpServer::Endpoint()); + state_machine->CollectorChange(); return ToIdle(state_machine, event.Name()); } @@ -389,14 +366,13 @@ struct Connect : public sc::state { sc::result react(const EvTcpClose &event) { SandeshClientSMImpl *state_machine = &context(); SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name()); + state_machine->CollectorChange(); return ToIdle(state_machine, event.Name()); } - sc::result react(const EvDiscUpdate &event) { + sc::result react(const EvCollectorUpdate &event) { SandeshClientSMImpl *state_machine = &context(); - if (state_machine->DiscUpdate( - SandeshClientSM::CONNECT, true, - event.active_, event.backup_)) { + if (state_machine->CollectorUpdate(event.collectors_)) { return ToIdle(state_machine, event.Name()); } return discard_event(); @@ -435,7 +411,7 @@ struct ClientInit : public sc::state { sc::custom_reaction, sc::custom_reaction, sc::custom_reaction, - sc::custom_reaction, + sc::custom_reaction, DeleteTcpSession::reaction > reactions; @@ -457,15 +433,14 @@ struct ClientInit : public sc::state { sc::result react(const EvTcpClose &event) { SandeshClientSMImpl *state_machine = &context(); SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name()); - state_machine->DiscUpdate( - SandeshClientSM::CLIENT_INIT, false, - TcpServer::Endpoint(), TcpServer::Endpoint()); + state_machine->CollectorChange(); return ToIdle(state_machine, event.Name()); } sc::result react(const EvConnectTimerExpired &event) { SandeshClientSMImpl *state_machine = &context(); SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name()); + state_machine->CollectorChange(); return ToIdle(state_machine, event.Name()); } @@ -513,11 +488,9 @@ struct ClientInit : public sc::state { return discard_event(); } - sc::result react(const EvDiscUpdate &event) { + sc::result react(const EvCollectorUpdate &event) { SandeshClientSMImpl *state_machine = &context(); - if (state_machine->DiscUpdate( - SandeshClientSM::CLIENT_INIT, true, - event.active_, event.backup_)) { + if (state_machine->CollectorUpdate(event.collectors_)) { return ToIdle(state_machine, event.Name()); } return discard_event(); @@ -543,7 +516,7 @@ struct Established : public sc::state { sc::custom_reaction, sc::custom_reaction, sc::custom_reaction, - sc::custom_reaction, + sc::custom_reaction, DeleteTcpSession::reaction > reactions; @@ -566,13 +539,9 @@ struct Established : public sc::state { } sc::result react(const EvTcpClose &event) { - SandeshClientSMImpl *state_machine = &context(); SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name()); - - if (state_machine->DiscUpdate( - SandeshClientSM::ESTABLISHED, false, - TcpServer::Endpoint(), TcpServer::Endpoint())) { + if (state_machine->CollectorChange()) { // Update connection info ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR, std::string(), ConnectionStatus::INIT, @@ -607,12 +576,9 @@ struct Established : public sc::state { return discard_event(); } - sc::result react(const EvDiscUpdate &event) { + sc::result react(const EvCollectorUpdate &event) { SandeshClientSMImpl *state_machine = &context(); - - if (state_machine->DiscUpdate( - SandeshClientSM::ESTABLISHED, true, - event.active_, event.backup_)) { + if (state_machine->CollectorUpdate(event.collectors_)) { // Update connection info ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR, std::string(), ConnectionStatus::INIT, @@ -931,8 +897,6 @@ void SandeshClientSMImpl::Enqueue(const Ev &event) { SandeshClientSMImpl::SandeshClientSMImpl(EventManager *evm, Mgr *mgr, int sm_task_instance, int sm_task_id, bool periodicuve) : SandeshClientSM(mgr), - active_(TcpServer::Endpoint()), - backup_(TcpServer::Endpoint()), work_queue_(sm_task_id, sm_task_instance, boost::bind(&SandeshClientSMImpl::DequeueEvent, this, _1)), connect_timer_(TimerManager::CreateTimer(*evm->io_service(), "Client Connect timer", sm_task_id, sm_task_instance)), @@ -1040,52 +1004,57 @@ bool SandeshClientSMImpl::StatisticsTimerExpired() { return true; } -void SandeshClientSMImpl::GetCandidates(TcpServer::Endpoint& active, - TcpServer::Endpoint &backup) const { - active = active_; - backup = backup_; +void SandeshClientSMImpl::SetCollectors( + const std::vector& collectors) { + Enqueue(scm::EvCollectorUpdate(collectors)); } -void SandeshClientSMImpl::SetCandidates( - TcpServer::Endpoint active, TcpServer::Endpoint backup) { - Enqueue(scm::EvDiscUpdate(active,backup)); +void SandeshClientSMImpl::GetCollectors( + std::vector& collectors) { + collectors = collectors_; } -bool SandeshClientSMImpl::DiscUpdate(State from_state, bool update, - TcpServer::Endpoint active, TcpServer::Endpoint backup) { - - if (update) { - active_ = active; - backup_ = backup; - } - if ((from_state == DISCONNECT) || - ((from_state == IDLE) && (!update))) { - set_server(active_); - SendUVE(); - return true; - } - if ((from_state == ESTABLISHED)||(from_state == CONNECT)||(from_state == CLIENT_INIT)) { - if (!update) { - if (backup_!=TcpServer::Endpoint()) { - TcpServer::Endpoint temp = backup_; - backup_ = active_; - active_ = temp; - set_server(active_); - SendUVE(); - return true; - } - } else { - if (server()!=active_) { - set_server(active_); - SendUVE(); - return true; - } +TcpServer::Endpoint SandeshClientSMImpl::GetCollector() const { + if (collectors_.size()) { + return collectors_[collector_index_]; + } + return TcpServer::Endpoint(); +} + +TcpServer::Endpoint SandeshClientSMImpl::GetNextCollector() { + if (collectors_.size()) { + if (++collector_index_ == collectors_.size()) { + collector_index_ = 0; } + return collectors_[collector_index_]; + } + return TcpServer::Endpoint(); +} + +bool SandeshClientSMImpl::CollectorUpdate( + const std::vector& collectors) { + collectors_ = collectors; + collector_index_ = 0; + TcpServer::Endpoint collector(GetCollector()); + if (server() != collector) { + set_server(collector); + SendUVE(); + return true; } SendUVE(); return false; } +bool SandeshClientSMImpl::CollectorChange() { + TcpServer::Endpoint collector(GetNextCollector()); + if (server() != collector) { + set_server(collector); + SendUVE(); + return true; + } + return false; +} + SandeshClientSM * SandeshClientSM::CreateClientSM( EventManager *evm, Mgr *mgr, int sm_task_instance, int sm_task_id, bool periodicuve) { diff --git a/library/cpp/sandesh_client_sm.h b/library/cpp/sandesh_client_sm.h index e3b2695b..6d505f1d 100644 --- a/library/cpp/sandesh_client_sm.h +++ b/library/cpp/sandesh_client_sm.h @@ -39,8 +39,8 @@ class SandeshClientSM { const uint32_t header_offset) = 0; virtual void SendUVE(int count, const std::string & stateName, const std::string & server, - TcpServer::Endpoint pri = TcpServer::Endpoint(), - TcpServer::Endpoint sec = TcpServer::Endpoint()) = 0; + const TcpServer::Endpoint & server_ip, + const std::vector & collectors) = 0; virtual SandeshSession *CreateSMSession( TcpSession::EventObserver eocb, SandeshReceiveMsgCb rmcb, @@ -74,8 +74,8 @@ class SandeshClientSM { // This function is used to start and stop the state machine virtual void SetAdminState(bool down) = 0; - // This function should be called when there is a discovery service update - virtual void SetCandidates(TcpServer::Endpoint active, TcpServer::Endpoint backup) = 0; + // This function should be called when there is a change in the Collector list + virtual void SetCollectors(const std::vector &collectors) = 0; // This function is used to send UVE sandesh's to the server virtual bool SendSandeshUVE(Sandesh* snh) = 0; diff --git a/library/cpp/sandesh_client_sm_priv.h b/library/cpp/sandesh_client_sm_priv.h index a87e8f20..87ee4c34 100644 --- a/library/cpp/sandesh_client_sm_priv.h +++ b/library/cpp/sandesh_client_sm_priv.h @@ -59,9 +59,10 @@ class SandeshClientSMImpl : public SandeshClientSM, } void SetAdminState(bool down); - void SetCandidates(TcpServer::Endpoint active, TcpServer::Endpoint backup); - void GetCandidates(TcpServer::Endpoint& active, - TcpServer::Endpoint &backup) const; + void SetCollectors(const std::vector& collectors); + void GetCollectors(std::vector& collectors); + TcpServer::Endpoint GetCollector() const; + TcpServer::Endpoint GetNextCollector(); bool SendSandeshUVE(Sandesh* snh); bool SendSandesh(Sandesh* snh); void EnqueDelSession(SandeshSession * session); @@ -89,8 +90,8 @@ class SandeshClientSMImpl : public SandeshClientSM, bool IdleHoldTimerRunning(); void IdleHoldTimerFired(); - bool DiscUpdate(State from_state, bool update, - TcpServer::Endpoint active, TcpServer::Endpoint backup); + bool CollectorUpdate(const std::vector &collectors); + bool CollectorChange(); // Calculate Timer value for active to connect transition. int GetConnectTime() const; @@ -136,7 +137,8 @@ class SandeshClientSMImpl : public SandeshClientSM, void unconsumed_event(const sc::event_base &event); void SendUVE () { - mgr_->SendUVE(connects(), StateName(), collector_name(), active_, backup_); + mgr_->SendUVE(connects(), StateName(), collector_name(), server(), + collectors_); } private: static const int kStatisticsSendInterval = 30000; // 30 sec .. specified in milliseconds @@ -159,8 +161,9 @@ class SandeshClientSMImpl : public SandeshClientSM, void UpdateEventEnqueueFail(const sc::event_base &event); void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail); + std::vector collectors_; + int collector_index_; TcpServer::Endpoint active_; - TcpServer::Endpoint backup_; WorkQueue work_queue_; Timer *connect_timer_; Timer *idle_hold_timer_; diff --git a/library/cpp/sandesh_util.cc b/library/cpp/sandesh_util.cc new file mode 100644 index 00000000..abbd428f --- /dev/null +++ b/library/cpp/sandesh_util.cc @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. + */ + +// +// sandesh_util.cc +// + +#include + +#include + +#include "sandesh_util.h" + +using boost::asio::ip::address; + +bool MakeEndpoint(TcpServer::Endpoint* ep, const std::string& epstr) { + typedef boost::tokenizer > tokenizer; + boost::char_separator sep(":"); + + tokenizer tokens(epstr, sep); + tokenizer::iterator it = tokens.begin(); + std::string sip(*it); + ++it; + std::string sport(*it); + int port; + stringToInteger(sport, port); + boost::system::error_code ec; + address addr = address::from_string(sip, ec); + if (ec) { + return false; + } + *ep = TcpServer::Endpoint(addr, port); + return true; +} diff --git a/library/cpp/sandesh_util.h b/library/cpp/sandesh_util.h new file mode 100644 index 00000000..a298c2f5 --- /dev/null +++ b/library/cpp/sandesh_util.h @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. + */ + +// +// sandesh_util.h +// + +#ifndef __SANDESH_UTIL_H__ +#define __SANDESH_UTIL_H__ + +#include + +bool MakeEndpoint(TcpServer::Endpoint* ep, const std::string& ep_str); + +#endif // __SANDESH_UTIL_H__ diff --git a/library/cpp/test/sandesh_client_test.cc b/library/cpp/test/sandesh_client_test.cc index 0931d37c..106a40af 100644 --- a/library/cpp/test/sandesh_client_test.cc +++ b/library/cpp/test/sandesh_client_test.cc @@ -28,6 +28,7 @@ using namespace std; using namespace boost::assign; using namespace boost::posix_time; using boost::system::error_code; +using boost::asio::ip::address; using namespace contrail::sandesh::protocol; using namespace contrail::sandesh::transport; @@ -86,7 +87,7 @@ class SandeshSessionMock : public SandeshSession { class SandeshClientMock : public SandeshClient { public: - SandeshClientMock(EventManager *evm, Endpoint dummy) : + SandeshClientMock(EventManager *evm, std::vector dummy) : SandeshClient(evm, dummy), session_(NULL), old_session_(NULL) { @@ -150,7 +151,7 @@ class SandeshClientMock : public SandeshClient { class SandeshClientStateMachineTest : public ::testing::Test { protected: SandeshClientStateMachineTest() : - client_(new SandeshClientMock((&evm_), Endpoint())), + client_(new SandeshClientMock((&evm_), std::vector())), timer_(TimerManager::CreateTimer(*evm_.io_service(), "Dummy timer")), sm_(dynamic_cast(client_->state_machine())) { task_util::WaitForIdle(); @@ -179,6 +180,17 @@ class SandeshClientStateMachineTest : public ::testing::Test { return false; } + std::vector GetCollectorEndpoints( + const std::vector& collectors) { + boost::system::error_code ec; + std::vector collector_endpoints; + BOOST_FOREACH(const std::string &collector, collectors) { + Endpoint ep(address::from_string(collector, ec), 8086); + collector_endpoints.push_back(ep); + } + return collector_endpoints; + } + void RunToState(SandeshClientSM::State state) { timer_->Start(15000, boost::bind(&SandeshClientStateMachineTest::DummyTimerHandler, this)); @@ -198,6 +210,12 @@ class SandeshClientStateMachineTest : public ::testing::Test { VerifyState(state); break; } + case SandeshClientSM::DISCONNECT: { + GetToState(SandeshClientSM::IDLE); + EvAdminUpNoHoldTime(); + RunToState(state); + break; + } case SandeshClientSM::CONNECT: { GetToState(SandeshClientSM::IDLE); EvAdminUp(); @@ -211,6 +229,12 @@ class SandeshClientStateMachineTest : public ::testing::Test { VerifyState(SandeshClientSM::CLIENT_INIT); break; } + case SandeshClientSM::ESTABLISHED: { + GetToState(SandeshClientSM::CLIENT_INIT); + EvSandeshCtrlMessageRecv(); + RunToState(SandeshClientSM::ESTABLISHED); + break; + } default: { ASSERT_TRUE(false); break; @@ -227,17 +251,26 @@ class SandeshClientStateMachineTest : public ::testing::Test { switch (state) { case SandeshClientSM::IDLE: - EXPECT_TRUE(!ConnectTimerRunning()); + EXPECT_FALSE(ConnectTimerRunning()); + EXPECT_TRUE(client_->session() == NULL); + break; + case SandeshClientSM::DISCONNECT: + EXPECT_FALSE(ConnectTimerRunning()); EXPECT_TRUE(client_->session() == NULL); break; case SandeshClientSM::CONNECT: EXPECT_TRUE(ConnectTimerRunning()); - EXPECT_TRUE(!IdleHoldTimerRunning()); + EXPECT_FALSE(IdleHoldTimerRunning()); EXPECT_TRUE(client_->session() != NULL); break; case SandeshClientSM::CLIENT_INIT: EXPECT_TRUE(ConnectTimerRunning()); - EXPECT_TRUE(!IdleHoldTimerRunning()); + EXPECT_FALSE(IdleHoldTimerRunning()); + EXPECT_TRUE(client_->session() != NULL); + break; + case SandeshClientSM::ESTABLISHED: + EXPECT_FALSE(ConnectTimerRunning()); + EXPECT_FALSE(IdleHoldTimerRunning()); EXPECT_TRUE(client_->session() != NULL); break; default: @@ -265,6 +298,10 @@ class SandeshClientStateMachineTest : public ::testing::Test { sm_->SetAdminState(false); sm_->set_idle_hold_time(1); } + void EvAdminUpNoHoldTime() { + sm_->SetAdminState(false); + sm_->set_idle_hold_time(0); + } void EvAdminDown() { sm_->SetAdminState(true); sm_->set_idle_hold_time(1); @@ -291,6 +328,26 @@ class SandeshClientStateMachineTest : public ::testing::Test { string xml((const char *)msg, sizeof(msg)); sm_->OnMessage(session, xml); } + void EvSandeshCtrlMessageRecv(SandeshSessionMock *session = NULL) { + session = GetSession(session); + string xml; + contrail::sandesh::test::CreateFakeCtrlMessage(xml); + sm_->OnMessage(session, xml); + } + void EvCollectorUpdateTrue() { + if (collector_list_ != collector_list1_) { + collector_list_ = collector_list1_; + } else { + collector_list_ = collector_list2_; + } + sm_->SetCollectors(GetCollectorEndpoints(collector_list_)); + } + void EvCollectorUpdateFalse() { + if (collector_list_.size() == 0) { + collector_list_ = collector_list1_; + } + sm_->SetCollectors(GetCollectorEndpoints(collector_list_)); + } bool IdleHoldTimerRunning() { return sm_->IdleHoldTimerRunning(); } bool ConnectTimerRunning() { return sm_->ConnectTimerRunning(); } @@ -299,8 +356,17 @@ class SandeshClientStateMachineTest : public ::testing::Test { SandeshClientMock *client_; SandeshClientSMImpl *sm_; Timer *timer_; + std::vector collector_list_; + + static const std::vector collector_list1_; + static const std::vector collector_list2_; }; +const std::vector SandeshClientStateMachineTest::collector_list1_ = + boost::assign::list_of("1.1.1.1")("2.2.2.2"); +const std::vector SandeshClientStateMachineTest::collector_list2_ = + boost::assign::list_of("3.3.3.3"); + typedef boost::function EvGen; struct EvGenComp { bool operator()(const EvGen &lhs, const EvGen &rhs) { return &lhs < &rhs; } @@ -318,31 +384,43 @@ TEST_F(SandeshClientStateMachineTest, Matrix) { (SandeshSessionMock *) NULL), E) Transitions idle = map_list_of - CLIENT_SSM_TRANSITION(EvAdminUp, SandeshClientSM::CONNECT); + CLIENT_SSM_TRANSITION(EvAdminUp, SandeshClientSM::CONNECT) + CLIENT_SSM_TRANSITION(EvAdminUpNoHoldTime, SandeshClientSM::DISCONNECT) + CLIENT_SSM_TRANSITION(EvCollectorUpdateTrue, SandeshClientSM::IDLE) // collector_list1_ + CLIENT_SSM_TRANSITION(EvCollectorUpdateFalse, SandeshClientSM::IDLE); // collector_list1_ - Transitions none = map_list_of - CLIENT_SSM_TRANSITION(EvAdminDown, SandeshClientSM::IDLE); + Transitions disconnect = map_list_of + CLIENT_SSM_TRANSITION(EvCollectorUpdateTrue, SandeshClientSM::CONNECT) // collector_list2_ + CLIENT_SSM_TRANSITION(EvCollectorUpdateFalse, SandeshClientSM::CONNECT); // collector_list2_ Transitions connect = map_list_of CLIENT_SSM_TRANSITION(EvAdminDown, SandeshClientSM::IDLE) CLIENT_SSM_TRANSITION(EvConnectTimerExpired, SandeshClientSM::IDLE) CLIENT_SSM_TRANSITION(EvTcpConnected, SandeshClientSM::CLIENT_INIT) CLIENT_SSM_TRANSITION(EvTcpConnectFail, SandeshClientSM::IDLE) - CLIENT_SSM_TRANSITION2(EvTcpClose, SandeshClientSM::IDLE); + CLIENT_SSM_TRANSITION2(EvTcpClose, SandeshClientSM::IDLE) + CLIENT_SSM_TRANSITION(EvCollectorUpdateTrue, SandeshClientSM::IDLE) // collector_list1_ + CLIENT_SSM_TRANSITION(EvCollectorUpdateFalse, SandeshClientSM::CONNECT); // collector_list1_ Transitions client_init = map_list_of CLIENT_SSM_TRANSITION2(EvTcpClose, SandeshClientSM::IDLE) - CLIENT_SSM_TRANSITION2(EvSandeshMessageRecv, SandeshClientSM::CLIENT_INIT); + CLIENT_SSM_TRANSITION2(EvSandeshMessageRecv, SandeshClientSM::CLIENT_INIT) + CLIENT_SSM_TRANSITION2(EvSandeshCtrlMessageRecv, SandeshClientSM::ESTABLISHED) + CLIENT_SSM_TRANSITION(EvCollectorUpdateTrue, SandeshClientSM::IDLE) // collector_list2_ + CLIENT_SSM_TRANSITION(EvCollectorUpdateFalse, SandeshClientSM::CLIENT_INIT); // collector_list2_ + + Transitions established = map_list_of + CLIENT_SSM_TRANSITION2(EvTcpClose, SandeshClientSM::IDLE) + CLIENT_SSM_TRANSITION2(EvSandeshMessageRecv, SandeshClientSM::ESTABLISHED) + CLIENT_SSM_TRANSITION(EvCollectorUpdateTrue, SandeshClientSM::CONNECT) // collector_list1_ + CLIENT_SSM_TRANSITION(EvCollectorUpdateFalse, SandeshClientSM::ESTABLISHED) // collector_list1_ + CLIENT_SSM_TRANSITION2(EvTcpClose, SandeshClientSM::CONNECT); Transitions matrix[] = - { idle, none, connect, client_init }; + { idle, disconnect, connect, client_init, established }; - for (int k = SandeshClientSM::IDLE; k <= SandeshClientSM::CLIENT_INIT; k++) { + for (int k = SandeshClientSM::IDLE; k <= SandeshClientSM::ESTABLISHED; k++) { SandeshClientSM::State i = static_cast(k); - // Ignore DISCONNECT and ESTABLISHED in SandeshClientSM - if (i == SandeshClientSM::ESTABLISHED || i == SandeshClientSM::DISCONNECT) { - continue; - } int count = 0; for (Transitions::iterator j = matrix[i].begin(); j != matrix[i].end(); j++) { diff --git a/library/cpp/test/sandesh_test_common.cc b/library/cpp/test/sandesh_test_common.cc index 5e31c42d..ee2dcf83 100644 --- a/library/cpp/test/sandesh_test_common.cc +++ b/library/cpp/test/sandesh_test_common.cc @@ -67,6 +67,37 @@ void CreateFakeMessage(uint8_t *data, size_t length) { EXPECT_EQ(offset, length); } +void CreateFakeCtrlMessage(std::string& data) { + const std::string ctrl_xml("true"); + SandeshHeader header; + // Populate the header + header.set_Namespace("Test"); + header.set_Timestamp(123456); + header.set_Module("SandeshStateMachineTest"); + header.set_Source("TestMachine"); + header.set_Context(""); + header.set_SequenceNum(0); + header.set_VersionSig(0); + header.set_Type(SandeshType::REQUEST); + header.set_Hints(g_sandesh_constants.SANDESH_CONTROL_HINT); + boost::shared_ptr btrans = + boost::shared_ptr( + new TMemoryBuffer(512)); + boost::shared_ptr prot = + boost::shared_ptr( + new TXMLProtocol(btrans)); + // Write the sandesh header + int nbytes = header.write(prot); + EXPECT_GT(nbytes, 0); + // Get the buffer + uint8_t *hbuffer; + uint32_t hlen; + btrans->getBuffer(&hbuffer, &hlen); + EXPECT_EQ(hlen, nbytes); + data.append((char*)hbuffer, hlen); + data.append(ctrl_xml); +} + } // end namespace test } // end namespace sandesh } // end namespace contrail diff --git a/library/cpp/test/sandesh_test_common.h b/library/cpp/test/sandesh_test_common.h index 416c5950..33f5e5fd 100644 --- a/library/cpp/test/sandesh_test_common.h +++ b/library/cpp/test/sandesh_test_common.h @@ -37,6 +37,7 @@ class SandeshServerTest : public SandeshServer { }; void CreateFakeMessage(uint8_t *data, size_t length); +void CreateFakeCtrlMessage(std::string& data); } // end namespace test } // end namespace sandesh