用kafka的spark结构批处理作业管理偏移量

vi4fp9gy  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(688)

我有一个编写批处理作业的用例
我需要阅读Kafka的主题,并将数据记录到hdfs中。我的代码如下所示

val df: DataFrame = spark.read
  .format("kafka")
  .option("subscribe", "test-topic")
  .option("includeTimestamp", true)
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("group.id", "test-cg")
  .option("checkpointLocation", "/group/test/checkpointsDir")
  .load

df.write.
  parquet(buildPathWithCurrentBatchTime())

每次作业读取kafka主题时,它都从最早的偏移量开始,因此同一消息会被分多批记录。如何使我的作业从上一个作业示例读取的偏移量之后的偏移量开始读取消息。
我试图设置检查点的位置,组id,但没有帮助。
我不想使用流式查询。我有一个记录Kafka主题数据的简单用例。我没有任何延迟要求。唯一的要求是日记账上不能有任何副本。这是一个低优先级。如果我使用流式查询,它将一直使用执行器,这是浪费资源。所以我想分批做

2hh7jdfx

2hh7jdfx1#

您使用的是批处理查询而不是流式查询(可能是缺了点?)只是换个地方 readreadStream 以及 writewriteStream 对你有用。
编辑:op澄清了使用一次触发器是可以的,我刚刚更新了代码,使用一次触发器的结构化流(免责声明:我没有编译/运行代码,但更改适合结构化流媒体指南文档。)

val df: DataFrame = spark.readStream
  .format("kafka")
  .option("subscribe", "test-topic")
  .option("includeTimestamp", true)
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("group.id", "test-cg")
  .option("checkpointLocation", "/group/test/checkpointsDir")
  .load

val query = df.writeStream
  .format("parquet")
  .option("path", buildPathWithCurrentBatchTime())
  .trigger(Trigger.Once())
  .start()

query.awaitTermination()

相关问题