如何提高性能我的spark工作在这里加载数据到cassandra表?

42fyovps  于 2021-06-14  发布在  Cassandra
关注(0)|答案(1)|浏览(278)

我使用的是spark-sql-2.4.1、spark-cassandra-connector 2.11-2.4.1以及java8和apache cassandra 3.0版本。
我有我的Spark提交或Spark集群环境如下加载20亿记录。

--executor-cores 3 
--executor-memory 9g 
--num-executors 5 
--driver-cores 2 
--driver-memory 4g

我使用的cassandra 6节点群集具有以下设置:

cassandra.output.consistency.level=ANY
cassandra.concurrent.writes=1500
cassandra.output.batch.size.bytes=2056
cassandra.output.batch.grouping.key=partition 
cassandra.output.batch.grouping.buffer.size=3000
cassandra.output.throughput_mb_per_sec=128
cassandra.connection.keep_alive_ms=30000
cassandra.read.timeout_ms=600000

我正在使用sparkDataframe加载到cassandra表中。在读入spark数据集后,我按以下特定列进行分组。

Dataset<Row> dataDf = //read data from source i.e. hdfs file which are already partitioned based "load_date", "fiscal_year" , "fiscal_quarter" , "id",  "type","type_code"

Dataset<Row> groupedDf = dataDf.groupBy("id","type","value" ,"load_date","fiscal_year","fiscal_quarter" , "create_user_txt", "create_date")

 groupedDf.write().format("org.apache.spark.sql.cassandra")
    .option("table","product")
    .option("keyspace", "dataload")
    .mode(SaveMode.Append)
    .save();

Cassandra table(
    PRIMARY KEY (( id, type, value, item_code ), load_date)
) WITH CLUSTERING ORDER BY ( load_date DESC )

基本上我是按“id”、“type”、“value”、“load\u date”列分组的。由于其他列(“财政年度”、“财政季度”、“创建用户文本”、“创建日期”)应可用于存储到cassandra表中,因此我必须将它们也包括在groupby子句中。
1) 坦率地说,我不知道如何将groupby之后的那些列放入结果Dataframe,即groupeddf来存储。有什么建议吗?
2) 在上面的过程/步骤中,我的spark加载工作非常慢,因为有很多无序处理,比如读无序处理和写无序处理。
我应该怎么做来提高速度?
在从源代码(到datadf)读取数据时,我需要在这里做些什么来提高性能吗?这已经分区了。
我还需要做分区吗?如果是这样的话,根据上面的Cassandra表,最好的方法是什么?
hdfs文件列
“id”,“类型”,“值”,“类型代码”,“加载日期”,“项目代码”,“会计年度”,“会计季度”,“创建日期”,“上次更新日期”,“创建用户文本”,“更新用户文本”
旋转
我使用groupby是因为如下所示的旋转

Dataset<Row> pivot_model_vals_unpersist_df =  model_vals_df.groupBy("id","type","value","type_code","load_date","item_code","fiscal_year","fiscal_quarter","create_date")
                .pivot("type_code" )
                .agg(  first(//business logic)
                )
              )

请给我建议。非常感谢您的建议/反馈。

mqkwyuun

mqkwyuun1#

所以,正如我从评论中得到的,你的下一个任务是:
从hdfs中取出2b行。
通过一些转换将这些行保存到cassandra中。
cassandra表的模式与hdfs数据集的模式不同。
一开始,你肯定不需要分组。groupby不对列进行分组,它对行进行分组,调用sum、avg、max等聚合函数。语义类似于sql“groupby”,所以这不是你的情况。您真正需要的是-使您的“to save”数据集适合所需的cassandra模式。
在java中,这比在scala中要复杂一些。首先,我建议定义一个bean来表示cassandra行。

public class MyClass {

   // Remember to declare no-args constructor
   public MyClass() { }

   private Long id;
   private String type;
   // another fields, getters, setters, etc
}

您的数据集是dataset,您需要将其转换为javardd。所以,你需要一个转换器。

public class MyClassFabric {
   public static MyClass fromRow(Row row) {
       MyClass myClass = new MyClass();
       myClass.setId(row.getInt("id"));
       // ....
       return myClass;
   }
}

结果我们会有这样的结果:

JavaRDD<MyClass> rdd = dataDf.toJavaRDD().map(MyClassFabric::fromRow);
javaFunctions(rdd).writerBuilder("keyspace", "table", 
  mapToRow(MyClass.class)).saveToCassandra();

更多信息,你可以看看https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md

相关问题