PySpark: converting a streaming DataFrame to JSON format in Python

wh6knrhe asked on 2024-01-06 in Spark
  from pyspark.sql import functions as F
  import boto3
  import sys

  ## set broadcast variables
  table_name = "dev.emp.master_events"

  df = (
      spark.readStream.format("delta")
      .option("readChangeFeed", "true")
      .option("startingVersion", 2)
      .table(table_name)
  )

  items = df.select('*')
  query = (items.writeStream.outputMode("append").foreachBatch(lambda items, epoch_id: items.write.json()).start())

The complete error message is below:

  py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
    File "/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 617, in _call_proxy
      return_value = getattr(self.pool[obj_id], method)(*params)
    File "/databricks/spark/python/pyspark/sql/utils.py", line 117, in call
      raise e
    File "/databricks/spark/python/pyspark/sql/utils.py", line 114, in call
      self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
    File "<command-1376889177651808>", line 1, in <lambda>
      query = (items.writeStream.outputMode("append").foreachBatch(lambda items, epoch_id: items.write.json()).start())
    File "/databricks/spark/python/pyspark/instrumentation_utils.py", line 48, in wrapper
      res = func(*args, **kwargs)
  TypeError: DataFrameWriter.json() missing 1 required positional argument: 'path'
    at py4j.Protocol.getReturnValue(Protocol.java:476)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
    at com.sun.proxy.$Proxy161.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:486)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:486)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatchLegacy$1(ForeachBatchSink.scala:139)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.runWithAQE(ForeachBatchSink.scala:166)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatchLegacy(ForeachBatchSink.scala:139)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.$anonfun$addBatch$2(ForeachBatchSink.scala:101)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:662)
    at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:607)
    at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:616)
    at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:495)
    at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:493)
    at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:82)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:354)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:276)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:99)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:104)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:103)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:276)


djmepvbi1#

As the error clearly states, you need to pass a path argument to .json() in items.write.json().
See the reference documentation for pyspark.sql.DataFrameWriter.json.
Also, for readability, I would define a regular function and pass it to pyspark.sql.streaming.DataStreamWriter.foreachBatch instead of an anonymous lambda function, as sketched below.
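A minimal sketch of both fixes, assuming hypothetical output and checkpoint locations (/mnt/output/emp_events and /mnt/checkpoints/emp_events; substitute your own paths):

  from pyspark.sql import DataFrame

  # Hypothetical locations -- replace with your own storage paths
  output_path = "/mnt/output/emp_events"
  checkpoint_path = "/mnt/checkpoints/emp_events"

  def write_batch(batch_df: DataFrame, epoch_id: int) -> None:
      # Write each micro-batch as JSON files under output_path
      batch_df.write.mode("append").json(output_path)

  query = (
      items.writeStream
      .outputMode("append")
      .option("checkpointLocation", checkpoint_path)
      .foreachBatch(write_batch)
      .start()
  )

The checkpointLocation option is not in your original snippet, but setting a checkpoint directory is generally recommended so the stream can recover its progress after a restart.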
