我有一个胶水作业(在spark上运行),只需将csv文件转换为parquet。我无法控制csv数据,因此我希望在转换到parquet期间捕获数据和表模式之间的任何不一致。例如,如果一列被定义为整数,我希望作业在该列中有任何字符串值时给我一个错误!目前,dynamicframe通过在结果parquet文件中提供选项(字符串和整数)来解决这个问题!这对于某些用例是有帮助的,但是我想知道是否有任何方法可以强制执行模式,并且在有任何不一致的情况下让粘合作业抛出错误。这是我的密码:
datasource0 = glueContext.create_dynamic_frame.from_catalog(databasem=mdbName, table_namem=mtable, transformation_ctx="datasource0")
df = datasource0.toDF()
df = df.coalesce(parquetFileCount)
df = convertColDataType(df, "timestamp", "timestamp", dbName, table)
applymapping1 = DynamicFrame.fromDF(df,glueContext,"finalDF")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": path}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
2条答案
按热度按时间zpf6vheq1#
您可以通过使用spark native lib而不是使用glue lib来解决这个问题
使用自定义模式和failfast模式从相应的s3路径读取,而不是从catalog读取
ryhaxcpt2#
我有一个类似的数据类型问题,我可以通过导入另一个我知道格式正确的df来解决。然后我循环了两个df的列并比较了它们的数据类型。在本例中,我在必要时重新格式化了数据类型: