Scala stateful transformation on a single DataFrame in Spark Streaming

pu3pd22g · published 2021-06-10 · Cassandra

I am trying to determine completion status at different levels of granularity. For example, a region is "complete" when all of its towns are complete.
I keep the state at the lowest level (town) in memory using the following approach in Spark:
Step 1. Load the initial status from a Cassandra table into a Spark DataFrame (a sketch of this load follows the table below):

+----------+--------+--------+------------+
| country  | region |  town  | isComplete |
+----------+--------+--------+------------+
| Country1 | State1 | Town1  | FALSE      |
| Country1 | State1 | Town2  | FALSE      |
| Country1 | State1 | Town3  | FALSE      |
| Country1 | State1 | Town4  | FALSE      |
| Country1 | State1 | Town5  | FALSE      |
| Country1 | State1 | Town6  | FALSE      |
| Country1 | State1 | Town7  | FALSE      |
| Country1 | State1 | Town8  | FALSE      |
| Country1 | State1 | Town9  | FALSE      |
| Country1 | State1 | Town10 | FALSE      |
| Country1 | State1 | Town11 | FALSE      |
+----------+--------+--------+------------+
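For reference, a minimal sketch of what the loadStatusFromCassandra helper used further down might look like with the DataStax spark-cassandra-connector on the classpath; the keyspace and table names here are placeholders, not the original ones:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Load the current town-level status from Cassandra into a DataFrame
// (keyspace and table names are assumptions for illustration)
def loadStatusFromCassandra(spark: SparkSession): DataFrame =
  spark.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map("keyspace" -> "status_ks", "table" -> "town_status"))
    .load()
    .select("country", "region", "town", "isComplete")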

Step 2. Start the stream processing and, for each micro-batch, build a DataFrame from it and use a left outer join to update the status DataFrame from Step 1 (a sketch of this join follows the second table below).
Batch 1:

+----------+--------+-------+------------+
| country  | region | town  | isComplete |
+----------+--------+-------+------------+
| Country1 | State1 | Town1 | TRUE       |
| Country1 | State1 | Town2 | TRUE       |
| Country1 | State1 | Town3 | TRUE       |
| Country1 | State1 | Town4 | TRUE       |
+----------+--------+-------+------------+

After applying batch 1:

+----------+--------+--------+------------+
| country  | region |  town  | isComplete |
+----------+--------+--------+------------+
| Country1 | State1 | Town1  | TRUE       |
| Country1 | State1 | Town2  | TRUE       |
| Country1 | State1 | Town3  | TRUE       |
| Country1 | State1 | Town4  | TRUE       |
| Country1 | State1 | Town5  | FALSE      |
| Country1 | State1 | Town6  | FALSE      |
| Country1 | State1 | Town7  | FALSE      |
| Country1 | State1 | Town8  | FALSE      |
| Country1 | State1 | Town9  | FALSE      |
| Country1 | State1 | Town10 | FALSE      |
| Country1 | State1 | Town11 | FALSE      |
+----------+--------+--------+------------+
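A minimal sketch of how the updateStatusDf helper used in the code below could implement this left outer join; the column handling is an assumption about the approach described, not the original implementation:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col}

// Overlay the micro-batch status onto the accumulated status DataFrame
def updateStatusDf(spark: SparkSession,
                   statusDf: DataFrame,
                   messageDf: DataFrame): DataFrame = {
  // Rename the incoming flag so both old and new values survive the join
  val updates = messageDf.withColumnRenamed("isComplete", "isCompleteNew")

  statusDf
    .join(updates, Seq("country", "region", "town"), "left_outer")
    // Take the new value where the micro-batch had a row, otherwise keep the old one
    .withColumn("isComplete", coalesce(col("isCompleteNew"), col("isComplete")))
    .select("country", "region", "town", "isComplete")
}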

My idea was that by keeping the DataFrame in a mutable variable I could update it on every batch and carry the overall state across the lifetime of the streaming job (like a global variable).
The base dataset has about 1.2 million records (roughly 100 MB) and is expected to grow to about 10 GB.
I am running into memory problems. Each batch takes longer to process than the previous one, and the number of stages per job grows with every batch as well. Eventually the application fails with "GC overhead limit exceeded".

var statusDf = loadStatusFromCassandra(sparkSession)
ipStream.foreachRDD { statusMsgRDD =>
  if (!statusMsgRDD.isEmpty) {
    // 1. Create data-frame from the current micro-batch RDD
    val messageDf = getMessageDf(sparkSession, statusMsgRDD)

    // 2. To update, Left outer join statusDf with messageDf
    statusDf = updateStatusDf(sparkSession, statusDf, messageDf)

    // 3. Use updated statusDf to generate aggregations at higher levels
    // and publish to a Kafka topic
    // if a higher level (eg. region) is completed.
  }
}
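For completeness, a hedged sketch of what the step-3 aggregation mentioned in the loop above might look like: a region counts as complete once none of its towns is still incomplete. The Kafka topic and bootstrap servers are placeholders, and the batch Kafka sink assumes the spark-sql-kafka package is available:

import org.apache.spark.sql.functions.{col, count, when}

// A region is complete when it has no incomplete towns left
val completedRegions = statusDf
  .groupBy("country", "region")
  .agg(count(when(col("isComplete") === false, 1)).alias("incompleteTowns"))
  .filter(col("incompleteTowns") === 0)
  .select("country", "region")

// Publish completed regions to a Kafka topic (topic and brokers are placeholders)
completedRegions
  .selectExpr("to_json(struct(*)) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("topic", "region-completion")
  .save()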
