EmbeddedKafka身份验证失败,原因是:使用客户端机制PLAIN的意外握手请求,启用的机制为[]

smtd7mpg  于 2022-11-21  发布在  Apache
关注(0)|答案(2)|浏览(748)

我 使用 EmbeddedKafkaBroker 编写 了 一 个 简单 的 测试 , 我 创建 了 一 个 测试 生成 器 并 发送 了 一 条 消息 , 但是 我 的 KafkaListener 没有 被 触发 , 所以 测试 每次 都 失败 。 有 没有 办法 测试 我 的 Kafka 消费 者 , 这样 我 就 可以 确保 测试 代码 的 覆盖 率 ? 我 希望 我 的 伪 生成 器 ( producerTest ) 从 测试 类 内部 触发 我 的 " 真正 " Kafka 消费 者 并 处理 消息 。
Kafka 消费 者 :

@Component
@EnableAutoConfiguration
@Slf4j
public class MyKafkaListener {

    @KafkaListener(topics = "${kafka.topic.name}")
    public void consume(@Payload String message, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        try {
            log.info("Reading message: " +  message);
            //do stuff, process message
        } catch (Exception e) {
            log.error("Error while reading message from topic", e);
        }

    }

}

中 的 每 一 个
我 的 测试 类 :

@Slf4j
@ExtendWith(SpringExtension.class)
@ActiveProfiles("local")
@TestInstance(PER_CLASS)
@EmbeddedKafka(topics = { "test-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
@RunWith(SpringRunner.class)
@DirtiesContext
@Disabled
@SpringBootTest
public class MyKafkaListenerTest {

    private KafkaTemplate<String, String> producer;

    public static final String TEST_KEY = "x";
    public static final String TOPIC = "test-topic";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void myKafkaListener_success_test() throws InterruptedException, ExecutionException {
        //insert object first so I can later assert that it was modified after receiving message from producer

        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
        log.info("props {}", producerProps);
        Producer<String, String> producerTest = new KafkaProducer(producerProps, new StringSerializer(), new StringSerializer());
        producerTest.send(new ProducerRecord(TOPIC, "", TEST_KEY));
        Thread.sleep(5000);
        //Assertions.assertNull(condition to assert message has been processed);
        producerTest.close();

}

格式
我 试 着 调试 我 的 代码 , Kafka 监听 器 没有 被 触发 , 这 是 我 的 测试 应用 程序 。 yaml :

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    jaas:
      enabled: true
    properties:
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule  required username="{USERNAME}" password="{PASSWORD}";
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      properties:
        sasl:
          mechanism: PLAIN
        security:
          protocol: SASL_PLAINTEXT
        request:
          timeout:
            ms: 20000
      group-id: kafka-list-app

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      retries: 10
      properties:
        sasl:
          mechanism: PLAIN
        security:
          protocol: SASL_PLAINTEXT
        request:
          timeout:
            ms: 20000
        max:
          in:
            flight:
              requests:
                per:
                  connection: 1

kafka:
  topic:
    name: ${TOPIC_NAME:profil.topic-dev}

格式
我 也 总是 得到 以下 错误 :

Connection to node -1 (kubernetes.docker.internal/127.0.0.1:51131) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []

格式

o2gm4chl

o2gm4chl1#

首先,您需要从Test类中删除@Disabled。如果生成器没有发送任何内容,则不会调用消费者侦听器。
我看到您的代理配置包含JAAS / SASL属性,而这些属性在测试中的生产者配置中缺失。根据错误,它无法验证客户端。也许您应该打印出producerProps并进行调试。
要使用在配置文件中定义的生成器属性,需要使用private KafkaTemplate<String, String> producer;字段,而不是producerTest局部变量
此外,根据Sping Boot 文档,您的properties条目应如下所示

properties:
    [sasl.mechanism]: PLAIN
    [security.protocol]: SASL_PLAINTEXT
    [request.timeout.ms]: 20000
    [max.in.flight.requests.per.connection]: 1

要从Spring配置中为客户端添加身份验证,您需要在Producer和Consumer配置中的某个位置设置sasl.jaas.config。否则,您将继续收到auth username/password denied错误。

properties:
    [sasl.jaas.config]: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="..." password="...";'
pod7payv

pod7payv2#

如果代理不使用 SASL,并且您为security.protocol属性指定了SASL_SSLSASL_PLAINTEXT(就像您在问题中的application.yaml文件中所做的那样),则会收到此错误消息。
尝试使用PLAIN(在您的特定情况下)或security.protocolSSL将排除这种可能性。

相关问题