Why does this Pig UDF cause "Error: Java heap space" even though I am spilling the DataBag to disk?

Posted by ryevplcw on 2021-06-03 in Hadoop

Here is my UDF:

public DataBag exec(Tuple input) throws IOException { 
    Aggregate aggregatedOutput = null;

    int spillCount = 0;

    DataBag outputBag = BagFactory.getInstance().newDefaultBag(); 
    DataBag values = (DataBag)input.get(0);
    for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
        Tuple tuple = iterator.next();
        //spillCount++;
        ...
        if (some condition regarding current input tuple){
            //do something to aggregatedOutput with information from input tuple
        } else {
            //Because input tuple does not apply to current aggregateOutput
            //return current aggregateOutput and apply input tuple
            //to new aggregateOutput
            Tuple returnTuple = aggregatedOutput.getTuple();
            outputBag.add(returnTuple);
            spillCount++;
            aggregatedOutput = new Aggregate(tuple);

            if (spillCount == 1000) {
                outputBag.spill();
                spillCount = 0;
            }
        }
    }
    return outputBag; 
}

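Note that the bag is spilled to disk every 1000 input tuples. I have set this number as low as 50 and as high as 100,000, but I still get the out-of-memory error: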

Pig logfile dump:

Backend error message
---------------------
Error: Java heap space

Pig Stack Trace
---------------
ERROR 2997: Unable to recreate exception from backed error: Error: Java heap space

What can I do to fix this? It is processing about one million rows.

Here is the solution:

Use the Accumulator interface:

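import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

public class Foo extends EvalFunc<DataBag> implements Accumulator<DataBag> {
    // Output for the current key; must survive across accumulate() calls
    private DataBag outputBag = null;
    // Aggregation currently being built; must also survive across accumulate() calls
    private Aggregate currentAggregation = null;

    // Pig may call this several times per key, each time with a batch of tuples,
    // so state is only created here if it does not exist yet, never reset
    public void accumulate(Tuple input) throws IOException {
        DataBag values = (DataBag)input.get(0);
        if (outputBag == null) {
            outputBag = BagFactory.getInstance().newDefaultBag();
        }

        for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
            Tuple tuple = iterator.next();
            ...
            if (currentAggregation == null) {
                // First tuple for this key: start the first aggregation
                currentAggregation = new Aggregate(tuple);
            } else if (some condition regarding current input tuple){
                //do something to currentAggregation with information from input tuple
            } else {
                //Because input tuple does not apply to the current aggregation,
                //emit the current aggregation and start a new one from the input tuple
                outputBag.add(currentAggregation.getTuple());
                currentAggregation = new Aggregate(tuple);
            }
        }
    }

    // Called when all tuples from current key have been passed to accumulate
    public DataBag getValue() {
        if (outputBag == null) {
            outputBag = BagFactory.getInstance().newDefaultBag();
        }
        //Add final current aggregation
        if (currentAggregation != null) {
            outputBag.add(currentAggregation.getTuple());
        }
        return outputBag;
    }

    // This is called after getValue(); reset state so the next key starts fresh
    public void cleanup() {
        outputBag = null;
        currentAggregation = null;
    }

    public DataBag exec(Tuple input) throws IOException {
        // Same as above ^^ but this doesn't appear to ever be called.
        // Delegating to the accumulator methods keeps both code paths consistent.
        try {
            accumulate(input);
            return getValue();
        } finally {
            cleanup();
        }
    }

    public Schema outputSchema(Schema input) {
        try {
            // bagSchema: schema of the tuples in the output bag (assumed to be defined elsewhere)
            return new Schema(new FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), bagSchema, DataType.BAG));
        } catch (FrontendException e) {
            e.printStackTrace();
            return null;
        }
    }

    class Aggregate {
        ...
        public Tuple getTuple() {
            Tuple output = TupleFactory.getInstance().newTuple(OUTPUT_TUPLE_SIZE);
            try {
                output.set(0, val);
                ...
                return output;
            } catch (ExecException e) {
                e.printStackTrace();
                return null;
            }
        }
        ...
    }
}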
Answer by but5z9lq:

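You should increment spillCount every time you append to outputBag, not every time you get a tuple from the iterator. As written, you only spill when spillCount has reached a multiple of 1000 and the if condition is not satisfied at that same moment, which may rarely happen (depending on your logic). That could explain why changing the spill threshold makes little difference.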
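To make the counter placement concrete, here is a small Pig-free sketch. `SpillingBuffer` is a hypothetical stand-in for a spillable bag (a second list plays the role of disk); it is not Pig's `DataBag` API. The counter is bumped on every append and checked immediately afterwards, so a spill happens exactly when the in-memory part has grown by `threshold` items, and it never holds more than that at once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a spillable bag: "spilling" moves the in-memory
// items into a second list that plays the role of disk.
class SpillingBuffer<T> {
    private final int threshold;               // spill after this many appends
    private final List<T> inMemory = new ArrayList<>();
    private final List<T> onDisk = new ArrayList<>();
    private int appendsSinceSpill = 0;         // counts appends, not input items
    private int spills = 0;

    SpillingBuffer(int threshold) {
        if (threshold <= 0) {
            throw new IllegalArgumentException("threshold must be positive");
        }
        this.threshold = threshold;
    }

    // The counter is incremented where memory actually grows, and the
    // threshold check runs right after, so it can never be skipped.
    void add(T item) {
        inMemory.add(item);
        appendsSinceSpill++;
        if (appendsSinceSpill == threshold) {
            spill();
        }
    }

    private void spill() {
        onDisk.addAll(inMemory);
        inMemory.clear();
        appendsSinceSpill = 0;
        spills++;
    }

    int inMemorySize() { return inMemory.size(); }
    int spillCount() { return spills; }
    int totalSize() { return inMemory.size() + onDisk.size(); }

    public static void main(String[] args) {
        SpillingBuffer<Integer> buf = new SpillingBuffer<>(1000);
        for (int i = 0; i < 2500; i++) {
            buf.add(i);
        }
        System.out.println(buf.spillCount() + " spills, " + buf.inMemorySize() + " in memory");
    }
}
```

With 2500 appends and a threshold of 1000, `main` prints `2 spills, 500 in memory`.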
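If that doesn't solve your problem, I would try extending AccumulatorEvalFunc<DataBag>. In your case you don't actually need access to the whole bag at once. Your implementation suits an accumulator-style implementation because you only need access to the current tuple, which may reduce memory usage. You would have an instance variable of type DataBag that accumulates the final output, and another instance variable aggregatedOutput that holds the current aggregate. Each call to accumulate() would either 1) update the current aggregate, or 2) add the current aggregate to the output bag and start a new aggregate. This essentially follows the body of your for loop.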
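The accumulator pattern described above can be sketched without Pig. This is an illustration under stated assumptions, not Pig's API: `RunSumAccumulator` only mirrors the shape of Pig's `Accumulator` methods (`accumulate`, `getValue`, `cleanup`), batches are plain lists of `{key, value}` pairs, and the hypothetical condition "same key as the current run" stands in for the asker's unspecified condition. Both the output and the current aggregate live in fields, so a run that spans two batches is still aggregated correctly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Pig-free sketch of an accumulator-style UDF.
// Records are {key, value}; consecutive records with the same key form a run,
// and each run is emitted as {key, sum of values}.
class RunSumAccumulator {
    private List<long[]> output = null;   // plays the role of the output DataBag
    private long[] current = null;        // current aggregate: {key, sum}

    // Called once per batch; state persists across calls for the same group.
    void accumulate(List<long[]> batch) {
        if (output == null) {
            output = new ArrayList<>();
        }
        for (long[] rec : batch) {
            if (current != null && current[0] == rec[0]) {
                current[1] += rec[1];                    // 1) update the current aggregate
            } else {
                if (current != null) {
                    output.add(current);                 // 2) emit the current aggregate...
                }
                current = new long[] {rec[0], rec[1]};   //    ...and start a new one
            }
        }
    }

    // Called after the last batch: emitted runs plus the unfinished current run.
    List<long[]> getValue() {
        List<long[]> result = new ArrayList<>();
        if (output != null) {
            result.addAll(output);
        }
        if (current != null) {
            result.add(current);
        }
        return result;
    }

    // Reset state before the next group.
    void cleanup() {
        output = null;
        current = null;
    }

    public static void main(String[] args) {
        RunSumAccumulator acc = new RunSumAccumulator();
        // The run for key 1 spans the first two batches.
        acc.accumulate(Arrays.asList(new long[] {1, 2}, new long[] {1, 3}));
        acc.accumulate(Arrays.asList(new long[] {1, 4}, new long[] {2, 5}));
        acc.accumulate(Arrays.asList(new long[] {3, 1}));
        for (long[] run : acc.getValue()) {
            System.out.println(run[0] + " -> " + run[1]);
        }
        acc.cleanup();
    }
}
```

Running `main` prints `1 -> 9`, `2 -> 5` and `3 -> 1`: the run for key 1 is split across two batches but still sums to 9, which would not hold if the state were reset at the start of each `accumulate` call.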
