就像我在标题中说的,我想收到最后一条窗口消息。目前,我是通过使用抑制方法来实现的,但首先,有一点描述。
我有一个生产者,它向Kafka Streams发送一个需要在给定时间内添加的整数列表。为此,我使用了一个SessionTimeWindow,我想一直聚合它,直到我们收到最后一条消息。窗口关闭后,我希望接收最后一个计数值。这是拓扑:
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input_topic", Consumed.with(Serdes.String(), new Serdes.ListSerde<Integer>()))
.flatMapValues((value) -> value)
.groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(10)))
.aggregate(
()-> 0,
((key, value, aggregate) -> aggregate + value),
((aggKey, aggOne, aggTwo) -> aggOne + aggTwo),
Materialized.with(Serdes.String(), Serdes.Integer()))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.map((key, value) -> new KeyValue<String, Integer>(key.key(), value))
.to("output_topic", Produced.with(Serdes.String(), Serdes.Integer()));
Topology topology = builder.build();
我尝试使用本地Kafka服务器运行拓扑,或者使用TopologyTestDriver发送一些消息。尽管如此,我总是得到相同的结果,输出主题中的任何消息。
下面是我的TestTopologyDriver的流配置:
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test");
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
测试定义:
TopologyTestDriver topologyTestDriver = new TopologyTestDriver(topology, properties, Instant.now());
TestInputTopic<String, List<Integer>> inputTopic = topologyTestDriver.createInputTopic("input_topic", new StringSerializer(), new Serdes.ListSerde<Integer>().serializer());
TestOutputTopic<String, Integer> outputTopic = topologyTestDriver.createOutputTopic("output_topic", new StringDeserializer(), new IntegerDeserializer());
inputTopic.pipeInput("1", List.of(1,2,3));
inputTopic.pipeInput("1", List.of(1));
List<TestRecord<String, Integer>> outputList = outputTopic.readRecordsToList();
assertEquals(1, outputList.size());
assertEquals(7, outputList.get(0).value());
使用TestTopologyDriver,我尝试了很多情况:
- 在另一个线程中运行TopologyTestDriver,并使主线程休眠,没有任何结果。
- 发送带有时间戳的记录。
- 使用时间戳提取器。
- 调用
TopologyTestDriver.advanceWallClockTime();
在所有情况下,输出都是一个空列表。当我使用本地Kafka服务器时,我没有收到任何消息(与测试相同)
我错过了什么?
1条答案
按热度按时间o8x7eapl1#
由于大多数操作都是基于时间戳的,通常只需:
在此之后,您应该能够读取outputTopic。
或者,您可以使用inputTopic.pipeInput(“messaageKey”,value,timestamp)并调整这些时间戳。然后发送带有适当未来时间戳的伪消息。