多个StreamListener使用连接到kafka的spring cloud stream访问同一主题

vaj7vani  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(600)

我有SpringBoot应用程序,我正在使用SpringCloudStream连接到kafka。我正在尝试为同一个kafka主题设置两个独立的流侦听器方法。

@StreamListener("countries")
    @SendTo("aggregated-statistic")
    public KStream<?, AggregatedCountry> process(KStream<Object, Country> input) {
        return input
                .groupBy((key, value) -> value.getCountryCode())
                .aggregate(this::initialize,
                        this::aggregateAmount,
                        materializedAsPersistentStore("countries", Serdes.String(),
                                Serdes.serdeFrom(new JsonSerializer<>(),
                                        new JsonDeserializer<>(AggregatedCountry.class))))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, value));
    }
    @StreamListener("countries")
    @SendTo("daily-statistic")
    public KStream<?, List<DailyStatistics>> daily(KStream<Object, Country> input) {
        return input
                .groupBy((key, value) -> value.getCountryCode())
                .aggregate(this::initializeDailyStatistics,
                        this::dailyStatistics,
                        materializedAsPersistentStore("daily", Serdes.String(),
                                Serdes.serdeFrom(new JsonSerializer<>(),
                                        new JsonDeserializer<>(List.class))))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, value));
    }

但是当我启动springboot应用程序时,我得到了这个错误。

Exception in thread "kafka-stream-f4f8166b-cbeb-42ca-b461-2b3a23885a5d-StreamThread-1" java.lang.IllegalStateException: Consumer was assigned partitions [kafka-stream-daily-repartition-0] which didn't correspond to subscription request [kafka-stream-countries-repartition, countries]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.handleAssignmentMismatch(ConsumerCoordinator.java:218)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)

我想每个streamlistener方法都需要单独的应用程序id,但是如果我在听同一个主题,如何在application.yml文件中配置它呢?

qpgpyjmq

qpgpyjmq1#

您需要提供两个独立的输入绑定(这两个绑定都可以指向同一主题)。不能在多个服务器上使用相同的绑定名称 StreamListener s。然后您可以设置 application.id 对于多个 StreamListener 基于输入绑定的处理器。例如。

spring.cloud.stream.kafka.streams.bindings.countries1.consumer.applicationId

spring.cloud.stream.kafka.streams.bindings.countries2.consumer.applicationId

参见参考文件中的本节。

iqih9akk

iqih9akk2#

你读了两遍“国家”专题,如果你读一次“国家”,并将数据发送到“每日统计”和“汇总统计”会更好。
读两遍和并发处理是两码事。如果需要并发性,请配置此参数:

spring:
  cloud.stream:
    bindings:
      countries:
        destination: countries-topic
        consumer.concurrency: 6

您可以使用如下拓扑:

@StreamListener("countries")
@SendTo({"daily-statistic", "aggregated-statistic"})

相关问题