我有多年的Java 8和它的lambda的经验,但是当我开发一个hello世界大小的Spark程序时,我遇到了一个疯狂的问题。
这里我有一个Java类,其中的Data注解来自Lombok:
@Data
public class Person implements Serializable {
private String name;
private Long age;
}
然后我构建了一个包含Persion
类对象的java列表:
Person p1 = new Person("sb", 1L);
Person p2 = new Person("sth", null);
List<Person> list = new ArrayList<>(2);
list.add(p1);
list.add(p2);
到目前为止还不错。然后我试着用这个列表生成一个Spark数据集:
SparkSession session = SparkSession.builder().master("local[1]").appName("SparkSqlApp").getOrCreate();
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> dataset1 = session.createDataset(list, personEncoder);
dataset1.foreach(new ForeachFunction<Person>() { // 1
@Override
public void call(Person person) throws Exception {
System.out.println(person);
}
});
dataset1.foreach((ForeachFunction<Person>) System.out::println); //2
注意,block 1等价于java中的block 2,并且block 2是通过IntelliJ IDEA从block 1简化而来的,唯一不同的是block 2使用了lambda表达式。
然而,当我执行程序时,块1结束良好,而块2运行异常:
什么...大地球和大宇宙?为什么JVM或Spark引擎会做这样的事情?!
2条答案
按热度按时间p8h8hvxi1#
如What is the equivalent lambda expression for System.out::println中所解释的,方法引用
System.out::println
与lambda表达式x -> System.out.println(x)
不相同。方法引用 * 捕获 *
System.out
的当前值,以便在每次调用函数时对其调用println
,而不是像lambda表达式的主体那样每次都重新计算System.out
。如前所述,这很少有什么不同,但在这里,它有。当你尝试序列化函数时,它会尝试序列化所有捕获的值,包括在示例化过程中从
System.out
读取的PrintStream
示例。PrintStream
是不可序列化的,实现一个满足预期的可序列化PrintStream
将是相当具有挑战性的。但重要的是要记住,当您序列化lambda表达式
x -> System.out.println(x)
或等效的类对象并在不同的环境中将其反序列化时,它将在那里读取的System.out
将计算为与原始环境中不同的PrintStream
。这不会'分布式计算框架何时注意将打印到标准输出的所有内容通过管道返回给发起者无关紧要。但重要的是要记住,
static
字段不是序列化数据的一部分,通常在不同的环境中可能有不同的内容。twh00eeo2#
接口ForeachFunction扩展了
Serializable
。Dataset.foreach(f)
可能正在序列化参数f
。在下面的测试中,testBlock1
成功,testBlcok2
失败(NotSerializableException)。但我不知道为什么。