查询kafka streams中的全局状态存储会引发null异常

ppcbkaq5  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(373)

使用处理器api和 addGlobalStore 函数生成的存储填充为ok。但随后对存储内容进行迭代的尝试会导致以下异常:

Exception in thread "main" java.lang.NullPointerException                                                                                                             at org.apache.kafka.streams.state.internals.SerializedKeyValueIterator.next(SerializedKeyValueIterator.java:63)                                               at org.apache.kafka.streams.state.internals.SerializedKeyValueIterator.next(SerializedKeyValueIterator.java:26)                                               at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:208)                                  at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueIterator.next(MeteredKeyValueStore.java:189)                                  at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore$CompositeKeyValueIterator.next(CompositeReadOnlyKeyValueStore.java:155)            at org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore$CompositeKeyValueIterator.next(CompositeReadOnlyKeyValueStore.java:113)            at org.apache.kafka.streams.state.internals.DelegatingPeekingKeyValueIterator.hasNext(DelegatingPeekingKeyValueIterator.java:63)

这来自于在存储区上迭代的简单代码:

ReadOnlyKeyValueStore<String, StreamConfig> store = this.globalStreams.store("config-table",QueryableStoreTypes.<String, Config>keyValueStore());
final KeyValueIterator<String, Config> iteratble = store.all();
HashMap<String, Config> dynamicStreams = new HashMap<String,Config>();
while (iteratble.hasNext()) {
    final KeyValue<String, Config> next = iteratble.next();
    dynamicStreams.put(next.key, next.value);
}
iteratble.close();

对该存储中的条目的任何访问都会导致该异常。如果我使用 globalTable 函数(因此这不是反序列化问题)。状态存储还返回成功的行数(因此它是完全填充的)。
全局状态存储创建如下,使用kafka streams 0.10.2.1。

KStreamBuilder globalBuilder = new KStreamBuilder();
StateStoreSupplier<KeyValueStore<String, Config>> storeSupplier = Stores
    .create("config-table")
    .withKeys(Serdes.String())
    .withValues(configSerdes)
    .persistent()
    .build();

// a Processor that updates the store
ProcessorSupplier<String, Config> procSupplier = () -> new ConfigWorker();
globalBuilder.addGlobalStore(
    storeSupplier.get(), 
    "config-table-source", 
    new StringDeserializer(),
    configDeserializer, 
    "config", 
    "config-worker", 
    procSupplier)
    .buildGlobalStateTopology();

this.globalStreams = new KafkaStreams(
    globalBuilder, 
    this.getProperties());
globalStreams.start();

编辑:
一个问题是,默认情况下,状态存储是记录的,而全局ktables不受更改日志的支持。所以添加一个 .disableLogging 到状态存储创建修复了此问题。
另一个问题似乎是呼吁 context.schedule 在处理器的 init 功能。这引发了无效操作异常。删除这个修复了代码,但我想 punctuate 现在再也不叫了。它现在似乎起作用了,但不清楚为什么我不能打电话 schedule .

jdzmm42g

jdzmm42g1#

根据这段代码,似乎不允许在全局上下文中进行转发。所以我的全局处理器只需要处理入站更新。

相关问题