我正在flink中开发一个离散化算法,但是在应用 map
功能。
离散化存储在 V
这是一个
private[this] val V = Vector.tabulate(nAttrs)(i => IntervalHeap(nBins, i, s))
此向量按以下方法更新:
private[this] def updateSamples(v: LabeledVector): Vector[IntervalHeap] = {
val attrs = v.vector.map(_._2)
// TODO: Check for missing values
attrs
.zipWithIndex
.foreach {
case (attr, i) =>
if (V(i).nInstances < s) {
V(i) insert (attr)
} else {
if (randomReservoir(0) <= s / (i + 1)) {
val randVal = Random nextInt (s)
V(i) replace (randVal, attr)
}
}
}
V
}
map函数应用于数据集的每个示例(updatesamples函数):
def discretize(data: DataSet[LabeledVector]) /*: DataSet[IntervalHeap]*/ = {
val d = data map (x => updateSamples(x))
log.debug(s"$V")
d print
}
但是,当我打印 d
,我得到v空:
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
Vector(Attr [0] [;][;][;][;][;])
我试着不回来了 V
在 updateSamples
只需从 driscretize
一旦Map被应用,同样的情况也会发生。如果在里面 updateSamples
我打印 V
我可以看到它正在更新。
更新
如果我不使用容器 DataSet[T]
而是使用 Seq
这样地:
def discretize(data: Seq[LabeledVector]) /*: DataSet[IntervalHeap]*/ = {
data map (x => updateSamples(x))
}
离散化工作正常。
可能会发生什么?
更新2
经过几天的搜索,似乎问题出在Guava的收藏上 MinMaxPriorityQueue
. 但是,我找不到任何有助于我序列化此集合的内容,以下是我迄今发现的内容:
如何序列化Guava收藏?
magro/kryo serializers这是一个用于guava集合的序列化程序集合,但是 MinMaxPriorityQeue
不存在吗
我试过这个,它包括注册一个 kryo
序列化程序 env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer])
但不起作用。
我还尝试添加默认值 Kryo
没有运气的序列化程序 env.getConfig.addDefaultKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer])
暂无答案!
目前还没有任何答案,快来回答吧!