我是apachespark框架的新手,我正在使用apachespark通过hive将数据写入hadoop。
在下面的代码中,我从配置单元读取表\u1并创建数据集,然后将此数据集Map到另一个数据集。将结果数据集保存到另一个表2之后。
public static void main(String[] args) {
Dataset<Row> ipSet = spark.sql("Select distinct ip from table_1");
ipSet.map(new TestFunction(), Encoders.bean(MyPojoClass.class)).write().mode("append").insertInto("table_2");
}
// ...
public class TestFunction implements MapFunction<Row, MyPojoClass>
{
private static List<..> staticData;
private static boolean callOnce = false;
private static void initStaticData() throws IOException
{
staticData = // set the data,
callOnce = true;
}
@Override
public MyPojoClass call(Row value) throws Exception {
if (!calledOnce)
initStaticData();
String ip = value.getAs("ip");
return new MyPojoClass(ip);
}
}
两个不同表中的行数应该相同,但是结果数据集/表2中的行数不同。
例如,表1有9.000.000个不同的行,而表2有42.000个不同的行。我怎样才能解决这个问题(或者我缺少了一些关于Spark的概念?)
1条答案
按热度按时间c3frrgcw1#
我以前也遇到过类似的问题。
这主要是因为sparksql行函数
getAs
.getasapi getas(字符串字段名)返回给定字段名的值。对于基元类型,如果值为null,则返回基元特定的“零值”,即0表示int。
因此,在大多数情况下,您的ip地址可能为空,而其他数据的行为与代码中“ip”的null或0值相同。