
alpakka throughput is not as claimed #1331

Open
waytoharish opened this issue Mar 1, 2021 · 6 comments

Comments

waytoharish commented Mar 1, 2021

Hi Team,
I am trying to use alpakka-kafka, and my consumer throughput is nowhere near what is claimed in this article:

https://www.lightbend.com/blog/alpakka-kafka-flow-control-optimizations

My consumer code is as follows:

ConsumerSettings<Integer, String> kafkaConsumerSettings =
    ConsumerSettings.create(toClassic(actorSystem),
            new IntegerDeserializer(), new StringDeserializer())
        .withBootstrapServers("localhost:9092")
        .withGroupId("docs-group")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        .withPollInterval(Duration.ofMillis(100))
        .withStopTimeout(Duration.ofSeconds(5));

Consumer.DrainingControl<Done> control =
    Consumer.sourceWithOffsetContext(kafkaConsumerSettings, Subscriptions.topics("jobs-topic1"))
        .map(
            consumerRecord -> {
                System.out.println("startTime>>>>>>>>>>>>>" + start);
                Movie movie = JsonMappers.movieReader.readValue(consumerRecord.value());
                System.out.println(">>>>>>>>>>>>>" + movie.toString());
                System.out.println(counter++);
                long end = System.currentTimeMillis();
                long seconds = TimeUnit.MILLISECONDS.toSeconds(end - start);
                System.out.println("End time>>>>>" + seconds);
                //insertDataToMongoDB(actorSystem, movie, start);
                //retriveDataFromMongo(actorSystem);
                return WriteMessage.createUpsertMessage(String.valueOf(movie.id), movie);
            })

Could someone please tell me if I am doing something wrong here? The libraries used are:

implementation group: 'com.lightbend.akka', name: 'akka-stream-alpakka-elasticsearch_2.13', version: '2.0.2'
implementation group: 'com.typesafe.akka', name: 'akka-stream-kafka_2.13', version: '2.0.7'
seglo (Member) commented Mar 1, 2021

Reproducing benchmark results is always a challenge. The numbers from the post were not meant as a baseline performance metric; the point was to visualize the before and after metrics relative to each other, which resulted from the improvement in the Kafka consumer.

The cluster was local to the consumer (running as Docker containers), and the following criteria were used, as listed in the post:

Number of records: 1,000,000
Test Topic Partitions: 100
Test Topic Replication Factor: 3
Message Size: 5KB
Number of Brokers: 3
All other Alpakka Kafka and Consumer configuration used defaults

There were no external network calls or sources of back-pressure (i.e. an elasticsearch flow or sink, if that's present in your stream).
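For reference, the defaults mentioned above live under `akka.kafka.consumer` in `application.conf`. A sketch of the keys that most often affect raw consume throughput (key names are from the Alpakka Kafka 2.0 reference configuration; the values shown are intended to be the stock defaults, not tuning advice):

```hocon
akka.kafka.consumer {
  # Interval between scheduled polls of the underlying KafkaConsumer.
  poll-interval = 50ms

  # Properties passed straight through to the Kafka client.
  kafka-clients {
    fetch.min.bytes = 1      # Kafka client default: return data as soon as any is available
    max.poll.records = 500   # Kafka client default: max records returned per poll()
  }
}
```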

waytoharish (Author) commented Mar 2, 2021

I am also running this test on a local machine with 8 cores and 16GB RAM. Could you please suggest whether anything needs to be changed on the code side?
Consuming 93,473 records takes 8 seconds with 1 broker, 1 topic, 1 partition.
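For scale, the figures above work out to a bit under 12,000 records per second; a quick sanity check on the arithmetic (numbers taken from the comment, nothing else assumed):

```java
public class Throughput {
    // Rough records-per-second from an observed count and elapsed time.
    static double ratePerSecond(long records, double seconds) {
        return records / seconds;
    }

    public static void main(String[] args) {
        // 93,473 records consumed in 8 seconds, as reported above.
        System.out.printf("%.0f records/s%n", ratePerSecond(93_473, 8.0)); // prints 11684 records/s
    }
}
```

Note that the blog's setup used 100 partitions and 3 brokers, so its numbers reflect far more consumer parallelism than a single-partition, single-broker topic can offer.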

seglo (Member) commented Mar 3, 2021

You haven't provided enough information for me to give you any advice.

How large is your local cluster, how many partitions are you using, what is the topic replication factor, what is the message size, what are your consumer properties, etc.?

waytoharish (Author) commented:

I have a local cluster with 1 partition and 1 replica, and the message I am using is very small:
{
  "title": "Architect",
  "id": 1235
}

My consumer properties are as follows:

ConsumerSettings<Integer, String> kafkaConsumerSettings =
    ConsumerSettings.create(toClassic(actorSystem),
            new IntegerDeserializer(), new StringDeserializer())
        .withBootstrapServers("localhost:9092")
        .withGroupId("docs-group")
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        .withPollInterval(Duration.ofMillis(100))
        .withStopTimeout(Duration.ofSeconds(5));

Please let me know if you need any additional information.
I need help fixing this issue: how can I achieve the throughput mentioned in the document?
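One code-side factor worth ruling out before tuning Kafka: the per-record `System.out.println` calls in the `map` stage are synchronized and can dominate a consume loop at these message rates. A minimal sketch of counting off the hot path instead (the `ThroughputCounter` class is hypothetical, not part of Alpakka Kafka):

```java
import java.util.concurrent.atomic.AtomicLong;

// Counts processed records cheaply; the rate is read periodically
// elsewhere instead of printing on every message.
class ThroughputCounter {
    private final AtomicLong count = new AtomicLong();
    private final long startMillis;

    ThroughputCounter(long startMillis) {
        this.startMillis = startMillis;
    }

    // Called once per consumed record, e.g. from the map stage.
    void record() {
        count.incrementAndGet();
    }

    // Records per second since startMillis; guards against zero elapsed time.
    double recordsPerSecond(long nowMillis) {
        long elapsedMillis = Math.max(1, nowMillis - startMillis);
        return count.get() * 1000.0 / elapsedMillis;
    }
}
```

Inside `map`, this would mean calling `counter.record()` and dropping the printlns, then reporting the rate from a separate scheduled task (e.g. via `actorSystem.scheduler()`), so the stream itself does no console I/O.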

seglo (Member) commented Mar 15, 2021

I used this benchmark test to produce the results for the blog post. It outputs a CSV with rows of values representing metrics from the brokers and the Kafka consumer client at various time intervals during the test. My machine at the time was a Lenovo X1 Carbon 5th gen with 16GB RAM, the best CPU available at the time (2 cores, 4 with hyper-threading), and an SSD.

waytoharish (Author) commented:

I am using a Mac with 16GB RAM, 8 cores, and an SSD.
