我是spark的新手,我想使用group by&reduce从csv中查找以下内容(一行):
Department, Designation, costToCompany, State
Sales, Trainee, 12000, UP
Sales, Lead, 32000, AP
Sales, Lead, 32000, LA
Sales, Lead, 32000, TN
Sales, Lead, 32000, AP
Sales, Lead, 32000, TN
Sales, Lead, 32000, LA
Sales, Lead, 32000, LA
Marketing, Associate, 18000, TN
Marketing, Associate, 18000, TN
HR, Manager, 58000, TN
我想通过按部门分组、指定、状态以及附加列sum(costtocompany)和totalemployeecount来简化关于csv的内容
结果如下:
Dept, Desg, state, empCount, totalCost
Sales,Lead,AP,2,64000
Sales,Lead,LA,3,96000
Sales,Lead,TN,2,64000
是否有任何方法可以通过转换和操作来实现这一点。或者我们应该进行rdd操作?
4条答案
按热度按时间uqdfh47h1#
对于json,如果文本文件每行包含一个json对象,则可以使用
sqlContext.jsonFile(path)
让sparksql作为SchemaRDD
(将自动推断架构)。然后,您可以将它注册为一个表并用sql查询它。也可以手动加载文本文件作为RDD[String]
每个记录包含一个json对象并使用sqlContext.jsonRDD(rdd)
把它变成一个SchemaRDD
.jsonRDD
在需要预处理数据时非常有用。nkcskrwz2#
下面的内容可能不完全正确,但它应该能让您了解如何处理数据。它并不漂亮,应该用case类等代替,但是作为如何使用sparkapi的一个快速示例,我希望它足够了:)
或者可以使用sparksql:
hyrbngr73#
csv文件可以用spark内置的csv阅读器进行解析。它将在成功读取文件时返回dataframe/dataset。在dataframe/dataset之上,您可以轻松地应用类似sql的操作。
在java中使用spark 2.x(及更高版本)
创建sparksession对象(又名spark)
为structtype为行创建架构
从csv文件创建dataframe并对其应用架构
从csv文件读取数据的更多选项
现在我们可以通过两种方式对数据进行聚合
1. sql方式
在spark sql元存储中注册表以执行sql操作
对已注册的Dataframe运行sql查询
我们甚至可以直接在csv文件上执行sql,而不必使用sparksql创建表
2. 对象链接或编程或类似java的方式
为sql函数执行必要的导入
使用
groupBy
以及agg
在要执行的Dataframe/数据集上count
以及sum
关于数据相关库
9rnv2umw4#
程序
创建一个类(schema)来封装您的结构(方法b不需要它,但是如果您使用java,它会使您的代码更易于阅读)
加载cvs(json)文件
目前有两种方法:
答。sparksql语言
注册一个表(使用您定义的schema类)
使用所需的查询分组查询表
在这里,您还可以使用sql方法执行任何其他需要的查询
b。Spark
使用复合键Map:
Department
,Designation
,State
```JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
rdd_records.mapToPair(new
PairFunction<Record, String, Tuple2<Long, Integer>>(){
public Tuple2<String, Tuple2<Long, Integer>> call(Record record){
Tuple2<String, Tuple2<Long, Integer>> t2 =
new Tuple2<String, Tuple2<Long,Integer>>(
record.Department + record.Designation + record.State,
new Tuple2<Long, Integer>(record.costToCompany,1)
);
return t2;
}
JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long,
Integer>, Tuple2<Long, Integer>>() {
public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
Tuple2<Long, Integer> v2) throws Exception {
return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2);
}
});