我通过spark结构化流媒体使用kafka主题的数据,该主题有3个分区。由于spark structured streaming不允许显式提供group.id并为消费者分配一些随机id,因此我尝试检查消费者组id的using below kafka命令
./kafka-consumer-groups.sh --bootstrap-server kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092 --list
output
spark-kafka-source-054e8dac-bea9-46e8-9374-8298daafcd23--1587684247-driver-0
spark-kafka-source-756c08e8-6a84-447c-8326-5af1ef0412f5-209267112-driver-0
spark-kafka-source-9528b191-4322-4334-923d-8c1500ef8194-2006218471-driver-0
下面是我的问题
1) 为什么会产生3个消费群体?是因为3个分区吗?
2) 有什么方法可以在spark应用程序中获得这些消费组名称吗?
3) 尽管我的spark应用程序还在运行,但过了一段时间,这些组名并没有出现在消费者组列表中。这是因为所有的数据都被spark应用程序使用了,而Kafka主题中没有更多的数据了吗?
4) 如果我对第3点的假设是正确的,那么如果新数据到达,它是否会创建一个新的消费组id,或者消费组的名称将保持不变?
下面是我的阅读流
val inputDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
// .option("assign"," {\""+topic+"\":[0]}")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 60000)
.load()
我在应用程序中有3个writestreams,如下所示
val df = inputDf.selectExpr("CAST(value AS STRING)","CAST(topic AS STRING)","CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)")
val df1 = inputDf.selectExpr("CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)")
//First stream
val checkpoint_loc1= "/warehouse/test_duplicate/download/chk1"
df1.agg(min("offset"), max("offset"))
.writeStream
.foreach(writer)
.outputMode("complete")
.option("checkpointLocation", checkpoint_loc1).start()
val result = df.select(
df1("result").getItem("_1").as("col1"),
df1("result").getItem("_2").as("col2"),
df1("result").getItem("_5").as("eventdate"))
val distDates = result.select(result("eventdate")).distinct
//Second stream
val checkpoint_loc2= "/warehouse/test_duplicate/download/chk2"
distDates.writeStream.foreach(writer1)
.option("checkpointLocation", checkpoint_loc2).start()
//Third stream
val kafkaOutput =result.writeStream
.outputMode("append")
.format("orc")
.option("path",data_dir)
.option("checkpointLocation", checkpoint_loc3)
.start()
流式查询在代码中只使用一次,并且没有连接。
执行计划
== Parsed Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Analyzed Logical Plan ==
key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Optimized Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Physical Plan ==
StreamingRelation kafka, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
2条答案
按热度按时间4zcjmb1e1#
group.id:kafka source将为每个查询自动创建一个唯一的组id。http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
r8uurelv2#
1) 为什么会产生3个消费群体?是因为3个分区吗?
当然不是。这只是个巧合。您似乎已经运行应用程序3次了,并且主题有3个分区。
让我们重新开始来支持它。
我删除了所有的消费群体,以确保我们重新开始。
我创建了一个有5个分区的主题。
我使用的代码如下:
当我运行上面的spark结构化流媒体应用程序时,我只创建了一个消费者组。
这是有意义的,因为所有spark处理应该使用与分区数量相同的kafka消费者,但是不管消费者的数量如何,应该只有一个消费者组(或者kafka消费者将使用所有记录,并且会有重复记录)。
2) 有什么方法可以在spark应用程序中获得这些消费组名称吗?
没有公共api,所以答案是否定的。
但是,您可以“黑客”spark,并在公共api下面找到使用以下行的内部kafka使用者:
或者更确切地说是这句话:
只要找到
KafkaMicroBatchReader
对于kafka数据源,请求KafkaOffsetReader
那就知道了groupId
. 这似乎是可行的。尽管我的spark应用程序还在运行,但过了一段时间,这些组名并没有出现在消费者组列表中。这是因为所有的数据都被spark应用程序使用了,而Kafka主题中没有更多的数据了吗?
这是否与kip-211有关:修改消费群体补偿的过期语义,即:
当到达与主题分区相关联的过期时间戳时,使用者组中主题分区的偏移量将过期。此过期时间戳通常受代理config offsets.retention.minutes的影响,除非用户重写该默认值并使用自定义保留。
4) 如果新数据到达,它是否会创建新的消费组id,或者消费组的名称将保持不变?
将保持不变。
此外,当用户组中至少有一个用户处于活动状态时,不能删除该用户组。