sample spark程序

of1yzvn4  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(241)

嗨,我在学习spark和scala,我有一个场景,我需要想出sparkscala代码
输入文件

Name  attr1 attr2 attr3  
John    Y     N    N  
Smith   N     Y    N

预期产量

John  attr1 Y  
John  attr2 N  
John  attr3 N  
Smith attr1 N  
...  
...

我知道怎么在Map上做这个
对于每一行,分别获取名称,遍历attr值,并将输出emmit为 (Name, attrX Y/N) 但在scala和spark中有点混乱,有人能帮我吗?

aiqt4smr

aiqt4smr1#

假设您已经知道输入属性的数量,并且输入属性由 \t ,则可以执行以下操作:
在java中

// load data file
JavaRDD<String> file = jsc.textFile(path);

// build header rdd
JavaRDD<String> header = jsc.parallelize(Arrays.asList(file.first()));

// subtract header to have real data
JavaRDD<String> data = file.subtract(header);

// create row rdd
JavaRDD<Row> rowRDD = data.flatMap(new FlatMapFunction<String,Row>(){
    private static final long serialVersionUID = 1L;

    @Override
    public Iterable<Row> call(String line) throws Exception {
        String[] strs = line.split("\t");
        Row r1 = RowFactory.create(strs[0], "Attr1", strs[1]);
        Row r2 = RowFactory.create(strs[0], "Attr2", strs[2]);
        Row r3 = RowFactory.create(strs[0], "Attr3", strs[3]);
        return Arrays.asList(r1,r2,r3);
    }
});

// schema for df
StructType schema = new StructType().add("Name", DataTypes.StringType)
                                    .add("Attr", DataTypes.StringType)
                                    .add("Value", DataTypes.StringType);

DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.show();

以下是输出:

+-----+-----+-----+
| Name| Attr|Value|
+-----+-----+-----+
|Smith|Attr1|    N|
|Smith|Attr2|    Y|
|Smith|Attr3|    N|
| John|Attr1|    Y|
| John|Attr2|    N|
| John|Attr3|    N|
+-----+-----+-----+

scala和java是相似的,您可以很容易地将它们转换成scala。

相关问题