集成测试中使用TestContainers的Kafka Consumer

gv8xihay  于 2023-11-16  发布在  Apache
关注(0)|答案(1)|浏览(139)

在我的项目中,我为Kafka集成测试提供了测试容器。我可以生成消息并使用它们。对于使用,我有一个遵循CQRS的Kafka消费者(Command Query Responsibility Segregation)模式。与直接Assert来自消费者的返回值不同,我使用一个订阅Kafka主题并对传入消息作出React的处理程序。因此,如何在集成测试中有效地测试这个Kafka消费者呢

gcuhipw9

gcuhipw91#

我会使用一个库来处理类似Awaitility的等待消息,并Assert消息的处理会导致您想要的结果。
下面是我们在testcontainers/workshop中使用的测试的一部分:

given(requestSpecification)
                .body(new Rating(talkId, 5))
                .when()
                .post("/ratings")
                .then()
                .statusCode(202);

        await().untilAsserted(() -> {
            given(requestSpecification)
                    .queryParam("talkId", talkId)
                    .when()
                    .get("/ratings")
                    .then()
                    .body("5", is(1));
        });

字符串
它使用RestAssured来触发POST查询,这将启动处理,包括通过Kafka进行对话。您也可以以任何其他方式设置消息。
然后Awaititility.await将重复运行代码(您可以配置延迟、超时等),直到满足某个条件。在我们的例子中,评级的值在不同的端点上可见。(在应用程序的业务逻辑中,它们通过Kafka传播到那里。

相关问题