我正在尝试构建一个结构化的流式处理作业,从kafka读取数据并将数据写入kafka。我知道,默认情况下,类似结构化流存储偏移量和提交到检查点目录,它将用于触发器。
我正在用json{“topic1”:{“0”:23,“1”:-2},“topic2”:{“0”:-2}探索startingoffset选项。
如果我将当前批处理的偏移量存储到json变量中,我可以使用这些json偏移量来触发下一个间隔吗
pseudo code:
json_offset = {"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets",json_offset)
.load()
json_offset = df.select("offset").toJSON().collect()
这种结构化流式代码是否每次都会触发从最新的json\u偏移量存储的偏移量?或者它只会尝试使用初始json\U偏移量声明。这里这个json\u偏移量可以是redis-cache()
暂无答案!
目前还没有任何答案,快来回答吧!