kafkaspout元组重播引发空指针异常

ufj5ltwl  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(387)

我在storm kafka客户端1.0.3中使用storm 1.0.1和kafka 0.10.0.0。
请找到下面的代码配置。

kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreamsNamedTopics.Builder(new Fields(fieldNames), topics)
                    .build();

            KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
                                TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));

            KafkaSpoutTuplesBuilder tuplesBuilder = new KafkaSpoutTuplesBuilderNamedTopics.Builder(new TestTupleBuilder(topics))
                        .build();

            KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
                                                                .setOffsetCommitPeriodMs(10_000)
                                                                .setFirstPollOffsetStrategy(LATEST)
                                                                .setMaxRetries(5)
                                                                .setMaxUncommittedOffsets(250)
                                                                .build();

当我失败的元组它不会被重放。喷口抛出下面的错误。请让我知道它为什么抛出空指针异常。

53501 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.util - Async loop died!
java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.doSeekRetriableTopicPartitions(KafkaSpout.java:260) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:248) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:203) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:645) ~[storm-core-1.0.1.jar:1.0.1]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.1.jar:1.0.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102]
53501 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.d.executor - 
java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.doSeekRetriableTopicPartitions(KafkaSpout.java:260) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:248) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:203) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:645) ~[storm-core-1.0.1.jar:1.0.1]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.1.jar:1.0.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102]
53527 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.1.jar:1.0.1]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.8.0.jar:?]
    at org.apache.storm.daemon.worker$fn__8554$fn__8555.invoke(worker.clj:761) [storm-core-1.0.1.jar:1.0.1]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__7773$fn__7774.invoke(executor.clj:271) [storm-core-1.0.1.jar:1.0.1]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:494) [storm-core-1.0.1.jar:1.0.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102]

请在{key.deserializer=org.apache.kafka.common.serialization.bytearraydesializer,value.deserializer=org.apache.kafka.common.serialization.bytearraydesializer,group.id=test group,ssl.keystore.location=c:/test.jks,bootstrap.servers下面找到完整的喷口配置=localhost:1000,auto.commit.interval.ms=1000,security.protocol=ssl,enable.auto.commit=true,ssl.truststore.location=c:/test1.jks,ssl.keystore.password=pass123,ssl.key.password=pass123,ssl.truststore.password=pass123,session.timeout.ms=30000,auto.offset.reset=latest}

wlp8pajw

wlp8pajw1#

storm 1.0.1由storm kafka客户端的beta版组成。我们已经修复了一些问题,更稳定的版本在Storm1.1版本中可用,并且可以用于Kafka0.10以后的版本。在您的拓扑中,您可以对storm kafka客户端版本1.1建立依赖关系,并使用适当的版本建立kafka客户端依赖关系。您不需要升级风暴集群本身。

wmomyfyw

wmomyfyw2#

我让enable.auto.commit=true将值设置为false解决了这个问题。

相关问题