我 使用 EmbeddedKafkaBroker 编写 了 一 个 简单 的 测试 , 我 创建 了 一 个 测试 生成 器 并 发送 了 一 条 消息 , 但是 我 的 KafkaListener 没有 被 触发 , 所以 测试 每次 都 失败 。 有 没有 办法 测试 我 的 Kafka 消费 者 , 这样 我 就 可以 确保 测试 代码 的 覆盖 率 ? 我 希望 我 的 伪 生成 器 ( producerTest ) 从 测试 类 内部 触发 我 的 " 真正 " Kafka 消费 者 并 处理 消息 。
Kafka 消费 者 :
@Component
@EnableAutoConfiguration
@Slf4j
public class MyKafkaListener {
@KafkaListener(topics = "${kafka.topic.name}")
public void consume(@Payload String message, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
try {
log.info("Reading message: " + message);
//do stuff, process message
} catch (Exception e) {
log.error("Error while reading message from topic", e);
}
}
}
中 的 每 一 个
我 的 测试 类 :
@Slf4j
@ExtendWith(SpringExtension.class)
@ActiveProfiles("local")
@TestInstance(PER_CLASS)
@EmbeddedKafka(topics = { "test-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
@RunWith(SpringRunner.class)
@DirtiesContext
@Disabled
@SpringBootTest
public class MyKafkaListenerTest {
private KafkaTemplate<String, String> producer;
public static final String TEST_KEY = "x";
public static final String TOPIC = "test-topic";
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Test
public void myKafkaListener_success_test() throws InterruptedException, ExecutionException {
//insert object first so I can later assert that it was modified after receiving message from producer
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
log.info("props {}", producerProps);
Producer<String, String> producerTest = new KafkaProducer(producerProps, new StringSerializer(), new StringSerializer());
producerTest.send(new ProducerRecord(TOPIC, "", TEST_KEY));
Thread.sleep(5000);
//Assertions.assertNull(condition to assert message has been processed);
producerTest.close();
}
格式
我 试 着 调试 我 的 代码 , Kafka 监听 器 没有 被 触发 , 这 是 我 的 测试 应用 程序 。 yaml :
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
jaas:
enabled: true
properties:
security:
protocol: SASL_PLAINTEXT
sasl:
mechanism: PLAIN
jaas:
config: org.apache.kafka.common.security.plain.PlainLoginModule required username="{USERNAME}" password="{PASSWORD}";
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
properties:
sasl:
mechanism: PLAIN
security:
protocol: SASL_PLAINTEXT
request:
timeout:
ms: 20000
group-id: kafka-list-app
producer:
key-serializer: org.apache.kafka.common.serialization.StringDeserializer
value-serializer: org.apache.kafka.common.serialization.StringDeserializer
retries: 10
properties:
sasl:
mechanism: PLAIN
security:
protocol: SASL_PLAINTEXT
request:
timeout:
ms: 20000
max:
in:
flight:
requests:
per:
connection: 1
kafka:
topic:
name: ${TOPIC_NAME:profil.topic-dev}
格式
我 也 总是 得到 以下 错误 :
Connection to node -1 (kubernetes.docker.internal/127.0.0.1:51131) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []
格式
2条答案
按热度按时间o2gm4chl1#
首先,您需要从Test类中删除
@Disabled
。如果生成器没有发送任何内容,则不会调用消费者侦听器。我看到您的代理配置包含JAAS / SASL属性,而这些属性在测试中的生产者配置中缺失。根据错误,它无法验证客户端。也许您应该打印出
producerProps
并进行调试。要使用在配置文件中定义的生成器属性,需要使用
private KafkaTemplate<String, String> producer;
字段,而不是producerTest
局部变量此外,根据Sping Boot 文档,您的
properties
条目应如下所示要从Spring配置中为客户端添加身份验证,您需要在Producer和Consumer配置中的某个位置设置
sasl.jaas.config
。否则,您将继续收到auth username/password denied错误。pod7payv2#
如果代理不使用 SASL,并且您为
security.protocol
属性指定了SASL_SSL
或SASL_PLAINTEXT
(就像您在问题中的application.yaml
文件中所做的那样),则会收到此错误消息。尝试使用
PLAIN
(在您的特定情况下)或security.protocol
的SSL
将排除这种可能性。