如何使保存点以正确的偏移量开始?我使用的是标准的kinesis接收器和一个指向s3路径的连续读卡器。我只使用了一次语义(检查点),我可以验证源的状态是否与每个保存点一起存储。
fafcakar1#
在引擎盖下,flink将文件读取过程分为两个子任务,即目录监视和数据读取。这些子任务中的每一个子任务都由一个单独的实体来实现。。。单个监视任务的作用是扫描目录(定期或仅扫描一次,具体取决于watchtype),找到要处理的文件,将其拆分,并将这些拆分分配给下游读卡器。读者是那些将阅读实际数据的人。监控功能处于 ContinuousFileMonitoringFunction ,并将其所监视的文件或目录的最长文件修改时间记录到快照中。每一个 CheckpointableInputFormat 对于正在读取的文件分割,记录它们的偏移量。我不知道为什么它的行为不符合您的预期,但是正如@yuvalitzchakov所提到的,当这个状态被恢复时,有一些有用的日志记录——如果您启用调试级别日志记录,则更是如此。看到这些taskmanager日志可能会发现发生了什么。如果您在切换到kinesis之前陷入困境,并且希望探索一种引导状态的替代方法,那么可以使用状态处理器api从这些文件中的数据创建保存点。或者您可以使用状态处理器api来检查已存在的保存点中的状态。
ContinuousFileMonitoringFunction
CheckpointableInputFormat
1条答案
按热度按时间fafcakar1#
在引擎盖下,flink将文件读取过程分为两个子任务,即目录监视和数据读取。这些子任务中的每一个子任务都由一个单独的实体来实现。。。单个监视任务的作用是扫描目录(定期或仅扫描一次,具体取决于watchtype),找到要处理的文件,将其拆分,并将这些拆分分配给下游读卡器。读者是那些将阅读实际数据的人。
监控功能处于
ContinuousFileMonitoringFunction
,并将其所监视的文件或目录的最长文件修改时间记录到快照中。每一个CheckpointableInputFormat
对于正在读取的文件分割,记录它们的偏移量。我不知道为什么它的行为不符合您的预期,但是正如@yuvalitzchakov所提到的,当这个状态被恢复时,有一些有用的日志记录——如果您启用调试级别日志记录,则更是如此。看到这些taskmanager日志可能会发现发生了什么。
如果您在切换到kinesis之前陷入困境,并且希望探索一种引导状态的替代方法,那么可以使用状态处理器api从这些文件中的数据创建保存点。或者您可以使用状态处理器api来检查已存在的保存点中的状态。