Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle/fix "Got unexpected Kafka message" error #99

Open
IgorFedchenko opened this issue Jan 30, 2020 · 0 comments
Open

Handle/fix "Got unexpected Kafka message" error #99

IgorFedchenko opened this issue Jan 30, 2020 · 0 comments

Comments

@IgorFedchenko
Copy link
Contributor

During test execution under Ubuntu:
Test:
Akka.Streams.Kafka.Tests.Integration.SourceWithOffsetContextIntegrationTests.SourceWithOffsetContext_at_least_once_consuming_should_work

Error:

Failed: Expected a message of type Akka.Streams.TestKit.TestSubscriber+OnNext1[System.ValueTuple2[Akka.NotUsed,Akka.Streams.Kafka.Messages.ICommittableOffsetBatch]], but received {TestSubscriber.OnError(Got unexpected Kafka message: {"Topic":"topic-1-3aac2d51-7efe-4e0c-adca-0bac512289e9","Partition":{"Value":0,"IsSpecial":false},"Offset":{"Value":0,"IsSpecial":false},"TopicPartition":{"Topic":"topic-1-3aac2d51-7efe-4e0c-adca-0bac512289e9","Partition":{"Value":0,"IsSpecial":false}},"TopicPartitionOffset":{"Topic":"topic-1-3aac2d51-7efe-4e0c-adca-0bac512289e9","Partition":{"Value":0,"IsSpecial":false},"Offset":{"Value":0,"IsSpecial":false},"TopicPartition":{"Topic":"topic-1-3aac2d51-7efe-4e0c-adca-0bac512289e9","Partition":{"Value":0,"IsSpecial":false}}},"Message":{"Value":"3","Timestamp":{"Type":1,"UnixTimestampMs":1580383308079,"UtcDateTime":"2020-01-30T11:21:48.079Z"},"Headers":[]},"Value":"3","Timestamp":{"Type":1,"UnixTimestampMs":1580383308079,"UtcDateTime":"2020-01-30T11:21:48.079Z"},"Headers":[],"IsPartitionEOF":false})} (type Akka.Streams.TestKit.TestSubscriber+OnError) instead from [akka://SourceWithOffsetContextIntegrationTests/user/StreamSupervisor-27/Flow-1-0-unknown-operation#1937253766]

Expected: True

Actual: False

Details: https://dev.azure.com/dotnet/Akka.NET/_build/results?buildId=27932&view=ms.vss-test-web.build-test-results-tab&runId=1085592&resultId=100006&paneView=debug

It is thrown from KafkaConsumerActor:

// no outstanding requests so we don't expect any messages back, but we should anyway
// drive the KafkaConsumer by polling to handle partition events etc.
PausePartitions(currentAssignment);
var message = _consumer.Consume(TimeSpan.FromMilliseconds(1));
if (message != null)
{
throw new IllegalActorStateException($"Got unexpected Kafka message: {message.ToJson()}");
}

Basically partition should be paused, but for some reason message is still received.

I think that instead of throwning exception here, we can log warning and make a workaround - ignore message and reset partition position pointer by 1 .

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
No open projects
Development

No branches or pull requests

1 participant