Skip to content

Commit

Permalink
Use return value of concurrent queue try_pop to determine
Browse files Browse the repository at this point in the history
whether to enqueue HTTP session request queue processing
task. The current method of using empty with try_pop/push
without mutex can lead to situation where concurrent queue
is not empty and no task is enqueued.
Closes-Bug: #1447837

Change-Id: Ia475d3fa6b1d0fb540a159d5aed47a3dde96f2fc
  • Loading branch information
Megh Bhatt committed May 6, 2015
1 parent f3ae003 commit c1802ee
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 44 deletions.
80 changes: 36 additions & 44 deletions src/http/http_session.cc
Expand Up @@ -171,10 +171,9 @@ class HttpSession::RequestHandler : public Task {
}

// Retrieve a request item from the queue. Return true if the queue
// is empty _after_ the pop, false otherwise.
// is empty.
bool FromQ(HttpRequest *& r) {
session_->request_queue_.try_pop(r);
return session_->request_queue_.empty();
return !session_->request_queue_.try_pop(r);
}
virtual bool Run() {
HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_INFO,
Expand All @@ -184,7 +183,7 @@ class HttpSession::RequestHandler : public Task {
HttpServer *server = static_cast<HttpServer *>(session_->server());
while (true) {
request = NULL;
bool empty = FromQ(request);
session_->req_queue_empty_ = FromQ(request);
if (!request) break;
HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_INFO,
"URL is " + request->ToString());
Expand All @@ -205,16 +204,12 @@ class HttpSession::RequestHandler : public Task {
}
handler(session_.get(), request);
}

// If the queue was empty, do not proceed further. If new request
// items have been added to the queue, they would be processed by
// a new task.
if (empty) break;
}
if (del_session) {
HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_INFO, "DeleteSession "
+ session_->ToString());
session_->set_observer(NULL);
server->DeleteSession(session_.get());
HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_INFO, "DeleteSession");
}
HttpSession::task_count_--;
return true;
Expand All @@ -229,6 +224,7 @@ HttpSession::HttpSession(HttpServer *server, Socket *socket)
TaskScheduler *scheduler = TaskScheduler::GetInstance();
req_handler_task_id_ = scheduler->GetTaskId("http::RequestHandlerTask");
}
req_queue_empty_ = true;
set_observer(boost::bind(&HttpSession::OnSessionEvent, this, _1, _2));
}

Expand Down Expand Up @@ -256,7 +252,6 @@ void HttpSession::OnSessionEvent(TcpSession *session,
HttpSession *h_session = dynamic_cast<HttpSession *>(session);
assert(h_session);

bool was_empty = false;
switch (event) {
case TcpSession::CLOSE:
{
Expand All @@ -274,52 +269,49 @@ void HttpSession::OnSessionEvent(TcpSession *session,
string nourl = "";
request->SetUrl(&nourl);
request->SetEvent(event);
was_empty = request_queue_.empty();
request_queue_.push(request);
// Enqueue new RequestHandler task if needed
if (req_queue_empty_) {
TaskScheduler *scheduler = TaskScheduler::GetInstance();
RequestHandler *task = new RequestHandler(this);
HttpSession::task_count_++;
scheduler->Enqueue(task);
}
}
break;
default:
break;
}

if (was_empty) {
TaskScheduler *scheduler = TaskScheduler::GetInstance();
RequestHandler *task = new RequestHandler(this);
HttpSession::task_count_++;
scheduler->Enqueue(task);
}
}

void HttpSession::OnRead(Buffer buffer) {
const u_int8_t *data = BufferData(buffer);
size_t size = BufferSize(buffer);
std::stringstream msg;
bool was_empty = false;
{
msg << "HttpSession::Read " << size << " bytes";
HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_DEBUG, msg.str());

if (context_str_.size() == 0) {
ReleaseBuffer(buffer);
return;
}
if (request_builder_.get() == NULL) {
request_builder_.reset(new RequestBuilder());
}
request_builder_->Parse(data, size);
if (request_builder_->complete()) {
was_empty = request_queue_.empty();
HttpRequest *request = request_builder_->GetRequest();
HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_DEBUG, request->ToString());
request_queue_.push(request);
request_builder_->Clear();
}

msg << "HttpSession::Read " << size << " bytes";
HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_DEBUG, msg.str());

if (context_str_.size() == 0) {
ReleaseBuffer(buffer);
return;
}
if (was_empty) {
TaskScheduler *scheduler = TaskScheduler::GetInstance();
RequestHandler *task = new RequestHandler(this);
HttpSession::task_count_++;
scheduler->Enqueue(task);
if (request_builder_.get() == NULL) {
request_builder_.reset(new RequestBuilder());
}
request_builder_->Parse(data, size);
if (request_builder_->complete()) {
HttpRequest *request = request_builder_->GetRequest();
HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_DEBUG, request->ToString());
request_queue_.push(request);
// Enqueue new RequestHandler task if needed
if (req_queue_empty_) {
TaskScheduler *scheduler = TaskScheduler::GetInstance();
RequestHandler *task = new RequestHandler(this);
HttpSession::task_count_++;
scheduler->Enqueue(task);
}
request_builder_->Clear();
}
// TODO: error handling
ReleaseBuffer(buffer);
Expand Down
1 change: 1 addition & 0 deletions src/http/http_session.h
Expand Up @@ -80,6 +80,7 @@ class HttpSession: public TcpSession {
{ client_context_str_ = client_ctx; }
boost::scoped_ptr<RequestBuilder> request_builder_;
tbb::concurrent_queue<HttpRequest *> request_queue_;
tbb::atomic<bool> req_queue_empty_;
std::string context_str_;
std::string client_context_str_;
SessionEventCb event_cb_;
Expand Down

0 comments on commit c1802ee

Please sign in to comment.