Skip to content

Commit

Permalink
Merge "Pull TX xmpp message for agent to local buffer."
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Jun 2, 2016
2 parents df48b6e + a14ec0a commit 7f17427
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 16 deletions.
3 changes: 3 additions & 0 deletions src/bgp/test/bgp_xmpp_channel_test.cc
Expand Up @@ -93,6 +93,9 @@ class XmppChannelMock : public XmppChannel {
virtual void RegisterRxMessageTraceCallback(RxMessageTraceCb cb) {
return;
}
virtual void RegisterTxMessageTraceCallback(TxMessageTraceCb cb) {
return;
}
};

class BgpXmppChannelMock : public BgpXmppChannel {
Expand Down
3 changes: 3 additions & 0 deletions src/bgp/test/bgp_xmpp_parse_test.cc
Expand Up @@ -82,6 +82,9 @@ class XmppChannelMock : public XmppChannel {
virtual void RegisterRxMessageTraceCallback(RxMessageTraceCb cb) {
return;
}
virtual void RegisterTxMessageTraceCallback(TxMessageTraceCb cb) {
return;
}
};

class BgpXmppChannelMock : public BgpXmppChannel {
Expand Down
30 changes: 24 additions & 6 deletions src/vnsw/agent/controller/controller_init.cc
Expand Up @@ -46,6 +46,10 @@ SandeshTraceBufferPtr ControllerRxRouteMessageTraceBuf2(SandeshTraceBufferCreate
"ControllerRxRouteXmppMessage2", 5000));
SandeshTraceBufferPtr ControllerRxConfigMessageTraceBuf2(SandeshTraceBufferCreate(
"ControllerRxConfigXmppMessage2", 5000));
SandeshTraceBufferPtr ControllerTxMessageTraceBuf1(SandeshTraceBufferCreate(
"ControllerTxXmppMessage_1", 5000));
SandeshTraceBufferPtr ControllerTxMessageTraceBuf2(SandeshTraceBufferCreate(
"ControllerTxXmppMessage_2", 5000));

ControllerDiscoveryData::ControllerDiscoveryData(xmps::PeerId peer_id,
std::vector<DSResponse> resp) :
Expand Down Expand Up @@ -176,7 +180,11 @@ void VNController::XmppServerConnect() {
FindChannel(XmppInit::kControlNodeJID);
assert(channel);
channel->RegisterRxMessageTraceCallback(
boost::bind(&VNController::XmppMessageTrace,
boost::bind(&VNController::RxXmppMessageTrace,
this, bgp_peer->GetXmppServerIdx(),
_1, _2, _3, _4, _5));
channel->RegisterTxMessageTraceCallback(
boost::bind(&VNController::TxXmppMessageTrace,
this, bgp_peer->GetXmppServerIdx(),
_1, _2, _3, _4, _5));
bgp_peer->RegisterXmppChannel(channel);
Expand Down Expand Up @@ -849,11 +857,11 @@ void VNController::Enqueue(ControllerWorkQueueDataType data) {
work_queue_.Enqueue(data);
}

bool VNController::XmppMessageTrace(uint8_t peer_index,
const std::string &to_address,
int port, int size,
const std::string &msg,
const XmppStanza::XmppMessage *xmppmsg) {
bool VNController::RxXmppMessageTrace(uint8_t peer_index,
const std::string &to_address,
int port, int size,
const std::string &msg,
const XmppStanza::XmppMessage *xmppmsg) {
const std::string &to = xmppmsg->to;
if (to.find(XmppInit::kBgpPeer) != string::npos) {
CONTROLLER_RX_ROUTE_MESSAGE_TRACE(Message, peer_index, to_address,
Expand All @@ -866,3 +874,13 @@ bool VNController::XmppMessageTrace(uint8_t peer_index,
}
return false;
}

bool VNController::TxXmppMessageTrace(uint8_t peer_index,
const std::string &to_address,
int port, int size,
const std::string &msg,
const XmppStanza::XmppMessage *xmppmsg) {
CONTROLLER_TX_MESSAGE_TRACE(Message, peer_index, to_address,
port, size, msg);
return true;
}
28 changes: 23 additions & 5 deletions src/vnsw/agent/controller/controller_init.h
Expand Up @@ -167,11 +167,16 @@ class VNController {
void RegisterControllerChangeCallback(XmppChannelDownCb xmpp_channel_down_cb) {
xmpp_channel_down_cb_ = xmpp_channel_down_cb;
}
bool XmppMessageTrace(uint8_t peer_index,
const std::string &to_address,
int port, int size,
const std::string &msg,
const XmppStanza::XmppMessage *xmpp_msg);
bool RxXmppMessageTrace(uint8_t peer_index,
const std::string &to_address,
int port, int size,
const std::string &msg,
const XmppStanza::XmppMessage *xmpp_msg);
bool TxXmppMessageTrace(uint8_t peer_index,
const std::string &to_address,
int port, int size,
const std::string &msg,
const XmppStanza::XmppMessage *xmpp_msg);

private:
AgentXmppChannel *FindAgentXmppChannel(const std::string &server_ip);
Expand Down Expand Up @@ -205,6 +210,8 @@ extern SandeshTraceBufferPtr ControllerRxRouteMessageTraceBuf1;
extern SandeshTraceBufferPtr ControllerRxConfigMessageTraceBuf1;
extern SandeshTraceBufferPtr ControllerRxRouteMessageTraceBuf2;
extern SandeshTraceBufferPtr ControllerRxConfigMessageTraceBuf2;
extern SandeshTraceBufferPtr ControllerTxMessageTraceBuf1;
extern SandeshTraceBufferPtr ControllerTxMessageTraceBuf2;

#define CONTROLLER_RX_ROUTE_MESSAGE_TRACE(obj, index, ...)\
do {\
Expand Down Expand Up @@ -257,4 +264,15 @@ do {\
AgentXmpp##obj::TraceMsg(ControllerTraceBuf, __FILE__, __LINE__, __VA_ARGS__);\
} while(0);\

#define CONTROLLER_TX_MESSAGE_TRACE(obj, index, ...)\
do {\
if (index == 0) { \
AgentXmpp##obj::TraceMsg(ControllerTxMessageTraceBuf1, __FILE__, \
__LINE__, __VA_ARGS__);\
} else { \
AgentXmpp##obj::TraceMsg(ControllerTxMessageTraceBuf2, __FILE__, \
__LINE__, __VA_ARGS__);\
} \
} while(0);\

#endif
2 changes: 2 additions & 0 deletions src/vnsw/agent/test/test_cmn_util.h
Expand Up @@ -427,6 +427,8 @@ class XmppChannelMock : public XmppChannel {

virtual void RegisterRxMessageTraceCallback(RxMessageTraceCb cb) {
}
virtual void RegisterTxMessageTraceCallback(TxMessageTraceCb cb) {
}
virtual std::string LastStateName() const {
return "";
}
Expand Down
7 changes: 7 additions & 0 deletions src/xmpp/xmpp_channel.h
Expand Up @@ -42,6 +42,12 @@ class XmppChannel {
const std::string &,
const XmppStanza::XmppMessage * msg)
> RxMessageTraceCb;
typedef boost::function<bool(const std::string &,
int,
int,
const std::string &,
const XmppStanza::XmppMessage * msg)
> TxMessageTraceCb;

virtual ~XmppChannel() { }

Expand All @@ -50,6 +56,7 @@ class XmppChannel {
virtual void RegisterReceive(xmps::PeerId, ReceiveCb) = 0;
virtual void UnRegisterReceive(xmps::PeerId) = 0;
virtual void RegisterRxMessageTraceCallback(RxMessageTraceCb cb) = 0;
virtual void RegisterTxMessageTraceCallback(TxMessageTraceCb cb) = 0;
virtual void UnRegisterWriteReady(xmps::PeerId id) = 0;
virtual void Close() = 0;
virtual void CloseComplete() = 0;
Expand Down
17 changes: 16 additions & 1 deletion src/xmpp/xmpp_channel_mux.cc
Expand Up @@ -13,7 +13,8 @@ using namespace std;
using namespace xmsm;

XmppChannelMux::XmppChannelMux(XmppConnection *connection)
: connection_(connection), rx_message_trace_cb_(NULL), closing_count_(0) {
: connection_(connection), rx_message_trace_cb_(NULL),
tx_message_trace_cb_(NULL), closing_count_(0) {
}

XmppChannelMux::~XmppChannelMux() {
Expand Down Expand Up @@ -250,6 +251,9 @@ std::string XmppChannelMux::LastFlap() const {
void XmppChannelMux::RegisterRxMessageTraceCallback(RxMessageTraceCb cb) {
rx_message_trace_cb_ = cb;
}
void XmppChannelMux::RegisterTxMessageTraceCallback(TxMessageTraceCb cb) {
tx_message_trace_cb_ = cb;
}

bool XmppChannelMux::RxMessageTrace(const std::string &to_address,
int port,
Expand All @@ -261,3 +265,14 @@ bool XmppChannelMux::RxMessageTrace(const std::string &to_address,
}
return false;
}

bool XmppChannelMux::TxMessageTrace(const std::string &to_address,
int port,
int msg_size,
const std::string &msg,
const XmppStanza::XmppMessage *xmpp_msg) {
if (tx_message_trace_cb_) {
return tx_message_trace_cb_(to_address, port, msg_size, msg, xmpp_msg);
}
return false;
}
6 changes: 6 additions & 0 deletions src/xmpp/xmpp_channel_mux.h
Expand Up @@ -26,6 +26,7 @@ class XmppChannelMux : public XmppChannel {
virtual void RegisterReceive(xmps::PeerId, ReceiveCb);
virtual void UnRegisterReceive(xmps::PeerId);
virtual void RegisterRxMessageTraceCallback(RxMessageTraceCb cb);
virtual void RegisterTxMessageTraceCallback(TxMessageTraceCb cb);
size_t ReceiverCount() const;
std::vector<std::string> GetReceiverList() const;

Expand Down Expand Up @@ -62,6 +63,10 @@ class XmppChannelMux : public XmppChannel {
const std::string &msg,
const XmppStanza::XmppMessage *xmpp_msg);

bool TxMessageTrace(const std::string &to_address, int port, int msg_size,
const std::string &msg,
const XmppStanza::XmppMessage *xmpp_msg);

protected:
friend class XmppChannelMuxMock;

Expand All @@ -78,6 +83,7 @@ class XmppChannelMux : public XmppChannel {
XmppConnection *connection_;
tbb::mutex mutex_;
RxMessageTraceCb rx_message_trace_cb_;
TxMessageTraceCb tx_message_trace_cb_;
int closing_count_;
};

Expand Down
15 changes: 11 additions & 4 deletions src/xmpp/xmpp_connection.cc
Expand Up @@ -227,10 +227,17 @@ bool XmppConnection::Send(const uint8_t *data, size_t size) {
if (session_ == NULL) {
return false;
}
XMPP_MESSAGE_TRACE(XmppTxStream,
session_->remote_endpoint().address().to_string(),
session_->remote_endpoint().port(), size,
string(reinterpret_cast<const char *>(data), size));
if (!(mux_ &&
(mux_->TxMessageTrace(session_->remote_endpoint().address().to_string(),
session_->remote_endpoint().port(),
size,
string(reinterpret_cast<const char *>(data),
size), NULL)))) {
XMPP_MESSAGE_TRACE(XmppTxStream,
session_->remote_endpoint().address().to_string(),
session_->remote_endpoint().port(), size,
string(reinterpret_cast<const char *>(data), size));
}

stats_[1].update++;
return session_->Send(data, size, &sent);
Expand Down

0 comments on commit 7f17427

Please sign in to comment.