diff --git a/src/vnsw/agent/controller/controller_dns.cc b/src/vnsw/agent/controller/controller_dns.cc index 178f88d14a0..c07ded38618 100644 --- a/src/vnsw/agent/controller/controller_dns.cc +++ b/src/vnsw/agent/controller/controller_dns.cc @@ -112,7 +112,21 @@ void AgentDnsXmppChannel::HandleXmppClientChannelEvent(AgentDnsXmppChannel *peer } else if (state == xmps::TIMEDOUT) { DiscoveryAgentClient *dac = Agent::GetInstance()->discovery_client(); if (dac) { - dac->ReDiscoverDNS(); + std::vector resp = + Agent::GetInstance()->GetDiscoveryServerResponseList(); + std::vector::iterator iter; + for (iter = resp.begin(); iter != resp.end(); iter++) { + DSResponse dr = *iter; + if (peer->GetXmppServer().compare( + dr.ep.address().to_string()) == 0) { + + // Add the TIMEDOUT server to the end. + if (iter+1 == resp.end()) break; + std::rotate(iter, iter+1, resp.end()); + Agent::GetInstance()->controller()->ApplyDiscoveryDnsXmppServices(resp); + break; + } + } } } } diff --git a/src/vnsw/agent/controller/controller_init.cc b/src/vnsw/agent/controller/controller_init.cc index 5db3162999f..bf230c97fca 100644 --- a/src/vnsw/agent/controller/controller_init.cc +++ b/src/vnsw/agent/controller/controller_init.cc @@ -2,6 +2,7 @@ * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. */ +#include #include "base/logging.h" #include "base/timer.h" #include "base/contrail_ports.h" @@ -438,11 +439,14 @@ void VNController::DisConnectControllerIfmapServer(uint8_t idx) { agent_->reset_controller_ifmap_xmpp_server(idx); } -bool VNController::AgentXmppServerExists(const std::string &server_ip, - std::vector resp) { +bool VNController::AgentXmppServerConnectedExists( + const std::string &server_ip, + std::vector resp) { std::vector::iterator iter; - for (iter = resp.begin(); iter != resp.end(); iter++) { + int8_t count = -1; + int8_t min_iter = std::min(static_cast(resp.size()), MAX_XMPP_SERVERS); + for (iter = resp.begin(); ++count < min_iter; iter++) { DSResponse dr = *iter; if (dr.ep.address().to_string().compare(server_ip) == 0) { return true; @@ -462,15 +466,17 @@ bool VNController::ApplyDiscoveryXmppServicesInternal(std::vector re std::vector::iterator iter; int8_t count = -1; agent_->UpdateDiscoveryServerResponseList(resp); - for (iter = resp.begin(); iter != resp.end(); iter++) { + + /* Apply only MAX_XMPP_SERVERS from list as the list is ordered */ + int8_t min_iter = std::min(static_cast(resp.size()), MAX_XMPP_SERVERS); + for (iter = resp.begin(); ++count < min_iter; iter++) { DSResponse dr = *iter; - count ++; CONTROLLER_DISCOVERY_TRACE(DiscoveryConnection, "XMPP Discovery Server Response", count, dr.ep.address().to_string(), integerToString(dr.ep.port())); AgentXmppChannel *chnl = FindAgentXmppChannel(dr.ep.address().to_string()); - if (chnl) { + if (chnl) { if (chnl->GetXmppChannel() && chnl->GetXmppChannel()->GetPeerState() == xmps::READY) { CONTROLLER_DISCOVERY_TRACE(DiscoveryConnection, @@ -502,7 +508,7 @@ bool VNController::ApplyDiscoveryXmppServicesInternal(std::vector re } else if (agent_->controller_xmpp_channel(xs_idx)) { - if (AgentXmppServerExists( + if (AgentXmppServerConnectedExists( agent_->controller_ifmap_xmpp_server(xs_idx), resp)) { CONTROLLER_DISCOVERY_TRACE(DiscoveryConnection, @@ -580,16 +586,18 @@ bool VNController::ApplyDiscoveryDnsXmppServicesInternal( std::vector::iterator iter; int8_t count = -1; agent_->UpdateDiscoveryDnsServerResponseList(resp); - for (iter = resp.begin(); iter != resp.end(); iter++) { + + /* Apply only MAX_XMPP_SERVERS from list as the list is ordered */ + int8_t min_iter = std::min(static_cast(resp.size()), MAX_XMPP_SERVERS); + for (iter = resp.begin(); ++count < min_iter; iter++) { DSResponse dr = *iter; - count++; CONTROLLER_DISCOVERY_TRACE(DiscoveryConnection, "DNS Discovery Server Response", count, dr.ep.address().to_string(), integerToString(dr.ep.port())); AgentDnsXmppChannel *chnl = FindAgentDnsXmppChannel(dr.ep.address().to_string()); - if (chnl) { + if (chnl) { if (chnl->GetXmppChannel() && chnl->GetXmppChannel()->GetPeerState() == xmps::READY) { CONTROLLER_DISCOVERY_TRACE(DiscoveryConnection, @@ -619,7 +627,7 @@ bool VNController::ApplyDiscoveryDnsXmppServicesInternal( } else if (agent_->dns_xmpp_channel(xs_idx)) { - if (AgentXmppServerExists( + if (AgentXmppServerConnectedExists( agent_->dns_server(xs_idx), resp)) { CONTROLLER_DISCOVERY_TRACE(DiscoveryConnection, diff --git a/src/vnsw/agent/controller/controller_init.h b/src/vnsw/agent/controller/controller_init.h index f6eb9017857..743bb112057 100644 --- a/src/vnsw/agent/controller/controller_init.h +++ b/src/vnsw/agent/controller/controller_init.h @@ -183,7 +183,7 @@ class VNController { AgentDnsXmppChannel *FindAgentDnsXmppChannel(const std::string &server_ip); void DeleteConnectionInfo(const std::string &addr, bool is_dns) const; const std::string MakeConnectionPrefix(bool is_dns) const; - bool AgentXmppServerExists(const std::string &server_ip, + bool AgentXmppServerConnectedExists(const std::string &server_ip, std::vector resp); bool ApplyDiscoveryXmppServicesInternal(std::vector resp); bool ApplyDiscoveryDnsXmppServicesInternal(std::vector resp); diff --git a/src/vnsw/agent/controller/controller_peer.cc b/src/vnsw/agent/controller/controller_peer.cc index bba42f5a097..af5e2bba3fc 100644 --- a/src/vnsw/agent/controller/controller_peer.cc +++ b/src/vnsw/agent/controller/controller_peer.cc @@ -1622,7 +1622,21 @@ void AgentXmppChannel::HandleAgentXmppClientChannelEvent(AgentXmppChannel *peer, "NULL", "Connection to Xmpp Server, Timed out"); DiscoveryAgentClient *dac = Agent::GetInstance()->discovery_client(); if (dac) { - dac->ReDiscoverController(); + std::vector resp = + Agent::GetInstance()->GetDiscoveryServerResponseList(); + std::vector::iterator iter; + for (iter = resp.begin(); iter != resp.end(); iter++) { + DSResponse dr = *iter; + if (peer->GetXmppServer().compare( + dr.ep.address().to_string()) == 0) { + + // Add the TIMEDOUT server to the end. + if (iter+1 == resp.end()) break; + std::rotate(iter, iter+1, resp.end()); + agent->controller()->ApplyDiscoveryXmppServices(resp); + break; + } + } } } } diff --git a/src/vnsw/agent/test/test_xmpp_discovery.cc b/src/vnsw/agent/test/test_xmpp_discovery.cc index 09b941b62bf..4e2bac984c8 100644 --- a/src/vnsw/agent/test/test_xmpp_discovery.cc +++ b/src/vnsw/agent/test/test_xmpp_discovery.cc @@ -33,39 +33,6 @@ using namespace pugi; void RouterIdDepInit(Agent *agent) { } -class AgentBgpXmppPeerTest : public AgentXmppChannel { -public: - AgentBgpXmppPeerTest(std::string xs, uint8_t xs_idx) : - AgentXmppChannel(Agent::GetInstance(), xs, "0", xs_idx), - rx_count_(0), stop_scheduler_(false), rx_channel_event_queue_( - TaskScheduler::GetInstance()->GetTaskId("xmpp::StateMachine"), 0, - boost::bind(&AgentBgpXmppPeerTest::ProcessChannelEvent, this, _1)) { - } - - virtual void ReceiveUpdate(const XmppStanza::XmppMessage *msg) { - rx_count_++; - AgentXmppChannel::ReceiveUpdate(msg); - } - - bool ProcessChannelEvent(xmps::PeerState state) { - AgentXmppChannel::HandleAgentXmppClientChannelEvent(static_cast(this), state); - return true; - } - - void HandleXmppChannelEvent(xmps::PeerState state) { - rx_channel_event_queue_.Enqueue(state); - } - - size_t Count() const { return rx_count_; } - virtual ~AgentBgpXmppPeerTest() { } - void stop_scheduler(bool stop) {stop_scheduler_ = stop;} - -private: - size_t rx_count_; - bool stop_scheduler_; - WorkQueue rx_channel_event_queue_; -}; - class ControlNodeMockBgpXmppPeer { public: ControlNodeMockBgpXmppPeer() : channel_ (NULL), rx_count_(0) { @@ -556,4 +523,311 @@ TEST_F(AgentXmppUnitTest, XmppConnection_Discovery_TimedOut) { == xmps::NOT_READY); } +TEST_F(AgentXmppUnitTest, XmppConnection_ApplyDiscovery_OrderedList) { + + client->Reset(); + client->WaitForIdle(); + + XmppServerConnectionInit(); + + // Simulate Discovery response for Xmpp Server + std::vector ds_response; + DSResponse resp; + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.1")); + resp.ep.port(xs1->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.2")); + resp.ep.port(xs2->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.3")); + resp.ep.port(xs3->GetPort()); + ds_response.push_back(resp); + + agent_->controller()->ApplyDiscoveryXmppServices(ds_response); + client->WaitForIdle(); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.1"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs1->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.2"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs2->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Simulate Discovery response for Xmpp Server + ds_response.clear(); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.4")); + resp.ep.port(xs4->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.1")); + resp.ep.port(xs1->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.2")); + resp.ep.port(xs2->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.3")); + resp.ep.port(xs3->GetPort()); + ds_response.push_back(resp); + + agent_->controller()->ApplyDiscoveryXmppServices(ds_response); + client->WaitForIdle(); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.1"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs1->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs4->GetPort()); + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.4"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Wait until older XmppClient, XmppChannel is cleaned + client->WaitForIdle(); + + // Simulate Discovery response for Xmpp Server + ds_response.clear(); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.3")); + resp.ep.port(xs3->GetPort()); + ds_response.push_back(resp); + + agent_->controller()->ApplyDiscoveryXmppServices(ds_response); + client->WaitForIdle(); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.3"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs3->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs4->GetPort()); + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.4"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Wait until older XmppClient, XmppChannel is cleaned + client->WaitForIdle(); + + // Simulate Discovery response for Xmpp Server + ds_response.clear(); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.3")); + resp.ep.port(xs3->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.4")); + resp.ep.port(xs4->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.5")); + resp.ep.port(xs5->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.6")); + resp.ep.port(xs6->GetPort()); + ds_response.push_back(resp); + + agent_->controller()->ApplyDiscoveryXmppServices(ds_response); + client->WaitForIdle(); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.3"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs3->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs4->GetPort()); + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.4"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Increment connection attempts to simulate TIMEDOUT state + XmppConnection *connection = + const_cast + (agent_->controller_xmpp_channel(0)->GetXmppChannel()->connection()); + XmppStateMachine *sm = connection->state_machine(); + uint8_t count = 0; + while (count++ < XmppStateMachine::kMaxAttempts) { + sm->connect_attempts_inc(); + } + + // Bring the connected Server Down, to trigger Discovery Server list walk + xs3->Shutdown(); + client->WaitForIdle(); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::NOT_READY); + + // Agent to walk the discovery list and apply new published service + WAIT_FOR(1000, 10000, + agent_->controller_ifmap_xmpp_port(0) == (uint32_t)xs5->GetPort()); + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.5"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs4->GetPort()); + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.4"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Increment connection attempts to simulate TIMEDOUT state + connection = const_cast + (agent_->controller_xmpp_channel(0)->GetXmppChannel()->connection()); + sm = connection->state_machine(); + count = 0; + while (count++ < XmppStateMachine::kMaxAttempts) { + sm->connect_attempts_inc(); + } + + // Bring the connected Server Down, to trigger Discovery Server list walk + xs5->Shutdown(); + client->WaitForIdle(); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::NOT_READY); + + // Agent to walk the discovery list and apply new published service + WAIT_FOR(1000, 10000, + agent_->controller_ifmap_xmpp_port(0) == (uint32_t)xs6->GetPort()); + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.6"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs4->GetPort()); + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.4"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Simulate Discovery response for Xmpp Server + ds_response.clear(); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.1")); + resp.ep.port(xs1->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.2")); + resp.ep.port(xs2->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.4")); + resp.ep.port(xs4->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.6")); + resp.ep.port(xs6->GetPort()); + ds_response.push_back(resp); + + agent_->controller()->ApplyDiscoveryXmppServices(ds_response); + client->WaitForIdle(); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.1"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs1->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.2"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs2->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Simulate Discovery response for Xmpp Server + ds_response.clear(); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.1")); + resp.ep.port(xs1->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.2")); + resp.ep.port(xs2->GetPort()); + ds_response.push_back(resp); + + agent_->controller()->ApplyDiscoveryXmppServices(ds_response); + client->WaitForIdle(); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.1"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs1->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.2"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs2->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Increment connection attempts to simulate TIMEDOUT state + connection = const_cast + (agent_->controller_xmpp_channel(1)->GetXmppChannel()->connection()); + sm = connection->state_machine(); + count = 0; + while (count++ < XmppStateMachine::kMaxAttempts) { + sm->connect_attempts_inc(); + } + + // Bring the connected Server Down, to trigger Discovery Server list walk + xs2->Shutdown(); + client->WaitForIdle(); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::NOT_READY); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.1"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs1->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.2"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::NOT_READY); + + + xs1->Shutdown(); + xs2->Shutdown(); + xs3->Shutdown(); + xs4->Shutdown(); + xs5->Shutdown(); + xs6->Shutdown(); + client->WaitForIdle(); + + //wait for connection establishment + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::NOT_READY); + + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::NOT_READY); +} + }