Spring Cloud Pub/Sub cannot publish to a queue

Posted by eqzww0vc on 2021-07-26 in Java

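I have an implementation that listens on one queue and publishes to several other queues. To do this, I implemented a generic gateway interface that sends messages to different topics by passing in the topic name.
Assumptions:
I am using Google Cloud Pub/Sub
Spring Cloud GCP dependencies version: 1.1.3.RELEASE
Spring Cloud dependencies version: Greenwich.RELEASE
Spring Integration core version: 5.1.7.RELEASE
spring-cloud-gcp-starter-pubsub version: 1.1.3.RELEASE
What am I doing? I listen on a queue, run some logic, then manually ack the message on that same queue (see consumer.ack() in the method below) and send two more messages to other queues (an email-trigger queue and an asynchronous acknowledgement to another microservice). Something I am doing is wrong, but I cannot find what. I have searched a lot and tried different things.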
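
To make the intended flow concrete, here is a minimal, framework-free sketch of "ack the inbound message, then publish to other topics by name". It is only an illustration under stated assumptions: `TopicPublisher`, `InboundMessage`, `InMemoryPublisher` and the topic names `email-trigger` and `scheduler-ack` are hypothetical stand-ins, not the Spring Cloud GCP types or the real topic names from this project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AckThenPublishSketch {

    /** Hypothetical stand-in for a gateway that publishes to a topic chosen by name. */
    interface TopicPublisher {
        void send(String topic, String message);
    }

    /** Hypothetical stand-in for an inbound message that must be acked manually. */
    static final class InboundMessage {
        final String payload;
        boolean acked;

        InboundMessage(String payload) {
            this.payload = payload;
        }

        void ack() {
            acked = true;
        }
    }

    /** In-memory publisher that records every message per topic. */
    static final class InMemoryPublisher implements TopicPublisher {
        final Map<String, List<String>> sent = new HashMap<>();

        @Override
        public void send(String topic, String message) {
            if (topic == null || topic.isEmpty()) {
                throw new IllegalArgumentException("topic must not be empty");
            }
            sent.computeIfAbsent(topic, t -> new ArrayList<>()).add(message);
        }
    }

    /** Acks the inbound message, then fans out to two other topics. */
    static void handle(InboundMessage in, TopicPublisher publisher) {
        // Business logic would run here, before the ack.
        in.ack();
        publisher.send("email-trigger", in.payload);                // assumed topic name
        publisher.send("scheduler-ack", "{\"acknowledged\":true}"); // assumed topic name
    }

    public static void main(String[] args) {
        InboundMessage in = new InboundMessage("{\"eventId\":22}");
        InMemoryPublisher publisher = new InMemoryPublisher();
        handle(in, publisher);
        System.out.println("acked=" + in.acked + ", topics=" + publisher.sent.keySet());
    }
}
```

In this sketch the topic name travels as a plain method argument; in the code further down it travels as a message header instead.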
Spring properties

spring.cloud.gcp:
  project-id: theProjectId
  credentials:
    location: XXX.json
    scopes: DEFAULT_SCOPES
  pubsub:
    enabled: true
    publisher:
      retry:
        total-timeout-seconds: 120
        initial-retry-delay-second: 5
    subscriber:
      executor-threads: 1
      initial-retry-delay-seconds: 20

Stack trace:

org.springframework.cloud.gcp.pubsub.core.PubSubException: Sending Spring message failed.; nested exception is org.springframework.messaging.MessageHandlingException: nested exception is java.lang.IllegalArgumentException, failedMessage=GenericMessage [payload=byte[105], headers={gcp_pubsub_acknowledgement=org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter$1@37a56117, id=2141149b-b1fd-6f35-55a8-XXXXXXXX, gcp_pubsub_topic=xxx-dev, gcp_pubsub_original_message=PushedAcknowledgeablePubsubMessage{projectId='theprojectId', subscriptionName='xxx-dev', message=data: "{\"eventId\":22,\"serviceName\":\"nameService\",\"eventName\":\"VISIT_EVENT\",\"payload\":null,\"accountId\":3}"
attributes {
  key: "gcp_pubsub_topic"
  value: "xxx-dev"
}
message_id: "8542727XXXXXX"
publish_time {
  seconds: 1573833888
  nanos: 57000000
}
}, timestamp=1573845386667}]
    at org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter.consumeMessage(PubSubInboundChannelAdapter.java:157)
    at org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert$1(PubSubSubscriberTemplate.java:152)
    at com.google.cloud.pubsub.v1.MessageDispatcher$4.run(MessageDispatcher.java:438)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is java.lang.IllegalArgumentException

To avoid pasting the whole stack trace, the other important line is:

Caused by: java.lang.IllegalArgumentException: null
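
A `: null` suffix like this usually just means the exception was created without a message, so the useful information is the first stack frame of the innermost cause. As a debugging aid, the following sketch walks the cause chain to that root exception. `RootCauseSketch` and `rootCause` are hypothetical helper names, and the nested exceptions in `main` only imitate the shape of the trace above.

```java
public class RootCauseSketch {

    /** Follows getCause() until the innermost exception is reached. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Imitates: outer failure -> handling failure -> IllegalArgumentException without a message.
        Exception wrapped = new RuntimeException("Sending Spring message failed.",
                new IllegalStateException("handler failed", new IllegalArgumentException()));

        Throwable root = rootCause(wrapped);
        System.out.println(root); // prints java.lang.IllegalArgumentException

        // The first frame of the root cause points at the check that actually failed.
        StackTraceElement[] frames = root.getStackTrace();
        if (frames.length > 0) {
            System.out.println("thrown at " + frames[0]);
        }
    }
}
```

Wrapping the `queuePublisher.send(...)` call in a try/catch and logging `rootCause(e)` with its stack trace would be one way to see which argument check is being tripped.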

Configuration

@Configuration
public class GoogleCloud {

    private final GoogleProperties googleProperties;

    public GoogleCloud(final GoogleProperties googleProperties) {
        this.googleProperties = googleProperties;
    }

    @Bean
    @ServiceActivator(inputChannel = "pubsubOutputChannel")
    public MessageHandler messageSender(final PubSubOperations pubSubTemplate) {
        return new PubSubMessageHandler(pubSubTemplate, "doesNotMatter");
    }

    @Bean
    public PubSubInboundChannelAdapter scheduleEventsChannelAdapter(@Qualifier("schedulerChannel") final MessageChannel outputChannel,
                                                                    final PubSubOperations pubSubTemplate) {
        final PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, googleProperties.getExecuteEvent());
        adapter.setOutputChannel(outputChannel);
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }

    @Bean
    public Storage customStorage() {
        return StorageOptions.getDefaultInstance().getService();
    }
}

Listener

@ServiceActivator(inputChannel = "schedulerChannel")
public void handleMessage(final Message<String> message) {
    // Parse the JSON payload of the inbound Pub/Sub message.
    final ScheduledEvent scheduledEvent;
    try {
        scheduledEvent = objectMapper.readValue(message.getPayload(), ScheduledEvent.class);
    } catch (IOException e) {
        LOGGER.error("Error parsing message from queue", e);
        return;
    }

    // The original Pub/Sub message is needed for the manual ack.
    BasicAcknowledgeablePubsubMessage consumer = (BasicAcknowledgeablePubsubMessage) message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE);
    if (isNull(consumer)) {
        LOGGER.error("Failed to get message headers");
        return;
    }

    var eventName = scheduledEvent.getEventName();
    var accountId = scheduledEvent.getAccountId();
    myAwesomeService.doStuff(accountId, eventName);

    // Ack the inbound message, then publish the acknowledgement event to another topic.
    consumer.ack();
    queuePublisher.send(googleProperties.getSchedulerAcknowledge(),
            queuePublisher.convertObjectToString(new AcknowledgeEvent(scheduledEvent.getEventId())));
}

Queue publisher

@Service
@Transactional
public class QueuePublisher implements PubSubOutBoundGateway {

    private static final Logger LOGGER = LoggerFactory.getLogger(QueuePublisher.class);

    private final ObjectMapper objectMapper;
    private final PubSubOutBoundGateway pubSubOutBoundGateway;

    public QueuePublisher(final ObjectMapper objectMapper,
                          final PubSubOutBoundGateway pubSubOutBoundGateway) {
        this.objectMapper = objectMapper;
        this.pubSubOutBoundGateway = pubSubOutBoundGateway;
    }

    @Override
    public void send(final String topic, final String message) {
        pubSubOutBoundGateway.send(topic, message);
    }

    public String convertObjectToString(final Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (IOException e) {
            LOGGER.error("Error converting object to string", e);
            throw new BadRequestException();
        }
    }
}

PubSubOutBoundGateway gateway

@MessagingGateway(defaultRequestChannel = "pubsubOutputChannel")
public interface PubSubOutBoundGateway {

    void send(@Header(GcpPubSubHeaders.TOPIC) String topic, String message);
}
