Skip to content

Commit

Permalink
Fix HealthCheck instance parallel access & cleanup
Browse files Browse the repository at this point in the history
Issue:
------
The health check instance is accessed from both the asio and
DBTable task contexts, causing a race condition in which the
object can be accessed and deleted at the same time.

Fix:
----
- Move the handling of READ and EXIT to a new HealthCheck
  task context, which runs in exclusion with the DBTable task.
- Move cleanup of the instance from the DBTable to the HealthCheck
  task context so that events are processed in the correct sequence.
- The instance holds a reference to the service object to ensure
  it can be accessed safely until cleanup is complete.

Closes-Bug: 1533627
Related-Bug: 1530539
Change-Id: I2880a2c21a8a642bd6612067be5b67ba02c88fe8
  • Loading branch information
Prabhjot Singh Sethi committed Jan 28, 2016
1 parent 25954b4 commit 4a4bb3d
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 58 deletions.
1 change: 1 addition & 0 deletions src/vnsw/agent/cmn/agent.cc
Expand Up @@ -135,6 +135,7 @@ void Agent::SetAgentTaskPolicy() {
"Agent::Uve",
"Agent::KSync",
"Agent::PktFlowResponder",
kTaskHealthCheck,
AGENT_SHUTDOWN_TASKNAME,
AGENT_INIT_TASKNAME
};
Expand Down
2 changes: 2 additions & 0 deletions src/vnsw/agent/cmn/agent.h
Expand Up @@ -217,6 +217,8 @@ extern void RouterIdDepInit(Agent *agent);
#define kTaskFlowEvent "Agent::FlowEvent"
#define kTaskFlowAudit "KSync::FlowAudit"

#define kTaskHealthCheck "Agent::HealthCheck"

#define kInterfaceDbTablePrefix "db.interface"
#define kVnDbTablePrefix "db.vn"
#define kVmDbTablePrefix "db.vm"
Expand Down
184 changes: 137 additions & 47 deletions src/vnsw/agent/oper/health_check.cc
Expand Up @@ -46,34 +46,35 @@ HealthCheckService::~HealthCheckService() {
HealthCheckInstance::HealthCheckInstance(HealthCheckService *service,
MetaDataIpAllocator *allocator,
VmInterface *intf) :
service_(service), intf_(intf), ip_(new MetaDataIp(allocator, intf)),
task_(NULL), last_update_time_("-") {
service_(NULL), intf_(intf), ip_(new MetaDataIp(allocator, intf)),
task_(NULL), last_update_time_("-"), deleted_(false) {
active_ = false;
ip_->set_active(true);
intf->InsertHealthCheckInstance(this);
ResyncInterface();
ResyncInterface(service);
}

HealthCheckInstance::~HealthCheckInstance() {
VmInterface *intf = static_cast<VmInterface *>(intf_.get());
intf->DeleteHealthCheckInstance(this);
ResyncInterface();
DestroyInstanceTask();
ResyncInterface(service_.get());
}

void HealthCheckInstance::ResyncInterface() {
void HealthCheckInstance::ResyncInterface(HealthCheckService *service) {
DBRequest req;
req.oper = DBRequest::DB_ENTRY_ADD_CHANGE;
req.key.reset(new VmInterfaceKey(AgentKey::RESYNC, intf_->GetUuid(), ""));
req.data.reset(new VmInterfaceHealthCheckData());
service_->table_->agent()->interface_table()->Enqueue(&req);
service->table_->agent()->interface_table()->Enqueue(&req);
}

bool HealthCheckInstance::CreateInstanceTask() {
if (task_ != NULL) {
if (!deleted_ && task_.get() != NULL) {
return false;
}

deleted_ = false;

HEALTH_CHECK_TRACE(Trace, "Starting " + this->to_string());
std::stringstream cmd_str;
cmd_str << kHealthCheckCmd << " -m " << service_->monitor_type_;
Expand All @@ -88,9 +89,10 @@ bool HealthCheckInstance::CreateInstanceTask() {
cmd_str << " -u " << service_->url_path_;
}

task_ = new HeathCheckProcessInstance("HealthCheckInstance", cmd_str.str(),
0, service_->table_->agent()->event_manager());
if (task_ != NULL) {
task_.reset(new HeathCheckProcessInstance("HealthCheckInstance",
cmd_str.str(), 0,
service_->table_->agent()->event_manager()));
if (task_.get() != NULL) {
task_->set_pipe_stdout(true);
task_->set_on_data_cb(
boost::bind(&HealthCheckInstance::OnRead, this, _1, _2));
Expand All @@ -102,15 +104,26 @@ bool HealthCheckInstance::CreateInstanceTask() {
return false;
}

void HealthCheckInstance::DestroyInstanceTask() {
if (task_ == NULL) {
return;
bool HealthCheckInstance::DestroyInstanceTask() {
if (deleted_) {
return true;
}

HeathCheckProcessInstance *task = task_;
task_ = NULL;
task->Stop();
delete task;
if (task_.get() == NULL) {
return false;
}

deleted_ = true;
task_->Stop();
return true;
}

// Bind this instance to |service|.
// Nothing happens when the instance already refers to the same service;
// otherwise the reference is replaced and CreateInstanceTask() is invoked
// (its return value is intentionally not inspected here).
void HealthCheckInstance::set_service(HealthCheckService *service) {
    if (service_.get() != service) {
        service_ = service;
        CreateInstanceTask();
    }
}

std::string HealthCheckInstance::to_string() {
Expand All @@ -120,33 +133,29 @@ std::string HealthCheckInstance::to_string() {
return str;
}

void HealthCheckInstance::OnRead(InstanceTask *task, const std::string data) {
last_update_time_ = UTCUsecToString(UTCTimestampUsec());
std::string msg = data;
boost::algorithm::to_lower(msg);
if (msg.find("success") != std::string::npos) {
if (!active_) {
active_ = true;
ResyncInterface();
}
}
if (msg.find("failure") != std::string::npos) {
if (active_) {
active_ = false;
ResyncInterface();
}
}
HEALTH_CHECK_TRACE(Trace, this->to_string() + " Received msg = " + data);
// Data callback registered on the instance task.
// No processing is done here: the received text is wrapped in a
// MESSAGE_READ event and handed to the table's instance event queue,
// where HealthCheckTable::InstanceEventProcess consumes (and frees) it.
void HealthCheckInstance::OnRead(InstanceTask *task, const std::string &data) {
    service_->table_->InstanceEventEnqueue(
        new HealthCheckInstanceEvent(this,
                                     HealthCheckInstanceEvent::MESSAGE_READ,
                                     data));
}

void HealthCheckInstance::OnExit(InstanceTask *task,
const boost::system::error_code &ec) {
if (task_ != NULL) {
HEALTH_CHECK_TRACE(Trace, "Restarting " + this->to_string());
task_->Run();
} else {
HEALTH_CHECK_TRACE(Trace, "Stopped " + this->to_string());
}
HealthCheckInstanceEvent *event =
new HealthCheckInstanceEvent(this,
HealthCheckInstanceEvent::TASK_EXIT, "");
service_->table_->InstanceEventEnqueue(event);
}

// Event record passed through the health check instance event queue.
//   inst    - instance the event refers to (stored as a raw pointer)
//   type    - kind of event (e.g. MESSAGE_READ, TASK_EXIT)
//   message - payload text; copied into the event
HealthCheckInstanceEvent::HealthCheckInstanceEvent(HealthCheckInstance *inst,
                                                   EventType type,
                                                   const std::string &message)
    : instance_(inst),
      type_(type),
      message_(message) {
}

// Nothing to release explicitly; the destructor body is empty.
HealthCheckInstanceEvent::~HealthCheckInstanceEvent() {
}

bool HealthCheckService::IsLess(const DBEntry &rhs) const {
Expand Down Expand Up @@ -196,7 +205,7 @@ bool HealthCheckService::DBEntrySandesh(Sandesh *sresp,
inst_data.set_health_check_ip
(it->second->ip_->destination_ip().to_string());
inst_data.set_active(it->second->active_);
inst_data.set_running(it->second->task_ != NULL ?
inst_data.set_running(it->second->task_.get() != NULL ?
it->second->task_->is_running(): false);
inst_data.set_last_update_time(it->second->last_update_time_);
inst_list.push_back(inst_data);
Expand All @@ -210,6 +219,10 @@ bool HealthCheckService::DBEntrySandesh(Sandesh *sresp,
return true;
}

void HealthCheckService::PostAdd() {
UpdateInstanceServiceReference();
}

bool HealthCheckService::Copy(HealthCheckTable *table,
const HealthCheckServiceData *data) {
bool ret = false;
Expand Down Expand Up @@ -278,7 +291,9 @@ bool HealthCheckService::Copy(HealthCheckTable *table,
((it != intf_list_.end()) && ((*it_cfg) > it->first))) {
InstanceList::iterator it_prev = it;
it++;
delete it_prev->second;
if (!it_prev->second->DestroyInstanceTask()) {
delete it_prev->second;
}
intf_list_.erase(it_prev);
ret = true;
} else {
Expand All @@ -294,7 +309,6 @@ bool HealthCheckService::Copy(HealthCheckTable *table,
intf_list_.insert(std::pair<boost::uuids::uuid,
HealthCheckInstance *>(*(it_cfg), inst));
inst->ip_->set_destination_ip(dest_ip_);
inst->CreateInstanceTask();
ret = true;
} else {
it++;
Expand All @@ -306,15 +320,42 @@ bool HealthCheckService::Copy(HealthCheckTable *table,
return ret;
}

HealthCheckTable::HealthCheckTable(DB *db, const std::string &name) :
void HealthCheckService::UpdateInstanceServiceReference() {
InstanceList::iterator it = intf_list_.begin();
while (it != intf_list_.end()) {
it->second->set_service(this);
it++;
}
}

// Remove every instance held in intf_list_.
//
// DestroyInstanceTask() returns true when a stop has been requested (or
// was already requested) for the instance task; in that case the instance
// is freed later, when its TASK_EXIT event is processed. When it returns
// false there is no task that will ever report an exit, so the instance
// must be freed here -- otherwise it is dropped from the list and leaked.
// This mirrors the removal path in Copy().
void HealthCheckService::DeleteInstances() {
    while (!intf_list_.empty()) {
        InstanceList::iterator it = intf_list_.begin();
        if (!it->second->DestroyInstanceTask()) {
            delete it->second;
        }
        intf_list_.erase(it);
    }
}

// Construct the table and its instance event queue.
// The agent is recorded first; the work queue is then created on the
// task id registered under kTaskHealthCheck, with InstanceEventProcess
// bound as the per-event handler. The queue is owned by the table and
// released in the destructor.
HealthCheckTable::HealthCheckTable(Agent *agent, DB *db,
                                   const std::string &name)
    : AgentOperDBTable(db, name) {
    set_agent(agent);
    inst_event_queue_ = new WorkQueue<HealthCheckInstanceEvent *>(
        agent->task_scheduler()->GetTaskId(kTaskHealthCheck),
        0,
        boost::bind(&HealthCheckTable::InstanceEventProcess, this, _1));
    inst_event_queue_->set_name("HealthCheck instance event queue");
}

// Shut the instance event queue down before freeing it.
HealthCheckTable::~HealthCheckTable() {
    inst_event_queue_->Shutdown();
    delete inst_event_queue_;
}

DBTableBase *HealthCheckTable::CreateTable(DB *db, const std::string &name) {
HealthCheckTable *health_check_table = new HealthCheckTable(db, name);
// Factory: allocate a HealthCheckTable for |agent|, run DBTable::Init()
// on it and return it to the caller as a DBTableBase pointer.
DBTableBase *HealthCheckTable::CreateTable(Agent *agent, DB *db,
                                           const std::string &name) {
    HealthCheckTable *table = new HealthCheckTable(agent, db, name);
    // Init() is invoked through the DBTable interface, as before.
    (static_cast<DBTable *>(table))->Init();
    return table;
}
Expand Down Expand Up @@ -343,6 +384,7 @@ bool HealthCheckTable::OperDBOnChange(DBEntry *entry, const DBRequest *req) {
dynamic_cast<HealthCheckServiceData *>(req->data.get());
assert(data);
bool ret = service->Copy(this, data);
service->UpdateInstanceServiceReference();
return ret;
}

Expand All @@ -351,6 +393,8 @@ bool HealthCheckTable::OperDBResync(DBEntry *entry, const DBRequest *req) {
}

// Delete handler for a service entry: tears down all of the service's
// instances via DeleteInstances() and always reports true.
bool HealthCheckTable::OperDBDelete(DBEntry *entry, const DBRequest *req) {
    static_cast<HealthCheckService *>(entry)->DeleteInstances();
    return true;
}

Expand Down Expand Up @@ -463,6 +507,52 @@ HealthCheckService *HealthCheckTable::Find(const boost::uuids::uuid &u) {
return static_cast<HealthCheckService *>(FindActiveEntry(&key));
}

// Queue |event| on the instance event work queue. The event is later
// consumed and freed by InstanceEventProcess().
void HealthCheckTable::InstanceEventEnqueue(
        HealthCheckInstanceEvent *event) const {
    inst_event_queue_->Enqueue(event);
}

// Work-queue handler for health check instance events.
//
// MESSAGE_READ: stamps the instance's last update time, then scans the
//   lower-cased message for "success" / "failure". A state change flips
//   active_ and triggers an interface resync. Both substrings are checked
//   independently, in that order.
// TASK_EXIT: if the instance has been marked deleted it is freed here;
//   otherwise its task is run again.
//
// The event itself is always freed before returning; the return value is
// always true.
bool HealthCheckTable::InstanceEventProcess(HealthCheckInstanceEvent *event) {
    HealthCheckInstance *instance = event->instance_;
    switch (event->type_) {
    case HealthCheckInstanceEvent::MESSAGE_READ: {
        instance->last_update_time_ = UTCUsecToString(UTCTimestampUsec());

        // Match case-insensitively by lower-casing a private copy.
        std::string lowered = event->message_;
        boost::algorithm::to_lower(lowered);
        const bool has_success = lowered.find("success") != std::string::npos;
        const bool has_failure = lowered.find("failure") != std::string::npos;

        if (has_success && !instance->active_) {
            instance->active_ = true;
            instance->ResyncInterface(instance->service_.get());
        }
        if (has_failure && instance->active_) {
            instance->active_ = false;
            instance->ResyncInterface(instance->service_.get());
        }
        HEALTH_CHECK_TRACE(Trace, instance->to_string() +
                           " Received msg = " + event->message_);
        break;
    }
    case HealthCheckInstanceEvent::TASK_EXIT:
        if (instance->deleted_) {
            // Stop was requested earlier; the exit completes the cleanup.
            HEALTH_CHECK_TRACE(Trace, "Stopped " + instance->to_string());
            delete instance;
        } else {
            HEALTH_CHECK_TRACE(Trace, "Restarting " + instance->to_string());
            instance->task_->Run();
        }
        break;
    default:
        // unhandled event
        assert(0);
    }
    delete event;
    return true;
}

AgentSandeshPtr
HealthCheckTable::GetAgentSandesh(const AgentSandeshArguments *args,
const std::string &context) {
Expand Down

0 comments on commit 4a4bb3d

Please sign in to comment.