Kafka flink流式作业是否在作业运行之间保持其键值状态?

nuypyhwy  于 2022-11-21  发布在  Apache
关注(0)|答案(1)|浏览(160)

我们 的 用例 是 我们 想 使用 flink 流 来 执行 去 重复 任务 , 它 从 源 ( kafka 主题 ) 读取 数据 , 并 将 唯一 记录 写入 hdfs 文件 接收 器 。 kafka 主题 可能 有 重复 数据 , 可以 使用 复合 键 ( adserver _ id , 记录 的 unix _ timestamp ) 来 识别 这些 数据 。
所以 我 决定 使用 flink 键 控 状态 流 来 实现 重复 数据 删除 。

val messageStream: DataStream[String] = env.addSource(flinkKafkaConsumer)

messageStream
  .map{
    record =>
      val key = record.adserver_id.get + record.event_timestamp.get
      (key,record)
  }
  .keyBy(_._1)
  .flatMap(new DedupDCNRecord())
  .map(_.toString)
  .addSink(sink)

  // execute the stream
  env.execute(applicationName)
}

中 的 每 一 个
下面 是 使用 flink 中 的 value state 执行 重复 数据 消除 的 代码 。

class DedupDCNRecord extends RichFlatMapFunction[(String, DCNRecord), DCNRecord] {
  private var operatorState: ValueState[String] = null

  override def open(configuration: Configuration) = {
    operatorState = getRuntimeContext.getState(
      DedupDCNRecord.descriptor
    )
  }

  @throws[Exception]
  override def flatMap(value: (String,DCNRecord), out: Collector[DCNRecord]): Unit = {

    if (operatorState.value == null) { // we haven't seen the element yet
      out.collect(value._2)
      // set operator state to true so that we don't emit elements with this key again
      operatorState.update(value._1)
    }
  }
}

格式
虽然 只要 流 作业 正在 运行 , 并 通过 valueState 维护 唯一 键 列表 和 执行 重复 数据 删除 , 这种 方法 就 可以 正常 工作 。( 在 上次 运行 作业 时 看到 的 唯一 键 )( 仅 保留 当前 运行 的 唯一 键 ) 并 让 记录 通过 , 这些 记录 已 在 作业 的 上 一 次 运行 中 处理 。 是否 有 方法 ,我们 可以 强制 flink 来 维护 它 的 valueState ( unique _ keys ) 。 感谢 您 的 帮助 。

fiei3ece

fiei3ece1#

这 要求 您 在 关闭 作业 之前 捕获 状态 的 快照 , 然后 从 该 快照 重新 启动 :
1.使用 保存 点 执行 停止 , 以 关闭 当前 作业 , 同时 获取 其 状态 的 快照 。
1.使用 保存 点 作为 起点 重新 启动 。
如 需 逐步 教育 课程 , 请 参阅 Flink Operations Playground 中 的 升级 和 重新 调整 工作 。 观察 失败 和 复原 的 章节 也 与 此处 相关 。

相关问题