Unable to save a PySpark DataFrame as a Parquet file

cpjpxq1n · posted 2022-11-01 in Spark

Complete Python newbie here. I am trying to save a PySpark DataFrame as a Parquet file, but it gives me an error. On my PC I installed PySpark 3.3.0, Hadoop 3.2.2, and Java jdk1.8.0_351, and created the environment variables following https://phoenixnap.com/kb/install-spark-on-windows-10. I am using JupyterLab through Anaconda (Python 3).

  !pip install pyspark
  !pip install yfinance

  import pyspark
  from pyspark.sql import SparkSession
  import pandas as pd
  import yfinance as yf
  import findspark

  findspark.init()
  spark = SparkSession.builder.appName("SparkTrial").config("spark.some.config.option", "some-value").getOrCreate()

  data = yf.download("SPY", start="2017-01-01", end="2017-04-30")
  data = data.reset_index()
  df = spark.createDataFrame(data)  # this line works successfully; data type: pyspark.sql.dataframe.DataFrame

  # Installing Hadoop did not work, so I also tried the next three lines. Still doesn't work.
  import os
  os.environ["HADOOP_HOME"] = r"C:\Hadoop"
  os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk1.8.0_351"

  df.write.parquet("/tmp/test2.parquet")  # this line causes the error

Error output:

  ---------------------------------------------------------------------------
  Py4JJavaError Traceback (most recent call last)
  Input In [33], in <cell line: 1>()
  ----> 1 df.write.parquet("/tmp/test2.parquet")
  File ~\anaconda3\lib\site-packages\pyspark\sql\readwriter.py:1140, in DataFrameWriter.parquet(self, path, mode, partitionBy, compression)
  1138 self.partitionBy(partitionBy)
  1139 self._set_opts(compression=compression)
  -> 1140 self._jwrite.parquet(path)
  File ~\anaconda3\lib\site-packages\py4j\java_gateway.py:1321, in JavaMember.__call__(self, *args)
  1315 command = proto.CALL_COMMAND_NAME +\
  1316 self.command_header +\
  1317 args_command +\
  1318 proto.END_COMMAND_PART
  1320 answer = self.gateway_client.send_command(command)
  -> 1321 return_value = get_return_value(
  1322 answer, self.gateway_client, self.target_id, self.name)
  1324 for temp_arg in temp_args:
  1325 temp_arg._detach()
  File ~\anaconda3\lib\site-packages\pyspark\sql\utils.py:190, in capture_sql_exception.<locals>.deco(*a,**kw)
  188 def deco(*a: Any,**kw: Any) -> Any:
  189 try:
  --> 190 return f(*a,**kw)
  191 except Py4JJavaError as e:
  192 converted = convert_exception(e.java_exception)
  File ~\anaconda3\lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
  324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
  325 if answer[1] == REFERENCE_TYPE:
  --> 326 raise Py4JJavaError(
  327 "An error occurred while calling {0}{1}{2}.\n".
  328 format(target_id, ".", name), value)
  329 else:
  330 raise Py4JError(
  331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
  332 format(target_id, ".", name, value))
  Py4JJavaError: An error occurred while calling o103.parquet.
  : java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
      at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
      at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
      at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
      at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
      at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
      at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
      at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
      at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
      at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
      at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
      at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
      at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
      at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
      at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
      at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:209)
      at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
      at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
      at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
      at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
      at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
      at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
      at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
      at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
      at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
      at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
      at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
      at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
      at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
      at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
      at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
      at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
      at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
      at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
      at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
      at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
      at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
      at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
      at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
      at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
      at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
      at java.lang.reflect.Method.invoke(Unknown Source)
      at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
      at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
      at py4j.Gateway.invoke(Gateway.java:282)
      at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
      at py4j.commands.CallCommand.execute(CallCommand.java:79)
      at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
      at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
      at java.lang.Thread.run(Unknown Source)
  Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
      at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
      at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
      at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
      at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
      at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
      at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
      at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
      at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
      at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
      at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
      at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
      at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
      at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
      at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
      at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
      at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
      at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
      at org.apache.spark.util.Utils$.createTempDir(Utils.scala:343)
      at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:344)
      at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:901)
      at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
      at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
      at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
      at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
      at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
      at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
      at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
      at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
      at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
      ... 22 more

wi3ka0sx1#

May I know how and where you are running this code snippet? If you are running it on your local machine, then you need to set HADOOP_HOME.
You can refer to the following documentation, which gives detailed information about it: https://phoenixnap.com/kb/install-spark-on-windows-10
Setting HADOOP_HOME on Windows: https://brain-mentors.com/hadoopinstallation/
A sketch of the idea is shown below.
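As a minimal sketch (not the exact steps from the guides above), assuming winutils.exe has been unpacked into C:\Hadoop\bin, the variables should be set before the SparkSession is created, because the launched JVM only sees the environment that exists at launch time; setting them afterwards, as in the question, has no effect on the already-running JVM:

  import os

  # Assumption: winutils.exe (and hadoop.dll) live in C:\Hadoop\bin -- adjust to your layout.
  os.environ["HADOOP_HOME"] = r"C:\Hadoop"
  os.environ["PATH"] = r"C:\Hadoop\bin;" + os.environ["PATH"]

  # Create the SparkSession only after the variables are in place,
  # so the JVM that Spark launches inherits them.
  from pyspark.sql import SparkSession
  spark = SparkSession.builder.appName("SparkTrial").getOrCreate()

After changing the variables, restart the Jupyter kernel so no stale JVM is reused.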
Example of writing a DataFrame to Parquet:

  df.write.parquet("/tmp/test.parquet")
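If an earlier failed attempt already created the output directory, you may also want an explicit write mode plus a quick read-back to confirm the write succeeded; a small sketch, with the path chosen only as an example:

  # Overwrite any partial output left behind by a previous failed run
  df.write.mode("overwrite").parquet("C:/tmp/test2.parquet")

  # Read it back to verify the Parquet files are valid
  spark.read.parquet("C:/tmp/test2.parquet").show(5)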
