Pulsar functions are a great way of doing something simple with the messages in topic. However, at the moment of the 2.5.1 pulsar, the default version of a subscription points to the latest position, and you can't change it within the function creation call. Which means function would see only messages after function was created. How to adjust that? Let's find out.
First of all, function config builder allow you setup subscription name among other things:
FunctionConfig.builder().subName("subscription-name")
Just do it directly right before the function creation call.
pulsarAdmin.topics().createSubscription("topic", "subscription-name", MessageId.earliest);
However, this would create Shared
type of subscription, which could mixup the
messages in a target topic. Which leads to the next option.
pulsarClient.newConsumer()
.subscriptionType(SubscriptionType.Failover)
.consumerName(UUID.randomUUID())
.subscriptionName("subscription-name")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.topic("topic")
.subscribe()
.close();
Without receiving messages that would leave subscription which you can just simply pickup
by a function. That wouldn't work with Exclusive
type of subscription,
but you can set up everything here.
pulsarAdmin.topics().resetCursor("topic", "subscription-name", MessageId.earliest);
Can't really find a usecase for that from my side, but this option worth mentioning as well.
Hopefully, that would help to mitigate the current miss of the API, which have been requested for change in apache/pulsar#6531