将整个Kafka主题作为sparkDataframe分批阅读

okxuctiv  于 2021-07-12  发布在  Spark
关注(0)|答案(0)|浏览(229)

我正在尝试批量读取Kafka主题中的所有数据(在两个偏移值之间读取)并将它们加载到spark dataframes,而不使用 readStream 在Spark流中。
我的想法是:
我首先在查找最大偏移值的主题中获得数据行的总数。
我定义 step ,即每批数据的总数。
使用for循环,我从kafka主题设置中读取数据批 startingOffsets 以及 endingOffsets 参数。
这是我的代码(针对单个分区的主题),用于打印每批中的计数:

val maxOffsetValue = {
    Process(s"kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topicname")
    .!!
    .split(":")
    .last
    .trim
    .toInt
}

val step = 1000

for (i <- 0 until maxOffsetValue by step) {
    val df: DataFrame = {
        spark
          .read
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9092") 
          .option("subscribe", "topicname")
          .option("startingOffsets", s"""{"topicname":{"0":${i}}}""")
          .option("endingOffsets", s"""{"topicname":{"0":${i+step}}}""")
          .load()
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .as[(String, String)]
          .select(from_json(col("value"), dataSchema) as "data")
          .select("data.*")
        }

         println(s"i: ${i}, i+step: ${i+step}, count: ${df.count()}")
        }

但是,似乎 startingOffsets 以及 endingOffsets 不灵活,因为显然需要为每个分区指定所有偏移量索引,例如 {"0":${i}, "1": ${i}}} 如果有两个分区。
我的问题是:
有没有更好的方法来实现相同的结果,可能可以直接扩展到多分区主题?
有没有不使用shell命令读取最大偏移量的方法?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题