Semaphore refactor #530

Open · wants to merge 4 commits into master

Changes from all commits
66 changes: 66 additions & 0 deletions taskflow/algorithm/semaphore_guard.hpp
@@ -0,0 +1,66 @@
#pragma once

#include "../core/graph.hpp"
#include "../core/semaphore.hpp"

/**
@file semaphore_guard.hpp
@brief semaphore guard include file
*/

namespace tf {

// ----------------------------------------------------------------------------
// SemaphoreGuard
// ----------------------------------------------------------------------------

/**
@class SemaphoreGuard

@brief RAII guard that scopes ownership of a tf::Semaphore, acquiring the
semaphore on construction and releasing it in the destructor. Using the
guard ensures the semaphore is always released, and in the proper order,
which helps avoid deadlocks.

SemaphoreGuard is used much like std::lock_guard. One difference is that
the caller must also pass a tf::Runtime reference to tf::SemaphoreGuard,
since the implementation acquires the semaphore through the executor.

@code{.cpp}
tf::Semaphore se(1);
tf::Executor executor(2);
tf::Taskflow taskflow;
int32_t count = 0;
auto t1 = taskflow.emplace([&](tf::Runtime& rt) {
tf::SemaphoreGuard gd(rt, se);
--count;
});
auto t2 = taskflow.emplace([&](tf::Runtime& rt) {
tf::SemaphoreGuard gd(rt, se);
++count;
});
executor.run(taskflow);
executor.wait_for_all();
@endcode

*/
class SemaphoreGuard {
public:
explicit SemaphoreGuard(tf::Runtime& rt, tf::Semaphore& se) :
_rt(rt), _se(se) {
_rt.acquire(_se);
}

~SemaphoreGuard() {
_rt.release(_se);
}

SemaphoreGuard(const SemaphoreGuard&) = delete;
SemaphoreGuard& operator=(const SemaphoreGuard&) = delete;

private:
tf::Runtime& _rt;
tf::Semaphore& _se;
};

} // end of namespace tf. ---------------------------------------------------
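
For review context, a minimal end-to-end sketch of the guard in use; the include paths are assumptions based on this PR's file tree, not verified against the install layout:

#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/semaphore_guard.hpp>

// Sketch: allow at most 2 of 100 tasks inside the guarded section at a time.
int main() {
  tf::Executor executor(4);
  tf::Taskflow taskflow;
  tf::Semaphore se(2);                  // at most two concurrent owners

  for(int i = 0; i < 100; ++i) {
    taskflow.emplace([&](tf::Runtime& rt) {
      tf::SemaphoreGuard gd(rt, se);    // acquired here ...
      // ... critical section ...
    });                                 // ... released when gd leaves scope
  }

  executor.run(taskflow).wait();
}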
174 changes: 173 additions & 1 deletion taskflow/core/executor.hpp
@@ -1,5 +1,7 @@
#pragma once

#include <chrono>

#include "observer.hpp"
#include "taskflow.hpp"
#include "async_task.hpp"
@@ -1047,6 +1049,36 @@ class Executor {
std::vector<Worker> _workers;
std::list<Taskflow> _taskflows;

// Bookkeeping for one in-task semaphore: the semaphore plus the list of
// workers currently blocked on it.
struct TaskNotifier {

  explicit TaskNotifier(tf::Semaphore& se) :
    _se(se) {}

  // returns true if the semaphore is acquired; we do not enqueue a
  // Notifier::waiter here because, in the in-task use case, the looper
  // thread polls instead of parking a node in the semaphore's waiter list
  bool stop_predicate() {
    return _se._try_acquire_or_wait<false>(nullptr);
  }

  tf::Semaphore& _se;

  // indices of workers waiting on this semaphore
  std::mutex _waiters_idx_mtx;
  std::list<size_t> _waiters_idx;
};

std::mutex _task_notifier_mtx;
std::list<TaskNotifier> _task_notifier;

std::unique_ptr<std::thread> _semaphore_looper { nullptr };
std::atomic<bool> _semaphore_notify_finished { false };

Notifier _notifier;

TaskQueue<Node*> _wsq;
@@ -1094,6 +1126,10 @@ class Executor {

template <typename P>
void _corun_until(Worker&, P&&);

void _corun_until_opt(Worker&, tf::Semaphore&);

void _semaphore_notify();
};

#ifdef TF_DISABLE_EXCEPTION_HANDLING
@@ -1144,6 +1180,12 @@ inline Executor::~Executor() {
for(auto& t : _threads){
t.join();
}
_semaphore_notify_finished.store(true, std::memory_order_release);

if (_semaphore_looper && _semaphore_looper->joinable()) {
_semaphore_looper->join();
_semaphore_looper.reset();
}
}

// Function: num_workers
@@ -1224,7 +1266,6 @@ inline void Executor::_spawn(size_t N) {
break;
}
}

});

// POSIX-like system can use the following to affine threads to cores
@@ -1250,6 +1291,7 @@ inline void Executor::_spawn(size_t N) {
std::unique_lock<std::mutex> lock(mutex);
cond.wait(lock, [&](){ return n==N; });
#endif
_semaphore_notify_finished.store(true, std::memory_order_release);
}

// Function: _corun_until
@@ -2297,6 +2339,116 @@ void Runtime::corun_until(P&& predicate) {
_executor._corun_until(_worker, std::forward<P>(predicate));
}

// Function: _semaphore_notify
// side-thread loop that polls registered semaphores and wakes waiting workers
inline void Executor::_semaphore_notify() {
std::uniform_int_distribution<size_t> rdvtm(0, _workers.size() - 1);

while (!_semaphore_notify_finished.load(std::memory_order_acquire)) {
std::lock_guard<std::mutex> lk(_task_notifier_mtx);
    for (auto& it : _task_notifier) {
      size_t front_idx;
      {
        // inspect the waiter list under its own lock
        std::lock_guard<std::mutex> _waiters_lk(it._waiters_idx_mtx);
        if (it._waiters_idx.empty()) {
          continue;
        }
        front_idx = it._waiters_idx.front();
      }
      // the semaphore was acquired successfully: notify the first waiter
      // (first come, first served) and remove it from the list
      if (it.stop_predicate()) {
        auto& selected_worker = _workers[front_idx];
        std::lock_guard<std::mutex> predicate_lk(selected_worker._predicate_mtx);
        selected_worker._predicate_status.store(
          Worker::SideProcessStatus::SEMAPHORE_SUCCESS,
          std::memory_order_release);
        selected_worker._predicate_cv.notify_all();
        {
          std::lock_guard<std::mutex> _waiters_lk(it._waiters_idx_mtx);
          it._waiters_idx.pop_front();
        }
        break;
} else {
        std::lock_guard<std::mutex> _waiters_lk(it._waiters_idx_mtx);
        for (size_t idx : it._waiters_idx) {
          auto& w = _workers[idx];
          // try the waiter's own queue first, then steal from a victim;
          // the waked worker must be notified, or it stays blocked on its
          // condition variable
          if (auto t = w._wsq.pop(); t) {
            w._stealed_task = t;
            std::lock_guard<std::mutex> predicate_lk(w._predicate_mtx);
            w._predicate_status.store(
              Worker::SideProcessStatus::STEAL_SUCCESS,
              std::memory_order_release);
            w._predicate_cv.notify_all();
            continue;
          }
          for (size_t num_steals = 0; num_steals < _MAX_STEALS; ++num_steals) {
            auto t = (w._id == w._vtm) ? w._wsq.steal() : _workers[w._vtm]._wsq.steal();
            if (t) {
              w._stealed_task = t;
              std::lock_guard<std::mutex> predicate_lk(w._predicate_mtx);
              w._predicate_status.store(
                Worker::SideProcessStatus::STEAL_SUCCESS,
                std::memory_order_release);
              w._predicate_cv.notify_all();
              break;
            }
            w._vtm = rdvtm(w._rdgen);
          }
        }
}
}
}
}
}
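
Stripped of the work-stealing details, the looper/worker handshake implemented above is a standard condition-variable protocol; the following self-contained model (names borrowed from the diff, everything else simplified) shows the intended sequencing:

#include <condition_variable>
#include <mutex>

enum class SideProcessStatus { UNASSIGNED, SEMAPHORE_SUCCESS, STEAL_SUCCESS };

// simplified stand-in for the per-worker predicate state in this PR
struct WorkerSlot {
  std::mutex mtx;
  std::condition_variable cv;
  SideProcessStatus status = SideProcessStatus::UNASSIGNED;
};

// looper side: publish a verdict under the lock, then wake the worker;
// a store without the notify would leave the worker blocked forever
void post_verdict(WorkerSlot& w, SideProcessStatus s) {
  {
    std::lock_guard<std::mutex> lk(w.mtx);
    w.status = s;
  }
  w.cv.notify_all();
}

// worker side: block until a verdict other than UNASSIGNED arrives
SideProcessStatus wait_verdict(WorkerSlot& w) {
  std::unique_lock<std::mutex> lk(w.mtx);
  w.cv.wait(lk, [&]{ return w.status != SideProcessStatus::UNASSIGNED; });
  return w.status;
}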

// Function: _corun_until_opt
// blocks the calling worker on a semaphore, running stolen tasks while waiting
inline void Executor::_corun_until_opt(Worker& w, tf::Semaphore& se) {
{
TaskNotifier* task_notifier = nullptr;
// register for acquiring semaphore
std::lock_guard<std::mutex> lk(_task_notifier_mtx);
    if (_semaphore_looper == nullptr) {
      // clear the flag before the looper starts; otherwise the new thread
      // may observe the stale 'finished' state and exit immediately
      _semaphore_notify_finished.store(false, std::memory_order_release);
      _semaphore_looper = std::make_unique<std::thread>(
        &Executor::_semaphore_notify, this);
    }
bool inserted = false;
for (auto& it : _task_notifier) {
std::lock_guard<std::mutex> _waiters_lk(it._waiters_idx_mtx);
      // a waiting list for this semaphore already exists
if (&it._se == &se) {
it._waiters_idx.emplace_back(w._id);
task_notifier = &it;
inserted = true;
break;
}
}
if (!inserted) {
_task_notifier.emplace_back(se);
      task_notifier = &_task_notifier.back();
{
std::lock_guard<std::mutex> _waiters_lk(task_notifier->_waiters_idx_mtx);
task_notifier->_waiters_idx.emplace_back(w._id);
}
}
}

  // block until the looper posts a verdict: either the semaphore was
  // acquired on our behalf, or a task was stolen for us to run meanwhile
  while (true) {
    Node* stolen = nullptr;
    {
      std::unique_lock<std::mutex> lk(w._predicate_mtx);
      w._predicate_cv.wait(lk, [&]() {
        return w._predicate_status.load(std::memory_order_acquire)
            != tf::Worker::SideProcessStatus::UNASSIGNED;
      });
      if (w._predicate_status.load(std::memory_order_acquire)
          == tf::Worker::SideProcessStatus::SEMAPHORE_SUCCESS) {
        return;
      }
      // STEAL_SUCCESS: take the stolen task and reset the status, then run
      // the task outside the lock so the looper is not blocked meanwhile
      stolen = w._stealed_task;
      w._predicate_status.store(tf::Worker::SideProcessStatus::UNASSIGNED,
                                std::memory_order_release);
    }
    if (stolen) {
      _invoke(w, stolen);
    }
  }
}

// Function: _silent_async
template <typename F>
void Runtime::_silent_async(Worker& w, const std::string& name, F&& f) {
@@ -2376,6 +2528,26 @@ inline Runtime::~Runtime() {
}
}

inline void Runtime::acquire(Semaphore& s) {
_executor._corun_until_opt(_worker, s);
}

inline void Runtime::release(Semaphore& s) {
s._release(this->_parent);
}
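
These two methods also allow the non-RAII form; a minimal sketch assuming the PR's API (prefer tf::SemaphoreGuard, which releases on every exit path):

#include <taskflow/taskflow.hpp>

int main() {
  tf::Executor executor;
  tf::Taskflow taskflow;
  tf::Semaphore se(1);     // binary semaphore: one owner at a time
  int counter = 0;

  for(int i = 0; i < 10; ++i) {
    taskflow.emplace([&](tf::Runtime& rt) {
      rt.acquire(se);      // may run other ready tasks while waiting
      ++counter;           // serialized by the semaphore
      rt.release(se);
    });
  }

  executor.run(taskflow).wait();
}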

// template<ptrdiff_t least_max_value>
// inline void Runtime::acquire(tf::counting_semaphore<least_max_value>& s) {
// _executor._corun_until(_worker, [&]() {
// return s.try_acquire();
// });
// }

// template<ptrdiff_t least_max_value>
// inline void Runtime::release(tf::counting_semaphore<least_max_value>& s) {
// s.release();
// }

} // end of namespace tf -----------------------------------------------------


14 changes: 13 additions & 1 deletion taskflow/core/graph.hpp
@@ -442,6 +442,18 @@ class Runtime {
*/
inline Worker& worker();

/**
@brief acquires the given semaphore, running other ready tasks while waiting
*/
void acquire(Semaphore& s);

/**
@brief releases the given semaphore previously acquired through this runtime
*/
void release(Semaphore& s);

protected:

/**
@@ -894,7 +906,7 @@ inline bool Node::_acquire_all(SmallVector<Node*>& nodes) {
auto& to_acquire = _semaphores->to_acquire;

for(size_t i = 0; i < to_acquire.size(); ++i) {
if(!to_acquire[i]->_try_acquire_or_wait(this)) {
if(!to_acquire[i]->_try_acquire_or_wait<true>(this)) {
for(size_t j = 1; j <= i; ++j) {
auto r = to_acquire[i-j]->_release();
nodes.insert(std::end(nodes), std::begin(r), std::end(r));
24 changes: 23 additions & 1 deletion taskflow/core/semaphore.hpp
@@ -4,6 +4,7 @@
#include <mutex>

#include "declarations.hpp"
#include "graph.hpp"

/**
@file semaphore.hpp
@@ -68,6 +69,8 @@ This arrangement limits the number of concurrently running tasks to only one.
class Semaphore {

friend class Node;
friend class Runtime;
friend class Executor;

public:

@@ -83,6 +86,8 @@ class Semaphore {
*/
explicit Semaphore(size_t max_workers);

virtual ~Semaphore() = default;

/**
@brief queries the counter value (not thread-safe during the run)
*/
@@ -96,23 +101,29 @@

std::vector<Node*> _waiters;

template <bool>
bool _try_acquire_or_wait(Node*);

std::vector<Node*> _release();

void _release(Node*);
};

inline Semaphore::Semaphore(size_t max_workers) :
_counter(max_workers) {
}

template <bool emplace_waiter>
inline bool Semaphore::_try_acquire_or_wait(Node* me) {
std::lock_guard<std::mutex> lock(_mtx);
if(_counter > 0) {
--_counter;
return true;
}
else {
_waiters.push_back(me);
if constexpr (emplace_waiter) {
_waiters.push_back(me);
}
return false;
}
}
@@ -124,6 +135,17 @@ inline std::vector<Node*> Semaphore::_release() {
return r;
}

inline void Semaphore::_release(Node* me) {
  std::lock_guard<std::mutex> lock(_mtx);
  // remove 'me' from the waiter list if it was parked there; in the in-task
  // path (emplace_waiter == false) no entry exists and this is a no-op
  for (size_t i = 0; i < _waiters.size(); ++i) {
    if (_waiters[i] == me) {
      _waiters.erase(_waiters.begin() + i);
      break;
    }
  }
  ++_counter;
}

inline size_t Semaphore::count() const {
return _counter;
}