我在SpringBoot中与kafka合作,我正在尝试添加一个功能,允许我们启动一个服务,并让它将消息回放到某个特定的时间。
消费者是这样设置的
public interface ProductScenarioStream {
String SERVICE_REQUESTS_PRODUCT_PRICE = "serviceRequestsProductPrice";
String SERVICE_CONCLUDES_PRODUCT_SCENARIO = "serviceConcludesProductScenario";
@Output(SERVICE_REQUESTS_PRODUCT_PRICE)
MessageChannel serviceRequestsProductPrice();
@Input(SERVICE_CONCLUDES_PRODUCT_SCENARIO)
SubscribableChannel serviceConcludesProductScenario();
}
和
@Service
@EnableBinding(ProductScenarioStream.class)
@Profile("stream")
public class ProductStreamServiceImpl implements ProductStreamService
{
@Resource
private ProductScenarioStream productScenarioStream;
@Override
public void send(final ServiceRequestsProductPrice event) {
...
}
}
你知道我在哪里可以找到设置,允许我在这个场景中回放流上的偏移量吗?
1条答案
按热度按时间8fq7wneg1#
我猜你是说
replay
不是reply
-我已经编辑了你的问题。springcloudstream目前没有公开一种机制来寻找偏移量。
你可以用SpringKafka的
@KafkaListener
相反;实施ConsumerSeekAware
它为您提供了在启动期间(或任何时候)查找的机制。