使用checkpointlocation偏移从kafka主题读取流的正确方法

kcrjzv8t  于 2021-05-19  发布在  Spark
关注(0)|答案(1)|浏览(743)

我正在尝试开发一个小的spark应用程序(使用scala)来读取来自kafka的消息(合流)并将它们写入hive表。除了一个重要的特性——在重新启动(提交)应用程序时管理偏移量外,其他一切都按预期工作。我很困惑。
从我的代码中删除:

def main(args: Array[String]): Unit = {

    val sparkSess = SparkSession
      .builder
      .appName("Kafka_to_Hive")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse/")
      .config("hive.metastore.uris", "thrift://localhost:9083")
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()

    sparkSess.sparkContext.setLogLevel("ERROR")

    // don't consider this code block please, it's just a part of Confluent avro message deserializing adventures
    sparkSess.udf.register("deserialize", (bytes: Array[Byte]) =>
      DeserializerWrapper.deserializer.deserialize(bytes)
    )

    val kafkaDataFrame = sparkSess
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", 'localhost:9092')
      .option("group.id", 'kafka-to-hive-1')
      // ------>   which Kafka options do I need to set here for starting from last right offset to ensure completenes of data and "exactly once" writing?   <--------
      .option("failOnDataLoss", (false: java.lang.Boolean))
      .option("subscribe", 'some_topic')
      .load()

    import org.apache.spark.sql.functions._

    // don't consider this code block please, it's just a part of Confluent avro message deserializing adventures
    val valueDataFrame = kafkaDataFrame.selectExpr("""deserialize(value) AS message""")
    val df = valueDataFrame.select(
      from_json(col("message"), sparkSchema.dataType).alias("parsed_value"))
      .select("parsed_value.*")

    df.writeStream
      .foreachBatch((batchDataFrame, batchId) => {
        batchDataFrame.createOrReplaceTempView("`some_view_name`")
        val sqlText = "SELECT * FROM `some_view_name` a where some_field='some value'"
        val batchDataFrame_view = batchDataFrame.sparkSession.sql(sqlText);
        batchDataFrame_view.write.insertInto("default.some_hive_table")
      })
      .option("checkpointLocation", "/user/some_user/tmp/checkpointLocation")
      .start()
      .awaitTermination()
  }

问题(这些问题相互关联):
我需要申请哪些Kafka选项 readStream.format("kafka") 从spark应用程序每次提交的最后一个右偏移开始?
我是否需要手动读取checkpointlocation/offsets/latest\批处理文件的第3行,以查找要从kafka读取的最后一个偏移?我的意思是: readStream.format("kafka").option("startingOffsets", """{"some_topic":{"2":35079,"5":34854,"4":35537,"1":35357,"3":35436,"0":35213}}""") 阅读Kafka(合流)主题中的流的正确/方便的方法是什么(我没有考虑(Kafka的引擎)

u5rb5r59

u5rb5r591#

“我需要在readstream.format(“kafka”)上应用哪些kafka选项,以便在每次提交spark应用程序时从最后一个右偏移开始?”
你需要设置 startingOffsets=latest 清理检查点文件。
“我是否需要手动读取checkpointlocation/offsets/latest\批处理文件的第3行,以查找要从kafka读取的最后一个偏移?我的意思是:readstream.format(“kafka”).option(“startingoffsets”,“{”some\u topic“:{”2“:35079”,5“:34854”,4“:35537”,1“:35357”,3“:35436”,0“:35213}}”“)”
与第一个问题类似,如果将startingoffset设置为json字符串,则需要删除检查点文件。否则,spark应用程序将始终获取存储在检查点文件中的信息,并覆盖 startingOffsets 选项。
“阅读Kafka(合流)主题中的流的正确/方便的方式是什么(我没有考虑“Kafka的引擎”
询问“正确的方法”可能会得到基于意见的答案,因此在stackoverflow上是离题的。不管怎样,以我的经验来看,使用spark结构化流媒体已经是一种成熟的、适合生产的方法。然而,Kafka康奈克的研究也总是值得的。

相关问题