Skip to content

Commit

Permalink
Support for SSL server/session infrastructure
Browse files Browse the repository at this point in the history
Following changes are done to extend existing tcp infra to
support SSL server.
- code movement to make TCP message writer to be independent
  of socket, and all operations to be trigered on session ptr
- introduced virtual methods to override socket operations to
  use relavant socket structure.
- hooks installed in SSL server to trigger SSL handshake before
  triggering tcp session connected and accepted state machine

Added basic connect, send and recv test code for ssl server infra.

Change-Id: I8e50a400e6b80cef42e852f5da2038f44ce4b082
  • Loading branch information
Prabhjot Singh Sethi committed Jan 27, 2015
1 parent 34c2af6 commit df4fc0a
Show file tree
Hide file tree
Showing 13 changed files with 727 additions and 127 deletions.
2 changes: 2 additions & 0 deletions src/io/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ libio = env.Library('io',
EventManagerSrc +
[
'io_utils.cc',
'ssl_server.cc',
'ssl_session.cc',
'tcp_message_write.cc',
'tcp_server.cc',
'tcp_session.cc',
Expand Down
54 changes: 54 additions & 0 deletions src/io/ssl_server.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
*/

#include "ssl_server.h"
#include "ssl_session.h"

#include "io/event_manager.h"

SslServer::SslServer(EventManager *evm, boost::asio::ssl::context::method m)
: TcpServer(evm), context_(*evm->io_service(), m) {
boost::system::error_code ec;
// By default set verify mode to none, to be set by derived class later.
context_.set_verify_mode(boost::asio::ssl::context::verify_none, ec);
assert(ec.value() == 0);
context_.set_options(boost::asio::ssl::context::default_workarounds, ec);
assert(ec.value() == 0);
}

SslServer::~SslServer() {
}

boost::asio::ssl::context *SslServer::context() {
return &context_;
}

TcpSession *SslServer::AllocSession(bool server_session) {
SslSession *session;
if (server_session) {
session = AllocSession(so_ssl_accept_.get());

// if session allocate succeeds release ownership to so_accept.
if (session != NULL) {
so_ssl_accept_.release();
}
} else {
SslSocket *socket = new SslSocket(*event_manager()->io_service(),
context_);
session = AllocSession(socket);
}

return session;
}

TcpServer::Socket *SslServer::accept_socket() const {
// return tcp socket
return &(so_ssl_accept_->next_layer());
}

void SslServer::set_accept_socket() {
so_ssl_accept_.reset(new SslSocket(*event_manager()->io_service(),
context_));
}

43 changes: 43 additions & 0 deletions src/io/ssl_server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
*/

#ifndef __src_io_ssl_server_h__
#define __src_io_ssl_server_h__

#include <boost/asio/ssl.hpp>

#include "io/tcp_server.h"

class SslSession;

class SslServer : public TcpServer {
public:
typedef boost::asio::ssl::stream<boost::asio::ip::tcp::socket> SslSocket;

explicit SslServer(EventManager *evm, boost::asio::ssl::context::method m);
virtual ~SslServer();

protected:
// given SSL socket, Create a session object.
virtual SslSession *AllocSession(SslSocket *socket) = 0;

// boost ssl context accessor to setup ssl context variables.
boost::asio::ssl::context *context();

private:
// suppress AllocSession method using tcp socket, not valid for
// ssl server.
TcpSession *AllocSession(Socket *socket) { return NULL; }

TcpSession *AllocSession(bool server_session);

Socket *accept_socket() const;
void set_accept_socket();

boost::asio::ssl::context context_;
std::auto_ptr<SslSocket> so_ssl_accept_; // SSL socket used in async_accept
DISALLOW_COPY_AND_ASSIGN(SslServer);
};

#endif //__src_io_ssl_server_h__
95 changes: 95 additions & 0 deletions src/io/ssl_session.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
*/

#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include "ssl_session.h"

using namespace boost::asio;

SslSession::SslSession(SslServer *server, SslSocket *socket,
bool async_read_ready) :
TcpSession(server, NULL, async_read_ready),
ssl_socket_(socket) {
}

SslSession::~SslSession() {
}

TcpSession::Socket *SslSession::socket() const {
// return tcp socket
return &ssl_socket_->next_layer();
}

bool SslSession::Connected(Endpoint remote) {
if (IsClosed()) {
return false;
}

// trigger ssl client handshake
std::srand(std::time(0));
ssl_socket_->async_handshake
(boost::asio::ssl::stream_base::client,
boost::bind(&SslSession::ConnectHandShakeHandler, TcpSessionPtr(this),
remote, boost::asio::placeholders::error));
return true;
}

void SslSession::Accepted() {
// trigger ssl server handshake
std::srand(std::time(0));
ssl_socket_->async_handshake
(boost::asio::ssl::stream_base::server,
boost::bind(&SslSession::AcceptHandShakeHandler, TcpSessionPtr(this),
boost::asio::placeholders::error));
}

void SslSession::AcceptHandShakeHandler(TcpSessionPtr session,
const boost::system::error_code& error) {
SslSession *ssl_session = static_cast<SslSession *>(session.get());
if (!error) {
// on successful handshake continue with tcp session state machine.
ssl_session->TcpSession::Accepted();
} else {
// close session on failure
ssl_session->CloseInternal(false);
}
}

void SslSession::ConnectHandShakeHandler(TcpSessionPtr session, Endpoint remote,
const boost::system::error_code& error) {
SslSession *ssl_session = static_cast<SslSession *>(session.get());
bool ret = false;
if (!error) {
// on successful handshake continue with tcp session state machine.
ret = ssl_session->TcpSession::Connected(remote);
}
if (ret == false) {
// report connect failure and close the session
ssl_session->ConnectFailed();
ssl_session->CloseInternal(false);
}
}


void SslSession::AsyncReadSome(boost::asio::mutable_buffer buffer) {
ssl_socket_->async_read_some(mutable_buffers_1(buffer),
boost::bind(&TcpSession::AsyncReadHandler, TcpSessionPtr(this), buffer,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}

std::size_t SslSession::WriteSome(const uint8_t *data, std::size_t len,
boost::system::error_code &error) {
return ssl_socket_->write_some(boost::asio::buffer(data, len), error);
}

void SslSession::AsyncWrite(const u_int8_t *data, std::size_t size) {
boost::asio::async_write(
*ssl_socket_.get(), buffer(data, size),
boost::bind(&TcpSession::AsyncWriteHandler, TcpSessionPtr(this),
boost::asio::placeholders::error));
}

46 changes: 46 additions & 0 deletions src/io/ssl_session.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
*/

#ifndef __src_io_ssl_session_h__
#define __src_io_ssl_session_h__

#include "io/tcp_session.h"
#include "io/ssl_server.h"

class SslSession : public TcpSession {
public:
typedef boost::asio::ssl::stream<boost::asio::ip::tcp::socket> SslSocket;

// SslSession constructor takes ownership of socket.
SslSession(SslServer *server, SslSocket *socket,
bool async_read_ready = true);

virtual Socket *socket() const;

// Override to trigger handshake
virtual bool Connected(Endpoint remote);

// Override to trigger handshake
virtual void Accepted();


protected:
virtual ~SslSession();

private:
static void AcceptHandShakeHandler(TcpSessionPtr session,
const boost::system::error_code& error);
static void ConnectHandShakeHandler(TcpSessionPtr session, Endpoint remote,
const boost::system::error_code& error);

void AsyncReadSome(boost::asio::mutable_buffer buffer);
std::size_t WriteSome(const uint8_t *data, std::size_t len,
boost::system::error_code &error);
void AsyncWrite(const u_int8_t *data, std::size_t size);

boost::scoped_ptr<SslSocket> ssl_socket_;
DISALLOW_COPY_AND_ASSIGN(SslSession);
};

#endif // __src_io_ssl_session_h__
66 changes: 9 additions & 57 deletions src/io/tcp_message_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ using namespace boost::asio;
using namespace boost::system;
using tbb::mutex;

TcpMessageWriter::TcpMessageWriter(Socket *socket, TcpSession *session) :
socket_(socket), offset_(0), session_(session) {
TcpMessageWriter::TcpMessageWriter(TcpSession *session) :
offset_(0), session_(session) {
}

TcpMessageWriter::~TcpMessageWriter() {
Expand All @@ -36,7 +36,7 @@ int TcpMessageWriter::Send(const uint8_t *data, size_t len, error_code &ec) {
session_->server_->stats_.write_bytes += len;

if (buffer_queue_.empty()) {
wrote = socket_->write_some(boost::asio::buffer(data, len), ec);
wrote = session_->WriteSome(data, len, ec);
if (TcpSession::IsSocketErrorHard(ec)) return -1;
assert(wrote >= 0);

Expand All @@ -45,7 +45,7 @@ int TcpMessageWriter::Send(const uint8_t *data, size_t len, error_code &ec) {
"Encountered partial send of " << wrote << " bytes when "
"sending " << len << " bytes, Error: " << ec);
BufferAppend(data + wrote, len - wrote);
DeferWrite();
session_->DeferWriter();
}
} else {
TCP_SESSION_LOG_UT_DEBUG(session_, TCP_DIR_OUT,
Expand All @@ -55,55 +55,20 @@ int TcpMessageWriter::Send(const uint8_t *data, size_t len, error_code &ec) {
return wrote;
}

void TcpMessageWriter::DeferWrite() {

// Update socket write block count.
session_->stats_.write_blocked++;
session_->server_->stats_.write_blocked++;
socket_->async_write_some(
boost::asio::null_buffers(),
boost::bind(&TcpMessageWriter::HandleWriteReady, this,
TcpSessionPtr(session_),
placeholders::error, UTCTimestampUsec()));
return;
}

// Socket is ready for write. Flush any pending data and notify
// clients aboout it.
void TcpMessageWriter::HandleWriteReady(TcpSessionPtr session_ptr,
const error_code &error,
uint64_t block_start_time) {
mutex::scoped_lock lock(session_->mutex());

// Update socket write block time.
uint64_t blocked_usecs = UTCTimestampUsec() - block_start_time;
session_->stats_.write_blocked_duration_usecs += blocked_usecs;
session_->server_->stats_.write_blocked_duration_usecs += blocked_usecs;

if (TcpSession::IsSocketErrorHard(error)) {
goto done;
}

//
// Ignore if connection is already closed.
//
if (session_->IsClosedLocked()) return;

// Socket is ready for write. Flush any pending data
void TcpMessageWriter::HandleWriteReady(error_code &error) {
while (!buffer_queue_.empty()) {
boost::asio::mutable_buffer head = buffer_queue_.front();
const uint8_t *data = buffer_cast<const uint8_t *>(head) + offset_;
int remaining = buffer_size(head) - offset_;
error_code ec;
int wrote = socket_->write_some(buffer(data, remaining), ec);
if (TcpSession::IsSocketErrorHard(ec)) {
lock.release();
if (!cb_.empty()) cb_(ec);
int wrote = session_->WriteSome(data, remaining, error);
if (TcpSession::IsSocketErrorHard(error)) {
return;
}
assert(wrote >= 0);
if (wrote != remaining) {
offset_ += wrote;
DeferWrite();
session_->DeferWriter();
return;
} else {
offset_ = 0;
Expand All @@ -112,16 +77,6 @@ void TcpMessageWriter::HandleWriteReady(TcpSessionPtr session_ptr,
}
}
buffer_queue_.clear();

done:
lock.release();
// The session object is implicitly accessed in by cb_. This is
// safe because this function currently holds a refcount on the session
// via TcpSessionPtr.
if (!cb_.empty()) {
cb_(error);
}
return;
}

void TcpMessageWriter::BufferAppend(const uint8_t *src, int bytes) {
Expand All @@ -137,6 +92,3 @@ void TcpMessageWriter::DeleteBuffer(mutable_buffer buffer) {
return;
}

void TcpMessageWriter::RegisterNotification(SendReadyCb cb) {
cb_ = cb;
}
13 changes: 3 additions & 10 deletions src/io/tcp_message_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,22 @@ class TcpSession;

class TcpMessageWriter {
public:
typedef boost::asio::ip::tcp::socket Socket;
static const int kDefaultBufferSize = 4 * 1024;
explicit TcpMessageWriter(Socket *, TcpSession *session);
explicit TcpMessageWriter(TcpSession *session);
~TcpMessageWriter();

// return false for send
int Send(const uint8_t *msg, size_t len, error_code &ec);

typedef boost::function<void(const error_code &ec)> SendReadyCb;
void RegisterNotification(SendReadyCb);

private:
friend class TcpSession;
typedef boost::intrusive_ptr<TcpSession> TcpSessionPtr;
typedef std::list<boost::asio::mutable_buffer> BufferQueue;
void BufferAppend(const uint8_t *data, int len);
void DeleteBuffer(boost::asio::mutable_buffer buffer);
void DeferWrite();
void HandleWriteReady(TcpSessionPtr session_ref, const error_code &ec,
uint64_t block_start_time);
void HandleWriteReady(boost::system::error_code &ec);

BufferQueue buffer_queue_;
SendReadyCb cb_;
Socket *socket_;
int offset_;
TcpSession *session_;
};
Expand Down

0 comments on commit df4fc0a

Please sign in to comment.