基于级联的烫伤(旧版本)计数器

kyvafyod  于 2021-06-04  发布在  Hadoop
关注(0)|答案(1)|浏览(377)

在旧版本的 scalding 还是没有 counters 在其api中引入。烫伤中的hadoop计数器建议如何回退到烫伤中的级联计数器

def addCounter(pipe : Pipe, group : String, counter : String) = {

  pipe.each(() -> ('addCounter)) ( fields =>
    new BaseOperation[Any](fields) with Function[Any] {

      def operate(flowProcess : FlowProcess[_], 
        functionCall : FunctionCall[Any]) {

          try {
            flowProcess.asInstanceOf[HadoopFlowProcess]
              .increment(group, counter, 1L)
            functionCall.getOutputCollector.add(new Tuple(new Array[Object](1) : _*))
          } catch {
            case cce: ClassCastException =>
            // HadoopFlowProcess is not available in local mode
          }
      }.discard('addCounter)
    }
  )
}

然而,当我尝试时,我得到:

Error:(74, 14) ';' expected but '.' found.
}.discard('addCounter)
^

我错过什么了吗?我用的烫版:0.8.7

rekjcdws

rekjcdws1#

.discard 是烫伤命令,因此应与 .each ,代码块中的另一个烫伤命令。试着把它放在最后一个右括号“)”之后(您发布的代码的最后一行。)
在这里,操作被链接到richpipe管道,第一个 each ,然后 discard :

pipe.each(...){predicate}.discard(...)

相关问题