I am doing a join between two simple KTables. Suppose I have two topics with the following data; see the details below:
First topic
1 | Victor C.
1 | Vadim C.
2 | Vasile P.
3 | Vitalie C.
4 | Oleg C.
Second topic
1 | Programmer
2 | Administrator
3 | Manager
When I execute the following query, the SQL behavior is obvious and the output is clear to me:
SELECT * FROM firstTable
INNER JOIN secondTable
ON firstTable.ID = secondTable.ID
1 | Victor C. | 1 | Programmer
1 | Vadim C. | 1 | Programmer
2 | Vasile P. | 2 | Administrator
3 | Vitalie C. | 3 | Manager
So I tried to reproduce the same behavior with Kafka Streams, but the result completely confused me. See the code snippet below for details:
@Test
public void joinKTableToKTableWhereKeyValueIsIntegerAndString() throws Exception {
    InternalTestConfiguration itc = getInternalTestConfiguration();
    List<String> topics = Arrays.asList(itc.getFirstTopic(), itc.getSecondTopic(),
            itc.getProjectionTopic(), itc.getFirstKeyedTopic(), itc.getSecondKeyedTopic());
    KafkaStreams streams = null;
    try {
        Integer partitions = 1;
        Integer replication = 1;
        RestUtils.createTopics(topics, partitions, replication, new Properties());

        List<KeyValue<Integer, String>> employees = Arrays.asList(
                new KeyValue<>(1, "Victor C."),
                new KeyValue<>(1, "Vadim C."),
                new KeyValue<>(2, "Vasile P."),
                new KeyValue<>(3, "Vitalie C."),
                new KeyValue<>(4, "Oleg C.")
        );
        List<KeyValue<Integer, String>> specialities = Arrays.asList(
                new KeyValue<>(1, "Programmer"),
                new KeyValue<>(2, "Administrator"),
                new KeyValue<>(3, "Manager")
        );
        List<KeyValue<Integer, String>> expectedResults = Arrays.asList(
                new KeyValue<>(1, "Victor C./Programmer"),
                new KeyValue<>(1, "Vadim C./Programmer"),
                new KeyValue<>(2, "Vasile P./Administrator"),
                new KeyValue<>(3, "Vitalie C./Manager")
        );

        final Serde<Integer> keySerde = Serdes.Integer();
        final Serde<String> valueSerde = Serdes.String();

        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, itc.getAppIdConfig());
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_CONNECT_CONFIG);
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        //streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());

        // Build the two KTables from the input topics and join them on the Integer key.
        KStreamBuilder builder = new KStreamBuilder();
        KTable<Integer, String> firstKTable =
                builder.table(keySerde, valueSerde, itc.getFirstTopic(), itc.getFirstStore());
        KTable<Integer, String> secondKTable =
                builder.table(keySerde, valueSerde, itc.getSecondTopic(), itc.getSecondStore());
        KTable<Integer, String> projectionKTable =
                firstKTable.join(secondKTable, (l, r) -> l + "/" + r);
        projectionKTable.to(keySerde, valueSerde, itc.getProjectionTopic());

        streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();

        // Produce the test data into both input topics.
        Properties cfg1 = new Properties();
        cfg1.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg1.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg1.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg1.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        cfg1.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(itc.getFirstTopic(), employees, cfg1);

        Properties cfg2 = new Properties();
        cfg2.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        cfg2.put(ProducerConfig.ACKS_CONFIG, "all");
        cfg2.put(ProducerConfig.RETRIES_CONFIG, 0);
        cfg2.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        cfg2.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(itc.getSecondTopic(), specialities, cfg2);

        // Consume the join output and compare it against the expected records.
        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, itc.getGroupIdConfig());
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        List<KeyValue<Integer, String>> actualResults = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                consumerConfig, itc.getProjectionTopic(), expectedResults.size());
        assertThat(actualResults).containsExactlyElementsOf(expectedResults);
    } finally {
        if (streams != null) {
            streams.close();
        }
        RestUtils.deleteTopics(topics);
    }
}
I expected to get the same result as in SQL, but that is not the case.
Results with //streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); commented out (caching enabled):
1, Vadim C./Programmer
2, Vasile P./Administrator
3, Vitalie C./Manager
1, Vadim C./Programmer
2, Vasile P./Administrator
3, Vitalie C./Manager
Results with streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); enabled (caching disabled):
1, Vadim C./Programmer
2, Vasile P./Administrator
3, Vitalie C./Manager
Either way, neither result matches the SQL output. Please help me understand this, because it is driving me crazy :(
1 Answer
In your SQL comparison the numbers are apparently not a primary key, since both Victor C. and Vadim C. relate to 1. That doesn't work in a KTable if the number is the message key: Vadim C. overwrites Victor C. This is why there are only three distinct people in the output.

Regarding the second part of your question, about the caching behavior of KTables: with caching enabled (your first example), the join is triggered when the cache is flushed (every 30 seconds by default). There is also the issue of duplicates when caching is enabled. With caching disabled this doesn't happen, so that is the "correct" output without duplicates.
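For illustration only (a minimal sketch, not part of the original answer, reusing the 0.10.x API and the itc accessors and serdes from the question): if every employee record should survive, as in the SQL result, one option is to read the first topic as a KStream instead of a KTable, so the two records with key 1 are not collapsed into the latest value, and join the stream against the speciality table:

// Sketch only: a KStream keeps both records with key 1 instead of
// treating the second one as an update that replaces the first.
KStreamBuilder builder = new KStreamBuilder();
KStream<Integer, String> employees =
        builder.stream(keySerde, valueSerde, itc.getFirstTopic());
KTable<Integer, String> specialities =
        builder.table(keySerde, valueSerde, itc.getSecondTopic(), itc.getSecondStore());

// KStream-KTable join: each employee record is joined against the current
// speciality for its key; key 4 (Oleg C.) has no match and is dropped.
KStream<Integer, String> projection =
        employees.join(specialities, (l, r) -> l + "/" + r);
projection.to(keySerde, valueSerde, itc.getProjectionTopic());

Note that a stream-table join is not symmetric: an employee record only produces output if the matching speciality is already in the table when that record is processed, so the specialities would have to be produced before the employees.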
I recently blogged about the join behavior in Kafka 0.10.1 (so not the latest version, which changed some of the semantics). Maybe it helps you.