aws-glue-with-pyspark-dynamicframe导出到s3部分失败,出现不支持的操作异常

wooyq4lh  于 2021-07-09  发布在  Spark
关注(0)|答案(1)|浏览(598)

我应该先说,我一直在使用aws glue studio来学习如何使用pyspark的胶水,到目前为止进展非常顺利。直到我遇到一个我无法理解(更不用说解决)的错误。下面是数据示例。

上下文

我所做的只是一个简单的数据转换。 Input S3 Bucket --> CustomTransform --> Output S3 . 但程序在导出部分数据后不断崩溃。稍后我也提到了,但我甚至尝试删除customtransformation,但是s3数据导出仍然失败,即使只是从一个bucket导出到另一个bucket。

错误

下面是我得到的错误的python部分(复制自cloudwatch):

2021-03-26 09:03:09,200 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
  File "/tmp/GlueTest.py", line 69, in <module>
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "glueparquet", connection_options = {
    "path": "s3://example-bucket-here/data/",
    "compression": "snappy",
    "partitionKeys": []
}, transformation_ctx = "DataSink0")
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
    format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
    format, format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
    return sink.write(frame_or_dfc)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a,**kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o85.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 76, 172.36.109.34, executor 6): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary

真正的难题

最让我困惑的是,这个崩溃发生在它已经将大部分数据导出到s3之后。这会立即表明数据有问题,因为它会导致一些损坏(或格式不好)的数据,然后崩溃。
因此,我查看了成功导出的数据和输入数据之间的差异,找到了所有未导出的行。对我来说,没有什么是奇怪的,也没有什么是出口失败的原因。
当我选择s3 bucket作为输入源时,知道模式是由aws glue推断出来的可能会有所帮助。

我试过的

所以我试着用glue支持的所有不同格式导出数据,但都不起作用。我还尝试跳过所有的数据转换,只获取输入s3 bucket并直接导出到输出s3 bucket,但它仍然崩溃并出现相同的错误(实际上这就是我上面包含的错误消息!)。
同样,这一切都表明数据有问题,但我查看了所有没有通过该过程的数据(只有大约180条记录),所有数据看起来都和通过该过程的数据一样。
为了进行健全性检查,我在一些其他(非常类似的)数据上使用了input s3-->output s3方法,它工作得很好,基本上起到了复制粘贴的作用。
我也看到了这篇文章。但这并没有真正起到帮助,当我试图更改输出格式以获取更多信息时,我遇到了相同的错误-没有额外的信息。
有没有人能帮忙找出这个问题?没有任何迹象表明这会崩溃。我很乐意提供java错误的其余部分,如果这对人们有帮助的话。

数据示例

以下是我的数据:

Date        ticker_name  currency exchange_name instrument_type first_trade_date Amount
1612229400  0382.HK      HKD      HKG           EQUITY          1563240600       0.049
1613140200  SO           USD      NYQ           EQUITY          378657000        0.64
1613053800  SIGI         USD      NMS           EQUITY          322151400        0.25
1614240000  SIGT.L       GBp      LSE           EQUITY          828601200        1.68
1612249200  SIH.BE       EUR      BER           EQUITY          1252044000       0.038

除日期(long)、首次交易日期(long)和金额(double)外,所有字段都是字符串。
当我打电话的时候 .printSchema() 我得到以下信息:

root
|-- Date: long
|-- currency: string
|-- exchange_name: string
|-- instrument_type: string
|-- first_trade_date: long
|-- Amount: double
uttx8gqw

uttx8gqw1#

解决方案

因此,如果有人有这个问题,这可能是令人沮丧的,因为这个错误似乎没有提供任何信息,说明什么是真正出了问题。我仅有的线索之一就是这篇文章。这说明我的模式有问题。
我不得不非常仔细地查看我的数据,最终注意到只有当我将某些文件与其他文件一起运行时才会出现这个错误。
原来我的一些Parquet文件上有日期 int 在其他时候它是一个 float . 此数据是使用从Dataframe创建的 .to_parquet() 在另一个函数中,所以我不确定为什么数据类型不一致。
最让我困惑的是,为什么当我尝试将日期类型转换为all时 int (如图所示)我仍然得到了错误。
无论如何,我的解决方案是修复pandas输出数据的方式,并确保在glue处理数据之前,它总是将日期作为整数输出。

相关问题