kafka&flink重新启动时复制消息

eit6fx6z  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(398)

首先,这与我重新运行flink consumer时kafka再次使用最新消息非常相似,但不同。这个问题的答案似乎不能解决我的问题。如果我在回答中漏掉了什么,那么请重新措辞,因为我显然漏掉了什么。
不过,问题是完全一样的——flink(Kafka连接器)重新运行了关闭前看到的最后3-9条消息。

我的版本

Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91

我的代码

import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "testing");

    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}

我的sbt依赖项

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % "1.1.2",
    "org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
    "org.apache.flink" %% "flink-clients" % "1.1.2",
    "org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
    "org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)

我的过程

(3个端子)

TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic

我的期望

当系统中没有错误时,我希望能够打开和关闭flink,而无需重新处理在前一次运行中成功完成流的消息。

我试图修复

我已将呼叫添加到 setStateBackend ,认为可能是默认的内存后端没有正确地记住。那似乎没用。
我已经取消了对的呼叫 enableCheckpointing ,希望也许在flink和zookeeper中有一个单独的机制来跟踪状态。那似乎没用。
我用过不同的Flume,rollingfilesink,print();希望虫子可能在Kafka。那似乎没用。
我已经返回到flink(和所有连接器)v1.1.0和v1.1.1,希望这个bug可能是最新版本的。那似乎没用。
我添加了 zookeeper.connect 配置到properties对象,希望关于它只在0.8中有用的注解是错误的。那似乎没用。
我已经明确地将检查点模式设置为 EXACTLY_ONCE (好主意,B医生)。那似乎没用。

我的请求

救命啊!

kzipqqlq

kzipqqlq1#

更新2:我用偏移量处理修复了这个bug,它被合并到了当前主程序中。
更新:不是问题,在取消作业之前使用手动保存点(感谢gordon)
我检查了日志,它似乎是一个错误的偏移处理。我根据法律提交了一份报告https://issues.apache.org/jira/browse/flink-4618. 当我得到反馈时,我会更新这个答案。

xn1cxnb4

xn1cxnb42#

(我已经在jira上发布了相同的回复,只是在这里交叉发布了相同的回复)
根据您的描述,我假设您正在手动关闭作业,然后重新提交,对吗?
除非使用保存点,否则flink不会在手动作业重新启动时完全保留一次(https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html). “精确一次”保证是指当作业失败,然后从以前的检查点自动恢复自身时(当启用检查点时,就像您使用env.enablecheckpointing(500)时所做的那样)
实际情况是,当您手动重新提交作业时,kafka使用者只需开始读取zk/kafka中提交的现有偏移量。这些偏移量在您第一次执行作业时提交给zk/kafka。然而,它们并没有用于Flink的精确一次语义;flink使用内部检查Kafka偏移量。Kafka消费者将这些补偿重新提交给zk,仅仅是为了向外界公开工作消费进展的度量(wrt flink)。

相关问题