我正在开发一个蒸汽应用程序poc,其中我从kafka生产者那里获取消息,在spark结构化蒸汽消费者中,我获取这些主题并将其存储在delta表中 option("checkpointLocation", checkPointdir)
. 我的问题是如何读取此位置以获取最新偏移量,如果流失败并传递到起始偏移量。option(“startingoffsets”,readvalue)时,将处理该偏移量
我浏览了下面的参考资料,但没有太多的线索如何读取s3的值,或者我必须编写一个单独的scala程序来读取s3的值。https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html 我的偏移文件如下所示
v1
{"batchWatermarkMs":0,"batchTimestampMs":1594923737216,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"json-topic":{"0":41}}
任何线索都会有帮助
暂无答案!
目前还没有任何答案,快来回答吧!