scdf聚合器notserializableexception

7y4bm7vi  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(406)

我正在尝试拆分一个字符串,比如“a,b,c”,然后聚合它们。但失败了。我在scdf中创建一个流,如下所示:

  1. :test-agg-redis > splitter | aggregator

自定义参数:
app.aggregator.aggregation=#这个[有效载荷]
app.aggregator.message store type=redis
app.splitter.delimiters=,
然后我启动一个自定义源,如:

  1. @Bean
  2. public Supplier<String> simpleString() {
  3. return () ->
  4. "a,b,c"
  5. ;
  6. }

但聚合器失败:

  1. 2021-04-22 04:51:59.264 ERROR [aggregator-processor,604bc6c7962cf644,9c467b646ef31f5a,true] 553 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'aggregator'; defined in: 'org.springframework.cloud.fn.aggregator.AggregatorFunctionConfiguration'; from source: 'org.springframework.core.type.StandardMethodMetadata@f627d13']; nested exception is java.lang.IllegalArgumentException: If relying on the default RedisSerializer (JdkSerializationRedisSerializer) the Object must be Serializable. Either make it Serializable or provide your own implementation of RedisSerializer via 'setValueSerializer(..)', failedMessage=GenericMessage [payload=byte[1], headers={sequenceNumber=1, sequenceSize=3, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=test-agg-redis.splitter, b3=604bc6c7962cf644-c30db5ed0c06fce8-1, nativeHeaders={}, kafka_offset=240, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7ecf45f1, correlationId=0cd0c4aa-6fe1-7b77-c673-412815ca04bb, id=32d25597-c5f7-7731-2102-591425bd550f, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1619067118736, kafka_groupId=test-agg-redis, timestamp=1619067118896}]
  2. at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
  3. at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:79)
  4. at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:58)
  5. at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
  6. at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
  7. at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:102)
  8. at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
  9. at org.springframework.integration.endpoint.ReactiveStreamsConsumer.lambda$doStart$1(ReactiveStreamsConsumer.java:177)
  10. at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
  11. at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
  12. at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:432)
  13. at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:274)
  14. at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
  15. at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
  16. at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
  17. at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
  18. at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
  19. at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
  20. at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$2(FluxMessageChannel.java:83)
  21. at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:189)
  22. at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:439)
  23. at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:526)
  24. at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
  25. at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
  26. at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
  27. at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
  28. at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
  29. at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
  30. at java.base/java.lang.Thread.run(Unknown Source)
  31. Caused by: java.lang.IllegalArgumentException: If relying on the default RedisSerializer (JdkSerializationRedisSerializer) the Object must be Serializable. Either make it Serializable or provide your own implementation of RedisSerializer via 'setValueSerializer(..)'
  32. at org.springframework.integration.redis.store.RedisMessageStore.rethrowAsIllegalArgumentException(RedisMessageStore.java:188)
  33. at org.springframework.integration.redis.store.RedisMessageStore.doStoreIfAbsent(RedisMessageStore.java:128)
  34. at org.springframework.integration.store.AbstractKeyValueMessageStore.doAddMessage(AbstractKeyValueMessageStore.java:145)
  35. at org.springframework.integration.store.AbstractKeyValueMessageStore.addMessagesToGroup(AbstractKeyValueMessageStore.java:212)
  36. at org.springframework.integration.store.AbstractMessageGroupStore.addMessageToGroup(AbstractMessageGroupStore.java:189)
  37. at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.store(AbstractCorrelatingMessageHandler.java:780)
  38. at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.processMessageForGroup(AbstractCorrelatingMessageHandler.java:495)
  39. at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:474)
  40. at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
  41. ... 27 more
  42. Caused by: org.springframework.data.redis.serializer.SerializationException: Cannot serialize; nested exception is org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
  43. at org.springframework.data.redis.serializer.JdkSerializationRedisSerializer.serialize(JdkSerializationRedisSerializer.java:96)
  44. at org.springframework.data.redis.core.AbstractOperations.rawValue(AbstractOperations.java:127)
  45. at org.springframework.data.redis.core.DefaultValueOperations.setIfAbsent(DefaultValueOperations.java:295)
  46. at org.springframework.data.redis.core.DefaultBoundValueOperations.setIfAbsent(DefaultBoundValueOperations.java:149)
  47. at org.springframework.integration.redis.store.RedisMessageStore.doStoreIfAbsent(RedisMessageStore.java:121)
  48. ... 34 more
  49. Caused by: org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
  50. at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:64)
  51. at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:33)
  52. at org.springframework.data.redis.serializer.JdkSerializationRedisSerializer.serialize(JdkSerializationRedisSerializer.java:94)
  53. ... 38 more
  54. Caused by: java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
  55. at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
  56. at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
  57. at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
  58. at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
  59. at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
  60. at java.base/java.io.ObjectOutputStream.writeArray(Unknown Source)
  61. at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
  62. at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
  63. at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
  64. at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
  65. at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
  66. at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
  67. at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
  68. at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
  69. at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
  70. at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
  71. at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
  72. at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
  73. at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
  74. at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
  75. at java.base/java.util.HashMap.internalWriteEntries(Unknown Source)
  76. at java.base/java.util.HashMap.writeObject(Unknown Source)
  77. at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  78. at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
  79. at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
  80. at java.base/java.lang.reflect.Method.invoke(Unknown Source)
  81. at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
  82. at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
  83. at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
  84. at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
  85. at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
  86. at java.base/java.io.ObjectOutputStream.defaultWriteObject(Unknown Source)
  87. at org.springframework.messaging.MessageHeaders.writeObject(MessageHeaders.java:316)
  88. at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  89. at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
  90. at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
  91. at java.base/java.lang.reflect.Method.invoke(Unknown Source)
  92. at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
  93. at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
  94. at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
  95. at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
  96. at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
  97. at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
  98. at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
  99. at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
  100. at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
  101. at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
  102. at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
  103. at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
  104. at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
  105. at org.springframework.core.serializer.DefaultSerializer.serialize(DefaultSerializer.java:46)
  106. at org.springframework.core.serializer.Serializer.serializeToByteArray(Serializer.java:56)
  107. at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:60)
  108. ... 40 more

为什么?我使用的是最新版本的scdf。谢谢!
注意:如果我没有设置参数'app.aggregator.message store type'aggregator不抛出exoption,但是app'log'打印:[“yq==”,“yg==”,“yw==”]似乎是解码错误。为什么

1tu0hz3e

1tu0hz3e1#

请在github中针对聚合器应用打开一个问题。
消息存储使用java序列化来存储消息;这个 Consumer 聚合前应过滤掉标头;它不可序列化。

相关问题