AmazonWeb服务—在aws中转换其他列的数据类型时,某些列将变为空

ibps3vxo  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(470)

我正在尝试使用aws胶水将csv数据从aws s3移动到aws redshift。我正在移动的数据使用非标准格式记录每个条目的时间戳(例如01-jan-2020 01.02.03),因此我的胶水爬虫会将此列作为字符串。
在我的作业脚本中,我使用pyspark中的“to\u timestamp”函数将此列转换为时间戳,该函数似乎工作正常。但是,这样做的结果是,数据类型为“long”的列不会转移到redshift,并且这些列的值都为null。
当我运行脚本而不转换timestamp列(即只转换生成的脚本)时,数据类型为“long”的列不会出现这个问题,它们会正确地显示为红移。
这是我的密码:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import to_timestamp, col

## @params: [TempDir, JOB_NAME]

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource

## @args: [database = "telenors3csvdata", table_name = "gprs_reports", transformation_ctx = "datasource0"]

## @return: datasource0

## @inputs: []

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "telenors3csvdata", table_name = "gprs_reports", transformation_ctx = "datasource0")

# Convert to data frame and perform ETL

dataFrame = datasource0.toDF().withColumn("rec_open_ts", to_timestamp(col("rec_open_ts"),"dd-MMM-yyyy HH.mm.ss"))

# Convert back to a dynamic frame

editedData = DynamicFrame.fromDF(dataFrame, glueContext, "editedData")

## @type: ApplyMapping

## @args: [mapping = [("rec_open_ts", "timestamp", "rec_open_ts", "timestamp"), ("chg_id", "long", "chg_id", "long"), ("rec_seq_num", "long", "rec_seq_num", "long"), ("imsi", "long", "imsi", "long"), ("msisdn", "long", "msisdn", "long"), ("terminal_ip_address", "string", "terminal_ip_address", "string"), ("pdp_type", "long", "pdp_type", "long"), ("ggsn_ip_address", "string", "ggsn_ip_address", "string"), ("sgsn_ip_address", "string", "sgsn_ip_address", "string"), ("country", "string", "country", "string"), ("operator", "string", "operator", "string"), ("apn", "string", "apn", "string"), ("duration", "long", "duration", "long"), ("record_close_cause_code", "long", "record_close_cause_code", "long"), ("uploaded_data(b)", "long", "uploaded_data(b)", "long"), ("downloaded_data(b)", "long", "downloaded_data(b)", "long")], transformation_ctx = "applymapping1"]

## @return: applymapping1

## @inputs: [frame = datasource0]

applymapping1 = ApplyMapping.apply(frame = editedData, mappings = [("rec_open_ts", "timestamp", "rec_open_ts", "timestamp"), ("chg_id", "long", "chg_id", "long"), ("rec_seq_num", "long", "rec_seq_num", "long"), ("imsi", "long", "imsi", "long"), ("msisdn", "long", "msisdn", "long"), ("terminal_ip_address", "string", "terminal_ip_address", "string"), ("pdp_type", "long", "pdp_type", "long"), ("ggsn_ip_address", "string", "ggsn_ip_address", "string"), ("sgsn_ip_address", "string", "sgsn_ip_address", "string"), ("country", "string", "country", "string"), ("operator", "string", "operator", "string"), ("apn", "string", "apn", "string"), ("duration", "long", "duration", "long"), ("record_close_cause_code", "long", "record_close_cause_code", "long"), ("uploaded_data(b)", "long", "uploaded_data(b)", "long"), ("downloaded_data(b)", "long", "downloaded_data(b)", "long")], transformation_ctx = "applymapping1")

## @type: ResolveChoice

## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]

## @return: resolvechoice2

## @inputs: [frame = applymapping1]

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")

## @type: DropNullFields

## @args: [transformation_ctx = "dropnullfields3"]

## @return: dropnullfields3

## @inputs: [frame = resolvechoice2]

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## @type: DataSink

## @args: [catalog_connection = "RedshiftCluster", connection_options = {"dbtable": "gprs_reports", "database": "telenordatasync"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]

## @return: datasink4

## @inputs: [frame = dropnullfields3]

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "RedshiftCluster", connection_options = {"dbtable": "gprs_reports", "database": "telenordatasync"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()

这里有什么明显的我遗漏的吗?非常感谢!
编辑:
运行EditeData.printschema()后,显示的架构为:

|-- rec_open_ts: timestamp |-- chg_id: struct | |-- long: long | |-- string: string |-- rec_seq_num: struct | |-- long: long | |-- string: string |-- imsi: struct | |-- long: long | |-- string: string |-- msisdn: struct | |-- long: long | |-- string: string |-- terminal_ip_address: string |-- pdp_type: struct | |-- long: long | |-- string: string |-- ggsn_ip_address: string |-- sgsn_ip_address: string |-- country: string |-- operator: string |-- apn: string |-- duration: struct | |-- long: long | |-- string: string |-- record_close_cause_code: struct | |-- long: long | |-- string: string |-- uploaded_data(b): struct | |-- long: long | |-- string: string |-- downloaded_data(b): struct | |-- long: long | |-- string: string

(long是structs的一部分?)
运行editeddata.show(10)后,将显示红移中应该存在的数据。其中一个长列的示例:

"chg_id": {"long": 123456789, "string": null}

编辑2:
在不使用etl运行datasource0.printschema()后(时间戳保留为字符串),架构为:

|-- rec_open_ts: string |-- chg_id: choice | |-- long | |-- string |-- rec_seq_num: choice | |-- long | |-- string |-- imsi: choice | |-- long | |-- string |-- msisdn: choice | |-- long | |-- string |-- terminal_ip_address: string |-- pdp_type: choice | |-- long | |-- string |-- ggsn_ip_address: string |-- sgsn_ip_address: string |-- country: string |-- operator: string |-- apn: string |-- duration: choice | |-- long | |-- string |-- record_close_cause_code: choice | |-- long | |-- string |-- uploaded_data(b): choice | |-- long | |-- string |-- downloaded_data(b): choice | |-- long | |-- string

似乎当我转换timestamp列时,长列变成了struct。为什么会这样?

ncgqoxb0

ncgqoxb01#

对于其他遇到这个问题的人,我找到了解决办法:
当类型不明确时(即,在本例中,爬虫程序推断出一个long,但该列中有一个值不是long),该类型将作为推断类型和字符串之间的选择。如果未解决歧义,则在从动态帧转换为Dataframe时,此选项将变为结构,并且不会正确地以红移显示。
因此,在执行任何etl之前,我使用'resolvechoice'方法解析了这些选择。这是我的最新代码:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import to_timestamp, col

## @params: [TempDir, JOB_NAME]

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource

## @args: [database = "telenors3csvdata", table_name = "gprs_reports", transformation_ctx = "datasource0"]

## @return: datasource0

## @inputs: []

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "telenors3csvdata", table_name = "gprs_reports", transformation_ctx = "datasource0")

# Resolve type choices

resolvedData = datasource0.resolveChoice(specs = [('chg_id','cast:long')]).resolveChoice(specs = [('rec_seq_num','cast:long')]).resolveChoice(specs = [('imsi','cast:long')]).resolveChoice(specs = [('msisdn','cast:long')]).resolveChoice(specs = [('pdp_type','cast:long')]).resolveChoice(specs = [('duration','cast:long')]).resolveChoice(specs = [('record_close_cause_code','cast:long')]).resolveChoice(specs = [('uploaded_data(b)','cast:long')]).resolveChoice(specs = [('downloaded_data(b)','cast:long')])

# Convert to data frame and perform ETL

dataFrame = resolvedData.toDF().withColumn("rec_open_ts", to_timestamp(col("rec_open_ts"),"dd-MMM-yyyy HH.mm.ss"))

# Convert back to a dynamic frame

editedData = DynamicFrame.fromDF(dataFrame, glueContext, "editedData")

print("Printed Schema")
editedData.printSchema()

## @type: ApplyMapping

## @args: [mapping = [("rec_open_ts", "timestamp", "rec_open_ts", "timestamp"), ("chg_id", "long", "chg_id", "long"), ("rec_seq_num", "long", "rec_seq_num", "long"), ("imsi", "long", "imsi", "long"), ("msisdn", "long", "msisdn", "long"), ("terminal_ip_address", "string", "terminal_ip_address", "string"), ("pdp_type", "long", "pdp_type", "long"), ("ggsn_ip_address", "string", "ggsn_ip_address", "string"), ("sgsn_ip_address", "string", "sgsn_ip_address", "string"), ("country", "string", "country", "string"), ("operator", "string", "operator", "string"), ("apn", "string", "apn", "string"), ("duration", "long", "duration", "long"), ("record_close_cause_code", "long", "record_close_cause_code", "long"), ("uploaded_data(b)", "long", "uploaded_data(b)", "long"), ("downloaded_data(b)", "long", "downloaded_data(b)", "long")], transformation_ctx = "applymapping1"]

## @return: applymapping1

## @inputs: [frame = datasource0]

applymapping1 = ApplyMapping.apply(frame = editedData, mappings = [("rec_open_ts", "timestamp", "rec_open_ts", "timestamp"), ("chg_id", "long", "chg_id", "long"), ("rec_seq_num", "long", "rec_seq_num", "long"), ("imsi", "long", "imsi", "long"), ("msisdn", "long", "msisdn", "long"), ("terminal_ip_address", "string", "terminal_ip_address", "string"), ("pdp_type", "long", "pdp_type", "long"), ("ggsn_ip_address", "string", "ggsn_ip_address", "string"), ("sgsn_ip_address", "string", "sgsn_ip_address", "string"), ("country", "string", "country", "string"), ("operator", "string", "operator", "string"), ("apn", "string", "apn", "string"), ("duration", "long", "duration", "long"), ("record_close_cause_code", "long", "record_close_cause_code", "long"), ("uploaded_data(b)", "long", "uploaded_data(b)", "long"), ("downloaded_data(b)", "long", "downloaded_data(b)", "long")], transformation_ctx = "applymapping1")

## @type: ResolveChoice

## @args: [choice = "make_cols", transformation_ctx = "resolvechoice2"]

## @return: resolvechoice2

## @inputs: [frame = applymapping1]

resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")

## @type: DropNullFields

## @args: [transformation_ctx = "dropnullfields3"]

## @return: dropnullfields3

## @inputs: [frame = resolvechoice2]

dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

## @type: DataSink

## @args: [catalog_connection = "RedshiftCluster", connection_options = {"dbtable": "gprs_reports", "database": "telenordatasync"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]

## @return: datasink4

## @inputs: [frame = dropnullfields3]

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "RedshiftCluster", connection_options = {"dbtable": "gprs_reports", "database": "telenordatasync"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
job.commit()

相关问题