如何在spark结构化流媒体中手动设置group.id并提交kafka偏移量?

6psbrbz9  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(705)

我正在浏览spark结构化流媒体-Kafka集成指南。
在这个环节上有人说
enable.auto.commit:kafka源不提交任何偏移量。
那么,如何在spark应用程序成功处理每条记录后手动提交偏移量呢?

qojgxg4l

qojgxg4l1#

自spark 3.0.0以来

根据结构化kafka集成指南,您可以提供consumergroup作为一个选项 kafka.group.id :

  1. val df = spark
  2. .readStream
  3. .format("kafka")
  4. .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  5. .option("subscribe", "topic1")
  6. .option("kafka.group.id", "myConsumerGroup")
  7. .load()

但是,spark仍然不会提交任何偏移量,因此您将无法“手动”将偏移量提交给kafka。此功能旨在使用基于角色的访问控制来处理kafka的最新功能授权,而您的consumergroup通常需要遵循命名约定。
本文讨论并解决了spark 3.x应用程序的完整示例。

直到spark 2.4.x

spark社区似乎正在讨论这个特性https://github.com/apache/spark/pull/24613.
在这个pull请求中,您还可以找到一个可能的解决方案https://github.com/heartsavior/spark-sql-kafka-offset-committer.
目前,spark structured streaming+kafka集成文档清楚地说明了它是如何管理kafka偏移的。管理补偿的最重要的Kafka配置是:
group.id:kafka source将为每个查询自动创建一个唯一的组id。根据代码,group.id将被设置为

  1. val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

auto.offset.reset:设置源选项startingoffset以指定从何处开始。结构化流媒体管理哪些偏移量是内部消耗的,而不是依赖Kafka消费者来完成。
enable.auto.commit:kafka源不提交任何偏移量。
因此,在结构化流媒体中,目前无法为kafka消费者定义自定义group.id,而结构化流媒体正在内部管理偏移量,并且不会提交回kafka(也不会自动提交)。

2.4.x生效

假设您有一个简单的spark结构化流媒体应用程序,可以读取和写入kafka,如下所示:

  1. // create SparkSession
  2. val spark = SparkSession.builder()
  3. .appName("ListenerTester")
  4. .master("local[*]")
  5. .getOrCreate()
  6. // read from Kafka topic
  7. val df = spark.readStream
  8. .format("kafka")
  9. .option("kafka.bootstrap.servers", "localhost:9092")
  10. .option("subscribe", "testingKafkaProducer")
  11. .option("failOnDataLoss", "false")
  12. .load()
  13. // write to Kafka topic and set checkpoint directory for this stream
  14. df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  15. .writeStream
  16. .format("kafka")
  17. .option("kafka.bootstrap.servers", "localhost:9092")
  18. .option("topic", "testingKafkaProducerOut")
  19. .option("checkpointLocation", "/home/.../sparkCheckpoint/")
  20. .start()

spark补偿管理

提交此应用程序并处理数据后,可以在checkpoint目录中找到相应的偏移量:
mycheckpointdir/偏移量/

  1. {"testingKafkaProducer":{"0":1}}

这里检查点文件中的条目确认分区的下一个偏移量 0 被消费是 1 . 这意味着应用程序已经处理了偏移量 0 从分区 0 主题的 testingKafkaProducer .
有关容错语义的更多信息,请参阅spark文档。

Kafka的抵销管理

然而,如文件所述,补偿款并没有退回Kafka。这可以通过执行 kafka-consumer-groups.sh Kafka装置。
./kafka/current/bin/kafka-consumer-groups.sh—引导服务器localhost:9092 --describe --组“spark-kafka-source-92ea6f85-[…]-driver-0”

  1. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  2. testingKafkaProducer 0 - 1 - consumer-1-[...] /127.0.0.1 consumer-1

kafka不知道此应用程序的当前偏移量,因为它从未提交过。

可能的解决方案

我在web上做了一些研究,发现您可以在的回调函数中提交偏移量 onQueryProgress 方法在自定义 StreamingQueryListener Spark。
由于我不会声称自己已经开发了这个,以下是帮助我理解的最重要的链接:
侦听器的代码示例
围绕抵销管理的探讨
streamingquerylistener概述

展开查看全部

相关问题