From c1802eeaf3035fc4bd4fd21a7eaae2e4a827c92e Mon Sep 17 00:00:00 2001 From: Megh Bhatt Date: Mon, 4 May 2015 17:38:44 -0700 Subject: [PATCH] Use return value of concurrent queue try_pop to determine whether to enqueue HTTP session request queue processing task. The current method of using empty with try_pop/push without mutex can lead to situation where concurrent queue is not empty and no task is enqueued. Closes-Bug: #1447837 Change-Id: Ia475d3fa6b1d0fb540a159d5aed47a3dde96f2fc --- src/http/http_session.cc | 80 ++++++++++++++++++---------------------- src/http/http_session.h | 1 + 2 files changed, 37 insertions(+), 44 deletions(-) diff --git a/src/http/http_session.cc b/src/http/http_session.cc index 7e1ff29cef5..3300eabdd9c 100644 --- a/src/http/http_session.cc +++ b/src/http/http_session.cc @@ -171,10 +171,9 @@ class HttpSession::RequestHandler : public Task { } // Retrieve a request item from the queue. Return true if the queue - // is empty _after_ the pop, false otherwise. + // is empty. bool FromQ(HttpRequest *& r) { - session_->request_queue_.try_pop(r); - return session_->request_queue_.empty(); + return !session_->request_queue_.try_pop(r); } virtual bool Run() { HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_INFO, @@ -184,7 +183,7 @@ class HttpSession::RequestHandler : public Task { HttpServer *server = static_cast(session_->server()); while (true) { request = NULL; - bool empty = FromQ(request); + session_->req_queue_empty_ = FromQ(request); if (!request) break; HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_INFO, "URL is " + request->ToString()); @@ -205,16 +204,12 @@ class HttpSession::RequestHandler : public Task { } handler(session_.get(), request); } - - // If the queue was empty, do not proceed further. If new request - // items have been added to the queue, they would be processed by - // a new task. - if (empty) break; } if (del_session) { + HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_INFO, "DeleteSession " + + session_->ToString()); session_->set_observer(NULL); server->DeleteSession(session_.get()); - HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_INFO, "DeleteSession"); } HttpSession::task_count_--; return true; @@ -229,6 +224,7 @@ HttpSession::HttpSession(HttpServer *server, Socket *socket) TaskScheduler *scheduler = TaskScheduler::GetInstance(); req_handler_task_id_ = scheduler->GetTaskId("http::RequestHandlerTask"); } + req_queue_empty_ = true; set_observer(boost::bind(&HttpSession::OnSessionEvent, this, _1, _2)); } @@ -256,7 +252,6 @@ void HttpSession::OnSessionEvent(TcpSession *session, HttpSession *h_session = dynamic_cast(session); assert(h_session); - bool was_empty = false; switch (event) { case TcpSession::CLOSE: { @@ -274,52 +269,49 @@ void HttpSession::OnSessionEvent(TcpSession *session, string nourl = ""; request->SetUrl(&nourl); request->SetEvent(event); - was_empty = request_queue_.empty(); request_queue_.push(request); + // Enqueue new RequestHandler task if needed + if (req_queue_empty_) { + TaskScheduler *scheduler = TaskScheduler::GetInstance(); + RequestHandler *task = new RequestHandler(this); + HttpSession::task_count_++; + scheduler->Enqueue(task); + } } break; default: break; } - - if (was_empty) { - TaskScheduler *scheduler = TaskScheduler::GetInstance(); - RequestHandler *task = new RequestHandler(this); - HttpSession::task_count_++; - scheduler->Enqueue(task); - } } void HttpSession::OnRead(Buffer buffer) { const u_int8_t *data = BufferData(buffer); size_t size = BufferSize(buffer); std::stringstream msg; - bool was_empty = false; - { - msg << "HttpSession::Read " << size << " bytes"; - HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_DEBUG, msg.str()); - - if (context_str_.size() == 0) { - ReleaseBuffer(buffer); - return; - } - if (request_builder_.get() == NULL) { - request_builder_.reset(new RequestBuilder()); - } - request_builder_->Parse(data, size); - if (request_builder_->complete()) { - was_empty = request_queue_.empty(); - HttpRequest *request = request_builder_->GetRequest(); - HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_DEBUG, request->ToString()); - request_queue_.push(request); - request_builder_->Clear(); - } + + msg << "HttpSession::Read " << size << " bytes"; + HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_DEBUG, msg.str()); + + if (context_str_.size() == 0) { + ReleaseBuffer(buffer); + return; } - if (was_empty) { - TaskScheduler *scheduler = TaskScheduler::GetInstance(); - RequestHandler *task = new RequestHandler(this); - HttpSession::task_count_++; - scheduler->Enqueue(task); + if (request_builder_.get() == NULL) { + request_builder_.reset(new RequestBuilder()); + } + request_builder_->Parse(data, size); + if (request_builder_->complete()) { + HttpRequest *request = request_builder_->GetRequest(); + HTTP_SYS_LOG("HttpSession", SandeshLevel::UT_DEBUG, request->ToString()); + request_queue_.push(request); + // Enqueue new RequestHandler task if needed + if (req_queue_empty_) { + TaskScheduler *scheduler = TaskScheduler::GetInstance(); + RequestHandler *task = new RequestHandler(this); + HttpSession::task_count_++; + scheduler->Enqueue(task); + } + request_builder_->Clear(); } // TODO: error handling ReleaseBuffer(buffer); diff --git a/src/http/http_session.h b/src/http/http_session.h index af590c6fd5a..1df53653c75 100644 --- a/src/http/http_session.h +++ b/src/http/http_session.h @@ -80,6 +80,7 @@ class HttpSession: public TcpSession { { client_context_str_ = client_ctx; } boost::scoped_ptr request_builder_; tbb::concurrent_queue request_queue_; + tbb::atomic req_queue_empty_; std::string context_str_; std::string client_context_str_; SessionEventCb event_cb_;