This repository has been archived by the owner on May 25, 2023. It is now read-only.

case classes #42

Open
mazorigal opened this issue Jan 22, 2018 · 12 comments

Comments

@mazorigal

Hi,

Is there an example of how to use kafka-streams-scala with case classes?

Thanks,

@debasishg
Contributor

Can you please clarify what you mean by using kafka-streams-scala with case classes? Maybe give an example of what you want to do.

@mazorigal
Author

mazorigal commented Jan 22, 2018

I mean using case classes as the message value between the steps of the stream processing topology.
That is, if I have several transformation steps (for example .map), I would like
to use case classes as the message value datatype between those steps, instead of
constantly converting between case class and string with a SerDe.

@debasishg
Contributor

Currently we don't have higher level APIs towards this end.

@mazorigal
Author

Could you perhaps still share an idea of how this can be done in the meantime?
I see that the https://github.com/aseigneurin/kafka-streams-scala repo, which inspired kafka-streams-scala, has an example with case classes. Is it the same principle?

@debasishg
Contributor

The example with User case class applies to the current library as well. You need to define the Serde for the case class and pass it along. I was talking about higher level abstractions where the entire Serde thingy will not be exposed to the user.
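
To make "define the Serde and pass it along" concrete, here is a minimal, dependency-free sketch. It does not use the Kafka `Serde` interface: `ValueCodec`, `UserCodec`, `throughTopic` and the `age|name` wire format are all hypothetical stand-ins, chosen only to show the codec being handed over explicitly at each step boundary while the steps themselves work on the case class.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical case class used as the message value between topology steps.
final case class User(name: String, age: Int)

// Local stand-in for a serde. This is NOT Kafka's Serde interface, only the same idea.
trait ValueCodec[T] {
  def serialize(value: T): Array[Byte]
  def deserialize(bytes: Array[Byte]): T
}

// Hand-written codec: encodes a User as "age|name" in UTF-8 (assumed format).
// The age comes first so that a '|' inside the name cannot break parsing.
object UserCodec extends ValueCodec[User] {
  def serialize(u: User): Array[Byte] = s"${u.age}|${u.name}".getBytes(UTF_8)

  def deserialize(bytes: Array[Byte]): User = {
    val text = new String(bytes, UTF_8)
    val sep = text.indexOf('|')
    User(name = text.substring(sep + 1), age = text.substring(0, sep).toInt)
  }
}

object ExplicitCodecDemo {
  // Stand-in for a step boundary (a topic or state store): the codec is passed explicitly.
  def throughTopic[T](value: T, codec: ValueCodec[T]): T =
    codec.deserialize(codec.serialize(value))

  // A typed "map" step that works directly on the case class.
  def birthday(u: User): User = u.copy(age = u.age + 1)
}
```

With a real Kafka Streams topology the same pattern would apply to a `Serde[User]` handed to the operations that read or write topics; that wiring is not shown here.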

@mazorigal
Author

Do you plan to support the higher-level abstractions? Is there a roadmap to follow?

@jeroenr

jeroenr commented Feb 20, 2018

@mazorigal To give you some pointers I'm using spray-json serialization for my case classes for "internal" messaging (see https://gist.github.com/jeroenr/2895de32accd440c2558261a49952cab) and I'm using a Serde implementation (see https://gist.github.com/jeroenr/8b0cc0a4ce3b4d521de28267867bc003) based on avro4s (https://github.com/sksamuel/avro4s) to consume from Kafka Connect, which uses the avro format.

Hope this helps :)

@mazorigal
Author

Thanks, it's indeed helpful.
Small question: since the case classes are used for "internal" messaging, why not
serialise the case classes to bytes directly, without going via JSON?
I guess the nice JSON output is only needed in the output topic, which might be consumed by other apps.

@jeroenr

jeroenr commented Feb 20, 2018

Yeah I use JSON for doing joins, for instance. Generally it's easier to filter and transform on. For internal state store I'm using Array[Byte] for the values.
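
For comparison, this is roughly what the "directly to bytes" option from the question could look like. It is only a sketch using the JDK's `DataOutputStream`; `PageView` and `PageViewBinary` are hypothetical names, and the fixed field order is an assumed wire format, not something taken from the gists above.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Hypothetical internal message type.
final case class PageView(user: String, page: String, durationMs: Long)

object PageViewBinary {
  // Write the fields in a fixed order; no field names are stored in the payload.
  def toBytes(v: PageView): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeUTF(v.user)
    out.writeUTF(v.page)
    out.writeLong(v.durationMs)
    out.flush()
    buffer.toByteArray
  }

  // Read the fields back in exactly the order they were written.
  def fromBytes(bytes: Array[Byte]): PageView = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val user = in.readUTF()
    val page = in.readUTF()
    val durationMs = in.readLong()
    PageView(user, page, durationMs)
  }
}
```

The payload is compact, but it is opaque to anything that does not know the field order, which fits the point above that JSON is easier to filter and transform on.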

@debasishg
Contributor

Hi @mazorigal -

In the latest release, 0.2.0, we have an implicit serdes implementation: you can define a case class, define its serde, bring an implicit into scope, and the serde then gets used everywhere without being passed explicitly. Please have a look at https://github.com/lightbend/kafka-streams-scala/blob/develop/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala#L37-L61.

Does this answer your question?
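
The mechanism behind "have an implicit in scope" is ordinary Scala implicit resolution. The sketch below illustrates it without any Kafka dependency: `Codec`, `Click`, `throughTopic` and the `count:userId` format are hypothetical stand-ins, and the library itself works with Kafka's `Serde[T]` rather than this trait.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical case class flowing through the topology.
final case class Click(userId: String, count: Long)

// Local stand-in type class. The library itself uses Kafka's Serde[T] instead.
trait Codec[T] {
  def serialize(value: T): Array[Byte]
  def deserialize(bytes: Array[Byte]): T
}

object Codec {
  // An instance in the companion object is part of the implicit scope of Codec[Click].
  implicit val clickCodec: Codec[Click] = new Codec[Click] {
    def serialize(c: Click): Array[Byte] = s"${c.count}:${c.userId}".getBytes(UTF_8)

    def deserialize(bytes: Array[Byte]): Click = {
      val text = new String(bytes, UTF_8)
      val sep = text.indexOf(':')
      Click(text.substring(sep + 1), text.substring(0, sep).toLong)
    }
  }
}

object ImplicitCodecDemo {
  // No codec argument at the call site: the compiler looks it up by type.
  def throughTopic[T](value: T)(implicit codec: Codec[T]): T =
    codec.deserialize(codec.serialize(value))
}
```

Calling `ImplicitCodecDemo.throughTopic(Click("u1", 3L))` compiles only if a `Codec[Click]` can be found, so a missing serde shows up at compile time rather than at runtime.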

@mazorigal
Author

mazorigal commented Mar 15, 2018

Thanks!
A few questions:

If I would like to have my own serde for case classes, is all I need to extend the custom serde class with the StatelessScalaSerde trait and implement:

 def serialize(data: T): Array[Byte]
 def deserialize(data: Array[Byte]): Option[T]

?

Is StatelessScalaSerde the trait that should always be used for a custom serde? I see that there are multiple traits here:
https://github.com/lightbend/kafka-streams-scala/blob/74aabcd6369beef6915fbb77e92bdbf419da0403/src/main/scala/com/lightbend/kafka/scala/streams/ScalaSerde.scala

Just to make sure: is the signature of the custom serde class in the example specific to the Avro serde implementation?

[T >: Null : SchemaFor : FromRecord : ToRecord]

Thanks,

@joan38
Contributor

joan38 commented May 3, 2018

@mazorigal
Serde: the Java Kafka Streams interface
ScalaSerde: a Serde with no configuration and nothing to close (it's actually stateless)
StatelessScalaSerde: allows you to implement a custom Serde without implementing Serializer and Deserializer.

All of this is confusing to me and I've opened a PR to add better helpers:
#70
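
As a rough illustration of the two-method shape quoted earlier in the thread (`serialize` returning bytes, `deserialize` returning an `Option`), here is a self-contained sketch. `StatelessCodec` is a local stand-in that only mirrors those two signatures; it does not extend Kafka's `Serde` and is not the library's trait. `Temperature` and the `celsius;sensor` format are likewise hypothetical.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try

// Hypothetical case class to be serialized.
final case class Temperature(sensor: String, celsius: Double)

// Local stand-in with the two methods quoted in the thread.
// It does NOT extend Kafka's Serde; it only mirrors the signatures.
trait StatelessCodec[T] {
  def serialize(data: T): Array[Byte]
  def deserialize(data: Array[Byte]): Option[T]
}

object TemperatureCodec extends StatelessCodec[Temperature] {
  // Assumed wire format: "celsius;sensor" in UTF-8.
  def serialize(data: Temperature): Array[Byte] =
    s"${data.celsius};${data.sensor}".getBytes(UTF_8)

  // Malformed input yields None instead of throwing.
  def deserialize(data: Array[Byte]): Option[Temperature] = {
    val text = new String(data, UTF_8)
    val sep = text.indexOf(';')
    if (sep < 0) None
    else {
      val parsed = Try(text.substring(0, sep).toDouble).toOption
      parsed.map(celsius => Temperature(text.substring(sep + 1), celsius))
    }
  }
}
```

Only the two methods need to be written by hand; how the real trait turns them into a Serializer and Deserializer is left to the library source linked above.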
