Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

[BUG] Message can not consume from latest offset of specified subscription #494

Open
KevinyhZou opened this issue Dec 30, 2021 · 0 comments
Labels

Comments

@KevinyhZou
Copy link
Contributor

Describe the bug
I want to consume a pulsar topic from latest offset of a subcription, and I try to use the code below

FlinkPulsarSource<String> flinkPulsarSource =
                new FlinkPulsarSource<String>(
                        serviceUrl,
                        adminUrl,
                        new PulsarRecordDeserializeSchema(),
                        properties).setStartFromSubscription(groupid_antisdk, MessageId.latest);

and when restart the flink job, it consume from the subscritpion offset commited before, not the latest.

and then I try to use startFromLatest method, the code as below

  FlinkPulsarSource<String> flinkPulsarSource =
                new FlinkPulsarSource<String>(
                        serviceUrl,
                        adminUrl,
                        new PulsarRecordDeserializeSchema(),
                        properties).setStartFromLatest();

but the subscription name is a random string generated by connector, not we specified by pulsar.reader.subscriptionRolePrefix

To Reproduce
reproduce this problem as described above

Expected behavior
by use setStartFromSubscription(groupid_antisdk, MessageId.latest) we can start from latest offset of a subscription.

Screenshots

Additional context

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

No branches or pull requests

1 participant