Skip to content

Commit

Permalink
Merge "Fix concurrency issue in XmppSession::WriteReady"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Jun 9, 2015
2 parents b23d33b + dffb019 commit 7bb315c
Show file tree
Hide file tree
Showing 15 changed files with 246 additions and 94 deletions.
4 changes: 2 additions & 2 deletions src/bgp/test/bgp_xmpp_basic_test.cc
Expand Up @@ -141,11 +141,11 @@ class BgpXmppBasicTest : public ::testing::Test {
}

size_t GetConnectionQueueSize(XmppServer *xs) {
return xs->GetQueueSize();
return xs->GetConnectionQueueSize();
}

void SetConnectionQueueDisable(XmppServer *xs, bool flag) {
xs->SetQueueDisable(flag);
xs->SetConnectionQueueDisable(flag);
}

void SetLifetimeManagerDestroyDisable(bool disabled) {
Expand Down
1 change: 1 addition & 0 deletions src/xmpp/SConscript
Expand Up @@ -33,6 +33,7 @@ libxmpp = env.Library('xmpp',
'xmpp_channel.cc',
'xmpp_config.cc',
'xmpp_connection.cc',
'xmpp_connection_manager.cc',
'xmpp_factory.cc',
'xmpp_lifetime.cc',
'xmpp_session',
Expand Down
4 changes: 2 additions & 2 deletions src/xmpp/test/xmpp_regex_test.cc
Expand Up @@ -18,8 +18,8 @@ using namespace std;

class XmppRegexMock : public XmppSession {
public:
XmppRegexMock(SslServer *server, SslSocket *sock) :
XmppSession(server, sock), p1("<(iq|message)"), bufx_("") { }
XmppRegexMock(XmppConnectionManager *manager, SslSocket *sock)
: XmppSession(manager, sock), p1("<(iq|message)"), bufx_("") { }
~XmppRegexMock() { }

//boost::regex Regex() { return p1; }
Expand Down
10 changes: 6 additions & 4 deletions src/xmpp/xmpp_client.cc
Expand Up @@ -27,7 +27,8 @@ class XmppClient::DeleteActor : public LifetimeActor {
DeleteActor(XmppClient *client)
: LifetimeActor(client->lifetime_manager()), client_(client) { }
virtual bool MayDelete() const {
return true;
CHECK_CONCURRENCY("bgp::Config");
return (client_->GetSessionQueueSize() == 0);
}
virtual void Shutdown() {
CHECK_CONCURRENCY("bgp::Config");
Expand All @@ -41,7 +42,7 @@ class XmppClient::DeleteActor : public LifetimeActor {
};

XmppClient::XmppClient(EventManager *evm)
: SslServer(evm, ssl::context::tlsv1_client, false, false),
: XmppConnectionManager(evm, ssl::context::tlsv1_client, false, false),
config_mgr_(new XmppConfigManager),
lifetime_manager_(new LifetimeManager(
TaskScheduler::GetInstance()->GetTaskId("bgp::Config"))),
Expand All @@ -50,7 +51,8 @@ XmppClient::XmppClient(EventManager *evm)
}

XmppClient::XmppClient(EventManager *evm, const XmppChannelConfig *config)
: SslServer(evm, ssl::context::tlsv1_client, config->auth_enabled, true),
: XmppConnectionManager(
evm, ssl::context::tlsv1_client, config->auth_enabled, true),
config_mgr_(new XmppConfigManager),
lifetime_manager_(new LifetimeManager(
TaskScheduler::GetInstance()->GetTaskId("bgp::Config"))),
Expand Down Expand Up @@ -131,7 +133,7 @@ TcpSession *XmppClient::CreateSession() {
}

void XmppClient::Shutdown() {
TcpServer::Shutdown();
XmppConnectionManager::Shutdown();
deleter_->Delete();
}

Expand Down
7 changes: 3 additions & 4 deletions src/xmpp/xmpp_client.h
Expand Up @@ -11,15 +11,14 @@
#include "io/ssl_session.h"
#include "xmpp/xmpp_config.h"
#include "xmpp/xmpp_connection.h"
#include "xmpp/xmpp_connection_manager.h"

class LifetimeActor;
class LifetimeManager;
class XmppSession;

// Class to represent Xmpp Client
// We derive from the common TCP server base class
// which abstracts both server & client side methods.
class XmppClient : public SslServer {
class XmppClient : public XmppConnectionManager {
public:
typedef boost::asio::ip::tcp::endpoint Endpoint;

Expand Down Expand Up @@ -50,7 +49,7 @@ class XmppClient : public SslServer {
XmppConfigManager *xmpp_config_mgr() { return config_mgr_.get(); }

LifetimeManager *lifetime_manager();
LifetimeActor *deleter();
virtual LifetimeActor *deleter();

protected:
virtual SslSession *AllocSession(SslSocket *socket);
Expand Down
16 changes: 14 additions & 2 deletions src/xmpp/xmpp_connection.cc
Expand Up @@ -80,9 +80,18 @@ void XmppConnection::SetConfig(const XmppChannelConfig *config) {

void XmppConnection::set_session(XmppSession *session) {
tbb::spin_mutex::scoped_lock lock(spin_mutex_);
assert(session);
session_ = session;
}

void XmppConnection::clear_session() {
tbb::spin_mutex::scoped_lock lock(spin_mutex_);
if (!session_)
return;
session_->ClearConnection();
session_ = NULL;
}

const XmppSession *XmppConnection::session() const {
return session_;
}
Expand All @@ -91,8 +100,9 @@ XmppSession *XmppConnection::session() {
return session_;
}

void XmppConnection::WriteReady(const boost::system::error_code &ec) {
mux_->WriteReady(ec);
void XmppConnection::WriteReady() {
boost::system::error_code ec;
mux_->WriteReady(ec);
}

void XmppConnection::Shutdown() {
Expand Down Expand Up @@ -315,6 +325,8 @@ void XmppConnection::SendClose(XmppSession *session) {

void XmppConnection::ProcessSslHandShakeResponse(SslSessionPtr session,
const boost::system::error_code& error) {
if (!state_machine())
return;

if (error) {
inc_handshake_failure();
Expand Down
3 changes: 2 additions & 1 deletion src/xmpp/xmpp_connection.h
Expand Up @@ -95,6 +95,7 @@ class XmppConnection {
void StopKeepAliveTimer();

void set_session(XmppSession *session);
void clear_session();
void SetFrom(const std::string &);
void SetTo(const std::string &);

Expand Down Expand Up @@ -145,7 +146,7 @@ class XmppConnection {
virtual uint32_t flap_count() const = 0;
virtual const std::string last_flap_at() const = 0;

virtual void WriteReady(const boost::system::error_code &ec);
virtual void WriteReady();

friend class XmppStateMachineTest;

Expand Down
80 changes: 80 additions & 0 deletions src/xmpp/xmpp_connection_manager.cc
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
*/

#include "xmpp/xmpp_connection_manager.h"

#include <boost/bind.hpp>

#include "base/lifetime.h"
#include "base/task_annotations.h"
#include "xmpp/xmpp_session.h"

//
// Constructor for XmppConnectionManager.
//
XmppConnectionManager::XmppConnectionManager(EventManager *evm,
boost::asio::ssl::context::method m,
bool ssl_enabled, bool ssl_handshake_delayed)
: SslServer(evm, m, ssl_enabled, ssl_handshake_delayed),
session_queue_(TaskScheduler::GetInstance()->GetTaskId("bgp::Config"),
0, boost::bind(&XmppConnectionManager::DequeueSession, this, _1)) {
}

//
// Shutdown the XmppConnectionManager.
//
// Register an exit callback to the session WorkQueue so that we can retry
// deletion when the WorkQueue becomes empty.
//
void XmppConnectionManager::Shutdown() {
TcpServer::Shutdown();
session_queue_.SetExitCallback(
boost::bind(&XmppConnectionManager::WorkQueueExitCallback, this, _1));
session_queue_.Shutdown();
}

//
// Concurrency: called in the context of io thread.
//
// Add a XmppSession to the queue of write ready sessions.
// Take a reference to make sure that XmppSession doesn't get deleted before
// it's processed.
//
void XmppConnectionManager::EnqueueSession(XmppSession *session) {
if (deleter()->IsDeleted())
return;
session_queue_.Enqueue(TcpSessionPtr(session));
}

//
// Concurrency: called in the context of bgp::Config task.
//
// Handler for XmppSessions that are dequeued from the session WorkQueue.
//
// The Xmpp[Client|Server] doesn't get destroyed if the WorkQueue is non-empty.
//
bool XmppConnectionManager::DequeueSession(TcpSessionPtr tcp_session) {
CHECK_CONCURRENCY("bgp::Config");
XmppSession *session = static_cast<XmppSession *>(tcp_session.get());
session->ProcessWriteReady();
return true;
}

//
// Exit callback for the session WorkQueue.
//
void XmppConnectionManager::WorkQueueExitCallback(bool done) {
CHECK_CONCURRENCY("bgp::Config");
if (!deleter()->IsDeleted())
return;
deleter()->RetryDelete();
}

//
// Return size of WorkQueue of write ready XmppSessions.
//
size_t XmppConnectionManager::GetSessionQueueSize() const {
CHECK_CONCURRENCY("bgp::Config");
return session_queue_.Length();
}
37 changes: 37 additions & 0 deletions src/xmpp/xmpp_connection_manager.h
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
*/

#ifndef XMPP_XMPP_CONNECTION_MANAGER_H
#define XMPP_XMPP_CONNECTION_MANAGER_H

#include "base/queue_task.h"
#include "io/ssl_server.h"

class LifetimeActor;
class XmppSession;

//
// Common class to represent XmppClient and XmppServer
//
class XmppConnectionManager : public SslServer {
public:
XmppConnectionManager(EventManager *evm,
boost::asio::ssl::context::method m,
bool ssl_enabled, bool ssl_handshake_delayed);

void Shutdown();
void EnqueueSession(XmppSession *session);
size_t GetSessionQueueSize() const;
virtual LifetimeActor *deleter() = 0;

private:
bool DequeueSession(TcpSessionPtr tcp_session);
void WorkQueueExitCallback(bool done);

WorkQueue<TcpSessionPtr> session_queue_;

DISALLOW_COPY_AND_ASSIGN(XmppConnectionManager);
};

#endif // XMPP_XMPP_CONNECTION_MANAGER_H

0 comments on commit 7bb315c

Please sign in to comment.