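I have a Spark job that does the following:
1- Reads a static DataFrame from Delta Lake.
2- Reads a stream DataFrame from Delta Lake.
3- Joins the stream with the static DataFrame.
4- Runs a flatMapGroupsWithState.
5- Writes the output.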
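The problem is that I get output that differs from what I expect, as if I am losing events in flatMapGroupsWithState. On top of that, the output is random: when I rerun the job with the same input, I get different output.
However, when I add .coalesce(1) to the write operation, I always get the desired output in local mode, but not in cluster mode.
This is the code I am using: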
val entityScheduleSlots = data
  .withColumn("products", concat(col("batteries"), col("photovoltaics")))
  .drop("photovoltaics", "batteries", "labels")
  .join(
    entities,
    array_contains(entities("entity_delivery_points"), col("delivery_point_id")))
  .withColumn("now", current_timestamp())
  .withWatermark("now", "5 minutes")
  .as(Encoders.product[enrichedDeliveryPointSchedule])
  .groupByKey(e => e.timestamp.toString + e.entity_id.toString + e.schedule_id)(
    Encoders.STRING)
  .flatMapGroupsWithState(
    outputMode = OutputMode.Append,
    timeoutConf = GroupStateTimeout.EventTimeTimeout)(
    Function.computeExplodedEntityScheduleSlots)(
    Encoders.kryo[Function.State],
    Encoders.product[EntityScheduleSlot])
entityScheduleSlots is my output, which I tested in local mode.
object Function {
  case class ProductState(
      var count: Int,
      var quantity: Double,
      var price: Double,
      val sellable: Boolean)
  case class State(var delivery_points_count: Int, var products: mutable.Map[Long, ProductState])
  private def computeExplodedEntityScheduleSlots(
      uid: String,
      ss: Iterator[enrichedDeliveryPointSchedule],
      state: GroupState[State]): Iterator[EntityScheduleSlot] = {
    if (state.hasTimedOut) {
      state.remove()
      return Iterator.empty
    }
    val schedules = ss.toList
    val newState: State =
      state.getOption.getOrElse(State(0, mutable.Map()))
    schedules.foreach(s => {
      newState.delivery_points_count = newState.delivery_points_count + 1
      val qualificationsProductsIDs =
        if (s.entity_qualifications != null) s.entity_qualifications.map(q => q.product)
        else List()
      if (s.products != null) {
        s.products.foreach(p => {
          if (qualificationsProductsIDs.contains(p.product)) {
            val productState =
              newState.products.getOrElse(p.product, ProductState(0, 0.0, 0.0, p.sellable))
            val factor =
              if (productState.count == 0) 1
              else p.quantity / (productState.quantity / productState.count)
            productState.quantity += p.quantity
            productState.price =
              (productState.price * productState.count + p.price * factor) / (productState.count + 1)
            productState.count += 1
            newState.products.update(p.product, productState)
          }
        })
      }
    })
    if (newState.delivery_points_count == schedules.head.entity_delivery_points.length) {
      state.remove()
      return Iterator(
        EntityScheduleSlot(
          timestamp = schedules.head.timestamp,
          entity = schedules.head.entity_id,
          schedule_timestamp = schedules.head.schedule_timestamp,
          schedule_id = schedules.head.schedule_id,
          products =
            if (schedules.head.entity_qualifications != null)
              schedules.head.entity_qualifications
                .map(q => {
                  val product =
                    newState.products.getOrElse(q.product, ProductState(0, 0.0, 0.0, false))
                  EntityScheduleSlotProduct(
                    q.product,
                    product.quantity,
                    product.price,
                    product.sellable)
                })
            else List()))
    }
    state.update(newState)
    val currentWatermarkMs =
      if (state.getCurrentWatermarkMs() > 0) state.getCurrentWatermarkMs()
      else System.currentTimeMillis()
    state.setTimeoutTimestamp(currentWatermarkMs, "2 minutes")
    Iterator.empty
  }
}
case class enrichedDeliveryPointSchedule(
    timestamp: java.sql.Timestamp,
    schedule_timestamp: java.sql.Timestamp,
    schedule_id: String,
    delivery_point_id: Long,
    products: List[DeliveryPointScheduleSlotProduct],
    entity_id: Long,
    entity_delivery_points: List[Long],
    entity_qualifications: List[EntityQualification])
Thanks in advance.
1 Answer
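You have provided very little information, so the problem is hard to understand, but I can give you a few hints:
Besides the initial state, the parameters also include the state update function:
In future, please add more code so that the problem can be understood better.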
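One thing that can be checked from the code already posted is whether the price update depends on the order in which rows reach the function. As far as I know, Spark does not guarantee the order of rows within a group after a shuffle, so an order-sensitive update could plausibly explain output that changes between runs and looks stable with .coalesce(1) in local mode. The sketch below is Spark-free and only replays the arithmetic from computeExplodedEntityScheduleSlots; the names Item, Acc, step, aggregate and weightedPrice are hypothetical, and weightedPrice is merely an assumed alternative (a quantity-weighted average), not necessarily what the job is meant to compute.

```scala
object OrderDependenceDemo {
  // Hypothetical, simplified stand-ins for one product row and for ProductState.
  final case class Item(quantity: Double, price: Double)
  final case class Acc(count: Int, quantity: Double, price: Double)

  // Same arithmetic as the per-product update in the question,
  // written as a pure step so it can be folded over any ordering.
  def step(acc: Acc, p: Item): Acc = {
    val factor =
      if (acc.count == 0) 1.0
      else p.quantity / (acc.quantity / acc.count)
    Acc(
      count = acc.count + 1,
      quantity = acc.quantity + p.quantity,
      price = (acc.price * acc.count + p.price * factor) / (acc.count + 1))
  }

  // Replays the update over a sequence of rows, starting from the empty state.
  def aggregate(items: Seq[Item]): Acc =
    items.foldLeft(Acc(0, 0.0, 0.0))(step)

  // Assumption: a quantity-weighted average price, which does not depend on
  // row order (up to floating-point rounding). Requires a non-zero total quantity.
  def weightedPrice(items: Seq[Item]): Double =
    items.map(i => i.price * i.quantity).sum / items.map(_.quantity).sum

  def main(args: Array[String]): Unit = {
    val a = Item(quantity = 1.0, price = 10.0)
    val b = Item(quantity = 3.0, price = 20.0)
    // Same two rows, two arrival orders.
    println(s"a then b: ${aggregate(Seq(a, b))}")
    println(s"b then a: ${aggregate(Seq(b, a))}")
    println(s"weighted, a then b: ${weightedPrice(Seq(a, b))}")
    println(s"weighted, b then a: ${weightedPrice(Seq(b, a))}")
  }
}
```

With these two rows the accumulated quantity is the same in both orders, but the price is 35.0 in one order and roughly 11.67 in the other. If the real data shows the same effect, sorting the rows inside the function before updating the state, or switching to an order-independent formula, would be worth trying.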