java Kafka消费者的集成测试问题

x6yk4ghg  于 2023-04-28  发布在  Java
关注(0)|答案(1)|浏览(98)

编辑:现在只有当我使用localhost:9092作为引导服务器时,测试才能通过,这是我运行Kafka/zookeeper docker镜像的默认端口。
我的集成测试需要帮助,该测试测试Kafka有效负载是否被消耗。我正在为KafkaContainer和MysqlContainer使用测试容器。测试的目标是确保SUT能够正确地消耗Kafka产品的有效载荷,但在测试过程中消耗者永远不会被击中。尽管在测试过程中没有正确工作,但代码在本地运行时可以正常工作。
到目前为止,我已经尝试使用EmbeddedKafkaBroker,但它不起作用,我还尝试更改消息主题。我知道问题出在我的配置中,但我不能确切地把我的手指放在它。下面是www. example www.example.com 、SUT、测试类和日志将按特定顺序附加。
另外,我使用userRepository进行Assert的原因是,每当使用有效负载时,它都会保存到存储库中,我想检查保存的用户是否存在。

应用测试。属性

# Kafka Properties
spring.kafka.topic.name=tappedtechnologies.emails.recipients

# Kafka Consumer Properties
spring.kafka.consumer.group-id=userId
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.default.type=com.tappedtechnologies.userservice.events.RecipientSavedEvent
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

SUT(RecipientPayloadConsumer)

@Slf4j
@Service
@RequiredArgsConstructor
public class RecipientPayloadConsumer implements KafkaConsumer<RecipientSavedEvent> {

    private final Creator<User> userCreator;

    @Override
    @KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "${spring.kafka.consumer.group-id}")
    public void consumePayload(@Payload RecipientSavedEvent payload) {
        log.info("Payload received from 'email-service': {}", payload);
        User userToSave = this.convertPayloadToUser(payload);
        try {
            userCreator.create(userToSave);
        } catch (IllegalStateException exception) {
            log.error("IllegalStateException caught attempting to save payload from email-service.");
            log.error("Message: {}", exception.getMessage());
            throw exception;
        }
    }

    private User convertPayloadToUser(RecipientSavedEvent payload) {
        return User.builder()
                .firstName(payload.getFirstName())
                .lastName(payload.getLastName())
                .email(payload.getEmail())
                .build();
    }
}

收件人PayloadConsumerTest

@Testcontainers
@SpringBootTest
@TestPropertySource(locations = "classpath:application-test.properties")
public class RecipientPayloadConsumerTests {

    private static KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private UserRepository userRepository;
    @Autowired
    private RecipientPayloadConsumer sut;

    @Container
    public static MySQLContainer<?> mySQLContainer = new MySQLContainer<>("mysql:latest");
    @Container
    public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    @DynamicPropertySource
    public static void setProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", mySQLContainer::getJdbcUrl);
        registry.add("spring.datasource.username", mySQLContainer::getUsername);
        registry.add("spring.datasource.password", mySQLContainer::getPassword);
    }

    @BeforeAll
    public static void setUp() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");

        kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
        kafkaTemplate.setDefaultTopic("tappedtechnologies.emails.recipients");

        kafkaContainer.start();
        mySQLContainer.start();
    }

    @AfterEach
    public void afterEach() {
        userRepository.deleteAll();
    }

    @AfterAll
    public static void tearDown() {
        kafkaContainer.stop();
        mySQLContainer.stop();
    }

    @Test
    public void consumePayload_Should_SavePayload() throws InterruptedException {
        RecipientSavedEvent expected = getPayload();
        kafkaTemplate.sendDefault(expected.getPayloadKey(), expected.toString());
        Thread.sleep(5000);

        User actual = userRepository.findByEmail(expected.getEmail()).orElse(null);

        assertThat(actual).isNotNull();
        assertThat(actual.getFirstName()).isEqualTo(expected.getFirstName());
        assertThat(actual.getLastName()).isEqualTo(expected.getLastName());
        assertThat(actual.getEmail()).isEqualTo(expected.getEmail());
    }

    private RecipientSavedEvent getPayload() {
        return RecipientSavedEvent.builder()
                .payloadKey(UUID.randomUUID().toString())
                .firstName("Randy")
                .lastName("Marsh")
                .email("randy@tegridyfarms.com")
                .build();
    }
}

日志

2023-04-23T21:07:12.300-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [    Test worker] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
2023-04-23T21:07:12.324-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.3.2
2023-04-23T21:07:12.324-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: b66af662e61082cb
2023-04-23T21:07:12.324-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [    Test worker] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1682309232324
2023-04-23T21:07:12.993-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:12.994-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: Im2Ds_vjQq289WTPzltJlA
2023-04-23T21:07:13.018-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
2023-04-23T21:07:13.150-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 4 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.289-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 5 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.424-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 6 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.550-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 7 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.667-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 8 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.789-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.911-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 10 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.609-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 11 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.754-07:00  WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 12 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.934-07:00  INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition tappedtechnologies.emails.recipients-0 to 0 since the associated topicId changed from null to mmZzWI7DSkGiNcBk1J0Lmw
3pvhb19x

3pvhb19x1#

在@M的帮助下代努姆我解决了我面临的问题很简单。我使用@Autowired从上下文注入KafkaTemplate,并删除了整个@BeforeAll设置。通过删除@BeforeAll设置,我必须使用DynamicPropertyRegistryKafkaContainer来将spring.kafka.bootstrap-server添加到属性中。
下面是调整后的代码。

应用测试。属性

# Kafka Properties
spring.kafka.topic.name=tappedtechnologies.test.topics

# Kafka Consumer Properties
spring.kafka.consumer.group-id=testId
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.type.mapping=event:com.tappedtechnologies.userservice.events.RecipientSavedEvent
spring.kafka.consumer.properties.spring.json.default.type=com.tappedtechnologies.userservice.events.RecipientSavedEvent
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer

# Kafka Producer Properties
# -- NEWLY ADDED -- #
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

RecipientPayloadConsumerTests

@Testcontainers
@SpringBootTest
@TestPropertySource(locations = "classpath:application-test.properties")
public class RecipientPayloadConsumerTests {

    private static final String TOPIC = "tappedtechnologies.test.topics";

    @Autowired
    private UserRepository userRepository;
    @Autowired
    private KafkaTemplate<String, RecipientSavedEvent> kafkaTemplate;

    @Container
    public static MySQLContainer<?> mySQLContainer = new MySQLContainer<>("mysql:latest");
    @Container
    public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    @DynamicPropertySource
    public static void setProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.datasource.url", mySQLContainer::getJdbcUrl);
        registry.add("spring.datasource.username", mySQLContainer::getUsername);
        registry.add("spring.datasource.password", mySQLContainer::getPassword);

        registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    }

    @AfterEach
    public void afterEach() {
        userRepository.deleteAll();
    }

    @Test
    public void consumePayload_Should_SavePayload() throws InterruptedException {
        RecipientSavedEvent expected = getPayload();
        kafkaTemplate.send(TOPIC, 0, Instant.now().getEpochSecond(), expected.getPayloadKey(), expected);
        Thread.sleep(5000);

        User actual = userRepository.findByEmail(expected.getEmail()).orElse(null);

        assert actual != null;
        assertThat(actual.getFirstName()).isEqualTo(expected.getFirstName());
        assertThat(actual.getLastName()).isEqualTo(expected.getLastName());
        assertThat(actual.getEmail()).isEqualTo(expected.getEmail());
    }

    private RecipientSavedEvent getPayload() {
        return RecipientSavedEvent.builder()
                .payloadKey(UUID.randomUUID().toString())
                .firstName("Randy")
                .lastName("Marsh")
                .email("randy@tegridyfarms.com")
                .build();
    }
}

相关问题