Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16675: Refactored and new rebalance callbacks integration tests #15965

Merged
merged 2 commits into from
May 21, 2024

Conversation

lianetm
Copy link
Contributor

@lianetm lianetm commented May 15, 2024

  • Move existing rebalance callback + consumer.position test to the PlaintextConsumerCallbackTest file (refactored to reuse the new helper funcs available)
  • Add new integration tests for callbacks interaction with seek and pause
  • Minor cleanup in the callbacks test file

@lianetm
Copy link
Contributor Author

lianetm commented May 15, 2024

Hey @lucasbru, extending on top of your initial callbacks test file to get more coverage (very nice helper funcs btw). Could you take a look when you have a chance? Thanks!

}

override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
// noop
}
})
TestUtils.pollUntilTrue(consumer, () => partitionsAssigned.get(), "Timed out before expected rebalance completed")
consumer.close()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed as it's not needed here, the test already has the logic for closing all consumers on (IntegrationTestHarness.tearDown). (Left it on the triggerOnPartitionsRevoked as it was, where it's needed to assert the revocation)

val startingTimestamp = 0
sendRecords(producer, totalRecords.toInt, tp, startingTimestamp)

consumer.subscribe(asList(topic), new ConsumerRebalanceListener {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you not use the helper method below here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't initially mainly because of the close and poll in the helper, that played against what I needed to test in these 2, but then I myself removed the close and forgot to try again :). So I did integrate the helper here now, with a minor twist to pass the consumer as param. Also it allowed me to simplify the seek/pause test in one, given that we do need to pause to properly check the seek behaviour, so removed the extra test for pause. Good catch, thanks!

@lucasbru
Copy link
Member

Thanks for the PR @lianetm . I just left one question

@lianetm
Copy link
Contributor Author

lianetm commented May 17, 2024

Thanks for the helpful comment @lucasbru , addressed.

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@lucasbru lucasbru merged commit 52b4596 into apache:trunk May 21, 2024
1 check failed
rreddy-22 pushed a commit to rreddy-22/kafka-rreddy that referenced this pull request May 24, 2024
…apache#15965)

Move existing rebalance callback + consumer.position test to the PlaintextConsumerCallbackTest file (refactored to reuse the new helper funcs available)
Add new integration tests for callbacks interaction with seek and pause
Minor cleanup in the callbacks test file

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants