Spark cannot evaluate expression: lag of a window expression

vmpqdwk3 · posted 2021-06-15 · in Cassandra

I am trying to perform a number of operations on a DataFrame read from a Cassandra table and then save the result to another table. One of these operations is:

val leadWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").asc).rowsBetween(Window.currentRow, 2)
df.withColumn("lead1", lag(sum(col("temp1")).over(leadWindow), 2, 0))

When I run my job, I get an exception saying that lag cannot be evaluated:

2018-10-08 12:02:22 INFO  Cluster:1543 - New Cassandra host /127.0.0.1:9042 added
    2018-10-08 12:02:22 INFO  CassandraConnector:35 - Connected to Cassandra cluster: Test Cluster
    2018-10-08 12:02:23 INFO  CassandraSourceRelation:35 - Input Predicates: [IsNotNull(ts)]
    2018-10-08 12:02:23 INFO  CassandraSourceRelation:35 - Input Predicates: [IsNotNull(ts)]
    Exception in thread "main" java.lang.UnsupportedOperationException: Cannot evaluate expression: lag(input[43, bigint, true], 2, 0)
            at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:258)
            at org.apache.spark.sql.catalyst.expressions.OffsetWindowFunction.doGenCode(windowExpressions.scala:326)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:496)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:479)
            at org.apache.spark.sql.catalyst.expressions.Add.doGenCode(arithmetic.scala:174)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:496)
            at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:479)
            at org.apache.spark.sql.catalyst.expressions.BinaryComparison.doGenCode(predicates.scala:513)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.And.doGenCode(predicates.scala:397)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.CaseWhen$$anonfun$8.apply(conditionalExpressions.scala:202)
            at org.apache.spark.sql.catalyst.expressions.CaseWhen$$anonfun$8.apply(conditionalExpressions.scala:201)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.immutable.List.foreach(List.scala:381)
            at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
            at scala.collection.immutable.List.map(List.scala:285)
            at org.apache.spark.sql.catalyst.expressions.CaseWhen.doGenCode(conditionalExpressions.scala:201)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
            at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
            at scala.Option.getOrElse(Option.scala:121)
            at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
            at org.apache.spark.sql.catalyst.expressions.Alias.genCode(namedExpressions.scala:142)
            at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
            at org.apache.spark.sql.execution.ProjectExec$$anonfun$6.apply(basicPhysicalOperators.scala:60)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
            at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
            at scala.collection.AbstractTraversable.map(Traversable.scala:104)
            at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:60)
            at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
            at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:354)
            at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:383)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:354)
            at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
            at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
            at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
            at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
            at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.DeserializeToObjectExec.doExecute(objects.scala:89)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
            at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
            at org.apache.spark.sql.Dataset.rdd$lzycompute(Dataset.scala:2975)
            at org.apache.spark.sql.Dataset.rdd(Dataset.scala:2973)
            at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:76)
            at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:86)
            at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
            at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
            at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
            at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
            at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
            at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
            at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
            at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
            at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
            at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
            at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
            at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
            at com.test.functions.package$ChecksFunctions.appendToTable(package.scala:66)
            at com.test.TestFromCassandra$.main(TestFromCassandra.scala:66)
            at com.test.TestFromCassandra.main(TestFromCassandra.scala)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
            at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
            at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
            at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
            at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
            at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    2018-10-08 12:02:31 INFO  CassandraConnector:35 - Disconnected from Cassandra cluster: Test Cluster

Line 66 of the TestFromCassandra file is the save() call. I have not found any similar question on Stack Overflow.
Does anyone know why I am getting this exception? Does the lag function have any restriction when applied to a rolling sum?
Edit: I found a similar issue on Spark's JIRA. There appears to be a bug with the filter function when it references a window function, and since the Cassandra connector filters the DataFrame on the primary-key columns (with isNotNull) before saving, that may be what triggers the exception. Is there a way to avoid this bug and perform this operation without the aggregate function? Or does anyone know how to fix it?
Edit 2: I also tried using a foreach writer with the connector's withSessionDo function, but I still get the same exception. Has no one ever run into this problem?
Edit 3: I found another way to achieve the operation I wanted:

val leadWindow = Window.partitionBy(col("id")).orderBy(col("timestamp").desc).rowsBetween(Window.currentRow, 2)
df.withColumn("lead1", sum(col("temp1")).over(leadWindow))

The problem is not caused by the filter. It seems it is simply not possible to use the lag function on a window expression.
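For reference, a minimal self-contained sketch of this Edit 3 workaround (the sample data and local SparkSession setup are assumptions, not part of the original job): instead of wrapping a windowed sum in lag, the sum is computed directly over a frame that covers the rows of interest.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object LeadSumSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("lead-sum-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data: (id, timestamp, temp1)
    val df = Seq(
      ("a", 1L, 10), ("a", 2L, 20), ("a", 3L, 30), ("a", 4L, 40)
    ).toDF("id", "timestamp", "temp1")

    // Sum over the current row and the next two rows, expressed directly
    // as a window frame rather than as lag(sum(...)). Note that with a
    // descending order, the frame (currentRow, 2) reaches backwards in time.
    val leadWindow = Window.partitionBy(col("id"))
      .orderBy(col("timestamp").desc)
      .rowsBetween(Window.currentRow, 2)

    df.withColumn("lead1", sum(col("temp1")).over(leadWindow)).show()
    spark.stop()
  }
}
```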


ki1q1bka1#

I am seeing the same error. Although there is a workaround, Spark should fix this. I believe you would hit this problem with any window function, not just lag. I think the cause is that Spark tries to generate code for the filter, but window functions cannot be code-generated. The workaround is to create a column from the window expression first and then use that column in the filter.
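A sketch of this workaround (the window definition and column names below are assumptions for illustration): materialize the window expression into its own column, so that the filter's generated code only references an ordinary column rather than the raw window function.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Hypothetical window over the same partitioning as in the question.
val w = Window.partitionBy(col("id")).orderBy(col("timestamp").asc)

// Step 1: compute the window expression into a named column.
val withLag = df.withColumn("prevTemp", lag(col("temp1"), 1).over(w))

// Step 2: filter on the materialized column instead of the raw
// window expression, avoiding codegen over an Unevaluable expression.
val filtered = withLag.filter(col("prevTemp").isNotNull)
```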


ecr0jaav2#

I ran into the same problem, and then I noticed that you call over inside lag (as I had). I changed it to this:

df.withColumn("lag1", lag(sum(col("temp1")), 2, 0).over(lagWindow))
