Merge "GR: Maintain 1:1 mapping between associated xmpp connection an…
Browse files Browse the repository at this point in the history
…d endpoints"
Zuul authored and opencontrail-ci-admin committed Mar 4, 2016
2 parents d2fcb8c + e67c99c commit 4ea0e95
Showing 16 changed files with 159 additions and 92 deletions.
46 changes: 31 additions & 15 deletions src/bgp/bgp_peer_close.cc
@@ -25,15 +25,20 @@

// Create an instance of PeerCloseManager with back reference to parent IPeer
PeerCloseManager::PeerCloseManager(IPeer *peer) :
peer_(peer), stale_timer_(NULL), state_(NONE), close_again_(false) {
peer_(peer), stale_timer_(NULL), sweep_timer_(NULL), state_(NONE),
close_again_(false) {
stats_.init++;
if (peer->server())
if (peer->server()) {
stale_timer_ = TimerManager::CreateTimer(*peer->server()->ioservice(),
"Graceful Restart StaleTimer");
sweep_timer_ = TimerManager::CreateTimer(*peer->server()->ioservice(),
"Graceful Restart SweepTimer");
}
}

PeerCloseManager::~PeerCloseManager() {
TimerManager::DeleteTimer(stale_timer_);
TimerManager::DeleteTimer(sweep_timer_);
}

const std::string PeerCloseManager::GetStateName(State state) const {
@@ -149,7 +154,6 @@ void PeerCloseManager::ProcessClosure() {
if (peer_->IsReady()) {
MOVE_TO_STATE(SWEEP);
stats_.sweep++;
peer_->peer_close()->GracefulRestartSweep();
} else {
MOVE_TO_STATE(DELETE);
stats_.deletes++;
@@ -178,6 +182,7 @@ bool PeerCloseManager::IsCloseInProgress() {
void PeerCloseManager::CloseComplete() {
MOVE_TO_STATE(NONE);
stale_timer_->Cancel();
sweep_timer_->Cancel();
stats_.init++;

// Nested closures trigger fresh GR
@@ -187,6 +192,22 @@ void PeerCloseManager::CloseComplete() {
}
}

bool PeerCloseManager::ProcessSweepStateActions() {
assert(state_ == SWEEP);

// Notify clients to trigger sweep as appropriate.
peer_->peer_close()->GracefulRestartSweep();
CloseComplete();
return false;
}

void PeerCloseManager::TriggerSweepStateActions() {
PEER_CLOSE_MANAGER_LOG("Sweep Timer started to fire right away");
sweep_timer_->Cancel();
sweep_timer_->Start(0,
boost::bind(&PeerCloseManager::ProcessSweepStateActions, this));
}

// Concurrency: Runs in the context of the BGP peer rib membership task.
//
// Close process for this peer in terms of walking RibIns and RibOuts are
@@ -216,8 +237,7 @@ void PeerCloseManager::UnregisterPeerComplete(IPeer *ipeer, BgpTable *table) {
return;
}

// Handle SWEEP state and restart GR for nested closures.
CloseComplete();
TriggerSweepStateActions();
}

// Get the type of RibIn close action at start (Not during graceful restart
@@ -278,7 +298,7 @@ void PeerCloseManager::ProcessRibIn(DBTablePartBase *root, BgpRoute *rt,
if (action == MembershipRequest::INVALID)
return;

bool delete_rt = false;
bool notify_rt = false;

// Process all paths sourced from this peer_. Multiple paths could exist
// in ecmp cases.
@@ -334,17 +354,13 @@ }
}

// Feed the route modify/delete request to the table input process.
delete_rt = table->InputCommon(root, rt, path, peer_, NULL, oper,
attrs, path->GetPathId(),
path->GetFlags() | stale,
path->GetLabel());
notify_rt |= table->InputCommon(root, rt, path, peer_, NULL, oper,
attrs, path->GetPathId(),
path->GetFlags() | stale,
path->GetLabel());
}

// rt can be now deleted safely.
if (delete_rt)
root->Delete(rt);

return;
table->InputCommonPostProcess(root, rt, notify_rt);
}

void PeerCloseManager::FillCloseInfo(BgpNeighborResp *resp) {
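
Note on the change above: ProcessClosure() and UnregisterPeerComplete() no longer invoke GracefulRestartSweep() inline; the sweep is deferred to the new zero-delay sweep_timer_, so it runs from timer context and CloseComplete() follows it. The standalone C++ sketch below illustrates that deferral pattern only; ToyTimer, SweepOwner and their members are illustrative stand-ins, not the Contrail Timer or PeerCloseManager.

// Standalone sketch of the deferred-sweep pattern used above.
#include <functional>
#include <iostream>

class ToyTimer {
public:
    // Start() records a callback; Fire() simulates the timer task running it.
    // Returning false from the callback means the one-shot timer is done.
    void Start(int delay_ms, std::function<bool()> cb) {
        (void)delay_ms;  // a zero delay means "fire as soon as possible"
        cb_ = cb;
    }
    void Cancel() { cb_ = nullptr; }
    void Fire() {
        if (cb_ && !cb_())
            cb_ = nullptr;
    }

private:
    std::function<bool()> cb_;
};

class SweepOwner {
public:
    // Analogue of TriggerSweepStateActions(): cancel any pending run and
    // schedule the sweep to fire right away, but from timer context.
    void TriggerSweep() {
        sweep_timer_.Cancel();
        sweep_timer_.Start(0, [this]() { return ProcessSweep(); });
    }
    ToyTimer &timer() { return sweep_timer_; }

private:
    // Analogue of ProcessSweepStateActions(): notify clients to sweep, then
    // complete the close; returning false stops the timer.
    bool ProcessSweep() {
        std::cout << "GracefulRestartSweep(), then CloseComplete()" << std::endl;
        return false;
    }

    ToyTimer sweep_timer_;
};

int main() {
    SweepOwner owner;
    owner.TriggerSweep();  // caller (e.g. UnregisterPeerComplete) returns immediately
    owner.timer().Fire();  // the sweep happens later, in timer context
    return 0;
}
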
3 changes: 3 additions & 0 deletions src/bgp/bgp_peer_close.h
@@ -77,10 +77,13 @@ class PeerCloseManager {

void ProcessClosure();
void CloseComplete();
bool ProcessSweepStateActions();
void TriggerSweepStateActions();
const std::string GetStateName(State state) const;

IPeer *peer_;
Timer *stale_timer_;
Timer *sweep_timer_;
State state_;
bool close_again_;
Stats stats_;
41 changes: 22 additions & 19 deletions src/bgp/bgp_table.cc
@@ -271,7 +271,7 @@ bool BgpTable::InputCommon(DBTablePartBase *root, BgpRoute *rt, BgpPath *path,
const IPeer *peer, DBRequest *req,
DBRequest::DBOperation oper, BgpAttrPtr attrs,
uint32_t path_id, uint32_t flags, uint32_t label) {
bool delete_rt = false;
bool notify_rt = false;

switch (oper) {
case DBRequest::DB_ENTRY_ADD_CHANGE: {
@@ -307,7 +307,7 @@ bool BgpTable::InputCommon(DBTablePartBase *root, BgpRoute *rt, BgpPath *path,
table);
}
rt->InsertPath(new_path);
root->Notify(rt);
notify_rt = true;
break;
}

@@ -320,13 +320,7 @@ bool BgpTable::InputCommon(DBTablePartBase *root, BgpRoute *rt, BgpPath *path,
if (path->NeedsResolution())
path_resolver_->StopPathResolution(root->index(), path);
rt->RemovePath(BgpPath::BGP_XMPP, peer, path_id);

// Delete the route only if all paths are gone.
if (rt->front() == NULL) {
delete_rt = true;
} else {
root->Notify(rt);
}
notify_rt = true;
}
break;
}
@@ -336,7 +330,7 @@ bool BgpTable::InputCommon(DBTablePartBase *root, BgpRoute *rt, BgpPath *path,
break;
}
}
return delete_rt;
return notify_rt;
}

void BgpTable::Input(DBTablePartition *root, DBClient *client,
@@ -393,7 +387,7 @@ void BgpTable::Input(DBTablePartition *root, DBClient *client,
int count = 0;
ExtCommunityDB *extcomm_db = rtinstance_->server()->extcomm_db();
BgpAttrPtr attr = data ? data->attrs() : NULL;
bool delete_rt = false;
bool notify_rt = false;

// Process each of the paths sourced and create/update paths accordingly.
if (data) {
@@ -426,24 +420,33 @@ void BgpTable::Input(DBTablePartition *root, DBClient *client,
attr = data->attrs()->attr_db()->Locate(clone);
}

delete_rt = InputCommon(root, rt, path, peer, req, req->oper,
attr, path_id, nexthop.flags_,
nexthop.label_);
notify_rt |= InputCommon(root, rt, path, peer, req, req->oper,
attr, path_id, nexthop.flags_,
nexthop.label_);
}
}

// Flush remaining paths that remain marked for deletion.
for (map<BgpPath *, bool>::iterator it = deleted_paths.begin();
it != deleted_paths.end(); it++) {
BgpPath *path = it->first;
delete_rt = InputCommon(root, rt, path, peer, req,
DBRequest::DB_ENTRY_DELETE, NULL,
path->GetPathId(), 0, 0);
notify_rt |= InputCommon(root, rt, path, peer, req,
DBRequest::DB_ENTRY_DELETE, NULL,
path->GetPathId(), 0, 0);
}

// rt can be now deleted safely.
if (delete_rt)
InputCommonPostProcess(root, rt, notify_rt);
}

void BgpTable::InputCommonPostProcess(DBTablePartBase *root,
BgpRoute *rt, bool notify_rt) {
if (!notify_rt)
return;

if (rt->front() == NULL)
root->Delete(rt);
else
root->Notify(rt);
}

bool BgpTable::MayDelete() const {
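
Note on the change above: InputCommon() no longer deletes the route when its last path is removed. Each call now only reports whether the route needs attention, callers accumulate the results with |=, and the new InputCommonPostProcess() makes a single delete-or-notify decision at the end. A minimal standalone sketch of that accumulate-then-post-process shape follows; ToyRoute, RemovePath and PostProcess are illustrative names, not the real BgpTable API.

// Standalone sketch of the notify_rt accumulation introduced above.
#include <iostream>
#include <vector>

struct ToyRoute {
    std::vector<int> paths;                 // stand-in for the BgpPath list
    bool empty() const { return paths.empty(); }
};

// Stand-in for one InputCommon() call: mutate the route, report that it changed.
bool RemovePath(ToyRoute *rt) {
    if (rt->paths.empty())
        return false;
    rt->paths.pop_back();
    return true;                            // route modified, needs post-processing
}

// Stand-in for InputCommonPostProcess(): one delete-or-notify decision.
void PostProcess(ToyRoute *rt, bool notify_rt) {
    if (!notify_rt)
        return;
    if (rt->empty())
        std::cout << "last path gone: delete the route" << std::endl;
    else
        std::cout << "paths remain: notify listeners" << std::endl;
}

int main() {
    ToyRoute rt;
    rt.paths = std::vector<int>{1, 2};
    bool notify_rt = false;
    notify_rt |= RemovePath(&rt);           // several per-path operations ...
    notify_rt |= RemovePath(&rt);
    PostProcess(&rt, notify_rt);            // ... exactly one route-level action
    return 0;
}
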
2 changes: 2 additions & 0 deletions src/bgp/bgp_table.h
@@ -118,6 +118,8 @@ class BgpTable : public RouteTable {
const IPeer *peer, DBRequest *req,
DBRequest::DBOperation oper, BgpAttrPtr attrs,
uint32_t path_id, uint32_t flags, uint32_t label);
void InputCommonPostProcess(DBTablePartBase *root, BgpRoute *rt,
bool notify_rt);

LifetimeActor *deleter();
const LifetimeActor *deleter() const;
8 changes: 3 additions & 5 deletions src/bgp/bgp_xmpp_channel.cc
@@ -196,12 +196,9 @@ class BgpXmppChannel::PeerClose : public IPeerClose {
return;

parent_->set_peer_closed(false);
XmppConnection *connection =
const_cast<XmppConnection *>(parent_->channel_->connection());

// Restart state machine.
if (connection && connection->state_machine())
connection->state_machine()->Initialize();
// Indicate to Channel that GR Closure is now complete
parent_->channel_->CloseComplete();
}

virtual void Delete() {
@@ -217,6 +214,7 @@ class BgpXmppChannel::PeerClose : public IPeerClose {
void Close() {
if (parent_) {
assert(parent_->peer_deleted());
assert(parent_->channel_->IsCloseInProgress());
manager_->Close();
}
}
2 changes: 2 additions & 0 deletions src/bgp/test/bgp_xmpp_channel_test.cc
@@ -29,6 +29,8 @@ class XmppChannelMock : public XmppChannel {
XmppChannelMock() { }
virtual ~XmppChannelMock() { }
void Close() { }
void CloseComplete() { }
bool IsCloseInProgress() const { return false; }
bool Send(const uint8_t *, size_t, xmps::PeerId, SendReadyCb) {
return true;
}
2 changes: 2 additions & 0 deletions src/bgp/test/bgp_xmpp_parse_test.cc
@@ -22,6 +22,8 @@ class XmppChannelMock : public XmppChannel {
XmppChannelMock() { }
virtual ~XmppChannelMock() { }
void Close() { }
void CloseComplete() { }
bool IsCloseInProgress() const { return false; }
bool Send(const uint8_t *, size_t, xmps::PeerId, SendReadyCb) {
return true;
}
2 changes: 2 additions & 0 deletions src/vnsw/agent/test/test_cmn_util.h
@@ -396,6 +396,8 @@ class XmppChannelMock : public XmppChannel {
return true;
}
void Close() { }
void CloseComplete() { }
bool IsCloseInProgress() const { return false; }
MOCK_METHOD2(RegisterReceive, void(xmps::PeerId, ReceiveCb));
MOCK_METHOD1(UnRegisterReceive, void(xmps::PeerId));
MOCK_METHOD1(UnRegisterWriteReady, void(xmps::PeerId));
2 changes: 2 additions & 0 deletions src/xmpp/xmpp_channel.h
@@ -50,6 +50,8 @@ class XmppChannel {
virtual void RegisterRxMessageTraceCallback(RxMessageTraceCb cb) = 0;
virtual void UnRegisterWriteReady(xmps::PeerId id) = 0;
virtual void Close() = 0;
virtual void CloseComplete() = 0;
virtual bool IsCloseInProgress() const = 0;
virtual std::string ToString() const = 0;
virtual std::string StateName() const = 0;
virtual std::string LastStateName() const = 0;
43 changes: 42 additions & 1 deletion src/xmpp/xmpp_channel_mux.cc
@@ -13,16 +13,55 @@ using namespace std;
using namespace xmsm;

XmppChannelMux::XmppChannelMux(XmppConnection *connection)
: connection_(connection), rx_message_trace_cb_(NULL) {
: connection_(connection), rx_message_trace_cb_(NULL), closing_count_(0) {
}

XmppChannelMux::~XmppChannelMux() {
}

void XmppChannelMux::Close() {
if (closing_count_)
return;
InitializeClosingCount();
connection_->Clear();
}

// Track clients who close gracefully. At the moment, only BGP cares about this.
void XmppChannelMux::InitializeClosingCount() {

BOOST_FOREACH(const ReceiveCbMap::value_type &value, rxmap_) {
switch (value.first) {

// Currently, Only BgpXmppChannel client cares about GR.
case xmps::BGP:
closing_count_++;
break;

case xmps::CONFIG:
case xmps::DNS:
case xmps::OTHER:
break;
}
}
}

// Check if the channel is being closed (Graceful Restart)
bool XmppChannelMux::IsCloseInProgress() const {
return closing_count_ != 0;
}

// API for the clients to indicate GR Closure is complete
void XmppChannelMux::CloseComplete() {
assert(closing_count_);
closing_count_--;
if (closing_count_)
return;

// Restart state machine.
if (connection() && connection()->state_machine())
connection()->state_machine()->Initialize();
}

xmps::PeerState XmppChannelMux::GetPeerState() const {
xmsm::XmState st = connection_->GetStateMcState();
return (st == xmsm::ESTABLISHED) ? xmps::READY :
@@ -158,6 +197,8 @@ void XmppChannelMux::HandleStateEvent(xmsm::XmState state) {
} else {
// Event to create the peer on server
XmppServer *server = static_cast<XmppServer *>(connection_->server());
if (st == xmps::NOT_READY)
InitializeClosingCount();
server->NotifyConnectionEvent(this, st);
}
}
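
Note on the change above: closing_count_ acts as a reference count of graceful-restart-aware clients. Close() and the NOT_READY state event seed it from the registered receivers (currently only xmps::BGP), each client acknowledges with CloseComplete(), and the connection's state machine is restarted only after the last acknowledgement. A simplified standalone sketch of that bookkeeping follows; ToyChannel is an illustrative stand-in, not XmppChannelMux.

// Standalone sketch of the closing_count_ bookkeeping introduced above.
#include <cassert>
#include <iostream>

class ToyChannel {
public:
    // Analogue of Close()/InitializeClosingCount(): seed the count once.
    void Close(int gr_aware_clients) {
        if (closing_count_)
            return;                          // a graceful close is already in progress
        closing_count_ = gr_aware_clients;
    }
    bool IsCloseInProgress() const { return closing_count_ != 0; }
    // Analogue of CloseComplete(): each client acknowledges exactly once.
    void CloseComplete() {
        assert(closing_count_);
        if (--closing_count_ == 0)
            std::cout << "all clients done: restart the state machine" << std::endl;
    }

private:
    int closing_count_ = 0;
};

int main() {
    ToyChannel channel;
    channel.Close(1);                        // only the BGP receiver is GR-aware
    assert(channel.IsCloseInProgress());
    channel.CloseComplete();                 // BgpXmppChannel::PeerClose acks the close
    assert(!channel.IsCloseInProgress());
    return 0;
}
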
4 changes: 4 additions & 0 deletions src/xmpp/xmpp_channel_mux.h
@@ -19,6 +19,8 @@ class XmppChannelMux : public XmppChannel {
virtual ~XmppChannelMux();

virtual void Close();
virtual bool IsCloseInProgress() const;
virtual void CloseComplete();
virtual bool Send(const uint8_t *, size_t, xmps::PeerId, SendReadyCb);
virtual void RegisterReceive(xmps::PeerId, ReceiveCb);
virtual void UnRegisterReceive(xmps::PeerId);
@@ -64,6 +66,7 @@ class XmppChannelMux : public XmppChannel {

private:
void RegisterWriteReady(xmps::PeerId, SendReadyCb);
void InitializeClosingCount();

typedef std::map<xmps::PeerId, SendReadyCb> WriteReadyCbMap;
typedef std::map<xmps::PeerId, ReceiveCb> ReceiveCbMap;
@@ -74,6 +77,7 @@
XmppConnection *connection_;
tbb::mutex mutex_;
RxMessageTraceCb rx_message_trace_cb_;
int closing_count_;
};

#endif // __XMPP_CHANNEL_MUX_H__
5 changes: 2 additions & 3 deletions src/xmpp/xmpp_connection.cc
@@ -626,8 +626,7 @@ class XmppServerConnection::DeleteActor : public LifetimeActor {
}

if (parent_->session() || server_->IsPeerCloseGraceful()) {
server_->NotifyConnectionEvent(parent_->ChannelMux(),
xmps::NOT_READY);
parent_->ChannelMux()->HandleStateEvent(xmsm::IDLE);
}

if (parent_->logUVE()) {
@@ -727,7 +726,7 @@ uint32_t XmppServerConnection::flap_count() const {
void XmppServerConnection::increment_flap_count() {
XmppConnectionEndpoint *conn_endpoint = conn_endpoint_;
if (!conn_endpoint)
conn_endpoint = server()->FindConnectionEndpoint(this);
conn_endpoint = server()->FindConnectionEndpoint(ToString());
if (!conn_endpoint)
return;
conn_endpoint->increment_flap_count();
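
Note on the change above: increment_flap_count() now looks up the connection endpoint via FindConnectionEndpoint(ToString()) rather than via the connection object, in line with the commit's goal of a 1:1 mapping between an endpoint and a peer identity that survives reconnects. The sketch below assumes the lookup is keyed by the peer's name string; ToyServer, ToyEndpoint and FindEndpoint are hypothetical names, not the real XmppServer API.

// Standalone sketch of a name-keyed endpoint table (assumption for illustration).
#include <iostream>
#include <map>
#include <string>

struct ToyEndpoint {
    int flap_count = 0;
};

class ToyServer {
public:
    // Hypothetical analogue of FindConnectionEndpoint(name): one endpoint per
    // peer name, created on first use and reused for later connections.
    ToyEndpoint *FindEndpoint(const std::string &name) {
        return &endpoints_[name];
    }

private:
    std::map<std::string, ToyEndpoint> endpoints_;
};

int main() {
    ToyServer server;
    server.FindEndpoint("agent-1")->flap_count++;  // first connection flaps
    server.FindEndpoint("agent-1")->flap_count++;  // a new connection, same peer name
    std::cout << server.FindEndpoint("agent-1")->flap_count << std::endl;  // prints 2
    return 0;
}
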
