如何在testcontainers中运行RabbitMQ broker向spring cloud stream创建的队列发送消息?

soat7uwm  于 2024-01-09  发布在  RabbitMQ
关注(0)|答案(1)|浏览(188)

我想为packlabel方法创建一个集成测试(pipe composed),基于Spring Cloud Function。我知道配置的输入绑定正在等待来自order-accepted exchange的order-accepted.dispatcher-service队列的消息,而Spring Cloud Stream会自动将消息发送到order-dispatched交换机。所以我只需要将消息发送到order-accepted.dispatcher-service队列,并检查从order-dispatched交换机接收到的消息。但是我不知道routing key,我如何才能将消息发送到order-accepted.dispatcher-service队列?下面是我到目前为止的配置文件和测试类代码:

  1. application.yml:
server:
  port: 9003

spring:
  application:
    name: dispatcher-service
  cloud:
    function:
      definition: pack|label
    stream:
      bindings:
        packlabel-in-0:
          destination: order-accepted
          group: ${spring.application.name}
        packlabel-out-0:
          destination: order-dispatched

  rabbitmq:
    host: localhost
    port: 5672
    username: user
    password: password
    connection-timeout: 5s

字符串
1.集成测试类(如何实现此集成测试代码?):

import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
class DispatcherServiceApplicationTests {

    @Container
    static RabbitMQContainer rabbitMQ = new RabbitMQContainer(DockerImageName.parse("rabbitmq:3.10-management"));

    @DynamicPropertySource
    static void rabbitMQProperties(DynamicPropertyRegistry registry){
        registry.add("spring.rabbitmq.host", rabbitMQ::getHost);
        registry.add("spring.rabbitmq.port", rabbitMQ::getAmqpPort);
        registry.add("spring.rabbitmq.username", rabbitMQ::getAdminUsername);
        registry.add("spring.rabbitmq.password", rabbitMQ::getAdminPassword);
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    void contextLoads() {
    }

    @Test
    void packAndLabel(){
        long orderId = 121;

        Queue inputQueue = this.rabbitAdmin.declareQueue();
        assert inputQueue != null;
        Binding inputBinding = new Binding(inputQueue.getName(), Binding.DestinationType.QUEUE, "order-accepted", "#", null);

        Queue outputQueue = this.rabbitAdmin.declareQueue();
        assert outputQueue != null;
      
        Binding outputBinding = new Binding(outputQueue.getName(), Binding.DestinationType.QUEUE, "order-dispatched", "#", null);

        this.rabbitAdmin.declareBinding(inputBinding);
        this.rabbitAdmin.declareBinding(outputBinding);

        // I think this only send message to order-accepted exchange rather than order-accepted.dispatcher-service queue.
        rabbitTemplate.convertAndSend("order-accepted", "#", new OrderAcceptedMessage(orderId));

        await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
            OrderDispatchedMessage message = rabbitTemplate.receiveAndConvert(outputQueue.getName(),
                10000, new ParameterizedTypeReference<OrderDispatchedMessage>(){});

            assert message != null;
            assertThat(message.orderId()).isEqualTo(orderId);
            System.out.println("------------------------------------: " + message.orderId());
        });
    }
}

h7appiyu

h7appiyu1#

我们自己使用测试容器,所以你可以看看我们在Rabbit中的一些测试。https://github.com/spring-cloud/spring-cloud-stream/blob/8f5e7d75f23f9463ecc4f1e8372685b1c01d97d4/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/integration/RabbitBinderModuleTests.java#L99

相关问题