diff --git a/src/bgp/test/state_machine_test.cc b/src/bgp/test/state_machine_test.cc index 2bbd2e47cc1..44986bdee55 100644 --- a/src/bgp/test/state_machine_test.cc +++ b/src/bgp/test/state_machine_test.cc @@ -66,6 +66,9 @@ class BgpSessionMock : public BgpSession { return true; } + virtual void AsyncReadStart() { + } + void Close() { state_ = BgpSessionMock::CLOSE; BgpSession::Close(); @@ -176,6 +179,14 @@ class StateMachineUnitTest : public ::testing::Test { evm_.Shutdown(); } + void StopStateMachine() { + sm_->work_queue_.set_disable(true); + } + + void StartStateMachine() { + sm_->work_queue_.set_disable(false); + } + virtual void SetUp() { } @@ -403,15 +414,19 @@ class StateMachineUnitTest : public ::testing::Test { } void EvTcpPassiveOpen() { ConcurrencyScope scope("bgp::Config"); + StopStateMachine(); BgpSessionMock *session = session_mgr_->CreatePassiveSession(); TcpSession::Endpoint endpoint; peer_->AcceptSession(session); + StartStateMachine(); } void EvTcpDuplicatePassiveOpen() { ConcurrencyScope scope("bgp::Config"); + StopStateMachine(); BgpSessionMock *session = session_mgr_->CreateDuplicateSession(); TcpSession::Endpoint endpoint; peer_->AcceptSession(session); + StartStateMachine(); } void EvOpenTimerExpiredPassive() { EvTcpPassiveOpen(); diff --git a/src/io/tcp_session.cc b/src/io/tcp_session.cc index 22d5c7f34ba..a53a3b541dc 100644 --- a/src/io/tcp_session.cc +++ b/src/io/tcp_session.cc @@ -52,13 +52,16 @@ TcpSession::TcpSession( established_(false), closed_(false), direction_(ACTIVE), - writer_(new TcpMessageWriter(this)) { + writer_(new TcpMessageWriter(this)), + name_("-") { refcount_ = 0; if (reader_task_id_ == -1) { TaskScheduler *scheduler = TaskScheduler::GetInstance(); reader_task_id_ = scheduler->GetTaskId("io::ReaderTask"); } - name_ = "-"; + if (server_) { + io_strand_.reset(new Strand(*server->event_manager()->io_service())); + } } TcpSession::~TcpSession() { @@ -114,7 +117,7 @@ void TcpSession::ReleaseBufferLocked(Buffer buffer) { assert(false); } -void TcpSession::AsyncReadStart() { +void TcpSession::AsyncReadStartInternal(TcpSessionPtr session) { mutable_buffer buffer = AllocateBuffer(); tbb::mutex::scoped_lock lock(mutex_); if (!established_) { @@ -124,6 +127,13 @@ void TcpSession::AsyncReadStart() { AsyncReadSome(buffer); } +void TcpSession::AsyncReadStart() { + if (io_strand_) { + io_strand_->post(boost::bind( + &TcpSession::AsyncReadStartInternal, this, TcpSessionPtr(this))); + } +} + void TcpSession::DeferWriter() { // Update socket write block count. stats_.write_blocked++; diff --git a/src/io/tcp_session.h b/src/io/tcp_session.h index b426c859561..2b1797c7c57 100644 --- a/src/io/tcp_session.h +++ b/src/io/tcp_session.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -135,7 +136,7 @@ class TcpSession { void set_read_on_connect(bool read) { read_on_connect_ = read; } void SessionEstablished(Endpoint remote, Direction direction); - void AsyncReadStart(); + virtual void AsyncReadStart(); const io::SocketStats &GetSocketStats() const { return stats_; } void GetRxSocketStats(SocketIOStats &socket_stats) const; @@ -152,6 +153,7 @@ class TcpSession { static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error); + void AsyncReadStartInternal(TcpSessionPtr session); virtual Task* CreateReaderTask(boost::asio::mutable_buffer, size_t); virtual ~TcpSession(); @@ -186,6 +188,7 @@ class TcpSession { friend void intrusive_ptr_add_ref(TcpSession *session); friend void intrusive_ptr_release(TcpSession *session); typedef std::list BufferQueue; + typedef boost::asio::strand Strand; static void WriteReadyInternal(TcpSessionPtr session, const boost::system::error_code &error, @@ -208,6 +211,7 @@ class TcpSession { TcpServerPtr server_; boost::scoped_ptr socket_; + boost::scoped_ptr io_strand_; bool read_on_connect_; int buffer_size_;