Glue transformation logic for a DynamoDB table

xxls0lw8 · posted 2021-05-27 in Spark

I am using AWS Glue to transform data from a DynamoDB table. I can fetch the data and transform it with PySpark logic, but when I write the result out in JSON format, the output is not valid JSON (not a single JSON document). I need help converting this output into proper JSON.
Code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

# Spark session and Glue context used by the job
spark = SparkSession.builder.getOrCreate()
glueContext = GlueContext(SparkContext.getOrCreate())

# Acquire data from glue tables using dynamic frame (stage table)

datasource_1 = glueContext.create_dynamic_frame.from_catalog(
             database="test",
             table_name="dynamo_edc_test_stage_table")

# Checking current schema

datasource_1.printSchema()

# Acquire data from glue tables using dynamic frame (main table)

datasource_2 = glueContext.create_dynamic_frame.from_catalog(
             database="test",
             table_name="dynamo_edc_test_main_table")

datasource_2.printSchema()

# Convert the Glue DynamicFrames into Spark DataFrames with toDF() and register them
# as temp views so that spark.sql can run SQL queries against them

datasource_1.toDF().createOrReplaceTempView('edc_test_stage_table')
datasource_2.toDF().createOrReplaceTempView('edc_test_main_table')

# 1) Insert only when a new record arrives

# repartition(1) writes the result as a single part file; append mode writes the data to the S3 bucket in JSON format

final_output_df_1 = spark.sql('select distinct d.resource_id,1 as version_id,d.file_name,d.last_modified_date,d.folder_name,d.Basic,d.Attributes,"Y" as active_ind from edc_test_stage_table d,edc_test_main_table e where d.folder_name!=e.folder_name')
final_output_target_1="s3://testversionsbeacon/test"
final_output_df_1.repartition(1).write.mode("append").json(final_output_target_1)

# 2) Insert a new version of an existing record when its modified date changes

final_output_df_2 = spark.sql('select e.resource_id,e.version_id+1 as version_id,d.file_name,d.last_modified_date,d.folder_name,d.Basic,d.Attributes,"Y" as active_ind from edc_test_stage_table d,edc_test_main_table e where e.active_ind="Y" and d.folder_name=e.folder_name and d.file_name=e.file_name and d.last_modified_date!=e.last_modified_date')
final_output_target_2="s3://testversionsbeacon/test"
final_output_df_2.repartition(1).write.mode("append").json(final_output_target_2)

# 3) Mark the old record's active indicator as 'N'

final_output_df_3 = spark.sql('select e.resource_id,e.version_id,e.file_name,e.last_modified_date,e.folder_name,e.Basic,e.Attributes,"N" as active_ind from edc_test_stage_table d,edc_test_main_table e where e.active_ind="Y" and d.folder_name=e.folder_name and d.file_name=e.file_name and d.last_modified_date!=e.last_modified_date')
final_output_target_3="s3://testversionsbeacon/test"
final_output_df_3.repartition(1).write.mode("append").json(final_output_target_3)

sqserrrh (answer #1)

Spark's JSON writer emits the output as JSON Lines, so you don't get a list of dictionaries; you get newline-delimited dictionaries, where each line represents one record.
You can load that directly into DynamoDB. If you really want a list of dictionaries (a single JSON array), you have to build it yourself.
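For example, here is a minimal sketch of that manual step for the first query's output, assuming the result is small enough to collect on the driver. The bucket name and the DataFrame final_output_df_1 come from the question; the object key test/final_output_1.json is made up for illustration:

import json
import boto3

# Collect the result as JSON strings on the driver (only reasonable for small
# result sets) and wrap them into a single JSON array, i.e. a list of dictionaries.
records = [json.loads(row) for row in final_output_df_1.toJSON().collect()]

# Upload the array as one JSON document; the key name here is just an example.
boto3.client("s3").put_object(
    Bucket="testversionsbeacon",
    Key="test/final_output_1.json",
    Body=json.dumps(records).encode("utf-8"),
)

If the result is too large to collect, you can instead read the JSON Lines part files back with spark.read.json and post-process them there.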
