kafka启动的结构化流媒体

2q5ifsrm  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(266)

我正在尝试构建一个结构化的流式处理作业,从kafka读取数据并将数据写入kafka。我知道,默认情况下,类似结构化流存储偏移量和提交到检查点目录,它将用于触发器。
我正在用json{“topic1”:{“0”:23,“1”:-2},“topic2”:{“0”:-2}探索startingoffset选项。
如果我将当前批处理的偏移量存储到json变量中,我可以使用这些json偏移量来触发下一个间隔吗

pseudo code:

json_offset = {"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets",json_offset)
  .load()

json_offset = df.select("offset").toJSON().collect()

这种结构化流式代码是否每次都会触发从最新的json\u偏移量存储的偏移量?或者它只会尝试使用初始json\U偏移量声明。这里这个json\u偏移量可以是redis-cache()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题