仅当Dataframe中存在列时才应用withcolumn

vkc1a9a2  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(440)

我正在使用spark-sql-2.4.1v和java8。我有一个场景,其中我将被传递列名为list/seq,对于那些列,我只需要执行某些操作,如sum、avg、percentages等。
在我的场景中,假设我有第1列、第2列、第3列。我将第一次传递column1 name。
将拉取/选择“column1”数据并基于“column1”执行一些操作。第二次我将传递column2 name,但这次没有提取较早的column1,因此我的数据集不包含“column1”,因此较早的条件将中断,并出现错误“analysisexception:cannot resolve” column1 '给定输入列'。
因此,我需要检查列,如果某个列存在,则只执行与该列相关的操作,否则忽略这些操作。
如何在spark中执行此操作?
数据库中的示例数据。

val data = List(
  ("20", "score", "school", "2018-03-31", 14 , 12 , 20),
  ("21", "score", "school", "2018-03-31", 13 , 13 , 21),
  ("22", "rate", "school", "2018-03-31", 11 , 14, 22),
  ("21", "rate", "school", "2018-03-31", 13 , 12, 23)
 )
    val df = data.toDF("id", "code", "entity", "date", "column1", "column2" ,"column3")
.select("id", "code", "entity", "date", "column2") /// these are passed for each run....this set will keep changing.

  Dataset<Row> enrichedDs = df
             .withColumn("column1_org",col("column1"))
             .withColumn("column1",
                     when(col("column1").isNotNull() , functions.callUDF("lookUpData",col("column1").cast(DataTypes.StringType)))
                  );

上述逻辑仅在选择列中的“column1”可用时适用。这在第二个集合中是失败的,因为“column1”不是select,所以我需要理解为什么这只适用于作为“column1”的selected列。我需要一些逻辑来实现这一点。

xv8emn3q

xv8emn3q1#

检查这是否有用- you can filter out columns, and process only valid columns ```
df.show(false)
/**
* +---+-----+------+----------+-------+-------+-------+
* |id |code |entity|date |column1|column2|column3|
* +---+-----+------+----------+-------+-------+-------+
* |20 |score|school|2018-03-31|14 |12 |20 |
* |21 |score|school|2018-03-31|13 |13 |21 |
* |22 |rate |school|2018-03-31|11 |14 |22 |
* |21 |rate |school|2018-03-31|13 |12 |23 |
* +---+-----+------+----------+-------+-------+-------+
/
// list of columns
val cols = Seq("column1", "column2" ,"column3", "column4")
val processColumns = cols.filter(df.columns.contains).map(sqrt)
df.select(processColumns: _
).show(false)

/**
  * +------------------+------------------+-----------------+
  * |SQRT(column1)     |SQRT(column2)     |SQRT(column3)    |
  * +------------------+------------------+-----------------+
  * |3.7416573867739413|3.4641016151377544|4.47213595499958 |
  * |3.605551275463989 |3.605551275463989 |4.58257569495584 |
  * |3.3166247903554   |3.7416573867739413|4.69041575982343 |
  * |3.605551275463989 |3.4641016151377544|4.795831523312719|
  * +------------------+------------------+-----------------+
  */
clj7thdc

clj7thdc2#

不确定我是否完全理解您的需求,但您是否只是尝试执行一些条件操作,具体取决于Dataframe中哪些列在执行之前不知道?
如果是这样,dataframe.columns将返回一个列列表,您可以对这些列进行相应的分析和选择

df.columns.foreach { println }

相关问题