在kafka中重放消息

zfciruhq  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(481)

我在SpringBoot中与kafka合作,我正在尝试添加一个功能,允许我们启动一个服务,并让它将消息回放到某个特定的时间。
消费者是这样设置的

public interface ProductScenarioStream {
    String SERVICE_REQUESTS_PRODUCT_PRICE = "serviceRequestsProductPrice";
    String SERVICE_CONCLUDES_PRODUCT_SCENARIO = "serviceConcludesProductScenario";

    @Output(SERVICE_REQUESTS_PRODUCT_PRICE)
    MessageChannel serviceRequestsProductPrice();

    @Input(SERVICE_CONCLUDES_PRODUCT_SCENARIO)
    SubscribableChannel serviceConcludesProductScenario();
}

@Service
@EnableBinding(ProductScenarioStream.class)
@Profile("stream")
public class ProductStreamServiceImpl implements ProductStreamService 
{
    @Resource
    private ProductScenarioStream productScenarioStream;

    @Override
    public void send(final ServiceRequestsProductPrice event) {
     ...
    }
 }

你知道我在哪里可以找到设置,允许我在这个场景中回放流上的偏移量吗?

8fq7wneg

8fq7wneg1#

我猜你是说 replay 不是 reply -我已经编辑了你的问题。
springcloudstream目前没有公开一种机制来寻找偏移量。
你可以用SpringKafka的 @KafkaListener 相反;实施 ConsumerSeekAware 它为您提供了在启动期间(或任何时候)查找的机制。

相关问题