
Re-creating SubSource for partition using the KafkaConsumer.CommittablePartitionedSource in parallel #232

Open
PugachA opened this issue Sep 29, 2021 · 2 comments
PugachA commented Sep 29, 2021

Version Information

<PackageReference Include="Akka.Streams" Version="1.4.25" />
<PackageReference Include="Akka.Streams.Kafka" Version="1.1.4" />

Describe the bug
The SubSource for a partition gets re-created when consuming with `KafkaConsumer.CommittablePartitionedSource` and processing the sub-streams in parallel.

[WARNING][28.09.2021 18:15:03][Thread 0004][akka://system/system/kafka-consumer-2] RequestMessages from topic/partition ipmrequeststatistics [[2]] already requested by other stage ipmrequeststatistics [[2]]

These warnings only occur when there is a slow stage in the pipeline, and they lead to data loss.

To Reproduce
Steps to reproduce the behavior:

  1. Configuration
Configuration
akka {

  loglevel = DEBUG

  stream {

    # Default flow materializer settings
    materializer {

      # Initial size of buffers used in stream elements
	  # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      initial-input-buffer-size = 4

      # Maximum size of buffers used in stream elements
	  # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      max-input-buffer-size = 16

      # Fully qualified config path which holds the dispatcher configuration
      # to be used by FlowMaterialiser when creating Actors.
      # When this value is left empty, the default-dispatcher will be used.
	  # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      dispatcher = ""

      blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher"

      # Cleanup leaked publishers and subscribers when they are not used within a given
      # deadline
      subscription-timeout {
        # when the subscription timeout is reached one of the following strategies on
        # the "stale" publisher:
        # cancel - cancel it (via `onError` or by subscribing to the publisher and
        #          `cancel()`ing the subscription right away)
        # warn   - log a warning statement about the stale element (then drop the
        #          reference to it)
        # noop   - do nothing (not recommended)		
	    # Note: If you change this value also change the fallback value in StreamSubscriptionTimeoutSettings
        mode = cancel

        # time after which a subscriber / publisher is considered stale and eligible
        # for cancelation (see `akka.stream.subscription-timeout.mode`)		
	    # Note: If you change this value also change the fallback value in StreamSubscriptionTimeoutSettings
        timeout = 5s
      }
      
      # Enable additional troubleshooting logging at DEBUG log level
	  # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      debug-logging = on

      loglevel = DEBUG

      # Maximum number of elements emitted in batch if downstream signals large demand
	  # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      output-burst-limit = 1000
      
      # Enable automatic fusing of all graphs that are run. For short-lived streams
      # this may cause an initial runtime overhead, but most of the time fusing is
      # desirable since it reduces the number of Actors that are created.
	  # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      auto-fusing = off

	  # Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered,
      # buffer, flatMapMerge, Source.actorRef, Source.queue, etc.) will preallocate a fixed
      # buffer upon stream materialization if the requested buffer size is less than this
      # configuration parameter. The default is very high because failing early is better
      # than failing under load.
      #
      # Buffers sized larger than this will dynamically grow/shrink and consume more memory
      # per element than the fixed size buffers.
	  # Note: If you change this value also change the fallback value in ActorMaterializerSettings
      max-fixed-buffer-size = 1000000000

      # Maximum number of sync messages that an actor can process for stream-to-substream communication.
      # This parameter allows interrupting synchronous processing to receive upstream/downstream messages.
      # It accelerates message processing happening within the same actor while keeping the system responsive.
	  # Note: If you change this value also change the fallback value in ActorMaterializerSettings
	  sync-processing-limit = 1000

      debug {
        # Enables the fuzzing mode which increases the chance of race conditions
        # by aggressively reordering events and making certain operations more
        # concurrent than usual.
        # This setting is for testing purposes, NEVER enable this in a production
        # environment!
        # To get the best results, try combining this setting with a throughput
        # of 1 on the corresponding dispatchers.
	    # Note: If you change this value also change the fallback value in ActorMaterializerSettings
        fuzzing-mode = off
      }
      
      stream-ref {
        # Buffer of a SinkRef that is used to batch Request elements from the other side of the stream ref
        #
        # The buffer will be attempted to be filled eagerly even while the local stage did not request elements,
        # because the delay of requesting over network boundaries is much higher.
        buffer-capacity = 32
      
        # Demand is signalled by sending a cumulative demand message ("requesting messages until the n-th sequence number")
        # Using a cumulative demand model allows us to re-deliver the demand message in case of message loss (which should
        # be very rare in any case, yet possible -- mostly under connection break-down and re-establishment).
        #
        # The semantics of handling and updating the demand however are in-line with what Reactive Streams dictates.
        #
        # In normal operation, demand is signalled in response to arriving elements, however if no new elements arrive
        # within `demand-redelivery-interval` a re-delivery of the demand will be triggered, assuming that it may have gotten lost.
        demand-redelivery-interval = 1 second
      
        # Subscription timeout, during which the "remote side" MUST subscribe (materialize) the handed out stream ref.
        # This timeout does not have to be very low in normal situations, since the remote side may also need to
        # prepare things before it is ready to materialize the reference. However the timeout is needed to avoid leaking
        # in-active streams which are never subscribed to.
        subscription-timeout = 30 seconds
        
        # In order to guard the receiving end of a stream ref from never terminating (since awaiting a Completion or Failed
        # message) after / before a Terminated is seen, a special timeout is applied once Terminated is received by it.
        # This allows us to terminate stream refs that have been targeted to other nodes which are Downed, and as such the
        # other side of the stream ref would never send the "final" terminal message.
        #
        # The timeout specifically means the time between the Terminated signal being received and when the local SourceRef
        # determines to fail itself, assuming there was message loss or a complete partition of the completion signal.
        final-termination-signal-deadline = 2 seconds
      }
    }

    # Deprecated, left here to not break Akka HTTP which refers to it
    blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher"

    # Deprecated, will not be used unless user code refer to it, use 'akka.stream.materializer.blocking-io-dispatcher'
    # instead, or if from code, prefer the 'ActorAttributes.IODispatcher' attribute
    default-blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher"
  }

  # configure overrides to ssl-configuration here (to be used by akka-streams, and akka-http – i.e. when serving https connections)
  ssl-config {
    protocol = "TLSv1"
  }
  actor {
  
    serializers {
      akka-stream-ref = "Akka.Streams.Serialization.StreamRefSerializer, Akka.Streams"
    }
  
    serialization-bindings {
      "Akka.Streams.Implementation.StreamRef.SinkRefImpl, Akka.Streams"         = akka-stream-ref
      "Akka.Streams.Implementation.StreamRef.SourceRefImpl, Akka.Streams"       = akka-stream-ref
      "Akka.Streams.Implementation.StreamRef.IStreamRefsProtocol, Akka.Streams" = akka-stream-ref
    }
  
    serialization-identifiers {
      "Akka.Streams.Serialization.StreamRefSerializer, Akka.Streams" = 30
    }
  }
}

# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.consumer {
  # Config path of Akka Discovery method
  # "akka.discovery" to use the Akka Discovery method configured for the ActorSystem
  discovery-method = akka.discovery

  # Set a service name for use with Akka Discovery
  # https://doc.akka.io/docs/alpakka-kafka/current/discovery.html
  service-name = ""

  # Timeout for getting a reply from the discovery-method lookup
  resolve-timeout = 3 seconds

  # Tuning property of scheduled polls.
  # Controls the interval from one scheduled poll to the next.
  poll-interval = 500ms

  # Tuning property of the `KafkaConsumer.poll` parameter.
  # Note that a non-zero value means that the thread
  # executing the stage will be blocked. See also the `wakeup-timeout` setting below.
  poll-timeout = 500ms

  # The stage will delay stopping the internal actor to allow processing of
  # messages already in the stream (required for successful committing).
  # This can be set to 0 for streams using `DrainingControl`.
  stop-timeout = 30s

  # Duration to wait for `KafkaConsumer.close` to finish.
  close-timeout = 20s

  # If offset commit requests are not completed within this timeout
  # the returned Future is completed with a `CommitTimeoutException`.
  # The `Transactional.source` waits this amount of time for the producer to mark messages as not
  # being in flight anymore, as well as waiting for messages to drain, when a rebalance is triggered.
  commit-timeout = 15s

  # If commits take longer than this time a warning is logged
  commit-time-warning = 1s

  # Not relevant for Kafka after version 2.1.0.
  # If set to a finite duration, the consumer will re-send the last committed offsets periodically
  # for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
  commit-refresh-interval = infinite

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the KafkaConsumerActor. Some blocking may occur.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
  # can be defined in this configuration section.
  kafka-clients {
    # Disable auto-commit by default
    enable.auto.commit = false
  }

  # Time to wait for pending requests when a partition is closed
  wait-close-partition = 500ms

  # Limits the query to Kafka for a topic's position
  position-timeout = 5s

  # When using `AssignmentOffsetsForTimes` subscriptions: timeout for the
  # call to Kafka's API
  offset-for-times-timeout = 5s

  # Timeout for akka.kafka.Metadata requests
  # This value is used instead of Kafka's default from `default.api.timeout.ms`
  # which is 1 minute.
  metadata-request-timeout = 5s

  # Interval for checking that transaction was completed before closing the consumer.
  # Used in the transactional flow for exactly-once-semantics processing.
  eos-draining-check-interval = 30ms

  # Issue warnings when a call to a partition assignment handler method takes
  # longer than this.
  partition-handler-warning = 5s

  # Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
  # configured by `consumer.metadata-request-timeout`
  connection-checker {

    # Flag to turn on the connection checker
    enable = false

    # Number of attempts to perform after the first connection failure occurs
    # Required, non-negative integer
    max-retries = 3

    # Interval for the connection check. Used as the base for exponential retry.
    check-interval = 15s

    # Check interval multiplier for backoff interval
    # Required, positive number
    backoff-factor = 2.0
  }

  # Protect against server-side bugs that cause Kafka to temporarily "lose" the latest offset for a consumer, which
  # then causes the Kafka consumer to follow its normal 'auto.offset.reset' behavior. For 'earliest', these settings
  # allow the client to detect and attempt to recover from this issue. For 'none' and 'latest', these settings will
  # only add overhead.
  offset-reset-protection {
    # turns on reset protection
    enable = false
    # if consumer gets a record with an offset that is more than this number of offsets back from the previously
    # requested offset, it is considered a reset
    offset-threshold = 9223372036854775807
    # if the record is more than this duration earlier than the last received record, it is considered a reset
    time-threshold = 100000 days
  }
}

# The dispatcher that will be used by default by consumer and
# producer stages.
akka.kafka.default-dispatcher {
  type = "Dispatcher"
  executor = "default-executor"
}

# Committer flows use these settings to make batch commits
akka.kafka.committer {
    # Set maximum number of messages to commit at once
    max-batch = 100000
    
    # Set maximum interval between commits
    max-interval = 10s
    
    # Set parallelism for async committing
    parallelism = 16
}
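
(For reference, most of the consumer and committer values above can also be set in code rather than HOCON. The sketch below is hedged: the `With*` method names are assumptions based on the Alpakka API that Akka.Streams.Kafka ports, so verify they exist in your version before relying on them.)

// Hedged sketch: programmatic equivalents of a few of the HOCON values above.
// Method names assumed from the Alpakka port; check your Akka.Streams.Kafka version.
var consumerSettings = ConsumerSettings<string, string>
    .Create(system, Deserializers.Utf8, Deserializers.Utf8)
    .WithPollInterval(TimeSpan.FromMilliseconds(500))   // akka.kafka.consumer.poll-interval
    .WithStopTimeout(TimeSpan.FromSeconds(30))          // akka.kafka.consumer.stop-timeout
    .WithCommitTimeout(TimeSpan.FromSeconds(15));       // akka.kafka.consumer.commit-timeout

var committerSettings = CommitterSettings.Create(system)
    .WithMaxBatch(100000)                               // akka.kafka.committer.max-batch
    .WithMaxInterval(TimeSpan.FromSeconds(10))          // akka.kafka.committer.max-interval
    .WithParallelism(16);                               // akka.kafka.committer.parallelism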
2. Prepare
Prepare code
var configuration = ConfigurationFactory.ParseString(File.ReadAllText("application.conf"));

using var system = ActorSystem.Create("system", configuration);
using var materializer = system.Materializer();

var consumerSettings = ConsumerSettings<string, IpmRequestStatistic>.Create(system, Deserializers.Utf8, new KafkaJsonSerializer<IpmRequestStatistic>())
  .WithBootstrapServers("localhost:15000")
  .WithGroupId("aidb-group")
  .WithProperty("auto.offset.reset", "earliest");

var committerDefaults = CommitterSettings.Create(system);

var maxPartitions = 4;
var batchSize = 10000;
var parallel = 3;

var config = new ProducerConfig
{
  BootstrapServers = "localhost:15001",
  ClientId = Dns.GetHostName(),
  Acks = Acks.All
};

var producer = new ProducerBuilder<long, ApplicationInstallationConnect>(config)
   .SetKeySerializer(Serializers.Int64)
   .SetValueSerializer(new KafkaJsonSerializer<ApplicationInstallationConnect>())
   .Build();
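
(The `KafkaJsonSerializer<T>` helper used above is not included in the issue. A minimal sketch of what such a helper might look like, assuming it wraps System.Text.Json and implements both Confluent.Kafka serialization interfaces:)

// Hypothetical sketch of KafkaJsonSerializer<T> (not the author's actual class):
// a System.Text.Json-based serializer/deserializer for Confluent.Kafka.
using System;
using System.Text.Json;
using Confluent.Kafka;

public class KafkaJsonSerializer<T> : ISerializer<T>, IDeserializer<T>
{
    // Serialize the value to UTF-8 JSON bytes for the producer.
    public byte[] Serialize(T data, SerializationContext context)
        => JsonSerializer.SerializeToUtf8Bytes(data);

    // Deserialize UTF-8 JSON bytes back into T for the consumer.
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        => isNull ? default : JsonSerializer.Deserialize<T>(data);
}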
3. I have Kafka running in Docker with a topic that has 4 partitions.
Flow
var flow = KafkaConsumer.CommittablePartitionedSource(consumerSettings, Subscriptions.Topics("ipmrequeststatistics"))
  .SelectAsync(4, async tuple =>
  {
      return tuple.Item2
          .Select(m => new MessageContainer<CommittableMessage<string, IpmRequestStatistic>>
          {
              Message = m,
              State = MessageState.Valid,
              CommittableOffset = m.CommitableOffset
          })
          .Select(m => Validate(m))
          .GroupedWithin(batchSize, TimeSpan.FromSeconds(10))
          .SelectAsync(1, b => ApplicationInstallationLookup(b))
          .Select(b => MergeDataToDb(b))
          .Select(b =>
          {
              foreach (var message in b.Where(m => m.State == MessageState.Valid))
                  producer.Produce(
                      "applicationinstallationconnect",
                      new Message<long, ApplicationInstallationConnect> { Key = message.Message.ApplicationInstallationId, Value = message.Message });

              return b;
          })
          .Expand(group => group.GetEnumerator())
          .Select(m => m.CommittableOffset as ICommittable)
          .RunWith(Committer.Sink(committerDefaults), materializer);
  });

var control = flow
  .ToMaterialized(Sink.Ignore<Task>(), Keep.Both)
  .MapMaterializedValue(DrainingControl<Task>.Create)
  .Run(materializer);
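
(As an aside on shutdown: the materialized `DrainingControl` can be used to stop polling and let in-flight messages drain before the app exits. A minimal sketch, assuming the Alpakka-style `DrainAndShutdown` API is available in this version:)

// Hedged sketch: graceful shutdown of the stream above.
// DrainAndShutdown stops the consumer, waits for in-flight messages to be
// processed and committed, then completes; name assumed from the Alpakka port.
Console.WriteLine("Press any key to stop consumer.");
Console.ReadKey();
await control.DrainAndShutdown();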
4. I see the following error in the log when starting the app
Logs
[DEBUG][28.09.2021 18:14:36][Thread 0001][EventStream(system)] Logger log1-DefaultLogger [DefaultLogger] started
[DEBUG][28.09.2021 18:14:36][Thread 0001][EventStream(system)] StandardOutLogger being removed
[DEBUG][28.09.2021 18:14:36][Thread 0001][EventStream(system)] Default Loggers started
Press any key to stop consumer.
[DEBUG][28.09.2021 18:14:38][Thread 0005][akka://system/system/kafka-consumer-2] Creating Kafka consumer with settings: {"KeyDeserializer":{},"ValueDeserializer":{},"PollInterval":"00:00:00.5000000","PollTimeout":"00:00:00.5000000","PartitionHandlerWarning":"00:00:05","WaitClosePartition":"00:00:00.5000000","CommitTimeWarning":"00:00:01","CommitTimeout":"00:00:15","CommitRefreshInterval":"-00:00:00.0010000","DrainingCheckInterval":"00:00:00.0300000","StopTimeout":"00:00:30","PositionTimeout":"00:00:05","BufferSize":50,"DispatcherId":"akka.kafka.default-dispatcher","AutoCreateTopicsEnabled":true,"Properties":{"bootstrap.servers":"localhost:15000","group.id":"aidb-group","enable.auto.commit":"false","auto.offset.reset":"earliest"},"MetadataRequestTimeout":"00:00:05","ConnectionCheckerSettings":{"Enabled":false,"MaxRetries":3,"CheckInterval":"00:00:15","Factor":2.0},"GroupId":"aidb-group"}
[DEBUG][28.09.2021 18:14:42][Thread 0004][SubSourceLogic`3 (akka://system)] #2 Assigning new partitions: ipmrequeststatistics [[0]], ipmrequeststatistics [[1]], ipmrequeststatistics [[2]], ipmrequeststatistics [[3]]
[DEBUG][28.09.2021 18:14:42][Thread 0004][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[0]]
[DEBUG][28.09.2021 18:14:42][Thread 0004][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[1]]
[DEBUG][28.09.2021 18:14:42][Thread 0004][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[2]]
[DEBUG][28.09.2021 18:14:42][Thread 0018][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[3]]
[DEBUG][28.09.2021 18:14:54][Thread 0028][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[0]]
[DEBUG][28.09.2021 18:15:02][Thread 0026][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[3]]
[DEBUG][28.09.2021 18:15:02][Thread 0027][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[2]]
28.09.2021 21:15:02 ipmrequeststatistics [[1]]: Message count=10000 | elapsed=20626 ms | Rps=484,80988
[WARNING][28.09.2021 18:15:03][Thread 0004][akka://system/system/kafka-consumer-2] RequestMessages from topic/partition ipmrequeststatistics [[2]] already requested by other stage ipmrequeststatistics [[2]]
28.09.2021 21:15:12 ipmrequeststatistics [[1]]: Message count=20000 | elapsed=9881 ms | Rps=1011,98175
28.09.2021 21:15:14 ipmrequeststatistics [[0]]: Message count=10000 | elapsed=20359 ms | Rps=491,1697
28.09.2021 21:15:20 ipmrequeststatistics [[2]]: Message count=10000 | elapsed=17892 ms | Rps=558,8913
28.09.2021 21:15:23 ipmrequeststatistics [[3]]: Message count=10000 | elapsed=20394 ms | Rps=490,33783
5. If I comment out the slowest stage, `Select(b => MergeDataToDb(b))`, the error is not reproducible.
Logs
[DEBUG][29.09.2021 9:57:34][Thread 0001][EventStream(system)] Logger log1-DefaultLogger [DefaultLogger] started
[DEBUG][29.09.2021 9:57:34][Thread 0001][EventStream(system)] StandardOutLogger being removed
[DEBUG][29.09.2021 9:57:34][Thread 0001][EventStream(system)] Default Loggers started
Press any key to stop consumer.
[DEBUG][29.09.2021 9:57:36][Thread 0018][akka://system/system/kafka-consumer-2] Creating Kafka consumer with settings: {"KeyDeserializer":{},"ValueDeserializer":{},"PollInterval":"00:00:00.5000000","PollTimeout":"00:00:00.5000000","PartitionHandlerWarning":"00:00:05","WaitClosePartition":"00:00:00.5000000","CommitTimeWarning":"00:00:01","CommitTimeout":"00:00:15","CommitRefreshInterval":"-00:00:00.0010000","DrainingCheckInterval":"00:00:00.0300000","StopTimeout":"00:00:30","PositionTimeout":"00:00:05","BufferSize":50,"DispatcherId":"akka.kafka.default-dispatcher","AutoCreateTopicsEnabled":true,"Properties":{"auto.offset.reset":"earliest","group.id":"aidb-group","enable.auto.commit":"false","bootstrap.servers":"localhost:15000"},"MetadataRequestTimeout":"00:00:05","ConnectionCheckerSettings":{"Enabled":false,"MaxRetries":3,"CheckInterval":"00:00:15","Factor":2.0},"GroupId":"aidb-group"}
[DEBUG][29.09.2021 9:57:39][Thread 0005][SubSourceLogic`3 (akka://system)] #2 Assigning new partitions: ipmrequeststatistics [[0]], ipmrequeststatistics [[1]], ipmrequeststatistics [[2]], ipmrequeststatistics [[3]]
[DEBUG][29.09.2021 9:57:39][Thread 0014][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[0]]
[DEBUG][29.09.2021 9:57:39][Thread 0019][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[1]]
[DEBUG][29.09.2021 9:57:39][Thread 0018][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[2]]
[DEBUG][29.09.2021 9:57:39][Thread 0016][SubSourceStreamStageLogic (akka://system)] 2 Starting SubSource for partition ipmrequeststatistics [[3]]
29.09.2021 12:58:00 ipmrequeststatistics [[0]]: Message count=10000 | elapsed=20332 ms | Rps=491,83264
29.09.2021 12:58:09 ipmrequeststatistics [[0]]: Message count=20000 | elapsed=9037 ms | Rps=1106,4485
29.09.2021 12:58:10 ipmrequeststatistics [[2]]: Message count=10000 | elapsed=30203 ms | Rps=331,09283
29.09.2021 12:58:10 ipmrequeststatistics [[3]]: Message count=10000 | elapsed=30209 ms | Rps=331,02295
29.09.2021 12:58:10 ipmrequeststatistics [[1]]: Message count=10000 | elapsed=30252 ms | Rps=330,55524

Expected behavior
Successfully consume data from Kafka partitions in parallel, without warnings or data loss.

Actual behavior
The SubSource for a partition is re-created, and data is lost.

Environment
Windows, .NET 5.0

Additional context
Please help configure Akka.Streams for reading data from partitions in parallel, without warnings or data loss.

@Aaronontheweb Aaronontheweb added this to the 1.1.5 milestone Oct 27, 2021
@Aaronontheweb (Member)

Thanks for filing this - I'm taking a look at a similar issue to this now.

@Aaronontheweb (Member)

FWIW, unrelated to this, but came up during my investigation: .WithAttributes(ActorAttributes.CreateSupervisionStrategy(decider)) does not help restart failed Kafka Source stages when a downstream fails, which seems off to me. Going to file a separate bug for it.
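
(For context, the attribute mentioned above attaches a supervision decider to the stage. A minimal sketch of that usage with Akka.Streams' built-in types, reusing the `consumerSettings` from the repro; illustrative only, not a fix for this issue:)

// Hedged sketch: attaching a supervision strategy to the Kafka source stage.
// Directive.Restart asks the materializer to restart a failing stage; per the
// comment above, this does not currently revive a failed Kafka source stage.
Akka.Streams.Supervision.Decider decider = cause => Akka.Streams.Supervision.Directive.Restart;

var source = KafkaConsumer.CommittablePartitionedSource(consumerSettings, Subscriptions.Topics("ipmrequeststatistics"))
    .WithAttributes(ActorAttributes.CreateSupervisionStrategy(decider));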
