Exception Support for corun #519

Open
tsung-wei-huang opened this issue Oct 3, 2023 · 10 comments
Labels
enhancement New feature or request

Comments

@tsung-wei-huang
Member

Exception handling currently works only through the tf::Future returned by tf::Executor::run. We need to support tf::Executor::corun as well:

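tf::Taskflow taskflow;
// for_each_index takes (first, last, step, callable)
taskflow.for_each_index(0, 10, 1, [](int i){
  if (i == 5) {
    throw std::runtime_error("foo");
  }
});
try {
  executor.corun(taskflow);   // desired: the exception is rethrown here
} catch (const std::runtime_error& e) {
  std::cout << e.what();
}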
@tsung-wei-huang tsung-wei-huang added the enhancement New feature or request label Oct 3, 2023
@ZigRazor

ZigRazor commented Nov 6, 2023

@tsung-wei-huang I would like to work on this, but I don't fully understand the concept.
What do you expect? That the corun function returns a tf::Future, as the run function does?
Do you have any ideas about how it could work?

@tsung-wei-huang
Member Author

No. corun blocks until the predicate returns true. Basically, if any task throws an exception, corun should rethrow it to the caller.
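To make the expected contract concrete, here is a minimal single-threaded sketch. It is not Taskflow's implementation: `MiniScheduler`, `push`, `corun_until`, and `run_demo` are hypothetical names invented for illustration. It only shows the idea that the caller keeps executing pending work until the predicate holds, and then sees the first captured exception directly instead of receiving a future.

```cpp
#include <cassert>
#include <deque>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical toy scheduler; it illustrates the contract only.
struct MiniScheduler {
  std::deque<std::function<void()>> tasks;   // pending work
  std::exception_ptr first_error;            // first exception seen, if any

  void push(std::function<void()> f) { tasks.push_back(std::move(f)); }

  // Run pending tasks on the calling thread until stop() returns true
  // (or no work is left), then rethrow the first captured exception.
  template <typename Pred>
  void corun_until(Pred stop) {
    while (!stop() && !tasks.empty()) {
      auto task = std::move(tasks.front());
      tasks.pop_front();
      try {
        task();
      } catch (...) {
        if (!first_error) first_error = std::current_exception();
      }
    }
    if (first_error) {
      std::exception_ptr e = first_error;
      first_error = nullptr;
      std::rethrow_exception(e);
    }
  }
};

// Returns the message of the exception observed by the caller, or "" if none.
std::string run_demo() {
  MiniScheduler s;
  int done = 0;
  for (int i = 0; i < 10; ++i) {
    s.push([i, &done] {
      if (i == 5) throw std::runtime_error("foo");
      ++done;
    });
  }
  try {
    s.corun_until([&] { return s.tasks.empty(); });
  } catch (const std::runtime_error& e) {
    return e.what();
  }
  return "";
}
```

In this sketch the remaining tasks still run after the failure; whether they should be cancelled instead is a separate design decision.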

@olologin

olologin commented Feb 18, 2024

@tsung-wei-huang I recently played with the corun implementation and made it rethrow an exception from the underlying taskflow.
But I stumbled upon some potential problems that I need to discuss with you :)
Here is my implementation of Executor::corun (already tested with our internal tests; it seems to work fine). I mostly copied the Executor::run method and, just like Executor::run, I create a new topology, because the topology is the actual holder of the std::exception_ptr and the cancellation bit:

// Function: corun
void Executor::corun(Taskflow& taskflow) {
  
  auto w = _this_worker();

  if(w == nullptr) {
    TF_THROW("corun must be called by a worker of the executor");
  }

  _increment_topology();

  {
    std::lock_guard<std::mutex> lock(taskflow._mutex);
    if (taskflow.empty())
    {
      _decrement_topology();
      return;
    }
  }

  //Node parent;  // dummy parent
  
  // create a topology for this run
  auto t = std::make_shared<Topology>(taskflow, [](){return true;}, [](){});

  // need to create future before the topology got torn down quickly
  tf::Future<void> future(t->_promise.get_future(), t);

  // modifying topology needs to be protected under the lock
  {
    std::lock_guard<std::mutex> lock(taskflow._mutex);
    taskflow._topologies.push(t);
    if(taskflow._topologies.size() == 1) {
      _set_up_topology(w, t.get());
    }
  }

  // coruns until all tasks complete
  _corun_until(*w, [t] () -> bool {
    return t->_join_counter.load(std::memory_order_acquire) == 0;
  });
  // waits for teardown_topology to complete and rethrows the exception on failure
  future.get();
}

Potential problems that I see:

  1. corun no longer takes a template type T with T::graph(); it now takes a Taskflow object. I am not sure why anyone would want to pass a naked Graph into corun.
    I think the Executor::corun interface should be similar to Executor::run (in the sense that it should take a Taskflow object instead of a naked Graph). That way you could later merge run and corun into one method that automatically decides whether to block the calling thread or to corun, depending on which thread calls it.

  2. It makes sense to make Runtime::corun behave the same way as Executor::corun, but Runtime::corun currently reuses the topology of the parent Node. This means that a corun'ed child that throws an exception cancels the parent's taskflow, and the exception is raised to the waiter of the parent topology (instead of the caller of Runtime::corun). What we could do instead is spawn Runtime::corun in a new topology (like Executor::corun), but this would change the current behavior.

  3. Who should get exceptions from tasks started via Subflow? The task that calls Subflow::join/Subflow::detach, or the starter of the parent topology? I think the parent Taskflow graph should be cancelled if any Subflow throws an exception, but I will wait for your opinion on this. (Keep in mind that emplaced tasks may be neither joined nor detached, in which case they are started by Executor internals.)

  4. An additional heap allocation (to create the Topology object), but I guess this can later be replaced with a Topology pool.

One thing that IMO must be done together with this change:

  • Call std::terminate for any exception that is thrown but has no way to be passed back to the user (for example, if the Graph Node does not have a topology pointer), as with a silent async task. An exception should not be silently ignored, and it should not silently kill a worker thread.
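A minimal sketch of this "store it or fail loudly" policy, using only the standard library. `ToyNode`, `process_exception`, and `run_task` are hypothetical names and do not correspond to Taskflow internals; the assumption is simply that a failure is reported to the nearest holder and that std::terminate is the fallback when no holder exists.

```cpp
#include <cassert>
#include <cstdio>
#include <exception>
#include <stdexcept>

// Hypothetical stand-in for a graph node; not Taskflow's node type.
struct ToyNode {
  ToyNode* parent = nullptr;
  std::exception_ptr error;   // slot where a failure can be reported
};

// Must be called from inside a catch block.
// Stores the in-flight exception in the holder, or terminates loudly
// when nobody could ever observe it.
inline void process_exception(ToyNode* holder) {
  if (holder == nullptr) {
    std::fputs("unhandled task exception with no owner\n", stderr);
    std::terminate();
  }
  if (!holder->error) holder->error = std::current_exception();
}

// Runs fn on behalf of node; failures are reported to node's parent.
template <typename F>
void run_task(ToyNode& node, F&& fn) {
  try {
    fn();
  } catch (...) {
    process_exception(node.parent);
  }
}

// Returns true if the parent ends up holding the child's exception.
bool demo_parent_receives_error() {
  ToyNode parent, child;
  child.parent = &parent;
  run_task(child, [] { throw std::runtime_error("boom"); });
  return static_cast<bool>(parent.error);
}
```

A task whose node has no parent would hit the std::terminate branch, which makes the failure visible instead of letting it vanish with the worker.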

Just wanted to mention: of course I would prefer that other people implement this :), but if there are no volunteers, I can help with the implementation, provided you guide me a bit in resolving such opinion-based problems. I think I have a good overview of how scheduling works: not perfect, but good enough to start helping.

@olologin

Maybe this behavior is meaningful as a long-term solution: https://oneapi-src.github.io/oneTBB/main/tbb_userguide/Cancellation_and_Nested_Parallelism.html

@tsung-wei-huang
Member Author

tsung-wei-huang commented Feb 19, 2024

@olologin Thank you for your idea! I think the best and most fundamental way to solve this problem is to have an exception_ptr per node, because corun/join-styled execution is always parented to a node rather than to a topology. Basically, we will need to add code to this section for the case where the node that throws an exception has a parent. Then the outer level can rethrow or perform any necessary actions.
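The per-node idea can be sketched as follows. This is an illustration under assumptions, not the code that went into Taskflow: `SketchNode`, `join_children`, and `nested_demo` are hypothetical, and the sketch runs children sequentially on the calling thread. What it shows is that each node owns a slot for a child's exception, and that a blocking join parented to that node rethrows from the slot, so the failure travels outward one level at a time.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical node type: each node owns a slot for a child's exception.
struct SketchNode {
  std::function<void()> work;
  std::vector<SketchNode*> children;
  std::exception_ptr child_error;

  // Runs the children, then rethrows the first child failure to the caller,
  // which mimics a blocking join/corun parented to this node.
  void join_children() {
    for (SketchNode* c : children) {
      if (child_error) break;   // skip remaining children after a failure
      try {
        c->work();
      } catch (...) {
        child_error = std::current_exception();
      }
    }
    if (child_error) {
      std::exception_ptr e = child_error;
      child_error = nullptr;
      std::rethrow_exception(e);
    }
  }
};

// Returns the message that reaches the outermost caller, or "" if none.
std::string nested_demo() {
  SketchNode leaf;
  leaf.work = [] { throw std::runtime_error("x"); };

  SketchNode inner;
  inner.children.push_back(&leaf);
  inner.work = [&inner] { inner.join_children(); };  // inner level does not catch

  SketchNode outer;
  outer.children.push_back(&inner);

  try {
    outer.join_children();
  } catch (const std::runtime_error& e) {
    return e.what();
  }
  return "";
}
```

Any level could also catch the exception itself instead of letting it pass through, which is the "re-throw or perform necessary actions" choice described above.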

tf::Executor::corun needs to take a template type with .graph() defined. This is used not just by Taskflow but also by other libraries, such as pipeline, that implicitly manage a Taskflow through composition.
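A small sketch of why a template over "anything with graph()" is convenient. All types here (`ToyGraph`, `ToyTaskflow`, `ToyPipeline`) and the function `corun_sketch` are hypothetical stand-ins, not Taskflow classes; the only assumption carried over from the comment above is that the wrapper type exposes a graph() accessor.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical minimal types; only the shape of the interface matters.
struct ToyGraph {
  std::vector<std::function<void()>> tasks;
};

struct ToyTaskflow {            // owns a graph directly
  ToyGraph g;
  ToyGraph& graph() { return g; }
};

struct ToyPipeline {            // manages a taskflow through composition
  ToyTaskflow inner;
  ToyGraph& graph() { return inner.graph(); }
};

// Accepts any type that exposes graph(); returns the number of tasks run.
template <typename T>
std::size_t corun_sketch(T& target) {
  ToyGraph& g = target.graph();
  for (auto& t : g.tasks) t();
  return g.tasks.size();
}

// Runs one task through a taskflow and two through a pipeline.
std::size_t graph_demo() {
  int hits = 0;
  ToyTaskflow tf;
  tf.g.tasks.push_back([&hits] { ++hits; });
  ToyPipeline pl;
  pl.inner.g.tasks.push_back([&hits] { ++hits; });
  pl.inner.g.tasks.push_back([&hits] { ++hits; });
  std::size_t ran = corun_sketch(tf);
  ran += corun_sketch(pl);
  return (hits == 3) ? ran : 0;
}
```

With this shape, the same entry point serves both a plain taskflow and a composed type without either one needing to know about the other.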

I will think about how this can be done, as our other users at AMD are also requesting similar features. In the meantime, please do not hesitate to let me know if you have any thoughts on this.

@tsung-wei-huang tsung-wei-huang changed the title Exception Support for co_run Exception Support for corun Feb 19, 2024
@tsung-wei-huang
Member Author

tsung-wei-huang commented Feb 24, 2024

@olologin @bradphelan this support has been implemented in the current master branch. Instead of creating an additional topology as @olologin presented above, the new implementation keeps, for each node, a std::exception_ptr that stores the exception thrown by the children tasks of that node. This introduces very little overhead to the current codebase while maximizing the capability of throwing and catching exceptions. With this enhancement, blocking calls (e.g., tf::Executor::corun, tf::Runtime::corun, tf::Subflow::join) all support exceptions.

For example:

tf::Executor executor(W);
tf::Taskflow taskflow1;
tf::Taskflow taskflow2;

taskflow1.emplace([](){
  throw std::runtime_error("x");
});
taskflow2.emplace([&](){
  try {
    executor.corun(taskflow1);                  // exception is supported now at blocking method corun
  } catch(const std::runtime_error& re) {
    std::cout << re.what() << std::endl;    // will get "x"
  }
});
executor.run(taskflow2).get();

Would you please have a look and let me know if this works for your application? Thank you for bringing this up!

Let me know if you observe any other exception problems that Taskflow does not handle properly. Handling exceptions in a multithreaded environment is quite tricky, and I may have missed something.

@olologin

@tsung-wei-huang
I will test it today or tomorrow, thanks!

@olologin

olologin commented Feb 29, 2024

@tsung-wei-huang It seems it works correctly. I am happy with this implementation, I could not break it.
But I have some minor nitpicks:

  1. Never push commented-out code to the production branch (in this case, everything related to module_task). When people see commented-out code, they get the impression that someone was not 100% sure it was correct to comment it out, and that some potential problem could later be fixed by uncommenting it. It is a bad practice :)
  2. (This point is controversial, not sure about it) I would std::terminate in Executor::_process_exception if there is nowhere to put current_exception (if there is no topology and no parent). This way at least at some point someone will notice that something is not right instead of just silencing exception.
  3. (This point is controversial, not sure about it) Shouldn't we cancel parallel algorithms completely on the first exception? Currently, for_each_index fails only the task of the single worker that threw, and the remaining workers still try to complete their iterations.
    Some research is needed to check how other multithreading libraries handle this situation.

For example, the following code will have sum ≈ 75 by the time we can catch the exception:

	using namespace std::chrono_literals;
	bool is_thrown_outside = false;
	bool is_thrown_inside = false;
	::tf::Executor ex(4);
	mw::tf::TaskWrapper wrapper;
	tf::Taskflow taskflow;
	std::atomic_size_t sum = 0;
	taskflow.for_each_index(
		0,
		100,
		1,
		[&](int i)
		{
			if (i == 1)
			{
				throw std::runtime_error("Exception E");
			}
			else
			{
				sum += 1;
				std::this_thread::sleep_for(10ms);
			}
		},
		::tf::StaticPartitioner(1));

	try
	{
		ex.run(taskflow).get();
	}
	catch (...)
	{
	}
  4. The exception documentation could mention that Subflow and Runtime use the same rethrowing logic (it currently mentions only tf::Runtime).
  5. The exception documentation could include a test like this to underline that ALL tasks in a topology are cancelled if at least one of them fails, because the current documentation may give the impression that only dependent tasks fail:
		using namespace std::chrono_literals;
		::tf::Executor ex(2);
		mw::tf::TaskWrapper wrapper;
		::tf::Taskflow tf;
		bool aFinished = false;
		bool bFinished = false;
		bool cFinished = false;
		auto A = tf.emplace([&] {
			aFinished = true;
			});
		auto B = tf.emplace(
			[&]
			{
				bFinished = true;
				std::this_thread::sleep_for(500ms); // Wait until E throws an exception and try to start C only after this
			});
		auto C = tf.emplace([&] { cFinished = true; });
		auto E = tf.emplace([] {
			throw std::runtime_error("Exception A");
			});
		A.precede(B);
		B.precede(C);
		A.precede(E);
		try
		{
			ex.run(tf).get();
			ASSERT_TRUE(false);
		}
		catch (...)
		{
		}

		ASSERT_TRUE(aFinished);
		ASSERT_TRUE(bFinished);
		ASSERT_FALSE(cFinished);
  6. The exception documentation could mention that Subflow::detach raises the exception to the topology.

@tsung-wei-huang
Member Author

@olologin thank you very much for your great comments!

  1. Yes, when I was working on exception support, I realized that the current design of the module task does not align well with the exception flow, so I need to change its execution algorithm. However, I am still researching a better execution algorithm, which is why I left the commented-out code there. But you are right, perhaps it should be saved somewhere else.

  2. It looks like there is a missing verb in "I would std::terminate". Do you mean "I would call std::terminate"?

  3. This is difficult. When a parallel algorithm starts running, each worker grabs a task and enters its execution loop. At that point, there is no cheap way for the worker to learn from the others whether an exception has been thrown. To achieve this, I imagine a periodic check would be needed, which would hurt performance considerably. So this is a trade-off between exception handling and performance at the algorithm level.
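For reference, the periodic check being discussed could look roughly like the sketch below. It uses plain std::thread rather than Taskflow, and `for_each_index_cancellable` and `cancel_demo` are hypothetical names. The assumption is a static split of the index range across workers, with one relaxed atomic load per iteration as the cost of noticing another worker's failure.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <exception>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical helper, not a Taskflow API. Splits [0, n) into contiguous
// chunks across workers; every iteration first polls a shared flag.
template <typename F>
void for_each_index_cancellable(std::size_t n, std::size_t workers, F body) {
  std::atomic<bool> cancelled{false};
  std::exception_ptr first_error;
  std::mutex error_mutex;
  std::vector<std::thread> pool;

  for (std::size_t w = 0; w < workers; ++w) {
    pool.emplace_back([&, w] {
      const std::size_t begin = n * w / workers;
      const std::size_t end = n * (w + 1) / workers;
      for (std::size_t i = begin; i < end; ++i) {
        // The periodic check: one atomic load per iteration is the cost.
        if (cancelled.load(std::memory_order_relaxed)) return;
        try {
          body(i);
        } catch (...) {
          std::lock_guard<std::mutex> lock(error_mutex);
          if (!first_error) first_error = std::current_exception();
          cancelled.store(true, std::memory_order_relaxed);
          return;
        }
      }
    });
  }
  for (auto& t : pool) t.join();
  if (first_error) std::rethrow_exception(first_error);
}

// Returns true if the exception reached the caller and not every index ran.
bool cancel_demo() {
  std::atomic<std::size_t> sum{0};
  bool caught = false;
  try {
    for_each_index_cancellable(100, 4, [&](std::size_t i) {
      if (i == 1) throw std::runtime_error("Exception E");
      sum += 1;
    });
  } catch (const std::runtime_error&) {
    caught = true;
  }
  return caught && sum.load() < 100;
}
```

Checking the flag only every k iterations would lower the overhead at the price of slower cancellation, which is the same trade-off at a finer grain.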

4-6. Thank you! I will update them later. Please also let me know if you could help update them in doxygen/cookbook/exception.dox :)

@olologin

olologin commented Mar 1, 2024

@tsung-wei-huang
2) Yes, I would call std::terminate to make sure nobody silently misses an exception.

4-6) Ok, I will propose MR with changes.


3 participants