This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

[BUG] Question about Flink Event-time support. #615

Open
moweonlee opened this issue Sep 11, 2022 · 0 comments

moweonlee commented Sep 11, 2022

Describe the bug

I think I have found a problem when running a Flink job with the Pulsar connector, specifically in Flink's TimeCharacteristic.EventTime mode.

I think so because the same code works well with event-time windows when I use a Kafka source.

With the Pulsar connector, however, Flink doesn't seem to recognize that its TimeCharacteristic is EventTime.

The Flink dashboard also indicates that the job is not running in event-time window mode, as shown below.

[Image: Flink dashboard screenshot]

I have verified that the timestamps I extract from the source data look normal, as shown here:

[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911844581]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911844581]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911845321]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911845321]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911845307]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911845307]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911844954]
[10.172.27.108]:[2022-09-11T15:57:26.540Z]:timstamp : [1662911844954]
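
As a sanity check on these values, the window arithmetic can be worked through by hand. The following is a minimal plain-Java sketch with no Flink dependency; the class and method names are hypothetical. It assumes the 10-second tumbling window and 200 ms out-of-orderness bound used in the job code in this report, a window offset of zero, and the behavior of Flink's built-in bounded-out-of-orderness generator, which emits a watermark equal to the largest timestamp seen minus the bound minus 1 ms.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Flink: redoes the window and watermark arithmetic by hand.
class WindowMath {
    static final long WINDOW_SIZE_MS = 10_000L;   // TumblingEventTimeWindows.of(Time.seconds(10))
    static final long OUT_OF_ORDERNESS_MS = 200L; // forBoundedOutOfOrderness(Duration.ofMillis(200))

    // Start of the tumbling window (offset 0) that contains the timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Watermark after seeing the given timestamps: max - bound - 1.
    static long watermarkAfter(long... timestampsMs) {
        long max = Arrays.stream(timestampsMs).max().getAsLong();
        return max - OUT_OF_ORDERNESS_MS - 1;
    }

    // An event-time window [start, start + size) fires once watermark >= start + size - 1.
    static boolean windowFires(long windowStartMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + WINDOW_SIZE_MS - 1;
    }

    public static void main(String[] args) {
        // The four distinct timestamps from the log above.
        long[] logged = {1662911844581L, 1662911845321L, 1662911845307L, 1662911844954L};
        long start = windowStart(logged[0]);
        long watermark = watermarkAfter(logged);
        System.out.println("window start = " + start);
        System.out.println("watermark    = " + watermark);
        System.out.println("fires yet    = " + windowFires(start, watermark));
    }
}
```

Under these assumptions, all four logged timestamps fall into the window starting at 1662911840000, and the watermark after them is 1662911845120. That window would therefore not fire until an event with a timestamp of at least 1662911850200 arrives on every active input.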

Could you please let me know whether the Pulsar connector supports watermarking code like mine?

Here is the environment I have tested so far.

  • Flink (version 1.14.5)
  • Pulsar Connector (version 1.14.3.4)
  • The event-time processing code is shown below.
public class StreamRealtime {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
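        // Assumption: job parameters come from the command line; the original snippet omitted this declaration.
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(parameterTool);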

        // Use event time. This call is deprecated since Flink 1.12, where event time is already the default.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        String adminServiceUrl     = "http://10.96.77.102:31143";
        String brokerServiceUrl   = "pulsar://10.96.77.102:32543";
        String inputTopic        = "persistent://nds/nds/lcs-refined-topic";

        int parallelism = 30;
        Properties properties = new Properties();
        properties.setProperty("topic", inputTopic);

        FlinkPulsarSource<String> source = new FlinkPulsarSource<>(
            brokerServiceUrl,
            adminServiceUrl,
            new SimpleStringSchema(),
            properties
        ).setStartFromLatest();

        DataStream<Tuple3<String, String, Long>> stream = env
                .setParallelism( parallelism )
                .addSource(source)
                .setParallelism( parallelism )
                .flatMap( LCSSTMSUrlMapperRefined.create() )
                .setParallelism( parallelism );

        // Take the event timestamp from field f2 and generate watermarks with 200 ms of allowed out-of-orderness.
        stream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ofMillis(200))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                @Override
                                public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                                        Debug.sendUDP("timstamp : [" + element.f2 +"]\n");
                                        return element.f2;
                                }
                        })
                )
                .setParallelism ( parallelism )
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply( LCSSTMSProcessor.create(0))
                .setParallelism ( parallelism );

        env.execute("Pulsar NDS Streaming");
    }
}
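
One thing that may be worth ruling out with a parallelism of 30: an operator with several inputs advances its event-time clock to the minimum watermark across those inputs, so a subtask that never receives a record can keep downstream windows from firing unless it is marked idle (Flink offers `WatermarkStrategy.withIdleness(Duration)` for this). The report does not state how many partitions the topic has, so whether this applies here is an assumption. The sketch below is plain Java with hypothetical names and only illustrates the minimum rule; it is not Flink's implementation.

```java
import java.util.OptionalLong;

// Hypothetical illustration, not Flink code: combining per-subtask watermarks with the minimum rule.
class WatermarkCombiner {
    // A subtask that has not seen any record yet reports Long.MIN_VALUE.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Returns the minimum watermark over all subtasks not flagged idle; empty if all are idle.
    static OptionalLong combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle subtasks do not hold the combined watermark back
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Three subtasks; the second one has never received a record.
        long[] watermarks = {1662911845120L, NO_WATERMARK, 1662911845100L};
        System.out.println(combine(watermarks, new boolean[] {false, false, false}));
        System.out.println(combine(watermarks, new boolean[] {false, true, false}));
    }
}
```

With all three subtasks treated as active, the combined watermark stays at `Long.MIN_VALUE` and no event-time window can fire. Once the silent subtask is flagged idle, the combined watermark advances to 1662911845100.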
                                                     

To Reproduce
Steps to reproduce the behavior:

  1. Developed Flink event-time aggregation code with the Pulsar connector.
  2. Checked whether the messages have valid timestamps (OK, they do).
  3. Checked whether my code works with Flink's processing-time windows and the Pulsar connector after the following changes (OK, it works):
  • Removed the watermark assigner and switched to processing-time windows
  • Changed the configuration from TimeCharacteristic.EventTime to TimeCharacteristic.ProcessingTime

Expected behavior

  • Flink's event-time windows fire regularly.

Screenshots

Posted with description.

Additional context
