我想使用spark结构化流媒体来读取一个安全的Kafka。这意味着我需要强制一个特定的group.id。然而,如文件中所述,这是不可能的。不过,在databricks文档中https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl,它说这是可能的。这是否仅指azure群集?
另外,通过查看apache/spark repo的主分支的文档https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md,我们可以理解,这样的功能将在以后的spark版本中添加。您知道这样一个稳定的版本有什么计划吗?它允许设置consumer group.id?
如果没有,spark 2.4.0是否有解决方法来设置特定的consumer group.id?
4条答案
按热度按时间7gyucuyw1#
自spark 3.0.0以来
根据结构化kafka集成指南,您可以提供consumergroup作为一个选项
kafka.group.id
:但是,spark不会提交回任何偏移量,因此您的ConsumerGroup的偏移量不会存储在kafka的内部主题\uu consumer\u偏移量中,而是存储在spark的检查点文件中。
能够设置
group.id
旨在使用基于角色的访问控制来处理kafka的最新功能授权,而您的consumergroup通常需要遵循命名约定。spark 3.x应用程序设置的完整示例
kafka.group.id
在这里讨论和解决。xdnvmnnf2#
现在使用spark3.0,可以为kafka指定group.idhttps://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-特定配置
oogrdqng3#
结构化流媒体指南似乎对此非常明确:
请注意,无法设置以下kafka参数,kafka源或接收器将引发异常:
group.id:kafka source将为每个查询自动创建一个唯一的组id。
auto.offset.reset:设置源选项startingoffset以指定从何处开始。
aurhwmvo4#
目前(v2.4.0)不可能。
您可以在apache spark项目中检查以下行:
https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/kafkasourceprovider.scala#l81 -生成组id
https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/kafkasourceprovider.scala#l534 -在用于创建
KafkaConsumer
在主分支中,您可以找到允许设置前缀或特定group.id的修改https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/kafkasourceprovider.scala#l83 -基于组前缀生成group.id(
groupidprefix
)https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/kafkasourceprovider.scala#l543 -设置以前生成的groupid,如果
kafka.group.id
没有传入属性