Skip to content

Commit

Permalink
Unconditionally send publish with updated oper-state or oper-state-re…
Browse files Browse the repository at this point in the history
…ason

Background: Discovery client periodically sends heartbeat and
publish if there is change is publish information. Write to
cassandra db if not sequenced results in older stale oper-state
and oper-state-reason updated due to last db read during
heartbeat update instead of new published information.
(TODO: Fix Cassandra DB writes)

To overcome above issue, we are sending publish unconditionally
everytime.

Change-Id: Ia0ae8be877210df755613a9332804d97dad48f66
ParTial-Bug:1488936
  • Loading branch information
nipak committed Aug 27, 2015
1 parent 1c0d9ae commit 8d0db48
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 10 deletions.
33 changes: 24 additions & 9 deletions src/discovery/client/discovery_client.cc
Expand Up @@ -448,14 +448,16 @@ void DiscoveryServiceClient::ReEvaluatePublish(std::string serviceName,

std::string reeval_reason;
resp->oper_state = cb(reeval_reason);
if (resp->oper_state != oper_state) {
if ((resp->oper_state != oper_state) ||
(resp->oper_state_reason.compare(reeval_reason))) {

auto_ptr<XmlBase> impl(XmppXmlImplFactory::Instance()->GetXmlImpl());
if (impl->LoadDoc(resp->publish_msg_) == -1) {
resp->pub_fail_++;
return;
}

resp->oper_state_reason = reeval_reason;
XmlPugi *pugi = reinterpret_cast<XmlPugi *>(impl.get());
if (resp->oper_state) {
pugi->ModifyNode("oper-state", "up");
Expand All @@ -478,13 +480,13 @@ void DiscoveryServiceClient::ReEvaluatePublish(std::string serviceName,
stringstream ss;
impl->PrintDoc(ss);
resp->publish_msg_ = ss.str();

DISCOVERY_CLIENT_TRACE(DiscoveryClientMsg, resp->publish_hdr_,
serviceName, resp->publish_msg_);
SendHttpPostMessage(resp->publish_hdr_, serviceName,
resp->publish_msg_);

}

/* Send publish unconditionally */
DISCOVERY_CLIENT_TRACE(DiscoveryClientMsg, resp->publish_hdr_,
serviceName, resp->publish_msg_);
SendHttpPostMessage(resp->publish_hdr_, serviceName,
resp->publish_msg_);
}
}

Expand All @@ -508,6 +510,7 @@ void DiscoveryServiceClient::Publish(std::string serviceName, std::string &msg,
pub_msg->dss_ep_.address(ds_endpoint_.address());
pub_msg->dss_ep_.port(ds_endpoint_.port());
pub_msg->oper_state = false;
pub_msg->oper_state_reason = "Initial Registration";

pub_msg->client_msg_ = msg;
pub_msg->publish_msg_ += "<publish>" + msg;
Expand All @@ -523,8 +526,9 @@ void DiscoveryServiceClient::Publish(std::string serviceName, std::string &msg,
}
}
pub_msg->publish_msg_ += "<oper-state>down</oper-state>";
pub_msg->publish_msg_ +=
"<oper-state-reason>Initial Registration</oper-state-reason>";
pub_msg->publish_msg_ += "<oper-state-reason>";
pub_msg->publish_msg_ += pub_msg->oper_state_reason;
pub_msg->publish_msg_ += "</oper-state-reason>";
pub_msg->publish_msg_ += "</publish>";
boost::system::error_code ec;
pub_msg->publish_hdr_ = "publish/" + boost::asio::ip::host_name(ec);
Expand Down Expand Up @@ -1008,6 +1012,17 @@ bool DiscoveryServiceClient::IsPublishServiceRegisteredUp(
return false;
}

void DiscoveryServiceClient::PublishServiceReEvalString(
std::string serviceName, std::string &reeval_reason) {

PublishResponseMap::iterator loc = publish_response_map_.find(serviceName);
if (loc != publish_response_map_.end()) {
DSPublishResponse *pub_resp = loc->second;
reeval_reason = pub_resp->oper_state_reason;
}
}


void DiscoveryServiceClient::FillDiscoveryServiceSubscriberStats(
std::vector<DiscoveryClientSubscriberStats> &ds_stats) {

Expand Down
4 changes: 4 additions & 0 deletions src/discovery/client/discovery_client.h
Expand Up @@ -88,6 +88,7 @@ struct DSPublishResponse {

boost::asio::ip::udp::endpoint dss_ep_;
bool oper_state;
std::string oper_state_reason;
DiscoveryServiceClient *ds_client_;
std::string publish_msg_;
std::string publish_hdr_;
Expand Down Expand Up @@ -151,7 +152,10 @@ class DiscoveryServiceClient {
}
int GetHeartBeatInterval() { return heartbeat_interval_; }

// Test Functions
bool IsPublishServiceRegisteredUp(std::string serviceName);
void PublishServiceReEvalString(std::string serviceName,
std::string &reeval_reason);

// sandesh introspect fill stats
void FillDiscoveryServicePublisherStats(
Expand Down
85 changes: 84 additions & 1 deletion src/discovery/client/test/discovery_client_test.cc
Expand Up @@ -75,6 +75,21 @@ class DiscoveryServiceClientMock: public DiscoveryServiceClient {
}
}

bool XmppServicePublishReEvalStringHandler(std::string &msg) {
static bool publish_reeval = true;
xmpp_reeval_pub_++;
if (publish_reeval) {
publish_reeval = false;
msg = "Service waiting for condition 1111";
} else {
publish_reeval = true;
msg = "Service waiting for condition 2222";
}
return false;
}



void BuildServiceResponseMessage(std::string serviceNameTag, uint num_instances,
std::string &msg) {
auto_ptr<XmlBase> impl(XmppXmlImplFactory::Instance()->GetXmlImpl());
Expand Down Expand Up @@ -971,8 +986,9 @@ TEST_F(DiscoveryServiceClientTest, ReEvaluatePublishTest) {
TASK_UTIL_EXPECT_TRUE(
dsc_publish->IsPublishServiceRegisteredUp(service));

EvmShutdown();
dsc_publish->SetHeartBeatInterval(10);

EvmShutdown();
//withdraw publish service
dsc_publish->WithdrawPublish(service);
task_util::WaitForIdle();
Expand All @@ -984,6 +1000,73 @@ TEST_F(DiscoveryServiceClientTest, ReEvaluatePublishTest) {
task_util::WaitForIdle();
}

TEST_F(DiscoveryServiceClientTest, ReEvaluatePublishReasonStringTest) {

ip::tcp::endpoint dss_ep;
dss_ep.address(ip::address::from_string("127.0.0.1"));
dss_ep.port(5997);
DiscoveryServiceClientMock *dsc_publish =
(new DiscoveryServiceClientMock(evm_.get(), dss_ep, "DS-Test", false));
dsc_publish->SetHeartBeatInterval(0);
dsc_publish->Init();

//publish a service
std::string service = "xmpp-server-publish-reeval-string-test";
std::string msg;
dsc_publish->BuildPublishMessage(service, msg);
dsc_publish->Publish(service, msg,
boost::bind(&DiscoveryServiceClientMock::XmppServicePublishReEvalStringHandler,
dsc_publish, _1));
//Confirm published service is DOWN
TASK_UTIL_EXPECT_FALSE(
dsc_publish->IsPublishServiceRegisteredUp(service));
std::string reeval_string;
dsc_publish->PublishServiceReEvalString(service, reeval_string);
EXPECT_STREQ(reeval_string.c_str(),"Initial Registration");

// send publisher cookie response
std::string msg2;
dsc_publish->BuildPublishResponseMessage(service, msg2);
boost::system::error_code ec;
dsc_publish->PublishResponseHandler(msg2, ec, service, NULL);

TASK_UTIL_EXPECT_EQ(1, dsc_publish->XmppServiceReEvalPublishCount());
//Confirm published service is DOWN
TASK_UTIL_EXPECT_FALSE(
dsc_publish->IsPublishServiceRegisteredUp(service));
reeval_string.clear();
dsc_publish->PublishServiceReEvalString(service, reeval_string);
EXPECT_STREQ(reeval_string.c_str(),"Service waiting for condition 1111");

TASK_UTIL_EXPECT_EQ(2, dsc_publish->XmppServiceReEvalPublishCount());
//Confirm published service is DOWN
TASK_UTIL_EXPECT_FALSE(
dsc_publish->IsPublishServiceRegisteredUp(service));
reeval_string.clear();
dsc_publish->PublishServiceReEvalString(service, reeval_string);
EXPECT_STREQ(reeval_string.c_str(),"Service waiting for condition 2222");

TASK_UTIL_EXPECT_EQ(3, dsc_publish->XmppServiceReEvalPublishCount());
//Confirm published service is DOWN
TASK_UTIL_EXPECT_FALSE(
dsc_publish->IsPublishServiceRegisteredUp(service));
reeval_string.clear();
dsc_publish->PublishServiceReEvalString(service, reeval_string);
EXPECT_STREQ(reeval_string.c_str(),"Service waiting for condition 1111");

dsc_publish->SetHeartBeatInterval(10);

EvmShutdown();
//withdraw publish service
dsc_publish->WithdrawPublish(service);
task_util::WaitForIdle();

dsc_publish->Shutdown(); // No more listening socket, clear sessions
task_util::WaitForIdle();
delete dsc_publish;
task_util::WaitForIdle();
task_util::WaitForIdle();
}

}

Expand Down

0 comments on commit 8d0db48

Please sign in to comment.