嵌入的kafka:ktable+ktable leftjoin生成重复记录

gxwragnw  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(318)

我来寻找神秘的知识。
首先,我有两对主题,每对中的一个主题与另一个主题相结合。后一个主题正在形成两个ktable,用于ktable+ktable leftjoin。问题是,当我向任意一个ktable生成一条记录时,leftjoin生成三条记录。我希望有两个形式的记录(a-null,a-b),但是我得到了(a-null,a-b,a-null)。我已经确认ktables每个都收到一条记录。
我已经摆弄了cache\u max\u bytes\u buffering\u config来启用/禁用状态存储缓存。上面的行为是将cache\u max\u bytes\u buffering\u config设置为0。当我使用cache\u max\u bytes\u buffering\u config的默认值时,我看到从join输出的以下记录:(a-b,a-b,a-null)
以下是流、消费者和生产者的配置:

  1. properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
  2. properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
  3. properties.put(StreamsConfig.STATE_DIR_CONFIG, String.format("/tmp/kafka-streams/%s/%s",
  4. properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // fiddled with
  5. properties.put(StreamsConfig.CLIENT_ID_CONFIG, appName);
  6. properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
  7. properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
  8. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  9. properties.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
  10. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class
  11. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.cla
  12. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
  13. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

经历此行为的处理器api代码(已清理)如下所示,请注意成对的主题[a1,a2]和[b1,b2]:

  1. KTable<Long, Value> kTableA =
  2. kstreamBuilder.table(longSerde, valueSerde, topicA2);
  3. kstreamBuilder.stream(keySerde, envelopeSerde, topicA1)
  4. .to(longSerde, valueSerde, topicA2);
  5. kstreamBuilder.stream(keySerde, envelopeSerde, topicB1)
  6. .to(longSerde, valueSerde, topicB2.topicName);
  7. KTable<Long, Value> kTableB =
  8. kstreamBuilder.table(longSerde, valueSerde, topicB2.topicName);
  9. KTable<Long, Result> joinTable = kTableA.leftJoin(kTableB, (a,b) -> {
  10. // value joiner called three times with only a single record input
  11. // into topicA1 and topicB1
  12. });
  13. joinTable.groupBy(...)
  14. .aggregate(...)
  15. .to(longSerde, aggregateSerde, outputTopic);

提前感谢所有的帮助,哦,仁慈的人。
更新:我在运行一个kafka服务器,每个主题有一个分区,并经历了这种行为。当我将服务器的数量增加到2,分区的数量增加到3时,我的输出就变成(a-null)。
在我看来,我需要花更多的时间与Kafka手册。。。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题