I am trying to determine completion status at different levels of granularity. For example, a region is "complete" if all towns in that region are complete.
I am using the following approach in Spark to keep the state at the lowest level (town) in memory:
Step 1. Load the initial state from a Cassandra table into a Spark DataFrame (a sketch of this load follows the table below).
+----------+--------+--------+------------+
| country | region | town | isComplete |
+----------+--------+--------+------------+
| Country1 | State1 | Town1 | FALSE |
| Country1 | State1 | Town2 | FALSE |
| Country1 | State1 | Town3 | FALSE |
| Country1 | State1 | Town4 | FALSE |
| Country1 | State1 | Town5 | FALSE |
| Country1 | State1 | Town6 | FALSE |
| Country1 | State1 | Town7 | FALSE |
| Country1 | State1 | Town8 | FALSE |
| Country1 | State1 | Town9 | FALSE |
| Country1 | State1 | Town10 | FALSE |
| Country1 | State1 | Town11 | FALSE |
+----------+--------+--------+------------+
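For reference, a minimal sketch of what loadStatusFromCassandra might look like using the spark-cassandra-connector; the keyspace and table names (status, town_status) are placeholders, not from the original job:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical reconstruction of loadStatusFromCassandra. Assumes the
// spark-cassandra-connector is on the classpath and that the state lives
// in a Cassandra table status.town_status (placeholder names).
def loadStatusFromCassandra(spark: SparkSession): DataFrame = {
  spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map("keyspace" -> "status", "table" -> "town_status"))
    .load()
    .select("country", "region", "town", "isComplete")
}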
Step 2. Start the streaming job, and in each micro-batch use the DataFrame created from that batch to update the status in the DataFrame from step 1 via a left outer join (a sketch of this join follows the tables below).
Batch 1:
+----------+--------+-------+------------+
| country | region | town | isComplete |
+----------+--------+-------+------------+
| Country1 | State1 | Town1 | TRUE |
| Country1 | State1 | Town2 | TRUE |
| Country1 | State1 | Town3 | TRUE |
| Country1 | State1 | Town4 | TRUE |
+----------+--------+-------+------------+
After applying batch 1:
+----------+--------+--------+------------+
| country  | region | town   | isComplete |
+----------+--------+--------+------------+
| Country1 | State1 | Town1 | TRUE |
| Country1 | State1 | Town2 | TRUE |
| Country1 | State1 | Town3 | TRUE |
| Country1 | State1 | Town4 | TRUE |
| Country1 | State1 | Town5 | FALSE |
| Country1 | State1 | Town6 | FALSE |
| Country1 | State1 | Town7 | FALSE |
| Country1 | State1 | Town8 | FALSE |
| Country1 | State1 | Town9 | FALSE |
| Country1 | State1 | Town10 | FALSE |
| Country1 | State1 | Town11 | FALSE |
+----------+--------+--------+------------+
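For clarity, here is a hedged sketch of what the left-outer-join update could look like. Only the name updateStatusDf comes from the question (its sparkSession parameter is omitted here); the column handling is an assumption:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col}

// Hypothetical reconstruction of updateStatusDf: keep every row of the
// current status and overwrite isComplete wherever the micro-batch
// carries a newer value.
def updateStatusDf(statusDf: DataFrame, messageDf: DataFrame): DataFrame = {
  val updates = messageDf.withColumnRenamed("isComplete", "newIsComplete")
  statusDf
    .join(updates, Seq("country", "region", "town"), "left_outer")
    .select(
      col("country"), col("region"), col("town"),
      coalesce(col("newIsComplete"), col("isComplete")).as("isComplete")
    )
}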
My idea was that by keeping the DataFrame in a mutable var, I would be able to update it on every batch and carry the overall state across the lifetime of the streaming job (like a global variable).
The base dataset has about 1.2 million records (roughly 100 MB) and is expected to scale up to 10 GB.
I am running into memory problems. Each batch takes longer to process than the previous one, and the number of stages per job also grows with every batch. Eventually the application fails with "GC overhead limit exceeded".
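A plausible explanation, illustrated with a self-contained snippet (not the original job): each reassignment of the var wraps the previous logical plan in another join, so the plan, and with it the stage count and planning/GC cost, grows on every batch:

import org.apache.spark.sql.SparkSession

// Standalone illustration of the growing lineage (assumed to mirror the
// reassignment pattern in the job code below).
val spark = SparkSession.builder().master("local[*]").appName("lineage-demo").getOrCreate()
var state = spark.range(5).toDF("id")
for (batch <- 1 to 3) {
  val update = spark.range(2).toDF("id")
  state = state.join(update, Seq("id"), "left_outer")
  state.explain() // the printed plan gains one more join per iteration
}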
var statusDf = loadStatusFromCassandra(sparkSession)

ipStream.foreachRDD { statusMsgRDD =>
  if (!statusMsgRDD.isEmpty) {
    // 1. Create a DataFrame from the current micro-batch RDD
    val messageDf = getMessageDf(sparkSession, statusMsgRDD)
    // 2. To update, left outer join statusDf with messageDf
    statusDf = updateStatusDf(sparkSession, statusDf, messageDf)
    // 3. Use the updated statusDf to generate aggregations at higher
    //    levels and publish to a Kafka topic if a higher level
    //    (e.g. region) is completed.
  }
}
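For completeness, a hedged sketch of what step 3 could look like. The broker address and topic name are placeholders; min over the boolean isComplete column is true only when every town in the group is complete:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, min, struct, to_json}

// Hypothetical sketch of step 3 (not from the original job): find regions
// whose towns are all complete and publish them to a Kafka topic.
def publishCompletedRegions(statusDf: DataFrame): Unit = {
  statusDf
    .groupBy("country", "region")
    .agg(min(col("isComplete")).as("regionComplete")) // true only if all towns are true
    .filter(col("regionComplete"))
    .select(to_json(struct(col("country"), col("region"))).as("value"))
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
    .option("topic", "region-status")                    // placeholder topic
    .save()
}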