我想删除我的 GlobalKTable<Integer, Long>
商店。
我试图通过以下方式消除该州:
通过调用 kafka-topics.sh --zookeeper localhost:8080 --delete --topic my-topic
跑步 KafkaStreams.cleanUp()
应用程序启动时
产生一个test:null'消息到我的流,作为空值应该被视为一个删除语句在存储中,如这里所述。但是,我的streams应用程序失败,因为空值不能反序列化为long。
请参见以下异常:
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:546)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:920)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:821)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
如何删除我的globalktable的状态?
2条答案
按热度按时间jdgnovmf1#
我找到了解决这个问题的方法。要删除globalktable的整个状态,还需要清除rocksdb缓存文件。
rocksdb文件存储在
StreamsConfig.STATE_DIR_CONFIG
. 我已经删除了文件,现在我的状态完全被清除了。不过,有没有更好的解决办法呢?
zbdgwd5y2#
KafkaStreams.cleanUp()
实际上应该删除那些rocksdb文件。这个bug在即将发布的1.1版本中已经修复:issues.apache.org/jira/browse/kafka-6259。