我有一个Flink批处理作业,它从Kafka中读取数据并写入S3。
从:时间戳到:时间戳。
所以我的Kafka消费者基本上是这样的:
KafkaSource.<T>builder()
.setBootstrapServers(resolvedBootstrapBroker)
.setTopics(List.of("TOPIC_0"))
.setGroupId(consumerGroupId)
.setStartingOffsets(OffsetsInitializer.timestamp(startTimeStamp))
.setValueOnlyDeserializer(deserializationSchema)
.setBounded(OffsetsInitializer.timestamp(endTimeStamp))
.setProperties(additionalProperties)
.build();
开始时间戳和结束时间戳的计算方式如下(从10天前到10小时前):
long startTimeStamp = Instant.now().minus(10, ChronoUnit.DAYS).toEpochMilli();
long endTimeStamp = Instant.now().minus(10, ChronoUnit.HOURS).toEpochMilli();
但是,这些记录不会写入S3。如果我只是将bounded
参数切换为:
.setBounded(OffsetsInitializer.latest())
它工作并写到S3。知道我可能做错了什么吗?
编辑:
我了解到它正在写部分文件。但是它没有将部分文件转换为完整文件。知道为什么会发生这种情况吗?
1条答案
按热度按时间6ie5vjzr1#
Flink的FileSink只在检查点操作期间提交结果(或者在批处理结束时对有界输入进行操作)。请参见文档中的注解:
重要事项:在流式处理模式下使用FileSink时,需要启用检查点。部件文件只能在成功的检查点上完成。如果禁用检查点,部件文件将永远处于进行中或挂起状态,并且下游系统无法安全读取。