Spring Data 流-日志接收器-如何记录纯文本消息

ffscu2ro  于 2023-06-21  发布在  Spring
关注(0)|答案(1)|浏览(82)

我一直在遵循Spring教程来创建一个简单的HTTP摄取流:
https://dataflow.spring.io/docs/stream-developer-guides/getting-started/stream/
我定义并部署了一个流:
http --server.port=20100 | log
我通过cURL确认了流正在工作:
curl http://localhost:20100 -H "Content-type: text/plain" -d "Happy streaming"
在测试中,我注意到以'['字符开头的消息负载无法处理。日志接收器显示异常:
curl http://localhost:20100 -H "Content-type: text/plain" -d "[Happy streaming]"

Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Happy': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"[Happy streaming]"; line: 1, column: 8]

我猜日志接收器需要一个带有JSON有效负载的消息。如何配置日志接收器以接受纯文本形式的消息?
谢谢你的帮助。我在下面包含了完整的堆栈跟踪。

  • 奥利弗
ERROR [log-sink,9faaa08056715918,f33480c6cbbcf4f4] 1062 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@79558290]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Happy': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"[Happy streaming]"; line: 1, column: 8], failedMessage=GenericMessage [payload=byte[17], headers={content-length=17, http_requestMethod=POST, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=http-ingest.http, accept=*/*, b3=9faaa08056715918-eec3d590a4684f37-0, nativeHeaders={}, kafka_offset=1, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3de9aa7c, host=localhost:20100, http_requestUrl=http://localhost:20100/, id=ebdf33b5-6ea2-7611-28e7-6b01a1fa6849, kafka_receivedPartitionId=0, contentType=text/plain, kafka_receivedTimestamp=1686352845917, kafka_groupId=http-ingest, user-agent=curl/7.87.0, timestamp=1686352848940}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Happy': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"[Happy streaming]"; line: 1, column: 8]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3635)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2734)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794)
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:346)
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
    at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3723)
    at org.springframework.cloud.function.json.JacksonMapper.doFromJson(JacksonMapper.java:60)
    at org.springframework.cloud.function.json.JsonMapper.fromJson(JsonMapper.java:94)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.fluxifyInputIfNecessary(SimpleFunctionRegistry.java:808)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:705)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:562)
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:790)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:622)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    ... 32 more

2023-06-09 23:20:48.959 ERROR [log-sink,,] 1062 --- [container-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff none exhausted for http-ingest.http-0@1

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@79558290]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Happy': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"[Happy streaming]"; line: 1, column: 8], failedMessage=GenericMessage [payload=byte[17], headers={content-length=17, http_requestMethod=POST, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=http-ingest.http, accept=*/*, b3=9faaa08056715918-eec3d590a4684f37-0, nativeHeaders={}, kafka_offset=1, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3de9aa7c, host=localhost:20100, http_requestUrl=http://localhost:20100/, id=ebdf33b5-6ea2-7611-28e7-6b01a1fa6849, kafka_receivedPartitionId=0, contentType=text/plain, kafka_receivedTimestamp=1686352845917, kafka_groupId=http-ingest, user-agent=curl/7.87.0, timestamp=1686352848940}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@79558290]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Happy': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"[Happy streaming]"; line: 1, column: 8]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.20.jar!/:5.3.20]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.20.jar!/:5.3.20]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.20.jar!/:5.3.20]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.20.jar!/:5.3.20]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397) ~[spring-integration-kafka-5.5.12.jar!/:5.5.12]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83) ~[spring-integration-kafka-5.5.12.jar!/:5.5.12]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454) ~[spring-integration-kafka-5.5.12.jar!/:5.5.12]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428) ~[spring-integration-kafka-5.5.12.jar!/:5.5.12]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255) ~[spring-retry-1.3.3.jar!/:na]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar!/:2.8.6]
    ... 11 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Happy': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"[Happy streaming]"; line: 1, column: 8]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391) ~[jackson-core-2.13.3.jar!/:2.13.3]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:745) ~[jackson-core-2.13.3.jar!/:2.13.3]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3635) ~[jackson-core-2.13.3.jar!/:2.13.3]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2734) ~[jackson-core-2.13.3.jar!/:2.13.3]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902) ~[jackson-core-2.13.3.jar!/:2.13.3]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794) ~[jackson-core-2.13.3.jar!/:2.13.3]
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:346) ~[jackson-databind-2.13.3.jar!/:2.13.3]
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244) ~[jackson-databind-2.13.3.jar!/:2.13.3]
    at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28) ~[jackson-databind-2.13.3.jar!/:2.13.3]
    at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323) ~[jackson-databind-2.13.3.jar!/:2.13.3]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674) ~[jackson-databind-2.13.3.jar!/:2.13.3]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3723) ~[jackson-databind-2.13.3.jar!/:2.13.3]
    at org.springframework.cloud.function.json.JacksonMapper.doFromJson(JacksonMapper.java:60) ~[spring-cloud-function-context-3.2.5.jar!/:3.2.5]
    at org.springframework.cloud.function.json.JsonMapper.fromJson(JsonMapper.java:94) ~[spring-cloud-function-context-3.2.5.jar!/:3.2.5]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.fluxifyInputIfNecessary(SimpleFunctionRegistry.java:808) ~[spring-cloud-function-context-3.2.5.jar!/:3.2.5]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:705) ~[spring-cloud-function-context-3.2.5.jar!/:3.2.5]
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:562) ~[spring-cloud-function-context-3.2.5.jar!/:3.2.5]
    at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84) ~[spring-cloud-stream-3.2.4.jar!/:3.2.4]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:790) ~[spring-cloud-stream-3.2.4.jar!/:3.2.4]
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:622) ~[spring-cloud-stream-3.2.4.jar!/:3.2.4]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
    ... 32 common frames omitted
sg24os4d

sg24os4d1#

看起来像是将文本 Package 在"[...]"中是它试图将其处理为JSON的触发器。如果你使用-d "\[foo]",它可以工作,但是"\"在那个时候在有效负载中:(。此外,如果您改为执行-d '["foo"]',它将被视为有效的JSON字符串。
这可能是低级Spring Cloud Function代码中的一个bug。我将在Spring Cloud Function中创建一个问题。我将更新这个问题的答案,我们可以在那里进一步讨论。
谢谢

相关问题