在旧版本的 scalding
还是没有 counters
在其api中引入。烫伤中的hadoop计数器建议如何回退到烫伤中的级联计数器
def addCounter(pipe : Pipe, group : String, counter : String) = {
pipe.each(() -> ('addCounter)) ( fields =>
new BaseOperation[Any](fields) with Function[Any] {
def operate(flowProcess : FlowProcess[_],
functionCall : FunctionCall[Any]) {
try {
flowProcess.asInstanceOf[HadoopFlowProcess]
.increment(group, counter, 1L)
functionCall.getOutputCollector.add(new Tuple(new Array[Object](1) : _*))
} catch {
case cce: ClassCastException =>
// HadoopFlowProcess is not available in local mode
}
}.discard('addCounter)
}
)
}
然而,当我尝试时,我得到:
Error:(74, 14) ';' expected but '.' found.
}.discard('addCounter)
^
我错过什么了吗?我用的烫版:0.8.7
1条答案
按热度按时间rekjcdws1#
.discard
是烫伤命令,因此应与.each
,代码块中的另一个烫伤命令。试着把它放在最后一个右括号“)”之后(您发布的代码的最后一行。)在这里,操作被链接到richpipe管道,第一个
each
,然后discard
: