Storm中Kafka喷口的无效GroupId异常

snz8szmq  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(322)

我已经定义了一个基本的Storm拓扑,其中包含Kafka的spout consumer(生产者是在Kafka的单独模块中创建的)。但是,当我运行应用程序时,我收到以下错误:

java.lang.RuntimeException: org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
    at org.apache.storm.utils.Utils$1.run(Utils.java:407) ~[storm-client-2.1.0.jar:2.1.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
Caused by: org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.

如何设置组ID?我正在本地运行Storm 2.1.0版本。
下面是拓扑的代码:

val cluster = new LocalCluster()

val bootstrapServers = "localhost:9092"
val brokerHosts = new ZkHosts(bootstrapServers)
val topologyBuilder = new TopologyBuilder()

val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "tweets").build()
topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)

val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())
zsohkypk

zsohkypk1#

应将setProp(java.lang.String, java.lang.Object)ConsumerConfig.GROUP_ID_CONFIG一起使用,以便在KafkaSpoutConfig上添加使用者组ID

相关问题