From 4f83e182922b27ac52ffc189aab20e4e61aa9712 Mon Sep 17 00:00:00 2001 From: Nischal Sheth Date: Thu, 9 Apr 2015 10:34:07 -0700 Subject: [PATCH] Fix concurrency issue in TcpSession::AsyncReadStart Arrange for async_read_some on the underlying socket to be called from the io thread instead of calling it directly from io::Reader Task. This commit addresses the read side concurrency issue with socket. The write side issue will be addressed via another commit. Note that we haven't seen any problems that can be attributed to write side concurrency yet. Make AsyncReadStart a noop for BgpSessionMock as the strand post operation interferes with EvmManager::RunOnce. Changes in state_machine_test.cc are needed to fix an existing bug that got exposed as a result of making AsyncReadStart virtual. The issue was that StateMachine::PassiveOpen enqueues EvTcpPassiveOpen and then calls AsyncReadStart on the BgpSession. There's a chance that the BgpSession gets deleted when processing EvTcpPassiveOpen i.e. before the call to AsyncReadStart. This is not a problem in production code because StateMachine::PassiveOpen is called from bgp::Config Task which is exclusive with bgp::StateMachine Task. However in the test, StateMachine::PassiveOpen is called from the main thread. Change-Id: If78b36c6fc7d45afd320b7e1ca245757c573deec Partial-Bug: 1441339 --- src/bgp/test/state_machine_test.cc | 15 +++++++++++++++ src/io/tcp_session.cc | 17 ++++++++++++++--- src/io/tcp_session.h | 6 +++++- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/bgp/test/state_machine_test.cc b/src/bgp/test/state_machine_test.cc index 8ab914ad93b..febb6e031bf 100644 --- a/src/bgp/test/state_machine_test.cc +++ b/src/bgp/test/state_machine_test.cc @@ -65,6 +65,9 @@ class BgpSessionMock : public BgpSession { return true; } + virtual void AsyncReadStart() { + } + void Close() { state_ = BgpSessionMock::CLOSE; BgpSession::Close(); @@ -175,6 +178,14 @@ class StateMachineUnitTest : public ::testing::Test { evm_.Shutdown(); } + void StopStateMachine() { + sm_->work_queue_.set_disable(true); + } + + void StartStateMachine() { + sm_->work_queue_.set_disable(false); + } + virtual void SetUp() { } @@ -402,15 +413,19 @@ class StateMachineUnitTest : public ::testing::Test { } void EvTcpPassiveOpen() { ConcurrencyScope scope("bgp::Config"); + StopStateMachine(); BgpSessionMock *session = session_mgr_->CreatePassiveSession(); TcpSession::Endpoint endpoint; peer_->AcceptSession(session); + StartStateMachine(); } void EvTcpDuplicatePassiveOpen() { ConcurrencyScope scope("bgp::Config"); + StopStateMachine(); BgpSessionMock *session = session_mgr_->CreateDuplicateSession(); TcpSession::Endpoint endpoint; peer_->AcceptSession(session); + StartStateMachine(); } void EvOpenTimerExpiredPassive() { EvTcpPassiveOpen(); diff --git a/src/io/tcp_session.cc b/src/io/tcp_session.cc index 44db6b87268..c1b49e4b1e3 100644 --- a/src/io/tcp_session.cc +++ b/src/io/tcp_session.cc @@ -54,7 +54,8 @@ TcpSession::TcpSession( established_(false), closed_(false), direction_(ACTIVE), - writer_(new TcpMessageWriter(socket, this)) { + writer_(new TcpMessageWriter(socket, this)), + name_("-") { refcount_ = 0; writer_->RegisterNotification( boost::bind(&TcpSession::WriteReadyInternal, this, _1)); @@ -62,7 +63,10 @@ TcpSession::TcpSession( TaskScheduler *scheduler = TaskScheduler::GetInstance(); reader_task_id_ = scheduler->GetTaskId("io::ReaderTask"); } - name_ = "-"; + if (server_) { + io_strand_.reset(new Strand(*server->event_manager()->io_service())); + } + } TcpSession::~TcpSession() { @@ -118,7 +122,7 @@ void TcpSession::ReleaseBufferLocked(Buffer buffer) { assert(false); } -void TcpSession::AsyncReadStart() { +void TcpSession::AsyncReadStartInternal(TcpSessionPtr session) { mutable_buffer buffer = AllocateBuffer(); tbb::mutex::scoped_lock lock(mutex_); if (!established_) { @@ -130,6 +134,13 @@ void TcpSession::AsyncReadStart() { boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } +void TcpSession::AsyncReadStart() { + if (io_strand_) { + io_strand_->post( + boost::bind(&TcpSession::AsyncReadStartInternal, this, TcpSessionPtr(this))); + } +} + TcpSession::Endpoint TcpSession::local_endpoint() const { tbb::mutex::scoped_lock lock(mutex_); if (!established_) { diff --git a/src/io/tcp_session.h b/src/io/tcp_session.h index 04f65ac71b6..7740ce77344 100644 --- a/src/io/tcp_session.h +++ b/src/io/tcp_session.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -125,7 +126,7 @@ class TcpSession { void set_read_on_connect(bool read) { read_on_connect_ = read; } void SessionEstablished(Endpoint remote, Direction direction); - void AsyncReadStart(); + virtual void AsyncReadStart(); const io::SocketStats &GetSocketStats() const { return stats_; } void GetRxSocketStats(SocketIOStats &socket_stats) const; @@ -155,6 +156,7 @@ class TcpSession { friend void intrusive_ptr_release(TcpSession *session); typedef boost::intrusive_ptr TcpSessionPtr; typedef std::list BufferQueue; + typedef boost::asio::strand Strand; static void AsyncReadHandler(TcpSessionPtr session, boost::asio::mutable_buffer buffer, @@ -163,6 +165,7 @@ class TcpSession { static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error); + void AsyncReadStartInternal(TcpSessionPtr session); void ReleaseBufferLocked(Buffer buffer); void CloseInternal(bool call_observer, bool notify_server = true); void SetEstablished(Endpoint remote, Direction dir); @@ -182,6 +185,7 @@ class TcpSession { TcpServerPtr server_; boost::scoped_ptr socket_; + boost::scoped_ptr io_strand_; bool read_on_connect_; int buffer_size_;