我试图通过外键(planpiodid)连接两个ktable,结果如下:
ID DATA
{"object_id":"\u0003Ý®"} null
联接的左侧如下所示:
ID DATA
{"object_id":"\u0007ßX"} {....,"object_id":"\u0004wÇ",....}
右边是这样的:
ID DATA
{"object_id":"\u0004wÇ"} {.....}
这是我的拓扑图
final KTable<ByteBuffer, Value> styleTable = builder.table(styleTopic, Consumed.with(Serdes.ByteBuffer(), styleSerde));
final KTable<ByteBuffer, Value2> itemKTable = builder.table(itemTopic, Consumed.with(Serdes.ByteBuffer(), itemSerde));
//Joiner
final StyleItemJoiner styleItemJoiner = new StyleItemJoiner();
final KTable<ByteBuffer, StyleIndex> join1 = styleTable.join(itemKTable, Value::getPlanperiodId, styleItemJoiner);
我在一个值连接符上执行这个连接:
public StyleIndex apply(Value s, Value2 i) {
return StyleIndex.newBuilder()
.setAge("TODO")
.setAgeDetail("TODO")
.setArrangement("TODO")
.....
.build()
}
我的代码参考本教程:https://kafka-tutorials.confluent.io/foreign-key-joins/kstreams.html
暂无答案!
目前还没有任何答案,快来回答吧!