I'm trying to test window functions in Spark SQL via the SparkR module. I'm using Spark 1.6 and trying to reproduce the example provided by zero323, in two different deploy modes (local and yarn-client):
set.seed(1)                       # make rnorm() reproducible
hc <- sparkRHive.init(sc)         # HiveContext: Spark 1.6 requires it for window functions
sdf <- createDataFrame(hc, data.frame(x=1:12, y=1:3, z=rnorm(12)))
registerTempTable(sdf, "sdf")     # expose the DataFrame to SQL as table "sdf"
query <- sql(hc, "SELECT x, y, z, LAG(z) OVER (PARTITION BY y ORDER BY x) FROM sdf")
head(query)                       # action that triggers execution
## x y z _c3
## 1 1 1 -0.6264538 NA
## 2 4 1 1.5952808 -0.6264538
## 3 7 1 0.4874291 1.5952808
## 4 10 1 -0.3053884 0.4874291
## 5 2 2 0.1836433 NA
## 6 5 2 0.3295078 0.1836433
But in both deploy modes I get the same error as soon as the Spark action head(query) is executed:
16/01/21 18:03:17 ERROR r.RBackendHandler: dfToCols on org.apache.spark.sql.api.r.SQLUtils failed
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
at org.apache.spark.sql.execution.Window.doExecute(Window.scala:245)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.
I tried the same HQL query directly in Hive and it works fine. Likewise, "normal" queries such as

classical_query <- sql(hc, "SELECT * FROM sdf")
head(classical_query)

work fine.
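As a side note, the _c3 column name in the expected output comes from the un-aliased LAG expression; a minimal variant of the same query that names the column (prev_z is an illustrative name of my own, not from zero323's example) would be:

query_named <- sql(hc, "SELECT x, y, z, LAG(z) OVER (PARTITION BY y ORDER BY x) AS prev_z FROM sdf")
head(query_named)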
Thanks
1 Answer
I solved my problem: it was just a Spark configuration issue. I simply removed the /usr/hdp/current/hive-client/lib/hive-exec.jar jar from the spark.driver.extraClassPath variable in the spark-defaults.conf configuration file.
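For illustration, the change amounts to deleting that one jar from the classpath entry in spark-defaults.conf; a sketch of the before/after (the other classpath entry is a placeholder, not my actual configuration):

# spark-defaults.conf -- before the fix: hive-exec.jar is on the driver classpath
spark.driver.extraClassPath /usr/hdp/current/hive-client/lib/hive-exec.jar:/some/other/lib.jar

# after the fix -- hive-exec.jar removed
spark.driver.extraClassPath /some/other/lib.jar

After restarting the Spark application so the new classpath takes effect, head(query) on the window-function query runs without the Task not serializable error.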