Skip to content

Commit

Permalink
Add test to show semaphore deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
Brad Phelan committed Sep 13, 2023
1 parent 9316d98 commit 9395abf
Showing 1 changed file with 122 additions and 78 deletions.
200 changes: 122 additions & 78 deletions unittests/test_semaphores.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,55 +9,55 @@

void critical_section(size_t W) {

tf::Taskflow taskflow;
tf::Executor executor(W);
tf::CriticalSection section(1);
tf::Taskflow taskflow;
tf::Executor executor(W);
tf::CriticalSection section(1);

int N = 1000;
int counter = 0;
int N = 1000;
int counter = 0;

for(int i=0; i<N; ++i) {
tf::Task task = taskflow.emplace([&](){ counter++; })
.name(std::to_string(i));
section.add(task);
}
section.add(task);
}

executor.run(taskflow).wait();
executor.run(taskflow).wait();

REQUIRE(counter == N);
REQUIRE(counter == N);

executor.run(taskflow);
executor.run(taskflow);
executor.run(taskflow);
executor.run(taskflow);
executor.run(taskflow);
executor.run(taskflow);

executor.wait_for_all();
executor.wait_for_all();

REQUIRE(counter == 4*N);
REQUIRE(section.count() == 1);
REQUIRE(counter == 4 * N);
REQUIRE(section.count() == 1);
}

TEST_CASE("CriticalSection.1thread") {
critical_section(1);
critical_section(1);
}

TEST_CASE("CriticalSection.2threads") {
critical_section(2);
critical_section(2);
}

TEST_CASE("CriticalSection.3threads") {
critical_section(3);
critical_section(3);
}

TEST_CASE("CriticalSection.7threads") {
critical_section(7);
critical_section(7);
}

TEST_CASE("CriticalSection.11threads") {
critical_section(11);
critical_section(11);
}

TEST_CASE("CriticalSection.16threads") {
critical_section(16);
critical_section(16);
}

// --------------------------------------------------------
Expand All @@ -66,41 +66,41 @@ TEST_CASE("CriticalSection.16threads") {

void semaphore(size_t W) {

tf::Executor executor(W);
tf::Taskflow taskflow;
tf::Semaphore semaphore(1);
tf::Executor executor(W);
tf::Taskflow taskflow;
tf::Semaphore semaphore(1);

int N = 1000;
int counter = 0;
int N = 1000;
int counter = 0;

for(int i=0; i<N; i++) {
auto f = taskflow.emplace([&](){ counter++; });
auto t = taskflow.emplace([&](){ counter++; });
f.precede(t);
f.acquire(semaphore);
t.release(semaphore);
}
f.precede(t);
f.acquire(semaphore);
t.release(semaphore);
}

executor.run(taskflow).wait();
executor.run(taskflow).wait();

REQUIRE(counter == 2*N);
REQUIRE(counter == 2 * N);

}

TEST_CASE("Semaphore.1thread") {
semaphore(1);
semaphore(1);
}

TEST_CASE("Semaphore.2threads") {
semaphore(2);
semaphore(2);
}

TEST_CASE("Semaphore.4threads") {
semaphore(4);
semaphore(4);
}

TEST_CASE("Semaphore.8threads") {
semaphore(8);
semaphore(8);
}

// --------------------------------------------------------
Expand All @@ -109,43 +109,43 @@ TEST_CASE("Semaphore.8threads") {

void overlapped_semaphore(size_t W) {

tf::Executor executor(W);
tf::Taskflow taskflow;
tf::Semaphore semaphore1(1);
tf::Semaphore semaphore4(4);
tf::Executor executor(W);
tf::Taskflow taskflow;
tf::Semaphore semaphore1(1);
tf::Semaphore semaphore4(4);

int N = 1000;
int counter = 0;
int N = 1000;
int counter = 0;

for(int i=0; i<N; i++) {
auto task = taskflow.emplace([&](){ counter++; });
task.acquire(semaphore1);
task.acquire(semaphore4);
task.release(semaphore1);
task.release(semaphore4);
}
task.acquire(semaphore1);
task.acquire(semaphore4);
task.release(semaphore1);
task.release(semaphore4);
}

executor.run(taskflow).wait();
executor.run(taskflow).wait();

REQUIRE(counter == N);
REQUIRE(semaphore1.count() == 1);
REQUIRE(semaphore4.count() == 4);
REQUIRE(counter == N);
REQUIRE(semaphore1.count() == 1);
REQUIRE(semaphore4.count() == 4);
}

TEST_CASE("OverlappedSemaphore.1thread") {
overlapped_semaphore(1);
overlapped_semaphore(1);
}

TEST_CASE("OverlappedSemaphore.2threads") {
overlapped_semaphore(2);
overlapped_semaphore(2);
}

TEST_CASE("OverlappedSemaphore.4threads") {
overlapped_semaphore(4);
overlapped_semaphore(4);
}

TEST_CASE("OverlappedSemaphore.8threads") {
overlapped_semaphore(8);
overlapped_semaphore(8);
}

// --------------------------------------------------------
Expand All @@ -154,17 +154,17 @@ TEST_CASE("OverlappedSemaphore.8threads") {

void conflict_graph(size_t W) {

tf::Executor executor(W);
tf::Taskflow taskflow;
tf::Semaphore conflict_AB(1);
tf::Semaphore conflict_AC(1);
tf::Executor executor(W);
tf::Taskflow taskflow;
tf::Semaphore conflict_AB(1);
tf::Semaphore conflict_AC(1);

int counter {0};
std::mutex mutex;
int counter{0};
std::mutex mutex;

tf::Task A = taskflow.emplace([&](){ counter++; });

// B and C can run together
// B and C can run together
tf::Task B = taskflow.emplace([&](){
std::lock_guard<std::mutex> lock(mutex);
counter++;
Expand All @@ -174,38 +174,82 @@ void conflict_graph(size_t W) {
counter++;
});

// describe the conflict between A and B
A.acquire(conflict_AB).release(conflict_AB);
B.acquire(conflict_AB).release(conflict_AB);
// describe the conflict between A and B
A.acquire(conflict_AB).release(conflict_AB);
B.acquire(conflict_AB).release(conflict_AB);

// describe the conflict between A and C
A.acquire(conflict_AC).release(conflict_AC);
C.acquire(conflict_AC).release(conflict_AC);
// describe the conflict between A and C
A.acquire(conflict_AC).release(conflict_AC);
C.acquire(conflict_AC).release(conflict_AC);

executor.run(taskflow).wait();
executor.run(taskflow).wait();

REQUIRE(counter == 3);
REQUIRE(counter == 3);

for(size_t i=0; i<10; i++) {
executor.run_n(taskflow, 10);
}
executor.wait_for_all();
executor.run_n(taskflow, 10);
}
executor.wait_for_all();

REQUIRE(counter == 303);
REQUIRE(counter == 303);
}

TEST_CASE("ConflictGraph.1thread") {
conflict_graph(1);
conflict_graph(1);
}

TEST_CASE("ConflictGraph.2threads") {
conflict_graph(2);
conflict_graph(2);
}

TEST_CASE("ConflictGraph.3threads") {
conflict_graph(3);
conflict_graph(3);
}

TEST_CASE("ConflictGraph.4threads") {
conflict_graph(4);
conflict_graph(4);
}

TEST_CASE("Semaphore.Deadlock")
{
using namespace std::chrono_literals;

for (size_t i = 0; i < 10000; ++i)
{
tf::CriticalSection critical(1);
tf::Executor executor(16);

auto lazy_data = [&](tf::Subflow &rt)
{
auto task = rt.emplace([&](tf::Subflow&rt)
{
// (A) If this line is commented out then the test passes
rt.emplace([&] { });

rt.join();
});

// (B) If this line is commented out then the test passes
critical.add(task);

rt.join();
};

tf::Taskflow flow;
for (size_t k = 0; k < 16; ++k)
{
auto task = flow.emplace(
[&](tf::Subflow&rt)
{
lazy_data(rt);
});
}

std::cout << i << std::endl;

// If the following line fails - you are experiencing deadlock
REQUIRE(executor.run(flow).wait_for(5s) == std::future_status::ready);


}
}

0 comments on commit 9395abf

Please sign in to comment.