嵌入的kafka:ktable+ktable leftjoin生成重复记录

gxwragnw  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(268)

我来寻找神秘的知识。
首先,我有两对主题,每对中的一个主题与另一个主题相结合。后一个主题正在形成两个ktable,用于ktable+ktable leftjoin。问题是,当我向任意一个ktable生成一条记录时,leftjoin生成三条记录。我希望有两个形式的记录(a-null,a-b),但是我得到了(a-null,a-b,a-null)。我已经确认ktables每个都收到一条记录。
我已经摆弄了cache\u max\u bytes\u buffering\u config来启用/禁用状态存储缓存。上面的行为是将cache\u max\u bytes\u buffering\u config设置为0。当我使用cache\u max\u bytes\u buffering\u config的默认值时,我看到从join输出的以下记录:(a-b,a-b,a-null)
以下是流、消费者和生产者的配置:

properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
properties.put(StreamsConfig.STATE_DIR_CONFIG, String.format("/tmp/kafka-streams/%s/%s",
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // fiddled with
properties.put(StreamsConfig.CLIENT_ID_CONFIG, appName);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.cla
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

经历此行为的处理器api代码(已清理)如下所示,请注意成对的主题[a1,a2]和[b1,b2]:

KTable<Long, Value> kTableA =
        kstreamBuilder.table(longSerde, valueSerde, topicA2);

    kstreamBuilder.stream(keySerde, envelopeSerde, topicA1)
        .to(longSerde, valueSerde, topicA2);

    kstreamBuilder.stream(keySerde, envelopeSerde, topicB1)
        .to(longSerde, valueSerde, topicB2.topicName);

    KTable<Long, Value> kTableB =
        kstreamBuilder.table(longSerde, valueSerde, topicB2.topicName);

    KTable<Long, Result> joinTable = kTableA.leftJoin(kTableB, (a,b) -> {
        // value joiner called three times with only a single record input
        // into topicA1 and topicB1
    });

    joinTable.groupBy(...)
    .aggregate(...)
    .to(longSerde, aggregateSerde, outputTopic);

提前感谢所有的帮助,哦,仁慈的人。
更新:我在运行一个kafka服务器,每个主题有一个分区,并经历了这种行为。当我将服务器的数量增加到2,分区的数量增加到3时,我的输出就变成(a-null)。
在我看来,我需要花更多的时间与Kafka手册。。。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题