sparksql-无法将所有记录写入配置单元表

ergxz8rk  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(662)

我是apachespark框架的新手,我正在使用apachespark通过hive将数据写入hadoop。
在下面的代码中,我从配置单元读取表\u1并创建数据集,然后将此数据集Map到另一个数据集。将结果数据集保存到另一个表2之后。

public static void main(String[] args) {
    Dataset<Row> ipSet = spark.sql("Select distinct ip from table_1");
    ipSet.map(new TestFunction(), Encoders.bean(MyPojoClass.class)).write().mode("append").insertInto("table_2");
}

// ...
public class TestFunction implements MapFunction<Row, MyPojoClass>
{

    private static List<..> staticData;
    private static boolean callOnce = false;

    private static void initStaticData() throws IOException
    {
        staticData = // set the data,
        callOnce = true;
    }

    @Override
    public MyPojoClass call(Row value) throws Exception {
        if (!calledOnce)
            initStaticData();
        String ip = value.getAs("ip");
        return new MyPojoClass(ip);
    }
}

两个不同表中的行数应该相同,但是结果数据集/表2中的行数不同。
例如,表1有9.000.000个不同的行,而表2有42.000个不同的行。我怎样才能解决这个问题(或者我缺少了一些关于Spark的概念?)

c3frrgcw

c3frrgcw1#

我以前也遇到过类似的问题。
这主要是因为sparksql行函数 getAs .
getasapi getas(字符串字段名)返回给定字段名的值。对于基元类型,如果值为null,则返回基元特定的“零值”,即0表示int。
因此,在大多数情况下,您的ip地址可能为空,而其他数据的行为与代码中“ip”的null或0值相同。

相关问题