Skip to content

Commit

Permalink
Merge "Handle Instance Task's creation errors" into R2.22.x
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and opencontrail-ci-admin committed Mar 7, 2016
2 parents beafd24 + e4b3216 commit 69e49eb
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 63 deletions.
117 changes: 75 additions & 42 deletions src/vnsw/agent/oper/instance_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ void InstanceManager::Initialize(DB *database, const std::string &netns_cmd,
netns_timeout_ = netns_timeout;
}

netns_reattempts_ = kReattemptsDefault;

int workers = kWorkersDefault;
if (netns_workers > 0) {
workers = netns_workers;
Expand Down Expand Up @@ -435,82 +437,102 @@ InstanceTaskQueue *InstanceManager::GetTaskQueue(const std::string &str) {
return task_queues_[index];
}

//After Run(), if child process is running, we keep the task status as
//"Starting" or "Stopping". We start a timer to track TaskTimeout time.
//if process is not runnning, we verify how many times we already attempted
//to run. If netns_reattempts_ are already crossed, we return false,
// so that caller deletes the task without running any further.
//If required reattempts are not done, we start a timer and return true
// so that same task is run again after the timeout. The task status is set to
//"reattempt" to track reattempt case
bool InstanceManager::StartTask(InstanceTaskQueue *task_queue,
InstanceTask *task) {


InstanceState *state = GetState(task);
if (state) {
state->reset_errors();
}

pid_t pid;
if (task->Run())
bool status = task->Run();
if (status || task->is_running()) {
pid = task->pid();
else
return false;

if (state != NULL) {
state->set_pid(pid);
state->set_cmd(task->cmd());
if (task->cmd_type() == Start) {
state->set_status_type(InstanceState::Starting);
} else {
state->set_status_type(InstanceState::Stopping);
if (state != NULL) {
state->set_pid(pid);
state->set_cmd(task->cmd());
if (task->cmd_type() == Start) {
state->set_status_type(InstanceState::Starting);
} else {
state->set_status_type(InstanceState::Stopping);
}
}
} else {
LOG(ERROR, "Instance task " << task << " attempt " << task->reattempts());
if (state) {
state->set_status_type(InstanceState::Reattempt);
state->set_cmd(task->cmd());
}
if (task->incr_reattempts() > netns_reattempts_) {
LOG(ERROR, "Instance task " << task << " reattempts exceeded");
return false;
}
}

if (pid > 0) {
task_queue->StartTimer(netns_timeout_ * 1000);
return true;
}

task_queue->Pop();
UnregisterSvcInstance(task);
delete task;
task_queue->StartTimer(netns_timeout_ * 1000);

return false;
return true;
}

//If Starting the task succeds we wait for another event on that task.
//If not the task is removed from the front of the queue and is delted.
void InstanceManager::ScheduleNextTask(InstanceTaskQueue *task_queue) {
while (!task_queue->Empty()) {

InstanceTask *task = task_queue->Front();
InstanceState *state = GetState(task);

if (!task->is_running()) {
bool starting = StartTask(task_queue, task);
if (starting) {
bool status = StartTask(task_queue, task);
if (status) {
return;
}
} else {
int delay = time(NULL) - task->start_time();
if (delay < netns_timeout_) {
return;
}
InstanceState *state = GetState(task);
if (state) {
state->set_status_type(InstanceState::Timeout);
}

LOG(ERROR, "NetNS error timeout " << delay << " > " <<
netns_timeout_ << ", " << task->cmd());

if (delay > (netns_timeout_ * 2)) {
task->Terminate();
task_queue->StopTimer();
task_queue->Pop();

ServiceInstance* svc_instance = GetSvcInstance(task);
if (state && svc_instance)
state->decr_tasks_running();

task_svc_instances_.erase(task);
LOG(ERROR, " Delay " << delay << "Timeout " <<
(netns_timeout_ * 2));

DeleteState(svc_instance);

delete task;
if (delay >= (netns_timeout_ * 2)) {
task->Terminate();
} else {
task->Stop();
return;
}
}

task_queue->StopTimer();
task_queue->Pop();

ServiceInstance* svc_instance = GetSvcInstance(task);
if (state && svc_instance)
state->decr_tasks_running();

task_svc_instances_.erase(task);

LOG(ERROR, "Delete task " << task);
DeleteState(svc_instance);

delete task;
}
}

Expand Down Expand Up @@ -669,13 +691,24 @@ void InstanceManager::StopStaleNetNS(ServiceInstance::Properties &props) {
cmd_str << " --pool-id " << props.pool_id;
}

InstanceTask *task = new InstanceTaskExecvp(cmd_str.str(), Stop,
agent_->event_manager());
std::stringstream info;
info << "NetNS stale run command queued: " << task->cmd();
Enqueue(task, props.instance_id);
std::string cmd = cmd_str.str();
std::vector<std::string> argv;
boost::split(argv, cmd, boost::is_any_of(" "), boost::token_compress_on);
std::vector<const char *> c_argv(argv.size() + 1);
for (std::size_t i = 0; i != argv.size(); ++i) {
c_argv[i] = argv[i].c_str();
}

LOG(ERROR, "Stale NetNS " << cmd);

LOG(DEBUG, info.str().c_str());
pid_t pid = vfork();
if (pid == 0) {
CloseTaskFds();
execvp(c_argv[0], (char **) c_argv.data());
perror("execvp");

_exit(127);
}
}

void InstanceManager::SetLastCmdType(ServiceInstance *svc_instance,
Expand Down
5 changes: 4 additions & 1 deletion src/vnsw/agent/oper/instance_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class InstanceManager {

static const int kTimeoutDefault = 30;
static const int kWorkersDefault = 1;
static const int kReattemptsDefault = 2;

InstanceManager(Agent *);
~InstanceManager();
Expand Down Expand Up @@ -136,6 +137,7 @@ class InstanceManager {
DBTableBase::ListenerId lb_listener_;
std::string netns_cmd_;
int netns_timeout_;
int netns_reattempts_;
WorkQueue<InstanceManagerChildEvent> work_queue_;

std::vector<InstanceTaskQueue *> task_queues_;
Expand Down Expand Up @@ -164,7 +166,8 @@ class InstanceState : public DBState {
Stopping,
Stopped,
Error,
Timeout
Timeout,
Reattempt
};

InstanceState();
Expand Down
40 changes: 21 additions & 19 deletions src/vnsw/agent/oper/instance_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "io/event_manager.h"

InstanceTask::InstanceTask()
: is_running_(false), start_time_(0)
: is_running_(false), start_time_(0), reattempts_(0)
{}

InstanceTaskExecvp::InstanceTaskExecvp(const std::string &cmd,
Expand All @@ -27,8 +27,6 @@ void InstanceTaskExecvp::ReadErrors(const boost::system::error_code &ec,
boost::system::error_code close_ec;
errors_.close(close_ec);

LOG(ERROR, "error value " << ec.value() << " for pid " << pid_ << "\n");

if (!on_exit_cb_.empty()) {
on_exit_cb_(this, ec);
}
Expand All @@ -54,6 +52,15 @@ void InstanceTaskExecvp::Terminate() {
kill(pid_, SIGKILL);
}


// If there is an error before the fork, task is set to "not running"
// and "false" is returned to caller so that caller can take appropriate
// action on task. If an error is encounted after fork, it is very
// likely that child process is running so we keep the task status as
// "running" and return "false" to caller, so that caller does not
// attempt to run the same task again. In this case, the child process
// exit notification can not be received by instance manager, hence
// instance manager has to rely on TaskTimeout delete the task.
bool InstanceTaskExecvp::Run() {
std::vector<std::string> argv;
LOG(DEBUG, "NetNS run command: " << cmd_);
Expand All @@ -68,17 +75,7 @@ bool InstanceTaskExecvp::Run() {

int err[2];
if (pipe(err) < 0) {
return false;
}
/*
* temporarily block SIGCHLD signals
*/
sigset_t mask;
sigset_t orig_mask;
sigemptyset (&mask);
sigaddset (&mask, SIGCHLD);
if (sigprocmask(SIG_BLOCK, &mask, &orig_mask) < 0) {
LOG(ERROR, "NetNS error: sigprocmask, " << strerror(errno));
return is_running_ = false;
}

pid_ = vfork();
Expand All @@ -97,23 +94,28 @@ bool InstanceTaskExecvp::Run() {

_exit(127);
}
if (sigprocmask(SIG_SETMASK, &orig_mask, NULL) < 0) {
LOG(ERROR, "NetNS error: sigprocmask, " << strerror(errno));
}

close(err[1]);

start_time_ = time(NULL);

int fd = ::dup(err[0]);
close(err[0]);
if (fd == -1) {
return is_running_ = false;
//is_running_ is still true indicating the child process is
//running. Caller needs to verify the status before acting on
//the task again
return false;
}
boost::system::error_code ec;
errors_.assign(fd, ec);
if (ec) {
close(fd);
return is_running_ = false;

//is_running_ is still true indicating the child process is
//running. Caller needs to verify the status before acting on
//the task again
return false;
}

bzero(rx_buff_, sizeof(rx_buff_));
Expand Down
12 changes: 11 additions & 1 deletion src/vnsw/agent/oper/instance_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,19 @@ class InstanceTask {
on_exit_cb_ = cb;
}

int incr_reattempts() {
return ++reattempts_;
}

int reattempts() {
return reattempts_;
}

protected:
bool is_running_;
time_t start_time_;
OnErrorCallback on_error_cb_;
int reattempts_;
OnExitCallback on_exit_cb_;
};

Expand All @@ -81,13 +90,14 @@ class InstanceTaskExecvp : public InstanceTask {
return cmd_type_;
}


private:
void ReadErrors(const boost::system::error_code &ec, size_t read_bytes);
const std::string cmd_;
boost::asio::posix::stream_descriptor errors_;
std::stringstream errors_data_;
char rx_buff_[kBufLen];
AgentSignal::SignalChildHandler sig_handler_;

pid_t pid_;
int cmd_type_;
};
Expand Down

0 comments on commit 69e49eb

Please sign in to comment.