Skip to content

Commit

Permalink
Merge "Change the stale cleanup functionality and add endOfRib detection logic."
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Aug 18, 2015
2 parents 8c4bb58 + e48172f commit 7af3199
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 24 deletions.
53 changes: 51 additions & 2 deletions src/ifmap/client/ifmap_channel.cc
Expand Up @@ -128,8 +128,12 @@ IFMapChannel::IFMapChannel(IFMapManager *manager, const std::string& user,
username_(user), password_(passwd), state_machine_(NULL),
response_state_(NONE), sequence_number_(0), recv_msg_cnt_(0),
sent_msg_cnt_(0), reconnect_attempts_(0), connection_status_(NOCONN),
connection_status_change_at_(UTCTimestampUsec()) {
connection_status_change_at_(UTCTimestampUsec()),
end_of_rib_timer_(TimerManager::CreateTimer(*(manager->io_service()),
"End of rib timer")) {

set_start_stale_entries_cleanup(false);
set_end_of_rib_computed(false);
boost::system::error_code ec;
if (certstore.empty()) {
ctx_.set_verify_mode(boost::asio::ssl::context::verify_none, ec);
Expand Down Expand Up @@ -234,6 +238,11 @@ void IFMapChannel::ReconnectPreparationInMainThr() {

// Prepares the channel for a reconnect. Annotated to run in the
// "ifmap::StateMachine" task context via CHECK_CONCURRENCY. Clears the
// start-stale-entries-cleanup flag, asks the IFMap server to stop its stale
// entries cleanup, and then posts ReconnectPreparationInMainThr() onto
// io_strand_.
void IFMapChannel::ReconnectPreparation() {
CHECK_CONCURRENCY("ifmap::StateMachine");
// The stale entries cleanup timer could be running if the connection was
// reset in the recent past. Stop the timer since we don't want to clean our
// database in this case.
set_start_stale_entries_cleanup(false);
manager_->ifmap_server()->StopStaleEntriesCleanup();
io_strand_.post(
boost::bind(&IFMapChannel::ReconnectPreparationInMainThr, this));
}
Expand Down Expand Up @@ -278,7 +287,7 @@ void IFMapChannel::DoConnectInMainThr(bool is_ssrc) {
if (!is_ssrc) {
if (ConnectionStatusIsDown()) {
sequence_number_++;
manager_->ifmap_server()->StaleNodesCleanup();
set_start_stale_entries_cleanup(true);
}
set_connection_status(UP);
IFMAP_PEER_DEBUG(IFMapServerConnection,
Expand Down Expand Up @@ -597,6 +606,19 @@ int IFMapChannel::ReadPollResponse() {
"Incorrectly formatted Poll response. Quitting.", "");
return -1;
}
if (reply_str.find(string("searchResult")) != string::npos) {
if (start_stale_entries_cleanup()) {
// If this is a reconnection, keep re-arming the stale entries
// cleanup timer as long as we keep receiving SearchResults.
manager_->ifmap_server()->StartStaleEntriesCleanup();
} else {
// When the daemon is coming up, as long as we are receiving
// SearchResults, we have not received the entire db.
if (!end_of_rib_computed()) {
StartEndOfRibTimer();
}
}
}
string poll_string = reply_str.substr(pos);
increment_recv_msg_cnt();
bool success = true;
Expand All @@ -618,6 +640,26 @@ int IFMapChannel::ReadPollResponse() {
return 0;
}

// Callback bound to the end-of-rib timer (see StartEndOfRibTimer()). Logs
// that the timer fired, marks end-of-rib as computed and returns false.
// NOTE(review): returning false presumably tells the timer not to re-arm
// itself - confirm against the Timer API.
bool IFMapChannel::EndOfRibProcTimeout() {
    // Take a local copy of the class constant before passing it to
    // integerToString().
    int fired_after_ms = kEndOfRibTimeout;
    IFMAP_PEER_DEBUG(IFMapServerConnection,
                     integerToString(fired_after_ms),
                     "millisecond end of rib timer fired");

    set_end_of_rib_computed(true);
    return false;
}

// (Re)starts the end-of-rib timer. If the timer is already running it is
// cancelled first; it is then started with kEndOfRibTimeout and
// EndOfRibProcTimeout() as the callback.
void IFMapChannel::StartEndOfRibTimer() {
    const bool already_armed = end_of_rib_timer_->running();
    if (already_armed) {
        end_of_rib_timer_->Cancel();
    }

    end_of_rib_timer_->Start(
        kEndOfRibTimeout,
        boost::bind(&IFMapChannel::EndOfRibProcTimeout, this),
        NULL);
}

// Reports whether the end-of-rib timer is currently running.
bool IFMapChannel::EndOfRibTimerRunning() {
    const bool timer_active = end_of_rib_timer_->running();
    return timer_active;
}

// Will run in the context of the main task
void IFMapChannel::ProcResponseInMainThr(size_t bytes_to_read) {
CHECK_CONCURRENCY_MAIN_THR();
Expand Down Expand Up @@ -868,6 +910,13 @@ static bool IFMapServerInfoHandleRequest(const Sandesh *sr,
channel->get_connection_status_and_time());
server_conn_info.set_host(channel->get_host());
server_conn_info.set_port(channel->get_port());
server_conn_info.set_end_of_rib_computed(channel->end_of_rib_computed());
server_conn_info.set_end_of_rib_timer_running(
channel->EndOfRibTimerRunning());
server_conn_info.set_start_stale_entries_cleanup(
channel->start_stale_entries_cleanup());
server_conn_info.set_stale_entries_cleanup_timer_running(
sctx->ifmap_server()->StaleEntriesCleanupTimerRunning());

server_stats.set_rx_msgs(channel->get_recv_msg_cnt());
server_stats.set_tx_msgs(channel->get_sent_msg_cnt());
Expand Down
17 changes: 17 additions & 0 deletions src/ifmap/client/ifmap_channel.h
Expand Up @@ -8,6 +8,7 @@
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include <tbb/atomic.h>

#ifdef __clang__
#pragma clang diagnostic push
Expand All @@ -27,6 +28,7 @@ class IFMapStateMachine;
class IFMapManager;
class TimerImpl;
class IFMapPeerTimedoutEntries;
class Timer;

class IFMapChannel {
public:
Expand Down Expand Up @@ -167,13 +169,23 @@ class IFMapChannel {
}
PeerTimedoutInfo GetTimedoutInfo(const std::string &host,
const std::string &port);
// Stores value in the start_stale_entries_cleanup_ flag.
void set_start_stale_entries_cleanup(bool value) {
start_stale_entries_cleanup_ = value;
}
// Returns the current value of the start_stale_entries_cleanup_ flag.
bool start_stale_entries_cleanup() { return start_stale_entries_cleanup_; }
// Stores value in the end_of_rib_computed_ flag.
void set_end_of_rib_computed(bool value) {
end_of_rib_computed_ = value;
}
// Returns the current value of the end_of_rib_computed_ flag.
bool end_of_rib_computed() { return end_of_rib_computed_; }
bool EndOfRibTimerRunning();

private:
// 45 seconds i.e. 30 + (3*5)s
static const int kSessionKeepaliveIdleTime = 30; // in seconds
static const int kSessionKeepaliveInterval = 3; // in seconds
static const int kSessionKeepaliveProbes = 5; // count
static const int kSessionTcpUserTimeout = 45000; // in milliseconds
static const int kEndOfRibTimeout = 10000; // milliseconds

enum ResponseState {
NONE = 0,
Expand Down Expand Up @@ -210,6 +222,8 @@ class IFMapChannel {
void SendPollRequestInMainThr(std::string poll_msg);
void PollResponseWaitInMainThr();
void ProcResponseInMainThr(size_t bytes_to_read);
bool EndOfRibProcTimeout();
void StartEndOfRibTimer();

IFMapManager *manager_;
boost::asio::ip::tcp::resolver resolver_;
Expand All @@ -236,6 +250,9 @@ class IFMapChannel {
uint64_t connection_status_change_at_;
boost::asio::ip::tcp::endpoint endpoint_;
TimedoutMap timedout_map_;
tbb::atomic<bool> start_stale_entries_cleanup_;
Timer *end_of_rib_timer_;
tbb::atomic<bool> end_of_rib_computed_;

std::string GetSizeAsString(size_t stream_sz, std::string log) {
std::ostringstream ss;
Expand Down
4 changes: 4 additions & 0 deletions src/ifmap/client/ifmap_manager.cc
Expand Up @@ -64,6 +64,10 @@ uint64_t IFMapManager::GetChannelSequenceNumber() {
return channel_->get_sequence_number();
}

// Forwards value to channel_'s set_start_stale_entries_cleanup().
// channel_ is dereferenced without a null check.
void IFMapManager::SetStartStaleEntriesCleanup(bool value) {
channel_->set_start_stale_entries_cleanup(value);
}

void IFMapManager::GetPeerServerInfo(IFMapPeerServerInfoUI &server_info) {
server_info.set_url(get_host_port());
server_info.set_connection_status(channel_->get_connection_status());
Expand Down
1 change: 1 addition & 0 deletions src/ifmap/client/ifmap_manager.h
Expand Up @@ -57,6 +57,7 @@ class IFMapManager {
virtual void ResetConnection(const std::string &host,
const std::string &port);
bool PeerDown();
void SetStartStaleEntriesCleanup(bool value);

private:

Expand Down
14 changes: 12 additions & 2 deletions src/ifmap/ifmap_log.sandesh
Expand Up @@ -71,7 +71,7 @@ systemlog sandesh IFMapXmppUnknownMessage {
4: string client_name (key="ObjectVRouter")
}

systemlog sandesh IFMapStaleCleanerInfo {
systemlog sandesh IFMapStaleEntriesCleanerInfo {
1: "SeqNum:"
2: u64 sequence_number
3: "NodesDeleted:"
Expand All @@ -84,6 +84,11 @@ systemlog sandesh IFMapStaleCleanerInfo {
10: u32 objects_deleted
}

systemlog sandesh IFMapStaleEntriesCleanupTimerFired {
1: string str1
2: string str2
}

systemlog sandesh IFMapChannelUnregisterMessage {
1: string message
2: string client_name (key="ObjectVRouter")
Expand Down Expand Up @@ -264,7 +269,7 @@ trace sandesh IFMapXmppUnknownMessageTrace {
4: string client_name (key="ObjectVRouter")
}

trace sandesh IFMapStaleCleanerInfoTrace {
trace sandesh IFMapStaleEntriesCleanerInfoTrace {
1: "SeqNum:"
2: u64 sequence_number
3: "NodesDeleted:"
Expand All @@ -277,6 +282,11 @@ trace sandesh IFMapStaleCleanerInfoTrace {
10: u32 objects_deleted
}

trace sandesh IFMapStaleEntriesCleanupTimerFiredTrace {
1: string str1
2: string str2
}

trace sandesh IFMapChannelUnregisterMessageTrace {
1: string message
2: string client_name (key="ObjectVRouter")
Expand Down
46 changes: 32 additions & 14 deletions src/ifmap/ifmap_server.cc
Expand Up @@ -11,6 +11,7 @@
#include <boost/algorithm/string.hpp>

#include "base/logging.h"
#include "base/task_annotations.h"
#include "db/db.h"
#include "db/db_graph.h"
#include "db/db_graph_edge.h"
Expand All @@ -32,9 +33,9 @@

using std::make_pair;

class IFMapServer::IFMapStaleCleaner : public Task {
class IFMapServer::IFMapStaleEntriesCleaner : public Task {
public:
IFMapStaleCleaner(DB *db, DBGraph *graph, IFMapServer *server):
IFMapStaleEntriesCleaner(DB *db, DBGraph *graph, IFMapServer *server):
Task(TaskScheduler::GetInstance()->GetTaskId("db::DBTable"), 0),
db_(db), graph_(graph), ifmap_server_(server) {
}
Expand Down Expand Up @@ -114,7 +115,7 @@ class IFMapServer::IFMapStaleCleaner : public Task {
}
}
}
IFMAP_DEBUG(IFMapStaleCleanerInfo, curr_seq_num, nodes_deleted,
IFMAP_DEBUG(IFMapStaleEntriesCleanerInfo, curr_seq_num, nodes_deleted,
nodes_changed, links_deleted, objects_deleted);

return true;
Expand Down Expand Up @@ -190,8 +191,8 @@ IFMapServer::IFMapServer(DB *db, DBGraph *graph,
work_queue_(TaskScheduler::GetInstance()->GetTaskId("db::DBTable"), 0,
boost::bind(&IFMapServer::ClientWorker, this, _1)),
io_service_(io_service),
stale_cleanup_timer_(TimerManager::CreateTimer(*(io_service_),
"Stale cleanup timer")),
stale_entries_cleanup_timer_(TimerManager::CreateTimer(*(io_service_),
"Stale entries cleanup timer")),
ifmap_manager_(NULL), ifmap_channel_manager_(NULL) {
}

Expand All @@ -204,7 +205,7 @@ void IFMapServer::Initialize() {
}

void IFMapServer::Shutdown() {
TimerManager::DeleteTimer(stale_cleanup_timer_);
TimerManager::DeleteTimer(stale_entries_cleanup_timer_);
vm_uuid_mapper_->Shutdown();
exporter_->Shutdown();
}
Expand Down Expand Up @@ -372,21 +373,38 @@ bool IFMapServer::ClientNameToIndex(const std::string &id, int *index) {
return false;
}

bool IFMapServer::StaleNodesProcTimeout() {
IFMapStaleCleaner *cleaner = new IFMapStaleCleaner(db_, graph_, this);
bool IFMapServer::StaleEntriesProcTimeout() {
int timeout = kStaleEntriesCleanupTimeout;
IFMAP_DEBUG(IFMapStaleEntriesCleanupTimerFired, integerToString(timeout),
"millisecond stale entries cleanup timer fired");
SetStartStaleEntriesCleanup(false);
IFMapStaleEntriesCleaner *cleaner =
new IFMapStaleEntriesCleaner(db_, graph_, this);

TaskScheduler *scheduler = TaskScheduler::GetInstance();
scheduler->Enqueue(cleaner);
return false;
}

void IFMapServer::StaleNodesCleanup() {
if (stale_cleanup_timer_->running()) {
stale_cleanup_timer_->Cancel();
// (Re)starts the stale entries cleanup timer. Annotated to run in the
// "ifmap::StateMachine" task context via CHECK_CONCURRENCY. If the timer is
// already running it is cancelled first; it is then started with
// kStaleEntriesCleanupTimeout and StaleEntriesProcTimeout() as the callback.
void IFMapServer::StartStaleEntriesCleanup() {
    CHECK_CONCURRENCY("ifmap::StateMachine");

    const bool already_armed = stale_entries_cleanup_timer_->running();
    if (already_armed) {
        stale_entries_cleanup_timer_->Cancel();
    }

    stale_entries_cleanup_timer_->Start(
        kStaleEntriesCleanupTimeout,
        boost::bind(&IFMapServer::StaleEntriesProcTimeout, this),
        NULL);
}

// Reports whether the stale entries cleanup timer is currently running.
// Annotated to run in the "ifmap::StateMachine" task context via
// CHECK_CONCURRENCY.
// NOTE(review): IFMapServerInfoHandleRequest in ifmap_channel.cc also calls
// this - confirm that handler runs in a context that satisfies the check.
bool IFMapServer::StaleEntriesCleanupTimerRunning() {
    CHECK_CONCURRENCY("ifmap::StateMachine");
    const bool timer_active = stale_entries_cleanup_timer_->running();
    return timer_active;
}

void IFMapServer::StopStaleEntriesCleanup() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (stale_entries_cleanup_timer_->running()) {
stale_entries_cleanup_timer_->Cancel();
}
stale_cleanup_timer_->Start(kStaleCleanupTimeout,
boost::bind(&IFMapServer::StaleNodesProcTimeout, this),
NULL);
}

void IFMapServer::ProcessVmSubscribe(std::string vr_name, std::string vm_uuid,
Expand Down
18 changes: 13 additions & 5 deletions src/ifmap/ifmap_server.h
Expand Up @@ -57,7 +57,6 @@ class IFMapServer {

void AddClient(IFMapClient *client);
void DeleteClient(IFMapClient *client);
void StaleNodesCleanup();

DB *database() { return db_; }
DBGraph *graph() { return graph_; }
Expand All @@ -75,6 +74,11 @@ class IFMapServer {
virtual uint64_t get_ifmap_channel_sequence_number() {
return ifmap_manager_->GetChannelSequenceNumber();
}
// Forwards value to ifmap_manager_'s SetStartStaleEntriesCleanup(). Does
// nothing when ifmap_manager_ is null.
void SetStartStaleEntriesCleanup(bool value) {
    if (!ifmap_manager_) {
        return;
    }
    ifmap_manager_->SetStartStaleEntriesCleanup(value);
}
void set_ifmap_channel_manager(IFMapChannelManager *manager) {
ifmap_channel_manager_ = manager;
}
Expand All @@ -87,7 +91,7 @@ class IFMapServer {
void ProcessVmSubscribe(std::string vr_name, std::string vm_uuid,
bool subscribe);

class IFMapStaleCleaner;
class IFMapStaleEntriesCleaner;
class IFMapVmSubscribe;

void ProcessVmRegAsPending(std::string vm_uuid, std::string vr_name,
Expand All @@ -100,9 +104,13 @@ class IFMapServer {
const CmSz_t GetIndexMapSize() const { return index_map_.size(); }
void GetUIInfo(IFMapServerInfoUI *server_info);
bool ClientNameToIndex(const std::string &id, int *index);
void StartStaleEntriesCleanup();
void StopStaleEntriesCleanup();
bool StaleEntriesCleanupTimerRunning();

private:
static const int kStaleCleanupTimeout = 60000; // milliseconds
static const int kStaleEntriesCleanupTimeout = 10000; // milliseconds

friend class IFMapServerTest;
friend class IFMapRestartTest;
friend class ShowIFMapXmppClientInfo;
Expand All @@ -123,7 +131,7 @@ class IFMapServer {
void CleanupUuidMapper(IFMapClient *client);
void ClientExporterSetup(IFMapClient *client);
void ClientExporterCleanup(IFMapClient *client);
bool StaleNodesProcTimeout();
bool StaleEntriesProcTimeout();
const ClientMap &GetClientMap() const { return client_map_; }
void SimulateDeleteClient(IFMapClient *client);

Expand All @@ -138,7 +146,7 @@ class IFMapServer {
IndexMap index_map_;
WorkQueue<QueueEntry> work_queue_;
boost::asio::io_service *io_service_;
Timer *stale_cleanup_timer_;
Timer *stale_entries_cleanup_timer_;
IFMapManager *ifmap_manager_;
IFMapChannelManager *ifmap_channel_manager_;
};
Expand Down
4 changes: 4 additions & 0 deletions src/ifmap/ifmap_server_show.sandesh
Expand Up @@ -251,6 +251,10 @@ struct IFMapPeerServerConnInfo {
4: string connection_status;
5: string host;
6: string port;
7: bool end_of_rib_computed;
8: bool end_of_rib_timer_running;
9: bool start_stale_entries_cleanup;
10: bool stale_entries_cleanup_timer_running;
}

struct IFMapPeerTimedoutEntry {
Expand Down
2 changes: 1 addition & 1 deletion src/ifmap/test/ifmap_restart_test.cc
Expand Up @@ -135,7 +135,7 @@ class IFMapRestartTest : public ::testing::Test {
}

void StaleNodesProcTimeout() {
server_.StaleNodesProcTimeout();
server_.StaleEntriesProcTimeout();
}

DB db_;
Expand Down

0 comments on commit 7af3199

Please sign in to comment.