from pyspark.sql import functions as F
import boto3
import sys
## set broadcast variables
table_name = "dev.emp.master_events"
df = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 2)
    .table(table_name)
)
items = df.select('*')
query = (items.writeStream.outputMode("append").foreachBatch(lambda items, epoch_id: items.write.json()).start())
Below is the complete error message:

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 617, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/databricks/spark/python/pyspark/sql/utils.py", line 117, in call
raise e
File "/databricks/spark/python/pyspark/sql/utils.py", line 114, in call
self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
File "<command-1376889177651808>", line 1, in <lambda>
query = (items.writeStream.outputMode("append").foreachBatch(lambda items, epoch_id: items.write.json()).start())
File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
res = func(*args, **kwargs)
TypeError: DataFrameWriter.json() missing 1 required positional argument: 'path'
at py4j.Protocol.getReturnValue(Protocol.java:476)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
at com.sun.proxy.$Proxy161.call(Unknown Source)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:486)
at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:486)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatchLegacy$1(ForeachBatchSink.scala:139)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.runWithAQE(ForeachBatchSink.scala:166)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatchLegacy(ForeachBatchSink.scala:139)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatch$2(ForeachBatchSink.scala:101)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:662)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:607)
at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:616)
at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:495)
at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:493)
at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:82)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:354)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:276)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:276)
1 Answer

djmepvbi1#
As the error clearly states, you need to pass a `path` argument to `.json()` in `items.write.json()`. See the reference documentation for pyspark.sql.DataFrameWriter.json.

Additionally, for readability I would define a regular function and pass it to pyspark.sql.streaming.DataStreamWriter.foreachBatch instead of an anonymous lambda.
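Here is a minimal sketch of both suggestions, assuming hypothetical output and checkpoint locations under /tmp/master_events_json (substitute your own paths):

from pyspark.sql import DataFrame

# A named function is easier to read and debug than a lambda.
def write_batch_as_json(batch_df: DataFrame, epoch_id: int) -> None:
    # DataFrameWriter.json() requires a target path; writing each
    # micro-batch under its own epoch id keeps batches from colliding.
    batch_df.write.mode("append").json(f"/tmp/master_events_json/epoch={epoch_id}")

query = (
    items.writeStream
    .outputMode("append")
    .option("checkpointLocation", "/tmp/master_events_json/_checkpoint")  # assumed location
    .foreachBatch(write_batch_as_json)
    .start()
)

The checkpointLocation option is not required to fix the TypeError itself, but it lets the stream resume from where it left off after a restart.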