KafkaConsumer.CommittableSource strange error #396

Open
ManuSchm opened this issue Jan 5, 2024 · 0 comments

Version Information
Akka 1.5.14
Akka.Streams.Kafka 1.5.13.1

We ran into a strange error/behaviour using KafkaConsumer.CommittableSource.
The stream worked as expected for quite some time without any problems. Then we encountered out-of-sync problems between some of our applications, and our log files showed that we were not receiving any messages from the KafkaConsumer.
Further research in our logs led us to the following log entries, which we cannot interpret because they do not come from our application.

[Image: kafka]
[Image: kafka2]

In our interpretation, the second entry is a result of the first, but we do not know where the first came from.

There is also a similar error log entry regarding Phobos in the same time span.
[14:32:38 ERR ] Error while executing scheduled task ScheduledWork(Deadline=6118411510800, RepeatEvery=0, Cancelled=False, System.InvalidOperationException: Nullable object must have a value.
   at Phobos.Actor.Instrumentation.PhobosRepointableActorRef.TellInternal(Object message, IActorRef sender)
   at Akka.Actor.HashedWheelTimerScheduler.Bucket.Execute(Int64 deadline)

The following is the code snippet where we use KafkaConsumer.CommittableSource.

    var committerDefaults = CommitterSettings.Create(Context.System);

    _control = KafkaConsumer.CommittableSource(consumerSettings, subscription)
        .SelectAsync(1, async elem =>
        {
            try
            {
                var res = await _kafkaConsumeRouter.Ask<IInternEventResult>(elem, TimeSpan.FromSeconds(10));
                if (res is InternEventFailure failure)
                {
                    if (failure.Exception != null)
                        elem.Record.Message.Headers.Add("exception", Encoding.UTF8.GetBytes(failure.Exception.ToString()));
                    if (failure.Error != null)
                        elem.Record.Message.Headers.Add("error", Encoding.UTF8.GetBytes(failure.Error));
                    _kafkaDltActor.Tell(elem);
                }
            }
            catch (AskTimeoutException)
            {
                elem.Record.Message.Headers.Add("error", Encoding.UTF8.GetBytes("timeout"));
                _kafkaDltActor.Tell(elem);
            }

            return (ICommittable) elem.CommitableOffset;
        })
        .ToMaterialized(Committer.Sink(committerDefaults), DrainingControl<NotUsed>.Create)
        .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Deciders.StoppingDecider))
        .Run(Context.System);

    _control.IsShutdown.ContinueWith(_ => new KafkaStreamStopped()).PipeTo(Self);

The KafkaConsumer.CommittableSource stream does not fail or stop.

We have encountered this problem only once and weren't able to reproduce it.
