过滤flink数据流到scala中的可选子对象

ctzwtxfj  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(347)

我们使用scala中的flink在分析管道中路由和转换protobuf事件(使用scalapb)。我有一个具有以下模式的“播放流”数据流:

message PlayStream {
  optional PlayerEvent player_event = 1;
  optional BlockAccountIPEvent block_account_ip_event = 2;
}

生成的case类具有 playerEvent 签名类型的成员 Option[PlayerEvent] .
我想把数据流转换成playerevents,过滤掉所有没有它们的。我是scala的新手,所以我不知道如何用惯用的方法来做。我现在做的很好:

// in main()
   getDataStream(name, env, config.get("KafkaSource"))
      .keyBy[String](PlayStreamFunctions.key(_))
      .map{ _.getPlayerEvent }
      .filter(filterDefaultPlayerEvents(_))

  def filterDefaultPlayerEvents(playerEvent: PlayerEvent): Boolean = {
    playerEvent match {
      case PlayerEvent.defaultInstance => false
      case _ => true
    }
  }

这是因为 getPlayerEvent 在生成的类中 playerEvent.getOrElse(PlayerEvent.defaultInstance) ,我们不使用任何默认示例。然而,创建一堆对defaultinstance的引用只是为了在下一步立即过滤掉它们,这感觉很奇怪。有没有办法避免我没看到的?

0wi1tuuw

0wi1tuuw1#

我想澄清一下,我在flink下定义了这个问题的范围,因为所有map函数都是flink特定的实现。我意识到 flatMap 是可用的,并且考虑到Map操作比选项的模式匹配更惯用,我使用了以下实现:

getDataStream(name, env, config.get("KafkaSource"))
      .keyBy[String](PlayStreamFunctions.key(_))
      .flatMap{ _.playerEvent.toList }
      .flatMap(toFlatPlayerEvent(_))

toList 如果该选项不存在,则返回一个空列表;如果存在,则返回一个带值的一元列表,跨它们的平面Map解决了我的问题。

相关问题