我正在尝试批量读取Kafka主题中的所有数据(在两个偏移值之间读取)并将它们加载到spark dataframes,而不使用 readStream
在Spark流中。
我的想法是:
我首先在查找最大偏移值的主题中获得数据行的总数。
我定义 step
,即每批数据的总数。
使用for循环,我从kafka主题设置中读取数据批 startingOffsets
以及 endingOffsets
参数。
这是我的代码(针对单个分区的主题),用于打印每批中的计数:
val maxOffsetValue = {
Process(s"kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topicname")
.!!
.split(":")
.last
.trim
.toInt
}
val step = 1000
for (i <- 0 until maxOffsetValue by step) {
val df: DataFrame = {
spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topicname")
.option("startingOffsets", s"""{"topicname":{"0":${i}}}""")
.option("endingOffsets", s"""{"topicname":{"0":${i+step}}}""")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
.select(from_json(col("value"), dataSchema) as "data")
.select("data.*")
}
println(s"i: ${i}, i+step: ${i+step}, count: ${df.count()}")
}
但是,似乎 startingOffsets
以及 endingOffsets
不灵活,因为显然需要为每个分区指定所有偏移量索引,例如 {"0":${i}, "1": ${i}}}
如果有两个分区。
我的问题是:
有没有更好的方法来实现相同的结果,可能可以直接扩展到多分区主题?
有没有不使用shell命令读取最大偏移量的方法?
暂无答案!
目前还没有任何答案,快来回答吧!