我有两个ktable对象:
KTable<Long, byte[]> firstTable = builder.table("firstTopic", Consumed.with(Serdes.Long(), Serdes.ByteArray()));
KTable<Long, byte[]> secondTable = builder.table("secondTopic",
Consumed.with(Serdes.Long(), Serdes.ByteArray()));
然后我想把这两个表连接起来:
firstTable.leftJoin(secondTable,
(leftValue, rightValue) -> {
try {
return utils.serializeNetwork(utils.deserializeNetwork(leftValue));
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
)
所以我有两个表,我将它们连接到一个表中,我希望得到的表按每个键存储在kafka状态存储中,但我不知道怎么做。
1条答案
按热度按时间afdcj2ne1#
您可以通过指定
Materialized
参数开启leftJoin
并指定状态存储的名称。