我们使用scala中的flink在分析管道中路由和转换protobuf事件(使用scalapb)。我有一个具有以下模式的“播放流”数据流:
message PlayStream {
optional PlayerEvent player_event = 1;
optional BlockAccountIPEvent block_account_ip_event = 2;
}
生成的case类具有 playerEvent
签名类型的成员 Option[PlayerEvent]
.
我想把数据流转换成playerevents,过滤掉所有没有它们的。我是scala的新手,所以我不知道如何用惯用的方法来做。我现在做的很好:
// in main()
getDataStream(name, env, config.get("KafkaSource"))
.keyBy[String](PlayStreamFunctions.key(_))
.map{ _.getPlayerEvent }
.filter(filterDefaultPlayerEvents(_))
def filterDefaultPlayerEvents(playerEvent: PlayerEvent): Boolean = {
playerEvent match {
case PlayerEvent.defaultInstance => false
case _ => true
}
}
这是因为 getPlayerEvent
在生成的类中 playerEvent.getOrElse(PlayerEvent.defaultInstance)
,我们不使用任何默认示例。然而,创建一堆对defaultinstance的引用只是为了在下一步立即过滤掉它们,这感觉很奇怪。有没有办法避免我没看到的?
1条答案
按热度按时间0wi1tuuw1#
我想澄清一下,我在flink下定义了这个问题的范围,因为所有map函数都是flink特定的实现。我意识到
flatMap
是可用的,并且考虑到Map操作比选项的模式匹配更惯用,我使用了以下实现:自
toList
如果该选项不存在,则返回一个空列表;如果存在,则返回一个带值的一元列表,跨它们的平面Map解决了我的问题。