我们 的 用例 是 我们 想 使用 flink 流 来 执行 去 重复 任务 , 它 从 源 ( kafka 主题 ) 读取 数据 , 并 将 唯一 记录 写入 hdfs 文件 接收 器 。 kafka 主题 可能 有 重复 数据 , 可以 使用 复合 键 ( adserver _ id , 记录 的 unix _ timestamp ) 来 识别 这些 数据 。
所以 我 决定 使用 flink 键 控 状态 流 来 实现 重复 数据 删除 。
val messageStream: DataStream[String] = env.addSource(flinkKafkaConsumer)
messageStream
.map{
record =>
val key = record.adserver_id.get + record.event_timestamp.get
(key,record)
}
.keyBy(_._1)
.flatMap(new DedupDCNRecord())
.map(_.toString)
.addSink(sink)
// execute the stream
env.execute(applicationName)
}
中 的 每 一 个
下面 是 使用 flink 中 的 value state 执行 重复 数据 消除 的 代码 。
class DedupDCNRecord extends RichFlatMapFunction[(String, DCNRecord), DCNRecord] {
private var operatorState: ValueState[String] = null
override def open(configuration: Configuration) = {
operatorState = getRuntimeContext.getState(
DedupDCNRecord.descriptor
)
}
@throws[Exception]
override def flatMap(value: (String,DCNRecord), out: Collector[DCNRecord]): Unit = {
if (operatorState.value == null) { // we haven't seen the element yet
out.collect(value._2)
// set operator state to true so that we don't emit elements with this key again
operatorState.update(value._1)
}
}
}
格式
虽然 只要 流 作业 正在 运行 , 并 通过 valueState 维护 唯一 键 列表 和 执行 重复 数据 删除 , 这种 方法 就 可以 正常 工作 。( 在 上次 运行 作业 时 看到 的 唯一 键 )( 仅 保留 当前 运行 的 唯一 键 ) 并 让 记录 通过 , 这些 记录 已 在 作业 的 上 一 次 运行 中 处理 。 是否 有 方法 ,我们 可以 强制 flink 来 维护 它 的 valueState ( unique _ keys ) 。 感谢 您 的 帮助 。
1条答案
按热度按时间fiei3ece1#
这 要求 您 在 关闭 作业 之前 捕获 状态 的 快照 , 然后 从 该 快照 重新 启动 :
1.使用 保存 点 执行 停止 , 以 关闭 当前 作业 , 同时 获取 其 状态 的 快照 。
1.使用 保存 点 作为 起点 重新 启动 。
如 需 逐步 教育 课程 , 请 参阅 Flink Operations Playground 中 的 升级 和 重新 调整 工作 。 观察 失败 和 复原 的 章节 也 与 此处 相关 。