如果数据不符合dataframe模式,如何强制glue dynamicframe失败?

flvlnr44  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(555)

我有一个胶水作业(在spark上运行),只需将csv文件转换为parquet。我无法控制csv数据,因此我希望在转换到parquet期间捕获数据和表模式之间的任何不一致。例如,如果一列被定义为整数,我希望作业在该列中有任何字符串值时给我一个错误!目前,dynamicframe通过在结果parquet文件中提供选项(字符串和整数)来解决这个问题!这对于某些用例是有帮助的,但是我想知道是否有任何方法可以强制执行模式,并且在有任何不一致的情况下让粘合作业抛出错误。这是我的密码:

datasource0 = glueContext.create_dynamic_frame.from_catalog(databasem=mdbName, table_namem=mtable, transformation_ctx="datasource0")
df = datasource0.toDF()
df = df.coalesce(parquetFileCount)
df = convertColDataType(df, "timestamp", "timestamp", dbName, table)
applymapping1 = DynamicFrame.fromDF(df,glueContext,"finalDF")
datasink4 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": path}, format = "parquet", transformation_ctx = "datasink4")
job.commit()
zpf6vheq

zpf6vheq1#

您可以通过使用spark native lib而不是使用glue lib来解决这个问题
使用自定义模式和failfast模式从相应的s3路径读取,而不是从catalog读取

schema = StructType ([StructField ('id', IntegerType(), True),
StructField ('name', StringType(), True)]
df = spark.read.option('mode', 'FAILFAST').csv(s3Path, schema=schema)
ryhaxcpt

ryhaxcpt2#

我有一个类似的数据类型问题,我可以通过导入另一个我知道格式正确的df来解决。然后我循环了两个df的列并比较了它们的数据类型。在本例中,我在必要时重新格式化了数据类型:

df1 = inputfile
    df2 = target
    if df1.schema != df2.schema:
        colnames = df2.schema.names
        for colname in colnames:
            df1DataType = get_dtype(df1, colname)
            df2DataType = get_dtype(df2, colname)
            if df1DataType != df2DataType:
                if df1DataType == 'timestamp':
                    not_string = ''
                    df2 = df2.withColumn(colname, df2[colname].cast(TimestampType()))
                elif df1DataType == 'double':
                    not_string = ''
                    df2 = df2.withColumn(colname, df2[colname].cast(DoubleType()))
                elif df1DataType == 'int':
                    not_string = ''
                    df2 = df2.withColumn(colname, df2[colname].cast(IntegerType()))
                else:
                    not_string = 'not '

                print(not_string + 'updating: ' + colname + ' - from ' + df2DataType + ' to ' + df1DataType)

     target = df2

相关问题