如何连接两个ktable并将结果ktable写入状态存储

rt4zxlrg  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(356)

我有两个ktable对象:

KTable<Long, byte[]> firstTable = builder.table("firstTopic", Consumed.with(Serdes.Long(), Serdes.ByteArray()));

 KTable<Long, byte[]> secondTable = builder.table("secondTopic",
        Consumed.with(Serdes.Long(), Serdes.ByteArray()));

然后我想把这两个表连接起来:

firstTable.leftJoin(secondTable,
            (leftValue, rightValue) -> {
            try {
                return utils.serializeNetwork(utils.deserializeNetwork(leftValue));
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
            }
          )

所以我有两个表,我将它们连接到一个表中,我希望得到的表按每个键存储在kafka状态存储中,但我不知道怎么做。

afdcj2ne

afdcj2ne1#

您可以通过指定 Materialized 参数开启 leftJoin 并指定状态存储的名称。

firstTable.leftJoin(..., Materialized.as("my-store-name"));

相关问题