当生产者停止在java/spring中发送消息时,如何在windowedby+聚合中接收最后一条带窗口的kafka消息?

4uqofj5v  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(335)

就像我在标题中说的,我想在制作者停止发送消息时接收最后一条windowedby消息。目前我是手工做的,但首先,有一点描述。
我有一个Kafka生产者,是从一个文件读取行(每行是一个不同的json)每读取行发送到Kafka与500毫秒的时间段的差异。我只有120行(或json)。
我有一个消费程序,它使用生产者发送的所有json。代码:

final KStream<String, Aggregate> transactions = builder.stream(kafkaProperties.getTopic(), Consumed.with(Serdes.String(), aggregateSerde));

  // Topology
  transactions
        .groupBy(this::groupedByTimeStampAndProtocolName)
        .windowedBy( TimeWindows
                .of( Duration.ofSeconds( 10 ))
                .grace( Duration.ofMillis( 0 )))
        .aggregate(
                tool::emptyAggregate,
                this::processNewRecord, //new TransactionAggregator(),
                Materialized.<String, Aggregate, WindowStore<Bytes, byte[]>>as(TRANSACTION_AGGREGATE)
                        .withKeySerde(Serdes.String())
                        .withValueSerde(aggregateSerde)
        )
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .foreach(sendAggregatesToCassandra);

我有预期的功能,我的意思是,它接收所有的记录,但要接收最后的窗口消息,我必须手动发送记录。
关于这个有两个问题:
有没有办法自动处理最后一个窗口?当生产者发送最后一个记录(第120个json)时,生产者将不再发送更多的记录。我是否应该等一段时间或其他什么都没关系。
我看到我必须发送3条记录来处理最后一个窗口。我不清楚为什么我必须发送3条记录(如果我发送<3条记录,最后一个窗口没有被完全占用)。有没有办法只发送一张唱片?更换缓冲器?改变一些属性?
我正在jdk 11中使用kafka流(带spring),我正在使用Docked kafka:
汇合公司/cp-kafka:5.5.1
zookeeper:3.4.14
Kafka:

<version.kafka>2.5.0</version.kafka>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>${version.kafka}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${version.kafka}</version>
            </dependency>

《Kafka》中使用的属性包括:

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 127.0.0.1:9092);
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getAppId()+Constants.APP_ID);
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass());
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

在制片人方面:

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 127.0.0.1:9092);
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.put(ProducerConfig.ACKS_CONFIG, "all");

拜托,你能帮帮我吗?

nlejzf6q

nlejzf6q1#

当你使用 suppress() (与 untilWindowCloses 配置)只有在“流时间”提前时,操作员才会发出最终结果。”“流时间”作为记录时间戳的函数计算,因此,如果没有处理任何记录,“流时间”将提前并 suppress() 不会散发出任何东西。因此,发送更多的记录是“流时间”可以提前的唯一方法。
注意:对于流式处理用例,假设数据从不停止,因此对于实际部署来说这不是问题——像您这样从文件中读取数据,并不是真正的流处理用例:我假设您从文件中读取数据是为了测试,对于这种情况,您的输入文件应该包含更多的记录以相应地提前流时间。
有关详细信息,请查看以下博客:https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/
我在Kafka峰会上也谈到了这个主题:https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/

相关问题