Skip to content

Commit

Permalink
Merge "Move the stale cleanup timer to IFMapChannel"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Sep 3, 2015
2 parents 92f2f64 + 3ce95d2 commit 1ad7793
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 78 deletions.
85 changes: 69 additions & 16 deletions src/ifmap/client/ifmap_channel.cc
Expand Up @@ -129,6 +129,8 @@ IFMapChannel::IFMapChannel(IFMapManager *manager, const std::string& user,
response_state_(NONE), sequence_number_(0), recv_msg_cnt_(0),
sent_msg_cnt_(0), reconnect_attempts_(0), connection_status_(NOCONN),
connection_status_change_at_(UTCTimestampUsec()),
stale_entries_cleanup_timer_(TimerManager::CreateTimer(
*(manager->io_service()), "Stale entries cleanup timer")),
end_of_rib_timer_(TimerManager::CreateTimer(*(manager->io_service()),
"End of rib timer")) {

Expand All @@ -144,6 +146,11 @@ IFMapChannel::IFMapChannel(IFMapManager *manager, const std::string& user,
b64_auth_str_ = base64_encode(auth_str);
}

IFMapChannel::~IFMapChannel() {
TimerManager::DeleteTimer(stale_entries_cleanup_timer_);
TimerManager::DeleteTimer(end_of_rib_timer_);
}

void IFMapChannel::set_connection_status(ConnectionStatus status) {
if (connection_status_ != status) {
connection_status_ = status;
Expand Down Expand Up @@ -238,11 +245,18 @@ void IFMapChannel::ReconnectPreparationInMainThr() {

void IFMapChannel::ReconnectPreparation() {
CHECK_CONCURRENCY("ifmap::StateMachine");

// The stale entries cleanup timer could be running if connection was reset
// in the near past. Stop the timer since we dont want to clean our database
// in this case.
set_start_stale_entries_cleanup(false);
manager_->ifmap_server()->StopStaleEntriesCleanup();
StopStaleEntriesCleanupTimer();

// The end of rib timer could be running if the initial connection was
// reset in the near past. Stop the timer since we dont want to prematurely
// declare that end of rib has been computed.
StopEndOfRibTimer();

io_strand_.post(
boost::bind(&IFMapChannel::ReconnectPreparationInMainThr, this));
}
Expand Down Expand Up @@ -606,13 +620,12 @@ int IFMapChannel::ReadPollResponse() {
if (start_stale_entries_cleanup()) {
// If this is a reconnection, keep re-arming the stale entries
// cleanup timer as long as we keep receiving SearchResults.
manager_->ifmap_server()->StartStaleEntriesCleanup();
} else {
// When the daemon is coming up, as long as we are receiving
// SearchResults, we have not received the entire db.
if (!end_of_rib_computed()) {
StartEndOfRibTimer();
}
StartStaleEntriesCleanupTimer();
}
// When the daemon is coming up, as long as we are receiving
// SearchResults, we have not received the entire db.
if (!end_of_rib_computed()) {
StartEndOfRibTimer();
}
}
string poll_string = reply_str.substr(pos);
Expand All @@ -636,24 +649,64 @@ int IFMapChannel::ReadPollResponse() {
return 0;
}

bool IFMapChannel::EndOfRibProcTimeout() {
int timeout = kEndOfRibTimeout;
void IFMapChannel::StartStaleEntriesCleanupTimer() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (stale_entries_cleanup_timer_->running()) {
stale_entries_cleanup_timer_->Cancel();
}
stale_entries_cleanup_timer_->Start(kStaleEntriesCleanupTimeout,
boost::bind(&IFMapChannel::ProcessStaleEntriesTimeout, this), NULL);
}

void IFMapChannel::StopStaleEntriesCleanupTimer() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (stale_entries_cleanup_timer_->running()) {
stale_entries_cleanup_timer_->Cancel();
}
}

// Called in the context of the main thread.
bool IFMapChannel::ProcessStaleEntriesTimeout() {
int timeout = kStaleEntriesCleanupTimeout;
IFMAP_PEER_DEBUG(IFMapServerConnection, integerToString(timeout),
"millisecond end of rib timer fired");
set_end_of_rib_computed(true);
process::ConnectionState::GetInstance()->Update();
return false;
"millisecond stale cleanup timer fired");
set_start_stale_entries_cleanup(false);
return manager_->ifmap_server()->ProcessStaleEntriesTimeout();
}

bool IFMapChannel::StaleEntriesCleanupTimerRunning() {
CHECK_CONCURRENCY("ifmap::StateMachine");
return stale_entries_cleanup_timer_->running();
}

void IFMapChannel::StartEndOfRibTimer() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (end_of_rib_timer_->running()) {
end_of_rib_timer_->Cancel();
}
end_of_rib_timer_->Start(kEndOfRibTimeout,
boost::bind(&IFMapChannel::EndOfRibProcTimeout, this), NULL);
boost::bind(&IFMapChannel::ProcessEndOfRibTimeout, this), NULL);
}

void IFMapChannel::StopEndOfRibTimer() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (end_of_rib_timer_->running()) {
end_of_rib_timer_->Cancel();
}
}

// Called in the context of the main thread.
bool IFMapChannel::ProcessEndOfRibTimeout() {
int timeout = kEndOfRibTimeout;
IFMAP_PEER_DEBUG(IFMapServerConnection, integerToString(timeout),
"millisecond end of rib timer fired");
set_end_of_rib_computed(true);
process::ConnectionState::GetInstance()->Update();
return false;
}

bool IFMapChannel::EndOfRibTimerRunning() {
CHECK_CONCURRENCY("ifmap::StateMachine");
return end_of_rib_timer_->running();
}

Expand Down Expand Up @@ -913,7 +966,7 @@ static bool IFMapServerInfoHandleRequest(const Sandesh *sr,
server_conn_info.set_start_stale_entries_cleanup(
channel->start_stale_entries_cleanup());
server_conn_info.set_stale_entries_cleanup_timer_running(
sctx->ifmap_server()->StaleEntriesCleanupTimerRunning());
channel->StaleEntriesCleanupTimerRunning());

server_stats.set_rx_msgs(channel->get_recv_msg_cnt());
server_stats.set_tx_msgs(channel->get_sent_msg_cnt());
Expand Down
12 changes: 10 additions & 2 deletions src/ifmap/client/ifmap_channel.h
Expand Up @@ -48,7 +48,7 @@ class IFMapChannel {
IFMapChannel(IFMapManager *manager, const std::string& user,
const std::string& passwd, const std::string& certstore);

virtual ~IFMapChannel() { }
virtual ~IFMapChannel();

void set_sm(IFMapStateMachine *state_machine) {
state_machine_ = state_machine;
Expand Down Expand Up @@ -169,6 +169,7 @@ class IFMapChannel {
}
PeerTimedoutInfo GetTimedoutInfo(const std::string &host,
const std::string &port);
bool StaleEntriesCleanupTimerRunning();
void set_start_stale_entries_cleanup(bool value) {
start_stale_entries_cleanup_ = value;
}
Expand All @@ -187,6 +188,8 @@ class IFMapChannel {
static const int kSessionKeepaliveInterval = 3; // in seconds
static const int kSessionKeepaliveProbes = 5; // count
static const int kSessionTcpUserTimeout = 45000; // in milliseconds

static const int kStaleEntriesCleanupTimeout = 10000; // milliseconds
static const int kEndOfRibTimeout = 10000; // milliseconds

enum ResponseState {
Expand Down Expand Up @@ -224,8 +227,12 @@ class IFMapChannel {
void SendPollRequestInMainThr(std::string poll_msg);
void PollResponseWaitInMainThr();
void ProcResponseInMainThr(size_t bytes_to_read);
bool EndOfRibProcTimeout();
void StartStaleEntriesCleanupTimer();
void StopStaleEntriesCleanupTimer();
bool ProcessStaleEntriesTimeout();
void StartEndOfRibTimer();
void StopEndOfRibTimer();
bool ProcessEndOfRibTimeout();

IFMapManager *manager_;
boost::asio::ip::tcp::resolver resolver_;
Expand Down Expand Up @@ -253,6 +260,7 @@ class IFMapChannel {
boost::asio::ip::tcp::endpoint endpoint_;
TimedoutMap timedout_map_;
tbb::atomic<bool> start_stale_entries_cleanup_;
Timer *stale_entries_cleanup_timer_;
Timer *end_of_rib_timer_;
tbb::atomic<bool> end_of_rib_computed_;

Expand Down
4 changes: 0 additions & 4 deletions src/ifmap/client/ifmap_manager.cc
Expand Up @@ -58,10 +58,6 @@ bool IFMapManager::GetEndOfRibComputed() const {
return channel_->end_of_rib_computed();
}

void IFMapManager::SetStartStaleEntriesCleanup(bool value) {
channel_->set_start_stale_entries_cleanup(value);
}

void IFMapManager::GetPeerServerInfo(IFMapPeerServerInfoUI &server_info) {
server_info.set_url(get_host_port());
server_info.set_connection_status(channel_->get_connection_status());
Expand Down
1 change: 0 additions & 1 deletion src/ifmap/client/ifmap_manager.h
Expand Up @@ -54,7 +54,6 @@ class IFMapManager {
virtual void ResetConnection(const std::string &host,
const std::string &port);
bool PeerDown();
void SetStartStaleEntriesCleanup(bool value);

private:

Expand Down
31 changes: 1 addition & 30 deletions src/ifmap/ifmap_server.cc
Expand Up @@ -191,8 +191,6 @@ IFMapServer::IFMapServer(DB *db, DBGraph *graph,
work_queue_(TaskScheduler::GetInstance()->GetTaskId("db::DBTable"), 0,
boost::bind(&IFMapServer::ClientWorker, this, _1)),
io_service_(io_service),
stale_entries_cleanup_timer_(TimerManager::CreateTimer(*(io_service_),
"Stale entries cleanup timer")),
ifmap_manager_(NULL), ifmap_channel_manager_(NULL) {
}

Expand All @@ -205,7 +203,6 @@ void IFMapServer::Initialize() {
}

void IFMapServer::Shutdown() {
TimerManager::DeleteTimer(stale_entries_cleanup_timer_);
vm_uuid_mapper_->Shutdown();
exporter_->Shutdown();
}
Expand Down Expand Up @@ -373,40 +370,14 @@ bool IFMapServer::ClientNameToIndex(const std::string &id, int *index) {
return false;
}

bool IFMapServer::StaleEntriesProcTimeout() {
int timeout = kStaleEntriesCleanupTimeout;
IFMAP_DEBUG(IFMapStaleEntriesCleanupTimerFired, integerToString(timeout),
"millisecond stale entries cleanup timer fired");
SetStartStaleEntriesCleanup(false);
bool IFMapServer::ProcessStaleEntriesTimeout() {
IFMapStaleEntriesCleaner *cleaner =
new IFMapStaleEntriesCleaner(db_, graph_, this);

TaskScheduler *scheduler = TaskScheduler::GetInstance();
scheduler->Enqueue(cleaner);
return false;
}

void IFMapServer::StartStaleEntriesCleanup() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (stale_entries_cleanup_timer_->running()) {
stale_entries_cleanup_timer_->Cancel();
}
stale_entries_cleanup_timer_->Start(kStaleEntriesCleanupTimeout,
boost::bind(&IFMapServer::StaleEntriesProcTimeout, this), NULL);
}

bool IFMapServer::StaleEntriesCleanupTimerRunning() {
CHECK_CONCURRENCY("ifmap::StateMachine");
return stale_entries_cleanup_timer_->running();
}

void IFMapServer::StopStaleEntriesCleanup() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (stale_entries_cleanup_timer_->running()) {
stale_entries_cleanup_timer_->Cancel();
}
}

void IFMapServer::ProcessVmSubscribe(std::string vr_name, std::string vm_uuid,
bool subscribe, bool has_vms) {
IFMapVmSubscribe *vm_sub = new IFMapVmSubscribe(db_, graph_, this,
Expand Down
13 changes: 1 addition & 12 deletions src/ifmap/ifmap_server.h
Expand Up @@ -74,11 +74,6 @@ class IFMapServer {
virtual uint64_t get_ifmap_channel_sequence_number() {
return ifmap_manager_->GetChannelSequenceNumber();
}
void SetStartStaleEntriesCleanup(bool value) {
if (ifmap_manager_) {
ifmap_manager_->SetStartStaleEntriesCleanup(value);
}
}
void set_ifmap_channel_manager(IFMapChannelManager *manager) {
ifmap_channel_manager_ = manager;
}
Expand All @@ -104,13 +99,9 @@ class IFMapServer {
const CmSz_t GetIndexMapSize() const { return index_map_.size(); }
void GetUIInfo(IFMapServerInfoUI *server_info);
bool ClientNameToIndex(const std::string &id, int *index);
void StartStaleEntriesCleanup();
void StopStaleEntriesCleanup();
bool StaleEntriesCleanupTimerRunning();
bool ProcessStaleEntriesTimeout();

private:
static const int kStaleEntriesCleanupTimeout = 10000; // milliseconds

friend class IFMapServerTest;
friend class IFMapRestartTest;
friend class ShowIFMapXmppClientInfo;
Expand All @@ -131,7 +122,6 @@ class IFMapServer {
void CleanupUuidMapper(IFMapClient *client);
void ClientExporterSetup(IFMapClient *client);
void ClientExporterCleanup(IFMapClient *client);
bool StaleEntriesProcTimeout();
const ClientMap &GetClientMap() const { return client_map_; }
void SimulateDeleteClient(IFMapClient *client);

Expand All @@ -146,7 +136,6 @@ class IFMapServer {
IndexMap index_map_;
WorkQueue<QueueEntry> work_queue_;
boost::asio::io_service *io_service_;
Timer *stale_entries_cleanup_timer_;
IFMapManager *ifmap_manager_;
IFMapChannelManager *ifmap_channel_manager_;
};
Expand Down

0 comments on commit 1ad7793

Please sign in to comment.