NullPointerException in AbstractAggregatingMessageGroupProcessor when using the Kafka message bus

mxg2im7a  posted on 2021-06-08 in Kafka

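I'm fairly sure this is a bug in Spring XD.
I'm running Spring XD 1.3.0.RELEASE in single-node mode. All configuration is default, except that I'm using Kafka instead of the local transport. Relevant XD configuration: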

spring:
  profiles: singlenode
xd:
  transport: kafka
  messagebus:
    kafka:
      brokers:                                 localhost:9092
      zkAddress:                               localhost:2181
      mode:                                    embeddedHeaders
      offsetManagement:                        kafkaTopic
      socketBufferSize:                        2097152
      offsetStoreTopic:                        SpringXdOffsets
      offsetStoreSegmentSize:                  25000000
      offsetStoreRetentionTime:                60000
      offsetStoreRequiredAcks:                 1
      offsetStoreMaxFetchSize:                 1048576
      offsetStoreBatchBytes:                   16384
      offsetStoreBatchTime:                    1000
      offsetUpdateTimeWindow:                  10000
      offsetUpdateCount:                       0
      offsetUpdateShutdownTimeout:             2000
      default:
        batchSize:                 16384
        batchTimeout:              0
        replicationFactor:         1
        concurrency:               1
        requiredAcks:              1
        compressionCodec:          none
        queueSize:                 8192 # must be a power of 2
        maxWait:                   100
        fetchSize:                 1048576
        minPartitionCount:         1
        durableSubscription:       false

Create a stream with an aggregator (this aggregator definition is taken directly from the reference documentation):

stream create --name aggregates --definition "http | aggregator --count=3 --aggregation=T(org.springframework.util.StringUtils).collectionToDelimitedString(#this.![payload],' ') | log" --deploy

Then send 3 HTTP POSTs:

xd:> http post --data Hello
xd:> http post --data World
xd:> http post --data !

The result is this stack trace:

2015-12-10T17:07:11-0800 1.3.0.RELEASE ERROR pool-13-thread-1 listener.LoggingErrorHandler - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 344940496, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=81 cap=81]), KafkaMessageMetadata [offset=2, nextOffset=3, Partition[topic='aggregates.0', id=0]]
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.config.AggregatorFactoryBean#0]; nested exception is java.lang.NullPointerException
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.2.2.RELEASE.jar:4.2.2.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:43) ~[spring-integration-kafka-1.3.0.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171) ~[spring-integration-kafka-1.3.0.RELEASE.jar:na]
    at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50) ~[spring-integration-kafka-1.3.0.RELEASE.jar:na]
    at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
    at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
    at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67) [reactor-core-2.0.5.RELEASE.jar:na]
    at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789) [reactor-core-2.0.5.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_66]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_66]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_66]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_66]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
Caused by: java.lang.NullPointerException: null
    at org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor.aggregateHeaders(AbstractAggregatingMessageGroupProcessor.java:115) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractAggregatingMessageGroupProcessor.processMessageGroup(AbstractAggregatingMessageGroupProcessor.java:79) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:648) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:405) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.2.2.RELEASE.jar:na]
    ... 32 common frames omitted

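Is this a Spring XD bug? Is there a workaround?
More details:
The header that causes the NPE is kafka_messageKey. Its value is null, so AbstractAggregatingMessageGroupProcessor:115 throws an NPE.
Looking further at KafkaMessageBus, it seems the Kafka message key is intentionally set to null by the producer.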
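
To make the failure mode concrete, here is a minimal, self-contained sketch of how merging headers across a message group can throw when one header carries a null value. It is not the Spring Integration source: the class HeaderMergeSketch and its methods are hypothetical, and it assumes only that the merge compares header values by calling equals() on the incoming value, which is consistent with the stack trace above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical illustration only; not the actual Spring Integration code.
public class HeaderMergeSketch {

    // Merges headers from all messages in a group and drops keys whose values conflict.
    // Calls equals() on the incoming value, so a null value throws NullPointerException.
    public static Map<String, Object> naiveMerge(List<Map<String, Object>> headerSets) {
        Map<String, Object> merged = new HashMap<>();
        Set<String> conflicts = new HashSet<>();
        for (Map<String, Object> headers : headerSets) {
            for (Map.Entry<String, Object> entry : headers.entrySet()) {
                String key = entry.getKey();
                Object value = entry.getValue();
                if (!merged.containsKey(key)) {
                    merged.put(key, value);
                } else if (!value.equals(merged.get(key))) { // NPE when value is null
                    conflicts.add(key);
                }
            }
        }
        merged.keySet().removeAll(conflicts);
        return merged;
    }

    // Same merge, but compares with Objects.equals(), which tolerates null on either side.
    public static Map<String, Object> nullSafeMerge(List<Map<String, Object>> headerSets) {
        Map<String, Object> merged = new HashMap<>();
        Set<String> conflicts = new HashSet<>();
        for (Map<String, Object> headers : headerSets) {
            for (Map.Entry<String, Object> entry : headers.entrySet()) {
                String key = entry.getKey();
                Object value = entry.getValue();
                if (!merged.containsKey(key)) {
                    merged.put(key, value);
                } else if (!Objects.equals(value, merged.get(key))) {
                    conflicts.add(key);
                }
            }
        }
        merged.keySet().removeAll(conflicts);
        return merged;
    }

    public static void main(String[] args) {
        // Two messages, each carrying kafka_messageKey with a null value.
        Map<String, Object> first = new HashMap<>();
        first.put("kafka_messageKey", null);
        Map<String, Object> second = new HashMap<>();
        second.put("kafka_messageKey", null);
        List<Map<String, Object>> group = Arrays.asList(first, second);

        try {
            naiveMerge(group);
            System.out.println("naive merge succeeded");
        } catch (NullPointerException e) {
            System.out.println("naive merge threw NullPointerException");
        }
        System.out.println("null-safe merge keys: " + nullSafeMerge(group).keySet());
    }
}
```

In this sketch the first message only inserts the null value; the exception appears when a later message in the group hits the comparison branch.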

hxzsmxv2 #1

This is a bug in Spring XD. See INT-3908 for details.
Following Marius's suggestion, here is a suitable workaround:
Edit $XD_HOME/modules/processor/aggregator/config/aggregator.xml to include:

<channel id="aggregatorInput"/>

<header-enricher input-channel="input" output-channel="aggregatorInput" default-overwrite="true">
    <header name="kafka_messageKey" value="."/>
</header-enricher>

<aggregator input-channel="aggregatorInput" output-channel="output"
    correlation-strategy-expression="${correlation}"
    release-strategy-expression="${release}" expression="${aggregation}"
    send-partial-result-on-expiry="true" expire-groups-upon-completion="true"
    message-store="messageStore">
</aggregator>

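Note: using a header-filter will not work, because the mechanism SI uses to remove headers only removes a header when its value is not null.
Alternatively, if you don't want to edit the module XML directly, you can use module composition to place a header enricher in front of the standard aggregator module: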

module compose --name kafa-aggregator --definition "header-enricher --headers={\"kafka_messageKey\":\"'.'\"} --overwrite=true | aggregator --count=3 --aggregation=T(org.springframework.util.StringUtils).collectionToDelimitedString(#this.![payload],' ')"

stream create --name aggregates --definition "http | kafa-aggregator | log" --deploy
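
The difference between the two approaches in this answer can be sketched in plain Java. This is only a model of the behaviour described in the note above, not Spring Integration code: the class NullHeaderWorkaroundSketch and both methods are hypothetical, and the removal rule (skip when the current value is null) is taken from the note rather than from the library source.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two workarounds; not the actual Spring Integration code.
public class NullHeaderWorkaroundSketch {

    // Models a header filter as described in the note:
    // the header is removed only when its current value is not null.
    public static void filterHeader(Map<String, Object> headers, String name) {
        if (headers.get(name) != null) {
            headers.remove(name);
        }
    }

    // Models a header enricher with overwrite enabled:
    // the header is set to the given value regardless of what was there before.
    public static void enrichHeader(Map<String, Object> headers, String name, Object value) {
        headers.put(name, value);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("kafka_messageKey", null);

        // The filter leaves the null-valued entry in place.
        filterHeader(headers, "kafka_messageKey");
        System.out.println("after filter, key present: " + headers.containsKey("kafka_messageKey"));

        // The enricher replaces the null with a non-null placeholder.
        enrichHeader(headers, "kafka_messageKey", ".");
        System.out.println("after enricher, value: " + headers.get("kafka_messageKey"));
    }
}
```

Under that assumption, only the enricher guarantees that the aggregator never sees a null kafka_messageKey, which is why the placeholder value "." is used.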

fsi0uk1n #2

One possible workaround is to insert a header filter at the start of the aggregator module. In $XD_HOME/modules/processor/aggregator/config/aggregator.xml, change the beginning to:

<channel id="input" />

<channel id="aggregatorInput"/>

<int:header-filter input-channel="input"
    output-channel="aggregatorInput" header-names="kafka_messageKey"/>

<aggregator input-channel="aggregatorInput" output-channel="output"
            correlation-strategy-expression="${correlation}"
            release-strategy-expression="${release}" expression="${aggregation}"
            send-partial-result-on-expiry="true" expire-groups-upon-completion="true"
            message-store="messageStore">
    </aggregator>

... rest of the module definition remains unchanged ...

As for how to fix this in the long run, I'll comment on the JIRA issue.
Cheers, Marius
