我们有一个有状态的多阶段dstream应用程序,其中每个阶段都Map到不同的键。在第一个阶段之后,我们正在经历巨大的无序读写,这种无序读写在大约20个窗口之后不断增长并变得巨大。
val stageOnePartitionCount = 1000
val stageTwoPartitionCount = 1000
val stageThreePartitionCount = 1000
// Very little Shuffle for this stage consistent across all windows
val stageOneDataSet = dataSet
.map(data => (data.key1, data))
.reduceByKey(..., stageOnePartitionCount)
.mapWithState(
StateSpec
.function(...)
.initialState(...) // uses key1
.numPartitions(stageOnePartitionCount)
)
// Shuffle for this stage grows every window and becomes huge after about 20 windows or so..
val stageTwoDataSet = stageOneDataSet
.map(data => (data.key2, data))
.reduceByKey(..., stageTwoPartitionCount)
.mapWithState(
StateSpec
.function(...)
.initialState(...) // uses key2
.numPartitions(stageTwoPartitionCount)
)
// Shuffle for this stage grows every window and becomes huge after about 20 windows or so..
val stageThreeDataSet = stageTwoDataSet
.map(data => (data.key3, data))
.reduceByKey(..., stageThreePartitionCount)
.mapWithState(
StateSpec
.function(...)
.initialState(...) // uses key3
.numPartitions(stageThreePartitionCount)
)
stageThreeDataSet
.foreachRDD(...)
我们正在表演 reduceByKey
(合并部分更新)操作 mapWithState
每个阶段中的操作,其中分区计数在它们之间匹配,目的是在 reduceByKey
在传入流上的函数数据将已经在其状态所在的节点中。
那么,是什么触发了第二和第三阶段在每一个经过的窗口中的洗牌指数上升,而第一阶段在所有窗口中几乎没有洗牌的情况下仍然保持性能呢?
注意:查看第二和第三阶段被洗牌(读/写)的数据量,我想知道是否有可能对有状态rdd(包含状态数据)进行洗牌,以便将它们与传入流(在大约20个窗口之后,其大小将比累积状态小得多)一起定位。如果是这种情况,我们如何解决同样的问题。
谢谢你的帮助!!
暂无答案!
目前还没有任何答案,快来回答吧!