Here is my custom UDF:
public DataBag exec(Tuple input) throws IOException {
    Aggregate aggregatedOutput = null;
    int spillCount = 0;
    DataBag outputBag = BagFactory.getInstance().newDefaultBag();
    DataBag values = (DataBag) input.get(0);
    for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
        Tuple tuple = iterator.next();
        //spillCount++;
        ...
        if (/* some condition regarding the current input tuple */) {
            // update aggregatedOutput with information from the input tuple
        } else {
            // The input tuple does not apply to the current aggregatedOutput,
            // so emit the current aggregatedOutput and start a new one
            // from the input tuple
            Tuple returnTuple = aggregatedOutput.getTuple();
            outputBag.add(returnTuple);
            spillCount++;
            aggregatedOutput = new Aggregate(tuple);
            if (spillCount == 1000) {
                outputBag.spill();
                spillCount = 0;
            }
        }
    }
    return outputBag;
}
Note that every 1,000 input tuples the bag is spilled to disk. I have set this number as low as 50 and as high as 100,000, but I still get the memory error:
Pig logfile dump:
Backend error message
---------------------
Error: Java heap space
Pig Stack Trace
---------------
ERROR 2997: Unable to recreate exception from backed error: Error: Java heap space
What can I do to fix this? It is processing roughly a million rows.
THE FIX:
Use the Accumulator interface:
import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.Accumulator;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

public class Foo extends EvalFunc<DataBag> implements Accumulator<DataBag> {
    private DataBag outputBag = null;
    private Aggregate currentAggregation = null;

    public void accumulate(Tuple input) throws IOException {
        DataBag values = (DataBag) input.get(0);
        if (outputBag == null) {
            // Create the output bag only once per key; accumulate() may be
            // called several times before getValue().
            outputBag = BagFactory.getInstance().newDefaultBag();
        }
        for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
            Tuple tuple = iterator.next();
            ...
            if (/* some condition regarding the current input tuple */) {
                // update currentAggregation with information from the input tuple
            } else {
                // The input tuple does not apply to the current aggregation,
                // so emit the current aggregation and start a new one
                // from the input tuple
                outputBag.add(currentAggregation.getTuple());
                currentAggregation = new Aggregate(tuple);
            }
        }
    }

    // Called when all tuples from the current key have been passed to accumulate
    public DataBag getValue() {
        // Add the final, still-open aggregation
        outputBag.add(currentAggregation.getTuple());
        return outputBag;
    }

    // This is called after getValue()
    // Not sure whether these resets are necessary, since outputBag is also
    // initialized at the beginning of accumulate
    public void cleanup() {
        outputBag = null;
        currentAggregation = null;
    }

    public DataBag exec(Tuple input) throws IOException {
        // Same as the first version above, but this never appears to be called.
    }

    public Schema outputSchema(Schema input) {
        try {
            return new Schema(new FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), bagSchema, DataType.BAG));
        } catch (FrontendException e) {
            e.printStackTrace();
            return null;
        }
    }

    class Aggregate {
        ...
        public Tuple getTuple() {
            Tuple output = TupleFactory.getInstance().newTuple(OUTPUT_TUPLE_SIZE);
            try {
                output.set(0, val);
                ...
            } catch (ExecException e) {
                e.printStackTrace();
                return null;
            }
            return output;
        }
        ...
    }
}
1 Answer
You should be incrementing spillCount every time you append to outputBag, not every time you get a tuple from the iterator. The spill only happens when spillCount is a multiple of 1000 and the if condition is not met, which may not happen very often (depending on your logic). That might explain why the different spill thresholds make so little difference.

If that does not solve your problem, I would try extending AccumulatorEvalFunc<DataBag>. In your case you do not actually need access to the whole bag at once; your implementation fits an accumulator-style implementation because you only ever need access to the current tuple, and this can reduce memory usage. Essentially, you keep an instance variable of type DataBag that accumulates the final output, plus an instance variable for the current aggregation (your aggregatedOutput). Each call to accumulate() either 1) updates the current aggregation, or 2) adds the current aggregation to the output bag and starts a new one. This essentially follows the body of your for loop.
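Putting that suggestion into code, below is a minimal sketch of what an accumulator-style version could look like. It uses the AccumulatorEvalFunc base class named above; the Aggregate helper and its accepts()/update() methods are hypothetical stand-ins for the question's elided condition and aggregation logic, so treat this as an illustration of the shape rather than a drop-in implementation:

import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

// Accumulator-style UDF: Pig hands the grouped tuples to accumulate() in
// batches, so only the running aggregate and the output bag stay in memory.
public class Foo extends AccumulatorEvalFunc<DataBag> {

    private DataBag outputBag = null;             // finished aggregates
    private Aggregate currentAggregation = null;  // aggregate being built

    @Override
    public void accumulate(Tuple input) throws IOException {
        // input wraps a bag holding the next chunk of tuples for this key
        DataBag values = (DataBag) input.get(0);
        if (outputBag == null) {
            outputBag = BagFactory.getInstance().newDefaultBag();
        }
        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
            Tuple tuple = it.next();
            if (currentAggregation != null && currentAggregation.accepts(tuple)) {
                currentAggregation.update(tuple);   // extend the current aggregate
            } else {
                if (currentAggregation != null) {
                    outputBag.add(currentAggregation.getTuple()); // emit finished aggregate
                }
                currentAggregation = new Aggregate(tuple);        // start a new one
            }
        }
    }

    @Override
    public DataBag getValue() {
        // Called once every tuple for the current key has been accumulated
        if (currentAggregation != null) {
            outputBag.add(currentAggregation.getTuple());
        }
        return outputBag;
    }

    @Override
    public void cleanup() {
        // Called after getValue(), before the next key; reset per-key state
        outputBag = null;
        currentAggregation = null;
    }

    // Minimal stand-in for the question's Aggregate helper. accepts() and
    // update() are hypothetical names for the elided condition and logic.
    private static class Aggregate {
        Aggregate(Tuple first) { /* elided */ }
        boolean accepts(Tuple tuple) { return false; /* elided condition */ }
        void update(Tuple tuple) { /* elided aggregation logic */ }
        Tuple getTuple() { return TupleFactory.getInstance().newTuple(); }
    }
}

AccumulatorEvalFunc already provides an exec() that drives accumulate()/getValue()/cleanup(), so no separate exec() is needed; when the UDF is applied to grouped data, Pig detects the Accumulator interface and streams the bag in chunks instead of materializing it all at once.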