PipeType::PARALLEL Parallel data access security #525

Open
ayongsir opened this issue Oct 19, 2023 · 2 comments
Labels
enhancement (New feature or request) · help wanted (Extra attention is needed) · question (Further information is requested)

Comments

@ayongsir

I have an application that uses tf::Pipeline to build a multi-stage DAG task and uses pf.token() to control which data each task processes.

If the type of a tf::Pipe is set to PipeType::SERIAL, I construct an array indexed by pf.pipe() to hold the data, which makes the pipeline's concurrent data access safe.

However, if I set the type of the tf::Pipe to PipeType::PARALLEL, how can I know how many invocations of that tf::Pipe run concurrently under the current data flow? In other words, how can I access the data concurrently in a safe way?
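For background (my reading of the Taskflow pipeline model, not something stated in this thread): a tf::Pipeline constructed with N lines keeps at most N tokens in flight at once, and each in-flight token occupies a distinct line, identified by pf.line(). So even for a PipeType::PARALLEL pipe, the concurrency of a single pipe is bounded by the number of lines, and indexing per-line storage by pf.line() is race-free. A minimal standalone sketch of that per-line-slot pattern (plain std::thread standing in for pipeline lines, no Taskflow dependency):

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Sketch: with kNumLines pipeline lines, at most kNumLines invocations of a
// PARALLEL pipe run at once, each on a distinct line. Giving every line its
// own slot therefore needs no lock, even though the slots are plain longs.
constexpr std::size_t kNumLines = 4;

std::array<long, kNumLines> process_tokens(std::size_t tokens_per_line) {
  std::array<long, kNumLines> slot{};  // one scratch slot per line
  std::vector<std::thread> lines;
  for (std::size_t line = 0; line < kNumLines; ++line) {
    // Each "line" touches only slot[line], mirroring indexing by pf.line().
    lines.emplace_back([&slot, line, tokens_per_line] {
      for (std::size_t t = 0; t < tokens_per_line; ++t) slot[line] += 1;
    });
  }
  for (auto &th : lines) th.join();
  return slot;
}
```

If every slot ends with exactly tokens_per_line increments, no two lines ever shared a slot, which is the guarantee the per-line indexing relies on.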

I think letting a tf::Taskflow task accept parameters would solve this problem and be more flexible.

Is there a plan to support parameters for tf::Taskflow tasks?

Looking forward to a reply, and thanks in advance!

@cheng-hsiang-chiu
Member

Hi ayongsir, could you provide a sample code so we can better understand the question? Thank you.

@ayongsir
Author

> Hi ayongsir, could you provide a sample code so we can better understand the question? Thank you.

OK, thank you. Here is my understanding; I don't know if there are any mistakes, so please advise.

The core logic of my code is as follows:

  1. I use a queue to cache data, and run at most 4 pipeline lines in parallel, depending on how much data is cached;
  2. Each pipeline invocation has 4 tasks to process data, composed into a DAG graph;
  3. For data safety across the parallel pipeline lines, I use arrays with as many slots as there are lines;
  4. Each task accesses data through the saved token index;

Regarding the third point: if tf::PipeType is set to SERIAL, I understand it is safe, because only the pipeline lines run in parallel and the tasks within the same line are serial.

But if tf::PipeType is set to PARALLEL, then besides the parallelism across pipeline lines, individual tasks within a line also run in parallel. Is there a way to access data in parallel without using locks?
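One lock-free option is to give every (line, pipe) pair its own scratch slot: lines run concurrently, but each line's row is only ever touched by the token currently occupying that line, so even overlapping stages never share a slot. A standalone sketch of the idea (plain std::thread standing in for pipeline lines; names are illustrative, not Taskflow API):

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

constexpr std::size_t kLines = 4;
constexpr std::size_t kPipes = 4;

// scratch[line][pipe]: each in-flight token owns a whole row, so stages can
// overlap across lines without locks.
std::array<std::array<int, kPipes>, kLines> run_pipeline(int rounds) {
  std::array<std::array<int, kPipes>, kLines> scratch{};
  std::vector<std::thread> workers;
  for (std::size_t line = 0; line < kLines; ++line) {
    workers.emplace_back([&scratch, line, rounds] {
      for (int r = 0; r < rounds; ++r)
        for (std::size_t pipe = 0; pipe < kPipes; ++pipe)
          scratch[line][pipe] += 1;  // only this thread writes row `line`
    });
  }
  for (auto &w : workers) w.join();
  return scratch;
}
```

The trade-off is memory (lines × pipes slots instead of one array), bought in exchange for removing all synchronization on the data itself.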

I thought about it for a moment. If tasks supported parameters, such a problem would be relatively simple, for example:

```cpp
auto task1 = pipe_task_[0].emplace(
    [this](size_t pipe_index, size_t token_index) {
      auto token = token_[pipe_index][token_index];
      method_[0]->DoProcess(line_datas[token]);
    });

tf::Pipe{tf::PipeType::PARALLEL,
         [&](tf::Pipeflow &pf) {
           task_executor_->corun(pipe_task_[pf.pipe()](pf.pipe(), pf.token()));
         }},
```
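Until parameterized tasks exist, one workaround is to emulate parameters: the pipe callable writes the (pipe, token) pair into a slot owned by its line before resuming the task graph, and the no-argument task reads the slot for its line. A hypothetical standalone sketch (TaskArgs, g_args, and run_task are illustrative names, not Taskflow API):

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <string>

// Taskflow tasks are callables taking no arguments, so the "parameters" are
// stashed in a per-line slot instead of being passed in the call.
struct TaskArgs {
  std::size_t pipe;
  std::size_t token;
};

constexpr std::size_t kLines = 4;
std::array<TaskArgs, kLines> g_args{};  // slot i is written only by line i

// A no-argument task body: it recovers its inputs from its line's slot.
std::string run_task(std::size_t line) {
  const TaskArgs &a = g_args[line];
  return "pipe=" + std::to_string(a.pipe) + " token=" + std::to_string(a.token);
}
```

Because each line writes its slot before the task runs and no other line touches it, the handoff needs no lock; it is the same per-line ownership idea as the token_index_ array in the code below.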

The code I am currently using is as follows:

```cpp
class TaskFlow {
 private:
  static const uint32_t kPipeTaskNum = 4;
  static const uint32_t kPipeLineNum = 4;
  std::atomic<bool> run_flag_ = false;
  std::atomic<bool> pipe_flag_ = false;
  std::unique_ptr<std::thread> thread_ptr_ = nullptr;
  size_t token_num_ = 0;
  std::array<size_t, kPipeTaskNum> token_index_;
  // template arguments below were eaten by markdown; "Frame" is assumed from AddData()
  std::array<std::shared_ptr<Frame>, kPipeLineNum> line_datas;
  BoundedQueue<std::shared_ptr<Frame>> cache_data_;
  std::array<tf::Taskflow, kPipeTaskNum> pipe_task_;
  std::unique_ptr<tf::Taskflow> task_flow_ = nullptr;
  std::unique_ptr<tf::Executor> task_executor_ = nullptr;
  std::function<void()> core_callback_;  // used in Init()/Core() but missing from the original listing
};
```

```cpp
inline void TaskFlow::AddData(const std::shared_ptr<Frame> &frame) {
  if (!cache_data_.Enqueue(frame)) {
    LOG_WARN << "Worker data enqueue failed.";
  }
}
```

```cpp
bool_t TaskFlow::Init() {
  task_executor_.reset(new tf::Executor());
  task_flow_.reset(new tf::Taskflow());  // missing in the original listing, but dereferenced below

  auto task1 = pipe_task_[0].emplace(
      [this] { method_[0]->DoProcess(line_datas[token_index_[0]]); });
  auto task2 = pipe_task_[1].emplace(
      [this] { method_[1]->DoProcess(line_datas[token_index_[1]]); });
  auto task3 = pipe_task_[2].emplace(
      [this] { method_[2]->DoProcess(line_datas[token_index_[2]]); });
  auto task4 = pipe_task_[3].emplace(
      [this] { method_[3]->DoProcess(line_datas[token_index_[3]]); });

  static tf::Pipeline pipe_line_(
      kPipeLineNum,
      // first pipe runs taskflow1
      tf::Pipe{tf::PipeType::SERIAL,
               [&](tf::Pipeflow &pf) {
                 if (pf.token() == token_num_) {
                   pf.stop();
                   return;
                 }
                 token_index_[pf.pipe()] = pf.token();
                 task_executor_->corun(pipe_task_[pf.pipe()]);
               }},

      tf::Pipe{tf::PipeType::SERIAL,
               [&](tf::Pipeflow &pf) {
                 token_index_[pf.pipe()] = pf.token();
                 task_executor_->corun(pipe_task_[pf.pipe()]);
               }},

      tf::Pipe{tf::PipeType::SERIAL,
               [&](tf::Pipeflow &pf) {
                 token_index_[pf.pipe()] = pf.token();
                 task_executor_->corun(pipe_task_[pf.pipe()]);
               }},

      tf::Pipe{tf::PipeType::SERIAL,
               [&](tf::Pipeflow &pf) {
                 token_index_[pf.pipe()] = pf.token();
                 task_executor_->corun(pipe_task_[pf.pipe()]);
               }});

  task_flow_->composed_of(pipe_line_);

  core_callback_ = [&] {
    auto callback_func = [&] {
      token_num_ = 0;
      pipe_line_.reset();
      pipe_flag_.store(true);
    };
    task_executor_->run(*task_flow_, callback_func);
  };

  return true;
}
```

```cpp
void TaskFlow::Start() {
  pipe_flag_.store(true);
  run_flag_.store(true);
  if (thread_ptr_ == nullptr) {
    thread_ptr_.reset(new std::thread(&TaskFlow::Core, this));
  }
}
```

```cpp
void TaskFlow::Core() {
  while (run_flag_.load()) {
    if (pipe_flag_.load() && cache_data_.WaitDequeue(&line_datas[0])) {
      pipe_flag_.store(false);
      if (!cache_data_.Empty()) {
        token_num_ = cache_data_.Size() > (kPipeLineNum - 1)
                         ? (kPipeLineNum - 1)
                         : cache_data_.Size();
        for (size_t i = 1; i <= token_num_; i++) {
          cache_data_.Dequeue(&line_datas[i]);
        }
      }
      token_num_ += 1;
      core_callback_();
    }
  }
}
```
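For clarity, the batching rule in Core() can be isolated: after a blocking dequeue yields one item, drain up to kPipeLineNum - 1 more, so each pipeline run handles between 1 and kPipeLineNum tokens. A standalone sketch of that rule (std::deque standing in for BoundedQueue; take_batch is an illustrative name):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

constexpr std::size_t kPipeLineNum = 4;

// Pull one item (where WaitDequeue would block), then opportunistically drain
// up to kPipeLineNum - 1 more, capping the batch at the number of lines.
std::vector<int> take_batch(std::deque<int> &queue) {
  std::vector<int> batch;
  if (queue.empty()) return batch;  // WaitDequeue would block here instead
  batch.push_back(queue.front());   // the item the blocking dequeue returned
  queue.pop_front();
  std::size_t extra = std::min(queue.size(), kPipeLineNum - 1);
  for (std::size_t i = 0; i < extra; ++i) {
    batch.push_back(queue.front());
    queue.pop_front();
  }
  return batch;
}
```

The batch size then plays the role of token_num_: the first SERIAL pipe stops the pipeline once pf.token() reaches it.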

@tsung-wei-huang added the enhancement, help wanted, and question labels on Feb 24, 2024