Skip to content

Commit

Permalink
Merge "Cleanup TCP Code name space usages"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Aug 5, 2016
2 parents 64bc62a + c8b3333 commit 3c69097
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 134 deletions.
84 changes: 48 additions & 36 deletions src/io/ssl_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,26 @@
#include <boost/bind.hpp>

#include "ssl_session.h"
#include "io/io_utils.h"
#include "io/io_log.h"
#include "io/io_utils.h"

using boost::asio::async_write;
using boost::asio::buffer;
using boost::asio::mutable_buffer;
using boost::asio::null_buffers;
using boost::bind;
using boost::function;
using boost::system::error_code;
using std::size_t;
using std::srand;
using std::string;
using std::time;

using namespace boost::asio;

class SslSession::SslReader : public Task {
public:
typedef boost::function<void(Buffer)> ReadHandler;
typedef function<void(Buffer)> ReadHandler;

SslReader(int task_id, SslSessionPtr session, ReadHandler read_fn,
Buffer buffer)
Expand All @@ -31,7 +43,7 @@ class SslSession::SslReader : public Task {
}
return true;
}
std::string Description() const { return "SslSession::SslReader"; }
string Description() const { return "SslSession::SslReader"; }
private:
SslSessionPtr session_;
ReadHandler read_fn_;
Expand All @@ -56,12 +68,12 @@ SslSession::SslSession(SslServer *server, SslSocket *ssl_socket,
SslSession::~SslSession() {
}

Task* SslSession::CreateReaderTask(boost::asio::mutable_buffer buffer,
Task* SslSession::CreateReaderTask(mutable_buffer buffer,
size_t bytes_transferred) {

Buffer rdbuf(buffer_cast<const uint8_t *>(buffer), bytes_transferred);
SslReader *task = new SslReader(this->reader_task_id(),
SslSessionPtr(this), boost::bind(&SslSession::OnRead, this, _1), rdbuf);
SslSessionPtr(this), bind(&SslSession::OnRead, this, _1), rdbuf);
return (task);
}

Expand All @@ -71,64 +83,63 @@ TcpSession::Socket *SslSession::socket() const {
return &ssl_socket_->next_layer();
}

bool SslSession::AsyncReadHandlerProcess(boost::asio::mutable_buffer buffer,
size_t &bytes_transferred,
boost::system::error_code &error) {
bool SslSession::AsyncReadHandlerProcess(mutable_buffer buffer,
size_t *bytes_transferred,
error_code &error) {
// no processing needed if ssl handshake is not complete.
if (!IsSslHandShakeSuccessLocked()) {
return false;
}

// do ssl read here in IO context, ignore errors
bytes_transferred = ssl_socket_->read_some(mutable_buffers_1(buffer), error);
*bytes_transferred = ssl_socket_->read_some(mutable_buffers_1(buffer),
error);

return true;
}

void SslSession::AsyncReadSome(boost::asio::mutable_buffer buffer) {
void SslSession::AsyncReadSome(mutable_buffer buffer) {
if (IsSslHandShakeSuccessLocked()) {
// trigger read with null buffer to get indication for data available
// on the socket and then do the actuall socket read in
// AsyncReadHandlerProcess
socket()->async_read_some(boost::asio::null_buffers(),
boost::bind(&TcpSession::AsyncReadHandler, SslSessionPtr(this), buffer,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
socket()->async_read_some(null_buffers(),
bind(&TcpSession::AsyncReadHandler, SslSessionPtr(this), buffer,
placeholders::error, placeholders::bytes_transferred));
} else {
// No tcp socket read/write while ssl handshake is ongoing
if (!ssl_handshake_in_progress_) {
socket()->async_read_some(mutable_buffers_1(buffer),
boost::bind(&TcpSession::AsyncReadHandler, SslSessionPtr(this), buffer,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
bind(&TcpSession::AsyncReadHandler, SslSessionPtr(this),
buffer, placeholders::error,
placeholders::bytes_transferred));
}
}
}

std::size_t SslSession::WriteSome(const uint8_t *data, std::size_t len,
boost::system::error_code &error) {
size_t SslSession::WriteSome(const uint8_t *data, size_t len,
error_code &error) {

if (IsSslHandShakeSuccessLocked()) {
return ssl_socket_->write_some(boost::asio::buffer(data, len), error);
return ssl_socket_->write_some(buffer(data, len), error);
} else {
return (TcpSession::WriteSome(data, len, error));
}
}

void SslSession::AsyncWrite(const u_int8_t *data, std::size_t size) {
void SslSession::AsyncWrite(const u_int8_t *data, size_t size) {
if (IsSslHandShakeSuccessLocked()) {
boost::asio::async_write(
*ssl_socket_.get(), buffer(data, size),
boost::bind(&TcpSession::AsyncWriteHandler, TcpSessionPtr(this),
boost::asio::placeholders::error));
async_write(*ssl_socket_.get(), buffer(data, size),
bind(&TcpSession::AsyncWriteHandler,
TcpSessionPtr(this), placeholders::error));
} else {
return (TcpSession::AsyncWrite(data, size));
}
}

void SslSession::SslHandShakeCallback(SslHandShakeCallbackHandler cb,
SslSessionPtr session,
const boost::system::error_code &error) {
const error_code &error) {

session->ssl_handshake_in_progress_ = false;
if (!error) {
Expand All @@ -142,22 +153,23 @@ void SslSession::SslHandShakeCallback(SslHandShakeCallbackHandler cb,
}
}

void SslSession::TriggerSslHandShakeInternal(SslSessionPtr session, SslHandShakeCallbackHandler cb) {
std::srand(std::time(0));
boost::system::error_code ec;
void SslSession::TriggerSslHandShakeInternal(
SslSessionPtr session, SslHandShakeCallbackHandler cb) {
srand(time(0));
error_code ec;
session->ssl_handshake_in_progress_ = true;
if (session->IsServerSession()) {
session->ssl_socket_->async_handshake(boost::asio::ssl::stream_base::server,
boost::bind(&SslSession::SslHandShakeCallback, cb, session,
boost::asio::placeholders::error));
session->ssl_socket_->async_handshake(ssl::stream_base::server,
bind(&SslSession::SslHandShakeCallback, cb, session,
placeholders::error));
} else {
session->ssl_socket_->async_handshake(boost::asio::ssl::stream_base::client,
boost::bind(&SslSession::SslHandShakeCallback, cb, session,
boost::asio::placeholders::error));
session->ssl_socket_->async_handshake(ssl::stream_base::client,
bind(&SslSession::SslHandShakeCallback, cb, session,
placeholders::error));
}
}

void SslSession::TriggerSslHandShake(SslHandShakeCallbackHandler cb) {
server()->event_manager()->io_service()->post(
boost::bind(&TriggerSslHandShakeInternal, SslSessionPtr(this), cb));
bind(&TriggerSslHandShakeInternal, SslSessionPtr(this), cb));
}
2 changes: 1 addition & 1 deletion src/io/ssl_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class SslSession : public TcpSession {
// session mutex held, to avoid concurrent read and write operations
// on same socket.
bool AsyncReadHandlerProcess(boost::asio::mutable_buffer buffer,
size_t &bytes_transferred,
size_t *bytes_transferred,
boost::system::error_code &error);

void AsyncReadSome(boost::asio::mutable_buffer buffer);
Expand Down
37 changes: 21 additions & 16 deletions src/io/tcp_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
#include "io/io_log.h"
#include "io/io_utils.h"

using boost::asio::placeholders::error;
using boost::asio::socket_base;
using boost::bind;
using boost::system::error_code;

using namespace boost::asio::ip;
using namespace std;

Expand Down Expand Up @@ -54,15 +59,15 @@ bool TcpServer::Initialize(unsigned short port) {
}

tcp::endpoint localaddr(tcp::v4(), port);
boost::system::error_code ec;
error_code ec;
acceptor_->open(localaddr.protocol(), ec);
if (ec) {
TCP_SERVER_LOG_ERROR(this, TCP_DIR_NA, "TCP open: " << ec.message());
ResetAcceptor();
return false;
}

acceptor_->set_option(boost::asio::socket_base::reuse_address(true), ec);
acceptor_->set_option(socket_base::reuse_address(true), ec);
if (ec) {
TCP_SERVER_LOG_ERROR(this, TCP_DIR_NA, "TCP reuse_address: "
<< ec.message());
Expand Down Expand Up @@ -91,7 +96,7 @@ bool TcpServer::Initialize(unsigned short port) {
//
SetName(local_endpoint);

acceptor_->listen(boost::asio::socket_base::max_connections, ec);
acceptor_->listen(socket_base::max_connections, ec);
if (ec) {
TCP_SERVER_LOG_ERROR(this, TCP_DIR_NA, "TCP listen(" << port << "): "
<< ec.message());
Expand All @@ -107,7 +112,7 @@ bool TcpServer::Initialize(unsigned short port) {

void TcpServer::Shutdown() {
tbb::mutex::scoped_lock lock(mutex_);
boost::system::error_code ec;
error_code ec;

if (acceptor_) {
acceptor_->close(ec);
Expand Down Expand Up @@ -215,16 +220,16 @@ void TcpServer::AsyncAccept() {
}
set_accept_socket();
acceptor_->async_accept(*accept_socket(),
boost::bind(&TcpServer::AcceptHandlerInternal, this,
TcpServerPtr(this), boost::asio::placeholders::error));
bind(&TcpServer::AcceptHandlerInternal, this,
TcpServerPtr(this), error));
}

int TcpServer::GetPort() const {
tbb::mutex::scoped_lock lock(mutex_);
if (acceptor_.get() == NULL) {
return -1;
}
boost::system::error_code ec;
error_code ec;
tcp::endpoint ep = acceptor_->local_endpoint(ec);
if (ec) {
return -1;
Expand All @@ -239,7 +244,7 @@ bool TcpServer::HasSessions() const {

bool TcpServer::HasSessionReadAvailable() const {
tbb::mutex::scoped_lock lock(mutex_);
boost::system::error_code error;
error_code error;
if (accept_socket()->available(error) > 0) {
return true;
}
Expand All @@ -258,7 +263,7 @@ TcpServer::Endpoint TcpServer::LocalEndpoint() const {
if (acceptor_.get() == NULL) {
return Endpoint();
}
boost::system::error_code ec;
error_code ec;
Endpoint local = acceptor_->local_endpoint(ec);
if (ec) {
return Endpoint();
Expand Down Expand Up @@ -302,9 +307,9 @@ bool TcpServer::AcceptSession(TcpSession *session) {
// via AsyncAccept() in order to process future accept calls
//
void TcpServer::AcceptHandlerInternal(TcpServerPtr server,
const boost::system::error_code& error) {
const error_code& error) {
tcp::endpoint remote;
boost::system::error_code ec;
error_code ec;
TcpSessionPtr session;
bool need_close = false;

Expand Down Expand Up @@ -368,7 +373,7 @@ void TcpServer::AcceptHandlerComplete(TcpSessionPtr &session) {
"Rejected session from "
<< remote.address().to_string()
<< ":" << remote.port());
boost::system::error_code ec;
error_code ec;
session->CloseInternal(ec, false, false);
return;
}
Expand All @@ -387,7 +392,7 @@ TcpSession *TcpServer::GetSession(Endpoint remote) {
}

void TcpServer::ConnectHandler(TcpServerPtr server, TcpSessionPtr session,
const boost::system::error_code &error) {
const error_code &error) {
if (error) {
TCP_SERVER_LOG_UT_DEBUG(server, TCP_DIR_OUT,
"Connect failure: " << error.message());
Expand All @@ -399,7 +404,7 @@ void TcpServer::ConnectHandler(TcpServerPtr server, TcpSessionPtr session,
}

void TcpServer::ConnectHandlerComplete(TcpSessionPtr &session) {
boost::system::error_code ec;
error_code ec;
Endpoint remote = session->socket()->remote_endpoint(ec);
if (ec) {
TCP_SERVER_LOG_INFO(this, TCP_DIR_OUT,
Expand All @@ -425,8 +430,8 @@ void TcpServer::Connect(TcpSession *session, Endpoint remote) {
assert(session->refcount_);
Socket *socket = session->socket();
socket->async_connect(remote,
boost::bind(&TcpServer::ConnectHandler, this, TcpServerPtr(this),
TcpSessionPtr(session), boost::asio::placeholders::error));
bind(&TcpServer::ConnectHandler, this, TcpServerPtr(this),
TcpSessionPtr(session), error));
}

int TcpServer::SetMd5SocketOption(int fd, uint32_t peer_ip,
Expand Down

0 comments on commit 3c69097

Please sign in to comment.