Skip to content

Commit

Permalink
Merge "Save new and past publisher-ids."
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Feb 8, 2016
2 parents 8e2589e + 20ef43c commit f0e9050
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 14 deletions.
18 changes: 7 additions & 11 deletions src/discovery/client/discovery_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ DSSubscribeResponse::DSSubscribeResponse(std::string serviceName,
}

DSSubscribeResponse::~DSSubscribeResponse() {
service_list_.clear();
inuse_service_list_.clear();
publisher_id_map_.clear();
TimerManager::DeleteTimer(subscribe_timer_);
}

Expand Down Expand Up @@ -98,12 +98,9 @@ void DSSubscribeResponse::DeleteInUseServiceList(

std::string DSSubscribeResponse::GetPublisherId(string ip_address) {

std::vector<DSResponse>::iterator it = service_list_.begin();
for (; it != service_list_.end(); it++) {
DSResponse resp = *it;
if (resp.ep.address().to_string().compare(ip_address) == 0) {
return (resp.publisher_id);
}
PublisherIdMap::iterator loc = publisher_id_map_.find(ip_address);
if (loc != publisher_id_map_.end()) {
return(loc->second);
}
return "";
}
Expand Down Expand Up @@ -907,6 +904,9 @@ void DiscoveryServiceClient::SubscribeResponseHandler(std::string &xmls,
boost::trim(value);
if (strcmp(subnode.name(), "ip-address") == 0) {
resp.ep.address(ip::address::from_string(value));
// Update Map to get publisher-id during next subscribe
hdr->publisher_id_map_.insert(make_pair(
resp.ep.address().to_string(), resp.publisher_id));
} else if (strcmp(subnode.name(), "port") == 0) {
uint32_t port;
stringstream sport(value);
Expand All @@ -917,7 +917,6 @@ void DiscoveryServiceClient::SubscribeResponseHandler(std::string &xmls,
ds_response.push_back(resp);
}


// generate hash of the message
boost::hash<std::string> string_hash;
uint32_t gen_chksum = string_hash(docs.str());
Expand Down Expand Up @@ -952,9 +951,6 @@ void DiscoveryServiceClient::SubscribeResponseHandler(std::string &xmls,
serviceName, ConnectionStatus::UP, ds_endpoint_,
"SubscribeResponse");

hdr->service_list_.clear();
hdr->service_list_ = ds_response;

//Start Subscribe Timer
hdr->StartSubscribeTimer(ttl);

Expand Down
8 changes: 5 additions & 3 deletions src/discovery/client/discovery_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ struct DSSubscribeResponse {
/* Subscribe Response cached */
uint32_t chksum_;
Timer *subscribe_timer_;
std::vector<DSResponse> service_list_;
DiscoveryServiceClient *ds_client_;
std::string subscribe_msg_;
int attempts_;
Expand All @@ -62,12 +61,15 @@ struct DSSubscribeResponse {

bool subscribe_cb_called_;

// Map of <ep, PublisherId> PublisherIdMap
typedef std::map<std::string, std::string> PublisherIdMap;
PublisherIdMap publisher_id_map_;
std::string GetPublisherId(std::string ip_address);

// Save in-use server list
void AddInUseServiceList(boost::asio::ip::tcp::endpoint ep);
void DeleteInUseServiceList(boost::asio::ip::tcp::endpoint ep);
std::vector<boost::asio::ip::tcp::endpoint> inuse_service_list_;

std::string GetPublisherId(std::string ip_address);
};

struct DSPublishResponse {
Expand Down
48 changes: 48 additions & 0 deletions src/discovery/client/test/discovery_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ class DiscoveryServiceClientMock: public DiscoveryServiceClient {
}
}

void AsyncSubscribeXmppHandlerWithFakePubId(std::vector<DSResponse> dr) {
//Connect handler for the service
xmpp_cb_count_++;
xmpp_instances_ = dr.size();
DSResponse ds_resp;
ds_resp.ep.address(ip::address::from_string("10.84.7.1"));
ds_resp.ep.port(9699);
AddSubscribeInUseServiceList("xmpp-server-test", ds_resp.ep);
}

void AsyncSubscribeIfMapHandler(std::vector<DSResponse> dr) {
//Connect handler for the service
ifmap_cb_count_++;
Expand Down Expand Up @@ -741,6 +751,44 @@ TEST_F(DiscoveryServiceClientTest, SubscribeWithPubId) {
task_util::WaitForIdle();
}

TEST_F(DiscoveryServiceClientTest, SubscribeWithNonExistantPubId) {

ip::tcp::endpoint dss_ep;
dss_ep.address(ip::address::from_string("127.0.0.1"));
dss_ep.port(5998);
DiscoveryServiceClientMock *dsc_subscribe =
(new DiscoveryServiceClientMock(evm_.get(), dss_ep,
"DS-Test2"));
dsc_subscribe->Init();

//subscribe a service
dsc_subscribe->Subscribe("xmpp-server-test", 2,
boost::bind(&DiscoveryServiceClientMock::AsyncSubscribeXmppHandlerWithFakePubId,
dsc_subscribe, _1));
task_util::WaitForIdle();

//subscribe response
std::string msg;
dsc_subscribe->BuildSubscribeResponseMessageWithPubliserId(
"xmpp-server-test", 2, msg);
boost::system::error_code ec;
dsc_subscribe->SubscribeResponseHandler(msg, ec, "xmpp-server-test", NULL);
EXPECT_TRUE(dsc_subscribe->XmppInstances() == 2);

DSSubscribeResponse *resp = dsc_subscribe->GetSubscribeResponse("xmpp-server-test");
TASK_UTIL_EXPECT_EQ(2, resp->sub_sent_);
EXPECT_TRUE(1 == resp->inuse_service_list_.size());

EvmShutdown();

//unsubscribe to service
dsc_subscribe->Unsubscribe("xmpp-server-test");

dsc_subscribe->Shutdown(); // No more listening socket, clear sessions
task_util::WaitForIdle();
delete dsc_subscribe;
task_util::WaitForIdle();
}

TEST_F(DiscoveryServiceClientTest, Publish_Subscribe_1_Service) {

Expand Down

0 comments on commit f0e9050

Please sign in to comment.