如何在springboot中测试kafka客户机配置

5us2dqdw  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(537)

我有一个springboot应用程序,它使用kafka和ehcache在不同的微服务和示例之间执行缓存同步。我使用的是springboot2.2.4和kafka客户机的匹配版本。如何测试我的Kafka客户机是否与嵌入式Kafka正常工作。
我试过:
测试等级

@RunWith(SpringRunner.class)
@SpringBootTest()
@ActiveProfiles({"inmemory", "test", "kafka-test"})
@WebAppConfiguration
@DirtiesContext
public class CachePropagatorTest
{
   private static final String topic = "com.allstate.d3.sh.test.cache";
   //private static final String topic2 = "com.allstate.sh.test.alloc";

   //@Rule
   @ClassRule
   public static final EmbeddedKafkaRule embeddedKafkaRule;

   static
   {
      embeddedKafkaRule = new EmbeddedKafkaRule(1, true, topic);
      embeddedKafkaRule
              .getEmbeddedKafka().brokerListProperty("spring.kafka.bootstrap-servers");
   }

   //@Autowired
   private EmbeddedKafkaBroker embeddedKafka;

   @Autowired
   KafkaTemplate<String, CacheMessage> kafkaTemplate;

   @Autowired
   KafkaSHProperties properties;

   //@Autowired
   @SpyBean
   CachePropagator propagator;
   //CachePropagationHelper propagator;

   BlockingQueue<CacheMessage> records = new LinkedBlockingQueue<>();

   /* read sent messages  */
   Consumer<Integer, CacheMessage> consumer;

   private String topic1;

   @Before
   public void setUp() throws Exception
   {
      embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();

      topic1 = properties.getCacheTopic();
      assertThat(topic1, is(topic));
      //embeddedKafka.getEmbeddedKafka().addTopics(topic1);
      try { embeddedKafka.addTopics(topic1); }
      catch (KafkaException Ignored) { }

      Mockito.doAnswer(new Answer<Void>()
      {
         @Override
         public Void answer(InvocationOnMock invocation) throws Throwable
         {
            System.out.println("Cache Message Receive");
            records.add((CacheMessage) invocation.getArgument(0));
            return (Void)invocation.callRealMethod();
         }
      }).when(propagator).receive(ArgumentMatchers.any(),
                                  ArgumentMatchers.anyString());

      //prove raw template usage
      CacheMessage cm = new CacheMessage("Test","Test","put",
                                         true,"");
      kafkaTemplate.send(topic1, cm);

      Map<String, Object> consumerProps =
         KafkaTestUtils.consumerProps(properties.getCacheConsumptionGroup(),
                                     "false", embeddedKafka);

      DefaultKafkaConsumerFactory<Integer, CacheMessage> cf =
           new DefaultKafkaConsumerFactory<Integer, CacheMessage>(consumerProps);
      consumer = cf.createConsumer();
      embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

      Set<String> topics = embeddedKafka.getTopics();
      assertThat(topics.size(),is(1) );
      assertThat(topics,hasItem(topic1) );

      //prove sent message received
      ConsumerRecord<Integer, CacheMessage> received =
              KafkaTestUtils.getSingleRecord(consumer, topic1, 30000);
      assertThat(received.value(), is("Test"));

   }

   @After
   public void tearDown() throws Exception { }

   @Test
   public void putExperiment() throws Exception
   {
      Date now = new Date();
      JsonNode emptyNode = new ObjectMapper().readTree("");
      List<BucketDetail> buckets = new ArrayList<>();
      buckets.add(new BucketDetail("99-1", "Kafka Bucket 1",
                                   0.5, emptyNode));
      buckets.add(new BucketDetail("99-2", "Kafka Bucket 2",
                                   0.5, emptyNode));
      buckets.add(new BucketDetail());

      ExperimentDetail exp = new ExperimentDetail("99", 1,
                                                  "KafkaTest",
                                                  "SH_TEST_PROFILE_9",
                                                  buckets, LifecycleStage.CONFIGURED,
                                                  now, null, "Mete Test Notes");

      propagator.putExperiment(exp);

      //TODO: test the allocation was correct

      ConsumerRecord<Integer, CacheMessage> received =
              KafkaTestUtils.getSingleRecord(consumer, topic1, 10000);

      //TODO: how much should this verify in the message
      assertThat(received.value().getAction(), is("put"));
      assertThat(received.value().getItem().toString(),
                 containsString(exp.getExperimentID()));
   }
}

应用测试中的Kafka.yml

spring:
  kafka:
    bootstrap-servers: localhost:2181
    listener:
      #add topics after start
      missing-topics-fatal: false
    properties:
      sasl:
        kerberos:
          service:
            name: kafka
      security:
        protocol: SASL_PLAINTEXT
    consumer:
      properties:
        spring:
          json:
            trusted:
              packages: com.allstate.d3.sh.commons.messaging
      bootstrap-servers: localhost:2181
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      bootstrap-servers: localhost:2181
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  profiles:
    active: inmemory,kafka-test

运行测试时,它在单元中失败 setup() 方法

java.lang.IllegalStateException: No records found for topic
    at org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord(KafkaTestUtils.java:187)
    at com.allstate.d3.sh.execution.event.CachePropagatorTest.setUp(CachePropagatorTest.java:169)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
    at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
    at java.lang.Thread.run(Thread.java:748)

那我为什么找不到这个主题的发送记录呢?
更新
下面@quicksilver的回答指出了并行运行。我可以吗 @SpyBean CachePropagator propagator; 干扰我的考试。
cache Propertator的侦听器方法定义如下:

@KafkaListener(topics = "#{kafkaSHProperties.cacheTopic}",
               groupId = "#{kafkaSHProperties.cacheConsumptionGroup}")
public void receive(@Payload CacheMessage message,
                    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key)
{
   if (!envName.equals(message.getEnv())) { return; }
   log.info("07e9d084-1b8c-4c4c-b9be-9e7bb2716c3c -- Cache sync message: {}, {}, {}",
            key, message.getEnv(), message.getCacheName());
   processMessage(message);
}

那会是我的留言吗?如果是这样的话,他们不应该仍然可以在经纪人?如果不是的话,我可以改变这个设置吗?

ni65a41a

ni65a41a1#

你能告诉我你的密码里有哪些项目吗,
Kafka消费是在Kafka制作人之前启动的,即使在Kafka制作人出版唱片之后,它也会继续运行
如果Kafka消费者在Kafka制作者之后开始,那么它应该从开始偏移量开始轮询。
在我的机器上低于测试

@SpringBootTest
@RunWith(SpringRunner.class)
public class TestKafkaConfig {

    @ClassRule
    // By default it creates two partitions.
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TOPIC_NAME);

    private static String TOPIC_NAME = "testTopic";

    @Test
    public void testKafkaConfig() throws InterruptedException, ExecutionException {

        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
        producer.send(new ProducerRecord<>(TOPIC_NAME, 0, 0, "ABC")).get();
        producer.send(new ProducerRecord<>(TOPIC_NAME, 0, 1, "XYZ")).get();

        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testConsumer", "false", embeddedKafka);
        consumerProps.put("auto.offset.reset", "earliest");

        final List<String> receivedMessages = Lists.newArrayList();
        final CountDownLatch latch = new CountDownLatch(2);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
            kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
            try {
                while (true) {
                    ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
                    records.iterator().forEachRemaining(record -> {
                        receivedMessages.add(record.value());
                        latch.countDown();
                    });
                }
            } finally {
                kafkaConsumer.close();
            }
        });

        latch.await(10, TimeUnit.SECONDS);
        assertTrue(receivedMessages.containsAll(Arrays.asList("ABC", "XYZ")));
    }
}

相关问题