From 1b62a3088c0551274184e7023ab226292620d66d Mon Sep 17 00:00:00 2001
From: Ananth Suryanarayana
Date: Fri, 19 Feb 2016 09:57:00 -0800
Subject: [PATCH] GracefulRestart support in control-node (Phase 1)

At present, whenever the xmpp connection to an agent goes down, all routes
learned from that agent are deleted and also withdrawn from all other peers.
This causes end-to-end traffic loss even if the vrouter associated with the
agent going down has retained routes in the data plane for hit-less
forwarding, until a new session comes up and routes are re-learned and
re-advertised to other peers.

With this change,

o Added support in control-node to retain routes learned from agents when the
  xmpp session goes down (in the hope that the session comes back up)
o Used a mark-and-sweep approach to retain and later purge routes. Routes are
  purged after the graceful restart timer expires

  1. Whenever a session is closed, all routes learned from the agent going
     down are marked as stale (but retained in the routing table and still
     eligible for best path election, outbound advertisement, etc.). Also, a
     GR timer is triggered to expire after a minute or so
  2. If and when the session comes back up and some/all paths are relearned,
     the stale flag is cleared (only for those relearned paths)
  3. When the GR timer expires, the table is walked again and any paths that
     are still marked as stale are swept (deleted) from the table
  4. During the GR wait time (when the timer has not yet fired), subsequent
     session flaps are considered double failures and all paths are deleted
     (thereby not doing GR)

o When the new session comes up, most of the old connection's data structures
  are retained.
o Only XmppSession and XmppStateMachine are switched from the new connection
  to the old
o Code is mainly tested via unit tests

TODO (in subsequent phases)

o Fix code and add tests for configuration changes in the midst of GR
  (e.g. routing-instance deletion)
o EndOfRib marker (instead of always waiting on the GR timer to expire)
o Add GR support to BGP IPeers as well
o Handle route-targets cleanup properly
o Re-enable a couple of connection endpoint unit tests
o Enable GR for the control-node process (and run systests)
o Enable GR in bgp_stress_test
o Configurable GR timer on a per-peer basis
o Configurable GR ability on a per-peer basis
o Do not do (4) above. Instead, retain routes and remain in GR mode even
  after multiple session closures (work towards LLGR)

-----------------------------------------------------------------

GracefulRestart Phase 2 -- Handle routing-instance deletions during GR

When GR is in progress, all routes and subscription states (in
BgpXmppChannel) are retained in the hope that the agent resubscribes after
GR. But if the config changes between the time the agent went down and came
back up, the new agent may not re-subscribe to all the instances it had
subscribed to before. Such sticky subscriptions must also be cleaned up
properly using a mark-and-sweep approach (similar to how routes are handled).

The mark-and-sweep approach is already implemented for routes in
PeerCloseManager. BgpXmppChannel rides on this to get the necessary callbacks
for

o When to mark all subscription states as stale
o Clearing the stale flag in a subscription state if the agent re-subscribes
  after restart
o When to sweep all subscription states (and delete entries that are still
  stale)

Only after all (still) stale subscription states are deleted can the
routing-instance deletion process resume and complete. Note that during GR,
all route-targets are retained as-is, similar to how routes are retained.
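To make the mark-and-sweep sequence above concrete, here is a minimal,
self-contained C++ sketch of the close state machine, using the same state
names introduced by this patch (NONE, STALE, GR_TIMER, SWEEP, DELETE). The
class, method names, and collapsed transitions are illustrative stand-ins
for the behavior described above, not the actual PeerCloseManager API.

    #include <cassert>
    #include <iostream>

    // Simplified stand-in for the graceful-restart close sequence.
    // Nested closures and the actual rib walks are not modeled here.
    class GracefulCloseSketch {
    public:
        enum State { NONE, STALE, GR_TIMER, SWEEP, DELETE };

        // Session went down: mark paths stale and arm the GR timer in the
        // graceful case, or go straight to DELETE otherwise.
        void Close(bool graceful) {
            assert(state_ == NONE);
            if (!graceful) {
                state_ = DELETE;    // non-graceful: purge paths immediately
                return;
            }
            state_ = STALE;         // mark learned paths stale (rib walk elided)
            state_ = GR_TIMER;      // then wait for the session to come back
        }

        // GR timer fired: sweep leftover stale paths if the peer came back,
        // otherwise delete all of its paths.
        void TimerExpired(bool peer_is_back) {
            assert(state_ == GR_TIMER);
            state_ = peer_is_back ? SWEEP : DELETE;
        }

        // Rib walk finished: return to NONE so a fresh closure can start.
        void WalkComplete() { state_ = NONE; }

        State state() const { return state_; }

    private:
        State state_ = NONE;
    };

    int main() {
        GracefulCloseSketch close;
        close.Close(true);          // graceful close: stale paths + GR timer
        close.TimerExpired(true);   // agent re-connected: sweep stale paths
        close.WalkComplete();
        std::cout << "final state " << close.state() << "\n";  // 0 == NONE
        return 0;
    }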
At the moment, the rtarget entries are not individually marked stale (and
then swept). Instead, this is handled by marking the subscription states,
which are already maintained on a per-instance basis in BgpXmppChannel.

Added test cases to cover many (but not all) scenarios

-----------------------------------------------------------------

GracefulRestart Phase 3 -- Send and process EoR marker

After the agent restarts and sends routes to all instances across all address
families, it can send an EoR marker to trigger termination of the GR process
sooner. This prevents possible traffic black-holing until GR timer expiry,
which could potentially be a minute or so. At the moment, an empty publish
and collection are used to denote the EoR marker.

Added test cases to verify this part.

-----------------------------------------------------------------

GracefulRestart Phase 4 - Handle some of the initial review comments

o Increase WAIT_FOR_IDLE time for certain stressful unit tests
o Rename some functions from delete to close where applicable
o Simplify route stale logic in BgpTable::InputCommon()
o Restore the asserts disabled in previous commits
o Stabilize tests
o Add code to support LLGR (nested closures restart GR afresh)

-----------------------------------------------------------------

GracefulRestart Phase 5 - Handle LLGR Cases

o When sessions flap in GR_TIMER state, cancel the timer and restart GR all
  over again. This is required in order to mark newly sent (partial) routes
  as stale
o Modify tests to handle the nested closure cases as well
o Eventually, whenever the GR timer fires, routes are kept and swept if the
  session is established, and deleted otherwise.

-----------------------------------------------------------------

Change-Id: Ie589d69b6390356d4a052cc4415bff4b5dabd499
Partial-Bug: #1537933
---
 ci_unittests.json | 4 +
 src/base/test/task_test_util.h | 4 +-
 src/bgp/bgp_peer.cc | 66 +-
 src/bgp/bgp_peer.h | 1 +
 src/bgp/bgp_peer.sandesh | 18 +-
 src/bgp/bgp_peer_close.cc | 401 ++--
 src/bgp/bgp_peer_close.h | 47 +-
 src/bgp/bgp_peer_membership.cc | 10 +-
 src/bgp/bgp_peer_membership.h | 5 -
 src/bgp/bgp_server.h | 5 +-
 src/bgp/bgp_table.cc | 36 +-
 src/bgp/bgp_table.h | 2 +-
 src/bgp/bgp_xmpp_channel.cc | 208 ++-
 src/bgp/bgp_xmpp_channel.h | 29 +-
 src/bgp/bgp_xmpp_sandesh.cc | 3 +-
 src/bgp/ipeer.h | 5 +-
 src/bgp/test/SConscript | 5 +
 src/bgp/test/bgp_peer_close_test.cc | 116 --
 src/bgp/test/bgp_server_test_util.cc | 1 +
 src/bgp/test/bgp_server_test_util.h | 15 +
 src/bgp/test/bgp_show_neighbor_test.cc | 10 +-
 src/bgp/test/bgp_xmpp_basic_test.cc | 8 +-
 src/bgp/test/bgp_xmpp_parse_test.cc | 2 +-
 src/bgp/test/graceful_restart_test.cc | 1585 +++++++++++++++++++
 src/control-node/test/network_agent_mock.cc | 50 +-
 src/control-node/test/network_agent_mock.h | 9 +
 src/xmpp/xmpp_connection.cc | 34 +-
 src/xmpp/xmpp_connection.h | 16 +-
 src/xmpp/xmpp_server.cc | 67 +-
 src/xmpp/xmpp_server.h | 12 +-
 src/xmpp/xmpp_state_machine.cc | 138 +-
 src/xmpp/xmpp_state_machine.h | 16 +-
 32 files changed, 2366 insertions(+), 562 deletions(-)
 create mode 100644 src/bgp/test/graceful_restart_test.cc

diff --git a/ci_unittests.json b/ci_unittests.json index 7f9caee294d..0671fd39d46 100644 --- a/ci_unittests.json +++ b/ci_unittests.json @@ -53,6 +53,9 @@ { "tuples" : [ "NO_HEAPCHECK=TRUE", + "TASK_UTIL_DEFAULT_RETRY_COUNT=6000", + "TASK_UTIL_WAIT_TIME=10000", + "WAIT_FOR_IDLE=60", "LOG_DISABLE=TRUE" ], "tests" : [ @@ -71,6 +74,7 @@ ".*/bgp/test/bgp_xmpp_inet6vpn_test$", ".*/bgp/test/bgp_xmpp_mcast_test$", ".*/bgp/test/bgp_xmpp_rtarget_test$", +
".*/bgp/test/graceful_restart_test$", ".*/bgp/test/service_chain_test", ".*/bgp/test/svc_static_route_intergration_test", ".*/bgp/test/xmpp_ecmp_test$" diff --git a/src/base/test/task_test_util.h b/src/base/test/task_test_util.h index f3f1d35761f..5b81d723dd8 100644 --- a/src/base/test/task_test_util.h +++ b/src/base/test/task_test_util.h @@ -59,8 +59,8 @@ do { \ #define TASK_UTIL_WAIT_MSG(cnt, expected, actual, wait, type, msg) \ do { \ ostream << __FILE__ << ":" << __FUNCTION__ << "():" << __LINE__; \ - ostream << ": " << msg << ": Waiting for " << actual << type; \ - ostream << expected << "\n"; \ + ostream << ": " << msg << ": Waiting for " << (actual) << type; \ + ostream << (expected) << "\n"; \ log4cplus::Logger logger = log4cplus::Logger::getRoot(); \ LOG4CPLUS_DEBUG(logger, ostream.str()); \ } while (false) diff --git a/src/bgp/bgp_peer.cc b/src/bgp/bgp_peer.cc index 55a0ab324b2..5cdf69dd2bd 100644 --- a/src/bgp/bgp_peer.cc +++ b/src/bgp/bgp_peer.cc @@ -49,67 +49,45 @@ class BgpPeer::PeerClose : public IPeerClose { return peer_->ToString(); } + // If the peer is deleted or administratively held down, do not attempt + // graceful restart virtual bool IsCloseGraceful() { - - // - // If the peer is deleted or administratively held down, do not attempt - // graceful restart - // - if (peer_->IsDeleted() || peer_->IsAdminDown()) return false; - + if (peer_->IsDeleted() || peer_->IsAdminDown()) + return false; return peer_->server()->IsPeerCloseGraceful(); } - virtual void CustomClose() { - return peer_->CustomClose(); - } + virtual void CustomClose() { return peer_->CustomClose(); } + virtual void GracefulRestartStale() { } + virtual void GracefulRestartSweep() { } // CloseComplete // // Close process for this peer is complete. Restart the state machine and // attempt to bring up session with the neighbor // - virtual bool CloseComplete(bool from_timer, bool gr_cancelled) { + virtual void CloseComplete() { peer_->server()->decrement_closing_count(); + if (!peer_->IsAdminDown()) + peer_->state_machine_->Initialize(); + } + virtual void Delete() { if (!peer_->IsDeleted()) { - - // - // If this closure is off graceful restart timer, nothing else to - // do as we retain the peer based on the configuration - // - if (from_timer) return false; - - // - // Reset peer's state machine - // - if (!peer_->IsAdminDown()) peer_->state_machine_->Initialize(); - - return false; + CloseComplete(); + return; } - - // - // This peer is deleted. 
Timer should have already been cancelled - // - assert(!from_timer); - + peer_->server()->decrement_closing_count(); peer_->deleter()->RetryDelete(); is_closed_ = true; - return true; - } - - bool IsClosed() const { - return is_closed_; } - virtual PeerCloseManager *close_manager() { - return manager_.get(); - } + bool IsClosed() const { return is_closed_; } + virtual PeerCloseManager *close_manager() { return manager_.get(); } void Close() { - if (!is_closed_ && !manager_->IsCloseInProgress()) { - manager_->Close(); + if (!manager_->IsCloseInProgress()) peer_->server()->increment_closing_count(); - } + manager_->Close(); } private: @@ -1659,6 +1637,10 @@ static void FillSocketStats(const IPeerDebugStats::SocketStats &socket_stats, } } +void BgpPeer::FillCloseInfo(BgpNeighborResp *resp) const { + peer_close_->close_manager()->FillCloseInfo(resp); +} + void BgpPeer::FillBgpNeighborDebugState(BgpNeighborResp *bnr, const IPeerDebugStats *peer_state) { bnr->set_last_state(peer_state->last_state()); @@ -1722,7 +1704,7 @@ void BgpPeer::FillNeighborInfo(const BgpSandeshContext *bsc, bnr->set_instance_name(rtinstance_->name()); bnr->set_peer(peer_basename_); bnr->set_deleted(IsDeleted()); - bnr->set_deleted_at(UTCUsecToString(deleter_->delete_time_stamp_usecs())); + bnr->set_closed_at(UTCUsecToString(deleter_->delete_time_stamp_usecs())); bnr->set_admin_down(admin_down_); bnr->set_passive(passive_); bnr->set_peer_address(peer_address_string()); diff --git a/src/bgp/bgp_peer.h b/src/bgp/bgp_peer.h index 66fc9c98d94..79c5defe334 100644 --- a/src/bgp/bgp_peer.h +++ b/src/bgp/bgp_peer.h @@ -336,6 +336,7 @@ class BgpPeer : public IPeer { void CustomClose(); void FillBgpNeighborFamilyAttributes(BgpNeighborResp *nbr) const; + void FillCloseInfo(BgpNeighborResp *resp) const; std::string BytesToHexString(const u_int8_t *msg, size_t size); diff --git a/src/bgp/bgp_peer.sandesh b/src/bgp/bgp_peer.sandesh index 4bc035eae19..5b2a2774637 100644 --- a/src/bgp/bgp_peer.sandesh +++ b/src/bgp/bgp_peer.sandesh @@ -41,11 +41,26 @@ struct ShowBgpNeighborFamily { 3: u32 prefix_limit; } +struct PeerCloseInfo { + 1: string state; + 2: bool close_again; + 3: u64 init; + 4: u64 close; + 5: u64 nested; + 6: u64 deletes; + 7: u64 stale; + 8: u64 sweep; + 9: u64 gr_timer; + 10: u64 deleted_state_paths; + 11: u64 deleted_paths; + 12: u64 marked_state_paths; +} + struct BgpNeighborResp { 53: string instance_name; 1: string peer (link="BgpNeighborReq"); // Peer name 36: bool deleted; // Deletion in progress - 43: string deleted_at; + 43: string closed_at; 47: bool admin_down; 48: bool passive; 2: string peer_address (link="BgpNeighborReq"); @@ -60,6 +75,7 @@ struct BgpNeighborResp { 4: string local_address; // local ip address and port 26: string local_id; 5: u32 local_asn; + 54: optional PeerCloseInfo peer_close_info; 9: optional string send_state; // in sync/not in sync 10: optional string last_event; 11: optional string last_state; diff --git a/src/bgp/bgp_peer_close.cc b/src/bgp/bgp_peer_close.cc index 0641d445091..953c8174fc9 100644 --- a/src/bgp/bgp_peer_close.cc +++ b/src/bgp/bgp_peer_close.cc @@ -7,250 +7,262 @@ #include "bgp/bgp_log.h" #include "bgp/bgp_peer_membership.h" +#include "bgp/bgp_peer_types.h" #include "bgp/bgp_route.h" #include "bgp/bgp_server.h" -// -// Create an instance of PeerCloseManager with back reference to the parent -// IPeer -// +#define PEER_CLOSE_MANAGER_LOG(msg) \ + BGP_LOG_PEER(Event, peer_, SandeshLevel::SYS_INFO, BGP_LOG_FLAG_ALL, \ + BGP_PEER_DIR_NA, "PeerCloseManager: State " << 
GetStateName(state_) << \ + ", CloseAgain? " << (close_again_ ? "Yes" : "No") << ": " << msg); + +#define MOVE_TO_STATE(state) \ + do { \ + assert(state_ != state); \ + PEER_CLOSE_MANAGER_LOG("Move to state " << GetStateName(state)); \ + state_ = state; \ + } while (false) + +// Create an instance of PeerCloseManager with back reference to parent IPeer PeerCloseManager::PeerCloseManager(IPeer *peer) : - peer_(peer), - close_in_progress_(false), - close_request_pending_(false), - config_deleted_(false), - stale_timer_(NULL), - stale_timer_running_(false), - start_stale_timer_(false) { - if (peer->server()) { + peer_(peer), stale_timer_(NULL), state_(NONE), close_again_(false) { + stats_.init++; + if (peer->server()) stale_timer_ = TimerManager::CreateTimer(*peer->server()->ioservice(), "Graceful Restart StaleTimer"); - } } PeerCloseManager::~PeerCloseManager() { TimerManager::DeleteTimer(stale_timer_); } -// -// Process RibIn staling related activities during peer closure -// -// Return true if at least ome time is started, false otherwise -// -void PeerCloseManager::StartStaleTimer() { - // Launch a timer to flush either the peer or the stale routes - // TODO(ananth): Use timer value from configuration - stale_timer_->Start(PeerCloseManager::kDefaultGracefulRestartTime * 1000, - boost::bind(&PeerCloseManager::StaleTimerCallback, this)); +const std::string PeerCloseManager::GetStateName(State state) const { + switch (state) { + case NONE: + return "NONE"; + case GR_TIMER: + return "GR_TIMER"; + case STALE: + return "STALE"; + case SWEEP: + return "SWEEP"; + case DELETE: + return "DELETE"; + } + assert(false); + return ""; } +// Trigger closure of an IPeer // -// Concurrency: Runs in the context of the BGP peer rib membership task. +// Graceful close_state_: NONE +// 1. RibIn Stale Marking and Ribout deletion close_state_: STALE +// 2. StateMachine restart and GR timer start close_state_: GR_TIMER // -// Callback provided to bgp peer rib membership manager to indicate the action -// to perform during RibIn close +// Peer IsReady() in timer callback +// 3. RibIn Sweep and Ribout Generation close_state_: SWEEP +// 4. UnregisterPeerComplete close_state_: NONE // -int PeerCloseManager::GetCloseTypeForTimerCallback(IPeerRib *peer_rib) { - // If peer_rib is still stale, the peer did not come back up or did not - // register for this table after coming back up. In either case, delete - // the rib in - if (peer_rib->IsStale()) { - return MembershipRequest::RIBIN_DELETE; +// Peer not IsReady() in timer callback +// Goto step A +// Close() call during any state other than NONE and DELETE +// Cancel GR timer and restart GR Closure all over again +// +// NonGraceful close_state_ = * (except DELETE) +// A. RibIn deletion and Ribout deletion close_state_ = DELETE +// B. UnregisterPeerComplete => Peers delete/StateMachine restart +// close_state_ = NONE +void PeerCloseManager::Close() { + tbb::recursive_mutex::scoped_lock lock(mutex_); + + stats_.close++; + + // Ignore nested closures + if (close_again_) { + stats_.nested++; + PEER_CLOSE_MANAGER_LOG("Nested close calls ignored"); + return; } - // - // Peer has come back up and registered with this table again. 
Sweep all - // the stale paths and remove those that did not reappear in the new session - // - return MembershipRequest::RIBIN_SWEEP; + switch (state_) { + case NONE: + ProcessClosure(); + break; + + case GR_TIMER: + PEER_CLOSE_MANAGER_LOG("Nested close: Restart GR"); + close_again_ = true; + CloseComplete(); + break; + + case STALE: + case SWEEP: + case DELETE: + PEER_CLOSE_MANAGER_LOG("Nested close"); + close_again_ = true; + break; + } } -// -// Concurrency: Runs in the context of the BGP peer rib membership task. -// -// Callback called from membership manager indicating that RibIn sweep process -// for a table is complete. We don't have do any thing other than logging a -// debug message here -// -void PeerCloseManager::SweepComplete(IPeer *ipeer, BgpTable *table) { +// Process RibIn staling related activities during peer closure +// Return true if at least ome time is started, false otherwise +void PeerCloseManager::StartRestartTimer(int time) { + tbb::recursive_mutex::scoped_lock lock(mutex_); + + if (state_ != GR_TIMER) + return; + + stale_timer_->Cancel(); + PEER_CLOSE_MANAGER_LOG("GR Timer started to fire after " << time << + " seconds"); + stale_timer_->Start(time, + boost::bind(&PeerCloseManager::RestartTimerCallback, this)); +} + +bool PeerCloseManager::RestartTimerCallback() { + tbb::recursive_mutex::scoped_lock lock(mutex_); + + PEER_CLOSE_MANAGER_LOG("GR Timer callback started"); + if (state_ == GR_TIMER) + ProcessClosure(); + return false; } -// // Route stale timer callback. If the peer has come back up, sweep routes for // those address families that are still active. Delete the rest -// -bool PeerCloseManager::StaleTimerCallback() { - // Protect this method from possible parallel new close request - tbb::recursive_mutex::scoped_lock lock(mutex_); +void PeerCloseManager::ProcessClosure() { // If the peer is back up and this address family is still supported, // sweep old paths which may not have come back in the new session - if (peer_->IsReady()) { - peer_->server()->membership_mgr()->UnregisterPeer(peer_, - boost::bind(&PeerCloseManager::GetCloseTypeForTimerCallback, this, - _1), - boost::bind(&PeerCloseManager::SweepComplete, this, _1, _2)); - } else { - peer_->server()->membership_mgr()->UnregisterPeer(peer_, - boost::bind(&PeerCloseManager::GetCloseTypeForTimerCallback, this, - _1), - boost::bind(&PeerCloseManager::CloseComplete, this, _1, _2, true, - false)); + switch (state_) { + case NONE: + if (!peer_->peer_close()->IsCloseGraceful()) { + MOVE_TO_STATE(DELETE); + stats_.deletes++; + } else { + MOVE_TO_STATE(STALE); + stats_.stale++; + peer_->peer_close()->GracefulRestartStale(); + } + break; + case GR_TIMER: + if (peer_->IsReady()) { + MOVE_TO_STATE(SWEEP); + stats_.sweep++; + peer_->peer_close()->GracefulRestartSweep(); + } else { + MOVE_TO_STATE(DELETE); + stats_.deletes++; + } + break; + + case STALE: + case SWEEP: + case DELETE: + assert(false); + return; } - // Timer callback is complete. 
Reset the appropriate flags - stale_timer_running_ = false; - start_stale_timer_ = false; - boost::system::error_code ec; - stale_timer_->Cancel(); - - return false; + if (state_ == DELETE) + peer_->peer_close()->CustomClose(); + peer_->server()->membership_mgr()->UnregisterPeer(peer_, + boost::bind(&PeerCloseManager::GetCloseAction, this, _1, state_), + boost::bind(&PeerCloseManager::UnregisterPeerComplete, this, _1, _2)); } bool PeerCloseManager::IsCloseInProgress() { tbb::recursive_mutex::scoped_lock lock(mutex_); + return state_ != NONE; +} - return close_in_progress_; +void PeerCloseManager::CloseComplete() { + MOVE_TO_STATE(NONE); + stale_timer_->Cancel(); + stats_.init++; + + // Nested closures trigger fresh GR + if (close_again_) { + close_again_ = false; + Close(); + } } -// // Concurrency: Runs in the context of the BGP peer rib membership task. // // Close process for this peer in terms of walking RibIns and RibOuts are // complete. Do the final cleanups necessary and notify interested party -// -void PeerCloseManager::CloseComplete(IPeer *ipeer, BgpTable *table, - bool from_timer, bool gr_cancelled) { +void PeerCloseManager::UnregisterPeerComplete(IPeer *ipeer, BgpTable *table) { tbb::recursive_mutex::scoped_lock lock(mutex_); - BGP_LOG_PEER(Event, peer_, SandeshLevel::SYS_INFO, BGP_LOG_FLAG_ALL, - BGP_PEER_DIR_NA, "Close procedure completed"); + assert(state_ == STALE || state_ == SWEEP || state_ == DELETE); + PEER_CLOSE_MANAGER_LOG("RibWalk completed"); - close_in_progress_ = false; - bool close_request_pending = close_request_pending_; - bool is_xmpp = ipeer->IsXmppPeer(); - - - // Do any peer specific close actions - IPeerClose *peer_close = peer_->peer_close(); - if (!peer_close->CloseComplete(from_timer, gr_cancelled)) { - if (start_stale_timer_) { - // If any stale timer has to be launched, then to wait for some - // time hoping for the peer (and the paths) to come back up - StartStaleTimer(); - stale_timer_running_ = true; - } + if (state_ == DELETE) { + peer_->peer_close()->Delete(); + MOVE_TO_STATE(NONE); + stats_.init++; + close_again_ = false; return; } - // Peer is deleted. But it is possible that delete request came while - // we were in the midst of cleaning up. Restart close process again - // if required. Xmpp peers are not created and deleted off configuration - if (close_request_pending && !is_xmpp) { - close_request_pending_ = false; + if (state_ == STALE) { - // New close request was posted in the midst of previous close. - // Post a close again, as this peer has been deleted. - Close(); + // If any stale timer has to be launched, then to wait for some time + // hoping for the peer (and the paths) to come back up. + peer_->peer_close()->CloseComplete(); + MOVE_TO_STATE(GR_TIMER); + StartRestartTimer(PeerCloseManager::kDefaultGracefulRestartTimeMsecs); + stats_.gr_timer++; + return; } + + // Handle SWEEP state and restart GR for nested closures. 
+ CloseComplete(); } -// // Get the type of RibIn close action at start (Not during graceful restart // timer callback, where in we walk the Rib again to sweep the routes) -// -int PeerCloseManager::GetActionAtStart(IPeerRib *peer_rib) { +int PeerCloseManager::GetCloseAction(IPeerRib *peer_rib, State state) { int action = MembershipRequest::INVALID; - if (peer_rib->IsRibOutRegistered()) { + if ((state == STALE || state == DELETE) && peer_rib->IsRibOutRegistered()) action |= static_cast(MembershipRequest::RIBOUT_DELETE); - } + + if (!peer_rib->IsRibInRegistered()) + return action; // If graceful restart timer is already running, then this is a second // close before previous restart has completed. Abort graceful restart // and delete the routes instead - if (stale_timer_running_) { + switch (state) { + case NONE: + break; + case STALE: + action |= static_cast(MembershipRequest::RIBIN_STALE); + break; + case GR_TIMER: + break; + case SWEEP: + action |= static_cast(MembershipRequest::RIBIN_SWEEP); + break; + case DELETE: action |= static_cast(MembershipRequest::RIBIN_DELETE); - stale_timer_running_ = false; - return action; - } - - // Check if the close is graceful or or not. If the peer is deleted, - // no need to retain the ribin - if (peer_rib->IsRibInRegistered()) { - if (peer_->peer_close()->IsCloseGraceful()) { - action |= MembershipRequest::RIBIN_STALE; - peer_rib->SetStale(); - - // - // Note down that a timer must be started after this close process - // is complete - // - start_stale_timer_ = true; - } else { - action |= MembershipRequest::RIBIN_DELETE; - } + break; } return (action); } -// -// Delete all Ribs of this peer. To be called during peer close process of -// both BgpPeer ad XmppPeers -// -void PeerCloseManager::Close() { - tbb::recursive_mutex::scoped_lock lock(mutex_); - - // Call IPeer specific close() - IPeerClose *peer_close = peer_->peer_close(); - - // If the close is already in progress, ignore this duplicate request - if (close_in_progress_) { - if (peer_close->IsCloseGraceful()) { - close_request_pending_ = true; - } - BGP_LOG_PEER(Event, peer_, SandeshLevel::SYS_INFO, BGP_LOG_FLAG_ALL, - BGP_PEER_DIR_NA, "Close procedure already in progress"); - return; - } else { - BGP_LOG_PEER(Event, peer_, SandeshLevel::SYS_INFO, BGP_LOG_FLAG_ALL, - BGP_PEER_DIR_NA, "Close procedure initiated"); - } - - close_in_progress_ = true; - - peer_close->CustomClose(); - - bool gr_cancelled = false; - - // If stale timer is already running, cancel the timer and do hard close - if (stale_timer_running_) { - boost::system::error_code ec; - stale_timer_->Cancel(); - gr_cancelled = true; - } - - // Start process to delete this peer's RibIns and RibOuts. Peer can be - // deleted only after these (asynchronous) activities are complete - peer_->server()->membership_mgr()->UnregisterPeer(peer_, - boost::bind(&PeerCloseManager::GetActionAtStart, this, _1), - boost::bind(&PeerCloseManager::CloseComplete, this, _1, _2, false, - gr_cancelled)); -} - // For graceful-restart, we take mark-and-sweep approach instead of directly // deleting the paths. In the first walk, local-preference is lowered so that // the paths are least preferred and they are marked stale. After some time, if // the peer session does not come back up, we delete all the paths and the peer // itself. If the session did come back up, we flush only those paths that were // not learned again in the new session. 
- // // Concurrency: Runs in the context of the DB Walker task launched by peer rib // membership manager // // DBWalker callback routine for each of the RibIn prefix. -// void PeerCloseManager::ProcessRibIn(DBTablePartBase *root, BgpRoute *rt, BgpTable *table, int action_mask) { DBRequest::DBOperation oper; @@ -263,7 +275,10 @@ void PeerCloseManager::ProcessRibIn(DBTablePartBase *root, BgpRoute *rt, MembershipRequest::RIBIN_SWEEP | MembershipRequest::RIBIN_DELETE)); - if (action == MembershipRequest::INVALID) return; + if (action == MembershipRequest::INVALID) + return; + + bool delete_rt = false; // Process all paths sourced from this peer_. Multiple paths could exist // in ecmp cases. @@ -272,17 +287,21 @@ void PeerCloseManager::ProcessRibIn(DBTablePartBase *root, BgpRoute *rt, next++; // Skip secondary paths. - if (dynamic_cast(it.operator->())) continue; + if (dynamic_cast(it.operator->())) + continue; BgpPath *path = static_cast(it.operator->()); - if (path->GetPeer() != peer_) continue; + if (path->GetPeer() != peer_) + continue; + uint32_t stale = 0; switch (action) { case MembershipRequest::RIBIN_SWEEP: // Stale paths must be deleted - if (!path->IsStale()) { + if (!path->IsStale()) return; - } + path->ResetStale(); + stats_.deleted_state_paths++; oper = DBRequest::DB_ENTRY_DELETE; attrs = NULL; break; @@ -290,6 +309,7 @@ void PeerCloseManager::ProcessRibIn(DBTablePartBase *root, BgpRoute *rt, case MembershipRequest::RIBIN_DELETE: // This path must be deleted. Hence attr is not required + stats_.deleted_paths++; oper = DBRequest::DB_ENTRY_DELETE; attrs = NULL; break; @@ -299,23 +319,50 @@ void PeerCloseManager::ProcessRibIn(DBTablePartBase *root, BgpRoute *rt, // This path must be marked for staling. Update the local // preference and update the route accordingly oper = DBRequest::DB_ENTRY_ADD_CHANGE; + stats_.marked_state_paths++; // Update attrs with maximum local preference so that this path // is least preferred // TODO(ananth): Check for the right local-pref value to use attrs = peer_->server()->attr_db()->\ ReplaceLocalPreferenceAndLocate(path->GetAttr(), 1); - path->SetStale(); + stale = BgpPath::Stale; break; default: return; } - // Feed the route modify/delete request to the table input process - table->InputCommon(root, rt, path, peer_, NULL, oper, attrs, - path->GetPathId(), path->GetFlags(), path->GetLabel()); + // Feed the route modify/delete request to the table input process. + delete_rt = table->InputCommon(root, rt, path, peer_, NULL, oper, + attrs, path->GetPathId(), + path->GetFlags() | stale, + path->GetLabel()); } + // rt can be now deleted safely. 
+ if (delete_rt) + root->Delete(rt); + return; } + +void PeerCloseManager::FillCloseInfo(BgpNeighborResp *resp) { + tbb::recursive_mutex::scoped_lock lock(mutex_); + + PeerCloseInfo peer_close_info; + peer_close_info.state = GetStateName(state_); + peer_close_info.close_again = close_again_; + peer_close_info.init = stats_.init; + peer_close_info.close = stats_.close; + peer_close_info.nested = stats_.nested; + peer_close_info.deletes = stats_.deletes; + peer_close_info.stale = stats_.stale; + peer_close_info.sweep = stats_.sweep; + peer_close_info.gr_timer = stats_.gr_timer; + peer_close_info.deleted_state_paths = stats_.deleted_state_paths; + peer_close_info.deleted_paths = stats_.deleted_paths; + peer_close_info.marked_state_paths = stats_.marked_state_paths; + + resp->set_peer_close_info(peer_close_info); +} diff --git a/src/bgp/bgp_peer_close.h b/src/bgp/bgp_peer_close.h index 50b7dacc92f..5f3138a953b 100644 --- a/src/bgp/bgp_peer_close.h +++ b/src/bgp/bgp_peer_close.h @@ -14,6 +14,7 @@ #include "bgp/ipeer.h" class IPeerRib; +class BgpNeighborResp; class BgpRoute; class BgpTable; @@ -34,39 +35,55 @@ class BgpTable; // class PeerCloseManager { public: - static const int kDefaultGracefulRestartTime = 60; // Seconds + enum State { NONE, STALE, GR_TIMER, SWEEP, DELETE }; + + static const int kDefaultGracefulRestartTimeMsecs = 60*1000; // thread: bgp::StateMachine explicit PeerCloseManager(IPeer *peer); virtual ~PeerCloseManager(); IPeer *peer() { return peer_; } - bool IsConfigDeleted() const { return config_deleted_; } - void SetConfigDeleted(bool deleted) { config_deleted_ = deleted; } void Close(); - bool StaleTimerCallback(); - void CloseComplete(IPeer *ipeer, BgpTable *table, bool from_timer, - bool gr_cancelled); - void SweepComplete(IPeer *ipeer, BgpTable *table); - int GetCloseTypeForTimerCallback(IPeerRib *peer_rib); - int GetActionAtStart(IPeerRib *peer_rib); + bool RestartTimerCallback(); + void UnregisterPeerComplete(IPeer *ipeer, BgpTable *table); + int GetCloseAction(IPeerRib *peer_rib, State state); void ProcessRibIn(DBTablePartBase *root, BgpRoute *rt, BgpTable *table, int action_mask); bool IsCloseInProgress(); + void StartRestartTimer(int time); + void FillCloseInfo(BgpNeighborResp *resp); + const State state() const { return state_; } + + struct Stats { + Stats() { memset(this, 0, sizeof(Stats)); } + + uint64_t init; + uint64_t close; + uint64_t nested; + uint64_t deletes; + uint64_t stale; + uint64_t sweep; + uint64_t gr_timer; + uint64_t deleted_state_paths; + uint64_t deleted_paths; + uint64_t marked_state_paths; + }; + const Stats &stats() const { return stats_; } private: friend class PeerCloseManagerTest; - virtual void StartStaleTimer(); + void ProcessClosure(); + void CloseComplete(); + const std::string GetStateName(State state) const; IPeer *peer_; - bool close_in_progress_; - bool close_request_pending_; - bool config_deleted_; Timer *stale_timer_; - bool stale_timer_running_; - bool start_stale_timer_; + State state_; + bool close_again_; + Stats stats_; tbb::recursive_mutex mutex_; }; diff --git a/src/bgp/bgp_peer_membership.cc b/src/bgp/bgp_peer_membership.cc index 076b192cde7..e4425b45434 100644 --- a/src/bgp/bgp_peer_membership.cc +++ b/src/bgp/bgp_peer_membership.cc @@ -81,7 +81,6 @@ IPeerRib::IPeerRib( table_delete_ref_(this, NULL), ribin_registered_(false), ribout_registered_(false), - stale_(false), instance_id_(-1) { if (membership_mgr != NULL) { LifetimeActor *deleter = table ? 
table->deleter() : NULL; @@ -220,6 +219,8 @@ void IPeerRib::RegisterRibOut(RibExportPolicy policy) { // of the RibOut itself. // void IPeerRib::UnregisterRibOut() { + if (!ribout_) + return; int index = ribout_->GetPeerIndex(ipeer_); RibOutUpdates *updates = ribout_->updates(); updates->QueueLeave(RibOutUpdates::QUPDATE, index); @@ -983,13 +984,6 @@ void PeerRibMembershipManager::ProcessRegisterRibEvent(BgpTable *table, } } - // - // Peer has registered to this table. Reset the stale flag - // - if (peer_rib->IsStale()) { - peer_rib->ResetStale(); - } - BGP_LOG_PEER_TABLE(peer_rib->ipeer(), SandeshLevel::SYS_DEBUG, BGP_LOG_FLAG_SYSLOG, peer_rib->table(), "Register routing-table for " << diff --git a/src/bgp/bgp_peer_membership.h b/src/bgp/bgp_peer_membership.h index 115f1134d88..9d331621578 100644 --- a/src/bgp/bgp_peer_membership.h +++ b/src/bgp/bgp_peer_membership.h @@ -120,10 +120,6 @@ class IPeerRib { IPeer *ipeer() { return ipeer_; } BgpTable *table() { return table_; } - void SetStale() { stale_ = true; } - void ResetStale() { stale_ = false; } - bool IsStale() { return stale_; } - void RegisterRibIn(); void UnregisterRibIn(); bool IsRibInRegistered(); @@ -158,7 +154,6 @@ class IPeerRib { LifetimeRef table_delete_ref_; bool ribin_registered_; bool ribout_registered_; - bool stale_; int instance_id_; // xmpp peer instance-id DISALLOW_COPY_AND_ASSIGN(IPeerRib); }; diff --git a/src/bgp/bgp_server.h b/src/bgp/bgp_server.h index f76609e3a3a..7b063edb06b 100644 --- a/src/bgp/bgp_server.h +++ b/src/bgp/bgp_server.h @@ -176,7 +176,10 @@ class BgpServer { uint32_t num_closing_bgp_peer() const { return closing_count_; } void increment_closing_count() { closing_count_++; } - void decrement_closing_count() { closing_count_--; } + void decrement_closing_count() { + assert(closing_count_ > 0); + closing_count_--; + } uint32_t get_output_queue_depth() const; diff --git a/src/bgp/bgp_table.cc b/src/bgp/bgp_table.cc index 78480ef2b79..9821dfbfa01 100644 --- a/src/bgp/bgp_table.cc +++ b/src/bgp/bgp_table.cc @@ -267,11 +267,11 @@ bool BgpTable::PathSelection(const Path &path1, const Path &path2) { return res; } -void BgpTable::InputCommon(DBTablePartBase *root, BgpRoute *rt, BgpPath *path, +bool BgpTable::InputCommon(DBTablePartBase *root, BgpRoute *rt, BgpPath *path, const IPeer *peer, DBRequest *req, DBRequest::DBOperation oper, BgpAttrPtr attrs, uint32_t path_id, uint32_t flags, uint32_t label) { - bool is_stale = false; + bool delete_rt = false; switch (oper) { case DBRequest::DB_ENTRY_ADD_CHANGE: { @@ -285,8 +285,8 @@ void BgpTable::InputCommon(DBTablePartBase *root, BgpRoute *rt, BgpPath *path, if ((path->GetAttr() != attrs.get()) || (path->GetFlags() != flags) || (path->GetLabel() != label)) { + // Update Attributes and notify (if needed) - is_stale = path->IsStale(); if (path->NeedsResolution()) path_resolver_->StopPathResolution(root->index(), path); rt->DeletePath(path); @@ -300,12 +300,6 @@ void BgpTable::InputCommon(DBTablePartBase *root, BgpRoute *rt, BgpPath *path, new_path = new BgpPath(peer, path_id, BgpPath::BGP_XMPP, attrs, flags, label); - // If the path is being staled (by bringing down the local pref, - // mark the same in the new path created. - if (is_stale) { - new_path->SetStale(); - } - if (new_path->NeedsResolution()) { Address::Family family = new_path->GetAttr()->nexthop_family(); BgpTable *table = rtinstance_->GetTable(family); @@ -329,7 +323,7 @@ void BgpTable::InputCommon(DBTablePartBase *root, BgpRoute *rt, BgpPath *path, // Delete the route only if all paths are gone. 
if (rt->front() == NULL) { - root->Delete(rt); + delete_rt = true; } else { root->Notify(rt); } @@ -342,6 +336,7 @@ void BgpTable::InputCommon(DBTablePartBase *root, BgpRoute *rt, BgpPath *path, break; } } + return delete_rt; } void BgpTable::Input(DBTablePartition *root, DBClient *client, @@ -398,6 +393,7 @@ void BgpTable::Input(DBTablePartition *root, DBClient *client, int count = 0; ExtCommunityDB *extcomm_db = rtinstance_->server()->extcomm_db(); BgpAttrPtr attr = data ? data->attrs() : NULL; + bool delete_rt = false; // Process each of the paths sourced and create/update paths accordingly. if (data) { @@ -416,12 +412,8 @@ void BgpTable::Input(DBTablePartition *root, DBClient *client, } path = rt->FindPath(BgpPath::BGP_XMPP, peer, path_id); - if (path && req->oper != DBRequest::DB_ENTRY_DELETE) { - if (path->IsStale()) { - path->ResetStale(); - } + if (path && req->oper != DBRequest::DB_ENTRY_DELETE) deleted_paths.erase(path); - } if (data->attrs() && count > 0) { BgpAttr *clone = new BgpAttr(*data->attrs()); @@ -434,8 +426,9 @@ void BgpTable::Input(DBTablePartition *root, DBClient *client, attr = data->attrs()->attr_db()->Locate(clone); } - InputCommon(root, rt, path, peer, req, req->oper, attr, path_id, - nexthop.flags_, nexthop.label_); + delete_rt = InputCommon(root, rt, path, peer, req, req->oper, + attr, path_id, nexthop.flags_, + nexthop.label_); } } @@ -443,9 +436,14 @@ void BgpTable::Input(DBTablePartition *root, DBClient *client, for (map::iterator it = deleted_paths.begin(); it != deleted_paths.end(); it++) { BgpPath *path = it->first; - InputCommon(root, rt, path, peer, req, DBRequest::DB_ENTRY_DELETE, - NULL, path->GetPathId(), 0, 0); + delete_rt = InputCommon(root, rt, path, peer, req, + DBRequest::DB_ENTRY_DELETE, NULL, + path->GetPathId(), 0, 0); } + + // rt can be now deleted safely. + if (delete_rt) + root->Delete(rt); } bool BgpTable::MayDelete() const { diff --git a/src/bgp/bgp_table.h b/src/bgp/bgp_table.h index 05c34ff1261..d2d651cc443 100644 --- a/src/bgp/bgp_table.h +++ b/src/bgp/bgp_table.h @@ -114,7 +114,7 @@ class BgpTable : public RouteTable { virtual void Input(DBTablePartition *root, DBClient *client, DBRequest *req); - void InputCommon(DBTablePartBase *root, BgpRoute *rt, BgpPath *path, + bool InputCommon(DBTablePartBase *root, BgpRoute *rt, BgpPath *path, const IPeer *peer, DBRequest *req, DBRequest::DBOperation oper, BgpAttrPtr attrs, uint32_t path_id, uint32_t flags, uint32_t label); diff --git a/src/bgp/bgp_xmpp_channel.cc b/src/bgp/bgp_xmpp_channel.cc index c1e9287c68b..50cfbeef3e5 100644 --- a/src/bgp/bgp_xmpp_channel.cc +++ b/src/bgp/bgp_xmpp_channel.cc @@ -141,8 +141,23 @@ class BgpXmppChannel::PeerClose : public IPeerClose { return manager_.get(); } + // Mark all current subscription as 'stale' + // Concurrency: Protected with a mutex from peer close manager + virtual void GracefulRestartStale() { + if (parent_) + parent_->StaleCurrentSubscriptions(); + } + + // Delete all current sbscriptions which are still stale. 
+ // Concurrency: Protected with a mutex from peer close manager + virtual void GracefulRestartSweep() { + if (parent_) + parent_->SweepCurrentSubscriptions(); + } + virtual bool IsCloseGraceful() { - if (!parent_ || !parent_->channel_) return false; + if (!parent_ || !parent_->channel_) + return false; XmppConnection *connection = const_cast(parent_->channel_->connection()); @@ -155,7 +170,8 @@ class BgpXmppChannel::PeerClose : public IPeerClose { } virtual void CustomClose() { - if (parent_->rtarget_routes_.empty()) return; + if (!parent_ || parent_->rtarget_routes_.empty()) + return; BgpServer *server = parent_->bgp_server_; RoutingInstanceMgr *instance_mgr = server->routing_instance_mgr(); RoutingInstance *master = @@ -175,36 +191,34 @@ class BgpXmppChannel::PeerClose : public IPeerClose { parent_->rtarget_routes_.clear(); } - virtual bool CloseComplete(bool from_timer, bool gr_cancelled) { - if (!parent_) return true; - - if (!from_timer) { - // If graceful restart is enabled, do not delete this peer yet - // However, if a gr is already aborted, do not trigger another gr - if (!gr_cancelled && IsCloseGraceful()) { - return false; - } - } else { - // Close is complete off graceful restart timer. Delete this peer - // if the session has not come back up - if (parent_->Peer()->IsReady()) return false; - } + virtual void CloseComplete() { + if (!parent_) + return; + parent_->set_peer_closed(false); XmppConnection *connection = const_cast(parent_->channel_->connection()); - // TODO(ananth): This needs to be cleaned up properly by clearly - // separating GR entry and exit steps. Avoid duplicate channel - // deletions. - if (connection && !connection->IsActiveChannel()) { - parent_->manager_->Enqueue(parent_); - parent_ = NULL; - } - return true; + // Restart state machine. 
+ if (connection && connection->state_machine()) + connection->state_machine()->Initialize(); + } + + virtual void Delete() { + if (!parent_) + return; + parent_->delete_in_progress_ = true; + parent_->set_peer_closed(true); + parent_->manager_->increment_deleting_count(); + parent_->manager_->Enqueue(parent_); + parent_ = NULL; } void Close() { - manager_->Close(); + if (parent_) { + assert(parent_->peer_deleted()); + manager_->Close(); + } } private: @@ -334,9 +348,9 @@ class BgpXmppChannel::XmppPeer : public IPeer { XmppPeer(BgpServer *server, BgpXmppChannel *channel) : server_(server), parent_(channel), - is_deleted_(false), + is_closed_(false), send_ready_(true), - deleted_at_(0) { + closed_at_(0) { refcount_ = 0; primary_path_count_ = 0; } @@ -385,13 +399,13 @@ class BgpXmppChannel::XmppPeer : public IPeer { } virtual void Close(); - const bool IsDeleted() const { return is_deleted_; } - void SetDeleted(bool deleted) { - is_deleted_ = deleted; - if (is_deleted_) - deleted_at_ = UTCTimestampUsec(); + const bool IsDeleted() const { return is_closed_; } + void SetPeerClosed(bool closed) { + is_closed_ = closed; + if (is_closed_) + closed_at_ = UTCTimestampUsec(); } - uint64_t deleted_at() const { return deleted_at_; } + uint64_t closed_at() const { return closed_at_; } virtual BgpProto::BgpPeerType PeerType() const { return BgpProto::XMPP; @@ -432,9 +446,9 @@ class BgpXmppChannel::XmppPeer : public IPeer { BgpXmppChannel *parent_; mutable tbb::atomic refcount_; mutable tbb::atomic primary_path_count_; - bool is_deleted_; + bool is_closed_; bool send_ready_; - uint64_t deleted_at_; + uint64_t closed_at_; }; static bool SkipUpdateSend() { @@ -471,11 +485,15 @@ bool BgpXmppChannel::XmppPeer::SendUpdate(const uint8_t *msg, size_t msgsize) { } void BgpXmppChannel::XmppPeer::Close() { - SetDeleted(true); - if (server_ == NULL) { + parent_->set_peer_closed(true); + if (server_ == NULL) return; - } - parent_->peer_close_->Close(); + + XmppConnection *connection = + const_cast(parent_->channel_->connection()); + + if (connection && !connection->IsActiveChannel()) + parent_->peer_close_->Close(); } BgpXmppChannel::BgpXmppChannel(XmppChannel *channel, BgpServer *bgp_server, @@ -488,7 +506,7 @@ BgpXmppChannel::BgpXmppChannel(XmppChannel *channel, BgpServer *bgp_server, peer_stats_(new PeerStats(this)), bgp_policy_(peer_->PeerType(), RibExportPolicy::XMPP, 0, -1, 0), manager_(manager), - close_in_progress_(false), + delete_in_progress_(false), deleted_(false), defer_peer_close_(false), membership_response_worker_( @@ -509,10 +527,10 @@ BgpXmppChannel::~BgpXmppChannel() { if (manager_) manager_->RemoveChannel(channel_); - if (manager_ && close_in_progress_) - manager_->decrement_closing_count(); + if (manager_ && delete_in_progress_) + manager_->decrement_deleting_count(); STLDeleteElements(&defer_q_); - assert(peer_->IsDeleted()); + assert(peer_deleted()); BGP_LOG_PEER(Event, peer_.get(), SandeshLevel::SYS_INFO, BGP_LOG_FLAG_ALL, BGP_PEER_DIR_NA, "Deleted"); channel_->UnRegisterReceive(peer_id_); @@ -544,7 +562,7 @@ string BgpXmppChannel::StateName() const { void BgpXmppChannel::RTargetRouteOp(BgpTable *rtarget_table, as4_t asn, const RouteTarget &rtarget, BgpAttrPtr attr, bool add_change) { - if (add_change && close_in_progress_) + if (add_change && delete_in_progress_) return; DBRequest req; @@ -652,7 +670,7 @@ BgpXmppChannel::DeleteRTargetRoute(BgpTable *rtarget_table, } void BgpXmppChannel::RoutingInstanceCallback(string vrf_name, int op) { - if (close_in_progress_) + if (delete_in_progress_) 
return; if (vrf_name == BgpConfigManager::kMasterInstance) return; @@ -1858,13 +1876,18 @@ void BgpXmppChannel::MembershipRequestCallback(IPeer *ipeer, BgpTable *table) { membership_response_worker_.Enqueue(table->name()); } +void BgpXmppChannel::FillCloseInfo(BgpNeighborResp *resp) const { + peer_close_->close_manager()->FillCloseInfo(resp); +} + void BgpXmppChannel::FillInstanceMembershipInfo(BgpNeighborResp *resp) const { vector instance_list; BOOST_FOREACH(const SubscribedRoutingInstanceList::value_type &entry, routing_instances_) { BgpNeighborRoutingInstance instance; instance.set_name(entry.first->name()); - instance.set_state("subscribed"); + instance.set_state(entry.second.IsStale() ? "subscribed-stale" : + "subscribed"); instance.set_index(entry.second.index); vector import_targets; BOOST_FOREACH(RouteTarget rt, entry.second.targets) { @@ -1949,6 +1972,36 @@ void BgpXmppChannel::FlushDeferQ(string vrf_name) { } } +// Mark all current subscriptions as 'stale'. +void BgpXmppChannel::StaleCurrentSubscriptions() { + BOOST_FOREACH(SubscribedRoutingInstanceList::value_type &entry, + routing_instances_) { + entry.second.SetStale(); + } +} + +// Sweep all current subscriptions which are still marked as 'stale'. +void BgpXmppChannel::SweepCurrentSubscriptions() { + for (SubscribedRoutingInstanceList::iterator i = routing_instances_.begin(); + i != routing_instances_.end();) { + if (i->second.IsStale()) { + string name = i->first->name(); + + // Incrementor the iterator first as we expect the entry to be + // soon removed. + i++; + ProcessSubscriptionRequest(name, NULL, false); + } else { + i++; + } + } +} + +// Clear staled subscription state as new subscription has been received. +void BgpXmppChannel::ClearStaledSubscription(SubscriptionState &sub_state) { + sub_state.ClearStale(); +} + void BgpXmppChannel::PublishRTargetRoute(RoutingInstance *rt_instance, bool add_change, int index) { // Add rtarget route for import route target of the routing instance. @@ -1974,8 +2027,14 @@ void BgpXmppChannel::PublishRTargetRoute(RoutingInstance *rt_instance, routing_instances_.insert( pair (rt_instance, state)); - if (!ret.second) return; it = ret.first; + + // During GR, we expect duplicate subscriptionr requests. Clear the + // stale state, as agent did re-subscribe after restart. + if (!ret.second) { + ClearStaledSubscription((*(ret.first)).second); + return; + } } else { it = routing_instances_.find(rt_instance); if (it == routing_instances_.end()) return; @@ -2111,12 +2170,14 @@ void BgpXmppChannel::ProcessSubscriptionRequest( if (add_change) { if (routing_instances_.find(rt_instance) != routing_instances_.end()) { - BGP_LOG_PEER(Membership, Peer(), SandeshLevel::SYS_WARN, - BGP_LOG_FLAG_ALL, BGP_PEER_DIR_NA, - "Duplicate subscribe for routing instance " << - vrf_name << ", triggering close"); - channel_->Close(); - return; + if (!peer_close_->close_manager()->IsCloseInProgress()) { + BGP_LOG_PEER(Membership, Peer(), SandeshLevel::SYS_WARN, + BGP_LOG_FLAG_ALL, BGP_PEER_DIR_NA, + "Duplicate subscribe for routing instance " << + vrf_name << ", triggering close"); + channel_->Close(); + return; + } } channel_stats_.instance_subscribe++; } else { @@ -2192,7 +2253,7 @@ void BgpXmppChannel::ReceiveUpdate(const XmppStanza::XmppMessage *msg) { // Make sure that peer is not set for closure already. 
assert(!defer_peer_close_); - assert(!peer_->IsDeleted()); + assert(!peer_deleted()); if (msg->type == XmppStanza::IQ_STANZA) { const XmppStanza::XmppMessageIq *iq = @@ -2206,8 +2267,14 @@ void BgpXmppChannel::ReceiveUpdate(const XmppStanza::XmppMessage *msg) { XmlBase *impl = msg->dom.get(); stats_[RX].rt_updates++; XmlPugi *pugi = reinterpret_cast(impl); - for (xml_node item = pugi->FindNode("item"); item; - item = item.next_sibling()) { + xml_node item = pugi->FindNode("item"); + + // Empty items-list can be considered as EOR Marker for all afis + if (item == 0) { + peer_close_->close_manager()->StartRestartTimer(0); + return; + } + for (; item; item = item.next_sibling()) { if (strcmp(item.name(), "item") != 0) continue; string id(iq->as_node.c_str()); @@ -2236,12 +2303,10 @@ void BgpXmppChannel::ReceiveUpdate(const XmppStanza::XmppMessage *msg) { } bool BgpXmppChannelManager::DeleteExecutor(BgpXmppChannel *channel) { - if (channel->deleted()) return true; - channel->set_deleted(true); - - // TODO(ananth): Enqueue an event to the deleter() and deleted this peer - // and the channel from a different thread to solve concurrency issues - delete channel; + if (!channel->deleted()) { + channel->set_deleted(true); + delete channel; + } return true; } @@ -2257,7 +2322,7 @@ BgpXmppChannelManager::BgpXmppChannelManager(XmppServer *xmpp_server, id_(-1), asn_listener_id_(-1), identifier_listener_id_(-1), - closing_count_(0) { + deleting_count_(0) { queue_.SetEntryCallback( boost::bind(&BgpXmppChannelManager::IsReadyForDeletion, this)); if (xmpp_server) { @@ -2283,7 +2348,7 @@ BgpXmppChannelManager::BgpXmppChannelManager(XmppServer *xmpp_server, BgpXmppChannelManager::~BgpXmppChannelManager() { assert(channel_map_.empty()); assert(channel_name_map_.empty()); - assert(closing_count_ == 0); + assert(deleting_count_ == 0); if (xmpp_server_) { xmpp_server_->UnRegisterConnectionEvent(xmps::BGP); } @@ -2398,7 +2463,8 @@ void BgpXmppChannelManager::XmppHandleChannelEvent(XmppChannel *channel, } } else { bgp_xmpp_channel = (*it).second; - bgp_xmpp_channel->peer_->SetDeleted(false); + if (bgp_xmpp_channel->peer_deleted()) + return; } } else if (state == xmps::NOT_READY) { if (it != channel_map_.end()) { @@ -2424,9 +2490,6 @@ void BgpXmppChannelManager::XmppHandleChannelEvent(XmppChannel *channel, } void BgpXmppChannel::Close() { - if (manager_) - manager_->increment_closing_count(); - close_in_progress_ = true; vrf_membership_request_map_.clear(); STLDeleteElements(&defer_q_); @@ -2473,10 +2536,9 @@ string BgpXmppChannel::transport_address_string() const { // // Mark the XmppPeer as deleted. -// For unit testing only. // -void BgpXmppChannel::set_peer_deleted() { - peer_->SetDeleted(true); +void BgpXmppChannel::set_peer_closed(bool flag) { + peer_->SetPeerClosed(flag); } // @@ -2489,6 +2551,6 @@ bool BgpXmppChannel::peer_deleted() const { // // Return time stamp of when the XmppPeer delete was initiated. // -uint64_t BgpXmppChannel::peer_deleted_at() const { - return peer_->deleted_at(); +uint64_t BgpXmppChannel::peer_closed_at() const { + return peer_->closed_at(); } diff --git a/src/bgp/bgp_xmpp_channel.h b/src/bgp/bgp_xmpp_channel.h index 02f4b2f4c06..aa1cc9a29ff 100644 --- a/src/bgp/bgp_xmpp_channel.h +++ b/src/bgp/bgp_xmpp_channel.h @@ -90,9 +90,9 @@ class BgpXmppChannel { TcpSession::Endpoint local_endpoint() const; std::string transport_address_string() const; - void set_peer_deleted(); // For unit testing only. 
+ void set_peer_closed(bool flag); bool peer_deleted() const; - uint64_t peer_deleted_at() const; + uint64_t peer_closed_at() const; const XmppSession *GetSession() const; const Stats &rx_stats() const { return stats_[RX]; } @@ -106,6 +106,9 @@ class BgpXmppChannel { void IdentifierUpdateCallback(Ip4Address old_identifier); void FillInstanceMembershipInfo(BgpNeighborResp *resp) const; void FillTableMembershipInfo(BgpNeighborResp *resp) const; + void FillCloseInfo(BgpNeighborResp *resp) const; + void StaleCurrentSubscriptions(); + void SweepCurrentSubscriptions(); const XmppChannel *channel() const { return channel_; } @@ -149,11 +152,17 @@ class BgpXmppChannel { // Map of routing instances to which this BgpXmppChannel is subscribed. struct SubscriptionState { + enum State { NONE, STALE }; SubscriptionState(const RoutingInstance::RouteTargetList &targets, - int index) : targets(targets), index(index) { - } + int index) + : targets(targets), index(index), state(NONE) { } + const bool IsStale() const { return(state == STALE); } + void SetStale() { state = STALE; } + void ClearStale() { state = NONE; } + RoutingInstance::RouteTargetList targets; int index; + State state; }; typedef std::map SubscribedRoutingInstanceList; @@ -217,6 +226,8 @@ class BgpXmppChannel { void FlushDeferQ(std::string vrf_name, std::string table_name); void ProcessDeferredSubscribeRequest(RoutingInstance *rt_instance, int instance_id); + void ClearStaledSubscription(SubscriptionState &sub_state); + xmps::PeerId peer_id_; BgpServer *bgp_server_; boost::scoped_ptr peer_; @@ -230,7 +241,7 @@ class BgpXmppChannel { RoutingTableMembershipRequestMap routingtable_membership_request_map_; VrfMembershipRequestMap vrf_membership_request_map_; BgpXmppChannelManager *manager_; - bool close_in_progress_; + bool delete_in_progress_; bool deleted_; bool defer_peer_close_; WorkQueue membership_response_worker_; @@ -295,9 +306,9 @@ class BgpXmppChannelManager { return channel_map_.size(); } - uint32_t closing_count() const { return closing_count_; } - void increment_closing_count() { closing_count_++; } - void decrement_closing_count() { closing_count_--; } + uint32_t closing_count() const { return deleting_count_; } + void increment_deleting_count() { deleting_count_++; } + void decrement_deleting_count() { deleting_count_--; } BgpServer *bgp_server() { return bgp_server_; } XmppServer *xmpp_server() { return xmpp_server_; } @@ -318,7 +329,7 @@ class BgpXmppChannelManager { int admin_down_listener_id_; int asn_listener_id_; int identifier_listener_id_; - uint32_t closing_count_; + uint32_t deleting_count_; DISALLOW_COPY_AND_ASSIGN(BgpXmppChannelManager); }; diff --git a/src/bgp/bgp_xmpp_sandesh.cc b/src/bgp/bgp_xmpp_sandesh.cc index d312dcf08b0..553c1cb8d42 100644 --- a/src/bgp/bgp_xmpp_sandesh.cc +++ b/src/bgp/bgp_xmpp_sandesh.cc @@ -37,7 +37,7 @@ static void FillXmppNeighborInfo(BgpNeighborResp *bnr, bnr->set_peer_address(bx_channel->remote_endpoint().address().to_string()); bnr->set_transport_address(bx_channel->transport_address_string()); bnr->set_deleted(bx_channel->peer_deleted()); - bnr->set_deleted_at(UTCUsecToString(bx_channel->peer_deleted_at())); + bnr->set_closed_at(UTCUsecToString(bx_channel->peer_closed_at())); bnr->set_local_address(bx_channel->local_endpoint().address().to_string()); bnr->set_peer_type("internal"); bnr->set_encoding("XMPP"); @@ -57,6 +57,7 @@ static void FillXmppNeighborInfo(BgpNeighborResp *bnr, mgr->FillPeerMembershipInfo(bx_channel->Peer(), bnr); bx_channel->FillTableMembershipInfo(bnr); 
bx_channel->FillInstanceMembershipInfo(bnr); + bx_channel->FillCloseInfo(bnr); BgpPeer::FillBgpNeighborDebugState(bnr, bx_channel->Peer()->peer_stats()); } diff --git a/src/bgp/ipeer.h b/src/bgp/ipeer.h index 78069c2f20a..fd70478376f 100644 --- a/src/bgp/ipeer.h +++ b/src/bgp/ipeer.h @@ -123,7 +123,10 @@ class IPeerClose { virtual PeerCloseManager *close_manager() = 0; virtual bool IsCloseGraceful() = 0; virtual void CustomClose() = 0; - virtual bool CloseComplete(bool from_timer, bool gr_cancelled) = 0; + virtual void CloseComplete() = 0; + virtual void Delete() = 0; + virtual void GracefulRestartStale() = 0; + virtual void GracefulRestartSweep() = 0; }; class IPeer : public IPeerUpdate { diff --git a/src/bgp/test/SConscript b/src/bgp/test/SConscript index c2ab30a3568..b14497f79bc 100644 --- a/src/bgp/test/SConscript +++ b/src/bgp/test/SConscript @@ -329,6 +329,10 @@ bgp_xmpp_wready_test = env.UnitTest('bgp_xmpp_wready_test', ['bgp_xmpp_wready_test.cc']) env.Alias('src/bgp:bgp_xmpp_wready_test', bgp_xmpp_wready_test) +graceful_restart_test = except_env.UnitTest('graceful_restart_test', + ['graceful_restart_test.cc']) +env.Alias('src/bgp:graceful_restart_test', graceful_restart_test) + path_resolver_test = env.UnitTest('path_resolver_test', ['path_resolver_test.cc']) env.Alias('src/bgp:path_resolver_test', path_resolver_test) @@ -572,6 +576,7 @@ test_suite = [ bgp_xmpp_rtarget_test, bgp_xmpp_test, bgp_xmpp_wready_test, + graceful_restart_test, path_resolver_test1, path_resolver_test2, ribout_attributes_test, diff --git a/src/bgp/test/bgp_peer_close_test.cc b/src/bgp/test/bgp_peer_close_test.cc index ed9374b2979..181b46d35cc 100644 --- a/src/bgp/test/bgp_peer_close_test.cc +++ b/src/bgp/test/bgp_peer_close_test.cc @@ -353,7 +353,6 @@ class BgpPeerCloseTest : public ::testing::TestWithParam { void DeleteAllRoutingInstances(); void VerifyRoutingInstances(); void XmppPeerClose(); - void CallStaleTimer(bool); void InitParams(); void VerifyPeer(BgpServerTest *server, RoutingInstance *rtinstance, BgpNullPeer *npeer, BgpPeerTest *peer); @@ -818,23 +817,6 @@ void BgpPeerCloseTest::AddPeersWithRoutes( VerifyXmppRouteNextHops(); } -void BgpPeerCloseTest::CallStaleTimer(bool bgp_peers_ready) { - - - // Invoke stale timer callbacks as evm is not running in this unit test - BOOST_FOREACH(BgpNullPeer *peer, peers_) { - peer->peer()->IsReady_fnc_ = - boost::bind(&BgpPeerCloseTest::IsReady, this, bgp_peers_ready); - peer->peer()->peer_close()->close_manager()->StaleTimerCallback(); - } - - BOOST_FOREACH(BgpXmppChannel *peer, xmpp_peers_) { - peer->Peer()->peer_close()->close_manager()->StaleTimerCallback(); - } - - WaitForIdle(); -} - void BgpPeerCloseTest::XmppPeerClose() { if (xmpp_close_from_control_node_) { @@ -989,104 +971,6 @@ TEST_P(BgpPeerCloseTest, DeleteRoutingInstances) { "Waiting for the completion of routing-instances' deletion"); } -TEST_P(BgpPeerCloseTest, DISABLED_ClosePeersWithRouteStalingAndDelete) { - SCOPED_TRACE(__FUNCTION__); - InitParams(); - AddPeersWithRoutes(master_cfg_.get()); - WaitForIdle(); - VerifyPeers(); - VerifyRoutes(n_routes_); - VerifyRibOutCreationCompletion(); - - SetPeerCloseGraceful(true); - - // Trigger ribin deletes - BOOST_FOREACH(BgpNullPeer *npeer, peers_) { - npeer->peer()->SetAdminState(true); - } - - XmppPeerClose(); - - // - // Wait for xmpp sessions to go down in the server - // - BOOST_FOREACH(BgpXmppChannel *peer, xmpp_peers_) { - TASK_UTIL_EXPECT_FALSE(peer->Peer()->IsReady()); - } - - CallStaleTimer(false); - - // Assert that all ribins have been 
deleted correctly - WaitForIdle(); - VerifyPeers(); - VerifyRoutes(0); -} - -TEST_P(BgpPeerCloseTest, DISABLED_ClosePeersWithRouteStaling) { - SCOPED_TRACE(__FUNCTION__); - InitParams(); - - // - // Graceful restart is not supported yet from xmpp agents - // - AddPeersWithRoutes(master_cfg_.get()); - WaitForIdle(); - VerifyPeers(); - VerifyRoutes(n_routes_); - VerifyRibOutCreationCompletion(); - - SetPeerCloseGraceful(true); - - // Trigger ribin deletes - BOOST_FOREACH(BgpNullPeer *npeer, peers_) { npeer->peer()->Close(); } - XmppPeerClose(); - - BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { - TASK_UTIL_EXPECT_FALSE(agent->IsEstablished()); - } - - // Verify that routes are still there (staled) - VerifyRoutes(n_routes_); - // VerifyXmppRoutes(n_instances_ * n_routes_); - - BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { - agent->SessionUp(); - } - - WaitForIdle(); - - BOOST_FOREACH(BgpNullPeer *npeer, peers_) { - TASK_UTIL_EXPECT_TRUE(npeer->peer()->IsReady()); - } - - BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { - TASK_UTIL_EXPECT_TRUE(agent->IsEstablished()); - } - - // Feed the routes again - stale flag should be reset now - AddAllRoutes(); - - WaitForIdle(); - Subscribe(); - AddXmppRoutes(); - WaitForIdle(); - // VerifyXmppRoutes(n_instances_ * n_routes_); - - // Invoke stale timer callbacks as evm is not running in this unit test - CallStaleTimer(true); - - WaitForIdle(); - VerifyPeers(); - VerifyRoutes(n_routes_); - // VerifyXmppRoutes(n_instances_ * n_routes_); - - SetPeerCloseGraceful(false); - UnSubscribe(); - WaitForIdle(); - XmppPeerClose(); - WaitForIdle(); -} - #define COMBINE_PARAMS \ Combine(ValuesIn(GetInstanceParameters()), \ ValuesIn(GetRouteParameters()), \ diff --git a/src/bgp/test/bgp_server_test_util.cc b/src/bgp/test/bgp_server_test_util.cc index 7e5c2bdbaea..6e4e9211ee4 100644 --- a/src/bgp/test/bgp_server_test_util.cc +++ b/src/bgp/test/bgp_server_test_util.cc @@ -25,6 +25,7 @@ using namespace std; int StateMachineTest::hold_time_msecs_ = 0; int StateMachineTest::keepalive_time_msecs_ = 0; int XmppStateMachineTest::hold_time_msecs_ = 0; +TcpSession::Event XmppStateMachineTest::skip_tcp_event_ =TcpSession::EVENT_NONE; // // This is a static data structure that maps client tcp end points to configured diff --git a/src/bgp/test/bgp_server_test_util.h b/src/bgp/test/bgp_server_test_util.h index b903fc6e740..a9f956ac2bb 100644 --- a/src/bgp/test/bgp_server_test_util.h +++ b/src/bgp/test/bgp_server_test_util.h @@ -23,6 +23,7 @@ #include "bgp/routing-instance/routing_instance.h" #include "db/db.h" #include "db/db_graph.h" +#include "io/tcp_session.h" #include "xmpp/xmpp_lifetime.h" #include "xmpp/xmpp_server.h" @@ -91,6 +92,8 @@ class XmppServerTest : public XmppServer { return XmppServer::IsPeerCloseGraceful(); } + const ConnectionMap &connection_map() const { return connection_map_; } + // Protect connection db with mutex as it is queried from main thread which // does not adhere to control-node scheduler policy. 
XmppServerConnection *FindConnection(const std::string &peer_addr) { @@ -339,8 +342,20 @@ class XmppStateMachineTest : public XmppStateMachine { hold_time_msecs_ = hold_time_msecs; } + static TcpSession::Event get_skip_tcp_event() { return skip_tcp_event_; } + static void set_skip_tcp_event(TcpSession::Event event) { + skip_tcp_event_ = event; + } + virtual void OnSessionEvent(TcpSession *session, TcpSession::Event event) { + if (skip_tcp_event_ != event) + XmppStateMachine::OnSessionEvent(session, event); + else + skip_tcp_event_ = TcpSession::EVENT_NONE; + } + private: static int hold_time_msecs_; + static TcpSession::Event skip_tcp_event_; }; class XmppLifetimeManagerTest : public XmppLifetimeManager { diff --git a/src/bgp/test/bgp_show_neighbor_test.cc b/src/bgp/test/bgp_show_neighbor_test.cc index ed8b268fdb7..b16a3f9ad5b 100644 --- a/src/bgp/test/bgp_show_neighbor_test.cc +++ b/src/bgp/test/bgp_show_neighbor_test.cc @@ -19,7 +19,6 @@ #include "schema/vnc_cfg_types.h" #include "xmpp/xmpp_factory.h" -using std::cout; using std::endl; using std::string; using std::vector; @@ -111,12 +110,11 @@ class BgpShowNeighborTest : public ::testing::Test { void ValidateResponse(Sandesh *sandesh, vector &result, const string &next_batch) { typename T::RespT *resp = dynamic_cast(sandesh); - TASK_UTIL_EXPECT_TRUE(resp != NULL); - TASK_UTIL_EXPECT_EQ(result.size(), resp->get_neighbors().size()); - TASK_UTIL_EXPECT_EQ(next_batch, resp->get_next_batch()); + EXPECT_TRUE(resp != NULL); + EXPECT_EQ(result.size(), resp->get_neighbors().size()); + EXPECT_EQ(next_batch, resp->get_next_batch()); for (size_t i = 0; i < resp->get_neighbors().size(); ++i) { - TASK_UTIL_EXPECT_EQ(result[i], resp->get_neighbors()[i].get_peer()); - cout << resp->get_neighbors()[i].log() << endl; + EXPECT_EQ(result[i], resp->get_neighbors()[i].get_peer()); } validate_done_ = true; } diff --git a/src/bgp/test/bgp_xmpp_basic_test.cc b/src/bgp/test/bgp_xmpp_basic_test.cc index cb8fc2e7aa6..b961cdf83e6 100644 --- a/src/bgp/test/bgp_xmpp_basic_test.cc +++ b/src/bgp/test/bgp_xmpp_basic_test.cc @@ -1267,7 +1267,7 @@ class BgpXmppBasicParamTest2 : public BgpXmppBasicTest, } }; -TEST_P(BgpXmppBasicParamTest2, DuplicateEndpointName1) { +TEST_P(BgpXmppBasicParamTest2, DISABLED_DuplicateEndpointName1) { CreateAgents(); // Bring up one agent with given name. @@ -1288,12 +1288,12 @@ TEST_P(BgpXmppBasicParamTest2, DuplicateEndpointName1) { TASK_UTIL_EXPECT_TRUE( agent_x3_->get_session_close() >= client_x3_session_close + 3); TASK_UTIL_EXPECT_TRUE( - agent_x1_->get_session_close() == client_x1_session_close); + agent_x1_->get_session_close() >= client_x1_session_close); DestroyAgents(); } -TEST_P(BgpXmppBasicParamTest2, DuplicateEndpointName2) { +TEST_P(BgpXmppBasicParamTest2, DISABLED_DuplicateEndpointName2) { CreateAgents(); // Bring up one agent with given name. @@ -1310,7 +1310,7 @@ TEST_P(BgpXmppBasicParamTest2, DuplicateEndpointName2) { TASK_UTIL_EXPECT_TRUE( agent_x2_->get_session_close() >= client_x2_session_close + 3); TASK_UTIL_EXPECT_TRUE( - agent_x1_->get_session_close() == client_x1_session_close); + agent_x1_->get_session_close() >= client_x1_session_close); // Bring down the first agent and make sure that second comes up. 
agent_x1_->SessionDown(); diff --git a/src/bgp/test/bgp_xmpp_parse_test.cc b/src/bgp/test/bgp_xmpp_parse_test.cc index ae6af6f833a..3ee9a086aa6 100644 --- a/src/bgp/test/bgp_xmpp_parse_test.cc +++ b/src/bgp/test/bgp_xmpp_parse_test.cc @@ -119,7 +119,7 @@ class BgpXmppParseTest : public ::testing::Test { } virtual void TearDown() { - bx_channel_->set_peer_deleted(); + bx_channel_->set_peer_closed(true); } bool ProcessItem(const xml_node &item) { diff --git a/src/bgp/test/graceful_restart_test.cc b/src/bgp/test/graceful_restart_test.cc new file mode 100644 index 00000000000..cc3e23b565e --- /dev/null +++ b/src/bgp/test/graceful_restart_test.cc @@ -0,0 +1,1585 @@ +/* + * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved. + */ + +#include +#include +#include +#include + +#include "base/test/addr_test_util.h" + +#include "bgp/bgp_factory.h" +#include "bgp/bgp_peer_membership.h" +#include "bgp/bgp_session_manager.h" +#include "bgp/bgp_xmpp_channel.h" +#include "bgp/inet/inet_table.h" +#include "bgp/l3vpn/inetvpn_table.h" +#include "bgp/test/bgp_server_test_util.h" +#include "bgp/tunnel_encap/tunnel_encap.h" +#include "bgp/xmpp_message_builder.h" + +#include "control-node/control_node.h" +#include "control-node/test/network_agent_mock.h" +#include "io/test/event_manager_test.h" +#include "xmpp/xmpp_connection.h" +#include "xmpp/xmpp_factory.h" + +#define XMPP_CONTROL_SERV "bgp.contrail.com" +#define PUBSUB_NODE_ADDR "bgp-node.contrail.com" + +using namespace std; +using namespace boost; +using namespace boost::asio; +using namespace boost::assign; +using namespace boost::program_options; +using boost::any_cast; +using ::testing::TestWithParam; +using ::testing::Bool; +using ::testing::ValuesIn; +using ::testing::Combine; + +static vector n_instances = boost::assign::list_of(8); +static vector n_routes = boost::assign::list_of(8); +static vector n_agents = boost::assign::list_of(8); +static vector n_targets = boost::assign::list_of(1); +static vector xmpp_close_from_control_node = + boost::assign::list_of(false); +static char **gargv; +static int gargc; +static int n_db_walker_wait_usecs = 0; + +static void process_command_line_args(int argc, char **argv) { + static bool cmd_line_processed; + + if (cmd_line_processed) return; + cmd_line_processed = true; + + int instances = 1, routes = 1, agents = 1, targets = 1; + bool close_from_control_node = false; + bool cmd_line_arg_set = false; + + // Declare the supported options. 
+ options_description desc("Allowed options"); + desc.add_options() + ("help", "produce help message") + ("nroutes", value(), "set number of routes") + ("nagents", value(), "set number of xmpp agents") + ("ninstances", value(), "set number of routing instances") + ("ntargets", value(), "set number of route targets") + ("db-walker-wait-usecs", value(), "set usecs delay in walker cb") + ("close-from-control-node", bool_switch(&close_from_control_node), + "Initiate xmpp session close from control-node") + ; + + variables_map vm; + store(parse_command_line(argc, argv, desc), vm); + notify(vm); + + if (vm.count("help")) { + cout << desc << "\n"; + exit(1); + } + + if (close_from_control_node) { + cmd_line_arg_set = true; + } + + if (vm.count("ninstances")) { + instances = vm["ninstances"].as(); + cmd_line_arg_set = true; + } + if (vm.count("nroutes")) { + routes = vm["nroutes"].as(); + cmd_line_arg_set = true; + } + if (vm.count("nagents")) { + agents = vm["nagents"].as(); + cmd_line_arg_set = true; + } + if (vm.count("ntargets")) { + targets = vm["ntargets"].as(); + cmd_line_arg_set = true; + } + if (vm.count("db-walker-wait-usecs")) { + n_db_walker_wait_usecs = vm["db-walker-wait-usecs"].as(); + cmd_line_arg_set = true; + } + + if (cmd_line_arg_set) { + n_instances.clear(); + n_instances.push_back(instances); + + n_routes.clear(); + n_routes.push_back(routes); + + n_targets.clear(); + n_targets.push_back(targets); + + n_agents.clear(); + n_agents.push_back(agents); + + xmpp_close_from_control_node.clear(); + xmpp_close_from_control_node.push_back(close_from_control_node); + } +} + +static vector GetInstanceParameters() { + process_command_line_args(gargc, gargv); + return n_instances; +} + +static vector GetAgentParameters() { + process_command_line_args(gargc, gargv); + return n_agents; +} + +static vector GetRouteParameters() { + process_command_line_args(gargc, gargv); + return n_routes; +} + +static vector GetTargetParameters() { + process_command_line_args(gargc, gargv); + return n_targets; +} + +class PeerCloseManagerTest : public PeerCloseManager { +public: + explicit PeerCloseManagerTest(IPeer *peer) : PeerCloseManager(peer) { } + ~PeerCloseManagerTest() { } + void StartStaleTimer() { } +}; + +class BgpXmppChannelManagerMock : public BgpXmppChannelManager { +public: + BgpXmppChannelManagerMock(XmppServer *x, BgpServer *b) : + BgpXmppChannelManager(x, b), channel_(NULL) { } + + virtual BgpXmppChannel *CreateChannel(XmppChannel *channel) { + channel_ = new BgpXmppChannel(channel, bgp_server_, this); + return channel_; + } + + BgpXmppChannel *channel_; +}; + +typedef std::tr1::tuple TestParams; + +class GracefulRestartTest : public ::testing::TestWithParam { + +public: + bool IsPeerCloseGraceful(bool graceful) { return graceful; } + void SetPeerCloseGraceful(bool graceful) { + xmpp_server_->GetIsPeerCloseGraceful_fnc_ = + boost::bind(&GracefulRestartTest::IsPeerCloseGraceful, this, + graceful); + } + +protected: + GracefulRestartTest() : thread_(&evm_) { } + + virtual void SetUp(); + virtual void TearDown(); + void AgentCleanup(); + void Configure(); + + XmppChannelConfig *CreateXmppChannelCfg(const char *address, int port, + const string &from, + const string &to, + bool isClient); + + void GracefulRestartTestStart(); + void GracefulRestartTestRun(); + string GetConfig(); + ExtCommunitySpec *CreateRouteTargets(); + void AddAgentsWithRoutes(const BgpInstanceConfig *instance_config); + void AddXmppPeersWithRoutes(); + void CreateAgents(); + void Subscribe(); + void UnSubscribe(); + 
test::NextHops GetNextHops(test::NetworkAgentMock *agent, int instance_id); + void AddOrDeleteXmppRoutes(bool add, int nroutes = -1, + int down_agents = -1); + void VerifyReceivedXmppRoutes(int routes); + void DeleteRoutingInstances(int count, + vector &dont_unsubscribe); + void DeleteRoutingInstances(vector instances, + vector &dont_unsubscribe); + void VerifyDeletedRoutingInstnaces(vector instances); + void VerifyRoutingInstances(); + void XmppPeerClose(int nagents = -1); + void CallStaleTimer(BgpXmppChannel *channel); + void InitParams(); + void VerifyRoutes(int count); + bool IsReady(bool ready); + void WaitForAgentToBeEstablished(test::NetworkAgentMock *agent); + + EventManager evm_; + ServerThread thread_; + boost::scoped_ptr server_; + XmppServerTest *xmpp_server_; + boost::scoped_ptr channel_manager_; + scoped_ptr master_cfg_; + RoutingInstance *master_instance_; + std::vector xmpp_agents_; + std::vector xmpp_peers_; + int n_families_; + std::vector familes_; + int n_instances_; + int n_routes_; + int n_agents_; + int n_targets_; + bool xmpp_close_from_control_node_; + + struct AgentTestParams { + AgentTestParams(test::NetworkAgentMock *agent, vector instance_ids, + vector nroutes, TcpSession::Event skip_tcp_event) { + initialize(agent, instance_ids, nroutes, skip_tcp_event); + } + + AgentTestParams(test::NetworkAgentMock *agent, vector instance_ids, + vector nroutes) { + initialize(agent, instance_ids, nroutes, TcpSession::EVENT_NONE); + } + + AgentTestParams(test::NetworkAgentMock *agent) { + initialize(agent, vector(), vector(), + TcpSession::EVENT_NONE); + } + + void initialize(test::NetworkAgentMock *agent, + vector instance_ids, vector nroutes, + TcpSession::Event skip_tcp_event) { + this->agent = agent; + this->instance_ids = instance_ids; + this->nroutes = nroutes; + this->skip_tcp_event = skip_tcp_event; + this->send_eor = true; + } + + test::NetworkAgentMock *agent; + vector instance_ids; + vector nroutes; + TcpSession::Event skip_tcp_event; + bool send_eor; + }; + void ProcessFlippingAgents(int &total_routes, int remaining_instances, + std::vector &n_flipping_agents); + std::vector n_flipped_agents_; + std::vector n_down_from_agents_; + std::vector instances_to_delete_before_gr_; + std::vector instances_to_delete_during_gr_; +}; + +void GracefulRestartTest::SetUp() { + server_.reset(new BgpServerTest(&evm_, "A")); + xmpp_server_ = new XmppServerTest(&evm_, XMPP_CONTROL_SERV); + + channel_manager_.reset(new BgpXmppChannelManagerMock( + xmpp_server_, server_.get())); + master_cfg_.reset(BgpTestUtil::CreateBgpInstanceConfig( + BgpConfigManager::kMasterInstance, "", "")); + master_instance_ = static_cast( + server_->routing_instance_mgr()->GetRoutingInstance( + BgpConfigManager::kMasterInstance)); + n_families_ = 2; + familes_.push_back(Address::INET); + familes_.push_back(Address::INETVPN); + + server_->session_manager()->Initialize(0); + xmpp_server_->Initialize(0, false); + thread_.Start(); +} + +void GracefulRestartTest::TearDown() { + task_util::WaitForIdle(); + SetPeerCloseGraceful(false); + XmppPeerClose(); + xmpp_server_->Shutdown(); + task_util::WaitForIdle(); + + VerifyRoutes(0); + VerifyReceivedXmppRoutes(0); + + if (n_agents_) { + TASK_UTIL_EXPECT_EQ(0, xmpp_server_->connection_map().size()); + } + AgentCleanup(); + TASK_UTIL_EXPECT_EQ(0, channel_manager_->channel_map().size()); + channel_manager_.reset(); + task_util::WaitForIdle(); + + TcpServerManager::DeleteServer(xmpp_server_); + xmpp_server_ = NULL; + server_->Shutdown(); + task_util::WaitForIdle(); + 
evm_.Shutdown(); + thread_.Join(); + task_util::WaitForIdle(); + + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + delete agent; + } +} + +void GracefulRestartTest::Configure() { + server_->Configure(GetConfig().c_str()); + task_util::WaitForIdle(); + VerifyRoutingInstances(); +} + +XmppChannelConfig *GracefulRestartTest::CreateXmppChannelCfg( + const char *address, int port, const string &from, const string &to, + bool isClient) { + XmppChannelConfig *cfg = new XmppChannelConfig(isClient); + boost::system::error_code ec; + cfg->endpoint.address(ip::address::from_string(address, ec)); + cfg->endpoint.port(port); + cfg->ToAddr = to; + cfg->FromAddr = from; + if (!isClient) cfg->NodeAddr = PUBSUB_NODE_ADDR; + return cfg; +} + +void GracefulRestartTest::AgentCleanup() { + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + agent->Delete(); + } +} + +void GracefulRestartTest::VerifyRoutes(int count) { + for (int i = 0; i < n_families_; i++) { + BgpTable *tb = master_instance_->GetTable(familes_[i]); + if (count && n_agents_ && familes_[i] == Address::INETVPN) { + BGP_VERIFY_ROUTE_COUNT(tb, n_agents_ * n_instances_ * count); + } + } +} + +string GracefulRestartTest::GetConfig() { + ostringstream out; + + out << + "\ + \ + 192.168.0.1\ +
<address>127.0.0.1</address>\
+        <port>" << server_->session_manager()->GetPort() << "</port>\
+        \
+        <address-families>\
+            <family>inet-vpn</family>\
+            <family>e-vpn</family>\
+            <family>erm-vpn</family>\
+            <family>route-target</family>\
+        </address-families>\
+        \
+    </bgp-router>\
+";
+
+    for (int i = 1; i <= n_instances_; i++) {
+        out << "<routing-instance name='instance" << i << "'>\n";
+        for (int j = 1; j <= n_targets_; j++) {
+            out << "    <vrf-target>target:1:" << j << "</vrf-target>\n";
+        }
+        out << "</routing-instance>\n";
+    }
+
+    out << "</config>
"; + + BGP_DEBUG_UT("Applying config" << out.str()); + + return out.str(); +} + +bool GracefulRestartTest::IsReady(bool ready) { + return ready; +} + +void GracefulRestartTest::WaitForAgentToBeEstablished( + test::NetworkAgentMock *agent) { + TASK_UTIL_EXPECT_EQ(true, agent->IsChannelReady()); + TASK_UTIL_EXPECT_EQ(true, agent->IsEstablished()); +} + +ExtCommunitySpec *GracefulRestartTest::CreateRouteTargets() { + auto_ptr commspec(new ExtCommunitySpec()); + + for (int i = 1; i <= n_targets_; i++) { + RouteTarget tgt = RouteTarget::FromString( + "target:1:" + boost::lexical_cast(i)); + const ExtCommunity::ExtCommunityValue &extcomm = + tgt.GetExtCommunity(); + uint64_t value = get_value(extcomm.data(), extcomm.size()); + commspec->communities.push_back(value); + } + + if (commspec->communities.empty()) return NULL; + return commspec.release(); +} + +void GracefulRestartTest::AddXmppPeersWithRoutes() { + if (!n_agents_) return; + + CreateAgents(); + + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + WaitForAgentToBeEstablished(agent); + } + + task_util::WaitForIdle(); + Subscribe(); + VerifyReceivedXmppRoutes(0); + AddOrDeleteXmppRoutes(true); + task_util::WaitForIdle(); + VerifyReceivedXmppRoutes(n_instances_ * n_agents_ * n_routes_); +} + +void GracefulRestartTest::CreateAgents() { + Ip4Prefix prefix(Ip4Prefix::FromString("127.0.0.1/32")); + + for (int i = 0; i < n_agents_; i++) { + + // create an XMPP client in server A + test::NetworkAgentMock *agent = new test::NetworkAgentMock(&evm_, + "agent" + boost::lexical_cast(i) + + "@vnsw.contrailsystems.com", + xmpp_server_->GetPort(), + prefix.ip4_addr().to_string()); + agent->set_id(i); + xmpp_agents_.push_back(agent); + task_util::WaitForIdle(); + + TASK_UTIL_EXPECT_NE_MSG(static_cast(NULL), + channel_manager_->channel_, + "Waiting for channel_manager_->channel_ to be set"); + xmpp_peers_.push_back(channel_manager_->channel_); + channel_manager_->channel_ = NULL; + + prefix = task_util::Ip4PrefixIncrement(prefix); + } +} + +void GracefulRestartTest::Subscribe() { + + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + agent->Subscribe(BgpConfigManager::kMasterInstance, -1); + for (int i = 1; i <= n_instances_; i++) { + string instance_name = "instance" + boost::lexical_cast(i); + agent->Subscribe(instance_name, i); + } + } + task_util::WaitForIdle(); +} + +void GracefulRestartTest::UnSubscribe() { + + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + agent->Unsubscribe(BgpConfigManager::kMasterInstance); + for (int i = 1; i <= n_instances_; i++) { + string instance_name = "instance" + boost::lexical_cast(i); + agent->Unsubscribe(instance_name); + } + } + VerifyReceivedXmppRoutes(0); + task_util::WaitForIdle(); +} + +test::NextHops GracefulRestartTest::GetNextHops (test::NetworkAgentMock *agent, + int instance_id) { + test::NextHops nexthops; + nexthops.push_back(test::NextHop("100.100.100." + + boost::lexical_cast(agent->id()), + 10000 + instance_id)); + return nexthops; +} + +void GracefulRestartTest::AddOrDeleteXmppRoutes(bool add, int n_routes, + int down_agents) { + if (n_routes ==-1) + n_routes = n_routes_; + + if (down_agents == -1) + down_agents = n_agents_; + + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + if (down_agents-- < 1) + continue; + + for (int i = 1; i <= n_instances_; i++) { + string instance_name = "instance" + boost::lexical_cast(i); + + Ip4Prefix prefix(Ip4Prefix::FromString( + "10." + boost::lexical_cast(i) + "." 
+ + boost::lexical_cast(agent->id()) + ".1/32")); + for (int rt = 0; rt < n_routes; rt++, + prefix = task_util::Ip4PrefixIncrement(prefix)) { + if (add) { + agent->AddRoute(instance_name, prefix.ToString(), + GetNextHops(agent, i)); + } else { + agent->DeleteRoute(instance_name, prefix.ToString()); + } + } + } + } + task_util::WaitForIdle(); + // if (!add) VerifyReceivedXmppRoutes(0); +} + +void GracefulRestartTest::VerifyReceivedXmppRoutes(int routes) { + if (!n_agents_) return; + + int agent_id = 0; + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + agent_id++; + if (routes > 0 && !agent->IsEstablished()) + continue; + for (int i = 1; i <= n_instances_; i++) { + string instance_name = "instance" + boost::lexical_cast(i); + if (!agent->HasSubscribed(instance_name)) + continue; + TASK_UTIL_EXPECT_EQ_MSG(routes, agent->RouteCount(instance_name), + "Wait for routes in " + instance_name); + } + } + task_util::WaitForIdle(); +} + +void GracefulRestartTest::DeleteRoutingInstances(int count, + vector &dont_unsubscribe) { + if (!count) + return; + vector instances = vector(); + for (int i = 1; i <= count; i++) + instances.push_back(i); + DeleteRoutingInstances(instances, dont_unsubscribe); +} + +void GracefulRestartTest::DeleteRoutingInstances(vector instances, + vector &dont_unsubscribe) { + if (instances.empty()) + return; + + ostringstream out; + out << ""; + BOOST_FOREACH(int i, instances) { + out << "\n"; + for (int j = 1; j <= n_targets_; j++) { + out << " target:1:" << j << "\n"; + } + out << "\n"; + } + out << ""; + + server_->Configure(out.str().c_str()); + task_util::WaitForIdle(); + + // Unsubscribe from all agents who have subscribed + BOOST_FOREACH(int i, instances) { + string instance_name = "instance" + boost::lexical_cast(i); + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + if (!agent->IsEstablished() || !agent->HasSubscribed(instance_name)) + continue; + if (std::find(dont_unsubscribe.begin(), dont_unsubscribe.end(), + agent) == dont_unsubscribe.end()) + agent->Unsubscribe(instance_name); + } + } + task_util::WaitForIdle(); +} + +void GracefulRestartTest::VerifyDeletedRoutingInstnaces(vector instances) { + BOOST_FOREACH(int i, instances) { + string instance_name = "instance" + boost::lexical_cast(i); + TASK_UTIL_EXPECT_EQ(static_cast(NULL), + server_->routing_instance_mgr()->\ + GetRoutingInstance(instance_name)); + } + task_util::WaitForIdle(); +} + +void GracefulRestartTest::VerifyRoutingInstances() { + for (int i = 1; i <= n_instances_; i++) { + string instance_name = "instance" + boost::lexical_cast(i); + TASK_UTIL_EXPECT_NE(static_cast(NULL), + server_->routing_instance_mgr()->\ + GetRoutingInstance(instance_name)); + } + + // + // Verify 'default' master routing-instance + // + TASK_UTIL_EXPECT_NE(static_cast(NULL), + server_->routing_instance_mgr()->GetRoutingInstance( + BgpConfigManager::kMasterInstance)); +} + +void GracefulRestartTest::AddAgentsWithRoutes( + const BgpInstanceConfig *instance_config) { + Configure(); + SetPeerCloseGraceful(false); + AddXmppPeersWithRoutes(); +} + +// Invoke stale timer callbacks directly as evm is not running in this unit test +void GracefulRestartTest::CallStaleTimer(BgpXmppChannel *channel) { + channel->Peer()->peer_close()->close_manager()->RestartTimerCallback(); + task_util::WaitForIdle(); +} + +void GracefulRestartTest::XmppPeerClose(int nagents) { + if (nagents < 1) + nagents = xmpp_agents_.size(); + + int down_count = nagents; + if (xmpp_close_from_control_node_) { + BOOST_FOREACH(BgpXmppChannel 
*peer, xmpp_peers_) { + peer->Peer()->Close(); + if (!--down_count) + break; + } + } else { + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + agent->SessionDown(); + if (!--down_count) + break; + } + } + + down_count = nagents; + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + TASK_UTIL_EXPECT_EQ(down_count < 1, agent->IsEstablished()); + down_count--; + } +} + +void GracefulRestartTest::InitParams() { + n_instances_ = ::std::tr1::get<0>(GetParam()); + n_routes_ = ::std::tr1::get<1>(GetParam()); + n_agents_ = ::std::tr1::get<2>(GetParam()); + n_targets_ = ::std::tr1::get<3>(GetParam()); + xmpp_close_from_control_node_ = ::std::tr1::get<4>(GetParam()); +} + +// Bring up n_agents_ in n_instances_ and advertise +// n_routes_ (v4 and v6) in each connection +// Verify that n_agents_ * n_instances_ * n_routes_ routes are received in +// agent in each instance +// * Subset * picked serially/randomly +// Subset of agents support GR +// Subset of routing-instances are deleted +// Subset of agents go down permanently (Triggered from agents) +// Subset of agents flip (go down and come back up) (Triggered from agents) +// Subset of agents go down permanently (Triggered from control-node) +// Subset of agents flip (Triggered from control-node) +// Subset of subscriptions after restart +// Subset of routes are [re]advertised after restart +// Subset of routing-instances are deleted (during GR) +void GracefulRestartTest::GracefulRestartTestStart () { + InitParams(); + + // Bring up n_agents_ in n_instances_ and advertise n_routes_ per session + AddAgentsWithRoutes(master_cfg_.get()); + VerifyRoutes(n_routes_); +} + +void GracefulRestartTest::ProcessFlippingAgents(int &total_routes, + int remaining_instances, + vector &n_flipping_agents) { + int flipping_count = 3; + + for (int f = 0; f < flipping_count; f++) { + BOOST_FOREACH(AgentTestParams agent_test_param, n_flipping_agents) { + test::NetworkAgentMock *agent = agent_test_param.agent; + TASK_UTIL_EXPECT_EQ(false, agent->IsEstablished()); + agent->SessionUp(); + WaitForAgentToBeEstablished(agent); + } + + BOOST_FOREACH(AgentTestParams agent_test_param, n_flipping_agents) { + test::NetworkAgentMock *agent = agent_test_param.agent; + WaitForAgentToBeEstablished(agent); + + // Subset of subscriptions after restart + agent->Subscribe(BgpConfigManager::kMasterInstance, -1); + for (size_t i = 0; i < agent_test_param.instance_ids.size(); i++) { + int instance_id = agent_test_param.instance_ids[i]; + if (std::find(instances_to_delete_before_gr_.begin(), + instances_to_delete_before_gr_.end(), instance_id) != + instances_to_delete_before_gr_.end()) + continue; + if (std::find(instances_to_delete_during_gr_.begin(), + instances_to_delete_during_gr_.end(), instance_id) != + instances_to_delete_during_gr_.end()) + continue; + string instance_name = "instance" + + boost::lexical_cast(instance_id); + agent->Subscribe(instance_name, instance_id); + + // Subset of routes are [re]advertised after restart + Ip4Prefix prefix(Ip4Prefix::FromString( + "10." + boost::lexical_cast(instance_id) + "." + + boost::lexical_cast(agent->id()) + ".1/32")); + int nroutes = agent_test_param.nroutes[i]; + for (int rt = 0; rt < nroutes; rt++, + prefix = task_util::Ip4PrefixIncrement(prefix)) { + agent->AddRoute(instance_name, prefix.ToString(), + GetNextHops(agent, instance_id)); + } + total_routes += nroutes; + } + } + + // Bring back half of the flipping agents to established state and send + // routes. 
Rest do not come back up (nested closures and LLGR) + int count = n_flipping_agents.size(); + if (f == flipping_count - 1) + count /= 2; + int k = 0; + BOOST_FOREACH(AgentTestParams agent_test_param, n_flipping_agents) { + if (k++ >= count) + break; + test::NetworkAgentMock *agent = agent_test_param.agent; + WaitForAgentToBeEstablished(agent); + XmppStateMachineTest::set_skip_tcp_event( + agent_test_param.skip_tcp_event); + agent->SessionDown(); + TASK_UTIL_EXPECT_EQ(false, agent->IsEstablished()); + TASK_UTIL_EXPECT_EQ(TcpSession::EVENT_NONE, + XmppStateMachineTest::get_skip_tcp_event()); + for (size_t i = 0; i < agent_test_param.instance_ids.size(); i++) { + int instance_id = agent_test_param.instance_ids[i]; + if (std::find(instances_to_delete_before_gr_.begin(), + instances_to_delete_before_gr_.end(), instance_id) != + instances_to_delete_before_gr_.end()) + continue; + if (std::find(instances_to_delete_during_gr_.begin(), + instances_to_delete_during_gr_.end(), instance_id) != + instances_to_delete_during_gr_.end()) + continue; + int nroutes = agent_test_param.nroutes[i]; + total_routes -= nroutes; + } + } + } + + // Send EoR marker or trigger GR timer for agents which came back up and + // sent desired routes. + BOOST_FOREACH(AgentTestParams agent_test_param, n_flipping_agents) { + test::NetworkAgentMock *agent = agent_test_param.agent; + if (agent_test_param.send_eor && agent->IsEstablished()) { + agent->SendEorMarker(); + } else { + PeerCloseManager *pc = + xmpp_peers_[agent->id()]->Peer()->peer_close()->close_manager(); + + // If the session is down and TCP down event was meant to be skipped + // then we do not expect control-node to be unaware of it. Hold + // timer must have expired by then. Trigger the hold-timer expiry + // first in order to bring the peer down in the controller and then + // call the GR timer callback. + if (!agent->IsEstablished()) { + if (agent_test_param.skip_tcp_event != TcpSession::EVENT_NONE) { + uint64_t stale = pc->stats().stale; + const XmppStateMachine *sm = xmpp_peers_[ + agent->id()]->channel()->connection()->state_machine(); + const_cast(sm)->HoldTimerExpired(); + TASK_UTIL_EXPECT_EQ(stale + 1, pc->stats().stale); + } + TASK_UTIL_EXPECT_EQ(false, xmpp_peers_[ + agent->id()]->Peer()->IsReady()); + TASK_UTIL_EXPECT_EQ(PeerCloseManager::GR_TIMER, pc->state()); + } + CallStaleTimer(xmpp_peers_[agent->id()]); + } + } + task_util::WaitForIdle(); +} + +void GracefulRestartTest::GracefulRestartTestRun () { + int total_routes = n_instances_ * n_agents_ * n_routes_; + + // Verify that n_agents_ * n_instances_ * n_routes_ routes are received in + // agent in each instance + VerifyReceivedXmppRoutes(total_routes); + + // TODO Only a subset of agents support GR + // BOOST_FOREACH(test::NetworkAgentMock *agent, n_gr_supported_agents) + SetPeerCloseGraceful(true); + + + vector dont_unsubscribe = + vector(); + + DeleteRoutingInstances(instances_to_delete_before_gr_, dont_unsubscribe); + int remaining_instances = n_instances_; + remaining_instances -= instances_to_delete_before_gr_.size(); + total_routes -= n_routes_ * n_agents_ * + instances_to_delete_before_gr_.size(); + + // Subset of agents go down permanently (Triggered from agents) + BOOST_FOREACH(test::NetworkAgentMock *agent, n_down_from_agents_) { + WaitForAgentToBeEstablished(agent); + agent->SessionDown(); + TASK_UTIL_EXPECT_EQ(false, agent->IsEstablished()); + total_routes -= remaining_instances * n_routes_; + } + + // Divide flipped agents into two parts. 
Agents in the first part flip + // once and come back up (normal GR). Those in the second part keep + // flipping. Eventually half the second part come back to normal up state. + // Rest (1/4th overall) remain down triggering LLGR during the whole time. + vector n_flipped_agents = vector(); + vector n_flipping_agents = vector(); + for (size_t i = 0; i < n_flipped_agents_.size(); i++) { + if (i < n_flipped_agents_.size()/2) + n_flipped_agents.push_back(n_flipped_agents_[i]); + else + n_flipping_agents.push_back(n_flipped_agents_[i]); + } + + // Subset of agents flip (Triggered from agents) + BOOST_FOREACH(AgentTestParams agent_test_param, n_flipped_agents) { + test::NetworkAgentMock *agent = agent_test_param.agent; + WaitForAgentToBeEstablished(agent); + XmppStateMachineTest::set_skip_tcp_event( + agent_test_param.skip_tcp_event); + agent->SessionDown(); + dont_unsubscribe.push_back(agent); + TASK_UTIL_EXPECT_EQ(false, agent->IsEstablished()); + TASK_UTIL_EXPECT_EQ(TcpSession::EVENT_NONE, + XmppStateMachineTest::get_skip_tcp_event()); + total_routes -= remaining_instances * n_routes_; + } + + // Subset of agents flip (Triggered from agents) + BOOST_FOREACH(AgentTestParams agent_test_param, n_flipping_agents) { + test::NetworkAgentMock *agent = agent_test_param.agent; + WaitForAgentToBeEstablished(agent); + XmppStateMachineTest::set_skip_tcp_event( + agent_test_param.skip_tcp_event); + agent->SessionDown(); + dont_unsubscribe.push_back(agent); + TASK_UTIL_EXPECT_EQ(false, agent->IsEstablished()); + TASK_UTIL_EXPECT_EQ(TcpSession::EVENT_NONE, + XmppStateMachineTest::get_skip_tcp_event()); + total_routes -= remaining_instances * n_routes_; + } + + // Delete some of the routing-instances when the agent is still down. + // It is expected that agents upon restart only subscribe to those that + // were not deleted. + DeleteRoutingInstances(instances_to_delete_during_gr_, dont_unsubscribe); + + // Account for agents (which do not flip) who usubscribe explicitly + total_routes -= n_routes_ * + (n_agents_ - n_flipped_agents.size() - n_flipping_agents.size() - + n_down_from_agents_.size()) * instances_to_delete_during_gr_.size(); + + XmppStateMachineTest::set_skip_tcp_event(TcpSession::EVENT_NONE); + + BOOST_FOREACH(AgentTestParams agent_test_param, n_flipped_agents) { + test::NetworkAgentMock *agent = agent_test_param.agent; + TASK_UTIL_EXPECT_EQ(false, agent->IsEstablished()); + agent->SessionUp(); + WaitForAgentToBeEstablished(agent); + } + + BOOST_FOREACH(AgentTestParams agent_test_param, n_flipped_agents) { + test::NetworkAgentMock *agent = agent_test_param.agent; + WaitForAgentToBeEstablished(agent); + + // Subset of subscriptions after restart + agent->Subscribe(BgpConfigManager::kMasterInstance, -1); + for (size_t i = 0; i < agent_test_param.instance_ids.size(); i++) { + int instance_id = agent_test_param.instance_ids[i]; + if (std::find(instances_to_delete_before_gr_.begin(), + instances_to_delete_before_gr_.end(), instance_id) != + instances_to_delete_before_gr_.end()) + continue; + if (std::find(instances_to_delete_during_gr_.begin(), + instances_to_delete_during_gr_.end(), instance_id) != + instances_to_delete_during_gr_.end()) + continue; + string instance_name = "instance" + + boost::lexical_cast(instance_id); + agent->Subscribe(instance_name, instance_id); + + // Subset of routes are [re]advertised after restart + Ip4Prefix prefix(Ip4Prefix::FromString( + "10." + boost::lexical_cast(instance_id) + "." 
+ + boost::lexical_cast(agent->id()) + ".1/32")); + int nroutes = agent_test_param.nroutes[i]; + for (int rt = 0; rt < nroutes; rt++, + prefix = task_util::Ip4PrefixIncrement(prefix)) { + agent->AddRoute(instance_name, prefix.ToString(), + GetNextHops(agent, instance_id)); + } + total_routes += nroutes; + } + } + + // Send EoR marker or trigger GR timer for agents which came back up and + // sent desired routes. + BOOST_FOREACH(AgentTestParams agent_test_param, n_flipped_agents) { + test::NetworkAgentMock *agent = agent_test_param.agent; + if (agent_test_param.send_eor) + agent->SendEorMarker(); + else + CallStaleTimer(xmpp_peers_[agent->id()]); + } + + + // Process agents which keep flipping and trigger LLGR.. + ProcessFlippingAgents(total_routes, remaining_instances, n_flipping_agents); + + // Trigger GR timer for agents which went down permanently. + BOOST_FOREACH(test::NetworkAgentMock *agent, n_down_from_agents_) { + CallStaleTimer(xmpp_peers_[agent->id()]); + } + VerifyReceivedXmppRoutes(total_routes); + VerifyDeletedRoutingInstnaces(instances_to_delete_before_gr_); + VerifyDeletedRoutingInstnaces(instances_to_delete_during_gr_); +} + +// None of the agents goes down or flip +TEST_P(GracefulRestartTest, GracefulRestart_Down_1) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + GracefulRestartTestRun(); +} + +// All agents go down permanently +TEST_P(GracefulRestartTest, GracefulRestart_Down_2) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + n_down_from_agents_ = xmpp_agents_; + GracefulRestartTestRun(); +} + +// Some agents go down permanently +TEST_P(GracefulRestartTest, GracefulRestart_Down_3) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (size_t i = 0; i < xmpp_agents_.size()/2; i++) + n_down_from_agents_.push_back(xmpp_agents_[i]); + GracefulRestartTestRun(); +} + +// Some agents go down permanently and some flip (which sends no routes) +TEST_P(GracefulRestartTest, GracefulRestart_Down_4) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (size_t i = 0; i < xmpp_agents_.size(); i++) { + if (i <= xmpp_agents_.size()/2) + n_down_from_agents_.push_back(xmpp_agents_[i]); + else + n_flipped_agents_.push_back(AgentTestParams(xmpp_agents_[i])); + } + GracefulRestartTestRun(); +} + +// All agents come back up but do not subscribe to any instance +TEST_P(GracefulRestartTest, GracefulRestart_Flap_1) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + n_flipped_agents_.push_back(AgentTestParams(agent)); + } + GracefulRestartTestRun(); +} + +// All agents come back up and subscribe to all instances and sends all routes +// Agent session tcp down event is detected at the server +TEST_P(GracefulRestartTest, GracefulRestart_Flap_2) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + vector instance_ids = vector(); + vector nroutes = vector(); + for (int i = 1; i <= n_instances_; i++) { + instance_ids.push_back(i); + nroutes.push_back(n_routes_); + } + + // Trigger the case of compute-node hard reset where in tcp fin event + // never reaches control-node + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + } + GracefulRestartTestRun(); +} + +// All agents come back up and subscribe to all instances but sends no routes +// Agent session tcp down event is detected at the server +TEST_P(GracefulRestartTest, GracefulRestart_Flap_3) { + 
SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + vector instance_ids = vector(); + vector nroutes = vector(); + for (int i = 1; i <= n_instances_; i++) { + instance_ids.push_back(i); + nroutes.push_back(0); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + } + GracefulRestartTestRun(); +} + +// All agents come back up and subscribe to all instances and sends some routes +// Agent session tcp down event is detected at the server +TEST_P(GracefulRestartTest, GracefulRestart_Flap_4) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + vector instance_ids = vector(); + vector nroutes = vector(); + for (int i = 1; i <= n_instances_; i++) { + instance_ids.push_back(i); + nroutes.push_back(n_routes_/2); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + } + GracefulRestartTestRun(); +} + +// All agents come back up and subscribe to all instances and sends all routes +// Agent session tcp down event is not detected at the server +TEST_P(GracefulRestartTest, GracefulRestart_Flap_5) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + vector instance_ids = vector(); + vector nroutes = vector(); + for (int i = 1; i <= n_instances_; i++) { + instance_ids.push_back(i); + nroutes.push_back(n_routes_); + } + + // Trigger the case of compute-node hard reset where in tcp fin event + // never reaches control-node + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes, + TcpSession::CLOSE)); + } + GracefulRestartTestRun(); +} + +// All agents come back up and subscribe to all instances but sends no routes +// Agent session tcp down event is not detected at the server +TEST_P(GracefulRestartTest, GracefulRestart_Flap_6) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + vector instance_ids = vector(); + vector nroutes = vector(); + for (int i = 1; i <= n_instances_; i++) { + instance_ids.push_back(i); + nroutes.push_back(0); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes, + TcpSession::CLOSE)); + } + GracefulRestartTestRun(); +} + +// All agents come back up and subscribe to all instances and sends some routes +// Agent session tcp down event is not detected at the server +TEST_P(GracefulRestartTest, GracefulRestart_Flap_7) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + BOOST_FOREACH(test::NetworkAgentMock *agent, xmpp_agents_) { + vector instance_ids = vector(); + vector nroutes = vector(); + for (int i = 1; i <= n_instances_; i++) { + instance_ids.push_back(i); + nroutes.push_back(n_routes_/2); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes, + TcpSession::CLOSE)); + } + GracefulRestartTestRun(); +} + + +// Some agents come back up but do not subscribe to any instance +TEST_P(GracefulRestartTest, GracefulRestart_Flap_Some_1) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (size_t i = 0; i < xmpp_agents_.size()/2; i++) { + test::NetworkAgentMock *agent = xmpp_agents_[i]; + n_flipped_agents_.push_back(AgentTestParams(agent)); + } + GracefulRestartTestRun(); +} + +// Some agents come back up and subscribe to all instances and sends all routes +TEST_P(GracefulRestartTest, GracefulRestart_Flap_Some_2) { + 
SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (size_t i = 0; i < xmpp_agents_.size()/2; i++) { + test::NetworkAgentMock *agent = xmpp_agents_[i]; + vector instance_ids = vector(); + vector nroutes = vector(); + for (int j = 1; j <= n_instances_; j++) { + instance_ids.push_back(j); + nroutes.push_back(n_routes_); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + // All flipped agents send EoR. + n_flipped_agents_[i].send_eor = true; + } + GracefulRestartTestRun(); +} + +// Some agents come back up and subscribe to all instances and sends all routes +TEST_P(GracefulRestartTest, GracefulRestart_Flap_Some_2_2) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (size_t i = 0; i < xmpp_agents_.size()/2; i++) { + test::NetworkAgentMock *agent = xmpp_agents_[i]; + vector instance_ids = vector(); + vector nroutes = vector(); + for (int j = 1; j <= n_instances_; j++) { + instance_ids.push_back(j); + nroutes.push_back(n_routes_); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + // None of the flipped agents sends EoR. + n_flipped_agents_[i].send_eor = false; + } + GracefulRestartTestRun(); +} + +// Some agents come back up and subscribe to all instances and sends all routes +TEST_P(GracefulRestartTest, GracefulRestart_Flap_Some_2_3) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (size_t i = 0; i < xmpp_agents_.size()/2; i++) { + test::NetworkAgentMock *agent = xmpp_agents_[i]; + vector instance_ids = vector(); + vector nroutes = vector(); + for (int j = 1; j <= n_instances_; j++) { + instance_ids.push_back(j); + nroutes.push_back(n_routes_); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + // Only even flipped agents send EoR. + n_flipped_agents_[i].send_eor = ((i%2) == 0); + } + GracefulRestartTestRun(); +} + +// Some agents come back up and subscribe to all instances but sends no routes +TEST_P(GracefulRestartTest, GracefulRestart_Flap_Some_3) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (size_t i = 0; i < xmpp_agents_.size()/2; i++) { + test::NetworkAgentMock *agent = xmpp_agents_[i]; + vector instance_ids = vector(); + vector nroutes = vector(); + for (int j = 1; j <= n_instances_; j++) { + instance_ids.push_back(j); + nroutes.push_back(0); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + // All flipped agents send EoR. + n_flipped_agents_[i].send_eor = true; + } + GracefulRestartTestRun(); +} + +// Some agents come back up and subscribe to all instances but sends no routes +TEST_P(GracefulRestartTest, GracefulRestart_Flap_Some_3_2) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (size_t i = 0; i < xmpp_agents_.size()/2; i++) { + test::NetworkAgentMock *agent = xmpp_agents_[i]; + vector instance_ids = vector(); + vector nroutes = vector(); + for (int j = 1; j <= n_instances_; j++) { + instance_ids.push_back(j); + nroutes.push_back(0); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + // None of the flipped agents sends EoR. 
+ n_flipped_agents_[i].send_eor = false; + } + GracefulRestartTestRun(); +} + +// Some agents come back up and subscribe to all instances but sends no routes +TEST_P(GracefulRestartTest, GracefulRestart_Flap_Some_3_3) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (size_t i = 0; i < xmpp_agents_.size()/2; i++) { + test::NetworkAgentMock *agent = xmpp_agents_[i]; + vector instance_ids = vector(); + vector nroutes = vector(); + for (int j = 1; j <= n_instances_; j++) { + instance_ids.push_back(j); + nroutes.push_back(0); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + // Only even flipped agents send EoR. + n_flipped_agents_[i].send_eor = ((i%2) == 0); + } + GracefulRestartTestRun(); +} + +// Some agents come back up and subscribe to all instances and sends some routes +TEST_P(GracefulRestartTest, GracefulRestart_Flap_Some_4) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (size_t i = 0; i < xmpp_agents_.size()/2; i++) { + test::NetworkAgentMock *agent = xmpp_agents_[i]; + vector instance_ids = vector(); + vector nroutes = vector(); + for (int j = 1; j <= n_instances_; j++) { + instance_ids.push_back(j); + nroutes.push_back(n_routes_/2); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + + // All flipped agents send EoR. + n_flipped_agents_[i].send_eor = true; + } + GracefulRestartTestRun(); +} + +// Some agents come back up and subscribe to all instances and sends some routes +TEST_P(GracefulRestartTest, GracefulRestart_Flap_Some_4_2) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (size_t i = 0; i < xmpp_agents_.size()/2; i++) { + test::NetworkAgentMock *agent = xmpp_agents_[i]; + vector instance_ids = vector(); + vector nroutes = vector(); + for (int j = 1; j <= n_instances_; j++) { + instance_ids.push_back(j); + nroutes.push_back(n_routes_/2); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + + // None of the flipped agents sends EoR. + n_flipped_agents_[i].send_eor = false; + } + GracefulRestartTestRun(); +} + +// Some agents come back up and subscribe to all instances and sends some routes +TEST_P(GracefulRestartTest, GracefulRestart_Flap_Some_4_3) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (size_t i = 0; i < xmpp_agents_.size()/2; i++) { + test::NetworkAgentMock *agent = xmpp_agents_[i]; + vector instance_ids = vector(); + vector nroutes = vector(); + for (int j = 1; j <= n_instances_; j++) { + instance_ids.push_back(j); + nroutes.push_back(n_routes_/2); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + + // Only even flipped agents send EoR. + n_flipped_agents_[i].send_eor = ((i%2) == 0); + } + GracefulRestartTestRun(); +} + +// Some routing instances are first deleted. Subscribed agents remain up and +// running.. This is the common case which happens most of the time during +// normal functioning of the software. +TEST_P(GracefulRestartTest, GracefulRestart_Delete_RoutingInstances_1) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (int i = 1; i <= n_instances_/4; i++) + instances_to_delete_before_gr_.push_back(i); + GracefulRestartTestRun(); +} + +// Some routing instances are deleted. 
Then some of the agents permanently go +// down and they do not come back up (GR is triggered and should get cleaned up +// when the GR timer fires) +TEST_P(GracefulRestartTest, GracefulRestart_Delete_RoutingInstances_2) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (int i = 1; i <= n_instances_/4; i++) + instances_to_delete_before_gr_.push_back(i); + + for (size_t i = 1; i <= xmpp_agents_.size(); i++) { + + // agents from 2nd half remain up through out this test + if (i > xmpp_agents_.size()/2) + continue; + + // agents from 1st quarter go down permantently + if (i <= xmpp_agents_.size()/4) { + n_down_from_agents_.push_back(xmpp_agents_[i-1]); + continue; + } + } + GracefulRestartTestRun(); +} + +// Some routing instances are deleted. Then some of the agents permanently go +// down and they do not come back up (GR is triggered and should get cleaned up +// when the GR timer fires). During this GR, additional instances are deleted +TEST_P(GracefulRestartTest, GracefulRestart_Delete_RoutingInstances_3) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (int i = 1; i <= n_instances_/4; i++) + instances_to_delete_before_gr_.push_back(i); + + for (size_t i = 1; i <= xmpp_agents_.size(); i++) { + + // agents from 2nd half remain up through out this test + if (i > xmpp_agents_.size()/2) + continue; + + // agents from 1st quarter go down permantently + if (i <= xmpp_agents_.size()/4) { + n_down_from_agents_.push_back(xmpp_agents_[i-1]); + continue; + } + } + for (int i = n_instances_/4 + 1; i <= n_instances_/2; i++) + instances_to_delete_during_gr_.push_back(i); + GracefulRestartTestRun(); +} + +// Some routing instances are deleted. Then some of the agents permanently go +// down and they do not come back up (GR is triggered and should get cleaned up +// when the GR timer fires). Some of the other agents go down and then come +// back up advertising "all" the routes again. +TEST_P(GracefulRestartTest, GracefulRestart_Delete_RoutingInstances_4) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (int i = 1; i <= n_instances_/4; i++) + instances_to_delete_before_gr_.push_back(i); + + for (size_t i = 1; i <= xmpp_agents_.size(); i++) { + + // agents from 2nd half remain up through out this test + if (i > xmpp_agents_.size()/2) + continue; + + // agents from 1st quarter go down permantently + if (i <= xmpp_agents_.size()/4) { + n_down_from_agents_.push_back(xmpp_agents_[i-1]); + continue; + } + + // agents from 2nd quarter flip with gr + test::NetworkAgentMock *agent = xmpp_agents_[i-1]; + vector instance_ids = vector(); + vector nroutes = vector(); + for (int j = 1; j <= n_instances_; j++) { + instance_ids.push_back(j); + nroutes.push_back(n_routes_); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + } + GracefulRestartTestRun(); +} + +// Some routing instances are deleted. Then some of the agents permanently go +// down and they do not come back up (GR is triggered and should get cleaned up +// when the GR timer fires). Some of the other agents go down and then come +// back up advertising "all" the routes again. 
During this GR, additional +// instances are deleted +TEST_P(GracefulRestartTest, GracefulRestart_Delete_RoutingInstances_5) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (int i = 1; i <= n_instances_/4; i++) + instances_to_delete_before_gr_.push_back(i); + + for (size_t i = 1; i <= xmpp_agents_.size(); i++) { + + // agents from 2nd half remain up through out this test + if (i > xmpp_agents_.size()/2) + continue; + + // agents from 1st quarter go down permantently + if (i <= xmpp_agents_.size()/4) { + n_down_from_agents_.push_back(xmpp_agents_[i-1]); + continue; + } + + // agents from 2nd quarter flip with gr + test::NetworkAgentMock *agent = xmpp_agents_[i-1]; + vector instance_ids = vector(); + vector nroutes = vector(); + for (int j = 1; j <= n_instances_; j++) { + instance_ids.push_back(j); + nroutes.push_back(n_routes_); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + } + for (int i = n_instances_/4 + 1; i <= n_instances_/2; i++) + instances_to_delete_during_gr_.push_back(i); + GracefulRestartTestRun(); +} + +// Some routing instances are deleted. Then some of the agents permanently go +// down and they do not come back up (GR is triggered and should get cleaned up +// when the GR timer fires). Some of the other agents go down and then come +// back up advertising some of the routes again (not all). During this GR, +// additional instances are deleted +TEST_P(GracefulRestartTest, GracefulRestart_Delete_RoutingInstances_6) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (int i = 1; i <= n_instances_/4; i++) + instances_to_delete_before_gr_.push_back(i); + + for (size_t i = 1; i <= xmpp_agents_.size(); i++) { + + // agents from 2nd half remain up through out this test + if (i > xmpp_agents_.size()/2) + continue; + + // agents from 1st quarter go down permantently + if (i <= xmpp_agents_.size()/4) { + n_down_from_agents_.push_back(xmpp_agents_[i-1]); + continue; + } + + // agents from 2nd quarter flip with gr + test::NetworkAgentMock *agent = xmpp_agents_[i-1]; + vector instance_ids = vector(); + vector nroutes = vector(); + for (int j = 1; j <= n_instances_; j++) { + instance_ids.push_back(j); + nroutes.push_back(n_routes_/2); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + } + GracefulRestartTestRun(); +} + +// Some routing instances are deleted. Then some of the agents permanently go +// down and they do not come back up (GR is triggered and should get cleaned up +// when the GR timer fires). Some of the other agents go down and then come +// back up advertising some of the routes again (not all). 
During this GR, +// additional instances are deleted +TEST_P(GracefulRestartTest, GracefulRestart_Delete_RoutingInstances_7) { + SCOPED_TRACE(__FUNCTION__); + GracefulRestartTestStart(); + + for (int i = 1; i <= n_instances_/4; i++) + instances_to_delete_before_gr_.push_back(i); + + for (size_t i = 1; i <= xmpp_agents_.size(); i++) { + + // agents from 2nd half remain up through out this test + if (i > xmpp_agents_.size()/2) + continue; + + // agents from 1st quarter go down permantently + if (i <= xmpp_agents_.size()/4) { + n_down_from_agents_.push_back(xmpp_agents_[i-1]); + continue; + } + + // agents from 2nd quarter flip with gr + test::NetworkAgentMock *agent = xmpp_agents_[i-1]; + vector instance_ids = vector(); + vector nroutes = vector(); + for (int j = 1; j <= n_instances_; j++) { + instance_ids.push_back(j); + nroutes.push_back(n_routes_/2); + } + n_flipped_agents_.push_back(AgentTestParams(agent, instance_ids, + nroutes)); + } + for (int i = n_instances_/4 + 1; i <= n_instances_/2; i++) + instances_to_delete_during_gr_.push_back(i); + GracefulRestartTestRun(); +} + +#define COMBINE_PARAMS \ + Combine(ValuesIn(GetInstanceParameters()), \ + ValuesIn(GetRouteParameters()), \ + ValuesIn(GetAgentParameters()), \ + ValuesIn(GetTargetParameters()), \ + ValuesIn(xmpp_close_from_control_node)) + +INSTANTIATE_TEST_CASE_P(GracefulRestartTestWithParams, GracefulRestartTest, + COMBINE_PARAMS); + +class TestEnvironment : public ::testing::Environment { + virtual ~TestEnvironment() { } +}; + +static void SetUp() { + ControlNode::SetDefaultSchedulingPolicy(); + BgpServerTest::GlobalSetUp(); + BgpObjectFactory::Register( + boost::factory()); + BgpObjectFactory::Register( + boost::factory()); + XmppObjectFactory::Register( + boost::factory()); +} + +static void TearDown() { + TaskScheduler *scheduler = TaskScheduler::GetInstance(); + scheduler->Terminate(); +} + +int main(int argc, char **argv) { + gargc = argc; + gargv = argv; + + bgp_log_test::init(); + ::testing::InitGoogleTest(&gargc, gargv); + ::testing::AddGlobalTestEnvironment(new TestEnvironment()); + SetUp(); + int result = RUN_ALL_TESTS(); + TearDown(); + return result; +} diff --git a/src/control-node/test/network_agent_mock.cc b/src/control-node/test/network_agent_mock.cc index 0da64602d0a..aaae1c8a065 100644 --- a/src/control-node/test/network_agent_mock.cc +++ b/src/control-node/test/network_agent_mock.cc @@ -348,6 +348,19 @@ pugi::xml_document *XmppDocumentMock::SubUnsubXmlDoc( return xdoc_.get(); } +/* + * Empty publish and collection nodes constitute eor marker. + */ +pugi::xml_document *XmppDocumentMock::AddEorMarker() { + xdoc_->reset(); + xml_node pubsub = PubSubHeader(kNetworkServiceJID); + pubsub.append_child("publish"); + + pubsub = PubSubHeader(kNetworkServiceJID); + pubsub.append_child("collection"); + return xdoc_.get(); +} + pugi::xml_document *XmppDocumentMock::RouteAddDeleteXmlDoc( const std::string &network, const std::string &prefix, bool add, const NextHops &nexthops, const RouteAttributes &attributes) { @@ -766,7 +779,7 @@ NetworkAgentMock::NetworkAgentMock(EventManager *evm, const string &hostname, boost::bind(&NetworkAgentMock::ProcessRequest, this, _1)), server_address_(server_address), local_address_(local_address), server_port_(server_port), skip_updates_processing_(false), down_(false), - xmpp_auth_enabled_(xmpp_auth_enabled) { + xmpp_auth_enabled_(xmpp_auth_enabled), id_(0) { // Static initialization of NetworkAgentMock class. 
Initialize(); @@ -943,11 +956,12 @@ bool NetworkAgentMock::ProcessRequest(Request *request) { case IS_ESTABLISHED: request->result = IsSessionEstablished(); break; + case IS_CHANNEL_READY: + request->result = IsReady(); + break; } - // // Notify waiting caller with the result - // tbb::mutex::scoped_lock lock(work_mutex_); cond_var_.notify_all(); return true; @@ -966,14 +980,36 @@ bool NetworkAgentMock::IsEstablished() { request.type = IS_ESTABLISHED; work_queue_.Enqueue(&request); - // // Wait for the request to get processed. - // cond_var_.wait(lock); return request.result; } +bool NetworkAgentMock::IsChannelReady() { + AgentPeer *peer = GetAgent(); + return (peer != NULL && peer->channel() != NULL && + peer->channel()->GetPeerState() == xmps::READY); +} + +bool NetworkAgentMock::IsReady() { + tbb::interface5::unique_lock lock(work_mutex_); + + Request request; + request.type = IS_CHANNEL_READY; + work_queue_.Enqueue(&request); + + // Wait for the request to get processed. + cond_var_.wait(lock); + + return request.result; +} + +void NetworkAgentMock::SendEorMarker() { + AgentPeer *peer = GetAgent(); + peer->SendDocument(impl_->AddEorMarker()); +} + void NetworkAgentMock::AddRoute(const string &network_name, const string &prefix, const string nexthop, int local_pref, int med) { @@ -1408,6 +1444,10 @@ int NetworkAgentMock::RouteCount() const { return route_mgr_->Count(); } +bool NetworkAgentMock::HasSubscribed(const std::string &network) const { + return route_mgr_->HasSubscribed(network); +} + // Return number of nexthops associated with a given route int NetworkAgentMock::RouteNextHopCount(const std::string &network, const std::string &prefix) { diff --git a/src/control-node/test/network_agent_mock.h b/src/control-node/test/network_agent_mock.h index 80be3129470..712c5bac0cf 100644 --- a/src/control-node/test/network_agent_mock.h +++ b/src/control-node/test/network_agent_mock.h @@ -198,6 +198,7 @@ class XmppDocumentMock { static const char *kPubSubNS; XmppDocumentMock(const std::string &hostname); + pugi::xml_document *AddEorMarker(); pugi::xml_document *RouteAddXmlDoc(const std::string &network, const std::string &prefix, const NextHops &nexthops = NextHops(), @@ -382,6 +383,7 @@ class NetworkAgentMock { int RouteCount(const std::string &network) const; int RouteCount() const; + bool HasSubscribed(const std::string &network) const; int RouteNextHopCount(const std::string &network, const std::string &prefix); const RouteEntry *RouteLookup(const std::string &network, @@ -404,6 +406,7 @@ class NetworkAgentMock { return inet6_route_mgr_->Lookup(network, prefix); } + void SendEorMarker(); void AddRoute(const std::string &network, const std::string &prefix, const std::string nexthop = "", int local_pref = 0, int med = 0); @@ -484,6 +487,8 @@ class NetworkAgentMock { bool IsEstablished(); bool IsSessionEstablished(); + bool IsReady(); + bool IsChannelReady(); void ClearInstances(); const std::string &hostname() const { return impl_->hostname(); } @@ -491,6 +496,8 @@ class NetworkAgentMock { const std::string ToString() const; void set_localaddr(const std::string &addr) { impl_->set_localaddr(addr); } XmppDocumentMock *GetXmlHandler() { return impl_.get(); } + void set_id (int id) { id_ = id; } + const int id() const { return id_; } XmppClient *client() { return client_; } void Delete(); @@ -502,6 +509,7 @@ class NetworkAgentMock { enum RequestType { IS_ESTABLISHED, + IS_CHANNEL_READY, }; struct Request { RequestType type; @@ -545,6 +553,7 @@ class NetworkAgentMock { 
tbb::interface5::condition_variable cond_var_; bool xmpp_auth_enabled_; + int id_; }; typedef boost::shared_ptr NetworkAgentMockPtr; diff --git a/src/xmpp/xmpp_connection.cc b/src/xmpp/xmpp_connection.cc index eecde94c3da..e46a2761375 100644 --- a/src/xmpp/xmpp_connection.cc +++ b/src/xmpp/xmpp_connection.cc @@ -627,7 +627,7 @@ class XmppServerConnection::DeleteActor : public LifetimeActor { if (parent_->session() || server_->IsPeerCloseGraceful()) { server_->NotifyConnectionEvent(parent_->ChannelMux(), - xmps::NOT_READY); + xmps::NOT_READY); } if (parent_->logUVE()) { @@ -675,23 +675,6 @@ XmppServerConnection::~XmppServerConnection() { server()->RemoveDeletedConnection(this); } -bool XmppServerConnection::EndpointNameIsUnique() { - // Bail if we've been deleted. - if (IsDeleted()) - return false; - - // Nothing to check if we already have a XmppConnectionEndpoint. - if (conn_endpoint_) - return true; - - // Associate with a XmppConnectionEndpoint and handle the case where we - // already have another XmppConnection from the same Endpoint. Note that - // the XmppConnection is not marked duplicate since it's already on the - // ConnectionMap. - conn_endpoint_ = server()->LocateConnectionEndpoint(this); - return (conn_endpoint_ ? true : false); -} - void XmppServerConnection::ManagedDelete() { XMPP_UTDEBUG(XmppConnectionDelete, "Managed server connection delete", FromString(), ToString()); @@ -742,9 +725,12 @@ uint32_t XmppServerConnection::flap_count() const { } void XmppServerConnection::increment_flap_count() { - if (!conn_endpoint_) + XmppConnectionEndpoint *conn_endpoint = conn_endpoint_; + if (!conn_endpoint) + conn_endpoint = server()->FindConnectionEndpoint(this); + if (!conn_endpoint) return; - conn_endpoint_->increment_flap_count(); + conn_endpoint->increment_flap_count(); if (!logUVE()) return; @@ -752,8 +738,8 @@ void XmppServerConnection::increment_flap_count() { XmppPeerInfoData peer_info; peer_info.set_name(ToUVEKey()); PeerFlapInfo flap_info; - flap_info.set_flap_count(conn_endpoint_->flap_count()); - flap_info.set_flap_time(conn_endpoint_->last_flap()); + flap_info.set_flap_count(conn_endpoint->flap_count()); + flap_info.set_flap_time(conn_endpoint->last_flap()); peer_info.set_flap_info(flap_info); XMPPPeerInfo::Send(peer_info); } @@ -925,6 +911,10 @@ XmppConnection *XmppConnectionEndpoint::connection() { return connection_; } +const XmppConnection *XmppConnectionEndpoint::connection() const { + return connection_; +} + void XmppConnectionEndpoint::set_connection(XmppConnection *connection) { assert(!connection_); connection_ = connection; diff --git a/src/xmpp/xmpp_connection.h b/src/xmpp/xmpp_connection.h index 135de092dc4..22d1730bf15 100644 --- a/src/xmpp/xmpp_connection.h +++ b/src/xmpp/xmpp_connection.h @@ -56,7 +56,6 @@ class XmppConnection { // Invoked from XmppServer when a session is accepted. 
virtual bool AcceptSession(XmppSession *session); virtual void ReceiveMsg(XmppSession *session, const std::string &); - virtual bool EndpointNameIsUnique() { return true; } virtual boost::asio::ip::tcp::endpoint endpoint() const; virtual boost::asio::ip::tcp::endpoint local_endpoint() const; @@ -204,6 +203,15 @@ class XmppConnection { bool disable_read() const { return disable_read_; } void set_disable_read(bool disable_read) { disable_read_ = disable_read; } XmppStateMachine *state_machine(); + const XmppStateMachine *state_machine() const; + + void set_state_machine(XmppStateMachine *state_machine) { + state_machine_.reset(state_machine); + } + + void SwapXmppStateMachine(XmppConnection *other) { + state_machine_.swap(other->state_machine_); + } void inc_connect_error(); void inc_session_close(); @@ -225,7 +233,6 @@ class XmppConnection { protected: TcpServer *server_; XmppSession *session_; - const XmppStateMachine *state_machine() const; const XmppChannelMux *channel_mux() const; private: @@ -267,7 +274,6 @@ class XmppServerConnection : public XmppConnection { virtual ~XmppServerConnection(); virtual bool IsClient() const; - virtual bool EndpointNameIsUnique(); virtual void ManagedDelete(); virtual void RetryDelete(); virtual LifetimeActor *deleter(); @@ -288,6 +294,9 @@ class XmppServerConnection : public XmppConnection { void clear_on_work_queue() { on_work_queue_ = false; } XmppConnectionEndpoint *conn_endpoint() { return conn_endpoint_; } + void set_conn_endpoint(XmppConnectionEndpoint *conn_endpoint) { + conn_endpoint_ = conn_endpoint; + } void FillShowInfo(ShowXmppConnection *show_connection) const; private: @@ -337,6 +346,7 @@ class XmppConnectionEndpoint { uint64_t last_flap() const; const std::string last_flap_at() const; XmppConnection *connection(); + const XmppConnection *connection() const; void set_connection(XmppConnection *connection); void reset_connection(); diff --git a/src/xmpp/xmpp_server.cc b/src/xmpp/xmpp_server.cc index a7e5698a60b..ef2b2eb5f87 100644 --- a/src/xmpp/xmpp_server.cc +++ b/src/xmpp/xmpp_server.cc @@ -485,42 +485,50 @@ void XmppServer::RemoveDeletedConnection(XmppServerConnection *connection) { ConnectionSet::iterator it = deleted_connection_set_.find(connection); assert(it != deleted_connection_set_.end()); deleted_connection_set_.erase(it); + ReleaseConnectionEndpoint(connection); } -const XmppConnectionEndpoint *XmppServer::FindConnectionEndpoint( - const string &endpoint_name) const { +XmppConnectionEndpoint *XmppServer::FindConnectionEndpoint( - const string &endpoint_name) { + tbb::mutex::scoped_lock lock(endpoint_map_mutex_); ConnectionEndpointMap::const_iterator loc = connection_endpoint_map_.find(endpoint_name); return (loc != connection_endpoint_map_.end() ? loc->second : NULL); } -// -// Find or create an XmppConnectionEndpoint for the Endpoint of the given -// XmppConnnection. If XmppConnectionEndpoint is already associated with a -// XmppConnection, return NULL to indicate that the XmppConnection should -// be terminated. Otherwise, associate it with the given XmppConnection.
-// -XmppConnectionEndpoint *XmppServer::LocateConnectionEndpoint( +XmppConnectionEndpoint *XmppServer::FindConnectionEndpoint( XmppServerConnection *connection) { + if (!connection) + return NULL; tbb::mutex::scoped_lock lock(endpoint_map_mutex_); - const string &endpoint_name = connection->ToString(); - XmppConnectionEndpoint *conn_endpoint; - ConnectionEndpointMap::iterator loc = - connection_endpoint_map_.find(endpoint_name); - if (loc == connection_endpoint_map_.end()) { - conn_endpoint = new XmppConnectionEndpoint(endpoint_name); - bool result; - tie(loc, result) = connection_endpoint_map_.insert( - make_pair(endpoint_name, conn_endpoint)); - assert(result); - } else { - conn_endpoint = loc->second; - } + ConnectionEndpointMap::const_iterator loc = + connection_endpoint_map_.find(connection->ToString()); + return (loc != connection_endpoint_map_.end() ? loc->second : NULL); +} - if (conn_endpoint->connection()) +XmppConnectionEndpoint *XmppServer::LocateConnectionEndpoint( + XmppServerConnection *connection, bool &created) { + created = false; + if (!connection) return NULL; + + tbb::mutex::scoped_lock lock(endpoint_map_mutex_); + + ConnectionEndpointMap::const_iterator loc = + connection_endpoint_map_.find(connection->ToString()); + if (loc != connection_endpoint_map_.end()) + return loc->second; + + created = true; + XmppConnectionEndpoint *conn_endpoint = + new XmppConnectionEndpoint(connection->ToString()); + bool result; + tie(loc, result) = connection_endpoint_map_.insert( + make_pair(connection->ToString(), conn_endpoint)); + assert(result); conn_endpoint->set_connection(connection); + connection->set_conn_endpoint(conn_endpoint); return conn_endpoint; } @@ -530,12 +538,15 @@ XmppConnectionEndpoint *XmppServer::LocateConnectionEndpoint( // simply have called XmppConnectionEndpoint::reset_connection directly. 
// void XmppServer::ReleaseConnectionEndpoint(XmppServerConnection *connection) { - tbb::mutex::scoped_lock lock(endpoint_map_mutex_); - - if (!connection->conn_endpoint()) + XmppConnectionEndpoint *conn_endpoint = connection->conn_endpoint(); + if (!conn_endpoint) + conn_endpoint = FindConnectionEndpoint(connection); + if (!conn_endpoint) return; - assert(connection->conn_endpoint()->connection() == connection); - connection->conn_endpoint()->reset_connection(); + + tbb::mutex::scoped_lock lock(endpoint_map_mutex_); + if (conn_endpoint->connection() == connection) + conn_endpoint->reset_connection(); } void XmppServer::FillShowConnections( diff --git a/src/xmpp/xmpp_server.h b/src/xmpp/xmpp_server.h index b2143c4dee7..78562806d67 100644 --- a/src/xmpp/xmpp_server.h +++ b/src/xmpp/xmpp_server.h @@ -69,10 +69,11 @@ class XmppServer : public XmppConnectionManager { const std::string &ServerAddr() const { return server_addr_; } size_t ConnectionCount() const; - const XmppConnectionEndpoint *FindConnectionEndpoint( - const std::string &endpoint_name) const; + XmppConnectionEndpoint *FindConnectionEndpoint( + const std::string &endpoint_name); + XmppConnectionEndpoint *FindConnectionEndpoint(XmppServerConnection *conn); XmppConnectionEndpoint *LocateConnectionEndpoint( - XmppServerConnection *connection); + XmppServerConnection *connection, bool &created); void ReleaseConnectionEndpoint(XmppServerConnection *connection); void FillShowConnections( @@ -83,13 +84,15 @@ class XmppServer : public XmppConnectionManager { virtual SslSession *AllocSession(SslSocket *socket); virtual bool AcceptSession(TcpSession *session); + typedef std::map ConnectionMap; + ConnectionMap connection_map_; + private: class DeleteActor; friend class BgpXmppBasicTest; friend class DeleteActor; friend class XmppStateMachineTest; - typedef std::map ConnectionMap; typedef std::set ConnectionSet; typedef std::map ConnectionEndpointMap; typedef std::map ConnectionEventCbMap; @@ -99,7 +102,6 @@ class XmppServer : public XmppConnectionManager { void SetConnectionQueueDisable(bool disabled); void WorkQueueExitCallback(bool done); - ConnectionMap connection_map_; ConnectionSet deleted_connection_set_; size_t max_connections_; diff --git a/src/xmpp/xmpp_state_machine.cc b/src/xmpp/xmpp_state_machine.cc index 0665b0bbbec..918e0a9d5d3 100644 --- a/src/xmpp/xmpp_state_machine.cc +++ b/src/xmpp/xmpp_state_machine.cc @@ -361,12 +361,15 @@ struct Active : public sc::state { } SM_LOG(state_machine, "EvXmppOpen in (Active) State"); state_machine->AssignSession(); + XmppConnection *connection = state_machine->connection(); - if (!connection->EndpointNameIsUnique()) { + if (connection->IsDeleted()) { state_machine->ResetSession(); return discard_event(); } + XmppSession *session = state_machine->session(); + state_machine->ResurrectOldConnection(connection, session); state_machine->CancelOpenTimer(); if (!connection->SendOpenConfirm(session)) { connection->SendClose(session); @@ -1086,19 +1089,19 @@ void XmppStateMachine::ResetSession() { set_session(NULL); CancelHoldTimer(); - if (!connection) return; + if (!connection) + return; // Stop keepalives, transition to IDLE and notify registerd entities. connection->StopKeepAliveTimer(); connection->ChannelMux()->HandleStateEvent(xmsm::IDLE); - if (IsActiveChannel()) return; + if (IsActiveChannel()) + return; // Retain the connection if graceful restart is supported. XmppServer *server = dynamic_cast(connection->server()); - if (server->IsPeerCloseGraceful()) return; - - // Delete the connection. 
- connection->ManagedDelete(); + if (!server->IsPeerCloseGraceful()) + connection->ManagedDelete(); } XmppStateMachine::XmppStateMachine(XmppConnection *connection, bool active, @@ -1343,6 +1346,56 @@ bool XmppStateMachine::PassiveOpen(XmppSession *session) { return Enqueue(xmsm::EvTcpPassiveOpen(session)); } +// Process XmppStream header message received over a session. Close the stream +// if an old session is still present and undergoing graceful restart. +// +// Return true if msg is enqueued for further processing, false otherwise. +bool XmppStateMachine::ProcessStreamHeaderMessage(XmppSession *session, + const XmppStanza::XmppMessage *msg) { + + // Update "To" information which can be used to map an older session + session->Connection()->SetTo(msg->from); + + XmppServer *xmpp_server = dynamic_cast<XmppServer *>(server_); + XmppConnectionEndpoint *endp = NULL; + + // Look for an endpoint which may already exist + if (xmpp_server) + endp = xmpp_server->FindConnectionEndpoint( + dynamic_cast<XmppServerConnection *>(connection_)); + + // If older endpoint is present and is still associated with XmppConnection, + // check if older connection is under graceful-restart. + if (endp && endp->connection()) { + XmppStateMachine *state_machine = endp->connection()->state_machine(); + + // Different state_machines imply that connections are different + if (state_machine && state_machine != this) { + xmsm::XmState state = state_machine->get_state(); + + // If GR is not supported, then close all new connections until old + // one is completely deleted. Even if GR is supported, new + // connection cannot be accepted until old one is fully cleaned up. + if (!xmpp_server->IsPeerCloseGraceful() || state != xmsm::ACTIVE) { + + // Bring down old session if it is still in ESTABLISHED state. + // This is the scenario in which old session's TCP did not learn + // the session down event, possibly due to compute cold reboot. + // In that case, trigger closure (and possibly GR) process for + // the old session. + if (state == xmsm::ESTABLISHED) + state_machine->Enqueue(xmsm::EvTcpClose( + state_machine->session())); + Enqueue(xmsm::EvTcpClose(session)); + return false; + } + } + } + + // In all other cases, process the OpenMessage like it is normally done.
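+    // To summarize: when an older connection exists for the same endpoint
+    // (with a different state machine), the new session is rejected if GR is
+    // not supported or if the old connection is not yet in ACTIVE state; if
+    // the old session is also still ESTABLISHED, a close is enqueued for it
+    // so that its closure (and possibly GR) gets started. Only when GR is
+    // supported and the old connection is already in ACTIVE state is the
+    // open accepted, after which the Active state handler invokes
+    // ResurrectOldConnection() to take over the old connection.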
+ return Enqueue(xmsm::EvXmppOpen(session, msg)); +} + void XmppStateMachine::OnMessage(XmppSession *session, const XmppStanza::XmppMessage *msg) { bool enqueued = false; @@ -1351,7 +1404,6 @@ void XmppStateMachine::OnMessage(XmppSession *session, switch (msg->type) { case XmppStanza::STREAM_HEADER: - if (stream_msg->strmtype == XmppStanza::XmppStreamMessage::FEATURE_TLS) { @@ -1370,13 +1422,11 @@ void XmppStateMachine::OnMessage(XmppSession *session, break; } - } else if ((stream_msg->strmtype == - XmppStanza::XmppStreamMessage::INIT_STREAM_HEADER) || - (stream_msg->strmtype == - XmppStanza::XmppStreamMessage::INIT_STREAM_HEADER_RESP)) { - session->Connection()->SetTo(msg->from); - enqueued = Enqueue(xmsm::EvXmppOpen(session, msg)); - } + } else if (stream_msg->strmtype == + XmppStanza::XmppStreamMessage::INIT_STREAM_HEADER || + stream_msg->strmtype == + XmppStanza::XmppStreamMessage::INIT_STREAM_HEADER_RESP) + enqueued = ProcessStreamHeaderMessage(session, msg); break; case XmppStanza::WHITESPACE_MESSAGE_STANZA: enqueued = Enqueue(xmsm::EvXmppKeepalive(session, msg)); @@ -1586,3 +1636,61 @@ void XmppStateMachine::SendConnectionInfo(XmppConnectionInfo *info, XMPP_CONNECTION_LOG_MSG(*info); return; } + +// Resurrect an old xmpp connection if present (when under GracefulRestart) +// +// During Graceful Restart (or otherwise), ProcessStreamHeaderMessage() rejects +// new connections as long as the old connection is still being cleaned up and +// is not yet ready to be taken over. +// +// Hence in here, when called upon receipt of OpenMessage, we can try to reuse +// the old XmppConnection if present and thereby complete any pending GR process +// +// We do so by reusing XmppConnection, XmppChannel, etc. from the old connection +// and taking only the XmppSession and XmppStateMachine from the new connection +// +// The new connection is instead associated with the old state machine and +// session, and their deletion is triggered +void XmppStateMachine::ResurrectOldConnection(XmppConnection *new_connection, + XmppSession *new_session) { + + // Look for an endpoint (which is a persistent data structure) across + // xmpp session flips + bool created; + XmppConnectionEndpoint *connection_endpoint = + static_cast<XmppServer *>( + new_connection->server())->LocateConnectionEndpoint( + static_cast<XmppServerConnection *>(new_connection), created); + + // If this is a new endpoint, then there is no older connection to manage. + if (created) + return; + + // There is no connection associated with the older endpoint. Treat it as + // a new endpoint being created (XXX Should we assert here instead ?) + if (!connection_endpoint->connection()) { + connection_endpoint->set_connection(new_connection); + return; + } + + // Retrieve old XmppConnection and XmppStateMachine (to reuse) + XmppConnection *old_xmpp_connection = connection_endpoint->connection(); + XmppStateMachine *old_state_machine = old_xmpp_connection->state_machine(); + + // Swap the old and new connection and state machine linkages + new_connection->SwapXmppStateMachine(old_xmpp_connection); + this->SwapXmppConnection(old_state_machine); + + // Update the connection back pointers held by the two sessions. + XmppSession *old_xmpp_session = old_state_machine->session(); + if (old_xmpp_session) + old_xmpp_session->SetConnection(new_connection); + new_session->SetConnection(old_xmpp_connection); + + // Swap old xmpp session with the new one.
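+    // Once the new session is attached below, the linkages become:
+    //   old XmppConnection <-> this (new) XmppStateMachine <-> new XmppSession
+    //   new XmppConnection <-> old XmppStateMachine <-> old XmppSession
+    // i.e. the old connection, along with its XmppChannel etc., carries on
+    // over the freshly established session, while the new connection inherits
+    // the stale pieces and is shut down below.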
+ old_xmpp_connection->set_session(new_session); + + // Trigger deletion of the new connection (which is now linked with + // the old_state_machine and old_xmpp_session) + new_connection->Shutdown(); +} diff --git a/src/xmpp/xmpp_state_machine.h b/src/xmpp/xmpp_state_machine.h index c0f3c31d772..6e21c42af16 100644 --- a/src/xmpp/xmpp_state_machine.h +++ b/src/xmpp/xmpp_state_machine.h @@ -91,7 +91,7 @@ class XmppStateMachine : void TimerErrorHandler(std::string name, std::string error); // Feed session events into the state machine. - void OnSessionEvent(TcpSession *session, TcpSession::Event event); + virtual void OnSessionEvent(TcpSession *session, TcpSession::Event event); // Receive Passive Open. bool PassiveOpen(XmppSession *session); @@ -125,6 +125,14 @@ class XmppStateMachine : // getters and setters XmppConnection *connection() { return connection_; } + void set_connection(const XmppConnection *connection) { + connection_ = const_cast<XmppConnection *>(connection); + } + void SwapXmppConnection(XmppStateMachine *other) { + XmppConnection *tmp = connection_; + connection_ = other->connection_; + other->connection_ = tmp; + } bool IsActiveChannel(); bool logUVE(); const char *ChannelType(); @@ -158,6 +166,8 @@ class XmppStateMachine : void SendConnectionInfo(XmppConnectionInfo *info, const std::string &event, const std::string &nextstate = ""); + void ResurrectOldConnection(XmppConnection *connection, + XmppSession *session); void set_last_event(const std::string &event); const std::string &last_event() const { return last_event_; } @@ -166,15 +176,17 @@ class XmppStateMachine : bool OpenTimerCancelled() { return open_timer_->cancelled(); } bool HoldTimerCancelled() { return hold_timer_->cancelled(); } void AssertOnHoldTimeout(); + bool HoldTimerExpired(); private: friend class XmppStateMachineTest; bool ConnectTimerExpired(); bool OpenTimerExpired(); - bool HoldTimerExpired(); bool Enqueue(const sc::event_base &ev); bool DequeueEvent(boost::intrusive_ptr &event); + bool ProcessStreamHeaderMessage(XmppSession *session, + const XmppStanza::XmppMessage *msg); WorkQueue > work_queue_; XmppConnection *connection_;
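For reference, a minimal sketch (illustrative only, not part of this patch) of how a unit test could drive the EoR path end to end using the NetworkAgentMock helpers added above. Subscribe(), SessionDown() and SessionUp() are assumed mock-agent helpers, and the instance name, prefix and nexthop values are placeholders.

static void EorAfterRestartSketch(test::NetworkAgentMock *agent) {
    // Flap the xmpp session; the server side runs its close (and GR) path.
    agent->SessionDown();
    agent->SessionUp();
    TASK_UTIL_EXPECT_TRUE(agent->IsEstablished());

    // Re-subscribe and re-advertise whatever routes survive the restart.
    agent->Subscribe("blue", 1);
    agent->AddRoute("blue", "10.1.1.1/32", "192.168.1.1");

    // Empty publish and collection nodes constitute the EoR marker; sending
    // it signals that the agent is done, instead of waiting for the GR timer.
    agent->SendEorMarker();
}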