编辑:现在只有当我使用localhost:9092作为引导服务器时,测试才能通过,这是我运行Kafka/zookeeper docker镜像的默认端口。
我的集成测试需要帮助,该测试测试Kafka有效负载是否被消耗。我正在为KafkaContainer和MysqlContainer使用测试容器。测试的目标是确保SUT能够正确地消耗Kafka产品的有效载荷,但在测试过程中消耗者永远不会被击中。尽管在测试过程中没有正确工作,但代码在本地运行时可以正常工作。
到目前为止,我已经尝试使用EmbeddedKafkaBroker,但它不起作用,我还尝试更改消息主题。我知道问题出在我的配置中,但我不能确切地把我的手指放在它。下面是www. example www.example.com 、SUT、测试类和日志将按特定顺序附加。
另外,我使用userRepository
进行Assert的原因是,每当使用有效负载时,它都会保存到存储库中,我想检查保存的用户是否存在。
应用测试。属性
# Kafka Properties
spring.kafka.topic.name=tappedtechnologies.emails.recipients
# Kafka Consumer Properties
spring.kafka.consumer.group-id=userId
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.default.type=com.tappedtechnologies.userservice.events.RecipientSavedEvent
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
SUT(RecipientPayloadConsumer)
@Slf4j
@Service
@RequiredArgsConstructor
public class RecipientPayloadConsumer implements KafkaConsumer<RecipientSavedEvent> {
private final Creator<User> userCreator;
@Override
@KafkaListener(topics = "${spring.kafka.topic.name}", groupId = "${spring.kafka.consumer.group-id}")
public void consumePayload(@Payload RecipientSavedEvent payload) {
log.info("Payload received from 'email-service': {}", payload);
User userToSave = this.convertPayloadToUser(payload);
try {
userCreator.create(userToSave);
} catch (IllegalStateException exception) {
log.error("IllegalStateException caught attempting to save payload from email-service.");
log.error("Message: {}", exception.getMessage());
throw exception;
}
}
private User convertPayloadToUser(RecipientSavedEvent payload) {
return User.builder()
.firstName(payload.getFirstName())
.lastName(payload.getLastName())
.email(payload.getEmail())
.build();
}
}
收件人PayloadConsumerTest
@Testcontainers
@SpringBootTest
@TestPropertySource(locations = "classpath:application-test.properties")
public class RecipientPayloadConsumerTests {
private static KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private UserRepository userRepository;
@Autowired
private RecipientPayloadConsumer sut;
@Container
public static MySQLContainer<?> mySQLContainer = new MySQLContainer<>("mysql:latest");
@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));
@DynamicPropertySource
public static void setProperties(DynamicPropertyRegistry registry) {
registry.add("spring.datasource.url", mySQLContainer::getJdbcUrl);
registry.add("spring.datasource.username", mySQLContainer::getUsername);
registry.add("spring.datasource.password", mySQLContainer::getPassword);
}
@BeforeAll
public static void setUp() {
Map<String, Object> producerProps = new HashMap<>();
producerProps.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
kafkaTemplate.setDefaultTopic("tappedtechnologies.emails.recipients");
kafkaContainer.start();
mySQLContainer.start();
}
@AfterEach
public void afterEach() {
userRepository.deleteAll();
}
@AfterAll
public static void tearDown() {
kafkaContainer.stop();
mySQLContainer.stop();
}
@Test
public void consumePayload_Should_SavePayload() throws InterruptedException {
RecipientSavedEvent expected = getPayload();
kafkaTemplate.sendDefault(expected.getPayloadKey(), expected.toString());
Thread.sleep(5000);
User actual = userRepository.findByEmail(expected.getEmail()).orElse(null);
assertThat(actual).isNotNull();
assertThat(actual.getFirstName()).isEqualTo(expected.getFirstName());
assertThat(actual.getLastName()).isEqualTo(expected.getLastName());
assertThat(actual.getEmail()).isEqualTo(expected.getEmail());
}
private RecipientSavedEvent getPayload() {
return RecipientSavedEvent.builder()
.payloadKey(UUID.randomUUID().toString())
.firstName("Randy")
.lastName("Marsh")
.email("randy@tegridyfarms.com")
.build();
}
}
日志
2023-04-23T21:07:12.300-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ Test worker] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
2023-04-23T21:07:12.324-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.3.2
2023-04-23T21:07:12.324-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: b66af662e61082cb
2023-04-23T21:07:12.324-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ Test worker] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1682309232324
2023-04-23T21:07:12.993-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:12.994-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: Im2Ds_vjQq289WTPzltJlA
2023-04-23T21:07:13.018-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 0 with epoch 0
2023-04-23T21:07:13.150-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 4 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.289-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 5 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.424-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 6 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.550-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 7 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.667-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 8 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.789-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 9 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:13.911-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 10 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.609-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 11 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.754-07:00 WARN [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Error while fetching metadata with correlation id 12 : {tappedtechnologies.emails.recipients=LEADER_NOT_AVAILABLE}
2023-04-23T21:07:14.934-07:00 INFO [Tapped Technologies - User Service,,] 63697 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition tappedtechnologies.emails.recipients-0 to 0 since the associated topicId changed from null to mmZzWI7DSkGiNcBk1J0Lmw
1条答案
按热度按时间3pvhb19x1#
在@M的帮助下代努姆我解决了我面临的问题很简单。我使用
@Autowired
从上下文注入KafkaTemplate
,并删除了整个@BeforeAll
设置。通过删除@BeforeAll
设置,我必须使用DynamicPropertyRegistry
和KafkaContainer
来将spring.kafka.bootstrap-server
添加到属性中。下面是调整后的代码。
应用测试。属性
RecipientPayloadConsumerTests