quarkus-smallrye-kafka connector: acknowledging a stream of payloads

jv2fixgn  posted on 2021-06-04  in Kafka

I have the following producer:

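@Outgoing("platform-outcode-stats")
public Multi<OutcodeStats> produceOutCodeStats() {
    return outcodeStatsResource
            .getUkOutCodeStats()
            // Retry at most once if the upstream fails.
            .onFailure().retry().atMost(1)
            // Buffer up to 100 items when downstream cannot keep up.
            .onOverflow().buffer(100);
}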

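It publishes a stream of payloads to the "platform-outcode-stats" topic. As the documentation states, this method is called only once, to retrieve the publisher. So far, so good.
I want to write a unit test that checks whether the topic is created and populated correctly. To do that, I use Testcontainers to start a single Kafka server like this: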

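import java.util.Collections;
import java.util.Map;

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;

public class KafkaServerResource implements QuarkusTestResourceLifecycleManager {

    KafkaContainer kafkaServer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.2.1"));

    @Override
    public Map<String, String> start() {
        kafkaServer.start();
        // Point the application at the broker started by the container.
        return Collections.singletonMap("kafka.bootstrap.servers", kafkaServer.getBootstrapServers());
    }

    @Override
    public void stop() {
        kafkaServer.stop();
    }
}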

I wrote a naive test to see what happens:

@Inject
@Channel("platform-outcode-stats")
@Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
Publisher<OutcodeStats> outcodeStats;

@Test
void topicShouldHaveStats() {
    Flowable.fromPublisher(outcodeStats)
            .toList()
            .blockingGet()
            .forEach(el -> System.out.println(el.getOutcode()));
}

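This test does not actually assert anything, but I wanted to see whether anything gets printed to the console. Well, since there is no message acknowledgment, Kafka stays stuck forever with the following log: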

2020-10-24 18:00:21,173 INFO  [org.apa.kaf.cli.con.KafkaConsumer] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Subscribed to topic(s): platform-outcode-stats
2020-10-24 18:00:21,220 WARN  [org.apa.kaf.cli.NetworkClient] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Error while fetching metadata with correlation id 3 : {platform-outcode-stats=LEADER_NOT_AVAILABLE}
2020-10-24 18:00:21,220 INFO  [org.apa.kaf.cli.Metadata] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Cluster ID: Yuo_aAylRgWyqJWKPV-zHQ
2020-10-24 18:00:21,311 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Discovered group coordinator localhost:32993 (id: 2147483646 rack: null)
2020-10-24 18:00:21,313 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] (Re-)joining group
2020-10-24 18:00:21,348 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-10-24 18:00:21,348 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] (Re-)joining group
2020-10-24 18:00:21,376 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Finished assignment for group at generation 1: {consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1-970f67b0-7968-4995-b2c7-2bba95daf5d7=Assignment(partitions=[platform-outcode-stats-0])}
2020-10-24 18:00:21,416 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Successfully joined group with generation 1
2020-10-24 18:00:21,420 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Adding newly assigned partitions: platform-outcode-stats-0
2020-10-24 18:00:21,433 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Found no committed offset for partition platform-outcode-stats-0
2020-10-24 18:00:21,447 INFO  [org.apa.kaf.cli.con.int.SubscriptionState] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Resetting offset for partition platform-outcode-stats-0 to offset 0.
2020-10-24 18:03:36,158 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Revoke previously assigned partitions platform-outcode-stats-0
2020-10-24 18:03:36,158 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] (vert.x-kafka-consumer-thread-0) [Consumer clientId=consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1, groupId=d34f1529-406f-4f84-b7e1-235df4ca94da] Member consumer-d34f1529-406f-4f84-b7e1-235df4ca94da-1-970f67b0-7968-4995-b2c7-2bba95daf5d7 sending LeaveGroup request to coordinator localhost:32993 (id: 2147483646 rack: null) due to the consumer is being closed
2020-10-24 18:03:36,200 INFO  [org.apa.kaf.cli.pro.KafkaProducer] (vert.x-worker-thread-1) [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2020-10-24 18:03:36,227 INFO  [io.sma.rea.mes.provider] (Quarkus Test Cleanup Shutdown task) SRMSG00207: Cancel subscriptions
2020-10-24 18:03:36,238 INFO  [io.quarkus] (Quarkus Test Cleanup Shutdown task) Quarkus stopped in 0.100s

I am surely missing something here, or I am simply doing something I should not be doing. Any help would be greatly appreciated.
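
One possible explanation, offered as an assumption rather than a confirmed diagnosis: `toList()` only emits once the upstream completes, and a Kafka-backed channel normally never completes, so `blockingGet()` would wait until the test is shut down regardless of acknowledgment. The sketch below illustrates that idea with only the JDK's `java.util.concurrent.Flow` types instead of the Quarkus or RxJava classes from the question. The names `BoundedCollect`, `TakeSubscriber` and `demo`, and the sample values, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical names throughout; this is not Quarkus or SmallRye API.
public class BoundedCollect {

    /** Collects at most `limit` items (limit must be positive), then cancels the upstream. */
    static final class TakeSubscriber<T> implements Flow.Subscriber<T> {
        private final int limit;
        private final List<T> items = new ArrayList<>();
        private final CompletableFuture<List<T>> result = new CompletableFuture<>();
        private Flow.Subscription subscription;

        TakeSubscriber(int limit) {
            this.limit = limit;
        }

        CompletableFuture<List<T>> result() {
            return result;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(limit); // ask for exactly `limit` items, no more
        }

        @Override
        public void onNext(T item) {
            items.add(item);
            if (items.size() == limit) {
                subscription.cancel();               // stop listening to the endless source
                result.complete(List.copyOf(items)); // unblock the waiting caller
            }
        }

        @Override
        public void onError(Throwable failure) {
            result.completeExceptionally(failure);
        }

        @Override
        public void onComplete() {
            result.complete(List.copyOf(items)); // source ended before `limit` items arrived
        }
    }

    /** Publishes three sample values to a source that is never closed and takes the first two. */
    static List<String> demo() throws Exception {
        SubmissionPublisher<String> source = new SubmissionPublisher<>();
        TakeSubscriber<String> subscriber = new TakeSubscriber<>(2);
        source.subscribe(subscriber); // subscribe first: items submitted earlier would be missed
        source.submit("AB1");
        source.submit("AB2");
        source.submit("AB3");
        // Bounded wait: fails with TimeoutException instead of hanging forever.
        return subscriber.result().get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

The source here is never closed, yet the call returns because the subscriber stops after a fixed number of items and the wait has a deadline. In RxJava terms, the analogous change to the test would be to put `take(n)` and `timeout(...)` before `toList()`. Whether that resolves the hang in this particular setup is untested.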
