Kafka KTable-KTable FK join: unable to select foreign key serde

Posted by e0bqpujr on 2023-10-15 in Apache

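Summary

I'm trying to do a KTable-to-KTable foreign key join, but I get an error because Kafka Streams tries to use a String serde for the foreign key.
I want it to use a Kotlinx Serialization serde. How can I specify this?

Details

I want to join the data of two KTables using an FK selector, and remap the values into an aggregate object.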

tilesGroupedByChunk
  .join<ChunkTilesAndProtos, SurfaceIndex, SurfacePrototypesData>(
    tilePrototypesTable, // join the prototypes KTable
    { cd: MapChunkData -> cd.chunkPosition.surfaceIndex }, // FK join on SurfaceIndex
    { chunkTiles: MapChunkData, protos: SurfacePrototypesData ->
      ChunkTilesAndProtos(chunkTiles, protos) // remap value 
    },
    namedAs("joining-chunks-tiles-prototypes"),
    materializedAs(
      "joined-chunked-tiles-with-prototypes",
      // `.serde()`- helper function to make a Serde from a Kotlinx Serialization JSON module 
      // see https://github.com/adamko-dev/kotka-streams/blob/38388e74b16f3626a2733df1faea2037b89dee7c/modules/kotka-streams-kotlinx-serialization/src/main/kotlin/dev/adamko/kotka/kxs/jsonSerdes.kt#L48
      jsonMapper.serde(),
      jsonMapper.serde(),
    ),
  )

However, I get an error, because Kafka Streams is using Serdes.String() (my default Serde) to deserialize the foreign key. But it is a JSON object, and I want it to use Kotlinx Serialization.

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. 
Do the Processor's input types match the deserialized types? 
Check the Serde setup and change the default Serdes in 
StreamConfig or provide correct Serdes via method 
parameters. Make sure the Processor can accept the 
deserialized input of type key: myproject.MyTopology$MapChunkDataPosition, and value: org.apache.kafka.streams.kstream.internals.Change.

Note that although incorrect Serdes are a common cause 
of error, the cast exception might have another cause 
(in user code, for example). For example, if a 
processor wires in a store, but casts the generics 
incorrectly, a class cast exception could be raised 
during processing, but the cause would not be wrong Serdes.

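Background

The data I'm processing comes from a computer game. The game has a map, called a surface. Each surface is uniquely identified by a surface index. Each surface has tiles on an x/y plane. Each tile has a "prototype name", which is the ID of a TilePrototype. Each TilePrototype has information about what the tile does or what it looks like. I need it for its colour.

Topology

Group tiles by chunk

First, I split the tiles into chunks of 32x32, and then group them into a KTable.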

/** Each chunk is identified by the surface, and an x/y coordinate */
@Serializable
data class MapChunkDataPosition(
  val position: MapChunkPosition,
  val surfaceIndex: SurfaceIndex,
)

/** Each chunk has 32 tiles */
@Serializable
data class MapChunkData(
  val chunkPosition: MapChunkDataPosition,
  val tiles: Set<MapTile>,
)

// get all incoming tiles and group them by chunk,
// this works successfully
val tilesGroupedByChunk: KTable<MapChunkDataPosition, MapChunkData> =
  buildChunkedTilesTable(tilesTable)

Group prototypes by surface index

Then I collect all prototypes by surface index and aggregate them into a list.

/** Identifier for a surface (a simple wrapper, so I can use a Kotlinx Serialization serde everywhere)*/
@Serializable
data class SurfaceIndex(
  val surfaceIndex: Int
)

/** Each surface has some 'prototypes' - I want this because each tile has a colour */
@Serializable
data class SurfacePrototypesData(
  val surfaceIndex: SurfaceIndex,
  val mapTilePrototypes: Set<MapTilePrototype>,
)

// get all incoming prototypes and group them by surface index,
// this works successfully
val tilePrototypesTable: KTable<SurfaceIndex, SurfacePrototypesData> =
  tilePrototypesTable()

KTable-KTable FK join

This is the code that causes the error.

/** For each chunk, get all tiles in that chunk, and all prototypes */
@Serializable
data class ChunkTilesAndProtos(
  val chunkTiles: MapChunkData,
  val protos: SurfacePrototypesData
)

tilesGroupedByChunk
  .join<ChunkTilesAndProtos, SurfaceIndex, SurfacePrototypesData>(
    tilePrototypesTable, // join the prototypes
    { cd: MapChunkData -> cd.chunkPosition.surfaceIndex }, // FK join on SurfaceIndex
    { chunkTiles: MapChunkData, protos: SurfacePrototypesData ->
      ChunkTilesAndProtos(chunkTiles, protos) // remap value 
    },
    namedAs("joining-chunks-tiles-prototypes"),
    materializedAs(
      "joined-chunked-tiles-with-prototypes",
      // `.serde()`- helper function to make a Serde from a Kotlinx Serialization JSON module 
      // see https://github.com/adamko-dev/kotka-streams/blob/38388e74b16f3626a2733df1faea2037b89dee7c/modules/kotka-streams-kotlinx-serialization/src/main/kotlin/dev/adamko/kotka/kxs/jsonSerdes.kt#L48
      jsonMapper.serde(),
      jsonMapper.serde(),
    ),
  )

Full stack trace

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: MyProject.processor.Topology$MapChunkDataPosition, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:150)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:131)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:105)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:186)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:54)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:29)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$1.apply(MeteredKeyValueStore.java:182)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$1.apply(MeteredKeyValueStore.java:179)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:107)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:87)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:136)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flushCache(CachingKeyValueStore.java:345)
at org.apache.kafka.streams.state.internals.WrappedStateStore.flushCache(WrappedStateStore.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flushCache(ProcessorStateManager.java:487)
at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:402)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(TaskManager.java:1043)
at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1016)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1017)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:786)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
Caused by: java.lang.ClassCastException: class MyProjectTopology$MapChunkData cannot be cast to class java.lang.String (MyProject.processor.MyProject$MapChunkData is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:29)
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:99)
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:69)
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
... 30 common frames omitted

Versions

  • Kotlin 1.6.9
  • Kafka Streams 3.0.0
  • Kotlinx Serialization 1.3.2

Answer #1 (eqzww0vc)

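Somewhat embarrassingly, I had made a mistake in my topology definition.
In the final stage of creating one of the tables, I mapped the values, but I did not specify the serdes.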

.mapValues { _, v ->
      ChunkTilesAndProtos(v.tiles, v.protos)
    }

So I changed it to specify the serdes.

.mapValues(
      "finalise-web-map-tile-chunk-aggregation",
      materializedAs("web-map-tile-chunks", jsonMapper.serde(), jsonMapper.serde())
    ) { _, v ->
      ChunkTilesAndProtos(v.tiles, v.protos)
    }
// note: this uses extension functions from github.com/adamko-dev/kotka-streams
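
For readers who are not using the kotka-streams extensions, a rough equivalent with the plain Kafka Streams API is sketched below. It is only an illustration, not the original topology: the topic name, processor name, store name and the String/Int types are hypothetical placeholders, and the built-in Serdes stand in for jsonMapper.serde().

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Named
import org.apache.kafka.streams.kstream.ValueMapper
import org.apache.kafka.streams.state.KeyValueStore

// Hypothetical example: map a KTable<String, String> to a KTable<String, Int>.
// The value type changes, so the value serde is given explicitly instead of
// being left unset (which would make later operations fall back to the default serde).
fun buildWordLengths(builder: StreamsBuilder): KTable<String, Int> {
  val words: KTable<String, String> =
    builder.table("words", Consumed.with(Serdes.String(), Serdes.String()))

  return words.mapValues(
    ValueMapper<String, Int> { word -> word.length },
    Named.`as`("word-length"),
    Materialized.`as`<String, Int, KeyValueStore<Bytes, ByteArray>>("word-lengths")
      .withKeySerde(Serdes.String())
      .withValueSerde(Serdes.Integer()),
  )
}
```

If a named state store is not wanted, Materialized.with(keySerde, valueSerde) sets the serdes without naming the store.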

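This was not easy to find. I found it by putting a breakpoint in the constructor of AbstractStream.java (and in other constructors too) to see when the keySerde and valueSerde fields were not being set.
Sometimes a null serde is expected (I think some KTables/KStreams are "virtual" and do not encode to or decode from Kafka topics). However, I was able to find the operation that caused my problem and define the serdes there, because I was changing the value type.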
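
The root cause in the stack trace points the same way: StringSerializer.serialize is called from ForeignJoinSubscriptionSendProcessorSupplier with a MapChunkData, which is the left table's value rather than the foreign key. That suggests the left table reached the join without a value serde, so the default was used. Below is a minimal sketch of an FK join in which both input tables carry explicit serdes. It assumes the Kafka Streams 3.0.0 overload that takes a Named (later releases add a TableJoined-based variant), and the topics, names and types are hypothetical.

```kotlin
import java.util.function.Function
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Named
import org.apache.kafka.streams.kstream.ValueJoiner
import org.apache.kafka.streams.state.KeyValueStore

fun buildOrdersWithCustomers(builder: StreamsBuilder): KTable<String, String> {
  // Left table: order id -> customer id. Its key and value serdes are set here,
  // so the join does not need to fall back to the default serdes for them.
  val orders: KTable<String, Int> =
    builder.table("orders", Consumed.with(Serdes.String(), Serdes.Integer()))

  // Right table: customer id -> customer name. As far as I understand,
  // the join takes the foreign key serde from this table's key serde.
  val customers: KTable<Int, String> =
    builder.table("customers", Consumed.with(Serdes.Integer(), Serdes.String()))

  return orders.join(
    customers,
    Function<Int, Int> { customerId -> customerId }, // foreign key extractor
    ValueJoiner<Int, String, String> { customerId, name -> "$customerId:$name" },
    Named.`as`("orders-with-customers"),
    // This Materialized configures the join result only, not the inputs.
    Materialized.`as`<String, String, KeyValueStore<Bytes, ByteArray>>("orders-with-customers-store")
      .withKeySerde(Serdes.String())
      .withValueSerde(Serdes.String()),
  )
}
```

In this shape there is no separate parameter for the foreign key serde, which is consistent with the fix above: the serdes have to be correct on the tables that feed the join.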
