foreach函数在sparkDataframe中不工作

blmhpbnm  于 2021-05-29  发布在  Hadoop
关注(0)|答案(3)|浏览(424)

根据dataframes api,定义为:

public void foreach(scala.Function1<Row,scala.runtime.BoxedUnit> f)

将函数f应用于所有行。
但是当我试着

Dataframe df = sql.read()
    .format("com.databricks.spark.csv")
    .option("header","true")
    .load("file:///home/hadoop/Desktop/examples.csv");

df.foreach(x->
{
   System.out.println(x);
});

我得到编译时错误。有什么错误吗?

vatpfxk5

vatpfxk51#

您可以将其转换为java rdd,以便使用lambda,如下所示:

df.toJavaRDD().foreach(x->
   System.out.println(x)
);
wixjitnu

wixjitnu2#

尝试使用以下代码:

df.foreach(new VoidFunction<String>(){ public void call(String line) {
          //your function code here
}});

如果您只想显示df内容,这就简单多了:

df.show();
x6h2sr28

x6h2sr283#

第一次延伸 scala.runtime.AbstractFunction1 并实现如下可序列化

public abstract class SerializableFunction1<T,R> 
      extends AbstractFunction1<T, R> implements Serializable 
{
}

现在用这个 SerializableFunction1 下课。

df.foreach(new SerializableFunction1<Row,BoxedUnit>(){
        @Override
        public BoxedUnit apply(Row row) {
            System.out.println(row.get(0));
            return BoxedUnit.UNIT;
        }
});

相关问题