Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V4 Job Completion Duration Performance Hit #338

Open
malonaz opened this issue May 5, 2024 · 4 comments
Open

V4 Job Completion Duration Performance Hit #338

malonaz opened this issue May 5, 2024 · 4 comments

Comments

@malonaz
Copy link

malonaz commented May 5, 2024

Hi,
just wanted to share some data on our measured impact of the new notification system on the job completion duration:
image
We're observing a roughly 400% increase at p90 (from 50ms to 200ms), and with occasional spikes to 500ms.
This is with a 1 job inserted / second and 1 job exec / second.
Big fan of you work 👍 - this isn't really a dealbreaker, I'm curious to hear about your internal discussions around the performance tradeoffs of this new notification system. Was the previous NOTIFY implementation so penalizing at a certain scale that you felt you should make this change?
Thanks

@bgentry
Copy link
Contributor

bgentry commented May 5, 2024

Hi, thanks for reporting this! Can you provide some additional info to help us track down the source of this regression?

  • Which version of River were you on previously?
  • How many nodes (Clients) are you using to work jobs on this queue? How many clients are inserting jobs?
  • Can you confirm what duration is being measured here / how it is being measured? Is this measuring from the time difference between scheduled_at and completed_at for a job that is inserted as available to be worked immediately?
  • What settings are you putting on your River client configs? Any customization of poll intervals or anything like that?

There are a few reasons behind the change. One of them is that yes, there is a lot of overhead for a high throughput system in doing a NOTIFY on every insert. Another is that there were some mistakes / shortcomings in the original implementation that prevented a fully functional setup using multiple clients running on totally isolated Postgres schemas, basically the fact that notifications are a global namespace across all schemas & search paths but our notifications weren’t properly namespaced. We wanted to fix this as well.

I have a hunch that the specific number of clients and insert rate in this example is basically at odds with the chosen default debounce timer in the app level notifier. Hopefully it’s something we can trivially improve by tuning a couple of internal settings or worst case by exposing a new setting.

@malonaz
Copy link
Author

malonaz commented May 6, 2024

hI,

  • we were on v0.0.20 prior to this
  • we have a single consumer that consumes from ~5 queues, and the majority of our jobs trigger a new job insertion within the same transaction.
  • Another client is a producer_only, but produces at a much lower rate.
  • the duration is computed using the river event.JobStats.CompleteDuration.Seconds()
  • The consumer/producer uses the following config:
riverConfig := &river.Config{
  JobTimeout:           -1,
  RescueStuckJobsAfter: 5 * time.Minute,
  Queues:               queueNameToQueueConfig,
  Workers:              workers,
  ErrorHandler:         s,
}
  • The producer only uses a blank config riverConfig := &river.Config{}

@bgentry
Copy link
Contributor

bgentry commented May 7, 2024

@malonaz ah, I may have misinterpreted this report initially. This CompleteDuration stat should have nothing to do with the changes to the LISTEN/NOTIFY mechanism. This stat is measuring the time between when the worker's Work() function finishes executing, and when River finishes marking the job as completed in the database.

Instead of being the result of #301 and the v4 migration, I suspect you are seeing this stat increase as a result of the changes in #258 to introduce a batching async job completer. This change has the effect of tremendously increasing total throughput with the tradeoff of some additional latency, because completed jobs are batched up in memory in the client to be marked completed in groups. You can see the core of how it works here

// BatchCompleter accumulates incoming completions, and instead of completing
// them immediately, every so often complete many of them as a single efficient
// batch. To minimize the amount of driver surface area we need, the batching is
// only performed for jobs being changed to a `completed` state, which we expect
// to the vast common case under normal operation. The completer embeds an
// AsyncCompleter to perform other non-`completed` state completions.
type BatchCompleter struct {
baseservice.BaseService
startstop.BaseStartStop
withSubscribe
asyncCompleter *AsyncCompleter // used for non-complete completions
completionMaxSize int // configurable for testing purposes; max jobs to complete in single database operation
disableSleep bool // disable sleep in testing
maxBacklog int // configurable for testing purposes; max backlog allowed before no more completions accepted
exec PartialExecutor
setStateParams map[int64]*batchCompleterSetState
setStateParamsMu sync.RWMutex
started chan struct{}
waitOnBacklogChan chan struct{}
waitOnBacklogWaiting bool
}
func NewBatchCompleter(archetype *baseservice.Archetype, exec PartialExecutor) *BatchCompleter {
const (
completionMaxSize = 5_000
maxBacklog = 20_000
)
return baseservice.Init(archetype, &BatchCompleter{
asyncCompleter: NewAsyncCompleter(archetype, exec),
completionMaxSize: completionMaxSize,
exec: exec,
maxBacklog: maxBacklog,
setStateParams: make(map[int64]*batchCompleterSetState),
})
}
func (c *BatchCompleter) Start(ctx context.Context) error {
stopCtx, shouldStart, stopped := c.StartInit(ctx)
if !shouldStart {
return nil
}
c.started = make(chan struct{})
go func() {
// This defer should come first so that it's last out, thereby avoiding
// races.
defer close(stopped)
c.Logger.DebugContext(ctx, c.Name+": Run loop started")
defer c.Logger.DebugContext(ctx, c.Name+": Run loop stopped")
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
close(c.started)
backlogSize := func() int {
c.setStateParamsMu.RLock()
defer c.setStateParamsMu.RUnlock()
return len(c.setStateParams)
}
for numTicks := 0; ; numTicks++ {
select {
case <-stopCtx.Done():
// Try to insert last batch before leaving. Note we use the
// original context so operations aren't immediately cancelled.
if err := c.handleBatch(ctx); err != nil {
c.Logger.Error(c.Name+": Error completing batch", "err", err)
}
return
case <-ticker.C:
}
// The ticker fires quite often to make sure that given a huge glut
// of jobs, we don't accidentally build up too much of a backlog by
// waiting too long. However, don't start a complete operation until
// we reach a minimum threshold unless we're on a tick that's a
// multiple of 5. So, jobs will be completed every 250ms even if the
// threshold hasn't been met.
const batchCompleterStartThreshold = 100
if backlogSize() < min(c.maxBacklog, batchCompleterStartThreshold) && numTicks != 0 && numTicks%5 != 0 {
continue
}
for {
if err := c.handleBatch(ctx); err != nil {
c.Logger.Error(c.Name+": Error completing batch", "err", err)
}
// New jobs to complete may have come in while working the batch
// above. If enough have to bring us above the minimum complete
// threshold, loop again and do another batch. Otherwise, break
// and listen for a new tick.
if backlogSize() < batchCompleterStartThreshold {
break
}
}
}
}()
return nil
}

As of now, there are no user-facing knobs for tuning this behavior. We even still have the old async completer in code, as well as an inline one (the latter has much worse throughput). There is no way to activate these as a user though, and no way to customize the thresholds for batching.

I wonder if @brandur has thoughts on whether this much of an increase is expected, and whether/how we might want to allow users to customize the behavior here?

@malonaz Could you add some more detail on the throughput rate for these queues? Additionally, I want to ask about this bit:

we have a single consumer that consumes from ~5 queues, and the majority of our jobs trigger a new job insertion within the same transaction.

The phrase "the same transaction" struck me once I realized you were measuring using the CompleteDuration stat. Jobs are not worked within a transaction and their completion is not tied to any transaction that occurs within the job itself. However, River does offer a transactional job completion feature that allows you to make other database state changes alongside the job being marked as completed, ensuring that all those changes happen atomically. Are you making use of that feature here? If so, the increase in this measurement won't actually be impacting you at all, at least not for the jobs where you are completing them within Work().

@brandur
Copy link
Contributor

brandur commented May 7, 2024

I wonder if @brandur has thoughts on whether this much of an increase is expected, and whether/how we might want to allow users to customize the behavior here?

Yeah, this is a little nuanced, but although the batch completer's ticker fires every 50 ms, it'll only complete a batch every 50 ms if a minimum batch size has accumulated during that time. Otherwise it waits until a tick that's a multiple of 5 to complete a batch. So you can expect up to a 5 * 50 = 250 ms delay.

Let me caveat that though to say that the delay doesn't actually matter — the job still finished in the same amount of time, and it won't be reworked (unless there's a hard crash). It's just that setting it to fully completed in the database might take a little bit longer.

I'd hold off on further customization unless we find a good reason for such. The defaults should be pretty good for everyone.

In terms of measuring statistics, it migh t make more sense to observe QueueWaitDuration + RunDuration as a more meaningful number for how long jobs are taking to run (as opposed to CompleteDuration).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants