Scala Spark flatMapGroupsWithState randomly losing events from group state

oalqel3c · posted 2023-01-26 in Scala

I have a Spark job that does the following:
1. Read a static DataFrame from Delta Lake.
2. Read a streaming DataFrame from Delta Lake.
3. Join the stream with the static DataFrame.
4. Apply flatMapGroupsWithState.
5. Write the output.

The problem is that I get output different from what I expect, as if events were being lost in flatMapGroupsWithState. Worse, the output is random: when I re-run with the same input, I get different results.
However, when I add .coalesce(1) to the write operation, I always get the expected output in local mode, but still not in cluster mode.
Here is the code I am using:

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.{array_contains, col, concat, current_timestamp}
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}

val entityScheduleSlots = data
  .withColumn("products", concat(col("batteries"), col("photovoltaics")))
  .drop("photovoltaics", "batteries", "labels")
  // Enrich each delivery-point schedule with its entity.
  .join(
    entities,
    array_contains(entities("entity_delivery_points"), col("delivery_point_id")))
  // Watermark on an injected processing-time column; it drives the
  // EventTimeTimeout used below.
  .withColumn("now", current_timestamp())
  .withWatermark("now", "5 minutes")
  .as(Encoders.product[enrichedDeliveryPointSchedule])
  // One group per (timestamp, entity, schedule) combination.
  .groupByKey(e => e.timestamp.toString + e.entity_id.toString + e.schedule_id)(
    Encoders.STRING)
  .flatMapGroupsWithState(
    outputMode = OutputMode.Append,
    timeoutConf = GroupStateTimeout.EventTimeTimeout)(
    Function.computeExplodedEntityScheduleSlots)(
    Encoders.kryo[Function.State],
    Encoders.product[EntityScheduleSlot])

entityScheduleSlots is my output, which I tested in local mode.
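
For context, the write step (point 5) is not shown in the question. A minimal sketch of what it might look like, assuming a Delta sink and a checkpoint location (the format, the paths, and the commented-out coalesce are assumptions, not taken from the question):

entityScheduleSlots
  // .coalesce(1) // the workaround mentioned above: forces a single partition
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoints/entity_schedule_slots") // assumed path
  .start("/tmp/delta/entity_schedule_slots") // assumed output path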

import scala.collection.mutable
import org.apache.spark.sql.streaming.GroupState

object Function {
  case class ProductState(
      var count: Int,
      var quantity: Double,
      var price: Double,
      val sellable: Boolean)
  case class State(var delivery_points_count: Int, var products: mutable.Map[Long, ProductState])
  // Referenced as Function.computeExplodedEntityScheduleSlots above,
  // so it cannot be private here.
  def computeExplodedEntityScheduleSlots(
      uid: String,
      ss: Iterator[enrichedDeliveryPointSchedule],
      state: GroupState[State]): Iterator[EntityScheduleSlot] = {
    if (state.hasTimedOut) {
      // The group timed out: drop its state without emitting anything.
      state.remove()
      return Iterator.empty
    }
    val schedules = ss.toList
    val newState: State =
      state.getOption.getOrElse(State(0, mutable.Map()))
    schedules.foreach(s => {
      newState.delivery_points_count = newState.delivery_points_count + 1
      val qualificationsProductsIDs =
        if (s.entity_qualifications != null) s.entity_qualifications.map(q => q.product)
        else List()
      if (s.products != null) {
        s.products.foreach(p => {
          if (qualificationsProductsIDs.contains(p.product)) {
            val productState =
              newState.products.getOrElse(p.product, ProductState(0, 0.0, 0.0, p.sellable))
            // Weight the incoming price by this quantity relative to the
            // running average quantity (quantity-weighted mean price).
            val factor =
              if (productState.count == 0) 1
              else p.quantity / (productState.quantity / productState.count)
            productState.quantity += p.quantity
            productState.price =
              (productState.price * productState.count + p.price * factor) / (productState.count + 1)
            productState.count += 1
            newState.products.update(p.product, productState)
          }
        })
      }
    })
    // Once every delivery point of the entity has been seen, emit one slot
    // and clear this group's state.
    if (newState.delivery_points_count == schedules.head.entity_delivery_points.length) {
      state.remove()
      return Iterator(
        EntityScheduleSlot(
          timestamp = schedules.head.timestamp,
          entity = schedules.head.entity_id,
          schedule_timestamp = schedules.head.schedule_timestamp,
          schedule_id = schedules.head.schedule_id,
          products =
            if (schedules.head.entity_qualifications != null)
              schedules.head.entity_qualifications
                .map(q => {
                  val product =
                    newState.products.getOrElse(q.product, ProductState(0, 0.0, 0.0, false))
                  EntityScheduleSlotProduct(
                    q.product,
                    product.quantity,
                    product.price,
                    product.sellable)
                })
            else List()))
    }
    // Not all delivery points have arrived yet: keep the state and arm a
    // two-minute timeout relative to the current watermark (or to the wall
    // clock if no watermark has been set yet).
    state.update(newState)
    val currentWatermarkMs =
      if (state.getCurrentWatermarkMs() > 0) state.getCurrentWatermarkMs()
      else System.currentTimeMillis()
    state.setTimeoutTimestamp(currentWatermarkMs, "2 minutes")
    Iterator.empty
  }
}

case class enrichedDeliveryPointSchedule(
    timestamp: java.sql.Timestamp,
    schedule_timestamp: java.sql.Timestamp,
    schedule_id: String,
    delivery_point_id: Long,
    products: List[DeliveryPointScheduleSlotProduct],
    entity_id: Long,
    entity_delivery_points: List[Long],
    entity_qualifications: List[EntityQualification])

Thanks in advance.

yquaqz18 · answer 1

You have provided very little information, so the problem is hard to diagnose, but I can give you a few hints:

  • flatMapGroupsWithState is the function used in stateful Structured Streaming to keep partial results in Spark's internal state:
def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Besides the initial state, the parameters also include the state update function:

func: (K, Iterator[V], GroupState[S]) => Iterator[U]
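
As an illustration of that signature, a minimal sketch of a state update function that keeps a running count per key (all names and types here are illustrative, not from the question):

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: String, value: Long)
case class CountState(count: Long)

// Emits the running count for a key every time new events arrive for it.
def updateCounts(
    key: String,
    events: Iterator[Event],
    state: GroupState[CountState]): Iterator[(String, Long)] = {
  val previous = state.getOption.getOrElse(CountState(0L))
  val updated = CountState(previous.count + events.size)
  state.update(updated)
  Iterator((key, updated.count))
}

// Applied to a KeyValueGroupedDataset[String, Event] named grouped:
// grouped.flatMapGroupsWithState(
//   OutputMode.Append, GroupStateTimeout.NoTimeout)(updateCounts)
// The overload shown above additionally takes an initial state as a
// KeyValueGroupedDataset[String, CountState].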
  • It may simply be that you are aggregating results, and that is why you get different output between runs
  • Another possibility is that you are inspecting the output on the executors; given the distributed nature of the Spark framework, each executor only sees partial output (see the sketch below)

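To make the last hint concrete, a small sketch showing why inspecting output on the executors only reveals a slice of the data (assumes an existing SparkSession named spark):

// Each executor only sees the rows of its own partitions, so anything
// printed there (or written per partition) is partial output.
val df = spark.range(0, 100).repartition(4)

df.rdd.foreachPartition { rows =>
  // Runs on an executor: sees roughly 25 of the 100 rows.
  println(s"partition slice size = ${rows.size}")
}

// Runs on the driver after gathering all partitions: sees all 100 rows.
println(s"total rows = ${df.collect().length}")
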
Going forward, add more code to make the problem easier to understand.
