apache-kafka Flink批处理作业未从带有时间戳的Kafka中阅读

f45qwnt8  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(119)

我有一个Flink批处理作业,它从Kafka中读取数据并写入S3。
从:时间戳到:时间戳。
所以我的Kafka消费者基本上是这样的:

KafkaSource.<T>builder()
                .setBootstrapServers(resolvedBootstrapBroker)
                .setTopics(List.of("TOPIC_0"))
                .setGroupId(consumerGroupId)
                .setStartingOffsets(OffsetsInitializer.timestamp(startTimeStamp))
                .setValueOnlyDeserializer(deserializationSchema)
                .setBounded(OffsetsInitializer.timestamp(endTimeStamp))
                .setProperties(additionalProperties)
                .build();

开始时间戳和结束时间戳的计算方式如下(从10天前到10小时前):

long startTimeStamp = Instant.now().minus(10, ChronoUnit.DAYS).toEpochMilli();
        long endTimeStamp = Instant.now().minus(10, ChronoUnit.HOURS).toEpochMilli();

但是,这些记录不会写入S3。如果我只是将bounded参数切换为:

.setBounded(OffsetsInitializer.latest())

它工作并写到S3。知道我可能做错了什么吗?
编辑:
我了解到它正在写部分文件。但是它没有将部分文件转换为完整文件。知道为什么会发生这种情况吗?

6ie5vjzr

6ie5vjzr1#

Flink的FileSink只在检查点操作期间提交结果(或者在批处理结束时对有界输入进行操作)。请参见文档中的注解:
重要事项:在流式处理模式下使用FileSink时,需要启用检查点。部件文件只能在成功的检查点上完成。如果禁用检查点,部件文件将永远处于进行中或挂起状态,并且下游系统无法安全读取。

相关问题