From e59f53c3d827cdeb600668ab35bd4faea9e70a98 Mon Sep 17 00:00:00 2001 From: Nipa Kumar Date: Wed, 27 Jul 2016 11:09:05 -0700 Subject: [PATCH] Walk Discovery Publisher list during service unavailability. On published services being detected unavailable (via TCP close), walk the list of published services recieved from Discovery Server instead of resubscribing. This is particularly useful when cfg-node is inaccessible during upgrade etc., Also the list returned by the Discovery Server is an ordered list based on priority and not equal priority, hence connects should be ordered. Use std::rotate to demote the node in the list to the end, this is more effecient as it solves iterator node deallocate/allocate. Add additional tests to test border conditions. Change-Id: Iba6115201cac619a69fa9c92f21d29eb0d729e19 Closes-Bug:1605412 --- src/vnsw/agent/controller/controller_dns.cc | 16 +- src/vnsw/agent/controller/controller_init.cc | 30 +- src/vnsw/agent/controller/controller_init.h | 2 +- src/vnsw/agent/controller/controller_peer.cc | 16 +- src/vnsw/agent/test/test_xmpp_discovery.cc | 340 +++++++++++++++++-- 5 files changed, 357 insertions(+), 47 deletions(-) diff --git a/src/vnsw/agent/controller/controller_dns.cc b/src/vnsw/agent/controller/controller_dns.cc index 178f88d14a0..c07ded38618 100644 --- a/src/vnsw/agent/controller/controller_dns.cc +++ b/src/vnsw/agent/controller/controller_dns.cc @@ -112,7 +112,21 @@ void AgentDnsXmppChannel::HandleXmppClientChannelEvent(AgentDnsXmppChannel *peer } else if (state == xmps::TIMEDOUT) { DiscoveryAgentClient *dac = Agent::GetInstance()->discovery_client(); if (dac) { - dac->ReDiscoverDNS(); + std::vector resp = + Agent::GetInstance()->GetDiscoveryServerResponseList(); + std::vector::iterator iter; + for (iter = resp.begin(); iter != resp.end(); iter++) { + DSResponse dr = *iter; + if (peer->GetXmppServer().compare( + dr.ep.address().to_string()) == 0) { + + // Add the TIMEDOUT server to the end. + if (iter+1 == resp.end()) break; + std::rotate(iter, iter+1, resp.end()); + Agent::GetInstance()->controller()->ApplyDiscoveryDnsXmppServices(resp); + break; + } + } } } } diff --git a/src/vnsw/agent/controller/controller_init.cc b/src/vnsw/agent/controller/controller_init.cc index 5db3162999f..bf230c97fca 100644 --- a/src/vnsw/agent/controller/controller_init.cc +++ b/src/vnsw/agent/controller/controller_init.cc @@ -2,6 +2,7 @@ * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. */ +#include #include "base/logging.h" #include "base/timer.h" #include "base/contrail_ports.h" @@ -438,11 +439,14 @@ void VNController::DisConnectControllerIfmapServer(uint8_t idx) { agent_->reset_controller_ifmap_xmpp_server(idx); } -bool VNController::AgentXmppServerExists(const std::string &server_ip, - std::vector resp) { +bool VNController::AgentXmppServerConnectedExists( + const std::string &server_ip, + std::vector resp) { std::vector::iterator iter; - for (iter = resp.begin(); iter != resp.end(); iter++) { + int8_t count = -1; + int8_t min_iter = std::min(static_cast(resp.size()), MAX_XMPP_SERVERS); + for (iter = resp.begin(); ++count < min_iter; iter++) { DSResponse dr = *iter; if (dr.ep.address().to_string().compare(server_ip) == 0) { return true; @@ -462,15 +466,17 @@ bool VNController::ApplyDiscoveryXmppServicesInternal(std::vector re std::vector::iterator iter; int8_t count = -1; agent_->UpdateDiscoveryServerResponseList(resp); - for (iter = resp.begin(); iter != resp.end(); iter++) { + + /* Apply only MAX_XMPP_SERVERS from list as the list is ordered */ + int8_t min_iter = std::min(static_cast(resp.size()), MAX_XMPP_SERVERS); + for (iter = resp.begin(); ++count < min_iter; iter++) { DSResponse dr = *iter; - count ++; CONTROLLER_DISCOVERY_TRACE(DiscoveryConnection, "XMPP Discovery Server Response", count, dr.ep.address().to_string(), integerToString(dr.ep.port())); AgentXmppChannel *chnl = FindAgentXmppChannel(dr.ep.address().to_string()); - if (chnl) { + if (chnl) { if (chnl->GetXmppChannel() && chnl->GetXmppChannel()->GetPeerState() == xmps::READY) { CONTROLLER_DISCOVERY_TRACE(DiscoveryConnection, @@ -502,7 +508,7 @@ bool VNController::ApplyDiscoveryXmppServicesInternal(std::vector re } else if (agent_->controller_xmpp_channel(xs_idx)) { - if (AgentXmppServerExists( + if (AgentXmppServerConnectedExists( agent_->controller_ifmap_xmpp_server(xs_idx), resp)) { CONTROLLER_DISCOVERY_TRACE(DiscoveryConnection, @@ -580,16 +586,18 @@ bool VNController::ApplyDiscoveryDnsXmppServicesInternal( std::vector::iterator iter; int8_t count = -1; agent_->UpdateDiscoveryDnsServerResponseList(resp); - for (iter = resp.begin(); iter != resp.end(); iter++) { + + /* Apply only MAX_XMPP_SERVERS from list as the list is ordered */ + int8_t min_iter = std::min(static_cast(resp.size()), MAX_XMPP_SERVERS); + for (iter = resp.begin(); ++count < min_iter; iter++) { DSResponse dr = *iter; - count++; CONTROLLER_DISCOVERY_TRACE(DiscoveryConnection, "DNS Discovery Server Response", count, dr.ep.address().to_string(), integerToString(dr.ep.port())); AgentDnsXmppChannel *chnl = FindAgentDnsXmppChannel(dr.ep.address().to_string()); - if (chnl) { + if (chnl) { if (chnl->GetXmppChannel() && chnl->GetXmppChannel()->GetPeerState() == xmps::READY) { CONTROLLER_DISCOVERY_TRACE(DiscoveryConnection, @@ -619,7 +627,7 @@ bool VNController::ApplyDiscoveryDnsXmppServicesInternal( } else if (agent_->dns_xmpp_channel(xs_idx)) { - if (AgentXmppServerExists( + if (AgentXmppServerConnectedExists( agent_->dns_server(xs_idx), resp)) { CONTROLLER_DISCOVERY_TRACE(DiscoveryConnection, diff --git a/src/vnsw/agent/controller/controller_init.h b/src/vnsw/agent/controller/controller_init.h index f6eb9017857..743bb112057 100644 --- a/src/vnsw/agent/controller/controller_init.h +++ b/src/vnsw/agent/controller/controller_init.h @@ -183,7 +183,7 @@ class VNController { AgentDnsXmppChannel *FindAgentDnsXmppChannel(const std::string &server_ip); void DeleteConnectionInfo(const std::string &addr, bool is_dns) const; const std::string MakeConnectionPrefix(bool is_dns) const; - bool AgentXmppServerExists(const std::string &server_ip, + bool AgentXmppServerConnectedExists(const std::string &server_ip, std::vector resp); bool ApplyDiscoveryXmppServicesInternal(std::vector resp); bool ApplyDiscoveryDnsXmppServicesInternal(std::vector resp); diff --git a/src/vnsw/agent/controller/controller_peer.cc b/src/vnsw/agent/controller/controller_peer.cc index 4286bcd86d3..a4d39ebec93 100644 --- a/src/vnsw/agent/controller/controller_peer.cc +++ b/src/vnsw/agent/controller/controller_peer.cc @@ -1612,7 +1612,21 @@ void AgentXmppChannel::HandleAgentXmppClientChannelEvent(AgentXmppChannel *peer, "NULL", "Connection to Xmpp Server, Timed out"); DiscoveryAgentClient *dac = Agent::GetInstance()->discovery_client(); if (dac) { - dac->ReDiscoverController(); + std::vector resp = + Agent::GetInstance()->GetDiscoveryServerResponseList(); + std::vector::iterator iter; + for (iter = resp.begin(); iter != resp.end(); iter++) { + DSResponse dr = *iter; + if (peer->GetXmppServer().compare( + dr.ep.address().to_string()) == 0) { + + // Add the TIMEDOUT server to the end. + if (iter+1 == resp.end()) break; + std::rotate(iter, iter+1, resp.end()); + agent->controller()->ApplyDiscoveryXmppServices(resp); + break; + } + } } } } diff --git a/src/vnsw/agent/test/test_xmpp_discovery.cc b/src/vnsw/agent/test/test_xmpp_discovery.cc index 09b941b62bf..4e2bac984c8 100644 --- a/src/vnsw/agent/test/test_xmpp_discovery.cc +++ b/src/vnsw/agent/test/test_xmpp_discovery.cc @@ -33,39 +33,6 @@ using namespace pugi; void RouterIdDepInit(Agent *agent) { } -class AgentBgpXmppPeerTest : public AgentXmppChannel { -public: - AgentBgpXmppPeerTest(std::string xs, uint8_t xs_idx) : - AgentXmppChannel(Agent::GetInstance(), xs, "0", xs_idx), - rx_count_(0), stop_scheduler_(false), rx_channel_event_queue_( - TaskScheduler::GetInstance()->GetTaskId("xmpp::StateMachine"), 0, - boost::bind(&AgentBgpXmppPeerTest::ProcessChannelEvent, this, _1)) { - } - - virtual void ReceiveUpdate(const XmppStanza::XmppMessage *msg) { - rx_count_++; - AgentXmppChannel::ReceiveUpdate(msg); - } - - bool ProcessChannelEvent(xmps::PeerState state) { - AgentXmppChannel::HandleAgentXmppClientChannelEvent(static_cast(this), state); - return true; - } - - void HandleXmppChannelEvent(xmps::PeerState state) { - rx_channel_event_queue_.Enqueue(state); - } - - size_t Count() const { return rx_count_; } - virtual ~AgentBgpXmppPeerTest() { } - void stop_scheduler(bool stop) {stop_scheduler_ = stop;} - -private: - size_t rx_count_; - bool stop_scheduler_; - WorkQueue rx_channel_event_queue_; -}; - class ControlNodeMockBgpXmppPeer { public: ControlNodeMockBgpXmppPeer() : channel_ (NULL), rx_count_(0) { @@ -556,4 +523,311 @@ TEST_F(AgentXmppUnitTest, XmppConnection_Discovery_TimedOut) { == xmps::NOT_READY); } +TEST_F(AgentXmppUnitTest, XmppConnection_ApplyDiscovery_OrderedList) { + + client->Reset(); + client->WaitForIdle(); + + XmppServerConnectionInit(); + + // Simulate Discovery response for Xmpp Server + std::vector ds_response; + DSResponse resp; + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.1")); + resp.ep.port(xs1->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.2")); + resp.ep.port(xs2->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.3")); + resp.ep.port(xs3->GetPort()); + ds_response.push_back(resp); + + agent_->controller()->ApplyDiscoveryXmppServices(ds_response); + client->WaitForIdle(); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.1"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs1->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.2"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs2->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Simulate Discovery response for Xmpp Server + ds_response.clear(); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.4")); + resp.ep.port(xs4->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.1")); + resp.ep.port(xs1->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.2")); + resp.ep.port(xs2->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.3")); + resp.ep.port(xs3->GetPort()); + ds_response.push_back(resp); + + agent_->controller()->ApplyDiscoveryXmppServices(ds_response); + client->WaitForIdle(); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.1"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs1->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs4->GetPort()); + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.4"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Wait until older XmppClient, XmppChannel is cleaned + client->WaitForIdle(); + + // Simulate Discovery response for Xmpp Server + ds_response.clear(); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.3")); + resp.ep.port(xs3->GetPort()); + ds_response.push_back(resp); + + agent_->controller()->ApplyDiscoveryXmppServices(ds_response); + client->WaitForIdle(); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.3"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs3->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs4->GetPort()); + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.4"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Wait until older XmppClient, XmppChannel is cleaned + client->WaitForIdle(); + + // Simulate Discovery response for Xmpp Server + ds_response.clear(); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.3")); + resp.ep.port(xs3->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.4")); + resp.ep.port(xs4->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.5")); + resp.ep.port(xs5->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.6")); + resp.ep.port(xs6->GetPort()); + ds_response.push_back(resp); + + agent_->controller()->ApplyDiscoveryXmppServices(ds_response); + client->WaitForIdle(); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.3"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs3->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs4->GetPort()); + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.4"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Increment connection attempts to simulate TIMEDOUT state + XmppConnection *connection = + const_cast + (agent_->controller_xmpp_channel(0)->GetXmppChannel()->connection()); + XmppStateMachine *sm = connection->state_machine(); + uint8_t count = 0; + while (count++ < XmppStateMachine::kMaxAttempts) { + sm->connect_attempts_inc(); + } + + // Bring the connected Server Down, to trigger Discovery Server list walk + xs3->Shutdown(); + client->WaitForIdle(); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::NOT_READY); + + // Agent to walk the discovery list and apply new published service + WAIT_FOR(1000, 10000, + agent_->controller_ifmap_xmpp_port(0) == (uint32_t)xs5->GetPort()); + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.5"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs4->GetPort()); + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.4"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Increment connection attempts to simulate TIMEDOUT state + connection = const_cast + (agent_->controller_xmpp_channel(0)->GetXmppChannel()->connection()); + sm = connection->state_machine(); + count = 0; + while (count++ < XmppStateMachine::kMaxAttempts) { + sm->connect_attempts_inc(); + } + + // Bring the connected Server Down, to trigger Discovery Server list walk + xs5->Shutdown(); + client->WaitForIdle(); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::NOT_READY); + + // Agent to walk the discovery list and apply new published service + WAIT_FOR(1000, 10000, + agent_->controller_ifmap_xmpp_port(0) == (uint32_t)xs6->GetPort()); + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.6"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs4->GetPort()); + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.4"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Simulate Discovery response for Xmpp Server + ds_response.clear(); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.1")); + resp.ep.port(xs1->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.2")); + resp.ep.port(xs2->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.4")); + resp.ep.port(xs4->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.6")); + resp.ep.port(xs6->GetPort()); + ds_response.push_back(resp); + + agent_->controller()->ApplyDiscoveryXmppServices(ds_response); + client->WaitForIdle(); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.1"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs1->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.2"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs2->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Simulate Discovery response for Xmpp Server + ds_response.clear(); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.1")); + resp.ep.port(xs1->GetPort()); + ds_response.push_back(resp); + resp.ep.address(boost::asio::ip::address::from_string("127.0.0.2")); + resp.ep.port(xs2->GetPort()); + ds_response.push_back(resp); + + agent_->controller()->ApplyDiscoveryXmppServices(ds_response); + client->WaitForIdle(); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.1"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs1->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.2"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(1) == + (uint32_t)xs2->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + // Increment connection attempts to simulate TIMEDOUT state + connection = const_cast + (agent_->controller_xmpp_channel(1)->GetXmppChannel()->connection()); + sm = connection->state_machine(); + count = 0; + while (count++ < XmppStateMachine::kMaxAttempts) { + sm->connect_attempts_inc(); + } + + // Bring the connected Server Down, to trigger Discovery Server list walk + xs2->Shutdown(); + client->WaitForIdle(); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::NOT_READY); + + //wait for connection establishment + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(0).c_str(), "127.0.0.1"); + EXPECT_TRUE(agent_->controller_ifmap_xmpp_port(0) == + (uint32_t)xs1->GetPort()); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::READY); + + ASSERT_STREQ(agent_->controller_ifmap_xmpp_server(1).c_str(), "127.0.0.2"); + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::NOT_READY); + + + xs1->Shutdown(); + xs2->Shutdown(); + xs3->Shutdown(); + xs4->Shutdown(); + xs5->Shutdown(); + xs6->Shutdown(); + client->WaitForIdle(); + + //wait for connection establishment + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(0)->GetXmppChannel()->GetPeerState() + == xmps::NOT_READY); + + WAIT_FOR(1000, 10000, + agent_->controller_xmpp_channel(1)->GetXmppChannel()->GetPeerState() + == xmps::NOT_READY); +} + }