使用Kafka流从多个主题累积事件

vohkndzv  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(415)

如果这是个愚蠢的问题,我很抱歉。
我有一个场景,其中我有3个来自上游服务的主题(没有键控)。不幸的是,我无法改变这三个主题的行为。
上游服务在一天结束时批量发布所有消息,我需要获得事务的累积视图,因为事务的顺序对下游服务很重要。
我知道我不能对主题的不同分区中的消息重新排序,所以我想如果我可以累积它们,我的服务就可以在处理之前获取累积的结果并重新排序。
然而,我注意到一个奇怪的行为,我希望有人能澄清我错过了什么。
当我使用1到500个帐户执行此操作时,我看到在输出主题中累积并显示了500条消息。
但是,当我对10000个帐户尝试相同的操作时,我看到的输出比应该的多(关于输出主题的13000条消息)。

KStream<String, TransactionAccumulator> transactions =
        disbursements
            .merge(repayments)
            .merge(fees)
            .groupBy(
                (k, v) -> v.getAccountId(),
                with(
                    String(),
                    serdeFrom(
                        new JsonSerializer<>(mapper),
                        new JsonDeserializer<>(Transaction.class, mapper))))
            .windowedBy(SessionWindows.with(Duration.of(1, ChronoUnit.MINUTES)))
            .aggregate(
                TransactionAccumulator::new,
                (key, value, aggregate) -> aggregate.add(value),
                (aggKey, aggOne, aggTwo) -> aggOne.merge(aggTwo),
                Materialized.with(
                    String(),
                    serdeFrom(
                        new JsonSerializer<>(mapper),
                        new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
            .toStream((key, value) -> key.key());

如前所述,上游服务在一天结束时批量发布所有事件(而不是实时发布)。
我很感激我在这里错过了什么,因为对于较小的卷,它似乎工作。

更新1

我尝试了使用抑制的建议,试图只发送最后一个窗口。
但是,当使用它时,它基本上不会向输出主题发布任何消息,尽管我看到“ktable suppress state store”中有消息
带有suppress的更新代码如下所示。

disbursements
        .merge(repayments)
        .merge(fees)
        .groupBy(
            (key, value) -> value.getAccountId(),
            Grouped.with(
                Serdes.String(),
                Serdes.serdeFrom(
                    new JsonSerializer<>(mapper),
                    new JsonDeserializer<>(Transaction.class, mapper))))
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMinutes(1)))
        .aggregate(
            TransactionAccumulator::new,
            (key, value, aggregate) -> aggregate.add(value),
            Materialized.with(
                Serdes.String(),
                Serdes.serdeFrom(
                    new JsonSerializer<>(mapper),
                    new JsonDeserializer<>(TransactionAccumulator.class, mapper))))
        .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
        .mapValues(
            value -> {
              LOGGER.info(
                  "Sending {} Transactions for {}",
                  value.getTransactions().size(),
                  value.getAccountId());
              return value;
            })
        .toStream((key, value) -> key.key());

我也没有看到日志消息的介绍。为了清楚起见,我在这个实验中使用了springcloudstream,我在stream应用程序上看到的最终日志条目如下所示。

INFO 23436 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [StreamConsumer-consume-applicationId-de25a238-5f0f-4d84-9bd2-3e7b01b7f0b3] State transition from REBALANCING to RUNNING
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
INFO 23436 --- [-StreamThread-1] o.a.k.s.s.i.RocksDBTimestampedStore      : Opening store KSTREAM-AGGREGATE-STATE-STORE-0000000006.1583625600000 in regular mode
uqdfh47h

uqdfh47h1#

对不起,我还不能评论,但这是我的两分钱: KGroupedStream.aggregate() :kafka stream使用记录缓存来控制从的物化视图(或ktable)发出聚合更新的速率 aggregate 状态存储和下游处理器。e、 g带信息:

("word1", 4)
("word1", 2)
("word2", 3)
("word1", 1)

以及你的字数结构:

wordCntPerSentenceKStream
    .groupByKey()
    .aggregate(() -> 0, (word, newWordCnt, aggsWordCnt) -> aggsWordCnt + newWordCnt, Materialized.as("word-cnt-store").withValueSerde(Serdes.Integer())
    .toStream();

您可能会收到如下下游消息:

("word1", 6)
("word2", 3)
("word1", 7)

因此,我猜测输入主题可能包含单个accountid的多个事务,并且记录缓存在缓存被刷新时被刷新( cache.max.bytes.buffering )已满或 commit.interval.ms 他很满足。
如果你的sink是幂等的,你可以重写你的sink TransactionAccumulator 或者您可以使用 KTable.suppress() 如这里所述,只发出聚合窗口的最后一条消息。

相关问题