Make message optional in topic producer event stream #2708

Open
wants to merge 15 commits into base: master

Conversation

bossqone

Pull Request Checklist

  • Have you read through the contributor guidelines?
  • Have you signed the Lightbend CLA?
  • Have you added copyright headers to new files?
  • Have you updated the documentation?
  • Have you added tests for any changed functionality?

Purpose

This PR introduces support for an optional message in the TopicProducer API. This way we can advance the offset even when no message is produced.

Background Context

The current design requires producing a (Message, Offset) pair for a message to be published. So if we filter out messages, the offset is not committed until another message passes through, which results in unnecessary and unwanted replays when the app restarts (as mentioned in https://www.lagomframework.com/documentation/current/scala/MessageBrokerApi.html#Filtering-events).
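
To make the replay problem concrete, here is a minimal sketch in plain Scala. It is not Lagom code: `Event`, the `Long` offsets, and both helper functions are hypothetical stand-ins that only model which offset would end up stored under each contract.

```scala
object FilteredOffsetSketch {
  // Hypothetical stand-in for an event read from the journal; a Long replaces Lagom's Offset.
  final case class Event(payload: String, offset: Long)

  // Current contract: the stream yields (message, offset) pairs, so an event that is
  // filtered out never reaches the producer and its offset is never stored.
  def lastStoredWithPairs(events: List[Event], keep: Event => Boolean): Option[Long] =
    events.filter(keep).map(e => (e.payload, e.offset)).lastOption.map(_._2)

  // Proposed contract: every event yields (optional message, offset), so the offset
  // still advances when the message is absent.
  def lastStoredWithOptionalMessage(events: List[Event], keep: Event => Boolean): Option[Long] =
    events
      .map(e => (if (keep(e)) Some(e.payload) else None, e.offset))
      .lastOption
      .map(_._2)
}
```

With three events where only the first one is kept, the pair-based version stays at the first offset (so the two filtered events are read again after a restart), while the optional-message version reaches the last offset.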

Member

@octonato octonato left a comment

Hi @bossqone, thanks for your efforts in improving the API.

As I said on Gitter, this is a breaking API change. Instead, we need to provide an alternative API.

This was discussed a long time ago in issue #844.

Moreover, @ignasi35 is doing some other work on topic producer that may impact it as well (see #2700).

It will be nice to coordinate on that to avoid issues.

Contributor

@ignasi35 ignasi35 left a comment

As much as I like the idea of fixing this issue in the TopicProducer API I'm still divided on what's the best implementation. On one hand we have the PartialFunction @renatocaval suggested in #844, then we have this which relies on Tuple2[Option[T], Offset] and I'm thinking of a third option:

  • Instead of Tuple2[Option[T], Offset] or japi.Pair<Optional<T>, Offset>, introduce an ADT supporting two operations, EmitAndCommit<T>(t, offset) and JustCommit(offset), and then provide new factory methods in TopicProducer for that ADT.

This approach is more powerful than the two above in that the user can decide 3 different things:

  1. I want to emit t (and we force that emitting is paired with committing)
  2. I don't want to emit anything but I want to commit this offset
  3. I don't want to emit and I'm fine not committing anything and I may commit some offset later.

Note how option 3. becomes possible (as opposed to the Option and the PartialFunction) and allows for fewer commits into the database.
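
A rough sketch of what such an ADT could look like, in plain Scala. Everything here is an assumption for illustration: `Offset` is a `Long` stand-in, `toCommand` and `run` are hypothetical helpers, and case classes are used only for brevity. The third choice (neither emit nor commit) is modelled by producing no element at all.

```scala
object CommandAdtSketch {
  type Offset = Long // hypothetical stand-in for Lagom's Offset

  sealed trait Command[+T]
  final case class EmitAndCommit[T](message: T, offset: Offset) extends Command[T]
  final case class JustCommit(offset: Offset) extends Command[Nothing]

  // User-side decision with the three outcomes listed above:
  // 1. emit (always paired with a commit), 2. commit only, 3. nothing for now.
  def toCommand(event: String, offset: Offset): Option[Command[String]] =
    if (event.startsWith("public:")) Some(EmitAndCommit(event.stripPrefix("public:"), offset))
    else if (offset % 10 == 0) Some(JustCommit(offset)) // checkpoint every 10th offset
    else None

  // Toy interpreter: collects the emitted messages and the last committed offset.
  def run[T](commands: Seq[Command[T]]): (Vector[T], Option[Offset]) =
    commands.foldLeft((Vector.empty[T], Option.empty[Offset])) {
      case ((emitted, _), EmitAndCommit(message, offset)) => (emitted :+ message, Some(offset))
      case ((emitted, _), JustCommit(offset))             => (emitted, Some(offset))
    }
}
```

Committing only every tenth offset in the second branch is just one way to show how the third choice can reduce the number of offset writes.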

@@ -89,7 +90,7 @@
   public static <Message, Event extends AggregateEvent<Event>>
       Topic<Message> taggedStreamWithOffset(
           AggregateEventShards<Event> shards,
-          BiFunction<AggregateEventTag<Event>, Offset, Source<Pair<Message, Offset>, ?>>
+          BiFunction<AggregateEventTag<Event>, Offset, Source<Pair<Optional<Message>, Offset>, ?>>
               eventStream) {
     return new TaggedOffsetTopicProducer<>(shards.allTags(), eventStream);
   }
Contributor

The changes in TopicProducer are the main problem in this PR as these three edits break the API. Instead, we could consider adding methods as alternatives to the existing ones. I would also mark the new methods as @ApiMayChange and keep the original methods as non-deprecated.

Finally, I'd add a comment on the new methods pointing to this PR or #844 so that users who find the API can locate the discussion and comment on it.

@bossqone
Author

bossqone commented Apr 6, 2020

Hi, sorry for the late response. Based on your comments, this looks like more work than I naively expected. I'm fairly new to the Lagom/Akka (or even Scala) world, and some things (e.g. binary compatibility) are unknown to me.

Anyway, if this feature is something you want to deliver soon, please take over and work on it.
If not, then I can try it out and learn something new. By the way, I like the idea of a solution based on ADTs instead of a partial function or Option.

@ignasi35
Contributor

Hi @bossqone, I don't think anyone on the team is working on this, so feel free to move on at your own pace. :-)

@ignasi35
Contributor

Hi @bossqone, a solution based on an ADT will need a public set of classes implementing that ADT in service/core/kafka/server. It has to live there because the TopicProducerActor needs access to the ADT so it can decide whether the item contains a message to be published or is a noop-with-offset-storage.

I think we will need to step back from a case class-based ADT and implement the ADT manually, keeping in mind it must be idiomatic for Java users too. That's different from other Lagom modules, where each implementation (Java and Scala) is 100% dedicated and idiomatic.

Then, once we have the ADT, we'll need new factory methods on TopicProducer (Java and Scala), and then there's ScaladslRegisterTopicProducers and JavadslRegisterTopicProducers ... (you've been there already :-) ).

@ignasi35 ignasi35 mentioned this pull request May 12, 2020
5 tasks
@ignasi35
Contributor

Hi @bossqone , as we discussed elsewhere (gitter DMs) you are right in pointing out that service/core/kafka/server is not the correct location.

I keep forgetting about the particularities of how broker/kafka projects are depending on each other in the lagom build.

I'm afraid this takes us to the following situation:

  1. have a Java ADT in service/javadsl/broker which is Java idiomatic and user-facing (used via methods in TopicProducer)
  2. have a Scala ADT in service/scaladsl/broker which is Scala idiomatic and user-facing
  3. in ScaladslRegisterTopicProducers and JavadslRegisterTopicProducers, map each ADT to an internal representation that's used in TopicProducerActor (see an example of such an adapter in this code).
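
A minimal sketch of step 3, assuming a user-facing ADT built from plain classes and a separate internal representation. The names `userFacing`, `internal`, `Emit`, `OffsetOnly`, and `toInternal` are hypothetical, and `Offset` is a `Long` stand-in; the real adapter would live in the register classes mentioned above.

```scala
object AdapterSketch {
  type Offset = Long // hypothetical stand-in for Lagom's Offset

  // User-facing commands: plain (non-case) classes, so the public API can evolve.
  object userFacing {
    sealed trait TopicProducerCommand[Message]
    final class EmitAndCommit[Message](val message: Message, val offset: Offset)
        extends TopicProducerCommand[Message]
    final class Commit[Message](val offset: Offset) extends TopicProducerCommand[Message]
  }

  // Internal representation: never exposed to users, so case classes are convenient here.
  object internal {
    sealed trait InternalCommand[Message]
    final case class Emit[Message](message: Message, offset: Offset) extends InternalCommand[Message]
    final case class OffsetOnly[Message](offset: Offset) extends InternalCommand[Message]
  }

  // The adapter: one such function per DSL keeps the producer actor DSL-agnostic.
  def toInternal[Message](
      command: userFacing.TopicProducerCommand[Message]
  ): internal.InternalCommand[Message] =
    command match {
      case e: userFacing.EmitAndCommit[Message] => internal.Emit(e.message, e.offset)
      case c: userFacing.Commit[Message]        => internal.OffsetOnly(c.offset)
    }
}
```

A Java-facing ADT would get its own adapter targeting the same internal types.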

@bossqone
Author

Hi @ignasi35, can you please help me understand what ProjectionSpi is and how it should behave for the new EmitAndCommit, Commit, and Skip flows? Especially for Skip, when there is no offset to send into the ProjectionSpi methods.

@ignasi35
Contributor

ignasi35 commented May 14, 2020

Especially for Skip when there is no offset to send into ProjectionSpi methods.

Hmmm, that was unexpected.

What is the purpose of Skip? I mean, I understand the following:

  • EmitAndCommit: current behavior
  • Commit: commit the offset but don't emit anything
  • Skip: sounds like a case we don't need. I mean, users that want to filter out a message (and skip committing the offset) may just use a filter transformer in the Flow. But still, I can't see any real use for that since skipping messages can make things faster but is prone to cause repeated processing if the stream is restarted.

I'd rather not support Skip, TBH. I'm curious to hear your ideas on why we may need it.


ProjectionSpi doesn't make sense on the Skip case. The message is skipped, there's no need to report anything because that anything didn't happen ;-)

@ignasi35
Contributor

(pressed Comment too fast)

About the main question on how ProjectionSpi should behave on each case:

  • ProjectionSpi#startProcessing is invoked before the message is offered to the user's Flow, so it is always invoked by the framework in AbstractPersistentEntityRegistry.
  • ProjectionSpi#afterUserFlow is invoked after running the user's Flow and before emitting the event and storing the offset.
  • ProjectionSpi#completedProcessing is invoked when the message is emitted and the offset update DB transaction has completed.
  • ProjectionSpi#failed is invoked when the stream fails.

So, EmitAndCommit and Commit will have the exact same usage of all methods in ProjectionSpi. And, I don't think Skip should even exist :-) #ISoundLikeABrokenRecord
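
The ordering can be pictured with a toy trace in plain Scala. This is only an illustration of the sequence described above, not the real ProjectionSpi: the hook names are reused as strings, and `Command`, `process`, `userFlow`, and `storeOffset` are hypothetical stand-ins.

```scala
object HookOrderSketch {
  type Offset = Long // hypothetical stand-in for Lagom's Offset

  sealed trait Command { def offset: Offset }
  final case class EmitAndCommit(message: String, offset: Offset) extends Command
  final case class Commit(offset: Offset) extends Command

  // Returns the ordered list of steps for the single command produced by the user's flow.
  def process(command: Command): List[String] = {
    val emitStep = command match {
      case EmitAndCommit(message, _) => List(s"emit($message)")
      case Commit(_)                 => Nil // nothing is published for Commit
    }
    List("startProcessing", "userFlow", s"afterUserFlow(${command.offset})") ++
      emitStep ++
      List(s"storeOffset(${command.offset})", s"completedProcessing(${command.offset})")
  }
}
```

In this model the two commands produce the same hook calls and differ only in the emit step.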

@bossqone
Author

In your first comment you mentioned:

3. I don't want to emit and I'm fine not committing anything and I may commit some offset later.

Note how option 3. becomes possible (as opposed to the Option and the PartialFunction) and allows for fewer commits into the database.

But you're right, this can be achieved with a filter operator on the developer side. I'll align my implementation accordingly and get rid of the Skip flow.

@ecassady

Hi everyone,

I came across this PR while looking into a solution for a use-case we have. We need to be able to publish zero or more domain events to Kafka while committing a single entity offset.

I solved this problem for the time being by making a version of TopicProducer (along with the required related code) that operates on Seq[Message], similar to the original proposed solution that used Option. This worked well for me because I could turn the Seq into a specific type of ProducerMessage in TopicProducerActor to get multi-message publishing as supported by the Kafka client itself. Just to illustrate, here's the main change I made:

  private def kafkaFlowPublisher(endpoints: String): Flow[Seq[Message], _, _] = {

...

    def record(message: Message): ProducerRecord[String, Message] =
      new ProducerRecord[String, Message](topicId, keyOf(message), message)

    Flow[Seq[Message]]
      .map {
        case Nil =>
          ProducerMessage.passThrough[String, Message, NotUsed](NotUsed)
        case message :: Nil =>
          ProducerMessage.Message(record(message), NotUsed)
        case messages =>
          ProducerMessage.multi(messages.toList.map(record), NotUsed)
      }
      .via {
        ReactiveProducer.flexiFlow(producerSettings(endpoints))
      }
  }

Since we didn't need it, I didn't concern myself with Java support, and my solution doesn't support the "skip" case that was discussed above.
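
For readers who want to try the dispatch logic without Kafka on the classpath, here is a plain-Scala sketch of the same three-way split. `Envelope`, `PassThrough`, `Single`, and `Multi` are hypothetical stand-ins for the producer message types used above. It matches with `Seq()` and `Seq(single)` rather than `Nil` and `::`, because `message :: Nil` only matches a `List`; with the snippet above, a one-element `Vector` would fall through to the multi-message case.

```scala
object MultiMessageSketch {
  // Hypothetical stand-ins for the three kinds of producer envelopes.
  sealed trait Envelope[+M]
  case object PassThrough extends Envelope[Nothing]
  final case class Single[M](record: M) extends Envelope[M]
  final case class Multi[M](records: List[M]) extends Envelope[M]

  // Zero messages: only the pass-through (offset) travels on.
  // One message: a single record. Several: one multi-record envelope.
  def envelope[M](messages: Seq[M]): Envelope[M] = messages match {
    case Seq()       => PassThrough
    case Seq(single) => Single(single)
    case many        => Multi(many.toList)
  }
}
```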

My use case could be supported with the ADT approach as well, of course. I don't know if this PR is the right place for this discussion but I would love to get your thoughts on the requirement and possibly see it incorporated into this change if possible.

Thanks!
Eric

@lightbend-cla-validator

At least one pull request committer is not linked to a user. See https://help.github.com/en/articles/why-are-my-commits-linked-to-the-wrong-user#commits-are-not-linked-to-any-user

@bossqone bossqone force-pushed the topic-producer-optional-message branch from 7e8dc2c to e78966d Compare July 17, 2020 11:53
@ignasi35
Contributor

Hi @bossqone, I see you're back working on this. It's great to see you back. Timing is not great as some of us may be going on vacation in a week or two so I apologise in advance if we're not very responsive.

Contributor

@ignasi35 ignasi35 left a comment

Hi @bossqone, this looks really good!

I was a bit surprised to see support for multi-message sneak into this PR. I'd rather keep things a bit simpler. I'm not familiar with the MultiMessage implementation/semantics in Alpakka-Kafka, so I'm not sure how the delivery guarantees would vary. The other concern around multi-message is that I got the impression the user is left responsible for choosing which Offset they pass into the EmitMultipleAndCommit command. This sounds more flexible, but it will require more careful documentation.

Talking about documentation, I noticed your TODOs for API docs, but did you plan on adding other documentation in this PR already? It'd be good to have a small guide for users to migrate and reference docs on the various commands (and the details around EmitMultipleAndCommit I mentioned above).

Keep it up! :-)

return TaggedOffsetTopicProducer.fromEventAndOffsetPairStream(shards.allTags(), eventStream);
}

// TODO(bossqone): add docs
Contributor

Maybe in these docs (or maybe somewhere else) there should be some guidance for users to migrate.

Comment on lines +23 to +39
object TaggedOffsetTopicProducer {

  def fromTopicProducerCommandStream[Message, Event <: AggregateEvent[Event]](
      tags: PSequence[AggregateEventTag[Event]],
      readSideStream: BiFunction[AggregateEventTag[Event], Offset, JSource[TopicProducerCommand[Message], _]]
  ): TaggedOffsetTopicProducer[Message, Event] = new TaggedOffsetTopicProducer[Message, Event](tags, readSideStream)

  def fromEventAndOffsetPairStream[Message, Event <: AggregateEvent[Event]](
      tags: PSequence[AggregateEventTag[Event]],
      readSideStream: BiFunction[AggregateEventTag[Event], Offset, JSource[Pair[Message, Offset], _]]
  ): TaggedOffsetTopicProducer[Message, Event] =
    new TaggedOffsetTopicProducer[Message, Event](
      tags,
      (tag, offset) =>
        readSideStream.apply(tag, offset).map(pair => new TopicProducerCommand.EmitAndCommit(pair.first, pair.second))
    )

Contributor

These extra allocations are a good motivator for users to migrate from the old impl to the new one. Worth mentioning in the migration guide.

_.map(offset =>
ProjectionSpi
.completedProcessing(workerCoordinates.projectionName, workerCoordinates.tagName, offset)
)
Contributor

Good catch! The instance returned by completedProcessing is not guaranteed to be the same as the one passed in. Your code is correct.

Comment on lines 13 to 18
object TopicProducerCommand {
  case class EmitMultipleAndCommit[Message](messages: immutable.Seq[Message], offset: Offset)
      extends TopicProducerCommand[Message]
  case class EmitAndCommit[Message](message: Message, offset: Offset) extends TopicProducerCommand[Message]
  case class Commit[Message](offset: Offset) extends TopicProducerCommand[Message]
}
Contributor

Using case classes will make future evolution of the API difficult.

This should be marked as @ApiMayChange or refactored as a non-case class.

Contributor

Especially considering the upcoming need for metadata (#2835 ).

Author

@bossqone bossqone Aug 29, 2020

Changed to a normal class. Should InternalTopicProducerCommand be changed as well, or can it stay as a case class?

@bossqone
Author

Hi @ignasi35,
you're right, EmitMultipleAndCommit was really sneaky, so I've reverted it. It can be done in a separate PR if still needed.

Comment on lines 13 to 30
  final class EmitAndCommit[Message](val message: Message, val offset: Offset) extends TopicProducerCommand[Message] {
    override def equals(that: Any): Boolean = that match {
      case command: EmitAndCommit[Message] => message.equals(command.message) && offset.equals(command.offset)
      case _                               => false
    }

    override def hashCode(): Int = Objects.hash(message.asInstanceOf[AnyRef], offset)
  }

  final class Commit[Message](val offset: Offset) extends TopicProducerCommand[Message] {
    override def equals(that: Any): Boolean = that match {
      case command: Commit[Message] => offset.equals(command.offset)
      case _                        => false
    }

    override def hashCode(): Int = Objects.hash(offset)
  }
}
Contributor

Looks great.

As this is public API there should be API docs.

In future improvements (this doesn't have to be in this PR), we could add a toString and also some factory methods in object TopicProducerCommand so that building instances of EmitAndCommit is shorter than new TopicProducerCommand.EmitAndCommit.
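
One possible shape for that follow-up, as a self-contained sketch. The factory names `emitAndCommit` and `commit` are hypothetical, `Offset` is a `Long` stand-in, and the classes are re-declared here only so the example compiles on its own.

```scala
object FactorySketch {
  type Offset = Long // hypothetical stand-in for Lagom's Offset

  sealed trait TopicProducerCommand[Message]

  object TopicProducerCommand {
    final class EmitAndCommit[Message](val message: Message, val offset: Offset)
        extends TopicProducerCommand[Message] {
      override def equals(that: Any): Boolean = that match {
        case other: EmitAndCommit[_] => message == other.message && offset == other.offset
        case _                       => false
      }
      override def hashCode(): Int  = (message, offset).hashCode()
      override def toString: String = s"EmitAndCommit($message, $offset)"
    }

    final class Commit[Message](val offset: Offset) extends TopicProducerCommand[Message] {
      override def equals(that: Any): Boolean = that match {
        case other: Commit[_] => offset == other.offset
        case _                => false
      }
      override def hashCode(): Int  = offset.hashCode()
      override def toString: String = s"Commit($offset)"
    }

    // Factory methods: shorter call sites, and the concrete classes stay an implementation detail.
    def emitAndCommit[Message](message: Message, offset: Offset): TopicProducerCommand[Message] =
      new EmitAndCommit(message, offset)

    def commit[Message](offset: Offset): TopicProducerCommand[Message] =
      new Commit[Message](offset)
  }
}
```

A call site would then read `TopicProducerCommand.emitAndCommit(message, offset)` instead of `new TopicProducerCommand.EmitAndCommit(message, offset)`.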

@ignasi35
Contributor

This is almost ready. I think only documentation is left.

@solarmosaic-kflorence

@bossqone @ignasi35 will this be merged soon? It's blocking #2835 which is a feature we'd really like to have.


6 participants