Skip to content

Commit

Permalink
Handle Instance Task's creation errors
Browse files Browse the repository at this point in the history
When instance manager starts and runs an Instance Task, it sets up a
pipe between parent and child and also adds the parents pipe FD's to
boost asio. Creation of pipe and assigning the pipe fd to boost can
result in errors. These errorrs are not handled in instance manager.
This is resulting in error scenarios where complete instance task is not
setup but instance manager acts on it, which results in crashes.

As a fix, following behaviour is added.
1) If creation of pipe fails, task's "is_running" is set to false and is
not moved out of task_queue. Using a timer, it is attempted two more
times to start it. If task still fails to start after fixed number of
attempts, then task is deleted.

2) If child process successfully starts(and pid is available) but
assigning the fd to boost fails, "is_running" is set to true and
instance manager waits for netns_timeout_ time to delete this task
rather relying on pipe event. In this case, child process very likely
succeeds the execution but as pipe event is no more tracked, max time of
netns_timeout_ time is waited to delete the task.

3) The stale netns in host machines (if agent restarts)  are deleted directly rather
relying on task_queue infrastructure to make sure stale tasks are not at
head of task_queue.

partial-bug: #1527429
(cherry picked from commit 6bc6a42)

Conflicts:
	src/vnsw/agent/oper/instance_manager.cc
	src/vnsw/agent/oper/instance_task.h

Change-Id: Ifd4cacce42e8fc6136cf15a9c82e678ad3b1ea9e
(cherry picked from commit e4b3216)
  • Loading branch information
divakardhar authored and haripk committed Mar 6, 2016
1 parent c5866f5 commit f9edeba
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 63 deletions.
117 changes: 75 additions & 42 deletions src/vnsw/agent/oper/instance_manager.cc
Expand Up @@ -182,6 +182,8 @@ void InstanceManager::Initialize(DB *database, const std::string &netns_cmd,
netns_timeout_ = netns_timeout;
}

netns_reattempts_ = kReattemptsDefault;

int workers = kWorkersDefault;
if (netns_workers > 0) {
workers = netns_workers;
Expand Down Expand Up @@ -409,82 +411,102 @@ InstanceTaskQueue *InstanceManager::GetTaskQueue(const std::string &str) {
return task_queues_[index];
}

//After Run(), if child process is running, we keep the task status as
//"Starting" or "Stopping". We start a timer to track TaskTimeout time.
//if process is not runnning, we verify how many times we already attempted
//to run. If netns_reattempts_ are already crossed, we return false,
// so that caller deletes the task without running any further.
//If required reattempts are not done, we start a timer and return true
// so that same task is run again after the timeout. The task status is set to
//"reattempt" to track reattempt case
bool InstanceManager::StartTask(InstanceTaskQueue *task_queue,
InstanceTask *task) {


InstanceState *state = GetState(task);
if (state) {
state->reset_errors();
}

pid_t pid;
if (task->Run())
bool status = task->Run();
if (status || task->is_running()) {
pid = task->pid();
else
return false;

if (state != NULL) {
state->set_pid(pid);
state->set_cmd(task->cmd());
if (task->cmd_type() == Start) {
state->set_status_type(InstanceState::Starting);
} else {
state->set_status_type(InstanceState::Stopping);
if (state != NULL) {
state->set_pid(pid);
state->set_cmd(task->cmd());
if (task->cmd_type() == Start) {
state->set_status_type(InstanceState::Starting);
} else {
state->set_status_type(InstanceState::Stopping);
}
}
} else {
LOG(ERROR, "Instance task " << task << " attempt " << task->reattempts());
if (state) {
state->set_status_type(InstanceState::Reattempt);
state->set_cmd(task->cmd());
}
if (task->incr_reattempts() > netns_reattempts_) {
LOG(ERROR, "Instance task " << task << " reattempts exceeded");
return false;
}
}

if (pid > 0) {
task_queue->StartTimer(netns_timeout_ * 1000);
return true;
}

task_queue->Pop();
UnregisterSvcInstance(task);
delete task;
task_queue->StartTimer(netns_timeout_ * 1000);

return false;
return true;
}

//If Starting the task succeds we wait for another event on that task.
//If not the task is removed from the front of the queue and is delted.
void InstanceManager::ScheduleNextTask(InstanceTaskQueue *task_queue) {
while (!task_queue->Empty()) {

InstanceTask *task = task_queue->Front();
InstanceState *state = GetState(task);

if (!task->is_running()) {
bool starting = StartTask(task_queue, task);
if (starting) {
bool status = StartTask(task_queue, task);
if (status) {
return;
}
} else {
int delay = time(NULL) - task->start_time();
if (delay < netns_timeout_) {
return;
}
InstanceState *state = GetState(task);
if (state) {
state->set_status_type(InstanceState::Timeout);
}

LOG(ERROR, "NetNS error timeout " << delay << " > " <<
netns_timeout_ << ", " << task->cmd());

if (delay > (netns_timeout_ * 2)) {
task->Terminate();
task_queue->StopTimer();
task_queue->Pop();

ServiceInstance* svc_instance = GetSvcInstance(task);
if (state && svc_instance)
state->decr_tasks_running();

task_svc_instances_.erase(task);
LOG(ERROR, " Delay " << delay << "Timeout " <<
(netns_timeout_ * 2));

DeleteState(svc_instance);

delete task;
if (delay >= (netns_timeout_ * 2)) {
task->Terminate();
} else {
task->Stop();
return;
}
}

task_queue->StopTimer();
task_queue->Pop();

ServiceInstance* svc_instance = GetSvcInstance(task);
if (state && svc_instance)
state->decr_tasks_running();

task_svc_instances_.erase(task);

LOG(ERROR, "Delete task " << task);
DeleteState(svc_instance);

delete task;
}
}

Expand Down Expand Up @@ -643,13 +665,24 @@ void InstanceManager::StopStaleNetNS(ServiceInstance::Properties &props) {
cmd_str << " --pool-id " << props.pool_id;
}

InstanceTask *task = new InstanceTaskExecvp(cmd_str.str(), Stop,
agent_->event_manager());
std::stringstream info;
info << "NetNS stale run command queued: " << task->cmd();
Enqueue(task, props.instance_id);
std::string cmd = cmd_str.str();
std::vector<std::string> argv;
boost::split(argv, cmd, boost::is_any_of(" "), boost::token_compress_on);
std::vector<const char *> c_argv(argv.size() + 1);
for (std::size_t i = 0; i != argv.size(); ++i) {
c_argv[i] = argv[i].c_str();
}

LOG(ERROR, "Stale NetNS " << cmd);

LOG(DEBUG, info.str().c_str());
pid_t pid = vfork();
if (pid == 0) {
CloseTaskFds();
execvp(c_argv[0], (char **) c_argv.data());
perror("execvp");

_exit(127);
}
}

void InstanceManager::SetLastCmdType(ServiceInstance *svc_instance,
Expand Down
5 changes: 4 additions & 1 deletion src/vnsw/agent/oper/instance_manager.h
Expand Up @@ -56,6 +56,7 @@ class InstanceManager {

static const int kTimeoutDefault = 30;
static const int kWorkersDefault = 1;
static const int kReattemptsDefault = 2;

InstanceManager(Agent *);
~InstanceManager();
Expand Down Expand Up @@ -136,6 +137,7 @@ class InstanceManager {
DBTableBase::ListenerId lb_listener_;
std::string netns_cmd_;
int netns_timeout_;
int netns_reattempts_;
WorkQueue<InstanceManagerChildEvent> work_queue_;

std::vector<InstanceTaskQueue *> task_queues_;
Expand Down Expand Up @@ -164,7 +166,8 @@ class InstanceState : public DBState {
Stopping,
Stopped,
Error,
Timeout
Timeout,
Reattempt
};

InstanceState();
Expand Down
40 changes: 21 additions & 19 deletions src/vnsw/agent/oper/instance_task.cc
Expand Up @@ -8,7 +8,7 @@
#include "io/event_manager.h"

InstanceTask::InstanceTask()
: is_running_(false), start_time_(0)
: is_running_(false), start_time_(0), reattempts_(0)
{}

InstanceTaskExecvp::InstanceTaskExecvp(const std::string &cmd,
Expand All @@ -27,8 +27,6 @@ void InstanceTaskExecvp::ReadErrors(const boost::system::error_code &ec,
boost::system::error_code close_ec;
errors_.close(close_ec);

LOG(ERROR, "error value " << ec.value() << " for pid " << pid_ << "\n");

if (!on_exit_cb_.empty()) {
on_exit_cb_(this, ec);
}
Expand All @@ -54,6 +52,15 @@ void InstanceTaskExecvp::Terminate() {
kill(pid_, SIGKILL);
}


// If there is an error before the fork, task is set to "not running"
// and "false" is returned to caller so that caller can take appropriate
// action on task. If an error is encounted after fork, it is very
// likely that child process is running so we keep the task status as
// "running" and return "false" to caller, so that caller does not
// attempt to run the same task again. In this case, the child process
// exit notification can not be received by instance manager, hence
// instance manager has to rely on TaskTimeout delete the task.
bool InstanceTaskExecvp::Run() {
std::vector<std::string> argv;
LOG(DEBUG, "NetNS run command: " << cmd_);
Expand All @@ -68,17 +75,7 @@ bool InstanceTaskExecvp::Run() {

int err[2];
if (pipe(err) < 0) {
return false;
}
/*
* temporarily block SIGCHLD signals
*/
sigset_t mask;
sigset_t orig_mask;
sigemptyset (&mask);
sigaddset (&mask, SIGCHLD);
if (sigprocmask(SIG_BLOCK, &mask, &orig_mask) < 0) {
LOG(ERROR, "NetNS error: sigprocmask, " << strerror(errno));
return is_running_ = false;
}

pid_ = vfork();
Expand All @@ -97,23 +94,28 @@ bool InstanceTaskExecvp::Run() {

_exit(127);
}
if (sigprocmask(SIG_SETMASK, &orig_mask, NULL) < 0) {
LOG(ERROR, "NetNS error: sigprocmask, " << strerror(errno));
}

close(err[1]);

start_time_ = time(NULL);

int fd = ::dup(err[0]);
close(err[0]);
if (fd == -1) {
return is_running_ = false;
//is_running_ is still true indicating the child process is
//running. Caller needs to verify the status before acting on
//the task again
return false;
}
boost::system::error_code ec;
errors_.assign(fd, ec);
if (ec) {
close(fd);
return is_running_ = false;

//is_running_ is still true indicating the child process is
//running. Caller needs to verify the status before acting on
//the task again
return false;
}

bzero(rx_buff_, sizeof(rx_buff_));
Expand Down
12 changes: 11 additions & 1 deletion src/vnsw/agent/oper/instance_task.h
Expand Up @@ -51,10 +51,19 @@ class InstanceTask {
on_exit_cb_ = cb;
}

int incr_reattempts() {
return ++reattempts_;
}

int reattempts() {
return reattempts_;
}

protected:
bool is_running_;
time_t start_time_;
OnErrorCallback on_error_cb_;
int reattempts_;
OnExitCallback on_exit_cb_;
};

Expand All @@ -81,13 +90,14 @@ class InstanceTaskExecvp : public InstanceTask {
return cmd_type_;
}


private:
void ReadErrors(const boost::system::error_code &ec, size_t read_bytes);
const std::string cmd_;
boost::asio::posix::stream_descriptor errors_;
std::stringstream errors_data_;
char rx_buff_[kBufLen];
AgentSignal::SignalChildHandler sig_handler_;

pid_t pid_;
int cmd_type_;
};
Expand Down

0 comments on commit f9edeba

Please sign in to comment.