我正在浏览spark结构化流媒体-Kafka集成指南。在这个环节上有人说enable.auto.commit:kafka源不提交任何偏移量。那么,如何在spark应用程序成功处理每条记录后手动提交偏移量呢?
qojgxg4l1#
根据结构化kafka集成指南,您可以提供consumergroup作为一个选项 kafka.group.id :
kafka.group.id
val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribe", "topic1") .option("kafka.group.id", "myConsumerGroup") .load()
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("kafka.group.id", "myConsumerGroup")
.load()
但是,spark仍然不会提交任何偏移量,因此您将无法“手动”将偏移量提交给kafka。此功能旨在使用基于角色的访问控制来处理kafka的最新功能授权,而您的consumergroup通常需要遵循命名约定。本文讨论并解决了spark 3.x应用程序的完整示例。
spark社区似乎正在讨论这个特性https://github.com/apache/spark/pull/24613.在这个pull请求中,您还可以找到一个可能的解决方案https://github.com/heartsavior/spark-sql-kafka-offset-committer.目前,spark structured streaming+kafka集成文档清楚地说明了它是如何管理kafka偏移的。管理补偿的最重要的Kafka配置是:group.id:kafka source将为每个查询自动创建一个唯一的组id。根据代码,group.id将被设置为
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
auto.offset.reset:设置源选项startingoffset以指定从何处开始。结构化流媒体管理哪些偏移量是内部消耗的,而不是依赖Kafka消费者来完成。enable.auto.commit:kafka源不提交任何偏移量。因此,在结构化流媒体中,目前无法为kafka消费者定义自定义group.id,而结构化流媒体正在内部管理偏移量,并且不会提交回kafka(也不会自动提交)。
假设您有一个简单的spark结构化流媒体应用程序,可以读取和写入kafka,如下所示:
// create SparkSessionval spark = SparkSession.builder() .appName("ListenerTester") .master("local[*]") .getOrCreate()// read from Kafka topicval df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "testingKafkaProducer") .option("failOnDataLoss", "false") .load()// write to Kafka topic and set checkpoint directory for this streamdf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .writeStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("topic", "testingKafkaProducerOut") .option("checkpointLocation", "/home/.../sparkCheckpoint/") .start()
// create SparkSession
val spark = SparkSession.builder()
.appName("ListenerTester")
.master("local[*]")
.getOrCreate()
// read from Kafka topic
val df = spark.readStream
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "testingKafkaProducer")
.option("failOnDataLoss", "false")
// write to Kafka topic and set checkpoint directory for this stream
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.option("topic", "testingKafkaProducerOut")
.option("checkpointLocation", "/home/.../sparkCheckpoint/")
.start()
提交此应用程序并处理数据后,可以在checkpoint目录中找到相应的偏移量:mycheckpointdir/偏移量/
{"testingKafkaProducer":{"0":1}}
这里检查点文件中的条目确认分区的下一个偏移量 0 被消费是 1 . 这意味着应用程序已经处理了偏移量 0 从分区 0 主题的 testingKafkaProducer .有关容错语义的更多信息,请参阅spark文档。
0
1
testingKafkaProducer
然而,如文件所述,补偿款并没有退回Kafka。这可以通过执行 kafka-consumer-groups.sh Kafka装置。./kafka/current/bin/kafka-consumer-groups.sh—引导服务器localhost:9092 --describe --组“spark-kafka-source-92ea6f85-[…]-driver-0”
kafka-consumer-groups.sh
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-IDtestingKafkaProducer 0 - 1 - consumer-1-[...] /127.0.0.1 consumer-1
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testingKafkaProducer 0 - 1 - consumer-1-[...] /127.0.0.1 consumer-1
kafka不知道此应用程序的当前偏移量,因为它从未提交过。
我在web上做了一些研究,发现您可以在的回调函数中提交偏移量 onQueryProgress 方法在自定义 StreamingQueryListener Spark。由于我不会声称自己已经开发了这个,以下是帮助我理解的最重要的链接:侦听器的代码示例围绕抵销管理的探讨streamingquerylistener概述
onQueryProgress
StreamingQueryListener
1条答案
按热度按时间qojgxg4l1#
自spark 3.0.0以来
根据结构化kafka集成指南,您可以提供consumergroup作为一个选项
kafka.group.id
:但是,spark仍然不会提交任何偏移量,因此您将无法“手动”将偏移量提交给kafka。此功能旨在使用基于角色的访问控制来处理kafka的最新功能授权,而您的consumergroup通常需要遵循命名约定。
本文讨论并解决了spark 3.x应用程序的完整示例。
直到spark 2.4.x
spark社区似乎正在讨论这个特性https://github.com/apache/spark/pull/24613.
在这个pull请求中,您还可以找到一个可能的解决方案https://github.com/heartsavior/spark-sql-kafka-offset-committer.
目前,spark structured streaming+kafka集成文档清楚地说明了它是如何管理kafka偏移的。管理补偿的最重要的Kafka配置是:
group.id:kafka source将为每个查询自动创建一个唯一的组id。根据代码,group.id将被设置为
auto.offset.reset:设置源选项startingoffset以指定从何处开始。结构化流媒体管理哪些偏移量是内部消耗的,而不是依赖Kafka消费者来完成。
enable.auto.commit:kafka源不提交任何偏移量。
因此,在结构化流媒体中,目前无法为kafka消费者定义自定义group.id,而结构化流媒体正在内部管理偏移量,并且不会提交回kafka(也不会自动提交)。
2.4.x生效
假设您有一个简单的spark结构化流媒体应用程序,可以读取和写入kafka,如下所示:
spark补偿管理
提交此应用程序并处理数据后,可以在checkpoint目录中找到相应的偏移量:
mycheckpointdir/偏移量/
这里检查点文件中的条目确认分区的下一个偏移量
0
被消费是1
. 这意味着应用程序已经处理了偏移量0
从分区0
主题的testingKafkaProducer
.有关容错语义的更多信息,请参阅spark文档。
Kafka的抵销管理
然而,如文件所述,补偿款并没有退回Kafka。这可以通过执行
kafka-consumer-groups.sh
Kafka装置。./kafka/current/bin/kafka-consumer-groups.sh—引导服务器localhost:9092 --describe --组“spark-kafka-source-92ea6f85-[…]-driver-0”
kafka不知道此应用程序的当前偏移量,因为它从未提交过。
可能的解决方案
我在web上做了一些研究,发现您可以在的回调函数中提交偏移量
onQueryProgress
方法在自定义StreamingQueryListener
Spark。由于我不会声称自己已经开发了这个,以下是帮助我理解的最重要的链接:
侦听器的代码示例
围绕抵销管理的探讨
streamingquerylistener概述