我正在使用java spark。在这里,我从hdfs读取数据集,然后在上面应用mappartition。在call函数的末尾,我可以看到输出行有一个值,这个值没有被收集到我的df2数据集中。下面是我的代码。
主要方法
Dataset<Row> df = readDataFrame();
df.show();
Dataset<Row> df2 = df.mapPartitions(new DataframeProcessMap(getConfig()), RowEncoder.apply(getSchema()));
dataframeprocessmap类
public class DataframeProcessMap implements MapPartitionsFunction<Row,Row> {
private final Config config;
public DataframeProcessMap(Config config){
this.config=config;
}
@Override
public Iterator<Row> call(Iterator<Row> iterator) throws Exception{
List<Row> outputRows = new ArrayList();
while (iterator.hasNext()){
Row inputRow = iterator.next();
ArrayList output = projectAndRenameColumn(inputRow);
Row outputRow = DataIntegrationUtils.getRow(output.toArray(), getSchema());
System.out.println(outputRow);
outputRows.add(outputRow);
}
return outputRows.iterator();
}
public ArrayList projectAndRenameColumn(Row inputData){
...
// Some processing
}
public StructType getSchema(){
...
//schema from config file
}
}
getschema()也存在于主类中,与此完全相同。
in-call方法system.out.println(outputrow);给我正确的输出。但是,我在df2中得到空行。为什么??
暂无答案!
目前还没有任何答案,快来回答吧!