Apache Spark write takes hours

mxg2im7a posted on 2021-06-14 in Cassandra

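I have a Spark job that performs a right join on two tables. Reading and joining are very fast, but inserting the join result into the Cassandra database is very slow: inserting 1,000 rows takes more than 30 minutes, and inserting 9 rows takes 3 minutes. My configuration is below. We have 3 nodes running Cassandra and Spark, and Spark is installed on all of them. I'm fairly new to Spark and can't tell what is wrong. Using the DSE driver, I can insert the same amount of data (more than 2,000 rows) in under 1 second. I appreciate your time and help!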
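Per row, the timings reported above work out roughly as follows. The 9-row write is slower per row than the 1,000-row write, which suggests (without proving it) that the elapsed time is not simply proportional to the number of rows inserted.

```latex
\begin{aligned}
\text{Spark, 1000 rows:}\quad & t > 30\ \text{min} = 1800\ \text{s} \;\Rightarrow\; t/1000 > 1.8\ \text{s/row} \\
\text{Spark, 9 rows:}\quad & t = 3\ \text{min} = 180\ \text{s} \;\Rightarrow\; t/9 = 20\ \text{s/row} \\
\text{DSE driver, 2000 rows:}\quad & t < 1\ \text{s} \;\Rightarrow\; t/2000 < 0.5\ \text{ms/row}
\end{aligned}
```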
spark-submit:

"dse -u " + username + " -p " + password + " spark-submit --class com.SparkJoin --executor-memory=20G  " +
                "SparkJoinJob-1.0-SNAPSHOT.jar " + filterMap.toString() + "

Spark core version: 2.7.2
spark-cassandra-connector_2.11: 2.3.1
spark-sql_2.11: 2.3.1
Spark configuration:

SparkConf conf = new SparkConf(true).setAppName("Appname");
conf.set("spark.cassandra.connection.host", host);
conf.set("spark.cassandra.auth.username", username);
conf.set("spark.cassandra.auth.password", password);

conf.set("spark.network.timeout", "600s");
conf.set("spark.cassandra.connection.keep_alive_ms", "25000");
conf.set("spark.cassandra.connection.timeout_ms", "5000000");
conf.set("spark.sql.broadcastTimeout", "5000000");
SparkContext sc = new SparkContext(conf);

SparkSession sparkSession = SparkSession.builder().sparkContext(sc).getOrCreate();
SQLContext sqlContext = sparkSession.sqlContext();

sqlContext.setConf("spark.cassandra.connection.host", host);
sqlContext.setConf("spark.cassandra.auth.username", username);
sqlContext.setConf("spark.cassandra.auth.password", password);
sqlContext.setConf("spark.network.timeout", "600s");
sqlContext.setConf("spark.cassandra.connection.keep_alive_ms", "2500000");
sqlContext.setConf("spark.cassandra.connection.timeout_ms", "5000000");
sqlContext.setConf("spark.sql.broadcastTimeout", "5000000");
sqlContext.setConf("spark.executor.heartbeatInterval", "5000000");
sqlContext.setConf("spark.sql.crossJoin.enabled", "true");
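The timeout values above mix units: the `spark.cassandra.connection.*_ms` keys are in milliseconds, `spark.network.timeout` carries an explicit `s` suffix, and `spark.sql.broadcastTimeout` is documented in seconds. The small sketch below (the class name `TimeoutUnits` is hypothetical) converts each value with `java.time.Duration` so the magnitudes are easier to compare; it assumes the unitless `spark.executor.heartbeatInterval` value is read as milliseconds. Note that the two `keep_alive_ms` settings differ by a factor of 100: 25 s on the `SparkConf` versus about 42 minutes on the `SQLContext`.

```java
import java.time.Duration;

public class TimeoutUnits {

    // Settings whose values are in milliseconds: the *_ms connector keys and,
    // assuming unitless values are read as ms, spark.executor.heartbeatInterval.
    static Duration millis(long value) {
        return Duration.ofMillis(value);
    }

    // spark.sql.broadcastTimeout is in seconds; spark.network.timeout ("600s")
    // states its unit explicitly.
    static Duration seconds(long value) {
        return Duration.ofSeconds(value);
    }

    public static void main(String[] args) {
        System.out.println("keep_alive_ms on SparkConf:  " + millis(25_000));     // PT25S
        System.out.println("keep_alive_ms on SQLContext: " + millis(2_500_000));  // PT41M40S
        System.out.println("connection.timeout_ms:       " + millis(5_000_000));  // PT1H23M20S
        System.out.println("executor.heartbeatInterval:  " + millis(5_000_000));  // PT1H23M20S
        System.out.println("sql.broadcastTimeout:        " + seconds(5_000_000)); // PT1388H53M20S
        System.out.println("network.timeout:             " + seconds(600));       // PT10M
    }
}
```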

Fetching the left and right tables:

Dataset<Row> resultsFrame = sqlContext.sql("select * from table where conditions");
return resultsFrame.map((MapFunction<Row, JavaObject>) row -> {
    // some operations here

    return obj;
}, Encoders.bean(JavaObject.class));
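`JavaObject` itself is not shown in the question. `Encoders.bean(JavaObject.class)` in practice needs it to be a JavaBean: a public class with a public no-argument constructor and a getter/setter pair per property. The sketch below is a hypothetical shape only; the fields `col1` and `col5` and their types are assumptions borrowed from the column names used in the join, not the real class. Implementing `Serializable` is a common precaution for objects handled inside Spark closures.

```java
import java.io.Serializable;
import java.math.BigDecimal;

// Hypothetical JavaObject: the real fields are not given in the question.
public class JavaObject implements Serializable {
    private static final long serialVersionUID = 1L;

    private BigDecimal col1; // hypothetical: numeric/time value used in the range condition
    private String col5;     // hypothetical: equality join key

    // Public no-argument constructor so the bean encoder can instantiate it.
    public JavaObject() { }

    public BigDecimal getCol1() { return col1; }
    public void setCol1(BigDecimal col1) { this.col1 = col1; }

    public String getCol5() { return col5; }
    public void setCol5(String col5) { this.col5 = col5; }

    public static void main(String[] args) {
        JavaObject obj = new JavaObject();
        obj.setCol1(new BigDecimal("1623650000.123"));
        obj.setCol5("key-1");
        System.out.println(obj.getCol1() + " " + obj.getCol5());
    }
}
```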

Join:

Dataset<Row> result = RigtTableJavaRDD.join(LeftTableJavaRDD,
            (LeftTableJavaRDD.col("col1").minus(RigtTableJavaRDD.col("col2"))).
                    between(new BigDecimal("0").subtract(twoHundredMilliseconds), new BigDecimal("0").add(twoHundredMilliseconds))
                    .and(LeftTableJavaRDD.col("col5").equalTo(RigtTableJavaRDD.col("col6")))
            , "right");
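The join condition keeps a pair of rows when `col5` equals `col6` and `col1 - col2` lies in the closed interval from minus to plus `twoHundredMilliseconds` (`between` is inclusive on both ends). Because `RigtTableJavaRDD` is the receiver of `join(...)`, it is the left side of the Spark join, so the `"right"` join type preserves every row of `LeftTableJavaRDD` and fills the `RigtTableJavaRDD` columns with nulls when nothing matches. The plain-Java sketch below (Java 16+ records) spells out those semantics with a nested loop. The class and record names, the `String` key type and the 0.2 window (assumed to be seconds) are hypothetical, and this is not how Spark physically executes the join.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

public class BandJoinSketch {

    // Hypothetical row shapes: only the columns used in the join condition.
    public record LeftRow(BigDecimal col1, String col5) { }
    public record RightRow(BigDecimal col2, String col6) { }

    // One output row; right is null when no RigtTable row matched.
    public record Joined(RightRow right, LeftRow left) { }

    // Mirrors: (col1 - col2) BETWEEN -window AND +window AND col5 = col6.
    // As in SQL, a null in any compared column means "no match".
    static boolean matches(LeftRow l, RightRow r, BigDecimal window) {
        if (l.col1() == null || r.col2() == null || l.col5() == null || r.col6() == null) {
            return false;
        }
        BigDecimal diff = l.col1().subtract(r.col2());
        return diff.compareTo(window.negate()) >= 0 // lower bound, inclusive
                && diff.compareTo(window) <= 0      // upper bound, inclusive
                && l.col5().equals(r.col6());
    }

    // rigt.join(left, cond, "right"): every row of `left` (the argument) is kept,
    // once per matching rigt row, or once with right == null if none match.
    static List<Joined> rightJoin(List<RightRow> rigt, List<LeftRow> left, BigDecimal window) {
        List<Joined> out = new ArrayList<>();
        for (LeftRow l : left) {
            boolean matched = false;
            for (RightRow r : rigt) {
                if (matches(l, r, window)) {
                    out.add(new Joined(r, l));
                    matched = true;
                }
            }
            if (!matched) {
                out.add(new Joined(null, l));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        BigDecimal window = new BigDecimal("0.2"); // hypothetical: 200 ms expressed in seconds
        List<LeftRow> left = List.of(
                new LeftRow(new BigDecimal("10.0"), "a"),
                new LeftRow(new BigDecimal("20.0"), "b"));
        List<RightRow> rigt = List.of(
                new RightRow(new BigDecimal("10.15"), "a"), // diff -0.15, same key: match
                new RightRow(new BigDecimal("10.25"), "a"), // diff -0.25: outside the window
                new RightRow(new BigDecimal("20.0"), "c")); // key differs from "b"
        rightJoin(rigt, left, window).forEach(System.out::println);
    }
}
```

With this data the first left row matches one right row, and the second left row appears once with a null right side, which is how rows of `LeftTableJavaRDD` without a match show up in the Spark result.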

Inserting the result:

CassandraJavaUtil.javaFunctions(resultRDD.javaRDD()).
            writerBuilder("keyspace", "table", CassandraJavaUtil.mapToRow(JavaObject.class)).
            saveToCassandra();
