PySpark: converting a streaming DataFrame to JSON format in Python

wh6knrhe asked 11 months ago in Spark
from pyspark.sql import functions as F
import boto3
import sys

## set broadcast variables
table_name = "dev.emp.master_events"

df = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 2)
    .table(table_name)
)

items = df.select('*')

query = (items.writeStream.outputMode("append").foreachBatch(lambda items, epoch_id: items.write.json()).start())

I'm getting the error below:

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):

Complete Error Message 
    File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
    File "/databricks/spark/python/pyspark/sql/utils.py", line 117, in call
    raise e
    File "/databricks/spark/python/pyspark/sql/utils.py", line 114, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
    File "<command-1376889177651808>", line 1, in <lambda>
    query = (items.writeStream.outputMode("append").foreachBatch(lambda items, epoch_id: items.write.json()).start())
    File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
    res = func(*args, **kwargs)
    TypeError: DataFrameWriter.json() missing 1 required positional argument: 'path'

    at py4j.Protocol.getReturnValue(Protocol.java:476)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
    at com.sun.proxy.$Proxy161.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:486)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:486)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatchLegacy$1(ForeachBatchSink.scala:139)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.runWithAQE(ForeachBatchSink.scala:166)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatchLegacy(ForeachBatchSink.scala:139)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatch$2(ForeachBatchSink.scala:101)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:662)
    at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:607)
    at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:616)
    at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:495)
    at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:493)
    at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:82)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:354)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:276)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:276)


djmepvbi1#

As the error clearly states, you need to pass a path argument to .json() in items.write.json().
See the reference documentation for pyspark.sql.DataFrameWriter.json.
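
For example, a minimal sketch of the fix, assuming items is the streaming DataFrame from your question (the output path is a placeholder; substitute any writable DBFS or S3 location):

query = (
    items.writeStream
    .outputMode("append")
    .foreachBatch(
        # .json() requires a path; this one is hypothetical
        lambda batch_df, epoch_id: batch_df.write.mode("append").json("/mnt/output/master_events_json")
    )
    .start()
)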
Also, for readability, I would define a regular function and pass it to pyspark.sql.streaming.DataStreamWriter.foreachBatch rather than an anonymous lambda, as in the sketch below.
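
For instance (again a sketch, not a definitive implementation; the output and checkpoint paths are hypothetical):

def write_batch_as_json(batch_df, epoch_id):
    # Write each micro-batch as JSON files under its own epoch_id-suffixed
    # directory; overwrite makes a retried batch idempotent.
    batch_df.write.mode("overwrite").json(f"/mnt/output/master_events_json/batch_{epoch_id}")

query = (
    items.writeStream
    .outputMode("append")
    .option("checkpointLocation", "/mnt/output/_checkpoints/master_events")  # placeholder path
    .foreachBatch(write_batch_as_json)
    .start()
)

A single output directory with mode("append"), as in the lambda above, also works; the per-batch directory just makes each micro-batch's files easy to locate.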
