我想将具有一组值的dataframe传递给新查询,但失败了。
1) 在这里,我选择特定的列,以便在下一个查询中传递isin
scala> val managerIdDf=finalEmployeesDf.filter($"manager_id"!==0).select($"manager_id").distinct
managerIdDf: org.apache.spark.sql.DataFrame = [manager_id: bigint]
2) 我的示例数据:
scala> managerIdDf.show
+----------+
|manager_id|
+----------+
| 67832|
| 65646|
| 5646|
| 67858|
| 69062|
| 68319|
| 66928|
+----------+
3) 执行最终查询时失败:
scala> finalEmployeesDf.filter($"emp_id".isin(managerIdDf)).select("*").show
java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.DataFrame [manager_id: bigint]
我也试着转换成 List
以及 Seq
但它只会产生一个错误。当我试着转换成 Seq
然后重新运行查询,它会抛出一个错误:
scala> val seqDf=managerIdDf.collect.toSeq
seqDf: Seq[org.apache.spark.sql.Row] = WrappedArray([67832], [65646], [5646], [67858], [69062], [68319], [66928])
scala> finalEmployeesDf.filter($"emp_id".isin(seqDf)).select("*").show
java.lang.RuntimeException: Unsupported literal type class scala.collection.mutable.WrappedArray$ofRef WrappedArray([67832], [65646], [5646], [67858], [69062], [68319], [66928])
我也提到这个职位,但徒劳。这种类型的查询我尝试用它来解决sparkDataframe中的子查询。有人在吗?
2条答案
按热度按时间o4hqfura1#
使用spark sql的dataframes和tempviews以及自由格式sql的替代方法—不要担心逻辑,它只是一种约定,是您最初方法的替代方法—应该同样足够:
或
或
或
或
具体来说:
qoefvg9y2#
是的,您不能传入Dataframe
isin
.isin
需要一些它将根据其进行筛选的值。如果你想要一个例子,你可以在这里检查我的答案
根据问题更新,您可以进行以下更改,
到