Skip to content

Commit

Permalink
Change the stale cleanup functionality and add endOfRib detection logic.
Browse files Browse the repository at this point in the history
The existing implementation uses a fixed timeout to clean up stale ifmap
node/link entries when the control-node connection to irond flaps. The ifmap
protocol sends only SearchResults when a connection comes up. Subsequent
adds/deletes are sent via UpdateResults and DeleteResults. We use this fact to
decide we have reached end of rib i.e. all existing config has been received.

We start/restart a timer for each SearchResult received. Since SearchResults
will keep coming in until all data has been downloaded, the timer will make us
wait before we clean up entries that became stale while the connection was down.
Using a static timer value will not work well for large configs but this scheme
will.

Also, add logic to detect endOfRib when the daemon first comes up. We use
the same logic as above to detect that we have received all available config.
This is done only when the daemon comes up for the first time. In contrast, the
stale timer functionality is used only when the connection goes down. Detecting
that all config has been received will be used to advertise the control-node to
Discovery only after complete config download from irond.

Also, some name changes for consistency.

Change-Id: Ib14f45f489591679fa406d4af41a846777f4bb28
Closes-Bug: #1446869
  • Loading branch information
tkarwa committed Aug 17, 2015
1 parent 770d9e0 commit e48172f
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 24 deletions.
53 changes: 51 additions & 2 deletions src/ifmap/client/ifmap_channel.cc
Expand Up @@ -128,8 +128,12 @@ IFMapChannel::IFMapChannel(IFMapManager *manager, const std::string& user,
username_(user), password_(passwd), state_machine_(NULL),
response_state_(NONE), sequence_number_(0), recv_msg_cnt_(0),
sent_msg_cnt_(0), reconnect_attempts_(0), connection_status_(NOCONN),
connection_status_change_at_(UTCTimestampUsec()) {
connection_status_change_at_(UTCTimestampUsec()),
end_of_rib_timer_(TimerManager::CreateTimer(*(manager->io_service()),
"End of rib timer")) {

set_start_stale_entries_cleanup(false);
set_end_of_rib_computed(false);
boost::system::error_code ec;
if (certstore.empty()) {
ctx_.set_verify_mode(boost::asio::ssl::context::verify_none, ec);
Expand Down Expand Up @@ -234,6 +238,11 @@ void IFMapChannel::ReconnectPreparationInMainThr() {

// Prepares the channel for a reconnect. Annotated with CHECK_CONCURRENCY for
// the "ifmap::StateMachine" task. Clears the stale-entries-cleanup flag, asks
// the server to stop its stale entries cleanup timer, and then posts
// ReconnectPreparationInMainThr() to io_strand_.
void IFMapChannel::ReconnectPreparation() {
CHECK_CONCURRENCY("ifmap::StateMachine");
// The stale entries cleanup timer could be running if the connection was
// reset in the near past. Stop the timer since we don't want to clean our
// database in this case.
set_start_stale_entries_cleanup(false);
manager_->ifmap_server()->StopStaleEntriesCleanup();
io_strand_.post(
boost::bind(&IFMapChannel::ReconnectPreparationInMainThr, this));
}
Expand Down Expand Up @@ -278,7 +287,7 @@ void IFMapChannel::DoConnectInMainThr(bool is_ssrc) {
if (!is_ssrc) {
if (ConnectionStatusIsDown()) {
sequence_number_++;
manager_->ifmap_server()->StaleNodesCleanup();
set_start_stale_entries_cleanup(true);
}
set_connection_status(UP);
IFMAP_PEER_DEBUG(IFMapServerConnection,
Expand Down Expand Up @@ -597,6 +606,19 @@ int IFMapChannel::ReadPollResponse() {
"Incorrectly formatted Poll response. Quitting.", "");
return -1;
}
if (reply_str.find(string("searchResult")) != string::npos) {
if (start_stale_entries_cleanup()) {
// If this is a reconnection, keep re-arming the stale entries
// cleanup timer as long as we keep receiving SearchResults.
manager_->ifmap_server()->StartStaleEntriesCleanup();
} else {
// When the daemon is coming up, as long as we are receiving
// SearchResults, we have not received the entire db.
if (!end_of_rib_computed()) {
StartEndOfRibTimer();
}
}
}
string poll_string = reply_str.substr(pos);
increment_recv_msg_cnt();
bool success = true;
Expand All @@ -618,6 +640,26 @@ int IFMapChannel::ReadPollResponse() {
return 0;
}

// Callback bound to the end-of-rib timer by StartEndOfRibTimer(). Logs that
// the timer fired and marks end-of-rib as computed.
// Always returns false (presumably tells the timer not to re-arm itself --
// TODO confirm against the Timer implementation).
bool IFMapChannel::EndOfRibProcTimeout() {
    // The class constant is copied into a local before being formatted for
    // the log message.
    int timeout_ms = kEndOfRibTimeout;
    IFMAP_PEER_DEBUG(IFMapServerConnection, integerToString(timeout_ms),
                     "millisecond end of rib timer fired");

    // No SearchResult re-armed the timer within the timeout window, so the
    // initial config download is treated as complete.
    set_end_of_rib_computed(true);
    return false;
}

void IFMapChannel::StartEndOfRibTimer() {
if (end_of_rib_timer_->running()) {
end_of_rib_timer_->Cancel();
}
end_of_rib_timer_->Start(kEndOfRibTimeout,
boost::bind(&IFMapChannel::EndOfRibProcTimeout, this), NULL);
}

// Returns whether the end-of-rib timer is currently running, as reported by
// the timer's running() accessor.
bool IFMapChannel::EndOfRibTimerRunning() {
return end_of_rib_timer_->running();
}

// Will run in the context of the main task
void IFMapChannel::ProcResponseInMainThr(size_t bytes_to_read) {
CHECK_CONCURRENCY_MAIN_THR();
Expand Down Expand Up @@ -868,6 +910,13 @@ static bool IFMapServerInfoHandleRequest(const Sandesh *sr,
channel->get_connection_status_and_time());
server_conn_info.set_host(channel->get_host());
server_conn_info.set_port(channel->get_port());
server_conn_info.set_end_of_rib_computed(channel->end_of_rib_computed());
server_conn_info.set_end_of_rib_timer_running(
channel->EndOfRibTimerRunning());
server_conn_info.set_start_stale_entries_cleanup(
channel->start_stale_entries_cleanup());
server_conn_info.set_stale_entries_cleanup_timer_running(
sctx->ifmap_server()->StaleEntriesCleanupTimerRunning());

server_stats.set_rx_msgs(channel->get_recv_msg_cnt());
server_stats.set_tx_msgs(channel->get_sent_msg_cnt());
Expand Down
17 changes: 17 additions & 0 deletions src/ifmap/client/ifmap_channel.h
Expand Up @@ -8,6 +8,7 @@
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include <tbb/atomic.h>

#ifdef __clang__
#pragma clang diagnostic push
Expand All @@ -27,6 +28,7 @@ class IFMapStateMachine;
class IFMapManager;
class TimerImpl;
class IFMapPeerTimedoutEntries;
class Timer;

class IFMapChannel {
public:
Expand Down Expand Up @@ -167,13 +169,23 @@ class IFMapChannel {
}
PeerTimedoutInfo GetTimedoutInfo(const std::string &host,
const std::string &port);
// Sets the flag that ReadPollResponse() consults when a SearchResult arrives:
// when true, each SearchResult (re)starts the server's stale entries cleanup
// timer.
void set_start_stale_entries_cleanup(bool value) {
start_stale_entries_cleanup_ = value;
}
// Returns the current value of the stale-entries-cleanup flag.
bool start_stale_entries_cleanup() { return start_stale_entries_cleanup_; }
// Records whether end-of-rib has been computed. Set to false in the
// constructor and to true when the end-of-rib timer fires.
void set_end_of_rib_computed(bool value) {
end_of_rib_computed_ = value;
}
// Returns true once end-of-rib has been computed.
bool end_of_rib_computed() { return end_of_rib_computed_; }
bool EndOfRibTimerRunning();

private:
// 45 seconds i.e. 30 + (3*5)s
static const int kSessionKeepaliveIdleTime = 30; // in seconds
static const int kSessionKeepaliveInterval = 3; // in seconds
static const int kSessionKeepaliveProbes = 5; // count
static const int kSessionTcpUserTimeout = 45000; // in milliseconds
static const int kEndOfRibTimeout = 10000; // milliseconds

enum ResponseState {
NONE = 0,
Expand Down Expand Up @@ -210,6 +222,8 @@ class IFMapChannel {
void SendPollRequestInMainThr(std::string poll_msg);
void PollResponseWaitInMainThr();
void ProcResponseInMainThr(size_t bytes_to_read);
bool EndOfRibProcTimeout();
void StartEndOfRibTimer();

IFMapManager *manager_;
boost::asio::ip::tcp::resolver resolver_;
Expand All @@ -236,6 +250,9 @@ class IFMapChannel {
uint64_t connection_status_change_at_;
boost::asio::ip::tcp::endpoint endpoint_;
TimedoutMap timedout_map_;
tbb::atomic<bool> start_stale_entries_cleanup_;
Timer *end_of_rib_timer_;
tbb::atomic<bool> end_of_rib_computed_;

std::string GetSizeAsString(size_t stream_sz, std::string log) {
std::ostringstream ss;
Expand Down
4 changes: 4 additions & 0 deletions src/ifmap/client/ifmap_manager.cc
Expand Up @@ -64,6 +64,10 @@ uint64_t IFMapManager::GetChannelSequenceNumber() {
return channel_->get_sequence_number();
}

// Forwards the stale-entries-cleanup flag to the channel. channel_ is
// dereferenced without a null check.
void IFMapManager::SetStartStaleEntriesCleanup(bool value) {
channel_->set_start_stale_entries_cleanup(value);
}

void IFMapManager::GetPeerServerInfo(IFMapPeerServerInfoUI &server_info) {
server_info.set_url(get_host_port());
server_info.set_connection_status(channel_->get_connection_status());
Expand Down
1 change: 1 addition & 0 deletions src/ifmap/client/ifmap_manager.h
Expand Up @@ -57,6 +57,7 @@ class IFMapManager {
virtual void ResetConnection(const std::string &host,
const std::string &port);
bool PeerDown();
void SetStartStaleEntriesCleanup(bool value);

private:

Expand Down
14 changes: 12 additions & 2 deletions src/ifmap/ifmap_log.sandesh
Expand Up @@ -71,7 +71,7 @@ systemlog sandesh IFMapXmppUnknownMessage {
4: string client_name (key="ObjectVRouter")
}

systemlog sandesh IFMapStaleCleanerInfo {
systemlog sandesh IFMapStaleEntriesCleanerInfo {
1: "SeqNum:"
2: u64 sequence_number
3: "NodesDeleted:"
Expand All @@ -84,6 +84,11 @@ systemlog sandesh IFMapStaleCleanerInfo {
10: u32 objects_deleted
}

// Logged from IFMapServer::StaleEntriesProcTimeout() when the stale entries
// cleanup timer fires. str1 carries the timeout value formatted as a string
// and str2 the accompanying message text.
systemlog sandesh IFMapStaleEntriesCleanupTimerFired {
1: string str1
2: string str2
}

systemlog sandesh IFMapChannelUnregisterMessage {
1: string message
2: string client_name (key="ObjectVRouter")
Expand Down Expand Up @@ -264,7 +269,7 @@ trace sandesh IFMapXmppUnknownMessageTrace {
4: string client_name (key="ObjectVRouter")
}

trace sandesh IFMapStaleCleanerInfoTrace {
trace sandesh IFMapStaleEntriesCleanerInfoTrace {
1: "SeqNum:"
2: u64 sequence_number
3: "NodesDeleted:"
Expand All @@ -277,6 +282,11 @@ trace sandesh IFMapStaleCleanerInfoTrace {
10: u32 objects_deleted
}

// Trace variant with the same two string fields as the
// IFMapStaleEntriesCleanupTimerFired systemlog message.
// NOTE(review): presumably emitted alongside it by the IFMAP_DEBUG macro --
// confirm against the macro definition.
trace sandesh IFMapStaleEntriesCleanupTimerFiredTrace {
1: string str1
2: string str2
}

trace sandesh IFMapChannelUnregisterMessageTrace {
1: string message
2: string client_name (key="ObjectVRouter")
Expand Down
46 changes: 32 additions & 14 deletions src/ifmap/ifmap_server.cc
Expand Up @@ -11,6 +11,7 @@
#include <boost/algorithm/string.hpp>

#include "base/logging.h"
#include "base/task_annotations.h"
#include "db/db.h"
#include "db/db_graph.h"
#include "db/db_graph_edge.h"
Expand All @@ -32,9 +33,9 @@

using std::make_pair;

class IFMapServer::IFMapStaleCleaner : public Task {
class IFMapServer::IFMapStaleEntriesCleaner : public Task {
public:
IFMapStaleCleaner(DB *db, DBGraph *graph, IFMapServer *server):
IFMapStaleEntriesCleaner(DB *db, DBGraph *graph, IFMapServer *server):
Task(TaskScheduler::GetInstance()->GetTaskId("db::DBTable"), 0),
db_(db), graph_(graph), ifmap_server_(server) {
}
Expand Down Expand Up @@ -114,7 +115,7 @@ class IFMapServer::IFMapStaleCleaner : public Task {
}
}
}
IFMAP_DEBUG(IFMapStaleCleanerInfo, curr_seq_num, nodes_deleted,
IFMAP_DEBUG(IFMapStaleEntriesCleanerInfo, curr_seq_num, nodes_deleted,
nodes_changed, links_deleted, objects_deleted);

return true;
Expand Down Expand Up @@ -190,8 +191,8 @@ IFMapServer::IFMapServer(DB *db, DBGraph *graph,
work_queue_(TaskScheduler::GetInstance()->GetTaskId("db::DBTable"), 0,
boost::bind(&IFMapServer::ClientWorker, this, _1)),
io_service_(io_service),
stale_cleanup_timer_(TimerManager::CreateTimer(*(io_service_),
"Stale cleanup timer")),
stale_entries_cleanup_timer_(TimerManager::CreateTimer(*(io_service_),
"Stale entries cleanup timer")),
ifmap_manager_(NULL), ifmap_channel_manager_(NULL) {
}

Expand All @@ -204,7 +205,7 @@ void IFMapServer::Initialize() {
}

void IFMapServer::Shutdown() {
TimerManager::DeleteTimer(stale_cleanup_timer_);
TimerManager::DeleteTimer(stale_entries_cleanup_timer_);
vm_uuid_mapper_->Shutdown();
exporter_->Shutdown();
}
Expand Down Expand Up @@ -372,21 +373,38 @@ bool IFMapServer::ClientNameToIndex(const std::string &id, int *index) {
return false;
}

bool IFMapServer::StaleNodesProcTimeout() {
IFMapStaleCleaner *cleaner = new IFMapStaleCleaner(db_, graph_, this);
bool IFMapServer::StaleEntriesProcTimeout() {
int timeout = kStaleEntriesCleanupTimeout;
IFMAP_DEBUG(IFMapStaleEntriesCleanupTimerFired, integerToString(timeout),
"millisecond stale entries cleanup timer fired");
SetStartStaleEntriesCleanup(false);
IFMapStaleEntriesCleaner *cleaner =
new IFMapStaleEntriesCleaner(db_, graph_, this);

TaskScheduler *scheduler = TaskScheduler::GetInstance();
scheduler->Enqueue(cleaner);
return false;
}

void IFMapServer::StaleNodesCleanup() {
if (stale_cleanup_timer_->running()) {
stale_cleanup_timer_->Cancel();
void IFMapServer::StartStaleEntriesCleanup() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (stale_entries_cleanup_timer_->running()) {
stale_entries_cleanup_timer_->Cancel();
}
stale_entries_cleanup_timer_->Start(kStaleEntriesCleanupTimeout,
boost::bind(&IFMapServer::StaleEntriesProcTimeout, this), NULL);
}

// Returns whether the stale entries cleanup timer is currently running.
// NOTE(review): this is also called from IFMapServerInfoHandleRequest() in
// ifmap_channel.cc to populate introspect output -- confirm that handler runs
// in the "ifmap::StateMachine" task, otherwise the concurrency check below
// may not hold.
bool IFMapServer::StaleEntriesCleanupTimerRunning() {
CHECK_CONCURRENCY("ifmap::StateMachine");
return stale_entries_cleanup_timer_->running();
}

void IFMapServer::StopStaleEntriesCleanup() {
CHECK_CONCURRENCY("ifmap::StateMachine");
if (stale_entries_cleanup_timer_->running()) {
stale_entries_cleanup_timer_->Cancel();
}
stale_cleanup_timer_->Start(kStaleCleanupTimeout,
boost::bind(&IFMapServer::StaleNodesProcTimeout, this),
NULL);
}

void IFMapServer::ProcessVmSubscribe(std::string vr_name, std::string vm_uuid,
Expand Down
18 changes: 13 additions & 5 deletions src/ifmap/ifmap_server.h
Expand Up @@ -57,7 +57,6 @@ class IFMapServer {

void AddClient(IFMapClient *client);
void DeleteClient(IFMapClient *client);
void StaleNodesCleanup();

DB *database() { return db_; }
DBGraph *graph() { return graph_; }
Expand All @@ -75,6 +74,11 @@ class IFMapServer {
// Returns the channel sequence number obtained from the IFMapManager.
// ifmap_manager_ is dereferenced without a null check here.
virtual uint64_t get_ifmap_channel_sequence_number() {
return ifmap_manager_->GetChannelSequenceNumber();
}
// Forwards the stale-entries-cleanup flag to the IFMapManager. Does nothing
// when no manager has been set.
void SetStartStaleEntriesCleanup(bool value) {
    if (!ifmap_manager_) {
        return;
    }
    ifmap_manager_->SetStartStaleEntriesCleanup(value);
}
// Stores the IFMapChannelManager pointer used by this server. The pointer is
// stored as-is.
void set_ifmap_channel_manager(IFMapChannelManager *manager) {
ifmap_channel_manager_ = manager;
}
Expand All @@ -87,7 +91,7 @@ class IFMapServer {
void ProcessVmSubscribe(std::string vr_name, std::string vm_uuid,
bool subscribe);

class IFMapStaleCleaner;
class IFMapStaleEntriesCleaner;
class IFMapVmSubscribe;

void ProcessVmRegAsPending(std::string vm_uuid, std::string vr_name,
Expand All @@ -100,9 +104,13 @@ class IFMapServer {
const CmSz_t GetIndexMapSize() const { return index_map_.size(); }
void GetUIInfo(IFMapServerInfoUI *server_info);
bool ClientNameToIndex(const std::string &id, int *index);
void StartStaleEntriesCleanup();
void StopStaleEntriesCleanup();
bool StaleEntriesCleanupTimerRunning();

private:
static const int kStaleCleanupTimeout = 60000; // milliseconds
static const int kStaleEntriesCleanupTimeout = 10000; // milliseconds

friend class IFMapServerTest;
friend class IFMapRestartTest;
friend class ShowIFMapXmppClientInfo;
Expand All @@ -123,7 +131,7 @@ class IFMapServer {
void CleanupUuidMapper(IFMapClient *client);
void ClientExporterSetup(IFMapClient *client);
void ClientExporterCleanup(IFMapClient *client);
bool StaleNodesProcTimeout();
bool StaleEntriesProcTimeout();
const ClientMap &GetClientMap() const { return client_map_; }
void SimulateDeleteClient(IFMapClient *client);

Expand All @@ -138,7 +146,7 @@ class IFMapServer {
IndexMap index_map_;
WorkQueue<QueueEntry> work_queue_;
boost::asio::io_service *io_service_;
Timer *stale_cleanup_timer_;
Timer *stale_entries_cleanup_timer_;
IFMapManager *ifmap_manager_;
IFMapChannelManager *ifmap_channel_manager_;
};
Expand Down
4 changes: 4 additions & 0 deletions src/ifmap/ifmap_server_show.sandesh
Expand Up @@ -251,6 +251,10 @@ struct IFMapPeerServerConnInfo {
4: string connection_status;
5: string host;
6: string port;
7: bool end_of_rib_computed;
8: bool end_of_rib_timer_running;
9: bool start_stale_entries_cleanup;
10: bool stale_entries_cleanup_timer_running;
}

struct IFMapPeerTimedoutEntry {
Expand Down
2 changes: 1 addition & 1 deletion src/ifmap/test/ifmap_restart_test.cc
Expand Up @@ -135,7 +135,7 @@ class IFMapRestartTest : public ::testing::Test {
}

void StaleNodesProcTimeout() {
server_.StaleNodesProcTimeout();
server_.StaleEntriesProcTimeout();
}

DB db_;
Expand Down

0 comments on commit e48172f

Please sign in to comment.