Exception when reading a Parquet file from S3 over the s3a protocol in local PySpark

mw3dktmi · posted 2021-05-29 · in Spark

Below is a simple piece of code that reads a Parquet file from S3 using s3a:

    import os
    import configparser

    from pyspark.sql import SparkSession

    # os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell"

    # Get the AWS credentials for the mlandexps profile using configparser library
    config = configparser.ConfigParser()
    config.read(os.path.expanduser("~/.aws/credentials"))
    access_key = config.get("mlandexps", "aws_access_key_id")
    secret_key = config.get("mlandexps", "aws_secret_access_key")
    print("Access Key : %s" % access_key)
    print("Secret Key : %s" % secret_key)

    # Create a Spark session
    spark = SparkSession \
        .builder \
        .master("local") \
        .appName("ReadFilesFromS3TestUsingS3a") \
        .getOrCreate()

    sc = spark.sparkContext
    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.endpoint", "s3.amazonaws.com")
    hadoop_conf.set("fs.s3a.access.key", access_key)
    hadoop_conf.set("fs.s3a.secret.key", secret_key)
    hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    df = spark.read.parquet("s3a://rppatwa/BlackWidowData/part-00000-5b22d5ed-2be1-4343-9aa2-53e2133916ae-c000.snappy.parquet")
    df.printSchema()
    df.show(3)
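
As an aside, the same wiring can be expressed through the session builder itself: Spark copies any spark.hadoop.*-prefixed option into the Hadoop configuration at startup, and spark.jars.packages pulls in the hadoop-aws connector that the commented-out PYSPARK_SUBMIT_ARGS line alludes to. A minimal sketch under those assumptions (profile name and package version are taken from the snippet above); this is equivalent configuration, not a fix for the error below:

    import os
    import configparser

    from pyspark.sql import SparkSession

    # Same credentials lookup as above; "mlandexps" comes from the original snippet.
    config = configparser.ConfigParser()
    config.read(os.path.expanduser("~/.aws/credentials"))
    access_key = config.get("mlandexps", "aws_access_key_id")
    secret_key = config.get("mlandexps", "aws_secret_access_key")

    # spark.hadoop.* options are forwarded into the Hadoop configuration when
    # the session starts, so no sc._jsc.hadoopConfiguration() mutation is needed.
    spark = SparkSession.builder \
        .master("local") \
        .appName("ReadFilesFromS3TestUsingS3a") \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.3") \
        .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
        .config("spark.hadoop.fs.s3a.access.key", access_key) \
        .config("spark.hadoop.fs.s3a.secret.key", secret_key) \
        .getOrCreate()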

When I run it locally, I get the following exception:

    Py4JJavaError Traceback (most recent call last)
    <ipython-input-17-ffd16a1a558f> in <module>
    37 print("Spark Context : {}".format(sc))
    38
    ---> 39 df = spark.read.parquet("s3a://rppatwa/BlackWidowData/part-00000-5b22d5ed-2be1-4343-9aa2-53e2133916ae-c000.snappy.parquet")
    40 df.printSchema()
    41 df.show(3)
    ~/Downloads/Spark/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/readwriter.py in parquet(self, *paths)
    314 [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
    315 """
    --> 316 return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))
    317
    318 @ignore_unicode_prefix
    ~/Downloads/Spark/spark-2.4.5-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
    1255 answer = self.gateway_client.send_command(command)
    1256 return_value = get_return_value(
    -> 1257 answer, self.gateway_client, self.target_id, self.name)
    1258
    1259 for temp_arg in temp_args:
    ~/Downloads/Spark/spark-2.4.5-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a,**kw)
    61 def deco(*a,**kw):
    62 try:
    ---> 63 return f(*a,**kw)
    64 except py4j.protocol.Py4JJavaError as e:
    65 s = e.java_exception.toString()
    ~/Downloads/Spark/spark-2.4.5-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326 raise Py4JJavaError(
    327 "An error occurred while calling {0}{1}{2}.\n".
    --> 328 format(target_id, ".", name), value)
    329 else:
    330 raise Py4JError(
    Py4JJavaError: An error occurred while calling o514.parquet.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 7, localhost, executor driver): java.io.IOException: Failed to connect to /192.168.19.237:62286
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
        at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$downloadClient(NettyRpcEnv.scala:368)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply$mcV$sp(NettyRpcEnv.scala:336)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply(NettyRpcEnv.scala:335)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply(NettyRpcEnv.scala:335)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:339)
        at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:693)
        at org.apache.spark.util.Utils$.fetchFile(Utils.scala:509)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:816)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:808)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:808)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Suppressed: java.lang.NullPointerException
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1402)
        ... 17 more
    Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Operation timed out: /192.168.19.237:62286
    Caused by: java.net.ConnectException: Operation timed out
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:688)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.mergeSchemasInParallel(ParquetFileFormat.scala:633)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.inferSchema(ParquetFileFormat.scala:241)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:179)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:645)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: Failed to connect to /192.168.19.237:62286
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
        at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$downloadClient(NettyRpcEnv.scala:368)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply$mcV$sp(NettyRpcEnv.scala:336)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply(NettyRpcEnv.scala:335)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$openChannel$1.apply(NettyRpcEnv.scala:335)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:339)
        at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:693)
        at org.apache.spark.util.Utils$.fetchFile(Utils.scala:509)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:816)
        at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:808)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:808)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:375)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
    Suppressed: java.lang.NullPointerException
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1402)
        ... 17 more
    Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Operation timed out: /192.168.19.237:62286
    Caused by: java.net.ConnectException: Operation timed out
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:688)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

My guess is that the root cause of the exception is this line:

    Caused by: java.io.IOException: Failed to connect to /192.168.19.237:62286

I looked around online but did not find a particularly helpful answer. Any insight would be appreciated!
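
For context, 192.168.19.237:62286 is the driver's own file server: even in local mode the executor fetches task dependencies from it over TCP, and a VPN, firewall, or changed network can make the machine's LAN address unreachable from itself. Below is a sketch of the mitigation commonly suggested for this symptom; spark.driver.host and spark.driver.bindAddress are standard Spark properties, but whether they resolve this particular failure is an assumption:

    from pyspark.sql import SparkSession

    # Pin the driver's advertised address and bind address to loopback so the
    # local executor never dials the (possibly unreachable) LAN address.
    spark = SparkSession.builder \
        .master("local") \
        .appName("ReadFilesFromS3TestUsingS3a") \
        .config("spark.driver.host", "localhost") \
        .config("spark.driver.bindAddress", "127.0.0.1") \
        .getOrCreate()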
Thanks
