diff --git a/src/io/ssl_session.cc b/src/io/ssl_session.cc index 0ac7f822619..2833764b920 100644 --- a/src/io/ssl_session.cc +++ b/src/io/ssl_session.cc @@ -83,42 +83,18 @@ TcpSession::Socket *SslSession::socket() const { return &ssl_socket_->next_layer(); } -bool SslSession::AsyncReadHandlerProcess(mutable_buffer buffer, - size_t *bytes_transferred, - error_code &error) { - // no processing needed if ssl handshake is not complete. - if (!IsSslHandShakeSuccessLocked()) { - return false; - } +size_t SslSession::ReadSome(mutable_buffer buffer, error_code &error) { + // Read data from the tcp socket or from the ssl socket, as appropriate. + assert(!ssl_handshake_in_progress_); + if (!IsSslHandShakeSuccessLocked()) + return TcpSession::ReadSome(buffer, error); // do ssl read here in IO context, ignore errors - *bytes_transferred = ssl_socket_->read_some(mutable_buffers_1(buffer), - error); - - return true; -} - -void SslSession::AsyncReadSome(mutable_buffer buffer) { - if (IsSslHandShakeSuccessLocked()) { - // trigger read with null buffer to get indication for data available - // on the socket and then do the actuall socket read in - // AsyncReadHandlerProcess - socket()->async_read_some(null_buffers(), - bind(&TcpSession::AsyncReadHandler, SslSessionPtr(this), buffer, - placeholders::error, placeholders::bytes_transferred)); - } else { - // No tcp socket read/write while ssl handshake is ongoing - if (!ssl_handshake_in_progress_) { - socket()->async_read_some(mutable_buffers_1(buffer), - bind(&TcpSession::AsyncReadHandler, SslSessionPtr(this), - buffer, placeholders::error, - placeholders::bytes_transferred)); - } - } + return ssl_socket_->read_some(mutable_buffers_1(buffer), error); } size_t SslSession::WriteSome(const uint8_t *data, size_t len, - error_code &error) { + error_code &error) { if (IsSslHandShakeSuccessLocked()) { return ssl_socket_->write_some(buffer(data, len), error); @@ -130,16 +106,16 @@ size_t SslSession::WriteSome(const uint8_t *data, size_t len, void SslSession::AsyncWrite(const u_int8_t *data, size_t size) { if (IsSslHandShakeSuccessLocked()) { async_write(*ssl_socket_.get(), buffer(data, size), - bind(&TcpSession::AsyncWriteHandler, - TcpSessionPtr(this), placeholders::error)); + bind(&TcpSession::AsyncWriteHandler, + TcpSessionPtr(this), placeholders::error)); } else { return (TcpSession::AsyncWrite(data, size)); } } void SslSession::SslHandShakeCallback(SslHandShakeCallbackHandler cb, - SslSessionPtr session, - const error_code &error) { + SslSessionPtr session, + const error_code &error) { session->ssl_handshake_in_progress_ = false; if (!error) { diff --git a/src/io/ssl_session.h b/src/io/ssl_session.h index d704a02c27c..99dcebb1a50 100644 --- a/src/io/ssl_session.h +++ b/src/io/ssl_session.h @@ -64,11 +64,8 @@ class SslSession : public TcpSession { // SslSession do actual ssl socket read for data in this context with // session mutex held, to avoid concurrent read and write operations // on same socket. - bool AsyncReadHandlerProcess(boost::asio::mutable_buffer buffer, - size_t *bytes_transferred, - boost::system::error_code &error); - - void AsyncReadSome(boost::asio::mutable_buffer buffer); + size_t ReadSome(boost::asio::mutable_buffer buffer, + boost::system::error_code &error); std::size_t WriteSome(const uint8_t *data, std::size_t len, boost::system::error_code &error); void AsyncWrite(const u_int8_t *data, std::size_t size); diff --git a/src/io/tcp_session.cc b/src/io/tcp_session.cc index 98e5627dc96..3410d69d4a8 100644 --- a/src/io/tcp_session.cc +++ b/src/io/tcp_session.cc @@ -142,12 +142,6 @@ void TcpSession::ReleaseBufferLocked(Buffer buffer) { assert(false); } -bool TcpSession::AsyncReadHandlerProcess(mutable_buffer buffer, - size_t *bytes_transferred, - error_code &error) { - return false; -} - void TcpSession::AsyncReadStartInternal(TcpSessionPtr session) { // Update socket read block time. if (stats_.read_block_start_time) { @@ -157,19 +151,13 @@ void TcpSession::AsyncReadStartInternal(TcpSessionPtr session) { stats_.read_blocked_duration_usecs += blocked_usecs; server_->stats_.read_blocked_duration_usecs += blocked_usecs; } - mutable_buffer buffer = AllocateBuffer(); - tbb::mutex::scoped_lock lock(mutex_); - if (!established_) { - ReleaseBufferLocked(buffer); - return; - } - AsyncReadSome(buffer); + AsyncReadSome(); } void TcpSession::AsyncReadStart() { if (io_strand_) { io_strand_->post(bind(&TcpSession::AsyncReadStartInternal, this, - TcpSessionPtr(this))); + TcpSessionPtr(this))); } } @@ -193,12 +181,12 @@ void TcpSession::DeferWriter() { placeholders::error, UTCTimestampUsec())); } -void TcpSession::AsyncReadSome(mutable_buffer buffer) { - socket()->async_read_some(mutable_buffers_1(buffer), - bind(&TcpSession::AsyncReadHandler, - TcpSessionPtr(this), buffer, - placeholders::error, - placeholders::bytes_transferred)); +void TcpSession::AsyncReadSome() { + tbb::mutex::scoped_lock lock(mutex_); + if (established_) { + socket()->async_read_some(null_buffers(), + bind(&TcpSession::AsyncReadHandler, TcpSessionPtr(this))); + } } size_t TcpSession::WriteSome(const uint8_t *data, size_t len, @@ -208,8 +196,8 @@ size_t TcpSession::WriteSome(const uint8_t *data, size_t len, void TcpSession::AsyncWrite(const u_int8_t *data, size_t size) { async_write(*socket(), buffer(data, size), - bind(&TcpSession::AsyncWriteHandler, TcpSessionPtr(this), - placeholders::error)); + bind(&TcpSession::AsyncWriteHandler, TcpSessionPtr(this), + placeholders::error)); } TcpSession::Endpoint TcpSession::local_endpoint() const { @@ -416,7 +404,8 @@ bool TcpSession::Send(const u_int8_t *data, size_t size, size_t *sent) { CloseInternal(error, true); return false; } - if (len < 0 || (size_t)len != size) ret = false; + if ((size_t) len != size) + ret = false; if (sent) *sent = (len > 0) ? len : 0; } else { AsyncWrite(data, size); @@ -434,9 +423,12 @@ Task* TcpSession::CreateReaderTask(mutable_buffer buffer, return (task); } -void TcpSession::AsyncReadHandler( - TcpSessionPtr session, mutable_buffer buffer, - const error_code &error, size_t bytes_transferred) { +size_t TcpSession::ReadSome(mutable_buffer buffer, error_code &error) { + return socket()->read_some(mutable_buffers_1(buffer), error); +} + +void TcpSession::AsyncReadHandler(TcpSessionPtr session) { + mutable_buffer buffer = session->AllocateBuffer(); tbb::mutex::scoped_lock lock(session->mutex_); if (session->closed_) { @@ -444,36 +436,22 @@ void TcpSession::AsyncReadHandler( return; } + error_code error; + size_t bytes_transferred = session->ReadSome(buffer, error); if (IsSocketErrorHard(error)) { session->ReleaseBufferLocked(buffer); - // eof is returned when the peer closed the socket, no need to log err - if (error != error::eof) { - TCP_SESSION_LOG_ERROR(session, TCP_DIR_IN, - "Read failed due to error " << error.value() - << " : " << error.message()); - } + // eof is returned when the peer closed the socket, no need to log err + if (error != error::eof) { + TCP_SESSION_LOG_ERROR(session, TCP_DIR_IN, + "Read failed due to error " << error.value() + << " : " << error.message()); + } + lock.release(); session->CloseInternal(error, true); return; } - error_code err; - if (session->AsyncReadHandlerProcess(buffer, &bytes_transferred, err)) { - // check error code if session needs to be closed - if (IsSocketErrorHard(err)) { - session->ReleaseBufferLocked(buffer); - // eof is returned when the peer has closed the socket - if (err != error::eof) { - TCP_SESSION_LOG_ERROR(session, TCP_DIR_IN, - "Read failed due to error " << err.value() - << " : " << err.message()); - } - lock.release(); - session->CloseInternal(err, true); - return; - } - } - // Update read statistics. session->stats_.read_calls++; session->stats_.read_bytes += bytes_transferred; diff --git a/src/io/tcp_session.h b/src/io/tcp_session.h index 8571345a78a..e3132db5cc7 100644 --- a/src/io/tcp_session.h +++ b/src/io/tcp_session.h @@ -156,19 +156,10 @@ class TcpSession { protected: typedef boost::intrusive_ptr TcpSessionPtr; - static void AsyncReadHandler(TcpSessionPtr session, - boost::asio::mutable_buffer buffer, - const boost::system::error_code &error, - size_t size); + static void AsyncReadHandler(TcpSessionPtr session); static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error); - // returns true if Processing done, used by SslSession to do actual - // synchronous read for data. - virtual bool AsyncReadHandlerProcess(boost::asio::mutable_buffer buffer, - size_t *bytes_transferred, - boost::system::error_code &error); - void AsyncReadStartInternal(TcpSessionPtr session); virtual Task* CreateReaderTask(boost::asio::mutable_buffer, size_t); @@ -179,7 +170,9 @@ class TcpSession { // Callback after socket is ready for write. virtual void WriteReady(const boost::system::error_code &error); - virtual void AsyncReadSome(boost::asio::mutable_buffer buffer); + void AsyncReadSome(); + virtual size_t ReadSome(boost::asio::mutable_buffer buffer, + boost::system::error_code &error); virtual std::size_t WriteSome(const uint8_t *data, std::size_t len, boost::system::error_code &error); virtual void AsyncWrite(const u_int8_t *data, std::size_t size);