Move enforcing timeouts to the connection router #564

Open · wants to merge 6 commits into main
Conversation

@piodul (Collaborator) commented Sep 30, 2022

Problem description

In my benchmarks, I observed that just by disabling driver-side timeouts I could get 3-4 times the throughput. Without timeouts, the benchmark was CPU bound, while with timeouts the CPU usage stayed below 100% even though the throughput was lower. Recorded flamegraphs showed that over 25% of the time was spent in Mutex-related code.

Here is why I think tokio::time::timeout performs badly. In order to register a timer, Tokio needs to acquire a Mutex of the current thread's timer driver. When the timer is cancelled (which happens when the inner future finishes before timeout), it needs to be unregistered and the same Mutex needs to be acquired - and, because tasks can migrate between threads, it might not be the Mutex of the current thread anymore. Because nearly all of the requests finish in time and nearly all timers are cancelled, this most likely leads to lock contention.
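For illustration, the pattern being replaced looks roughly like this (a minimal sketch with illustrative names, not the driver's actual code): each response-waiting task wraps its receiver in tokio::time::timeout, so every request arms a timer entry and, on the common success path, cancels it again.

```rust
use std::time::Duration;
use tokio::sync::oneshot;

// A minimal sketch of the per-request timeout pattern; the names are
// illustrative, not the driver's actual API.
async fn wait_for_response(
    receiver: oneshot::Receiver<Result<Vec<u8>, String>>,
    request_timeout: Duration,
) -> Result<Vec<u8>, String> {
    match tokio::time::timeout(request_timeout, receiver).await {
        // Common path: the response arrived in time. Dropping the Timeout
        // future must unregister its timer entry, which takes the timer
        // driver's mutex.
        Ok(Ok(response)) => response,
        Ok(Err(_recv_error)) => Err("connection closed".to_string()),
        // Rare path: the timer actually fired.
        Err(_elapsed) => Err("request timed out".to_string()),
    }
}
```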

Solution

This PR reimplements driver-side timeouts using a different architecture. Instead of using tokio::time::timeout on the tasks that wait for a response from the connection router, now the connection router itself is responsible for enforcing timeouts and rejecting the requests.

In the new design, there is only one active timer per connection. The timers are reset much more rarely, roughly with a period of (timeout - average latency).

As a bonus, the PR also implements timeouts for some code paths that weren't previously covered: batch, query_iter and execute_iter.

Benchmark

To test this out, I ran cql-stress (a scylla-bench rewrite using the Rust driver) on a machine with 16 cores (32 threads) against a single Scylla node on another, identical machine. Using the following command (release mode, debug symbols enabled):

```shell
cargo flamegraph -F 99 --bin cql-stress-scylla-bench -- -nodes 10.0.1.8 -max-rate 0 -mode write -workload uniform -partition-count 10000 -concurrency 4096 -duration 10s
```

... I measured the throughput and the percentage of time spent in locking, according to the flamegraphs:

564cf5d (before the PR):

  • Operations/s: 549975.1976076759
  • Time spent in mutex-related functions: 27.3%

564cf5d (before the PR, but with timeouts disabled):

  • Operations/s: 1858276.1677900965
  • Time spent in mutex-related functions: 1.7%

This PR:

  • Operations/s: 1866452.173848832
  • Time spent in mutex-related functions: 1.6%

Tests

For now, correctness was only tested manually (running cql-stress, adjusting the timeout to be below the latency and observing the results). If the proxy gets merged before this PR, I'll add automatic tests here; if not, I'll send them separately later.

Pre-review checklist

  • I have split my patch into logically separate commits.
  • All commit messages clearly explain what they change and why.
  • I added relevant tests for new features and bug fixes.
  • All commits compile, pass static checks and pass tests.
  • PR description sums up the changes and reasons why they should be introduced.
  • I added appropriate Fixes: annotations to the PR description.

@cvybhu (Contributor) left a comment

I'm in love with the speedup 🚀

I tried to internalize how Connection works to the point where I can confidently say "Yes, this makes sense", but I didn't manage to get there.
I feel like it has become pretty convoluted - a web of async functions, futures, channels, repurposed structs and assumptions.
I will have another go at it, but for now I can only say that the individual parts seem to work, apart from one possible bug.

I wonder if it would be possible to refactor Connection to make the code simpler.
Some sort of event loop, which does one thing at a time, or something.

Comment on lines 1147 to 1180
```rust
async fn timeouter(handler_map: &StdMutex<ResponseHandlerMap>) -> Result<(), QueryError> {
    // We are using poll_fn here in order to be able to poll the expiry timer
    // inside the deadline tracker without having to hold the mutex.
    futures::future::poll_fn(|cx| -> Poll<Result<(), QueryError>> {
        // Guaranteed that nobody else locks the handler_map at the moment
        let mut handler_map_guard = handler_map.try_lock().unwrap();

        // Wait until the deadline of the first request in the queue elapses
        futures::ready!(handler_map_guard.deadline_tracker.poll_expiry_timer(cx));

        // Elapse some of the requests
        let to_elapse = handler_map_guard
            .deadline_tracker
            .elapse_some(Instant::now());
        tracing::debug!("Timing out {} requests", to_elapse.len());

        for request_id in to_elapse {
            if let Some(handler) = handler_map_guard.orphan(request_id) {
                // Ignore sending error, request was dropped
                let _ = handler.response_sender.send(Err(QueryError::RequestTimeout(
                    "request timed out".to_string(), // TODO: include information about deadline?
                )));
            }
        }

        // The expiry timer should be unset at the moment, so register
        // the waker right now by polling again
        futures::ready!(handler_map_guard.deadline_tracker.poll_expiry_timer(cx));

        // We should not get here, but to be 100% safe, let's just yield
        cx.waker().wake_by_ref();
        Poll::Pending
    })
    .await
}
```
@cvybhu (Contributor) commented:

I'm worried there might be a bug here.

Let's say there are 3 requests in the DeadlineTracker with timeouts:
10:00:00, 10:00:01, 10:00:02.

  • The timeouter() is polled at 10:00:00.
  • poll_expiry_timer returns Ready
  • elapse_some causes timeouts for requests with deadlines 10:00:00 and 10:00:01
  • elapse_some notices that there is still the request with deadline 10:00:02, so the timer is armed with this deadline.
  • Then we poll the expiry timer again, and it returns Ready because it's already 10:00:03.
  • The timer state changes to Unarmed
  • We reach the end of the function and wake the waker
  • timeouter gets polled again, but the timer is unarmed so it hangs on it
  • The request with deadline 10:00:02 is not timed out until another request gets scheduled

@piodul (Collaborator, Author) replied:

Good catch. With the second poll_expiry_timer I wanted the future to immediately start waiting for the next expiration without having to force the whole task to be polled again - this future runs alongside three other futures in a try_join, so waking the task would most likely cause all of them to be polled.

Side note: the "The expiry timer should be unset at the moment" comment is incorrect because elapse_some reschedules the timer. I removed it.

I rewrote the function so that it runs an infinite loop and uses poll_fn only to poll the expiry timer. Entering an infinite loop without preemption shouldn't be possible, as the Sleep future used for waiting makes sure to yield if the task runs out of "budget" (see: https://docs.rs/tokio/latest/tokio/task/index.html#cooperative-scheduling).
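For reference, the rewritten function has roughly this shape (a sketch reusing the names and types from the snippet above, not necessarily the exact merged code):

```rust
async fn timeouter(handler_map: &StdMutex<ResponseHandlerMap>) -> Result<(), QueryError> {
    loop {
        // Only the expiry-timer poll happens inside poll_fn. A deadline
        // that elapses while the loop body runs is handled on the next
        // iteration, so the "timer fired but nothing called elapse_some"
        // scenario from the bug report cannot occur.
        futures::future::poll_fn(|cx| {
            let mut guard = handler_map.try_lock().unwrap();
            guard.deadline_tracker.poll_expiry_timer(cx)
        })
        .await;

        let mut guard = handler_map.try_lock().unwrap();
        let to_elapse = guard.deadline_tracker.elapse_some(Instant::now());
        tracing::debug!("Timing out {} requests", to_elapse.len());
        for request_id in to_elapse {
            if let Some(handler) = guard.orphan(request_id) {
                // Ignore sending error, request was dropped
                let _ = handler.response_sender.send(Err(QueryError::RequestTimeout(
                    "request timed out".to_string(),
                )));
            }
        }
        // The guard is dropped here, before the next poll_fn await, so the
        // mutex is never held across a suspension point.
    }
}
```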

@piodul (Collaborator, Author) commented Oct 10, 2022

v1.1: rebased on main

@piodul (Collaborator, Author) commented Oct 10, 2022

> I'm in love with the speedup 🚀
>
> I tried to internalize how Connection works to the point where I can confidently say "Yes, this makes sense", but I didn't manage to get there. I feel like it has become pretty convoluted - a web of async functions, futures, channels, repurposed structs and assumptions. I will have another go at it, but for now I can only say that the individual parts seem to work, apart from one possible bug.
>
> I wonder if it would be possible to refactor Connection to make the code simpler. Some sort of event loop, which does one thing at a time, or something.

I agree that the connection logic is difficult to follow.

I'm not sure that a single event loop would make things simpler. After this PR we will have four event loops - I guess we could merge them into a single loop { tokio::select!(...) }, but the number of events it would have to handle would not become smaller. Moreover, some operations that those event loops already do (e.g. frame::read_response_frame in Connection::reader) would be hard to realize in a way that does not prevent the event loop from consuming other events - this was the main reason why the whole try_join(reader, writer) + mutex pattern was used in the first place.

In some sense, we are already doing one thing at a time - we have ResponseHandlerMap, which represents the connection's state, and only one of the existing event loops is allowed to operate on it at a time. I think a good starting point for making the connection code easier to digest would be to rename ResponseHandlerMap to ConnectionState, bring the writer + reader + orphaner + timeouter closer to the connection state, and add a good chunk of documentation about the lifecycle of a request.

@piodul (Collaborator, Author) commented Oct 10, 2022

v2: fixed the bug described in the comment above

Comment on lines +388 to +396
```rust
// If the query failed with a driver-side timeout, there is no use
// in retrying anymore
if matches!(last_error, QueryError::RequestTimeout(_)) {
    trace!(parent: &span, "Query timed out driver-side");
    break 'nodes_in_plan;
}
```

@cvybhu (Contributor) commented:

I think we should leave all decisions about retrying to the RetryPolicy.
The default policy would decide not to retry in case of an unknown error.

Plus this is inconsistent with non-iter methods, which don't have such an if.

@piodul (Collaborator, Author) replied:

The idea was that there is a fixed time budget to fetch a single page. If a query attempt doesn't succeed, the deadline is not reset for the next attempt. Therefore it does not make sense to let the retry policy decide again in case of a driver-side timeout, because the next attempt would already be timed out at the point when it starts - so it's a tiny optimization.

Good point about the inconsistency; I suppose execute_query should get that if, too.

@cvybhu (Contributor) commented Nov 4, 2022:

I guess it makes some sense.

I'm dreaming of the day when we unify iterators and normal queries so that we don't have to do everything twice.

@cvybhu (Contributor) left a comment

Okay, I had another look and I think it looks good.

Please rebase and I'll merge it.

Commit messages

The ArmableTimer is a future which is also a resettable timer. It starts in an "unarmed" state. If armed, it resolves at the configured point in time. After successfully polling the timer once it resolves, it becomes unarmed and must be re-armed.

It is possible to concurrently await the timer and (re)-arm it. Awaiting an unarmed timer does not resolve unless the timer is armed and the deadline is reached. Re-arming a timer while awaiting it just changes the point in time at which the awaiter resolves.

The ArmableTimer will be used as a component of the new, scalable
reimplementation of timeouts.
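For illustration, such a timer could be built on top of tokio::time::Sleep roughly like this (a sketch based on the description above, not the actual implementation; in particular, supporting an awaiter on a currently unarmed timer additionally requires storing a waker, which is omitted here):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::{sleep_until, Instant, Sleep};

// Sketch of an armable timer: None while unarmed, a pinned Sleep while armed.
struct ArmableTimer {
    sleep: Option<Pin<Box<Sleep>>>,
}

impl ArmableTimer {
    fn new() -> Self {
        ArmableTimer { sleep: None }
    }

    // (Re)-arming just moves the deadline; an awaiter observes the new
    // deadline on its next poll.
    fn arm(&mut self, deadline: Instant) {
        match &mut self.sleep {
            Some(sleep) => sleep.as_mut().reset(deadline),
            None => self.sleep = Some(Box::pin(sleep_until(deadline))),
        }
    }

    // Resolves only if armed and past the deadline; disarms on resolution.
    fn poll_expiry(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        match &mut self.sleep {
            Some(sleep) => match sleep.as_mut().poll(cx) {
                Poll::Ready(()) => {
                    self.sleep = None;
                    Poll::Ready(())
                }
                Poll::Pending => Poll::Pending,
            },
            None => Poll::Pending,
        }
    }
}
```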
Adds a module to the connection router which is responsible for tracking request deadlines and sending timeout errors to their handlers when their deadlines pass.

The main purpose of tracking deadlines on the router, rather than via tokio::time::timeout on the tasks that wait for a response, is to avoid scalability issues. The current implementation of Tokio's `timeout` registers a timer entry in an intrusive list in the current thread's timer driver. If the inner future finishes before the timeout, the `timeout` future needs to unregister the timer entry. Both registration and unregistration require acquiring a per-timer-driver mutex, and we observed that under very large throughput and core counts acquiring this mutex takes a large share of the total execution time (note that tokio worker threads can steal tasks from each other, so cancelling a timer might involve acquiring the mutex of a different thread's timer driver).

The new implementation reduces the number of times timers are registered
and unregistered. The deadline tracker module keeps a single timer and
reschedules it infrequently, i.e. roughly with a period of timeout minus
latency (the assumption is that the timeouts are rare and are longer
than the average latency).
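A sketch of the rescheduling idea (illustrative names and layout, not the actual module): after timing out everything whose deadline has passed, the tracker re-arms its single timer for the earliest remaining deadline, so under steady load the timer fires roughly once per (timeout minus average latency) rather than once per request.

```rust
use std::collections::BTreeMap;
use tokio::time::Instant;

// Illustrative deadline queue: deadlines mapped to request ids.
struct DeadlineQueue {
    deadlines: BTreeMap<Instant, Vec<u64>>,
}

impl DeadlineQueue {
    // Pop every request whose deadline has passed and return the next
    // deadline to re-arm the timer with (None if the queue is empty).
    fn elapse_some(&mut self, now: Instant) -> (Vec<u64>, Option<Instant>) {
        let mut elapsed = Vec::new();
        while self
            .deadlines
            .first_key_value()
            .map_or(false, |(&deadline, _)| deadline <= now)
        {
            let (_, mut ids) = self.deadlines.pop_first().unwrap();
            elapsed.append(&mut ids);
        }
        // The single timer is re-armed for the earliest remaining deadline.
        let next_deadline = self.deadlines.keys().next().copied();
        (elapsed, next_deadline)
    }
}
```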

Because we are moving the responsibility for timing out a request from the task waiting for the response to the connection router, the request deadline needs to be propagated via Connection's methods such as query, execute or batch. In order not to break the existing Connection interface, this commit introduces new methods which take an optional deadline parameter and uses them to reimplement the existing methods. The new methods have pub(crate) visibility in order not to pollute the public interface with even more versions of the query/execute/batch methods.

This commit modifies the Session's methods so that they enforce client-side timeouts in a more efficient way. Additionally, the timeout is now also enforced for the batch method.

The `RowIterator::new_for_query` method has a lot of parameters, and we will be adding more. Most of them are shared with `new_for_prepared_statement`, which takes those parameters as a struct. This commit refactors the code so that both of these methods take a common struct.

This commit enforces driver-side timeouts in RowIterator. The timeout is applied to each fetched page separately, i.e. the driver can perform some retries for a single page, but they must not take more time than the timeout in total.
@piodul (Collaborator, Author) commented Nov 15, 2022

v3: rebased, added the missing if (mentioned in the discussion above) in iterator.rs

@piodul (Collaborator, Author) commented Nov 15, 2022

Please don't merge yet. The test proxy was merged, and I promised automatic tests, but there are some proxy improvements in outstanding PRs that I would like to see merged before writing the tests.

@wprzytula (Collaborator) commented:

> Please don't merge yet. The test proxy was merged, and I promised automatic tests, but there are some proxy improvements in outstanding PRs that I would like to see merged before writing the tests.

Just a reminder ping - the proxy improvements were already merged.

@wprzytula (Collaborator) commented:

Once again - ping @piodul. If this patch improves performance, we should definitely merge it quickly!

@piodul piodul added this to the 1.1.0 milestone Sep 27, 2023
@Lorak-mmk Lorak-mmk added the waiting-on-author Waiting for a response from issue/PR author label Mar 14, 2024