Skip to content

Commit

Permalink
Merge "Support for SSL server/session infrastructure"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Feb 5, 2015
2 parents fa61828 + df4fc0a commit 2ba4956
Show file tree
Hide file tree
Showing 13 changed files with 727 additions and 127 deletions.
2 changes: 2 additions & 0 deletions src/io/SConscript
Expand Up @@ -23,6 +23,8 @@ libio = env.Library('io',
EventManagerSrc +
[
'io_utils.cc',
'ssl_server.cc',
'ssl_session.cc',
'tcp_message_write.cc',
'tcp_server.cc',
'tcp_session.cc',
Expand Down
54 changes: 54 additions & 0 deletions src/io/ssl_server.cc
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
*/

#include "ssl_server.h"
#include "ssl_session.h"

#include "io/event_manager.h"

SslServer::SslServer(EventManager *evm, boost::asio::ssl::context::method m)
: TcpServer(evm), context_(*evm->io_service(), m) {
boost::system::error_code ec;
// By default set verify mode to none, to be set by derived class later.
context_.set_verify_mode(boost::asio::ssl::context::verify_none, ec);
assert(ec.value() == 0);
context_.set_options(boost::asio::ssl::context::default_workarounds, ec);
assert(ec.value() == 0);
}

SslServer::~SslServer() {
}

boost::asio::ssl::context *SslServer::context() {
return &context_;
}

TcpSession *SslServer::AllocSession(bool server_session) {
SslSession *session;
if (server_session) {
session = AllocSession(so_ssl_accept_.get());

// if session allocate succeeds release ownership to so_accept.
if (session != NULL) {
so_ssl_accept_.release();
}
} else {
SslSocket *socket = new SslSocket(*event_manager()->io_service(),
context_);
session = AllocSession(socket);
}

return session;
}

TcpServer::Socket *SslServer::accept_socket() const {
// return tcp socket
return &(so_ssl_accept_->next_layer());
}

void SslServer::set_accept_socket() {
so_ssl_accept_.reset(new SslSocket(*event_manager()->io_service(),
context_));
}

43 changes: 43 additions & 0 deletions src/io/ssl_server.h
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
*/

#ifndef __src_io_ssl_server_h__
#define __src_io_ssl_server_h__

#include <boost/asio/ssl.hpp>

#include "io/tcp_server.h"

class SslSession;

class SslServer : public TcpServer {
public:
typedef boost::asio::ssl::stream<boost::asio::ip::tcp::socket> SslSocket;

explicit SslServer(EventManager *evm, boost::asio::ssl::context::method m);
virtual ~SslServer();

protected:
// given SSL socket, Create a session object.
virtual SslSession *AllocSession(SslSocket *socket) = 0;

// boost ssl context accessor to setup ssl context variables.
boost::asio::ssl::context *context();

private:
// suppress AllocSession method using tcp socket, not valid for
// ssl server.
TcpSession *AllocSession(Socket *socket) { return NULL; }

TcpSession *AllocSession(bool server_session);

Socket *accept_socket() const;
void set_accept_socket();

boost::asio::ssl::context context_;
std::auto_ptr<SslSocket> so_ssl_accept_; // SSL socket used in async_accept
DISALLOW_COPY_AND_ASSIGN(SslServer);
};

#endif //__src_io_ssl_server_h__
95 changes: 95 additions & 0 deletions src/io/ssl_session.cc
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
*/

#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include "ssl_session.h"

using namespace boost::asio;

SslSession::SslSession(SslServer *server, SslSocket *socket,
bool async_read_ready) :
TcpSession(server, NULL, async_read_ready),
ssl_socket_(socket) {
}

SslSession::~SslSession() {
}

TcpSession::Socket *SslSession::socket() const {
// return tcp socket
return &ssl_socket_->next_layer();
}

bool SslSession::Connected(Endpoint remote) {
if (IsClosed()) {
return false;
}

// trigger ssl client handshake
std::srand(std::time(0));
ssl_socket_->async_handshake
(boost::asio::ssl::stream_base::client,
boost::bind(&SslSession::ConnectHandShakeHandler, TcpSessionPtr(this),
remote, boost::asio::placeholders::error));
return true;
}

void SslSession::Accepted() {
// trigger ssl server handshake
std::srand(std::time(0));
ssl_socket_->async_handshake
(boost::asio::ssl::stream_base::server,
boost::bind(&SslSession::AcceptHandShakeHandler, TcpSessionPtr(this),
boost::asio::placeholders::error));
}

void SslSession::AcceptHandShakeHandler(TcpSessionPtr session,
const boost::system::error_code& error) {
SslSession *ssl_session = static_cast<SslSession *>(session.get());
if (!error) {
// on successful handshake continue with tcp session state machine.
ssl_session->TcpSession::Accepted();
} else {
// close session on failure
ssl_session->CloseInternal(false);
}
}

void SslSession::ConnectHandShakeHandler(TcpSessionPtr session, Endpoint remote,
const boost::system::error_code& error) {
SslSession *ssl_session = static_cast<SslSession *>(session.get());
bool ret = false;
if (!error) {
// on successful handshake continue with tcp session state machine.
ret = ssl_session->TcpSession::Connected(remote);
}
if (ret == false) {
// report connect failure and close the session
ssl_session->ConnectFailed();
ssl_session->CloseInternal(false);
}
}


void SslSession::AsyncReadSome(boost::asio::mutable_buffer buffer) {
ssl_socket_->async_read_some(mutable_buffers_1(buffer),
boost::bind(&TcpSession::AsyncReadHandler, TcpSessionPtr(this), buffer,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}

std::size_t SslSession::WriteSome(const uint8_t *data, std::size_t len,
boost::system::error_code &error) {
return ssl_socket_->write_some(boost::asio::buffer(data, len), error);
}

void SslSession::AsyncWrite(const u_int8_t *data, std::size_t size) {
boost::asio::async_write(
*ssl_socket_.get(), buffer(data, size),
boost::bind(&TcpSession::AsyncWriteHandler, TcpSessionPtr(this),
boost::asio::placeholders::error));
}

46 changes: 46 additions & 0 deletions src/io/ssl_session.h
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
*/

#ifndef __src_io_ssl_session_h__
#define __src_io_ssl_session_h__

#include "io/tcp_session.h"
#include "io/ssl_server.h"

class SslSession : public TcpSession {
public:
typedef boost::asio::ssl::stream<boost::asio::ip::tcp::socket> SslSocket;

// SslSession constructor takes ownership of socket.
SslSession(SslServer *server, SslSocket *socket,
bool async_read_ready = true);

virtual Socket *socket() const;

// Override to trigger handshake
virtual bool Connected(Endpoint remote);

// Override to trigger handshake
virtual void Accepted();


protected:
virtual ~SslSession();

private:
static void AcceptHandShakeHandler(TcpSessionPtr session,
const boost::system::error_code& error);
static void ConnectHandShakeHandler(TcpSessionPtr session, Endpoint remote,
const boost::system::error_code& error);

void AsyncReadSome(boost::asio::mutable_buffer buffer);
std::size_t WriteSome(const uint8_t *data, std::size_t len,
boost::system::error_code &error);
void AsyncWrite(const u_int8_t *data, std::size_t size);

boost::scoped_ptr<SslSocket> ssl_socket_;
DISALLOW_COPY_AND_ASSIGN(SslSession);
};

#endif // __src_io_ssl_session_h__
66 changes: 9 additions & 57 deletions src/io/tcp_message_write.cc
Expand Up @@ -13,8 +13,8 @@ using namespace boost::asio;
using namespace boost::system;
using tbb::mutex;

TcpMessageWriter::TcpMessageWriter(Socket *socket, TcpSession *session) :
socket_(socket), offset_(0), session_(session) {
TcpMessageWriter::TcpMessageWriter(TcpSession *session) :
offset_(0), session_(session) {
}

TcpMessageWriter::~TcpMessageWriter() {
Expand All @@ -36,7 +36,7 @@ int TcpMessageWriter::Send(const uint8_t *data, size_t len, error_code &ec) {
session_->server_->stats_.write_bytes += len;

if (buffer_queue_.empty()) {
wrote = socket_->write_some(boost::asio::buffer(data, len), ec);
wrote = session_->WriteSome(data, len, ec);
if (TcpSession::IsSocketErrorHard(ec)) return -1;
assert(wrote >= 0);

Expand All @@ -45,7 +45,7 @@ int TcpMessageWriter::Send(const uint8_t *data, size_t len, error_code &ec) {
"Encountered partial send of " << wrote << " bytes when "
"sending " << len << " bytes, Error: " << ec);
BufferAppend(data + wrote, len - wrote);
DeferWrite();
session_->DeferWriter();
}
} else {
TCP_SESSION_LOG_UT_DEBUG(session_, TCP_DIR_OUT,
Expand All @@ -55,55 +55,20 @@ int TcpMessageWriter::Send(const uint8_t *data, size_t len, error_code &ec) {
return wrote;
}

void TcpMessageWriter::DeferWrite() {

// Update socket write block count.
session_->stats_.write_blocked++;
session_->server_->stats_.write_blocked++;
socket_->async_write_some(
boost::asio::null_buffers(),
boost::bind(&TcpMessageWriter::HandleWriteReady, this,
TcpSessionPtr(session_),
placeholders::error, UTCTimestampUsec()));
return;
}

// Socket is ready for write. Flush any pending data and notify
// clients aboout it.
void TcpMessageWriter::HandleWriteReady(TcpSessionPtr session_ptr,
const error_code &error,
uint64_t block_start_time) {
mutex::scoped_lock lock(session_->mutex());

// Update socket write block time.
uint64_t blocked_usecs = UTCTimestampUsec() - block_start_time;
session_->stats_.write_blocked_duration_usecs += blocked_usecs;
session_->server_->stats_.write_blocked_duration_usecs += blocked_usecs;

if (TcpSession::IsSocketErrorHard(error)) {
goto done;
}

//
// Ignore if connection is already closed.
//
if (session_->IsClosedLocked()) return;

// Socket is ready for write. Flush any pending data
void TcpMessageWriter::HandleWriteReady(error_code &error) {
while (!buffer_queue_.empty()) {
boost::asio::mutable_buffer head = buffer_queue_.front();
const uint8_t *data = buffer_cast<const uint8_t *>(head) + offset_;
int remaining = buffer_size(head) - offset_;
error_code ec;
int wrote = socket_->write_some(buffer(data, remaining), ec);
if (TcpSession::IsSocketErrorHard(ec)) {
lock.release();
if (!cb_.empty()) cb_(ec);
int wrote = session_->WriteSome(data, remaining, error);
if (TcpSession::IsSocketErrorHard(error)) {
return;
}
assert(wrote >= 0);
if (wrote != remaining) {
offset_ += wrote;
DeferWrite();
session_->DeferWriter();
return;
} else {
offset_ = 0;
Expand All @@ -112,16 +77,6 @@ void TcpMessageWriter::HandleWriteReady(TcpSessionPtr session_ptr,
}
}
buffer_queue_.clear();

done:
lock.release();
// The session object is implicitly accessed in by cb_. This is
// safe because this function currently holds a refcount on the session
// via TcpSessionPtr.
if (!cb_.empty()) {
cb_(error);
}
return;
}

void TcpMessageWriter::BufferAppend(const uint8_t *src, int bytes) {
Expand All @@ -137,6 +92,3 @@ void TcpMessageWriter::DeleteBuffer(mutable_buffer buffer) {
return;
}

void TcpMessageWriter::RegisterNotification(SendReadyCb cb) {
cb_ = cb;
}
13 changes: 3 additions & 10 deletions src/io/tcp_message_write.h
Expand Up @@ -20,29 +20,22 @@ class TcpSession;

class TcpMessageWriter {
public:
typedef boost::asio::ip::tcp::socket Socket;
static const int kDefaultBufferSize = 4 * 1024;
explicit TcpMessageWriter(Socket *, TcpSession *session);
explicit TcpMessageWriter(TcpSession *session);
~TcpMessageWriter();

// return false for send
int Send(const uint8_t *msg, size_t len, error_code &ec);

typedef boost::function<void(const error_code &ec)> SendReadyCb;
void RegisterNotification(SendReadyCb);

private:
friend class TcpSession;
typedef boost::intrusive_ptr<TcpSession> TcpSessionPtr;
typedef std::list<boost::asio::mutable_buffer> BufferQueue;
void BufferAppend(const uint8_t *data, int len);
void DeleteBuffer(boost::asio::mutable_buffer buffer);
void DeferWrite();
void HandleWriteReady(TcpSessionPtr session_ref, const error_code &ec,
uint64_t block_start_time);
void HandleWriteReady(boost::system::error_code &ec);

BufferQueue buffer_queue_;
SendReadyCb cb_;
Socket *socket_;
int offset_;
TcpSession *session_;
};
Expand Down

0 comments on commit 2ba4956

Please sign in to comment.