
Potentially Low Throughput in Sink Implementation #267

Open
n3ziniuka5 opened this issue Jan 26, 2021 · 1 comment

@n3ziniuka5

I am currently investigating what could be causing low throughput to Pulsar in our application. I looked at the PulsarSinkGraphStage.scala implementation, and the following lines caught my eye:

override def preStart(): Unit = {
  producer = createFn()
  produceCallback = getAsyncCallback {
    case Success(_) =>
      pull(in) // the next element is requested only after the previous send has completed
    case Failure(e) =>
      logger.error("Failing pulsar sink stage", e)
      failStage(e)
  }
  pull(in)
}

override def onPush(): Unit = {
  try {
    val t = grab(in)
    logger.debug(s"Sending message $t")
    producer.sendAsync(t).onComplete(produceCallback.invoke) // the next pull happens in the callback above
  } catch {
    case e: Throwable =>
      logger.error("Failing pulsar sink stage", e)
      failStage(e)
  }
}

I haven't implemented any akka-streams Sinks myself, so my assumptions could be wrong here. But if the stage calls producer.sendAsync(t).onComplete(produceCallback.invoke), doesn't that mean a new message is pulled only after a successful response from producer.sendAsync? In other words, messages are effectively sent to Pulsar one at a time, and the producer's batching settings have no effect?
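
In stream terms, that loop amounts to sending with parallelism 1. A rough sketch of the equivalence (producer and messages are placeholders here, and the Future-returning sendAsync with its implicit AsyncHandler is assumed, as in the stage code above):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

implicit val system: ActorSystem = ActorSystem()
import system.dispatcher // ExecutionContext for the Future-based sendAsync

// At most one sendAsync is ever in flight, so the producer's
// internal batch never gets a chance to accumulate messages.
Source(messages)
  .mapAsync(parallelism = 1)(msg => producer.sendAsync(msg))
  .runWith(Sink.ignore)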

@n3ziniuka5
Author

Here is a manually constructed sink that improved throughput:

Sink
  .foreachAsync[ProducerMessage[T]](300)(msg => producer.sendAsync[Task](msg).void.runToFuture) // up to 300 sends in flight at once
  .mapMaterializedValue { completionFuture =>
    // flush and close the producer once the stream terminates, successfully or not
    val flushAndClose = for {
      _ <- producer.flushAsync
      _ <- producer.closeAsync
    } yield ()

    Task.fromFuture(completionFuture).guarantee(flushAndClose).runToFuture
  }

This is very simple and it works, but I'm not sure about the potential issues of this approach. I think a custom sink could be built around a mapAsync stage with configurable parallelism, so that PulsarSinkGraphStage would only be responsible for correctly closing the producer; a sketch of that idea follows below.
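
A minimal sketch of what that could look like, assuming pulsar4s's Producer[T] and ProducerMessage[T] and the same Monix setup as the snippet above; pulsarSink and parallelism are illustrative names, not an existing API:

import akka.Done
import akka.stream.scaladsl.{Flow, Keep, Sink}
import com.sksamuel.pulsar4s.{Producer, ProducerMessage}
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Future

// Assumes a Monix AsyncHandler for pulsar4s is in implicit scope,
// as in the foreachAsync snippet above.
def pulsarSink[T](producer: Producer[T], parallelism: Int): Sink[ProducerMessage[T], Future[Done]] =
  Flow[ProducerMessage[T]]
    .mapAsync(parallelism)(msg => producer.sendAsync[Task](msg).void.runToFuture)
    .toMat(Sink.ignore)(Keep.right)
    .mapMaterializedValue { done =>
      // flush and close the producer when the stream terminates, successfully or not
      val flushAndClose = for {
        _ <- producer.flushAsync
        _ <- producer.closeAsync
      } yield ()

      Task.fromFuture(done).guarantee(flushAndClose).runToFuture
    }

Keeping the flush-and-close in mapMaterializedValue preserves the current stage's cleanup semantics, while mapAsync provides the in-flight window that lets producer batching take effect.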
