如何将滚烫的typedpipe转换为迭代器

z4bn682m  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(413)

在我的hadoop工作中,我在管道上有一些分组逻辑,然后我需要处理每个组:

val georecs : TypedPipe[GeoRecord] = getRecords

georecs.map( r => (getRegion(r),r) )
  .groupBy(_._1)
  .mapValueStream( xs => clusterRecords(xs) )
  .values
  .write(out)

在clusterrecords中,我需要将传入的迭代器转换为typedpipe,以便1)对其进行采样,2)取叉积:

//turn the iterator to a pipe so we can sample it    
    val sample = TypedPipe.from( xs.map( x => Centroid(x._2.coreActivity)).toIterable)
    .sample(0.11)
    .distinct

//turn the iterator to a pipe so we can take its cross product
val records : TypedPipe[GeoRecord] = TypedPipe.from(xs.map(_._2).toIterable)

records
  .cross(sample)   //cartesian product of records and centroids
  .groupBy( _._2)  // group By the user record so we get a list of pairs (user, centroid)
  .minBy( x => score( x._1.coreActivity, x._2.core) ) //find the centroid with the lowest score for each Record
  .values
  .groupBy( x => x._2 )   //now groupBy centroid to get the clusters
  .values

问题是mapvaluestream期望Map函数返回一个迭代器,但我得到的是一个typedpipe。我知道如何将迭代器转换为管道,但不是相反。我需要执行它,将它写入磁盘,然后再读回吗?
如果是这样,最好的方法是什么?

vi4fp9gy

vi4fp9gy1#

看起来可以通过运行管道将其转换为迭代器。可以这样完成:

val georecs : TypedPipe[GeoRecord] = getRecords

val i : Iterator[GeoRecord] = georecs
  .toIterableExecution
  .waitFor(this.scaldingConfig,this.mode)
  .get
  .toIterator

(类型检查,但尚未测试)

相关问题