I'm trying to write a stream to Elasticsearch with PySpark. I read two DataFrames from Kafka and join them into df_joined. Printing df_joined to the console shows the correct columns and values. But as soon as I try to write it to Elasticsearch (on localhost:9200) with the following code:
spark_session = SparkSession.builder \
    .appName("spark-test") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,"
            "org.elasticsearch:elasticsearch-hadoop:8.5.3") \
    .getOrCreate()

df1 = ...
df2 = ...  # for df1 and df2 I do .select(from_json(col('value'), schema))

df_joined = df1.join(df2, df1.fk == df2.pk)

query = df_joined \
    .writeStream \
    .outputMode("append") \
    .format("org.elasticsearch.spark.sql") \
    .option("es.resource", "name_of_index/name_of_type") \
    .option("es.mapping.id", "id") \
    .option("es.spark.sql.streaming.sink.log.enabled", "false") \
    .option("checkpointLocation", "/tmp/es_checkpoint") \
    .start()

query.awaitTermination()
I'm using:
- elasticsearch-hadoop version 8.5.3
- PySpark version 3.3.1

I get the following error / stack trace:
22/12/28 00:41:48 ERROR MicroBatchExecution: Query [id = 3b9b239f-bd38-43e0-820a-6d8fdac56e79, runId = e4869668-2e3c-4e8e-a6be-6c111a832812] terminated with error
java.lang.NoSuchMethodError: org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(Lorg/apache/spark/sql/SparkSession;Lorg/apache/spark/sql/execution/QueryExecution;Lscala/Function0;)Ljava/lang/Object;
at org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink.addBatch(EsSparkSqlStreamingSink.scala:62)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:666)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:664)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:664)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
What I've tried:
- Downgrading PySpark to version 3.2.0 (based on an accepted SO answer)
- Changing org.elasticsearch:elasticsearch-hadoop:8.5.3 to org.elasticsearch:elasticsearch-spark-20_2.11:8.5.3 when creating the SparkSession

Edit:
- Koedlt's answer was very helpful for understanding the problem better, but unfortunately downgrading to version 2.3.0 did not solve it, because the Kafka integration then fails.
1 Answer
If you look at the most interesting part of the error, you see the following:

java.lang.NoSuchMethodError: org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(Lorg/apache/spark/sql/SparkSession;Lorg/apache/spark/sql/execution/QueryExecution;Lscala/Function0;)Ljava/lang/Object;
So it is looking for a method on SQLExecution called withNewExecutionId that takes a SparkSession, a QueryExecution and a Function0 as input. Let's look at this method in Spark's source code. In version 3.3.1, we see the following function signature:
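A sketch of that signature, paraphrased from the Spark 3.3.1 source (SQLExecution.scala; exact modifiers and defaults may differ slightly):

```scala
// Spark 3.x: there is no SparkSession parameter any more --
// the session is obtained from the QueryExecution itself.
def withNewExecutionId[T](
    queryExecution: QueryExecution,
    name: Option[String] = None)(body: => T): T
```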
That explains why you get the NoSuchMethodError: no method with the signature the connector expects exists! Now, let's look at the artifact's dependencies in the Maven Repository. For Spark, the elasticsearch-hadoop 8.5.3 artifact lists a dependency on version 2.3.0 (on the Maven page, the left column is the dependency version and the right column the latest stable update of that dependency). So let's look at that dependency version, 2.3.0.
In version 2.3.0, we see the following function signature:
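A sketch of the 2.3.0 signature, again paraphrased from the Spark source; note that it matches the descriptor quoted in the NoSuchMethodError:

```scala
// Spark 2.3.0: takes the SparkSession explicitly, plus the
// QueryExecution and a by-name body (compiled as a Function0).
def withNewExecutionId[T](
    sparkSession: SparkSession,
    queryExecution: QueryExecution)(body: => T): T
```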
That looks like what we expect: a SparkSession, a QueryExecution and a Function0 as input. In other words, elasticsearch-hadoop 8.5.3 was built against Spark 2.3.0, whose SQLExecution API differs from the one in your Spark 3.3.1 runtime.