Skip to content

Commit

Permalink
Move the stale cleanup timer to IFMapChannel
Browse files Browse the repository at this point in the history
Also, stop the end of rib timer if its running and the connection goes down.

Change-Id: I877b91815f7d89e09c50b8fd2fd0ff011b84dc70
Closes-Bug: #1446869
  • Loading branch information
tkarwa committed Sep 1, 2015
1 parent 891afa6 commit 3ce95d2
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 78 deletions.
85 changes: 69 additions & 16 deletions src/ifmap/client/ifmap_channel.cc
Expand Up @@ -129,6 +129,8 @@ IFMapChannel::IFMapChannel(IFMapManager *manager, const std::string& user,
response_state_(NONE), sequence_number_(0), recv_msg_cnt_(0),
sent_msg_cnt_(0), reconnect_attempts_(0), connection_status_(NOCONN),
connection_status_change_at_(UTCTimestampUsec()),
stale_entries_cleanup_timer_(TimerManager::CreateTimer(
*(manager->io_service()), "Stale entries cleanup timer")),
end_of_rib_timer_(TimerManager::CreateTimer(*(manager->io_service()),
"End of rib timer")) {

Expand All @@ -144,6 +146,11 @@ IFMapChannel::IFMapChannel(IFMapManager *manager, const std::string& user,
b64_auth_str_ = base64_encode(auth_str);
}

IFMapChannel::~IFMapChannel() {
TimerManager::DeleteTimer(stale_entries_cleanup_timer_);
TimerManager::DeleteTimer(end_of_rib_timer_);
}

void IFMapChannel::set_connection_status(ConnectionStatus status) {
if (connection_status_ != status) {
connection_status_ = status;
Expand Down Expand Up @@ -238,11 +245,18 @@ void IFMapChannel::ReconnectPreparationInMainThr() {

void IFMapChannel::ReconnectPreparation() {
CHECK_CONCURRENCY("ifmap::StateMachine");

// The stale entries cleanup timer could be running if connection was reset
// in the near past. Stop the timer since we dont want to clean our database
// in this case.
set_start_stale_entries_cleanup(false);
manager_->ifmap_server()->StopStaleEntriesCleanup();
StopStaleEntriesCleanupTimer();

// The end of rib timer could be running if the initial connection was
// reset in the near past. Stop the timer since we dont want to prematurely
// declare that end of rib has been computed.
StopEndOfRibTimer();

io_strand_.post(
boost::bind(&IFMapChannel::ReconnectPreparationInMainThr, this));
}
Expand Down Expand Up @@ -606,13 +620,12 @@ int IFMapChannel::ReadPollResponse() {
if (start_stale_entries_cleanup()) {
// If this is a reconnection, keep re-arming the stale entries
// cleanup timer as long as we keep receiving SearchResults.
manager_->ifmap_server()->StartStaleEntriesCleanup();
} else {
// When the daemon is coming up, as long as we are receiving
// SearchResults, we have not received the entire db.
if (!end_of_rib_computed()) {
StartEndOfRibTimer();
}
StartStaleEntriesCleanupTimer();
}
// When the daemon is coming up, as long as we are receiving
// SearchResults, we have not received the entire db.
if (!end_of_rib_computed()) {
StartEndOfRibTimer();
}
}
string poll_string = reply_str.substr(pos);
Expand All @@ -636,24 +649,64 @@ int IFMapChannel::ReadPollResponse() {
return 0;
}

bool IFMapChannel::EndOfRibProcTimeout() {
int timeout = kEndOfRibTimeout;
void IFMapChannel::StartStaleEntriesCleanupTimer() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (stale_entries_cleanup_timer_->running()) {
stale_entries_cleanup_timer_->Cancel();
}
stale_entries_cleanup_timer_->Start(kStaleEntriesCleanupTimeout,
boost::bind(&IFMapChannel::ProcessStaleEntriesTimeout, this), NULL);
}

void IFMapChannel::StopStaleEntriesCleanupTimer() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (stale_entries_cleanup_timer_->running()) {
stale_entries_cleanup_timer_->Cancel();
}
}

// Called in the context of the main thread.
bool IFMapChannel::ProcessStaleEntriesTimeout() {
int timeout = kStaleEntriesCleanupTimeout;
IFMAP_PEER_DEBUG(IFMapServerConnection, integerToString(timeout),
"millisecond end of rib timer fired");
set_end_of_rib_computed(true);
process::ConnectionState::GetInstance()->Update();
return false;
"millisecond stale cleanup timer fired");
set_start_stale_entries_cleanup(false);
return manager_->ifmap_server()->ProcessStaleEntriesTimeout();
}

bool IFMapChannel::StaleEntriesCleanupTimerRunning() {
CHECK_CONCURRENCY("ifmap::StateMachine");
return stale_entries_cleanup_timer_->running();
}

void IFMapChannel::StartEndOfRibTimer() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (end_of_rib_timer_->running()) {
end_of_rib_timer_->Cancel();
}
end_of_rib_timer_->Start(kEndOfRibTimeout,
boost::bind(&IFMapChannel::EndOfRibProcTimeout, this), NULL);
boost::bind(&IFMapChannel::ProcessEndOfRibTimeout, this), NULL);
}

void IFMapChannel::StopEndOfRibTimer() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (end_of_rib_timer_->running()) {
end_of_rib_timer_->Cancel();
}
}

// Called in the context of the main thread.
bool IFMapChannel::ProcessEndOfRibTimeout() {
int timeout = kEndOfRibTimeout;
IFMAP_PEER_DEBUG(IFMapServerConnection, integerToString(timeout),
"millisecond end of rib timer fired");
set_end_of_rib_computed(true);
process::ConnectionState::GetInstance()->Update();
return false;
}

bool IFMapChannel::EndOfRibTimerRunning() {
CHECK_CONCURRENCY("ifmap::StateMachine");
return end_of_rib_timer_->running();
}

Expand Down Expand Up @@ -913,7 +966,7 @@ static bool IFMapServerInfoHandleRequest(const Sandesh *sr,
server_conn_info.set_start_stale_entries_cleanup(
channel->start_stale_entries_cleanup());
server_conn_info.set_stale_entries_cleanup_timer_running(
sctx->ifmap_server()->StaleEntriesCleanupTimerRunning());
channel->StaleEntriesCleanupTimerRunning());

server_stats.set_rx_msgs(channel->get_recv_msg_cnt());
server_stats.set_tx_msgs(channel->get_sent_msg_cnt());
Expand Down
12 changes: 10 additions & 2 deletions src/ifmap/client/ifmap_channel.h
Expand Up @@ -48,7 +48,7 @@ class IFMapChannel {
IFMapChannel(IFMapManager *manager, const std::string& user,
const std::string& passwd, const std::string& certstore);

virtual ~IFMapChannel() { }
virtual ~IFMapChannel();

void set_sm(IFMapStateMachine *state_machine) {
state_machine_ = state_machine;
Expand Down Expand Up @@ -169,6 +169,7 @@ class IFMapChannel {
}
PeerTimedoutInfo GetTimedoutInfo(const std::string &host,
const std::string &port);
bool StaleEntriesCleanupTimerRunning();
void set_start_stale_entries_cleanup(bool value) {
start_stale_entries_cleanup_ = value;
}
Expand All @@ -187,6 +188,8 @@ class IFMapChannel {
static const int kSessionKeepaliveInterval = 3; // in seconds
static const int kSessionKeepaliveProbes = 5; // count
static const int kSessionTcpUserTimeout = 45000; // in milliseconds

static const int kStaleEntriesCleanupTimeout = 10000; // milliseconds
static const int kEndOfRibTimeout = 10000; // milliseconds

enum ResponseState {
Expand Down Expand Up @@ -224,8 +227,12 @@ class IFMapChannel {
void SendPollRequestInMainThr(std::string poll_msg);
void PollResponseWaitInMainThr();
void ProcResponseInMainThr(size_t bytes_to_read);
bool EndOfRibProcTimeout();
void StartStaleEntriesCleanupTimer();
void StopStaleEntriesCleanupTimer();
bool ProcessStaleEntriesTimeout();
void StartEndOfRibTimer();
void StopEndOfRibTimer();
bool ProcessEndOfRibTimeout();

IFMapManager *manager_;
boost::asio::ip::tcp::resolver resolver_;
Expand Down Expand Up @@ -253,6 +260,7 @@ class IFMapChannel {
boost::asio::ip::tcp::endpoint endpoint_;
TimedoutMap timedout_map_;
tbb::atomic<bool> start_stale_entries_cleanup_;
Timer *stale_entries_cleanup_timer_;
Timer *end_of_rib_timer_;
tbb::atomic<bool> end_of_rib_computed_;

Expand Down
4 changes: 0 additions & 4 deletions src/ifmap/client/ifmap_manager.cc
Expand Up @@ -58,10 +58,6 @@ bool IFMapManager::GetEndOfRibComputed() const {
return channel_->end_of_rib_computed();
}

void IFMapManager::SetStartStaleEntriesCleanup(bool value) {
channel_->set_start_stale_entries_cleanup(value);
}

void IFMapManager::GetPeerServerInfo(IFMapPeerServerInfoUI &server_info) {
server_info.set_url(get_host_port());
server_info.set_connection_status(channel_->get_connection_status());
Expand Down
1 change: 0 additions & 1 deletion src/ifmap/client/ifmap_manager.h
Expand Up @@ -54,7 +54,6 @@ class IFMapManager {
virtual void ResetConnection(const std::string &host,
const std::string &port);
bool PeerDown();
void SetStartStaleEntriesCleanup(bool value);

private:

Expand Down
31 changes: 1 addition & 30 deletions src/ifmap/ifmap_server.cc
Expand Up @@ -191,8 +191,6 @@ IFMapServer::IFMapServer(DB *db, DBGraph *graph,
work_queue_(TaskScheduler::GetInstance()->GetTaskId("db::DBTable"), 0,
boost::bind(&IFMapServer::ClientWorker, this, _1)),
io_service_(io_service),
stale_entries_cleanup_timer_(TimerManager::CreateTimer(*(io_service_),
"Stale entries cleanup timer")),
ifmap_manager_(NULL), ifmap_channel_manager_(NULL) {
}

Expand All @@ -205,7 +203,6 @@ void IFMapServer::Initialize() {
}

void IFMapServer::Shutdown() {
TimerManager::DeleteTimer(stale_entries_cleanup_timer_);
vm_uuid_mapper_->Shutdown();
exporter_->Shutdown();
}
Expand Down Expand Up @@ -373,40 +370,14 @@ bool IFMapServer::ClientNameToIndex(const std::string &id, int *index) {
return false;
}

bool IFMapServer::StaleEntriesProcTimeout() {
int timeout = kStaleEntriesCleanupTimeout;
IFMAP_DEBUG(IFMapStaleEntriesCleanupTimerFired, integerToString(timeout),
"millisecond stale entries cleanup timer fired");
SetStartStaleEntriesCleanup(false);
bool IFMapServer::ProcessStaleEntriesTimeout() {
IFMapStaleEntriesCleaner *cleaner =
new IFMapStaleEntriesCleaner(db_, graph_, this);

TaskScheduler *scheduler = TaskScheduler::GetInstance();
scheduler->Enqueue(cleaner);
return false;
}

void IFMapServer::StartStaleEntriesCleanup() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (stale_entries_cleanup_timer_->running()) {
stale_entries_cleanup_timer_->Cancel();
}
stale_entries_cleanup_timer_->Start(kStaleEntriesCleanupTimeout,
boost::bind(&IFMapServer::StaleEntriesProcTimeout, this), NULL);
}

bool IFMapServer::StaleEntriesCleanupTimerRunning() {
CHECK_CONCURRENCY("ifmap::StateMachine");
return stale_entries_cleanup_timer_->running();
}

void IFMapServer::StopStaleEntriesCleanup() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (stale_entries_cleanup_timer_->running()) {
stale_entries_cleanup_timer_->Cancel();
}
}

void IFMapServer::ProcessVmSubscribe(std::string vr_name, std::string vm_uuid,
bool subscribe, bool has_vms) {
IFMapVmSubscribe *vm_sub = new IFMapVmSubscribe(db_, graph_, this,
Expand Down
13 changes: 1 addition & 12 deletions src/ifmap/ifmap_server.h
Expand Up @@ -74,11 +74,6 @@ class IFMapServer {
virtual uint64_t get_ifmap_channel_sequence_number() {
return ifmap_manager_->GetChannelSequenceNumber();
}
void SetStartStaleEntriesCleanup(bool value) {
if (ifmap_manager_) {
ifmap_manager_->SetStartStaleEntriesCleanup(value);
}
}
void set_ifmap_channel_manager(IFMapChannelManager *manager) {
ifmap_channel_manager_ = manager;
}
Expand All @@ -104,13 +99,9 @@ class IFMapServer {
const CmSz_t GetIndexMapSize() const { return index_map_.size(); }
void GetUIInfo(IFMapServerInfoUI *server_info);
bool ClientNameToIndex(const std::string &id, int *index);
void StartStaleEntriesCleanup();
void StopStaleEntriesCleanup();
bool StaleEntriesCleanupTimerRunning();
bool ProcessStaleEntriesTimeout();

private:
static const int kStaleEntriesCleanupTimeout = 10000; // milliseconds

friend class IFMapServerTest;
friend class IFMapRestartTest;
friend class ShowIFMapXmppClientInfo;
Expand All @@ -131,7 +122,6 @@ class IFMapServer {
void CleanupUuidMapper(IFMapClient *client);
void ClientExporterSetup(IFMapClient *client);
void ClientExporterCleanup(IFMapClient *client);
bool StaleEntriesProcTimeout();
const ClientMap &GetClientMap() const { return client_map_; }
void SimulateDeleteClient(IFMapClient *client);

Expand All @@ -146,7 +136,6 @@ class IFMapServer {
IndexMap index_map_;
WorkQueue<QueueEntry> work_queue_;
boost::asio::io_service *io_service_;
Timer *stale_entries_cleanup_timer_;
IFMapManager *ifmap_manager_;
IFMapChannelManager *ifmap_channel_manager_;
};
Expand Down

0 comments on commit 3ce95d2

Please sign in to comment.