diff --git a/src/bgp/test/state_machine_test.cc b/src/bgp/test/state_machine_test.cc index 8ab914ad93b..febb6e031bf 100644 --- a/src/bgp/test/state_machine_test.cc +++ b/src/bgp/test/state_machine_test.cc @@ -65,6 +65,9 @@ class BgpSessionMock : public BgpSession { return true; } + virtual void AsyncReadStart() { + } + void Close() { state_ = BgpSessionMock::CLOSE; BgpSession::Close(); @@ -175,6 +178,14 @@ class StateMachineUnitTest : public ::testing::Test { evm_.Shutdown(); } + void StopStateMachine() { + sm_->work_queue_.set_disable(true); + } + + void StartStateMachine() { + sm_->work_queue_.set_disable(false); + } + virtual void SetUp() { } @@ -402,15 +413,19 @@ class StateMachineUnitTest : public ::testing::Test { } void EvTcpPassiveOpen() { ConcurrencyScope scope("bgp::Config"); + StopStateMachine(); BgpSessionMock *session = session_mgr_->CreatePassiveSession(); TcpSession::Endpoint endpoint; peer_->AcceptSession(session); + StartStateMachine(); } void EvTcpDuplicatePassiveOpen() { ConcurrencyScope scope("bgp::Config"); + StopStateMachine(); BgpSessionMock *session = session_mgr_->CreateDuplicateSession(); TcpSession::Endpoint endpoint; peer_->AcceptSession(session); + StartStateMachine(); } void EvOpenTimerExpiredPassive() { EvTcpPassiveOpen(); diff --git a/src/io/tcp_session.cc b/src/io/tcp_session.cc index 44db6b87268..c1b49e4b1e3 100644 --- a/src/io/tcp_session.cc +++ b/src/io/tcp_session.cc @@ -54,7 +54,8 @@ TcpSession::TcpSession( established_(false), closed_(false), direction_(ACTIVE), - writer_(new TcpMessageWriter(socket, this)) { + writer_(new TcpMessageWriter(socket, this)), + name_("-") { refcount_ = 0; writer_->RegisterNotification( boost::bind(&TcpSession::WriteReadyInternal, this, _1)); @@ -62,7 +63,10 @@ TcpSession::TcpSession( TaskScheduler *scheduler = TaskScheduler::GetInstance(); reader_task_id_ = scheduler->GetTaskId("io::ReaderTask"); } - name_ = "-"; + if (server_) { + io_strand_.reset(new Strand(*server->event_manager()->io_service())); + } + } TcpSession::~TcpSession() { @@ -118,7 +122,7 @@ void TcpSession::ReleaseBufferLocked(Buffer buffer) { assert(false); } -void TcpSession::AsyncReadStart() { +void TcpSession::AsyncReadStartInternal(TcpSessionPtr session) { mutable_buffer buffer = AllocateBuffer(); tbb::mutex::scoped_lock lock(mutex_); if (!established_) { @@ -130,6 +134,13 @@ void TcpSession::AsyncReadStart() { boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } +void TcpSession::AsyncReadStart() { + if (io_strand_) { + io_strand_->post( + boost::bind(&TcpSession::AsyncReadStartInternal, this, TcpSessionPtr(this))); + } +} + TcpSession::Endpoint TcpSession::local_endpoint() const { tbb::mutex::scoped_lock lock(mutex_); if (!established_) { diff --git a/src/io/tcp_session.h b/src/io/tcp_session.h index 04f65ac71b6..7740ce77344 100644 --- a/src/io/tcp_session.h +++ b/src/io/tcp_session.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -125,7 +126,7 @@ class TcpSession { void set_read_on_connect(bool read) { read_on_connect_ = read; } void SessionEstablished(Endpoint remote, Direction direction); - void AsyncReadStart(); + virtual void AsyncReadStart(); const io::SocketStats &GetSocketStats() const { return stats_; } void GetRxSocketStats(SocketIOStats &socket_stats) const; @@ -155,6 +156,7 @@ class TcpSession { friend void intrusive_ptr_release(TcpSession *session); typedef boost::intrusive_ptr TcpSessionPtr; typedef std::list BufferQueue; + typedef boost::asio::strand Strand; static void AsyncReadHandler(TcpSessionPtr session, boost::asio::mutable_buffer buffer, @@ -163,6 +165,7 @@ class TcpSession { static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error); + void AsyncReadStartInternal(TcpSessionPtr session); void ReleaseBufferLocked(Buffer buffer); void CloseInternal(bool call_observer, bool notify_server = true); void SetEstablished(Endpoint remote, Direction dir); @@ -182,6 +185,7 @@ class TcpSession { TcpServerPtr server_; boost::scoped_ptr socket_; + boost::scoped_ptr io_strand_; bool read_on_connect_; int buffer_size_;