CommittableOffsetBatchImpl allow reordering #1285

Open
Matzz opened this issue Dec 13, 2020 · 5 comments

@Matzz
Contributor

Matzz commented Dec 13, 2020

Short description

Committing an older offset after a newer one in CommittableOffsetBatchImpl overrides the newer offset.

Details

There are two main problems I would like to discuss here:

  1. Committing an older offset after a newer one using CommittableOffsetBatchImpl overrides the newer offset. Example:
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.testkit.ConsumerResultFactory

object Main extends App {
  def makeCommittableOffset(offset: Long) = {
    ConsumerResultFactory.committableOffset(
      groupId = "x",
      topic = "test",
      partition = 1,
      offset = offset,
      metadata = ""
    )
  }
  var batch = CommittableOffsetBatch.empty
  batch = batch.updated(makeCommittableOffset(1))
  batch = batch.updated(makeCommittableOffset(3))
  batch = batch.updated(makeCommittableOffset(4))
  batch = batch.updated(makeCommittableOffset(2))
  println(batch)
  // CommittableOffsetBatch(batchSize=4, GroupTopicPartition(x,test,1) -> 2)
}

In my opinion, there should be an option to log an error or throw an exception in such a case. At first glance, this affects only at-most-once delivery. However, this behavior can also be a problem for low-traffic topics: the metrics will show a lag until the next message arrives on that partition, and sometimes that can take days. Such problems are very hard to debug. In my case, my code was faulty because it used flatMapMerge instead of flatMapConcat.
I understand that CommittableOffsetBatch may not catch problems between independent batches, but I think logging such issues within the same batch is possible and beneficial. Of course, it could be configurable and optional.
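
To make the request concrete, here is a minimal sketch of an opt-in strict update. It does not use Alpakka Kafka's real classes: `StrictBatch`, `updatedStrict`, and `ReorderedOffset` are hypothetical names, and `GroupTopicPartition` is a simplified stand-in for the library type. It only models the "last write wins" behavior shown above, next to a variant that rejects an older offset.

```scala
// Simplified model; every name in this object is hypothetical, not Alpakka Kafka API.
object StrictBatchSketch {
  final case class GroupTopicPartition(groupId: String, topic: String, partition: Int)

  // Describes an attempt to add an offset older than the one already batched.
  final case class ReorderedOffset(key: GroupTopicPartition, current: Long, attempted: Long)

  final case class StrictBatch(offsets: Map[GroupTopicPartition, Long] = Map.empty) {
    // Last write wins: this mirrors the behavior reported in this issue.
    def updated(key: GroupTopicPartition, offset: Long): StrictBatch =
      copy(offsets = offsets.updated(key, offset))

    // Opt-in strict variant: refuse an offset lower than the batched one.
    def updatedStrict(key: GroupTopicPartition, offset: Long): Either[ReorderedOffset, StrictBatch] =
      offsets.get(key) match {
        case Some(current) if offset < current => Left(ReorderedOffset(key, current, offset))
        case _                                 => Right(updated(key, offset))
      }
  }
}
```

A caller could turn the `Left` into a log entry or an exception, depending on how strict it wants to be.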

  2. Skipping some offsets:
  var batch = CommittableOffsetBatch.empty
  batch = batch.updated(makeCommittableOffset(1))
  batch = batch.updated(makeCommittableOffset(2))
  batch = batch.updated(makeCommittableOffset(3))
  batch = batch.updated(makeCommittableOffset(20))
  println(batch)
  // CommittableOffsetBatch(batchSize=4, GroupTopicPartition(x,test,1) -> 20)

I'm not entirely sure about this one, but I think that for at-least-once delivery it should be possible to (optionally) log such cases.

I could implement this feature; however, I think the expected behavior should be discussed first.

Regards,
Matzz

@ennru
Member

ennru commented Dec 14, 2020

Thank you for discussing this here.

Reordering messages, and thus committing offsets in the wrong order, causes nasty bugs, and Alpakka Kafka can get better at avoiding it.

For case 1, I agree that we should stop lowering the offset for a topic partition right in the batch implementation (note that there is logic to avoid committing lower offsets in CommitObservationLogic.updateBatchForPartition).
Logging a warning might be a good idea. Passing configuration to the CommittableBatch implementation is not straightforward, so I'm not sure it would be worth it.
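
As a rough illustration of "stop lowering the offset and log a warning", the sketch below keeps the highest offset per partition without any configuration. It is a simplified model, not the actual CommittableOffsetBatchImpl: `MonotonicBatch` is a hypothetical name, `GroupTopicPartition` is a stand-in type, and writing to `System.err` is only a placeholder for whatever logger the library would use.

```scala
// Simplified model; names are hypothetical and stderr stands in for a real logger.
object MonotonicBatchSketch {
  final case class GroupTopicPartition(groupId: String, topic: String, partition: Int)

  final case class MonotonicBatch(
      offsets: Map[GroupTopicPartition, Long] = Map.empty,
      batchSize: Long = 0L
  ) {
    // Never lowers the offset of a partition; warns when an older offset arrives.
    def updated(key: GroupTopicPartition, offset: Long): MonotonicBatch = {
      val kept = offsets.get(key) match {
        case Some(current) if offset < current =>
          System.err.println(
            s"WARN: offset $offset for $key is older than batched offset $current; keeping $current"
          )
          current
        case _ => offset
      }
      // The element still counts towards the batch size, as in the example above.
      copy(offsets = offsets.updated(key, kept), batchSize = batchSize + 1)
    }
  }
}
```

With the sequence 1, 3, 4, 2 from the original report, this model ends with offset 4 for the partition and emits one warning for the late offset 2.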

For case 2, I wouldn't want to change anything; in many scenarios it is valid to just filter out messages (and offsets).

@Matzz
Contributor Author

Matzz commented Dec 18, 2020

@ennru
For both scenarios, I thought that it could be configurable, so by default I would disable warnings for scenario 2.
I think it could be implemented by adding a reorderingEvent listener to CommittableOffsetBatch, so that besides the predefined behaviors, users could set custom actions (for example, metrics).
I was thinking about changing CommittableOffsetBatch.empty, and CommitObservationLogic (which uses empty internally), via settings.
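
One possible shape for such a listener is sketched below. None of these names exist in Alpakka Kafka: `OffsetEvent`, `Reordered`, `Skipped`, `ObservedBatch`, and the `empty(listener)` factory are assumptions made for illustration, and the default listener is a no-op so that nothing is reported unless the user opts in.

```scala
// Hypothetical listener-based design; not part of the Alpakka Kafka API.
object ReorderingListenerSketch {
  final case class GroupTopicPartition(groupId: String, topic: String, partition: Int)

  sealed trait OffsetEvent
  // Scenario 1: an older offset was added after a newer one.
  final case class Reordered(key: GroupTopicPartition, current: Long, attempted: Long) extends OffsetEvent
  // Scenario 2: one or more offsets between `current` and `attempted` were skipped.
  final case class Skipped(key: GroupTopicPartition, current: Long, attempted: Long) extends OffsetEvent

  final case class ObservedBatch(offsets: Map[GroupTopicPartition, Long], listener: OffsetEvent => Unit) {
    // Reports events to the listener, then applies the usual last-write-wins update.
    def updated(key: GroupTopicPartition, offset: Long): ObservedBatch = {
      offsets.get(key).foreach { current =>
        if (offset < current) listener(Reordered(key, current, offset))
        else if (offset > current + 1) listener(Skipped(key, current, offset))
      }
      copy(offsets = offsets.updated(key, offset))
    }
  }

  // Default listener ignores all events, so existing behavior is unchanged.
  def empty(listener: OffsetEvent => Unit = _ => ()): ObservedBatch =
    ObservedBatch(Map.empty, listener)
}
```

A user could pass a listener that logs only `Reordered` events, or one that increments a metrics counter, without the batch itself knowing about logging or metrics.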

Another approach would be to just add logging in CommittableOffsetBatch and CommitObservationLogic.updateBatch for the first scenario.

@ennru
Member

ennru commented Dec 22, 2020

If you're willing to try this out that would be great.

@EgbertW

EgbertW commented Jan 6, 2021

+1 for this. I experienced some issues with this where offsets would never reach 0 because of the use of mapAsyncUnordered. After switching to mapAsync, the messages are produced in order, and therefore the offsets are committed correctly, so they reach 0. While this is more "correct", it lowers the throughput of the pipeline quite significantly. It is mostly an aesthetic issue as long as the pipeline keeps running, but when it is restarted, it will lead to duplication.

I wouldn't want to start a discussion about what the correct behavior is, but it would be great if Alpakka would at least warn that offsets are committed in the wrong order and that this may produce undesired / surprising results.

@seglo
Member

seglo commented Jan 6, 2021

it would be great if Alpakka would at least warn that offsets are committed in the wrong order and this may produce undesired / surprising results.

Agreed, I created #1305 to track this specifically. @EgbertW if you're willing we'd welcome a PR!

Regarding @Matzz's 2nd request, I don't see the value in warning the user of "skipped" offset commits, since it's common practice to batch or skip records during processing.

4 participants