Got unexpected Kafka message error #57

Open
IgorFedchenko opened this issue Sep 18, 2019 · 8 comments

@IgorFedchenko
Contributor

As stated in this comment, one of the tests (possibly not just one) can fail with IllegalActorStateException.

The code that throws it is:

```csharp
var currentAssignment = _consumer.Assignment;
var initialRebalanceInProcess = _rebalanceInProgress;

try
{
    if (_requests.IsEmpty())
    {
        // no outstanding requests so we don't expect any messages back, but we should anyway
        // drive the KafkaConsumer by polling to handle partition events etc.
        _consumer.Pause(currentAssignment);
        var message = _consumer.Consume(TimeSpan.FromMilliseconds(1));
        if (message != null)
        {
            throw new IllegalActorStateException($"Got unexpected Kafka message: {message.ToJson()}");
        }
    }
    // ... (excerpt ends here; the rest of the method is not shown)
```

So basically, we pause all currently assigned partitions and then poll Kafka only to handle service events (partition assignments, etc.). Still, the original Alpakka code has this same check for some reason, and so does ours.

It is not clear yet, but it seems this error is possible if partitions are reassigned/resumed on a separate thread and some partition gets resumed right after the Pause() call. However, all Resume() calls are executed inside the actor's message handling, so such race conditions should be ruled out... This needs to be checked and resolved.

@IgorFedchenko
Contributor Author

@Aaronontheweb Do you think I need to spend time on this right now? On the one hand, this may get fixed incidentally by upcoming PRs (I have already found some issues, but they seem unrelated to this one). On the other hand, it is hard to reproduce (I have never gotten this error locally so far), and if the code changes I will never know whether the issue still remains.

Actually, right now I am working on #56, which will already change part of the code (related to updating the _requests field), so this code might be modified anyway.

@Aaronontheweb
Member

@IgorFedchenko nah, I'd just keep it in mind for now and see if it happens again. Might be an issue you end up fixing as a result of other changes down the line.

@dimitar-j-shterev

dimitar-j-shterev commented Feb 5, 2024

@IgorFedchenko IMO, there is some kind of race condition, because all rebalancing events (partition assigned, partition revoked) are scheduled for processing via the actor's mailbox. Note that all partition event handlers are executed as a side effect of Consume(), i.e. on the same thread (by design of the driver). As a result, the state of KafkaConsumerActor is only updated some time after Consume() returns, when the PartitionAssigned or PartitionRevoked message is processed. Until then, the state of KafkaConsumerActor does not reflect the state of the consumer. I may have missed something, but it seems pretty safe to me not to send PartitionAssigned/PartitionRevoked to self. Instead, the callbacks should update the actor's state directly whenever they are called.

@Arkatufus
Contributor

This could very well be the source of the problem; it'll need some serious re-engineering to fix this, though.

@dimitar-j-shterev

dimitar-j-shterev commented Feb 5, 2024

> This could very well be the source of the problem; it'll need some serious re-engineering to fix this, though.

Well, it wouldn't be that serious. But this condition affects offset commits as well: if you have commit messages for partitions that have already been revoked, you will receive a lot of "Erroneous state" errors and possibly actor restarts, which could degrade performance dramatically. At first glance, this change could be enough:

```csharp
_consumer = _settings.CreateKafkaConsumer(
    consumeErrorHandler: (c, e) => localSelf.Tell(new Status.Failure(new KafkaException(e))),
    // partition handlers run inside Consume() and update actor state directly,
    // instead of sending PartitionAssigned/PartitionRevoked messages to Self
    partitionAssignedHandler: (c, tp) => PartitionsAssignedHandler(tp),
    partitionRevokedHandler: (c, tp) => PartitionsRevokedHandler(tp),
    statisticHandler: (c, json) => _statisticsHandler.OnStatistics(c, json));
```
  • plus removing the dead code this change leaves behind
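To make the ordering problem concrete, here is a minimal, dependency-free C# sketch. It assumes, as the comment above states, that the driver invokes the partition handlers synchronously inside Consume(). It uses no Akka.NET or Confluent.Kafka types: `Consume`, `mailbox`, and the two assignment sets are hypothetical stand-ins, with the queue playing the role of messages sent to Self.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins: the queue models the actor mailbox (messages to Self),
// and the sets model the actor's view of its assigned partitions.
var mailbox = new Queue<Action>();
var deferredAssignment = new HashSet<int> { 0, 1 }; // updated via the mailbox
var directAssignment = new HashSet<int> { 0, 1 };   // updated inside the callback

// Simulated Consume(): a rebalance revokes partition 1, and the revoked-handler
// is invoked on the calling thread before Consume() returns.
void Consume(Action<int> onRevoked) => onRevoked(1);

// Variant A: the handler only enqueues a message, like Self.Tell(new PartitionRevoked(...)).
Consume(tp => mailbox.Enqueue(() => deferredAssignment.Remove(tp)));
bool staleAfterConsume = deferredAssignment.Contains(1); // actor still thinks partition 1 is assigned

// Variant B: the handler updates the actor's state directly, as proposed above.
Consume(tp => directAssignment.Remove(tp));
bool freshAfterConsume = !directAssignment.Contains(1);

// Variant A only catches up once the queued message is processed.
while (mailbox.Count > 0) mailbox.Dequeue().Invoke();

Console.WriteLine($"deferred state stale right after Consume(): {staleAfterConsume}");
Console.WriteLine($"direct state up to date right after Consume(): {freshAfterConsume}");
Console.WriteLine($"deferred state up to date after mailbox drained: {!deferredAssignment.Contains(1)}");
```

In variant A there is a window between Consume() returning and the queued message being handled in which the actor acts on a stale assignment (e.g. issuing commits for a revoked partition). Variant B closes that window; in the real actor this is only safe because, per the comment, the callbacks run on the same thread that calls Consume() during the actor's message handling.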

@Aaronontheweb
Member

> I may have missed something, but it seems pretty safe to me not to send PartitionAssigned/PartitionRevoked to self. Instead, the callbacks should update the actor's state directly whenever they are called.

If we're doing something that's out of alignment with the driver here, we should definitely fix it. And I don't think this is a huge engineering project either.
