首先,这与我重新运行flink consumer时kafka再次使用最新消息非常相似,但不同。这个问题的答案似乎不能解决我的问题。如果我在回答中漏掉了什么,那么请重新措辞,因为我显然漏掉了什么。
不过,问题是完全一样的——flink(Kafka连接器)重新运行了关闭前看到的最后3-9条消息。
我的版本
Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91
我的代码
import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._
object Runner {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(500)
env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "testing");
val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
env.addSource(kafkaConsumer)
.addSink(kafkaProducer)
env.execute()
}
}
我的sbt依赖项
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-scala" % "1.1.2",
"org.apache.flink" %% "flink-streaming-scala" % "1.1.2",
"org.apache.flink" %% "flink-clients" % "1.1.2",
"org.apache.flink" %% "flink-connector-kafka-0.9" % "1.1.2",
"org.apache.flink" %% "flink-connector-filesystem" % "1.1.2"
)
我的过程
(3个端子)
TERM-1 start sbt, run program
TERM-2 create kafka topics testing-in and testing-out
TERM-2 run kafka-console-producer on testing-in topic
TERM-3 run kafka-console-consumer on testing-out topic
TERM-2 send data to kafka producer.
Wait for a couple seconds (buffers need to flush)
TERM-3 watch data appear in testing-out topic
Wait for at least 500 milliseconds for checkpointing to happen
TERM-1 stop sbt
TERM-1 run sbt
TERM-3 watch last few lines of data appear in testing-out topic
我的期望
当系统中没有错误时,我希望能够打开和关闭flink,而无需重新处理在前一次运行中成功完成流的消息。
我试图修复
我已将呼叫添加到 setStateBackend
,认为可能是默认的内存后端没有正确地记住。那似乎没用。
我已经取消了对的呼叫 enableCheckpointing
,希望也许在flink和zookeeper中有一个单独的机制来跟踪状态。那似乎没用。
我用过不同的Flume,rollingfilesink,print();希望虫子可能在Kafka。那似乎没用。
我已经返回到flink(和所有连接器)v1.1.0和v1.1.1,希望这个bug可能是最新版本的。那似乎没用。
我添加了 zookeeper.connect
配置到properties对象,希望关于它只在0.8中有用的注解是错误的。那似乎没用。
我已经明确地将检查点模式设置为 EXACTLY_ONCE
(好主意,B医生)。那似乎没用。
我的请求
救命啊!
2条答案
按热度按时间kzipqqlq1#
更新2:我用偏移量处理修复了这个bug,它被合并到了当前主程序中。
更新:不是问题,在取消作业之前使用手动保存点(感谢gordon)
我检查了日志,它似乎是一个错误的偏移处理。我根据法律提交了一份报告https://issues.apache.org/jira/browse/flink-4618. 当我得到反馈时,我会更新这个答案。
xn1cxnb42#
(我已经在jira上发布了相同的回复,只是在这里交叉发布了相同的回复)
根据您的描述,我假设您正在手动关闭作业,然后重新提交,对吗?
除非使用保存点,否则flink不会在手动作业重新启动时完全保留一次(https://ci.apache.org/projects/flink/flink-docs-master/setup/savepoints.html). “精确一次”保证是指当作业失败,然后从以前的检查点自动恢复自身时(当启用检查点时,就像您使用env.enablecheckpointing(500)时所做的那样)
实际情况是,当您手动重新提交作业时,kafka使用者只需开始读取zk/kafka中提交的现有偏移量。这些偏移量在您第一次执行作业时提交给zk/kafka。然而,它们并没有用于Flink的精确一次语义;flink使用内部检查Kafka偏移量。Kafka消费者将这些补偿重新提交给zk,仅仅是为了向外界公开工作消费进展的度量(wrt flink)。