scala - How to split a DataFrame into two DataFrames based on the total row count of the original DataFrame

x6h2sr28 · posted 2021-05-27 in Spark

Hi, I'm new to Spark and Scala, and I would like to split the following DataFrame:

df:
+----------+-----+------+----------+--------+
|        Ts| Temp|  Wind|  Precipit|Humidity|
+----------+-----+------+----------+--------+
|1579647600|   10|    22|        10|      50|
|1579734000|   11|    21|        10|      55|
|1579820400|   10|    18|        15|      60|
|1579906800|    9|    23|        20|      60|
|1579993200|    8|    24|        25|      50|
|1580079600|   10|    18|        27|      60|
|1580166000|   11|    20|        30|      50|
|1580252400|   12|    17|        15|      50|
|1580338800|   10|    14|        21|      50|
|1580425200|    9|    16|        25|      60|
+----------+-----+------+----------+--------+

The resulting DataFrames should look like this:

df1:
+----------+-----+------+----------+--------+
|        Ts| Temp|  Wind|  Precipit|Humidity|
+----------+-----+------+----------+--------+
|1579647600|   10|    22|        10|      50|
|1579734000|   11|    21|        10|      55|
|1579820400|   10|    18|        15|      60|
|1579906800|    9|    23|        20|      60|
|1579993200|    8|    24|        25|      50|
|1580079600|   10|    18|        27|      60|
|1580166000|   11|    20|        30|      50|
|1580252400|   12|    17|        15|      50|
+----------+-----+------+----------+--------+
df2:
+----------+-----+------+----------+--------+
|        Ts| Temp|  Wind|  Precipit|Humidity|
+----------+-----+------+----------+--------+
|1580338800|   10|    14|        21|      50|
|1580425200|    9|    16|        25|      60|
+----------+-----+------+----------+--------+

where df1 contains the top 80% of the rows of df and df2 the remaining 20%.


7vhp5slm 1#

Assuming a random split is acceptable:

val Array(df1, df2) = df.randomSplit(Array(0.8, 0.2))
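
Note that randomSplit assigns rows probabilistically, so the resulting sizes only approximate the 80/20 ratio; passing a seed makes the split reproducible, e.g.:

val Array(df1, df2) = df.randomSplit(Array(0.8, 0.2), seed = 42)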

However, if by "top rows" you mean ordering by the "Ts" column of the example DataFrame, you can do the following:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, percent_rank}

// single unpartitioned window, newest Ts first
val window = Window.partitionBy().orderBy(col("Ts").desc)

// rank >= 0.2 keeps the oldest 80% of the rows, rank < 0.2 keeps the newest 20%
val df1 = df.withColumn("rank", percent_rank().over(window))
  .filter(col("rank") >= 0.2)
  .drop("rank")

val df2 = df.withColumn("rank", percent_rank().over(window))
  .filter(col("rank") < 0.2)
  .drop("rank")

df1.show()
df2.show()
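
A further sketch (not from the original answers) that gives an exact 80/20 split by row count rather than by percent_rank; it assumes the rows are distinct, since except is a set difference, and the names dfTop and dfRest are only illustrative:

val total = df.count()
val topN  = math.ceil(total * 0.8).toInt
val dfTop  = df.orderBy(col("Ts")).limit(topN)  // earliest 80% of the rows
val dfRest = df.except(dfTop)                   // remaining 20%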

vof42yt1 2#

Try the monotonically_increasing_id() function together with a window and percent_rank(), since monotonically_increasing_id() preserves the original row order. Example:
val df=sc.parallelize(Seq((1579647600,10,22,10,50),
(1579734000,11,21,10,55),
(1579820400,10,18,15,60),
(1579906800, 9,23,20,60),
(1579993200, 8,24,25,50),
(1580079600,10,18,27,60),
(1580166000,11,20,30,50),
(1580252400,12,17,15,50),
(1580338800,10,14,21,50),
(1580425200, 9,16,25,60)),10).toDF("Ts","Temp","Wind","Precipit","Humidity")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

// window over the whole DataFrame, ordered by the generated id to keep the original row order
val w = Window.orderBy("mid")

val df1 = df.withColumn("mid", monotonically_increasing_id)
val df_above_80 = df1.withColumn("pr", percent_rank().over(w)).filter(col("pr") >= 0.8).drop(Seq("mid", "pr"): _*)
val df_below_80 = df1.withColumn("pr", percent_rank().over(w)).filter(col("pr") < 0.8).drop(Seq("mid", "pr"): _*)

df_below_80.show()
/*
+----------+----+----+--------+--------+
|        Ts|Temp|Wind|Precipit|Humidity|
+----------+----+----+--------+--------+
|1579647600|  10|  22|      10|      50|
|1579734000|  11|  21|      10|      55|
|1579820400|  10|  18|      15|      60|
|1579906800|   9|  23|      20|      60|
|1579993200|   8|  24|      25|      50|
|1580079600|  10|  18|      27|      60|
|1580166000|  11|  20|      30|      50|
|1580252400|  12|  17|      15|      50|
+----------+----+----+--------+--------+
*/

df_above_80.show()
/*
+----------+----+----+--------+--------+
|        Ts|Temp|Wind|Precipit|Humidity|
+----------+----+----+--------+--------+
|1580338800|  10|  14|      21|      50|
|1580425200|   9|  16|      25|      60|
+----------+----+----+--------+--------+
*/
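
A note on this approach: monotonically_increasing_id encodes the partition index in the high bits and the row position within each partition in the low bits, so ordering the window by mid reproduces the original row order. Keep in mind that an unpartitioned window moves all rows into a single partition, which is fine here but may not scale to very large DataFrames. A quick sanity check of the split sizes:

df_below_80.count()  // 8 rows, i.e. roughly 80%
df_above_80.count()  // 2 rows, i.e. roughly 20%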
