diff --git a/src/vnsw/agent/cmn/agent.cc b/src/vnsw/agent/cmn/agent.cc index b7183ffa4cb..78dc395f4f2 100644 --- a/src/vnsw/agent/cmn/agent.cc +++ b/src/vnsw/agent/cmn/agent.cc @@ -135,6 +135,7 @@ void Agent::SetAgentTaskPolicy() { "Agent::Uve", "Agent::KSync", "Agent::PktFlowResponder", + kTaskHealthCheck, AGENT_SHUTDOWN_TASKNAME, AGENT_INIT_TASKNAME }; diff --git a/src/vnsw/agent/cmn/agent.h b/src/vnsw/agent/cmn/agent.h index 70ffe7602f3..cb4514214e3 100644 --- a/src/vnsw/agent/cmn/agent.h +++ b/src/vnsw/agent/cmn/agent.h @@ -217,6 +217,8 @@ extern void RouterIdDepInit(Agent *agent); #define kTaskFlowEvent "Agent::FlowEvent" #define kTaskFlowAudit "KSync::FlowAudit" +#define kTaskHealthCheck "Agent::HealthCheck" + #define kInterfaceDbTablePrefix "db.interface" #define kVnDbTablePrefix "db.vn" #define kVmDbTablePrefix "db.vm" diff --git a/src/vnsw/agent/oper/health_check.cc b/src/vnsw/agent/oper/health_check.cc index 4b369176fcd..2d189f3fa83 100644 --- a/src/vnsw/agent/oper/health_check.cc +++ b/src/vnsw/agent/oper/health_check.cc @@ -46,34 +46,35 @@ HealthCheckService::~HealthCheckService() { HealthCheckInstance::HealthCheckInstance(HealthCheckService *service, MetaDataIpAllocator *allocator, VmInterface *intf) : - service_(service), intf_(intf), ip_(new MetaDataIp(allocator, intf)), - task_(NULL), last_update_time_("-") { + service_(NULL), intf_(intf), ip_(new MetaDataIp(allocator, intf)), + task_(NULL), last_update_time_("-"), deleted_(false) { active_ = false; ip_->set_active(true); intf->InsertHealthCheckInstance(this); - ResyncInterface(); + ResyncInterface(service); } HealthCheckInstance::~HealthCheckInstance() { VmInterface *intf = static_cast(intf_.get()); intf->DeleteHealthCheckInstance(this); - ResyncInterface(); - DestroyInstanceTask(); + ResyncInterface(service_.get()); } -void HealthCheckInstance::ResyncInterface() { +void HealthCheckInstance::ResyncInterface(HealthCheckService *service) { DBRequest req; req.oper = DBRequest::DB_ENTRY_ADD_CHANGE; req.key.reset(new VmInterfaceKey(AgentKey::RESYNC, intf_->GetUuid(), "")); req.data.reset(new VmInterfaceHealthCheckData()); - service_->table_->agent()->interface_table()->Enqueue(&req); + service->table_->agent()->interface_table()->Enqueue(&req); } bool HealthCheckInstance::CreateInstanceTask() { - if (task_ != NULL) { + if (!deleted_ && task_.get() != NULL) { return false; } + deleted_ = false; + HEALTH_CHECK_TRACE(Trace, "Starting " + this->to_string()); std::stringstream cmd_str; cmd_str << kHealthCheckCmd << " -m " << service_->monitor_type_; @@ -88,9 +89,10 @@ bool HealthCheckInstance::CreateInstanceTask() { cmd_str << " -u " << service_->url_path_; } - task_ = new HeathCheckProcessInstance("HealthCheckInstance", cmd_str.str(), - 0, service_->table_->agent()->event_manager()); - if (task_ != NULL) { + task_.reset(new HeathCheckProcessInstance("HealthCheckInstance", + cmd_str.str(), 0, + service_->table_->agent()->event_manager())); + if (task_.get() != NULL) { task_->set_pipe_stdout(true); task_->set_on_data_cb( boost::bind(&HealthCheckInstance::OnRead, this, _1, _2)); @@ -102,15 +104,26 @@ bool HealthCheckInstance::CreateInstanceTask() { return false; } -void HealthCheckInstance::DestroyInstanceTask() { - if (task_ == NULL) { - return; +bool HealthCheckInstance::DestroyInstanceTask() { + if (deleted_) { + return true; } - HeathCheckProcessInstance *task = task_; - task_ = NULL; - task->Stop(); - delete task; + if (task_.get() == NULL) { + return false; + } + + deleted_ = true; + task_->Stop(); + return true; +} + +void HealthCheckInstance::set_service(HealthCheckService *service) { + if (service_ == service) { + return; + } + service_ = service; + CreateInstanceTask(); } std::string HealthCheckInstance::to_string() { @@ -120,33 +133,29 @@ std::string HealthCheckInstance::to_string() { return str; } -void HealthCheckInstance::OnRead(InstanceTask *task, const std::string data) { - last_update_time_ = UTCUsecToString(UTCTimestampUsec()); - std::string msg = data; - boost::algorithm::to_lower(msg); - if (msg.find("success") != std::string::npos) { - if (!active_) { - active_ = true; - ResyncInterface(); - } - } - if (msg.find("failure") != std::string::npos) { - if (active_) { - active_ = false; - ResyncInterface(); - } - } - HEALTH_CHECK_TRACE(Trace, this->to_string() + " Received msg = " + data); +void HealthCheckInstance::OnRead(InstanceTask *task, const std::string &data) { + HealthCheckInstanceEvent *event = + new HealthCheckInstanceEvent(this, + HealthCheckInstanceEvent::MESSAGE_READ, + data); + service_->table_->InstanceEventEnqueue(event); } void HealthCheckInstance::OnExit(InstanceTask *task, const boost::system::error_code &ec) { - if (task_ != NULL) { - HEALTH_CHECK_TRACE(Trace, "Restarting " + this->to_string()); - task_->Run(); - } else { - HEALTH_CHECK_TRACE(Trace, "Stopped " + this->to_string()); - } + HealthCheckInstanceEvent *event = + new HealthCheckInstanceEvent(this, + HealthCheckInstanceEvent::TASK_EXIT, ""); + service_->table_->InstanceEventEnqueue(event); +} + +HealthCheckInstanceEvent::HealthCheckInstanceEvent(HealthCheckInstance *inst, + EventType type, + const std::string &message) : + instance_(inst), type_(type), message_(message) { +} + +HealthCheckInstanceEvent::~HealthCheckInstanceEvent() { } bool HealthCheckService::IsLess(const DBEntry &rhs) const { @@ -196,7 +205,7 @@ bool HealthCheckService::DBEntrySandesh(Sandesh *sresp, inst_data.set_health_check_ip (it->second->ip_->destination_ip().to_string()); inst_data.set_active(it->second->active_); - inst_data.set_running(it->second->task_ != NULL ? + inst_data.set_running(it->second->task_.get() != NULL ? it->second->task_->is_running(): false); inst_data.set_last_update_time(it->second->last_update_time_); inst_list.push_back(inst_data); @@ -210,6 +219,10 @@ bool HealthCheckService::DBEntrySandesh(Sandesh *sresp, return true; } +void HealthCheckService::PostAdd() { + UpdateInstanceServiceReference(); +} + bool HealthCheckService::Copy(HealthCheckTable *table, const HealthCheckServiceData *data) { bool ret = false; @@ -278,7 +291,9 @@ bool HealthCheckService::Copy(HealthCheckTable *table, ((it != intf_list_.end()) && ((*it_cfg) > it->first))) { InstanceList::iterator it_prev = it; it++; - delete it_prev->second; + if (!it_prev->second->DestroyInstanceTask()) { + delete it_prev->second; + } intf_list_.erase(it_prev); ret = true; } else { @@ -294,7 +309,6 @@ bool HealthCheckService::Copy(HealthCheckTable *table, intf_list_.insert(std::pair(*(it_cfg), inst)); inst->ip_->set_destination_ip(dest_ip_); - inst->CreateInstanceTask(); ret = true; } else { it++; @@ -306,15 +320,42 @@ bool HealthCheckService::Copy(HealthCheckTable *table, return ret; } -HealthCheckTable::HealthCheckTable(DB *db, const std::string &name) : +void HealthCheckService::UpdateInstanceServiceReference() { + InstanceList::iterator it = intf_list_.begin(); + while (it != intf_list_.end()) { + it->second->set_service(this); + it++; + } +} + +void HealthCheckService::DeleteInstances() { + InstanceList::iterator it = intf_list_.begin(); + while (it != intf_list_.end()) { + it->second->DestroyInstanceTask(); + intf_list_.erase(it); + it = intf_list_.begin(); + } +} + +HealthCheckTable::HealthCheckTable(Agent *agent, DB *db, + const std::string &name) : AgentOperDBTable(db, name) { + set_agent(agent); + inst_event_queue_ = new WorkQueue( + agent->task_scheduler()->GetTaskId(kTaskHealthCheck), 0, + boost::bind(&HealthCheckTable::InstanceEventProcess, this, _1)); + inst_event_queue_->set_name("HealthCheck instance event queue"); } HealthCheckTable::~HealthCheckTable() { + inst_event_queue_->Shutdown(); + delete inst_event_queue_; } -DBTableBase *HealthCheckTable::CreateTable(DB *db, const std::string &name) { - HealthCheckTable *health_check_table = new HealthCheckTable(db, name); +DBTableBase *HealthCheckTable::CreateTable(Agent *agent, DB *db, + const std::string &name) { + HealthCheckTable *health_check_table = + new HealthCheckTable(agent, db, name); (static_cast(health_check_table))->Init(); return health_check_table; }; @@ -343,6 +384,7 @@ bool HealthCheckTable::OperDBOnChange(DBEntry *entry, const DBRequest *req) { dynamic_cast(req->data.get()); assert(data); bool ret = service->Copy(this, data); + service->UpdateInstanceServiceReference(); return ret; } @@ -351,6 +393,8 @@ bool HealthCheckTable::OperDBResync(DBEntry *entry, const DBRequest *req) { } bool HealthCheckTable::OperDBDelete(DBEntry *entry, const DBRequest *req) { + HealthCheckService *service = static_cast(entry); + service->DeleteInstances(); return true; } @@ -463,6 +507,52 @@ HealthCheckService *HealthCheckTable::Find(const boost::uuids::uuid &u) { return static_cast(FindActiveEntry(&key)); } +void +HealthCheckTable::InstanceEventEnqueue(HealthCheckInstanceEvent *event) const { + inst_event_queue_->Enqueue(event); +} + +bool HealthCheckTable::InstanceEventProcess(HealthCheckInstanceEvent *event) { + HealthCheckInstance *inst = event->instance_; + switch (event->type_) { + case HealthCheckInstanceEvent::MESSAGE_READ: + { + inst->last_update_time_ = UTCUsecToString(UTCTimestampUsec()); + std::string msg = event->message_; + boost::algorithm::to_lower(msg); + if (msg.find("success") != std::string::npos) { + if (!inst->active_) { + inst->active_ = true; + inst->ResyncInterface(inst->service_.get()); + } + } + if (msg.find("failure") != std::string::npos) { + if (inst->active_) { + inst->active_ = false; + inst->ResyncInterface(inst->service_.get()); + } + } + HEALTH_CHECK_TRACE(Trace, inst->to_string() + + " Received msg = " + event->message_); + } + break; + case HealthCheckInstanceEvent::TASK_EXIT: + if (!inst->deleted_) { + HEALTH_CHECK_TRACE(Trace, "Restarting " + inst->to_string()); + inst->task_->Run(); + } else { + HEALTH_CHECK_TRACE(Trace, "Stopped " + inst->to_string()); + delete inst; + } + break; + default: + // unhandled event + assert(0); + } + delete event; + return true; +} + AgentSandeshPtr HealthCheckTable::GetAgentSandesh(const AgentSandeshArguments *args, const std::string &context) { diff --git a/src/vnsw/agent/oper/health_check.h b/src/vnsw/agent/oper/health_check.h index 4a06d7c6483..dd13380b343 100644 --- a/src/vnsw/agent/oper/health_check.h +++ b/src/vnsw/agent/oper/health_check.h @@ -68,6 +68,24 @@ struct HealthCheckServiceData : public AgentOperDBData { std::set intf_uuid_list_; }; +struct HealthCheckInstanceEvent { +public: + enum EventType { + MESSAGE_READ = 0, + TASK_EXIT, + EVENT_MAX + }; + + HealthCheckInstanceEvent(HealthCheckInstance *inst, EventType type, + const std::string &message); + virtual ~HealthCheckInstanceEvent(); + + HealthCheckInstance *instance_; + EventType type_; + std::string message_; + DISALLOW_COPY_AND_ASSIGN(HealthCheckInstanceEvent); +}; + struct HealthCheckInstance { typedef InstanceTaskExecvp HeathCheckProcessInstance; static const std::string kHealthCheckCmd; @@ -76,21 +94,27 @@ struct HealthCheckInstance { MetaDataIpAllocator *allocator, VmInterface *intf); ~HealthCheckInstance(); - void ResyncInterface(); + void ResyncInterface(HealthCheckService *service); bool CreateInstanceTask(); - void DestroyInstanceTask(); + + // return true it instance is scheduled to destroy + // when API returns false caller need to assure delete of + // Health Check Instance + bool DestroyInstanceTask(); + void set_service(HealthCheckService *service); std::string to_string(); // OnRead Callback for Task - void OnRead(InstanceTask *task, const std::string data); + void OnRead(InstanceTask *task, const std::string &data); // OnExit Callback for Task void OnExit(InstanceTask *task, const boost::system::error_code &ec); bool active() {return active_;} - // service under which this instance is running - HealthCheckService *service_; + // reference to health check service under + // which this instance is running + HealthCheckServiceRef service_; // Interface associated to this HealthCheck Instance InterfaceRef intf_; // MetaData IP Created for this HealthCheck Instance @@ -98,9 +122,12 @@ struct HealthCheckInstance { // current status of HealthCheckInstance bool active_; // task managing external running script for status - HeathCheckProcessInstance *task_; + boost::scoped_ptr task_; // last update time std::string last_update_time_; + // instance is delete marked + bool deleted_; + private: DISALLOW_COPY_AND_ASSIGN(HealthCheckInstance); }; @@ -124,12 +151,17 @@ class HealthCheckService : AgentRefCount, bool DBEntrySandesh(Sandesh *resp, std::string &name) const; + void PostAdd(); bool Copy(HealthCheckTable *table, const HealthCheckServiceData *data); + void UpdateInstanceServiceReference(); + void DeleteInstances(); + const boost::uuids::uuid &uuid() const { return uuid_; } private: friend class HealthCheckInstance; + friend class HealthCheckInstanceEvent; const HealthCheckTable *table_; boost::uuids::uuid uuid_; @@ -150,10 +182,11 @@ class HealthCheckService : AgentRefCount, class HealthCheckTable : public AgentOperDBTable { public: - HealthCheckTable(DB *db, const std::string &name); + HealthCheckTable(Agent *agent, DB *db, const std::string &name); virtual ~HealthCheckTable(); - static DBTableBase *CreateTable(DB *db, const std::string &name); + static DBTableBase *CreateTable(Agent *agent, DB *db, + const std::string &name); virtual std::auto_ptr AllocEntry(const DBRequestKey *k) const; virtual size_t Hash(const DBEntry *entry) const {return 0;} @@ -174,7 +207,12 @@ class HealthCheckTable : public AgentOperDBTable { HealthCheckService *Find(const boost::uuids::uuid &u); + void InstanceEventEnqueue(HealthCheckInstanceEvent *event) const; + bool InstanceEventProcess(HealthCheckInstanceEvent *event); + private: + WorkQueue *inst_event_queue_; + DISALLOW_COPY_AND_ASSIGN(HealthCheckTable); }; diff --git a/src/vnsw/agent/oper/instance_task.h b/src/vnsw/agent/oper/instance_task.h index 8463bfc9861..916735f0631 100644 --- a/src/vnsw/agent/oper/instance_task.h +++ b/src/vnsw/agent/oper/instance_task.h @@ -19,7 +19,7 @@ class InstanceTask { public: typedef boost::function OnErrorCallback; - typedef boost::function + typedef boost::function OnDataCallback; typedef boost::functionOnExitCallback; diff --git a/src/vnsw/agent/oper/operdb_init.cc b/src/vnsw/agent/oper/operdb_init.cc index 10b162884f0..0049fffa120 100644 --- a/src/vnsw/agent/oper/operdb_init.cc +++ b/src/vnsw/agent/oper/operdb_init.cc @@ -89,7 +89,8 @@ void OperDB::CreateDBTables(DB *db) { DB::RegisterFactory("db.physical_devices.0", &PhysicalDeviceTable::CreateTable); DB::RegisterFactory("db.healthcheck.0", - &HealthCheckTable::CreateTable); + boost::bind(&HealthCheckTable::CreateTable, + agent_, _1, _2)); InterfaceTable *intf_table; intf_table = static_cast(db->CreateTable("db.interface.0")); @@ -106,7 +107,6 @@ void OperDB::CreateDBTables(DB *db) { static_cast(db->CreateTable("db.healthcheck.0")); assert(hc_table); agent_->set_health_check_table(hc_table); - hc_table->set_agent(agent_); NextHopTable *nh_table; nh_table = static_cast(db->CreateTable("db.nexthop.0"));