flink:将使用keyby操作符创建的值Map广播到另一个流,即冷启动问题

4jb9z9bj  于 2021-06-24  发布在  Flink
关注(0)|答案(1)|浏览(424)

我有一个高频率的流发布事件,每个事件包含一些关于汽车的信息。我需要处理这个事件流,但排除与特定城市和车牌号组合的事件。这些关于黑名单城市和车牌号组合的信息来自一个每天更新的s3文件。
示例:汽车事件如下所示:

[
    {
        "name": "Car1",
        "plate": "XYZ123",
        "city": "Berlin"
    },
    {
        "name": "Car2",
        "plate": "XYZ1234",
        "city": "Amsterdam"
    },
    {
        "name": "Car3",
        "plate": "ASD 123",
        "city": "Kuala Lumpur"
    },
    {
        "name": "Car1",
        "plate": "XYZ123",
        "city": "Moscow"
    },
    {
        "name": "Car1",
        "plate": "XYZ123",
        "city": "Barcelona"
    }
]

s3文件如下:例如,假设它被调用 excludedCars ```
[
{
"plate": "XYZ123",
"city": "Berlin"
},
{
"plate": "ABC1231",
"city": "Berlin"
},
{
"plate": "AWS121",
"city": "Berlin"
},
{
"plate": "XYZ1234",
"city": "Amsterdam"
},
{
"plate": "AMC3421",
"city": "Amsterdam"
},
{
"plate": "ASD 123",
"city": "Kuala Lumpur"
},
{
"plate": "XYZ123",
"city": "Moscow"
},
{
"plate": "XYZ123",
"city": "Barcelona"
}
]

方法:
使用s3文件 `excludedCars` 作为流媒体源。
转换事件以生成以下结构:

{
"Berlin": ["XYZ123", "ABC1231", "AWS121"],
"Amsterdam": ["XYZ1234", "AMC3421"],
"Kuala Lumpur":["ASD 123"],
"Moscow":["XYZ123"],
"Barcelona":["XYZ123"]
}

将此流广播到主流(cars流)。然后使用#2中的信息进行处理。
代码:

object Cars {

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val excludedCarStream: DataStream[Array[ExcludedCarDetail]] = getExcludedCarsStream(env)
val excludedCarDetails = excludedCarStream.flatMap(item => item) // Array of Excluded Car objects
excludedCarDetails.map(car => (car.cityId, car.plateNumber)).keyBy(0) // As per my understanding, this should result into a map of city to array of plate number maps
excludedCarDetails.print() // This just prints the simple tuples without any grouping by city
env.execute("Scala SocketTextStreamWordCount Example")
}

private def getExcludedCarsStream(env: StreamExecutionEnvironment): DataStream[Array[ExcludedCarDetail]] = {
val path: String = "file:///Users/name/flinkTest/excluded"
val textInputFormat = new TextInputFormat(new Path(path))
env
.readFile(
textInputFormat,
path,
FileProcessingMode.PROCESS_CONTINUOUSLY,
1000
)
.map(jsonString => {
val excludedCars: Array[ExcludedCarDetail] = (new Gson).fromJson(jsonString, classOf[Array[ExcludedCarDetail]])
excludedCars
})
}
}

case class ExcludedCarDetail(
@(SerializedName @scala.annotation.meta.field)("city") cityId: String,
@(SerializedName @scala.annotation.meta.field)("plate") plateNumber: String
)

据我所知, `excludedCarDetails.map(car => (car.cityId, car.plateNumber)).keyBy(0)` 结果应该是 `city to array of plate numbers` 我可以广播到我的主流(汽车)。相反,它只是简单地打印 `(city, plateNumber)` . 
我对Flink绝对是新鲜的,并且试图掌握和实施概念。请说明我做错了什么,以及如何达到要求的行为。
限制:广播Map的结构不能更改。
广播状态解决方案:

object Cars {

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment

val excludedCarsState: MapStateDescriptor[Int, List[String]] = new MapStateDescriptor("excludedCars", classOf[Int], classOf[List[String]])
val excludedCarDetails: DataStream[ExcludedCarDetail] = getExcludedCarsStream(env)
val excludedCarBroadcast: BroadcastStream[ExcludedCarDetail] = excludedCarDetails.broadcast(excludedCarsState)

val carsStream: DataStream[CarDetail] = getMainCarsStream(env)

val bs = carsStream
  .keyBy(_.cityId)
  .connect(excludedCarBroadcast)
  .process(new CarsStateLogic(excludedCarsState))

bs.print()

env.execute("Scala SocketTextStreamWordCount Example")

}

private def getExcludedCarsStream(env: StreamExecutionEnvironment): DataStream[ExcludedCarDetail] = {
val cars: ListBuffer[ExcludedCarDetail] = ListBuffer()
for(i <- 0 until 3) {
val cityId = i+1
val plateNumber = "Plate"+(i+1)
cars += ExcludedCarDetail(cityId, plateNumber) // Basically exclude cars with plate1 in city1, plate2 in city2, plate3 in city3
}
env.fromCollection(cars.toList)
}

private def getMainCarsStream(env: StreamExecutionEnvironment): DataStream[CarDetail] = {
val cars: ListBuffer[CarDetail] = ListBuffer()
for(i <- 0 until 10) {
val cityId = i+1
val plateNumber = "Plate"+(i+1)
val name = "Name"+(i+1)
cars += CarDetail(cityId, plateNumber, name)
}
env.fromCollection(cars.toList)
}
}

case class ExcludedCarDetail(cityId: Int, plateNumber: String)
case class CarDetail(cityId: Int, plateNumber: String, name: String)

class CarsStateLogic(excludedCarsState: MapStateDescriptor[Int, List[String]]) extends KeyedBroadcastProcessFunction[String, CarDetail, ExcludedCarDetail, CarDetail] {
override def processElement(car: CarDetail, ctx: KeyedBroadcastProcessFunction[String, CarDetail, ExcludedCarDetail, CarDetail]#ReadOnlyContext, out: Collector[CarDetail]): Unit = {
val state = ctx.getBroadcastState(excludedCarsState)

if(state.contains(car.cityId)) {
  val cityState = state.get(car.cityId)
  if(cityState.indexOf(car.plateNumber) < 0) { // not excluded
    out.collect(car)
  }
} else {
  out.collect(car)
}

}

override def processBroadcastElement(value: ExcludedCarDetail, ctx: KeyedBroadcastProcessFunction[String, CarDetail, ExcludedCarDetail, CarDetail]#Context, out: Collector[CarDetail]): Unit = {
val state = ctx.getBroadcastState(excludedCarsState)
val newStateForKey = if(state.contains(value.cityId)) {
value.plateNumber :: state.get(value.cityId)
} else {
List(value.plateNumber)
}
ctx.getBroadcastState(excludedCarsState).put(value.cityId, newStateForKey)
println("BroadCast element: CityId:"+ value.cityId+ ", State: "+state.get(value.cityId))
}
}

但我现在碰到了冷启动问题。在处理主数据之前,确保广播状态可用的可靠方法是什么。
mf98qq94

mf98qq941#

如果排除的cars数据集很小,则可以按原样广播(不按城市分组)。如果它很大,那么您可以按城市设置关键帧(与汽车流相同),并连接这两个流,以便每个子任务只获得所有排除的汽车和常规汽车数据的分区集。
请注意,存在冷启动问题,您希望在处理任何常规车辆数据之前,首先处理所有当前排除的车辆数据,以便在接收到排除的车辆数据之前,不会从正在处理的车辆数据中得到误报。

相关问题