如何在Session TimeWindow中使用aggregation + suppress接收最后一条窗口化的Kafka消息?

vbopmzt1  于 2023-06-21  发布在  Apache
关注(0)|答案(1)|浏览(126)

就像我在标题中说的,我想收到最后一条窗口消息。目前,我是通过使用抑制方法来实现的,但首先,有一点描述。
我有一个生产者,它向Kafka Streams发送一个需要在给定时间内添加的整数列表。为此,我使用了一个SessionTimeWindow,我想一直聚合它,直到我们收到最后一条消息。窗口关闭后,我希望接收最后一个计数值。这是拓扑:

StreamsBuilder builder = new StreamsBuilder();

builder.stream("input_topic", Consumed.with(Serdes.String(), new Serdes.ListSerde<Integer>()))
  .flatMapValues((value) -> value)
  .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
  .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(10)))
  .aggregate(
    ()-> 0,
    ((key, value, aggregate) -> aggregate + value),
    ((aggKey, aggOne, aggTwo) -> aggOne + aggTwo),
    Materialized.with(Serdes.String(), Serdes.Integer()))
  .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
  .toStream()
  .map((key, value) -> new KeyValue<String, Integer>(key.key(), value))
  .to("output_topic", Produced.with(Serdes.String(), Serdes.Integer()));

Topology topology = builder.build();

我尝试使用本地Kafka服务器运行拓扑,或者使用TopologyTestDriver发送一些消息。尽管如此,我总是得到相同的结果,输出主题中的任何消息。
下面是我的TestTopologyDriver的流配置:

Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");        
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);       
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test");       
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);

测试定义:

TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties, Instant.now());

TestInputTopic<String, List<Integer>> inputTopic = topologyTestDriver.createInputTopic("input_topic", new StringSerializer(), new Serdes.ListSerde<Integer>().serializer());
TestOutputTopic<String, Integer> outputTopic = topologyTestDriver.createOutputTopic("output_topic", new StringDeserializer(), new IntegerDeserializer());

inputTopic.pipeInput("1", List.of(1,2,3));
inputTopic.pipeInput("1", List.of(1));
List<TestRecord<String, Integer>> outputList = outputTopic.readRecordsToList();

assertEquals(1, outputList.size());
assertEquals(7, outputList.get(0).value());

使用TestTopologyDriver,我尝试了很多情况:

  • 在另一个线程中运行TopologyTestDriver,并使主线程休眠,没有任何结果。
  • 发送带有时间戳的记录。
  • 使用时间戳提取器。
  • 调用TopologyTestDriver.advanceWallClockTime();

在所有情况下,输出都是一个空列表。当我使用本地Kafka服务器时,我没有收到任何消息(与测试相同)
我错过了什么?

o8x7eapl

o8x7eapl1#

由于大多数操作都是基于时间戳的,通常只需:

  1. inputTopic.advanceTime([消息通过所有窗口/抑制所需的持续时间])
  2. inputTopic.pipeInput(“dummyMessage”,List.of(...))
    在此之后,您应该能够读取outputTopic。
    或者,您可以使用inputTopic.pipeInput(“messaageKey”,value,timestamp)并调整这些时间戳。然后发送带有适当未来时间戳的伪消息。

相关问题