如何删除globalktable存储的状态?

ki0zmccv  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(432)

我想删除我的 GlobalKTable<Integer, Long> 商店。
我试图通过以下方式消除该州:
通过调用 kafka-topics.sh --zookeeper localhost:8080 --delete --topic my-topic 跑步 KafkaStreams.cleanUp() 应用程序启动时
产生一个test:null'消息到我的流,作为空值应该被视为一个删除语句在存储中,如这里所述。但是,我的streams应用程序失败,因为空值不能反序列化为long。
请参见以下异常:

org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:546)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:920)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:821)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

如何删除我的globalktable的状态?

jdgnovmf

jdgnovmf1#

我找到了解决这个问题的方法。要删除globalktable的整个状态,还需要清除rocksdb缓存文件。
rocksdb文件存储在 StreamsConfig.STATE_DIR_CONFIG . 我已经删除了文件,现在我的状态完全被清除了。
不过,有没有更好的解决办法呢?

zbdgwd5y

zbdgwd5y2#

KafkaStreams.cleanUp() 实际上应该删除那些rocksdb文件。这个bug在即将发布的1.1版本中已经修复:issues.apache.org/jira/browse/kafka-6259。

相关问题