如何在结构化流媒体中为kafka数据源中的consumer group设置group.id?

oewdyzsn  于 2021-06-06  发布在  Kafka
关注(0)|答案(4)|浏览(781)

我想使用spark结构化流媒体来读取一个安全的Kafka。这意味着我需要强制一个特定的group.id。然而,如文件中所述,这是不可能的。不过,在databricks文档中https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl,它说这是可能的。这是否仅指azure群集?
另外,通过查看apache/spark repo的主分支的文档https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md,我们可以理解,这样的功能将在以后的spark版本中添加。您知道这样一个稳定的版本有什么计划吗?它允许设置consumer group.id?
如果没有,spark 2.4.0是否有解决方法来设置特定的consumer group.id?

7gyucuyw

7gyucuyw1#

自spark 3.0.0以来

根据结构化kafka集成指南,您可以提供consumergroup作为一个选项 kafka.group.id :

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("kafka.group.id", "myConsumerGroup")
  .load()

但是,spark不会提交回任何偏移量,因此您的ConsumerGroup的偏移量不会存储在kafka的内部主题\uu consumer\u偏移量中,而是存储在spark的检查点文件中。
能够设置 group.id 旨在使用基于角色的访问控制来处理kafka的最新功能授权,而您的consumergroup通常需要遵循命名约定。
spark 3.x应用程序设置的完整示例 kafka.group.id 在这里讨论和解决。

xdnvmnnf

xdnvmnnf2#

现在使用spark3.0,可以为kafka指定group.idhttps://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-特定配置

oogrdqng

oogrdqng3#

结构化流媒体指南似乎对此非常明确:
请注意,无法设置以下kafka参数,kafka源或接收器将引发异常:
group.id:kafka source将为每个查询自动创建一个唯一的组id。
auto.offset.reset:设置源选项startingoffset以指定从何处开始。

aurhwmvo

aurhwmvo4#

目前(v2.4.0)不可能。
您可以在apache spark项目中检查以下行:
https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/kafkasourceprovider.scala#l81 -生成组id
https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/kafkasourceprovider.scala#l534 -在用于创建 KafkaConsumer 在主分支中,您可以找到允许设置前缀或特定group.id的修改
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/kafkasourceprovider.scala#l83 -基于组前缀生成group.id( groupidprefix )
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/kafkasourceprovider.scala#l543 -设置以前生成的groupid,如果 kafka.group.id 没有传入属性

相关问题