滚烫类型管道api外部操作模式

wmvff8tz  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(437)

我有一份由安东尼奥斯·查基欧普洛斯编写的带有烫伤的mapreduce程序。在这本书中,他讨论了烫伤代码的外部操作设计模式。你可以在他的网站上看到一个例子。我选择了使用类型安全api。当然,这会带来新的挑战,但我更喜欢它而不是fields api,这是我之前提到的书和网站中大量讨论的内容。
我想知道人们是如何用类型安全api实现外部操作模式的。我的初步实施情况如下:
我创建了一个扩展com.twitter.bolding.job的类,这个类将作为我的bolding job类,在这个类中我将“管理参数、定义抽头并使用外部操作来构建数据处理管道”。
我创建一个对象,在其中定义要在类型安全管道中使用的函数。因为类型安全管道将参数作为函数,所以我可以将对象中的函数作为参数传递给管道。
这将创建如下代码:

class MyJob(args: Args) extends Job(args) {

  import MyOperations._

  val input_path = args(MyJob.inputArgPath)
  val output_path = args(MyJob.outputArgPath)

  val eventInput: TypedPipe[(LongWritable, Text)] = this.mode match {
    case m: HadoopMode => TypedPipe.from(WritableSequenceFile[LongWritable, Text](input_path))
    case _ => TypedPipe.from(WritableSequenceFile[LongWritable, Text](input_path))
  }

  val eventOutput: FixedPathSource with TypedSink[(LongWritable, Text)] with TypedSource[(LongWritable, Text)] = this.mode match {
    case m: HadoopMode => WritableSequenceFile[LongWritable, Text](output_path)
    case _ => TypedTsv[(LongWritable, Text)](output_path)
  }

  val validatedEvents: TypedPipe[(LongWritable, Either[Text, Event])] = eventInput.map(convertTextToEither).fork
  validatedEvents.filter(isEvent).map(removeEitherWrapper).write(eventOutput)
}

object MyOperations {

  def convertTextToEither(v: (LongWritable, Text)): (LongWritable, Either[Text, Event]) = {
    ...
  }

  def isEvent(v: (LongWritable, Either[Text, Event])): Boolean = {
    ...
  }

  def removeEitherWrapper(v: (LongWritable, Either[Text, Event])): (LongWritable, Text) = {
    ...
  }
}

如您所见,传递给烫伤类型安全操作的函数与作业本身是分开的。虽然这并不像所呈现的外部操作模式那样“干净”,但这是编写此类代码的一种快速方法。另外,我可以使用junitrunner进行作业级集成测试,使用scalatest进行功能级单元测试。
这篇文章的重点是问人们是如何做这类事情的?互联网上关于烫伤类型安全api的文档很少。有没有更实用的scala友好的方法呢?我是否缺少设计模式的关键组件?我对此有点紧张,因为有了fields api,您可以用boldingtest在管道上编写单元测试。据我所知,你不能用typedpipes。请让我知道,如果有一个烫伤类型安全的api或你如何创建可重用的,模块化的,可测试的类型安全的api代码模式普遍同意。谢谢你的帮助!

安东尼奥斯回复后更新2

谢谢你的回复。这基本上就是我想要的答案。我想继续谈话。我在您的回答中看到的主要问题是,这个实现需要一个特定的类型实现,但是如果在整个工作过程中类型都发生了变化怎么办?我已经探索了这个代码,它似乎工作,但它似乎黑客攻击。

def self: TypedPipe[Any]

def testingPipe: TypedPipe[(LongWritable, Text)] = self.map(
    (firstVar: Any) => {
        val tester = firstVar.asInstanceOf[(LongWritable, Text)]
        (tester._1, tester._2)
    }
)

这样做的好处是我声明了self的一个实现,但是缺点是这种丑陋的类型转换。此外,我还没有用更复杂的管道对此进行深入的测试。所以基本上,你对如何处理类型的想法是什么,因为它们只需要一个自我实现就可以改变,以保持干净/简洁?

q5iwbnjs

q5iwbnjs1#

我不知道你看到的片段有什么问题,为什么你认为它“不够干净”。我觉得很好。
至于使用类型化api的单元测试作业的问题,请看一下jobtest,它似乎正是您想要的。

yqkkidmi

yqkkidmi2#

scala扩展方法是使用隐式类实现的。将typedpipe转换为包含外部操作的( Package 器)类的功能添加到编译器中:

import com.twitter.scalding.TypedPipe
import com.twitter.scalding._
import cascading.flow.FlowDef

class MyJob(args: Args) extends Job(args) {

  implicit class MyOperationsWrapper(val self: TypedPipe[Double]) extends MyOperations with Serializable

  val pipe = TypedPipe.from(TypedTsv[Double](args("input")))

  val result = pipe
    .operation1
    .operation2(x => x*2)
    .write(TypedTsv[Double](args("output")))

}

trait MyOperations {

  def self: TypedPipe[Double]

  def operation1(implicit fd: FlowDef): TypedPipe[Double] =
    self.map { x =>
      println(s"Input: $x")
      x / 100
    }

  def operation2(datafn:Double => Double)(implicit fd: FlowDef): TypedPipe[Double] =
    self.map { x=>
      val result = datafn(x)
      println(s"Result: $result")
      result
    }

}

import org.apache.hadoop.util.ToolRunner
import org.apache.hadoop.conf.Configuration

object MyRunner extends App {

  ToolRunner.run(new Configuration(), new Tool, (classOf[MyJob].getName :: "--local" ::
    "--input" :: "doubles.tsv" ::
    "--output":: "result.tsv" :: args.toList).toArray)

}

关于如何跨管道管理类型,我的建议是尝试找出一些有意义的基本类型和用例类。为了使用您的示例,我将重命名该方法 convertTextToEither 进入 extractEvents :

case class LogInput(l : Long, text: Text)
case class Event(data: String)
def extractEvents( line : LogInput ): TypedPipe[Event] =
  self.filter( isEvent(line) )
      .map ( getEvent(line.text) )

那你应该 LogInputOperations 为了 LogInput 类型 EventOperations 为了 Event 类型

相关问题