Kafka流错误的连接结果2个简单的ktable

x7rlezfr  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(415)

我正在两个简单的ktable之间进行连接,假设我有两个主题,包含以下数据,请参阅详细信息:
第一个主题
1 |维克托c。
1 |瓦迪姆c。
2 |凡士利p。
3 |维塔利c。
4 |润滑油c。
第二个主题
1 |编程器
2 |管理员
3 |经理
当我执行以下查询时,sql行为很明显,输出对我来说很清楚:

SELECT * FROM firstTable
INNER JOIN secondTable
ON firstTable.ID = secondTable.ID

1 |维克托·c.|1 |编程器
1 |瓦迪姆c.|1 |编程器
2 |凡士利p.|2 |管理员
3 |维塔利c.|3 |经理
所以,我在扮演傲慢的Kafka,我试图做同样的行为,但结果完全混淆了我的想法
请参见代码段详细信息:

@Test
  public void joinKTableToKTableWhereKeyValueIsIntegerAndString() throws Exception {
      InternalTestConfiguration itc = getInternalTestConfiguration();
      List < String > topics = Arrays.asList(itc.getFirstTopic(), itc.getSecondTopic(), itc.getProjectionTopic(), itc.getFirstKeyedTopic(), itc.getSecondKeyedTopic());
      KafkaStreams streams = null;

      try {
          Integer partitions = 1;
          Integer replication = 1;
          RestUtils.createTopics(topics, partitions, replication, new Properties());

          List < KeyValue < Integer, String >> employees = Arrays.asList(
              new KeyValue < > (1, "Victor C."),
              new KeyValue < > (1, "Vadim C."),
              new KeyValue < > (2, "Vasile P."),
              new KeyValue < > (3, "Vitalie C."),
              new KeyValue < > (4, "Oleg C.")
          );

          List < KeyValue < Integer, String >> specialities = Arrays.asList(
              new KeyValue < > (1, "Programmer"),
              new KeyValue < > (2, "Administrator"),
              new KeyValue < > (3, "Manager")
          );

          List < KeyValue < Integer, String >> expectedResults = Arrays.asList(
              new KeyValue < > (1, "Victor C./Programmer"),
              new KeyValue < > (1, "Vadim C./Programmer"),
              new KeyValue < > (2, "Vasile P./Administrator"),
              new KeyValue < > (3, "Vitalie C../Manager")
          );

          final Serde < Integer > keySerde = Serdes.Integer();
          final Serde < String > valueSerde = Serdes.String();

          Properties streamsConfiguration = new Properties();
          streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, itc.getAppIdConfig());
          streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
          streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG);
          streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
          streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
          streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
          //streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
          streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());

          KStreamBuilder builder = new KStreamBuilder();

          KTable < Integer, String > firstKTable = builder.table(keySerde, valueSerde, itc.getFirstTopic(), itc.getFirstStore());
          KTable < Integer, String > secondKTable = builder.table(keySerde, valueSerde, itc.getSecondTopic(), itc.getSecondStore());
          KTable < Integer, String > projectionKTable = firstKTable.join(secondKTable, (l, r) - > {
              return l + "/" + r;
          });

          projectionKTable.to(keySerde, valueSerde, itc.getProjectionTopic());

          streams = new KafkaStreams(builder, streamsConfiguration);

          streams.start();

          Properties cfg1 = new Properties();
          cfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
          cfg1.put(ProducerConfig.ACKS_CONFIG, "all");
          cfg1.put(ProducerConfig.RETRIES_CONFIG, 0);
          cfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
          cfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
          IntegrationTestUtils.produceKeyValuesSynchronously(itc.getFirstTopic(), employees, cfg1);

          Properties cfg2 = new Properties();
          cfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
          cfg2.put(ProducerConfig.ACKS_CONFIG, "all");
          cfg2.put(ProducerConfig.RETRIES_CONFIG, 0);
          cfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
          cfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
          IntegrationTestUtils.produceKeyValuesSynchronously(itc.getSecondTopic(), specialities, cfg2);

          Properties consumerConfig = new Properties();
          consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
          consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, itc.getGroupIdConfig());
          consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
          consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
          consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

          List < KeyValue < Integer, String >> actualResults = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, itc.getProjectionTopic(), expectedResults.size());

          assertThat(actualResults).containsExactlyElementsOf(expectedResults);               
      } finally {
          if (streams != null) {
              streams.close();
          }

          RestUtils.deleteTopics(topics);
      }
  }

我期望得到与sql相同的结果,但事实并非如此。
禁用了//streamsconfiguration.put(streamsconfig.cache\u max\u bytes\u buffering\u config,0)”的结果
1,vadim c./编程器
2,vasile p./管理员
3,vitalie c./经理
1,vadim c./编程器
2,vasile p./管理员
3,vitalie c./经理
streamsconfiguration.put的结果(streamsconfig.cache\u max\u bytes\u buffering\u config,0);启用
1,vadim c./编程器
2,vasile p./管理员
3,vitalie c./经理
不管怎样,这两个结果和sql不一样,请帮助我理解这一点,因为我已经在自杀了:(

ecbunoof

ecbunoof1#

在sql比较中,数字似乎不是主键,因为两者都是 Victor C. 以及 Vadim C. 与…有关 1 如果数字是消息的键,则在ktable中不起作用- Vadim C. 正在覆盖 Victor C. . 这就是为什么输出中只有三个不同的人。
关于ktables的缓存行为的问题的第二部分。在启用缓存的情况下(您的第一个示例),在刷新缓存时会触发连接(默认情况下为30秒)。在启用缓存时,还存在重复的问题。禁用缓存时不会发生这种情况,因此这是没有重复的“正确”输出。
我最近在博客上写了kafka0.10.1中的join行为(所以不是改变了一些语义的最新版本)。也许这对你有帮助。

相关问题