Parsing CSV into a DataFrame/Dataset with Apache Spark and Java

vs91vp4v · posted 2021-05-30 · in Hadoop

I am new to Spark and I would like to use group by & reduce to find the following from a CSV (one line per employee):

Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

I would like to simplify the CSV above with a group by on Department, Designation, State, plus the additional columns sum(costToCompany) and TotalEmployeeCount.
The result should be something like:

Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

Is there any way to achieve this with transformations and actions, or should we go for RDD operations?


uqdfh47h1#

For JSON, if your text file contains one JSON object per line, you can use sqlContext.jsonFile(path) to let Spark SQL load it as a SchemaRDD (the schema is inferred automatically). Then you can register it as a table and query it with SQL. You can also manually load the text file as an RDD[String], where each record contains one JSON object, and use sqlContext.jsonRDD(rdd) to turn it into a SchemaRDD. jsonRDD is useful when you need to pre-process your data.
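As a rough Java sketch of the same idea on the newer API (from Spark 2.x, spark.read().json() takes the place of jsonFile/jsonRDD), with a placeholder path and field names borrowed from the question:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("JsonExample").getOrCreate();

// One JSON object per line; the schema is inferred automatically
Dataset<Row> df = spark.read().json("hdfs://path/input.json");

// Register a temporary view and query it with SQL
df.createOrReplaceTempView("employee");
spark.sql("SELECT department, SUM(costToCompany) FROM employee GROUP BY department").show();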


nkcskrwz2#

The following may not be entirely correct, but it should give you some idea of how to juggle the data. It is not pretty and should use case classes and so on, but as a quick example of how to use the Spark API I hope it is enough :)

val rawLines = sc.textFile("hdfs://.../*.csv")

case class Employee(dep: String, des: String, cost: Double, state: String)

val employees = rawLines
  .map(_.split(",")) // or use a proper CSV parser
  .map(row => Employee(row(0), row(1), row(2).trim.toDouble, row(3)))

// the 1 is the number of employees (which is obviously 1 per line)
val keyVals = employees.map(em => ((em.dep, em.des, em.state), (1, em.cost)))

val results = keyVals.reduceByKey { (a, b) =>
  (a._1 + b._1, a._2 + b._2) // (a.count + b.count, a.cost + b.cost)
}

// debug output
results.take(100).foreach(println)

results
  .map(keyval => someThingToFormatAsCsvStringOrWhatever)
  .saveAsTextFile("hdfs://.../results")

Or you can use Spark SQL:

val sqlContext = new SQLContext(sparkContext)

// needed in Spark 1.x to implicitly convert the RDD of case classes into a SchemaRDD
import sqlContext.createSchemaRDD

// case classes can easily be registered as tables
employees.registerAsTable("employees")

val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*)
  from employees
  group by dep, des, state""")

hyrbngr73#

CSV files can be parsed with Spark's built-in CSV reader. It returns a DataFrame/Dataset on a successful read, and on top of a DataFrame/Dataset you can easily apply SQL-like operations.

With Spark 2.x (and above) in Java

Create a SparkSession object (a.k.a. spark)

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL Example")
    .getOrCreate();

Create a schema for the rows with StructType

import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("department", "string")
    .add("designation", "string")
    .add("ctc", "long")
    .add("state", "string");

Create a DataFrame from the CSV file and apply the schema to it

Dataset<Row> df = spark.read()
    .option("mode", "DROPMALFORMED")
    .schema(schema)
    .csv("hdfs://path/input.csv");

More options for reading data from a CSV file
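For illustration, a rough sketch of the same read using a few of the commonly available CSV options (header, inferSchema, sep, mode); the option values and the dfInferred name are only examples:

// Alternative read relying on schema inference instead of an explicit StructType
Dataset<Row> dfInferred = spark.read()
    .option("header", "false")       // the sample data has no header row
    .option("inferSchema", "true")   // let Spark infer column types (extra pass over the data)
    .option("sep", ",")              // field delimiter
    .option("mode", "DROPMALFORMED") // drop rows that do not match the schema
    .csv("hdfs://path/input.csv");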

Now we can aggregate the data in two ways

1. The SQL way

Register the table in the Spark SQL metastore to perform SQL operations

df.createOrReplaceTempView("employee");

Run an SQL query on the registered DataFrame

Dataset<Row> sqlResult = spark.sql(
    "SELECT department, designation, state, SUM(ctc), COUNT(department)" 
        + " FROM employee GROUP BY department, designation, state");

sqlResult.show(); //for testing

We can even execute SQL directly on the CSV file, without creating a table with Spark SQL, as sketched below.
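A rough sketch of that idea (the fileResult name is only an example); since no schema or header is applied here, the columns get the default names _c0.._c3 for department, designation, ctc and state:

Dataset<Row> fileResult = spark.sql(
    "SELECT _c0, _c1, _c3, SUM(CAST(_c2 AS LONG)), COUNT(_c0)"
        + " FROM csv.`hdfs://path/input.csv`"
        + " GROUP BY _c0, _c1, _c3");

fileResult.show(); //for testing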

2. The object chaining / programmatic / Java-like way

Do the necessary imports for the SQL functions

import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

Use groupBy and agg on the DataFrame/Dataset to perform count and sum on the data

Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
    .agg(sum("ctc"), count("department"));
// After Spark 1.6 columns mentioned in group by will be added to result by default

dfResult.show();//for testing
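If the aggregated result should also be persisted rather than only shown, one option is the DataFrameWriter (the output path here is just a placeholder):

// Write the aggregation result back out as CSV with a header row
dfResult.write()
    .option("header", "true")
    .csv("hdfs://path/output");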

Related libraries

"org.apache.spark" % "spark-core_2.11" % "2.0.0" 
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"

9rnv2umw4#

Procedure

Create a class (your schema) to encapsulate your structure (it is not required for approach B, but it will make your code easier to read if you are using Java)

public class Record implements Serializable {
  String department;
  String designation;
  long costToCompany;
  String state;
  // constructor , getters and setters  
}

Load the CSV (or JSON) file

JavaSparkContext sc;
JavaRDD<String> data = sc.textFile("path/input.csv");
//JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions 
SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified

JavaRDD<Record> rdd_records = data.map(
  new Function<String, Record>() {
      public Record call(String line) throws Exception {
         // Here you can use JSON
         // Gson gson = new Gson();
         // gson.fromJson(line, Record.class);
         String[] fields = line.split(",");
         // costToCompany is declared as long, so parse the trimmed field
         Record sd = new Record(fields[0], fields[1], Long.parseLong(fields[2].trim()), fields[3]);
         return sd;
      }
});

Currently there are two ways to proceed:

A. Spark SQL

Register a table (using your defined schema class)

JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
table.registerAsTable("record_table");
table.printSchema();

Query the table with the desired group-by query

JavaSchemaRDD res = sqlContext.sql(
    "select department, designation, state, sum(costToCompany), count(*) "
    + "from record_table "
    + "group by department, designation, state");

Here you can also run any other query you need, using the SQL approach

B. Spark

Map with a composite key: Department, Designation, State
JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
    rdd_records.mapToPair(new PairFunction<Record, String, Tuple2<Long, Integer>>() {
        public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
            Tuple2<String, Tuple2<Long, Integer>> t2 =
                new Tuple2<String, Tuple2<Long, Integer>>(
                    record.department + record.designation + record.state,
                    new Tuple2<Long, Integer>(record.costToCompany, 1));
            return t2;
        }
    });
reduceByKey on the composite key, summing the costToCompany column and accumulating the number of records per key

JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
    records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long, Integer>,
        Tuple2<Long, Integer>>() {
        public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1, Tuple2<Long, Integer> v2)
                throws Exception {
            return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
        }
    });
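As a small follow-up sketch (the output path is a placeholder), the reduced pairs can then be inspected or saved like any other JavaPairRDD:

// Print each composite key with its accumulated cost and employee count
for (Tuple2<String, Tuple2<Long, Integer>> entry : final_rdd_records.collect()) {
    System.out.println(entry._1() + " -> totalCost=" + entry._2()._1()
        + ", empCount=" + entry._2()._2());
}

// Or write the pairs out as text, e.g. to HDFS
final_rdd_records.saveAsTextFile("hdfs://path/results");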
