如何将滚烫的valuepipe连接到typedpipe?

mwg9r5ms  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(449)

我已经将滚烫的kmeans示例改编为kmodes。问题是当作业完成时,我需要将聚集的记录与匹配的质心连接起来。kmeans代码使用valuepipe来保存质心。为了得到valuepipe的质心,我把它平面Map。然后我像这样连接:

HVKModes(500000,inputSets,10).waitFor(Config.default,mode) match {
    case Success((a,centroids: ValuePipe[List[LabeledCentroid]], points: TypedPipe[LabeledVector])) => {
      val joined = centroids
        .flatMap {
          cs : List[LabeledCentroid] => {
          val t = TypedPipe.from(cs)
          Iterator(points.join(t)) }
        }
        .values
        .write(clusteredOutput)
    }
    case Failure(e) => sys.error("problem running job:" + e.toString)
  }

问题是编译器在“values”行上给出了一个类型错误:

Cannot prove that com.twitter.scalding.typed.CoGrouped[Int,((String, Set[String]), Set[String])] <:< (Any, V).
[error]         .values

我推测这个错误表明它不能算出我取的v值。但我该怎么办呢?

dldeef67

dldeef671#

我差点就搞定了。我只需要加入平面Map的结果。

HVKModes(500000,inputSets,10).waitFor(Config.default,mode) match {
    case Success((a,centroids: ValuePipe[List[LabeledCentroid]], points: TypedPipe[LabeledVector])) => {

      val cs  =
        centroids
        .flatMap {
          cs : List[LabeledCentroid] => { cs.toIterator}
        }

      points
        .join(cs)
        .values
        .write(clusteredOutput)
    }
    case Failure(e) => sys.error("problem running job:" + e.toString)
  }

相关问题