spring-integration-kafka: ConsumerContext iterator is in failed state

qc6wkl3g · asked 2021-06-08 in Kafka

When trying to consume a topic from a Java application, I get the following exception:

org.springframework.integration.kafka.support.ConsumerConfiguration.executeTasks(ConsumerConfiguration.java:135)
        ... 32 more
Caused by: java.lang.IllegalStateException: Iterator is in failed state
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
        at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:38)
        at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:46)
        at org.springframework.integration.kafka.support.ConsumerConfiguration$1.call(ConsumerConfiguration.java:104)
        at org.springframework.integration.kafka.support.ConsumerConfiguration$1.call(ConsumerConfiguration.java:98)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        ... 3 more

The exception occurs after a while when processing a large volume of messages, and it always happens on the same topic.
The Kafka consumer configuration is:
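From my reading of the kafka 0.8.x source (so treat the details below as my assumption, not documented behavior): kafka.utils.IteratorTemplate marks itself FAILED before computing the next element and only clears that flag if the computation succeeds. ConsumerIterator resets the state before throwing ConsumerTimeoutException, so plain timeouts are retryable, but any other exception (a decoder error, an interrupt) leaves the iterator permanently in FAILED state, and every later hasNext()/next() throws the IllegalStateException above. A simplified plain-Java sketch of that state machine:

```java
import java.util.NoSuchElementException;

// Plain-Java sketch of kafka.utils.IteratorTemplate's state handling (0.8.x).
// The state names mirror the Scala source; makeNext() here is a stand-in for
// ConsumerIterator.makeNext(), simplified to fail once on demand.
public class IteratorTemplateSketch {
    enum State { READY, NOT_READY, DONE, FAILED }

    private State state = State.NOT_READY;
    private int nextItem;
    private int remaining;
    private boolean failOnce;

    public IteratorTemplateSketch(int items, boolean failOnce) {
        this.remaining = items;
        this.failOnce = failOnce;
    }

    // Simulated fetch: a non-timeout exception (e.g. a decoder error)
    // propagates while state is still FAILED, poisoning the iterator.
    private int makeNext() {
        if (failOnce) {
            failOnce = false;
            throw new RuntimeException("simulated non-timeout failure");
        }
        if (remaining == 0) { state = State.DONE; return -1; }
        return remaining--;
    }

    public boolean hasNext() {
        if (state == State.FAILED)
            throw new IllegalStateException("Iterator is in failed state");
        switch (state) {
            case DONE:  return false;
            case READY: return true;
            default:
                state = State.FAILED;   // pessimistically failed...
                nextItem = makeNext();  // ...recovered only if this succeeds
                if (state == State.DONE) return false;
                state = State.READY;
                return true;
        }
    }

    public int next() {
        if (!hasNext()) throw new NoSuchElementException();
        state = State.NOT_READY;
        return nextItem;
    }
}
```

If that reading is right, a single non-timeout failure would poison the stream for every subsequent poll cycle, which would explain why it always fails on the same topic.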

<int-kafka:zookeeper-connect id="zookeeperConnect"
    zk-connect="${kafkaZookeeperUrl}" zk-connection-timeout="${kafkaZkConnectionTimeout}"
    zk-session-timeout="${kafkaZkSessionTimeout}" zk-sync-time="${kafkaZkSyncTime}" />

<!-- -->
<!-- Spring Integration -->
<!-- -->
<bean id="consumerProperties"
    class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <prop key="auto.commit.enable">${kafkaConsumerAutoCommitEnable}</prop>
            <prop key="auto.commit.interval.ms">${kafkaConsumerAutoCommitInterval}</prop>
            <prop key="fetch.min.bytes">${kafkaConsumerFetchMinBytes}</prop>
            <prop key="fetch.wait.max.ms">${kafkaConsumerFetchWaitMax}</prop>
            <prop key="auto.offset.reset">${kafkaConsumerOffsetReset}</prop>
        </props>
    </property>
</bean>
<!-- -->
<!-- Channels -->
<!-- -->
<int:channel id="kafka1">
    <int:interceptors>
        <int:wire-tap channel="kafkaWiretap" />
    </int:interceptors>
</int:channel>
<!-- -->
<!-- Consumer Contexts -->
<!-- -->
<int-kafka:consumer-context id="consumerContext1"
    consumer-timeout="${kafkaDataInTimeout}" zookeeper-connect="zookeeperConnect"
    consumer-properties="consumerProperties">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration
            group-id="dataWriterSource" value-decoder="valueDecoder"
            key-decoder="valueDecoder" max-messages="${kafkaDataInMaxMessages}">
            <int-kafka:topic id="DATA_IN" streams="${kafkaDataInStreams}" />
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<!-- -->
<!-- Inbound Channel Adapters -->
<!-- -->
<int-kafka:inbound-channel-adapter
    id="kafkaInboundChannelAdapter1" kafka-consumer-context-ref="consumerContext1"
    auto-startup="${kafkaConsumerChannelAutoStartup}" channel="kafka1">
    <int:poller fixed-delay="10" time-unit="MILLISECONDS"
        max-messages-per-poll="1000" />
</int-kafka:inbound-channel-adapter>

The topic has 600 partitions and it receives a lot of messages. The consumer context properties are:


####################################
# KAFKA Consumers Configuration.
####################################

# General consumer properties

kafkaConsumerAutoCommitEnable=true
kafkaConsumerAutoCommitInterval=500
kafkaConsumerFetchMinBytes=1
kafkaConsumerFetchWaitMax=100
kafkaConsumerOffsetReset=largest

# Consumers

# Data In

kafkaDataInTimeout=500
kafkaDataInMaxMessages=5000
kafkaDataInStreams=4

Now, as far as I can tell, there is a problem either with how I have configured the poller for the consumer, or a bug in the following code from ConsumerConfiguration.java:

private Map<String, Map<Integer, List<Object>>> executeTasks(
        final List<Callable<List<MessageAndMetadata<K, V>>>> tasks) {

    final Map<String, Map<Integer, List<Object>>> messages = new ConcurrentHashMap<String, Map<Integer, List<Object>>>();
    messages.putAll(getLeftOverMessageMap());

    try {
        for (final Future<List<MessageAndMetadata<K, V>>> result : this.executorService.invokeAll(tasks)) {
            if (!result.get().isEmpty()) {
                final String topic = result.get().get(0).topic();
                if (!messages.containsKey(topic)) {
                    messages.put(topic, getPayload(result.get()));
                }
                else {

                    final Map<Integer, List<Object>> existingPayloadMap = messages.get(topic);
                    getPayload(result.get(), existingPayloadMap);
                }
            }
        }
        // ... (catch block and return statement of executeTasks() omitted)
    }

public ConsumerMetadata<K, V> getConsumerMetadata() {
    return consumerMetadata;
}

public Map<String, Map<Integer, List<Object>>> receive() {
    count = messageLeftOverTracker.getCurrentCount();
    final Object lock = new Object();

    final List<Callable<List<MessageAndMetadata<K, V>>>> tasks = new LinkedList<Callable<List<MessageAndMetadata<K, V>>>>();

    for (final List<KafkaStream<K, V>> streams : createConsumerMessageStreams()) {
        for (final KafkaStream<K, V> stream : streams) {
            tasks.add(new Callable<List<MessageAndMetadata<K, V>>>() {
                @Override
                public List<MessageAndMetadata<K, V>> call() throws Exception {
                    final List<MessageAndMetadata<K, V>> rawMessages = new ArrayList<MessageAndMetadata<K, V>>();
                    try {
                        while (count < maxMessages) {
                            final MessageAndMetadata<K, V> messageAndMetadata = stream.iterator().next();
                            synchronized (lock) {
                                if (count < maxMessages) {
                                    rawMessages.add(messageAndMetadata);
                                    count++;
                                }
                                else {
                                    messageLeftOverTracker.addMessageAndMetadata(messageAndMetadata);
                                }
                            }
                        }
                    }
                    catch (ConsumerTimeoutException cte) {
                        LOGGER.debug("Consumer timed out");
                    }
                    return rawMessages;
                }
            });
        }
    }
    return executeTasks(tasks);
}

Line 104 is the call final MessageAndMetadata<K, V> messageAndMetadata = stream.iterator().next();, which is not synchronized and may conflict with line 135: if (!result.get().isEmpty()) {
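A related observation: the Callable in receive() catches only ConsumerTimeoutException, so any other runtime exception (for example from the value decoder) escapes the task, and executeTasks() then gets it back from result.get() wrapped in an ExecutionException, which matches the "Caused by" shape of the trace above. A minimal sketch of that propagation (the "decoder failure" message is just a hypothetical stand-in):

```java
import java.util.concurrent.*;

public class EscapedExceptionSketch {
    // Runs a task the way executeTasks() does and returns the unwrapped
    // cause, or null if the task succeeded.
    static Throwable runAndUnwrap(Callable<String> task) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(task).get();   // same pattern as result.get() in executeTasks()
            return null;
        } catch (ExecutionException e) {
            return e.getCause();       // the exception that escaped the Callable
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the Callable in receive(): it catches only
        // ConsumerTimeoutException, so this RuntimeException escapes.
        Throwable cause = runAndUnwrap(() -> {
            throw new RuntimeException("decoder failure"); // hypothetical non-timeout error
        });
        System.out.println("Caused by: " + cause);
    }
}
```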
Any help from the Kafka folks would be hugely appreciated. The questions are: what is happening, and how can we fix it?
Thanks in advance, Francisco
