Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC: For Each (Range) Index #552

Draft
wants to merge 3 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 42 additions & 23 deletions taskflow/algorithm/for_each.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,36 +75,50 @@ TF_FORCE_INLINE auto make_for_each_task(B b, E e, C c, P part = P()) {
};
}

// Trait: is_index_func
// Holds (::value is true) when a callable of type C can be invoked with a
// single argument of type T. The expected result type is void, so whatever the
// callable returns is discarded. Note the parameter order here is (T, C),
// whereas std::is_invocable_r takes the callable type before the arguments.
template<typename T, typename C>
using is_index_func = std::is_invocable_r<void, C, T>;

// Trait: is_range_func
// Holds (::value is true) when a callable of type C can be invoked with two
// arguments, both of type T. As with is_index_func, the expected result type
// is void, so any return value of the callable is discarded.
template<typename T, typename C>
using is_range_func = std::is_invocable_r<void, C, T, T>;

// Function: make_for_each_index_task
template <typename B, typename E, typename S, typename C, typename P = DefaultPartitioner>
TF_FORCE_INLINE auto make_for_each_index_task(B b, E e, S s, C c, P part = P()){
template <typename T, typename C, typename P = DefaultPartitioner>
TF_FORCE_INLINE auto make_for_each_index_task(T b, T e, C c, P part = P()){
Comment on lines 84 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment in the PR is specifically about this part of the patch. The begin and end values, b and e, may sometimes be of different types, and I think it is useful to retain the ability to pass different types, as was possible before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Separately, though, I think it is important to keep the original interface for backward compatibility. Perhaps you want to overload the function by providing a second version that takes the two-argument function (instead of modifying the existing function)? You can use std::enable_if_t to ensure the right one gets called.

using T_t = std::decay_t<unwrap_ref_decay_t<T>>;

static_assert(std::is_integral<T_t>::value, "Begin and end values must be an integral type.");
static_assert(
std::disjunction<is_index_func<T_t, C>, is_range_func<T_t, C>>::value,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be easier to read as follows :-)

Suggested change
std::disjunction<is_index_func<T_t, C>, is_range_func<T_t, C>>::value,
is_index_func<T_t, C>::value || is_range_func<T_t, C>::value,

"C must be either a void function taking one int or two ints"
);
constexpr bool is_index_callable = is_index_func<T, C>::value;

using namespace std::string_literals;

using B_t = std::decay_t<unwrap_ref_decay_t<B>>;
using E_t = std::decay_t<unwrap_ref_decay_t<E>>;
using S_t = std::decay_t<unwrap_ref_decay_t<S>>;

return [=] (Runtime& rt) mutable {

// fetch the iterator values
B_t beg = b;
E_t end = e;
S_t inc = s;
T_t beg = b;
T_t end = e;

// nothing to be done if the range is invalid
if(is_range_invalid(beg, end, inc)) {
if(is_range_invalid(beg, end, 1)) {
return;
}

size_t W = rt.executor().num_workers();
size_t N = distance(beg, end, inc);
T_t n = end-beg;
size_t N = static_cast<size_t>(n);

// only myself - no need to spawn another graph
if(W <= 1 || N <= part.chunk_size()) {
TF_MAKE_LOOP_TASK(
for(size_t x=0; x<N; x++, beg+=inc) {
c(beg);
if constexpr(is_index_callable) {
for(T_t i=0; i<n; i++) {
c(i);
}
} else {
c(0, n);
}
);
return;
Expand All @@ -123,9 +137,12 @@ TF_FORCE_INLINE auto make_for_each_index_task(B b, E e, S s, C c, P part = P()){
TF_MAKE_LOOP_TASK(
part.loop(N, W, curr_b, chunk_size,
[&](size_t part_b, size_t part_e) {
auto idx = static_cast<B_t>(part_b) * inc + beg;
for(size_t x=part_b; x<part_e; x++, idx += inc) {
c(idx);
if constexpr(is_index_callable) {
for(size_t i=part_b; i<part_e; i++) {
c(static_cast<T_t>(i));
}
} else {
c(static_cast<T_t>(part_b), static_cast<T_t>(part_e));
}
}
);
Expand All @@ -142,9 +159,12 @@ TF_FORCE_INLINE auto make_for_each_index_task(B b, E e, S s, C c, P part = P()){
TF_MAKE_LOOP_TASK(
part.loop(N, W, next,
[&](size_t part_b, size_t part_e) {
auto idx = static_cast<B_t>(part_b) * inc + beg;
for(size_t x=part_b; x<part_e; x++, idx += inc) {
c(idx);
if constexpr(is_index_callable) {
for(size_t i=part_b; i<part_e; i++) {
c(static_cast<T_t>(i));
}
} else {
c(static_cast<T_t>(part_b), static_cast<T_t>(part_e));
}
}
);
Expand All @@ -171,13 +191,12 @@ Task FlowBuilder::for_each(B beg, E end, C c, P part) {
// ----------------------------------------------------------------------------

// Function: for_each_index
template <typename B, typename E, typename S, typename C, typename P>
Task FlowBuilder::for_each_index(B beg, E end, S inc, C c, P part){
template <typename T, typename C, typename P>
Task FlowBuilder::for_each_index(T beg, T end, C c, P part){
return emplace(
make_for_each_index_task(beg, end, inc, c, part)
make_for_each_index_task(beg, end, c, part)
);
}


} // end of namespace tf -----------------------------------------------------

34 changes: 5 additions & 29 deletions taskflow/core/flow_builder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,44 +355,30 @@ class FlowBuilder {
/**
@brief constructs an STL-styled index-based parallel-for task

@tparam B beginning index type (must be integral)
@tparam E ending index type (must be integral)
@tparam S step type (must be integral)
@tparam T index type shared by the beginning and ending values (must be integral)
@tparam C callable type
@tparam P partitioner type (default tf::DefaultPartitioner)

@param first index of the beginning (inclusive)
@param last index of the end (exclusive)
@param step step size
@param callable callable object to apply to each valid index
@param part partitioning algorithm to schedule parallel iterations

@return a tf::Task handle

The task spawns asynchronous tasks that apply the callable object to each index
in the range <tt>[first, last)</tt> with the step size.
in the range <tt>[first, last)</tt>.
This method is equivalent to the parallel execution of the following loop:

@code{.cpp}
// case 1: step size is positive
for(auto i=first; i<last; i+=step) {
callable(i);
}

// case 2: step size is negative
for(auto i=first; i>last; i+=step) {
callable(i);
}
@endcode

Iterators are templated to enable stateful range using std::reference_wrapper.
The callable needs to take a single argument of the integral index type.

Please refer to @ref ParallelIterations for details.
*/
template <typename B, typename E, typename S, typename C, typename P = DefaultPartitioner>
template <typename T, typename C, typename P = DefaultPartitioner>
Task for_each_index(
B first, E last, S step, C callable, P part = P()
T first, T last, C callable, P part = P()
);

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -1408,13 +1394,3 @@ inline void Subflow::reset(bool clear_graph) {
}

} // end of namespace tf. ---------------------------------------------------