apache-flink:使用keyby/connect维护流中的消息输入顺序

5uzkadbs  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(492)

简介

我正在使用apache flink构建一个相当复杂的数据流网络。我们的想法是,用flink实现一个规则引擎。
作为应用程序的基本描述,以下是它的工作原理:
数据由Kafka消费源接收,并用多个数据流进行处理,直到最终发送到Kafka生产者接收器。传入数据包含具有逻辑键(“object id”)的对象,传入消息可能引用相同的object-id。对于每个给定的object id,必须在整个应用程序中保留其传入消息的顺序。整个消息的顺序可以是任意的。
这意味着,object1的消息a、b和c必须按顺序处理,但是object2的消息x可能在a1/b1/c1之间、之前或之后处理,这无关紧要。
就我目前的理解,这意味着我必须 keyBy(_.objectID) ,以便按到达顺序处理同一对象的消息。

当前方法

为了实现实际的规则引擎,创建了一个流网络。其思路如下:
每个规则将有1-n个条件
对于每个规则的每个条件,使用 .filter(_.matches(rule.condition)) 通过使用
substream1.connect(substream2).flatMap(new CombineFunctionMyObject) connect 只能联接2个流,因此具有3个条件的规则将导致随后的2个联接
使用相同条件的规则将重用在第二步中创建的相同子流。
这将导致n个连接流,其中n对应于规则数。连接的流将具有 map 函数附加到它们之后,它标记了消息,这样我们就知道规则匹配了。
每个连接的/结果流可以独立于其他结果将其结果(“rule xyz matched”)发布到kafka生产者,因此此时我可以将接收器附加到流。

连接详细信息

因为 .connect 在两个流中(“condition”-substreams)必须只传递一个消息,如果它是在两个流上接收的(^=两个条件都匹配),我需要一个 RichCoFlatMapFunction 具有键控状态,该状态可以处理“仅当另一侧已接收时才传递”。
但是,问题是,流是由object-id设置密钥的。那么,如果同一对象的两条消息通过网络运行并到达 .connect().map(new RichCoFlatMapFunction...) ? 它会导致错误的输出。我需要在进入网络时为每个传入的消息分配一个唯一的id(uuid),这样我就可以在 .connect().map().. 加入。但同时,我需要通过对象id对流进行键控,以便按顺序处理相同对象的消息。怎么办?
为了解决这个问题,我用 keyBy(_.objectID) ,但是 RichCoFlatMapFunction 在流中,join不再使用键控状态。相反,我使用的是一个简单的操作符状态,它保留了传递对象的Map,但实现了相同的逻辑,只需要手动查找键/值。
这似乎是工作,但我不知道这是否会带来更多的问题。

可视化

flink gui将呈现此图像,其中包含14条规则,共23个条件(有些规则只有一个条件):

代码

使用以下代码创建网络:

val streamCache = mutable.Map[Int,DataStream[WorkingMemory]]()
val outputNodesCache = ListBuffer[DataStream[WorkingMemory]]()

if (rules.isEmpty)
  return

// create partial streams for all conditions (first level)
// cache the sub-stream with the hashcode of its condition as key (for re-use)

for (rule <- rules if rule.checks.nonEmpty ;
     cond <- rule.checks if !streamCache.contains(cond.hashCode()))
  streamCache += cond.hashCode -> sourceStream.filter(cond.matches _)

// create joined streams for combined conditions (sub-levels)

for (rule <- rules if rule.checks.nonEmpty)
{
  val ruleName = rule.ruleID

  // for each rule, starting with the rule with the least conditions ...

  if (rule.checks.size == 1)
  {
    // ... create exit node if single-condition rule
    // each exit node applies the rule-name to the objects set of matched rules.

    outputNodesCache += streamCache(rule.checks.head.hashCode).map(obj => { obj.matchedRule = ListBuffer((ruleName, rule.objectType.mkString(":"), rule.statement)) ; obj })
  }
  else
  {
    // ... iterate all conditions, and join nodes into full rule-path (reusing existing intermediate paths)

    var sourceStream:DataStream[WorkingMemory] = streamCache(rule.checks.head.hashCode)
    var idString = rule.checks.head.idString

    for (i <- rule.checks.indices)
    {
      if (i == rule.checks.size-1)
      {
        // reached last condition of rule, create exit-node
        // each exit node applies the rule-name to the objects set of matched rules.

        val rn = ruleName
        val objectType = rule.objectType.mkString(":")
        val statement = rule.statement

        outputNodesCache += sourceStream.map(obj => { obj.matchedRule = ListBuffer((rn, objectType, statement)) ; obj })
      }
      else
      {
        // intermediate condition, create normal intermediate node

        val there = rule.checks(i+1)
        val connectStream = streamCache(there.hashCode)

        idString += (":" + there.idString)

        // try to re-use existing tree-segments

        if (streamCache.contains(idString.hashCode))
          sourceStream = streamCache(idString.hashCode)
        else
          sourceStream = sourceStream.connect(connectStream).flatMap(new StatefulCombineFunction(idString))
      }
    }
  }
}

// connect each output-node to the sink

for (stream <- outputNodesCache)
{
  stream.map(wm => RuleEvent.toXml(wm, wm.matchedRule.headOption)).addSink(sink)
}

这个 StatefulCombineFunction 在前面的代码段中使用:

class StatefulCombineFunction(id:String) extends RichCoFlatMapFunction[WorkingMemory, WorkingMemory, WorkingMemory] with CheckpointedFunction
{
  @transient
  private var leftState:ListState[(String, WorkingMemory)] = _
  private var rightState:ListState[(String, WorkingMemory)] = _
  private var bufferedLeft = ListBuffer[(String, WorkingMemory)]()
  private var bufferedRight = ListBuffer[(String, WorkingMemory)]()

  override def flatMap1(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedLeft, bufferedRight, xmlObject, out, "left")
  override def flatMap2(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedRight, bufferedLeft, xmlObject, out, "right")

  def combine(leftState: ListBuffer[(String, WorkingMemory)], rightState: ListBuffer[(String, WorkingMemory)], xmlObject:WorkingMemory, out: Collector[WorkingMemory], side:String): Unit =
  {
    val otherIdx:Int = leftState.indexWhere(_._1 == xmlObject.uuid)

    if (otherIdx > -1)
    {
      out.collect(leftState(otherIdx)._2)
      leftState.remove(otherIdx)
    }
    else
    {
      rightState += ((xmlObject.uuid, xmlObject))
    }
  }

  override def initializeState(context:FunctionInitializationContext): Unit = ???
  override def snapshotState(context:FunctionSnapshotContext):Unit = ???
}

我知道从操作符状态中清除部分匹配是缺失的(生存时间),但是对于当前的开发状态来说这并不重要,稍后会添加。

背景资料

此应用程序应使用flink实现规则匹配的rete算法(https://en.wikipedia.org/wiki/rete_algorithm).
另一种方法是循环每个传入消息的所有规则,并附加结果。我用flink实现了这个方法,所以请不要把它当作解决方案。

问题

问题是,应用程序在对象id级别上扰乱了传入消息的顺序。也就是说,它没有达到我在介绍中所要求的。对于每个对象id,传入的消息必须保持顺序。但事实并非如此。
我不知道在代码的哪一点上顺序会混乱,也不知道这些操作是如何分布在线程之间的,所以我不知道如何解决这个问题。

wwodge7n

wwodge7n1#

一些评论。。。
我想你已经检查了flink的cep支持,特别是处理事件时间的延迟。关键的概念是,您可以依赖事件时间(而不是处理时间)来帮助事件排序,但是您必须始终确定您愿意容忍的最大延迟量(延迟可能是由源和工作流中发生的任何处理引起的)。
从您提供的flink作业图来看,似乎您正在通过哈希对传入数据进行分区,但是每个规则都需要获取每个传入数据,对吗?所以在这种情况下你需要广播。

相关问题