In my code I have a table_df with some columns on which I am doing calculations such as min, max, mean, etc., and I want to create a new_df with a specified schema new_df_schema. In my logic I wrote Spark SQL for the calculations and append each newly generated row to the initially empty new_df, so that in the end new_df holds all the calculated values for all the columns.
The problem is that when the number of columns grows, this causes performance issues. Can this be done without using the union() function, or is there any other way to improve the performance?
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import sparkSession.sqlContext.implicits._
val table_df = Seq(
  (10, 20, 30, 40, 50),
  (100, 200, 300, 400, 500),
  (111, 222, 333, 444, 555),
  (1123, 2123, 3123, 4123, 5123),
  (1321, 2321, 3321, 4321, 5321)
).toDF("col_1", "col_2", "col_3", "col_4", "col_5")
table_df.show(false)
table_df.createOrReplaceTempView("table_df")
val new_df_schema = StructType(
  StructField("Column_Name", StringType, false) ::
  StructField("number_of_values", LongType, false) ::
  StructField("number_of_distinct_values", LongType, false) ::
  StructField("distinct_count_with_nan", LongType, false) ::
  StructField("distinct_count_without_nan", LongType, false) ::
  StructField("is_unique", BooleanType, false) ::
  StructField("number_of_missing_values", LongType, false) ::
  StructField("percentage_of_missing_values", DoubleType, false) ::
  StructField("percentage_of_unique_values", DoubleType, false) ::
  StructField("05_PCT", DoubleType, false) ::
  StructField("25_PCT", DoubleType, false) ::
  StructField("50_PCT", DoubleType, false) ::
  StructField("75_PCT", DoubleType, false) ::
  StructField("95_PCT", DoubleType, false) ::
  StructField("max", DoubleType, false) ::
  StructField("min", DoubleType, false) ::
  StructField("mean", DoubleType, false) ::
  StructField("std", DoubleType, false) ::
  StructField("skewness", DoubleType, false) ::
  StructField("kurtosis", DoubleType, false) ::
  StructField("range", DoubleType, false) ::
  StructField("variance", DoubleType, false) :: Nil
)
var new_df = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], new_df_schema)
for (c <- table_df.columns) {
  val num = sparkSession.sql(
    s"""SELECT
       |  '$c' AS Column_Name,
       |  COUNT($c) AS number_of_values,
       |  COUNT(DISTINCT $c) AS number_of_distinct_values,
       |  COUNT(DISTINCT $c) AS distinct_count_with_nan,
       |  (COUNT(DISTINCT $c) - 1) AS distinct_count_without_nan,
       |  (COUNT($c) == COUNT(DISTINCT $c)) AS is_unique,
       |  (COUNT(*) - COUNT($c)) AS number_of_missing_values,
       |  ((COUNT(*) - COUNT($c)) / COUNT(*)) AS percentage_of_missing_values,
       |  (COUNT(DISTINCT $c) / COUNT(*)) AS percentage_of_unique_values,
       |  APPROX_PERCENTILE($c, 0.05) AS 05_PCT,
       |  APPROX_PERCENTILE($c, 0.25) AS 25_PCT,
       |  APPROX_PERCENTILE($c, 0.50) AS 50_PCT,
       |  APPROX_PERCENTILE($c, 0.75) AS 75_PCT,
       |  APPROX_PERCENTILE($c, 0.95) AS 95_PCT,
       |  MAX($c) AS max,
       |  MIN($c) AS min,
       |  MEAN($c) AS mean,
       |  STD($c) AS std,
       |  SKEWNESS($c) AS skewness,
       |  KURTOSIS($c) AS kurtosis,
       |  (MAX($c) - MIN($c)) AS range,
       |  VARIANCE($c) AS variance
       |FROM table_df""".stripMargin)
  new_df = new_df.union(num) // this union per column causes the performance issue when table_df has many columns
}
new_df.show(false)
==================================================
table_df:
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|10 |20 |30 |40 |50 |
|100 |200 |300 |400 |500 |
|111 |222 |333 |444 |555 |
|1123 |2123 |3123 |4123 |5123 |
|1321 |2321 |3321 |4321 |5321 |
+-----+-----+-----+-----+-----+
new_df:
+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+
|Column_Name|number_of_values|number_of_distinct_values|distinct_count_with_nan|distinct_count_without_nan|is_unique|number_of_missing_values|percentage_of_missing_values|percentage_of_unique_values|05_PCT|25_PCT|50_PCT|75_PCT|95_PCT|max |min |mean |std |skewness |kurtosis |range |variance |
+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+
|col_1 |5 |5 |5 |4 |true |0 |0.0 |1.0 |10.0 |100.0 |111.0 |1123.0|1321.0|1321.0|10.0|533.0 |634.0634826261484 |0.4334269738367067 |-1.7463346405299973|1311.0|402036.5 |
|col_2 |5 |5 |5 |4 |true |0 |0.0 |1.0 |20.0 |200.0 |222.0 |2123.0|2321.0|2321.0|20.0|977.2 |1141.1895986206673|0.4050513738738682 |-1.799741951675132 |2301.0|1302313.7 |
|col_3 |5 |5 |5 |4 |true |0 |0.0 |1.0 |30.0 |300.0 |333.0 |3123.0|3321.0|3321.0|30.0|1421.4|1649.399072389699 |0.3979251063785061 |-1.8119558312496054|3291.0|2720517.3 |
|col_4 |5 |5 |5 |4 |true |0 |0.0 |1.0 |40.0 |400.0 |444.0 |4123.0|4321.0|4321.0|40.0|1865.6|2157.926620624529 |0.39502047381456235|-1.8165124206347685|4281.0|4656647.3 |
|col_5 |5 |5 |5 |4 |true |0 |0.0 |1.0 |50.0 |500.0 |555.0 |5123.0|5321.0|5321.0|50.0|2309.8|2666.59027598917 |0.3935246673563026 |-1.8186685628112493|5271.0|7110703.699999999|
+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+
3 Answers

Answer 1 (4ktjp1zp)
I hope this helps. This code is just an extension of the answer provided by @srinivas.
Answer 2 (yqkkidmi)
Another option is the summary() API on Dataset, which computes the basic stats in a fixed format; similarly, you can enrich the DataFrame API to get the stats in the desired shape, as below.
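For reference, a minimal sketch of such a call against the table_df from the question; summary() takes the statistics to compute as strings, with percentiles written as percentages:

table_df.summary("count", "min", "25%", "50%", "75%", "max", "mean", "stddev").show(false)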
Define the RichDataFrame and the implicits to use, test it with the test data provided above, and specify only the functions you need; per the ask, new_df can then be computed the same way (a consolidated sketch follows).
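A minimal sketch of that idea, with names of my own choosing (RichImplicits, RichDataFrame, and statSummary are illustrative, not necessarily this answer's original code): wrap summary() in an implicit class and transpose its output so that each source column becomes one row.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.first

object RichImplicits {
  implicit class RichDataFrame(df: DataFrame) {
    // Compute the requested stats via summary() and transpose the result:
    // one output row per source column, one output column per statistic.
    def statSummary(stats: String*): DataFrame = {
      val summary   = if (stats.isEmpty) df.summary() else df.summary(stats: _*)
      val valueCols = summary.columns.tail                          // drop the "summary" label column
      val statNames = summary.collect().map(_.getString(0)).toSeq   // the stat labels become output columns
      // stack(n, 'col_1', col_1, ...) unpivots the value columns into rows
      val stackExpr = valueCols.map(c => s"'$c', `$c`").mkString(", ")
      summary
        .selectExpr("summary", s"stack(${valueCols.length}, $stackExpr) AS (Column_Name, value)")
        .groupBy("Column_Name")
        .pivot("summary", statNames)
        .agg(first("value"))
    }
  }
}

Usage:

import RichImplicits._
table_df.statSummary("min", "max", "mean", "stddev").show(false)

Note that summary() returns its values as strings, so cast the result columns if numeric types are needed.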
Answer 3 (bgibtngc)
An alternative to union(): check the following code.
Generate the required expressions, select the required columns, and produce the final output in a single aggregation (a sketch of the approach follows).
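A sketch of that approach as my own reconstruction (the expression list and names are illustrative), assuming the table_df from the question: build every aggregate expression for every column up front, run one aggregation job, and unpivot the result with stack(); no union and no per-column loop.

import org.apache.spark.sql.functions._

// Each statistic as a SQL expression template; extend the list as needed.
val stats = Seq(
  ("number_of_values",          (c: String) => s"COUNT($c)"),
  ("number_of_distinct_values", (c: String) => s"COUNT(DISTINCT $c)"),
  ("max",      (c: String) => s"MAX($c)"),
  ("min",      (c: String) => s"MIN($c)"),
  ("mean",     (c: String) => s"MEAN($c)"),
  ("std",      (c: String) => s"STD($c)"),
  ("skewness", (c: String) => s"SKEWNESS($c)"),
  ("kurtosis", (c: String) => s"KURTOSIS($c)"),
  ("variance", (c: String) => s"VARIANCE($c)")
)

// One named_struct per source column holding all of that column's stats.
val structExprs = table_df.columns.map { c =>
  val fields = stats.map { case (name, tpl) => s"'$name', ${tpl(c)}" }.mkString(", ")
  expr(s"named_struct($fields)").as(c)
}

// A single aggregation job computes everything at once.
val aggregated = table_df.select(structExprs: _*)

// Unpivot with stack() so each source column becomes one row.
val stackExpr = table_df.columns.map(c => s"'$c', `$c`").mkString(", ")
val new_df = aggregated
  .selectExpr(s"stack(${table_df.columns.length}, $stackExpr) AS (Column_Name, stats)")
  .select(col("Column_Name"), col("stats.*"))

new_df.show(false)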
Update: create the new dataframe using approxQuantile for all columns (sketch below).
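A sketch of that update (quantile_df is an illustrative name): DataFrameStatFunctions.approxQuantile accepts an array of columns and computes all requested quantiles in one pass over the data; a relativeError of 0.0 requests exact percentiles at higher cost.

val probabilities = Array(0.05, 0.25, 0.50, 0.75, 0.95)

// quantiles(i)(j) is the j-th requested quantile of the i-th column.
val quantiles = table_df.stat.approxQuantile(table_df.columns, probabilities, 0.0)

// Build one row per column; relies on the implicits imported in the question.
val quantile_df = table_df.columns.zip(quantiles).map { case (c, q) =>
  (c, q(0), q(1), q(2), q(3), q(4))
}.toSeq.toDF("Column_Name", "05_PCT", "25_PCT", "50_PCT", "75_PCT", "95_PCT")

quantile_df.show(false)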