我正在尝试拆分一个字符串,比如“a,b,c”,然后聚合它们。但失败了。我在scdf中创建一个流,如下所示:
:test-agg-redis > splitter | aggregator
自定义参数:
app.aggregator.aggregation=#这个[有效载荷]
app.aggregator.message store type=redis
app.splitter.delimiters=,
然后我启动一个自定义源,如:
@Bean
public Supplier<String> simpleString() {
return () ->
"a,b,c"
;
}
但聚合器失败:
2021-04-22 04:51:59.264 ERROR [aggregator-processor,604bc6c7962cf644,9c467b646ef31f5a,true] 553 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'aggregator'; defined in: 'org.springframework.cloud.fn.aggregator.AggregatorFunctionConfiguration'; from source: 'org.springframework.core.type.StandardMethodMetadata@f627d13']; nested exception is java.lang.IllegalArgumentException: If relying on the default RedisSerializer (JdkSerializationRedisSerializer) the Object must be Serializable. Either make it Serializable or provide your own implementation of RedisSerializer via 'setValueSerializer(..)', failedMessage=GenericMessage [payload=byte[1], headers={sequenceNumber=1, sequenceSize=3, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=test-agg-redis.splitter, b3=604bc6c7962cf644-c30db5ed0c06fce8-1, nativeHeaders={}, kafka_offset=240, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7ecf45f1, correlationId=0cd0c4aa-6fe1-7b77-c673-412815ca04bb, id=32d25597-c5f7-7731-2102-591425bd550f, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1619067118736, kafka_groupId=test-agg-redis, timestamp=1619067118896}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:79)
at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:58)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:102)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer.lambda$doStart$1(ReactiveStreamsConsumer.java:177)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:432)
at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:274)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$2(FluxMessageChannel.java:83)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:189)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:439)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:526)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalArgumentException: If relying on the default RedisSerializer (JdkSerializationRedisSerializer) the Object must be Serializable. Either make it Serializable or provide your own implementation of RedisSerializer via 'setValueSerializer(..)'
at org.springframework.integration.redis.store.RedisMessageStore.rethrowAsIllegalArgumentException(RedisMessageStore.java:188)
at org.springframework.integration.redis.store.RedisMessageStore.doStoreIfAbsent(RedisMessageStore.java:128)
at org.springframework.integration.store.AbstractKeyValueMessageStore.doAddMessage(AbstractKeyValueMessageStore.java:145)
at org.springframework.integration.store.AbstractKeyValueMessageStore.addMessagesToGroup(AbstractKeyValueMessageStore.java:212)
at org.springframework.integration.store.AbstractMessageGroupStore.addMessageToGroup(AbstractMessageGroupStore.java:189)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.store(AbstractCorrelatingMessageHandler.java:780)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.processMessageForGroup(AbstractCorrelatingMessageHandler.java:495)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:474)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
... 27 more
Caused by: org.springframework.data.redis.serializer.SerializationException: Cannot serialize; nested exception is org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
at org.springframework.data.redis.serializer.JdkSerializationRedisSerializer.serialize(JdkSerializationRedisSerializer.java:96)
at org.springframework.data.redis.core.AbstractOperations.rawValue(AbstractOperations.java:127)
at org.springframework.data.redis.core.DefaultValueOperations.setIfAbsent(DefaultValueOperations.java:295)
at org.springframework.data.redis.core.DefaultBoundValueOperations.setIfAbsent(DefaultBoundValueOperations.java:149)
at org.springframework.integration.redis.store.RedisMessageStore.doStoreIfAbsent(RedisMessageStore.java:121)
... 34 more
Caused by: org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:64)
at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:33)
at org.springframework.data.redis.serializer.JdkSerializationRedisSerializer.serialize(JdkSerializationRedisSerializer.java:94)
... 38 more
Caused by: java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeArray(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
at java.base/java.util.HashMap.internalWriteEntries(Unknown Source)
at java.base/java.util.HashMap.writeObject(Unknown Source)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.base/java.io.ObjectOutputStream.defaultWriteObject(Unknown Source)
at org.springframework.messaging.MessageHeaders.writeObject(MessageHeaders.java:316)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
at org.springframework.core.serializer.DefaultSerializer.serialize(DefaultSerializer.java:46)
at org.springframework.core.serializer.Serializer.serializeToByteArray(Serializer.java:56)
at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:60)
... 40 more
为什么?我使用的是最新版本的scdf。谢谢!
注意:如果我没有设置参数'app.aggregator.message store type'aggregator不抛出exoption,但是app'log'打印:[“yq==”,“yg==”,“yw==”]似乎是解码错误。为什么
1条答案
按热度按时间1tu0hz3e1#
请在github中针对聚合器应用打开一个问题。
消息存储使用java序列化来存储消息;这个
Consumer
聚合前应过滤掉标头;它不可序列化。