From b7f6c4f5fdf52de74d6cba749b8f99726826e38b Mon Sep 17 00:00:00 2001 From: Nischal Sheth Date: Thu, 9 Apr 2015 10:34:07 -0700 Subject: [PATCH] Fix concurrency issue in TcpSession::AsyncReadStart Arrange for async_read_some on the underlying socket to be called from the io thread instead of calling it directly from io::Reader Task. This commit addresses the read side concurrency issue with socket. The write side issue will be addressed via another commit. Note that we haven't seen any problems that can be attributed to write side concurrency yet. Make AsyncReadStart a noop for BgpSessionMock as the strand post operation interferes with EvmManager::RunOnce. Changes in state_machine_test.cc are needed to fix an existing bug that got exposed as a result of making AsyncReadStart virtual. The issue was that StateMachine::PassiveOpen enqueues EvTcpPassiveOpen and then calls AsyncReadStart on the BgpSession. There's a chance that the BgpSession gets deleted when processing EvTcpPassiveOpen i.e. before the call to AsyncReadStart. This is not a problem in production code because StateMachine::PassiveOpen is called from bgp::Config Task which is exclusive with bgp::StateMachine Task. However in the test, StateMachine::PassiveOpen is called from the main thread. Change-Id: If78b36c6fc7d45afd320b7e1ca245757c573deec Partial-Bug: 1441339 --- src/bgp/test/state_machine_test.cc | 15 +++++++++++++++ src/io/tcp_session.cc | 17 ++++++++++++++--- src/io/tcp_session.h | 6 +++++- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/bgp/test/state_machine_test.cc b/src/bgp/test/state_machine_test.cc index 2cbe590d0b6..3a7dfe51b21 100644 --- a/src/bgp/test/state_machine_test.cc +++ b/src/bgp/test/state_machine_test.cc @@ -64,6 +64,9 @@ class BgpSessionMock : public BgpSession { return true; } + virtual void AsyncReadStart() { + } + void Close() { state_ = BgpSessionMock::CLOSE; BgpSession::Close(); @@ -174,6 +177,14 @@ class StateMachineTest : public ::testing::Test { evm_.Shutdown(); } + void StopStateMachine() { + sm_->work_queue_.set_disable(true); + } + + void StartStateMachine() { + sm_->work_queue_.set_disable(false); + } + virtual void SetUp() { } @@ -400,14 +411,18 @@ class StateMachineTest : public ::testing::Test { sm_->FireOpenTimer(); } void EvTcpPassiveOpen() { + StopStateMachine(); BgpSessionMock *session = session_mgr_->CreatePassiveSession(); TcpSession::Endpoint endpoint; peer_->AcceptSession(session); + StartStateMachine(); } void EvTcpDuplicatePassiveOpen() { + StopStateMachine(); BgpSessionMock *session = session_mgr_->CreateDuplicateSession(); TcpSession::Endpoint endpoint; peer_->AcceptSession(session); + StartStateMachine(); } void EvOpenTimerExpiredPassive() { EvTcpPassiveOpen(); diff --git a/src/io/tcp_session.cc b/src/io/tcp_session.cc index 25f6de485fa..3b96f22b0db 100644 --- a/src/io/tcp_session.cc +++ b/src/io/tcp_session.cc @@ -53,7 +53,8 @@ TcpSession::TcpSession( established_(false), closed_(false), direction_(ACTIVE), - writer_(new TcpMessageWriter(socket, this)) { + writer_(new TcpMessageWriter(socket, this)), + name_("-") { refcount_ = 0; writer_->RegisterNotification( boost::bind(&TcpSession::WriteReadyInternal, this, _1)); @@ -61,7 +62,10 @@ TcpSession::TcpSession( TaskScheduler *scheduler = TaskScheduler::GetInstance(); reader_task_id_ = scheduler->GetTaskId("io::ReaderTask"); } - name_ = "-"; + if (server_) { + io_strand_.reset(new Strand(*server->event_manager()->io_service())); + } + } TcpSession::~TcpSession() { @@ -117,7 +121,7 @@ void TcpSession::ReleaseBufferLocked(Buffer buffer) { assert(false); } -void TcpSession::AsyncReadStart() { +void TcpSession::AsyncReadStartInternal(TcpSessionPtr session) { mutable_buffer buffer = AllocateBuffer(); tbb::mutex::scoped_lock lock(mutex_); if (!established_) { @@ -129,6 +133,13 @@ void TcpSession::AsyncReadStart() { boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } +void TcpSession::AsyncReadStart() { + if (io_strand_) { + io_strand_->post( + boost::bind(&TcpSession::AsyncReadStartInternal, this, TcpSessionPtr(this))); + } +} + TcpSession::Endpoint TcpSession::local_endpoint() const { tbb::mutex::scoped_lock lock(mutex_); if (!established_) { diff --git a/src/io/tcp_session.h b/src/io/tcp_session.h index c5a29b5ee37..48e8b0ea83f 100644 --- a/src/io/tcp_session.h +++ b/src/io/tcp_session.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -125,7 +126,7 @@ class TcpSession { void set_read_on_connect(bool read) { read_on_connect_ = read; } void SessionEstablished(Endpoint remote, Direction direction); - void AsyncReadStart(); + virtual void AsyncReadStart(); const TcpServer::SocketStats &GetSocketStats() const { return stats_; } void GetRxSocketStats(TcpServerSocketStats &socket_stats) const; @@ -155,6 +156,7 @@ class TcpSession { friend void intrusive_ptr_release(TcpSession *session); typedef boost::intrusive_ptr TcpSessionPtr; typedef std::list BufferQueue; + typedef boost::asio::strand Strand; static void AsyncReadHandler(TcpSessionPtr session, boost::asio::mutable_buffer buffer, @@ -163,6 +165,7 @@ class TcpSession { static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error); + void AsyncReadStartInternal(TcpSessionPtr session); void ReleaseBufferLocked(Buffer buffer); void CloseInternal(bool callObserver); void SetEstablished(Endpoint remote, Direction dir); @@ -182,6 +185,7 @@ class TcpSession { TcpServerPtr server_; boost::scoped_ptr socket_; + boost::scoped_ptr io_strand_; bool read_on_connect_; int buffer_size_;