Skip to content

Commit

Permalink
Fix concurrency issue in BgpSession::WriteReady
Browse files Browse the repository at this point in the history
It's possible that BgpSession::WriteReady gets called after peer
for the session has already been deleted. Prevent WriteReady from
from accessing freed memory by clearing the back pointer to the
peer in the session when the session in the peer is cleared.

The above fix is not sufficient since there's a concurrency issue
wherein BgpSession::WriteReady could try to access the peer while
the peer is trying to clear the back pointer in the session.

Fix is to enqueue sessions to a work queue in the sesison manager
when BgpSession::WriteReady is called. The work queue is processed
in the context of the bgp::Config task.

Change-Id: I6e4c72b1367914decddd0baf25589446171443bd
Closes-Bug: 1462550
  • Loading branch information
Nischal Sheth committed Jun 8, 2015
1 parent 793b232 commit 35eb530
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 44 deletions.
5 changes: 3 additions & 2 deletions src/bgp/bgp_peer.cc
Expand Up @@ -750,7 +750,7 @@ BgpSession *BgpPeer::CreateSession() {

BgpSession *bgp_session = static_cast<BgpSession *>(session);
BindLocalEndpoint(bgp_session);
bgp_session->SetPeer(this);
bgp_session->set_peer(this);
return bgp_session;
}

Expand All @@ -762,7 +762,7 @@ void BgpPeer::SetAdminState(bool down) {
}

bool BgpPeer::AcceptSession(BgpSession *session) {
session->SetPeer(this);
session->set_peer(this);

// Set valid keys, if any, in the socket.
InstallAuthKeys(session);
Expand Down Expand Up @@ -1510,6 +1510,7 @@ void BgpPeer::set_session(BgpSession *session) {
void BgpPeer::clear_session() {
tbb::spin_mutex::scoped_lock lock(spin_mutex_);
if (session_) {
session_->clear_peer();
session_->set_observer(NULL);
session_->Close();
}
Expand Down
2 changes: 1 addition & 1 deletion src/bgp/bgp_server.cc
Expand Up @@ -175,7 +175,7 @@ class BgpServer::DeleteActor : public LifetimeActor {
}
virtual bool MayDelete() const {
CHECK_CONCURRENCY("bgp::Config");
return server_->session_manager()->IsQueueEmpty();
return server_->session_manager()->MayDelete();
}
virtual void Shutdown() {
CHECK_CONCURRENCY("bgp::Config");
Expand Down
34 changes: 25 additions & 9 deletions src/bgp/bgp_session.cc
Expand Up @@ -40,8 +40,9 @@ BgpMessageReader::BgpMessageReader(TcpSession *session,
BgpMessageReader::~BgpMessageReader() {
}

BgpSession::BgpSession(BgpSessionManager *session, Socket *socket)
: TcpSession(session, socket),
BgpSession::BgpSession(BgpSessionManager *session_mgr, Socket *socket)
: TcpSession(session_mgr, socket),
session_mgr_(session_mgr),
peer_(NULL),
reader_(new BgpMessageReader(this,
boost::bind(&BgpSession::ReceiveMsg, this, _1, _2))) {
Expand All @@ -51,24 +52,39 @@ BgpSession::~BgpSession() {
}

//
// Handle write ready callback.
// Concurrency: called in the context of bgp::Config task.
//
// Process write ready callback.
//
// 1. Tell SchedulingGroupManager that the IPeer is send ready.
// 2. Tell BgpPeer that it's send ready so that it can resume Keepalives.
//
// We can ignore any errors since the StateMachine will get informed of the
// TcpSession close independently and react to it.
//
void BgpSession::WriteReady(const boost::system::error_code &error) {
if (error || !peer_)
void BgpSession::ProcessWriteReady() {
if (!peer_)
return;

BgpServer *server = peer_->server();
SchedulingGroupManager *sg_mgr = server->scheduling_group_manager();
sg_mgr->SendReady(peer_);
peer_->SetSendReady();
}

//
// Concurrency: called in the context of io thread.
//
// Handle write ready callback. Enqueue the session to a WorkQueue in the
// BgpSessionManager. The WorkQueue gets processed in the context of the
// bgp::Config task. This ensures that we don't access the BgpPeer while
// the BgpPeer is trying to clear our back pointer to it.
//
// We can ignore any errors since the StateMachine will get informed of the
// TcpSession close independently and react to it.
//
void BgpSession::WriteReady(const boost::system::error_code &error) {
if (error)
return;
session_mgr_->EnqueueWriteReady(this);
}

int BgpSession::GetSessionInstance() const {
return peer_->GetIndex();
}
Expand Down
10 changes: 7 additions & 3 deletions src/bgp/bgp_session.h
Expand Up @@ -40,14 +40,17 @@ class BgpMessageReader : public TcpMessageReader {

class BgpSession : public TcpSession {
public:
BgpSession(BgpSessionManager *session, Socket *socket);
BgpSession(BgpSessionManager *session_mgr, Socket *socket);
virtual ~BgpSession();

void SetPeer(BgpPeer *peer) { peer_ = peer; }
BgpPeer *Peer() { return peer_; }
void SendNotification(int code, int subcode,
const std::string &data = std::string());
virtual int GetSessionInstance() const;
void ProcessWriteReady();

void set_peer(BgpPeer *peer) { peer_ = peer; }
void clear_peer() { peer_ = NULL; }
BgpPeer *peer() { return peer_; }

protected:
virtual void OnRead(Buffer buffer) {
Expand All @@ -60,6 +63,7 @@ class BgpSession : public TcpSession {
}
virtual void WriteReady(const boost::system::error_code &error);

BgpSessionManager *session_mgr_;
BgpPeer *peer_;
boost::scoped_ptr<BgpMessageReader> reader_;

Expand Down
66 changes: 53 additions & 13 deletions src/bgp/bgp_session_manager.cc
Expand Up @@ -13,8 +13,12 @@
BgpSessionManager::BgpSessionManager(EventManager *evm, BgpServer *server)
: TcpServer(evm),
server_(server),
session_queue_(TaskScheduler::GetInstance()->GetTaskId("bgp::Config"), 0,
boost::bind(&BgpSessionManager::ProcessSession, this, _1)) {
session_queue_(
TaskScheduler::GetInstance()->GetTaskId("bgp::Config"), 0,
boost::bind(&BgpSessionManager::ProcessSession, this, _1)),
write_ready_queue_(
TaskScheduler::GetInstance()->GetTaskId("bgp::Config"), 0,
boost::bind(&BgpSessionManager::ProcessWriteReady, this, _1)) {
}

BgpSessionManager::~BgpSessionManager() {
Expand All @@ -30,28 +34,64 @@ bool BgpSessionManager::Initialize(short port) {
//
// Called from BgpServer::DeleteActor's Shutdown method.
// Shutdown the TcpServer.
// Register an exit callback to the WorkQueue so that we can ask BgpServer
// to retry deletion when the WorkQueue becomes empty.
// Register an exit callback to the WorkQueues so that we can ask BgpServer
// to retry deletion when a WorkQueue becomes empty.
//
void BgpSessionManager::Shutdown() {
CHECK_CONCURRENCY("bgp::Config");
TcpServer::Shutdown();
session_queue_.SetExitCallback(
boost::bind(&BgpSessionManager::ProcessSessionDone, this, _1));
boost::bind(&BgpSessionManager::WorkQueueExitCallback, this, _1));
write_ready_queue_.SetExitCallback(
boost::bind(&BgpSessionManager::WorkQueueExitCallback, this, _1));
}

//
// Called when the BgpServer is being destroyed.
//
// The WorkQueue needs to be shutdown as the last step to ensure that all
// The WorkQueues need to be shutdown as the last step to ensure that all
// entries get deleted. Note that there's no need to call DeleteSession on
// the sessions in the WorkQueue since ClearSessions does the same thing.
// the sessions in the WorkQueues since ClearSessions does the same thing.
//
void BgpSessionManager::Terminate() {
CHECK_CONCURRENCY("bgp::Config");
server_ = NULL;
ClearSessions();
session_queue_.Shutdown();
write_ready_queue_.Shutdown();
}

//
// Return true if all WorkQueues are empty.
//
bool BgpSessionManager::MayDelete() const {
if (!session_queue_.IsQueueEmpty())
return false;
if (!write_ready_queue_.IsQueueEmpty())
return false;
return true;
}

//
// Add a BgpSession to the write ready WorkQueue.
// Take a reference to make sure that BgpSession doesn't get deleted before
// it's processed.
//
void BgpSessionManager::EnqueueWriteReady(BgpSession *session) {
if (!server_ || server_->IsDeleted())
return;
write_ready_queue_.Enqueue(TcpSessionPtr(session));
}

//
// Handler for BgpSessions that are dequeued from the write ready WorkQueue.
//
// The BgpServer does not get destroyed if the WorkQueue is non-empty.
//
bool BgpSessionManager::ProcessWriteReady(TcpSessionPtr tcp_session) {
BgpSession *session = static_cast<BgpSession *>(tcp_session.get());
session->ProcessWriteReady();
return true;
}

//
Expand Down Expand Up @@ -96,7 +136,7 @@ TcpSession *BgpSessionManager::AllocSession(Socket *socket) {
}

//
// Accept incoming BgpSession and add it to the WorkQueue for processing.
// Accept incoming BgpSession and add to session WorkQueue for processing.
// This ensures that we don't try to access the BgpServer data structures
// from the IO thread while they are being modified from bgp::Config task.
//
Expand All @@ -114,7 +154,7 @@ bool BgpSessionManager::AcceptSession(TcpSession *tcp_session) {
}

//
// Handler for BgpSessions that are dequeued from the WorkQueue.
// Handler for BgpSessions that are dequeued from the session WorkQueue.
//
// The BgpServer does not get destroyed if the WorkQueue is non-empty.
//
Expand Down Expand Up @@ -162,16 +202,16 @@ bool BgpSessionManager::ProcessSession(BgpSession *session) {
}

//
// Exit callback for the WorkQueue.
// Exit callback for the session and write ready WorkQueues.
//
void BgpSessionManager::ProcessSessionDone(bool done) {
void BgpSessionManager::WorkQueueExitCallback(bool done) {
server_->RetryDelete();
}

size_t BgpSessionManager::GetQueueSize() const {
size_t BgpSessionManager::GetSessionQueueSize() const {
return session_queue_.Length();
}

void BgpSessionManager::SetQueueDisable(bool disabled) {
void BgpSessionManager::SetSessionQueueDisable(bool disabled) {
session_queue_.set_disable(disabled);
}
12 changes: 8 additions & 4 deletions src/bgp/bgp_session_manager.h
Expand Up @@ -27,7 +27,9 @@ class BgpSessionManager : public TcpServer {
virtual bool Initialize(short port);
void Shutdown();
void Terminate();
bool IsQueueEmpty() const { return session_queue_.IsQueueEmpty(); }
bool MayDelete() const;

void EnqueueWriteReady(BgpSession *session);

BgpServer *server() { return server_; }

Expand All @@ -40,12 +42,14 @@ class BgpSessionManager : public TcpServer {

BgpPeer *FindPeer(Endpoint remote);
bool ProcessSession(BgpSession *session);
void ProcessSessionDone(bool done);
size_t GetQueueSize() const;
void SetQueueDisable(bool disabled);
bool ProcessWriteReady(TcpSessionPtr tcp_session);
void WorkQueueExitCallback(bool done);
size_t GetSessionQueueSize() const;
void SetSessionQueueDisable(bool disabled);

BgpServer *server_;
WorkQueue<BgpSession *> session_queue_;
WorkQueue<TcpSessionPtr> write_ready_queue_;

DISALLOW_COPY_AND_ASSIGN(BgpSessionManager);
};
Expand Down
16 changes: 8 additions & 8 deletions src/bgp/state_machine.cc
Expand Up @@ -229,7 +229,7 @@ struct EvBgpHeaderError : sc::event<EvBgpHeaderError> {
struct EvBgpOpen : sc::event<EvBgpOpen> {
EvBgpOpen(BgpSession *session, const BgpProto::OpenMessage *msg)
: session(session), msg(msg) {
BGP_LOG_PEER(Message, session->Peer(), SandeshLevel::SYS_INFO,
BGP_LOG_PEER(Message, session->peer(), SandeshLevel::SYS_INFO,
BGP_LOG_FLAG_SYSLOG, BGP_PEER_DIR_IN,
"Open " << msg->ToString());
}
Expand Down Expand Up @@ -270,14 +270,14 @@ struct EvBgpOpenError : sc::event<EvBgpOpenError> {

struct EvBgpKeepalive : sc::event<EvBgpKeepalive> {
explicit EvBgpKeepalive(BgpSession *session) : session(session) {
const StateMachine *state_machine = session->Peer()->state_machine();
const StateMachine *state_machine = session->peer()->state_machine();
SandeshLevel::type log_level;
if (state_machine->get_state() == StateMachine::ESTABLISHED) {
log_level = Sandesh::LoggingUtLevel();
} else {
log_level = SandeshLevel::SYS_INFO;
}
BGP_LOG_PEER(Message, session->Peer(), log_level,
BGP_LOG_PEER(Message, session->peer(), log_level,
BGP_LOG_FLAG_SYSLOG, BGP_PEER_DIR_IN, "Keepalive");
}
static const char *Name() {
Expand All @@ -302,7 +302,7 @@ struct EvBgpNotification : sc::event<EvBgpNotification> {
log_level = SandeshLevel::SYS_NOTICE;
}
string peer_key =
session->Peer() ? session->Peer()->ToUVEKey() : session->ToString();
session->peer() ? session->peer()->ToUVEKey() : session->ToString();
BGP_LOG(BgpPeerNotification, log_level, BGP_LOG_FLAG_ALL, peer_key,
BGP_PEER_DIR_IN, msg->error, msg->subcode, msg->ToString());
}
Expand Down Expand Up @@ -1448,7 +1448,7 @@ void StateMachine::OnMessage(BgpSession *session, BgpProto::BgpMessage *msg,
case BgpProto::OPEN: {
BgpProto::OpenMessage *open_msg =
static_cast<BgpProto::OpenMessage *>(msg);
BgpPeer *peer = session->Peer();
BgpPeer *peer = session->peer();
peer->inc_rx_open();
if (int subcode = open_msg->Validate(peer)) {
Enqueue(fsm::EvBgpOpenError(session, subcode));
Expand All @@ -1460,13 +1460,13 @@ void StateMachine::OnMessage(BgpSession *session, BgpProto::BgpMessage *msg,
break;
}
case BgpProto::KEEPALIVE: {
BgpPeer *peer = session->Peer();
BgpPeer *peer = session->peer();
Enqueue(fsm::EvBgpKeepalive(session));
if (peer) peer->inc_rx_keepalive();
break;
}
case BgpProto::NOTIFICATION: {
BgpPeer *peer = session->Peer();
BgpPeer *peer = session->peer();
if (peer)
peer->inc_rx_notification();
Enqueue(fsm::EvBgpNotification(session,
Expand All @@ -1478,7 +1478,7 @@ void StateMachine::OnMessage(BgpSession *session, BgpProto::BgpMessage *msg,
BgpProto::Update *update = static_cast<BgpProto::Update *>(msg);
BgpPeer *peer = NULL;
if (session)
peer = session->Peer();
peer = session->peer();
if (peer)
peer->inc_rx_update();

Expand Down
4 changes: 2 additions & 2 deletions src/bgp/test/bgp_server_test.cc
Expand Up @@ -150,11 +150,11 @@ class BgpServerUnitTest : public ::testing::Test {
}

size_t GetSessionQueueSize(BgpSessionManager *session_manager) {
return session_manager->GetQueueSize();
return session_manager->GetSessionQueueSize();
}

void SetSessionQueueDisable(BgpSessionManager *session_manager, bool flag) {
session_manager->SetQueueDisable(flag);
session_manager->SetSessionQueueDisable(flag);
}

void SetupPeers(int peer_count, unsigned short port_a,
Expand Down
2 changes: 1 addition & 1 deletion src/bgp/test/bgp_session_test.cc
Expand Up @@ -83,7 +83,7 @@ class BgpSessionUnitTest : public ::testing::Test {
peer_ = static_cast<BgpPeerMock *>(
rti->peer_manager()->PeerLocate(&server_, config_.get()));
session_.reset(new BgpSessionTest(server_.session_manager()));
session_->SetPeer(peer_);
session_->set_peer(peer_);
}

virtual void TearDown() {
Expand Down
3 changes: 2 additions & 1 deletion src/io/tcp_server.h
Expand Up @@ -85,6 +85,8 @@ class TcpServer {
const std::string &md5_password);

protected:
typedef boost::intrusive_ptr<TcpSession> TcpSessionPtr;

// Create a session object.
virtual TcpSession *AllocSession(Socket *socket) = 0;

Expand Down Expand Up @@ -118,7 +120,6 @@ class TcpServer {
friend void intrusive_ptr_release(TcpServer *server);

typedef boost::intrusive_ptr<TcpServer> TcpServerPtr;
typedef boost::intrusive_ptr<TcpSession> TcpSessionPtr;
struct TcpSessionPtrCmp {
bool operator()(const TcpSessionPtr &lhs,
const TcpSessionPtr &rhs) const {
Expand Down

0 comments on commit 35eb530

Please sign in to comment.