diff --git a/src/bgp/test/state_machine_test.cc b/src/bgp/test/state_machine_test.cc index 2cbe590d0b6..3a7dfe51b21 100644 --- a/src/bgp/test/state_machine_test.cc +++ b/src/bgp/test/state_machine_test.cc @@ -64,6 +64,9 @@ class BgpSessionMock : public BgpSession { return true; } + virtual void AsyncReadStart() { + } + void Close() { state_ = BgpSessionMock::CLOSE; BgpSession::Close(); @@ -174,6 +177,14 @@ class StateMachineTest : public ::testing::Test { evm_.Shutdown(); } + void StopStateMachine() { + sm_->work_queue_.set_disable(true); + } + + void StartStateMachine() { + sm_->work_queue_.set_disable(false); + } + virtual void SetUp() { } @@ -400,14 +411,18 @@ class StateMachineTest : public ::testing::Test { sm_->FireOpenTimer(); } void EvTcpPassiveOpen() { + StopStateMachine(); BgpSessionMock *session = session_mgr_->CreatePassiveSession(); TcpSession::Endpoint endpoint; peer_->AcceptSession(session); + StartStateMachine(); } void EvTcpDuplicatePassiveOpen() { + StopStateMachine(); BgpSessionMock *session = session_mgr_->CreateDuplicateSession(); TcpSession::Endpoint endpoint; peer_->AcceptSession(session); + StartStateMachine(); } void EvOpenTimerExpiredPassive() { EvTcpPassiveOpen(); diff --git a/src/io/tcp_session.cc b/src/io/tcp_session.cc index 25f6de485fa..3b96f22b0db 100644 --- a/src/io/tcp_session.cc +++ b/src/io/tcp_session.cc @@ -53,7 +53,8 @@ TcpSession::TcpSession( established_(false), closed_(false), direction_(ACTIVE), - writer_(new TcpMessageWriter(socket, this)) { + writer_(new TcpMessageWriter(socket, this)), + name_("-") { refcount_ = 0; writer_->RegisterNotification( boost::bind(&TcpSession::WriteReadyInternal, this, _1)); @@ -61,7 +62,10 @@ TcpSession::TcpSession( TaskScheduler *scheduler = TaskScheduler::GetInstance(); reader_task_id_ = scheduler->GetTaskId("io::ReaderTask"); } - name_ = "-"; + if (server_) { + io_strand_.reset(new Strand(*server->event_manager()->io_service())); + } + } TcpSession::~TcpSession() { @@ -117,7 +121,7 @@ void TcpSession::ReleaseBufferLocked(Buffer buffer) { assert(false); } -void TcpSession::AsyncReadStart() { +void TcpSession::AsyncReadStartInternal(TcpSessionPtr session) { mutable_buffer buffer = AllocateBuffer(); tbb::mutex::scoped_lock lock(mutex_); if (!established_) { @@ -129,6 +133,13 @@ void TcpSession::AsyncReadStart() { boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } +void TcpSession::AsyncReadStart() { + if (io_strand_) { + io_strand_->post( + boost::bind(&TcpSession::AsyncReadStartInternal, this, TcpSessionPtr(this))); + } +} + TcpSession::Endpoint TcpSession::local_endpoint() const { tbb::mutex::scoped_lock lock(mutex_); if (!established_) { diff --git a/src/io/tcp_session.h b/src/io/tcp_session.h index c5a29b5ee37..48e8b0ea83f 100644 --- a/src/io/tcp_session.h +++ b/src/io/tcp_session.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -125,7 +126,7 @@ class TcpSession { void set_read_on_connect(bool read) { read_on_connect_ = read; } void SessionEstablished(Endpoint remote, Direction direction); - void AsyncReadStart(); + virtual void AsyncReadStart(); const TcpServer::SocketStats &GetSocketStats() const { return stats_; } void GetRxSocketStats(TcpServerSocketStats &socket_stats) const; @@ -155,6 +156,7 @@ class TcpSession { friend void intrusive_ptr_release(TcpSession *session); typedef boost::intrusive_ptr TcpSessionPtr; typedef std::list BufferQueue; + typedef boost::asio::strand Strand; static void AsyncReadHandler(TcpSessionPtr session, boost::asio::mutable_buffer buffer, @@ -163,6 +165,7 @@ class TcpSession { static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error); + void AsyncReadStartInternal(TcpSessionPtr session); void ReleaseBufferLocked(Buffer buffer); void CloseInternal(bool callObserver); void SetEstablished(Endpoint remote, Direction dir); @@ -182,6 +185,7 @@ class TcpSession { TcpServerPtr server_; boost::scoped_ptr socket_; + boost::scoped_ptr io_strand_; bool read_on_connect_; int buffer_size_;