我有一份由安东尼奥斯·查基欧普洛斯编写的带有烫伤的mapreduce程序。在这本书中,他讨论了烫伤代码的外部操作设计模式。你可以在他的网站上看到一个例子。我选择了使用类型安全api。当然,这会带来新的挑战,但我更喜欢它而不是fields api,这是我之前提到的书和网站中大量讨论的内容。
我想知道人们是如何用类型安全api实现外部操作模式的。我的初步实施情况如下:
我创建了一个扩展com.twitter.bolding.job的类,这个类将作为我的bolding job类,在这个类中我将“管理参数、定义抽头并使用外部操作来构建数据处理管道”。
我创建一个对象,在其中定义要在类型安全管道中使用的函数。因为类型安全管道将参数作为函数,所以我可以将对象中的函数作为参数传递给管道。
这将创建如下代码:
class MyJob(args: Args) extends Job(args) {
import MyOperations._
val input_path = args(MyJob.inputArgPath)
val output_path = args(MyJob.outputArgPath)
val eventInput: TypedPipe[(LongWritable, Text)] = this.mode match {
case m: HadoopMode => TypedPipe.from(WritableSequenceFile[LongWritable, Text](input_path))
case _ => TypedPipe.from(WritableSequenceFile[LongWritable, Text](input_path))
}
val eventOutput: FixedPathSource with TypedSink[(LongWritable, Text)] with TypedSource[(LongWritable, Text)] = this.mode match {
case m: HadoopMode => WritableSequenceFile[LongWritable, Text](output_path)
case _ => TypedTsv[(LongWritable, Text)](output_path)
}
val validatedEvents: TypedPipe[(LongWritable, Either[Text, Event])] = eventInput.map(convertTextToEither).fork
validatedEvents.filter(isEvent).map(removeEitherWrapper).write(eventOutput)
}
object MyOperations {
def convertTextToEither(v: (LongWritable, Text)): (LongWritable, Either[Text, Event]) = {
...
}
def isEvent(v: (LongWritable, Either[Text, Event])): Boolean = {
...
}
def removeEitherWrapper(v: (LongWritable, Either[Text, Event])): (LongWritable, Text) = {
...
}
}
如您所见,传递给烫伤类型安全操作的函数与作业本身是分开的。虽然这并不像所呈现的外部操作模式那样“干净”,但这是编写此类代码的一种快速方法。另外,我可以使用junitrunner进行作业级集成测试,使用scalatest进行功能级单元测试。
这篇文章的重点是问人们是如何做这类事情的?互联网上关于烫伤类型安全api的文档很少。有没有更实用的scala友好的方法呢?我是否缺少设计模式的关键组件?我对此有点紧张,因为有了fields api,您可以用boldingtest在管道上编写单元测试。据我所知,你不能用typedpipes。请让我知道,如果有一个烫伤类型安全的api或你如何创建可重用的,模块化的,可测试的类型安全的api代码模式普遍同意。谢谢你的帮助!
安东尼奥斯回复后更新2
谢谢你的回复。这基本上就是我想要的答案。我想继续谈话。我在您的回答中看到的主要问题是,这个实现需要一个特定的类型实现,但是如果在整个工作过程中类型都发生了变化怎么办?我已经探索了这个代码,它似乎工作,但它似乎黑客攻击。
def self: TypedPipe[Any]
def testingPipe: TypedPipe[(LongWritable, Text)] = self.map(
(firstVar: Any) => {
val tester = firstVar.asInstanceOf[(LongWritable, Text)]
(tester._1, tester._2)
}
)
这样做的好处是我声明了self的一个实现,但是缺点是这种丑陋的类型转换。此外,我还没有用更复杂的管道对此进行深入的测试。所以基本上,你对如何处理类型的想法是什么,因为它们只需要一个自我实现就可以改变,以保持干净/简洁?
2条答案
按热度按时间q5iwbnjs1#
我不知道你看到的片段有什么问题,为什么你认为它“不够干净”。我觉得很好。
至于使用类型化api的单元测试作业的问题,请看一下jobtest,它似乎正是您想要的。
yqkkidmi2#
scala扩展方法是使用隐式类实现的。将typedpipe转换为包含外部操作的( Package 器)类的功能添加到编译器中:
关于如何跨管道管理类型,我的建议是尝试找出一些有意义的基本类型和用例类。为了使用您的示例,我将重命名该方法
convertTextToEither
进入extractEvents
:那你应该
LogInputOperations
为了LogInput
类型EventOperations
为了Event
类型