From 7853e1c2950b187cf9acec437a5bbff395ed4137 Mon Sep 17 00:00:00 2001 From: Ananth Suryanarayana Date: Mon, 1 Aug 2016 17:17:01 -0700 Subject: [PATCH] Execute socket->read() inside TcpSession client code protected by mutex Instead of letting read happen in boost asio library in the thread which runs the even, defer actual socket read call to TcpSsession (Or SslSession) code which is (already) protected by mutex. This ensures that socket read and write never happens in parallel. This is a requirement as boost::asio library is not thread safe per-se for parallel reads/writes over the same asio socket Change-Id: I25cc6b149d26579c1eb1f75965227135c26834e7 Closes-Bug: 1608731 --- src/io/ssl_session.cc | 46 +++++++------------------- src/io/ssl_session.h | 7 ++-- src/io/tcp_session.cc | 76 +++++++++++++++---------------------------- src/io/tcp_session.h | 15 +++------ 4 files changed, 44 insertions(+), 100 deletions(-) diff --git a/src/io/ssl_session.cc b/src/io/ssl_session.cc index 0ac7f822619..2833764b920 100644 --- a/src/io/ssl_session.cc +++ b/src/io/ssl_session.cc @@ -83,42 +83,18 @@ TcpSession::Socket *SslSession::socket() const { return &ssl_socket_->next_layer(); } -bool SslSession::AsyncReadHandlerProcess(mutable_buffer buffer, - size_t *bytes_transferred, - error_code &error) { - // no processing needed if ssl handshake is not complete. - if (!IsSslHandShakeSuccessLocked()) { - return false; - } +size_t SslSession::ReadSome(mutable_buffer buffer, error_code &error) { + // Read data from the tcp socket or from the ssl socket, as appropriate. + assert(!ssl_handshake_in_progress_); + if (!IsSslHandShakeSuccessLocked()) + return TcpSession::ReadSome(buffer, error); // do ssl read here in IO context, ignore errors - *bytes_transferred = ssl_socket_->read_some(mutable_buffers_1(buffer), - error); - - return true; -} - -void SslSession::AsyncReadSome(mutable_buffer buffer) { - if (IsSslHandShakeSuccessLocked()) { - // trigger read with null buffer to get indication for data available - // on the socket and then do the actuall socket read in - // AsyncReadHandlerProcess - socket()->async_read_some(null_buffers(), - bind(&TcpSession::AsyncReadHandler, SslSessionPtr(this), buffer, - placeholders::error, placeholders::bytes_transferred)); - } else { - // No tcp socket read/write while ssl handshake is ongoing - if (!ssl_handshake_in_progress_) { - socket()->async_read_some(mutable_buffers_1(buffer), - bind(&TcpSession::AsyncReadHandler, SslSessionPtr(this), - buffer, placeholders::error, - placeholders::bytes_transferred)); - } - } + return ssl_socket_->read_some(mutable_buffers_1(buffer), error); } size_t SslSession::WriteSome(const uint8_t *data, size_t len, - error_code &error) { + error_code &error) { if (IsSslHandShakeSuccessLocked()) { return ssl_socket_->write_some(buffer(data, len), error); @@ -130,16 +106,16 @@ size_t SslSession::WriteSome(const uint8_t *data, size_t len, void SslSession::AsyncWrite(const u_int8_t *data, size_t size) { if (IsSslHandShakeSuccessLocked()) { async_write(*ssl_socket_.get(), buffer(data, size), - bind(&TcpSession::AsyncWriteHandler, - TcpSessionPtr(this), placeholders::error)); + bind(&TcpSession::AsyncWriteHandler, + TcpSessionPtr(this), placeholders::error)); } else { return (TcpSession::AsyncWrite(data, size)); } } void SslSession::SslHandShakeCallback(SslHandShakeCallbackHandler cb, - SslSessionPtr session, - const error_code &error) { + SslSessionPtr session, + const error_code &error) { session->ssl_handshake_in_progress_ = false; if (!error) { diff --git a/src/io/ssl_session.h b/src/io/ssl_session.h index d704a02c27c..99dcebb1a50 100644 --- a/src/io/ssl_session.h +++ b/src/io/ssl_session.h @@ -64,11 +64,8 @@ class SslSession : public TcpSession { // SslSession do actual ssl socket read for data in this context with // session mutex held, to avoid concurrent read and write operations // on same socket. - bool AsyncReadHandlerProcess(boost::asio::mutable_buffer buffer, - size_t *bytes_transferred, - boost::system::error_code &error); - - void AsyncReadSome(boost::asio::mutable_buffer buffer); + size_t ReadSome(boost::asio::mutable_buffer buffer, + boost::system::error_code &error); std::size_t WriteSome(const uint8_t *data, std::size_t len, boost::system::error_code &error); void AsyncWrite(const u_int8_t *data, std::size_t size); diff --git a/src/io/tcp_session.cc b/src/io/tcp_session.cc index 98e5627dc96..3410d69d4a8 100644 --- a/src/io/tcp_session.cc +++ b/src/io/tcp_session.cc @@ -142,12 +142,6 @@ void TcpSession::ReleaseBufferLocked(Buffer buffer) { assert(false); } -bool TcpSession::AsyncReadHandlerProcess(mutable_buffer buffer, - size_t *bytes_transferred, - error_code &error) { - return false; -} - void TcpSession::AsyncReadStartInternal(TcpSessionPtr session) { // Update socket read block time. if (stats_.read_block_start_time) { @@ -157,19 +151,13 @@ void TcpSession::AsyncReadStartInternal(TcpSessionPtr session) { stats_.read_blocked_duration_usecs += blocked_usecs; server_->stats_.read_blocked_duration_usecs += blocked_usecs; } - mutable_buffer buffer = AllocateBuffer(); - tbb::mutex::scoped_lock lock(mutex_); - if (!established_) { - ReleaseBufferLocked(buffer); - return; - } - AsyncReadSome(buffer); + AsyncReadSome(); } void TcpSession::AsyncReadStart() { if (io_strand_) { io_strand_->post(bind(&TcpSession::AsyncReadStartInternal, this, - TcpSessionPtr(this))); + TcpSessionPtr(this))); } } @@ -193,12 +181,12 @@ void TcpSession::DeferWriter() { placeholders::error, UTCTimestampUsec())); } -void TcpSession::AsyncReadSome(mutable_buffer buffer) { - socket()->async_read_some(mutable_buffers_1(buffer), - bind(&TcpSession::AsyncReadHandler, - TcpSessionPtr(this), buffer, - placeholders::error, - placeholders::bytes_transferred)); +void TcpSession::AsyncReadSome() { + tbb::mutex::scoped_lock lock(mutex_); + if (established_) { + socket()->async_read_some(null_buffers(), + bind(&TcpSession::AsyncReadHandler, TcpSessionPtr(this))); + } } size_t TcpSession::WriteSome(const uint8_t *data, size_t len, @@ -208,8 +196,8 @@ size_t TcpSession::WriteSome(const uint8_t *data, size_t len, void TcpSession::AsyncWrite(const u_int8_t *data, size_t size) { async_write(*socket(), buffer(data, size), - bind(&TcpSession::AsyncWriteHandler, TcpSessionPtr(this), - placeholders::error)); + bind(&TcpSession::AsyncWriteHandler, TcpSessionPtr(this), + placeholders::error)); } TcpSession::Endpoint TcpSession::local_endpoint() const { @@ -416,7 +404,8 @@ bool TcpSession::Send(const u_int8_t *data, size_t size, size_t *sent) { CloseInternal(error, true); return false; } - if (len < 0 || (size_t)len != size) ret = false; + if ((size_t) len != size) + ret = false; if (sent) *sent = (len > 0) ? len : 0; } else { AsyncWrite(data, size); @@ -434,9 +423,12 @@ Task* TcpSession::CreateReaderTask(mutable_buffer buffer, return (task); } -void TcpSession::AsyncReadHandler( - TcpSessionPtr session, mutable_buffer buffer, - const error_code &error, size_t bytes_transferred) { +size_t TcpSession::ReadSome(mutable_buffer buffer, error_code &error) { + return socket()->read_some(mutable_buffers_1(buffer), error); +} + +void TcpSession::AsyncReadHandler(TcpSessionPtr session) { + mutable_buffer buffer = session->AllocateBuffer(); tbb::mutex::scoped_lock lock(session->mutex_); if (session->closed_) { @@ -444,36 +436,22 @@ void TcpSession::AsyncReadHandler( return; } + error_code error; + size_t bytes_transferred = session->ReadSome(buffer, error); if (IsSocketErrorHard(error)) { session->ReleaseBufferLocked(buffer); - // eof is returned when the peer closed the socket, no need to log err - if (error != error::eof) { - TCP_SESSION_LOG_ERROR(session, TCP_DIR_IN, - "Read failed due to error " << error.value() - << " : " << error.message()); - } + // eof is returned when the peer closed the socket, no need to log err + if (error != error::eof) { + TCP_SESSION_LOG_ERROR(session, TCP_DIR_IN, + "Read failed due to error " << error.value() + << " : " << error.message()); + } + lock.release(); session->CloseInternal(error, true); return; } - error_code err; - if (session->AsyncReadHandlerProcess(buffer, &bytes_transferred, err)) { - // check error code if session needs to be closed - if (IsSocketErrorHard(err)) { - session->ReleaseBufferLocked(buffer); - // eof is returned when the peer has closed the socket - if (err != error::eof) { - TCP_SESSION_LOG_ERROR(session, TCP_DIR_IN, - "Read failed due to error " << err.value() - << " : " << err.message()); - } - lock.release(); - session->CloseInternal(err, true); - return; - } - } - // Update read statistics. session->stats_.read_calls++; session->stats_.read_bytes += bytes_transferred; diff --git a/src/io/tcp_session.h b/src/io/tcp_session.h index 8571345a78a..e3132db5cc7 100644 --- a/src/io/tcp_session.h +++ b/src/io/tcp_session.h @@ -156,19 +156,10 @@ class TcpSession { protected: typedef boost::intrusive_ptr TcpSessionPtr; - static void AsyncReadHandler(TcpSessionPtr session, - boost::asio::mutable_buffer buffer, - const boost::system::error_code &error, - size_t size); + static void AsyncReadHandler(TcpSessionPtr session); static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error); - // returns true if Processing done, used by SslSession to do actual - // synchronous read for data. - virtual bool AsyncReadHandlerProcess(boost::asio::mutable_buffer buffer, - size_t *bytes_transferred, - boost::system::error_code &error); - void AsyncReadStartInternal(TcpSessionPtr session); virtual Task* CreateReaderTask(boost::asio::mutable_buffer, size_t); @@ -179,7 +170,9 @@ class TcpSession { // Callback after socket is ready for write. virtual void WriteReady(const boost::system::error_code &error); - virtual void AsyncReadSome(boost::asio::mutable_buffer buffer); + void AsyncReadSome(); + virtual size_t ReadSome(boost::asio::mutable_buffer buffer, + boost::system::error_code &error); virtual std::size_t WriteSome(const uint8_t *data, std::size_t len, boost::system::error_code &error); virtual void AsyncWrite(const u_int8_t *data, std::size_t size);