diff --git a/library/common/sandesh_uve.sandesh b/library/common/sandesh_uve.sandesh index 40138539..e3a5c264 100644 --- a/library/common/sandesh_uve.sandesh +++ b/library/common/sandesh_uve.sandesh @@ -226,6 +226,9 @@ struct SandeshClientInfo { 4: optional u32 http_port 5: optional u64 start_time 6: optional string collector_name + 11: optional string collector_ip + 12: optional list collector_list + // primary and secondary fields are deprecated 7: optional string primary 8: optional string secondary 9: optional io.SocketIOStats rx_socket_stats diff --git a/library/cpp/SConscript b/library/cpp/SConscript index af138f19..9c31a7ec 100644 --- a/library/cpp/SConscript +++ b/library/cpp/SConscript @@ -103,6 +103,7 @@ libsandesh = env.Library(target = 'sandesh', 'sandesh_uve.cc', 'sandesh_message_builder.cc', 'sandesh_statistics.cc', + 'sandesh_util.cc', 'protocol/TXMLProtocol.cpp', 'transport/TFDTransport.cpp', 'transport/TSimpleFileTransport.cpp', @@ -128,6 +129,7 @@ env.Install(env['TOP_INCLUDE'] + '/sandesh', 'sandesh_connection.h') env.Install(env['TOP_INCLUDE'] + '/sandesh', 'sandesh_statistics.h') env.Install(env['TOP_INCLUDE'] + '/sandesh', 'request_pipeline.h') env.Install(env['TOP_INCLUDE'] + '/sandesh', 'sandesh_message_builder.h') +env.Install(env['TOP_INCLUDE'] + '/sandesh', 'sandesh_util.h') env.Install(env['TOP_INCLUDE'] + '/sandesh', SandeshGenHdrs) env.Install(env['TOP_INCLUDE'] + '/sandesh', SandeshTraceGenHdrs) env.Install(env['TOP_INCLUDE'] + '/sandesh/protocol', 'protocol/TProtocol.h') diff --git a/library/cpp/sandesh.cc b/library/cpp/sandesh.cc index 680439f4..dfdfe27f 100644 --- a/library/cpp/sandesh.cc +++ b/library/cpp/sandesh.cc @@ -9,7 +9,7 @@ // #include -#include +#include #include #include #include @@ -102,7 +102,8 @@ void Sandesh::InitClient(EventManager *evm, Endpoint server, bool periodicuve) { connect_to_collector_); // Create and initialize the client assert(client_ == NULL); - client_ = new SandeshClient(evm, server, Endpoint(), 0, periodicuve); + std::vector collector_endpoints = boost::assign::list_of(server); + client_ = new SandeshClient(evm, collector_endpoints, 0, periodicuve); client_->Initiate(); } @@ -202,59 +203,29 @@ bool Sandesh::ConnectToCollector(const std::string &collector_ip, return true; } -void Sandesh::ReConfigCollectors(std::vector list) { +void Sandesh::ReConfigCollectors(const std::vector& collector_list) { if (client_) { - client_->ReConfigCollectors(list); + client_->ReConfigCollectors(collector_list); } } - -static bool make_endpoint(TcpServer::Endpoint& ep,const std::string& epstr) { - - typedef boost::tokenizer > tokenizer; - boost::char_separator sep(":"); - - tokenizer tokens(epstr, sep); - tokenizer::iterator it = tokens.begin(); - std::string sip(*it); - ++it; - std::string sport(*it); - int port; - stringToInteger(sport, port); - boost::system::error_code ec; - address addr = address::from_string(sip, ec); - if (ec) { - SANDESH_LOG(ERROR, __func__ << ": Invalid collector address: " << - sip << " Error: " << ec); - return false; - } - ep = TcpServer::Endpoint(addr, port); - return true; -} - bool Sandesh::InitClient(EventManager *evm, const std::vector &collectors, CollectorSubFn csf) { connect_to_collector_ = true; SANDESH_LOG(INFO, "SANDESH: CONNECT TO COLLECTOR: " << connect_to_collector_); - - Endpoint primary = Endpoint(); - Endpoint secondary = Endpoint(); - - if (collectors.size()!=0) { - if (!make_endpoint(primary, collectors[0])) { + std::vector collector_endpoints; + BOOST_FOREACH(const std::string &collector, collectors) { + Endpoint ep; + if (!MakeEndpoint(&ep, collector)) { + SANDESH_LOG(ERROR, __func__ << ": Invalid collector address: " << + collector); return false; } - if (collectors.size()>1) { - if (!make_endpoint(secondary, collectors[1])) { - return false; - } - } + collector_endpoints.push_back(ep); } - - client_ = new SandeshClient(evm, - primary, secondary, csf, true); + client_ = new SandeshClient(evm, collector_endpoints, csf, true); client_->Initiate(); return true; } diff --git a/library/cpp/sandesh.h b/library/cpp/sandesh.h index dc2f4fa6..f39b5da1 100644 --- a/library/cpp/sandesh.h +++ b/library/cpp/sandesh.h @@ -90,6 +90,7 @@ #include #include #include +#include #include #include #include @@ -186,7 +187,7 @@ class Sandesh { SandeshContext *client_context = NULL); static bool ConnectToCollector(const std::string &collector_ip, int collector_port, bool periodicuve = false); - static void ReConfigCollectors(std::vector list); + static void ReConfigCollectors(const std::vector& collector_list); static void Uninit(); // Disk Usage diff --git a/library/cpp/sandesh_client.cc b/library/cpp/sandesh_client.cc index 8dacaf40..53048017 100644 --- a/library/cpp/sandesh_client.cc +++ b/library/cpp/sandesh_client.cc @@ -10,6 +10,7 @@ #include #include +#include #include #include @@ -30,6 +31,7 @@ #include #include "sandesh_client.h" #include "sandesh_uve.h" +#include "sandesh_util.h" using boost::asio::ip::address; using namespace boost::asio; @@ -53,21 +55,18 @@ const std::vector (2500, SandeshLevel::INVALID, false, false); SandeshClient::SandeshClient(EventManager *evm, - Endpoint primary, Endpoint secondary, Sandesh::CollectorSubFn csf, bool periodicuve) + const std::vector &collectors, Sandesh::CollectorSubFn csf, + bool periodicuve) : TcpServer(evm), sm_task_instance_(kSMTaskInstance), sm_task_id_(TaskScheduler::GetInstance()->GetTaskId(kSMTask)), session_task_instance_(kSessionTaskInstance), session_writer_task_id_(TaskScheduler::GetInstance()->GetTaskId(kSessionWriterTask)), session_reader_task_id_(TaskScheduler::GetInstance()->GetTaskId(kSessionReaderTask)), - primary_(primary), - secondary_(secondary), + collectors_(collectors), csf_(csf), sm_(SandeshClientSM::CreateClientSM(evm, this, sm_task_instance_, sm_task_id_, periodicuve)), session_wm_info_(kSessionWaterMarkInfo) { - SANDESH_LOG(INFO,"primary " << primary_); - SANDESH_LOG(INFO,"secondary " << secondary_); - // Set task policy for exclusion between state machine and session tasks since // session delete happens in state machine task if (!task_policy_set_) { @@ -82,69 +81,40 @@ SandeshClient::SandeshClient(EventManager *evm, SandeshClient::~SandeshClient() {} void SandeshClient::CollectorHandler(std::vector resp) { - - Endpoint primary = Endpoint(); - Endpoint secondary = Endpoint(); - + std::vector collectors; if (resp.size()>=1) { - primary = resp[0].ep; - SANDESH_LOG(INFO, "DiscUpdate for primary " << primary); + collectors.push_back(resp[0].ep); + SANDESH_LOG(INFO, "Discovery update for collector #1 " << resp[0].ep); } if (resp.size()>=2) { - secondary = resp[1].ep; - SANDESH_LOG(INFO, "DiscUpdate for secondary " << secondary); + collectors.push_back(resp[1].ep); + SANDESH_LOG(INFO, "Discovery update for collector #2 " << resp[1].ep); } - if (primary!=Endpoint()) { - sm_->SetCandidates(primary, secondary); + if (collectors.size()) { + sm_->SetCollectors(collectors); } } -void SandeshClient::ReConfigCollectors(std::vector collector_list) { - - Endpoint primary = Endpoint(); - Endpoint secondary = Endpoint(); - - std::vector ep; - uint32_t port; - address addr; - boost::system::error_code ec; - if (collector_list.size()>=1) { - boost::split(ep, collector_list[0], boost::is_any_of(":")); +void SandeshClient::ReConfigCollectors( + const std::vector& collector_list) { + std::vector collector_endpoints; - addr = address::from_string(ep[0], ec); - if (ec) { - SANDESH_LOG(ERROR, "ReConfig for primary failed Error: " << ec); + BOOST_FOREACH(const std::string& collector, collector_list) { + Endpoint ep; + if (!MakeEndpoint(&ep, collector)) { + SANDESH_LOG(ERROR, __func__ << ": Invalid collector address: " << + collector); return; } - primary.address(addr); - port = strtoul(ep[1].c_str(), NULL, 0); - primary.port(port); - - SANDESH_LOG(INFO, "ReConfig for primary " << primary); - } - if (collector_list.size()>=2) { - boost::split(ep, collector_list[1], boost::is_any_of(":")); - - addr = address::from_string(ep[0], ec); - if (ec) { - SANDESH_LOG(ERROR, "ReConfig for secondary failed Error: " << ec); - return; - } - secondary.address(addr); - port = strtoul(ep[1].c_str(), NULL, 0); - secondary.port(port); - - SANDESH_LOG(INFO, "ReConfig for secondary " << secondary); - } - if (primary!=Endpoint()) { - sm_->SetCandidates(primary, secondary); + collector_endpoints.push_back(ep); } + sm_->SetCollectors(collector_endpoints); } void SandeshClient::Initiate() { sm_->SetAdminState(false); - if (primary_ != Endpoint()) - sm_->SetCandidates(primary_,secondary_); + if (collectors_.size()) + sm_->SetCollectors(collectors_); // subscribe for the collector service only if the collector list // is not provided by the generator. else if (csf_ != 0) { @@ -304,7 +274,8 @@ static uint64_t client_start_time; void SandeshClient::SendUVE(int count, const string & stateName, const string & server, - Endpoint primary, Endpoint secondary) { + const Endpoint & server_ip, + const std::vector & collector_eps) { ModuleClientState mcs; mcs.set_name(Sandesh::source() + ":" + Sandesh::node_type() + ":" + Sandesh::module() + ":" + Sandesh::instance_id()); @@ -313,18 +284,22 @@ void SandeshClient::SendUVE(int count, client_start_time = UTCTimestampUsec(); client_start = true; } - std::ostringstream pri,sec; - pri << primary; - sec << secondary; - sci.set_start_time(client_start_time); sci.set_successful_connections(count); sci.set_pid(getpid()); sci.set_http_port(Sandesh::http_port()); sci.set_status(stateName); sci.set_collector_name(server); - sci.set_primary(pri.str()); - sci.set_secondary(sec.str()); + std::ostringstream collector_ip; + collector_ip << server_ip; + sci.set_collector_ip(collector_ip.str()); + std::vector collectors; + BOOST_FOREACH(const TcpServer::Endpoint& ep, collector_eps) { + std::ostringstream collector_ip; + collector_ip << ep; + collectors.push_back(collector_ip.str()); + } + sci.set_collector_list(collectors); // Sandesh client socket statistics SocketIOStats rx_stats; GetRxSocketStats(rx_stats); diff --git a/library/cpp/sandesh_client.h b/library/cpp/sandesh_client.h index 65fc8e80..5425adb4 100644 --- a/library/cpp/sandesh_client.h +++ b/library/cpp/sandesh_client.h @@ -38,8 +38,7 @@ class SandeshHeader; class SandeshClient : public TcpServer, public SandeshClientSM::Mgr { public: - SandeshClient(EventManager *evm, Endpoint primary, - Endpoint secondary = Endpoint(), + SandeshClient(EventManager *evm, const std::vector &collectors, Sandesh::CollectorSubFn csf = 0, bool periodicuve = false); @@ -62,7 +61,7 @@ class SandeshClient : public TcpServer, public SandeshClientSM::Mgr { const uint32_t header_offset); void SendUVE(int count, const std::string & stateName, const std::string & server, - Endpoint primary, Endpoint secondary); + const Endpoint & server_ip, const std::vector & collector_eps); bool SendSandesh(Sandesh *snh); @@ -91,7 +90,7 @@ class SandeshClient : public TcpServer, public SandeshClientSM::Mgr { void ResetSessionWaterMarkInfo(); void GetSessionWaterMarkInfo( std::vector &scwm_info) const; - void ReConfigCollectors(std::vector); + void ReConfigCollectors(const std::vector&); friend class CollectorInfoRequest; protected: @@ -110,7 +109,7 @@ class SandeshClient : public TcpServer, public SandeshClientSM::Mgr { int session_task_instance_; int session_writer_task_id_; int session_reader_task_id_; - Endpoint primary_, secondary_; + std::vector collectors_; Sandesh::CollectorSubFn csf_; boost::scoped_ptr sm_; std::vector session_wm_info_; diff --git a/library/cpp/sandesh_client_sm.cc b/library/cpp/sandesh_client_sm.cc index d4b1d927..da84667d 100644 --- a/library/cpp/sandesh_client_sm.cc +++ b/library/cpp/sandesh_client_sm.cc @@ -78,14 +78,14 @@ struct EvStop : sc::event { bool enq_; }; -struct EvDiscUpdate : sc::event { - EvDiscUpdate(TcpServer::Endpoint active, TcpServer::Endpoint backup) : - active_(active), backup_(backup) { +struct EvCollectorUpdate : sc::event { + EvCollectorUpdate(const std::vector &collectors) : + collectors_(collectors) { } static const char * Name() { - return "EvDiscUpdate"; + return "EvCollectorUpdate"; } - TcpServer::Endpoint active_,backup_; + std::vector collectors_; }; struct EvIdleHoldTimerExpired : sc::event { @@ -215,7 +215,7 @@ struct Idle : public sc::state { sc::custom_reaction, sc::custom_reaction, sc::custom_reaction, - sc::custom_reaction, + sc::custom_reaction, ReleaseSandesh::reaction, DeleteTcpSession::reaction > reactions; @@ -259,40 +259,26 @@ struct Idle : public sc::state { return discard_event(); } - sc::result react(const EvDiscUpdate &event) { + sc::result react(const EvCollectorUpdate &event) { SandeshClientSMImpl *state_machine = &context(); - state_machine->DiscUpdate( - SandeshClientSM::IDLE, true, - event.active_, event.backup_); + state_machine->CollectorUpdate(event.collectors_); return discard_event(); } sc::result react(const EvIdleHoldTimerExpired &event) { SandeshClientSMImpl *state_machine = &context(); - if (state_machine->DiscUpdate( - SandeshClientSM::IDLE, false, - TcpServer::Endpoint(), TcpServer::Endpoint())) { - // Update connection info - ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR, - std::string(), ConnectionStatus::INIT, - state_machine->server(), - state_machine->StateName() + " : " + event.Name() + - " -> Connect"); - return transit(); - } // Update connection info ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR, - std::string(), ConnectionStatus::DOWN, + std::string(), ConnectionStatus::INIT, state_machine->server(), - state_machine->StateName() + " : " + event.Name() + - " -> Disconnect"); - return transit(); + state_machine->StateName() + " : " + event.Name() + " -> Connect"); + return transit(); } }; struct Disconnect : public sc::state { typedef mpl::list< TransitToIdle::reaction, - sc::custom_reaction, + sc::custom_reaction, ReleaseSandesh::reaction, DeleteTcpSession::reaction > reactions; @@ -304,20 +290,16 @@ struct Disconnect : public sc::state { state_machine->SendUVE(); } - sc::result react(const EvDiscUpdate &event) { + sc::result react(const EvCollectorUpdate &event) { SandeshClientSMImpl *state_machine = &context(); - if (state_machine->DiscUpdate( - SandeshClientSM::DISCONNECT, true, - event.active_, event.backup_)) { - // Update connection info - ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR, - std::string(), ConnectionStatus::INIT, - state_machine->server(), - state_machine->StateName() + " : " + event.Name() + - " -> Connect"); - return transit(); - } - return discard_event(); + state_machine->CollectorUpdate(event.collectors_); + // Update connection info + ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR, + std::string(), ConnectionStatus::INIT, + state_machine->server(), + state_machine->StateName() + " : " + event.Name() + + " -> Connect"); + return transit(); } }; @@ -328,7 +310,7 @@ struct Connect : public sc::state { sc::custom_reaction, sc::custom_reaction, sc::custom_reaction, - sc::custom_reaction, + sc::custom_reaction, ReleaseSandesh::reaction, DeleteTcpSession::reaction > reactions; @@ -354,9 +336,7 @@ struct Connect : public sc::state { sc::result react(const EvTcpConnectFail &event) { SandeshClientSMImpl *state_machine = &context(); SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name()); - state_machine->DiscUpdate( - SandeshClientSM::CONNECT, false, - TcpServer::Endpoint(), TcpServer::Endpoint()); + state_machine->CollectorChange(); return ToIdle(state_machine, event.Name()); } @@ -364,10 +344,7 @@ struct Connect : public sc::state { sc::result react(const EvConnectTimerExpired &event) { SandeshClientSMImpl *state_machine = &context(); SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name()); - // Flip the active and backup collector - state_machine->DiscUpdate( - SandeshClientSM::CONNECT, false, - TcpServer::Endpoint(), TcpServer::Endpoint()); + state_machine->CollectorChange(); return ToIdle(state_machine, event.Name()); } @@ -389,14 +366,13 @@ struct Connect : public sc::state { sc::result react(const EvTcpClose &event) { SandeshClientSMImpl *state_machine = &context(); SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name()); + state_machine->CollectorChange(); return ToIdle(state_machine, event.Name()); } - sc::result react(const EvDiscUpdate &event) { + sc::result react(const EvCollectorUpdate &event) { SandeshClientSMImpl *state_machine = &context(); - if (state_machine->DiscUpdate( - SandeshClientSM::CONNECT, true, - event.active_, event.backup_)) { + if (state_machine->CollectorUpdate(event.collectors_)) { return ToIdle(state_machine, event.Name()); } return discard_event(); @@ -435,7 +411,7 @@ struct ClientInit : public sc::state { sc::custom_reaction, sc::custom_reaction, sc::custom_reaction, - sc::custom_reaction, + sc::custom_reaction, DeleteTcpSession::reaction > reactions; @@ -457,15 +433,14 @@ struct ClientInit : public sc::state { sc::result react(const EvTcpClose &event) { SandeshClientSMImpl *state_machine = &context(); SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name()); - state_machine->DiscUpdate( - SandeshClientSM::CLIENT_INIT, false, - TcpServer::Endpoint(), TcpServer::Endpoint()); + state_machine->CollectorChange(); return ToIdle(state_machine, event.Name()); } sc::result react(const EvConnectTimerExpired &event) { SandeshClientSMImpl *state_machine = &context(); SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name()); + state_machine->CollectorChange(); return ToIdle(state_machine, event.Name()); } @@ -513,11 +488,9 @@ struct ClientInit : public sc::state { return discard_event(); } - sc::result react(const EvDiscUpdate &event) { + sc::result react(const EvCollectorUpdate &event) { SandeshClientSMImpl *state_machine = &context(); - if (state_machine->DiscUpdate( - SandeshClientSM::CLIENT_INIT, true, - event.active_, event.backup_)) { + if (state_machine->CollectorUpdate(event.collectors_)) { return ToIdle(state_machine, event.Name()); } return discard_event(); @@ -543,7 +516,7 @@ struct Established : public sc::state { sc::custom_reaction, sc::custom_reaction, sc::custom_reaction, - sc::custom_reaction, + sc::custom_reaction, DeleteTcpSession::reaction > reactions; @@ -566,13 +539,9 @@ struct Established : public sc::state { } sc::result react(const EvTcpClose &event) { - SandeshClientSMImpl *state_machine = &context(); SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name()); - - if (state_machine->DiscUpdate( - SandeshClientSM::ESTABLISHED, false, - TcpServer::Endpoint(), TcpServer::Endpoint())) { + if (state_machine->CollectorChange()) { // Update connection info ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR, std::string(), ConnectionStatus::INIT, @@ -607,12 +576,9 @@ struct Established : public sc::state { return discard_event(); } - sc::result react(const EvDiscUpdate &event) { + sc::result react(const EvCollectorUpdate &event) { SandeshClientSMImpl *state_machine = &context(); - - if (state_machine->DiscUpdate( - SandeshClientSM::ESTABLISHED, true, - event.active_, event.backup_)) { + if (state_machine->CollectorUpdate(event.collectors_)) { // Update connection info ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR, std::string(), ConnectionStatus::INIT, @@ -931,8 +897,6 @@ void SandeshClientSMImpl::Enqueue(const Ev &event) { SandeshClientSMImpl::SandeshClientSMImpl(EventManager *evm, Mgr *mgr, int sm_task_instance, int sm_task_id, bool periodicuve) : SandeshClientSM(mgr), - active_(TcpServer::Endpoint()), - backup_(TcpServer::Endpoint()), work_queue_(sm_task_id, sm_task_instance, boost::bind(&SandeshClientSMImpl::DequeueEvent, this, _1)), connect_timer_(TimerManager::CreateTimer(*evm->io_service(), "Client Connect timer", sm_task_id, sm_task_instance)), @@ -1040,52 +1004,57 @@ bool SandeshClientSMImpl::StatisticsTimerExpired() { return true; } -void SandeshClientSMImpl::GetCandidates(TcpServer::Endpoint& active, - TcpServer::Endpoint &backup) const { - active = active_; - backup = backup_; +void SandeshClientSMImpl::SetCollectors( + const std::vector& collectors) { + Enqueue(scm::EvCollectorUpdate(collectors)); } -void SandeshClientSMImpl::SetCandidates( - TcpServer::Endpoint active, TcpServer::Endpoint backup) { - Enqueue(scm::EvDiscUpdate(active,backup)); +void SandeshClientSMImpl::GetCollectors( + std::vector& collectors) { + collectors = collectors_; } -bool SandeshClientSMImpl::DiscUpdate(State from_state, bool update, - TcpServer::Endpoint active, TcpServer::Endpoint backup) { - - if (update) { - active_ = active; - backup_ = backup; - } - if ((from_state == DISCONNECT) || - ((from_state == IDLE) && (!update))) { - set_server(active_); - SendUVE(); - return true; - } - if ((from_state == ESTABLISHED)||(from_state == CONNECT)||(from_state == CLIENT_INIT)) { - if (!update) { - if (backup_!=TcpServer::Endpoint()) { - TcpServer::Endpoint temp = backup_; - backup_ = active_; - active_ = temp; - set_server(active_); - SendUVE(); - return true; - } - } else { - if (server()!=active_) { - set_server(active_); - SendUVE(); - return true; - } +TcpServer::Endpoint SandeshClientSMImpl::GetCollector() const { + if (collectors_.size()) { + return collectors_[collector_index_]; + } + return TcpServer::Endpoint(); +} + +TcpServer::Endpoint SandeshClientSMImpl::GetNextCollector() { + if (collectors_.size()) { + if (++collector_index_ == collectors_.size()) { + collector_index_ = 0; } + return collectors_[collector_index_]; + } + return TcpServer::Endpoint(); +} + +bool SandeshClientSMImpl::CollectorUpdate( + const std::vector& collectors) { + collectors_ = collectors; + collector_index_ = 0; + TcpServer::Endpoint collector(GetCollector()); + if (server() != collector) { + set_server(collector); + SendUVE(); + return true; } SendUVE(); return false; } +bool SandeshClientSMImpl::CollectorChange() { + TcpServer::Endpoint collector(GetNextCollector()); + if (server() != collector) { + set_server(collector); + SendUVE(); + return true; + } + return false; +} + SandeshClientSM * SandeshClientSM::CreateClientSM( EventManager *evm, Mgr *mgr, int sm_task_instance, int sm_task_id, bool periodicuve) { diff --git a/library/cpp/sandesh_client_sm.h b/library/cpp/sandesh_client_sm.h index e3b2695b..6d505f1d 100644 --- a/library/cpp/sandesh_client_sm.h +++ b/library/cpp/sandesh_client_sm.h @@ -39,8 +39,8 @@ class SandeshClientSM { const uint32_t header_offset) = 0; virtual void SendUVE(int count, const std::string & stateName, const std::string & server, - TcpServer::Endpoint pri = TcpServer::Endpoint(), - TcpServer::Endpoint sec = TcpServer::Endpoint()) = 0; + const TcpServer::Endpoint & server_ip, + const std::vector & collectors) = 0; virtual SandeshSession *CreateSMSession( TcpSession::EventObserver eocb, SandeshReceiveMsgCb rmcb, @@ -74,8 +74,8 @@ class SandeshClientSM { // This function is used to start and stop the state machine virtual void SetAdminState(bool down) = 0; - // This function should be called when there is a discovery service update - virtual void SetCandidates(TcpServer::Endpoint active, TcpServer::Endpoint backup) = 0; + // This function should be called when there is a change in the Collector list + virtual void SetCollectors(const std::vector &collectors) = 0; // This function is used to send UVE sandesh's to the server virtual bool SendSandeshUVE(Sandesh* snh) = 0; diff --git a/library/cpp/sandesh_client_sm_priv.h b/library/cpp/sandesh_client_sm_priv.h index a87e8f20..87ee4c34 100644 --- a/library/cpp/sandesh_client_sm_priv.h +++ b/library/cpp/sandesh_client_sm_priv.h @@ -59,9 +59,10 @@ class SandeshClientSMImpl : public SandeshClientSM, } void SetAdminState(bool down); - void SetCandidates(TcpServer::Endpoint active, TcpServer::Endpoint backup); - void GetCandidates(TcpServer::Endpoint& active, - TcpServer::Endpoint &backup) const; + void SetCollectors(const std::vector& collectors); + void GetCollectors(std::vector& collectors); + TcpServer::Endpoint GetCollector() const; + TcpServer::Endpoint GetNextCollector(); bool SendSandeshUVE(Sandesh* snh); bool SendSandesh(Sandesh* snh); void EnqueDelSession(SandeshSession * session); @@ -89,8 +90,8 @@ class SandeshClientSMImpl : public SandeshClientSM, bool IdleHoldTimerRunning(); void IdleHoldTimerFired(); - bool DiscUpdate(State from_state, bool update, - TcpServer::Endpoint active, TcpServer::Endpoint backup); + bool CollectorUpdate(const std::vector &collectors); + bool CollectorChange(); // Calculate Timer value for active to connect transition. int GetConnectTime() const; @@ -136,7 +137,8 @@ class SandeshClientSMImpl : public SandeshClientSM, void unconsumed_event(const sc::event_base &event); void SendUVE () { - mgr_->SendUVE(connects(), StateName(), collector_name(), active_, backup_); + mgr_->SendUVE(connects(), StateName(), collector_name(), server(), + collectors_); } private: static const int kStatisticsSendInterval = 30000; // 30 sec .. specified in milliseconds @@ -159,8 +161,9 @@ class SandeshClientSMImpl : public SandeshClientSM, void UpdateEventEnqueueFail(const sc::event_base &event); void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail); + std::vector collectors_; + int collector_index_; TcpServer::Endpoint active_; - TcpServer::Endpoint backup_; WorkQueue work_queue_; Timer *connect_timer_; Timer *idle_hold_timer_; diff --git a/library/cpp/sandesh_util.cc b/library/cpp/sandesh_util.cc new file mode 100644 index 00000000..abbd428f --- /dev/null +++ b/library/cpp/sandesh_util.cc @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. + */ + +// +// sandesh_util.cc +// + +#include + +#include + +#include "sandesh_util.h" + +using boost::asio::ip::address; + +bool MakeEndpoint(TcpServer::Endpoint* ep, const std::string& epstr) { + typedef boost::tokenizer > tokenizer; + boost::char_separator sep(":"); + + tokenizer tokens(epstr, sep); + tokenizer::iterator it = tokens.begin(); + std::string sip(*it); + ++it; + std::string sport(*it); + int port; + stringToInteger(sport, port); + boost::system::error_code ec; + address addr = address::from_string(sip, ec); + if (ec) { + return false; + } + *ep = TcpServer::Endpoint(addr, port); + return true; +} diff --git a/library/cpp/sandesh_util.h b/library/cpp/sandesh_util.h new file mode 100644 index 00000000..a298c2f5 --- /dev/null +++ b/library/cpp/sandesh_util.h @@ -0,0 +1,16 @@ +/* + * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved. + */ + +// +// sandesh_util.h +// + +#ifndef __SANDESH_UTIL_H__ +#define __SANDESH_UTIL_H__ + +#include + +bool MakeEndpoint(TcpServer::Endpoint* ep, const std::string& ep_str); + +#endif // __SANDESH_UTIL_H__ diff --git a/library/cpp/test/sandesh_client_test.cc b/library/cpp/test/sandesh_client_test.cc index 0931d37c..106a40af 100644 --- a/library/cpp/test/sandesh_client_test.cc +++ b/library/cpp/test/sandesh_client_test.cc @@ -28,6 +28,7 @@ using namespace std; using namespace boost::assign; using namespace boost::posix_time; using boost::system::error_code; +using boost::asio::ip::address; using namespace contrail::sandesh::protocol; using namespace contrail::sandesh::transport; @@ -86,7 +87,7 @@ class SandeshSessionMock : public SandeshSession { class SandeshClientMock : public SandeshClient { public: - SandeshClientMock(EventManager *evm, Endpoint dummy) : + SandeshClientMock(EventManager *evm, std::vector dummy) : SandeshClient(evm, dummy), session_(NULL), old_session_(NULL) { @@ -150,7 +151,7 @@ class SandeshClientMock : public SandeshClient { class SandeshClientStateMachineTest : public ::testing::Test { protected: SandeshClientStateMachineTest() : - client_(new SandeshClientMock((&evm_), Endpoint())), + client_(new SandeshClientMock((&evm_), std::vector())), timer_(TimerManager::CreateTimer(*evm_.io_service(), "Dummy timer")), sm_(dynamic_cast(client_->state_machine())) { task_util::WaitForIdle(); @@ -179,6 +180,17 @@ class SandeshClientStateMachineTest : public ::testing::Test { return false; } + std::vector GetCollectorEndpoints( + const std::vector& collectors) { + boost::system::error_code ec; + std::vector collector_endpoints; + BOOST_FOREACH(const std::string &collector, collectors) { + Endpoint ep(address::from_string(collector, ec), 8086); + collector_endpoints.push_back(ep); + } + return collector_endpoints; + } + void RunToState(SandeshClientSM::State state) { timer_->Start(15000, boost::bind(&SandeshClientStateMachineTest::DummyTimerHandler, this)); @@ -198,6 +210,12 @@ class SandeshClientStateMachineTest : public ::testing::Test { VerifyState(state); break; } + case SandeshClientSM::DISCONNECT: { + GetToState(SandeshClientSM::IDLE); + EvAdminUpNoHoldTime(); + RunToState(state); + break; + } case SandeshClientSM::CONNECT: { GetToState(SandeshClientSM::IDLE); EvAdminUp(); @@ -211,6 +229,12 @@ class SandeshClientStateMachineTest : public ::testing::Test { VerifyState(SandeshClientSM::CLIENT_INIT); break; } + case SandeshClientSM::ESTABLISHED: { + GetToState(SandeshClientSM::CLIENT_INIT); + EvSandeshCtrlMessageRecv(); + RunToState(SandeshClientSM::ESTABLISHED); + break; + } default: { ASSERT_TRUE(false); break; @@ -227,17 +251,26 @@ class SandeshClientStateMachineTest : public ::testing::Test { switch (state) { case SandeshClientSM::IDLE: - EXPECT_TRUE(!ConnectTimerRunning()); + EXPECT_FALSE(ConnectTimerRunning()); + EXPECT_TRUE(client_->session() == NULL); + break; + case SandeshClientSM::DISCONNECT: + EXPECT_FALSE(ConnectTimerRunning()); EXPECT_TRUE(client_->session() == NULL); break; case SandeshClientSM::CONNECT: EXPECT_TRUE(ConnectTimerRunning()); - EXPECT_TRUE(!IdleHoldTimerRunning()); + EXPECT_FALSE(IdleHoldTimerRunning()); EXPECT_TRUE(client_->session() != NULL); break; case SandeshClientSM::CLIENT_INIT: EXPECT_TRUE(ConnectTimerRunning()); - EXPECT_TRUE(!IdleHoldTimerRunning()); + EXPECT_FALSE(IdleHoldTimerRunning()); + EXPECT_TRUE(client_->session() != NULL); + break; + case SandeshClientSM::ESTABLISHED: + EXPECT_FALSE(ConnectTimerRunning()); + EXPECT_FALSE(IdleHoldTimerRunning()); EXPECT_TRUE(client_->session() != NULL); break; default: @@ -265,6 +298,10 @@ class SandeshClientStateMachineTest : public ::testing::Test { sm_->SetAdminState(false); sm_->set_idle_hold_time(1); } + void EvAdminUpNoHoldTime() { + sm_->SetAdminState(false); + sm_->set_idle_hold_time(0); + } void EvAdminDown() { sm_->SetAdminState(true); sm_->set_idle_hold_time(1); @@ -291,6 +328,26 @@ class SandeshClientStateMachineTest : public ::testing::Test { string xml((const char *)msg, sizeof(msg)); sm_->OnMessage(session, xml); } + void EvSandeshCtrlMessageRecv(SandeshSessionMock *session = NULL) { + session = GetSession(session); + string xml; + contrail::sandesh::test::CreateFakeCtrlMessage(xml); + sm_->OnMessage(session, xml); + } + void EvCollectorUpdateTrue() { + if (collector_list_ != collector_list1_) { + collector_list_ = collector_list1_; + } else { + collector_list_ = collector_list2_; + } + sm_->SetCollectors(GetCollectorEndpoints(collector_list_)); + } + void EvCollectorUpdateFalse() { + if (collector_list_.size() == 0) { + collector_list_ = collector_list1_; + } + sm_->SetCollectors(GetCollectorEndpoints(collector_list_)); + } bool IdleHoldTimerRunning() { return sm_->IdleHoldTimerRunning(); } bool ConnectTimerRunning() { return sm_->ConnectTimerRunning(); } @@ -299,8 +356,17 @@ class SandeshClientStateMachineTest : public ::testing::Test { SandeshClientMock *client_; SandeshClientSMImpl *sm_; Timer *timer_; + std::vector collector_list_; + + static const std::vector collector_list1_; + static const std::vector collector_list2_; }; +const std::vector SandeshClientStateMachineTest::collector_list1_ = + boost::assign::list_of("1.1.1.1")("2.2.2.2"); +const std::vector SandeshClientStateMachineTest::collector_list2_ = + boost::assign::list_of("3.3.3.3"); + typedef boost::function EvGen; struct EvGenComp { bool operator()(const EvGen &lhs, const EvGen &rhs) { return &lhs < &rhs; } @@ -318,31 +384,43 @@ TEST_F(SandeshClientStateMachineTest, Matrix) { (SandeshSessionMock *) NULL), E) Transitions idle = map_list_of - CLIENT_SSM_TRANSITION(EvAdminUp, SandeshClientSM::CONNECT); + CLIENT_SSM_TRANSITION(EvAdminUp, SandeshClientSM::CONNECT) + CLIENT_SSM_TRANSITION(EvAdminUpNoHoldTime, SandeshClientSM::DISCONNECT) + CLIENT_SSM_TRANSITION(EvCollectorUpdateTrue, SandeshClientSM::IDLE) // collector_list1_ + CLIENT_SSM_TRANSITION(EvCollectorUpdateFalse, SandeshClientSM::IDLE); // collector_list1_ - Transitions none = map_list_of - CLIENT_SSM_TRANSITION(EvAdminDown, SandeshClientSM::IDLE); + Transitions disconnect = map_list_of + CLIENT_SSM_TRANSITION(EvCollectorUpdateTrue, SandeshClientSM::CONNECT) // collector_list2_ + CLIENT_SSM_TRANSITION(EvCollectorUpdateFalse, SandeshClientSM::CONNECT); // collector_list2_ Transitions connect = map_list_of CLIENT_SSM_TRANSITION(EvAdminDown, SandeshClientSM::IDLE) CLIENT_SSM_TRANSITION(EvConnectTimerExpired, SandeshClientSM::IDLE) CLIENT_SSM_TRANSITION(EvTcpConnected, SandeshClientSM::CLIENT_INIT) CLIENT_SSM_TRANSITION(EvTcpConnectFail, SandeshClientSM::IDLE) - CLIENT_SSM_TRANSITION2(EvTcpClose, SandeshClientSM::IDLE); + CLIENT_SSM_TRANSITION2(EvTcpClose, SandeshClientSM::IDLE) + CLIENT_SSM_TRANSITION(EvCollectorUpdateTrue, SandeshClientSM::IDLE) // collector_list1_ + CLIENT_SSM_TRANSITION(EvCollectorUpdateFalse, SandeshClientSM::CONNECT); // collector_list1_ Transitions client_init = map_list_of CLIENT_SSM_TRANSITION2(EvTcpClose, SandeshClientSM::IDLE) - CLIENT_SSM_TRANSITION2(EvSandeshMessageRecv, SandeshClientSM::CLIENT_INIT); + CLIENT_SSM_TRANSITION2(EvSandeshMessageRecv, SandeshClientSM::CLIENT_INIT) + CLIENT_SSM_TRANSITION2(EvSandeshCtrlMessageRecv, SandeshClientSM::ESTABLISHED) + CLIENT_SSM_TRANSITION(EvCollectorUpdateTrue, SandeshClientSM::IDLE) // collector_list2_ + CLIENT_SSM_TRANSITION(EvCollectorUpdateFalse, SandeshClientSM::CLIENT_INIT); // collector_list2_ + + Transitions established = map_list_of + CLIENT_SSM_TRANSITION2(EvTcpClose, SandeshClientSM::IDLE) + CLIENT_SSM_TRANSITION2(EvSandeshMessageRecv, SandeshClientSM::ESTABLISHED) + CLIENT_SSM_TRANSITION(EvCollectorUpdateTrue, SandeshClientSM::CONNECT) // collector_list1_ + CLIENT_SSM_TRANSITION(EvCollectorUpdateFalse, SandeshClientSM::ESTABLISHED) // collector_list1_ + CLIENT_SSM_TRANSITION2(EvTcpClose, SandeshClientSM::CONNECT); Transitions matrix[] = - { idle, none, connect, client_init }; + { idle, disconnect, connect, client_init, established }; - for (int k = SandeshClientSM::IDLE; k <= SandeshClientSM::CLIENT_INIT; k++) { + for (int k = SandeshClientSM::IDLE; k <= SandeshClientSM::ESTABLISHED; k++) { SandeshClientSM::State i = static_cast(k); - // Ignore DISCONNECT and ESTABLISHED in SandeshClientSM - if (i == SandeshClientSM::ESTABLISHED || i == SandeshClientSM::DISCONNECT) { - continue; - } int count = 0; for (Transitions::iterator j = matrix[i].begin(); j != matrix[i].end(); j++) { diff --git a/library/cpp/test/sandesh_test_common.cc b/library/cpp/test/sandesh_test_common.cc index 5e31c42d..ee2dcf83 100644 --- a/library/cpp/test/sandesh_test_common.cc +++ b/library/cpp/test/sandesh_test_common.cc @@ -67,6 +67,37 @@ void CreateFakeMessage(uint8_t *data, size_t length) { EXPECT_EQ(offset, length); } +void CreateFakeCtrlMessage(std::string& data) { + const std::string ctrl_xml("true"); + SandeshHeader header; + // Populate the header + header.set_Namespace("Test"); + header.set_Timestamp(123456); + header.set_Module("SandeshStateMachineTest"); + header.set_Source("TestMachine"); + header.set_Context(""); + header.set_SequenceNum(0); + header.set_VersionSig(0); + header.set_Type(SandeshType::REQUEST); + header.set_Hints(g_sandesh_constants.SANDESH_CONTROL_HINT); + boost::shared_ptr btrans = + boost::shared_ptr( + new TMemoryBuffer(512)); + boost::shared_ptr prot = + boost::shared_ptr( + new TXMLProtocol(btrans)); + // Write the sandesh header + int nbytes = header.write(prot); + EXPECT_GT(nbytes, 0); + // Get the buffer + uint8_t *hbuffer; + uint32_t hlen; + btrans->getBuffer(&hbuffer, &hlen); + EXPECT_EQ(hlen, nbytes); + data.append((char*)hbuffer, hlen); + data.append(ctrl_xml); +} + } // end namespace test } // end namespace sandesh } // end namespace contrail diff --git a/library/cpp/test/sandesh_test_common.h b/library/cpp/test/sandesh_test_common.h index 416c5950..33f5e5fd 100644 --- a/library/cpp/test/sandesh_test_common.h +++ b/library/cpp/test/sandesh_test_common.h @@ -37,6 +37,7 @@ class SandeshServerTest : public SandeshServer { }; void CreateFakeMessage(uint8_t *data, size_t length); +void CreateFakeCtrlMessage(std::string& data); } // end namespace test } // end namespace sandesh