如何使用KafkaSource在java flink上提交偏移量

wko9yo5t  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(243)

我希望处理来自Kafka的消息,然后提交该消息,Flink使用并处理所有消息后,结束作业,使用任务管理器和心跳升级进程

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(address)
            .setTopics(inputTopic)
            .setGroupId(consumerGroup)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setProperty("enable.auto.commit", "true")
            .setProperty("commit.offsets.on.checkpoint", "true")
            .build();

    DataStream<String> stream = environment.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
    ObjectMapper mapper = new ObjectMapper();
    stream.map((value) -> {
9cbw7uwe

9cbw7uwe1#

如果要停止作业,则应将其设置为批处理作业,而不是流作业。以下是详细信息:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
至于将记录提交给Kafka broker,它是通过flink在每个成功的检查点/保存点上自动完成的,因此您不必在这方面做任何事情。

相关问题