如何使用精确一次模式实现处理器api

5sxhfpxr  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(346)

我正在研究kafka流并使用处理器api来实现我的用例。下面的代码显示了将消息转发到下游并在调用之前中止的处理方法 commit . 这将导致重新处理流并在接收器上复制消息。

public void process(String key, String value) {

    context.forward(key, value);

    .. 
    ..
    //killed

    context.commit();
}

加工保证参数:

streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

有没有办法只在调用 commit 声明。如果不是,什么是正确的方法来实现一次模式。
谢谢您

ymdaylpp

ymdaylpp1#

确保你的Flume在里面 read_committed 消费者模式,因此它将只看到提交的消息。如果消息是在事务中止之前写入输出主题的,那么在事务中止时,消息仍然存在,只是没有在提交时标记。事务第二次完成时,消息和提交标记被添加到输出主题中。如果你不在家看书 read_committed 模式,然后您将看到所有消息(包括未提交的消息),并且它可能会显示为重复消息,因为您会看到中止的结果和提交的结果。
从这里的0.11javadochttps://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html
事务是在kafka 0.11.0中引入的,在kafka 0.11.0中,应用程序可以原子地写入多个主题和分区。为了使其工作,应该将从这些分区读取的使用者配置为只读取提交的数据。这可以通过设置consumer配置中提交的isolation.level=read\ u来实现。
在read\u committed模式下,使用者将只读取已成功提交的事务性消息。它将一如既往地继续读取非事务性消息。在read\u committed模式下没有客户端缓冲。相反,read\u提交的使用者分区的结束偏移量将是分区中属于打开事务的第一条消息的偏移量。这种偏移称为“最后稳定偏移”(lso)。
read\u提交的使用者将只读取到lso,并过滤掉任何已中止的事务性消息。lso还影响seektoend(collection)和endoffset(collection)对于read\u提交的使用者的行为,详细信息在每个方法的文档中。最后,还将fetch-lag度量调整为相对于读用户的lso。带有事务性消息的分区将包含表示事务结果的提交或中止标记。这些标记没有返回到应用程序,但在日志中有一个偏移量。因此,从包含事务性消息的主题中读取的应用程序将看到消耗的偏移量中的间隙。这些丢失的消息将是事务标记,它们将在两个隔离级别中为使用者过滤掉。此外,使用read\ committed consumer的应用程序还可能会看到由于事务中止而导致的间隙,因为这些消息不会由consumer返回,而是具有有效的偏移量。

bsxbgnwa

bsxbgnwa2#

您可能希望将context.commit() Package 在finally块下,以确保调用它。但是,您还需要确保在成功处理之后确实调用了该函数。

相关问题