Spring Boot Cloud PubSub [Sping Boot ]中的主题发送消息过多

ezykj2lf  于 2023-06-22  发布在  Spring
关注(0)|答案(1)|浏览(176)

我正在使用GCP和Cloud PubSub开发一个聊天服务器。
我想用Spring Integration从一个示例向Cloud PubSub中的主题发送消息,但它发送了太多消息。

  1. DEFAULT 2023-06-14T18:53:50.785528Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  2. DEFAULT 2023-06-14T18:53:50.785542Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  3. ERROR 2023-06-14T18:53:51.690026Z org.springframework.messaging.MessageHandlingException: Missing header 'gcp_pubsub_original_message' for method parameter type [interface com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage], failedMessage=GenericMessage [payload={"roomId":1,"userId":1,"message":"zxcv"}, headers={replyChannel=nullChannel, errorChannel=, id=8e1e8628-36cf-ebd5-bd36-925668f6644c, timestamp=1686768830200}] at org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver.handleMissingValue(HeaderMethodArgumentResolver.java:111) at org.springframework.messaging.handler.annotation.support.AbstractNamedValueMethodArgumentResolver.resolveArgument(AbstractNamedValueMethodArgumentResolver.java:108) at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper$HandlerMethod.invoke(MessagingMethodInvokerHelper.java:1075) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:558) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:476) at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:354) at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:114) at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:95) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136) at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222) at org.springframework.integration.dispatcher.BroadcastingDispatcher.…
  4. DEFAULT 2023-06-14T18:53:52.215535Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  5. DEFAULT 2023-06-14T18:53:52.215551Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  6. DEFAULT 2023-06-14T18:53:52.355030Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  7. DEFAULT 2023-06-14T18:53:52.355046Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  8. DEFAULT 2023-06-14T18:53:52.516490Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  9. DEFAULT 2023-06-14T18:53:52.516506Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  10. DEFAULT 2023-06-14T18:53:52.662022Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  11. DEFAULT 2023-06-14T18:53:52.662037Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  12. DEFAULT 2023-06-14T18:53:52.789312Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  13. DEFAULT 2023-06-14T18:53:52.789327Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  14. DEFAULT 2023-06-14T18:53:52.991170Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  15. DEFAULT 2023-06-14T18:53:52.991186Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  16. DEFAULT 2023-06-14T18:53:53.115730Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  17. DEFAULT 2023-06-14T18:53:53.115747Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  18. DEFAULT 2023-06-14T18:53:53.241358Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  19. DEFAULT 2023-06-14T18:53:53.241373Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  20. DEFAULT 2023-06-14T18:53:53.358250Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  21. DEFAULT 2023-06-14T18:53:53.358270Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  22. DEFAULT 2023-06-14T18:53:53.504470Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  23. DEFAULT 2023-06-14T18:53:53.504486Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  24. DEFAULT 2023-06-14T18:53:53.637282Z 2023-06-15T03:53:53.631+09:00 WARN 1 --- [bscriber-SE-1-0] c.g.c.p.v.StreamingSubscriberConnection : failed to send operations
  25. ERROR 2023-06-14T18:53:53.637335Z com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed. at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92) ~[gax-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67) ~[api-common-2.6.3.jar!/:2.6.3] at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808) ~[guava-31.1-jre.jar!/:na] at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.53.0.jar!/:1.53.0] at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541) ~[gax-grpc-2.23.3.jar!/:2.23.3] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal
  26. DEFAULT 2023-06-14T18:53:53.637519Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  27. DEFAULT 2023-06-14T18:53:53.637531Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  28. DEFAULT 2023-06-14T18:53:53.649037Z 2023-06-15T03:53:53.648+09:00 INFO 1 --- [bscriber-SE-1-0] c.g.c.p.v.StreamingSubscriberConnection : Permanent error invalid ack id message, will not resend
  29. DEFAULT 2023-06-14T18:53:53.786696Z 2023-06-15T03:53:53.784+09:00 WARN 1 --- [bscriber-SE-1-1] c.g.c.p.v.StreamingSubscriberConnection : failed to send operations
  30. ERROR 2023-06-14T18:53:53.786742Z com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were invalid. This could be because the acknowledgement ids have expired or the acknowledgement ids were malformed. at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:92) ~[gax-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:98) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:66) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[gax-grpc-2.23.3.jar!/:2.23.3] at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:67) ~[api-common-2.6.3.jar!/:2.6.3] at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038) ~[guava-31.1-jre.jar!/:na] at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808) ~[guava-31.1-jre.jar!/:na] at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:574) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544) ~[grpc-stub-1.53.0.jar!/:1.53.0] at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) ~[grpc-api-1.53.0.jar!/:1.53.0] at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) ~[grpc-api-1.53.0.jar!/:1.53.0] at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:541) ~[gax-grpc-2.23.3.jar!/:2.23.3] at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:576) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) ~[grpc-core-1.53.0.jar!/:1.53.0] at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal
  31. DEFAULT 2023-06-14T18:53:53.786965Z 2023-06-15T03:53:53.786+09:00 INFO 1 --- [bscriber-SE-1-1] c.g.c.p.v.StreamingSubscriberConnection : Permanent error invalid ack id message, will not resend
  32. DEFAULT 2023-06-14T18:53:53.827465Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  33. DEFAULT 2023-06-14T18:53:53.827481Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  34. DEFAULT 2023-06-14T18:53:53.973948Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  35. DEFAULT 2023-06-14T18:53:53.973962Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  36. DEFAULT 2023-06-14T18:53:54.116426Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  37. DEFAULT 2023-06-14T18:53:54.116443Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}
  38. DEFAULT 2023-06-14T18:53:54.237137Z send success : {"roomId":1,"userId":1,"message":"zxcv"}
  39. DEFAULT 2023-06-14T18:53:54.237158Z send fail : {"roomId":1,"userId":1,"message":"zxcv"}

这是该示例的代码。

  1. // Create an inbound channel adapter to listen to the subscription `chat-sub` and send
  2. // messages to the input message channel.
  3. @Bean
  4. fun inboundChannelAdapter(
  5. @Qualifier("inputMessageChannel") messageChannel: MessageChannel?,
  6. pubSubTemplate: PubSubTemplate?
  7. ): PubSubInboundChannelAdapter? {
  8. val adapter = PubSubInboundChannelAdapter(pubSubTemplate, "chat-sub")
  9. adapter.outputChannel = messageChannel
  10. adapter.ackMode = AckMode.AUTO
  11. adapter.payloadType = String::class.java
  12. return adapter
  13. }
  14. // Define what happens to the messages arriving in the message channel.
  15. @ServiceActivator(inputChannel = "inputMessageChannel")
  16. fun messageReceiver(
  17. payload: String,
  18. @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) message: BasicAcknowledgeablePubsubMessage
  19. ) {
  20. val model = gson.fromJson(payload, ChatMessageToPubSub::class.java)
  21. messagingTemplate.convertAndSend("/sub/message/" + model.roomId.toString(), ChatMessageToClient(model.userId, model.message))
  22. message.ack()
  23. //LOGGER.info("Message arrived via an inbound channel adapter from chat-sub! Payload: $payload")
  24. }
  25. // Create an outbound channel adapter to send messages from the input message channel to the
  26. // topic `chat`.
  27. @Bean
  28. @ServiceActivator(inputChannel = "inputMessageChannel")
  29. fun messageSender(pubsubTemplate: PubSubTemplate?): MessageHandler? {
  30. val adapter = PubSubMessageHandler(pubsubTemplate, "chat")
  31. adapter.setSuccessCallback { ackId, message -> println("send success : " + message.payload) }
  32. adapter.setFailureCallback { cause, message -> println("send fail : " + message.payload) }
  33. return adapter
  34. }
  1. @MessagingGateway(defaultRequestChannel = "inputMessageChannel")
  2. interface PubSubOutBoundGateway {
  3. @kotlin.jvm.Throws(MessagingException::class)
  4. fun sendToPubsub(text : String)
  5. }

因此,我在客户端关闭示例时收到了大约200条消息。
第一个错误是

  1. org.springframework.messaging.MessageHandlingException: Missing header 'gcp_pubsub_original_message' for method parameter type [interface com.google.cloud.spring.pubsub.support.BasicAcknowledgeablePubsubMessage]

我以为这是消息重发的关键,所以我尝试添加GcpPubSubHeaders.ORIGINAL_MESSAGE头。但是我找不到实现BasicAcknowledgeablePubsubMessage来生成对象的类。
我现在错过了什么?
或者有什么方法可以在异常发生时停止重试?

umuewwlo

umuewwlo1#

这是因为发送到主题的消息返回到inputMessageChannel,因此messageSender适配器再次工作。
所以我把入站通道和出站通道分开,所以代码看起来像这样。

  1. @Bean
  2. fun inputMessageChannel(): MessageChannel? {
  3. return PublishSubscribeChannel()
  4. }
  5. @Bean
  6. fun outputMessageChannel(): MessageChannel? {
  7. return PublishSubscribeChannel()
  8. }
  9. // Create an inbound channel adapter to listen to the subscription `chat-sub` and send
  10. // messages to the input message channel.
  11. @Bean
  12. fun inboundChannelAdapter(
  13. @Qualifier("inputMessageChannel") messageChannel: MessageChannel?,
  14. pubSubTemplate: PubSubTemplate?
  15. ): PubSubInboundChannelAdapter? {
  16. val adapter = PubSubInboundChannelAdapter(pubSubTemplate, "chat-sub")
  17. adapter.outputChannel = messageChannel
  18. adapter.ackMode = AckMode.MANUAL
  19. adapter.payloadType = String::class.java
  20. return adapter
  21. }
  22. // Define what happens to the messages arriving in the message channel.
  23. @ServiceActivator(inputChannel = "inputMessageChannel")
  24. fun messageReceiver(
  25. payload: String,
  26. @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) message: BasicAcknowledgeablePubsubMessage
  27. ) {
  28. val model = gson.fromJson(payload, ChatMessageToPubSub::class.java)
  29. println("received message : " + message.pubsubMessage.data.toStringUtf8())
  30. println(message.pubsubMessage.attributesMap)
  31. message.ack()
  32. messagingTemplate.convertAndSend("/sub/message/" + model.roomId.toString(), ChatMessageToClient(model.userId, model.message))
  33. //LOGGER.info("Message arrived via an inbound channel adapter from chat-sub! Payload: $payload")
  34. }
  35. // Create an outbound channel adapter to send messages from the input message channel to the
  36. // topic `chat`.
  37. @Bean
  38. @ServiceActivator(inputChannel = "outputMessageChannel")
  39. fun messageSender(pubsubTemplate: PubSubTemplate?): MessageHandler? {
  40. val adapter = PubSubMessageHandler(pubsubTemplate, "chat")
  41. adapter.setSuccessCallback { ackId, message -> println("send success : " + ackId + " - " + message.payload) }
  42. adapter.setFailureCallback { cause, message -> println("send fail : " + cause.stackTraceToString() + " - " + message.payload) }
  43. adapter.setPublishTimeoutExpressionString("500")
  44. return adapter
  45. }
  1. @MessagingGateway(defaultRequestTimeout = "500", defaultRequestChannel = "outputMessageChannel", errorChannel = "errorChannel")
  2. interface PubSubOutBoundGateway {
  3. @kotlin.jvm.Throws(MessagingException::class, MessageHandlingException::class)
  4. fun sendToPubsub(text : String)
  5. }

那就成功了

展开查看全部

相关问题