如何在spark scalaDataframe中应用布尔索引?

mqkwyuun  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(516)

我有两个spark scalaDataframe,我需要使用一个Dataframe中的一个布尔列来过滤第二个Dataframe。两个Dataframe的行数相同。
在Pandas中,我希望它是这样的:

  1. import pandas as pd
  2. df1 = pd.DataFrame({"col1": ["A", "B", "A", "C"], "boolean_column": [True, False, True, False]})
  3. df2 = pd.DataFrame({"col1": ["Z", "X", "Y", "W"], "col2": [1, 2, 3, 4]})
  4. filtered_df2 = df2[df1['boolean_column']]
  5. // Expected filtered_df2 should be this:
  6. // df2 = pd.DataFrame({"col1": ["Z", "Y"], "col2": [1, 3]})

如何在spark scala中以最节省时间的方式执行相同的操作?
我目前的解决办法是 "boolean_column"df1df2 ,然后筛选 df2 只选择具有 true 新添加列中的值,最后删除 "boolean_column"df2 ,但我不确定这是不是最好的解决办法。
任何建议都将不胜感激。
编辑:
预期的输出是一个sparkscalaDataframe(不是列表或列),它与第二个Dataframe具有相同的模式,并且只有来自的行的子集 df2 满足 "boolean_column"df1 .
的模式 df2 上面介绍的只是一个例子。我期待着收到 df2 作为参数,具有不同(且不固定)模式的任意数量的列。

w8ntj3qf

w8ntj3qf1#

我用以下代码解决了这个问题:

  1. import org.apache.spark.sql.types.{LongType, StructField, StructType}
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
  4. val spark = SparkSession.builder().appName(sc.appName).master(sc.master).getOrCreate()
  5. val sqlContext = spark.sqlContext
  6. def addColumnIndex(df: DataFrame, sqlContext: SQLContext) = sqlContext.createDataFrame(
  7. // Add Column index
  8. df.rdd.zipWithIndex.map{case (row, columnindex) => Row.fromSeq(row.toSeq :+ columnindex)},
  9. // Create schema
  10. StructType(df.schema.fields :+ StructField("columnindex", LongType, nullable = false))
  11. )
  12. import spark.implicits._
  13. val DF1 = Seq(
  14. ("A", true),
  15. ("B", false),
  16. ("A", true),
  17. ("C", false)
  18. ).toDF("col1", "boolean_column")
  19. val DF2 = Seq(
  20. ("Z", 1),
  21. ("X", 2),
  22. ("Y", 3),
  23. ("W", 4)
  24. ).toDF("col_1", "col_2")
  25. // Add index
  26. val DF1WithIndex = addColumnIndex(DF1, sqlContext)
  27. val DF2WithIndex = addColumnIndex(DF2, sqlContext)
  28. // Join
  29. val joinDF = DF2WithIndex
  30. .join(DF1WithIndex, Seq("columnindex"))
  31. .drop("columnindex", "col1")
  32. // Filter
  33. val filteredDF2 = joinDF.filter(joinDF("boolean_column")).drop("boolean_column")

过滤后的Dataframe如下所示:

  1. +-----+-----+
  2. |col_1|col_2|
  3. +-----+-----+
  4. | Z| 1|
  5. | Y| 3|
  6. +-----+-----+
展开查看全部
uubf1zoe

uubf1zoe2#

两个都可以 DataFrame 然后过滤这些元组。

  1. val ints = sparkSession.sparkContext.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  2. val bools = sparkSession.sparkContext.parallelize(List(true, false, true, false, true, false, true, false, true, false))
  3. val filtered = ints.zip(bools).filter { case (int, bool) => bool }.map { case (int, bool) => int }
  4. println(filtered.collect().toList) //List(1, 3, 5, 7, 9)

相关问题