PySpark using s3a throws java.lang.IllegalArgumentException

Asked by 4ngedf3f on 2021-05-17, in Spark

I have a bunch of code that works fine with s3n, but when I try to switch to s3a all I get is a java.lang.IllegalArgumentException, with no real pointer or hint as to what is actually wrong. Any debugging suggestions would be appreciated! I'm using hadoop-aws-2.7.3 with aws-java-sdk-1.7.4, so I'd expect that combination to work.
The error:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-1aafd157ea37> in <module>
----> 1 schema_df = spark.read.json('s3a://udemy-stream-logs/cdn-access-raw/verizon/mp4-a.udemycdn.com/wpc_C9216_306_20200701_0C390000BFD7B55E_100.json_lines.gz')
      2 schema = schema_df.schema

/usr/local/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup)
    298             path = [path]
    299         if type(path) == list:
--> 300             return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    301         elif isinstance(path, RDD):
    302             def func(iterator):

/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a,**kw)
    126     def deco(*a,**kw):
    127         try:
--> 128             return f(*a,**kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

/usr/local/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o31.json.
: java.lang.IllegalArgumentException
    at java.base/java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1293)
    at java.base/java.util.concurrent.ThreadPoolExecutor.<init>(ThreadPoolExecutor.java:1215)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:280)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3303)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
    at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:46)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:477)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)

My code:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = (SparkConf()
    .set('spark.executor.extraJavaOptions', '-Dcom.amazonaws.services.s3.enableV4=true')
    .set('spark.driver.extraJavaOptions', '-Dcom.amazonaws.services.s3.enableV4=true')
    .set('spark.master', 'local[*]')
    .set('spark.driver.memory', '4g'))

scT = SparkContext(conf=conf)
scT.setSystemProperty('com.amazonaws.services.s3.enableV4', 'true')
scT.setLogLevel("INFO")

hadoopConf = scT._jsc.hadoopConfiguration()
hadoopConf.set('fs.s3.buffer.dir', '/tmp/pyspark')

hadoopConf.set('fs.s3a.awsAccessKeyId', 'key')
hadoopConf.set('fs.s3a.awsSecretAccessKey', 'secret')
hadoopConf.set('fs.s3a.endpoint', 's3-us-east-1.amazonaws.com')
hadoopConf.set('fs.s3a.multipart.size', '104857600')
hadoopConf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
hadoopConf.set('fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider')

spark = SparkSession(scT)

df = spark.read.json('s3a://mybucket/something_something.json_lines.gz')

Answer 1 (kb5ga3dv):

Ignoring the small detail that you are setting the s3a connector's username and password in the wrong properties, the stack trace shows the exception comes from the thread pool constructor. Probably one of the arguments being passed in (thread pool size, keep-alive time) is invalid, but the JVM gives no obvious hint as to which specific option is at fault.
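
To make that failure mode concrete, here is a small pre-flight check in Python that mirrors the argument validation ThreadPoolExecutor performs in its constructor. The helper function and its fallback defaults are illustrative assumptions (the defaults shown are what I'd expect from Hadoop 2.7.x; check your version's Constants class), but the four conditions are the ones the JVM rejects with IllegalArgumentException:

# Hypothetical helper: validate the s3a thread pool options before starting
# Spark, mirroring the checks in ThreadPoolExecutor's constructor.
# Fallback defaults are assumptions based on Hadoop 2.7.x.
def check_s3a_pool_options(options):
    core = int(options.get('fs.s3a.threads.core', 15))
    maximum = int(options.get('fs.s3a.threads.max', 256))
    keepalive = int(options.get('fs.s3a.threads.keepalivetime', 60))
    # ThreadPoolExecutor throws IllegalArgumentException when any of these
    # hold: corePoolSize < 0, maximumPoolSize <= 0,
    # maximumPoolSize < corePoolSize, or keepAliveTime < 0.
    if core < 0 or maximum <= 0 or maximum < core or keepalive < 0:
        raise ValueError('invalid s3a thread pool options: '
                         'core=%d, max=%d, keepalive=%d'
                         % (core, maximum, keepalive))

check_s3a_pool_options({'fs.s3a.threads.max': '64'})  # passes: 15 <= 64
check_s3a_pool_options({'fs.s3a.threads.max': '4'})   # raises: max < core default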
My recommendation: stop copying and pasting other people's Stack Overflow examples and read the s3a documentation. Look at the options for authentication, then at the options for the bounded and unbounded thread pools, and make sure those are set to valid values.
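
As a starting point, here is a sketch of what a cleaned-up configuration might look like. The property names (fs.s3a.access.key, fs.s3a.secret.key, fs.s3a.threads.core, fs.s3a.threads.max, fs.s3a.threads.keepalivetime) are the documented s3a options for Hadoop 2.7.x; the numeric values are placeholders to adapt, not a verified fix for this particular stack trace:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = (SparkConf()
    .set('spark.master', 'local[*]')
    .set('spark.driver.memory', '4g'))

sc = SparkContext(conf=conf)
hadoopConf = sc._jsc.hadoopConfiguration()

# Authentication: the documented s3a property names. The names used in the
# question (fs.s3a.awsAccessKeyId / fs.s3a.awsSecretAccessKey) are not read
# by the s3a connector.
hadoopConf.set('fs.s3a.access.key', 'key')
hadoopConf.set('fs.s3a.secret.key', 'secret')
# Default endpoint for us-east-1.
hadoopConf.set('fs.s3a.endpoint', 's3.amazonaws.com')

# Thread pool: S3AFileSystem.initialize() passes these to a
# ThreadPoolExecutor, so core and keepalive must be non-negative and
# threads.max must be positive and at least threads.core.
# The numbers below are examples only, not tuned values.
hadoopConf.set('fs.s3a.threads.core', '15')
hadoopConf.set('fs.s3a.threads.max', '64')
hadoopConf.set('fs.s3a.threads.keepalivetime', '60')

spark = SparkSession(sc)
df = spark.read.json('s3a://mybucket/something_something.json_lines.gz')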
