Skip to content

Commit

Permalink
Fix concurrency issue in TcpSession::AsyncReadStart
Browse files Browse the repository at this point in the history
Arrange for async_read_some on the underlying socket to be called
from the io thread instead of calling it directly from io::Reader
Task.

This commit addresses the read side concurrency issue with socket.
The write side issue will be addressed via another commit.  Note
that we haven't seen any problems that can be attributed to write
side concurrency yet.

Make AsyncReadStart a noop for BgpSessionMock as the strand post
operation interferes with EvmManager::RunOnce.

Changes in state_machine_test.cc are needed to fix an existing bug
that got exposed as a result of making AsyncReadStart virtual. The
issue was that StateMachine::PassiveOpen enqueues EvTcpPassiveOpen
and then calls AsyncReadStart on the BgpSession. There's a chance
that the BgpSession gets deleted when processing EvTcpPassiveOpen
i.e. before the call to AsyncReadStart. This is not a problem in
production code because StateMachine::PassiveOpen is called from
bgp::Config Task which is exclusive with bgp::StateMachine Task.
However in the test, StateMachine::PassiveOpen is called from the
main thread.

Change-Id: If78b36c6fc7d45afd320b7e1ca245757c573deec
Partial-Bug: 1441339
  • Loading branch information
Nischal Sheth committed Apr 9, 2015
1 parent e857b31 commit 4f83e18
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 4 deletions.
15 changes: 15 additions & 0 deletions src/bgp/test/state_machine_test.cc
Expand Up @@ -65,6 +65,9 @@ class BgpSessionMock : public BgpSession {
return true;
}

virtual void AsyncReadStart() {
}

void Close() {
state_ = BgpSessionMock::CLOSE;
BgpSession::Close();
Expand Down Expand Up @@ -175,6 +178,14 @@ class StateMachineUnitTest : public ::testing::Test {
evm_.Shutdown();
}

void StopStateMachine() {
sm_->work_queue_.set_disable(true);
}

void StartStateMachine() {
sm_->work_queue_.set_disable(false);
}

virtual void SetUp() {
}

Expand Down Expand Up @@ -402,15 +413,19 @@ class StateMachineUnitTest : public ::testing::Test {
}
void EvTcpPassiveOpen() {
ConcurrencyScope scope("bgp::Config");
StopStateMachine();
BgpSessionMock *session = session_mgr_->CreatePassiveSession();
TcpSession::Endpoint endpoint;
peer_->AcceptSession(session);
StartStateMachine();
}
void EvTcpDuplicatePassiveOpen() {
ConcurrencyScope scope("bgp::Config");
StopStateMachine();
BgpSessionMock *session = session_mgr_->CreateDuplicateSession();
TcpSession::Endpoint endpoint;
peer_->AcceptSession(session);
StartStateMachine();
}
void EvOpenTimerExpiredPassive() {
EvTcpPassiveOpen();
Expand Down
17 changes: 14 additions & 3 deletions src/io/tcp_session.cc
Expand Up @@ -54,15 +54,19 @@ TcpSession::TcpSession(
established_(false),
closed_(false),
direction_(ACTIVE),
writer_(new TcpMessageWriter(socket, this)) {
writer_(new TcpMessageWriter(socket, this)),
name_("-") {
refcount_ = 0;
writer_->RegisterNotification(
boost::bind(&TcpSession::WriteReadyInternal, this, _1));
if (reader_task_id_ == -1) {
TaskScheduler *scheduler = TaskScheduler::GetInstance();
reader_task_id_ = scheduler->GetTaskId("io::ReaderTask");
}
name_ = "-";
if (server_) {
io_strand_.reset(new Strand(*server->event_manager()->io_service()));
}

}

TcpSession::~TcpSession() {
Expand Down Expand Up @@ -118,7 +122,7 @@ void TcpSession::ReleaseBufferLocked(Buffer buffer) {
assert(false);
}

void TcpSession::AsyncReadStart() {
void TcpSession::AsyncReadStartInternal(TcpSessionPtr session) {
mutable_buffer buffer = AllocateBuffer();
tbb::mutex::scoped_lock lock(mutex_);
if (!established_) {
Expand All @@ -130,6 +134,13 @@ void TcpSession::AsyncReadStart() {
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

void TcpSession::AsyncReadStart() {
if (io_strand_) {
io_strand_->post(
boost::bind(&TcpSession::AsyncReadStartInternal, this, TcpSessionPtr(this)));
}
}

TcpSession::Endpoint TcpSession::local_endpoint() const {
tbb::mutex::scoped_lock lock(mutex_);
if (!established_) {
Expand Down
6 changes: 5 additions & 1 deletion src/io/tcp_session.h
Expand Up @@ -11,6 +11,7 @@
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/function.hpp>
#include <boost/scoped_ptr.hpp>
Expand Down Expand Up @@ -125,7 +126,7 @@ class TcpSession {
void set_read_on_connect(bool read) { read_on_connect_ = read; }
void SessionEstablished(Endpoint remote, Direction direction);

void AsyncReadStart();
virtual void AsyncReadStart();

const io::SocketStats &GetSocketStats() const { return stats_; }
void GetRxSocketStats(SocketIOStats &socket_stats) const;
Expand Down Expand Up @@ -155,6 +156,7 @@ class TcpSession {
friend void intrusive_ptr_release(TcpSession *session);
typedef boost::intrusive_ptr<TcpSession> TcpSessionPtr;
typedef std::list<boost::asio::mutable_buffer> BufferQueue;
typedef boost::asio::strand Strand;

static void AsyncReadHandler(TcpSessionPtr session,
boost::asio::mutable_buffer buffer,
Expand All @@ -163,6 +165,7 @@ class TcpSession {
static void AsyncWriteHandler(TcpSessionPtr session,
const boost::system::error_code &error);

void AsyncReadStartInternal(TcpSessionPtr session);
void ReleaseBufferLocked(Buffer buffer);
void CloseInternal(bool call_observer, bool notify_server = true);
void SetEstablished(Endpoint remote, Direction dir);
Expand All @@ -182,6 +185,7 @@ class TcpSession {

TcpServerPtr server_;
boost::scoped_ptr<Socket> socket_;
boost::scoped_ptr<Strand> io_strand_;
bool read_on_connect_;
int buffer_size_;

Expand Down

0 comments on commit 4f83e18

Please sign in to comment.